Source code for oxbow._core.sequence

"""
DataSource classes for sequence file formats, including FASTA and FASTQ.
"""

from __future__ import annotations

import pathlib
from typing import IO, Callable, Literal

try:
    from typing import Self
except ImportError:
    from typing_extensions import Self

from oxbow._core.base import DEFAULT_BATCH_SIZE, DataSource, prepare_source_and_index
from oxbow.oxbow import PyFastaScanner, PyFastqScanner


class SequenceFile(DataSource):
    """
    Common base for sequence-format data sources.

    On top of what ``DataSource`` stores, this class keeps an optional GZI
    index source, an optional list of query regions, and the keyword
    arguments (``compressed``, ``fields``) recorded for the scanner.
    """

    def __init__(
        self,
        source,
        compressed,
        fields,
        regions,
        index,
        gzi,
        batch_size,
    ):
        """
        Parameters
        ----------
        source, index, batch_size
            Forwarded unchanged to ``DataSource.__init__``.
        compressed, fields
            Recorded in ``self._scanner_kwargs``.
        regions : str, list[str], or None
            Region(s) to query. A single string is wrapped in a list.
        gzi : str, pathlib.Path, callable, or None
            GZI index location, or a zero-argument callable producing it.

        Raises
        ------
        TypeError
            If ``gzi`` is none of the accepted kinds.
        """
        super().__init__(source, index, batch_size)

        # Store `gzi` as a zero-argument callable (or None) so that the
        # `_gzi` property can resolve it lazily.
        if isinstance(gzi, (str, pathlib.Path)):
            gzi_path = str(gzi)
            self._gzi_src = lambda: gzi_path
        elif gzi is None or callable(gzi):
            self._gzi_src = gzi
        else:
            message = (
                "`gzi` must be a str, pathlib.Path, or a callable returning "
                "an IO stream"
            )
            raise TypeError(message)

        # A lone region string is promoted to a one-element list.
        self._query_regions = [regions] if isinstance(regions, str) else regions
        # FASTA sends all regions in one scan_query call, so _regions stays
        # None to produce a single fragment/batch iteration in the base class.
        self._regions = None

        self._scanner_kwargs = {"compressed": compressed, "fields": fields}

    @property
    def _gzi(self):
        """Result of calling the stored GZI source, or None if there is none."""
        opener = self._gzi_src
        if not opener:
            return None
        return opener()

    def _make_reader(self, columns, batch_size, region=None):
        """
        Override to handle FASTA's multi-region scan_query.

        ``region`` is accepted but not used: every stored query region is
        handed to the scanner in one ``scan_query`` call. When no regions
        are stored, a plain ``scan`` is performed instead.
        """
        scanner = self.scanner()
        query_regions = self._query_regions
        if query_regions is None:
            return scanner.scan(columns=columns, batch_size=batch_size)
        return scanner.scan_query(
            regions=query_regions,
            index=self._index,
            gzi=self._gzi,
            columns=columns,
            batch_size=batch_size,
        )

    def _scan_query(self, scanner, region, columns, batch_size):
        """Not implemented; region queries are served by ``_make_reader``."""
        raise NotImplementedError

    def regions(self, regions: str | list[str]) -> Self:
        """
        Build a new data source of the same type restricted to ``regions``.

        The source, index source, GZI source, batch size and recorded
        scanner keyword arguments of this instance are reused.
        """
        factory = type(self)
        return factory(
            self._src,
            regions=regions,
            index=self._index_src,
            gzi=self._gzi_src,
            batch_size=self._batch_size,
            **self._scanner_kwargs,
        )


class FastaFile(SequenceFile):
    """
    Data source for FASTA files, backed by ``PyFastaScanner``.

    Extends ``SequenceFile`` by recording a ``coords`` entry in the
    scanner keyword arguments.
    """

    _scanner_type = PyFastaScanner

    def __init__(
        self,
        source: str | Callable[[], IO[bytes] | str],
        compressed: bool = False,
        *,
        fields: Literal["*"] | list[str] | None = "*",
        coords: Literal["01", "11"] = "11",
        regions: str | list[str] | None = None,
        index: str | Callable[[], IO[bytes] | str] | None = None,
        gzi: str | pathlib.Path | Callable[[], IO[bytes]] | None = None,
        batch_size: int = 1,
    ):
        """
        Parameters
        ----------
        source : str or Callable
            Location of the FASTA data, or a callable that opens it.
        compressed : bool, default: False
            Recorded as the ``compressed`` scanner keyword argument.
        fields : "*", list[str], or None, default: "*"
            Recorded as the ``fields`` scanner keyword argument.
        coords : Literal["01", "11"], default: "11"
            Recorded as the ``coords`` scanner keyword argument.
        regions : str or list[str], optional
            Region(s) to query; a single string is treated as one region.
        index : str or Callable, optional
            Index source forwarded to the base class.
        gzi : str, pathlib.Path, or Callable, optional
            GZI index source forwarded to the base class.
        batch_size : int, default: 1
            Batch size forwarded to the base class.
        """
        super().__init__(
            source=source,
            compressed=compressed,
            fields=fields,
            regions=regions,
            index=index,
            gzi=gzi,
            batch_size=batch_size,
        )
        # The base class only records `compressed` and `fields`; add the
        # FASTA-specific option so it is carried along with them (for
        # example when `regions()` rebuilds the object from these kwargs).
        # NOTE(review): presumably "01" = 0-based half-open and "11" =
        # 1-based closed coordinates -- confirm against the scanner.
        self._scanner_kwargs["coords"] = coords
class FastqFile(SequenceFile):
    """
    Data source for FASTQ files, backed by ``PyFastqScanner``.

    No regions, index or GZI are ever passed to the base class, and
    ``regions()`` is disabled.
    """

    _scanner_type = PyFastqScanner

    def __init__(
        self,
        source: str | Callable[[], IO[bytes] | str],
        compressed: bool = False,
        *,
        fields: Literal["*"] | list[str] | None = "*",
        batch_size: int = DEFAULT_BATCH_SIZE,
    ):
        """
        Parameters
        ----------
        source : str or Callable
            Location of the FASTQ data, or a callable that opens it.
        compressed : bool, default: False
            Recorded as the ``compressed`` scanner keyword argument.
        fields : "*", list[str], or None, default: "*"
            Recorded as the ``fields`` scanner keyword argument.
        batch_size : int, default: DEFAULT_BATCH_SIZE
            Batch size forwarded to the base class.
        """
        super().__init__(
            source=source,
            compressed=compressed,
            fields=fields,
            # Range queries are not available for FASTQ, so the
            # query-related arguments are always left unset.
            regions=None,
            index=None,
            gzi=None,
            batch_size=batch_size,
        )

    def regions(self, regions: str | list[str]):
        """
        Always raise: FASTQ sources cannot be queried by region.

        Raises
        ------
        NotImplementedError
            Unconditionally.
        """
        raise NotImplementedError("FastqFile does not support genomic range queries.")
def from_fasta(
    source: str | pathlib.Path | Callable[[], IO[bytes] | str],
    compression: Literal["infer", "bgzf", "gzip", None] = "infer",
    *,
    fields: Literal["*"] | list[str] | None = "*",
    coords: Literal["01", "11"] = "11",
    regions: str | list[str] | None = None,
    index: str | pathlib.Path | Callable[[], IO[bytes] | str] | None = None,
    gzi: str | pathlib.Path | Callable[[], IO[bytes] | str] | None = None,
    batch_size: int = 1,
) -> FastaFile:
    """
    Create a FASTA file data source.

    Parameters
    ----------
    source : str, pathlib.Path, or Callable
        The URI or path to the FASTA file, or a callable that opens the file
        as a file-like object.
    compression : Literal["infer", "bgzf", "gzip", None], default: "infer"
        Compression of the source bytestream. If "infer" and ``source`` is a
        URI or path, the file's compression is guessed based on the
        extension, where ".gz" or ".bgz" is interpreted as BGZF. Pass "gzip"
        to decode regular GZIP. If None, the source bytestream is assumed to
        be uncompressed. For more customized decoding, provide a callable
        ``source`` instead.
    fields : list[str], optional
        Specific fields to project. By default, all fields are included.
    coords : Literal["01", "11"], default: "11"
        Coordinate convention option, forwarded unchanged to the
        underlying FASTA scanner.
    regions : str or list[str], optional
        Provide one or more genomic ranges to slice subsequences as output
        records. Only applicable if an associated index file is available.
    index : str, pathlib.Path, or Callable, optional
        An optional FAI index file associated with the FASTA file. If
        ``source`` is a URI or path and the index file shares the same name
        with a ".fai" extension, the index file is automatically detected.
        If the FASTA file is BGZF-compressed, a GZI index file is also
        required.
    gzi : str, pathlib.Path, or Callable, optional
        An optional GZI index file associated with a BGZF-compressed FASTA
        file. This is required in addition to the FAI index file for random
        access.
    batch_size : int, optional [default: 1]
        The number of records to read in each batch. Since sequences for
        FASTA files can be very long, the default batch size is set to 1 to
        generate one sequence record at a time.

    Returns
    -------
    FastaFile
        A data source object representing the FASTA file.

    See also
    --------
    from_fastq : Create a FASTQ file data source.
    """
    source, index, bgzf_compressed = prepare_source_and_index(
        source, index, compression
    )
    return FastaFile(
        source=source,
        compressed=bgzf_compressed,
        fields=fields,
        # NOTE(review): presumably "01" = 0-based half-open and "11" =
        # 1-based closed coordinates -- confirm against the scanner.
        coords=coords,
        regions=regions,
        index=index,
        gzi=gzi,
        batch_size=batch_size,
    )


def from_fastq(
    source: str | pathlib.Path | Callable[[], IO[bytes] | str],
    compression: Literal["infer", "gzip", None] = "infer",
    *,
    fields: Literal["*"] | list[str] | None = "*",
    batch_size: int = DEFAULT_BATCH_SIZE,
) -> FastqFile:
    """
    Create a FASTQ file data source.

    Parameters
    ----------
    source : str, pathlib.Path, or Callable
        The URI or path to the FASTQ file, or a callable that opens the file
        as a file-like object.
    compression : Literal["infer", "gzip", None], default: "infer"
        Compression of the source bytestream. If "infer" and ``source`` is a
        URI or path, the file's compression is guessed based on the file
        extension. For more custom decoding, provide a callable ``source``
        instead.
    fields : list[str], optional
        Specific fields to project. By default, all fields are included.
    batch_size : int, optional [default: 131072]
        The number of records to read in each batch.

    Returns
    -------
    FastqFile
        A data source object representing the FASTQ file.

    Notes
    -----
    Indexed FASTQ files are not supported. Hence, range queries are
    disallowed and files compressed using either regular GZIP or BGZF are
    decoded using a standard GZIP decoder.

    See also
    --------
    from_fasta : Create a FASTA file data source.
    """
    # No index is ever supplied for FASTQ, so the returned index slot is
    # discarded.
    source, _, compressed = prepare_source_and_index(source, None, compression)
    return FastqFile(
        source=source,
        compressed=compressed,
        fields=fields,
        batch_size=batch_size,
    )