Batch Import & Python SDK Guide¶
A comprehensive guide for batch importing content into the Musingly platform using the Python SDK with snapshot-based versioning.
Table of Contents¶
- Overview
- Quick Start
- Architecture
- SDK Setup
- Creating Import Bundles
- Validation
- Import Process
- Snapshot Lifecycle
- Best Practices
- Troubleshooting
- API Reference
- Next Steps
Overview¶
The batch import system enables external Python services (content generators, AI pipelines, etc.) to create and update content in bulk with:
- Strong Contracts: Pydantic models generated from TypeScript Zod schemas ensure type safety across services
- Snapshot Versioning: All imports create new snapshots, enabling atomic updates and instant rollbacks
- Slug-Based References: Foreign keys use slugs instead of UUIDs, making bundles portable and human-readable
- Validation Layers: Pre-flight and post-import validation catch errors before they reach production
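To see why slug-based references make bundles portable, here is a minimal sketch using plain dicts (field names mirror the SDK models introduced later in this guide):

```python
# Hypothetical bundle fragment: the trail references its domain by slug,
# so no environment-specific UUIDs appear anywhere in the bundle.
fragment = {
    "domains": [{"name": "Machine Learning", "slug": "machine-learning"}],
    "trails": [
        {
            "title": "Transformer Architecture Mastery",
            "slug": "transformer-mastery",
            "domain_slug": "machine-learning",  # slug reference, not a UUID
        }
    ],
}

# At import time, resolving the reference is a simple slug lookup.
domains_by_slug = {d["slug"]: d for d in fragment["domains"]}
parent = domains_by_slug[fragment["trails"][0]["domain_slug"]]
print(parent["name"])  # → Machine Learning
```

The same file can be imported into any environment; the importer resolves slugs to that environment's UUIDs.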
Key Benefits¶
| Feature | Benefit |
|---|---|
| Pydantic Models | Type-safe Python code with IDE autocompletion |
| JSON Schema | Language-agnostic contract definition |
| Snapshot Isolation | New content is isolated until explicitly promoted |
| Rollback Support | Instant revert to previous content versions |
| Audit Trail | Complete import history with timing and error logs |
Quick Start¶
CLI Commands¶
The deno task import command provides a complete import workflow:
# Validate a bundle without making database changes
deno task import content/bundle.json --dry-run
# Import bundle as a draft snapshot (requires manual promotion)
deno task import content/bundle.json
# Import and automatically promote to current
deno task import content/bundle.json --promote
Environment Setup¶
Set these environment variables before running imports:
export SUPABASE_URL="https://your-project.supabase.co"
export SUPABASE_SERVICE_ROLE_KEY="your-service-role-key"
Or use a .env file in the project root.
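An equivalent .env file (same variable names as the exports above) might look like:

```
# .env — placed in the project root
SUPABASE_URL=https://your-project.supabase.co
SUPABASE_SERVICE_ROLE_KEY=your-service-role-key
```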
Typical Workflow¶
# 1. Build the SDK (generates Python models from Zod schemas)
deno task sdk:build:full
# 2. Create bundle in Python using the SDK
cd /path/to/content-generator
pip install -e /path/to/core/python-sdk
python generate_content.py --output bundle.json
# 3. Validate the bundle
deno task import bundle.json --dry-run
# 4. Import to staging (as draft)
deno task import bundle.json
# 5. Verify in database, then promote
# Via SQL: SELECT promote_import('<import_id>', '<snapshot_id>');
# Or re-run with --promote flag
Command Reference¶
| Command | Description |
|---|---|
| `deno task import <file>` | Import bundle as draft snapshot |
| `deno task import <file> --promote` | Import and promote to current |
| `deno task import <file> --dry-run` | Validate only, no database changes |
| `deno task import <file> --validate-only` | Same as `--dry-run` |
| `deno task import --help` | Show help |
Architecture¶
┌─────────────────────────────────────────────────────────────────────┐
│ CONTENT GENERATION PIPELINE │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ AI/LLM │───▶│ Python │───▶│ Import │ │
│ │ Generator │ │ Service │ │ Bundle │ │
│ └──────────────┘ └──────────────┘ └──────┬───────┘ │
│ │ │
│ Uses Python SDK │ │
│ (Pydantic Models) │ │
└──────────────────────────────────────────────────│───────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ VALIDATION LAYER │
│ │
│ • Schema validation (Pydantic/JSON Schema) │
│ • Referential integrity (slug resolution) │
│ • Business rules (no orphaned entities) │
│ • DAG cycle detection │
└──────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ SUPABASE │
│ │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │ SNAPSHOT SYSTEM │ │
│ ├───────────────────────────────────────────────────────────────┤ │
│ │ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ v1.0.0 │───▶│ v1.1.0 │───▶│ v2.0.0 │ ◀── CURRENT │ │
│ │ │ archived │ │ archived │ │ current │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌──────────┐ │ │
│ │ │ v2.1.0 │ ◀── DRAFT │ │
│ │ │ draft │ (importing) │ │
│ │ └──────────┘ │ │
│ └───────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
SDK Setup¶
Building the SDKs¶
The unified build command generates both TypeScript and Python SDKs:
# Full build: OpenAPI → SDK Operations → TypeScript SDK → JSON Schema → Python SDK
deno task sdk:build:full
This produces:
| Output | Location | Description |
|---|---|---|
| TypeScript SDK | `sdk/dist/` | npm-compatible package |
| JSON Schema | `sdk/dist/import-schemas.json` | Language-agnostic schemas |
| Python SDK | `python-sdk/` | Pydantic models package |
Installing the Python SDK¶
Option 1: Local File Reference (Development)¶
# Editable install from the monorepo (same path as in the workflow above)
pip install -e /path/to/core/python-sdk
Or in pyproject.toml (PEP 621 direct reference):
[project]
dependencies = ["musingly-core @ file:///path/to/core/python-sdk"]
Option 2: Published Package (Production)¶
# After publishing to a private Python package index
pip install musingly-core --index-url https://your-package-index.example.com/simple/
Verifying Installation¶
from musingly_core.models import ImportBundle, DomainImport, SparkImport
# Check version matches TypeScript SDK
import musingly_core
print(f"SDK Version: {musingly_core.__version__}")
Creating Import Bundles¶
Bundle Structure¶
An import bundle contains all entities needed for a complete content update:
from musingly_core.models import (
ImportBundle,
ImportMetadata,
DomainImport,
TrailImport,
ConceptImport,
SparkImport,
BeaconImport,
ConceptSparkImport,
SparkBeaconImport,
SparkLinkImport,
ConceptLinkImport,
)
from datetime import datetime, timezone
# Create the import bundle
bundle = ImportBundle(
schema_version="1.0.0",
version_label="v2.1.0", # Your versioning scheme
name="January 2026 Content Update",
metadata=ImportMetadata(
generated_at=datetime.now(timezone.utc).isoformat(),
source_system="content-generator",
source_version="1.0.0",
description="AI-generated ML curriculum expansion",
),
domains=[...],
trails=[...],
concepts=[...],
sparks=[...],
beacons=[...],
concept_sparks=[...],
spark_beacons=[...],
spark_links=[...],
concept_links=[...],
)
Entity Examples¶
Domains (Top-Level Categories)¶
domains = [
DomainImport(
name="Machine Learning",
slug="machine-learning",
description="Fundamentals and advanced topics in machine learning",
icon="brain",
color="#6366F1",
is_published=True,
),
DomainImport(
name="Software Engineering",
slug="software-engineering",
description="Best practices for building robust software systems",
icon="code",
color="#10B981",
is_published=True,
),
]
Trails (Learning Paths)¶
trails = [
TrailImport(
domain_slug="machine-learning", # References domain by slug
title="Transformer Architecture Mastery",
slug="transformer-mastery",
description="Deep dive into attention mechanisms and transformer models",
difficulty_level="intermediate",
is_published=False, # Draft until reviewed
),
]
Concepts (Topic Clusters)¶
concepts = [
ConceptImport(
trail_slug="transformer-mastery", # References trail by slug
title="Attention Mechanisms",
slug="attention-mechanisms",
description="Understanding self-attention and its variants",
order_index=0,
is_published=False,
),
ConceptImport(
trail_slug="transformer-mastery",
title="Positional Encoding",
slug="positional-encoding",
description="How transformers understand sequence order",
order_index=1,
is_published=False,
),
]
Sparks (Micro-Lessons)¶
sparks = [
SparkImport(
title="Self-Attention Explained",
slug="self-attention-explained",
summary="Learn how tokens communicate through attention weights",
content_md="""
# Self-Attention Explained
Self-attention allows each token in a sequence to attend to all other tokens...
## Key Concepts
1. **Query, Key, Value**: The three projections...
2. **Attention Weights**: Computed via softmax...
3. **Scaled Dot-Product**: Why we divide by √d_k...
## Example
```python
import math
import torch

attention = torch.softmax(Q @ K.T / math.sqrt(d_k), dim=-1) @ V
```
""",
    ),
]
Beacons (Tags)¶
beacons = [
BeaconImport(
name="Transformers",
slug="transformers",
description="Content related to transformer architectures",
category="technology",
),
BeaconImport(
name="NLP",
slug="nlp",
description="Natural Language Processing topics",
category="application",
),
]
Junction Tables (Relationships)¶
# Link sparks to concepts
concept_sparks = [
ConceptSparkImport(
concept_slug="attention-mechanisms",
spark_slug="self-attention-explained",
order_index=0,
),
ConceptSparkImport(
concept_slug="attention-mechanisms",
spark_slug="multi-head-attention",
order_index=1,
),
]
# Tag sparks with beacons
spark_beacons = [
SparkBeaconImport(
spark_slug="self-attention-explained",
beacon_slug="transformers",
),
SparkBeaconImport(
spark_slug="self-attention-explained",
beacon_slug="nlp",
),
]
DAG Edges (Prerequisites)¶
# Spark-to-Spark prerequisites
spark_links = [
SparkLinkImport(
source_slug="linear-algebra-basics", # Must learn this first
target_slug="self-attention-explained", # To understand this
link_type="prerequisite",
weight=0.9, # Strong dependency
description="Linear algebra concepts needed for attention",
),
SparkLinkImport(
source_slug="self-attention-explained",
target_slug="multi-head-attention",
link_type="prerequisite",
weight=0.95,
),
]
# Concept-to-Concept prerequisites
concept_links = [
ConceptLinkImport(
source_slug="attention-mechanisms",
target_slug="transformer-encoder",
link_type="prerequisite",
weight=0.85,
),
]
Complete Example¶
from musingly_core.models import ImportBundle, ImportMetadata
from datetime import datetime, timezone
import json
def create_content_bundle() -> ImportBundle:
"""Create a complete import bundle for new ML content."""
return ImportBundle(
schema_version="1.0.0",
version_label="v2.1.0",
name="ML Curriculum Expansion Q1 2026",
metadata=ImportMetadata(
generated_at=datetime.now(timezone.utc).isoformat(),
source_system="ml-content-generator",
source_version="2.0.0",
description="Expanded coverage of transformer architectures",
),
domains=[
# ... domain definitions
],
trails=[
# ... trail definitions
],
concepts=[
# ... concept definitions
],
sparks=[
# ... spark definitions
],
beacons=[
# ... beacon definitions
],
concept_sparks=[
# ... concept-spark links
],
spark_beacons=[
# ... spark-beacon tags
],
spark_links=[
# ... spark prerequisite graph
],
concept_links=[
# ... concept prerequisite graph
],
)
def export_bundle(bundle: ImportBundle, output_path: str) -> None:
"""Export bundle to JSON file for import."""
with open(output_path, "w") as f:
json.dump(bundle.model_dump(), f, indent=2)
print(f"Bundle exported to {output_path}")
# Usage
if __name__ == "__main__":
bundle = create_content_bundle()
export_bundle(bundle, "import-bundle-v2.1.0.json")
Validation¶
Pre-Flight Validation (Python)¶
Pydantic provides automatic validation when creating models:
from pydantic import ValidationError
from musingly_core.models import ImportBundle
import json
def validate_bundle(bundle_path: str) -> tuple[bool, list[str]]:
"""Validate a bundle file before import."""
errors = []
try:
with open(bundle_path) as f:
data = json.load(f)
# Pydantic validates schema, types, and constraints
bundle = ImportBundle.model_validate(data)
# Custom validation rules
errors.extend(validate_referential_integrity(bundle))
errors.extend(validate_dag_acyclic(bundle))
errors.extend(validate_slug_uniqueness(bundle))
except ValidationError as e:
for error in e.errors():
loc = ".".join(str(x) for x in error["loc"])
errors.append(f"Schema error at {loc}: {error['msg']}")
except json.JSONDecodeError as e:
errors.append(f"Invalid JSON: {e}")
return len(errors) == 0, errors
def validate_referential_integrity(bundle: ImportBundle) -> list[str]:
"""Check all slug references point to existing entities."""
errors = []
# Collect all defined slugs
domain_slugs = {d.slug for d in bundle.domains}
trail_slugs = {t.slug for t in bundle.trails}
concept_slugs = {c.slug for c in bundle.concepts}
spark_slugs = {s.slug for s in bundle.sparks}
beacon_slugs = {b.slug for b in bundle.beacons}
# Validate trail -> domain references
for trail in bundle.trails:
if trail.domain_slug not in domain_slugs:
errors.append(
f"Trail '{trail.slug}' references unknown domain '{trail.domain_slug}'"
)
# Validate concept -> trail references
for concept in bundle.concepts:
if concept.trail_slug and concept.trail_slug not in trail_slugs:
errors.append(
f"Concept '{concept.slug}' references unknown trail '{concept.trail_slug}'"
)
# Validate concept_sparks
for cs in bundle.concept_sparks:
if cs.concept_slug not in concept_slugs:
errors.append(f"concept_sparks references unknown concept '{cs.concept_slug}'")
if cs.spark_slug not in spark_slugs:
errors.append(f"concept_sparks references unknown spark '{cs.spark_slug}'")
# Validate spark_beacons
for sb in bundle.spark_beacons:
if sb.spark_slug not in spark_slugs:
errors.append(f"spark_beacons references unknown spark '{sb.spark_slug}'")
if sb.beacon_slug not in beacon_slugs:
errors.append(f"spark_beacons references unknown beacon '{sb.beacon_slug}'")
# Validate spark_links
for link in bundle.spark_links:
if link.source_slug not in spark_slugs:
errors.append(f"spark_links references unknown source '{link.source_slug}'")
if link.target_slug not in spark_slugs:
errors.append(f"spark_links references unknown target '{link.target_slug}'")
return errors
def validate_dag_acyclic(bundle: ImportBundle) -> list[str]:
"""Ensure spark and concept links form a DAG (no cycles)."""
errors = []
# Build adjacency list for spark graph
spark_graph: dict[str, set[str]] = {}
for link in bundle.spark_links:
if link.source_slug not in spark_graph:
spark_graph[link.source_slug] = set()
spark_graph[link.source_slug].add(link.target_slug)
# Detect cycles using DFS
def has_cycle(graph: dict[str, set[str]]) -> list[str] | None:
visited = set()
path = set()
path_list = []
def dfs(node: str) -> list[str] | None:
visited.add(node)
path.add(node)
path_list.append(node)
for neighbor in graph.get(node, []):
if neighbor in path:
cycle_start = path_list.index(neighbor)
return path_list[cycle_start:] + [neighbor]
if neighbor not in visited:
result = dfs(neighbor)
if result:
return result
path.remove(node)
path_list.pop()
return None
for node in graph:
if node not in visited:
cycle = dfs(node)
if cycle:
return cycle
return None
cycle = has_cycle(spark_graph)
if cycle:
errors.append(f"Cycle detected in spark_links: {' -> '.join(cycle)}")
return errors
def validate_slug_uniqueness(bundle: ImportBundle) -> list[str]:
"""Ensure slugs are unique within each entity type."""
errors = []
def check_duplicates(items: list, entity_type: str) -> list[str]:
slugs = [item.slug for item in items]
seen = set()
duplicates = set()
for slug in slugs:
if slug in seen:
duplicates.add(slug)
seen.add(slug)
return [f"Duplicate {entity_type} slug: '{s}'" for s in duplicates]
errors.extend(check_duplicates(bundle.domains, "domain"))
errors.extend(check_duplicates(bundle.trails, "trail"))
errors.extend(check_duplicates(bundle.concepts, "concept"))
errors.extend(check_duplicates(bundle.sparks, "spark"))
errors.extend(check_duplicates(bundle.beacons, "beacon"))
return errors
# Usage
if __name__ == "__main__":
valid, errors = validate_bundle("import-bundle-v2.1.0.json")
if valid:
print("✓ Bundle is valid")
else:
print("✗ Validation failed:")
for error in errors:
print(f" - {error}")
Import Process¶
Import Workflow¶
┌─────────────────────────────────────────────────────────────────────┐
│ 1. PRE-FLIGHT │
│ • Validate bundle JSON against Pydantic models │
│ • Check referential integrity │
│ • Detect DAG cycles │
│ • Verify slug uniqueness │
└─────────────────────────────────────────────────────────────────────┘
│
▼ (validation passed)
┌─────────────────────────────────────────────────────────────────────┐
│ 2. CREATE DRAFT SNAPSHOT │
│ • Call: create_import_snapshot(name, version_label) │
│ • Returns: snapshot_id (UUID) │
│ • Status: 'draft' │
└─────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ 3. START IMPORT LOG │
│ • Call: start_import(version_label, source_system) │
│ • Returns: import_id (UUID) │
│ • Status: 'pending' │
└─────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ 4. BULK INSERT (Order Matters!) │
│ 1. Domains (no dependencies) │
│ 2. Trails (depends on domains) │
│ 3. Concepts (depends on trails) │
│ 4. Sparks (no direct dependencies) │
│ 5. Beacons (no dependencies) │
│ 6. Concept_sparks (depends on concepts + sparks) │
│ 7. Spark_beacons (depends on sparks + beacons) │
│ 8. Spark_links (depends on sparks) │
│ 9. Concept_links (depends on concepts) │
└─────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ 5. FINALIZE IMPORT │
│ • Call: finalize_import(import_id, snapshot_id, entity_counts) │
│ • Status: 'ready' │
│ • Snapshot ready for promotion │
└─────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ 6. VERIFY (Optional but Recommended) │
│ • Run queries against draft snapshot │
│ • Verify entity counts match expected │
│ • Check sample content renders correctly │
│ • Validate search index (if applicable) │
└─────────────────────────────────────────────────────────────────────┘
│
▼ (verification passed)
┌─────────────────────────────────────────────────────────────────────┐
│ 7. PROMOTE │
│ • Call: promote_import(import_id, snapshot_id) │
│ • Snapshot becomes 'current' │
│ • Previous snapshot becomes 'archived' │
│ • Import status: 'promoted' │
└─────────────────────────────────────────────────────────────────────┘
Python Import Client¶
import psycopg2
from psycopg2.extras import execute_values
from musingly_core.models import ImportBundle
import json
from uuid import UUID
class MusinglyImporter:
"""Client for importing bundles into Supabase."""
def __init__(self, connection_string: str):
self.conn = psycopg2.connect(connection_string)
def import_bundle(
self,
bundle: ImportBundle,
auto_promote: bool = False,
) -> dict:
"""
Import a bundle into a new draft snapshot.
Args:
bundle: The validated import bundle
auto_promote: If True, automatically promote on success
Returns:
Import result with snapshot_id, import_id, and entity_counts
"""
cursor = self.conn.cursor()
try:
# 1. Create draft snapshot
cursor.execute(
"SELECT create_import_snapshot(%s, %s, %s)",
(bundle.name, bundle.version_label, bundle.metadata.description)
)
snapshot_id = cursor.fetchone()[0]
# 2. Start import log
cursor.execute(
"SELECT start_import(%s, %s, %s)",
(
bundle.version_label,
bundle.metadata.source_system,
bundle.metadata.source_version,
)
)
import_id = cursor.fetchone()[0]
# 3. Update status to importing
cursor.execute(
"SELECT update_import_status(%s, %s, %s)",
(import_id, "importing", snapshot_id)
)
# 4. Bulk insert all entities
entity_counts = self._insert_entities(cursor, bundle, snapshot_id)
# 5. Finalize import
cursor.execute(
"SELECT finalize_import(%s, %s, %s)",
(import_id, snapshot_id, json.dumps(entity_counts))
)
self.conn.commit()
# 6. Optionally promote
if auto_promote:
self.promote(import_id, snapshot_id)
return {
"success": True,
"snapshot_id": str(snapshot_id),
"import_id": str(import_id),
"version_label": bundle.version_label,
"entity_counts": entity_counts,
"status": "promoted" if auto_promote else "ready",
}
except Exception as e:
self.conn.rollback()
# Log failure if import_id was created
if 'import_id' in locals():
cursor.execute(
"SELECT update_import_status(%s, %s, NULL, NULL, NULL, %s)",
(import_id, "failed", str(e))
)
self.conn.commit()
raise
def _insert_entities(
self,
cursor,
bundle: ImportBundle,
snapshot_id: UUID,
) -> dict:
"""Insert all entities in dependency order."""
counts = {}
# Insert domains
if bundle.domains:
counts["domains"] = self._insert_domains(cursor, bundle.domains, snapshot_id)
# Build slug -> UUID mapping for domains
domain_map = self._get_slug_map(cursor, "domains", snapshot_id)
# Insert trails
if bundle.trails:
counts["trails"] = self._insert_trails(
cursor, bundle.trails, snapshot_id, domain_map
)
# Build trail mapping
trail_map = self._get_slug_map(cursor, "trails", snapshot_id)
# Insert concepts
if bundle.concepts:
counts["concepts"] = self._insert_concepts(
cursor, bundle.concepts, snapshot_id, trail_map
)
# Build concept mapping
concept_map = self._get_slug_map(cursor, "concepts", snapshot_id)
# Insert sparks
if bundle.sparks:
counts["sparks"] = self._insert_sparks(cursor, bundle.sparks, snapshot_id)
# Build spark mapping
spark_map = self._get_slug_map(cursor, "sparks", snapshot_id)
# Insert beacons
if bundle.beacons:
counts["beacons"] = self._insert_beacons(cursor, bundle.beacons, snapshot_id)
# Build beacon mapping
beacon_map = self._get_slug_map(cursor, "beacons", snapshot_id)
# Insert junction tables
if bundle.concept_sparks:
counts["concept_sparks"] = self._insert_concept_sparks(
cursor, bundle.concept_sparks, snapshot_id, concept_map, spark_map
)
if bundle.spark_beacons:
counts["spark_beacons"] = self._insert_spark_beacons(
cursor, bundle.spark_beacons, snapshot_id, spark_map, beacon_map
)
# Insert DAG edges
if bundle.spark_links:
counts["spark_links"] = self._insert_spark_links(
cursor, bundle.spark_links, snapshot_id, spark_map
)
if bundle.concept_links:
counts["concept_links"] = self._insert_concept_links(
cursor, bundle.concept_links, snapshot_id, concept_map
)
return counts
def _insert_domains(self, cursor, domains, snapshot_id) -> int:
"""Insert domain entities."""
values = [
(
snapshot_id,
d.name,
d.slug,
d.description,
d.icon,
d.color,
d.is_published,
)
for d in domains
]
execute_values(
cursor,
"""
INSERT INTO domains (snapshot_id, name, slug, description, icon, color, is_published)
VALUES %s
""",
values,
)
return len(domains)
def _get_slug_map(self, cursor, table: str, snapshot_id: UUID) -> dict[str, UUID]:
"""Get slug -> UUID mapping for a table."""
# `table` always comes from internal callers (_insert_entities), never
# from user input, so f-string interpolation is safe here.
cursor.execute(
f"SELECT slug, id FROM {table} WHERE snapshot_id = %s",
(snapshot_id,)
)
return {row[0]: row[1] for row in cursor.fetchall()}
# ... additional _insert_* methods for other entity types ...
def promote(self, import_id: UUID, snapshot_id: UUID) -> None:
"""Promote a snapshot to current."""
cursor = self.conn.cursor()
cursor.execute(
"SELECT promote_import(%s, %s)",
(import_id, snapshot_id)
)
self.conn.commit()
def rollback(self, import_id: UUID | None = None) -> UUID:
"""Roll back to the previous snapshot; returns the parent snapshot id."""
cursor = self.conn.cursor()
cursor.execute(
"SELECT rollback_import(%s)",
(import_id,)
)
parent_id = cursor.fetchone()[0]
self.conn.commit()
return parent_id
def delete_draft(self, snapshot_id: UUID) -> None:
"""Delete a draft snapshot (e.g., after failed import)."""
cursor = self.conn.cursor()
cursor.execute(
"SELECT delete_draft_snapshot(%s)",
(snapshot_id,)
)
self.conn.commit()
# Usage
if __name__ == "__main__":
from musingly_core.models import ImportBundle
import json
# Load and validate bundle
with open("import-bundle-v2.1.0.json") as f:
bundle = ImportBundle.model_validate(json.load(f))
# Import to Supabase
importer = MusinglyImporter(
"postgresql://user:password@localhost:54322/postgres"
)
result = importer.import_bundle(bundle, auto_promote=False)
print("Import complete!")
print(f" Snapshot ID: {result['snapshot_id']}")
print(f" Status: {result['status']}")
print(f" Entities: {result['entity_counts']}")
# After manual verification, promote
# importer.promote(result['import_id'], result['snapshot_id'])
Snapshot Lifecycle¶
Snapshot States¶
┌───────────────────────────────────────────────────────────────────┐
│ SNAPSHOT LIFECYCLE │
├───────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ │
│ │ DRAFT │ ─── Import in progress, not visible to users │
│ └────┬────┘ │
│ │ │
│ │ finalize_import() │
│ ▼ │
│ ┌─────────┐ │
│ │ READY │ ─── Import complete, awaiting promotion │
│ └────┬────┘ │
│ │ │
│ │ promote_import() │
│ ▼ │
│ ┌──────────┐ │
│ │ CURRENT │ ─── Active snapshot, served to all users │
│ └────┬─────┘ │
│ │ │
│ │ (new snapshot promoted) │
│ ▼ │
│ ┌──────────┐ │
│ │ ARCHIVED │ ─── Historical, can be used for rollback │
│ └──────────┘ │
│ │
└───────────────────────────────────────────────────────────────────┘
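The transitions above can be sketched as a small state table (an illustrative sketch only; the authoritative transitions live in the database functions):

```python
# Allowed snapshot state transitions, as described in the diagram above.
# "archived" -> "current" covers rollback re-promoting an earlier snapshot.
TRANSITIONS = {
    "draft": {"ready"},        # finalize_import()
    "ready": {"current"},      # promote_import()
    "current": {"archived"},   # superseded by a newer promotion
    "archived": {"current"},   # rollback_import()
}

def can_transition(src: str, dst: str) -> bool:
    """Return True if moving a snapshot from src to dst is allowed."""
    return dst in TRANSITIONS.get(src, set())

print(can_transition("draft", "ready"))    # → True
print(can_transition("draft", "current"))  # → False (must finalize first)
```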
Rollback Procedure¶
# If issues are discovered after promotion
importer = MusinglyImporter(connection_string)
# Option 1: Rollback to parent snapshot
previous_snapshot_id = importer.rollback()
print(f"Rolled back to: {previous_snapshot_id}")
# Option 2: Rollback with import_id tracking
previous_snapshot_id = importer.rollback(import_id=failing_import_id)
Cleanup Old Snapshots¶
-- Keep only the 5 most recent published snapshots
SELECT cleanup_old_snapshots(5);
-- Check import statistics
SELECT * FROM get_import_stats(30); -- Last 30 days
Best Practices¶
1. Version Labeling¶
Use semantic versioning or date-based labels:
# Semantic versioning
version_label = "v2.1.0"
# Date-based
version_label = "2026-01-Q1"
# Content-based
version_label = "ml-expansion-jan2026"
2. Incremental Updates¶
For partial content updates, only include changed entities:
# Update only new/modified sparks
bundle = ImportBundle(
schema_version="1.0.0",
version_label="v2.1.1-hotfix",
metadata=ImportMetadata(
generated_at=datetime.now(timezone.utc).isoformat(),
source_system="content-editor",
description="Fixed typos in attention-mechanisms sparks",
),
# Empty arrays for unchanged entity types
domains=[],
trails=[],
concepts=[],
# Only include modified sparks
sparks=[
SparkImport(
slug="self-attention-explained",
# ... updated content
),
],
beacons=[],
concept_sparks=[],
spark_beacons=[],
spark_links=[],
concept_links=[],
)
3. CI/CD Integration¶
# .github/workflows/content-deploy.yml
name: Deploy Content
on:
push:
branches: [main]
paths:
- 'content/**'
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.10'
- name: Install SDK
run: pip install ./python-sdk
- name: Validate Bundle
run: python scripts/validate_bundle.py content/bundle.json
- name: Import to Staging
env:
DATABASE_URL: ${{ secrets.STAGING_DATABASE_URL }}
run: python scripts/import_bundle.py content/bundle.json --no-promote
- name: Run Tests
run: pytest tests/content/
- name: Promote if Tests Pass
env:
DATABASE_URL: ${{ secrets.STAGING_DATABASE_URL }}
run: python scripts/promote_snapshot.py
4. Error Handling¶
from pydantic import ValidationError
from musingly_core.models import ImportBundle
import json
import logging
logger = logging.getLogger(__name__)
def safe_import(bundle_path: str, importer: MusinglyImporter) -> dict:
"""Import with comprehensive error handling."""
# Load and validate
try:
with open(bundle_path) as f:
bundle = ImportBundle.model_validate(json.load(f))
except ValidationError as e:
logger.error(f"Schema validation failed: {e}")
raise
# Pre-flight checks
valid, errors = validate_bundle(bundle_path)
if not valid:
logger.error(f"Pre-flight validation failed: {errors}")
raise ValueError(f"Validation errors: {errors}")
# Import
snapshot_id = None
try:
result = importer.import_bundle(bundle, auto_promote=False)
snapshot_id = result["snapshot_id"]
logger.info(f"Import successful: {result}")
return result
except Exception as e:
logger.error(f"Import failed: {e}")
# Cleanup draft snapshot if created
if snapshot_id:
try:
importer.delete_draft(snapshot_id)
logger.info(f"Cleaned up draft snapshot: {snapshot_id}")
except Exception as cleanup_error:
logger.warning(f"Failed to cleanup draft: {cleanup_error}")
raise
Troubleshooting¶
Common Issues¶
1. "Duplicate slug" Error¶
Cause: Same slug used twice in the bundle.
Fix: Ensure each entity has a unique slug within its type.
# Check for duplicates before import
slugs = [s.slug for s in bundle.sparks]
duplicates = [s for s in slugs if slugs.count(s) > 1]
if duplicates:
raise ValueError(f"Duplicate spark slugs: {set(duplicates)}")
2. Foreign Key Violation¶
Cause: Trail references a domain_slug that doesn't exist.
Fix: Ensure all referenced entities are included in the bundle.
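A targeted pre-check for this case, sketched with plain dicts standing in for `bundle.domains` and `bundle.trails` (the full check lives in validate_referential_integrity above):

```python
# Stand-in data: one valid trail, one trail with a typo'd domain reference.
domains = [{"slug": "machine-learning"}]
trails = [
    {"slug": "transformer-mastery", "domain_slug": "machine-learning"},
    {"slug": "broken-trail", "domain_slug": "machine-lerning"},  # typo
]

domain_slugs = {d["slug"] for d in domains}
missing = [t["slug"] for t in trails if t["domain_slug"] not in domain_slugs]
print(missing)  # → ['broken-trail']
```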
3. DAG Cycle Detected¶
Cause: Prerequisite chain forms a loop.
Fix: Review spark_links and remove the edge creating the cycle.
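One way to narrow the search (an illustrative sketch, not part of the SDK): repeatedly strip nodes that have no incoming or no outgoing edge — whatever remains must lie on a cycle, so only those edges need review.

```python
def nodes_on_cycles(edges: list[tuple[str, str]]) -> set[str]:
    """Return the set of nodes that participate in at least one cycle."""
    nodes = {n for edge in edges for n in edge}
    remaining = set(edges)
    changed = True
    while changed:
        changed = False
        sources = {s for s, _ in remaining}
        targets = {t for _, t in remaining}
        for node in list(nodes):
            # A node with no incoming or no outgoing edge cannot be on a cycle.
            if node not in sources or node not in targets:
                nodes.discard(node)
                remaining = {e for e in remaining if node not in e}
                changed = True
    return nodes

edges = [("a", "b"), ("b", "c"), ("c", "a"), ("c", "d")]
print(sorted(nodes_on_cycles(edges)))  # → ['a', 'b', 'c']
```

Here the `d` node drops out (it is downstream of the cycle but not on it), leaving exactly the `a -> b -> c -> a` loop to fix.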
4. Schema Version Mismatch¶
Cause: Bundle uses a schema version the SDK doesn't support.
Fix: Rebuild the Python SDK to get the latest schema:
deno task sdk:build:full
Viewing Import Logs¶
-- Recent imports
SELECT
version_label,
status,
started_at,
completed_at,
entity_counts,
error_message
FROM import_logs
ORDER BY started_at DESC
LIMIT 10;
-- Failed imports with details
SELECT
version_label,
error_message,
error_details,
started_at
FROM import_logs
WHERE status = 'failed'
ORDER BY started_at DESC;
-- Import statistics
SELECT * FROM get_import_stats(30);
API Reference¶
Database Functions¶
| Function | Description |
|---|---|
| `create_import_snapshot(name, version_label, description, parent_id)` | Creates a new draft snapshot |
| `start_import(version_label, source_system, source_version, created_by)` | Starts an import log entry |
| `update_import_status(import_id, status, ...)` | Updates import status and details |
| `finalize_import(import_id, snapshot_id, entity_counts)` | Marks import as ready |
| `promote_import(import_id, snapshot_id)` | Promotes snapshot to current |
| `rollback_import(import_id)` | Rolls back to previous snapshot |
| `delete_draft_snapshot(snapshot_id)` | Deletes a draft and its data |
| `cleanup_old_snapshots(keep_count)` | Archives old snapshots |
| `get_import_stats(days)` | Returns import statistics |
Python SDK Models¶
| Model | Description |
|---|---|
| `ImportBundle` | Complete import payload |
| `ImportMetadata` | Source and timing metadata |
| `DomainImport` | Top-level category |
| `TrailImport` | Learning path |
| `ConceptImport` | Topic cluster |
| `SparkImport` | Micro-lesson content |
| `BeaconImport` | Discovery tag |
| `ConceptSparkImport` | Concept-Spark relationship |
| `SparkBeaconImport` | Spark-Beacon tag |
| `SparkLinkImport` | Spark prerequisite edge |
| `ConceptLinkImport` | Concept prerequisite edge |
Next Steps¶
- Set up your Python environment with the SDK installed
- Create a sample bundle using the examples above
- Run validation to ensure correctness
- Test import on a staging database
- Integrate into your content generation pipeline