# Source code for sloth.mmcif.importer

#!/usr/bin/env python3
"""
Importer for SLOTH - JSON import capabilities.

This module provides functionality to import nested JSON data
back into mmCIF format, enabling round-trip conversions.
"""

import json
import os
from pathlib import Path
from typing import Dict, Any, Optional, Union, List
import jsonschema
from .models import MMCIFDataContainer, DataBlock, Category
from .parser import MMCIFParser
from .common import BaseImporter
# StructureFormat removed - JSON is always nested
from .serializer import (
    RelationshipResolver,
    DictionaryParser,
    MappingGenerator,
    get_cache_manager
)
from .validator import ValidationError


class JSONImporter(BaseImporter):
    """Import JSON data back to mmCIF format.

    The importer accepts nested JSON (a mapping of block name -> category
    name -> row data, where rows may embed child categories), flattens it
    into one row list per category, and builds an ``MMCIFDataContainer``
    from the result.
    """

    def __init__(
        self,
        dict_path: Optional[Union[str, Path]] = None,
        cache_dir: Optional[str] = None,
        quiet: bool = False
    ):
        """Initialize the JSON importer.

        Args:
            dict_path: Path to mmCIF dictionary file
            cache_dir: Directory for caching
            quiet: Suppress output messages
        """
        super().__init__(dict_path, cache_dir, quiet)
        # Always skip validation
        self.resolver = None

    def import_data(
        self,
        data: Union[str, Dict[str, Any], Path]
    ) -> MMCIFDataContainer:
        """Import JSON data back to mmCIF format.

        JSON import always expects the nested structure, since that is the
        default export format.

        Args:
            data: JSON data as a string, an already-parsed dict, or a file path

        Returns:
            MMCIFDataContainer with imported data

        Raises:
            ValidationError: If ``data`` is of an unsupported type or does
                not decode to a JSON object.
        """
        json_data = self._parse_json_input(data)
        return self._import_nested_json(json_data)

    def _parse_json_input(self, data: Union[str, Dict[str, Any], Path]) -> Dict[str, Any]:
        """Parse JSON input from a dict, a JSON string, or a file path.

        A ``str``/``Path`` is treated as a file path only when it is shorter
        than 512 characters, does not start with ``{`` and names an existing
        file; otherwise its text is parsed as JSON.

        Args:
            data: Parsed dict, JSON text, or path to a JSON file

        Returns:
            The decoded top-level JSON object

        Raises:
            ValidationError: If ``data`` has an unsupported type, or the
                decoded JSON is not an object (dict).
        """
        if isinstance(data, dict):
            return data
        if not isinstance(data, (str, Path)):
            raise ValidationError("Invalid data type for JSON import")

        data_str = str(data)
        if self._looks_like_existing_file(data, data_str):
            with open(data, 'r', encoding='utf-8') as f:
                parsed = json.load(f)
        else:
            parsed = json.loads(data_str)

        # Everything downstream iterates ``.items()`` on the result, so fail
        # here with a clear error instead of an AttributeError later on.
        if not isinstance(parsed, dict):
            raise ValidationError(
                "JSON import expects a top-level object mapping block names to categories"
            )
        return parsed

    def _looks_like_existing_file(self, data: Union[str, Path], data_str: str) -> bool:
        """Return True when ``data`` should be read as a file rather than parsed."""
        if len(data_str) >= 512 or data_str.strip().startswith('{'):
            return False
        try:
            return Path(data).exists()
        except (OSError, ValueError):
            # Arbitrary text is not always a legal path (over-long component,
            # embedded NUL, ...); some Python versions raise here instead of
            # returning False. Such input is simply not a file path.
            return False

    def _import_nested_json(
        self,
        json_data: Dict[str, Any]
    ) -> MMCIFDataContainer:
        """Import nested JSON back to mmCIF format.

        Two steps:
        1. Flatten nested JSON to flat format
        2. Convert flat JSON to mmCIF container
        """
        flat_structure = self._flatten_nested_json(json_data)
        return self._convert_flat_json_to_mmcif(flat_structure)

    def _flatten_nested_json(self, nested_data: Dict[str, Any]) -> Dict[str, Any]:
        """Flatten nested JSON structure back to flat format.

        Extracts child categories nested within parent rows and creates
        separate top-level categories for each, handling recursive nesting.

        When the same category name is produced more than once within a
        block (nested under several different top-level categories, or
        present both nested and at top level), the rows are concatenated
        rather than the later occurrence replacing the earlier one.

        Args:
            nested_data: Mapping of block name -> category name -> data

        Returns:
            Mapping of block name -> category name -> flattened data
        """
        flat_data = {}
        for block_name, block_data in nested_data.items():
            flat_block = {}
            for category_name, category_data in block_data.items():
                nested_categories = {}
                flattened_data = self._extract_and_flatten(category_data, nested_categories)

                # Parent category first, then everything extracted from it.
                self._merge_category(flat_block, category_name, flattened_data)
                for nested_name, nested_rows in nested_categories.items():
                    self._merge_category(flat_block, nested_name, nested_rows)
            flat_data[block_name] = flat_block
        return flat_data

    def _merge_category(self, flat_block: Dict[str, Any], name: str, data: Any) -> None:
        """Store ``data`` under ``name``, appending to rows already present."""
        if name not in flat_block:
            flat_block[name] = data
            return
        existing = flat_block[name]
        existing_rows = existing if isinstance(existing, list) else [existing]
        new_rows = data if isinstance(data, list) else [data]
        flat_block[name] = existing_rows + new_rows

    def _extract_and_flatten(self, data: Any, accumulated_categories: Dict[str, List]) -> Any:
        """Extract nested categories and return flattened data.

        Args:
            data: Category data (list of rows, single dict, or primitive)
            accumulated_categories: Dict accumulating extracted nested categories

        Returns:
            Flattened data with nested categories removed; primitives are
            returned unchanged.
        """
        if isinstance(data, list):
            return self._flatten_list_category(data, accumulated_categories)
        if isinstance(data, dict):
            return self._flatten_dict_category(data, accumulated_categories)
        return data

    def _flatten_list_category(self, rows: List[Dict], accumulated_categories: Dict[str, List]) -> List[Dict]:
        """Flatten a multi-row category, extracting nested categories from each row.

        Args:
            rows: List of row dictionaries
            accumulated_categories: Dict accumulating extracted nested categories

        Returns:
            List of flattened rows without nested categories; non-dict
            entries are passed through untouched.
        """
        return [
            self._extract_nested_from_row(row, accumulated_categories)
            if isinstance(row, dict) else row
            for row in rows
        ]

    def _flatten_dict_category(self, row: Dict[str, Any], accumulated_categories: Dict[str, List]) -> Dict[str, Any]:
        """Flatten a single-row category, extracting nested categories.

        Args:
            row: Single row dictionary
            accumulated_categories: Dict accumulating extracted nested categories

        Returns:
            Flattened row without nested categories
        """
        return self._extract_nested_from_row(row, accumulated_categories)

    def _extract_nested_from_row(self, row: Dict[str, Any], accumulated_categories: Dict[str, List]) -> Dict[str, Any]:
        """Extract nested categories from a single row.

        Nested categories are identified as dict/list values (except for
        'id', which is always regular).

        Args:
            row: Row dictionary containing both regular items and nested categories
            accumulated_categories: Dict accumulating extracted nested categories

        Returns:
            Row dictionary with only regular items
        """
        regular_items = {}
        for key, value in row.items():
            if self._is_regular_item(key, value):
                regular_items[key] = value
            else:
                self._extract_nested_category(key, value, accumulated_categories)
        return regular_items

    def _is_regular_item(self, key: str, value: Any) -> bool:
        """Determine if a row item is a regular data item vs nested category.

        Args:
            key: Item key/name
            value: Item value

        Returns:
            True if regular item, False if nested category
        """
        # 'id' is always a regular item even if it's a list/dict
        if key == 'id':
            return True
        return not isinstance(value, (list, dict))

    def _extract_nested_category(self, key: str, value: Any, accumulated_categories: Dict[str, List]) -> None:
        """Extract a nested category and add it to accumulated categories.

        Args:
            key: Category key (with or without underscore prefix)
            value: Nested category data
            accumulated_categories: Dict accumulating extracted nested categories
        """
        cat_name = key if key.startswith('_') else f"_{key}"

        # Register the category before recursing so a parent keeps its
        # position ahead of any categories nested inside it.
        accumulated_categories.setdefault(cat_name, [])

        nested_flattened = self._extract_and_flatten(value, accumulated_categories)
        if isinstance(nested_flattened, list):
            accumulated_categories[cat_name].extend(nested_flattened)
        else:
            accumulated_categories[cat_name].append(nested_flattened)

    def _is_item_dict(self, data: Dict[str, Any]) -> bool:
        """Check if a dictionary represents item data vs nested categories.

        Returns True when every value is a primitive or a list containing no
        dicts. Not called by the other methods of this class.
        """
        for v in data.values():
            if isinstance(v, dict):
                return False
            if isinstance(v, list) and any(isinstance(item, dict) for item in v):
                return False
        return True

    def _convert_flat_json_to_mmcif(self, json_data: Dict[str, Any]) -> MMCIFDataContainer:
        """Convert flat JSON structure to mmCIF format.

        Args:
            json_data: Mapping of block name -> category name -> rows, where
                rows is a list of row dicts or a single row dict

        Returns:
            MMCIFDataContainer holding one DataBlock per input block
        """
        blocks = {}
        for block_name, block_data in json_data.items():
            # Remove data_ prefix if present for internal storage
            internal_block_name = block_name[5:] if block_name.startswith("data_") else block_name

            categories = {}
            for category_name, category_data in block_data.items():
                # Category name is used as given (expected to carry the
                # underscore prefix already).
                category = Category(name=category_name)

                if isinstance(category_data, list):
                    # Multi-row category; non-dict entries are ignored
                    rows = [row for row in category_data if isinstance(row, dict)]
                elif isinstance(category_data, dict):
                    # Single-row category
                    rows = [category_data]
                else:
                    rows = []

                # NOTE(review): values are stringified with str(), so a JSON
                # null becomes the text "None" -- confirm that is intended.
                # NOTE(review): rows with differing key sets yield item
                # columns of unequal length -- confirm the exporter always
                # emits uniform rows.
                for row in rows:
                    for item_name, item_value in row.items():
                        category._add_item_value(item_name, str(item_value))

                # Commit batches to make data available
                category._commit_all_batches()
                categories[category_name] = category

            blocks[internal_block_name] = DataBlock(internal_block_name, categories)

        return MMCIFDataContainer(blocks)