This plan ensures all legacy functionality from acquisition/ is migrated into the new connectors/ (and utils/ where appropriate).
Once complete, the old rtvideo.py workflow will be fully reproducible in declarative pipelines.


1. FTP (acquisition/ftp_manager.py)

Current (legacy)

  • sync_ftp_directory(remote_dir, local_dir, dataset_period)

    • Lists remote files

    • Filters by regex/date range

    • Downloads missing files

    • Cleans zero-byte files

Migration

  • Connector API (connectors/backends/ftp.py):

    • list_files(remote_dir, pattern=None, since=None, until=None)

    • sync_directory(remote_dir, local_dir, pattern=None, since=None, until=None)

    • delete(path), exists(path), stat(path)

  • Pipeline Support:

    • YAML config example:

      since: "2024-08-01"
      until: "2025-08-01"
      file_pattern: "image_(\d{8})\.png"
      date_format: "%Y%m%d"
      

Restores rtvideo.py β€œlast year’s files” functionality.


2. HTTP (acquisition/http_manager.py)

Current (legacy)

  • Fetch binary/text/JSON (fetch_data, fetch_text, fetch_json)

  • POST data (post_data)

  • list_files(url, pattern) β€” scrape directory listings

  • .idx handling (parse lines, derive byte ranges, ranged downloads)

Migration

  • Connector (connectors/backends/http.py):

    • fetch_bytes(url)

    • fetch_text(url)

    • fetch_json(url)

    • post_data(url, data, headers)

    • list_files(url, pattern) (optional)

  • Utils (utils/grib.py):

    • .idx parsing + byte range helpers

    • parallel downloads


3. S3 (acquisition/s3_manager.py)

Current (legacy)

  • list_files(prefix, pattern)

  • fetch, upload, exists, delete, stat

  • .idx parsing, ranged downloads

Migration

  • Connector (connectors/backends/s3.py):

    • list_files(prefix=None, pattern=None, since=None, until=None)

    • exists(key), delete(key), stat(key)

  • Utils (utils/grib.py):

    • .idx parsing + ranged downloads


4. Vimeo (acquisition/vimeo_manager.py)

Current (legacy)

  • upload_video(file)

  • update_video(file, video_uri)

  • update_video_description(video_uri, new_description)

  • No fetch/list

Migration

  • Connector (connectors/backends/vimeo.py):

    • upload_bytes(path)

    • update_video(path, video_uri)

    • update_description(video_uri, text)

Treat Vimeo as upload-only sink connector.


5. GRIB Utils (acquisition/grib_utils.py)

Current (legacy)

  • .idx helpers

  • Byte range utilities

  • Parallel downloads

Migration

  • Move entirely to utils/grib.py

  • Imported by FTP/HTTP/S3 connectors when needed.


6. Pipeline & CLI Enhancements

  • Add support for since, until, file_pattern, date_format in YAML configs.

  • Add transform stage type for structured metadata (e.g. start/end dates β†’ JSON).

  • CLI support example:

    zyra run pipeline.yaml --set fetch_images.since=2024-08-01
    

7. Removal of acquisition/

  • Once all functionality above is ported:

    • Update imports to use connectors/ and utils/

    • Update pipelines with new connector features

    • Remove acquisition/ directory completely

    • Add tests to ensure no regressions from old scripts (rtvideo.py)


Outcome

  • rtvideo.py workflow replicated in pipelines:

    • FTP sync by date range

    • ffmpeg movie composition

    • Vimeo update/replace upload

    • S3 metadata upload

  • All acquisition/ functionality migrated or retired

  • Pipelines fully declarative, reproducible, and testable