Monitoring Post-Processor Jobs

Track processing status, handle failures, and maintain healthy processing pipelines.

Job Status Overview

Check Job Status

Individual Item Status

# Get processing status for a specific item
item_jobs = client.get_post_processor_jobs(
    dataset_id=dataset.id,
    item_id=item.id,
)

# One job record per processor that touched the item
for item_job in item_jobs:
    print(f"Processor: {item_job.processor_name}")
    print(f"  Status: {item_job.status}")
    print(f"  Started: {item_job.started_at}")
    print(f"  Completed: {item_job.completed_at}")
    # Only jobs that failed carry an error message
    if item_job.error:
        print(f"  Error: {item_job.error}")

Batch Status

# Get status summary for entire dataset
summary = client.get_post_processor_summary(dataset_id=dataset.id)

# Print the count fields, then the rate (which needs its own format spec)
for label, key in [
    ("Total items", "total_items"),
    ("Processed", "processed"),
    ("Pending", "pending"),
    ("Failed", "failed"),
]:
    print(f"{label}: {summary[key]}")
print(f"Processing rate: {summary['items_per_minute']:.1f}/min")

Handle Failures

View Failed Jobs

# Get all failed jobs
failed_jobs = client.get_post_processor_jobs(
    dataset_id=dataset.id,
    status="failed",
)

# Inspect each failure: which item broke and why
for failed in failed_jobs:
    print(f"Item: {failed.item_id}")
    print(f"Error: {failed.error}")
    print(f"Error type: {failed.error_type}")

Retry Failed Jobs

Common Error Types

| Error Type      | Cause                    | Solution                         |
| --------------- | ------------------------ | -------------------------------- |
| `timeout`       | Processing took too long | Increase timeout, optimize model |
| `rate_limit`    | API rate limit hit       | Reduce batch size, add delays    |
| `invalid_input` | File format issue        | Check file validity              |
| `model_error`   | Model inference failed   | Check model compatibility        |
| `out_of_memory` | GPU memory exceeded      | Reduce batch size                |

Processing Metrics

Get Processing Statistics

# Get metrics for a time range
from datetime import datetime, timedelta

# Snapshot the clock once so start_time/end_time bound exactly one 7-day
# window; calling datetime.now() twice would make the window slightly off.
now = datetime.now()
metrics = client.get_post_processor_metrics(
    dataset_id=dataset.id,
    start_time=now - timedelta(days=7),
    end_time=now,
)

print(f"Items processed: {metrics['total_processed']}")
print(f"Success rate: {metrics['success_rate']:.1%}")
print(f"Avg processing time: {metrics['avg_duration_ms']:.0f}ms")
print(f"Throughput: {metrics['items_per_hour']:.0f}/hour")

# Per-processor breakdown of the same window
for proc_name, proc_metrics in metrics['by_processor'].items():
    print(f"\n{proc_name}:")
    print(f"  Processed: {proc_metrics['processed']}")
    print(f"  Failed: {proc_metrics['failed']}")

Alerts and Notifications

Set Up Alerts

# Alert on high failure rate
failure_condition = {
    "metric": "failure_rate",
    "operator": "gt",
    "threshold": 0.1,  # 10%
    "window": "1h",
}
alert = client.create_alert(
    resource_type="post_processor",
    resource_id=processor.id,
    name="High Failure Rate",
    condition=failure_condition,
    notify=["email:team@company.com"],
)

# Alert on queue backlog
backlog_condition = {
    "metric": "pending_jobs",
    "operator": "gt",
    "threshold": 1000,
    "window": "15m",
}
backlog_alert = client.create_alert(
    resource_type="dataset",
    resource_id=dataset.id,
    name="Processing Backlog",
    condition=backlog_condition,
)

View Alert History

# Get triggered alerts
triggered = client.get_alerts(
    resource_type="post_processor",
    status="triggered",
)

# Show when each alert fired and the metric value that tripped it
for fired in triggered:
    print(f"Alert: {fired.name}")
    print(f"Triggered at: {fired.triggered_at}")
    print(f"Value: {fired.value}")

Queue Management

Pause Processing

# Pause a processor: disabling it stops new jobs from being picked up
client.update_post_processor(processor_id=processor.id, enabled=False)

# Resume by re-enabling it
client.update_post_processor(processor_id=processor.id, enabled=True)

Clear Queue

# Cancel every job for this processor that has not started running yet
client.cancel_post_processor_jobs(
    dataset_id=dataset.id, processor_id=processor.id, status="pending"
)

Reprocess All Items

# Rerun the processor over every item; force=True includes items that
# were already processed successfully
client.reprocess_dataset(
    dataset_id=dataset.id,
    processor_id=processor.id,
    force=True,
)

Monitoring Dashboard

# Build a monitoring view
def get_processing_status(client, dataset_id):
    """Print a one-shot status report for a dataset's processing pipeline.

    Args:
        client: API client exposing ``get_post_processor_summary`` and
            ``get_post_processor_jobs``.
        dataset_id: ID of the dataset to report on.
    """
    summary = client.get_post_processor_summary(dataset_id)
    jobs = client.get_post_processor_jobs(
        dataset_id=dataset_id,
        status="processing"
    )

    total = summary['total_items']
    # Guard against an empty/brand-new dataset: total_items == 0 would
    # otherwise raise ZeroDivisionError in the percentage calculation.
    pct = (summary['processed'] / total * 100) if total else 0.0

    print("=" * 50)
    print("POST-PROCESSOR STATUS")
    print("=" * 50)
    print(f"Total items:  {total}")
    print(f"Processed:    {summary['processed']} ({pct:.1f}%)")
    print(f"Pending:      {summary['pending']}")
    print(f"Processing:   {len(jobs)}")
    print(f"Failed:       {summary['failed']}")
    print("-" * 50)

    if jobs:
        print("\nCurrently Processing:")
        # Cap at five in-flight jobs to keep the report readable
        for job in jobs[:5]:
            print(f"  - {job.processor_name}: item {job.item_id}")

# Run periodically; without the handler, stopping the monitor with Ctrl-C
# would dump a KeyboardInterrupt traceback instead of exiting cleanly.
import time

try:
    while True:
        get_processing_status(client, dataset.id)
        time.sleep(60)
except KeyboardInterrupt:
    print("Monitoring stopped")

Best Practices

  1. Monitor failure rates - Alert on spikes
  2. Watch queue depth - Prevent backlogs
  3. Review errors regularly - Fix systematic issues
  4. Use batch retries - Efficient error recovery
  5. Track throughput - Ensure meeting SLAs

Troubleshooting

| Symptom               | Possible Cause          | Solution                      |
| --------------------- | ----------------------- | ----------------------------- |
| Jobs stuck in pending | Processor disabled      | Enable processor              |
| High failure rate     | Model issue or bad data | Check error messages          |
| Slow processing       | Underpowered resources  | Scale up or batch smaller     |
| Duplicate processing  | Retry loop              | Check for job completion bugs |

Next Steps