Workflows
Workflows allow you to chain multiple models together to create complex AI pipelines. For example, you can create a workflow that first detects objects in an image, then classifies each detected object.
Workflows
Get all workflows
workflows = client.get_workflows()
Create a workflow
from seeme.types import Workflow
my_workflow = Workflow(
    name="Object Detection Pipeline",
    description="Detect and classify objects in images"
)
my_workflow = client.create_workflow(my_workflow)
| Parameter | Type | Description |
|---|---|---|
| workflow | Workflow | The workflow object |
Workflow properties:
| Property | Type | Description |
|---|---|---|
| id | str | Unique id for the workflow |
| created_at | str | The creation date |
| updated_at | str | Last updated date |
| name | str | The workflow name |
| description | str | The workflow description |
| user_id | str | The user id of the workflow creator |
| active_version_id | str | The id of the active workflow version |
| versions | List[WorkflowVersion] | List of workflow versions |
| public | bool | Whether the workflow is public |
| has_logo | bool | Whether the workflow has a logo |
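The `active_version_id` and `versions` properties above can be combined to look up the version a workflow currently runs. The following is a minimal sketch, not part of the SDK: `find_active_version` is a hypothetical helper, and the `SimpleNamespace` objects only stand in for a `Workflow` returned by the client so the example runs without a server connection.

```python
from types import SimpleNamespace
from typing import Any, Optional


def find_active_version(workflow: Any) -> Optional[Any]:
    """Return the entry in workflow.versions whose id equals active_version_id."""
    # Assumption: `versions` may be empty or None on a freshly created workflow.
    for version in getattr(workflow, "versions", None) or []:
        if version.id == workflow.active_version_id:
            return version
    return None


# Stand-in object mirroring the documented Workflow properties.
demo_workflow = SimpleNamespace(
    id="wf-1",
    active_version_id="v-2",
    versions=[
        SimpleNamespace(id="v-1", name="v1"),
        SimpleNamespace(id="v-2", name="v2"),
    ],
)

active = find_active_version(demo_workflow)
print(active.name if active else "no active version")
```

With a real client, the same helper would presumably be called on the object returned by `client.get_workflow(...)`.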
Get a workflow
workflow = client.get_workflow(my_workflow.id)
| Parameter | Type | Description |
|---|---|---|
| workflow_id | str | The workflow id |
Update a workflow
my_workflow.description = "Updated description"
client.update_workflow(my_workflow)
| Parameter | Type | Description |
|---|---|---|
| workflow | Workflow | The workflow object |
Delete a workflow
client.delete_workflow(my_workflow.id)
| Parameter | Type | Description |
|---|---|---|
| workflow_id | str | The workflow id |
Upload workflow logo
client.upload_workflow_logo(
    workflow_id=my_workflow.id,
    folder="path/to/logo",
    filename="logo.jpg"
)
Download workflow logo
client.get_workflow_logo(my_workflow, download_path="workflow_logo.jpg")
Delete workflow logo
client.delete_workflow_logo(my_workflow.id)
Workflow Versions
A workflow can have multiple versions, allowing you to iterate on your pipeline design.
Get all workflow versions
versions = client.get_workflow_versions(my_workflow.id)
Create a workflow version
from seeme.types import WorkflowVersion
new_version = WorkflowVersion(
    name="v2",
    description="Improved pipeline",
    workflow_id=my_workflow.id
)
new_version = client.create_workflow_version(my_workflow.id, new_version)
| Parameter | Type | Description |
|---|---|---|
| workflow_id | str | The workflow id |
| workflow_version | WorkflowVersion | The workflow version object |
WorkflowVersion properties:
| Property | Type | Description |
|---|---|---|
| id | str | Unique id for the version |
| name | str | The version name |
| description | str | The version description |
| workflow_id | str | The parent workflow id |
| nodes | List[WorkflowNode] | List of nodes in this version |
| edges | List[WorkflowEdge] | List of edges connecting nodes |
Get a workflow version
version = client.get_workflow_version(my_workflow.id, new_version.id)
Update a workflow version
new_version.description = "Production-ready pipeline"
client.update_workflow_version(my_workflow.id, new_version)
Delete a workflow version
client.delete_workflow_version(my_workflow.id, new_version.id)
Workflow Nodes
Nodes represent individual steps in your workflow, such as models or datasets.
Get all nodes
nodes = client.get_workflow_nodes(my_workflow.id, new_version.id)
Create a node
from seeme.types import WorkflowNode
node = WorkflowNode(
    name="Object Detector",
    entity_type="model",
    entity_id=my_model.id,
    entity_version_id=my_model.active_version_id,
    version_id=new_version.id,
    config="{}"
)
node = client.create_workflow_node(my_workflow.id, new_version.id, node)
| Parameter | Type | Description |
|---|---|---|
| workflow_id | str | The workflow id |
| workflow_version_id | str | The workflow version id |
| workflow_node | WorkflowNode | The node object |
WorkflowNode properties:
| Property | Type | Description |
|---|---|---|
| id | str | Unique id for the node |
| name | str | The node name |
| entity_type | str | Type of entity: "model" or "dataset" |
| entity_id | str | The id of the model or dataset |
| entity_version_id | str | The version id of the entity |
| version_id | str | The workflow version this node belongs to |
| config | str | JSON configuration for the node |
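Because `config` is a JSON string rather than a dict, it can help to build it with `json.dumps` instead of writing the string by hand. This is a small sketch using only the standard library. The helper names and the `threshold` and `max_results` keys are hypothetical: the source does not say which configuration keys a node accepts.

```python
import json
from typing import Any, Dict


def encode_node_config(settings: Dict[str, Any]) -> str:
    """Serialize a settings dict into the JSON string stored in `config`."""
    # sort_keys keeps the output stable, which makes configs easy to compare.
    return json.dumps(settings, sort_keys=True)


def decode_node_config(config: str) -> Dict[str, Any]:
    """Parse a node's `config` string; treat an empty string as no settings."""
    return json.loads(config) if config else {}


# "threshold" and "max_results" are hypothetical keys used only for illustration.
config = encode_node_config({"threshold": 0.5, "max_results": 10})
print(config)
print(decode_node_config("{}"))
```

The encoded string can then be passed as the `config` argument when constructing a `WorkflowNode`.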
Update a node
node.name = "Primary Detector"
client.update_workflow_node(my_workflow.id, new_version.id, node)
Delete a node
client.delete_workflow_node(my_workflow.id, new_version.id, node.id)
Workflow Edges
Edges connect nodes together, defining the flow of data through your workflow.
Get all edges
edges = client.get_workflow_edges(my_workflow.id, new_version.id)
Create an edge
from seeme.types import WorkflowEdge
edge = WorkflowEdge(
    name="Detection to Classification",
    version_id=new_version.id,
    begin_node_id=detector_node.id,
    end_node_id=classifier_node.id
)
edge = client.create_workflow_edge(my_workflow.id, new_version.id, edge)
| Parameter | Type | Description |
|---|---|---|
| workflow_id | str | The workflow id |
| workflow_version_id | str | The workflow version id |
| workflow_edge | WorkflowEdge | The edge object |
WorkflowEdge properties:
| Property | Type | Description |
|---|---|---|
| id | str | Unique id for the edge |
| name | str | The edge name |
| version_id | str | The workflow version this edge belongs to |
| begin_node_id | str | The source node id |
| end_node_id | str | The target node id |
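Since each edge points from `begin_node_id` to `end_node_id`, the nodes and edges of a version form a directed graph. The sketch below shows one way to inspect that graph locally: it derives an order in which every source node precedes its target, using the standard library's `graphlib` (Python 3.9+). It illustrates the data model only and is not a description of how the platform schedules nodes. `execution_order` is a hypothetical helper, and the `SimpleNamespace` objects stand in for nodes and edges returned by the client.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+
from types import SimpleNamespace
from typing import Any, Iterable, List


def execution_order(nodes: Iterable[Any], edges: Iterable[Any]) -> List[str]:
    """Return node ids so that every begin_node_id precedes its end_node_id."""
    sorter = TopologicalSorter()
    for node in nodes:
        sorter.add(node.id)  # register nodes even if no edge touches them
    for edge in edges:
        # add(node, *predecessors): the end node depends on the begin node.
        sorter.add(edge.end_node_id, edge.begin_node_id)
    # static_order() raises graphlib.CycleError if the edges form a cycle.
    return list(sorter.static_order())


# Stand-ins mirroring the documented WorkflowNode and WorkflowEdge properties.
demo_nodes = [SimpleNamespace(id="classifier"), SimpleNamespace(id="detector")]
demo_edges = [SimpleNamespace(begin_node_id="detector", end_node_id="classifier")]

print(execution_order(demo_nodes, demo_edges))
```

A `CycleError` here is a useful early warning that the edges of a version loop back on themselves.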
Update an edge
edge.name = "Primary Connection"
client.update_workflow_edge(my_workflow.id, new_version.id, edge)
Delete an edge
client.delete_workflow_edge(my_workflow.id, new_version.id, edge.id)
Running Workflows
Run a workflow (simple)
Run a workflow with the active version:
result = client.run_workflow(
    workflow_id=my_workflow.id,
    item="path/to/image.jpg"
)
| Parameter | Type | Description |
|---|---|---|
| workflow_id | str | The workflow id |
| item | str or dict | Input data (file path or dict) |
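To process several files, `run_workflow` can be called in a loop. The sketch below collects results and failures separately so that one bad input does not stop the batch. `run_workflow_on_items` is a hypothetical helper, and `DemoClient` is a stand-in so the example runs offline. The source does not state which exceptions `run_workflow` raises, so the helper catches `Exception` broadly as an assumption.

```python
from typing import Any, Dict, Iterable, Tuple


def run_workflow_on_items(
    client: Any, workflow_id: str, items: Iterable[str]
) -> Tuple[Dict[str, Any], Dict[str, Exception]]:
    """Run the workflow once per file path; return (results, errors) keyed by path."""
    results: Dict[str, Any] = {}
    errors: Dict[str, Exception] = {}
    for item in items:
        try:
            results[item] = client.run_workflow(workflow_id=workflow_id, item=item)
        except Exception as exc:  # assumption: the exception types are not documented
            errors[item] = exc
    return results, errors


class DemoClient:
    """Stand-in client: rejects .txt inputs, echoes everything else."""

    def run_workflow(self, workflow_id: str, item: str) -> dict:
        if item.endswith(".txt"):
            raise ValueError("unsupported input")
        return {"workflow_id": workflow_id, "item": item}


results, errors = run_workflow_on_items(DemoClient(), "wf-1", ["a.jpg", "notes.txt"])
print(sorted(results), sorted(errors))
```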
Execute workflow asynchronously
For longer-running workflows, use async execution:
from seeme.types import WorkflowExecutionRequest, InputMode
request = WorkflowExecutionRequest(
    input_mode=InputMode.SINGLE,
    single_input="path/to/image.jpg"
)
response = client.execute_workflow_async(my_workflow.id, request)
print(f"Execution started: {response.execution_id}")
Execute with file upload
response = client.execute_workflow_async_with_file(
    workflow_id=my_workflow.id,
    folder="path/to/files",
    filename="input.jpg"
)
Execute a specific version
response = client.execute_workflow_version_async(
    workflow_id=my_workflow.id,
    version_id=new_version.id,
    request=request
)
Get workflow executions
executions = client.get_workflow_executions(my_workflow.id)
for execution in executions:
    print(f"{execution.id}: {execution.status}")
Get a specific execution
execution = client.get_workflow_execution(my_workflow.id, execution_id)
Download execution input file
client.download_workflow_execution_input_file(
    workflow_id=my_workflow.id,
    execution_id=execution.id,
    download_location="input_file.jpg"
)
Cancel an execution
client.cancel_workflow_execution(my_workflow.id, execution.id)
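Listing and cancelling can be combined to stop everything that has not finished yet. This sketch uses only the two client calls shown in this section, `get_workflow_executions` and `cancel_workflow_execution`. The helper name is hypothetical, `StubClient` is a stand-in for the real client, and treating "pending" and "processing" as the cancellable statuses is an assumption.

```python
from types import SimpleNamespace
from typing import Any, List

# Assumption: only executions in these statuses can still be cancelled.
UNFINISHED_STATUSES = ("pending", "processing")


def cancel_unfinished_executions(client: Any, workflow_id: str) -> List[str]:
    """Cancel every unfinished execution and return the ids that were cancelled."""
    cancelled: List[str] = []
    for execution in client.get_workflow_executions(workflow_id):
        if execution.status in UNFINISHED_STATUSES:
            client.cancel_workflow_execution(workflow_id, execution.id)
            cancelled.append(execution.id)
    return cancelled


class StubClient:
    """Stand-in client that records which executions were cancelled."""

    def __init__(self, executions):
        self._executions = executions
        self.cancel_calls = []

    def get_workflow_executions(self, workflow_id):
        return self._executions

    def cancel_workflow_execution(self, workflow_id, execution_id):
        self.cancel_calls.append((workflow_id, execution_id))


stub = StubClient([
    SimpleNamespace(id="e1", status="completed"),
    SimpleNamespace(id="e2", status="processing"),
    SimpleNamespace(id="e3", status="pending"),
])
print(cancel_unfinished_executions(stub, "wf-1"))
```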
WorkflowExecution properties:
| Property | Type | Description |
|---|---|---|
| id | str | Unique execution id |
| workflow_id | str | The workflow id |
| workflow_version_id | str | The version used |
| status | str | Status: "pending", "processing", "completed", "failed" |
| progress | ExecutionProgress | Progress information |
| started_at | str | Start time |
| completed_at | str | Completion time |
| error | str | Error message if failed |
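An asynchronous execution is typically followed by polling until `status` reaches a final value. The sketch below polls `get_workflow_execution` and returns once the status is "completed" or "failed", the two terminal values listed in the table above. `wait_for_execution`, its parameters, and `FakeClient` are hypothetical and not part of the SDK. The status of a cancelled execution is not documented here, so it may need to be added to `TERMINAL_STATUSES`.

```python
import time
from types import SimpleNamespace
from typing import Any

# Terminal statuses taken from the WorkflowExecution properties table.
TERMINAL_STATUSES = {"completed", "failed"}


def wait_for_execution(
    client: Any,
    workflow_id: str,
    execution_id: str,
    poll_seconds: float = 2.0,
    timeout_seconds: float = 300.0,
) -> Any:
    """Poll get_workflow_execution until the status is terminal or time runs out."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        execution = client.get_workflow_execution(workflow_id, execution_id)
        if execution.status in TERMINAL_STATUSES:
            return execution
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Execution {execution_id} is still {execution.status}")
        time.sleep(poll_seconds)


class FakeClient:
    """Stand-in for the real client: reports one status per call."""

    def __init__(self, statuses):
        self._statuses = iter(statuses)

    def get_workflow_execution(self, workflow_id, execution_id):
        return SimpleNamespace(id=execution_id, status=next(self._statuses), error="")


demo_client = FakeClient(["pending", "processing", "completed"])
finished = wait_for_execution(demo_client, "wf-1", "exec-1", poll_seconds=0.0)
print(finished.status)
```

After the call returns, checking `status` and, for a failed run, the `error` property tells you whether the result is usable.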