Source code for zyra.connectors.ingest

# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

import argparse
import os
from typing import Any

from zyra.cli_common import add_output_option
from zyra.connectors.backends import ftp as ftp_backend
from zyra.connectors.backends import http as http_backend
from zyra.connectors.backends import s3 as s3_backend
from zyra.connectors.credentials import (
    CredentialResolutionError,
    apply_auth_header,
    apply_http_credentials,
    parse_header_strings,
    resolve_basic_auth_credentials,
    resolve_credentials,
)
from zyra.utils.cli_helpers import configure_logging_from_env
from zyra.utils.date_manager import DateManager
from zyra.utils.io_utils import open_output


def _sanitize_headers_for_validation(values: dict[str, str]) -> dict[str, str]:
    sensitive_headers = {
        "authorization",
        "proxy-authorization",
        "token",
        "secret",
        "key",
    }
    sanitized: dict[str, str] = {}
    for name, val in values.items():
        if name.lower() in sensitive_headers:
            sanitized[name] = "<redacted>"
        else:
            sanitized[name] = val
    return sanitized


def _cmd_http(ns: argparse.Namespace) -> int:
    """Acquire data over HTTP(S) and write to stdout or file.

    Modes, checked in this order:

    - ``--list``: print the links found on the directory page ``ns.url``,
      optionally filtered by ``--pattern`` and by a date window
      (``--since``/``--since-period``/``--until``, matched against URL
      basenames using ``--date-format``).
    - ``--inputs``/``--manifest``: download every URL into ``--output-dir``,
      naming each file after the last segment of the URL *path*.
    - otherwise: download ``ns.url`` to ``--output``.

    Request headers are built from ``--header``, ``--credential``/
    ``--credential-file`` and ``--auth``.

    Returns:
        0 on success.

    Raises:
        SystemExit: on credential or manifest errors, or when ``--output-dir``
            is missing in batch mode.
    """
    import logging as _log
    from pathlib import Path
    from urllib.parse import urlparse

    if getattr(ns, "verbose", False):
        os.environ["ZYRA_VERBOSITY"] = "debug"
    elif getattr(ns, "quiet", False):
        os.environ["ZYRA_VERBOSITY"] = "quiet"
    if getattr(ns, "trace", False):
        os.environ["ZYRA_SHELL_TRACE"] = "1"
    configure_logging_from_env()

    def _trace_get(url: str) -> None:
        # Shell-style trace line, emitted only when tracing is enabled.
        if os.environ.get("ZYRA_SHELL_TRACE"):
            from zyra.utils.cli_helpers import sanitize_for_log

            _log.info("+ http get '%s'", sanitize_for_log(url))

    headers = parse_header_strings(getattr(ns, "header", None))
    credential_entries = list(getattr(ns, "credential", []) or [])
    if credential_entries:
        try:
            resolved = resolve_credentials(
                credential_entries,
                credential_file=getattr(ns, "credential_file", None),
            )
        except CredentialResolutionError as exc:
            raise SystemExit(f"Credential error: {exc}") from exc
        apply_http_credentials(headers, resolved.values)
    apply_auth_header(headers, getattr(ns, "auth", None))

    inputs = list(getattr(ns, "inputs", []) or [])
    if getattr(ns, "manifest", None):
        try:
            with Path(ns.manifest).open(encoding="utf-8") as f:
                for line in f:
                    s = line.strip()
                    # Skip blank lines and '#' comment lines.
                    if s and not s.startswith("#"):
                        inputs.append(s)
        except Exception as e:
            raise SystemExit(f"Failed to read manifest: {e}") from e

    # Listing mode
    if getattr(ns, "list", False):
        urls = http_backend.list_files(
            ns.url,
            pattern=getattr(ns, "pattern", None),
            headers=headers or None,
        )
        since = getattr(ns, "since", None)
        until = getattr(ns, "until", None)
        # --since-period (ISO-8601 duration) only applies when --since is absent.
        if not since and getattr(ns, "since_period", None):
            start, _ = DateManager().get_date_range_iso(ns.since_period)
            since = start.isoformat()
        if since or until:
            from datetime import datetime

            date_format = getattr(ns, "date_format", None)
            dm = DateManager([date_format] if date_format else None)
            start = datetime.fromisoformat(since) if since else datetime.min
            end = datetime.fromisoformat(until) if until else datetime.max
            urls = [u for u in urls if dm.is_date_in_range(u, start, end)]
        for u in urls:
            print(u)
        return 0

    if inputs:
        if ns.output_dir is None:
            raise SystemExit("--output-dir is required with --inputs")
        outdir = Path(ns.output_dir)
        outdir.mkdir(parents=True, exist_ok=True)
        for u in inputs:
            _trace_get(u)
            data = http_backend.fetch_bytes(u, headers=headers or None)
            # Name the file after the URL *path* only: the raw URL would keep
            # query strings (which may carry tokens) in the filename, and a
            # path-less URL would otherwise be named after its host.
            name = Path(urlparse(u).path).name or "download.bin"
            with (outdir / name).open("wb") as f:
                f.write(data)
        return 0

    _trace_get(ns.url)
    data = http_backend.fetch_bytes(ns.url, headers=headers or None)
    with open_output(ns.output) as f:
        f.write(data)
    return 0


def _cmd_s3(ns: argparse.Namespace) -> int:
    """Acquire data from S3 (s3:// or bucket/key) and write to stdout or file.

    Modes, checked in this order:

    - ``--list``: print object keys under ``ns.url`` (preferred) or
      ``ns.bucket``, filtered by ``--pattern`` and an optional date window
      (``--since``/``--since-period``/``--until``/``--date-format``).
    - ``--inputs``/``--manifest``: download every ``s3://`` URL into
      ``--output-dir``.
    - otherwise: download ``ns.url`` when it is an ``s3://`` URL, else the
      object identified by ``ns.bucket``/``ns.key``, to ``--output``.

    Returns:
        0 on success.

    Raises:
        SystemExit: on manifest errors or a missing ``--output-dir`` in batch
            mode.
    """
    import logging as _log
    from pathlib import Path

    if getattr(ns, "verbose", False):
        os.environ["ZYRA_VERBOSITY"] = "debug"
    elif getattr(ns, "quiet", False):
        os.environ["ZYRA_VERBOSITY"] = "quiet"
    if getattr(ns, "trace", False):
        os.environ["ZYRA_SHELL_TRACE"] = "1"
    configure_logging_from_env()

    def _trace_get(target: str) -> None:
        # Shell-style trace line, emitted only when tracing is enabled.
        if os.environ.get("ZYRA_SHELL_TRACE"):
            from zyra.utils.cli_helpers import sanitize_for_log

            _log.info("+ s3 get '%s'", sanitize_for_log(target))

    # Batch via s3:// URLs
    inputs = list(getattr(ns, "inputs", []) or [])
    if getattr(ns, "manifest", None):
        try:
            with Path(ns.manifest).open(encoding="utf-8") as f:
                for line in f:
                    s = line.strip()
                    # Skip blank lines and '#' comment lines.
                    if s and not s.startswith("#"):
                        inputs.append(s)
        except Exception as e:
            raise SystemExit(f"Failed to read manifest: {e}") from e

    # Listing mode
    if getattr(ns, "list", False):
        # Prefer full s3:// URL when provided, else fall back to the bucket.
        prefix = getattr(ns, "url", None) or getattr(ns, "bucket", None) or None
        # --since-period (ISO-8601 duration) only applies when --since is absent.
        since = getattr(ns, "since", None)
        since_period = getattr(ns, "since_period", None)
        if since_period and not since:
            since = DateManager().get_date_range_iso(since_period)[0].isoformat()
        keys = s3_backend.list_files(
            prefix,
            pattern=getattr(ns, "pattern", None),
            since=since,
            until=getattr(ns, "until", None),
            date_format=getattr(ns, "date_format", None),
        )
        for k in keys or []:
            print(k)
        return 0

    if inputs:
        if ns.output_dir is None:
            raise SystemExit("--output-dir is required with --inputs")
        outdir = Path(ns.output_dir)
        outdir.mkdir(parents=True, exist_ok=True)
        for u in inputs:
            _trace_get(u)
            data = s3_backend.fetch_bytes(u, unsigned=ns.unsigned)
            name = Path(u).name or "object.bin"
            with (outdir / name).open("wb") as f:
                f.write(data)
        return 0

    # Accept either s3://bucket/key or split bucket/key. ``url`` may be unset
    # when only --bucket/--key are given, so guard before calling startswith.
    url = getattr(ns, "url", None)
    if url and url.startswith("s3://"):
        _trace_get(url)
        data = s3_backend.fetch_bytes(url, unsigned=ns.unsigned)
    else:
        _trace_get(f"{ns.bucket}/{ns.key}")
        data = s3_backend.fetch_bytes(ns.bucket, ns.key, unsigned=ns.unsigned)
    with open_output(ns.output) as f:
        f.write(data)
    return 0


def _cmd_ftp(ns: argparse.Namespace) -> int:
    """Acquire data from FTP and write to stdout or file.

    Modes, checked in this order:

    - ``--list``: print remote names under ``ns.path``.
    - ``--sync-dir``: mirror ``ns.path`` into a local directory.
    - ``--inputs``/``--manifest``: download each path into ``--output-dir``.
    - otherwise: download ``ns.path`` to ``--output``.

    List and sync modes honour ``--pattern`` and the date window options
    (``--since``/``--since-period``/``--until``/``--date-format``). Login
    credentials come from ``--credential``/``--credential-file`` plus the
    ``--user``/``--password`` shortcuts.

    Returns:
        0 on success.

    Raises:
        SystemExit: on credential or manifest errors, or a missing
            ``--output-dir`` in batch mode.
    """
    from pathlib import Path

    # Same verbosity/trace mapping as the sibling commands (no-op when the
    # parser does not define these flags).
    if getattr(ns, "verbose", False):
        os.environ["ZYRA_VERBOSITY"] = "debug"
    elif getattr(ns, "quiet", False):
        os.environ["ZYRA_VERBOSITY"] = "quiet"
    if getattr(ns, "trace", False):
        os.environ["ZYRA_SHELL_TRACE"] = "1"
    configure_logging_from_env()

    credential_entries = list(getattr(ns, "credential", []) or [])
    # NOTE(review): --user/--password are routed through resolve_credentials,
    # which (per the --credential help text) understands '$VAR'/'@NAME'
    # references — confirm a literal password starting with '$' or '@' is not
    # reinterpreted.
    if getattr(ns, "user", None):
        credential_entries.append(f"user={ns.user}")
    if getattr(ns, "password", None):
        credential_entries.append(f"password={ns.password}")
    username: str | None = None
    password: str | None = None
    if credential_entries:
        try:
            resolved = resolve_credentials(
                credential_entries,
                credential_file=getattr(ns, "credential_file", None),
            )
        except CredentialResolutionError as exc:
            raise SystemExit(f"Credential error: {exc}") from exc
        username, password = resolve_basic_auth_credentials(resolved.values)

    inputs = list(getattr(ns, "inputs", []) or [])
    if getattr(ns, "manifest", None):
        try:
            with Path(ns.manifest).open(encoding="utf-8") as f:
                for line in f:
                    s = line.strip()
                    # Skip blank lines and '#' comment lines.
                    if s and not s.startswith("#"):
                        inputs.append(s)
        except Exception as e:
            raise SystemExit(f"Failed to read manifest: {e}") from e

    listing = getattr(ns, "list", False)
    sync_dir = getattr(ns, "sync_dir", None)
    if listing or sync_dir:
        # --since-period (ISO-8601 duration) only applies when --since is
        # absent; resolved once for whichever of list/sync runs.
        since = getattr(ns, "since", None)
        since_period = getattr(ns, "since_period", None)
        if since_period and not since:
            since = DateManager().get_date_range_iso(since_period)[0].isoformat()
        filter_kwargs = {
            "pattern": getattr(ns, "pattern", None),
            "since": since,
            "until": getattr(ns, "until", None),
            "date_format": getattr(ns, "date_format", None),
            "username": username,
            "password": password,
        }
        # Listing mode
        if listing:
            for n in ftp_backend.list_files(ns.path, **filter_kwargs) or []:
                print(n)
            return 0
        # Sync mode
        ftp_backend.sync_directory(ns.path, ns.sync_dir, **filter_kwargs)
        return 0

    if inputs:
        if ns.output_dir is None:
            raise SystemExit("--output-dir is required with --inputs")
        outdir = Path(ns.output_dir)
        outdir.mkdir(parents=True, exist_ok=True)
        for p in inputs:
            data = ftp_backend.fetch_bytes(p, username=username, password=password)
            name = Path(p).name or "download.bin"
            with (outdir / name).open("wb") as f:
                f.write(data)
        return 0
    data = ftp_backend.fetch_bytes(ns.path, username=username, password=password)
    with open_output(ns.output) as f:
        f.write(data)
    return 0


def _cmd_vimeo(ns: argparse.Namespace) -> int:  # pragma: no cover - placeholder
    """Stub for the ``acquire vimeo`` subcommand.

    Applies the shared ``--verbose``/``--quiet``/``--trace`` environment
    mapping and logging setup, then always exits with a "not implemented"
    message; it never returns normally.
    """
    level = None
    if getattr(ns, "verbose", False):
        level = "debug"
    elif getattr(ns, "quiet", False):
        level = "quiet"
    if level is not None:
        os.environ["ZYRA_VERBOSITY"] = level
    if getattr(ns, "trace", False):
        os.environ["ZYRA_SHELL_TRACE"] = "1"
    configure_logging_from_env()
    raise SystemExit("acquire vimeo is not implemented yet")


def _parse_kv_params(s: str | None) -> dict[str, str]:
    if not s:
        return {}
    out: dict[str, str] = {}
    for pair in s.split("&"):
        if not pair:
            continue
        if "=" in pair:
            k, v = pair.split("=", 1)
            out[k] = v
        else:
            out[pair] = ""
    return out


def _load_data_arg(data: str | None) -> bytes | str | dict | None:
    if not data:
        return None
    s = data.strip()
    if s.startswith("@"):
        from pathlib import Path

        path = s[1:]
        p = Path(path)
        b = p.read_bytes()
        # Try JSON first
        try:
            import json

            return json.loads(b.decode("utf-8"))
        except Exception:
            return b
    # Try to parse inline JSON
    try:
        import json

        return json.loads(s)
    except Exception:
        return s


def _cmd_api(ns: argparse.Namespace) -> int:
    """Call a REST API endpoint and write the response.

    Supports:
    - Methods, headers, params, and JSON/body data (``--data`` or ``@file``)
    - Streaming binary downloads with resume, content-type validation, and
      filename inference (``--stream``, ``--resume``, ``--expect-content-type``,
      ``--detect-filename``)
    - Pagination: cursor, page, and RFC 5988 Link with NDJSON output or
      aggregated JSON array (``--paginate``, ``--newline-json``)
    - Provider presets (e.g., ``--preset limitless-lifelogs`` and
      ``--preset limitless-audio``)
    """
    # Map verbosity/trace
    if getattr(ns, "verbose", False):
        os.environ["ZYRA_VERBOSITY"] = "debug"
    elif getattr(ns, "quiet", False):
        os.environ["ZYRA_VERBOSITY"] = "quiet"
    if getattr(ns, "trace", False):
        os.environ["ZYRA_SHELL_TRACE"] = "1"
    configure_logging_from_env()

    headers = parse_header_strings(getattr(ns, "header", None))
    params = _parse_kv_params(getattr(ns, "params", None))
    if getattr(ns, "content_type", None):
        headers.setdefault("Content-Type", ns.content_type)
    body = _load_data_arg(getattr(ns, "data", None))
    credential_entries = list(getattr(ns, "credential", []) or [])
    if credential_entries:
        try:
            resolved = resolve_credentials(
                credential_entries,
                credential_file=getattr(ns, "credential_file", None),
            )
        except CredentialResolutionError as exc:
            raise SystemExit(f"Credential error: {exc}") from exc
        apply_http_credentials(headers, resolved.values)
    apply_auth_header(headers, getattr(ns, "auth", None))
    validation_headers = _sanitize_headers_for_validation(headers)

    from zyra.connectors.backends import api as api_backend

    method = (getattr(ns, "method", "GET") or "GET").upper()
    paginate = getattr(ns, "paginate", "none") or "none"
    timeout = int(getattr(ns, "timeout", 60) or 60)
    max_retries = int(getattr(ns, "max_retries", 3) or 3)
    retry_backoff = float(getattr(ns, "retry_backoff", 0.5) or 0.5)
    allow_non_2xx = bool(getattr(ns, "allow_non_2xx", False))

    # OpenAPI: help/validation (before making requests)
    if getattr(ns, "openapi_help", False) or getattr(ns, "openapi_validate", False):
        from urllib.parse import urlparse as _urlparse  # noqa: I001
        from zyra.connectors.openapi import validate as _ov  # noqa: I001

        openapi_url = getattr(ns, "openapi_url", None)
        if openapi_url:
            spec = _ov.load_openapi_url(openapi_url)
        else:
            if not getattr(ns, "url", None):
                raise SystemExit("--url is required for OpenAPI help/validation")
            try:
                pr = _urlparse(ns.url)
                base_root = f"{pr.scheme}://{pr.netloc}"
            except Exception:
                base_root = ns.url
            spec = _ov.load_openapi(base_root)
        if not spec:
            print("OpenAPI: not found", file=__import__("sys").stderr)
            if getattr(ns, "openapi_help", False):
                return 0
        if getattr(ns, "openapi_help", False) and spec:
            txt = _ov.help_text(spec=spec, url=ns.url, method=method)
            print(txt)
            return 0
        if getattr(ns, "openapi_validate", False) and spec:
            issues = _ov.validate_request(
                spec=spec,
                url=ns.url,
                method=method,
                headers=validation_headers,
                params=params,
                data=body,
            )
            if issues:
                import sys as _sys

                for it in issues:
                    loc = it.get("loc")
                    name = it.get("name")
                    msg = it.get("message")
                    if (
                        isinstance(name, str)
                        and isinstance(msg, str)
                        and name.lower()
                        in {
                            "authorization",
                            "proxy-authorization",
                            "token",
                            "secret",
                            "key",
                        }
                    ):
                        msg = "<redacted>"
                    _sys.stderr.write(f"OpenAPI validation: {loc} {name}: {msg}\n")
                if getattr(ns, "openapi_strict", False):
                    raise SystemExit(2)
            else:
                print("OpenAPI validation: OK")
                return 0

    # Preset defaults (user-provided flags win)
    preset = getattr(ns, "preset", None)
    if preset == "limitless-lifelogs":
        if not getattr(ns, "paginate", None) or ns.paginate == "none":
            paginate = "cursor"
        if not getattr(ns, "cursor_param", None):
            ns.cursor_param = "cursor"
        if not getattr(ns, "next_cursor_json_path", None):
            ns.next_cursor_json_path = "meta.lifelogs.nextCursor"
        # Map --since to 'start'
        if getattr(ns, "since", None) and "start" not in params:
            params["start"] = ns.since
    elif preset == "limitless-audio":
        import os as _os

        # Default URL if not provided
        if not getattr(ns, "url", None):
            base = _os.environ.get("LIMITLESS_API_URL", "https://api.limitless.ai/v1")
            ns.url = base.rstrip("/") + "/download-audio"
        # Headers: Accept and expected content type
        headers.setdefault("Accept", "audio/ogg")
        if not getattr(ns, "expect_content_type", None):
            ns.expect_content_type = "audio/ogg"
        # Default audio source if not supplied
        if getattr(ns, "audio_source", None) and "audioSource" not in params:
            params["audioSource"] = ns.audio_source
        elif "audioSource" not in params:
            params["audioSource"] = "pendant"
        # Map start/end or since/duration to epoch ms (startMs/endMs)
        from datetime import datetime, timezone

        def _parse_iso(s: str) -> datetime:
            try:
                # Support 'Z'
                if s.endswith("Z"):
                    s = s[:-1] + "+00:00"
                return datetime.fromisoformat(s)
            except Exception as exc:  # pragma: no cover - defensive
                raise SystemExit(
                    f"Invalid ISO datetime for --start/--end: {s}"
                ) from exc

        def _to_ms(dt: datetime) -> str:
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
            return str(int(dt.timestamp() * 1000))

        start_iso = getattr(ns, "start", None)
        end_iso = getattr(ns, "end", None)
        since_iso = getattr(ns, "since", None)
        duration_iso = getattr(ns, "duration", None)
        if start_iso and end_iso:
            start_dt = _parse_iso(start_iso)
            end_dt = _parse_iso(end_iso)
        elif since_iso and duration_iso:
            start_dt = _parse_iso(since_iso)
            # Robust ISO-8601 duration parsing (P[nD]T[nH][nM][nS])
            from zyra.utils.iso8601 import iso_duration_to_timedelta

            try:
                td = iso_duration_to_timedelta(duration_iso)
            except Exception as exc:
                raise SystemExit(
                    f"Unsupported ISO-8601 duration: {duration_iso}"
                ) from exc
            end_dt = start_dt + td
        else:
            # Expect startMs/endMs already via --params
            start_dt = end_dt = None
        if start_dt and end_dt:
            # Validate max 2 hours
            if (end_dt - start_dt).total_seconds() > 7200:
                raise SystemExit(
                    "Limitless audio maximum duration is 2 hours per request"
                )
            params.setdefault("startMs", _to_ms(start_dt))
            params.setdefault("endMs", _to_ms(end_dt))
        # Streaming is recommended
        if not getattr(ns, "stream", False):
            ns.stream = True

    pages: list[bytes] = []
    if paginate == "none" and not getattr(ns, "stream", False):
        status, _resp_headers, content = api_backend.request_with_retries(
            method,
            ns.url,
            headers=headers,
            params=params,
            data=body,
            timeout=timeout,
            max_retries=max_retries,
            retry_backoff=retry_backoff,
        )
        if not allow_non_2xx and status >= 400:
            raise SystemExit(f"HTTP error {status}")
        if getattr(ns, "pretty", False):
            try:
                import json

                obj = json.loads(content.decode("utf-8"))
                content = (json.dumps(obj, ensure_ascii=False, indent=2) + "\n").encode(
                    "utf-8"
                )
            except Exception:
                pass
        with open_output(ns.output) as f:
            f.write(content)
        return 0

    # Streaming (binary-safe, large payloads)
    if getattr(ns, "stream", False):
        try:
            import requests as _requests  # type: ignore
        except Exception as exc:  # pragma: no cover - runtime error path
            raise SystemExit(
                "The 'requests' package is required for streaming; install extras: 'pip install \"zyra[connectors]\"'"
            ) from exc
        # Optional HEAD preflight
        if getattr(ns, "head_first", False):
            r_head = _requests.head(
                ns.url,
                headers=headers,
                params=params,
                timeout=timeout,
                allow_redirects=True,
            )
            ct = r_head.headers.get("Content-Type")
            if getattr(ns, "expect_content_type", None) and (
                not ct or ns.expect_content_type not in ct
            ):
                raise SystemExit(f"Unexpected Content-Type: {ct!r}")
        # Accept header
        if getattr(ns, "accept", None):
            headers.setdefault("Accept", ns.accept)
        # Resume support
        out_path = getattr(ns, "output", "-")
        start_at = 0
        if getattr(ns, "resume", False) and out_path not in (None, "-"):
            from pathlib import Path as _P

            p = _P(out_path)
            if p.is_file():
                start_at = p.stat().st_size
                if start_at > 0:
                    headers["Range"] = f"bytes={start_at}-"
        resp = _requests.request(
            method,
            ns.url,
            headers=headers,
            params=params,
            data=body,
            timeout=timeout,
            stream=True,
            allow_redirects=True,
        )
        status = resp.status_code
        if not allow_non_2xx and status >= 400:
            raise SystemExit(f"HTTP error {status}")
        ct = resp.headers.get("Content-Type")
        if getattr(ns, "expect_content_type", None) and (
            not ct or ns.expect_content_type not in ct
        ):
            raise SystemExit(f"Unexpected Content-Type: {ct!r}")
        # Detect filename when output is a directory
        out = getattr(ns, "output", "-") or "-"
        if out not in ("-", None):
            from pathlib import Path as _P

            out_p = _P(out)
            if (out_p.exists() and out_p.is_dir()) or str(out).endswith("/"):
                if not getattr(ns, "detect_filename", False):
                    raise SystemExit(
                        "Output is a directory; pass --detect-filename to infer a name or specify a file path"
                    )
                name = None
                cd = resp.headers.get("Content-Disposition") or ""
                if "filename=" in cd:
                    name = cd.split("filename=", 1)[1].strip().strip('"')
                if not name and ct:
                    ct_main = ct.split(";", 1)[0].strip().lower()
                    ext_map = {
                        "audio/ogg": ".ogg",
                        "audio/mpeg": ".mp3",
                        "audio/wav": ".wav",
                        "video/mp4": ".mp4",
                        "video/webm": ".webm",
                        "video/ogg": ".ogv",
                        "image/png": ".png",
                        "image/jpeg": ".jpg",
                        "image/gif": ".gif",
                        "image/webp": ".webp",
                        "application/pdf": ".pdf",
                        "application/zip": ".zip",
                        "application/octet-stream": ".bin",
                    }
                    ext = ext_map.get(ct_main)
                    if ext:
                        name = f"download{ext}"
                if not name:
                    name = "download.bin"
                out_p = out_p / name
                ns.output = str(out_p)
        # Write chunks
        total = 0
        try:
            total = int(resp.headers.get("Content-Length") or 0)
        except Exception:
            total = 0
        downloaded = 0
        show_progress = bool(getattr(ns, "progress", False)) and total > 0
        # When resuming and the file already exists, append to it; otherwise, open normally
        from contextlib import ExitStack as _ExitStack

        with _ExitStack() as _stack:
            if out not in ("-", None) and getattr(ns, "resume", False) and start_at > 0:
                from pathlib import Path as _P

                writer = _stack.enter_context(_P(ns.output).open("ab"))
            else:
                # Use standard helper which respects '-' to stdout
                from zyra.utils.io_utils import open_output as _open_output

                writer = _stack.enter_context(_open_output(ns.output))
            for chunk in resp.iter_content(chunk_size=1024 * 1024):
                if not chunk:
                    continue
                writer.write(chunk)
                if show_progress:
                    downloaded += len(chunk)
                    try:
                        import sys as _sys

                        pct = (downloaded / total) * 100.0
                        _sys.stderr.write(
                            f"\rDownloaded {downloaded:,}/{total:,} bytes ({pct:5.1f}%)"
                        )
                        _sys.stderr.flush()
                    except Exception:
                        show_progress = False
        if show_progress:
            try:
                import sys as _sys

                _sys.stderr.write("\n")
                _sys.stderr.flush()
            except Exception:
                pass
        return 0

    # Pagination
    out_is_ndjson = bool(getattr(ns, "newline_json", False))
    if out_is_ndjson:
        with open_output(ns.output) as writer:
            if paginate == "cursor":
                cursor_param = getattr(ns, "cursor_param", "cursor")
                next_cursor_json_path = getattr(ns, "next_cursor_json_path", "next")
                for status, _h, content in api_backend.paginate_cursor(
                    method,
                    ns.url,
                    headers=headers,
                    params=params,
                    data=body,
                    timeout=timeout,
                    max_retries=max_retries,
                    retry_backoff=retry_backoff,
                    cursor_param=cursor_param,
                    next_cursor_json_path=next_cursor_json_path,
                ):
                    if not allow_non_2xx and status >= 400:
                        raise SystemExit(f"HTTP error {status}")
                    writer.write(content.rstrip(b"\n") + b"\n")
            elif paginate == "page":
                page_param = getattr(ns, "page_param", "page")
                page_start = int(getattr(ns, "page_start", 1) or 1)
                page_size_param = getattr(ns, "page_size_param", None)
                page_size = getattr(ns, "page_size", None)
                empty_json_path = getattr(ns, "empty_json_path", None)
                for status, _h, content in api_backend.paginate_page(
                    method,
                    ns.url,
                    headers=headers,
                    params=params,
                    data=body,
                    timeout=timeout,
                    max_retries=max_retries,
                    retry_backoff=retry_backoff,
                    page_param=page_param,
                    page_start=page_start,
                    page_size_param=page_size_param,
                    page_size=page_size,
                    empty_json_path=empty_json_path,
                ):
                    if not allow_non_2xx and status >= 400:
                        raise SystemExit(f"HTTP error {status}")
                    writer.write(content.rstrip(b"\n") + b"\n")
            elif paginate == "link":
                link_rel = getattr(ns, "link_rel", "next")
                for status, _h, content in api_backend.paginate_link(
                    method,
                    ns.url,
                    headers=headers,
                    params=params,
                    data=body,
                    timeout=timeout,
                    max_retries=max_retries,
                    retry_backoff=retry_backoff,
                    link_rel=link_rel,
                ):
                    if not allow_non_2xx and status >= 400:
                        raise SystemExit(f"HTTP error {status}")
                    writer.write(content.rstrip(b"\n") + b"\n")
            else:
                raise SystemExit(
                    "Unsupported --paginate value. Use 'none', 'page', 'cursor', or 'link'"
                )
        return 0

    # Aggregate paginated pages as a JSON array when not using NDJSON
    import json as _json

    if paginate == "cursor":
        for status, _h, content in api_backend.paginate_cursor(
            method,
            ns.url,
            headers=headers,
            params=params,
            data=body,
            timeout=timeout,
            max_retries=max_retries,
            retry_backoff=retry_backoff,
            cursor_param=getattr(ns, "cursor_param", "cursor"),
            next_cursor_json_path=getattr(ns, "next_cursor_json_path", "next"),
        ):
            if not allow_non_2xx and status >= 400:
                raise SystemExit(f"HTTP error {status}")
            pages.append(content)
    elif paginate == "page":
        for status, _h, content in api_backend.paginate_page(
            method,
            ns.url,
            headers=headers,
            params=params,
            data=body,
            timeout=timeout,
            max_retries=max_retries,
            retry_backoff=retry_backoff,
            page_param=getattr(ns, "page_param", "page"),
            page_start=int(getattr(ns, "page_start", 1) or 1),
            page_size_param=getattr(ns, "page_size_param", None),
            page_size=getattr(ns, "page_size", None),
            empty_json_path=getattr(ns, "empty_json_path", None),
        ):
            if not allow_non_2xx and status >= 400:
                raise SystemExit(f"HTTP error {status}")
            pages.append(content)
    elif paginate == "link":
        for status, _h, content in api_backend.paginate_link(
            method,
            ns.url,
            headers=headers,
            params=params,
            data=body,
            timeout=timeout,
            max_retries=max_retries,
            retry_backoff=retry_backoff,
            link_rel=getattr(ns, "link_rel", "next"),
        ):
            if not allow_non_2xx and status >= 400:
                raise SystemExit(f"HTTP error {status}")
            pages.append(content)
    else:
        raise SystemExit(
            "Unsupported --paginate value. Use 'none', 'page', 'cursor', or 'link'"
        )

    arr = []
    for b in pages:
        try:
            arr.append(_json.loads(b.decode("utf-8")))
        except Exception:
            arr.append(None)
    payload = (_json.dumps(arr, ensure_ascii=False) + "\n").encode("utf-8")
    with open_output(ns.output) as f:
        f.write(payload)
    return 0


def _add_ingest_logging_flags(parser: argparse.ArgumentParser) -> None:
    """Attach the shared ``--verbose``/``--quiet``/``--trace`` switches to *parser*.

    Every acquisition subcommand exposes the same three logging flags; handlers
    such as ``_cmd_http`` map them onto ``ZYRA_VERBOSITY`` / ``ZYRA_SHELL_TRACE``.
    """
    parser.add_argument(
        "--verbose", action="store_true", help="Verbose logging for this command"
    )
    parser.add_argument(
        "--quiet", action="store_true", help="Quiet logging for this command"
    )
    parser.add_argument(
        "--trace",
        action="store_true",
        help="Shell-style trace of key steps and external commands",
    )


def _add_ingest_credential_options(
    parser: argparse.ArgumentParser, credential_help: str
) -> None:
    """Attach the repeatable ``--credential`` slot and ``--credential-file`` options.

    Args:
        parser: Subcommand parser to extend.
        credential_help: Subcommand-specific help text for ``--credential``
            (the example slots differ per transport).
    """
    parser.add_argument(
        "--credential",
        action="append",
        dest="credential",
        help=credential_help,
    )
    parser.add_argument(
        "--credential-file",
        dest="credential_file",
        help="Optional dotenv file for resolving @KEY credentials",
    )


def _register_http_parser(acq_subparsers: Any) -> None:
    """Register the ``http`` subcommand (single URL, directory listing, or batch)."""
    p_http = acq_subparsers.add_parser(
        "http",
        help="Fetch via HTTP(S)",
        description=(
            "Fetch a file via HTTP(S) to a local path. Optionally list/filter directory pages, "
            "or fetch multiple URLs with --inputs/--manifest."
        ),
    )
    p_http.add_argument("url")
    add_output_option(p_http)
    p_http.add_argument(
        "--list", action="store_true", help="List links on a directory page"
    )
    p_http.add_argument("--pattern", help="Regex to filter listed links")
    p_http.add_argument("--since", help="ISO date filter for list mode")
    p_http.add_argument(
        "--since-period",
        dest="since_period",
        help="ISO-8601 duration for lookback (e.g., P1Y, P6M, P7D, PT24H)",
    )
    p_http.add_argument("--until", help="ISO date filter for list mode")
    p_http.add_argument(
        "--date-format",
        dest="date_format",
        help="Filename date format for list filtering (e.g., YYYYMMDD)",
    )
    p_http.add_argument("--inputs", nargs="+", help="Multiple HTTP URLs to fetch")
    p_http.add_argument("--manifest", help="Path to a file listing URLs (one per line)")
    p_http.add_argument(
        "--output-dir",
        dest="output_dir",
        help="Directory to write outputs for --inputs",
    )
    _add_ingest_logging_flags(p_http)
    p_http.add_argument(
        "--header",
        action="append",
        help="Add custom HTTP header 'Name: Value' (repeatable)",
    )
    p_http.add_argument(
        "--auth",
        help=(
            "Convenience auth helper: 'bearer:$TOKEN' -> Authorization: Bearer <value>, "
            "'basic:user:pass' sets HTTP Basic auth"
        ),
    )
    _add_ingest_credential_options(
        p_http,
        (
            "Credential slot resolution (repeatable), e.g., 'token=$API_TOKEN' or "
            "'header.Authorization=@EUMETSAT_TOKEN'"
        ),
    )
    p_http.set_defaults(func=_cmd_http)


def _register_s3_parser(acq_subparsers: Any) -> None:
    """Register the ``s3`` subcommand (s3:// URL or bucket/key, listing, batch)."""
    p_s3 = acq_subparsers.add_parser(
        "s3",
        help="Fetch from S3",
        description=(
            "Fetch objects from Amazon S3 via s3:// URL or bucket/key. Supports unsigned access, "
            "listing prefixes, and batch via --inputs/--manifest."
        ),
    )
    # Exactly one of a full s3:// URL or a bucket name must be given.
    grp = p_s3.add_mutually_exclusive_group(required=True)
    grp.add_argument("--url", help="Full URL s3://bucket/key")
    grp.add_argument("--bucket", help="Bucket name")
    p_s3.add_argument("--key", help="Object key (when using --bucket)")
    p_s3.add_argument(
        "--unsigned", action="store_true", help="Use unsigned access for public buckets"
    )
    p_s3.add_argument("--list", action="store_true", help="List keys under a prefix")
    p_s3.add_argument("--pattern", help="Regex to filter listed keys")
    p_s3.add_argument("--since", help="ISO date filter for list mode")
    p_s3.add_argument(
        "--since-period",
        dest="since_period",
        help="ISO-8601 duration for lookback (e.g., P1Y, P6M, P7D, PT24H)",
    )
    p_s3.add_argument("--until", help="ISO date filter for list mode")
    p_s3.add_argument(
        "--date-format",
        dest="date_format",
        help="Filename date format for list filtering (e.g., YYYYMMDD)",
    )
    p_s3.add_argument("--inputs", nargs="+", help="Multiple s3:// URLs to fetch")
    p_s3.add_argument(
        "--manifest", help="Path to a file listing s3:// URLs (one per line)"
    )
    p_s3.add_argument(
        "--output-dir",
        dest="output_dir",
        help="Directory to write outputs for --inputs",
    )
    add_output_option(p_s3)
    _add_ingest_logging_flags(p_s3)
    p_s3.set_defaults(func=_cmd_s3)


def _register_ftp_parser(acq_subparsers: Any) -> None:
    """Register the ``ftp`` subcommand (single path, listing, directory sync, batch)."""
    p_ftp = acq_subparsers.add_parser(
        "ftp",
        help="Fetch from FTP",
        description=(
            "Fetch files via FTP (single path or batch). Optionally list or sync directories to a local folder."
        ),
    )
    p_ftp.add_argument("path", help="ftp://host/path or host/path")
    add_output_option(p_ftp)
    p_ftp.add_argument(
        "--list", action="store_true", help="List files in an FTP directory"
    )
    p_ftp.add_argument(
        "--sync-dir", dest="sync_dir", help="Sync FTP directory to a local directory"
    )
    p_ftp.add_argument("--pattern", help="Regex to filter list/sync")
    p_ftp.add_argument("--since", help="ISO date filter for list/sync")
    p_ftp.add_argument(
        "--since-period",
        dest="since_period",
        help="ISO-8601 duration for lookback (e.g., P1Y, P6M, P7D, PT24H)",
    )
    p_ftp.add_argument("--until", help="ISO date filter for list/sync")
    p_ftp.add_argument(
        "--date-format",
        dest="date_format",
        help="Filename date format for filtering (e.g., YYYYMMDD)",
    )
    p_ftp.add_argument("--inputs", nargs="+", help="Multiple FTP paths to fetch")
    p_ftp.add_argument(
        "--manifest", help="Path to a file listing FTP paths (one per line)"
    )
    p_ftp.add_argument(
        "--output-dir",
        dest="output_dir",
        help="Directory to write outputs for --inputs",
    )
    _add_ingest_logging_flags(p_ftp)
    # NOTE(review): a plaintext --password is visible in process listings;
    # prefer --credential password=$FTP_PASS or @KEY via --credential-file.
    p_ftp.add_argument(
        "--user",
        help="FTP username (alias for --credential user=...)",
    )
    p_ftp.add_argument(
        "--password",
        help="FTP password (alias for --credential password=...)",
    )
    _add_ingest_credential_options(
        p_ftp,
        (
            "Credential slot resolution (repeatable), e.g., 'user=@FTP_USER' or "
            "'password=$FTP_PASS'"
        ),
    )
    p_ftp.set_defaults(func=_cmd_ftp)


def _register_vimeo_parser(acq_subparsers: Any) -> None:
    """Register the placeholder ``vimeo`` subcommand (handler not implemented)."""
    p_vimeo = acq_subparsers.add_parser(
        "vimeo",
        help="Fetch video by id (not implemented)",
        description=(
            "Placeholder for fetching Vimeo videos by id. Not implemented yet."
        ),
    )
    p_vimeo.add_argument("video_id")
    add_output_option(p_vimeo)
    _add_ingest_logging_flags(p_vimeo)
    p_vimeo.set_defaults(func=_cmd_vimeo)


def _register_api_parser(acq_subparsers: Any) -> None:
    """Register the generic REST ``api`` subcommand and all of its options."""
    p_api = acq_subparsers.add_parser(
        "api",
        help="Generic REST API fetch",
        description=(
            # --paginate accepts 'link' as well, so list all three modes.
            "Call a REST API endpoint with headers/params/body. Supports page/cursor/link pagination."
        ),
    )
    p_api.add_argument(
        "--preset",
        choices=["limitless-lifelogs", "limitless-audio"],
        help="Apply provider-specific defaults (e.g., Limitless lifelogs cursor mapping; Limitless audio download)",
    )
    p_api.add_argument("--url", help="Target endpoint URL (may be set by preset)")
    p_api.add_argument(
        "--method",
        default="GET",
        help="HTTP method (GET, POST, DELETE, PUT, PATCH)",
    )
    add_output_option(p_api)
    p_api.add_argument(
        "--header",
        action="append",
        help="Custom header 'K: V' (repeatable)",
    )
    p_api.add_argument(
        "--content-type",
        dest="content_type",
        help="Content-Type header (e.g., application/json)",
    )
    p_api.add_argument(
        "--auth",
        help=(
            "Convenience auth helper: 'bearer:$TOKEN' -> Authorization: Bearer <value>, "
            "'basic:user:pass' -> Authorization: Basic <base64(user:pass)>"
        ),
    )
    p_api.add_argument(
        "--params",
        help="URL query parameters as k1=v1&k2=v2",
    )
    _add_ingest_credential_options(
        p_api, "Credential slot resolution (repeatable), e.g., token=$API_TOKEN"
    )
    p_api.add_argument(
        "--since",
        help="Convenience ISO start time; may map to provider param under presets",
    )
    p_api.add_argument(
        "--data",
        help="Inline JSON string or @path/to/file (JSON or raw)",
    )
    p_api.add_argument(
        "--paginate",
        choices=["none", "page", "cursor", "link"],
        default="none",
        help="Pagination mode",
    )
    # page-based
    p_api.add_argument("--page-param", default="page")
    p_api.add_argument("--page-start", type=int, default=1)
    p_api.add_argument("--page-size-param")
    p_api.add_argument("--page-size", type=int)
    p_api.add_argument(
        "--empty-json-path",
        help="Dot path for list to detect empty page (stops when empty)",
    )
    # cursor-based
    p_api.add_argument("--cursor-param", default="cursor")
    p_api.add_argument(
        "--next-cursor-json-path",
        default="next",
        help="Dot path to next cursor in response",
    )
    # link-based
    p_api.add_argument(
        "--link-rel",
        dest="link_rel",
        default="next",
        help="Link relation to follow when --paginate link (default: next)",
    )
    # output behavior
    p_api.add_argument(
        "--newline-json",
        action="store_true",
        help="Write each page as one JSON line (NDJSON)",
    )
    # --pretty is a store_true whose default is already True, so on its own it
    # can never be switched off; --no-pretty writes False into the same dest.
    p_api.add_argument(
        "--pretty",
        action="store_true",
        default=True,
        help="Pretty-print JSON for single response",
    )
    p_api.add_argument(
        "--no-pretty",
        dest="pretty",
        action="store_false",
        help="Disable pretty-printing of JSON output",
    )
    # Binary/streaming options
    p_api.add_argument(
        "--stream", action="store_true", help="Stream large/binary responses to output"
    )
    p_api.add_argument(
        "--detect-filename",
        dest="detect_filename",
        action="store_true",
        help="When output is a directory, infer filename from headers/content-type",
    )
    p_api.add_argument(
        "--accept",
        help="Set Accept header (e.g., audio/ogg)",
    )
    p_api.add_argument(
        "--expect-content-type",
        dest="expect_content_type",
        help="Fail if response Content-Type does not contain this value",
    )
    p_api.add_argument(
        "--head-first",
        dest="head_first",
        action="store_true",
        help="Send a HEAD request before GET to validate type/size",
    )
    p_api.add_argument(
        "--resume", action="store_true", help="Attempt HTTP Range resume when possible"
    )
    p_api.add_argument(
        "--progress",
        action="store_true",
        help="Show simple byte progress when Content-Length is available",
    )
    # OpenAPI-aided help and validation
    p_api.add_argument(
        "--openapi-help",
        action="store_true",
        help="Fetch OpenAPI and print required params/headers/body for the resolved operation",
    )
    p_api.add_argument(
        "--openapi-validate",
        action="store_true",
        help="Validate provided params/headers/body against OpenAPI (prints issues)",
    )
    p_api.add_argument(
        "--openapi-strict",
        action="store_true",
        help="Exit non-zero when --openapi-validate finds issues",
    )
    p_api.add_argument(
        "--openapi-url",
        help=(
            "Explicit OpenAPI spec URL (json/yaml). Overrides automatic discovery based on --url"
        ),
    )
    # Limitless audio helpers
    p_api.add_argument(
        "--start", help="ISO-8601 start time (e.g., 2025-08-01T00:00:00Z)"
    )
    p_api.add_argument("--end", help="ISO-8601 end time (e.g., 2025-08-01T02:00:00Z)")
    p_api.add_argument(
        "--duration",
        help="ISO-8601 duration for limitless-audio preset (e.g., PT2H, PT30M)",
    )
    p_api.add_argument(
        "--audio-source",
        dest="audio_source",
        choices=["pendant", "app"],
        help="Limitless audio source (maps to audioSource)",
    )
    # retries & timeouts
    p_api.add_argument("--timeout", type=int, default=60)
    p_api.add_argument("--max-retries", type=int, default=3)
    p_api.add_argument("--retry-backoff", type=float, default=0.5)
    p_api.add_argument(
        "--allow-non-2xx",
        action="store_true",
        help="Do not exit non-zero for HTTP >= 400",
    )
    _add_ingest_logging_flags(p_api)
    p_api.set_defaults(func=_cmd_api)


def register_cli(acq_subparsers: Any) -> None:
    """Register the ``http``, ``s3``, ``ftp``, ``vimeo`` and ``api`` subcommands.

    Each subcommand parser is created with ``acq_subparsers.add_parser(...)``
    and bound to its handler through ``set_defaults(func=...)``; registration
    order (and therefore ``--help`` listing order) is http, s3, ftp, vimeo, api.

    Args:
        acq_subparsers: Object exposing ``add_parser`` — presumably the action
            returned by ``ArgumentParser.add_subparsers()`` for the acquisition
            command group (confirm against the caller).
    """
    _register_http_parser(acq_subparsers)
    _register_s3_parser(acq_subparsers)
    _register_ftp_parser(acq_subparsers)
    _register_vimeo_parser(acq_subparsers)
    _register_api_parser(acq_subparsers)