Monitoring Post-Processor Jobs
Track processing status, handle failures, and maintain healthy processing pipelines.
Job Status Overview
Check Job Status
Individual Item Status
# Get processing status for a specific item
jobs = client.get_post_processor_jobs(
dataset_id=dataset.id,
item_id=item.id
)
for job in jobs:
print(f"Processor: {job.processor_name}")
print(f" Status: {job.status}")
print(f" Started: {job.started_at}")
print(f" Completed: {job.completed_at}")
if job.error:
print(f" Error: {job.error}")
Batch Status
# Get status summary for entire dataset
summary = client.get_post_processor_summary(dataset_id=dataset.id)
print(f"Total items: {summary['total_items']}")
print(f"Processed: {summary['processed']}")
print(f"Pending: {summary['pending']}")
print(f"Failed: {summary['failed']}")
print(f"Processing rate: {summary['items_per_minute']:.1f}/min")
Handle Failures
View Failed Jobs
# Get all failed jobs
failed_jobs = client.get_post_processor_jobs(
dataset_id=dataset.id,
status="failed"
)
for job in failed_jobs:
print(f"Item: {job.item_id}")
print(f"Error: {job.error}")
print(f"Error type: {job.error_type}")
Retry Failed Jobs
Common Error Types
| Error Type | Cause | Solution |
|---|---|---|
| timeout | Processing took too long | Increase timeout, optimize model |
| rate_limit | API rate limit hit | Reduce batch size, add delays |
| invalid_input | File format issue | Check file validity |
| model_error | Model inference failed | Check model compatibility |
| out_of_memory | GPU memory exceeded | Reduce batch size |
Processing Metrics
Get Processing Statistics
# Get metrics for a time range
from datetime import datetime, timedelta
metrics = client.get_post_processor_metrics(
dataset_id=dataset.id,
start_time=datetime.now() - timedelta(days=7),
end_time=datetime.now()
)
print(f"Items processed: {metrics['total_processed']}")
print(f"Success rate: {metrics['success_rate']:.1%}")
print(f"Avg processing time: {metrics['avg_duration_ms']:.0f}ms")
print(f"Throughput: {metrics['items_per_hour']:.0f}/hour")
# By processor
for proc_name, proc_metrics in metrics['by_processor'].items():
print(f"\n{proc_name}:")
print(f" Processed: {proc_metrics['processed']}")
print(f" Failed: {proc_metrics['failed']}")
Alerts and Notifications
Set Up Alerts
# Alert on high failure rate
alert = client.create_alert(
resource_type="post_processor",
resource_id=processor.id,
name="High Failure Rate",
condition={
"metric": "failure_rate",
"operator": "gt",
"threshold": 0.1, # 10%
"window": "1h"
},
notify=["email:team@company.com"]
)
# Alert on queue backlog
backlog_alert = client.create_alert(
resource_type="dataset",
resource_id=dataset.id,
name="Processing Backlog",
condition={
"metric": "pending_jobs",
"operator": "gt",
"threshold": 1000,
"window": "15m"
}
)
View Alert History
# Get triggered alerts
alerts = client.get_alerts(
resource_type="post_processor",
status="triggered"
)
for alert in alerts:
print(f"Alert: {alert.name}")
print(f"Triggered at: {alert.triggered_at}")
print(f"Value: {alert.value}")
Queue Management
Pause Processing
# Pause a processor
client.update_post_processor(
processor_id=processor.id,
enabled=False
)
# Resume
client.update_post_processor(
processor_id=processor.id,
enabled=True
)
Clear Queue
# Cancel all pending jobs for a processor
client.cancel_post_processor_jobs(
dataset_id=dataset.id,
processor_id=processor.id,
status="pending"
)
Reprocess All Items
# Rerun processor on all items
client.reprocess_dataset(
dataset_id=dataset.id,
processor_id=processor.id,
force=True # Even if already processed
)
Monitoring Dashboard
# Build a monitoring view
def get_processing_status(client, dataset_id):
    """Print a one-shot status report for a dataset's post-processor pipeline.

    Args:
        client: API client exposing ``get_post_processor_summary`` and
            ``get_post_processor_jobs`` (same object used throughout this guide).
        dataset_id: Identifier of the dataset to report on.

    Returns:
        None. Output is written to stdout.
    """
    summary = client.get_post_processor_summary(dataset_id)
    jobs = client.get_post_processor_jobs(
        dataset_id=dataset_id,
        status="processing"
    )
    total = summary['total_items']
    # Guard against empty datasets: dividing by total_items == 0 would
    # raise ZeroDivisionError; report 0.0% instead.
    processed_pct = (summary['processed'] / total * 100) if total else 0.0
    print("=" * 50)
    print("POST-PROCESSOR STATUS")
    print("=" * 50)
    print(f"Total items: {total}")
    print(f"Processed: {summary['processed']} ({processed_pct:.1f}%)")
    print(f"Pending: {summary['pending']}")
    print(f"Processing: {len(jobs)}")
    print(f"Failed: {summary['failed']}")
    print("-" * 50)
    if jobs:
        print("\nCurrently Processing:")
        # Cap the listing at five jobs so the report stays readable.
        for job in jobs[:5]:
            print(f" - {job.processor_name}: item {job.item_id}")
# Run periodically
import time
while True:
get_processing_status(client, dataset.id)
time.sleep(60)
Best Practices
- Monitor failure rates - Alert on spikes
- Watch queue depth - Prevent backlogs
- Review errors regularly - Fix systematic issues
- Use batch retries - Efficient error recovery
- Track throughput - Ensure meeting SLAs
Troubleshooting
| Symptom | Possible Cause | Solution |
|---|---|---|
| Jobs stuck in pending | Processor disabled | Enable processor |
| High failure rate | Model issue or bad data | Check error messages |
| Slow processing | Underpowered resources | Scale up or batch smaller |
| Duplicate processing | Retry loop | Check for job completion bugs |