Workflows

Workflows allow you to chain multiple models together to create complex AI pipelines. For example, you can create a workflow that first detects objects in an image, then classifies each detected object.

Workflows

Get all workflows

workflows = client.get_workflows()

Create a workflow

from seeme.types import Workflow

my_workflow = Workflow(
    name="Object Detection Pipeline",
    description="Detect and classify objects in images"
)

my_workflow = client.create_workflow(my_workflow)

| Parameter | Type | Description |
| --- | --- | --- |
| workflow | Workflow | The workflow object |

Workflow properties:

| Property | Type | Description |
| --- | --- | --- |
| id | str | Unique id for the workflow |
| created_at | str | The creation date |
| updated_at | str | Last updated date |
| name | str | The workflow name |
| description | str | The workflow description |
| user_id | str | The user id of the workflow creator |
| active_version_id | str | The id of the active workflow version |
| versions | List[WorkflowVersion] | List of workflow versions |
| public | bool | Whether the workflow is public |
| has_logo | bool | Whether the workflow has a logo |
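When a setup script runs more than once, it can help to reuse a workflow instead of creating a second one with the same name. The sketch below combines the two calls shown above. It assumes `client.get_workflows()` returns an iterable of objects exposing `name`, which the property list suggests but this page does not state outright. The helper name `get_or_create_workflow` is hypothetical and not part of the SDK.

```python
def get_or_create_workflow(client, workflow):
    """Return an existing workflow with the same name, or create `workflow`.

    `client` is any object exposing get_workflows() and create_workflow().
    Matching on name is an assumption of this sketch; nothing on this page
    says the API enforces unique workflow names.
    """
    for existing in client.get_workflows():
        if existing.name == workflow.name:
            # Reuse the workflow that is already stored.
            return existing
    # No match found, so create the workflow that was passed in.
    return client.create_workflow(workflow)

# Possible usage, with `client` and `Workflow` set up as in this section:
# my_workflow = get_or_create_workflow(client, Workflow(name="Object Detection Pipeline"))
```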

Get a workflow

workflow = client.get_workflow(my_workflow.id)

| Parameter | Type | Description |
| --- | --- | --- |
| workflow_id | str | The workflow id |

Update a workflow

my_workflow.description = "Updated description"
client.update_workflow(my_workflow)

| Parameter | Type | Description |
| --- | --- | --- |
| workflow | Workflow | The workflow object |

Delete a workflow

client.delete_workflow(my_workflow.id)

| Parameter | Type | Description |
| --- | --- | --- |
| workflow_id | str | The workflow id |

Upload workflow logo

client.upload_workflow_logo(
    workflow_id=my_workflow.id,
    folder="path/to/logo",
    filename="logo.jpg"
)

Download workflow logo

client.get_workflow_logo(my_workflow, download_path="workflow_logo.jpg")

Delete workflow logo

client.delete_workflow_logo(my_workflow.id)

Workflow Versions

A workflow can have multiple versions, allowing you to iterate on your pipeline design.

Get all workflow versions

versions = client.get_workflow_versions(my_workflow.id)

Create a workflow version

from seeme.types import WorkflowVersion

new_version = WorkflowVersion(
    name="v2",
    description="Improved pipeline",
    workflow_id=my_workflow.id
)

new_version = client.create_workflow_version(my_workflow.id, new_version)

| Parameter | Type | Description |
| --- | --- | --- |
| workflow_id | str | The workflow id |
| workflow_version | WorkflowVersion | The workflow version object |

WorkflowVersion properties:

| Property | Type | Description |
| --- | --- | --- |
| id | str | Unique id for the version |
| name | str | The version name |
| description | str | The version description |
| workflow_id | str | The parent workflow id |
| nodes | List[WorkflowNode] | List of nodes in this version |
| edges | List[WorkflowEdge] | List of edges connecting nodes |
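Since a version carries both its nodes and its edges, a quick text summary of the pipeline can be built from those two properties alone. The following is a minimal sketch; `describe_version` is a hypothetical helper, and it assumes the version object you pass in has `nodes` and `edges` populated, which may depend on how it was fetched.

```python
def describe_version(version):
    """Return one 'source -> target' line per edge of a workflow version.

    Uses only the documented properties: version.nodes (each with id and
    name) and version.edges (each with begin_node_id and end_node_id).
    Node ids without a matching node are shown as-is.
    """
    names = {node.id: node.name for node in (version.nodes or [])}
    lines = []
    for edge in (version.edges or []):
        source = names.get(edge.begin_node_id, edge.begin_node_id)
        target = names.get(edge.end_node_id, edge.end_node_id)
        lines.append(f"{source} -> {target}")
    return lines
```

For a version with an "Object Detector" node feeding a "Classifier" node, this would return a single line reading `Object Detector -> Classifier`.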

Get a workflow version

version = client.get_workflow_version(my_workflow.id, new_version.id)

Update a workflow version

new_version.description = "Production-ready pipeline"
client.update_workflow_version(my_workflow.id, new_version)

Delete a workflow version

client.delete_workflow_version(my_workflow.id, new_version.id)

Workflow Nodes

Nodes represent individual steps in your workflow, such as models or datasets.

Get all nodes

nodes = client.get_workflow_nodes(my_workflow.id, new_version.id)

Create a node

from seeme.types import WorkflowNode

node = WorkflowNode(
    name="Object Detector",
    entity_type="model",
    entity_id=my_model.id,
    entity_version_id=my_model.active_version_id,
    version_id=new_version.id,
    config="{}"
)

node = client.create_workflow_node(my_workflow.id, new_version.id, node)

| Parameter | Type | Description |
| --- | --- | --- |
| workflow_id | str | The workflow id |
| workflow_version_id | str | The workflow version id |
| workflow_node | WorkflowNode | The node object |

WorkflowNode properties:

| Property | Type | Description |
| --- | --- | --- |
| id | str | Unique id for the node |
| name | str | The node name |
| entity_type | str | Type of entity: "model" or "dataset" |
| entity_id | str | The id of the model or dataset |
| entity_version_id | str | The version id of the entity |
| version_id | str | The workflow version this node belongs to |
| config | str | JSON configuration for the node |
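Because `config` is a JSON string rather than a dict, it is usually safer to serialize it with the standard `json` module than to write the string by hand. The sketch below shows one way to do that. The helper `build_node_config` and the option names `threshold` and `max_results` are placeholders chosen for illustration; this page does not document which configuration keys a node accepts.

```python
import json

def build_node_config(**options):
    """Serialize keyword options into a JSON string for a node's `config`."""
    # sort_keys gives a stable string, which keeps diffs and comparisons simple.
    return json.dumps(options, sort_keys=True)

# Placeholder option names, not documented settings.
config = build_node_config(threshold=0.5, max_results=10)
print(config)  # {"max_results": 10, "threshold": 0.5}

# With no options this yields the empty object used in the example above.
empty_config = build_node_config()  # "{}"
```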

Update a node

node.name = "Primary Detector"
client.update_workflow_node(my_workflow.id, new_version.id, node)

Delete a node

client.delete_workflow_node(my_workflow.id, new_version.id, node.id)

Workflow Edges

Edges connect nodes together, defining the flow of data through your workflow.

Get all edges

edges = client.get_workflow_edges(my_workflow.id, new_version.id)

Create an edge

from seeme.types import WorkflowEdge

edge = WorkflowEdge(
    name="Detection to Classification",
    version_id=new_version.id,
    begin_node_id=detector_node.id,
    end_node_id=classifier_node.id
)

edge = client.create_workflow_edge(my_workflow.id, new_version.id, edge)

| Parameter | Type | Description |
| --- | --- | --- |
| workflow_id | str | The workflow id |
| workflow_version_id | str | The workflow version id |
| workflow_edge | WorkflowEdge | The edge object |

WorkflowEdge properties:

| Property | Type | Description |
| --- | --- | --- |
| id | str | Unique id for the edge |
| name | str | The edge name |
| version_id | str | The workflow version this edge belongs to |
| begin_node_id | str | The source node id |
| end_node_id | str | The target node id |
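Edges turn the nodes of a version into a directed graph, so it can be useful to check the graph locally before running it. The sketch below uses the standard library's `graphlib` (Python 3.9+) to compute an order in which every edge points forward, and rejects edges that reference unknown nodes. It assumes a workflow is meant to be acyclic, which this page does not state; `execution_order` is a hypothetical helper, not an SDK call.

```python
from graphlib import CycleError, TopologicalSorter

def execution_order(nodes, edges):
    """Return node ids ordered so that every edge goes from earlier to later.

    Raises ValueError if an edge references an unknown node or if the
    edges form a cycle.
    """
    node_ids = {node.id for node in nodes}
    # Start with every node present and no predecessors.
    sorter = TopologicalSorter({node_id: set() for node_id in node_ids})
    for edge in edges:
        if edge.begin_node_id not in node_ids or edge.end_node_id not in node_ids:
            raise ValueError(f"edge {edge.id} references an unknown node")
        # The end node depends on the begin node.
        sorter.add(edge.end_node_id, edge.begin_node_id)
    try:
        return list(sorter.static_order())
    except CycleError as exc:
        raise ValueError("workflow edges form a cycle") from exc
```

The inputs can be the lists returned by `client.get_workflow_nodes(...)` and `client.get_workflow_edges(...)`, provided they expose the properties listed in this section.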

Update an edge

edge.name = "Primary Connection"
client.update_workflow_edge(my_workflow.id, new_version.id, edge)

Delete an edge

client.delete_workflow_edge(my_workflow.id, new_version.id, edge.id)

Running Workflows

Run a workflow (simple)

Run a workflow with the active version:

result = client.run_workflow(
    workflow_id=my_workflow.id,
    item="path/to/image.jpg"
)

| Parameter | Type | Description |
| --- | --- | --- |
| workflow_id | str | The workflow id |
| item | str or dict | Input data (file path or dict) |
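To run the same workflow over several inputs, one option is a small loop around `run_workflow` that keeps going when a single item fails. This is a sketch under assumptions: `run_workflow_on_items` is a hypothetical helper, and because this page does not document which exceptions the client raises, it catches `Exception` broadly and returns the error alongside the item.

```python
def run_workflow_on_items(client, workflow_id, items):
    """Run the workflow once per item and collect (item, result, error) tuples.

    Exactly one of `result` and `error` is None for each tuple.
    """
    outcomes = []
    for item in items:
        try:
            result = client.run_workflow(workflow_id=workflow_id, item=item)
            outcomes.append((item, result, None))
        except Exception as exc:  # exception types are not documented here
            outcomes.append((item, None, exc))
    return outcomes
```

Results are returned as a list of tuples rather than a dict because `item` may itself be a dict, which cannot be used as a dictionary key.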

Execute workflow asynchronously

For longer-running workflows, use async execution:

from seeme.types import WorkflowExecutionRequest, InputMode

request = WorkflowExecutionRequest(
    input_mode=InputMode.SINGLE,
    single_input="path/to/image.jpg"
)

response = client.execute_workflow_async(my_workflow.id, request)
print(f"Execution started: {response.execution_id}")

Execute with file upload

response = client.execute_workflow_async_with_file(
    workflow_id=my_workflow.id,
    folder="path/to/files",
    filename="input.jpg"
)
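The call above takes the folder and the filename as separate arguments. If your code holds a single file path, a thin wrapper can split it first. This is a minimal sketch; `execute_file_async` is a hypothetical name, and using "." for a bare filename is an assumption about how the client resolves folders.

```python
import os

def execute_file_async(client, workflow_id, path):
    """Split `path` into folder and filename, then start an async execution."""
    folder, filename = os.path.split(path)
    return client.execute_workflow_async_with_file(
        workflow_id=workflow_id,
        folder=folder or ".",  # a bare filename means the current directory
        filename=filename,
    )
```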

Execute a specific version

response = client.execute_workflow_version_async(
    workflow_id=my_workflow.id,
    version_id=new_version.id,
    request=request
)

Get workflow executions

executions = client.get_workflow_executions(my_workflow.id)

for execution in executions:
    print(f"{execution.id}: {execution.status}")
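For a quick overview of many executions, the statuses can be tallied with `collections.Counter`. The helper name `count_by_status` is hypothetical; it only relies on each execution exposing `status`, as in the loop above.

```python
from collections import Counter

def count_by_status(executions):
    """Return a Counter mapping each status string to its number of executions."""
    return Counter(execution.status for execution in executions)
```

For example, `count_by_status(client.get_workflow_executions(my_workflow.id))` gives a mapping from each status to how many executions currently report it.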

Get a specific execution

execution = client.get_workflow_execution(my_workflow.id, execution_id)

Download execution input file

client.download_workflow_execution_input_file(
    workflow_id=my_workflow.id,
    execution_id=execution.id,
    download_location="input_file.jpg"
)

Cancel an execution

client.cancel_workflow_execution(my_workflow.id, execution.id)
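To stop everything that is still in flight for a workflow, listing and cancelling can be combined. The sketch below treats "pending" and "processing" as the unfinished statuses, based on the status values listed for WorkflowExecution on this page. `cancel_active_executions` is a hypothetical helper, and how the API responds to cancelling an execution that has just finished is not covered here.

```python
ACTIVE_STATUSES = {"pending", "processing"}

def cancel_active_executions(client, workflow_id):
    """Cancel every execution that has not finished; return the cancelled ids."""
    cancelled = []
    for execution in client.get_workflow_executions(workflow_id):
        if execution.status in ACTIVE_STATUSES:
            client.cancel_workflow_execution(workflow_id, execution.id)
            cancelled.append(execution.id)
    return cancelled
```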

WorkflowExecution properties:

| Property | Type | Description |
| --- | --- | --- |
| id | str | Unique execution id |
| workflow_id | str | The workflow id |
| workflow_version_id | str | The version used |
| status | str | Status: "pending", "processing", "completed", "failed" |
| progress | ExecutionProgress | Progress information |
| started_at | str | Start time |
| completed_at | str | Completion time |
| error | str | Error message if failed |
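An asynchronous execution returns immediately, so the caller typically polls until the status reaches "completed" or "failed". The following is a minimal polling sketch built on `get_workflow_execution`. The helper name `wait_for_execution`, the default interval, and the default timeout are all assumptions. A cancelled execution may report a status that is not listed above, in which case this loop ends only when the timeout is reached.

```python
import time

TERMINAL_STATUSES = {"completed", "failed"}

def wait_for_execution(client, workflow_id, execution_id,
                       poll_interval=2.0, timeout=300.0):
    """Poll an execution until it completes or fails, then return it.

    Raises TimeoutError if no terminal status is seen within `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        execution = client.get_workflow_execution(workflow_id, execution_id)
        if execution.status in TERMINAL_STATUSES:
            return execution
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"execution {execution_id} still {execution.status!r} after {timeout}s"
            )
        time.sleep(poll_interval)
```

A typical call would pass `response.execution_id` from `execute_workflow_async`, then inspect `status` and `error` on the returned execution.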