"""
Utilities for downloading contaminant FASTA files.
"""
from pathlib import Path
from urllib.request import urlopen
from collections.abc import Callable
import hashlib
import os
import re
import shutil
import tempfile
# Timeout, in seconds, that ``_download`` passes to ``urlopen``.
_DOWNLOAD_TIMEOUT_SECONDS = 60
def _md5_id(path: Path, length: int = 8) -> str:
    """
    Return the first ``length`` hex characters of the MD5 digest of the
    file at ``path``.

    The digest serves only as a short content identifier, not as a
    security measure, so MD5 is requested with
    ``usedforsecurity=False``; this keeps the call usable on
    FIPS-restricted Python builds. The file is hashed in chunks so it
    is never held in memory as a whole.
    """
    digest = hashlib.md5(usedforsecurity=False)
    with open(path, "rb") as src:
        # Feed the hash 1 MiB at a time until ``read`` returns b"".
        while chunk := src.read(1024 * 1024):
            digest.update(chunk)
    return digest.hexdigest()[:length]
def _is_uniprot_accession(accession: str) -> bool:
    """
    Report whether ``accession`` has the shape of a UniProt accession.

    Both the six-character form and the longer ten-character form are
    accepted, each optionally followed by an isoform suffix of a dash
    and one or two digits. The whole string must match.
    """
    matcher = re.compile(
        r"[OPQ][0-9][A-Z0-9]{3}[0-9](-[0-9]{1,2})?"
        r"|[A-NR-Z][0-9]([A-Z][A-Z0-9]{2}[0-9]){1,2}(-[0-9]{1,2})?"
    )
    return matcher.fullmatch(accession) is not None
def check_uniprot_accession_nr(accession: str) -> None:
    """
    Raise ``ValueError`` unless ``accession`` passes
    ``_is_uniprot_accession``; return ``None`` otherwise.
    """
    if _is_uniprot_accession(accession):
        return
    message = f"Accession '{accession}' is not a valid UniProt accession."
    raise ValueError(message)
def _format_frankenfield_header(header: str) -> str:
    """
    Validate and normalise one Frankenfield2022 FASTA header.

    ``header`` is the header text without the leading ``>``. Its first
    whitespace-delimited token must consist of exactly three
    pipe-separated fields (``database|accession|protein_id``); the
    remainder, if any, is kept as the description. A leading ``Cont_``
    is removed from the accession, which must then either be listed in
    ``_FRANKENFIELD_MANUAL_IDS`` or pass
    ``check_uniprot_accession_nr``.

    Returns the rebuilt header, ``database|accession|protein_id``
    followed by the description when one is present.

    Raises ``ValueError`` if the identifier does not have exactly
    three fields (this includes empty or whitespace-only headers) or
    if the accession is rejected.
    """
    parts = header.split(maxsplit=1)
    # An empty or whitespace-only header yields no parts. Treat its
    # identifier as empty so it fails the field-count check below with
    # a ValueError instead of escaping as an IndexError.
    id_part = parts[0] if parts else ""
    desc = parts[1] if len(parts) > 1 else ""

    segments = id_part.split("|")
    if len(segments) != 3:
        raise ValueError(
            f"Header '{header}' must have exactly three "
            "pipe-separated fields.",
        )

    database, accession_number, protein_id = segments
    accession_number = accession_number.removeprefix("Cont_")
    # Manually assigned IDs are not UniProt accessions; skip the check.
    if accession_number not in _FRANKENFIELD_MANUAL_IDS:
        check_uniprot_accession_nr(accession_number)

    new_id = f"{database}|{accession_number}|{protein_id}"
    # ``strip`` drops the trailing space left when there is no description.
    return f"{new_id} {desc}".strip()
def _format_fasta(
    source_path: Path,
    destination_path: Path,
    formatter: Callable[[str], str],
) -> None:
    """
    Copy a FASTA file while rewriting every header line.

    Lines starting with ``>`` have the marker removed and surrounding
    whitespace stripped, are passed through ``formatter``, and are
    written back as ``>`` + result + newline. All other lines are
    copied unchanged. Both files are handled as UTF-8 text.
    """
    with open(source_path, encoding="utf-8") as reader:
        with open(destination_path, "w", encoding="utf-8") as writer:
            for record_line in reader:
                if not record_line.startswith(">"):
                    # Sequence (or blank) line: pass through untouched.
                    writer.write(record_line)
                    continue
                rewritten = formatter(record_line[1:].strip())
                writer.write(f">{rewritten}\n")
def _download(url: str, destination: Path) -> None:
    """
    Fetch ``url`` and write the response body to ``destination``.

    The request is opened with ``_DOWNLOAD_TIMEOUT_SECONDS`` as its
    timeout and the body is copied to a file opened in binary write
    mode.
    """
    with urlopen(url, timeout=_DOWNLOAD_TIMEOUT_SECONDS) as response:
        with open(destination, "wb") as sink:
            shutil.copyfileobj(response, sink)
def _validate_fasta(path: Path) -> None:
    """
    Check that the file at ``path`` plausibly contains FASTA data.

    The first line that is non-empty after stripping whitespace must
    begin with ``>``. Raises ``ValueError`` when that line starts with
    anything else, or when the file has no non-blank line at all.
    """
    with open(path, "rb") as handle:
        # First non-blank line, already stripped; None if there is none.
        first_content = next(
            (stripped for stripped in map(bytes.strip, handle) if stripped),
            None,
        )

    if first_content is None:
        raise ValueError(f"Downloaded file at {path} is empty.")
    if not first_content.startswith(b">"):
        raise ValueError(
            f"Downloaded file at {path} does not look like a "
            "FASTA: first non-blank line does not start with '>'.",
        )
def _resolve_destination(
    base_destination: Path,
    candidate_path: Path,
    use_digest: bool,
) -> Path:
    """
    Pick the final output path for a fetched file.

    With ``use_digest`` false, ``base_destination`` is returned as is.
    Otherwise the short MD5 id of ``candidate_path`` is inserted
    between the stem and the suffix of ``base_destination``
    (``name.fasta`` becomes ``name_<digest>.fasta``).
    """
    if use_digest:
        tag = _md5_id(candidate_path)
        stem, suffix = base_destination.stem, base_destination.suffix
        return base_destination.with_name(f"{stem}_{tag}{suffix}")
    return base_destination
def _atomic_move(candidate_path: Path, destination: Path) -> None:
    """
    Install ``candidate_path`` at ``destination`` through a staging file.

    The candidate is first copied to a hidden ``.<name>.tmp`` file next
    to ``destination`` and that file is then renamed over
    ``destination``. The staging file is removed if it is still present
    afterwards, whether or not the operation succeeded.
    ``candidate_path`` itself is left in place.
    """
    staging_file = destination.parent / f".{destination.name}.tmp"
    try:
        shutil.copy2(candidate_path, staging_file)
        staging_file.replace(destination)
    finally:
        # No-op after a successful rename; cleans up after a failure.
        staging_file.unlink(missing_ok=True)
def _check_no_existing(path: Path, force: bool) -> None:
    """
    Guard against overwriting a file unintentionally.

    Returns silently when nothing exists at ``path`` or when ``force``
    is true; otherwise raises ``FileExistsError``.
    """
    if not path.exists():
        return
    if force:
        return
    raise FileExistsError(
        f"File already exists at {path}. Use force=True to overwrite.",
    )
def _fetch_candidate(
    url: str,
    tmp_dir: str,
    formatter: Callable[[str], str] | None,
    verbose: bool,
) -> Path:
    """
    Produce the candidate FASTA file inside ``tmp_dir``.

    The content of ``url`` is saved as ``raw`` and checked with
    ``_validate_fasta``. Without a ``formatter`` that file is the
    result. With one, the headers are rewritten into a second file,
    ``formatted``, which is returned instead; when ``verbose`` is set a
    status line is printed once the rewrite has run.
    """
    workdir = Path(tmp_dir)
    downloaded = workdir / "raw"
    _download(url, downloaded)
    _validate_fasta(downloaded)

    if formatter is not None:
        reformatted = workdir / "formatted"
        _format_fasta(downloaded, reformatted, formatter)
        if verbose:
            print("Formatting contaminants.")
        return reformatted
    return downloaded
# Accession strings that ``_format_frankenfield_header`` accepts without
# running the UniProt accession check.
_FRANKENFIELD_MANUAL_IDS = {"AAAA1", "AAAA2"}
# Registry of supported contaminant sources, keyed by the ``source``
# argument of ``contaminants``. Each entry provides:
#   "url"          - location the FASTA file is downloaded from
#   "default_path" - file name used when the caller passes no ``path``
#   "formatter"    - optional header-rewriting callable; sources without
#                    this key are written exactly as downloaded
_SOURCE_MAP = {
"gpm_crap": {
"url": "ftp://ftp.thegpm.org/fasta/cRAP/crap.fasta",
"default_path": "contaminants_gpm-crap.fasta",
},
"frankenfield2022": {
"url": (
"https://raw.githubusercontent.com/HaoGroup-ProtContLib/"
"Protein-Contaminant-Libraries-for-DDA-and-DIA-Proteomics/"
"refs/heads/main/Universal%20protein%20contaminant%20FASTA/"
"0602_Universal%20Contaminants.fasta"
),
"default_path": "contaminants_frankenfield2022.fasta",
"formatter": _format_frankenfield_header,
},
}
# -- Public API
def contaminants(
    source: str = "frankenfield2022",
    path: str | Path | None = None,
    force: bool = False,
    verbose: bool = False,
) -> Path:
    """Download a contaminant FASTA file from a supported source.

    Fetches a protein contaminant database in FASTA format and
    writes it to disk. Two sources are supported:

    - ``"frankenfield2022"``: Universal contaminant library for DDA
      and DIA proteomics from Frankenfield et al. [1]_ Headers are
      reformatted to standard ``db|accession|id`` notation and
      ``Cont_`` prefixes are stripped from accession numbers. See
      `database description
      <https://proteopy.readthedocs.io/en/latest/api/manual/
      frankenfield2022.html>`_.
    - ``"gpm_crap"``: The GPM common Repository of Adventitious
      Proteins (cRAP) [2]_, a community-curated list of common
      laboratory contaminants. See `database description
      <https://proteopy.readthedocs.io/en/latest/api/manual/
      gpm-crap.html>`_.

    Parameters
    ----------
    source : str, optional
        Identifier of the contaminant database to download.
        Supported values: ``"frankenfield2022"``, ``"gpm_crap"``.
    path : str | Path | None, optional
        Destination path for the downloaded FASTA file. When
        ``None``, a default file name is written in the current
        working directory with an MD5 digest appended to the stem
        for reproducible identification.
    force : bool, optional
        If ``True``, overwrite an existing file at the resolved
        destination path.
    verbose : bool, optional
        Print download URL, formatting status, and final save
        path to stdout.

    Returns
    -------
    Path
        Absolute path to the written FASTA file.

    Raises
    ------
    ValueError
        If ``source`` is not one of the supported source keys,
        if the downloaded file is empty or its first non-blank
        line does not start with ``>``, or if a FASTA header from
        the ``"frankenfield2022"`` source does not contain exactly
        three pipe-separated fields or carries an invalid UniProt
        accession number.
    FileExistsError
        If a file already exists at the resolved destination
        and ``force`` is ``False``.

    Examples
    --------
    Download the Frankenfield 2022 library to the default path:

    >>> import proteopy as pr
    >>> path = pr.download.contaminants()

    Download the GPM cRAP database to the default path:

    >>> path = pr.download.contaminants(source="gpm_crap")

    Save to a specific location:

    >>> path = pr.download.contaminants(
    ...     source="frankenfield2022",
    ...     path="my_project/contaminants.fasta",
    ... )

    References
    ----------
    .. [1] Frankenfield AM, Ni J, Ahmed M, and Hao L.
       "Protein Contaminants Matter: Building Universal Protein
       Contaminant Libraries for DDA and DIA Proteomics."
       *Journal of Proteome Research*, 21(9):2104-2113, 2022.
       DOI: 10.1021/acs.jproteome.2c00145.
    .. [2] The Global Proteome Machine Organization. "Common
       Repository of Adventitious Proteins (cRAP)."
       https://www.thegpm.org/crap/
    """
    if source not in _SOURCE_MAP:
        raise ValueError(f"Unsupported source '{source}'.")

    # -- Resolve base destination
    meta = _SOURCE_MAP[source]
    base_destination = (
        Path(meta["default_path"]) if path is None else Path(path)
    )
    base_destination.parent.mkdir(parents=True, exist_ok=True)

    # -- Short-circuit when explicit path already exists
    # (the default name depends on the digest of the downloaded content,
    # so that case can only be checked after the download).
    if path is not None:
        _check_no_existing(base_destination, force)

    # -- Download (and optionally reformat) into a temp directory
    if verbose:
        print(f"Downloading from {meta['url']}")
    with tempfile.TemporaryDirectory() as tmp_dir:
        candidate_path = _fetch_candidate(
            meta["url"],
            tmp_dir,
            meta.get("formatter"),
            verbose,
        )
        destination = _resolve_destination(
            base_destination,
            candidate_path,
            use_digest=path is None,
        )
        if verbose:
            print(f"Destination: {destination}")
        _check_no_existing(destination, force)
        _atomic_move(candidate_path, destination)

    if verbose:
        print(f"Saved to {destination}")

    # The documented return value is an absolute path, but ``destination``
    # is relative whenever the default file name or a relative ``path``
    # was used. ``absolute`` anchors it to the working directory without
    # resolving symlinks, so it still names the file that was written.
    return destination.absolute()