Source code for datavizhub.acquisition.http_manager

"""HTTP data acquisition handler.

Provides a minimal :class:`~datavizhub.acquisition.base.DataAcquirer` for HTTP
GET downloads, plus optional helpers for content size queries, GRIB ``.idx``
subsetting, byte-range downloads, and best-effort listing via anchor scraping.

Advanced Features
-----------------
- ``get_size(url)``: return ``Content-Length`` from a ``HEAD`` if provided.
- ``get_idx_lines(url, *, write_to=None, timeout=30, max_retries=3)``: fetch
  and parse the GRIB index (``.idx`` is appended to the URL unless it already
  ends with ``.idx``).
- ``idx_to_byteranges(lines, search_regex)``: regex-based selection of ranges.
- ``get_chunks(url, chunk_size=...)``: compute contiguous ranges (default
  chunk size 500 MB).
- ``download_byteranges(url, byte_ranges, *, max_workers=10, timeout=30)``:
  parallel ranged GETs, concatenated in order.
- ``list_files(url, pattern=None)``: scrape anchor tags from directory-style
  index pages (e.g., NOMADS) and filter with regex if provided.

A commented usage sketch combining the GRIB subsetting helpers follows this
docstring.
"""

import logging
import re
from pathlib import Path
from typing import Iterable, Optional
from urllib.parse import urljoin

import requests

from datavizhub.acquisition.base import DataAcquirer
from datavizhub.acquisition.grib_utils import (
    ensure_idx_path,
    parse_idx_lines,
    idx_to_byteranges as _idx_to_byteranges,
    compute_chunks as _compute_chunks,
    parallel_download_byteranges as _parallel_download_byteranges,
)


class HTTPHandler(DataAcquirer):
    """Acquire files over HTTP/HTTPS.

    This lightweight manager performs simple HTTP(S) GETs to fetch remote
    resources to the local filesystem. Because HTTP is stateless for these
    operations, :meth:`connect` and :meth:`disconnect` are no-ops.

    Supported Protocols
    -------------------
    - ``http://``
    - ``https://``

    Examples
    --------
    Download a file via HTTPS::

        from datavizhub.acquisition.http_manager import HTTPHandler

        http = HTTPHandler()
        http.connect()  # no-op
        http.fetch("https://example.com/data.json", "data.json")
        http.disconnect()  # no-op
    """

    CAPABILITIES = {"fetch"}

    def connect(self) -> None:
        """Initialize the handler (no persistent connection).

        Notes
        -----
        Provided for API parity; does nothing for basic HTTP GETs.
        """
        return None

    def fetch(self, remote_path: str, local_filename: Optional[str] = None) -> bool:
        """Download content at ``remote_path`` to ``local_filename``.

        Parameters
        ----------
        remote_path : str
            Full HTTP(S) URL to download.
        local_filename : str, optional
            Local destination path. Defaults to the basename of the URL.

        Returns
        -------
        bool
            ``True`` on success, ``False`` if the request fails.
        """
        filename = local_filename or Path(remote_path).name
        try:
            response = requests.get(remote_path, timeout=10)
            response.raise_for_status()
            with Path(filename).open("wb") as f:
                f.write(response.content)
            logging.info(f"Successfully downloaded {remote_path}")
            return True
        except requests.exceptions.HTTPError as http_err:
            logging.error(
                f"HTTP error occurred while downloading {remote_path}: {http_err}"
            )
        except requests.exceptions.ConnectionError as conn_err:
            logging.error(
                f"Connection error occurred while downloading {remote_path}: {conn_err}"
            )
        except requests.exceptions.Timeout as timeout_err:
            logging.error(
                f"Timeout occurred while downloading {remote_path}: {timeout_err}"
            )
        except requests.exceptions.RequestException as req_err:
            logging.error(f"Error occurred while downloading {remote_path}: {req_err}")
        except Exception as e:
            logging.error(f"An error occurred while downloading {remote_path}: {e}")
        return False

    def list_files(
        self, remote_path: Optional[str] = None, pattern: Optional[str] = None
    ) -> Optional[Iterable[str]]:
        """Attempt to list files by scraping anchor tags from an index page.

        This is best-effort and intended for directory-style endpoints such as
        NOMADS listings. If the page is not HTML or contains no anchors, an
        empty list is returned.

        Parameters
        ----------
        remote_path : str
            Page URL to scrape for anchors.
        pattern : str, optional
            Regular expression applied to full URLs (via :func:`re.search`).
        """
        if not remote_path:
            return []
        try:
            resp = requests.get(remote_path, timeout=10)
            resp.raise_for_status()
            text = resp.text
        except requests.exceptions.RequestException:
            return []
        hrefs = re.findall(r'href=["\']([^"\']+)["\']', text, re.IGNORECASE)
        results: list[str] = []
        for href in hrefs:
            if href.startswith("?") or href.startswith("#"):
                continue
            # Build absolute URL for relative anchors
            abs_url = urljoin(remote_path, href)
            results.append(abs_url)
        if pattern:
            rx = re.compile(pattern)
            results = [u for u in results if rx.search(u)]
        return results

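    # Example (a sketch; the listing URL and pattern below are hypothetical):
    #
    #     http = HTTPHandler()
    #     urls = http.list_files(
    #         "https://example.com/data/gfs.20240101/00/atmos/",
    #         pattern=r"\.grib2$",
    #     ) or []
    #     for u in urls:
    #         print(u)
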
    def disconnect(self) -> None:
        """No persistent connection to tear down."""
        return None

    def upload(self, local_path: str, remote_path: str) -> bool:
        """Uploading is not supported for HTTPHandler.

        Raises
        ------
        NotSupportedError
            Always raised to indicate upload is unsupported.
        """
        from datavizhub.acquisition.base import NotSupportedError

        raise NotSupportedError("upload() is not supported for HTTPHandler")

    # ---- Advanced features: size, ranges, and GRIB helpers -----------------------------
    def get_size(self, url: str) -> Optional[int]:
        """Return the ``Content-Length`` from a ``HEAD`` request if provided."""
        try:
            r = requests.head(url, timeout=10)
            r.raise_for_status()
            value = r.headers.get("Content-Length")
            return int(value) if value is not None else None
        except requests.exceptions.RequestException:
            return None

    def _download(
        self, url: str, range_header: Optional[str] = None, timeout: int = 30
    ) -> bytes:
        """Internal helper to issue a GET with optional Range header and timeout."""
        headers = {"Range": range_header} if range_header else None
        r = requests.get(url, headers=headers, timeout=timeout)
        r.raise_for_status()
        return r.content

    def get_idx_lines(
        self,
        url: str,
        *,
        write_to: Optional[str] = None,
        timeout: int = 30,
        max_retries: int = 3,
    ) -> Optional[list[str]]:
        """Fetch and parse the GRIB index (``.idx``) for a URL.

        Appends ``.idx`` to ``url`` unless an explicit ``.idx`` path is
        provided. Retries are applied on transient failures.
        """
        idx_url = ensure_idx_path(url)
        payload: Optional[bytes] = None
        attempt = 0
        while attempt < max_retries:
            try:
                payload = self._download(idx_url, timeout=timeout)
                break
            except requests.exceptions.RequestException:
                attempt += 1
                if attempt >= max_retries:
                    return None
        if payload is None:
            # No attempt succeeded (e.g., max_retries < 1)
            return None
        lines = parse_idx_lines(payload)
        if write_to:
            try:
                idx_path = write_to if write_to.endswith(".idx") else f"{write_to}.idx"
                with open(idx_path, "w", encoding="utf8") as f:
                    f.write("\n".join(lines))
            except Exception:
                pass
        return lines

    def idx_to_byteranges(self, lines: list[str], search_str: str) -> dict[str, str]:
        """Select byte ranges from ``.idx`` lines matching ``search_str`` (delegates to grib_utils)."""
        return _idx_to_byteranges(lines, search_str)

    def get_chunks(self, url: str, chunk_size: int = 500 * 1024 * 1024) -> list[str]:
        """Compute contiguous byte ranges covering the resource at ``url``."""
        size = self.get_size(url)
        if size is None:
            return []
        return _compute_chunks(size, chunk_size)

    def download_byteranges(
        self,
        url: str,
        byte_ranges: Iterable[str],
        *,
        max_workers: int = 10,
        timeout: int = 30,
    ) -> bytes:
        """Parallel ranged downloads concatenated in the order of ``byte_ranges``.

        Parameters
        ----------
        url : str
            Target URL for the ranged GET requests.
        byte_ranges : Iterable[str]
            Iterable of Range header values, e.g., ``"bytes=0-99"``.
        max_workers : int, default=10
            Number of worker threads for parallelism.
        timeout : int, default=30
            Per-request timeout (seconds).
        """

        def _ranged_get(u: str, range_header: str) -> bytes:
            return self._download(u, range_header=range_header, timeout=timeout)

        return _parallel_download_byteranges(
            _ranged_get, url, byte_ranges, max_workers=max_workers
        )

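    # Example (a sketch): download a large file in parallel chunks, assuming
    # the strings returned by ``get_chunks`` are Range header values accepted
    # by ``download_byteranges``. The URL below is hypothetical.
    #
    #     http = HTTPHandler()
    #     url = "https://example.com/big/archive.grib2"
    #     chunk_ranges = http.get_chunks(url)  # [] if Content-Length is unavailable
    #     if chunk_ranges:
    #         data = http.download_byteranges(url, chunk_ranges, max_workers=4)
    #         with open("archive.grib2", "wb") as f:
    #             f.write(data)
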
    # Backwards-compatible helpers
    @staticmethod
    def download_file(url: str, filename: str) -> None:
        """Compatibility helper that downloads a file.

        Parameters
        ----------
        url : str
            File URL to download.
        filename : str
            Local destination path.
        """
        HTTPHandler().fetch(url, filename)

    @staticmethod
    def fetch_data(url: str):
        """Fetch binary payload via GET.

        Parameters
        ----------
        url : str
            URL to request.

        Returns
        -------
        bytes or None
            Raw response body on success, otherwise ``None``.
        """
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return response.content
        except requests.exceptions.RequestException as e:
            logging.error(f"Error occurred while fetching data from {url}: {e}")
        except Exception as e:
            logging.error(f"An error occurred while fetching data from {url}: {e}")
        return None

    @staticmethod
    def fetch_text(url: str):
        """Fetch text content via GET.

        Parameters
        ----------
        url : str
            URL to request.

        Returns
        -------
        str or None
            Text response on success, otherwise ``None``.
        """
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return response.text
        except requests.exceptions.RequestException as e:
            logging.error(f"Request failed: {e}")
            return None

    @staticmethod
    def fetch_json(url: str):
        """Fetch JSON content via GET and parse it.

        Parameters
        ----------
        url : str
            URL to request.

        Returns
        -------
        dict or list or None
            Parsed JSON object on success, otherwise ``None``.
        """
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logging.error(f"Request failed: {e}")
            return None

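    # Example (a sketch; the endpoint is hypothetical and assumed to return a
    # JSON object):
    #
    #     payload = HTTPHandler.fetch_json("https://example.com/api/status.json")
    #     if payload is not None:
    #         print(payload.get("status"))
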
    @staticmethod
    def post_data(url: str, data, headers=None):
        """Send a POST request and return the body.

        Parameters
        ----------
        url : str
            URL to post to.
        data : Any
            Request payload.
        headers : dict, optional
            Optional request headers.

        Returns
        -------
        str or None
            Response text on success, otherwise ``None``.
        """
        try:
            response = requests.post(url, data=data, headers=headers, timeout=10)
            response.raise_for_status()
            return response.text
        except requests.exceptions.RequestException as e:
            logging.error(f"Post request failed: {e}")
            return None

    @staticmethod
    def fetch_headers(url: str):
        """Perform a HEAD request and return headers.

        Parameters
        ----------
        url : str
            URL to request.

        Returns
        -------
        Mapping or None
            Response headers on success, otherwise ``None``.
        """
        try:
            response = requests.head(url, timeout=10)
            response.raise_for_status()
            return response.headers
        except requests.exceptions.RequestException as e:
            logging.error(f"HEAD request failed: {e}")
            return None