# DSL Builder

The DSL provides a fluent way to compose named sources and processes into a workflow, with optional monitoring, custom wiring, and execution-graph inspection.

Module: `pulse.dsl`
## Workflow Basics

```python
from pulse.dsl import Workflow

texts = ["I love pizza", "I hate rain"]

wf = (
    Workflow()
    .source("docs", texts)
    .theme_generation(source="docs", min_themes=2)
    .sentiment(source="docs")
)

result = wf.run(fast=True)  # or pass a CoreClient via client=...
print(result.theme_generation.themes)
print(result.sentiment.sentiments)
```
Key ideas:

- Register data sources with `.source(name, data)`.
- Add steps; each step can optionally name its input source or derive it from prior steps.
- `run()` executes steps in order and returns a container with an attribute for each step id (or alias).
## Steps and Parameters

All steps accept `name=` to alias the step; subsequent steps can reference the alias.
### .theme_generation(...)

Parameters: `min_themes=2`, `max_themes=10`, `context=None`, `version=None`, `prune=None`, `fast=None`, `source: str | None = None`, `name=None`.

If `source` is omitted, the step uses the dataset source (see run behavior below).
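For example, a minimal sketch (the source name and data here are illustrative):

```python
wf = (
    Workflow()
    .source("reviews", ["Great battery", "Poor screen", "Fast shipping"])
    .theme_generation(source="reviews", min_themes=3, max_themes=6, name="themes")
)
```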
### .sentiment(...)

Parameters: `fast=None`, `source=None`, `name=None`.
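Aliasing works as described above; since the result container exposes each step by its alias, a sketch looks like:

```python
wf = Workflow().source("docs", texts).sentiment(source="docs", name="tone")
result = wf.run(fast=True)
print(result.tone.sentiments)  # results are reachable via the "tone" alias
```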
### .theme_allocation(...)

Parameters: `themes: list[str] | None = None`, `single_label=True`, `threshold=0.5`, `fast=None`, `inputs: str | None = None`, `themes_from: str | None = None`, `name=None`.

Notes:

- If `themes=None`, the step pulls themes from the most recent `.theme_generation(...)` step, or from `themes_from` if specified.
- `inputs` selects the text source (defaults to the dataset).
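A sketch of the chained form, wiring the allocation to an aliased generation step (parameter values here are illustrative):

```python
wf = (
    Workflow()
    .source("docs", texts)
    .theme_generation(source="docs", min_themes=2, name="gen")
    .theme_allocation(themes_from="gen", inputs="docs", single_label=True, threshold=0.5)
)
result = wf.run(fast=True)
```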
### .theme_extraction(...)

Parameters: `themes=None`, `version=None`, `fast=None`, `inputs=None`, `themes_from=None`, `name=None`.
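Assuming `themes` accepts an explicit list of strings (as it does for `.theme_allocation(...)`), a fixed theme list can be supplied instead of wiring a generation step:

```python
wf = (
    Workflow()
    .source("docs", texts)
    .theme_extraction(themes=["food", "weather"], inputs="docs")
)
```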
### .cluster(...)

Parameters: `k=2`, `source=None`, `fast=None`, `name=None`.
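For example (the choice of `k` is illustrative):

```python
wf = Workflow().source("docs", texts).cluster(k=3, source="docs")
result = wf.run(fast=True)
```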
## Running a Workflow

```python
result = wf.run(dataset=texts, fast=True)
```

- If you registered any `.source(...)`, `run()` operates in DSL mode:
    - An optional `dataset` is registered under the implicit source `"dataset"` if not already set.
    - Each process reads inputs from its wired `_inputs` (default `"dataset"`).
    - A `CoreClient` is created if one is not provided.
- If you did not register any `.source(...)`, `run(dataset=..., ...)` delegates to the `Analyzer` with the accumulated processes.
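A sketch contrasting the two modes (data and step choices here are illustrative):

```python
# DSL mode: a source is registered, so inputs are read from the wiring.
res_dsl = Workflow().source("docs", texts).sentiment(source="docs").run(fast=True)

# Delegation: no .source(...) registered, so run() hands the accumulated
# processes and the dataset to the Analyzer.
res_analyzer = Workflow().sentiment().run(dataset=texts, fast=True)
```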
## Monitoring

Register lifecycle hooks for observability:

```python
def on_run_start(): ...
def on_process_start(pid): ...
def on_process_end(pid, result): ...
def on_run_end(): ...

wf = Workflow().monitor(
    on_run_start=on_run_start,
    on_process_start=on_process_start,
    on_process_end=on_process_end,
    on_run_end=on_run_end,
)
```
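For instance, the hooks can be used to time each step. The timing logic below is our own sketch; only the hook signatures come from the API above:

```python
import time

started = {}

def on_process_start(pid):
    started[pid] = time.perf_counter()
    print(f"[start] {pid}")

def on_process_end(pid, result):
    elapsed = time.perf_counter() - started.pop(pid)
    print(f"[done]  {pid} in {elapsed:.2f}s")

wf = (
    Workflow()
    .source("docs", texts)
    .sentiment(source="docs")
    .monitor(on_process_start=on_process_start, on_process_end=on_process_end)
)
wf.run(fast=True)
```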
## From File

You can build workflows from JSON or YAML files with a top-level `pipeline` array, where each entry is a single-key map of step name to parameters:

```yaml
pipeline:
  - theme_generation: { min_themes: 2, max_themes: 5 }
  - sentiment: {}
```

Load and run:

```python
wf = Workflow.from_file("pipeline.yml")
res = wf.run(dataset=texts)
```
## Nested Inputs (Sentiment)

For `.sentiment(...)`, the DSL supports nested list inputs: it flattens them before the API call and reconstructs the nested shape on output. Other steps operate on flat lists.
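A sketch of this behavior (the data is illustrative):

```python
nested = [["I love pizza", "Great crust"], ["I hate rain"]]

res = (
    Workflow()
    .source("convos", nested)
    .sentiment(source="convos")
    .run(fast=True)
)
# Per the flattening behavior above, res.sentiment.sentiments mirrors
# the [[...], [...]] nesting of the input.
```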
## Graph

`graph()` returns an adjacency list describing the execution DAG, including static dependencies and wired inputs. This helps you visualize or validate a pipeline before running it.

```python
edges = wf.graph()
for node, deps in edges.items():
    print(node, "<-", deps)
```
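Because the adjacency list is a plain mapping of node to dependencies, it is easy to feed into other tools. For example, a small helper of our own (not part of `pulse.dsl`) that emits Graphviz DOT:

```python
def to_dot(edges):
    lines = ["digraph workflow {"]
    for node, deps in edges.items():
        for dep in deps:
            lines.append(f'  "{dep}" -> "{node}";')
    lines.append("}")
    return "\n".join(lines)

print(to_dot(wf.graph()))  # render with `dot -Tpng` or an online viewer
```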