Source code for bioversions.utils

"""Utilities and implementation for bioversions."""

from __future__ import annotations

import datetime
import enum
import gzip
import io
import os
from collections.abc import Generator, Iterable, Mapping
from contextlib import contextmanager
from typing import Any, ClassVar, TextIO, TypedDict, cast

import bioregistry
import pydantic
import pystow.utils
import requests
import requests.exceptions
from bs4 import BeautifulSoup, Tag
from pystow.constants import TimeoutHint
from typing_extensions import NotRequired

from .version import VERSION

# Names exported by ``from bioversions.utils import *``; private helpers
# (leading underscore) and path constants are intentionally not listed.
__all__ = [
    "BIOVERSIONS_USER_AGENT",
    "DailyGetter",
    "Getter",
    "MetaGetter",
    "OBOFoundryGetter",
    "ReleaseDict",
    "UnversionedGetter",
    "VersionResult",
    "VersionType",
    "find_soup_tag",
    "find_soup_text",
    "get_obo_version",
    "get_obograph_json_version",
    "get_owl_xml_version",
    "get_soup",
    "requests_get",
]

# Location returned by pystow for the "bioversions" key
# (presumably a per-user data directory -- confirm against pystow's docs).
BIOVERSIONS_HOME = pystow.join("bioversions")
# Absolute path of the directory that contains this file.
HERE = os.path.abspath(os.path.dirname(__file__))
# Absolute path of the ``docs`` directory two levels above ``HERE``.
DOCS = os.path.abspath(os.path.join(HERE, os.pardir, os.pardir, "docs"))
# The ``img`` directory inside ``DOCS``.
IMG = os.path.join(DOCS, "img")

# User-Agent string naming this package and its version; the request helpers
# in this module fall back to it when the caller does not supply one.
BIOVERSIONS_USER_AGENT = f"bioversions v{VERSION}"

# A User-Agent string in the format sent by desktop Chrome on Windows.
# It is not referenced elsewhere in the visible part of this module.
HUMAN_BROWSER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36"
)


def get_soup(
    url: str,
    *,
    verify: bool = True,
    timeout: TimeoutHint | None = None,
    user_agent: str | None = None,
) -> BeautifulSoup:
    """Fetch ``url`` via :func:`pystow.utils.get_soup`, always sending a User-Agent.

    :param url: The URL of the page to fetch.
    :param verify: Forwarded unchanged to :func:`pystow.utils.get_soup`.
    :param timeout: Forwarded unchanged to :func:`pystow.utils.get_soup`.
    :param user_agent: The User-Agent to send. When ``None``,
        :data:`BIOVERSIONS_USER_AGENT` is used instead.
    :returns: Whatever :func:`pystow.utils.get_soup` returns for the page.
    """
    # Only ``None`` triggers the fallback; an explicit empty string is passed through.
    effective_agent = BIOVERSIONS_USER_AGENT if user_agent is None else user_agent
    return pystow.utils.get_soup(
        url,
        verify=verify,
        timeout=timeout,
        user_agent=effective_agent,
    )


def requests_get(url: str, *args: Any, timeout: int | float, **kwargs: Any) -> requests.Response:
    """Wrap :func:`requests.get` that automatically adds a User-Agent.

    :param url: The URL to request.
    :param args: Extra positional arguments forwarded to :func:`requests.get`.
    :param timeout: The request timeout; required so no call can hang forever.
    :param kwargs: Extra keyword arguments forwarded to :func:`requests.get`.
        If ``headers`` is given and already contains a User-Agent entry (under
        any capitalization), it is kept; otherwise
        :data:`BIOVERSIONS_USER_AGENT` is added.
    :returns: The response object produced by :func:`requests.get`.
    """
    # Work on a copy so the caller's mapping is never mutated, and treat an
    # explicit ``headers=None`` the same as not passing headers at all.
    headers = dict(kwargs.pop("headers", None) or {})
    # HTTP header names are case-insensitive, so a caller-supplied
    # ``user-agent`` must not be overridden by ours.
    if not any(str(name).lower() == "user-agent" for name in headers):
        headers["User-Agent"] = BIOVERSIONS_USER_AGENT
    return requests.get(
        url,
        *args,
        timeout=timeout,
        headers=headers,
        **kwargs,
    )


class VersionType(str, enum.Enum):
    """The versioning schemes a resource can be tagged with.

    Every member's value is the same string as its name, and because the
    class also derives from :class:`str`, members compare equal to that string.
    """

    semver = "semver"
    date = "date"
    month = "month"
    year = "year"
    year_minor = "year_minor"
    semver_minor = "semver_minor"
    sequential = "sequential"
    daily = "daily"
    unversioned = "unversioned"
    other = "other"
    missing = "missing"
    static = "static"
    #: Saved for the most shameful of data
    garbage = "garbage"

    @property
    def label(self) -> str:
        """Return the human-readable label for this version type."""
        # The table lives inside the property on purpose: a plain dict in the
        # class body of an Enum would itself be turned into an enum member.
        labels_by_value = {
            "semver": "SemVer (X.Y.Z)",
            "date": "CalVer (YYYY-MM-DD)",
            "month": "CalVer (YYYY-MM)",
            "year": "CalVer (YYYY)",
            "year_minor": "CalVer (YYYY.X)",
            "semver_minor": "SemVer (X.Y)",
            "sequential": "Sequential (X)",
            "daily": "Daily",
            "unversioned": "Unversioned",
            "other": "Other",
            "missing": "Missing",
            "static": "Static",
            "garbage": "Garbage",
        }
        return labels_by_value[self.value]


def find_soup_tag(element: Tag, *args: Any, **kwargs: Any) -> Tag:
    """Return the first sub-element of ``element`` found by ``element.find``.

    :param element: The element to search in.
    :param args: Positional arguments forwarded to ``element.find``.
    :param kwargs: Keyword arguments forwarded to ``element.find``.
    :returns: The result of ``element.find`` when it is a :class:`Tag`.
    :raises ValueError: If ``element.find`` returns anything that is not a
        :class:`Tag`.
    """
    match = element.find(*args, **kwargs)
    if isinstance(match, Tag):
        return match
    raise ValueError(f"could not find an element matching {args=} and {kwargs=}")


def find_soup_text(element: Tag, *args: Any, **kwargs: Any) -> str:
    """Find a sub-element and return its non-empty text.

    :param element: The element to search in.
    :param args: Positional arguments forwarded to :func:`find_soup_tag`.
    :param kwargs: Keyword arguments forwarded to :func:`find_soup_tag`.
    :returns: The ``text`` attribute of the sub-element that was found.
    :raises ValueError: If no matching sub-element exists (raised by
        :func:`find_soup_tag`) or if its text is empty or not a string.
    """
    tag = find_soup_tag(element, *args, **kwargs)
    # Read the attribute once instead of re-evaluating it for every check.
    text = tag.text
    if not isinstance(text, str) or not text:
        raise ValueError(f"element matching {args=} and {kwargs=} has no text")
    return text


class MetaGetter(type):
    """A metatype that exposes ``version``, ``date``, and ``homepage`` as class properties.

    Classes using this metaclass must be instantiable without arguments and
    provide a ``get()`` method; its result is fetched lazily on first access
    and cached on the class itself.
    """

    _cache: ClassVar[str | ReleaseDict | datetime.datetime | datetime.date | None] = None

    date_fmt: str | None
    date_version_fmt: str | None
    homepage_fmt: str | None

    @property
    def _cache_prop(cls) -> str | ReleaseDict | datetime.datetime | datetime.date:
        """Return the cached result of ``cls().get()``, computing it on first use."""
        # Look only in the class's own namespace: a plain ``cls._cache`` would
        # also find a value cached on a parent class, making a subclass with a
        # different ``get()`` silently report its parent's version.
        cached = vars(cls).get("_cache")
        if cached is None:
            cached = cls().get()  # type:ignore
            cls._cache = cached
        return cached

    @property
    def version(cls) -> str:
        """Get the version of the getter based on the inheriting class's implementation.

        :raises TypeError: If ``get()`` returned something other than a string,
            a dict, or a date/datetime.
        """
        cached = cls._cache_prop
        if isinstance(cached, str):
            return cached
        if isinstance(cached, dict):
            return cached["version"]
        if isinstance(cached, (datetime.datetime, datetime.date)):
            return cached.strftime("%Y-%m-%d")
        raise TypeError(f"_cache_prop was a {type(cached)}")

    @property
    def date(cls) -> datetime.date | None:
        """Get the date if it's set.

        The date parsed from the version string (see
        :attr:`version_date_parsed`) takes precedence. Otherwise, the ``date``
        entry of a dict returned by ``get()`` is used.

        :returns: The release date, or ``None`` when it is not available.
        :raises TypeError: If the date is a string and ``date_fmt`` is not set.
        :raises ValueError: If the date string does not match ``date_fmt``.
        """
        vp = cls.version_date_parsed
        if vp is not None:
            return vp
        cached = cls._cache_prop
        if not isinstance(cached, dict):
            return None
        # ``date`` is an optional key of the release dict, so a missing entry
        # means "no date known" rather than an error.
        date = cached.get("date")
        if date is None:
            return None
        # Check ``datetime`` first since it is a subclass of ``date``.
        if isinstance(date, datetime.datetime):
            return date.date()
        if isinstance(date, datetime.date):
            return date
        if not cls.date_fmt:
            raise TypeError(
                f"Need to set {cls.__name__} class variable `date_fmt` to parse date {date}"
            )
        try:
            return datetime.datetime.strptime(date, cls.date_fmt).date()
        except ValueError:
            raise ValueError(
                f"Issue in {cls.__name__} with date {date} and fmt {cls.date_fmt}"
            ) from None

    @property
    def version_date_parsed(cls) -> datetime.date | None:
        """Get the version parsed as a date if ``date_version_fmt`` is set.

        :returns: The parsed date, or ``None`` when ``date_version_fmt`` is ``None``.
        :raises ValueError: If the version does not match ``date_version_fmt``.
        """
        if cls.date_version_fmt is None:
            return None
        try:
            return datetime.datetime.strptime(cls.version, cls.date_version_fmt).date()
        except ValueError:
            raise ValueError(
                f"Issue parsing {cls.__name__} version {cls.version} "
                f"with fmt {cls.date_version_fmt}"
            ) from None

    @property
    def homepage(cls) -> str | None:
        """Get the homepage's URL if a format string was specified."""
        if cls.homepage_fmt is None:
            return None

        version = cls.homepage_version_transform(cls.version)
        return cls.homepage_fmt.format(version=version)

    @staticmethod
    def homepage_version_transform(version: str) -> str:
        """Transform the version for formatting into the homepage.

        The default implementation returns the version unchanged.
        """
        return version


class VersionResult(pydantic.BaseModel):
    """A data model for information about a database and its version."""

    #: The database name
    name: str
    #: The database current version
    version: str
    #: The class that retrieved the version
    classname: str
    #: The version type
    vtype: VersionType
    #: The date of the current release
    date: datetime.date | None
    #: The URL for the homepage of the specific version of the database
    homepage: str | None
    #: The database prefix
    bioregistry_id: str | None
class ReleaseDict(TypedDict):
    """A release dict."""

    #: The version string of the release
    version: str
    #: The release date; optional, either already parsed or as a string
    date: NotRequired[str | datetime.datetime | datetime.date]


class Getter(metaclass=MetaGetter):
    """A class for holding the name of a database and implementation of the version getter."""

    #: The name of the database. Specify this in the inheriting class!.
    name: ClassVar[str]

    #: The type of version string. Required!
    version_type: ClassVar[VersionType]

    #: The URL with `{version}` to format in the version. Specify this in the inheriting class.
    homepage_fmt: ClassVar[str | None] = None

    #: The :func:`datetime.datetime.strptime` format for a string ``date`` in the release dict
    date_fmt: ClassVar[str | None] = None

    #: The :func:`datetime.datetime.strptime` format for parsing the version itself as a date
    date_version_fmt: ClassVar[str | None] = None

    #: The database prefix
    bioregistry_id: ClassVar[str | None] = None

    # The following are automatically calculated based on the metaclass
    version: ClassVar[str]
    date: ClassVar[datetime.date | None]
    homepage: ClassVar[str | None]

    #: Prefixes this getter works for
    collection: ClassVar[list[str] | None] = None

    def get(self) -> str | ReleaseDict | datetime.datetime | datetime.date:
        """Get the latest of this database."""
        raise NotImplementedError

    @classmethod
    def print(cls, sep: str = "\t", file: TextIO | None = None) -> None:
        """Print the latest version of this database.

        :param sep: The separator placed between the printed fields.
        :param file: The stream to print to, forwarded to the builtin :func:`print`.
        """
        parts = []
        if cls.bioregistry_id:
            parts.append(cls.bioregistry_id)
        elif cls.collection:
            parts.append("/".join(cls.collection))
        else:
            parts.append("<no prefix>")
        parts.append(cls.name)
        parts.append(cls.version)
        if cls.date:
            parts.append(f"({cls.date})")
        if cls.homepage:
            parts.append(cls.homepage)
        # Inside the method body, ``print`` resolves to the builtin, not this classmethod.
        print(*parts, sep=sep, file=file)

    @classmethod
    def resolve(cls) -> VersionResult:
        """Get a Bioversion data container with the data for this database."""
        return VersionResult(
            name=cls.name,
            version=cls.version,
            classname=cls.__name__,
            vtype=cls.version_type,
            homepage=cls.homepage,
            date=cls.date,
            bioregistry_id=cls.bioregistry_id,
        )

    @classmethod
    def to_dict(cls) -> Mapping[str, Any]:
        """Get a dict with the data for this database."""
        return cls.resolve().model_dump()


class DailyGetter(Getter):
    """A base getter for daily updated resources."""

    version_type = VersionType.daily

    def get(self) -> str:
        """Return a constant "daily" string."""
        return "daily"


class UnversionedGetter(Getter):
    """A base getter for unversioned resources."""

    version_type = VersionType.unversioned

    #: Has this database been apparently abandoned (true) or is it still updated (false)
    abandoned: ClassVar[bool]

    def get(self) -> str:
        """Return a constant unversioned string."""
        return "unversioned"


def get_obo_version(url: str, *, max_lines: int = 200) -> str | None:
    """Get the data version from an OBO file.

    :param url: The URL of the OBO file; a ``.gz`` URL is decompressed on the fly.
    :param max_lines: Stop scanning once the line index exceeds this value.
    :returns: The value of the ``data-version`` line, or ``None`` if the first
        blank line, the line limit, or the end of the file is reached first.
    """
    prefix = "data-version:"
    with _iterate_lines(url) as file:
        for i, line in enumerate(file):
            if isinstance(line, bytes):
                line = line.decode("utf-8")
            line = line.strip()
            # Match the full tag including the colon, so that the check agrees
            # with the part that is cut off afterwards.
            if line.startswith(prefix):
                return line.removeprefix(prefix).strip()
            if not line:
                # this means we got past the exposition section
                return None
            if i > max_lines:
                # this might happen if there are tons of axioms
                # shoved into OBO, but this always comes at the end
                return None
    return None


class OBOFoundryGetter(Getter):
    """An implementation for getting OBO Foundry ontology versions."""

    #: Cut ``len("<key>/")`` characters off the start of the version
    strip_key_prefix: ClassVar[bool] = False
    #: Cut ``len("releases/")`` characters off the start of the version
    strip_version_prefix: ClassVar[bool] = False
    #: Cut ``len(key) + 5`` characters off the end of the version
    strip_file_suffix: ClassVar[bool] = False

    @property
    def key(self) -> str:
        """Get the OBO Foundry key.

        :raises ValueError: If ``bioregistry_id`` is not set or bioregistry
            returns no OBO Foundry prefix for it.
        """
        if self.bioregistry_id is None:
            raise ValueError("missing bioregistry ID")
        rv = bioregistry.get_obofoundry_prefix(self.bioregistry_id)
        if rv is None:
            raise ValueError(f"no OBO Foundry prefix for {self.bioregistry_id}")
        return rv

    def get(self) -> str:
        """Get the OBO version.

        :raises ValueError: If the OBO file has no ``data-version`` line.
        """
        url = f"https://purl.obolibrary.org/obo/{self.key}.obo"
        version = get_obo_version(url)
        if version is None:
            raise ValueError(f"No `data-version` line contained in {url}")
        return self.process(version)

    def process(self, version: str) -> str:
        """Post-process the version string.

        Each enabled ``strip_*`` flag cuts a fixed number of characters; the
        content of the removed part is not checked.
        """
        if self.strip_key_prefix:
            version = version[len(f"{self.key}/") :]
        if self.strip_version_prefix:
            version = version[len("releases/") :]
        if self.strip_file_suffix:
            version = version[: -(len(self.key) + 5)]
        return version


def _get_ftp_date_version(host: str, directory: str) -> str:
    """Return the greatest date-like link text listed at ``https://{host}/{directory}``.

    :raises ValueError: If no link text on the page looks like a date.
    """
    url = f"https://{host}/{directory}"
    soup = get_soup(url)
    candidates = []
    for anchor in soup.find_all("a"):
        text = anchor.text
        if not isinstance(text, str) or not text:
            continue
        name = text.rstrip("/")
        if _is_iso_8601(name):
            candidates.append(name)
    if not candidates:
        # ``max`` on an empty sequence raises the same exception type, but
        # without saying which page was at fault.
        raise ValueError(f"no date-like entries found at {url}")
    # Entries are compared as strings.
    return max(candidates)


def _is_iso_8601(s: str) -> bool:
    """Check that ``s`` consists of three numeric parts separated by dashes."""
    x = s.split("-")
    return len(x) == 3 and x[0].isnumeric() and x[1].isnumeric() and x[2].isnumeric()


def _is_version(s: str) -> bool:
    """Check that ``s`` consists of two numeric parts separated by a dot."""
    x = s.split(".")
    return len(x) == 2 and x[0].isnumeric() and x[1].isnumeric()


def _is_semantic_version(s: str) -> bool:
    """Check that ``s`` consists of three numeric parts separated by dots."""
    x = s.split(".")
    return len(x) == 3 and x[0].isnumeric() and x[1].isnumeric() and x[2].isnumeric()


# Start of the line that carries the version IRI in an OWL XML document.
VERSION_IRI_TAG = "<owl:versionIRI rdf:resource="
VERSION_IRI_TAG_LEN = len(VERSION_IRI_TAG)


def get_owl_xml_version(url: str, *, max_lines: int = 200) -> str | None:
    """Get version from an OWL XML document.

    :param url: The URL of the OWL XML file; a ``.gz`` URL is decompressed on the fly.
    :param max_lines: Stop scanning once the line index exceeds this value.
    :returns: The rest of the first line starting with :data:`VERSION_IRI_TAG`,
        with a trailing ``/>`` removed and nothing else stripped. ``None`` is
        returned if no such line is found in time or an SSL error occurs.
    """
    try:
        with _iterate_lines(url) as file:
            for i, line in enumerate(file):
                if isinstance(line, bytes):
                    line = line.decode("utf-8")
                line = line.strip()
                if line.startswith(VERSION_IRI_TAG):
                    return line[VERSION_IRI_TAG_LEN:].removesuffix("/>")
                if i > max_lines:
                    return None
    except requests.exceptions.SSLError:
        # Best effort: an SSL failure is reported as "no version found".
        pass
    return None


@contextmanager
def _iterate_lines(url: str) -> Generator[Iterable[str], None, None]:
    """Stream ``url`` and yield an iterable over its lines.

    The response stays open only while the ``with`` block is active. URLs
    ending in ``.gz`` are decompressed and decoded as UTF-8. For other URLs
    the lines come from ``iter_lines(decode_unicode=True)``; callers in this
    module still guard against receiving ``bytes``.
    """
    # Go through the module's own wrapper so the User-Agent logic lives in one place.
    with requests_get(url, stream=True, timeout=60) as res:
        if url.endswith(".gz"):
            compressed_stream = io.BufferedReader(res.raw)  # type:ignore
            with gzip.open(compressed_stream, "rt", encoding="utf-8") as file:
                yield file
        else:
            yield res.iter_lines(decode_unicode=True)


def get_obograph_json_version(url: str) -> str | None:
    """Get version from an OBO Graph JSON document.

    The version is read from ``graphs[0].meta.version`` of the downloaded
    JSON. A missing key or an empty ``graphs`` list raises the corresponding
    lookup error rather than returning ``None``.
    """
    res = requests_get(url, timeout=60).json()
    version = res["graphs"][0]["meta"]["version"]
    return cast(str, version)