DSL Builder¶
The DSL provides a fluent way to compose named sources and processes into a workflow with optional monitoring, custom wiring, and execution graph inspection.
Module: pulse.dsl
Async Support¶
For async/await applications, use AsyncWorkflow from pulse.async_dsl:
import asyncio
from pulse.async_dsl import AsyncWorkflow
async def main():
    texts = ["I love pizza", "I hate rain"]
    async with AsyncWorkflow() as workflow:
        workflow.source("docs", texts)
        workflow.theme_generation(source="docs", min_themes=2)
        workflow.sentiment(source="docs")
        result = await workflow.run(fast=True)
        print(result.theme_generation.themes)
        print(result.sentiment.sentiments)
asyncio.run(main())
The async workflow provides the same DSL interface as the sync version but with full async/await support. See the Async Patterns Guide for detailed documentation.
Workflow Basics¶
from pulse.dsl import Workflow
texts = ["I love pizza", "I hate rain"]
wf = (
    Workflow()
    .source("docs", texts)
    .theme_generation(source="docs", min_themes=2)
    .sentiment(source="docs")
)
result = wf.run(fast=True) # or pass a CoreClient via client=...
print(result.theme_generation.themes)
print(result.sentiment.sentiments)
Key ideas:
- Register data sources with .source(name, data).
- Add steps; each step can optionally name its input source or derive it from prior steps.
- run() executes steps in order and returns a container with attributes for each step id (or alias).
Steps and Parameters¶
All steps accept name= to alias the step; subsequent steps can reference the alias.
.theme_generation(...)¶
Parameters:
- min_themes=2, max_themes=10, context=None, version=None, prune=None, fast=None, source: str | None = None, name=None.
If source is omitted, the step reads from the dataset source (see Running a Workflow below).
.sentiment(...)¶
Parameters:
- fast=None, source=None, name=None.
.theme_allocation(...)¶
Parameters:
- themes: list[str] | None = None, single_label=True, threshold=0.5, fast=None, inputs: str | None = None, themes_from: str | None = None, name=None.
Notes:
- If themes=None, it pulls themes from the most recent .theme_generation(...) step, or from themes_from if specified.
- inputs selects the text source (defaults to dataset).
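The precedence described in these notes can be sketched in plain Python. This is an illustration under stated assumptions, not the library's code: resolve_themes and the generated mapping are hypothetical names, and raising ValueError when no themes are available is an assumption about error handling.

```python
def resolve_themes(themes, themes_from, generated):
    """Pick the theme list for an allocation step (hypothetical helper).

    `generated` maps theme_generation step names (or aliases) to the themes
    they produced, in the order the steps ran (dicts keep insertion order).
    """
    if themes is not None:
        # Explicit themes always win.
        return themes
    if themes_from is not None:
        # Otherwise use the step named by themes_from.
        return generated[themes_from]
    if not generated:
        # Assumed behaviour: fail loudly when nothing can supply themes.
        raise ValueError("no themes given and no theme_generation step ran")
    # Fall back to the most recent theme_generation step.
    return list(generated.values())[-1]


generated = {"gen_a": ["food"], "gen_b": ["weather"]}
print(resolve_themes(None, None, generated))
print(resolve_themes(None, "gen_a", generated))
```

In the DSL itself, the same choice is expressed by giving a generation step an alias with name= and passing that alias as themes_from.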
.theme_extraction(...)¶
Parameters:
- themes=None, version=None, fast=None, inputs=None, themes_from=None, name=None.
.cluster(...)¶
Parameters:
- k=2, source=None, fast=None, name=None.
Running a Workflow¶
result = wf.run(dataset=texts, fast=True)
- If you registered any .source(...), run() operates in DSL mode:
  - Optional dataset is registered under the implicit source "dataset" if not already set.
  - Each process reads inputs from its wired _inputs (default "dataset").
  - A CoreClient is created if one is not provided.
- If you did not register any .source(...), run(dataset=..., ...) delegates to the Analyzer with the accumulated processes.
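As a rough mental model, the dispatch rule above can be written as a small pure function. The sketch below is hypothetical (plan_run is not part of pulse), and it assumes that "if not already set" means a source named "dataset" has not already been registered.

```python
def plan_run(sources, dataset=None):
    """Mirror the documented dispatch rule of run() (hypothetical helper).

    Returns the mode name and the sources that steps would read from.
    """
    if not sources:
        # No .source(...) registered: run() is documented to delegate
        # to the Analyzer with the accumulated processes.
        return "analyzer", {}
    resolved = dict(sources)
    # Assumption: an explicitly registered "dataset" source is not overwritten.
    if dataset is not None and "dataset" not in resolved:
        resolved["dataset"] = dataset
    return "dsl", resolved


print(plan_run({}, ["I love pizza"]))
print(plan_run({"docs": ["I hate rain"]}, ["I love pizza"]))
```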
Monitoring¶
Register lifecycle hooks for observability:
def on_run_start(): ...
def on_process_start(pid): ...
def on_process_end(pid, result): ...
def on_run_end(): ...
wf = Workflow().monitor(
    on_run_start=on_run_start,
    on_process_start=on_process_start,
    on_process_end=on_process_end,
    on_run_end=on_run_end,
)
From File¶
You can build workflows from JSON or YAML files with a top‑level pipeline array, where each entry is a single‑key map of step name to parameters:
pipeline:
- theme_generation: { min_themes: 2, max_themes: 5 }
- sentiment: {}
Load and run:
wf = Workflow.from_file("pipeline.yml")
res = wf.run(dataset=texts)
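To make the file shape concrete, the sketch below reads the same pipeline from JSON and turns each single-key entry into a (step name, parameters) pair. It is not the library's loader: parse_pipeline is a hypothetical helper, and rejecting entries with more than one key is an assumption about validation.

```python
import json

# The YAML example above, expressed as JSON.
pipeline_json = """
{
  "pipeline": [
    {"theme_generation": {"min_themes": 2, "max_themes": 5}},
    {"sentiment": {}}
  ]
}
"""


def parse_pipeline(text):
    """Return a list of (step_name, params) pairs from a pipeline document."""
    doc = json.loads(text)
    steps = []
    for entry in doc["pipeline"]:
        if len(entry) != 1:
            # Assumed validation: each entry is a single-key map.
            raise ValueError(f"expected one key per entry, got {list(entry)}")
        name, params = next(iter(entry.items()))
        steps.append((name, params or {}))
    return steps


steps = parse_pipeline(pipeline_json)
for name, params in steps:
    print(name, params)
```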
Nested Inputs (Sentiment)¶
For .sentiment(...), the DSL supports nested list inputs: it flattens them before the API call and reconstructs the nested shape on output. Other steps operate on flat lists.
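The flatten-then-reconstruct idea can be sketched in plain Python. This is an illustration only, not the library's implementation: flatten_nested, rebuild_nested, and fake_sentiment are hypothetical names, and fake_sentiment is a stand-in for the real API call.

```python
def flatten_nested(items):
    """Flatten nested lists of strings.

    Returns the flat list plus a 'shape' skeleton of the same nesting in
    which each leaf is replaced by its index in the flat list.
    """
    flat = []

    def walk(node):
        if isinstance(node, list):
            return [walk(child) for child in node]
        flat.append(node)
        return len(flat) - 1  # position of this leaf in the flat list

    shape = walk(items)
    return flat, shape


def rebuild_nested(shape, results):
    """Put flat per-item results back into the original nesting."""
    if isinstance(shape, list):
        return [rebuild_nested(child, results) for child in shape]
    return results[shape]


def fake_sentiment(texts):
    # Stand-in for the API call: one label per flat input text.
    return ["positive" if "love" in t else "negative" for t in texts]


nested = [["I love pizza", "I hate rain"], ["I love sun"]]
flat, shape = flatten_nested(nested)
nested_labels = rebuild_nested(shape, fake_sentiment(flat))
print(nested_labels)
```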
Graph¶
graph() returns an adjacency list describing the execution DAG, including static dependencies and wired inputs. This helps visualize or validate your pipeline.
edges = wf.graph()
for node, deps in edges.items():
    print(node, "<-", deps)
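One way to validate such an adjacency list is to feed it to the standard library's graphlib.TopologicalSorter, which accepts a mapping from node to its dependencies and raises CycleError if the graph is not a DAG. The edges dictionary below is hypothetical sample data in the node-to-dependencies shape suggested by the loop above. The exact node names returned by graph() are an assumption.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical adjacency list: node -> list of nodes it depends on.
edges = {
    "docs": [],
    "theme_generation": ["docs"],
    "sentiment": ["docs"],
    "theme_allocation": ["docs", "theme_generation"],
}


def execution_order(edges):
    """Return one valid execution order, or raise ValueError on a cycle."""
    try:
        return list(TopologicalSorter(edges).static_order())
    except CycleError as exc:
        # exc.args[1] holds the list of nodes forming the cycle.
        raise ValueError(f"pipeline has a cycle: {exc.args[1]}") from exc


order = execution_order(edges)
print(order)
```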