End-to-End Pipelines

Combine datasets, post-processors, training jobs, and workflows into automated pipelines that take raw data from upload to production.

What’s an End-to-End Pipeline?

An end-to-end pipeline connects every stage of the ML lifecycle:

graph LR
    A[Raw Data] --> B[Auto-Label]
    B --> C[Review]
    C --> D[Train]
    D --> E[Evaluate]
    E --> F[Deploy]
    F --> G[Monitor]
    G -->|Feedback| A

Instead of manually triggering each step, you build a system where data flows through the entire process with minimal intervention.
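
As a rough sketch of what "minimal intervention" can look like in code, each stage can be modeled as a plain function that receives and returns a shared context dictionary. The `run_pipeline` helper, the stage functions, and the `stop` key below are hypothetical illustrations and are not part of the `seeme` client.

```python
from typing import Callable, Dict, List

Stage = Callable[[Dict], Dict]


def run_pipeline(stages: List[Stage], context: Dict) -> Dict:
    """Run stages in order, stopping early if a stage sets context['stop']."""
    for stage in stages:
        context = stage(context)
        context.setdefault("completed", []).append(stage.__name__)
        if context.get("stop"):
            break
    return context


# Hypothetical stand-ins for real stages (labeling, evaluation, deployment).
def auto_label(ctx: Dict) -> Dict:
    ctx["labeled"] = True
    return ctx


def evaluate(ctx: Dict) -> Dict:
    # Halt before deployment when the candidate does not beat the current model.
    ctx["stop"] = ctx["new_accuracy"] <= ctx["current_accuracy"]
    return ctx


def deploy(ctx: Dict) -> Dict:
    ctx["deployed"] = True
    return ctx


result = run_pipeline(
    [auto_label, evaluate, deploy],
    {"new_accuracy": 0.91, "current_accuracy": 0.93},
)
print(result["completed"])
```

Because the candidate scores lower than the current model in this example, the run stops after evaluation and the deployment stage never executes.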

Pipeline Patterns

Pattern 1: Label → Train → Deploy

This is the most common pipeline: new data is labeled and used for training, and the new model is deployed only if it outperforms the current one.

graph TD
    subgraph "Data Ingestion"
        A[Upload Data] --> B[Post-Processor: Auto-Label]
        B --> C[Human Review Queue]
    end

    subgraph "Training"
        C --> D[Create Dataset Version]
        D --> E[Launch Training Job]
        E --> F[Model Artifact]
    end

    subgraph "Evaluation"
        F --> G[Run on Validation Set]
        G --> H{Better than current?}
    end

    subgraph "Deployment"
        H -->|Yes| I[Optimize Model]
        I --> J[Deploy to Production]
        H -->|No| K[Keep Current Model]
    end

from seeme import Client

client = Client()

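# --- Stage 1: Data Ingestion with Auto-Labeling ---
# Assumes current_production_model, train_split, new_image_paths, and
# next_version are already defined for your project.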

dataset = client.get_dataset("product-images")
version = client.get_dataset_version(dataset.id, "v3")

# Auto-label new uploads using existing production model
processor = client.create_post_processor(
    dataset_id=dataset.id,
    name="Pre-Label with Current Model",
    model_type="classification",
    model_id=current_production_model.id,
    output_target="annotations",
    confidence_threshold=0.8,
    auto_create_labels=True,
    enabled=True
)

# Upload new batch of images
for path in new_image_paths:
    client.create_dataset_item(
        version_id=version.id,
        split_id=train_split.id,
        file_path=path
    )

# Wait for processing, then review in web UI...

# --- Stage 2: Train New Model ---

job = client.create_job(
    dataset_id=dataset.id,
    version_id=version.id,
    name=f"Product Classifier v{next_version}",
    config={
        "architecture": "efficientnet_b0",
        "epochs": 30,
        "learning_rate": 0.001,
        "batch_size": 32
    }
)

# Wait for training
import time
while job.status in ["pending", "running"]:
    time.sleep(60)
    job = client.get_job(job.id)

new_model = client.get_model(job.model_id)

# --- Stage 3: Evaluate Against Current Production Model ---

val_results_new = client.predict(
    model_id=new_model.id,
    dataset_id=dataset.id,
    version_id=version.id,
    split="validation"
)

val_results_current = client.predict(
    model_id=current_production_model.id,
    dataset_id=dataset.id,
    version_id=version.id,
    split="validation"
)

new_accuracy = sum(1 for r in val_results_new if r.prediction == r.ground_truth) / len(val_results_new)
current_accuracy = sum(1 for r in val_results_current if r.prediction == r.ground_truth) / len(val_results_current)

print(f"Current model: {current_accuracy:.1%}")
print(f"New model: {new_accuracy:.1%}")

# --- Stage 4: Deploy if Better ---

if new_accuracy > current_accuracy:
    # Optimize
    optimized = client.optimize_model(
        model_id=new_model.id,
        target_format="onnx"
    )

    # Deploy
    client.deploy_model(
        model_id=optimized.id,
        name="Product Classifier Production"
    )

    # Update post-processor to use new model
    client.update_post_processor(
        processor_id=processor.id,
        model_id=optimized.id
    )

    print(f"Deployed new model ({new_accuracy:.1%} vs {current_accuracy:.1%})")
else:
    print(f"Keeping current model ({current_accuracy:.1%} vs {new_accuracy:.1%})")
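
The accuracy comparison in Stages 3 and 4 can be factored into two small helpers, sketched below. The `Result` dataclass is a hypothetical stand-in for whatever `client.predict` returns; only the `prediction` and `ground_truth` attributes are taken from the example above. The `min_gain` margin is an added assumption: requiring a small improvement avoids swapping models over differences that may be noise.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass
class Result:
    """Hypothetical stand-in for one validation result."""
    prediction: str
    ground_truth: str


def accuracy(results: Iterable[Result]) -> float:
    """Fraction of results whose prediction matches the ground truth."""
    results = list(results)
    if not results:
        raise ValueError("cannot compute accuracy on an empty validation set")
    correct = sum(1 for r in results if r.prediction == r.ground_truth)
    return correct / len(results)


def should_promote(new_acc: float, current_acc: float, min_gain: float = 0.01) -> bool:
    """Promote only when the new model wins by at least min_gain (illustrative default)."""
    return new_acc - current_acc >= min_gain


sample = [
    Result("cat", "cat"),
    Result("dog", "dog"),
    Result("cat", "dog"),
    Result("dog", "dog"),
]
print(f"{accuracy(sample):.1%}")
```

The empty-set guard matters in practice: the inline expression in Stage 3 would raise `ZeroDivisionError` if the validation split returned no results.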

Pattern 2: Multi-Model Workflow Pipeline

Chain multiple models together using workflows, with each model processing a different aspect of the data.

graph TD
    A[Input Document] --> B[OCR Model]
    B --> C[NER Model]
    B --> D[Classification Model]
    C --> E[Merge Results]
    D --> E
    E --> F[Store in Graph]
    F --> G[Output Dataset]

# Create a workflow that chains multiple models
workflow = client.create_workflow(
    name="Document Intelligence Pipeline",
    description="OCR → NER + Classification → Graph Storage"
)

version = client.create_workflow_version(
    workflow_id=workflow.id,
    name="v1"
)

# Node 1: OCR
ocr_node = client.create_workflow_node(
    version_id=version.id,
    name="Extract Text",
    entity_type="model",
    entity_id=ocr_model.id,
    config={"input_template": "{{input}}"}
)

# Node 2: NER (runs on OCR output)
ner_node = client.create_workflow_node(
    version_id=version.id,
    name="Find Entities",
    entity_type="model",
    entity_id=ner_model.id,
    config={"input_template": "{{Extract Text}}"}
)

# Node 3: Classification (runs on OCR output in parallel)
classify_node = client.create_workflow_node(
    version_id=version.id,
    name="Classify Document",
    entity_type="model",
    entity_id=classifier_model.id,
    config={"input_template": "{{Extract Text}}"}
)

# Connect nodes
client.create_workflow_edge(
    version_id=version.id,
    begin_node_id=ocr_node.id,
    end_node_id=ner_node.id,
    edge_type="data"
)

client.create_workflow_edge(
    version_id=version.id,
    begin_node_id=ocr_node.id,
    end_node_id=classify_node.id,
    edge_type="data"
)

# Execute on a batch, keeping each execution handle for later inspection
executions = []
for doc_path in document_paths:
    execution = client.execute_workflow(
        workflow_id=workflow.id,
        input_mode="single",
        file_path=doc_path
    )
    executions.append(execution)
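
Before creating nodes and edges through the API, it can help to check the intended graph locally. The sketch below models the three nodes from the example as a plain dependency mapping and uses the standard library's `graphlib` to detect cycles and to group nodes that do not depend on each other. The `execution_levels` helper is hypothetical, and whether the platform actually runs same-level nodes concurrently is not something this sketch can confirm.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each node maps to the set of nodes whose output it consumes.
dependencies = {
    "Extract Text": set(),
    "Find Entities": {"Extract Text"},
    "Classify Document": {"Extract Text"},
}


def execution_levels(deps):
    """Group nodes into levels; nodes in one level do not depend on each other."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the edges form a cycle
    levels = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        levels.append(ready)
        sorter.done(*ready)
    return levels


print(execution_levels(dependencies))
# → [['Extract Text'], ['Classify Document', 'Find Entities']]
```

The second level contains both downstream models, which matches the diagram: NER and classification each need only the OCR output.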

Pattern 3: Distillation Pipeline

Automate the full distillation cycle described in Model Distillation:

graph TD
    A[Unlabeled Data] --> B[Teacher Model Labels]
    B --> C[Human Review]
    C --> D[Train Student]
    D --> E[Evaluate Teacher vs Student]
    E --> F{Student good enough?}
    F -->|Yes| G[Deploy Student]
    F -->|No| H[Collect More Data]
    H --> A
    G --> I[Monitor Performance]
    I -->|Drift detected| A

import time


def run_distillation_cycle(
    client,
    teacher_model_id,
    dataset_id,
    version_id,
    student_config,
    min_accuracy=0.90
):
    """Run one cycle of the distillation pipeline."""

    # Step 1: Label with teacher
    processor = client.create_post_processor(
        dataset_id=dataset_id,
        name="Teacher Labeler",
        model_type="classification",
        model_id=teacher_model_id,
        output_target="annotations",
        confidence_threshold=0.8,
        auto_create_labels=True,
        enabled=True
    )

    # Step 2: Wait for labeling to complete
    while True:
        jobs = client.get_post_processor_jobs(
            dataset_id=dataset_id,
            status="pending"
        )
        if len(jobs) == 0:
            break
        time.sleep(10)

    # Step 3: Train student
    job = client.create_job(
        dataset_id=dataset_id,
        version_id=version_id,
        name="Student Model",
        config=student_config
    )

    while job.status in ["pending", "running"]:
        time.sleep(30)
        job = client.get_job(job.id)

    student_model = client.get_model(job.model_id)

    # Step 4: Evaluate
    student_results = client.predict(
        model_id=student_model.id,
        dataset_id=dataset_id,
        version_id=version_id,
        split="validation"
    )

    accuracy = sum(
        1 for r in student_results
        if r.prediction == r.ground_truth
    ) / len(student_results)

    print(f"Student accuracy: {accuracy:.1%}")

    if accuracy >= min_accuracy:
        print("Student meets quality bar. Ready for deployment.")
        return student_model, accuracy
    else:
        print(f"Student below threshold ({min_accuracy:.0%}). Need more data.")
        return None, accuracy


# Run the pipeline
student, accuracy = run_distillation_cycle(
    client=client,
    teacher_model_id=large_model.id,
    dataset_id=dataset.id,
    version_id=version.id,
    student_config={
        "architecture": "mobilenet_v2",
        "epochs": 30,
        "learning_rate": 0.001
    },
    min_accuracy=0.90
)

if student:
    optimized = client.optimize_model(
        model_id=student.id,
        target_format="onnx"
    )
    client.deploy_model(model_id=optimized.id, name="Production Classifier")
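
The polling loops above run until the status changes, so a stuck job would block the pipeline indefinitely. A minimal sketch of a bounded wait follows. The `wait_for_status` helper and its parameters are hypothetical; it takes any zero-argument callable that returns the current status string, so it makes no assumptions about the client beyond the `"pending"` and `"running"` values used in the examples.

```python
import time
from typing import Callable, Iterable


def wait_for_status(
    fetch_status: Callable[[], str],
    active: Iterable[str] = ("pending", "running"),
    poll_seconds: float = 30.0,
    timeout_seconds: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll until the status leaves the active set, or raise TimeoutError."""
    active = set(active)
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = fetch_status()
        if status not in active:
            return status  # terminal state; the caller decides if it means success
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still {status!r} after {timeout_seconds} seconds")
        sleep(poll_seconds)


# Demonstration with a scripted status sequence instead of a real job.
# "completed" is an illustrative value, not a confirmed platform status.
_statuses = iter(["pending", "running", "completed"])
final = wait_for_status(lambda: next(_statuses), sleep=lambda seconds: None)
print(final)
```

With a real job this might be called as `wait_for_status(lambda: client.get_job(job.id).status)`. Check the returned status against whatever your platform reports for success before calling `client.get_model`, since a failed job also leaves the active set.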

Pattern 4: Feedback Loop Pipeline

Use production predictions to continuously improve your model:

graph LR
    A[Production API] --> B[Log Predictions]
    B --> C[Sample Low-Confidence]
    C --> D[Human Review]
    D --> E[Add to Training Set]
    E --> F[Retrain]
    F --> G[Evaluate]
    G --> A

# Collect low-confidence predictions for review
predictions = client.get_predictions(
    model_id=production_model.id,
    min_date="2024-01-01",
    max_confidence=0.7  # Only uncertain predictions
)

print(f"Found {len(predictions)} uncertain predictions to review")

# Add uncertain samples to training dataset for review
for pred in predictions:
    client.create_dataset_item(
        version_id=review_version.id,
        split_id=review_split.id,
        file_path=pred.input_path,
        metadata={"source": "production_feedback", "original_prediction": pred.label}
    )

# After human review → retrain → evaluate → deploy if better
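
Reviewers can usually handle only a limited number of items per cycle, so it may be worth capping the sample and sending the least confident predictions first. The sketch below shows one way to do that on plain Python objects. The `Prediction` dataclass is a hypothetical stand-in (its `confidence` field in particular is an assumption), and the `budget` parameter is illustrative.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Prediction:
    """Hypothetical stand-in for a logged production prediction."""
    input_path: str
    label: str
    confidence: float


def select_for_review(
    predictions: List[Prediction],
    max_confidence: float = 0.7,
    budget: int = 100,
) -> List[Prediction]:
    """Return up to `budget` predictions at or below max_confidence, least confident first."""
    uncertain = [p for p in predictions if p.confidence <= max_confidence]
    uncertain.sort(key=lambda p: p.confidence)
    return uncertain[:budget]


logged = [
    Prediction("a.jpg", "shoe", 0.95),
    Prediction("b.jpg", "bag", 0.40),
    Prediction("c.jpg", "shoe", 0.65),
    Prediction("d.jpg", "hat", 0.55),
]
queue = select_for_review(logged, max_confidence=0.7, budget=2)
print([p.input_path for p in queue])
# → ['b.jpg', 'd.jpg']
```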

Combining Post-Processors and Workflows

Post-processors and workflows serve different purposes but complement each other:

| Feature | Post-Processors | Workflows |
|---|---|---|
| Trigger | Automatic on data upload | Manual or API-triggered |
| Scope | Single dataset | Across datasets and models |
| Complexity | Single model per step | Multi-model graphs |
| Best for | Labeling, enrichment | Multi-step processing |

Use post-processors for automated labeling, and workflows for complex multi-model processing:

# Post-processor: auto-label every new image uploaded
processor = client.create_post_processor(
    dataset_id=incoming_dataset.id,
    name="Auto-Classify Incoming",
    model_type="classification",
    model_id=classifier.id,
    output_target="annotations",
    enabled=True
)

# Workflow: periodically process labeled data through full pipeline
workflow_execution = client.execute_workflow(
    workflow_id=processing_pipeline.id,
    input_mode="dataset",
    dataset_id=incoming_dataset.id,
    version_id=latest_version.id
)

Best Practices

  1. Version everything - Dataset versions, model versions, and workflow versions, so that any result can be reproduced.
  2. Automate evaluation - Never deploy without comparing against the current production model.
  3. Start simple - Begin with Pattern 1 (Label → Train → Deploy) and add complexity only when needed.
  4. Monitor continuously - Production performance degrades over time as incoming data drifts away from the training data.
  5. Keep humans in the loop - Automated pipelines still need human review at the labeling stage.
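
For the monitoring practice above, one simple proxy for drift is a shift in the distribution of predicted labels. The sketch below compares a reference window against a recent window using total variation distance. Both helper functions are hypothetical, label shift is only one of several possible drift signals, and any alert threshold would need to be chosen for your own data.

```python
from collections import Counter
from typing import Dict, Iterable


def label_distribution(labels: Iterable[str]) -> Dict[str, float]:
    """Relative frequency of each label."""
    counts = Counter(labels)
    total = sum(counts.values())
    if total == 0:
        raise ValueError("cannot build a distribution from zero labels")
    return {label: n / total for label, n in counts.items()}


def total_variation_distance(reference: Iterable[str], recent: Iterable[str]) -> float:
    """Half the summed absolute difference between two label distributions (0 to 1)."""
    p = label_distribution(reference)
    q = label_distribution(recent)
    labels = set(p) | set(q)
    return 0.5 * sum(abs(p.get(k, 0.0) - q.get(k, 0.0)) for k in labels)


reference_labels = ["shoe", "shoe", "bag", "bag"]
recent_labels = ["shoe", "shoe", "shoe", "bag"]
print(total_variation_distance(reference_labels, recent_labels))
# → 0.25
```

A value of 0 means the two windows have identical label proportions, and 1 means they share no labels at all.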

Next Step

Learn how to make your deployed models faster and cheaper with Production Optimization.