from typing import (
Dict,
List,
Union,
Optional,
)
from functools import cached_property
from enum import Enum, auto
from abc import ABC, abstractmethod
from .plugins import PluginFactory
import sys
[docs]
class DataNode(ABC):
    """Common abstract root shared by every node of the data hierarchy."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Return the node's name; every concrete subclass must supply it."""

    def __repr__(self):
        # Use the runtime class so subclasses are reported under their own name.
        class_name = self.__class__.__name__
        return f"{class_name}(name={self.name})"
[docs]
class DataContainer(DataNode):
    """Abstract node that holds other nodes and exposes a container protocol."""

    @abstractmethod
    def __getitem__(self, key: str):
        """Look up a contained node by key; implemented by subclasses."""

    @abstractmethod
    def __iter__(self):
        """Iterate over the container; implemented by subclasses."""

    @abstractmethod
    def __len__(self):
        """Report the container's size; implemented by subclasses."""
[docs]
class Item(DataNode):
    """Represents a column/item in a category. Always uses eager loading."""

    def __init__(self, name: str, values: Optional[List[str]] = None):
        """
        Initialize an Item with pre-loaded values.

        :param name: The name of the item
        :param values: Pre-loaded values, or ``None`` for an item that starts empty
        """
        self._name = name
        self._values = values

    @property
    def name(self) -> str:
        """Read-only access to the item name."""
        return self._name

    @cached_property
    def values(self) -> List[str]:
        """
        Return the item's values as a list (cached via ``@cached_property``).

        An item created without values yields an empty list rather than
        ``None``, so iteration and indexing agree with ``len(item) == 0``.
        """
        if self._values is None:
            # Materialize the storage once so the cached list and _values stay the same object.
            self._values = []
        return self._values

    def add_value(self, value: str) -> None:
        """
        Append a single value to the item.

        :param value: The value to append
        """
        if self._values is None:
            self._values = []
        self._values.append(value)
        # Drop the cached ``values`` entry directly; hasattr() would first compute
        # the property just so that it could be deleted again.
        self.__dict__.pop("values", None)

    def __iter__(self):
        """Iterate over values."""
        return iter(self.values)

    def __len__(self):
        """Get the number of values (0 when nothing has been stored yet)."""
        return 0 if self._values is None else len(self._values)

    def __getitem__(self, index: Union[int, slice]) -> Union[str, List[str]]:
        """Get value(s) by index or slice."""
        return self.values[index]

    def __repr__(self):
        # "loaded" reports whether the cached ``values`` entry currently exists on the instance.
        values_loaded = "values" in self.__dict__
        return f"Item(name='{self.name}', length={len(self)}, loaded={values_loaded})"
[docs]
class Row(DataNode):
    """Represents a single row of data in a Category."""

    def __init__(self, category: "Category", row_index: int):
        """
        Bind the row to its owning category and position.

        :param category: The category whose items supply this row's values
        :param row_index: Position of the row inside each item's value list
        """
        self._category = category
        self._row_index = row_index

    @property
    def name(self) -> str:
        """Return name from the first item value in the row if available, otherwise the row index."""
        item_names = self._category.items
        if len(item_names) > 0:
            try:
                return self._category[item_names[0]][self._row_index]
            except (IndexError, KeyError):
                # Deliberate fallback: an unreadable first item means the index is used as the name.
                pass
        return str(self._row_index)

    def __getattr__(self, item_name: str) -> str:
        """
        Allow dot notation access to item values in this row.

        :raises IndexError: If the item exists but has no value at this row index
        :raises AttributeError: If no item of that name exists
        """
        # Read our own state through __dict__: __getattr__ also runs on instances
        # whose attributes are not set yet (copy/pickle probing for __setstate__),
        # and touching self._category there would re-enter this method forever.
        state = self.__dict__
        if "_category" in state and "_row_index" in state:
            category = state["_category"]
            row_index = state["_row_index"]
            if item_name in category._items:
                values = category[item_name]
                if row_index < len(values):
                    return values[row_index]
                raise IndexError(f"Row index {row_index} is out of range")
        raise AttributeError(
            f"'{self.__class__.__name__}' object has no attribute '{item_name}'"
        )

    def __getitem__(self, item_name: str) -> str:
        """
        Allow dictionary-style access to item values in this row.

        :raises KeyError: If the item is unknown or has no value at this row index
        """
        if item_name in self._category._items:
            values = self._category[item_name]
            if self._row_index < len(values):
                return values[self._row_index]
            raise KeyError(f"Item '{item_name}' at index {self._row_index} not found")
        raise KeyError(item_name)

    @property
    def data(self) -> Dict[str, str]:
        """Return all item values for this row as a dictionary.

        Items that are too short to have a value at this row index are omitted.
        """
        result = {}
        for item_name in self._category.items:
            values = self._category[item_name]
            if self._row_index < len(values):
                result[item_name] = values[self._row_index]
        return result

    def __repr__(self):
        return f"Row({self._row_index}, {self._category.name})"
[docs]
class LazyRowList:
    """List-like view over a category's rows that builds Row objects on demand."""

    def __init__(self, category: "Category", row_count: int):
        self._category = category
        self._row_count = row_count
        # Rows already handed out, keyed by their non-negative index
        self._cached_rows: Dict[int, "Row"] = {}

    def __len__(self) -> int:
        return self._row_count

    def __getitem__(self, index: Union[int, slice]) -> Union["Row", List["Row"]]:
        """Return one Row for an integer index, or a list of Rows for a slice."""
        if isinstance(index, slice):
            positions = range(*index.indices(self._row_count))
            return [self[position] for position in positions]
        if not isinstance(index, int):
            raise TypeError(
                f"Row indices must be integers or slices, not {type(index).__name__}"
            )
        # Translate a negative index before the bounds check
        if index < 0:
            index += self._row_count
        if not 0 <= index < self._row_count:
            raise IndexError(
                f"Row index {index} is out of range (0-{self._row_count-1})"
            )
        row = self._cached_rows.get(index)
        if row is None:
            # First access: create the Row and remember it for later lookups
            row = Row(self._category, index)
            self._cached_rows[index] = row
        return row

    def __iter__(self):
        yield from map(self.__getitem__, range(self._row_count))

    def __repr__(self):
        cached = len(self._cached_rows)
        return f"LazyRowList({self._row_count} rows, {cached} cached)"
[docs]
class LazyGemmiColumn(list):
    """
    Lazy wrapper for gemmi loop columns - data extracted only when accessed.

    Behaves like a list but loads data from gemmi on first access. Every
    operation that reads or mutates the contents loads the column first, so
    membership tests, comparisons and appends see the real data instead of
    the still-empty underlying list.
    """

    def __init__(self, gemmi_loop, column_index: int):
        """
        Initialize lazy column wrapper.

        :param gemmi_loop: The loop object containing the data; this class calls
            ``length()`` and ``[row, column]`` indexing on it
        :param column_index: The column index in the loop
        """
        super().__init__()  # Don't populate the list yet
        self._gemmi_loop = gemmi_loop
        self._column_index = column_index
        self._loaded = False

    def _ensure_loaded(self):
        """Load column data from the loop on first access (no-op afterwards)."""
        # An instance without state (e.g. one being rebuilt by pickle, which
        # appends items before restoring attributes) is treated as loaded.
        if getattr(self, "_loaded", True):
            return
        loop = self._gemmi_loop
        column = self._column_index
        # Build the values first so a failure part-way through leaves the list
        # untouched and a retry cannot duplicate entries.
        loaded = [str(loop[row_idx, column]) for row_idx in range(loop.length())]
        # Use the base implementation: our own extend() would re-enter loading.
        super().extend(loaded)
        self._loaded = True
        # Clear gemmi reference to save memory
        self._gemmi_loop = None

    def _forward_after_load(method_name):
        """Build a method that loads the column, then defers to ``list``."""
        list_method = getattr(list, method_name)

        def method(self, *args, **kwargs):
            self._ensure_loaded()
            return list_method(self, *args, **kwargs)

        method.__name__ = method_name
        return method

    def __getitem__(self, index):
        self._ensure_loaded()
        return super().__getitem__(index)

    def __len__(self):
        # The length is available from the loop without loading the values.
        if getattr(self, "_loaded", True):
            return super().__len__()
        return self._gemmi_loop.length()

    def __iter__(self):
        self._ensure_loaded()
        return super().__iter__()

    def __eq__(self, other):
        self._ensure_loaded()
        if isinstance(other, LazyGemmiColumn):
            # list.__eq__ reads the other operand's raw storage directly.
            other._ensure_loaded()
        return super().__eq__(other)

    def __ne__(self, other):
        self._ensure_loaded()
        if isinstance(other, LazyGemmiColumn):
            other._ensure_loaded()
        return super().__ne__(other)

    def __repr__(self):
        # Automatically load data when repr is called (used in print, f-strings, etc.)
        self._ensure_loaded()
        return super().__repr__()

    def __str__(self):
        # Automatically load data when converting to string
        self._ensure_loaded()
        return super().__str__()

    def __format__(self, format_spec):
        # Automatically load data when used in f-strings
        self._ensure_loaded()
        return super().__format__(format_spec)

    # Read operations that would otherwise inspect the empty underlying list
    __contains__ = _forward_after_load("__contains__")
    __reversed__ = _forward_after_load("__reversed__")
    index = _forward_after_load("index")
    count = _forward_after_load("count")
    copy = _forward_after_load("copy")

    # Mutations must happen after loading, otherwise loaded values would be
    # appended behind the caller's changes
    append = _forward_after_load("append")
    extend = _forward_after_load("extend")
    insert = _forward_after_load("insert")
    pop = _forward_after_load("pop")
    remove = _forward_after_load("remove")
    sort = _forward_after_load("sort")
    reverse = _forward_after_load("reverse")
    __setitem__ = _forward_after_load("__setitem__")
    __delitem__ = _forward_after_load("__delitem__")

    # The factory is only needed while the class body executes
    del _forward_after_load
[docs]
class LazyItemDict:
    """Read-only, dict-like view that resolves each item's values on first lookup."""

    def __init__(self, items: Dict[str, Union[List[str], "Item"]]):
        self._items = items
        # Values already resolved, keyed by item name
        self._cached_values: Dict[str, List[str]] = {}

    def __getitem__(self, key: str) -> List[str]:
        try:
            return self._cached_values[key]
        except KeyError:
            pass
        raw = self._items[key]
        # Objects exposing a non-callable ``values`` attribute contribute that
        # attribute; anything else (e.g. a plain list) is returned as stored.
        exposes_values = hasattr(raw, "values") and not callable(
            getattr(raw, "values", None)
        )
        resolved = raw.values if exposes_values else raw
        self._cached_values[key] = resolved
        return resolved

    def __setitem__(self, key: str, value: List[str]) -> None:
        # The view never accepts writes
        raise TypeError("LazyItemDict is read-only")

    def __contains__(self, key: str) -> bool:
        return key in self._items

    def __iter__(self):
        return iter(self._items.keys())

    def __len__(self) -> int:
        return len(self._items)

    def keys(self):
        """Return the key view of the underlying mapping."""
        return self._items.keys()

    def values(self):
        """Return the resolved values of every item as a list."""
        return [self[name] for name in self]

    def items(self):
        """Return ``(name, values)`` pairs for every item as a list."""
        return [(name, self[name]) for name in self]

    def get(self, key: str, default=None):
        """Return the values for ``key``, or ``default`` when the key is missing."""
        try:
            return self[key]
        except KeyError:
            return default

    def __eq__(self, other) -> bool:
        if isinstance(other, dict):
            return dict(self.items()) == other
        if not isinstance(other, LazyItemDict):
            return False
        if len(self) != len(other):
            return False
        # Element-wise comparison; resolves every item on both sides
        mismatch = any(
            name not in other or self[name] != other[name] for name in self
        )
        return not mismatch

    def __repr__(self):
        total_count = len(self._items)
        cached_count = len(self._cached_values)
        return f"LazyItemDict({total_count} items, {cached_count} loaded)"
[docs]
class LazyKeyList:
    """A list-like view that generates prefixed keys from a mapping on demand.

    The view keeps a reference to the mapping rather than a copy of its keys,
    so creating it is cheap and it always reflects the mapping's current keys.
    """

    def __init__(self, collection: dict, prefix: str = ""):
        """
        :param collection: Mapping whose ``keys()`` supply the names
        :param prefix: Text prepended to every key when it is exposed
        """
        self._collection = collection
        self._prefix = prefix

    def _strip_prefix(self, item) -> Optional[str]:
        """Return ``item`` without the prefix, or ``None`` if it cannot be one of our keys."""
        # Non-string probes can never match a prefixed key; treating them as
        # "absent" keeps ``in``/``index``/``count`` list-like instead of failing
        # with AttributeError on ``startswith``.
        if isinstance(item, str) and item.startswith(self._prefix):
            return item[len(self._prefix):]
        return None

    def __getitem__(self, index: Union[int, slice]) -> Union[str, List[str]]:
        """Return the prefixed key at ``index``, or a list of them for a slice."""
        if not isinstance(index, (int, slice)):
            raise TypeError(
                f"LazyKeyList indices must be integers or slices, not {type(index).__name__}"
            )
        keys = list(self._collection.keys())
        if isinstance(index, int):
            return f"{self._prefix}{keys[index]}"
        return [f"{self._prefix}{key}" for key in keys[index]]

    def __len__(self) -> int:
        return len(self._collection)

    def __iter__(self):
        # Always go through keys(): some collections iterate over their values.
        for key in self._collection.keys():
            yield f"{self._prefix}{key}"

    def __contains__(self, item: str) -> bool:
        stripped = self._strip_prefix(item)
        return stripped is not None and stripped in self._collection

    def index(self, item: str) -> int:
        """
        Return the position of the prefixed key ``item``.

        :raises ValueError: If ``item`` is not one of the prefixed keys
        """
        stripped = self._strip_prefix(item)
        if stripped is not None:
            # Scan the keys directly instead of copying them into a list first.
            for position, key in enumerate(self._collection.keys()):
                if key == stripped:
                    return position
        raise ValueError(f"{item} is not in list")

    def count(self, item: str) -> int:
        """Return 1 if ``item`` is a prefixed key, otherwise 0."""
        return 1 if item in self else 0

    def __eq__(self, other) -> bool:
        if isinstance(other, LazyKeyList):
            return list(self) == list(other)
        elif isinstance(other, list):
            return list(self) == other
        return False

    def __repr__(self):
        return f"LazyKeyList({len(self)} keys with prefix '{self._prefix}')"
[docs]
class Category(DataContainer):
    """A class to represent a category in a data block.

    Items (columns) are stored by name in ``_items`` as plain lists or ``Item``
    objects and can be reached by attribute (``category.id``), by string key
    (``category["id"]``) or row-wise by integer/slice (``category[0]``).

    Note that ``len(category)`` is the number of items, whereas iterating a
    category yields its rows.
    """

    # Define attributes that should be handled as normal Python attributes
    _RESERVED_ATTRS = {
        "_name",
        "_items",
        "_plugin_factory",
        "_batch_buffer",
        "_row_cache",
        "name",
        "plugin_factory",
        "items",
        "data",
        "row_count",
        "rows",
    }
    # cached_property names that must be dropped whenever the set of items changes
    _CACHED_PROPERTIES = ("items", "data", "rows")
    # Number of buffered values for one item that triggers an automatic commit
    _BATCH_COMMIT_SIZE = 2000

    def __init__(
        self,
        name: str,
        plugin_factory: Optional[PluginFactory] = None,
    ):
        """
        :param name: Category name, with or without the leading ``_``
        :param plugin_factory: Optional factory consulted for unknown attributes
        """
        # Store the stripped name internally (remove _ prefix if present)
        self._name = name[1:] if name.startswith("_") else name
        self._items: Dict[str, Union[List[str], Item]] = {}
        self._plugin_factory = plugin_factory
        self._batch_buffer: Dict[str, List] = {}  # For batching value additions
        self._row_cache: Dict[int, "Row"] = {}  # Cache for Row objects

    @property
    def name(self) -> str:
        """Return the category name including the leading ``_``."""
        return f"_{self._name}"

    @property
    def plugin_factory(self) -> Optional[PluginFactory]:
        """Return the plugin factory given at construction (may be ``None``)."""
        return self._plugin_factory

    @cached_property
    def items(self) -> LazyKeyList:
        """Get names of contained items as a lazy list over the item mapping."""
        return LazyKeyList(self._items, "")

    def __getattr__(self, item_name: str) -> Union[List[str], Item, "PluginWrapper"]:
        """
        Resolve ``category.<item_name>`` for names that are not real attributes.

        Returns the item's values if an item of that name exists, otherwise the
        wrapper supplied by the plugin factory, if any.

        :raises AttributeError: If neither an item nor a plugin matches
        """
        # Read our own state through __dict__: this hook also runs on instances
        # whose attributes are not set yet (copy/pickle probing), and touching
        # self._items there would re-enter __getattr__ until RecursionError.
        state = self.__dict__
        items = state.get("_items")
        if items is not None and item_name in items:
            item = items[item_name]
            # Return values for Item objects, the stored list otherwise
            return item.values if isinstance(item, Item) else item
        # Check for registered plugins (covers "validate" and any user plugins)
        factory = state.get("_plugin_factory")
        if factory is not None:
            wrapper = factory.get_wrapper(item_name, self, "category")
            if wrapper is not None:
                return wrapper
        raise AttributeError(
            f"'{self.__class__.__name__}' object has no attribute '{item_name}'"
        )

    def __setattr__(self, name: str, value) -> None:
        """
        Enable dot notation assignment for mmCIF items.

        Reserved attributes and names starting with ``_`` are handled as normal
        Python attributes. Everything else is treated as mmCIF item assignment.

        :raises TypeError: If an item value is neither a list nor an ``Item``
        """
        if name in self._RESERVED_ATTRS or name.startswith("_"):
            super().__setattr__(name, value)
            return
        # During object initialization, _items might not exist yet
        if "_items" not in self.__dict__:
            super().__setattr__(name, value)
            return
        # Validate value type for mmCIF items
        if not isinstance(value, (list, Item)):
            raise TypeError(
                f"mmCIF item '{name}' must be a list or Item object, got {type(value)}"
            )
        self._items[name] = value
        self._invalidate_caches()

    def __delattr__(self, name: str) -> None:
        """Delete an mmCIF item via ``del category.item_name``.

        :raises AttributeError: If no item of that name exists
        """
        if name in self._RESERVED_ATTRS or name.startswith("_"):
            super().__delattr__(name)
            return
        if name in self._items:
            del self._items[name]
            self._invalidate_caches()
            return
        raise AttributeError(
            f"Item '{name}' not found in category '{self.name}'"
        )

    def delete(self, item_name: str) -> None:
        """Delete an mmCIF item by name (string-based API).

        :param item_name: The item name to remove.
        :raises KeyError: If the item does not exist.
        """
        if item_name not in self._items:
            raise KeyError(
                f"Item '{item_name}' not found in category '{self.name}'"
            )
        del self._items[item_name]
        self._invalidate_caches()

    def __getitem__(
        self, key: Union[str, int, slice]
    ) -> Union[List[str], "Row", List["Row"]]:
        """
        Access values by item name or row index/slice.

        If key is a string, return all values for that item (column-wise access).
        If key is an integer or slice, return Row(s) (row-wise access).

        :raises KeyError: If a string key names no item
        :raises IndexError: If an integer key is out of range or the category is empty
        :raises TypeError: If the key is of any other type
        """
        if isinstance(key, str):
            # Column access by item name
            item = self._items[key]
            return item.values if isinstance(item, Item) else item
        if isinstance(key, int):
            row_count = self.row_count
            if row_count == 0:
                raise IndexError("Cannot access rows in empty category")
            # Handle negative indices
            if key < 0:
                key = row_count + key
            if key < 0 or key >= row_count:
                raise IndexError(f"Row index {key} is out of range (0-{row_count-1})")
            # Reuse Row objects so repeated access does not recreate them
            if key not in self._row_cache:
                self._row_cache[key] = Row(self, key)
            return self._row_cache[key]
        if isinstance(key, slice):
            # An empty category yields an empty range and therefore an empty list
            indices = range(*key.indices(self.row_count))
            return [self[i] for i in indices]  # Each element goes through the int case
        raise TypeError(
            f"Category indices must be strings, integers or slices, not {type(key).__name__}"
        )

    def __setitem__(self, item_name: str, value: Union[List[str], Item]) -> None:
        """Store ``value`` as the item ``item_name`` and drop derived caches."""
        self._items[item_name] = value
        self._invalidate_caches()

    def __iter__(self):
        # Iterate over rows, not items, for user-facing API consistency
        return iter(self.rows)

    def __len__(self):
        # Number of items (columns); use row_count for the number of rows
        return len(self._items)

    def __repr__(self):
        return f"Category(name={self.name}, items={list(self._items.keys())})"

    @cached_property
    def data(self) -> LazyItemDict:
        """Provide lazy read-only access to the data (items resolved on demand)."""
        return LazyItemDict(self._items)

    @property
    def row_count(self) -> int:
        """Return the number of rows, taken from the length of the first item."""
        if not self._items:
            return 0
        any_item = next(iter(self._items.values()))
        return len(any_item)

    @cached_property
    def rows(self) -> LazyRowList:
        """Return all rows as a lazy list sized from the current ``row_count``."""
        return LazyRowList(self, self.row_count)

    def get_item(self, item_name: str) -> Union[Item, List[str]]:
        """Get the raw item (Item object or list) exactly as stored.

        :raises KeyError: If the item does not exist
        """
        return self._items[item_name]

    def is_lazy_loaded(self, item_name: str) -> bool:
        """Return True when the item is stored as an ``Item`` object."""
        return isinstance(self._items.get(item_name), Item)

    def _add_item_value(self, item_name: str, value: str) -> None:
        """
        Buffer one value for ``item_name``.

        Buffered values only become part of the item when the batch is
        committed, which happens automatically once the buffer reaches
        ``_BATCH_COMMIT_SIZE`` entries or explicitly via ``_commit_all_batches``.
        """
        # A plain list is enough: filling a list with placeholders and then
        # clearing it does not retain capacity, so no pre-allocation is attempted.
        buffer = self._batch_buffer.setdefault(item_name, [])
        buffer.append(value)
        # Caches are only invalidated when a batch is committed, not on every add
        if len(buffer) >= self._BATCH_COMMIT_SIZE:
            self._commit_batch(item_name)

    def _commit_batch(self, item_name: str) -> None:
        """Move the buffered values of ``item_name`` into the item storage."""
        values = self._batch_buffer.get(item_name)
        if not values:
            return
        # Share common strings to reduce memory usage
        interned_values = [intern_common_value(v) for v in values]
        if item_name not in self._items:
            self._items[item_name] = interned_values
        else:
            current = self._items[item_name]
            if isinstance(current, list):
                current.extend(interned_values)
            else:
                # Replace an Item (or other non-list) with a list holding its
                # existing values followed by the new ones
                if hasattr(current, "values"):
                    existing_values = list(current.values or [])
                else:
                    existing_values = []
                existing_values.extend(interned_values)
                self._items[item_name] = existing_values
        # Clear the batch
        self._batch_buffer[item_name] = []
        self._invalidate_caches()

    def _commit_all_batches(self) -> None:
        """Commit every remaining batch (used at the end of parsing)."""
        for item_name in list(self._batch_buffer.keys()):
            self._commit_batch(item_name)

    def _invalidate_caches(self) -> None:
        """Drop all cached properties and cached rows after the data changed."""
        for attr in self._CACHED_PROPERTIES:
            # Remove the cached entry directly; hasattr() would compute the
            # property first only so that it could be deleted again.
            self.__dict__.pop(attr, None)
        self._row_cache.clear()
class CategoryCollection(dict):
    """A collection that supports both dict and list access for categories, with automatic _ prefix handling.

    Keys are stored without the leading ``_``. Every string-keyed operation
    (construction, ``update``, lookup, ``get``, ``pop``, ``setdefault``,
    deletion, membership) accepts the name with or without that prefix.
    Integer and slice keys index the values in insertion order.
    """

    # Prefix removed from string keys before they reach the underlying dict
    _PREFIX = "_"

    def __init__(self, *args, **kwargs):
        # dict.__init__ bypasses __setitem__, so route initial content through
        # update() to store it under normalized keys.
        super().__init__()
        self.update(*args, **kwargs)

    @classmethod
    def _internal_key(cls, key):
        """Return the storage key for ``key`` (strings lose one leading prefix)."""
        if isinstance(key, str) and key.startswith(cls._PREFIX):
            return key[len(cls._PREFIX):]
        return key

    def __getitem__(self, key):
        if isinstance(key, (int, slice)):
            # List-like access: categories[0], categories[0:2], etc.
            return list(self.values())[key]
        return super().__getitem__(self._internal_key(key))

    def __setitem__(self, key, value):
        super().__setitem__(self._internal_key(key), value)

    def __delitem__(self, key):
        super().__delitem__(self._internal_key(key))

    def __contains__(self, key):
        return super().__contains__(self._internal_key(key))

    def __iter__(self):
        # Iterate over keys (category names) not values
        return iter(self.keys())

    def get(self, key, default=None):
        """Return the value for ``key`` (prefix optional), or ``default``."""
        return super().get(self._internal_key(key), default)

    def pop(self, key, *default):
        """Remove and return the value for ``key`` (prefix optional)."""
        return super().pop(self._internal_key(key), *default)

    def setdefault(self, key, default=None):
        """Insert ``default`` under ``key`` if absent and return the stored value."""
        return super().setdefault(self._internal_key(key), default)

    def update(self, *args, **kwargs):
        """Add entries like ``dict.update``, normalizing every key on the way in."""
        for key, value in dict(*args, **kwargs).items():
            self[key] = value

    def keys(self):
        # Return the stored (stripped) keys as a list
        return list(super().keys())

    def __repr__(self):
        return f"CategoryCollection({len(self)} categories)"
[docs]
class DataBlock(DataContainer):
    """A class to represent a data block in an mmCIF file.

    Categories are stored in a ``CategoryCollection`` and can be reached by
    attribute (``block._atom_site``) or by key (``block["_atom_site"]``), with
    or without the leading ``_``.
    """

    # Define attributes that should be handled as normal Python attributes
    _RESERVED_ATTRS = {
        "_name", "_categories", "_plugin_factory", "_auto_create",
        "name", "categories", "data", "plugin_factory",
    }

    def __init__(
        self,
        name: str,
        categories: Optional[Dict[str, Category]] = None,
        plugin_factory: Optional[PluginFactory] = None,
        auto_create: bool = True,
    ):
        """
        :param name: Name of the data block
        :param categories: Optional initial categories keyed by name (``_`` prefix optional)
        :param plugin_factory: Optional factory consulted for unknown attributes
        :param auto_create: Whether reading an unknown ``_``-prefixed attribute
            creates an empty category of that name
        """
        self._name = name
        self._plugin_factory = plugin_factory
        self._auto_create = auto_create
        self._categories = CategoryCollection()
        if categories is not None:
            # Item assignment on the collection strips the _ prefix for storage
            for cat_name, category in categories.items():
                self._categories[cat_name] = category

    @property
    def name(self) -> str:
        """Return the block name as given at construction."""
        return self._name

    @property
    def plugin_factory(self) -> Optional[PluginFactory]:
        """Return the plugin factory given at construction (may be ``None``)."""
        return self._plugin_factory

    @cached_property
    def categories(self) -> LazyKeyList:
        """Get names of contained categories (prefixed with ``_``) as a lazy list."""
        return LazyKeyList(self._categories, "_")

    @property
    def data(self) -> CategoryCollection:
        """Return the underlying collection of category objects."""
        return self._categories

    def _invalidate_caches(self) -> None:
        """Drop the cached ``categories`` list after the categories changed."""
        # Remove the cached entry directly instead of computing it via hasattr().
        self.__dict__.pop("categories", None)

    def __getitem__(self, category_name: str) -> Category:
        # Handle both prefixed (_category) and unprefixed (category) names
        return self._categories[category_name]

    def __setitem__(self, category_name: str, category: Category) -> None:
        # Handle both prefixed (_category) and unprefixed (category) names
        self._categories[category_name] = category
        self._invalidate_caches()

    def __getattr__(self, category_name: str) -> Category:
        """
        Resolve ``block.<category_name>`` for names that are not real attributes.

        Lookup order: existing category, plugin wrapper, then (for names
        starting with ``_`` when auto-creation is enabled) a newly created
        empty category.

        :raises AttributeError: If nothing matches, for double-underscore
            protocol names, or while the instance has no state yet
        """
        # Read our own state through __dict__: this hook also runs on instances
        # whose attributes are not set yet (copy/pickle probing), and touching
        # self._categories there would re-enter __getattr__ until RecursionError.
        state = self.__dict__
        categories = state.get("_categories")
        # Protocol probes such as __deepcopy__ or __setstate__ must fail cleanly;
        # auto-creating a category for them would hand a Category to the caller
        # in place of the hook it asked for and leave a bogus category behind.
        is_dunder = category_name.startswith("__") and category_name.endswith("__")
        if categories is None or is_dunder:
            raise AttributeError(
                f"'{self.__class__.__name__}' object has no attribute '{category_name}'"
            )
        try:
            # The collection handles both prefixed and unprefixed names
            return categories[category_name]
        except KeyError:
            pass
        # Check for registered plugins
        factory = state.get("_plugin_factory")
        if factory is not None:
            wrapper = factory.get_wrapper(category_name, self, "block")
            if wrapper is not None:
                return wrapper
        # Auto-create the category if it starts with _ (typical mmCIF category)
        if category_name.startswith("_"):
            if state.get("_auto_create", False):
                new_category = Category(category_name, plugin_factory=factory)
                categories[category_name] = new_category
                self._invalidate_caches()
                return new_category
            raise AttributeError(
                f"Category '{category_name}' does not exist in data block "
                f"'{self.name}'. Available: {list(self.categories)}"
            )
        raise AttributeError(
            f"'{self.__class__.__name__}' object has no attribute '{category_name}'"
        )

    def __setattr__(self, name: str, value) -> None:
        """
        Enable dot notation assignment for categories.

        Reserved attributes and double-underscore names are handled normally.
        Names starting with ``_`` and names of existing categories are treated
        as category assignment; any other name becomes a normal attribute.

        :raises TypeError: If a category is assigned something that is not a ``Category``
        """
        if name in self._RESERVED_ATTRS or name.startswith("__"):
            super().__setattr__(name, value)
            return
        # During object initialization, _categories might not exist yet
        categories = self.__dict__.get("_categories")
        if categories is None:
            super().__setattr__(name, value)
            return
        if name.startswith("_") or name in categories:
            if not isinstance(value, Category):
                raise TypeError(
                    f"Category '{name}' must be a Category object, got {type(value)}"
                )
            categories[name] = value  # The collection strips the _ prefix
            self._invalidate_caches()
        else:
            # Non-category attributes are handled normally
            super().__setattr__(name, value)

    def __delattr__(self, name: str) -> None:
        """Delete a category via ``del block._category_name``.

        :raises AttributeError: If no such category exists
        """
        if name in self._RESERVED_ATTRS or name.startswith("__"):
            super().__delattr__(name)
            return
        key = name[1:] if name.startswith("_") else name
        if key in self._categories:
            del self._categories[key]
            self._invalidate_caches()
            return
        raise AttributeError(
            f"Category '{name}' not found in data block '{self.name}'"
        )

    def delete(self, category_name: str) -> None:
        """Delete a category by name (string-based API).

        :param category_name: The category name to remove (with or without ``_`` prefix).
        :raises KeyError: If the category does not exist.
        """
        key = category_name[1:] if category_name.startswith("_") else category_name
        if key not in self._categories:
            raise KeyError(
                f"Category '{category_name}' not found in data block '{self.name}'"
            )
        del self._categories[key]
        self._invalidate_caches()

    def __iter__(self):
        # Iterating a block yields its Category objects
        return iter(self._categories.values())

    def __len__(self):
        return len(self._categories)

    def __repr__(self):
        return f"DataBlock(name={self.name}, categories={list(self.categories)})"
class DataBlockCollection(dict):
    """A collection that supports both dict and list access for data blocks, with automatic data_ prefix handling.

    Keys are stored without the leading ``data_``. Every string-keyed operation
    (construction, ``update``, lookup, ``get``, ``pop``, ``setdefault``,
    deletion, membership) accepts the name with or without that prefix.
    Integer and slice keys index the values in insertion order, and iterating
    the collection yields the stored values rather than the keys.
    """

    # Prefix removed from string keys before they reach the underlying dict
    _PREFIX = "data_"

    def __init__(self, *args, **kwargs):
        # dict.__init__ bypasses __setitem__, so route initial content through
        # update() to store it under normalized keys.
        super().__init__()
        self.update(*args, **kwargs)

    @classmethod
    def _internal_key(cls, key):
        """Return the storage key for ``key`` (strings lose one leading prefix)."""
        if isinstance(key, str) and key.startswith(cls._PREFIX):
            return key[len(cls._PREFIX):]
        return key

    def __getitem__(self, key):
        if isinstance(key, (int, slice)):
            # List-like access: data[0], data[0:2], etc.
            return list(self.values())[key]
        return super().__getitem__(self._internal_key(key))

    def __setitem__(self, key, value):
        super().__setitem__(self._internal_key(key), value)

    def __delitem__(self, key):
        super().__delitem__(self._internal_key(key))

    def __contains__(self, key):
        return super().__contains__(self._internal_key(key))

    def __iter__(self):
        # Iterate over values (the stored blocks) for consistency with list behavior
        return iter(self.values())

    def get(self, key, default=None):
        """Return the value for ``key`` (prefix optional), or ``default``."""
        return super().get(self._internal_key(key), default)

    def pop(self, key, *default):
        """Remove and return the value for ``key`` (prefix optional)."""
        return super().pop(self._internal_key(key), *default)

    def setdefault(self, key, default=None):
        """Insert ``default`` under ``key`` if absent and return the stored value."""
        return super().setdefault(self._internal_key(key), default)

    def update(self, *args, **kwargs):
        """Add entries like ``dict.update``, normalizing every key on the way in."""
        for key, value in dict(*args, **kwargs).items():
            self[key] = value

    def keys(self):
        # Return the stored (stripped) keys as a list
        return list(super().keys())

    def __repr__(self):
        return f"DataBlockCollection({len(self)} blocks)"
[docs]
class MMCIFDataContainer(DataContainer):
    """A class to represent an mmCIF data container.

    Data blocks are stored in a ``DataBlockCollection`` and can be reached by
    attribute (``container.data_1ABC``) or by key (``container["1ABC"]``).
    """

    # Define attributes that should be handled as normal Python attributes
    _RESERVED_ATTRS = {
        "_data_blocks", "_plugin_factory", "_auto_create",
        "source_format", "name", "blocks", "data", "plugin_factory",
    }

    def __init__(
        self,
        data_blocks: Optional[Dict[str, DataBlock]] = None,
        source_format: DataSourceFormat = DataSourceFormat.MMCIF,
        plugin_factory: Optional[PluginFactory] = None,
        auto_create: bool = True,
    ):
        """
        :param data_blocks: Optional initial blocks keyed by name
        :param source_format: Format tag stored on the container
            (NOTE(review): ``DataSourceFormat`` is not defined in the visible
            part of this module - confirm where it comes from)
        :param plugin_factory: Optional factory consulted for unknown attributes
        :param auto_create: Whether reading an unknown ``data_``-prefixed
            attribute creates an empty block of that name
        """
        self._data_blocks = DataBlockCollection(
            data_blocks if data_blocks is not None else {}
        )
        self._plugin_factory = plugin_factory
        self._auto_create = auto_create
        self.source_format = source_format

    @property
    def name(self) -> str:
        """Return a descriptive name that includes the number of blocks."""
        return f"MMCIFDataContainer({len(self)} blocks)"

    @property
    def plugin_factory(self) -> Optional[PluginFactory]:
        """Return the plugin factory given at construction (may be ``None``)."""
        return self._plugin_factory

    def _invalidate_caches(self) -> None:
        """Drop the cached ``blocks`` list after the blocks changed."""
        # Remove the cached entry directly instead of computing it via hasattr().
        self.__dict__.pop("blocks", None)

    def __getitem__(self, block_name: str) -> DataBlock:
        # Handle both prefixed (data_block) and unprefixed (block) names
        return self._data_blocks[block_name]

    def __setitem__(self, block_name: str, block: DataBlock) -> None:
        # Handle both prefixed (data_block) and unprefixed (block) names
        self._data_blocks[block_name] = block
        self._invalidate_caches()

    def __getattr__(self, block_name: str) -> DataBlock:
        """
        Resolve ``container.<block_name>`` for names that are not real attributes.

        Names starting with ``data_`` refer to data blocks (created on demand
        when auto-creation is enabled); other names are offered to the plugin
        factory.

        :raises AttributeError: If nothing matches, or while the instance has
            no state yet
        """
        # Read our own state through __dict__: this hook also runs on instances
        # whose attributes are not set yet (copy/pickle probing), and touching
        # self._data_blocks there would re-enter __getattr__ until RecursionError.
        state = self.__dict__
        blocks = state.get("_data_blocks")
        if blocks is None:
            raise AttributeError(
                f"'{self.__class__.__name__}' object has no attribute '{block_name}'"
            )
        factory = state.get("_plugin_factory")
        auto_create = state.get("_auto_create", False)
        if block_name.startswith("data_"):
            actual_block_name = block_name[5:]  # Remove the 'data_' prefix
            if actual_block_name in blocks:
                return blocks[actual_block_name]
            if auto_create:
                # Auto-create the data block
                new_block = DataBlock(
                    actual_block_name,
                    plugin_factory=factory,
                    auto_create=auto_create,
                )
                blocks[actual_block_name] = new_block
                self._invalidate_caches()
                return new_block
            raise AttributeError(
                f"Data block 'data_{actual_block_name}' does not exist. "
                f"Available: {list(self.blocks)}"
            )
        # Check for registered plugins
        if factory is not None:
            wrapper = factory.get_wrapper(block_name, self, "container")
            if wrapper is not None:
                return wrapper
        raise AttributeError(
            f"'{self.__class__.__name__}' object has no attribute '{block_name}'"
        )

    def __setattr__(self, name: str, value) -> None:
        """
        Enable dot notation assignment for data blocks.

        Reserved attributes and double-underscore names are handled normally.
        Names with the ``data_`` prefix are treated as block assignment; any
        other name becomes a normal attribute.

        :raises TypeError: If a block is assigned something that is not a ``DataBlock``
        """
        if name in self._RESERVED_ATTRS or name.startswith("__"):
            super().__setattr__(name, value)
            return
        # During object initialization, _data_blocks might not exist yet
        blocks = self.__dict__.get("_data_blocks")
        if blocks is None:
            super().__setattr__(name, value)
            return
        if name.startswith("data_"):
            block_name = name[5:]  # Remove 'data_' prefix
            if not isinstance(value, DataBlock):
                raise TypeError(
                    f"Data block 'data_{block_name}' must be a DataBlock object, got {type(value)}"
                )
            blocks[block_name] = value
            self._invalidate_caches()
        else:
            # Non-block attributes are handled normally
            super().__setattr__(name, value)

    def __delattr__(self, name: str) -> None:
        """Delete a data block via ``del container.data_blockname``.

        :raises AttributeError: If no such block exists
        """
        if name in self._RESERVED_ATTRS or name.startswith("__"):
            super().__delattr__(name)
            return
        if name.startswith("data_"):
            key = name[5:]
            if key in self._data_blocks:
                del self._data_blocks[key]
                self._invalidate_caches()
                return
        raise AttributeError(
            f"Data block '{name}' not found in container"
        )

    def delete(self, block_name: str) -> None:
        """Delete a data block by name (string-based API).

        :param block_name: The block name (with or without ``data_`` prefix).
        :raises KeyError: If the block does not exist.
        """
        key = block_name[5:] if block_name.startswith("data_") else block_name
        if key not in self._data_blocks:
            raise KeyError(f"Data block '{block_name}' not found in container")
        del self._data_blocks[key]
        self._invalidate_caches()

    def __iter__(self):
        # Iterating a container yields its DataBlock objects
        return iter(self._data_blocks.values())

    def __len__(self):
        return len(self._data_blocks)

    def __repr__(self):
        return f"MMCIFDataContainer({len(self)} blocks)"

    @cached_property
    def blocks(self) -> LazyKeyList:
        """Provide a lazy list of data block names, each prefixed with ``data_``."""
        return LazyKeyList(self._data_blocks, "data_")

    @property
    def data(self) -> DataBlockCollection:
        """Provide access to data blocks with both list and dict interfaces."""
        return self._data_blocks
# Frequently repeated mmCIF tokens that are shared as single string objects
# to save memory
_COMMON_VALUES = {
    "ATOM", "HETATM",
    "C", "N", "O", "P", "S",
    "CA", "CB", "CG", "CD", "CE", "CF",
    "A", "B", "X", "Y", "Z",
    "1", "2", "3", "4", "5", "6", "7", "8", "9", "0",
    ".", "?",
    "yes", "no", "true", "false",
}
# Maps each common token to its interned string object
_INTERNED_VALUES = dict(zip(_COMMON_VALUES, map(sys.intern, _COMMON_VALUES)))
def intern_common_value(value: str) -> str:
    """Return the shared interned string for a common mmCIF token, else ``value`` itself."""
    try:
        return _INTERNED_VALUES[value]
    except KeyError:
        # Not one of the common tokens: hand the caller's string back unchanged
        return value