datavizhub.acquisition package¶
Data acquisition managers (FTP, HTTP, S3, Vimeo) and base interface.
Provides DataAcquirer
and concrete managers for fetching and uploading
resources from remote sources. See each manager’s docstrings for usage and
supported capabilities.
Notes
HTTPManager is provided as a convenience alias to HTTPHandler for naming consistency across managers. Both refer to the same implementation.
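Because the alias and the class are the same object, either name can be imported; a minimal sketch:
from datavizhub.acquisition import HTTPHandler, HTTPManager

# Both names resolve to the same class object.
assert HTTPManager is HTTPHandler
http = HTTPManager()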
- exception datavizhub.acquisition.AcquisitionError[source]¶
Bases:
Exception
Base exception for acquisition-related errors.
- class datavizhub.acquisition.DataAcquirer[source]¶
Bases:
ABC
Abstract interface for acquiring data from remote sources.
This abstract base class defines the minimal contract that any remote data source manager must implement to interoperate with the rest of the library. It standardizes how to connect to a source, fetch a resource to a local destination, enumerate available resources, and cleanly disconnect.
- Parameters:
source (str) – Conceptual identifier of the remote source. Concrete implementations interpret this as needed (e.g., FTP host, S3 bucket, base URL, API). The value is typically provided through the concrete class constructor.
destination (str) – Conceptual identifier for the local destination (e.g., local file path) used by fetch(). This is usually supplied per call via local_filename and not stored on the instance.
Notes
Implementations are responsible for their own connection client lifecycle (initialize in connect(), clean up in disconnect()). list_files() may return None for sources that do not support listing (e.g., generic HTTP URLs).
Examples
Basic pipeline using a concrete manager:
from datavizhub.acquisition.ftp_manager import FTPManager

acq = FTPManager(host="ftp.example.com", username="anonymous", password="test@test.com")
acq.connect()
# Optional: enumerate files under a directory on the server
for name in (acq.list_files("/pub") or []):
    print(name)
# Fetch a file to the current working directory
acq.fetch("/pub/data/file.txt", local_filename="file.txt")
acq.disconnect()
Selecting a manager dynamically by source type:
def get_manager(config):
    if config["type"] == "s3":
        from datavizhub.acquisition.s3_manager import S3Manager
        return S3Manager(config["access_key"], config["secret_key"], config["bucket"])
    elif config["type"] == "ftp":
        from datavizhub.acquisition.ftp_manager import FTPManager
        return FTPManager(config["host"], config.get("port", 21), config["user"], config["pass"])
    else:
        from datavizhub.acquisition.http_manager import HTTPHandler
        return HTTPHandler()

mgr = get_manager(cfg)
mgr.connect()
mgr.fetch(cfg["remote"], cfg.get("local"))
mgr.disconnect()
Using as a context manager:
from datavizhub.acquisition.ftp_manager import FTPManager

with FTPManager(host="ftp.example.com") as mgr:
    mgr.fetch("/pub/data/file.txt", local_filename="file.txt")
- property capabilities: set[str]¶
Set of capability strings (e.g., {'fetch', 'upload', 'list'}).
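Capabilities can guard optional operations before calling them; a minimal sketch, assuming mgr is any connected concrete manager:
# Only attempt an upload when the manager advertises support for it.
if "upload" in mgr.capabilities:
    mgr.upload("local.bin", "/remote/local.bin")
else:
    print(f"{type(mgr).__name__} does not support upload")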
- abstract connect() None [source]¶
Establish a connection or initialize the client as needed.
Notes
Implementations should set up any underlying network clients or authenticate to remote services. This method should be idempotent and safe to call multiple times.
- property connected: bool¶
Whether the manager considers itself connected.
- delete(remote_path: str) bool [source]¶
Delete a remote resource if supported.
Notes
Default implementation is not supported. Subclasses may override.
- Raises:
NotSupportedError – Always raised for the default implementation.
- abstract disconnect() None [source]¶
Tear down the connection or client resources if applicable.
Notes
Implementations should release sockets/clients and clear references so that instances are reusable or can be garbage-collected cleanly.
- exists(remote_path: str) bool [source]¶
Check whether a remote path exists.
Notes
Default implementation is not supported. Subclasses may override.
- Raises:
NotSupportedError – Always raised for the default implementation.
- abstract fetch(remote_path: str, local_filename: str | None = None) bool [source]¶
Fetch a remote resource to a local file.
- Parameters:
remote_path (str) – Remote identifier (e.g., URL, S3 key, FTP path) of the resource to download.
local_filename (str, optional) – Local destination filename or path. If omitted, implementations may infer a name from remote_path.
- Returns:
True on successful fetch, False on failure.
- Return type:
bool
- fetch_many(items: Iterable[str], dest_dir: str) list[Tuple[str, bool]] [source]¶
Fetch multiple remote resources to a destination directory.
- Parameters:
items (Iterable[str]) – Collection of remote paths/identifiers to fetch.
dest_dir (str) – Local directory where files will be written.
- Returns:
A list of (remote_path, success) tuples.
- Return type:
list of (str, bool)
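For example, fetch_many can drive a simple batch download; a minimal sketch, assuming acq is a connected manager and the remote paths are illustrative:
results = acq.fetch_many(["/pub/data/a.grib2", "/pub/data/b.grib2"], dest_dir="downloads")
for remote, ok in results:
    if not ok:
        print(f"failed to fetch {remote}")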
- abstract list_files(remote_path: str | None = None) Iterable[str] | None [source]¶
List files or resources available at the remote path.
- Parameters:
remote_path (str, optional) – Remote path, prefix, or locator to enumerate. If omitted, the implementation may list a default location (e.g., current directory for FTP or entire bucket/prefix for S3).
- Returns:
Iterable of resource names/keys/paths. May return None if the operation is not supported by the source (e.g., HTTP URLs).
- Return type:
Iterable of str or None
- stat(remote_path: str)[source]¶
Return remote metadata if supported.
Notes
Default implementation is not supported. Subclasses may override.
- Raises:
NotSupportedError – Always raised for the default implementation.
- abstract upload(local_path: str, remote_path: str) bool [source]¶
Upload a local resource to the remote destination.
- Parameters:
local_path (str) – Local filesystem path of the resource to upload.
remote_path (str) – Remote destination identifier (e.g., FTP path, S3 key).
- Returns:
True on success, False on failure.
- Return type:
bool
- class datavizhub.acquisition.FTPManager(host: str, port: int = 21, username: str = 'anonymous', password: str = 'test@test.com', timeout: int = 30)[source]¶
Bases:
DataAcquirer
Acquire files from FTP servers using passive mode.
This manager wraps Python’s ftplib to provide reliable FTP interactions including connecting, listing directories, and downloading files. It standardizes the acquisition interface via DataAcquirer and preserves the original convenience methods used elsewhere in the project.
Supported Protocols¶
ftp://
- param host:
FTP server hostname or IP address.
- type host:
str
- param port:
FTP server port.
- type port:
int, default=21
- param username:
Username for authentication.
- type username:
str, default="anonymous"
- param password:
Password for authentication.
- type password:
str, default="test@test.com"
- param timeout:
Socket timeout in seconds.
- type timeout:
int, default=30
Examples
Download a file from an FTP directory:
from datavizhub.acquisition.ftp_manager import FTPManager

ftp = FTPManager("ftp.example.com")
ftp.connect()
ftp.fetch("/pub/some/file.txt", "file.txt")
ftp.disconnect()
- CAPABILITIES = {'fetch', 'list', 'upload'}¶
- connect() None [source]¶
Connect to the FTP server and enable passive mode.
- Raises:
Exception – Propagates any underlying connection or authentication failure.
- delete(remote_path: str) bool [source]¶
Delete a remote file if possible.
- Parameters:
remote_path (str) – Path to the remote file (may include directories).
- delete_empty_files(dir_path: Path) None [source]¶
Delete zero-byte files in a directory.
- Parameters:
dir_path (pathlib.Path) – Directory to scan for empty files.
- download_byteranges(remote_path: str, byte_ranges: Iterable[str], *, max_workers: int = 10, timeout: int = 30) bytes [source]¶
Download multiple byte ranges using parallel FTP connections.
Notes
Spawns one short-lived FTP connection per range (per thread) to maintain thread safety across requests; this incurs connection overhead but avoids shared-socket issues in ftplib.
Uses REST to position the transfer at the desired start byte; stops reading after the requested range length. Results are concatenated in the input order.
- download_file(remote_file_path: str, local_file_path: str) bool [source]¶
Download a single file via FTP with retries.
- Parameters:
remote_file_path (str) – Remote file path (may include directories).
local_file_path (str) – Local destination path including filename.
- Returns:
True if downloaded and non-zero in size; False otherwise.
- Return type:
bool
- Raises:
FileNotFoundError – If the remote file does not exist.
Exception – If the final attempt fails for any other reason.
- exists(remote_path: str) bool [source]¶
Return True if the remote file exists on the FTP server.
- Parameters:
remote_path (str) – Path to the remote file (may include directories).
- fetch(remote_path: str, local_filename: str | None = None) bool [source]¶
Download a remote file to a local path.
- Parameters:
remote_path (str) – Full remote file path (may include directories).
local_filename (str, optional) – Local destination path. Defaults to the basename of remote_path.
- Returns:
True on success, False otherwise.
- Return type:
bool
- get_idx_lines(remote_path: str, *, write_to: str | None = None, timeout: int = 30, max_retries: int = 3) list[str] | None [source]¶
Fetch and parse the GRIB .idx for a remote path.
Appends .idx to remote_path unless an explicit .idx is provided.
- get_size(remote_path: str) int | None [source]¶
Return the size in bytes for a remote file using SIZE.
- list_files(remote_path: str | None = None, pattern: str | None = None) Iterable[str] | None [source]¶
List names at the given remote path and optionally filter by regex.
- Parameters:
remote_path (str, optional) – Remote directory to list. Defaults to current server directory.
pattern (str, optional) – Regular expression applied to names returned by NLST (via re.search()).
- Returns:
Filenames present in the directory, or None on error.
- Return type:
list of str or None
- stat(remote_path: str)[source]¶
Return minimal metadata for a remote file (size in bytes).
- Parameters:
remote_path (str) – Path to the remote file (may include directories).
- Returns:
A mapping with {"size": int} if available; None on error.
- Return type:
dict or None
- sync_ftp_directory(remote_dir: str, local_dir: str, dataset_period: str) None [source]¶
Synchronize a remote directory to local storage by date range.
- Parameters:
remote_dir (str) – Remote FTP directory to synchronize.
local_dir (str) – Local directory to mirror files into.
dataset_period (str) – Period spec parsable by DateManager (e.g., "7d", "24h").
Notes
Downloads files within the date range that are missing locally or have size 0.
Deletes local files no longer present remotely.
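A minimal usage sketch, assuming the remote directory exists and the period string is parsable (paths are illustrative):
ftp = FTPManager("ftp.example.com")
ftp.connect()
# Mirror files from the last 7 days into ./mirror.
ftp.sync_ftp_directory("/pub/data", "mirror", "7d")
ftp.disconnect()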
- upload(local_path: str, remote_path: str) bool [source]¶
Standardized upload implementation delegating to upload_file().
- Parameters:
local_path (str) – Local file path to upload.
remote_path (str) – Remote destination path.
- Returns:
True on success.
- Return type:
bool
- upload_file(local_file_path: str, remote_file_path: str) None [source]¶
Upload a local file to the FTP server with retries.
- Parameters:
local_file_path (str) – Local file path to upload.
remote_file_path (str) – Remote path including target filename.
- Raises:
Exception – When the final attempt fails to upload the file.
- class datavizhub.acquisition.HTTPHandler[source]¶
Bases:
DataAcquirer
- CAPABILITIES = {'fetch'}¶
Acquire files over HTTP/HTTPS.
This lightweight manager performs simple HTTP(S) GETs to fetch remote resources to the local filesystem. Because HTTP is stateless for these operations, connect() and disconnect() are no-ops.
Supported Protocols¶
http://
https://
Examples
Download a file via HTTPS:
from datavizhub.acquisition.http_manager import HTTPHandler

http = HTTPHandler()
http.connect()  # no-op
http.fetch("https://example.com/data.json", "data.json")
http.disconnect()  # no-op
- connect() None [source]¶
Initialize the handler (no persistent connection).
Notes
Provided for API parity; does nothing for basic HTTP GETs.
- download_byteranges(url: str, byte_ranges: Iterable[str], *, max_workers: int = 10, timeout: int = 30) bytes [source]¶
Parallel ranged downloads concatenated in the order of byte_ranges.
- Parameters:
url (str) – Target URL for the ranged GET requests.
byte_ranges (Iterable[str]) – Iterable of Range header values, e.g., "bytes=0-99".
max_workers (int, default=10) – Number of worker threads for parallelism.
timeout (int, default=30) – Per-request timeout (seconds).
- static download_file(url: str, filename: str) None [source]¶
Compatibility helper that downloads a file.
- Parameters:
url (str) – File URL to download.
filename (str) – Local destination path.
- fetch(remote_path: str, local_filename: str | None = None) bool [source]¶
Download content at remote_path to local_filename.
- Parameters:
remote_path (str) – Full HTTP(S) URL to download.
local_filename (str, optional) – Local destination path. Defaults to the basename of the URL.
- Returns:
True on success, False if the request fails.
- Return type:
bool
- static fetch_data(url: str)[source]¶
Fetch binary payload via GET.
- Parameters:
url (str) – URL to request.
- Returns:
Raw response body on success, otherwise None.
- Return type:
bytes or None
- static fetch_headers(url: str)[source]¶
Perform a HEAD request and return headers.
- Parameters:
url (str) – URL to request.
- Returns:
Response headers on success, otherwise None.
- Return type:
Mapping or None
- static fetch_json(url: str)[source]¶
Fetch JSON content via GET and parse it.
- Parameters:
url (str) – URL to request.
- Returns:
Parsed JSON object on success, otherwise None.
- Return type:
dict or list or None
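The static helpers can be called without instantiating the class; a minimal sketch, assuming the example URL serves JSON:
headers = HTTPHandler.fetch_headers("https://example.com/data.json")
if headers is not None:
    print(headers.get("Content-Type"))
payload = HTTPHandler.fetch_json("https://example.com/data.json")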
- static fetch_text(url: str)[source]¶
Fetch text content via GET.
- Parameters:
url (str) – URL to request.
- Returns:
Text response on success, otherwise None.
- Return type:
str or None
- get_idx_lines(url: str, *, write_to: str | None = None, timeout: int = 30, max_retries: int = 3) list[str] | None [source]¶
Fetch and parse the GRIB index (.idx) for a URL.
Appends .idx to url unless an explicit .idx path is provided. Retries are applied on transient failures.
- list_files(remote_path: str | None = None, pattern: str | None = None) Iterable[str] | None [source]¶
Attempt to list files by scraping anchor tags from an index page.
This is best-effort and intended for directory-style endpoints such as NOMADS listings. If the page is not HTML or contains no anchors, an empty list is returned.
- Parameters:
remote_path (str) – Page URL to scrape for anchors.
pattern (str, optional) – Regular expression applied to full URLs (via re.search()).
- static post_data(url: str, data, headers=None)[source]¶
Send a POST request and return the body.
- Parameters:
url (str) – URL to post to.
data (Any) – Request payload.
headers (dict, optional) – Optional request headers.
- Returns:
Response text on success, otherwise None.
- Return type:
str or None
- upload(local_path: str, remote_path: str) bool [source]¶
Uploading is not supported for HTTPHandler.
- Raises:
NotSupportedError – Always raised to indicate upload is unsupported.
- datavizhub.acquisition.HTTPManager¶
alias of
HTTPHandler
- exception datavizhub.acquisition.NotSupportedError[source]¶
Bases:
AcquisitionError
Raised when an operation is not supported by a manager.
- class datavizhub.acquisition.S3Manager(access_key: str | None, secret_key: str | None, bucket_name: str, unsigned: bool = False, region_name: str | None = None)[source]¶
Bases:
DataAcquirer
- CAPABILITIES = {'fetch', 'list', 'upload'}¶
Acquire objects from Amazon S3 buckets via boto3.
This manager wraps boto3’s S3 client to standardize connecting, listing, and fetching S3 objects using the acquisition interface.
Supported Protocols¶
s3:// (buckets and keys)
- param access_key:
AWS access key ID. Optional for public buckets or when using IAM roles.
- type access_key:
str, optional
- param secret_key:
AWS secret access key.
- type secret_key:
str, optional
- param bucket_name:
Default S3 bucket to operate on.
- type bucket_name:
str
- param unsigned:
Disable request signing for public buckets using botocore.config.Config(signature_version=UNSIGNED).
- type unsigned:
bool, default=False
- param region_name:
AWS region for the client. If omitted, botocore defaults apply.
- type region_name:
str, optional
Examples
Download a key to a local file:
from datavizhub.acquisition.s3_manager import S3Manager

s3 = S3Manager("AKIA...", "SECRET...", "my-bucket")
s3.connect()
s3.fetch("path/to/object.nc", "object.nc")
s3.disconnect()
Public bucket access (unsigned):
from datavizhub.acquisition.s3_manager import S3Manager

s3 = S3Manager(None, None, bucket_name="noaa-hrrr-bdp-pds", unsigned=True)
lines = s3.get_idx_lines("hrrr.20230801/conus/hrrr.t00z.wrfsfcf00.grib2")
ranges = s3.idx_to_byteranges(lines, r"(:TMP:surface|:PRATE:surface)")
blob = s3.download_byteranges("hrrr.20230801/conus/hrrr.t00z.wrfsfcf00.grib2", ranges.keys())
- __init__(access_key: str | None, secret_key: str | None, bucket_name: str, unsigned: bool = False, region_name: str | None = None) None [source]¶
Initialize the S3 manager.
- Parameters:
access_key (str, optional) – AWS access key ID. Optional for public buckets or when using IAM roles.
secret_key (str, optional) – AWS secret access key.
bucket_name (str) – Default S3 bucket to operate on.
unsigned (bool, default=False) – When True, disable request signing for public buckets using botocore.config.Config(signature_version=UNSIGNED).
region_name (str, optional) – AWS region for the client. If omitted, botocore’s default resolution is used.
- connect() None [source]¶
Create an S3 client using the provided credentials.
- Raises:
NoCredentialsError – When credentials are not available or invalid.
botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError – On other client initialization failures.
- disconnect() None [source]¶
Release the client reference.
Notes
boto3 clients do not require explicit shutdown. Setting the reference to None allows the instance to be reused or garbage-collected.
- download_byteranges(key: str, byte_ranges: Iterable[str], *, max_workers: int = 10, timeout: int = 30) bytes [source]¶
Download multiple byte ranges from an object and concatenate results.
- Parameters:
key (str) – Object key within the configured bucket.
byte_ranges (Iterable[str]) – Iterable of Range header values (e.g., "bytes=0-99").
max_workers (int, default=10) – Maximum parallel workers for ranged requests.
timeout (int, default=30) – Timeout per ranged request (seconds).
- Returns:
Concatenated payload of the requested ranges, preserving the order of the input byte_ranges.
- Return type:
bytes
- download_file(file_path: str, local_file_name: str) bool [source]¶
Compatibility method: download an S3 key.
- Parameters:
file_path (str) – S3 key to download.
local_file_name (str) – Local destination path.
- Returns:
True on success, False on failure.
- Return type:
bool
- fetch(remote_path: str, local_filename: str | None = None) bool [source]¶
Download an S3 key to a local file.
- Parameters:
remote_path (str) – S3 key to download from bucket_name.
local_filename (str, optional) – Local destination filename. Defaults to the basename of the key.
- Returns:
True on success, False on failure.
- Return type:
bool
- get_chunks(key: str, chunk_size: int = 524288000) list[str] [source]¶
Compute contiguous chunk ranges for an S3 object.
The final range uses the file size as the inclusive end byte.
- get_idx_lines(key: str, *, timeout: int = 30, max_retries: int = 3, write_to: str | None = None) list[str] | None [source]¶
Fetch and parse the GRIB index (.idx) for key.
- Parameters:
key (str) – GRIB object key, or an explicit .idx path.
write_to (str, optional) – If provided, write the idx text to write_to (appends .idx if not present).
timeout (int, default=30) – Per-request timeout (seconds). Included for API consistency.
max_retries (int, default=3) – Simple retry count on transient errors.
Notes
Appends .idx to key unless an explicit .idx path is provided.
- get_size(key: str) int | None [source]¶
Return the size in bytes for a given S3 object key.
- Returns:
Content length if available, else None.
- Return type:
int or None
- idx_to_byteranges(lines: list[str], search_str: str) dict[str, str] [source]¶
Wrapper for
grib_utils.idx_to_byteranges()
using regex filtering.
- list_files(remote_path: str | None = None, pattern: str | None = None) Iterable[str] | None [source]¶
List object keys under a prefix in the bucket with optional regex filter.
- Parameters:
remote_path (str, optional) – Prefix to list. Defaults to all keys in the bucket.
pattern (str, optional) – Regular expression applied to full keys using re.search(). If provided, only matching keys are returned.
- Returns:
Keys found under the prefix, or None on error.
- Return type:
list of str or None
- upload(local_path: str, remote_path: str) bool [source]¶
Standardized upload implementation delegating to upload_file().
- class datavizhub.acquisition.VimeoManager(client_id: str, client_secret: str, access_token: str)[source]¶
Bases:
DataAcquirer
- CAPABILITIES = {'upload'}¶
Upload videos to Vimeo using PyVimeo.
This manager encapsulates video uploads and updates via the Vimeo API using PyVimeo. It participates in the acquisition interface for pipeline consistency, though generic file fetching/listing is not supported for Vimeo in this project.
Supported Protocols¶
Vimeo API (token-based)
- param client_id:
Vimeo API client ID.
- type client_id:
str
- param client_secret:
Vimeo API client secret.
- type client_secret:
str
- param access_token:
Vimeo API access token.
- type access_token:
str
Examples
Upload a video and get its URI:
from datavizhub.acquisition.vimeo_manager import VimeoManager

vm = VimeoManager(client_id, client_secret, access_token)
vm.connect()
uri = vm.upload_video("/path/to/video.mp4", video_name="My Video")
vm.disconnect()
print(uri)
- fetch(remote_path: str, local_filename: str | None = None) bool [source]¶
Fetching from Vimeo is not supported.
- Raises:
NotImplementedError – Always raised to indicate downloads are not supported.
- list_files(remote_path: str | None = None) Iterable[str] | None [source]¶
Listing is not implemented for Vimeo.
- Returns:
Always returns None.
- Return type:
None
- update_video(file_path: str, video_uri: str) str [source]¶
Replace the video file for an existing Vimeo video.
- Parameters:
file_path (str) – Path to the replacement video file.
video_uri (str) – Vimeo video URI (e.g., "/videos/12345").
- Returns:
The URI of the updated video.
- Return type:
str
- Raises:
Exception – If the update fails or the response cannot be interpreted.
- update_video_description(video_uri: str, new_description: str) str [source]¶
Update the description of a Vimeo video.
- Parameters:
video_uri (str) – Vimeo video URI (e.g., "/videos/12345").
new_description (str) – New description text to set.
- Returns:
Confirmation message when the update succeeds.
- Return type:
str
- Raises:
Exception – If the Vimeo API call fails.
- upload(local_path: str, remote_path: str) bool [source]¶
Standardized upload interface mapping to upload_video().
- Parameters:
local_path (str) – Local video file path.
remote_path (str) – Interpreted as the Vimeo video name/title.
- Returns:
True if an upload URI was returned.
- Return type:
bool
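A minimal sketch, assuming valid API credentials; note that remote_path supplies the title rather than a destination path:
vm = VimeoManager(client_id, client_secret, access_token)
vm.connect()
ok = vm.upload("/path/to/video.mp4", "My Video")  # second argument becomes the title
vm.disconnect()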
- upload_video(file_path: str, video_name: str | None = None) str [source]¶
Upload a local video to Vimeo.
- Parameters:
file_path (str) – Path to the local video file.
video_name (str, optional) – Optional title to assign to the video.
- Returns:
The Vimeo video URI for the uploaded content.
- Return type:
str
- Raises:
Exception – If the upload fails or the response cannot be interpreted.
Modules¶
Base interface for data acquisition in DataVizHub.
Defines DataAcquirer
plus common exceptions. Concrete managers (FTP,
HTTP, S3, Vimeo) implement the acquisition lifecycle: connect, fetch/list,
optional upload, and disconnect.
- exception datavizhub.acquisition.base.AcquisitionError[source]¶
Bases:
Exception
Base exception for acquisition-related errors.
- class datavizhub.acquisition.base.DataAcquirer[source]¶
Bases:
ABC
Abstract interface for acquiring data from remote sources.
This abstract base class defines the minimal contract that any remote data source manager must implement to interoperate with the rest of the library. It standardizes how to connect to a source, fetch a resource to a local destination, enumerate available resources, and cleanly disconnect.
- Parameters:
source (str) – Conceptual identifier of the remote source. Concrete implementations interpret this as needed (e.g., FTP host, S3 bucket, base URL, API). The value is typically provided through the concrete class constructor.
destination (str) – Conceptual identifier for the local destination (e.g., local file path) used by fetch(). This is usually supplied per call via local_filename and not stored on the instance.
Notes
Implementations are responsible for their own connection client lifecycle (initialize in connect(), clean up in disconnect()). list_files() may return None for sources that do not support listing (e.g., generic HTTP URLs).
Examples
Basic pipeline using a concrete manager:
from datavizhub.acquisition.ftp_manager import FTPManager

acq = FTPManager(host="ftp.example.com", username="anonymous", password="test@test.com")
acq.connect()
# Optional: enumerate files under a directory on the server
for name in (acq.list_files("/pub") or []):
    print(name)
# Fetch a file to the current working directory
acq.fetch("/pub/data/file.txt", local_filename="file.txt")
acq.disconnect()
Selecting a manager dynamically by source type:
def get_manager(config):
    if config["type"] == "s3":
        from datavizhub.acquisition.s3_manager import S3Manager
        return S3Manager(config["access_key"], config["secret_key"], config["bucket"])
    elif config["type"] == "ftp":
        from datavizhub.acquisition.ftp_manager import FTPManager
        return FTPManager(config["host"], config.get("port", 21), config["user"], config["pass"])
    else:
        from datavizhub.acquisition.http_manager import HTTPHandler
        return HTTPHandler()

mgr = get_manager(cfg)
mgr.connect()
mgr.fetch(cfg["remote"], cfg.get("local"))
mgr.disconnect()
Using as a context manager:
from datavizhub.acquisition.ftp_manager import FTPManager

with FTPManager(host="ftp.example.com") as mgr:
    mgr.fetch("/pub/data/file.txt", local_filename="file.txt")
- property capabilities: set[str]¶
Set of capability strings (e.g., {'fetch', 'upload', 'list'}).
- abstract connect() None [source]¶
Establish a connection or initialize the client as needed.
Notes
Implementations should set up any underlying network clients or authenticate to remote services. This method should be idempotent and safe to call multiple times.
- property connected: bool¶
Whether the manager considers itself connected.
- delete(remote_path: str) bool [source]¶
Delete a remote resource if supported.
Notes
Default implementation is not supported. Subclasses may override.
- Raises:
NotSupportedError – Always raised for the default implementation.
- abstract disconnect() None [source]¶
Tear down the connection or client resources if applicable.
Notes
Implementations should release sockets/clients and clear references so that instances are reusable or can be garbage-collected cleanly.
- exists(remote_path: str) bool [source]¶
Check whether a remote path exists.
Notes
Default implementation is not supported. Subclasses may override.
- Raises:
NotSupportedError – Always raised for the default implementation.
- abstract fetch(remote_path: str, local_filename: str | None = None) bool [source]¶
Fetch a remote resource to a local file.
- Parameters:
remote_path (str) – Remote identifier (e.g., URL, S3 key, FTP path) of the resource to download.
local_filename (str, optional) – Local destination filename or path. If omitted, implementations may infer a name from remote_path.
- Returns:
True on successful fetch, False on failure.
- Return type:
bool
- fetch_many(items: Iterable[str], dest_dir: str) list[Tuple[str, bool]] [source]¶
Fetch multiple remote resources to a destination directory.
- Parameters:
items (Iterable[str]) – Collection of remote paths/identifiers to fetch.
dest_dir (str) – Local directory where files will be written.
- Returns:
A list of (remote_path, success) tuples.
- Return type:
list of (str, bool)
- abstract list_files(remote_path: str | None = None) Iterable[str] | None [source]¶
List files or resources available at the remote path.
- Parameters:
remote_path (str, optional) – Remote path, prefix, or locator to enumerate. If omitted, the implementation may list a default location (e.g., current directory for FTP or entire bucket/prefix for S3).
- Returns:
Iterable of resource names/keys/paths. May return None if the operation is not supported by the source (e.g., HTTP URLs).
- Return type:
Iterable of str or None
- stat(remote_path: str)[source]¶
Return remote metadata if supported.
Notes
Default implementation is not supported. Subclasses may override.
- Raises:
NotSupportedError – Always raised for the default implementation.
- abstract upload(local_path: str, remote_path: str) bool [source]¶
Upload a local resource to the remote destination.
- Parameters:
local_path (str) – Local filesystem path of the resource to upload.
remote_path (str) – Remote destination identifier (e.g., FTP path, S3 key).
- Returns:
True on success, False on failure.
- Return type:
bool
- exception datavizhub.acquisition.base.NotSupportedError[source]¶
Bases:
AcquisitionError
Raised when an operation is not supported by a manager.
FTP data acquisition manager.
Implements DataAcquirer for FTP servers with support for listing, fetching, and uploading files. Adds optional helpers for GRIB .idx subsetting and FTP byte-range downloads via REST.
Advanced Features¶
get_size(path): use SIZE to return remote size in bytes.
get_idx_lines(path, *, write_to=None, timeout=30, max_retries=3): fetch and parse a GRIB .idx (appends .idx unless explicit).
idx_to_byteranges(lines, search_regex): regex to Range headers.
get_chunks(path, chunk_size=500MB): compute contiguous ranges.
download_byteranges(path, byte_ranges, *, max_workers=10, timeout=30): parallel range downloads using one short-lived FTP connection per range to ensure thread safety; concatenates the results in input order. A sketch of this workflow follows the list.
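A minimal sketch of that byte-range workflow over FTP, assuming the GRIB2 file and its .idx sidecar exist at the illustrative path below:
from datavizhub.acquisition.ftp_manager import FTPManager

ftp = FTPManager("ftp.example.com")
ftp.connect()
lines = ftp.get_idx_lines("/pub/model/run.grib2")  # parses the run.grib2.idx sidecar
if lines:
    # Select records by regex, then download only the matching byte ranges.
    ranges = ftp.idx_to_byteranges(lines, r":TMP:surface")
    blob = ftp.download_byteranges("/pub/model/run.grib2", ranges.keys())
ftp.disconnect()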
- class datavizhub.acquisition.ftp_manager.FTPManager(host: str, port: int = 21, username: str = 'anonymous', password: str = 'test@test.com', timeout: int = 30)[source]¶
Bases:
DataAcquirer
Acquire files from FTP servers using passive mode.
This manager wraps Python’s ftplib to provide reliable FTP interactions including connecting, listing directories, and downloading files. It standardizes the acquisition interface via DataAcquirer and preserves the original convenience methods used elsewhere in the project.
Supported Protocols¶
ftp://
- param host:
FTP server hostname or IP address.
- type host:
str
- param port:
FTP server port.
- type port:
int, default=21
- param username:
Username for authentication.
- type username:
str, default="anonymous"
- param password:
Password for authentication.
- type password:
str, default="test@test.com"
- param timeout:
Socket timeout in seconds.
- type timeout:
int, default=30
Examples
Download a file from an FTP directory:
from datavizhub.acquisition.ftp_manager import FTPManager

ftp = FTPManager("ftp.example.com")
ftp.connect()
ftp.fetch("/pub/some/file.txt", "file.txt")
ftp.disconnect()
- CAPABILITIES = {'fetch', 'list', 'upload'}¶
- connect() None [source]¶
Connect to the FTP server and enable passive mode.
- Raises:
Exception – Propagates any underlying connection or authentication failure.
- delete(remote_path: str) bool [source]¶
Delete a remote file if possible.
- Parameters:
remote_path (str) – Path to the remote file (may include directories).
- delete_empty_files(dir_path: Path) None [source]¶
Delete zero-byte files in a directory.
- Parameters:
dir_path (pathlib.Path) – Directory to scan for empty files.
- download_byteranges(remote_path: str, byte_ranges: Iterable[str], *, max_workers: int = 10, timeout: int = 30) bytes [source]¶
Download multiple byte ranges using parallel FTP connections.
Notes
Spawns one short-lived FTP connection per range (per thread) to maintain thread safety across requests; this incurs connection overhead but avoids shared-socket issues in ftplib.
Uses REST to position the transfer at the desired start byte; stops reading after the requested range length. Results are concatenated in the input order.
- download_file(remote_file_path: str, local_file_path: str) bool [source]¶
Download a single file via FTP with retries.
- Parameters:
remote_file_path (str) – Remote file path (may include directories).
local_file_path (str) – Local destination path including filename.
- Returns:
True if downloaded and non-zero in size; False otherwise.
- Return type:
bool
- Raises:
FileNotFoundError – If the remote file does not exist.
Exception – If the final attempt fails for any other reason.
- exists(remote_path: str) bool [source]¶
Return True if the remote file exists on the FTP server.
- Parameters:
remote_path (str) – Path to the remote file (may include directories).
- fetch(remote_path: str, local_filename: str | None = None) bool [source]¶
Download a remote file to a local path.
- Parameters:
remote_path (str) – Full remote file path (may include directories).
local_filename (str, optional) – Local destination path. Defaults to the basename of remote_path.
- Returns:
True on success, False otherwise.
- Return type:
bool
- ftp: FTP | None¶
- get_idx_lines(remote_path: str, *, write_to: str | None = None, timeout: int = 30, max_retries: int = 3) list[str] | None [source]¶
Fetch and parse the GRIB .idx for a remote path.
Appends .idx to remote_path unless an explicit .idx is provided.
- get_size(remote_path: str) int | None [source]¶
Return the size in bytes for a remote file using SIZE.
- list_files(remote_path: str | None = None, pattern: str | None = None) Iterable[str] | None [source]¶
List names at the given remote path and optionally filter by regex.
- Parameters:
remote_path (str, optional) – Remote directory to list. Defaults to current server directory.
pattern (str, optional) – Regular expression applied to names returned by NLST (via re.search()).
- Returns:
Filenames present in the directory, or None on error.
- Return type:
list of str or None
- stat(remote_path: str)[source]¶
Return minimal metadata for a remote file (size in bytes).
- Parameters:
remote_path (str) – Path to the remote file (may include directories).
- Returns:
A mapping with {"size": int} if available; None on error.
- Return type:
dict or None
- sync_ftp_directory(remote_dir: str, local_dir: str, dataset_period: str) None [source]¶
Synchronize a remote directory to local storage by date range.
- Parameters:
remote_dir (str) – Remote FTP directory to synchronize.
local_dir (str) – Local directory to mirror files into.
dataset_period (str) – Period spec parsable by DateManager (e.g., "7d", "24h").
Notes
Downloads files within the date range that are missing locally or have size 0.
Deletes local files no longer present remotely.
- upload(local_path: str, remote_path: str) bool [source]¶
Standardized upload implementation delegating to upload_file().
- Parameters:
local_path (str) – Local file path to upload.
remote_path (str) – Remote destination path.
- Returns:
True on success.
- Return type:
bool
- upload_file(local_file_path: str, remote_file_path: str) None [source]¶
Upload a local file to the FTP server with retries.
- Parameters:
local_file_path (str) – Local file path to upload.
remote_file_path (str) – Remote path including target filename.
- Raises:
Exception – When the final attempt fails to upload the file.
HTTP data acquisition handler.
Provides a minimal DataAcquirer
for HTTP
GET downloads, plus optional helpers for content size queries, GRIB .idx
subsetting, byte-range downloads, and best-effort listing via anchor scraping.
Advanced Features¶
get_size(url): return Content-Length from a HEAD request if provided.
get_idx_lines(url, *, write_to=None, timeout=30, max_retries=3): fetch and parse .idx (appends .idx unless explicit).
idx_to_byteranges(lines, search_regex): regex-based selection of ranges.
get_chunks(url, chunk_size=500MB): compute contiguous ranges.
download_byteranges(url, byte_ranges, *, max_workers=10, timeout=30): parallel ranged GETs, concatenated in order. A sketch of this workflow follows the list.
list_files(url, pattern=None): scrape anchor tags from directory-style index pages (e.g., NOMADS) and filter with regex if provided.
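A minimal sketch of the ranged-download workflow over HTTP, assuming the illustrative URL points at a GRIB2 file with an .idx sidecar:
from datavizhub.acquisition.http_manager import HTTPHandler

http = HTTPHandler()
url = "https://example.com/model/run.grib2"  # hypothetical
lines = http.get_idx_lines(url)  # fetches and parses run.grib2.idx
if lines:
    # Select records by regex, then fetch only the matching bytes.
    ranges = http.idx_to_byteranges(lines, r":TMP:surface")
    blob = http.download_byteranges(url, ranges.keys())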
- class datavizhub.acquisition.http_manager.HTTPHandler[source]¶
Bases:
DataAcquirer
- CAPABILITIES = {'fetch'}¶
Acquire files over HTTP/HTTPS.
This lightweight manager performs simple HTTP(S) GETs to fetch remote resources to the local filesystem. Because HTTP is stateless for these operations, connect() and disconnect() are no-ops.
Supported Protocols¶
http://
https://
Examples
Download a file via HTTPS:
from datavizhub.acquisition.http_manager import HTTPHandler

http = HTTPHandler()
http.connect()  # no-op
http.fetch("https://example.com/data.json", "data.json")
http.disconnect()  # no-op
- connect() None [source]¶
Initialize the handler (no persistent connection).
Notes
Provided for API parity; does nothing for basic HTTP GETs.
- download_byteranges(url: str, byte_ranges: Iterable[str], *, max_workers: int = 10, timeout: int = 30) bytes [source]¶
Parallel ranged downloads concatenated in the order of byte_ranges.
- Parameters:
url (str) – Target URL for the ranged GET requests.
byte_ranges (Iterable[str]) – Iterable of Range header values, e.g., "bytes=0-99".
max_workers (int, default=10) – Number of worker threads for parallelism.
timeout (int, default=30) – Per-request timeout (seconds).
- static download_file(url: str, filename: str) None [source]¶
Compatibility helper that downloads a file.
- Parameters:
url (str) – File URL to download.
filename (str) – Local destination path.
- fetch(remote_path: str, local_filename: str | None = None) bool [source]¶
Download content at remote_path to local_filename.
- Parameters:
remote_path (str) – Full HTTP(S) URL to download.
local_filename (str, optional) – Local destination path. Defaults to the basename of the URL.
- Returns:
True on success, False if the request fails.
- Return type:
bool
- static fetch_data(url: str)[source]¶
Fetch binary payload via GET.
- Parameters:
url (str) – URL to request.
- Returns:
Raw response body on success, otherwise None.
- Return type:
bytes or None
- static fetch_headers(url: str)[source]¶
Perform a HEAD request and return headers.
- Parameters:
url (str) – URL to request.
- Returns:
Response headers on success, otherwise None.
- Return type:
Mapping or None
- static fetch_json(url: str)[source]¶
Fetch JSON content via GET and parse it.
- Parameters:
url (str) – URL to request.
- Returns:
Parsed JSON object on success, otherwise None.
- Return type:
dict or list or None
- static fetch_text(url: str)[source]¶
Fetch text content via GET.
- Parameters:
url (str) – URL to request.
- Returns:
Text response on success, otherwise None.
- Return type:
str or None
- get_idx_lines(url: str, *, write_to: str | None = None, timeout: int = 30, max_retries: int = 3) list[str] | None [source]¶
Fetch and parse the GRIB index (.idx) for a URL.
Appends .idx to url unless an explicit .idx path is provided. Retries are applied on transient failures.
- list_files(remote_path: str | None = None, pattern: str | None = None) Iterable[str] | None [source]¶
Attempt to list files by scraping anchor tags from an index page.
This is best-effort and intended for directory-style endpoints such as NOMADS listings. If the page is not HTML or contains no anchors, an empty list is returned.
- Parameters:
remote_path (str) – Page URL to scrape for anchors.
pattern (str, optional) – Regular expression applied to full URLs (via re.search()).
- static post_data(url: str, data, headers=None)[source]¶
Send a POST request and return the body.
- Parameters:
url (str) – URL to post to.
data (Any) – Request payload.
headers (dict, optional) – Optional request headers.
- Returns:
Response text on success, otherwise None.
- Return type:
str or None
- upload(local_path: str, remote_path: str) bool [source]¶
Uploading is not supported for HTTPHandler.
- Raises:
NotSupportedError – Always raised to indicate upload is unsupported.
Amazon S3 data acquisition manager using boto3.
Implements DataAcquirer
for S3 buckets
with listing, fetching, and uploading support. Includes optional advanced
helpers for GRIB workflows and large file transfers.
Advanced Features¶
get_size(key): return object size (bytes) via head_object.
get_idx_lines(key, *, write_to=None, timeout=30, max_retries=3): fetch and parse a GRIB .idx file. Appends .idx to key unless an explicit .idx path is provided. Optionally writes the idx to disk.
idx_to_byteranges(lines, search_regex): build HTTP Range strings from .idx content using a regex filter.
get_chunks(key, chunk_size=500MB): compute contiguous byte ranges, using an inclusive final end byte (NODD style).
download_byteranges(key, byte_ranges, *, max_workers=10, timeout=30): download multiple ranges in parallel and return concatenated bytes. A chunked-download sketch follows the list.
list_files(prefix=None, pattern=None): list keys with optional regex filtering applied to full keys.
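A minimal sketch of a chunked whole-object download, assuming unsigned access to a public bucket (the key is illustrative):
from datavizhub.acquisition.s3_manager import S3Manager

s3 = S3Manager(None, None, bucket_name="noaa-hrrr-bdp-pds", unsigned=True)
s3.connect()
key = "hrrr.20230801/conus/hrrr.t00z.wrfsfcf00.grib2"
# Split the object into contiguous ranges, then fetch them in parallel.
ranges = s3.get_chunks(key, chunk_size=100 * 1024 * 1024)
blob = s3.download_byteranges(key, ranges, max_workers=8)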
- class datavizhub.acquisition.s3_manager.S3Manager(access_key: str | None, secret_key: str | None, bucket_name: str, unsigned: bool = False, region_name: str | None = None)[source]¶
Bases:
DataAcquirer
- CAPABILITIES = {'fetch', 'list', 'upload'}¶
Acquire objects from Amazon S3 buckets via boto3.
This manager wraps boto3’s S3 client to standardize connecting, listing, and fetching S3 objects using the acquisition interface.
Supported Protocols¶
s3:// (buckets and keys)
- param access_key:
AWS access key ID. Optional for public buckets or when using IAM roles.
- type access_key:
str, optional
- param secret_key:
AWS secret access key.
- type secret_key:
str, optional
- param bucket_name:
Default S3 bucket to operate on.
- type bucket_name:
str
- param unsigned:
Disable request signing for public buckets using botocore.config.Config(signature_version=UNSIGNED).
- type unsigned:
bool, default=False
- param region_name:
AWS region for the client. If omitted, botocore defaults apply.
- type region_name:
str, optional
Examples
Download a key to a local file:
from datavizhub.acquisition.s3_manager import S3Manager

s3 = S3Manager("AKIA...", "SECRET...", "my-bucket")
s3.connect()
s3.fetch("path/to/object.nc", "object.nc")
s3.disconnect()
Public bucket access (unsigned):
from datavizhub.acquisition.s3_manager import S3Manager

s3 = S3Manager(None, None, bucket_name="noaa-hrrr-bdp-pds", unsigned=True)
lines = s3.get_idx_lines("hrrr.20230801/conus/hrrr.t00z.wrfsfcf00.grib2")
ranges = s3.idx_to_byteranges(lines, r"(:TMP:surface|:PRATE:surface)")
blob = s3.download_byteranges("hrrr.20230801/conus/hrrr.t00z.wrfsfcf00.grib2", ranges.keys())
- __init__(access_key: str | None, secret_key: str | None, bucket_name: str, unsigned: bool = False, region_name: str | None = None) None [source]¶
Initialize the S3 manager.
- Parameters:
access_key (str, optional) – AWS access key ID. Optional for public buckets or when using IAM roles.
secret_key (str, optional) – AWS secret access key.
bucket_name (str) – Default S3 bucket to operate on.
unsigned (bool, default=False) – When True, disable request signing for public buckets using botocore.config.Config(signature_version=UNSIGNED).
region_name (str, optional) – AWS region for the client. If omitted, botocore’s default resolution is used.
- connect() None [source]¶
Create an S3 client using the provided credentials.
- Raises:
NoCredentialsError – When credentials are not available or invalid.
botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError – On other client initialization failures.
- disconnect() None [source]¶
Release the client reference.
Notes
boto3 clients do not require explicit shutdown. Setting the reference to None allows the instance to be reused or garbage-collected.
- download_byteranges(key: str, byte_ranges: Iterable[str], *, max_workers: int = 10, timeout: int = 30) bytes [source]¶
Download multiple byte ranges from an object and concatenate results.
- Parameters:
key (str) – Object key within the configured bucket.
byte_ranges (Iterable[str]) – Iterable of Range header values (e.g., "bytes=0-99").
max_workers (int, default=10) – Maximum parallel workers for ranged requests.
timeout (int, default=30) – Timeout per ranged request (seconds).
- Returns:
Concatenated payload of the requested ranges, preserving the order of the input byte_ranges.
- Return type:
bytes
- download_file(file_path: str, local_file_name: str) bool [source]¶
Compatibility method: download an S3 key.
- Parameters:
file_path (str) – S3 key to download.
local_file_name (str) – Local destination path.
- Returns:
True on success, False on failure.
- Return type:
bool
- fetch(remote_path: str, local_filename: str | None = None) bool [source]¶
Download an S3 key to a local file.
- Parameters:
remote_path (str) – S3 key to download from bucket_name.
local_filename (str, optional) – Local destination filename. Defaults to the basename of the key.
- Returns:
True on success, False on failure.
- Return type:
bool
- get_chunks(key: str, chunk_size: int = 524288000) list[str] [source]¶
Compute contiguous chunk ranges for an S3 object.
The final range uses the file size as the inclusive end byte.
- get_idx_lines(key: str, *, timeout: int = 30, max_retries: int = 3, write_to: str | None = None) list[str] | None [source]¶
Fetch and parse the GRIB index (.idx) for key.
- Parameters:
key (str) – GRIB object key, or an explicit .idx path.
write_to (str, optional) – If provided, write the idx text to write_to (appends .idx if not present).
timeout (int, default=30) – Per-request timeout (seconds). Included for API consistency.
max_retries (int, default=3) – Simple retry count on transient errors.
Notes
Appends .idx to key unless an explicit .idx path is provided.
- get_size(key: str) int | None [source]¶
Return the size in bytes for a given S3 object key.
- Returns:
Content length if available, else None.
- Return type:
int or None
- idx_to_byteranges(lines: list[str], search_str: str) dict[str, str] [source]¶
Wrapper for
grib_utils.idx_to_byteranges()
using regex filtering.
- list_files(remote_path: str | None = None, pattern: str | None = None) Iterable[str] | None [source]¶
List object keys under a prefix in the bucket with optional regex filter.
- Parameters:
remote_path (str, optional) – Prefix to list. Defaults to all keys in the bucket.
pattern (str, optional) – Regular expression applied to full keys using re.search(). If provided, only matching keys are returned.
- Returns:
Keys found under the prefix, or None on error.
- Return type:
list of str or None
- upload(local_path: str, remote_path: str) bool [source]¶
Standardized upload implementation delegating to upload_file().
Vimeo upload manager using PyVimeo.
Implements DataAcquirer
with upload-only
support to Vimeo. Fetching and listing are not supported.
- class datavizhub.acquisition.vimeo_manager.VimeoManager(client_id: str, client_secret: str, access_token: str)[source]¶
Bases:
DataAcquirer
- CAPABILITIES = {'upload'}¶
Upload videos to Vimeo using PyVimeo.
This manager encapsulates video uploads and updates via the Vimeo API using PyVimeo. It participates in the acquisition interface for pipeline consistency, though generic file fetching/listing is not supported for Vimeo in this project.
Supported Protocols¶
Vimeo API (token-based)
- param client_id:
Vimeo API client ID.
- type client_id:
str
- param client_secret:
Vimeo API client secret.
- type client_secret:
str
- param access_token:
Vimeo API access token.
- type access_token:
str
Examples
Upload a video and get its URI:
from datavizhub.acquisition.vimeo_manager import VimeoManager

vm = VimeoManager(client_id, client_secret, access_token)
vm.connect()
uri = vm.upload_video("/path/to/video.mp4", video_name="My Video")
vm.disconnect()
print(uri)
- fetch(remote_path: str, local_filename: str | None = None) bool [source]¶
Fetching from Vimeo is not supported.
- Raises:
NotImplementedError – Always raised to indicate downloads are not supported.
- list_files(remote_path: str | None = None) Iterable[str] | None [source]¶
Listing is not implemented for Vimeo.
- Returns:
Always returns None.
- Return type:
None
- update_video(file_path: str, video_uri: str) str [source]¶
Replace the video file for an existing Vimeo video.
- Parameters:
file_path (str) – Path to the replacement video file.
video_uri (str) – Vimeo video URI (e.g., "/videos/12345").
- Returns:
The URI of the updated video.
- Return type:
str
- Raises:
Exception – If the update fails or the response cannot be interpreted.
- update_video_description(video_uri: str, new_description: str) str [source]¶
Update the description of a Vimeo video.
- Parameters:
video_uri (str) – Vimeo video URI (e.g., "/videos/12345").
new_description (str) – New description text to set.
- Returns:
Confirmation message when the update succeeds.
- Return type:
str
- Raises:
Exception – If the Vimeo API call fails.
- upload(local_path: str, remote_path: str) bool [source]¶
Standardized upload interface mapping to upload_video().
- Parameters:
local_path (str) – Local video file path.
remote_path (str) – Interpreted as the Vimeo video name/title.
- Returns:
True if an upload URI was returned.
- Return type:
bool
- upload_video(file_path: str, video_name: str | None = None) str [source]¶
Upload a local video to Vimeo.
- Parameters:
file_path (str) – Path to the local video file.
video_name (str, optional) – Optional title to assign to the video.
- Returns:
The Vimeo video URI for the uploaded content.
- Return type:
str
- Raises:
Exception – If the upload fails or the response cannot be interpreted.
- vimeo_client: vimeo.VimeoClient | None¶