"""
mmCIF Serializer - Data structure parsers and relationship resolvers
Provides dictionary parsing, mapping generation, caching, and relationship resolution.
"""
import os
import re
import hashlib
import threading
import pickle
import shlex
from pathlib import Path
from typing import Dict, List, Optional, Any, Union, Tuple, Set
from .models import MMCIFDataContainer
from .defaults import (
CacheType, DictDataType, FrameMarker, LoopDataKey,
TabularDataCategory, TabularDataField, RelationshipKey, DictItemKey,
MappingDataKey, CategoryPrefix, BooleanValue, SemanticToken,
RelationshipType, RelationshipTerm,
# Consolidated classes
DataValue, FileOperation
)
# ====================== Formal Relationship Type Definitions ======================
[docs]
class RelationshipConstraint:
    """Represents a formal constraint on relationships.

    Couples the description of one child -> parent link (``metadata``) with a
    flag recording whether that link has been confirmed against actual data.
    """

    def __init__(self, metadata: "RelationshipMetadata", is_validated: bool = False):
        """Store the relationship description and its validation state.

        Args:
            metadata: Object exposing ``child_cat``, ``child_field``,
                ``parent_cat`` and ``parent_field`` (read by ``validate``).
            is_validated: Initial validation flag; defaults to ``False``.
        """
        # The annotation is a string (forward reference) on purpose: it is not
        # evaluated when the class body runs, so defining this class does not
        # require ``RelationshipMetadata`` to be bound yet.
        self.metadata = metadata
        self.is_validated = is_validated

    def validate(self, data: Dict[str, Any]) -> bool:
        """Validate constraint against actual data.

        Args:
            data: Mapping of category name to a list of row dicts.

        Returns:
            ``True`` only when both categories have rows, both key columns
            hold at least one non-``None`` value, and every child value also
            occurs among the parent values. ``False`` in every other case,
            including when either side is missing or empty.
        """
        meta = self.metadata
        child_rows = data.get(meta.child_cat, [])
        parent_rows = data.get(meta.parent_cat, [])
        if not child_rows or not parent_rows:
            return False

        # Distinct key values on each side; rows lacking the field yield None,
        # which is discarded so it never counts as a value.
        child_values = {row.get(meta.child_field) for row in child_rows}
        child_values.discard(None)
        parent_values = {row.get(meta.parent_field) for row in parent_rows}
        parent_values.discard(None)

        if not child_values or not parent_values:
            return False
        return child_values.issubset(parent_values)
# ====================== Unified High-Performance Caching ======================
# Global caches for maximum performance
# Process-wide in-memory cache: one {key: value} bucket per cache type.
# CacheManager adds buckets for other cache types on demand.
_GLOBAL_CACHES = {
    cache_kind.value: {}
    for cache_kind in (CacheType.DICTIONARY, CacheType.MAPPING_RULES)
}
# CacheManager holds this lock around its accesses to _GLOBAL_CACHES.
_CACHE_LOCK = threading.Lock()
[docs]
class CacheManager:
    """
    Unified cache manager that combines global in-memory caching with optional disk persistence.

    The in-memory layer is the module-level ``_GLOBAL_CACHES`` dict (shared by
    all instances, accessed under ``_CACHE_LOCK``). The disk layer, when
    enabled, keeps one pickle file per ``(cache_type, key)`` inside
    ``cache_dir``. Disk operations are best-effort: a failure to read or write
    a cache file is treated as a cache miss / skipped write, never an error.
    """

    def __init__(self, cache_dir: Optional[str] = None, enable_disk_cache: bool = True):
        """Configure the cache and create ``cache_dir`` when disk caching is on.

        Args:
            cache_dir: Directory for pickle files; a falsy value disables the
                disk layer regardless of ``enable_disk_cache``.
            enable_disk_cache: Whether to read/write pickle files at all.
        """
        self.cache_dir = Path(cache_dir) if cache_dir else None
        self.enable_disk_cache = enable_disk_cache
        if self.cache_dir and enable_disk_cache:
            self.cache_dir.mkdir(parents=True, exist_ok=True)

    def get(self, cache_type: str, key: str) -> Optional[Any]:
        """Get from global cache first, then fallback to disk if enabled.

        Returns:
            The cached value, or ``None`` when neither layer has the key.
        """
        # Fast path: global memory cache
        with _CACHE_LOCK:
            bucket = _GLOBAL_CACHES.get(cache_type, {})
            if key in bucket:
                return bucket[key]
        # Fallback: disk cache
        if self.enable_disk_cache and self.cache_dir:
            return self._load_from_disk(cache_type, key)
        return None

    def set(self, cache_type: str, key: str, value: Any) -> None:
        """Store in global cache and optionally on disk."""
        # Always store in global cache for speed
        with _CACHE_LOCK:
            _GLOBAL_CACHES.setdefault(cache_type, {})[key] = value
        # Optionally store on disk for persistence
        if self.enable_disk_cache and self.cache_dir:
            self._save_to_disk(cache_type, key, value)

    def _cache_file(self, cache_type: str, key: str) -> Path:
        """Return the pickle file path used for ``(cache_type, key)``."""
        return self.cache_dir / f"{cache_type}_{key}{FileOperation.PICKLE_EXT.value}"

    def _load_from_disk(self, cache_type: str, key: str) -> Optional[Any]:
        """Load from disk cache using pickle for speed.

        Returns ``None`` on any miss: file absent, file unreadable, or file
        content that cannot be unpickled (in which case the file is removed).
        On success the value is also promoted into the in-memory cache.
        """
        cache_file = self._cache_file(cache_type, key)
        try:
            # SECURITY NOTE: pickle.load executes code embedded in the file.
            # The cache directory must only be writable by trusted users.
            with open(cache_file, FileOperation.READ_BINARY.value) as f:
                value = pickle.load(f)
        except OSError:
            # Missing or unreadable file (covers FileNotFoundError and the
            # exists()/open() race): a plain cache miss, nothing to clean up.
            return None
        except (pickle.UnpicklingError, EOFError, KeyError, AttributeError,
                ImportError, IndexError, TypeError, ValueError):
            # Corrupted or stale cache file (e.g. truncated, or pickled
            # against classes that no longer exist): remove it.
            try:
                cache_file.unlink()
            except OSError:
                pass
            return None
        # Also store in global cache for next access
        with _CACHE_LOCK:
            _GLOBAL_CACHES.setdefault(cache_type, {})[key] = value
        return value

    def _save_to_disk(self, cache_type: str, key: str, value: Any) -> None:
        """Save to disk cache using pickle for speed (best-effort).

        The pickle is written to a temporary sibling file and then moved into
        place with ``os.replace``, so readers never observe a half-written
        cache file and a failed dump leaves no truncated file behind.
        """
        cache_file = self._cache_file(cache_type, key)
        # Unique per process/thread so concurrent writers do not share a temp file.
        tmp_file = Path(f"{cache_file}.{os.getpid()}.{threading.get_ident()}.tmp")
        try:
            with open(tmp_file, FileOperation.WRITE_BINARY.value) as f:
                pickle.dump(value, f, protocol=pickle.HIGHEST_PROTOCOL)
            os.replace(tmp_file, cache_file)
        except (OSError, pickle.PicklingError, TypeError, AttributeError):
            # Don't fail if we can't cache; just make sure no partial
            # temporary file is left behind.
            try:
                tmp_file.unlink()
            except OSError:
                pass

    @staticmethod
    def clear_global_caches():
        """Clear all global caches (in-memory only; disk files are untouched)."""
        with _CACHE_LOCK:
            for bucket in _GLOBAL_CACHES.values():
                bucket.clear()
# Create a default cache manager instance
# Lazily created, process-wide CacheManager handed out by get_cache_manager().
_default_cache_manager = None


def get_cache_manager(cache_dir: Optional[str] = None) -> CacheManager:
    """Get or create the default cache manager.

    The existing default manager is reused unless a ``cache_dir`` is given
    that differs from the directory it was created with; in that case (or on
    first use) a new manager replaces it. Without ``cache_dir`` a new manager
    uses ``~/.sloth_cache``.
    """
    global _default_cache_manager  # pylint: disable=global-statement
    existing = _default_cache_manager
    if existing is not None:
        # Reuse when no directory was requested or the directory matches.
        if not cache_dir or existing.cache_dir == Path(cache_dir):
            return existing
    target_dir = cache_dir or os.path.join(os.path.expanduser("~"), ".sloth_cache")
    _default_cache_manager = CacheManager(target_dir)
    return _default_cache_manager
# ====================== Metadata Parsers ======================
[docs]
class DictionaryParser:
    """Parses mmCIF dictionary files into plain metadata dicts, with caching."""

    def __init__(self, cache_manager: CacheManager, quiet: bool = False):
        self.quiet = quiet
        self.cache_manager = cache_manager
        self.source = None  # most recent path handed to parse()

    def parse(self, source: Union[str, Path]) -> Dict[str, Any]:  # pylint: disable=arguments-renamed
        """Parse the dictionary file at ``source``.

        Remembers ``source`` on the instance, then returns (in order of
        preference) an empty structure when the path is falsy or missing, the
        cached result for this file, or a freshly parsed result.
        """
        self.source = source
        if not source or not Path(source).exists():
            return self._empty_dict()

        cache_key = self._generate_cache_key(source)
        cached = self.cache_manager.get(CacheType.DICTIONARY.value, cache_key)
        if cached:
            if not self.quiet:
                print("π¦ Using cached dictionary data")
            return cached

        if not self.quiet:
            print("π Parsing dictionary...")
        with open(source, FileOperation.READ.value, encoding='utf-8') as handle:
            text = handle.read()
        return self._parse_content(text, source, cache_key)

    def _empty_dict(self) -> Dict[str, Any]:
        """Return the structure used when there is no dictionary to parse."""
        empty: Dict[str, Any] = {}
        empty[DictDataType.CATEGORIES.value] = {}
        empty[DictDataType.ITEMS.value] = {}
        empty[DictDataType.RELATIONSHIPS.value] = []
        empty[DictDataType.ENUMERATIONS.value] = {}
        empty[DictDataType.ITEM_TYPES.value] = {}
        return empty

    def _generate_cache_key(self, dict_path: Union[str, Path]) -> str:
        """Build a cache key from the resolved path and modification time."""
        resolved = str(Path(dict_path).resolve())
        modified = os.path.getmtime(dict_path)
        fingerprint = f'{resolved}_{modified}'.encode()
        digest = hashlib.md5(fingerprint).hexdigest()
        return f"dict_{digest}"

    def _parse_content(self, content: str, dict_path: str, cache_key: str) -> Dict[str, Any]:
        """Turn dictionary text into the metadata dict and cache it.

        Save frames are parsed from ``content``; tabular data is parsed
        separately from the file at ``dict_path``.
        """
        # Everything before the first "\nsave_" is not a frame, hence [1:].
        raw_frames = re.split(r'\nsave_', content)[1:]
        frame_parser = SaveFrameParser(self.quiet)
        processor = FrameDataProcessor(self.quiet)
        for raw_frame in raw_frames:
            processor.process_frame(frame_parser.parse_save_frame(raw_frame))

        # Tabular pass (uses the MMCIF parser) may add relationships to processor.
        tabular_parser = TabularDataParser(self.quiet)
        tabular_parser.parse_tabular_data(dict_path, processor)

        primary_keys = PrimaryKeyExtractor.extract(processor.categories)

        result = {
            DictDataType.CATEGORIES.value: processor.categories,
            DictDataType.ITEMS.value: processor.items,
            DictDataType.RELATIONSHIPS.value: processor.relationships,
            DictDataType.ENUMERATIONS.value: processor.enumerations,
            DictDataType.ITEM_TYPES.value: tabular_parser.item_types,
            DictDataType.PRIMARY_KEYS.value: primary_keys,
        }

        # Progress/debug output
        if not self.quiet:
            print(f"π Parsed {len(processor.categories)} categories, {len(processor.items)} items")
            print(f"π Found {len(primary_keys)} primary keys:")
            for cat, key in primary_keys.items():
                print(f" - {cat}: {key}")

        self.cache_manager.set(CacheType.DICTIONARY.value, cache_key, result)
        return result
class SaveFrameParser:
    """Parses individual save frames from dictionary files.

    Works line by line on the text of one frame. Keys are stored with
    surrounding underscores stripped; loops found in the frame are collected
    as a list under ``LoopDataKey.LOOP_DATA.value``.
    """

    def __init__(self, quiet: bool = False):
        # Stored but not read by any method visible in this class.
        self.quiet = quiet

    def parse_save_frame(self, frame_content: str) -> Dict[str, Any]:
        """Parse a single save frame into structured data.

        Args:
            frame_content: Text of one frame. The first line is skipped; when
                called from ``DictionaryParser._parse_content`` that line is
                whatever followed the ``save_`` split marker.

        Returns:
            Dict of key -> value for simple and multiline items, plus an
            optional ``LoopDataKey.LOOP_DATA.value`` list with one entry per
            loop encountered.
        """
        lines = frame_content.strip().split('\n')
        # NOTE: str.split always returns at least one element, so this guard
        # cannot trigger; an empty frame simply falls through the loop below.
        if not lines:
            return {}
        frame_data = {}
        # Index 0 is deliberately skipped (see Args above).
        i = 1
        while i < len(lines):
            line = lines[i].strip()
            # Blank lines and comment lines carry no data.
            if not line or line.startswith(FrameMarker.HASH.value):
                i += 1
                continue
            # Explicit end-of-frame marker stops parsing.
            if line == FrameMarker.SAVE_END.value:
                break
            # A key line whose NEXT line opens a multiline block: must be
            # checked before the plain key/value case below.
            if line.startswith(FrameMarker.UNDERSCORE.value) and i + 1 < len(lines) and lines[i + 1].strip().startswith(FrameMarker.MULTILINE_DELIMITER.value):
                frame_data.update(self._parse_multiline(lines, i))
                # _parse_multiline smuggles the resume index back through the
                # dict; pop it so it does not remain as frame data.
                i = frame_data.pop(LoopDataKey.NEXT_INDEX.value)
                continue
            if line.startswith(FrameMarker.UNDERSCORE.value):
                frame_data.update(self._parse_key_value(line))
                i += 1
                continue
            if line == FrameMarker.LOOP_START.value:
                loop_data, new_index = self._parse_loop(lines, i + 1)
                frame_data.setdefault(LoopDataKey.LOOP_DATA.value, []).append(loop_data)
                # Resume at the line that terminated the loop so it is
                # re-examined by the checks above.
                i = new_index
                continue
            # Anything unrecognised is skipped.
            i += 1
        return frame_data

    def _parse_multiline(self, lines: List[str], index: int) -> Dict[str, Any]:
        """Parse multiline text blocks.

        Args:
            lines: All lines of the frame.
            index: Index of the key line; the opening delimiter is expected
                on the following line.

        Returns:
            Dict with the key mapped to the joined, stripped block text, plus
            ``LoopDataKey.NEXT_INDEX.value`` set to the index just past the
            closing delimiter (or past the end if none was found).
        """
        key = lines[index].strip().strip(FrameMarker.UNDERSCORE.value)
        # The opening ';' may carry trailing text (e.g. "; description...")
        opening_line = lines[index + 1].strip()
        first_line_text = opening_line[1:]  # everything after the leading ';'
        i = index + 2  # skip key line and opening ';' line
        multiline_content = [first_line_text] if first_line_text.strip() else []
        while i < len(lines):
            # A line consisting only of the delimiter closes the block.
            if lines[i].strip() == FrameMarker.MULTILINE_DELIMITER.value:
                break
            # Content lines are kept unstripped; only the final join is stripped.
            multiline_content.append(lines[i])
            i += 1
        return {
            key: '\n'.join(multiline_content).strip(),
            LoopDataKey.NEXT_INDEX.value: i + 1
        }

    def _parse_key_value(self, line: str) -> Dict[str, str]:
        """Parse simple key-value pairs.

        Splits on the first run of whitespace; the value has surrounding
        whitespace and quote characters removed. A key with no value maps to
        ``DataValue.EMPTY_STRING.value``.
        """
        parts = line.split(None, 1)
        key = parts[0].strip(FrameMarker.UNDERSCORE.value)
        value = parts[1].strip().strip(f'{FileOperation.DOUBLE_QUOTE.value}{FileOperation.SINGLE_QUOTE.value}') if len(parts) == 2 else DataValue.EMPTY_STRING.value
        return {key: value}

    def _parse_loop(self, lines: List[str], start_index: int) -> Tuple[Dict[str, Any], int]:
        """Parse loop structures.

        Args:
            lines: All lines of the frame.
            start_index: Index of the first line after the loop start marker.

        Returns:
            ``(loop, next_index)`` where ``loop`` holds the header names under
            ``LoopDataKey.HEADERS.value`` and one dict per row under
            ``LoopDataKey.ITEMS.value``, and ``next_index`` is the index of
            the line that ended the loop (or ``len(lines)``).
        """
        i = start_index
        loop_headers = []
        # Collect loop headers
        while i < len(lines) and lines[i].strip().startswith(FrameMarker.UNDERSCORE.value):
            loop_headers.append(lines[i].strip().strip(FrameMarker.UNDERSCORE.value))
            i += 1
        # Collect loop data
        loop_data = []
        while i < len(lines):
            line = lines[i].strip()
            # The loop ends at a blank line, a comment, the frame end, the
            # start of another loop, or the next key line.
            if not line or line.startswith(FrameMarker.HASH.value) or line in (FrameMarker.SAVE_END.value, FrameMarker.LOOP_START.value) or line.startswith(FrameMarker.UNDERSCORE.value):
                break
            try:
                # shlex keeps quoted values containing spaces together.
                row_data = shlex.split(line)
            except ValueError:
                # e.g. unbalanced quotes: fall back to whitespace splitting.
                row_data = line.split()
            # Rows with fewer tokens than headers are dropped.
            if len(row_data) >= len(loop_headers):
                loop_data.append(row_data)
            i += 1
        # Format loop data
        loop_items = []
        for row in loop_data:
            row_data = {}
            # Extra tokens beyond the header count are ignored.
            for j, header in enumerate(loop_headers):
                if j < len(row):
                    row_data[header] = row[j].strip(f'{FileOperation.DOUBLE_QUOTE.value}{FileOperation.SINGLE_QUOTE.value}')
            loop_items.append(row_data)
        return {
            LoopDataKey.HEADERS.value: loop_headers,
            LoopDataKey.ITEMS.value: loop_items
        }, i
class FrameDataProcessor:
    """Processes parsed frame data into dictionary structures.

    Results accumulate on the instance across ``process_frame`` calls:
    ``categories`` and ``items`` (keyed by id / item name), ``relationships``
    (a list of dicts) and ``enumerations`` (item name -> list of values).
    """

    def __init__(self, quiet: bool = False):
        self.quiet = quiet
        self.categories = {}
        self.items = {}
        self.relationships = []
        self.enumerations = {}

    def process_frame(self, frame_data: Dict[str, Any]):
        """Process a single frame's data (as produced by the frame parser)."""
        if LoopDataKey.LOOP_DATA.value in frame_data:
            self._process_loop_frame(frame_data)
        else:
            self._process_non_loop_frame(frame_data)

    def _process_loop_frame(self, frame_data: Dict[str, Any]):
        """Process frames with loop data (may contain multiple loops).

        Each loop row is merged over the frame's own data (row values win)
        and classified on its own.
        """
        classified_any = False
        for loop_info in frame_data[LoopDataKey.LOOP_DATA.value]:
            for loop_item in loop_info[LoopDataKey.ITEMS.value]:
                self._classify_data({**frame_data, **loop_item})
                classified_any = True
        if not classified_any:
            # Every loop was empty: without this fallback the frame's scalar
            # data (e.g. its category id or item name) would be dropped.
            self._classify_data(frame_data)

    def _process_non_loop_frame(self, frame_data: Dict[str, Any]):
        """Process frames without loop data."""
        self._classify_data(frame_data)

    def _classify_data(self, data: Dict[str, Any]):
        """Classify data into categories, items, or relationships.

        Only the first matching kind applies, checked in this order:
        category, item, item-linked relationship, linked-group-list entry.
        Data matching none of them is ignored.
        """
        if DictItemKey.CATEGORY_ID.value in data:
            self.categories[data[DictItemKey.CATEGORY_ID.value]] = data
        elif DictItemKey.ITEM_NAME.value in data:
            item_name = data[DictItemKey.ITEM_NAME.value].strip(f'{FileOperation.DOUBLE_QUOTE.value}{FileOperation.SINGLE_QUOTE.value}')
            self.items[item_name] = data
            self._process_enumeration(data, item_name)
        elif RelationshipKey.ITEM_LINKED_CHILD_NAME.value in data and RelationshipKey.ITEM_LINKED_PARENT_NAME.value in data:
            self.relationships.append(data)
        elif RelationshipKey.PDBX_CHILD_CATEGORY_ID.value in data:
            self._process_group_list(data)

    def _process_enumeration(self, data: Dict[str, Any], item_name: str):
        """Record enumeration values for ``item_name`` if present.

        Values accumulate: loop frames reach this method once per row, each
        carrying a single value, so assigning would keep only the last row.
        Duplicates are skipped and first-seen order is preserved.
        """
        if DictItemKey.ITEM_ENUMERATION_VALUE.value not in data:
            return
        values = data[DictItemKey.ITEM_ENUMERATION_VALUE.value]
        if isinstance(values, str):
            values = [values]
        collected = self.enumerations.setdefault(item_name, [])
        for value in values:
            if value not in collected:
                collected.append(value)

    def _process_group_list(self, data: Dict[str, Any]):
        """Process pdbx_item_linked_group_list entries.

        Appends a normalized relationship dict only when all four of child
        category, child name, parent name and parent category are truthy.
        """
        child_cat = data.get(RelationshipKey.PDBX_CHILD_CATEGORY_ID.value)
        child_name = data.get(RelationshipKey.PDBX_CHILD_NAME.value)
        parent_name = data.get(RelationshipKey.PDBX_PARENT_NAME.value)
        parent_cat = data.get(RelationshipKey.PDBX_PARENT_CATEGORY_ID.value)
        if child_cat and child_name and parent_name and parent_cat:
            self.relationships.append({
                RelationshipKey.CHILD_CATEGORY.value: child_cat,
                RelationshipKey.CHILD_NAME.value: child_name,
                RelationshipKey.PARENT_CATEGORY.value: parent_cat,
                RelationshipKey.PARENT_NAME.value: parent_name
            })
class TabularDataParser:
    """Pulls item types and linked-group relationships out of a dictionary
    file by running it through ``MMCIFParser``."""

    def __init__(self, quiet: bool = False):
        self.item_types = {}
        self.quiet = quiet

    def parse_tabular_data(self, dict_path: str, processor: FrameDataProcessor):
        """Parse ``dict_path`` with MMCIFParser and harvest tabular categories.

        Item types are stored on ``self.item_types``; relationships are
        appended to ``processor.relationships``. ``KeyError``, ``ValueError``
        and ``AttributeError`` are swallowed (reported unless ``quiet``).
        """
        try:
            # Imported at call time rather than at module import.
            from .parser import MMCIFParser
            container = MMCIFParser().parse(dict_path)
            self._process_item_types(container)
            self._process_linked_groups(container, processor)
        except (KeyError, ValueError, AttributeError) as e:
            if self.quiet:
                return
            print(f"Warning: Could not parse tabular data: {e}")
            import traceback
            traceback.print_exc()

    def _process_item_types(self, container):
        """Index the rows of the item-type list by their code field."""
        category_key = TabularDataCategory.ITEM_TYPE_LIST.value
        if category_key not in container[0].data:
            return
        type_list = container[0].data[category_key]
        for index in range(type_list.row_count):
            row = type_list[index].data
            code = row.get(TabularDataField.CODE.value)
            if code:
                self.item_types[code] = row

    def _process_linked_groups(self, container, processor):
        """Extract relationships from pdbx_item_linked_group_list.

        Rows missing any of the four identifying fields are skipped.
        """
        category_key = TabularDataCategory.PDBX_ITEM_LINKED_GROUP_LIST.value
        if category_key not in container[0].data:
            return
        linked_list = container[0].data[category_key]
        if not self.quiet:
            print(f"π Found {linked_list.row_count} relationships in dictionary")
        for index in range(linked_list.row_count):
            row = linked_list[index].data
            child_cat = row.get(TabularDataField.CHILD_CATEGORY_ID.value)
            # Names lose surrounding double quotes only.
            child_name = row.get(TabularDataField.CHILD_NAME.value, DataValue.EMPTY_STRING.value).strip(FileOperation.DOUBLE_QUOTE.value)
            parent_name = row.get(TabularDataField.PARENT_NAME.value, DataValue.EMPTY_STRING.value).strip(FileOperation.DOUBLE_QUOTE.value)
            parent_cat = row.get(TabularDataField.PARENT_CATEGORY_ID.value)
            if not (child_cat and child_name and parent_name and parent_cat):
                continue
            relationship = {
                RelationshipKey.CHILD_CATEGORY.value: child_cat,
                RelationshipKey.CHILD_NAME.value: child_name,
                RelationshipKey.PARENT_CATEGORY.value: parent_cat,
                RelationshipKey.PARENT_NAME.value: parent_name,
            }
            processor.relationships.append(relationship)
class PrimaryKeyExtractor:
    """Derives primary key field names from parsed category metadata."""

    @staticmethod
    def extract(categories: Dict[str, Any]) -> Dict[str, Union[str, List[str]]]:
        """Extract primary keys from category data.

        Returns:
            Category name -> key field name (a plain string for a single
            key, a list for several). Categories without a usable key item
            are omitted.
        """
        result = {}
        for cat_name, cat_data in categories.items():
            key_field = DictItemKey.CATEGORY_KEY_NAME.value
            quote_chars = f'{FileOperation.DOUBLE_QUOTE.value}{FileOperation.SINGLE_QUOTE.value}'

            # Raw candidates: the direct key field first, then loop rows in order.
            raw_candidates = []
            if key_field in cat_data:
                raw_candidates.append(cat_data[key_field])
            if LoopDataKey.LOOP_DATA.value in cat_data:
                for loop in cat_data[LoopDataKey.LOOP_DATA.value]:
                    for entry in loop[LoopDataKey.ITEMS.value]:
                        if key_field in entry:
                            raw_candidates.append(entry[key_field])

            # Strip quotes, drop empties and duplicates, keep first-seen order.
            key_items = []
            for raw in raw_candidates:
                cleaned = raw.strip(quote_chars)
                if cleaned and cleaned not in key_items:
                    key_items.append(cleaned)
            if not key_items:
                continue

            # Only full item names ("_category.field") contribute a field name.
            fields = [
                name.split(DataValue.DOT.value)[-1]
                for name in key_items
                if name.startswith(FrameMarker.UNDERSCORE.value) and DataValue.DOT.value in name
            ]
            if len(fields) == 1:
                result[cat_name] = fields[0]
            elif fields:
                result[cat_name] = fields
        return result
# ====================== Mapping Generator ======================
[docs]
class MappingGenerator:
    """Generates mapping rules from mmCIF dictionary metadata"""

    def __init__(
        self,
        dict_parser: DictionaryParser,
        cache_manager: CacheManager,
        quiet: bool = False
    ):
        self.quiet = quiet
        self.cache_manager = cache_manager
        self.dict_parser = dict_parser
        self._mapping_rules = None  # memoized result of get_mapping_rules()

    def get_mapping_rules(self) -> Dict[str, Any]:
        """Return the mapping rules, building them at most once per instance.

        Lookup order: the instance memo, then the cache manager, then a fresh
        build from the dictionary parser's current ``source`` (which is then
        stored in the cache manager).
        """
        if self._mapping_rules is not None:
            return self._mapping_rules

        rules_kind = CacheType.MAPPING_RULES.value
        cache_key = self._generate_cache_key()
        cached = self.cache_manager.get(rules_kind, cache_key)
        if cached:
            self._mapping_rules = cached
            if not self.quiet:
                print("π¦ Using cached mapping rules")
            return cached

        if not self.quiet:
            print("π§© Generating mapping rules...")
        dict_meta = self.dict_parser.parse(self.dict_parser.source)
        self._mapping_rules = self._generate_mapping(dict_meta)
        self.cache_manager.set(rules_kind, cache_key, self._mapping_rules)
        return self._mapping_rules

    def _generate_cache_key(self) -> str:
        """Generate cache key based on source files and modification times.

        When the parser has no existing source file the key is derived from
        an empty string.
        """
        source = self.dict_parser.source
        parts = []
        if source and Path(source).exists():
            resolved = str(Path(source).resolve())
            modified = os.path.getmtime(source)
            parts.append(f"dict_{resolved}_{modified}")
        digest = hashlib.md5('|'.join(parts).encode()).hexdigest()
        return f"mapping_{digest}"

    def _generate_mapping(
        self,
        dict_meta: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Generate complete mapping rules from dictionary metadata"""
        builder = MappingBuilder(dict_meta)
        builder.build_primary_mappings()
        builder.build_foreign_key_map()

        rules = {}
        rules[MappingDataKey.CATEGORY_MAPPING.value] = builder.category_mapping
        rules[MappingDataKey.ITEM_MAPPING.value] = builder.item_mapping
        rules[MappingDataKey.FK_MAP.value] = builder.fk_map
        rules[DictDataType.PRIMARY_KEYS.value] = dict_meta.get(DictDataType.PRIMARY_KEYS.value, {})
        return rules
class MappingBuilder:
    """Builds mapping rules from mmCIF dictionary metadata.

    Populates three public dicts:
    ``category_mapping`` (category -> sorted field list),
    ``item_mapping`` (category -> field -> type/enum/description) and
    ``fk_map`` ((child_cat, child_field) -> (parent_cat, parent_field)).
    """

    def __init__(self, dict_meta: Dict[str, Any]):
        self.dict_meta = dict_meta
        self.category_mapping = {}
        self.item_mapping = {}
        self.fk_map = {}
        self._temp_fk_relationships = {}  # Stores all FK candidates before priority selection

    def build_primary_mappings(self):
        """Build primary category and item mappings"""
        for cat_name, cat_data in self.dict_meta[DictDataType.CATEGORIES.value].items():
            self._process_category(cat_name, cat_data)

    def _process_category(self, cat_name: str, _cat_data: Dict[str, Any]):
        """Process a single category from dictionary metadata.

        ``_cat_data`` is accepted for signature compatibility but unused; the
        fields are derived from the item names.
        """
        # Sort once and reuse: iterating the raw set would make the insertion
        # order of item_mapping depend on string hashing.
        fields = sorted(self._get_category_items(cat_name))
        self.category_mapping[cat_name] = {MappingDataKey.FIELDS.value: fields}
        self.item_mapping[cat_name] = {}
        for field_name in fields:
            self._map_item(cat_name, field_name)

    def _get_category_items(self, cat_name: str) -> Set[str]:
        """Get all item names for a category (as bare field names)."""
        # Built once per category instead of twice per item.
        prefix = f"{FrameMarker.UNDERSCORE.value}{cat_name}{DataValue.DOT.value}"
        prefix_len = len(prefix)
        return {
            item_name[prefix_len:]
            for item_name in self.dict_meta[DictDataType.ITEMS.value]
            if item_name.startswith(prefix)
        }

    def _map_item(self, cat_name: str, field_name: str):
        """Map a single item from dictionary metadata"""
        item_name = f"_{cat_name}.{field_name}"
        item_data = self.dict_meta[DictDataType.ITEMS.value].get(item_name, {})
        # Create item mapping
        self.item_mapping[cat_name][field_name] = {
            MappingDataKey.TYPE.value: item_data.get(DictItemKey.ITEM_TYPE_CODE.value, DataValue.EMPTY_STRING.value),
            MappingDataKey.ENUM.value: self.dict_meta[DictDataType.ENUMERATIONS.value].get(item_name),
            MappingDataKey.DESCRIPTION.value: item_data.get(DictItemKey.ITEM_DESCRIPTION.value, DataValue.EMPTY_STRING.value)
        }

    def build_foreign_key_map(self):
        """Build foreign key mapping from relationships"""
        # First pass: collect all relationships
        for rel in self.dict_meta[DictDataType.RELATIONSHIPS.value]:
            self._process_relationship(rel)
        # Second pass: resolve collisions using priority-based selection
        self._resolve_fk_collisions()

    def _process_relationship(self, rel: Dict[str, Any]):
        """Process a single relationship entry and store for collision resolution.

        Entries without both a child and a parent name are ignored, as are
        dotted names that do not split into exactly category and field.
        """
        # Extract relationship data (both formats)
        child_name = rel.get(RelationshipKey.ITEM_LINKED_CHILD_NAME.value) or rel.get(RelationshipKey.CHILD_NAME.value)
        parent_name = rel.get(RelationshipKey.ITEM_LINKED_PARENT_NAME.value) or rel.get(RelationshipKey.PARENT_NAME.value)
        child_cat = rel.get(RelationshipKey.CHILD_CATEGORY.value)
        parent_cat = rel.get(RelationshipKey.PARENT_CATEGORY.value)
        if not child_name or not parent_name:
            return
        if child_cat and parent_cat:
            # Explicit categories provided
            child_field = self._extract_field_name(child_name)
            parent_field = self._extract_field_name(parent_name)
        else:
            # Extract from dotted notation
            child_parts = child_name.strip("_").split(".")
            parent_parts = parent_name.strip("_").split(".")
            if len(child_parts) != 2 or len(parent_parts) != 2:
                return
            child_cat, child_field = child_parts
            parent_cat, parent_field = parent_parts
        # Store relationship (may be multiple per child field)
        self._temp_fk_relationships.setdefault(
            (child_cat, child_field), []
        ).append((parent_cat, parent_field))

    def _extract_field_name(self, name: str) -> str:
        """Extract field name from full item name.

        Names without a dot are returned unchanged.
        """
        return name.strip("_").split(".")[-1] if "." in name else name

    def _resolve_fk_collisions(self):
        """Resolve FK collisions using priority-based selection.

        When multiple foreign key targets exist for the same child field,
        select the "most primary" parent (see ``_select_primary_parent``).
        This ensures that subtype patterns like entity_poly -> entity
        and pdbx_entity_nonpoly -> entity are correctly identified.
        """
        for child_key, parent_candidates in self._temp_fk_relationships.items():
            if len(parent_candidates) == 1:
                # No collision, use the single relationship
                self.fk_map[child_key] = parent_candidates[0]
            else:
                # Collision detected - select best parent by priority
                self.fk_map[child_key] = self._select_primary_parent(parent_candidates)

    def _select_primary_parent(self, candidates: List[Tuple[str, str]]) -> Tuple[str, str]:
        """Select the most primary parent from multiple candidates.

        Candidates are ranked by a tuple compared in this order (lower wins;
        ties keep the earliest candidate):
        1. Count of underscores in the category name (fewer is better)
        2. Whether the category name starts with one of the
           ``CategoryPrefix`` PDBX, CIF or RCSB values (penalized)
        3. Length of the category name (shorter is better)
        """
        extension_prefixes = (
            CategoryPrefix.PDBX.value,
            CategoryPrefix.CIF.value,
            CategoryPrefix.RCSB.value,
        )

        def parent_priority_score(parent: Tuple[str, str]) -> Tuple[int, int, int]:
            parent_cat, _parent_field = parent
            has_prefix = 1 if parent_cat.startswith(extension_prefixes) else 0
            return (parent_cat.count('_'), has_prefix, len(parent_cat))

        return min(candidates, key=parent_priority_score)
# ====================== Relationship Resolver ======================
[docs]
class RelationshipResolver:
    """Resolves entity relationships for nested JSON output from mmCIF data"""

    def __init__(
        self,
        mapping_generator: MappingGenerator
    ):
        self.mapping_generator = mapping_generator
        # Off by default; switch on through set_denormalize().
        self.denormalize = False
        self.ownership_analyzer = OwnershipAnalyzer(self.mapping_generator)
        self.nesting_builder = NestingBuilder()

    def set_denormalize(self, value: bool):
        """Enable/disable full denormalization mode"""
        self.denormalize = value

    @property
    def mapping_rules(self) -> Dict[str, Any]:
        """Mapping rules as provided by the mapping generator."""
        return self.mapping_generator.get_mapping_rules()

    def resolve_relationships(self, mmcif_data: MMCIFDataContainer) -> Dict[str, Any]:
        """Resolve relationships directly from mmCIF data to create nested JSON.

        The container is flattened to ``{category: [row, ...]}``, the FK map
        is split into ownership and reference links, and the nesting builder
        assembles the result. Reference links are passed on only in
        denormalize mode; otherwise an empty dict is passed.
        """
        flat = self._flatten_mmcif(mmcif_data)

        rules = self.mapping_rules
        fk_map = rules[MappingDataKey.FK_MAP.value]
        primary_keys = rules.get(DictDataType.PRIMARY_KEYS.value, {})

        ownership_fk_map, reference_fk_map = (
            self.ownership_analyzer.filter_ownership_relationships(fk_map, flat)
        )

        if self.denormalize:
            reverse_links = reference_fk_map
        else:
            reverse_links = {}
        return self.nesting_builder.build_nested_structure(
            flat,
            ownership_fk_map,
            primary_keys,
            reverse_links
        )

    def _flatten_mmcif(self, mmcif_data: MMCIFDataContainer) -> Dict[str, Any]:
        """Convert mmCIF container to flat dictionary structure.

        Category names lose their leading underscores; rows of same-named
        categories from different blocks end up in one list. Categories with
        no rows do not appear in the result.
        """
        flat = {}
        for block in mmcif_data:
            for category_name, category in block.data.items():
                rows = [row.data for row in category]
                if rows:
                    flat.setdefault(category_name.lstrip('_'), []).extend(rows)
        return flat
class ConstraintExtractor:
    """Extracts formal relationship constraints from dictionary metadata"""

    def __init__(self, dict_meta: Dict[str, Any], data: Optional[Dict[str, Any]] = None):
        self.data = data  # Actual data for cardinality analysis
        self.dict_meta = dict_meta

    def extract_constraints(self) -> List[RelationshipConstraint]:
        """Build one unvalidated constraint per usable relationship entry."""
        relationships = self.dict_meta.get(DictDataType.RELATIONSHIPS.value, [])
        described = (self._build_relationship_metadata(rel) for rel in relationships)
        return [RelationshipConstraint(metadata) for metadata in described if metadata]

    def _build_relationship_metadata(self, rel: Dict[str, Any]) -> Optional[RelationshipMetadata]:
        """Build relationship metadata from dictionary entry.

        Returns ``None`` when a child or parent name is missing, or when the
        categories must be taken from dotted names that do not split into
        exactly category and field.
        """
        child_name = rel.get(RelationshipKey.ITEM_LINKED_CHILD_NAME.value) or rel.get(RelationshipKey.CHILD_NAME.value)
        parent_name = rel.get(RelationshipKey.ITEM_LINKED_PARENT_NAME.value) or rel.get(RelationshipKey.PARENT_NAME.value)
        if not (child_name and parent_name):
            return None

        child_cat = rel.get(RelationshipKey.CHILD_CATEGORY.value)
        parent_cat = rel.get(RelationshipKey.PARENT_CATEGORY.value)
        if child_cat and parent_cat:
            # Categories given explicitly: only the field part is needed.
            child_field = self._extract_field_name(child_name)
            parent_field = self._extract_field_name(parent_name)
        else:
            # Dotted notation format (e.g. _category.field)
            child_parts = child_name.strip("_").split(".")
            parent_parts = parent_name.strip("_").split(".")
            if len(child_parts) != 2 or len(parent_parts) != 2:
                return None
            child_cat, child_field = child_parts
            parent_cat, parent_field = parent_parts

        return RelationshipMetadata(
            child_cat=child_cat,
            child_field=child_field,
            parent_cat=parent_cat,
            parent_field=parent_field,
            relationship_type=self._determine_relationship_type(rel, child_cat, child_field, parent_cat)
        )

    def _extract_field_name(self, name: str) -> str:
        """Extract field name from full item name"""
        if "." not in name:
            return name
        return name.strip("_").split(".")[-1]

    def _determine_relationship_type(self, rel: Dict, child_cat: str,
                                     child_field: str, parent_cat: str) -> RelationshipType:
        """Determine relationship type using dictionary metadata and naming patterns.

        Checks run in a fixed order and the first hit wins: ownership terms
        in the description, reference terms in the description, lookup-style
        child field suffix, parent-in-child naming, lookup-style parent
        category name. Falls back to ``RelationshipType.UNKNOWN``.
        """
        description = rel.get(MappingDataKey.DESCRIPTION.value, DataValue.EMPTY_STRING.value).lower()

        def mentions(terms) -> bool:
            return any(term.value in description for term in terms)

        # Explicit indicators in the dictionary description
        if mentions((
            RelationshipTerm.BELONGS_TO, RelationshipTerm.OWNED_BY,
            RelationshipTerm.PART_OF, RelationshipTerm.CONTAINED_IN,
            RelationshipTerm.MEMBER_OF,
        )):
            return RelationshipType.COMPOSITIONAL
        if mentions((
            RelationshipTerm.REFERS_TO, RelationshipTerm.REFERENCES,
            RelationshipTerm.LOOKUP, RelationshipTerm.TYPE_OF,
            RelationshipTerm.CODE_FOR, RelationshipTerm.CATEGORY_OF,
        )):
            return RelationshipType.REFERENTIAL

        # Field-name suffixes that indicate a lookup
        if child_field.endswith(('_type', '_code', '_symbol', '_method', '_class')):
            return RelationshipType.REFERENTIAL

        # Parent name embedded in the child's field or category name
        names_parent_id = child_field == f'{parent_cat}_id'
        nested_category = parent_cat != child_cat and parent_cat in child_cat
        if names_parent_id or nested_category:
            return RelationshipType.COMPOSITIONAL

        # Parent category named like a lookup table
        lookup_markers = ('_type', '_class', '_method', '_status', '_code', '_symbol',
                          '_enum', '_dict', '_list', '_table', '_ref')
        for marker in lookup_markers:
            if marker in parent_cat:
                return RelationshipType.REFERENTIAL
        return RelationshipType.UNKNOWN
[docs]
class OwnershipAnalyzer:
"""
Analyzes relationships to determine ownership using structural algorithms.
Rule Hierarchy (applied in order):
1. PK extension (CORE STRUCTURAL - child PK extends parent PK)
2. Single-FK child (CORE STRUCTURAL - detail table with one FK)
3. Strong FK dependency (STRUCTURAL/SEMANTIC - mandatory FK + name tokens)
4. Explicit constraint type (dictionary metadata)
"""
[docs]
def __init__(self, mapping_generator: MappingGenerator):
    """Bind the analyzer to ``mapping_generator``; constraints and the
    decision log start out empty."""
    # Audit trail of decisions
    self.decision_log: List[Dict[str, Any]] = []
    # Replaced on every filter_ownership_relationships() call.
    self.constraints: List[RelationshipConstraint] = []
    self.mapping_generator = mapping_generator
[docs]
def filter_ownership_relationships(self, fk_map: Dict, data: Dict) -> tuple:
    """
    Split the FK map into ownership links and reference links.

    As a side effect, ``self.constraints`` is rebuilt from the dictionary
    metadata and each constraint is validated against ``data``.

    Returns:
        tuple: (ownership_fk_map, reference_fk_map)
        - ownership_fk_map: Compositional relationships (child owned by parent)
        - reference_fk_map: Referential/lookup relationships (child references parent)
    """
    # Dictionary metadata drives the structural analysis.
    parser = self.mapping_generator.dict_parser
    dict_meta = parser.parse(parser.source)
    primary_keys = dict_meta.get(DictDataType.PRIMARY_KEYS.value, {})

    # Formal constraints, checked against the actual data.
    self.constraints = ConstraintExtractor(dict_meta, data).extract_constraints()
    for constraint in self.constraints:
        constraint.is_validated = constraint.validate(data)

    ownership_fk_map = {}
    reference_fk_map = {}
    for child, parent in fk_map.items():
        child_cat, child_field = child
        parent_cat, parent_field = parent
        owned = self._is_ownership_relationship(
            child_cat, child_field, parent_cat, parent_field, fk_map, primary_keys, dict_meta
        )
        # Everything that is not ownership counts as a reference/lookup link.
        bucket = ownership_fk_map if owned else reference_fk_map
        bucket[(child_cat, child_field)] = (parent_cat, parent_field)
    return ownership_fk_map, reference_fk_map
def _is_ownership_relationship(
self,
child_cat: str,
child_field: str,
parent_cat: str,
parent_field: str,
fk_map: Dict,
primary_keys: Dict[str, Union[str, List[str]]],
dict_meta: Dict
) -> bool:
"""
Determine ownership using structural analysis of the schema.
Ownership is defined structurally:
1. Child's PK extends parent's PK (key structure hierarchy)
2. Child has single FK and no outgoing references (detail table)
3. FK field is mandatory and references parent's PK (strong dependency)
4. Explicit relationship type from validated constraints
All rules are generic and derived from the dictionary schema.
"""
# Rule 1: Structural ownership via key extension
if self._is_ownership_structural(child_cat, child_field, parent_cat, parent_field, primary_keys):
return True
# Rule 2: Single-FK child ownership (detail/dependent table)
# CORE STRUCTURAL RULE
if self._is_single_fk_child_ownership(child_cat, child_field, parent_cat, fk_map, primary_keys):
return True
# Rule 3: Strong FK dependency (mandatory FK to parent's PK)
# STRUCTURAL/SEMANTIC RULE (uses mandatory flag + name tokens)
if self._is_strong_fk_dependency(child_cat, child_field, parent_cat, parent_field, primary_keys, dict_meta):
return True
# Rule 4: Check explicit relationship type from validated constraints
for constraint in self.constraints:
meta = constraint.metadata
if (meta.child_cat == child_cat and meta.child_field == child_field and
meta.parent_cat == parent_cat and meta.parent_field == parent_field):
if constraint.is_validated:
return meta.relationship_type == RelationshipType.COMPOSITIONAL
# Default: not ownership
return False
def _is_ownership_structural(
self,
child_cat: str,
child_field: str,
parent_cat: str,
parent_field: str,
primary_keys: Dict[str, Union[str, List[str]]]
) -> bool:
"""
CORE STRUCTURAL RULE: Child's PK extends parent's PK.
Confidence: HIGH (pure PK/FK structure analysis)
Example: parent PK = {id}, child PK = {id, ordinal}
This indicates child is a detail/component of parent.
Algorithm:
1. Normalize PKs to sets
2. Verify FK references parent's PK
3. Verify FK is part of child's PK
4. Verify parent PK β child PK and parent PK β child PK
Pure structural analysis - no category names, no hardcoding.
Deterministic for a given dictionary.
"""
# Normalize PKs to sets
def norm_pk(pk):
if pk is None:
return set()
if isinstance(pk, str):
return {pk}
return set(pk)
pk_child = norm_pk(primary_keys.get(child_cat))
pk_parent = norm_pk(primary_keys.get(parent_cat))
# No primary key info β can't determine structurally
if not pk_child or not pk_parent:
return False
# The FK must hit the parent's PK
if parent_field not in pk_parent:
return False
# The child FK must be part of the child's PK
if child_field not in pk_child:
return False
# Ownership: child's PK extends parent's PK
# Parent's PK is subset of child's PK, and they're not identical
return pk_parent.issubset(pk_child) and pk_child != pk_parent
def _is_single_fk_child_ownership(
self,
child_cat: str,
child_field: str,
parent_cat: str,
fk_map: Dict[Tuple[str, str], Tuple[str, str]],
primary_keys: Dict[str, Union[str, List[str]]]
) -> bool:
"""
CORE STRUCTURAL RULE: Category with exactly one FK is owned by that parent.
Confidence: HIGH (pure FK/PK graph structure)
Algorithm:
1. Count FKs where child_cat is the child (must be exactly 1)
2. Verify it's the relationship being examined
3. Check if child has no PK β detail table β ownership
4. If child has PK, check if it's a surrogate key:
- Single-field PK only
- Not referenced as parent elsewhere
This catches detail/dependent tables that:
- Have only one foreign key relationship
- Have no PK, or have a surrogate PK not referenced elsewhere
Pure structural analysis - generic across all dictionaries.
Deterministic for a given dictionary.
"""
# Find all FKs where this category is the child
child_fks = [
(c_field, p_cat, p_field)
for (c_cat, c_field), (p_cat, p_field) in fk_map.items()
if c_cat == child_cat
]
# Must have exactly one FK
if len(child_fks) != 1:
return False
# Ensure it's the relationship we're examining
only_field, only_parent_cat, _ = child_fks[0]
if only_field != child_field or only_parent_cat != parent_cat:
return False
pk = primary_keys.get(child_cat)
# No PK β treat as detail table owned by parent
if not pk:
return True
# PK exists - check if it's a surrogate key
if isinstance(pk, str):
pk_fields = {pk}
else:
pk_fields = set(pk)
# Multi-field PK β likely independent entity
if len(pk_fields) != 1:
return False
pk_field = next(iter(pk_fields))
# Check if this PK field is referenced as a parent elsewhere
# If not, it's just a surrogate key for this detail table
referenced_as_parent = any(
(p_cat == child_cat and p_field == pk_field)
for (_, _), (p_cat, p_field) in fk_map.items()
)
# Not referenced β surrogate key β detail table β ownership
return not referenced_as_parent
def _is_strong_fk_dependency(
self,
child_cat: str,
child_field: str,
parent_cat: str,
parent_field: str,
primary_keys: Dict[str, Union[str, List[str]]],
dict_meta: Dict
) -> bool:
"""
STRUCTURAL/SEMANTIC RULE: FK is mandatory and references parent's PK.
Confidence: MEDIUM (uses mandatory flag + name token matching)
Algorithm:
1. Verify FK references parent's PK
2. Check item.mandatory_code in dictionary
3. Perform token-based name matching (generic, not hardcoded)
Derives ownership from:
1. FK references parent's primary key (structural)
2. Field is mandatory in dictionary (structural)
3. Field name contains a token from parent category name (semantic)
No hardcoded category names - all derived from actual schema.
Deterministic for a given dictionary.
"""
# Normalize parent PK
def norm_pk(pk):
if pk is None:
return set()
if isinstance(pk, str):
return {pk}
return set(pk)
pk_parent = norm_pk(primary_keys.get(parent_cat))
# Rule 1: FK must reference parent's PK
if not pk_parent or parent_field not in pk_parent:
return False
# Rule 2: Check if field is mandatory in dictionary
child_item_name = f'_{child_cat}.{child_field}'
child_item = dict_meta.get(DictDataType.ITEMS.value, {}).get(child_item_name, {})
mandatory = child_item.get(DictItemKey.ITEM_MANDATORY_CODE.value, DataValue.EMPTY_STRING.value).strip().lower()
is_mandatory = BooleanValue.is_true(mandatory)
if not is_mandatory:
# Not mandatory β not a strong dependency
return False
# Rule 3: Field name must contain meaningful token from parent category
# Split both names into tokens (by underscores)
field_tokens = set(child_field.lower().split('_'))
parent_tokens = set(parent_cat.lower().split('_'))
# Remove common non-semantic tokens
non_semantic = SemanticToken.get_non_semantic_tokens()
field_tokens -= non_semantic
parent_tokens -= non_semantic
# Check for token overlap
if field_tokens & parent_tokens: # Intersection
return True
return False
[docs]
class NestingBuilder:
    """
    Builds nested structure from flat data using relationships.

    Ownership relationships nest child rows inside their parent row;
    reference relationships embed a copy of the parent row inside the child
    row. Nested entries are stored on a row under the category name prefixed
    with an underscore.
    """

    def build_nested_structure(
        self,
        flat: Dict[str, Any],
        fk_map: Dict,
        primary_keys: Dict[str, Any],
        reference_fk_map: Optional[Dict] = None
    ) -> Dict[str, Any]:
        """
        Build nested structure from flat data.

        The row dictionaries held in ``flat`` are reused, not copied, so
        nesting adds keys to them in place.

        Args:
            flat: Flat data dictionary (category name -> list of row dicts)
            fk_map: Ownership FK relationships (standard nesting: child in parent)
            primary_keys: Primary key definitions (field name or list of field names)
            reference_fk_map: Reference/lookup relationships (for denormalization: parent in child)

        Returns:
            Mapping of every category that was not nested elsewhere to its rows.
        """
        reference_fk_map = reference_fk_map or {}
        # Identify child-only categories
        child_only_cats = self._identify_child_only_categories(fk_map, flat, primary_keys)
        # Create indexed structure
        indexed = self._create_indexed_structure(flat, primary_keys, child_only_cats)
        # Standard ownership nesting: child IN parent
        self._assign_children(indexed, fk_map, primary_keys)
        # Denormalization: reverse-nest reference relationships (parent IN child)
        if reference_fk_map:
            self._assign_parents_to_children(indexed, reference_fk_map)
        # Build top-level structure
        return self._build_top_level(indexed)

    def _identify_child_only_categories(
        self,
        fk_map: Dict,
        flat: Dict[str, Any],
        primary_keys: Dict[str, Any]
    ) -> Set[str]:
        """
        Identify categories that are only children and have duplicate keys.

        Such categories cannot be indexed by primary key without rows
        overwriting each other, so they are indexed by position instead.
        Composite (list) primary keys are compared as tuples of field values.
        """
        parent_cats = {parent_cat for (parent_cat, _) in fk_map.values()}
        child_cats = {child_cat for (child_cat, _) in fk_map}
        child_only_cats = set()
        for cat in child_cats - parent_cats:
            pk_field = primary_keys.get(cat, 'id')
            rows = flat.get(cat, [])
            if isinstance(pk_field, list):
                # A list is unhashable and cannot be used as a dict key, so
                # look up each PK field separately and compare the tuples.
                pk_values = [tuple(row.get(f) for f in pk_field) for row in rows]
            else:
                pk_values = [row.get(pk_field) for row in rows]
            if len(pk_values) != len(set(pk_values)):
                child_only_cats.add(cat)
        return child_only_cats

    def _create_indexed_structure(
        self,
        flat: Dict[str, Any],
        primary_keys: Dict[str, Any],
        child_only_cats: Set[str]
    ) -> Dict[str, Any]:
        """
        Create indexed structure from flat data.

        Returns a mapping ``category -> {key -> row}`` where the key is the
        stringified primary key value (composite keys are joined with "_"),
        or the row position for child-only categories and rows without a key.
        """
        indexed: Dict[str, Any] = {}
        for entity_name, entity_list in flat.items():
            if entity_name in child_only_cats:
                # Use index as key for child-only categories
                indexed[entity_name] = {str(i): row for i, row in enumerate(entity_list)}
                continue
            # Use primary key for indexing
            pk_field = primary_keys.get(entity_name, 'id')
            entity_dict: Dict[str, Any] = {}
            for row in entity_list:
                if isinstance(pk_field, list):
                    # Composite key -> join field values
                    parts = [str(row.get(f, '')) for f in pk_field]
                    key = '_'.join(parts) if any(parts) else str(len(entity_dict))
                else:
                    pk_value = row.get(pk_field)
                    key = str(pk_value) if pk_value is not None else str(len(entity_dict))
                entity_dict[key] = row
            indexed[entity_name] = entity_dict
        return indexed

    def _assign_children(
        self,
        indexed: Dict[str, Any],
        fk_map: Dict,
        primary_keys: Optional[Dict[str, Any]] = None
    ):
        """
        Assign children to parents using foreign key relationships.

        Each child row is appended to a list stored on its parent row under
        the underscore-prefixed child category name. The parent is looked up
        by the stringified FK value in the parent's index.
        """
        # Filter FK map to only include relationships where data exists
        usable_fk_map = self._filter_usable_relationships(indexed, fk_map)
        # Select primary nesting parent for each child from usable relationships
        nesting_fk_map = self._select_primary_nesting_parents(usable_fk_map, primary_keys or {})
        for (child_cat, child_col), (parent_cat, _parent_col) in nesting_fk_map.items():
            # Ensure nested category names have underscore prefix
            nested_cat_name = child_cat if child_cat.startswith("_") else f"_{child_cat}"
            parent_index = indexed.get(parent_cat, {})
            for row in indexed.get(child_cat, {}).values():
                if fk := row.get(child_col):
                    if parent := parent_index.get(str(fk)):
                        parent.setdefault(nested_cat_name, []).append(row)

    def _assign_parents_to_children(
        self,
        indexed: Dict[str, Any],
        reference_fk_map: Dict
    ):
        """
        Reverse-nest reference/lookup relationships for denormalization.

        For reference relationships like pdbx_entity_nonpoly.comp_id -> chem_comp.id,
        embed the parent (chem_comp) data INTO the child (pdbx_entity_nonpoly).
        This creates self-contained documents where lookup data is embedded.
        """
        usable_refs = self._filter_usable_relationships(indexed, reference_fk_map)
        for (child_cat, child_col), (parent_cat, _) in usable_refs.items():
            nested_parent_name = parent_cat if parent_cat.startswith("_") else f"_{parent_cat}"
            parent_index = indexed.get(parent_cat, {})
            for child_row in indexed.get(child_cat, {}).values():
                if fk_value := child_row.get(child_col):
                    if parent_row := parent_index.get(str(fk_value)):
                        # Copy parent data, excluding any nested children to avoid deep recursion
                        parent_copy = {k: v for k, v in parent_row.items() if not k.startswith("_")}
                        # Embed as a list for consistency with mmCIF structure
                        child_row.setdefault(nested_parent_name, []).append(parent_copy)

    def _filter_usable_relationships(
        self,
        indexed: Dict[str, Any],
        fk_map: Dict
    ) -> Dict:
        """
        Filter FK relationships to only include those where:
        1. The child field actually exists in the child data
        2. The parent category exists in the data
        This ensures we only consider viable nesting relationships.
        """
        usable_fk_map = {}
        for (child_cat, child_col), (parent_cat, parent_col) in fk_map.items():
            # Check if parent category exists
            if parent_cat not in indexed:
                continue
            # Check if any child rows have the FK field
            child_data = indexed.get(child_cat, {})
            if any(child_col in row for row in child_data.values()):
                usable_fk_map[(child_cat, child_col)] = (parent_cat, parent_col)
        return usable_fk_map

    def _select_primary_nesting_parents(
        self,
        fk_map: Dict,
        primary_keys: Optional[Dict[str, Any]] = None
    ) -> Dict:
        """
        When a child has multiple parent relationships, select the primary parent for nesting.

        This prevents duplication when a child can nest under multiple parents (e.g., atom_site
        has relationships to both entity and struct_asym, but should only nest under struct_asym).

        Selection follows the schema graph - no heuristic scoring:
        1. Filter out unjoinable parents (composite PK that a single FK can't match)
        2. Among joinable parents, pick the deepest one in the FK ancestry graph

        Returns:
            FK map containing exactly one relationship per child category.
        """
        primary_keys = primary_keys or {}
        # Group FK relationships by child category
        child_to_parents: Dict[str, list] = {}
        # Build child -> parents ancestry from the full FK map for depth resolution
        parent_of: Dict[str, set] = {}
        for (child_cat, child_col), (parent_cat, parent_col) in fk_map.items():
            child_to_parents.setdefault(child_cat, []).append(
                ((child_cat, child_col), (parent_cat, parent_col))
            )
            parent_of.setdefault(child_cat, set()).add(parent_cat)
        # Select primary parent for each child
        nesting_fk_map = {}
        for parents in child_to_parents.values():
            if len(parents) == 1:
                primary = parents[0]
            else:
                primary = self._choose_primary_parent(parents, primary_keys, parent_of)
            nesting_fk_map[primary[0]] = primary[1]
        return nesting_fk_map

    def _choose_primary_parent(
        self,
        parents: List[Tuple[Tuple[str, str], Tuple[str, str]]],
        primary_keys: Dict[str, Any],
        parent_of: Dict[str, set],
    ) -> Tuple[Tuple[str, str], Tuple[str, str]]:
        """
        Choose the primary parent for nesting - deterministically, not heuristically.

        Two structural rules applied in order:
        1. **Joinability filter**: _assign_children matches child FK values against
           the parent's indexed keys (built from the full PK). A single FK field
           can only match a single-field PK. Parents with composite PKs are
           therefore unjoinable and eliminated.
        2. **Graph depth**: among the remaining candidates, pick the deepest parent
           in the FK ancestry graph. If parent A is itself a child of parent B
           (both candidates), A is more specific and wins. Ties go to the
           candidate listed first in ``parents``.

        Fallback: if all candidates are filtered out, all of them are kept so a
        choice can still be made.
        """
        # Step 1: filter to joinable parents only
        joinable = []
        for rel in parents:
            parent_cat = rel[1][0]
            parent_pk = primary_keys.get(parent_cat)
            if isinstance(parent_pk, list) and len(parent_pk) > 1:
                continue  # composite PK -> single FK can't match indexed key
            joinable.append(rel)
        if not joinable:
            joinable = parents  # fallback: keep all if none are joinable
        if len(joinable) == 1:
            return joinable[0]

        # Step 2: pick the deepest parent in the FK graph
        # A parent that is itself a child of another candidate is deeper/more specific
        candidate_cats = {rel[1][0] for rel in joinable}

        def depth(cat: str, path: frozenset = frozenset()) -> int:
            """Length of the longest chain of candidate ancestors above ``cat``."""
            if cat in path:
                return 0  # cycle: stop instead of recursing forever
            # Track only the current path. A set shared between sibling
            # branches would make the result depend on set iteration order,
            # because a node reached through one branch would count as
            # already visited in the next.
            path = path | {cat}
            ancestors = parent_of.get(cat, set()) & candidate_cats
            if not ancestors:
                return 0
            return 1 + max(depth(ancestor, path) for ancestor in ancestors)

        return max(joinable, key=lambda rel: depth(rel[1][0]))

    def _build_top_level(self, indexed: Dict[str, Any]) -> Dict[str, Any]:
        """
        Build top-level structure from indexed data.

        Categories that ended up nested inside another row are left out.
        Remaining rows are emitted sorted by their (string) index key.
        """
        actually_nested_cats = self._find_actually_nested_categories(indexed)
        top = {}
        for k, v in indexed.items():
            if k not in actually_nested_cats:
                top[k] = [item for _, item in sorted(v.items())] if isinstance(v, dict) else v
        return top

    def _find_actually_nested_categories(self, indexed: Dict[str, Any]) -> Set[str]:
        """
        Find categories that are actually nested inside some row.

        A category counts as nested when any row holds a list under that
        category's name, with or without the underscore prefix.
        """
        actually_nested_cats = set()
        for entity_dict in indexed.values():
            for entity_data in entity_dict.values():
                for key, value in entity_data.items():
                    # Remove underscore prefix to match indexed keys
                    key_without_prefix = key[1:] if key.startswith('_') else key
                    if key_without_prefix in indexed and isinstance(value, list):
                        actually_nested_cats.add(key_without_prefix)
        return actually_nested_cats
# ====================== End of File ======================