# Execution & Monitoring
Learn how to run workflows, track progress, and handle results.
## Execution Modes
| Mode | Input | Use Case |
|---|---|---|
| Single | One text/file | Testing, real-time processing |
| Batch | Dataset items | Bulk processing |
## Single Execution
Process one input through the workflow.
### Text Input
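A minimal sketch of running one piece of text, using the same `execute_workflow` parameters as the Specific Version example below (assumes `client` and `workflow` from earlier setup):

```python
# Run one text input through the active workflow version
execution = client.execute_workflow(
    workflow_id=workflow.id,
    input_mode="single",
    single_input="Summarize the attached quarterly report."
)
print(f"Execution started: {execution.id}")
```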
### File Input
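The exact parameter for file inputs isn't documented elsewhere in this guide; the Complete Example at the end of this page passes a local path via `item`, so a sketch along those lines:

```python
# Run a local file through the workflow
# (the `item` parameter follows the Complete Example; check your
# SDK version for the exact file-input parameter name)
execution = client.execute_workflow(
    workflow_id=workflow.id,
    input_mode="single",
    item="./document.pdf"
)
```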
### Specific Version
Execute a specific version instead of the active one:
```python
execution = client.execute_workflow(
    workflow_id=workflow.id,
    version_id=specific_version.id,  # Not the active version
    input_mode="single",
    single_input="Test input"
)
```

## Batch Execution
Process multiple items from a dataset.
```python
# Execute batch from dataset
execution = client.execute_workflow(
    workflow_id=workflow.id,
    input_mode="batch",
    batch_config={
        "dataset_id": source_dataset.id,
        "dataset_version_id": source_version.id,
        "input_field": "text",  # Column to use as input
        "filter_query": "",     # Optional filter
        "parallelism": 5        # Concurrent items
    }
)

print(f"Processing {execution.progress.total_items} items")
```

## Monitoring Execution
### Execution States
```mermaid
stateDiagram-v2
    [*] --> pending
    pending --> running
    running --> completed
    running --> failed
    running --> cancelled
    completed --> [*]
    failed --> [*]
    cancelled --> [*]
```

| Status | Description |
|---|---|
| pending | Queued, waiting for resources |
| running | Currently processing |
| completed | Finished successfully |
| failed | Error occurred |
| cancelled | User cancelled |
### Poll for Status
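A simple polling loop, assuming `client`, `workflow`, and a started `execution` from the examples above (the Complete Example at the end of this page uses the same pattern):

```python
import time

# Re-fetch the execution until it reaches a terminal state
while execution.status in ["pending", "running"]:
    time.sleep(5)  # Avoid hammering the API
    execution = client.get_workflow_execution(
        workflow_id=workflow.id,
        execution_id=execution.id
    )
    if execution.progress:
        print(f"Node {execution.progress.current_node_idx + 1}"
              f"/{execution.progress.total_nodes}")

print(f"Final status: {execution.status}")
```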
### Progress Object
```python
class ExecutionProgress:
    total_nodes: int       # Total nodes in workflow
    current_node_idx: int  # Current node index
    current_node_id: str   # Current node ID

    # Batch mode only
    total_items: int       # Items to process
    processed_items: int   # Completed items
    failed_items: int      # Failed items
```

## Getting Results
### Successful Execution
```python
if execution.status == "completed":
    # Results contain final workflow output
    results = execution.results

    # For single execution
    print(f"Output: {results}")

    # For batch execution
    for item_result in results.get("items", []):
        print(f"Item {item_result['source_item_id']}: {item_result['output']}")
```

### Per-Node Results
Access intermediate results from each node:
```python
# Node executions contain per-node details
node_results = execution.node_executions

for node_id, node_result in node_results.items():
    print(f"\nNode: {node_id}")
    print(f"  Status: {node_result['status']}")
    print(f"  Output: {node_result['output']}")
    print(f"  Prompt sent: {node_result.get('prompt', 'N/A')}")
    print(f"  Duration: {node_result.get('duration_ms', 'N/A')}ms")
```

### Failed Execution
```python
if execution.status == "failed":
    print(f"Error: {execution.error}")

    # Check which node failed
    if execution.progress:
        print(f"Failed at node: {execution.progress.current_node_id}")

    # Check node-level errors
    for node_id, node_result in execution.node_executions.items():
        if node_result.get("error"):
            print(f"Node {node_id} error: {node_result['error']}")
```

## Cancel Execution
Stop a running execution:
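The cancel call isn't shown elsewhere in this guide; this sketch assumes a `cancel_workflow_execution` method that mirrors the naming of `get_workflow_execution` — check your SDK version for the exact name:

```python
# Request cancellation of an in-flight execution (hypothetical method name)
client.cancel_workflow_execution(
    workflow_id=workflow.id,
    execution_id=execution.id
)

# The status moves to "cancelled" once the engine stops processing
execution = client.get_workflow_execution(
    workflow_id=workflow.id,
    execution_id=execution.id
)
print(execution.status)
```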
## List Executions
View execution history:
```python
# Get all executions for a workflow
executions = client.get_workflow_executions(
    workflow_id=workflow.id,
    limit=50,
    status="completed"  # Optional filter
)

for execution in executions:
    print(f"{execution.id}: {execution.status} - {execution.created_at}")
```

## Download Input File
For file-based executions, download the original input:
```python
# Download the input file that was processed
input_file = client.download_execution_input(
    workflow_id=workflow.id,
    execution_id=execution.id,
    output_path="./input_file.pdf"
)
```

## Execution Timeouts
### Workflow-Level Timeout
Set maximum execution time for the entire workflow:
```python
execution = client.execute_workflow(
    workflow_id=workflow.id,
    input_mode="single",
    single_input="...",
    timeout=600  # 10 minutes max
)
```

### Node-Level Timeout
Set per-node timeouts during node creation:
```python
node = client.create_workflow_node(
    version_id=version.id,
    name="Slow Model",
    entity_type="model",
    entity_id=model.id,
    config={
        "input_template": "{{input}}",
        "timeout": 120  # 2 minutes for this node
    }
)
```

## Error Handling
### Retry Logic
The execution engine includes automatic retries:
```python
# Default: 3 retries with exponential backoff
# Configurable at version level
version = client.update_workflow_version(
    workflow_id=workflow.id,
    version_id=version.id,
    config={
        "max_retries": 5,
        "retry_delay_seconds": 10
    }
)
```

### Continue on Failure
Configure nodes to continue even if they fail:
```python
optional_node = client.create_workflow_node(
    version_id=version.id,
    name="Optional Enhancement",
    entity_type="model",
    entity_id=model.id,
    config={
        "input_template": "{{input}}",
        "on_failure": "continue"  # Don't stop workflow
    }
)
```

## Complete Example
```python
import time

from seeme import Client

client = Client()

# Execute workflow
execution = client.execute_workflow(
    workflow_id="workflow-id",
    input_mode="single",
    item="./document.pdf"
)
print(f"Started execution: {execution.id}")

# Monitor until complete
while execution.status in ["pending", "running"]:
    time.sleep(5)
    execution = client.get_workflow_execution(
        workflow_id="workflow-id",
        execution_id=execution.id
    )
    if execution.progress:
        print(f"Processing node {execution.progress.current_node_idx + 1}"
              f"/{execution.progress.total_nodes}")

# Handle result
if execution.status == "completed":
    print("\n=== Results ===")
    print(execution.results)

    # Show per-node outputs
    print("\n=== Node Details ===")
    for node_id, details in execution.node_executions.items():
        print(f"{node_id}: {details.get('output', 'No output')[:100]}...")
elif execution.status == "failed":
    print(f"\nExecution failed: {execution.error}")
elif execution.status == "cancelled":
    print("\nExecution was cancelled")
```

## Best Practices
- Set appropriate timeouts - Account for model complexity
- Use batch mode for bulk processing - More efficient than many single calls
- Monitor progress - Don’t just wait blindly
- Handle all status states - completed, failed, cancelled
- Log node-level results - Useful for debugging
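The monitoring advice above can be wrapped in a small reusable helper. This is a sketch, not part of the SDK — it works with any client that exposes `get_workflow_execution`, and combines polling with a hard client-side timeout:

```python
import time


def wait_for_execution(client, workflow_id, execution_id,
                       poll_interval=5, timeout=600):
    """Poll an execution until it leaves pending/running, or raise on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        execution = client.get_workflow_execution(
            workflow_id=workflow_id,
            execution_id=execution_id
        )
        if execution.status not in ("pending", "running"):
            return execution  # completed, failed, or cancelled
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Execution {execution_id} still {execution.status} "
                f"after {timeout}s"
            )
        time.sleep(poll_interval)
```

Because the helper takes the client as a parameter, it is easy to unit-test with a stub and to reuse across workflows.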