datavizhub.acquisition package

Data acquisition managers (FTP, HTTP, S3, Vimeo) and base interface.

Provides DataAcquirer and concrete managers for fetching resources from, and uploading resources to, remote sources. See each manager’s docstrings for usage and supported capabilities.

Notes

  • HTTPManager is provided as a convenience alias of HTTPHandler for naming consistency across managers. Both refer to the same implementation.

exception datavizhub.acquisition.AcquisitionError[source]

Bases: Exception

Base exception for acquisition-related errors.

class datavizhub.acquisition.DataAcquirer[source]

Bases: ABC

Abstract interface for acquiring data from remote sources.

This abstract base class defines the minimal contract that any remote data source manager must implement to interoperate with the rest of the library. It standardizes how to connect to a source, fetch a resource to a local destination, enumerate available resources, and cleanly disconnect.

Parameters:
  • source (str) – Conceptual identifier of the remote source. Concrete implementations interpret this as needed (e.g., FTP host, S3 bucket, base URL, API). The value is typically provided through the concrete class constructor.

  • destination (str) – Conceptual identifier for the local destination (e.g., local file path) used by fetch(). This is usually supplied per call via local_filename and not stored on the instance.

Notes

  • Implementations are responsible for their own connection client lifecycle (initialize in connect(), cleanup in disconnect()).

  • list_files() may return None for sources that do not support listing (e.g., generic HTTP URLs).

Examples

Basic pipeline using a concrete manager:

from datavizhub.acquisition.ftp_manager import FTPManager

acq = FTPManager(host="ftp.example.com", username="anonymous", password="test@test.com")
acq.connect()
# Optional: enumerate files under a directory on the server
for name in (acq.list_files("/pub") or []):
    print(name)
# Fetch a file to the current working directory
acq.fetch("/pub/data/file.txt", local_filename="file.txt")
acq.disconnect()

Selecting a manager dynamically by source type:

def get_manager(config):
    if config["type"] == "s3":
        from datavizhub.acquisition.s3_manager import S3Manager
        return S3Manager(config["access_key"], config["secret_key"], config["bucket"])
    elif config["type"] == "ftp":
        from datavizhub.acquisition.ftp_manager import FTPManager
        return FTPManager(config["host"], config.get("port", 21), config["user"], config["pass"])
    else:
        from datavizhub.acquisition.http_manager import HTTPHandler
        return HTTPHandler()

mgr = get_manager(cfg)
mgr.connect()
mgr.fetch(cfg["remote"], cfg.get("local"))
mgr.disconnect()

Using as a context manager:

from datavizhub.acquisition.ftp_manager import FTPManager

with FTPManager(host="ftp.example.com") as mgr:
    mgr.fetch("/pub/data/file.txt", local_filename="file.txt")
property capabilities: set[str]

Set of capability strings (e.g., {'fetch', 'upload', 'list'}).
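
A minimal capability-gating sketch, where mgr is any connected manager from the examples above (paths are illustrative):

# Skip optional operations the manager does not advertise.
if "list" in mgr.capabilities:
    for name in (mgr.list_files("/pub") or []):
        print(name)
if "upload" in mgr.capabilities:
    mgr.upload("local/file.txt", "/pub/file.txt")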

abstract connect() None[source]

Establish a connection or initialize the client as needed.

Notes

Implementations should set up any underlying network clients or authenticate to remote services. This method should be idempotent and safe to call multiple times.

property connected: bool

Whether the manager considers itself connected.

delete(remote_path: str) bool[source]

Delete a remote resource if supported.

Notes

Default implementation is not supported. Subclasses may override.

Raises:

NotSupportedError – Always raised for the default implementation.

abstract disconnect() None[source]

Tear down the connection or client resources if applicable.

Notes

Implementations should release sockets/clients and clear references so that instances are reusable or can be garbage-collected cleanly.

exists(remote_path: str) bool[source]

Check whether a remote path exists.

Notes

Default implementation is not supported. Subclasses may override.

Raises:

NotSupportedError – Always raised for the default implementation.
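
Because the optional operations raise NotSupportedError by default, callers can probe support with a try/except; a sketch (path illustrative):

from datavizhub.acquisition import NotSupportedError

try:
    present = mgr.exists("/pub/data/file.txt")
except NotSupportedError:
    present = None  # this source cannot answer existence queries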

abstract fetch(remote_path: str, local_filename: str | None = None) bool[source]

Fetch a remote resource to a local file.

Parameters:
  • remote_path (str) – Remote identifier (e.g., URL, S3 key, FTP path) of the resource to download.

  • local_filename (str, optional) – Local destination filename or path. If omitted, implementations may infer a name from remote_path.

Returns:

True on successful fetch, False on failure.

Return type:

bool

fetch_many(items: Iterable[str], dest_dir: str) list[Tuple[str, bool]][source]

Fetch multiple remote resources to a destination directory.

Parameters:
  • items (Iterable[str]) – Collection of remote paths/identifiers to fetch.

  • dest_dir (str) – Local directory where files will be written.

Returns:

A list of (remote_path, success) tuples.

Return type:

list of (str, bool)
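
A usage sketch that collects failures for retry or reporting (remote paths illustrative):

results = mgr.fetch_many(["/pub/a.txt", "/pub/b.txt"], dest_dir="downloads")
failed = [remote for remote, ok in results if not ok]
if failed:
    print("failed to fetch:", failed)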

abstract list_files(remote_path: str | None = None) Iterable[str] | None[source]

List files or resources available at the remote path.

Parameters:

remote_path (str, optional) – Remote path, prefix, or locator to enumerate. If omitted, the implementation may list a default location (e.g., current directory for FTP or entire bucket/prefix for S3).

Returns:

Iterable of resource names/keys/paths. May return None if the operation is not supported by the source (e.g., HTTP URLs).

Return type:

Iterable of str or None

stat(remote_path: str)[source]

Return remote metadata if supported.

Notes

Default implementation is not supported. Subclasses may override.

Raises:

NotSupportedError – Always raised for the default implementation.

abstract upload(local_path: str, remote_path: str) bool[source]

Upload a local resource to the remote destination.

Parameters:
  • local_path (str) – Local filesystem path of the resource to upload.

  • remote_path (str) – Remote destination identifier (e.g., FTP path, S3 key).

Returns:

True on success, False on failure.

Return type:

bool

class datavizhub.acquisition.FTPManager(host: str, port: int = 21, username: str = 'anonymous', password: str = 'test@test.com', timeout: int = 30)[source]

Bases: DataAcquirer

Acquire files from FTP servers using passive mode.

This manager wraps Python’s ftplib to provide reliable FTP interactions including connecting, listing directories, and downloading files. It standardizes the acquisition interface via DataAcquirer and preserves the original convenience methods used elsewhere in the project.

Supported Protocols

  • ftp://

param host:

FTP server hostname or IP address.

type host:

str

param port:

FTP server port.

type port:

int, default=21

param username:

Username for authentication.

type username:

str, default="anonymous"

param password:

Password for authentication.

type password:

str, default="test@test.com"

param timeout:

Socket timeout in seconds.

type timeout:

int, default=30

Examples

Download a file from an FTP directory:

from datavizhub.acquisition.ftp_manager import FTPManager

ftp = FTPManager("ftp.example.com")
ftp.connect()
ftp.fetch("/pub/some/file.txt", "file.txt")
ftp.disconnect()
CAPABILITIES = {'fetch', 'list', 'upload'}
connect() None[source]

Connect to the FTP server and enable passive mode.

Raises:

Exception – Propagates any underlying connection or authentication failure.

delete(remote_path: str) bool[source]

Delete a remote file if possible.

Parameters:

remote_path (str) – Path to the remote file (may include directories).

delete_empty_files(dir_path: Path) None[source]

Delete zero-byte files in a directory.

Parameters:

dir_path (pathlib.Path) – Directory to scan for empty files.

disconnect() None[source]

Close the FTP session if connected.

download_byteranges(remote_path: str, byte_ranges: Iterable[str], *, max_workers: int = 10, timeout: int = 30) bytes[source]

Download multiple byte ranges using parallel FTP connections.

Notes

  • Spawns one short-lived FTP connection per range (per thread) to maintain thread safety across requests; this incurs connection overhead but avoids shared-socket issues in ftplib.

  • Uses REST to position the transfer at the desired start byte; stops reading after the requested range length. Results are concatenated in the input order.
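
A sketch of GRIB subsetting over FTP, combining this method with get_idx_lines() and idx_to_byteranges() (host, path, and field regex are illustrative):

from datavizhub.acquisition.ftp_manager import FTPManager

ftp = FTPManager("ftp.example.com")
ftp.connect()
lines = ftp.get_idx_lines("/pub/model/run.grib2")
if lines:
    ranges = ftp.idx_to_byteranges(lines, r":TMP:surface")
    blob = ftp.download_byteranges("/pub/model/run.grib2", ranges.keys())
    with open("run_subset.grib2", "wb") as f:
        f.write(blob)
ftp.disconnect()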

download_file(remote_file_path: str, local_file_path: str) bool[source]

Download a single file via FTP with retries.

Parameters:
  • remote_file_path (str) – Remote file path (may include directories).

  • local_file_path (str) – Local destination path including filename.

Returns:

True if downloaded and non-zero in size; False otherwise.

Return type:

bool

Raises:
  • FileNotFoundError – If the remote file does not exist.

  • Exception – If the final attempt fails for any other reason.

exists(remote_path: str) bool[source]

Return True if the remote file exists on the FTP server.

Parameters:

remote_path (str) – Path to the remote file (may include directories).

fetch(remote_path: str, local_filename: str | None = None) bool[source]

Download a remote file to a local path.

Parameters:
  • remote_path (str) – Full remote file path (may include directories).

  • local_filename (str, optional) – Local destination path. Defaults to basename of remote_path.

Returns:

True on success, False otherwise.

Return type:

bool

get_chunks(remote_path: str, chunk_size: int = 524288000) list[str][source]
get_idx_lines(remote_path: str, *, write_to: str | None = None, timeout: int = 30, max_retries: int = 3) list[str] | None[source]

Fetch and parse the GRIB .idx for a remote path.

Appends .idx to remote_path unless an explicit .idx is provided.

get_size(remote_path: str) int | None[source]

Return the size in bytes for a remote file using SIZE.

idx_to_byteranges(lines: list[str], search_str: str) dict[str, str][source]
list_files(remote_path: str | None = None, pattern: str | None = None) Iterable[str] | None[source]

List names at the given remote path and optionally filter by regex.

Parameters:
  • remote_path (str, optional) – Remote directory to list. Defaults to current server directory.

  • pattern (str, optional) – Regular expression applied to names returned by NLST (via re.search()).

Returns:

Filenames present in the directory, or None on error.

Return type:

list of str or None
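
For example, listing only GRIB2 files in a directory (path illustrative):

for name in (ftp.list_files("/pub/model", pattern=r"\.grib2$") or []):
    print(name)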

stat(remote_path: str)[source]

Return minimal metadata for a remote file (size in bytes).

Parameters:

remote_path (str) – Path to the remote file (may include directories).

Returns:

A mapping with {"size": int} if available; None on error.

Return type:

dict or None

sync_ftp_directory(remote_dir: str, local_dir: str, dataset_period: str) None[source]

Synchronize a remote directory to local storage by date range.

Parameters:
  • remote_dir (str) – Remote FTP directory to synchronize.

  • local_dir (str) – Local directory to mirror files into.

  • dataset_period (str) – Period spec parsable by DateManager (e.g., "7d", "24h").

Notes

  • Downloads files within the date range not present locally or with size 0.

  • Deletes local files no longer present remotely.
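
A usage sketch that mirrors the last seven days of a remote directory (directories illustrative):

ftp.sync_ftp_directory("/pub/model/latest", "data/latest", dataset_period="7d")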

upload(local_path: str, remote_path: str) bool[source]

Standardized upload implementation delegating to upload_file().

Parameters:
  • local_path (str) – Local file path to upload.

  • remote_path (str) – Remote destination path.

Returns:

True on success.

Return type:

bool

upload_file(local_file_path: str, remote_file_path: str) None[source]

Upload a local file to the FTP server with retries.

Parameters:
  • local_file_path (str) – Local file path to upload.

  • remote_file_path (str) – Remote path including target filename.

Raises:

Exception – When the final attempt fails to upload the file.

class datavizhub.acquisition.HTTPHandler[source]

Bases: DataAcquirer

CAPABILITIES = {'fetch'}

Acquire files over HTTP/HTTPS.

This lightweight manager performs simple HTTP(S) GETs to fetch remote resources to the local filesystem. Because HTTP is stateless for these operations, connect() and disconnect() are no-ops.

Supported Protocols

  • http://

  • https://

Examples

Download a file via HTTPS:

from datavizhub.acquisition.http_manager import HTTPHandler

http = HTTPHandler()
http.connect()  # no-op
http.fetch("https://example.com/data.json", "data.json")
http.disconnect()  # no-op
connect() None[source]

Initialize the handler (no persistent connection).

Notes

Provided for API parity; does nothing for basic HTTP GETs.

disconnect() None[source]

No persistent connection to tear down.

download_byteranges(url: str, byte_ranges: Iterable[str], *, max_workers: int = 10, timeout: int = 30) bytes[source]

Parallel ranged downloads concatenated in the order of byte_ranges.

Parameters:
  • url (str) – Target URL for the ranged GET requests.

  • byte_ranges (Iterable[str]) – Iterable of Range header values, e.g., "bytes=0-99".

  • max_workers (int, default=10) – Number of worker threads for parallelism.

  • timeout (int, default=30) – Per-request timeout (seconds).
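
A sketch fetching two explicit ranges (URL and offsets illustrative):

http = HTTPHandler()
blob = http.download_byteranges(
    "https://example.com/big.grib2",
    ["bytes=0-999", "bytes=5000-5999"],
    max_workers=4,
)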

static download_file(url: str, filename: str) None[source]

Compatibility helper that downloads a file.

Parameters:
  • url (str) – File URL to download.

  • filename (str) – Local destination path.

fetch(remote_path: str, local_filename: str | None = None) bool[source]

Download content at remote_path to local_filename.

Parameters:
  • remote_path (str) – Full HTTP(S) URL to download.

  • local_filename (str, optional) – Local destination path. Defaults to the basename of the URL.

Returns:

True on success, False if the request fails.

Return type:

bool

static fetch_data(url: str)[source]

Fetch binary payload via GET.

Parameters:

url (str) – URL to request.

Returns:

Raw response body on success, otherwise None.

Return type:

bytes or None

static fetch_headers(url: str)[source]

Perform a HEAD request and return headers.

Parameters:

url (str) – URL to request.

Returns:

Response headers on success, otherwise None.

Return type:

Mapping or None

static fetch_json(url: str)[source]

Fetch JSON content via GET and parse it.

Parameters:

url (str) – URL to request.

Returns:

Parsed JSON object on success, otherwise None.

Return type:

dict or list or None

static fetch_text(url: str)[source]

Fetch text content via GET.

Parameters:

url (str) – URL to request.

Returns:

Text response on success, otherwise None.

Return type:

str or None
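
The static helpers compose without instantiating the class; a sketch against an illustrative endpoint:

url = "https://example.com/data.json"
headers = HTTPHandler.fetch_headers(url)
if headers is not None:
    print(headers.get("Content-Type"))
payload = HTTPHandler.fetch_json(url)
if payload is None:
    payload = HTTPHandler.fetch_text(url)  # fall back to raw text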

get_chunks(url: str, chunk_size: int = 524288000) list[str][source]
get_idx_lines(url: str, *, write_to: str | None = None, timeout: int = 30, max_retries: int = 3) list[str] | None[source]

Fetch and parse the GRIB index (.idx) for a URL.

Appends .idx to url unless an explicit .idx path is provided. Retries are applied on transient failures.

get_size(url: str) int | None[source]

Return the Content-Length from a HEAD request if provided.

idx_to_byteranges(lines: list[str], search_str: str) dict[str, str][source]
list_files(remote_path: str | None = None, pattern: str | None = None) Iterable[str] | None[source]

Attempt to list files by scraping anchor tags from an index page.

This is best-effort and intended for directory-style endpoints such as NOMADS listings. If the page is not HTML or contains no anchors, an empty list is returned.

Parameters:
  • remote_path (str) – Page URL to scrape for anchors.

  • pattern (str, optional) – Regular expression applied to full URLs (via re.search()).

static post_data(url: str, data, headers=None)[source]

Send a POST request and return the body.

Parameters:
  • url (str) – URL to post to.

  • data (Any) – Request payload.

  • headers (dict, optional) – Optional request headers.

Returns:

Response text on success, otherwise None.

Return type:

str or None

upload(local_path: str, remote_path: str) bool[source]

Uploading is not supported for HTTPHandler.

Raises:

NotSupportedError – Always raised to indicate upload is unsupported.

datavizhub.acquisition.HTTPManager

alias of HTTPHandler

exception datavizhub.acquisition.NotSupportedError[source]

Bases: AcquisitionError

Raised when an operation is not supported by a manager.

class datavizhub.acquisition.S3Manager(access_key: str | None, secret_key: str | None, bucket_name: str, unsigned: bool = False, region_name: str | None = None)[source]

Bases: DataAcquirer

CAPABILITIES = {'fetch', 'list', 'upload'}

Acquire objects from Amazon S3 buckets via boto3.

This manager wraps boto3’s S3 client to standardize connecting, listing, and fetching S3 objects using the acquisition interface.

Supported Protocols

  • s3:// (buckets and keys)

param access_key:

AWS access key ID. Optional for public buckets or when using IAM roles.

type access_key:

str, optional

param secret_key:

AWS secret access key.

type secret_key:

str, optional

param bucket_name:

Default S3 bucket to operate on.

type bucket_name:

str

param unsigned:

Disable request signing for public buckets using botocore.config.Config(signature_version=UNSIGNED).

type unsigned:

bool, default=False

param region_name:

AWS region for the client. If omitted, botocore defaults apply.

type region_name:

str, optional

Examples

Download a key to a local file:

from datavizhub.acquisition.s3_manager import S3Manager

s3 = S3Manager("AKIA...", "SECRET...", "my-bucket")
s3.connect()
s3.fetch("path/to/object.nc", "object.nc")
s3.disconnect()

Public bucket access (unsigned):

from datavizhub.acquisition.s3_manager import S3Manager
s3 = S3Manager(None, None, bucket_name="noaa-hrrr-bdp-pds", unsigned=True)
lines = s3.get_idx_lines("hrrr.20230801/conus/hrrr.t00z.wrfsfcf00.grib2")
ranges = s3.idx_to_byteranges(lines, r"(:TMP:surface|:PRATE:surface)")
blob = s3.download_byteranges("hrrr.20230801/conus/hrrr.t00z.wrfsfcf00.grib2", ranges.keys())
__init__(access_key: str | None, secret_key: str | None, bucket_name: str, unsigned: bool = False, region_name: str | None = None) None[source]

Initialize the S3 manager.

Parameters:
  • access_key (str, optional) – AWS access key ID. Optional for public buckets or when using IAM roles.

  • secret_key (str, optional) – AWS secret access key.

  • bucket_name (str) – Default S3 bucket to operate on.

  • unsigned (bool, default=False) – When True, disable request signing for public buckets using botocore.config.Config(signature_version=UNSIGNED).

  • region_name (str, optional) – AWS region for the client. If omitted, botocore’s default resolution is used.

connect() None[source]

Create an S3 client using the provided credentials.

Raises:
  • NoCredentialsError – When credentials are not available or invalid.

  • botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError – On other client initialization failures.

delete(remote_path: str) bool[source]

Delete an object from the bucket.

disconnect() None[source]

Release the client reference.

Notes

boto3 clients do not require explicit shutdown. Setting the reference to None allows the instance to be reused or garbage-collected.

download_byteranges(key: str, byte_ranges: Iterable[str], *, max_workers: int = 10, timeout: int = 30) bytes[source]

Download multiple byte ranges from an object and concatenate results.

Parameters:
  • key (str) – Object key within the configured bucket.

  • byte_ranges (Iterable[str]) – Iterable of Range header values (e.g., "bytes=0-99").

  • max_workers (int, default=10) – Maximum parallel workers for ranged requests.

  • timeout (int, default=30) – Timeout per ranged request (seconds).

Returns:

Concatenated payload of the requested ranges, preserving the order of the input byte_ranges.

Return type:

bytes

download_file(file_path: str, local_file_name: str) bool[source]

Compatibility method: download an S3 key.

Parameters:
  • file_path (str) – S3 key to download.

  • local_file_name (str) – Local destination path.

Returns:

True on success, False on failure.

Return type:

bool

exists(remote_path: str) bool[source]

Return True if the object exists in the bucket.

fetch(remote_path: str, local_filename: str | None = None) bool[source]

Download an S3 key to a local file.

Parameters:
  • remote_path (str) – S3 key to download from bucket_name.

  • local_filename (str, optional) – Local destination filename. Defaults to the basename of the key.

Returns:

True on success, False on failure.

Return type:

bool

get_chunks(key: str, chunk_size: int = 524288000) list[str][source]

Compute contiguous chunk ranges for an S3 object.

The final range uses the file size as the inclusive end byte.
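
A sketch that downloads a large object piece by piece to bound memory use (key illustrative):

key = "path/to/large/object.nc"
with open("object.nc", "wb") as f:
    for rng in s3.get_chunks(key):
        f.write(s3.download_byteranges(key, [rng]))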

get_idx_lines(key: str, *, timeout: int = 30, max_retries: int = 3, write_to: str | None = None) list[str] | None[source]

Fetch and parse the GRIB index (.idx) for key.

Parameters:
  • key (str) – GRIB object key, or an explicit .idx path.

  • write_to (str, optional) – If provided, write the idx text to write_to (appends .idx if not present).

  • timeout (int, default=30) – Per-request timeout (seconds). Included for API consistency.

  • max_retries (int, default=3) – Simple retry count on transient errors.

Notes

Appends .idx to key unless an explicit .idx path is provided.

get_size(key: str) int | None[source]

Return the size in bytes for a given S3 object key.

Returns:

Content length if available, else None.

Return type:

int or None

idx_to_byteranges(lines: list[str], search_str: str) dict[str, str][source]

Wrapper for grib_utils.idx_to_byteranges() using regex filtering.

list_files(remote_path: str | None = None, pattern: str | None = None) Iterable[str] | None[source]

List object keys under a prefix in the bucket with optional regex filter.

Parameters:
  • remote_path (str, optional) – Prefix to list. Defaults to all keys in the bucket.

  • pattern (str, optional) – Regular expression applied to full keys using re.search(). If provided, only matching keys are returned.

Returns:

Keys found under the prefix, or None on error.

Return type:

list of str or None
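
For example, keeping only surface forecast GRIB2 keys under a prefix (layout from the unsigned example above; the regex is illustrative):

keys = s3.list_files("hrrr.20230801/conus/", pattern=r"wrfsfcf\d{2}\.grib2$") or []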

stat(remote_path: str)[source]

Return basic metadata for an object (size, last modified, etag).

upload(local_path: str, remote_path: str) bool[source]

Standardized upload implementation delegating to upload_file().

upload_file(file_path: str, s3_file_name: str) bool[source]

Upload a local file to the configured bucket.

Parameters:
  • file_path (str) – Local file path.

  • s3_file_name (str) – Destination S3 key within bucket_name.

Returns:

True on success, False otherwise.

Return type:

bool

class datavizhub.acquisition.VimeoManager(client_id: str, client_secret: str, access_token: str)[source]

Bases: DataAcquirer

CAPABILITIES = {'upload'}

Upload videos to Vimeo using PyVimeo.

This manager encapsulates video uploads and updates via the Vimeo API using PyVimeo. It participates in the acquisition interface for pipeline consistency, though generic file fetching/listing is not supported for Vimeo in this project.

Supported Protocols

  • Vimeo API (token-based)

param client_id:

Vimeo API client ID.

type client_id:

str

param client_secret:

Vimeo API client secret.

type client_secret:

str

param access_token:

Vimeo API access token.

type access_token:

str

Examples

Upload a video and get its URI:

from datavizhub.acquisition.vimeo_manager import VimeoManager

vm = VimeoManager(client_id, client_secret, access_token)
vm.connect()
uri = vm.upload_video("/path/to/video.mp4", video_name="My Video")
vm.disconnect()
print(uri)
connect() None[source]

Initialize the Vimeo client using provided credentials.

disconnect() None[source]

Release the Vimeo client reference.

fetch(remote_path: str, local_filename: str | None = None) bool[source]

Fetching from Vimeo is not supported.

Raises:

NotImplementedError – Always raised to indicate downloads are not supported.

list_files(remote_path: str | None = None) Iterable[str] | None[source]

Listing is not implemented for Vimeo.

Returns:

Always returns None.

Return type:

None

update_video(file_path: str, video_uri: str) str[source]

Replace the video file for an existing Vimeo video.

Parameters:
  • file_path (str) – Path to the replacement video file.

  • video_uri (str) – Vimeo video URI (e.g., "/videos/12345").

Returns:

The URI of the updated video.

Return type:

str

Raises:

Exception – If the update fails or the response cannot be interpreted.

update_video_description(video_uri: str, new_description: str) str[source]

Update the description of a Vimeo video.

Parameters:
  • video_uri (str) – Vimeo video URI (e.g., "/videos/12345").

  • new_description (str) – New description text to set.

Returns:

Confirmation message when the update succeeds.

Return type:

str

Raises:

Exception – If the Vimeo API call fails.
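
Continuing the class example above, a sketch of the update helpers (URI and file path illustrative):

vm.update_video("/path/to/new_cut.mp4", "/videos/12345")
print(vm.update_video_description("/videos/12345", "Re-rendered with updated colors"))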

upload(local_path: str, remote_path: str) bool[source]

Standardized upload interface mapping to upload_video().

Parameters:
  • local_path (str) – Local video file path.

  • remote_path (str) – Interpreted as the Vimeo video name/title.

Returns:

True if an upload URI was returned.

Return type:

bool

upload_video(file_path: str, video_name: str | None = None) str[source]

Upload a local video to Vimeo.

Parameters:
  • file_path (str) – Path to the local video file.

  • video_name (str, optional) – Optional title to assign to the video.

Returns:

The Vimeo video URI for the uploaded content.

Return type:

str

Raises:

Exception – If the upload fails or the response cannot be interpreted.

Modules

Base interface for data acquisition in DataVizHub.

Defines DataAcquirer plus common exceptions. Concrete managers (FTP, HTTP, S3, Vimeo) implement the acquisition lifecycle: connect, fetch/list, optional upload, and disconnect.

exception datavizhub.acquisition.base.AcquisitionError[source]

Bases: Exception

Base exception for acquisition-related errors.

class datavizhub.acquisition.base.DataAcquirer[source]

Bases: ABC

Abstract interface for acquiring data from remote sources.

This abstract base class defines the minimal contract that any remote data source manager must implement to interoperate with the rest of the library. It standardizes how to connect to a source, fetch a resource to a local destination, enumerate available resources, and cleanly disconnect.

Parameters:
  • source (str) – Conceptual identifier of the remote source. Concrete implementations interpret this as needed (e.g., FTP host, S3 bucket, base URL, API). The value is typically provided through the concrete class constructor.

  • destination (str) – Conceptual identifier for the local destination (e.g., local file path) used by fetch(). This is usually supplied per call via local_filename and not stored on the instance.

Notes

  • Implementations are responsible for their own connection client lifecycle (initialize in connect(), cleanup in disconnect()).

  • list_files() may return None for sources that do not support listing (e.g., generic HTTP URLs).

Examples

Basic pipeline using a concrete manager:

from datavizhub.acquisition.ftp_manager import FTPManager

acq = FTPManager(host="ftp.example.com", username="anonymous", password="test@test.com")
acq.connect()
# Optional: enumerate files under a directory on the server
for name in (acq.list_files("/pub") or []):
    print(name)
# Fetch a file to the current working directory
acq.fetch("/pub/data/file.txt", local_filename="file.txt")
acq.disconnect()

Selecting a manager dynamically by source type:

def get_manager(config):
    if config["type"] == "s3":
        from datavizhub.acquisition.s3_manager import S3Manager
        return S3Manager(config["access_key"], config["secret_key"], config["bucket"])
    elif config["type"] == "ftp":
        from datavizhub.acquisition.ftp_manager import FTPManager
        return FTPManager(config["host"], config.get("port", 21), config["user"], config["pass"])
    else:
        from datavizhub.acquisition.http_manager import HTTPHandler
        return HTTPHandler()

mgr = get_manager(cfg)
mgr.connect()
mgr.fetch(cfg["remote"], cfg.get("local"))
mgr.disconnect()

Using as a context manager:

from datavizhub.acquisition.ftp_manager import FTPManager

with FTPManager(host="ftp.example.com") as mgr:
    mgr.fetch("/pub/data/file.txt", local_filename="file.txt")
property capabilities: set[str]

Set of capability strings (e.g., {'fetch', 'upload', 'list'}).

abstract connect() None[source]

Establish a connection or initialize the client as needed.

Notes

Implementations should set up any underlying network clients or authenticate to remote services. This method should be idempotent and safe to call multiple times.

property connected: bool

Whether the manager considers itself connected.

delete(remote_path: str) bool[source]

Delete a remote resource if supported.

Notes

Default implementation is not supported. Subclasses may override.

Raises:

NotSupportedError – Always raised for the default implementation.

abstract disconnect() None[source]

Tear down the connection or client resources if applicable.

Notes

Implementations should release sockets/clients and clear references so that instances are reusable or can be garbage-collected cleanly.

exists(remote_path: str) bool[source]

Check whether a remote path exists.

Notes

Default implementation is not supported. Subclasses may override.

Raises:

NotSupportedError – Always raised for the default implementation.

abstract fetch(remote_path: str, local_filename: str | None = None) bool[source]

Fetch a remote resource to a local file.

Parameters:
  • remote_path (str) – Remote identifier (e.g., URL, S3 key, FTP path) of the resource to download.

  • local_filename (str, optional) – Local destination filename or path. If omitted, implementations may infer a name from remote_path.

Returns:

True on successful fetch, False on failure.

Return type:

bool

fetch_many(items: Iterable[str], dest_dir: str) list[Tuple[str, bool]][source]

Fetch multiple remote resources to a destination directory.

Parameters:
  • items (Iterable[str]) – Collection of remote paths/identifiers to fetch.

  • dest_dir (str) – Local directory where files will be written.

Returns:

A list of (remote_path, success) tuples.

Return type:

list of (str, bool)

abstract list_files(remote_path: str | None = None) Iterable[str] | None[source]

List files or resources available at the remote path.

Parameters:

remote_path (str, optional) – Remote path, prefix, or locator to enumerate. If omitted, the implementation may list a default location (e.g., current directory for FTP or entire bucket/prefix for S3).

Returns:

Iterable of resource names/keys/paths. May return None if the operation is not supported by the source (e.g., HTTP URLs).

Return type:

Iterable of str or None

stat(remote_path: str)[source]

Return remote metadata if supported.

Notes

Default implementation is not supported. Subclasses may override.

Raises:

NotSupportedError – Always raised for the default implementation.

abstract upload(local_path: str, remote_path: str) bool[source]

Upload a local resource to the remote destination.

Parameters:
  • local_path (str) – Local filesystem path of the resource to upload.

  • remote_path (str) – Remote destination identifier (e.g., FTP path, S3 key).

Returns:

True on success, False on failure.

Return type:

bool

exception datavizhub.acquisition.base.NotSupportedError[source]

Bases: AcquisitionError

Raised when an operation is not supported by a manager.

FTP data acquisition manager.

Implements DataAcquirer for FTP servers with support for listing, fetching, and uploading files. Adds optional helpers for GRIB .idx subsetting and FTP byte-range downloads via REST.

Advanced Features

  • get_size(path): use SIZE to return remote size in bytes.

  • get_idx_lines(path, *, write_to=None, timeout=30, max_retries=3): fetch and parse a GRIB .idx (appends .idx unless explicit).

  • idx_to_byteranges(lines, search_regex): regex to Range headers.

  • get_chunks(path, chunk_size=500MB): compute contiguous ranges.

  • download_byteranges(path, byte_ranges, *, max_workers=10, timeout=30): parallel range downloads using one short-lived FTP connection per range to ensure thread safety; concatenates the results in input order.

class datavizhub.acquisition.ftp_manager.FTPManager(host: str, port: int = 21, username: str = 'anonymous', password: str = 'test@test.com', timeout: int = 30)[source]

Bases: DataAcquirer

Acquire files from FTP servers using passive mode.

This manager wraps Python’s ftplib to provide reliable FTP interactions including connecting, listing directories, and downloading files. It standardizes the acquisition interface via DataAcquirer and preserves the original convenience methods used elsewhere in the project.

Supported Protocols

  • ftp://

param host:

FTP server hostname or IP address.

type host:

str

param port:

FTP server port.

type port:

int, default=21

param username:

Username for authentication.

type username:

str, default="anonymous"

param password:

Password for authentication.

type password:

str, default="test@test.com"

param timeout:

Socket timeout in seconds.

type timeout:

int, default=30

Examples

Download a file from an FTP directory:

from datavizhub.acquisition.ftp_manager import FTPManager

ftp = FTPManager("ftp.example.com")
ftp.connect()
ftp.fetch("/pub/some/file.txt", "file.txt")
ftp.disconnect()
CAPABILITIES = {'fetch', 'list', 'upload'}
connect() None[source]

Connect to the FTP server and enable passive mode.

Raises:

Exception – Propagates any underlying connection or authentication failure.

delete(remote_path: str) bool[source]

Delete a remote file if possible.

Parameters:

remote_path (str) – Path to the remote file (may include directories).

delete_empty_files(dir_path: Path) None[source]

Delete zero-byte files in a directory.

Parameters:

dir_path (pathlib.Path) – Directory to scan for empty files.

disconnect() None[source]

Close the FTP session if connected.

download_byteranges(remote_path: str, byte_ranges: Iterable[str], *, max_workers: int = 10, timeout: int = 30) bytes[source]

Download multiple byte ranges using parallel FTP connections.

Notes

  • Spawns one short-lived FTP connection per range (per thread) to maintain thread safety across requests; this incurs connection overhead but avoids shared-socket issues in ftplib.

  • Uses REST to position the transfer at the desired start byte; stops reading after the requested range length. Results are concatenated in the input order.

download_file(remote_file_path: str, local_file_path: str) bool[source]

Download a single file via FTP with retries.

Parameters:
  • remote_file_path (str) – Remote file path (may include directories).

  • local_file_path (str) – Local destination path including filename.

Returns:

True if downloaded and non-zero in size; False otherwise.

Return type:

bool

Raises:
  • FileNotFoundError – If the remote file does not exist.

  • Exception – If the final attempt fails for any other reason.

exists(remote_path: str) bool[source]

Return True if the remote file exists on the FTP server.

Parameters:

remote_path (str) – Path to the remote file (may include directories).

fetch(remote_path: str, local_filename: str | None = None) bool[source]

Download a remote file to a local path.

Parameters:
  • remote_path (str) – Full remote file path (may include directories).

  • local_filename (str, optional) – Local destination path. Defaults to basename of remote_path.

Returns:

True on success, False otherwise.

Return type:

bool

ftp: FTP | None
get_chunks(remote_path: str, chunk_size: int = 524288000) list[str][source]
get_idx_lines(remote_path: str, *, write_to: str | None = None, timeout: int = 30, max_retries: int = 3) list[str] | None[source]

Fetch and parse the GRIB .idx for a remote path.

Appends .idx to remote_path unless an explicit .idx is provided.

get_size(remote_path: str) int | None[source]

Return the size in bytes for a remote file using SIZE.

idx_to_byteranges(lines: list[str], search_str: str) dict[str, str][source]
list_files(remote_path: str | None = None, pattern: str | None = None) Iterable[str] | None[source]

List names at the given remote path and optionally filter by regex.

Parameters:
  • remote_path (str, optional) – Remote directory to list. Defaults to current server directory.

  • pattern (str, optional) – Regular expression applied to names returned by NLST (via re.search()).

Returns:

Filenames present in the directory, or None on error.

Return type:

list of str or None

stat(remote_path: str)[source]

Return minimal metadata for a remote file (size in bytes).

Parameters:

remote_path (str) – Path to the remote file (may include directories).

Returns:

A mapping with {"size": int} if available; None on error.

Return type:

dict or None

sync_ftp_directory(remote_dir: str, local_dir: str, dataset_period: str) None[source]

Synchronize a remote directory to local storage by date range.

Parameters:
  • remote_dir (str) – Remote FTP directory to synchronize.

  • local_dir (str) – Local directory to mirror files into.

  • dataset_period (str) – Period spec parsable by DateManager (e.g., "7d", "24h").

Notes

  • Downloads files within the date range not present locally or with size 0.

  • Deletes local files no longer present remotely.

upload(local_path: str, remote_path: str) bool[source]

Standardized upload implementation delegating to upload_file().

Parameters:
  • local_path (str) – Local file path to upload.

  • remote_path (str) – Remote destination path.

Returns:

True on success.

Return type:

bool

upload_file(local_file_path: str, remote_file_path: str) None[source]

Upload a local file to the FTP server with retries.

Parameters:
  • local_file_path (str) – Local file path to upload.

  • remote_file_path (str) – Remote path including target filename.

Raises:

Exception – When the final attempt fails to upload the file.

HTTP data acquisition handler.

Provides a minimal DataAcquirer for HTTP GET downloads, plus optional helpers for content size queries, GRIB .idx subsetting, byte-range downloads, and best-effort listing via anchor scraping.

Advanced Features

  • get_size(url): return Content-Length from a HEAD if provided.

  • get_idx_lines(url, *, write_to=None, timeout=30, max_retries=3): fetch and parse .idx (appends .idx unless explicit).

  • idx_to_byteranges(lines, search_regex): regex-based selection of ranges.

  • get_chunks(url, chunk_size=500MB): compute contiguous ranges.

  • download_byteranges(url, byte_ranges, *, max_workers=10, timeout=30): parallel ranged GETs, concatenated in order.

  • list_files(url, pattern=None): scrape anchor tags from directory-style index pages (e.g., NOMADS) and filter with regex if provided.
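
Combining several of these helpers into an end-to-end subsetting sketch (the NOMADS-style URL and field regex are illustrative):

from datavizhub.acquisition.http_manager import HTTPHandler

http = HTTPHandler()
url = "https://nomads.example.gov/hrrr.t00z.wrfsfcf00.grib2"
lines = http.get_idx_lines(url)
if lines:
    ranges = http.idx_to_byteranges(lines, r"(:TMP:surface|:PRATE:surface)")
    blob = http.download_byteranges(url, ranges.keys())
    with open("subset.grib2", "wb") as f:
        f.write(blob)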

class datavizhub.acquisition.http_manager.HTTPHandler[source]

Bases: DataAcquirer

CAPABILITIES = {'fetch'}

Acquire files over HTTP/HTTPS.

This lightweight manager performs simple HTTP(S) GETs to fetch remote resources to the local filesystem. Because HTTP is stateless for these operations, connect() and disconnect() are no-ops.

Supported Protocols

  • http://

  • https://

Examples

Download a file via HTTPS:

from datavizhub.acquisition.http_manager import HTTPHandler

http = HTTPHandler()
http.connect()  # no-op
http.fetch("https://example.com/data.json", "data.json")
http.disconnect()  # no-op
connect() None[source]

Initialize the handler (no persistent connection).

Notes

Provided for API parity; does nothing for basic HTTP GETs.

disconnect() None[source]

No persistent connection to tear down.

download_byteranges(url: str, byte_ranges: Iterable[str], *, max_workers: int = 10, timeout: int = 30) bytes[source]

Parallel ranged downloads concatenated in the order of byte_ranges.

Parameters:
  • url (str) – Target URL for the ranged GET requests.

  • byte_ranges (Iterable[str]) – Iterable of Range header values, e.g., "bytes=0-99".

  • max_workers (int, default=10) – Number of worker threads for parallelism.

  • timeout (int, default=30) – Per-request timeout (seconds).

static download_file(url: str, filename: str) None[source]

Compatibility helper that downloads a file.

Parameters:
  • url (str) – File URL to download.

  • filename (str) – Local destination path.

fetch(remote_path: str, local_filename: str | None = None) bool[source]

Download content at remote_path to local_filename.

Parameters:
  • remote_path (str) – Full HTTP(S) URL to download.

  • local_filename (str, optional) – Local destination path. Defaults to the basename of the URL.

Returns:

True on success, False if the request fails.

Return type:

bool

static fetch_data(url: str)[source]

Fetch binary payload via GET.

Parameters:

url (str) – URL to request.

Returns:

Raw response body on success, otherwise None.

Return type:

bytes or None

static fetch_headers(url: str)[source]

Perform a HEAD request and return headers.

Parameters:

url (str) – URL to request.

Returns:

Response headers on success, otherwise None.

Return type:

Mapping or None

static fetch_json(url: str)[source]

Fetch JSON content via GET and parse it.

Parameters:

url (str) – URL to request.

Returns:

Parsed JSON object on success, otherwise None.

Return type:

dict or list or None

static fetch_text(url: str)[source]

Fetch text content via GET.

Parameters:

url (str) – URL to request.

Returns:

Text response on success, otherwise None.

Return type:

str or None

get_chunks(url: str, chunk_size: int = 524288000) list[str][source]
get_idx_lines(url: str, *, write_to: str | None = None, timeout: int = 30, max_retries: int = 3) list[str] | None[source]

Fetch and parse the GRIB index (.idx) for a URL.

Appends .idx to url unless an explicit .idx path is provided. Retries are applied on transient failures.

get_size(url: str) int | None[source]

Return the Content-Length from a HEAD request if provided.

idx_to_byteranges(lines: list[str], search_str: str) dict[str, str][source]
list_files(remote_path: str | None = None, pattern: str | None = None) Iterable[str] | None[source]

Attempt to list files by scraping anchor tags from an index page.

This is best-effort and intended for directory-style endpoints such as NOMADS listings. If the page is not HTML or contains no anchors, an empty list is returned.

Parameters:
  • remote_path (str) – Page URL to scrape for anchors.

  • pattern (str, optional) – Regular expression applied to full URLs (via re.search()).

static post_data(url: str, data, headers=None)[source]

Send a POST request and return the body.

Parameters:
  • url (str) – URL to post to.

  • data (Any) – Request payload.

  • headers (dict, optional) – Optional request headers.

Returns:

Response text on success, otherwise None.

Return type:

str or None

upload(local_path: str, remote_path: str) bool[source]

Uploading is not supported for HTTPHandler.

Raises:

NotSupportedError – Always raised to indicate upload is unsupported.

Amazon S3 data acquisition manager using boto3.

Implements DataAcquirer for S3 buckets with listing, fetching, and uploading support. Includes optional advanced helpers for GRIB workflows and large file transfers.

Advanced Features

  • get_size(key): return object size (bytes) via head_object.

  • get_idx_lines(key, *, write_to=None, timeout=30, max_retries=3): fetch and parse a GRIB .idx file. Appends .idx to key unless an explicit .idx path is provided. Optionally writes the idx to disk.

  • idx_to_byteranges(lines, search_regex): build HTTP Range strings from .idx content using a regex filter.

  • get_chunks(key, chunk_size=500MB): compute contiguous byte ranges, using an inclusive final end byte (NODD style).

  • download_byteranges(key, byte_ranges, *, max_workers=10, timeout=30): download multiple ranges in parallel and return concatenated bytes.

  • list_files(prefix=None, pattern=None): list keys with optional regex filtering applied to full keys.

class datavizhub.acquisition.s3_manager.S3Manager(access_key: str | None, secret_key: str | None, bucket_name: str, unsigned: bool = False, region_name: str | None = None)[source]

Bases: DataAcquirer

CAPABILITIES = {'fetch', 'list', 'upload'}

Acquire objects from Amazon S3 buckets via boto3.

This manager wraps boto3’s S3 client to standardize connecting, listing, and fetching S3 objects using the acquisition interface.

Supported Protocols

  • s3:// (buckets and keys)

param access_key:

AWS access key ID. Optional for public buckets or when using IAM roles.

type access_key:

str, optional

param secret_key:

AWS secret access key.

type secret_key:

str, optional

param bucket_name:

Default S3 bucket to operate on.

type bucket_name:

str

param unsigned:

Disable request signing for public buckets using botocore.config.Config(signature_version=UNSIGNED).

type unsigned:

bool, default=False

param region_name:

AWS region for the client. If omitted, botocore defaults apply.

type region_name:

str, optional

Examples

Download a key to a local file:

from datavizhub.acquisition.s3_manager import S3Manager

s3 = S3Manager("AKIA...", "SECRET...", "my-bucket")
s3.connect()
s3.fetch("path/to/object.nc", "object.nc")
s3.disconnect()

Public bucket access (unsigned):

from datavizhub.acquisition.s3_manager import S3Manager
s3 = S3Manager(None, None, bucket_name="noaa-hrrr-bdp-pds", unsigned=True)
lines = s3.get_idx_lines("hrrr.20230801/conus/hrrr.t00z.wrfsfcf00.grib2")
ranges = s3.idx_to_byteranges(lines, r"(:TMP:surface|:PRATE:surface)")
blob = s3.download_byteranges("hrrr.20230801/conus/hrrr.t00z.wrfsfcf00.grib2", ranges.keys())
__init__(access_key: str | None, secret_key: str | None, bucket_name: str, unsigned: bool = False, region_name: str | None = None) None[source]

Initialize the S3 manager.

Parameters:
  • access_key (str, optional) – AWS access key ID. Optional for public buckets or when using IAM roles.

  • secret_key (str, optional) – AWS secret access key.

  • bucket_name (str) – Default S3 bucket to operate on.

  • unsigned (bool, default=False) – When True, disable request signing for public buckets using botocore.config.Config(signature_version=UNSIGNED).

  • region_name (str, optional) – AWS region for the client. If omitted, botocore’s default resolution is used.

connect() None[source]

Create an S3 client using the provided credentials.

Raises:
  • NoCredentialsError – When credentials are not available or invalid.

  • botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError – On other client initialization failures.

delete(remote_path: str) bool[source]

Delete an object from the bucket.

disconnect() None[source]

Release the client reference.

Notes

boto3 clients do not require explicit shutdown. Setting the reference to None allows the instance to be reused or garbage-collected.

download_byteranges(key: str, byte_ranges: Iterable[str], *, max_workers: int = 10, timeout: int = 30) bytes[source]

Download multiple byte ranges from an object and concatenate results.

Parameters:
  • key (str) – Object key within the configured bucket.

  • byte_ranges (Iterable[str]) – Iterable of Range header values (e.g., "bytes=0-99").

  • max_workers (int, default=10) – Maximum parallel workers for ranged requests.

  • timeout (int, default=30) – Timeout per ranged request (seconds).

Returns:

Concatenated payload of the requested ranges, preserving the order of the input byte_ranges.

Return type:

bytes

download_file(file_path: str, local_file_name: str) bool[source]

Compatibility method: download an S3 key.

Parameters:
  • file_path (str) – S3 key to download.

  • local_file_name (str) – Local destination path.

Returns:

True on success, False on failure.

Return type:

bool

exists(remote_path: str) bool[source]

Return True if the object exists in the bucket.

fetch(remote_path: str, local_filename: str | None = None) bool[source]

Download an S3 key to a local file.

Parameters:
  • remote_path (str) – S3 key to download from bucket_name.

  • local_filename (str, optional) – Local destination filename. Defaults to the basename of the key.

Returns:

True on success, False on failure.

Return type:

bool

get_chunks(key: str, chunk_size: int = 524288000) list[str][source]

Compute contiguous chunk ranges for an S3 object.

The final range uses the file size as the inclusive end byte.

get_idx_lines(key: str, *, timeout: int = 30, max_retries: int = 3, write_to: str | None = None) list[str] | None[source]

Fetch and parse the GRIB index (.idx) for key.

Parameters:
  • key (str) – GRIB object key, or an explicit .idx path.

  • write_to (str, optional) – If provided, write the idx text to write_to (appends .idx if not present).

  • timeout (int, default=30) – Per-request timeout (seconds). Included for API consistency.

  • max_retries (int, default=3) – Simple retry count on transient errors.

Notes

Appends .idx to key unless an explicit .idx path is provided.

get_size(key: str) int | None[source]

Return the size in bytes for a given S3 object key.

Returns:

Content length if available, else None.

Return type:

int or None

idx_to_byteranges(lines: list[str], search_str: str) dict[str, str][source]

Wrapper for grib_utils.idx_to_byteranges() using regex filtering.

list_files(remote_path: str | None = None, pattern: str | None = None) Iterable[str] | None[source]

List object keys under a prefix in the bucket with optional regex filter.

Parameters:
  • remote_path (str, optional) – Prefix to list. Defaults to all keys in the bucket.

  • pattern (str, optional) – Regular expression applied to full keys using re.search(). If provided, only matching keys are returned.

Returns:

Keys found under the prefix, or None on error.

Return type:

list of str or None

stat(remote_path: str)[source]

Return basic metadata for an object (size, last modified, etag).

upload(local_path: str, remote_path: str) bool[source]

Standardized upload implementation delegating to upload_file().

upload_file(file_path: str, s3_file_name: str) bool[source]

Upload a local file to the configured bucket.

Parameters:
  • file_path (str) – Local file path.

  • s3_file_name (str) – Destination S3 key within bucket_name.

Returns:

True on success, False otherwise.

Return type:

bool

Vimeo upload manager using PyVimeo.

Implements DataAcquirer with upload-only support to Vimeo. Fetching and listing are not supported.

class datavizhub.acquisition.vimeo_manager.VimeoManager(client_id: str, client_secret: str, access_token: str)[source]

Bases: DataAcquirer

CAPABILITIES = {'upload'}

Upload videos to Vimeo using PyVimeo.

This manager encapsulates video uploads and updates via the Vimeo API using PyVimeo. It participates in the acquisition interface for pipeline consistency, though generic file fetching/listing is not supported for Vimeo in this project.

Supported Protocols

  • Vimeo API (token-based)

param client_id:

Vimeo API client ID.

type client_id:

str

param client_secret:

Vimeo API client secret.

type client_secret:

str

param access_token:

Vimeo API access token.

type access_token:

str

Examples

Upload a video and get its URI:

from datavizhub.acquisition.vimeo_manager import VimeoManager

vm = VimeoManager(client_id, client_secret, access_token)
vm.connect()
uri = vm.upload_video("/path/to/video.mp4", video_name="My Video")
vm.disconnect()
print(uri)
connect() None[source]

Initialize the Vimeo client using provided credentials.

disconnect() None[source]

Release the Vimeo client reference.

fetch(remote_path: str, local_filename: str | None = None) bool[source]

Fetching from Vimeo is not supported.

Raises:

NotImplementedError – Always raised to indicate downloads are not supported.

list_files(remote_path: str | None = None) Iterable[str] | None[source]

Listing is not implemented for Vimeo.

Returns:

Always returns None.

Return type:

None

update_video(file_path: str, video_uri: str) str[source]

Replace the video file for an existing Vimeo video.

Parameters:
  • file_path (str) – Path to the replacement video file.

  • video_uri (str) – Vimeo video URI (e.g., "/videos/12345").

Returns:

The URI of the updated video.

Return type:

str

Raises:

Exception – If the update fails or the response cannot be interpreted.

update_video_description(video_uri: str, new_description: str) str[source]

Update the description of a Vimeo video.

Parameters:
  • video_uri (str) – Vimeo video URI (e.g., "/videos/12345").

  • new_description (str) – New description text to set.

Returns:

Confirmation message when the update succeeds.

Return type:

str

Raises:

Exception – If the Vimeo API call fails.

upload(local_path: str, remote_path: str) bool[source]

Standardized upload interface mapping to upload_video().

Parameters:
  • local_path (str) – Local video file path.

  • remote_path (str) – Interpreted as the Vimeo video name/title.

Returns:

True if an upload URI was returned.

Return type:

bool

upload_video(file_path: str, video_name: str | None = None) str[source]

Upload a local video to Vimeo.

Parameters:
  • file_path (str) – Path to the local video file.

  • video_name (str, optional) – Optional title to assign to the video.

Returns:

The Vimeo video URI for the uploaded content.

Return type:

str

Raises:

Exception – If the upload fails or the response cannot be interpreted.

vimeo_client: vimeo.VimeoClient | None