Chaining Processors

Build multi-step processing pipelines by chaining post-processors together.

How Chaining Works

Post-processors run sequentially, in ascending order of their order field. Each processor can use the output written by earlier processors.

graph LR
    A[Upload File] --> B[Processor 1: OCR]
    B --> C[Processor 2: NER]
    C --> D[Processor 3: LLM]
    D --> E[Final Output]
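The execution model above can be sketched as a loop over processors sorted by their order field. This is an illustrative sketch, not the real SDK: run_chain and the lambda steps are hypothetical stand-ins for the OCR, NER, and LLM processors.

```python
# Sketch (not the real SDK): processors run one at a time, sorted by
# their `order` field, and each step sees the item state left by the
# previous step.
def run_chain(item, processors):
    for proc in sorted(processors, key=lambda p: p["order"]):
        item = proc["fn"](item)  # each step transforms the item
    return item

# Hypothetical stand-ins for OCR / NER / LLM steps (deliberately
# listed out of order to show that `order` controls execution):
steps = [
    {"order": 2, "fn": lambda i: {**i, "entities": ["ACME Corp"]}},
    {"order": 1, "fn": lambda i: {**i, "text": "Invoice from ACME Corp"}},
    {"order": 3, "fn": lambda i: {**i, "summary": "An ACME invoice"}},
]

result = run_chain({"file": "scan.png"}, steps)
```

Even though the NER step is listed first, sorting by order guarantees the OCR step runs before it.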

Setting Execution Order

The order field determines execution sequence:

# First: Extract text from image
ocr_processor = client.create_post_processor(
    dataset_id=dataset.id,
    name="Extract Text",
    model_type="ocr",
    model_id=ocr_model.id,
    output_target="text",
    order=1  # Runs first
)

# Second: Extract entities from text
ner_processor = client.create_post_processor(
    dataset_id=dataset.id,
    name="Extract Entities",
    model_type="ner",
    model_id=ner_model.id,
    output_target="annotations",
    order=2  # Runs second, uses OCR output
)

# Third: Summarize with LLM
summary_processor = client.create_post_processor(
    dataset_id=dataset.id,
    name="Generate Summary",
    model_type="llm",
    model_id=llm_model.id,
    prompt="Summarize the key information from this document.",
    output_target="text",
    order=3  # Runs third, uses OCR text
)

Common Chains

Document Intelligence

OCR → NER → Classification

# 1. OCR: Extract text
client.create_post_processor(
    dataset_id=dataset.id,
    name="OCR",
    model_type="ocr",
    model_id=ocr_model.id,
    output_target="text",
    order=1
)

# 2. NER: Find entities
client.create_post_processor(
    dataset_id=dataset.id,
    name="NER",
    model_type="ner",
    model_id=ner_model.id,
    output_target="annotations",
    auto_create_labels=True,
    order=2
)

# 3. Classification: Categorize document
client.create_post_processor(
    dataset_id=dataset.id,
    name="Classify",
    model_type="classification",
    model_id=classifier.id,
    output_target="annotations",
    auto_create_labels=True,
    order=3
)

Audio Analysis

STT → NER → Sentiment

# 1. Transcribe audio
client.create_post_processor(
    dataset_id=dataset.id,
    name="Transcribe",
    model_type="stt",
    model_id=whisper_model.id,
    output_target="text",
    order=1
)

# 2. Extract names and entities
client.create_post_processor(
    dataset_id=dataset.id,
    name="Extract Entities",
    model_type="ner",
    model_id=ner_model.id,
    output_target="annotations",
    order=2
)

# 3. Analyze sentiment
client.create_post_processor(
    dataset_id=dataset.id,
    name="Sentiment",
    model_type="classification",
    model_id=sentiment_model.id,
    output_target="annotations",
    order=3
)

Meeting Recording

STT-Diarization → NER → LLM Summary

# 1. Transcribe with speakers
client.create_post_processor(
    dataset_id=dataset.id,
    name="Transcribe with Speakers",
    model_type="stt-diarization",
    model_id=diarization_model.id,
    output_target="text",
    order=1
)

# 2. Extract people and topics
client.create_post_processor(
    dataset_id=dataset.id,
    name="Extract Mentions",
    model_type="ner",
    model_id=ner_model.id,
    output_target="annotations",
    order=2
)

# 3. Generate meeting notes
client.create_post_processor(
    dataset_id=dataset.id,
    name="Meeting Notes",
    model_type="llm",
    model_id=llm_model.id,
    prompt="""
    Create meeting notes from this transcript:
    - Key discussion points
    - Decisions made
    - Action items (with owners if mentioned)
    """,
    output_target="text",
    order=3
)

Data Flow Between Processors

Text Field Flow

Each processor can read the item’s text field (updated by output_target="text"):

Item created with file: meeting.mp3
  ↓
Processor 1 (STT): Reads audio → Writes text field
  Item.text = "Hello everyone, let's discuss..."
  ↓
Processor 2 (NER): Reads text field → Writes annotations
  Item.annotations = [{"entity": "everyone", "label": "GROUP"}]
  ↓
Processor 3 (LLM): Reads text field → Appends to text field
  Item.text = "Hello everyone...\n\n--- Summary ---\nMeeting discussed..."
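The flow above can be simulated with a plain dictionary. The field names (text, annotations) and sample values follow the diagram; this is only a sketch of what the platform does server-side.

```python
# Sketch of how output_target routes each processor's result onto a
# shared item record (field names follow the diagram above).
item = {"file": "meeting.mp3", "text": None, "annotations": []}

# Processor 1 (STT, output_target="text"): writes the text field
item["text"] = "Hello everyone, let's discuss..."

# Processor 2 (NER, output_target="annotations"): reads the text
# field and appends entity annotations
item["annotations"].append({"entity": "everyone", "label": "GROUP"})

# Processor 3 (LLM, output_target="text"): appends its summary to
# the existing text rather than replacing it
item["text"] += "\n\n--- Summary ---\nMeeting discussed..."
```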

Annotation Accumulation

Annotations from multiple processors accumulate:

After OCR: No annotations
After NER: [PERSON, ORG, DATE] annotations
After Classification: [PERSON, ORG, DATE, document_type] annotations
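A minimal sketch of this accumulation, using the labels above as plain strings: each processor with output_target="annotations" appends to the list rather than overwriting it.

```python
# Annotation accumulation across the chain (labels are illustrative).
annotations = []                               # after OCR: no annotations
annotations += ["PERSON", "ORG", "DATE"]       # after NER
annotations += ["document_type"]               # after Classification
```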

Conditional Processing

Use confidence thresholds to control flow:

# Only run NER if OCR confidence is high
client.create_post_processor(
    dataset_id=dataset.id,
    name="NER (High Confidence OCR Only)",
    model_type="ner",
    model_id=ner_model.id,
    output_target="annotations",
    confidence_threshold=0.8,  # Skip if previous output confidence < 80%
    order=2
)

Error Handling in Chains

Continue on Failure

# Non-critical processor - continue if fails
client.create_post_processor(
    dataset_id=dataset.id,
    name="Optional Enhancement",
    model_type="llm",
    model_id=llm_model.id,
    config={
        "on_failure": "continue"  # Don't block next processors
    },
    order=2
)

Stop on Failure (Default)

# Critical processor - stop chain if fails
client.create_post_processor(
    dataset_id=dataset.id,
    name="Required OCR",
    model_type="ocr",
    model_id=ocr_model.id,
    config={
        "on_failure": "stop"  # Default behavior
    },
    order=1
)

Monitoring Chain Progress

# Check all jobs for an item
jobs = client.get_post_processor_jobs(
    dataset_id=dataset.id,
    item_id=item.id
)

for job in jobs:
    print(f"[{job.order}] {job.processor_name}: {job.status}")
    if job.error:
        print(f"    Error: {job.error}")

Output:

[1] Extract Text: completed
[2] Extract Entities: completed
[3] Generate Summary: processing
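To block until the whole chain finishes, a simple polling loop works. This is a sketch: get_post_processor_jobs is taken from the example above, but the terminal status names other than "completed" ("failed", "skipped") are assumptions about the API.

```python
import time

# Terminal states are assumed; only "completed" appears in the docs above.
TERMINAL = {"completed", "failed", "skipped"}

def wait_for_chain(client, dataset_id, item_id, poll_seconds=5):
    """Poll until every job in the item's chain reaches a terminal state."""
    while True:
        jobs = client.get_post_processor_jobs(
            dataset_id=dataset_id, item_id=item_id
        )
        if all(job.status in TERMINAL for job in jobs):
            return jobs
        time.sleep(poll_seconds)
```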

Best Practices

  1. Order logically - Text extraction before text analysis
  2. Use text output for chains - Passes data between processors
  3. Set confidence thresholds - Skip downstream if upstream is low quality
  4. Handle failures gracefully - Use “continue” for optional steps
  5. Monitor the chain - Check each step’s status

Limitations

  • Processors run sequentially (no parallel execution within a chain)
  • Each processor creates a separate job
  • Large chains may have higher latency
  • Consider using Workflows for complex logic