Workflow Concepts

Workflow Concepts

Understanding the core concepts behind SeeMe.ai workflows will help you build effective pipelines.

Architecture Overview

graph TD
    subgraph Workflow
        V1[Version 1]
        V2[Version 2 - Active]
    end

    subgraph Version
        N1[Node: Start]
        N2[Node: Model]
        N3[Node: Dataset]
        E1[Edge: Data]
        E2[Edge: Context]
    end

    V2 --> N1
    V2 --> N2
    V2 --> N3
    N1 -->|E1| N2
    N2 -->|E1| N3

Core Entities

Workflow

A workflow is the top-level container that holds one or more versions.

workflow = client.create_workflow(
    name="Document Analysis",
    description="Process documents through multiple AI models"
)

Properties:

FieldDescription
idUnique identifier
nameHuman-readable name
descriptionWhat the workflow does
active_version_idCurrently deployed version
versionsList of all versions

Workflow Version

A version is a specific configuration of nodes and edges. You can have multiple versions for A/B testing or gradual rollouts.

version = client.create_workflow_version(
    workflow_id=workflow.id,
    name="v2",
    config={"timeout": 300}
)

Properties:

FieldDescription
idVersion identifier
versionVersion name (e.g., “v1”)
version_numberSequential number
nodesList of nodes
edgesList of edges
configVersion-level configuration

Workflow Node

A node is a single processing step in the workflow. Nodes can be models, datasets, or control nodes.

node = client.create_workflow_node(
    version_id=version.id,
    name="Sentiment Analysis",
    entity_type="model",
    entity_id=model.id,
    config={
        "input_template": "Analyze sentiment: {{input}}",
        "timeout": 30,
        "on_failure": "continue"
    }
)

Properties:

FieldDescription
idNode identifier
nameDisplay name
entity_typeType: “model”, “dataset”, “start”, “end”
entity_idID of the model or dataset
configNode-specific configuration
positionUI position {x, y}
timeoutMax execution time (seconds)
on_failure“stop” or “continue”

Workflow Edge

An edge connects two nodes and defines how data flows between them.

edge = client.create_workflow_edge(
    version_id=version.id,
    begin_node_id=source_node.id,
    end_node_id=target_node.id,
    edge_type="data"  # or "context"
)

Properties:

FieldDescription
idEdge identifier
begin_node_idSource node
end_node_idTarget node
edge_type“data” or “context”

Execution Model

Execution Flow

sequenceDiagram
    participant User
    participant API
    participant Engine
    participant Queue

    User->>API: POST /execute
    API->>Engine: Create Execution
    Engine->>Queue: Enqueue Job
    Queue-->>Engine: Process
    Engine->>Engine: Execute Nodes
    Engine->>API: Update Status
    User->>API: GET /executions/{id}
    API-->>User: Results

Execution States

StatusDescription
pendingQueued, waiting to start
runningCurrently executing
completedFinished successfully
failedError occurred
cancelledUser cancelled

Execution Context

As a workflow executes, it maintains a context that carries data between nodes:

class ExecutionContext:
    execution_id: str
    workflow_id: str
    user_id: str

    # Data accumulated from nodes
    variables: dict      # {node_id: output}
    context_data: dict   # Data from context edges

    # Current state
    input: str           # Current text input
    input_file_path: str # Current file path
    current_node: str    # Active node

Data Flow

Variables

Each node’s output is stored in variables[node_id]:

Execute Node A → variables["node_a"] = "output from A"
Execute Node B → variables["node_b"] = "output from B"

Access previous outputs in templates:

{{node_a}}  → Gets output from Node A
{{node_b}}  → Gets output from Node B

Input Propagation

The input field carries the current text through the workflow:

Initial input: "Hello world"
Node A (STT): input = "Hello world" → output = "Hello world transcribed"
Node B (NER): input = "Hello world transcribed" → output = [entities]

Template Syntax

Templates allow dynamic input construction using data from the execution context.

Basic Substitution

"Analyze: {{input}}"

Node Output Reference

"Previous result: {{node_id}}"

Nested Properties

"Customer: {{customer.name}}, Status: {{customer.status}}"

Array Iteration

"Rules:
{{#each rules}}
- {{name}}: {{text}}
{{/each}}"

Example

node = client.create_workflow_node(
    version_id=version.id,
    name="LLM Analysis",
    entity_type="model",
    entity_id=llm_model.id,
    config={
        "input_template": """
Analyze the following transcript using these guidelines:
{{#each guidelines}}
- {{rule}}
{{/each}}

Transcript:
{{stt_node}}

Provide a summary.
"""
    }
)

Topological Execution

Nodes execute in topological order based on edges:

graph LR
    A[Node 1] --> C[Node 3]
    B[Node 2] --> C
    C --> D[Node 4]

Execution order: Node 1 → Node 2 → Node 3 → Node 4

Nodes with no dependencies can run in parallel (future feature).

Error Handling

Node-Level

Configure how failures are handled:

node = client.create_workflow_node(
    version_id=version.id,
    name="Optional Step",
    entity_type="model",
    entity_id=model.id,
    config={
        "on_failure": "continue",  # Don't stop workflow
        "timeout": 60
    }
)
on_failureBehavior
stopHalt workflow, mark as failed
continueSkip node, continue execution

Workflow-Level

# Get execution with error details
execution = client.get_workflow_execution(
    workflow_id=workflow.id,
    execution_id=exec_id
)

if execution.status == "failed":
    print(f"Error: {execution.error}")
    print(f"Failed at node: {execution.progress.current_node_id}")

Versioning Strategy

Development Workflow

v1 (draft) → Test → Activate
v2 (draft) → Test → Activate (v1 archived)

Best Practices

  1. Never modify active versions - Create new version instead
  2. Test before activating - Use specific version execution
  3. Keep versions documented - Use clear names and descriptions
  4. Gradual rollout - Test with subset before full switch
# Execute specific version (testing)
execution = client.execute_workflow(
    workflow_id=workflow.id,
    version_id=new_version.id,  # Test new version
    single_input="Test input"
)

# After testing, activate
client.activate_workflow_version(
    workflow_id=workflow.id,
    version_id=new_version.id
)

Next Steps