# End-to-End Pipelines
Combine datasets, post-processors, training jobs, and workflows into automated pipelines that take raw data from upload to production.
## What’s an End-to-End Pipeline?
An end-to-end pipeline connects every stage of the ML lifecycle:
```mermaid
graph LR
    A[Raw Data] --> B[Auto-Label]
    B --> C[Review]
    C --> D[Train]
    D --> E[Evaluate]
    E --> F[Deploy]
    F --> G[Monitor]
    G -->|Feedback| A
```

Instead of manually triggering each step, you build a system where data flows through the entire process with minimal intervention.
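At its core, a pipeline is just stages chained in order, each consuming the previous stage's output. A toy sketch, with no SDK involved (all stage functions here are hypothetical stand-ins):

```python
def run_pipeline(data, stages):
    """Pass data through each stage in order; each stage consumes the previous output."""
    for stage in stages:
        data = stage(data)
    return data

# Hypothetical stages; in practice each would call the platform API.
label = lambda items: [(x, "cat" if x > 0 else "dog") for x in items]
train = lambda pairs: {"model": "toy", "n_samples": len(pairs)}
evaluate = lambda model: {**model, "accuracy": 0.93}

result = run_pipeline([1, -2, 3], [label, train, evaluate])
```

The patterns below fill in these stages with real dataset, training, and deployment calls.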
## Pipeline Patterns
### Pattern 1: Label → Train → Deploy
The most common pipeline. New data gets labeled, used for training, and the best model gets deployed.
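The examples in this guide poll job status with inline `while` loops. Factoring that into a helper with a timeout keeps a long-running pipeline from hanging forever (a sketch, assuming the `get_job` call and the `pending`/`running` status values used throughout this guide):

```python
import time

def wait_for_job(client, job, poll_seconds=60, timeout_seconds=4 * 3600):
    """Poll until the job leaves 'pending'/'running', or raise on timeout."""
    deadline = time.monotonic() + timeout_seconds
    while job.status in ("pending", "running"):
        if time.monotonic() > deadline:
            raise TimeoutError(f"Job {job.id} still {job.status} after {timeout_seconds}s")
        time.sleep(poll_seconds)
        job = client.get_job(job.id)
    return job
```

Any of the inline polling loops below can be replaced with a single `wait_for_job(client, job)` call.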
```mermaid
graph TD
    subgraph "Data Ingestion"
        A[Upload Data] --> B[Post-Processor: Auto-Label]
        B --> C[Human Review Queue]
    end
    subgraph "Training"
        C --> D[Create Dataset Version]
        D --> E[Launch Training Job]
        E --> F[Model Artifact]
    end
    subgraph "Evaluation"
        F --> G[Run on Validation Set]
        G --> H{Better than current?}
    end
    subgraph "Deployment"
        H -->|Yes| I[Optimize Model]
        I --> J[Deploy to Production]
        H -->|No| K[Keep Current Model]
    end
```

```python
from seeme import Client
import time

# Assumes current_production_model, train_split, new_image_paths and
# next_version are defined earlier in your script.
client = Client()

# --- Stage 1: Data Ingestion with Auto-Labeling ---
dataset = client.get_dataset("product-images")
version = client.get_dataset_version(dataset.id, "v3")

# Auto-label new uploads using the existing production model
processor = client.create_post_processor(
    dataset_id=dataset.id,
    name="Pre-Label with Current Model",
    model_type="classification",
    model_id=current_production_model.id,
    output_target="annotations",
    confidence_threshold=0.8,
    auto_create_labels=True,
    enabled=True
)

# Upload a new batch of images
for path in new_image_paths:
    client.create_dataset_item(
        version_id=version.id,
        split_id=train_split.id,
        file_path=path
    )

# Wait for processing, then review in the web UI...

# --- Stage 2: Train New Model ---
job = client.create_job(
    dataset_id=dataset.id,
    version_id=version.id,
    name=f"Product Classifier v{next_version}",
    config={
        "architecture": "efficientnet_b0",
        "epochs": 30,
        "learning_rate": 0.001,
        "batch_size": 32
    }
)

# Wait for training to finish
while job.status in ["pending", "running"]:
    time.sleep(60)
    job = client.get_job(job.id)

new_model = client.get_model(job.model_id)

# --- Stage 3: Evaluate Against Current Production Model ---
val_results_new = client.predict(
    model_id=new_model.id,
    dataset_id=dataset.id,
    version_id=version.id,
    split="validation"
)
val_results_current = client.predict(
    model_id=current_production_model.id,
    dataset_id=dataset.id,
    version_id=version.id,
    split="validation"
)

new_accuracy = sum(1 for r in val_results_new if r.prediction == r.ground_truth) / len(val_results_new)
current_accuracy = sum(1 for r in val_results_current if r.prediction == r.ground_truth) / len(val_results_current)

print(f"Current model: {current_accuracy:.1%}")
print(f"New model: {new_accuracy:.1%}")

# --- Stage 4: Deploy if Better ---
if new_accuracy > current_accuracy:
    # Optimize
    optimized = client.optimize_model(
        model_id=new_model.id,
        target_format="onnx"
    )
    # Deploy
    client.deploy_model(
        model_id=optimized.id,
        name="Product Classifier Production"
    )
    # Update the post-processor to label with the new model
    client.update_post_processor(
        processor_id=processor.id,
        model_id=optimized.id
    )
    print(f"Deployed new model ({new_accuracy:.1%} vs {current_accuracy:.1%})")
else:
    print(f"Keeping current model ({current_accuracy:.1%} vs {new_accuracy:.1%})")
```

### Pattern 2: Multi-Model Workflow Pipeline
Chain multiple models together using workflows, with each model processing a different aspect of the data.
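Each node in the workflow below is wired with an `input_template` string such as `{{Extract Text}}`, which references an upstream node's output by name. A toy resolver illustrates the idea (purely illustrative, not the platform's actual implementation):

```python
import re

def resolve_template(template, outputs):
    """Replace {{Node Name}} placeholders with that node's recorded output."""
    return re.sub(r"\{\{(.+?)\}\}", lambda m: str(outputs[m.group(1).strip()]), template)

# Hypothetical outputs recorded during one workflow execution
outputs = {"input": "invoice.pdf", "Extract Text": "Total due: 42 EUR"}
```

So a node configured with `{{Extract Text}}` would receive the OCR node's text as its input.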
```mermaid
graph TD
    A[Input Document] --> B[OCR Model]
    B --> C[NER Model]
    B --> D[Classification Model]
    C --> E[Merge Results]
    D --> E
    E --> F[Store in Graph]
    F --> G[Output Dataset]
```

```python
# Create a workflow that chains multiple models
workflow = client.create_workflow(
    name="Document Intelligence Pipeline",
    description="OCR → NER + Classification → Graph Storage"
)
version = client.create_workflow_version(
    workflow_id=workflow.id,
    name="v1"
)

# Node 1: OCR
ocr_node = client.create_workflow_node(
    version_id=version.id,
    name="Extract Text",
    entity_type="model",
    entity_id=ocr_model.id,
    config={"input_template": "{{input}}"}
)

# Node 2: NER (runs on the OCR output)
ner_node = client.create_workflow_node(
    version_id=version.id,
    name="Find Entities",
    entity_type="model",
    entity_id=ner_model.id,
    config={"input_template": "{{Extract Text}}"}
)

# Node 3: Classification (runs on the OCR output in parallel)
classify_node = client.create_workflow_node(
    version_id=version.id,
    name="Classify Document",
    entity_type="model",
    entity_id=classifier_model.id,
    config={"input_template": "{{Extract Text}}"}
)

# Connect the nodes
client.create_workflow_edge(
    version_id=version.id,
    begin_node_id=ocr_node.id,
    end_node_id=ner_node.id,
    edge_type="data"
)
client.create_workflow_edge(
    version_id=version.id,
    begin_node_id=ocr_node.id,
    end_node_id=classify_node.id,
    edge_type="data"
)

# Execute on a batch of documents
for doc_path in document_paths:
    execution = client.execute_workflow(
        workflow_id=workflow.id,
        input_mode="single",
        file_path=doc_path
    )
```

### Pattern 3: Distillation Pipeline
Automate the full distillation cycle described in Model Distillation:
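The cycle below gates deployment on an absolute accuracy floor (`min_accuracy`). A common refinement is to also require the student to stay close to the teacher; a sketch (the tolerance value is an arbitrary assumption, tune it for your use case):

```python
def student_acceptable(student_acc, teacher_acc, min_accuracy=0.90, max_drop=0.02):
    """Accept the student only if it clears an absolute floor AND stays
    within max_drop of the teacher's accuracy."""
    return student_acc >= min_accuracy and student_acc >= teacher_acc - max_drop
```

This keeps a weak teacher from quietly lowering the bar, and a strong teacher from being replaced by a much weaker student.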
```mermaid
graph TD
    A[Unlabeled Data] --> B[Teacher Model Labels]
    B --> C[Human Review]
    C --> D[Train Student]
    D --> E[Evaluate Teacher vs Student]
    E --> F{Student good enough?}
    F -->|Yes| G[Deploy Student]
    F -->|No| H[Collect More Data]
    H --> A
    G --> I[Monitor Performance]
    I -->|Drift detected| A
```

```python
import time

def run_distillation_cycle(
    client,
    teacher_model_id,
    dataset_id,
    version_id,
    student_config,
    min_accuracy=0.90
):
    """Run one cycle of the distillation pipeline."""
    # Step 1: Label with the teacher
    processor = client.create_post_processor(
        dataset_id=dataset_id,
        name="Teacher Labeler",
        model_type="classification",
        model_id=teacher_model_id,
        output_target="annotations",
        confidence_threshold=0.8,
        auto_create_labels=True,
        enabled=True
    )

    # Step 2: Wait for labeling to complete
    while True:
        jobs = client.get_post_processor_jobs(
            dataset_id=dataset_id,
            status="pending"
        )
        if len(jobs) == 0:
            break
        time.sleep(10)

    # Step 3: Train the student
    job = client.create_job(
        dataset_id=dataset_id,
        version_id=version_id,
        name="Student Model",
        config=student_config
    )
    while job.status in ["pending", "running"]:
        time.sleep(30)
        job = client.get_job(job.id)
    student_model = client.get_model(job.model_id)

    # Step 4: Evaluate
    student_results = client.predict(
        model_id=student_model.id,
        dataset_id=dataset_id,
        version_id=version_id,
        split="validation"
    )
    accuracy = sum(
        1 for r in student_results
        if r.prediction == r.ground_truth
    ) / len(student_results)

    print(f"Student accuracy: {accuracy:.1%}")
    if accuracy >= min_accuracy:
        print("Student meets the quality bar. Ready for deployment.")
        return student_model, accuracy
    else:
        print(f"Student below threshold ({min_accuracy:.0%}). Need more data.")
        return None, accuracy

# Run the pipeline
student, accuracy = run_distillation_cycle(
    client=client,
    teacher_model_id=large_model.id,
    dataset_id=dataset.id,
    version_id=version.id,
    student_config={
        "architecture": "mobilenet_v2",
        "epochs": 30,
        "learning_rate": 0.001
    },
    min_accuracy=0.90
)
if student:
    optimized = client.optimize_model(
        model_id=student.id,
        target_format="onnx"
    )
    client.deploy_model(model_id=optimized.id, name="Production Classifier")
```

### Pattern 4: Feedback Loop Pipeline
Use production predictions to continuously improve your model:
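The sampling step can be as simple as keeping the least-confident predictions. A sketch on plain dicts (the SDK code below works with the client's prediction objects instead):

```python
def sample_uncertain(predictions, max_confidence=0.7, limit=100):
    """Keep predictions below the confidence cutoff, most uncertain first."""
    uncertain = [p for p in predictions if p["confidence"] < max_confidence]
    return sorted(uncertain, key=lambda p: p["confidence"])[:limit]

preds = [
    {"id": 1, "confidence": 0.95},
    {"id": 2, "confidence": 0.40},
    {"id": 3, "confidence": 0.65},
]
picked = sample_uncertain(preds)
```

The `limit` cap matters in practice: human review time is the bottleneck, so route only the most informative samples to the queue.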
```mermaid
graph LR
    A[Production API] --> B[Log Predictions]
    B --> C[Sample Low-Confidence]
    C --> D[Human Review]
    D --> E[Add to Training Set]
    E --> F[Retrain]
    F --> G[Evaluate]
    G --> A
```

```python
# Collect low-confidence predictions for review
predictions = client.get_predictions(
    model_id=production_model.id,
    min_date="2024-01-01",
    max_confidence=0.7  # Only uncertain predictions
)
print(f"Found {len(predictions)} uncertain predictions to review")

# Add uncertain samples to the review dataset
for pred in predictions:
    client.create_dataset_item(
        version_id=review_version.id,
        split_id=review_split.id,
        file_path=pred.input_path,
        metadata={"source": "production_feedback", "original_prediction": pred.label}
    )

# After human review → retrain → evaluate → deploy if better
```

## Combining Post-Processors and Workflows
Post-processors and workflows serve different purposes but complement each other:
| Feature | Post-Processors | Workflows |
|---|---|---|
| Trigger | Automatic on data upload | Manual or API-triggered |
| Scope | Single dataset | Across datasets and models |
| Complexity | Single model per step | Multi-model graphs |
| Best for | Labeling, enrichment | Multi-step processing |
Use post-processors for automated labeling, and workflows for complex multi-model processing:
```python
# Post-processor: auto-label every new image as it is uploaded
processor = client.create_post_processor(
    dataset_id=incoming_dataset.id,
    name="Auto-Classify Incoming",
    model_type="classification",
    model_id=classifier.id,
    output_target="annotations",
    enabled=True
)

# Workflow: periodically run labeled data through the full processing pipeline
workflow_execution = client.execute_workflow(
    workflow_id=processing_pipeline.id,
    input_mode="dataset",
    dataset_id=incoming_dataset.id,
    version_id=latest_version.id
)
```

## Best Practices
- **Version everything** - Dataset versions, model versions, workflow versions. You need them to reproduce results
- **Automate evaluation** - Never deploy without comparing against the current production model
- **Start simple** - Begin with Pattern 1 (Label → Train → Deploy) and add complexity only when needed
- **Monitor continuously** - Production performance degrades over time as data drifts
- **Keep humans in the loop** - Automated pipelines still need human review at the labeling stage
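The second rule can be enforced mechanically with a small gate in your pipeline script. A sketch; the improvement margin is an assumption, tune it to your tolerance for deployment churn:

```python
def should_deploy(new_accuracy, current_accuracy, min_improvement=0.005):
    """Deploy only if the new model beats production by a real margin, not by noise."""
    return new_accuracy - current_accuracy >= min_improvement
```

Requiring a minimum margin (rather than any improvement at all) avoids swapping models over validation-set noise.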
## Next Step
Learn how to make your deployed models faster and cheaper with Production Optimization.