
Batch Import & Python SDK Guide

A comprehensive guide for batch importing content into the Musingly platform using the Python SDK with snapshot-based versioning.


Table of Contents

  1. Overview
  2. Quick Start
  3. Architecture
  4. SDK Setup
  5. Creating Import Bundles
  6. Validation
  7. Import Process
  8. Snapshot Lifecycle
  9. Best Practices
  10. Troubleshooting

Overview

The batch import system enables external Python services (content generators, AI pipelines, etc.) to create and update content in bulk with:

  • Strong Contracts: Pydantic models generated from TypeScript Zod schemas ensure type safety across services
  • Snapshot Versioning: All imports create new snapshots, enabling atomic updates and instant rollbacks
  • Slug-Based References: Foreign keys use slugs instead of UUIDs, making bundles portable and human-readable
  • Validation Layers: Pre-flight and post-import validation catch errors before they reach production

Key Benefits

| Feature | Benefit |
| --- | --- |
| Pydantic Models | Type-safe Python code with IDE autocompletion |
| JSON Schema | Language-agnostic contract definition |
| Snapshot Isolation | New content is isolated until explicitly promoted |
| Rollback Support | Instant revert to previous content versions |
| Audit Trail | Complete import history with timing and error logs |

Quick Start

CLI Commands

The deno task import command provides a complete import workflow:

# Validate a bundle without making database changes
deno task import content/bundle.json --dry-run

# Import bundle as a draft snapshot (requires manual promotion)
deno task import content/bundle.json

# Import and automatically promote to current
deno task import content/bundle.json --promote

Environment Setup

Set these environment variables before running imports:

export SUPABASE_URL="https://your-project.supabase.co"
export SUPABASE_SERVICE_ROLE_KEY="your-service-role-key"

Or use a .env file in the project root.
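Python services that drive imports can read the same variables at startup. A minimal sketch (the helper name is illustrative, and the optional python-dotenv import is an assumption, not an SDK dependency):

```python
import os


def get_supabase_config() -> tuple[str, str]:
    """Read Supabase credentials from the environment, failing fast if missing."""
    try:
        # Optional: pull variables from a .env file if python-dotenv is installed.
        from dotenv import load_dotenv
        load_dotenv()
    except ImportError:
        pass
    url = os.environ.get("SUPABASE_URL", "")
    key = os.environ.get("SUPABASE_SERVICE_ROLE_KEY", "")
    if not url or not key:
        raise RuntimeError("SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY must be set")
    return url, key
```

Failing fast here surfaces misconfiguration before any snapshot is created.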

Typical Workflow

# 1. Build the SDK (generates Python models from Zod schemas)
deno task sdk:build:full

# 2. Create bundle in Python using the SDK
cd /path/to/content-generator
pip install -e /path/to/core/python-sdk
python generate_content.py --output bundle.json

# 3. Validate the bundle
deno task import bundle.json --dry-run

# 4. Import to staging (as draft)
deno task import bundle.json

# 5. Verify in database, then promote
# Via SQL: SELECT promote_import('<import_id>', '<snapshot_id>');
# Or re-run with --promote flag

Command Reference

| Command | Description |
| --- | --- |
| `deno task import <file>` | Import bundle as draft snapshot |
| `deno task import <file> --promote` | Import and promote to current |
| `deno task import <file> --dry-run` | Validate only, no database changes |
| `deno task import <file> --validate-only` | Same as --dry-run |
| `deno task import --help` | Show help |

Architecture

┌─────────────────────────────────────────────────────────────────────┐
│                     CONTENT GENERATION PIPELINE                      │
│                                                                      │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐          │
│  │   AI/LLM     │───▶│   Python     │───▶│   Import     │          │
│  │  Generator   │    │   Service    │    │   Bundle     │          │
│  └──────────────┘    └──────────────┘    └──────┬───────┘          │
│                                                  │                   │
│                      Uses Python SDK             │                   │
│                      (Pydantic Models)           │                   │
└──────────────────────────────────────────────────│───────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│                        VALIDATION LAYER                              │
│                                                                      │
│  • Schema validation (Pydantic/JSON Schema)                         │
│  • Referential integrity (slug resolution)                          │
│  • Business rules (no orphaned entities)                            │
│  • DAG cycle detection                                               │
└──────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│                         SUPABASE                                     │
│                                                                      │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                     SNAPSHOT SYSTEM                            │  │
│  ├───────────────────────────────────────────────────────────────┤  │
│  │                                                                │  │
│  │  ┌──────────┐    ┌──────────┐    ┌──────────┐                │  │
│  │  │ v1.0.0   │───▶│ v1.1.0   │───▶│ v2.0.0   │ ◀── CURRENT   │  │
│  │  │ archived │    │ archived │    │ current  │                │  │
│  │  └──────────┘    └──────────┘    └──────────┘                │  │
│  │                                        │                      │  │
│  │                                        ▼                      │  │
│  │                                  ┌──────────┐                │  │
│  │                                  │ v2.1.0   │ ◀── DRAFT     │  │
│  │                                  │ draft    │    (importing) │  │
│  │                                  └──────────┘                │  │
│  └───────────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────┘

SDK Setup

Building the SDKs

The unified build command generates both TypeScript and Python SDKs:

# Full build: OpenAPI → SDK Operations → TypeScript SDK → JSON Schema → Python SDK
deno task sdk:build:full

This produces:

| Output | Location | Description |
| --- | --- | --- |
| TypeScript SDK | `sdk/dist/` | npm-compatible package |
| JSON Schema | `sdk/dist/import-schemas.json` | Language-agnostic schemas |
| Python SDK | `python-sdk/` | Pydantic models package |
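The JSON Schema file is what makes the contract language-agnostic. A minimal sketch that lists the entity schemas it exports, assuming the generator nests them under definitions or $defs (adjust the key for your build):

```python
import json


def list_schema_definitions(schema_path: str) -> list[str]:
    """Return the names of the entity schemas defined in the exported JSON Schema file."""
    with open(schema_path) as f:
        schema = json.load(f)
    # The generated file is assumed to keep entity schemas under "definitions";
    # newer JSON Schema drafts use "$defs" instead, so check both.
    defs = schema.get("definitions") or schema.get("$defs") or {}
    return sorted(defs.keys())
```

Comparing this list against the models you import in Python is a quick sanity check that the two SDKs were built from the same schema version.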

Installing the Python SDK

Option 1: Local File Reference (Development)

# In your Python project
pip install -e /path/to/core/python-sdk

Or in pyproject.toml:

[project]
dependencies = [
    "musingly-core @ file:///path/to/core/python-sdk",
]

Option 2: Published Package (Production)

# After publishing to a private Python package index
# (replace the URL with your registry's simple index)
pip install musingly-core --index-url https://your-package-index.example.com/simple

Verifying Installation

from musingly_core.models import ImportBundle, DomainImport, SparkImport

# Check version matches TypeScript SDK
import musingly_core
print(f"SDK Version: {musingly_core.__version__}")

Creating Import Bundles

Bundle Structure

An import bundle contains all entities needed for a complete content update:

from musingly_core.models import (
    ImportBundle,
    ImportMetadata,
    DomainImport,
    TrailImport,
    ConceptImport,
    SparkImport,
    BeaconImport,
    ConceptSparkImport,
    SparkBeaconImport,
    SparkLinkImport,
    ConceptLinkImport,
)
from datetime import datetime, timezone

# Create the import bundle
bundle = ImportBundle(
    schema_version="1.0.0",
    version_label="v2.1.0",  # Your versioning scheme
    name="January 2026 Content Update",
    metadata=ImportMetadata(
        generated_at=datetime.now(timezone.utc).isoformat(),
        source_system="content-generator",
        source_version="1.0.0",
        description="AI-generated ML curriculum expansion",
    ),
    domains=[...],
    trails=[...],
    concepts=[...],
    sparks=[...],
    beacons=[...],
    concept_sparks=[...],
    spark_beacons=[...],
    spark_links=[...],
    concept_links=[...],
)

Entity Examples

Domains (Top-Level Categories)

domains = [
    DomainImport(
        name="Machine Learning",
        slug="machine-learning",
        description="Fundamentals and advanced topics in machine learning",
        icon="brain",
        color="#6366F1",
        is_published=True,
    ),
    DomainImport(
        name="Software Engineering",
        slug="software-engineering",
        description="Best practices for building robust software systems",
        icon="code",
        color="#10B981",
        is_published=True,
    ),
]

Trails (Learning Paths)

trails = [
    TrailImport(
        domain_slug="machine-learning",  # References domain by slug
        title="Transformer Architecture Mastery",
        slug="transformer-mastery",
        description="Deep dive into attention mechanisms and transformer models",
        difficulty_level="intermediate",
        is_published=False,  # Draft until reviewed
    ),
]

Concepts (Topic Clusters)

concepts = [
    ConceptImport(
        trail_slug="transformer-mastery",  # References trail by slug
        title="Attention Mechanisms",
        slug="attention-mechanisms",
        description="Understanding self-attention and its variants",
        order_index=0,
        is_published=False,
    ),
    ConceptImport(
        trail_slug="transformer-mastery",
        title="Positional Encoding",
        slug="positional-encoding",
        description="How transformers understand sequence order",
        order_index=1,
        is_published=False,
    ),
]

Sparks (Micro-Lessons)

sparks = [
    SparkImport(
        title="Self-Attention Explained",
        slug="self-attention-explained",
        summary="Learn how tokens communicate through attention weights",
        content_md="""
# Self-Attention Explained

Self-attention allows each token in a sequence to attend to all other tokens...

## Key Concepts

1. **Query, Key, Value**: The three projections...
2. **Attention Weights**: Computed via softmax...
3. **Scaled Dot-Product**: Why we divide by √d_k...

## Example

```python
import torch
attention = torch.softmax(Q @ K.T / math.sqrt(d_k), dim=-1) @ V
""", estimated_mins=8, difficulty="intermediate", is_published=False, ai_metadata={ "generated_by": "gpt-4", "generation_date": datetime.now(timezone.utc).isoformat(), "source_materials": ["attention-is-all-you-need.pdf"], "extraction_confidence": 0.92, "review_status": "pending", }, ), ]
#### Beacons (Tags)

```python
beacons = [
    BeaconImport(
        name="Transformers",
        slug="transformers",
        description="Content related to transformer architectures",
        category="technology",
    ),
    BeaconImport(
        name="NLP",
        slug="nlp",
        description="Natural Language Processing topics",
        category="application",
    ),
]

Junction Tables (Relationships)

# Link sparks to concepts
concept_sparks = [
    ConceptSparkImport(
        concept_slug="attention-mechanisms",
        spark_slug="self-attention-explained",
        order_index=0,
    ),
    ConceptSparkImport(
        concept_slug="attention-mechanisms",
        spark_slug="multi-head-attention",
        order_index=1,
    ),
]

# Tag sparks with beacons
spark_beacons = [
    SparkBeaconImport(
        spark_slug="self-attention-explained",
        beacon_slug="transformers",
    ),
    SparkBeaconImport(
        spark_slug="self-attention-explained",
        beacon_slug="nlp",
    ),
]

DAG Edges (Prerequisites)

# Spark-to-Spark prerequisites
spark_links = [
    SparkLinkImport(
        source_slug="linear-algebra-basics",  # Must learn this first
        target_slug="self-attention-explained",  # To understand this
        link_type="prerequisite",
        weight=0.9,  # Strong dependency
        description="Linear algebra concepts needed for attention",
    ),
    SparkLinkImport(
        source_slug="self-attention-explained",
        target_slug="multi-head-attention",
        link_type="prerequisite",
        weight=0.95,
    ),
]

# Concept-to-Concept prerequisites
concept_links = [
    ConceptLinkImport(
        source_slug="attention-mechanisms",
        target_slug="transformer-encoder",
        link_type="prerequisite",
        weight=0.85,
    ),
]
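Once the prerequisite edges form a DAG, a valid learning order can be derived with a standard topological sort. A minimal sketch using Kahn's algorithm (the helper is illustrative, not part of the SDK), with edges oriented as in the examples above (source is the prerequisite of target):

```python
from collections import defaultdict, deque


def learning_order(links: list[tuple[str, str]]) -> list[str]:
    """Order sparks so every prerequisite precedes its dependents.

    Each link is (source_slug, target_slug): source must be learned first.
    Raises ValueError if the links contain a cycle.
    """
    graph: dict[str, set[str]] = defaultdict(set)
    indegree: dict[str, int] = defaultdict(int)
    nodes: set[str] = set()
    for source, target in links:
        nodes.update((source, target))
        if target not in graph[source]:
            graph[source].add(target)
            indegree[target] += 1

    # Start from sparks with no prerequisites; sort for deterministic output.
    queue = deque(sorted(n for n in nodes if indegree[n] == 0))
    order: list[str] = []
    while queue:
        node = queue.popleft()
        order.append(node)
        for neighbor in sorted(graph[node]):
            indegree[neighbor] -= 1
            if indegree[neighbor] == 0:
                queue.append(neighbor)

    if len(order) != len(nodes):
        raise ValueError("spark_links contain a cycle")
    return order
```

Ties are broken alphabetically, so the order is stable across runs; the cycle check here complements the DFS-based detection shown in the Validation section.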

Complete Example

from musingly_core.models import ImportBundle, ImportMetadata
from datetime import datetime, timezone
import json

def create_content_bundle() -> ImportBundle:
    """Create a complete import bundle for new ML content."""

    return ImportBundle(
        schema_version="1.0.0",
        version_label="v2.1.0",
        name="ML Curriculum Expansion Q1 2026",
        metadata=ImportMetadata(
            generated_at=datetime.now(timezone.utc).isoformat(),
            source_system="ml-content-generator",
            source_version="2.0.0",
            description="Expanded coverage of transformer architectures",
        ),
        domains=[
            # ... domain definitions
        ],
        trails=[
            # ... trail definitions
        ],
        concepts=[
            # ... concept definitions
        ],
        sparks=[
            # ... spark definitions
        ],
        beacons=[
            # ... beacon definitions
        ],
        concept_sparks=[
            # ... concept-spark links
        ],
        spark_beacons=[
            # ... spark-beacon tags
        ],
        spark_links=[
            # ... spark prerequisite graph
        ],
        concept_links=[
            # ... concept prerequisite graph
        ],
    )

def export_bundle(bundle: ImportBundle, output_path: str) -> None:
    """Export bundle to JSON file for import."""
    with open(output_path, "w") as f:
        json.dump(bundle.model_dump(), f, indent=2)
    print(f"Bundle exported to {output_path}")

# Usage
if __name__ == "__main__":
    bundle = create_content_bundle()
    export_bundle(bundle, "import-bundle-v2.1.0.json")

Validation

Pre-Flight Validation (Python)

Pydantic provides automatic validation when creating models:

from pydantic import ValidationError
from musingly_core.models import ImportBundle
import json

def validate_bundle(bundle_path: str) -> tuple[bool, list[str]]:
    """Validate a bundle file before import."""
    errors = []

    try:
        with open(bundle_path) as f:
            data = json.load(f)

        # Pydantic validates schema, types, and constraints
        bundle = ImportBundle.model_validate(data)

        # Custom validation rules
        errors.extend(validate_referential_integrity(bundle))
        errors.extend(validate_dag_acyclic(bundle))
        errors.extend(validate_slug_uniqueness(bundle))

    except ValidationError as e:
        for error in e.errors():
            loc = ".".join(str(x) for x in error["loc"])
            errors.append(f"Schema error at {loc}: {error['msg']}")
    except json.JSONDecodeError as e:
        errors.append(f"Invalid JSON: {e}")

    return len(errors) == 0, errors

def validate_referential_integrity(bundle: ImportBundle) -> list[str]:
    """Check all slug references point to existing entities."""
    errors = []

    # Collect all defined slugs
    domain_slugs = {d.slug for d in bundle.domains}
    trail_slugs = {t.slug for t in bundle.trails}
    concept_slugs = {c.slug for c in bundle.concepts}
    spark_slugs = {s.slug for s in bundle.sparks}
    beacon_slugs = {b.slug for b in bundle.beacons}

    # Validate trail -> domain references
    for trail in bundle.trails:
        if trail.domain_slug not in domain_slugs:
            errors.append(
                f"Trail '{trail.slug}' references unknown domain '{trail.domain_slug}'"
            )

    # Validate concept -> trail references
    for concept in bundle.concepts:
        if concept.trail_slug and concept.trail_slug not in trail_slugs:
            errors.append(
                f"Concept '{concept.slug}' references unknown trail '{concept.trail_slug}'"
            )

    # Validate concept_sparks
    for cs in bundle.concept_sparks:
        if cs.concept_slug not in concept_slugs:
            errors.append(f"concept_sparks references unknown concept '{cs.concept_slug}'")
        if cs.spark_slug not in spark_slugs:
            errors.append(f"concept_sparks references unknown spark '{cs.spark_slug}'")

    # Validate spark_beacons
    for sb in bundle.spark_beacons:
        if sb.spark_slug not in spark_slugs:
            errors.append(f"spark_beacons references unknown spark '{sb.spark_slug}'")
        if sb.beacon_slug not in beacon_slugs:
            errors.append(f"spark_beacons references unknown beacon '{sb.beacon_slug}'")

    # Validate spark_links
    for link in bundle.spark_links:
        if link.source_slug not in spark_slugs:
            errors.append(f"spark_links references unknown source '{link.source_slug}'")
        if link.target_slug not in spark_slugs:
            errors.append(f"spark_links references unknown target '{link.target_slug}'")

    return errors

def validate_dag_acyclic(bundle: ImportBundle) -> list[str]:
    """Ensure spark and concept links form a DAG (no cycles)."""
    errors = []

    # Build adjacency list for spark graph
    spark_graph: dict[str, set[str]] = {}
    for link in bundle.spark_links:
        if link.source_slug not in spark_graph:
            spark_graph[link.source_slug] = set()
        spark_graph[link.source_slug].add(link.target_slug)

    # Detect cycles using DFS
    def has_cycle(graph: dict[str, set[str]]) -> list[str] | None:
        visited = set()
        path = set()
        path_list = []

        def dfs(node: str) -> list[str] | None:
            visited.add(node)
            path.add(node)
            path_list.append(node)

            for neighbor in graph.get(node, []):
                if neighbor in path:
                    cycle_start = path_list.index(neighbor)
                    return path_list[cycle_start:] + [neighbor]
                if neighbor not in visited:
                    result = dfs(neighbor)
                    if result:
                        return result

            path.remove(node)
            path_list.pop()
            return None

        for node in graph:
            if node not in visited:
                cycle = dfs(node)
                if cycle:
                    return cycle
        return None

    cycle = has_cycle(spark_graph)
    if cycle:
        errors.append(f"Cycle detected in spark_links: {' -> '.join(cycle)}")

    return errors

def validate_slug_uniqueness(bundle: ImportBundle) -> list[str]:
    """Ensure slugs are unique within each entity type."""
    errors = []

    def check_duplicates(items: list, entity_type: str) -> list[str]:
        slugs = [item.slug for item in items]
        seen = set()
        duplicates = set()
        for slug in slugs:
            if slug in seen:
                duplicates.add(slug)
            seen.add(slug)
        return [f"Duplicate {entity_type} slug: '{s}'" for s in duplicates]

    errors.extend(check_duplicates(bundle.domains, "domain"))
    errors.extend(check_duplicates(bundle.trails, "trail"))
    errors.extend(check_duplicates(bundle.concepts, "concept"))
    errors.extend(check_duplicates(bundle.sparks, "spark"))
    errors.extend(check_duplicates(bundle.beacons, "beacon"))

    return errors

# Usage
if __name__ == "__main__":
    valid, errors = validate_bundle("import-bundle-v2.1.0.json")

    if valid:
        print("✓ Bundle is valid")
    else:
        print("✗ Validation failed:")
        for error in errors:
            print(f"  - {error}")

Import Process

Import Workflow

┌─────────────────────────────────────────────────────────────────────┐
│  1. PRE-FLIGHT                                                       │
│     • Validate bundle JSON against Pydantic models                  │
│     • Check referential integrity                                   │
│     • Detect DAG cycles                                             │
│     • Verify slug uniqueness                                        │
└─────────────────────────────────────────────────────────────────────┘
                                ▼ (validation passed)
┌─────────────────────────────────────────────────────────────────────┐
│  2. CREATE DRAFT SNAPSHOT                                            │
│     • Call: create_import_snapshot(name, version_label)             │
│     • Returns: snapshot_id (UUID)                                   │
│     • Status: 'draft'                                               │
└─────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│  3. START IMPORT LOG                                                 │
│     • Call: start_import(version_label, source_system)              │
│     • Returns: import_id (UUID)                                     │
│     • Status: 'pending'                                             │
└─────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│  4. BULK INSERT (Order Matters!)                                     │
│     1. Domains (no dependencies)                                    │
│     2. Trails (depends on domains)                                  │
│     3. Concepts (depends on trails)                                 │
│     4. Sparks (no direct dependencies)                              │
│     5. Beacons (no dependencies)                                    │
│     6. Concept_sparks (depends on concepts + sparks)                │
│     7. Spark_beacons (depends on sparks + beacons)                  │
│     8. Spark_links (depends on sparks)                              │
│     9. Concept_links (depends on concepts)                          │
└─────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│  5. FINALIZE IMPORT                                                  │
│     • Call: finalize_import(import_id, snapshot_id, entity_counts)  │
│     • Status: 'ready'                                               │
│     • Snapshot ready for promotion                                  │
└─────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│  6. VERIFY (Optional but Recommended)                                │
│     • Run queries against draft snapshot                            │
│     • Verify entity counts match expected                           │
│     • Check sample content renders correctly                        │
│     • Validate search index (if applicable)                         │
└─────────────────────────────────────────────────────────────────────┘
                                ▼ (verification passed)
┌─────────────────────────────────────────────────────────────────────┐
│  7. PROMOTE                                                          │
│     • Call: promote_import(import_id, snapshot_id)                  │
│     • Snapshot becomes 'current'                                    │
│     • Previous snapshot becomes 'archived'                          │
│     • Import status: 'promoted'                                     │
└─────────────────────────────────────────────────────────────────────┘

Python Import Client

import psycopg2
from psycopg2.extras import execute_values
from musingly_core.models import ImportBundle
import json
from uuid import UUID

class MusinglyImporter:
    """Client for importing bundles into Supabase."""

    def __init__(self, connection_string: str):
        self.conn = psycopg2.connect(connection_string)

    def import_bundle(
        self,
        bundle: ImportBundle,
        auto_promote: bool = False,
    ) -> dict:
        """
        Import a bundle into a new draft snapshot.

        Args:
            bundle: The validated import bundle
            auto_promote: If True, automatically promote on success

        Returns:
            Import result with snapshot_id, import_id, and entity_counts
        """
        cursor = self.conn.cursor()

        try:
            # 1. Create draft snapshot
            cursor.execute(
                "SELECT create_import_snapshot(%s, %s, %s)",
                (bundle.name, bundle.version_label, bundle.metadata.description)
            )
            snapshot_id = cursor.fetchone()[0]

            # 2. Start import log
            cursor.execute(
                "SELECT start_import(%s, %s, %s)",
                (
                    bundle.version_label,
                    bundle.metadata.source_system,
                    bundle.metadata.source_version,
                )
            )
            import_id = cursor.fetchone()[0]

            # 3. Update status to importing
            cursor.execute(
                "SELECT update_import_status(%s, %s, %s)",
                (import_id, "importing", snapshot_id)
            )

            # 4. Bulk insert all entities
            entity_counts = self._insert_entities(cursor, bundle, snapshot_id)

            # 5. Finalize import
            cursor.execute(
                "SELECT finalize_import(%s, %s, %s)",
                (import_id, snapshot_id, json.dumps(entity_counts))
            )

            self.conn.commit()

            # 6. Optionally promote
            if auto_promote:
                self.promote(import_id, snapshot_id)

            return {
                "success": True,
                "snapshot_id": str(snapshot_id),
                "import_id": str(import_id),
                "version_label": bundle.version_label,
                "entity_counts": entity_counts,
                "status": "promoted" if auto_promote else "ready",
            }

        except Exception as e:
            self.conn.rollback()

            # Log failure if import_id was created
            if 'import_id' in locals():
                cursor.execute(
                    "SELECT update_import_status(%s, %s, NULL, NULL, NULL, %s)",
                    (import_id, "failed", str(e))
                )
                self.conn.commit()

            raise

    def _insert_entities(
        self,
        cursor,
        bundle: ImportBundle,
        snapshot_id: UUID,
    ) -> dict:
        """Insert all entities in dependency order."""
        counts = {}

        # Insert domains
        if bundle.domains:
            counts["domains"] = self._insert_domains(cursor, bundle.domains, snapshot_id)

        # Build slug -> UUID mapping for domains
        domain_map = self._get_slug_map(cursor, "domains", snapshot_id)

        # Insert trails
        if bundle.trails:
            counts["trails"] = self._insert_trails(
                cursor, bundle.trails, snapshot_id, domain_map
            )

        # Build trail mapping
        trail_map = self._get_slug_map(cursor, "trails", snapshot_id)

        # Insert concepts
        if bundle.concepts:
            counts["concepts"] = self._insert_concepts(
                cursor, bundle.concepts, snapshot_id, trail_map
            )

        # Build concept mapping
        concept_map = self._get_slug_map(cursor, "concepts", snapshot_id)

        # Insert sparks
        if bundle.sparks:
            counts["sparks"] = self._insert_sparks(cursor, bundle.sparks, snapshot_id)

        # Build spark mapping
        spark_map = self._get_slug_map(cursor, "sparks", snapshot_id)

        # Insert beacons
        if bundle.beacons:
            counts["beacons"] = self._insert_beacons(cursor, bundle.beacons, snapshot_id)

        # Build beacon mapping
        beacon_map = self._get_slug_map(cursor, "beacons", snapshot_id)

        # Insert junction tables
        if bundle.concept_sparks:
            counts["concept_sparks"] = self._insert_concept_sparks(
                cursor, bundle.concept_sparks, snapshot_id, concept_map, spark_map
            )

        if bundle.spark_beacons:
            counts["spark_beacons"] = self._insert_spark_beacons(
                cursor, bundle.spark_beacons, snapshot_id, spark_map, beacon_map
            )

        # Insert DAG edges
        if bundle.spark_links:
            counts["spark_links"] = self._insert_spark_links(
                cursor, bundle.spark_links, snapshot_id, spark_map
            )

        if bundle.concept_links:
            counts["concept_links"] = self._insert_concept_links(
                cursor, bundle.concept_links, snapshot_id, concept_map
            )

        return counts

    def _insert_domains(self, cursor, domains, snapshot_id) -> int:
        """Insert domain entities."""
        values = [
            (
                snapshot_id,
                d.name,
                d.slug,
                d.description,
                d.icon,
                d.color,
                d.is_published,
            )
            for d in domains
        ]
        execute_values(
            cursor,
            """
            INSERT INTO domains (snapshot_id, name, slug, description, icon, color, is_published)
            VALUES %s
            """,
            values,
        )
        return len(domains)

    def _get_slug_map(self, cursor, table: str, snapshot_id: UUID) -> dict[str, UUID]:
        """Get slug -> UUID mapping for a table.

        The table name is interpolated into the SQL below, so callers must
        pass only trusted, hard-coded table names (never user input).
        """
        cursor.execute(
            f"SELECT slug, id FROM {table} WHERE snapshot_id = %s",
            (snapshot_id,)
        )
        return {row[0]: row[1] for row in cursor.fetchall()}

    # ... additional _insert_* methods for other entity types ...

    def promote(self, import_id: UUID, snapshot_id: UUID) -> None:
        """Promote a snapshot to current."""
        cursor = self.conn.cursor()
        cursor.execute(
            "SELECT promote_import(%s, %s)",
            (import_id, snapshot_id)
        )
        self.conn.commit()

    def rollback(self, import_id: UUID | None = None) -> UUID:
        """Rollback to previous snapshot."""
        cursor = self.conn.cursor()
        cursor.execute(
            "SELECT rollback_import(%s)",
            (import_id,)
        )
        parent_id = cursor.fetchone()[0]
        self.conn.commit()
        return parent_id

    def delete_draft(self, snapshot_id: UUID) -> None:
        """Delete a draft snapshot (e.g., after failed import)."""
        cursor = self.conn.cursor()
        cursor.execute(
            "SELECT delete_draft_snapshot(%s)",
            (snapshot_id,)
        )
        self.conn.commit()

# Usage
if __name__ == "__main__":
    from musingly_core.models import ImportBundle
    import json

    # Load and validate bundle
    with open("import-bundle-v2.1.0.json") as f:
        bundle = ImportBundle.model_validate(json.load(f))

    # Import to Supabase
    importer = MusinglyImporter(
        "postgresql://user:password@localhost:54322/postgres"
    )

    result = importer.import_bundle(bundle, auto_promote=False)

    print(f"Import complete!")
    print(f"  Snapshot ID: {result['snapshot_id']}")
    print(f"  Status: {result['status']}")
    print(f"  Entities: {result['entity_counts']}")

    # After manual verification, promote
    # importer.promote(result['import_id'], result['snapshot_id'])

## Snapshot Lifecycle

### Snapshot States

```
┌───────────────────────────────────────────────────────────────────┐
│                      SNAPSHOT LIFECYCLE                            │
├───────────────────────────────────────────────────────────────────┤
│                                                                    │
│   ┌─────────┐                                                     │
│   │  DRAFT  │ ─── Import in progress, not visible to users       │
│   └────┬────┘                                                     │
│        │                                                          │
│        │ finalize_import()                                        │
│        ▼                                                          │
│   ┌─────────┐                                                     │
│   │  READY  │ ─── Import complete, awaiting promotion             │
│   └────┬────┘                                                     │
│        │                                                          │
│        │ promote_import()                                         │
│        ▼                                                          │
│   ┌──────────┐                                                    │
│   │ CURRENT  │ ─── Active snapshot, served to all users          │
│   └────┬─────┘                                                    │
│        │                                                          │
│        │ (new snapshot promoted)                                  │
│        ▼                                                          │
│   ┌──────────┐                                                    │
│   │ ARCHIVED │ ─── Historical, can be used for rollback          │
│   └──────────┘                                                    │
│                                                                    │
└───────────────────────────────────────────────────────────────────┘
```
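As a reading aid, the allowed transitions can be sketched as a tiny state machine in Python. This is illustrative only: the real transitions are enforced server-side by the database functions (`finalize_import()`, `promote_import()`, and the promotion of newer snapshots).

```python
# Illustrative sketch of the snapshot state machine described above.
# The actual transitions are enforced server-side by the database functions.
ALLOWED_TRANSITIONS = {
    "draft": {"ready"},       # finalize_import()
    "ready": {"current"},     # promote_import()
    "current": {"archived"},  # a newer snapshot is promoted
    "archived": set(),        # terminal; still usable as a rollback target
}

def can_transition(current_state: str, new_state: str) -> bool:
    """Return True if the snapshot state change is allowed."""
    return new_state in ALLOWED_TRANSITIONS.get(current_state, set())
```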

### Rollback Procedure

```python
# If issues are discovered after promotion
importer = MusinglyImporter(connection_string)

# Option 1: Rollback to parent snapshot
previous_snapshot_id = importer.rollback()
print(f"Rolled back to: {previous_snapshot_id}")

# Option 2: Rollback with import_id tracking
previous_snapshot_id = importer.rollback(import_id=failing_import_id)
```

### Cleanup Old Snapshots

```sql
-- Keep only the 5 most recent published snapshots
SELECT cleanup_old_snapshots(5);

-- Check import statistics for the last 30 days
SELECT * FROM get_import_stats(30);
```
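The retention rule behind `cleanup_old_snapshots` can be mirrored in Python when reasoning about what a cleanup run will remove; `snapshots_to_archive` is a hypothetical helper, not part of the SDK:

```python
def snapshots_to_archive(snapshot_ids: list[str], keep_count: int) -> list[str]:
    """Given snapshot IDs ordered newest-first, return the IDs that fall
    outside the retention window (mirrors cleanup_old_snapshots(keep_count))."""
    if keep_count < 0:
        raise ValueError("keep_count must be non-negative")
    return snapshot_ids[keep_count:]
```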

## Best Practices

### 1. Version Labeling

Use semantic versioning or date-based labels:

```python
# Semantic versioning
version_label = "v2.1.0"

# Date-based
version_label = "2026-01-Q1"

# Content-based
version_label = "ml-expansion-jan2026"
```
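If labels are generated programmatically, a small stdlib helper keeps them consistent. `date_version_label` is a hypothetical name; any unique, sortable string works:

```python
from datetime import datetime, timezone

def date_version_label(prefix: str = "content") -> str:
    """Build a date-based version label like 'content-2026-01-15'.
    Hypothetical helper; any unique, sortable string is a valid label."""
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    return f"{prefix}-{today}"
```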

### 2. Incremental Updates

For partial content updates, include only the changed entities:

```python
from datetime import datetime, timezone

from musingly_core.models import ImportBundle, ImportMetadata, SparkImport

# Update only new/modified sparks
bundle = ImportBundle(
    schema_version="1.0.0",
    version_label="v2.1.1-hotfix",
    metadata=ImportMetadata(
        generated_at=datetime.now(timezone.utc).isoformat(),
        source_system="content-editor",
        description="Fixed typos in attention-mechanisms sparks",
    ),
    # Empty arrays for unchanged entity types
    domains=[],
    trails=[],
    concepts=[],
    # Only include modified sparks
    sparks=[
        SparkImport(
            slug="self-attention-explained",
            # ... updated content
        ),
    ],
    beacons=[],
    concept_sparks=[],
    spark_beacons=[],
    spark_links=[],
    concept_links=[],
)
```
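To decide which entities actually changed, one option is to compare stable content hashes of the serialized entities. The sketch below assumes entities are plain dicts with a `"slug"` key (Pydantic models can be converted with `model_dump()`); `content_hash` and `changed_entities` are hypothetical helpers:

```python
import hashlib
import json

def content_hash(entity: dict) -> str:
    """Stable hash of an entity's content (key order normalized)."""
    payload = json.dumps(entity, sort_keys=True, ensure_ascii=False)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()

def changed_entities(new: list[dict], previous: list[dict]) -> list[dict]:
    """Return entities from `new` that are absent from or differ in
    `previous`, matched by slug."""
    prev_hashes = {e["slug"]: content_hash(e) for e in previous}
    return [e for e in new if prev_hashes.get(e["slug"]) != content_hash(e)]
```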

### 3. CI/CD Integration

```yaml
# .github/workflows/content-deploy.yml
name: Deploy Content

on:
  push:
    branches: [main]
    paths:
      - 'content/**'

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.10'

      - name: Install SDK
        run: pip install ./python-sdk

      - name: Validate Bundle
        run: python scripts/validate_bundle.py content/bundle.json

      - name: Import to Staging
        env:
          DATABASE_URL: ${{ secrets.STAGING_DATABASE_URL }}
        run: python scripts/import_bundle.py content/bundle.json --no-promote

      - name: Run Tests
        run: pytest tests/content/

      - name: Promote if Tests Pass
        env:
          DATABASE_URL: ${{ secrets.STAGING_DATABASE_URL }}
        run: python scripts/promote_snapshot.py
```

### 4. Error Handling

```python
import json
import logging

from pydantic import ValidationError

from musingly_core.models import ImportBundle

logger = logging.getLogger(__name__)

def safe_import(bundle_path: str, importer: MusinglyImporter) -> dict:
    """Import with comprehensive error handling."""

    # Load and validate
    try:
        with open(bundle_path) as f:
            bundle = ImportBundle.model_validate(json.load(f))
    except ValidationError as e:
        logger.error(f"Schema validation failed: {e}")
        raise

    # Pre-flight checks
    valid, errors = validate_bundle(bundle_path)
    if not valid:
        logger.error(f"Pre-flight validation failed: {errors}")
        raise ValueError(f"Validation errors: {errors}")

    # Import
    snapshot_id = None
    try:
        result = importer.import_bundle(bundle, auto_promote=False)
        snapshot_id = result["snapshot_id"]
        logger.info(f"Import successful: {result}")
        return result

    except Exception as e:
        logger.error(f"Import failed: {e}")

        # Clean up the draft snapshot if one was created
        if snapshot_id:
            try:
                importer.delete_draft(snapshot_id)
                logger.info(f"Cleaned up draft snapshot: {snapshot_id}")
            except Exception as cleanup_error:
                logger.warning(f"Failed to cleanup draft: {cleanup_error}")

        raise
```

## Troubleshooting

### Common Issues

#### 1. "Duplicate slug" Error

```
Error: duplicate key value violates unique constraint "sparks_slug_snapshot_key"
```

**Cause:** The same slug is used twice in the bundle.

**Fix:** Ensure each entity has a unique slug within its type.

```python
# Check for duplicates before import
slugs = [s.slug for s in bundle.sparks]
duplicates = [s for s in slugs if slugs.count(s) > 1]
if duplicates:
    raise ValueError(f"Duplicate spark slugs: {set(duplicates)}")
```
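The same check generalizes to any entity list with `collections.Counter`; `find_duplicate_slugs` below is a hypothetical helper that works with any objects exposing a `.slug` attribute, such as the SDK import models:

```python
from collections import Counter

def find_duplicate_slugs(entities: list) -> set[str]:
    """Return the slugs that appear more than once in the given list.
    Works with any objects exposing a .slug attribute."""
    counts = Counter(e.slug for e in entities)
    return {slug for slug, n in counts.items() if n > 1}
```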

#### 2. Foreign Key Violation

```
Error: insert or update on table "trails" violates foreign key constraint
```

**Cause:** A trail references a `domain_slug` that doesn't exist.

**Fix:** Ensure all referenced entities are included in the bundle.
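A pre-flight check can surface dangling references before the database does. The sketch below assumes trail objects expose `slug` and `domain_slug` attributes, mirroring the bundle fields; `missing_domain_refs` is a hypothetical helper:

```python
def missing_domain_refs(trails: list, domains: list) -> list[tuple[str, str]]:
    """Return (trail_slug, missing_domain_slug) pairs for trails whose
    referenced domain is not present in the bundle."""
    known = {d.slug for d in domains}
    return [(t.slug, t.domain_slug) for t in trails if t.domain_slug not in known]
```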

#### 3. DAG Cycle Detected

```
Error: Cycle detected in spark_links: A -> B -> C -> A
```

**Cause:** The prerequisite chain forms a loop.

**Fix:** Review `spark_links` and remove the edge creating the cycle.

#### 4. Schema Version Mismatch

```
Error: Invalid literal for schema_version: "2.0.0"
```

**Cause:** The bundle uses a schema version the SDK doesn't support.

**Fix:** Rebuild the Python SDK to get the latest schema:

```bash
deno task sdk:build:full
pip install -e /path/to/core/python-sdk --force-reinstall
```

### Viewing Import Logs

```sql
-- Recent imports
SELECT
    version_label,
    status,
    started_at,
    completed_at,
    entity_counts,
    error_message
FROM import_logs
ORDER BY started_at DESC
LIMIT 10;

-- Failed imports with details
SELECT
    version_label,
    error_message,
    error_details,
    started_at
FROM import_logs
WHERE status = 'failed'
ORDER BY started_at DESC;

-- Import statistics
SELECT * FROM get_import_stats(30);
```

## API Reference

### Database Functions

| Function | Description |
|----------|-------------|
| `create_import_snapshot(name, version_label, description, parent_id)` | Creates a new draft snapshot |
| `start_import(version_label, source_system, source_version, created_by)` | Starts an import log entry |
| `update_import_status(import_id, status, ...)` | Updates import status and details |
| `finalize_import(import_id, snapshot_id, entity_counts)` | Marks an import as ready |
| `promote_import(import_id, snapshot_id)` | Promotes a snapshot to current |
| `rollback_import(import_id)` | Rolls back to the previous snapshot |
| `delete_draft_snapshot(snapshot_id)` | Deletes a draft and its data |
| `cleanup_old_snapshots(keep_count)` | Archives old snapshots |
| `get_import_stats(days)` | Returns import statistics |

### Python SDK Models

| Model | Description |
|-------|-------------|
| `ImportBundle` | Complete import payload |
| `ImportMetadata` | Source and timing metadata |
| `DomainImport` | Top-level category |
| `TrailImport` | Learning path |
| `ConceptImport` | Topic cluster |
| `SparkImport` | Micro-lesson content |
| `BeaconImport` | Discovery tag |
| `ConceptSparkImport` | Concept-Spark relationship |
| `SparkBeaconImport` | Spark-Beacon tag |
| `SparkLinkImport` | Spark prerequisite edge |
| `ConceptLinkImport` | Concept prerequisite edge |

## Next Steps

1. Set up your Python environment with the SDK installed
2. Create a sample bundle using the examples above
3. Run validation to ensure correctness
4. Test the import on a staging database
5. Integrate imports into your content generation pipeline