Workflow Concepts
Understanding the core concepts behind SeeMe.ai workflows will help you build effective pipelines.
Architecture Overview
graph TD
    subgraph Workflow
        V1[Version 1]
        V2[Version 2 - Active]
    end
    subgraph Version
        N1[Node: Start]
        N2[Node: Model]
        N3[Node: Dataset]
        E1[Edge: Data]
        E2[Edge: Context]
    end
    V2 --> N1
    V2 --> N2
    V2 --> N3
    N1 -->|E1| N2
    N2 -->|E1| N3
Core Entities
Workflow
A workflow is the top-level container that holds one or more versions.
workflow = client.create_workflow(
    name="Document Analysis",
    description="Process documents through multiple AI models"
)
Properties:
| Field | Description |
|---|---|
| id | Unique identifier |
| name | Human-readable name |
| description | What the workflow does |
| active_version_id | Currently deployed version |
| versions | List of all versions |
Workflow Version
A version is a specific configuration of nodes and edges. You can have multiple versions for A/B testing or gradual rollouts.
version = client.create_workflow_version(
    workflow_id=workflow.id,
    name="v2",
    config={"timeout": 300}
)
Properties:
| Field | Description |
|---|---|
| id | Version identifier |
| version | Version name (e.g., “v1”) |
| version_number | Sequential number |
| nodes | List of nodes |
| edges | List of edges |
| config | Version-level configuration |
Workflow Node
A node is a single processing step in the workflow. Nodes can be models, datasets, or control nodes.
node = client.create_workflow_node(
    version_id=version.id,
    name="Sentiment Analysis",
    entity_type="model",
    entity_id=model.id,
    config={
        "input_template": "Analyze sentiment: {{input}}",
        "timeout": 30,
        "on_failure": "continue"
    }
)
Properties:
| Field | Description |
|---|---|
| id | Node identifier |
| name | Display name |
| entity_type | Type: “model”, “dataset”, “start”, “end” |
| entity_id | ID of the model or dataset |
| config | Node-specific configuration |
| position | UI position {x, y} |
| timeout | Max execution time (seconds) |
| on_failure | “stop” or “continue” |
Workflow Edge
An edge connects two nodes and defines how data flows between them.
edge = client.create_workflow_edge(
    version_id=version.id,
    begin_node_id=source_node.id,
    end_node_id=target_node.id,
    edge_type="data"  # or "context"
)
Properties:
| Field | Description |
|---|---|
| id | Edge identifier |
| begin_node_id | Source node |
| end_node_id | Target node |
| edge_type | “data” or “context” |
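Before creating edges through the client, it can help to check a planned graph locally. The following is a minimal sketch, not part of the SDK: `EdgeSpec` and `validate_edges` are hypothetical names, and treating a self-loop as a problem is an assumption based on the topological execution described later on this page.

```python
from dataclasses import dataclass

# The two edge types listed in the table above.
VALID_EDGE_TYPES = {"data", "context"}


@dataclass(frozen=True)
class EdgeSpec:
    """Hypothetical local stand-in for an edge; not an SDK class."""
    begin_node_id: str
    end_node_id: str
    edge_type: str = "data"


def validate_edges(node_ids, edges):
    """Return a list of problems; an empty list means the edges look consistent."""
    known = set(node_ids)
    problems = []
    for edge in edges:
        if edge.edge_type not in VALID_EDGE_TYPES:
            problems.append(f"unknown edge_type {edge.edge_type!r}")
        if edge.begin_node_id not in known:
            problems.append(f"begin_node_id {edge.begin_node_id!r} is not a node")
        if edge.end_node_id not in known:
            problems.append(f"end_node_id {edge.end_node_id!r} is not a node")
        # Assumption: a node feeding itself would make ordering impossible.
        if edge.begin_node_id == edge.end_node_id:
            problems.append(f"edge loops back to {edge.begin_node_id!r}")
    return problems


planned = [EdgeSpec("start", "sentiment"), EdgeSpec("sentiment", "archive", "context")]
print(validate_edges(["start", "sentiment", "archive"], planned))
```

An empty result only means the local description is self-consistent; the service may apply further checks of its own.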
Execution Model
Execution Flow
sequenceDiagram
    participant User
    participant API
    participant Engine
    participant Queue
    User->>API: POST /execute
    API->>Engine: Create Execution
    Engine->>Queue: Enqueue Job
    Queue-->>Engine: Process
    Engine->>Engine: Execute Nodes
    Engine->>API: Update Status
    User->>API: GET /executions/{id}
    API-->>User: Results
Execution States
| Status | Description |
|---|---|
| pending | Queued, waiting to start |
| running | Currently executing |
| completed | Finished successfully |
| failed | Error occurred |
| cancelled | User cancelled |
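Because execution is queued, a caller usually polls until one of the final states in the table is reached. The sketch below shows that loop in isolation. `wait_for_execution`, its parameters, and the choice of `completed`, `failed`, and `cancelled` as terminal states are assumptions made for illustration. The status source is passed in as a plain callable so the example runs without a server.

```python
import time

# Assumption: these three states from the table are final; the others are transient.
TERMINAL_STATES = {"completed", "failed", "cancelled"}


def wait_for_execution(fetch_status, poll_interval=1.0, max_polls=60, sleep=time.sleep):
    """Poll until a terminal state is reported or the poll budget runs out.

    fetch_status is any zero-argument callable returning the current status string.
    """
    status = None
    for _ in range(max_polls):
        status = fetch_status()
        if status in TERMINAL_STATES:
            return status
        sleep(poll_interval)
    raise TimeoutError(f"execution still {status!r} after {max_polls} polls")


# Simulated status sequence standing in for repeated API calls.
statuses = iter(["pending", "running", "running", "completed"])
final = wait_for_execution(lambda: next(statuses), sleep=lambda _: None)
print(final)  # completed
```

In real use, `fetch_status` would be a small function that reads `.status` from the object returned by `client.get_workflow_execution`, the call shown in the error-handling section of this page.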
Execution Context
As a workflow executes, it maintains a context that carries data between nodes:
class ExecutionContext:
    execution_id: str
    workflow_id: str
    user_id: str
    # Data accumulated from nodes
    variables: dict  # {node_id: output}
    context_data: dict  # Data from context edges
    # Current state
    input: str  # Current text input
    input_file_path: str  # Current file path
    current_node: str  # Active node
Data Flow
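The two mechanisms covered in this section, per-node `variables` and the propagated `input`, can be pictured with a small local simulation. This is a sketch under stated assumptions rather than the engine's code: `MiniContext` and `run_chain` are hypothetical names, and the rule that only text output replaces `input` is a guess made to keep the example simple.

```python
from dataclasses import dataclass, field


@dataclass
class MiniContext:
    """Trimmed-down, hypothetical stand-in for the execution context."""
    input: str
    variables: dict = field(default_factory=dict)


def run_chain(ctx, steps):
    """Run (node_id, function) pairs in order, recording outputs and forwarding text."""
    for node_id, fn in steps:
        output = fn(ctx.input)
        ctx.variables[node_id] = output  # every output is kept under its node id
        if isinstance(output, str):
            ctx.input = output  # assumption: only text output replaces the input
    return ctx


ctx = run_chain(
    MiniContext(input="hello world"),
    [("node_a", str.upper), ("node_b", lambda text: text.split())],
)
print(ctx.variables)  # {'node_a': 'HELLO WORLD', 'node_b': ['HELLO', 'WORLD']}
print(ctx.input)      # HELLO WORLD
```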
Variables
Each node’s output is stored in variables[node_id]:
Execute Node A → variables["node_a"] = "output from A"
Execute Node B → variables["node_b"] = "output from B"
Access previous outputs in templates:
{{node_a}} → Gets output from Node A
{{node_b}} → Gets output from Node B
Input Propagation
The input field carries the current text through the workflow:
Initial input: "Hello world"
Node A (STT): input = "Hello world" → output = "Hello world transcribed"
Node B (NER): input = "Hello world transcribed" → output = [entities]
Template Syntax
Templates allow dynamic input construction using data from the execution context.
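To make the substitution idea concrete, here is a rough sketch of how `{{name}}` and dotted `{{a.b}}` placeholders could be resolved against a dictionary. It is not the platform's template engine: `render` and `resolve` are hypothetical helpers, `{{#each}}` blocks are deliberately left untouched, and placeholder names are assumed to contain only letters, digits, underscores, and dots.

```python
import re

# Matches {{name}} or {{a.b.c}}; block tags such as {{#each x}} do not match.
PLACEHOLDER = re.compile(r"\{\{\s*([A-Za-z_][\w.]*)\s*\}\}")


def resolve(path, data):
    """Walk a dotted path such as 'customer.name' through nested dicts."""
    value = data
    for key in path.split("."):
        value = value[key]  # raises KeyError when the name is missing
    return value


def render(template, data):
    """Replace each {{path}} placeholder with its value from data."""
    return PLACEHOLDER.sub(lambda m: str(resolve(m.group(1), data)), template)


data = {"input": "great product", "customer": {"name": "Ada", "status": "active"}}
print(render("Analyze: {{input}}", data))
# Analyze: great product
print(render("Customer: {{customer.name}}, Status: {{customer.status}}", data))
# Customer: Ada, Status: active
```

Raising on a missing name is a design choice in this sketch; a real engine might substitute an empty string instead.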
Basic Substitution
"Analyze: {{input}}"
Node Output Reference
"Previous result: {{node_id}}"
Nested Properties
"Customer: {{customer.name}}, Status: {{customer.status}}"
Array Iteration
"Rules:
{{#each rules}}
- {{name}}: {{text}}
{{/each}}"
Example
node = client.create_workflow_node(
    version_id=version.id,
    name="LLM Analysis",
    entity_type="model",
    entity_id=llm_model.id,
    config={
        "input_template": """
Analyze the following transcript using these guidelines:
{{#each guidelines}}
- {{rule}}
{{/each}}
Transcript:
{{stt_node}}
Provide a summary.
"""
    }
)
Topological Execution
Nodes execute in topological order based on edges:
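graph LR
    A[Node 1] --> C[Node 3]
    B[Node 2] --> C
    C --> D[Node 4]
Execution order: Node 1 → Node 2 → Node 3 → Node 4
Node 1 and Node 2 do not depend on each other, so the edges only require that both finish before Node 3 starts. Running independent nodes like these in parallel is a future feature.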
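The ordering rule can be reproduced with Python's standard `graphlib` module. This is only an illustration of topological sorting, not a description of how the engine is implemented. `execution_order` is a hypothetical helper and the node IDs are made up.

```python
from graphlib import TopologicalSorter


def execution_order(edges):
    """Return one valid topological order for (begin, end) edge pairs."""
    sorter = TopologicalSorter()
    for begin, end in edges:
        sorter.add(end, begin)  # the end node depends on the begin node
    # static_order() raises graphlib.CycleError if the edges form a cycle.
    return list(sorter.static_order())


edges = [("node_1", "node_3"), ("node_2", "node_3"), ("node_3", "node_4")]
order = execution_order(edges)
print(order)
```

The result always places `node_3` after both `node_1` and `node_2`, and `node_4` last. The relative order of `node_1` and `node_2` is not determined by the edges. Nodes that appear in no edge are not part of the result in this sketch.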
Error Handling
Node-Level
Configure how failures are handled:
node = client.create_workflow_node(
    version_id=version.id,
    name="Optional Step",
    entity_type="model",
    entity_id=model.id,
    config={
        "on_failure": "continue",  # Don't stop workflow
        "timeout": 60
    }
)
| on_failure | Behavior |
|---|---|
| stop | Halt workflow, mark as failed |
| continue | Skip node, continue execution |
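The difference between the two settings is easiest to see in a toy runner. The sketch below is an assumption-laden illustration, not the engine: `run_nodes` is a hypothetical function, nodes are plain dictionaries, and defaulting to `stop` when `on_failure` is absent is a guess.

```python
def run_nodes(nodes, text):
    """Run nodes in order, honoring each node's on_failure setting.

    nodes is a list of dicts with 'id', 'fn', and an optional 'on_failure'.
    """
    variables, skipped = {}, []
    for node in nodes:
        try:
            variables[node["id"]] = node["fn"](text)
        except Exception as exc:
            # Assumption: a node without on_failure behaves like "stop".
            if node.get("on_failure", "stop") == "continue":
                skipped.append(node["id"])  # skip this node, keep going
                continue
            return {"status": "failed", "failed_node": node["id"],
                    "error": str(exc), "variables": variables, "skipped": skipped}
    return {"status": "completed", "variables": variables, "skipped": skipped}


def broken(_text):
    raise ValueError("model unavailable")


result = run_nodes(
    [{"id": "optional", "fn": broken, "on_failure": "continue"},
     {"id": "summary", "fn": str.title}],
    "hello world",
)
print(result["status"], result["skipped"])  # completed ['optional']
```

With `"on_failure": "stop"` on the first node, the same call would return a `failed` status naming `optional` as the failing node, and `summary` would never run.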
Workflow-Level
# Get execution with error details
execution = client.get_workflow_execution(
    workflow_id=workflow.id,
    execution_id=exec_id
)
if execution.status == "failed":
    print(f"Error: {execution.error}")
    print(f"Failed at node: {execution.progress.current_node_id}")
Versioning Strategy
Development Workflow
v1 (draft) → Test → Activate
v2 (draft) → Test → Activate (v1 archived)
Best Practices
- Never modify an active version: create a new version instead.
- Test before activating: execute the specific version directly.
- Keep versions documented: use clear names and descriptions.
- Roll out gradually: test with a subset before the full switch.
# Execute specific version (testing)
execution = client.execute_workflow(
    workflow_id=workflow.id,
    version_id=new_version.id,  # Test new version
    single_input="Test input"
)
# After testing, activate
client.activate_workflow_version(
    workflow_id=workflow.id,
    version_id=new_version.id
)
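For the gradual rollout practice listed above, one common approach is to route a fixed share of callers to the new version before activating it. The sketch below shows hash-based bucketing on the caller side. `pick_version` and its parameters are hypothetical, and nothing here implies the platform offers traffic splitting itself.

```python
import hashlib


def pick_version(user_id, stable_version_id, candidate_version_id, candidate_percent):
    """Deterministically route a fixed share of users to the candidate version."""
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") % 100  # stable bucket in 0..99
    return candidate_version_id if bucket < candidate_percent else stable_version_id


# The same user always lands in the same bucket, so their experience is consistent.
chosen = pick_version("user-123", "version-1-id", "version-2-id", candidate_percent=10)
print(chosen)
```

The returned ID could then be passed as `version_id` to `client.execute_workflow`, as in the testing example above. Raising `candidate_percent` step by step widens the subset until the new version is ready to be activated.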