Goal
Transform the existing CLI into a complete, modular, four-stage pipeline:
[ Acquisition (Ingest) ] → [ Processing ] → [ Visualization ] → [ Decimation (Egress) ]
Rename acquisition/ to connectors/ to reflect both inbound and outbound data flow.
Split into ingest/ and egress/ submodules, sharing common backend code in backends/.
Each stage has its own CLI subcommands.
All commands accept stdin/stdout for chaining.
Add a run command to execute pipelines from YAML/JSON config files.
1. Refactor CLI Structure
1.1 Top-Level Groups
Modify src/zyra/cli.py:
Create four top-level subparsers: acquire, process, visualize, decimate, plus run.
Remove existing flat commands (decode-grib2, convert-format, etc.) and nest them under process.
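The grouping described above could be laid out with argparse roughly as follows. This is a minimal sketch, not the project's actual cli.py: the function name build_parser and the dest names stage and command are assumptions made for illustration, and only two of the former flat commands are registered.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build a parser with four stage groups plus `run` (illustrative layout)."""
    parser = argparse.ArgumentParser(prog="zyra")
    top = parser.add_subparsers(dest="stage", required=True)

    # One group per pipeline stage; each group owns its nested subcommands.
    groups = {}
    for name in ("acquire", "process", "visualize", "decimate"):
        group = top.add_parser(name)
        groups[name] = group.add_subparsers(dest="command", required=True)

    # Former flat commands are nested under `process`.
    for command in ("decode-grib2", "convert-format"):
        groups["process"].add_parser(command)

    # `run` takes a pipeline config file instead of nested subcommands.
    run = top.add_parser("run")
    run.add_argument("config")
    return parser
```

With this layout, `zyra process decode-grib2` parses to stage "process" and command "decode-grib2", while the old flat form `zyra decode-grib2` is rejected by argparse.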
1.2 Module Self-Registration
In each stage's __init__.py, add:
def register_cli(subparsers):
    """Register CLI commands for this stage."""
cli.py calls:
from zyra.connectors import ingest, egress
from zyra import processing, visualization
ingest.register_cli(acquire_subparser)
processing.register_cli(process_subparser)
visualization.register_cli(visualize_subparser)
egress.register_cli(decimate_subparser)
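One way a stage could fill in register_cli is to attach a handler to each subcommand with set_defaults, so that cli.py only has to call args.func(args). The sketch below assumes this dispatch convention; the handler _decode_grib2, the positional input argument, and the main function are hypothetical and stand in for the real stage code.

```python
import argparse


def _decode_grib2(args: argparse.Namespace) -> int:
    # Hypothetical placeholder handler; real decoding logic is out of scope.
    return 0


def register_cli(subparsers) -> None:
    """Register CLI commands for this stage."""
    parser = subparsers.add_parser("decode-grib2", help="Decode a GRIB2 file")
    parser.add_argument("input", nargs="?", default="-")  # "-" means stdin
    parser.set_defaults(func=_decode_grib2)


def main(argv=None) -> int:
    # Stand-in for cli.py: build the `process` group, let the stage register
    # its commands, then dispatch to whichever handler the stage attached.
    root = argparse.ArgumentParser(prog="zyra")
    top = root.add_subparsers(dest="stage", required=True)
    process = top.add_parser("process").add_subparsers(dest="command", required=True)
    register_cli(process)
    args = root.parse_args(argv)
    return args.func(args)
```

The benefit of this arrangement is that adding a command touches only the stage's own module; cli.py never needs to know the command names.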
2. Connectors Module Structure
src/zyra/connectors/
    backends/
        s3.py
        http.py
        ftp.py
        vimeo.py
    ingest/
        __init__.py
        ingest_manager.py
    egress/
        __init__.py
        egress_manager.py
2.1 Ingest
ingest_manager.py maps CLI commands to inbound fetchers:
acquire http <url> → backends/http.py
acquire s3 <bucket>/<key> → backends/s3.py
acquire ftp <server>/<path> → backends/ftp.py
acquire vimeo <video_id> → backends/vimeo.py
All commands:
Accept --output (default - = stdout).
Stream binary data directly.
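The --output convention above (a dash meaning stdout) can be handled in one shared helper so every acquire command behaves the same way. The following is a sketch under that assumption; the name open_output is hypothetical and not part of the existing code base.

```python
import contextlib
import sys
from typing import BinaryIO, Iterator


@contextlib.contextmanager
def open_output(path: str = "-") -> Iterator[BinaryIO]:
    """Yield a binary sink: stdout for "-", otherwise a file opened for writing."""
    if path == "-":
        # sys.stdout.buffer is the raw binary layer beneath the text wrapper.
        yield sys.stdout.buffer
        sys.stdout.buffer.flush()
    else:
        with open(path, "wb") as handle:
            yield handle
```

A fetcher would then write with `with open_output(args.output) as sink: sink.write(chunk)`, without caring whether the bytes go to a pipe or to disk.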
2.2 Egress
egress_manager.py maps CLI commands to outbound writers:
decimate local <path>
decimate s3 <bucket>/<key> → backends/s3.py
decimate ftp <server>/<path> → backends/ftp.py
decimate post <url> → backends/http.py
All commands:
Accept stdin (-) as input.
Write binary data directly.
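For the simplest writer, decimate local, the stdin convention could look like the sketch below. The helper names open_input and write_local are assumptions for illustration; the remote writers (s3, ftp, post) would take the same source stream but are not shown because their backend APIs are not specified here.

```python
import shutil
import sys
from typing import BinaryIO


def open_input(path: str = "-") -> BinaryIO:
    """Return a binary source: stdin for "-", otherwise the named file."""
    return sys.stdin.buffer if path == "-" else open(path, "rb")


def write_local(source: BinaryIO, destination: str) -> None:
    """Copy everything from source into a local file without decoding it."""
    with open(destination, "wb") as handle:
        shutil.copyfileobj(source, handle)
```

Because write_local accepts any binary file-like object, the same function serves both `decimate local out.png < in.png` and a call from the pipeline runner with an in-memory buffer.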
3. Processing (src/zyra/processing/)
Move existing CLI functions into the process namespace:
decode-grib2
extract-variable
convert-format
Add missing processors:
NetCDF subset/extract
Video conversion
All commands:
Accept stdin/stdout.
Auto-detect formats.
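Format auto-detection on a stream cannot rely on file extensions, so one option is to sniff the leading bytes. The sketch below assumes that approach; detect_format and the returned labels are hypothetical names. The signatures themselves are standard: GRIB messages begin with the ASCII bytes GRIB, classic NetCDF files begin with CDF, and NetCDF-4 files use the HDF5 container signature.

```python
from typing import Optional

# (magic prefix, label) pairs, checked in order.
_SIGNATURES = (
    (b"GRIB", "grib"),
    (b"CDF", "netcdf"),               # NetCDF classic and 64-bit offset
    (b"\x89HDF\r\n\x1a\n", "hdf5"),   # HDF5 container, used by NetCDF-4
)


def detect_format(header: bytes) -> Optional[str]:
    """Guess a format label from the first bytes of a stream, or None."""
    for magic, label in _SIGNATURES:
        if header.startswith(magic):
            return label
    return None
```

When the input is a pipe, the header bytes that were read for sniffing have to be handed on to the processor together with the rest of the stream, since stdin cannot be rewound.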
4. Visualization (src/zyra/visualization/)
New commands:
visualize plot --type contour|timeseries --var <name>
visualize colormap --set <name>
visualize animate --frames <dir> --output <video>
Output:
Default to stdout.
Save to a file if --output is provided.
6. Pipeline Configs
6.1 CLI Usage
zyra run pipeline.yaml
zyra run pipeline.yaml --set var=temp --set output=out.png
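The repeated --set key=value flags above map naturally onto argparse's append action. The sketch below shows one way to collect and split them; build_run_parser, parse_overrides, and the overrides attribute name are assumptions, and values are kept as plain strings because the plan does not say how they should be typed.

```python
import argparse
from typing import Dict, List


def build_run_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="zyra run")
    parser.add_argument("config")
    # Each --set occurrence appends one "key=value" string to args.overrides.
    parser.add_argument("--set", dest="overrides", action="append",
                        default=[], metavar="KEY=VALUE")
    return parser


def parse_overrides(pairs: List[str]) -> Dict[str, str]:
    """Split "key=value" strings on the first "=" into a dict."""
    overrides = {}
    for pair in pairs:
        key, sep, value = pair.partition("=")
        if not sep or not key:
            raise ValueError(f"expected KEY=VALUE, got {pair!r}")
        overrides[key] = value
    return overrides
```

Splitting on the first "=" only means a value may itself contain "=", for example a URL with a query string.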
6.2 YAML Example
name: Temperature Visualization Pipeline
stages:
  - stage: acquisition
    command: acquire
    args:
      backend: s3
      bucket: bucket-name
      key: data/file.grib2
  - stage: processing
    command: decode-grib2
    args: {}
  - stage: processing
    command: extract-variable
    args:
      var: temperature
  - stage: visualization
    command: plot
    args:
      type: contour
      var: temperature
  - stage: decimation
    command: s3
    args:
      bucket: bucket-name
      key: products/temperature.png
6.3 Implementation
New module:
src/zyra/pipeline_runner.py
Parse config → apply overrides → execute stages sequentially via pipes or function calls.
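The parse, override, execute flow could be sketched with plain function calls as below. Several things here are assumptions rather than the planned design: the config is taken as an already-parsed dict (for example from json.load, or from a YAML loader), an override replaces any stage argument with the same key, handlers are looked up in a hypothetical registry keyed by (stage, command), and each handler receives the previous stage's bytes and returns new bytes.

```python
from typing import Callable, Dict, Optional, Tuple

# A stage handler takes the upstream bytes and its args, and returns bytes.
Handler = Callable[[bytes, dict], bytes]
Registry = Dict[Tuple[str, str], Handler]


def apply_overrides(config: dict, overrides: Dict[str, str]) -> dict:
    """Return a copy of config with matching stage args replaced (assumed rule)."""
    stages = []
    for stage in config["stages"]:
        args = dict(stage.get("args") or {})
        for key, value in overrides.items():
            if key in args:
                args[key] = value
        stages.append({**stage, "args": args})
    return {**config, "stages": stages}


def run_pipeline(config: dict, registry: Registry,
                 overrides: Optional[Dict[str, str]] = None) -> bytes:
    """Run stages in order, feeding each stage's output to the next."""
    config = apply_overrides(config, overrides or {})
    data = b""
    for stage in config["stages"]:
        handler = registry[(stage["stage"], stage["command"])]
        data = handler(data, stage["args"])
    return data
```

Passing whole byte strings between stages keeps the sketch short; a streaming runner would pass file-like objects or connect subprocess pipes instead.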
7. Streaming Support
File-like object support across all commands.
Chunked reads/writes for large files.
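Chunked transfer between two file-like objects can be written as a short loop that never holds more than one chunk in memory. The sketch below is illustrative; copy_stream and the 1 MiB default chunk size are assumptions, not values taken from the plan.

```python
from typing import BinaryIO

CHUNK_SIZE = 1024 * 1024  # 1 MiB; an assumed default, tune per workload


def copy_stream(source: BinaryIO, sink: BinaryIO,
                chunk_size: int = CHUNK_SIZE) -> int:
    """Copy source to sink in fixed-size chunks and return the byte count."""
    total = 0
    while True:
        chunk = source.read(chunk_size)
        if not chunk:  # empty bytes signals end of stream
            break
        sink.write(chunk)
        total += len(chunk)
    return total
```

Returning the byte count gives callers something to log or to verify against an expected size after a transfer.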
8. Testing
Unit tests for each CLI command (file and pipe).
Integration tests for multi-stage pipelines.
Pipeline config tests.
9. Documentation
Update README to show the four-stage CLI and the connectors/ refactor.
Add "Pipeline Patterns" to the wiki.
Provide sample pipeline configs.