oxbow.arrow.BatchReaderDataset

class oxbow.arrow.BatchReaderDataset(fragments: list[BatchReaderFragment], partition_expression: Expression = None)

A PyArrow Dataset composed of one or more BatchReaderFragments.

Parameters:
  • fragments (list[BatchReaderFragment]) – The list of fragments.

  • partition_expression (pyarrow.dataset.Expression, optional) – A partition expression for the dataset, by default None.

schema

The schema of the RecordBatches in the dataset.

Type:

pyarrow.Schema

partition_expression

An expression that evaluates to true for all data viewed by this dataset.

Type:

pyarrow.dataset.Expression

__init__(fragments: list[BatchReaderFragment], partition_expression: Expression = None)

Create a BatchReaderDataset from a list of BatchReaderFragments.

Parameters:
  • fragments (list[BatchReaderFragment]) – The list of fragments.

  • partition_expression (pyarrow.dataset.Expression, optional) – A partition expression for the dataset, by default None.

Methods

__init__(fragments[, partition_expression])

Create a BatchReaderDataset from a list of BatchReaderFragments.

count_rows(self, Expression filter=None, ...)

Count rows matching the scanner filter.

filter(expression)

Apply a row-level filter expression and return a filtered dataset.

get_fragments([filter])

Return an iterator over fragments.

head(self, int num_rows[, columns])

Load the first N rows of the dataset.

iter_batches([columns, batch_size])

Iterate over batches in the dataset.

join(*args, **kwargs)

Perform a join operation on the dataset.

join_asof(*args, **kwargs)

Perform an as-of join operation on the dataset.

replace_schema(schema)

Replace the schema of the dataset.

scanner([columns, filter, batch_size, ...])

Build a scan operation against the dataset.

sort_by(sorting, **kwargs)

Sort the dataset by the specified columns.

take(self, indices[, columns])

Select rows of data by index.

to_batches([columns, filter, batch_size, ...])

Read the dataset as materialized record batches.

to_table(self[, columns])

Read the dataset to an Arrow table.

Attributes

partition_expression

An expression that evaluates to true for all data viewed by this dataset.

schema

The schema of the RecordBatches in the dataset.

count_rows(self, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, bool cache_metadata=True, MemoryPool memory_pool=None)

Count rows matching the scanner filter.

Parameters:
  • filter (Expression, default None) – Scan will return only the rows matching the filter. If possible, the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise, the loaded RecordBatches are filtered before they are yielded.

  • batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory, lower this value to reduce their size.

  • batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

  • use_threads (bool, default True) – If enabled, maximum parallelism will be used, as determined by the number of available CPU cores.

  • cache_metadata (bool, default True) – If enabled, metadata may be cached when scanning to speed up repeated scans.

  • memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.

Returns:

count

Return type:

int
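The counting semantics can be modelled in plain Python. This is a sketch under stated assumptions, not oxbow's or PyArrow's implementation: a fragment is a list of column-oriented batches (dicts mapping column names to equal-length lists, standing in for pyarrow.RecordBatch), and a row predicate stands in for a pyarrow.dataset.Expression. The column names `chrom` and `pos` are illustrative only.

```python
from typing import Callable, Iterator, Optional

# Illustrative stand-ins: a batch is a dict of column name -> values (like a
# RecordBatch), and a predicate over one row plays the role of an Expression.
Batch = dict[str, list]
Row = dict[str, object]


def num_rows(batch: Batch) -> int:
    """Number of rows in a column-oriented batch (0 if it has no columns)."""
    return len(next(iter(batch.values()), []))


def iter_rows(batch: Batch) -> Iterator[Row]:
    """Yield each row of a column-oriented batch as a dict."""
    for i in range(num_rows(batch)):
        yield {name: values[i] for name, values in batch.items()}


def count_rows(fragments: list[list[Batch]],
               filter: Optional[Callable[[Row], bool]] = None) -> int:
    """Count the rows in all fragments for which filter is true."""
    total = 0
    for fragment in fragments:
        for batch in fragment:
            if filter is None:
                total += num_rows(batch)  # no filter: batch lengths suffice
            else:
                total += sum(1 for row in iter_rows(batch) if filter(row))
    return total


fragments = [
    [{"chrom": ["chr1", "chr1"], "pos": [100, 250]}],
    [{"chrom": ["chr2"], "pos": [75]}, {"chrom": ["chr2", "chr2"], "pos": [300, 900]}],
]
print(count_rows(fragments))                             # 5
print(count_rows(fragments, lambda r: r["pos"] >= 250))  # 3
```

Without a filter, the sketch never looks at individual rows; only when a predicate is given does it have to evaluate every row.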

filter(expression: Expression) → BatchReaderDataset

Apply a row-level filter expression and return a filtered dataset.

Parameters:

expression (pyarrow.dataset.Expression) – The filter expression.

Returns:

The filtered dataset.

Return type:

BatchReaderDataset

Notes

This performs a row-level filter, not a fragment-level filter. The filter is applied lazily to the underlying record batches: a wrapper factory function is used to build a new BatchReaderFragment that filters batches as they are read.
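The lazy wrapper-factory approach described in the note can be sketched with plain Python generators. This is a hypothetical model, not oxbow's BatchReaderFragment API: a fragment is represented as a zero-argument callable returning a fresh iterator of column-oriented batches, and `make_fragment` and `filtered_fragment` are illustrative names.

```python
from typing import Callable, Iterator

Batch = dict[str, list]
Row = dict[str, object]
BatchFactory = Callable[[], Iterator[Batch]]


def make_fragment(batches: list[Batch]) -> BatchFactory:
    """Hypothetical fragment: a factory returning a fresh iterator on each call."""
    return lambda: iter(batches)


def filtered_fragment(factory: BatchFactory,
                      predicate: Callable[[Row], bool]) -> BatchFactory:
    """Wrap a batch factory so every batch it yields is filtered row by row.

    Nothing is read or filtered until the returned factory is called and iterated.
    """
    def wrapped() -> Iterator[Batch]:
        for batch in factory():
            n = len(next(iter(batch.values()), []))
            keep = [i for i in range(n)
                    if predicate({name: col[i] for name, col in batch.items()})]
            yield {name: [col[i] for i in keep] for name, col in batch.items()}
    return wrapped


frag = make_fragment([{"pos": [10, 20, 30]}, {"pos": [40, 50]}])
only_big = filtered_fragment(frag, lambda row: row["pos"] > 25)
print(list(only_big()))  # [{'pos': [30]}, {'pos': [40, 50]}]
print(list(frag()))      # [{'pos': [10, 20, 30]}, {'pos': [40, 50]}]
```

Because the sketch returns a new factory rather than a list of filtered batches, the filtered fragment can be iterated more than once, and filters can be stacked by wrapping again.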

get_fragments(filter: Expression | None = None) → Iterator[BatchReaderFragment]

Return an iterator over fragments.

Parameters:

filter (pyarrow.dataset.Expression, optional) – An expression to filter fragments. Not yet implemented.

Returns:

An iterator over fragments.

Return type:

Iterator[BatchReaderFragment]

Notes

The filter argument is meant to be applied at the fragment level, by comparison with the partition_expression, not at the row level.
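Fragment-level filtering of this kind (which the docstring marks as not yet implemented) typically works by comparing the filter against each fragment's partition expression and skipping fragments that cannot contain matching rows. The sketch below is a hypothetical plain-Python model, not oxbow's design: each partition expression is reduced to a dict of column equalities that hold for every row of the fragment, and the filter is a dict of required equalities.

```python
from dataclasses import dataclass, field
from typing import Iterable, Iterator, Optional


@dataclass
class Fragment:
    """Hypothetical stand-in for a fragment and its partition expression."""
    name: str
    # Column equalities that hold for every row of the fragment; for example,
    # {"chrom": "chr1"} models the expression chrom == "chr1". An empty dict
    # models a missing (always-true) partition expression.
    partition: dict = field(default_factory=dict)


def get_fragments(fragments: Iterable[Fragment],
                  filter: Optional[dict] = None) -> Iterator[Fragment]:
    """Yield the fragments whose partition expression does not rule out the filter."""
    for frag in fragments:
        # Skip a fragment only if it pins a filtered column to a different
        # value; columns the partition says nothing about might still match.
        ruled_out = filter is not None and any(
            col in frag.partition and frag.partition[col] != value
            for col, value in filter.items()
        )
        if not ruled_out:
            yield frag


frags = [Fragment("a", {"chrom": "chr1"}), Fragment("b", {"chrom": "chr2"}), Fragment("c")]
print([f.name for f in get_fragments(frags, {"chrom": "chr1"})])  # ['a', 'c']
```

Fragment `c` survives because, with no partition expression, nothing is known about its rows; pruning must be conservative and drop only fragments that provably cannot match.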

head(self, int num_rows, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, bool cache_metadata=True, MemoryPool memory_pool=None)

Load the first N rows of the dataset.

Parameters:
  • num_rows (int) – The number of rows to load.

  • columns (list of str, default None) –

    The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.

    The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is the last one in its fragment), and __filename (the name of the source file or a description of the source fragment).

    The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.

  • filter (Expression, default None) – Scan will return only the rows matching the filter. If possible, the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise, the loaded RecordBatches are filtered before they are yielded.

  • batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory, lower this value to reduce their size.

  • batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

  • use_threads (bool, default True) – If enabled, maximum parallelism will be used, as determined by the number of available CPU cores.

  • cache_metadata (bool, default True) – If enabled, metadata may be cached when scanning to speed up repeated scans.

  • memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.

Returns:

table

Return type:

Table
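A plain-Python model of head() helps show two documented behaviours: column projection preserves order and duplicates, and only as many batches are read as are needed to collect num_rows rows. This is an illustrative sketch, not PyArrow's implementation; fragments are modelled as iterables of column-oriented batches, and rows are returned as tuples alongside the projected column names.

```python
from itertools import chain


def head(fragments, num_rows, columns=None):
    """Return (column_names, rows) for the first num_rows rows of the dataset.

    fragments is an iterable of fragments, each an iterable of column-oriented
    batches (dicts of column name -> values). Reading stops as soon as
    num_rows rows have been collected, so later batches are never pulled.
    """
    names, rows = None, []
    for batch in chain.from_iterable(fragments):
        if names is None:
            # Project the requested columns in the given order; a plain list
            # keeps duplicates, as the columns parameter describes.
            names = list(columns) if columns is not None else list(batch)
            missing = [c for c in names if c not in batch]
            if missing:
                raise KeyError(f"columns not in schema: {missing}")
        n = len(next(iter(batch.values()), []))
        for i in range(min(n, num_rows - len(rows))):
            rows.append(tuple(batch[c][i] for c in names))
        if len(rows) >= num_rows:
            break
    return (names or []), rows


fragments = [
    [{"chrom": ["chr1", "chr1"], "pos": [100, 250]}],
    [{"chrom": ["chr2"], "pos": [75]}],
]
print(head(fragments, 2, columns=["pos", "chrom", "pos"]))
# (['pos', 'chrom', 'pos'], [(100, 'chr1', 100), (250, 'chr1', 250)])
```

Checking the row count after each batch, rather than before pulling the next one, is what keeps the second fragment from being touched in the example.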

iter_batches(columns: list[str] | None = None, batch_size: int | None = None) → Iterator[RecordBatch]

Iterate over batches in the dataset.

Parameters:
  • columns (list[str], optional) – Names of columns to project. By default, all available columns are projected.

  • batch_size (int, optional) – The maximum row count for scanned record batches.

Returns:

An iterator of record batches.

Return type:

Iterator[pyarrow.RecordBatch]
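The effect of columns and batch_size can be modelled with a small generator. In this sketch (an assumption about the semantics, not oxbow's code), batch_size is treated purely as a maximum: longer batches are split into slices, while shorter batches pass through unchanged rather than being merged. Batches are column-oriented dicts standing in for pyarrow.RecordBatch.

```python
from typing import Iterable, Iterator, Optional

Batch = dict[str, list]


def iter_batches(batches: Iterable[Batch],
                 columns: Optional[list[str]] = None,
                 batch_size: Optional[int] = None) -> Iterator[Batch]:
    """Yield projected batches, splitting any batch longer than batch_size rows.

    batch_size is a maximum: shorter batches pass through unchanged and are
    not merged with their neighbours.
    """
    for batch in batches:
        names = columns if columns is not None else list(batch)
        n = len(next(iter(batch.values()), []))
        step = batch_size or n or 1  # no batch_size: keep the batch whole
        for start in range(0, n, step):
            yield {name: batch[name][start:start + step] for name in names}


batches = [{"chrom": ["chr1"] * 5, "pos": [1, 2, 3, 4, 5]}, {"chrom": ["chr2"], "pos": [9]}]
for b in iter_batches(batches, columns=["pos"], batch_size=2):
    print(b)
# {'pos': [1, 2]}
# {'pos': [3, 4]}
# {'pos': [5]}
# {'pos': [9]}
```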

join(*args, **kwargs)

Perform a join operation on the dataset.

Raises:

NotImplementedError – This method is not yet implemented.

join_asof(*args, **kwargs)

Perform an as-of join operation on the dataset.

Raises:

NotImplementedError – This method is not yet implemented.

property partition_expression: Expression

An expression that evaluates to true for all data viewed by this dataset.

Return type:

pyarrow.dataset.Expression

replace_schema(schema)

Replace the schema of the dataset.

Parameters:

schema (pyarrow.Schema) – The new schema.

Raises:

NotImplementedError – This method is not yet implemented.

scanner(columns: list[str] | None = None, filter: ds.Expression | None = None, batch_size: int | None = None, batch_readahead: int = 16, fragment_readahead: int = 4, fragment_scan_options: ds.FragmentScanOptions | None = None, use_threads: bool | None = True, memory_pool: ds.MemoryPool | None = None, **kwargs) → ds.Scanner

Build a scan operation against the dataset.

This scanner chains the record batches from all fragments together and applies column projection and row filtering.

Parameters:
  • columns (list[str], optional) – Names of columns to project. By default, all available columns are projected.

  • filter (pyarrow.dataset.Expression, optional) – A filter expression. Scan will return only the rows matching the filter.

  • batch_size (int, optional) – The maximum row count for scanned record batches.

  • batch_readahead (int, optional) – The number of batches to read ahead in a file.

  • fragment_readahead (int, optional) – The number of fragments/files to read ahead.

  • fragment_scan_options (pyarrow.dataset.FragmentScanOptions, optional) – Options specific to a particular scan and fragment type, by default None.

  • use_threads (bool, optional) – If enabled, maximum parallelism will be used, by default True.

  • memory_pool (pyarrow.dataset.MemoryPool, optional) – For memory allocations, if required. By default, uses the default pool.

Returns:

A scanner object for the dataset.

Return type:

pyarrow.dataset.Scanner
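The scanner's behaviour (chain the batches of all fragments, then apply row filtering and column projection) can be sketched as a lazy generator. This is a plain-Python model, not the pyarrow.dataset.Scanner that the method actually returns: batches are column-oriented dicts and a row predicate stands in for a pyarrow.dataset.Expression.

```python
from itertools import chain
from typing import Callable, Iterable, Iterator, Optional

Batch = dict[str, list]
Row = dict[str, object]


def scan(fragments: Iterable[Iterable[Batch]],
         columns: Optional[list[str]] = None,
         filter: Optional[Callable[[Row], bool]] = None) -> Iterator[Batch]:
    """Lazily chain the batches of all fragments, filter rows, then project columns."""
    for batch in chain.from_iterable(fragments):
        n = len(next(iter(batch.values()), []))
        # The predicate sees every column, so the filter may use columns that
        # are dropped by the projection.
        keep = [i for i in range(n)
                if filter is None or filter({c: v[i] for c, v in batch.items()})]
        if not keep:
            continue  # this sketch drops batches with no surviving rows
        out_cols = columns if columns is not None else list(batch)
        yield {c: [batch[c][i] for i in keep] for c in out_cols}


fragments = [
    [{"chrom": ["chr1", "chr1"], "pos": [100, 250]}],
    [{"chrom": ["chr2", "chr2"], "pos": [300, 900]}],
]
print(list(scan(fragments, columns=["pos"], filter=lambda r: r["chrom"] == "chr2")))
# [{'pos': [300, 900]}]
```

Filtering before projecting is a choice made in this sketch so that a predicate can reference a column that is not requested in columns.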

property schema: Schema

The schema of the RecordBatches in the dataset.

Return type:

pyarrow.Schema

sort_by(sorting, **kwargs)

Sort the dataset by the specified columns.

Parameters:

sorting (list[str]) – The columns to sort by.

Raises:

NotImplementedError – This method is not yet implemented.

take(self, indices, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, bool cache_metadata=True, MemoryPool memory_pool=None)

Select rows of data by index.

Parameters:
  • indices (Array or array-like) – indices of rows to select in the dataset.

  • columns (list of str, default None) –

    The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.

    The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is the last one in its fragment), and __filename (the name of the source file or a description of the source fragment).

    The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.

  • filter (Expression, default None) – Scan will return only the rows matching the filter. If possible, the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise, the loaded RecordBatches are filtered before they are yielded.

  • batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory, lower this value to reduce their size.

  • batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

  • use_threads (bool, default True) – If enabled, maximum parallelism will be used, as determined by the number of available CPU cores.

  • cache_metadata (bool, default True) – If enabled, metadata may be cached when scanning to speed up repeated scans.

  • memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.

Returns:

table

Return type:

Table
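Selecting rows by dataset-wide index means mapping each index to a batch and an offset within it. The sketch below models that mapping in plain Python with a prefix sum of batch lengths and a binary search; it is an illustration of the technique under the assumption that batches are column-oriented dicts, not PyArrow's implementation.

```python
from bisect import bisect_right
from itertools import accumulate

Batch = dict[str, list]


def take(batches: list[Batch], indices: list[int]) -> dict[str, list]:
    """Gather rows by dataset-wide index from an ordered list of batches."""
    lengths = [len(next(iter(b.values()), [])) for b in batches]
    offsets = [0, *accumulate(lengths)]  # offsets[k]: global index of batch k's first row
    total = offsets[-1]
    names = list(batches[0]) if batches else []
    out = {name: [] for name in names}
    for idx in indices:
        if not 0 <= idx < total:
            raise IndexError(f"index {idx} out of bounds for {total} rows")
        k = bisect_right(offsets, idx) - 1  # last batch starting at or before idx
        for name in names:
            out[name].append(batches[k][name][idx - offsets[k]])
    return out


batches = [{"pos": [100, 250]}, {"pos": [75]}, {"pos": [300, 900]}]
print(take(batches, [4, 0, 2]))  # {'pos': [900, 100, 75]}
```

The result follows the order of indices, not the order of the data. Using bisect_right also steps past empty batches, whose offsets coincide with those of the next batch.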

to_batches(columns: list[str] | None = None, filter: ds.Expression | None = None, batch_size: int | None = None, batch_readahead: int = 16, fragment_readahead: int = 4, fragment_scan_options: ds.FragmentScanOptions | None = None, use_threads: bool | None = True, memory_pool: ds.MemoryPool | None = None, **kwargs) → Iterator[pa.RecordBatch]

Read the dataset as materialized record batches.

Parameters:
  • columns (list[str], optional) – Names of columns to project. By default, all available columns are projected.

  • filter (pyarrow.dataset.Expression, optional) – A filter expression. Scan will return only the rows matching the filter.

  • batch_size (int, optional) – The maximum row count for scanned record batches.

  • batch_readahead (int, optional) – The number of batches to read ahead in a file.

  • fragment_readahead (int, optional) – The number of fragments/files to read ahead.

  • fragment_scan_options (pyarrow.dataset.FragmentScanOptions, optional) – Options specific to a particular scan and fragment type, by default None.

  • use_threads (bool, optional) – If enabled, maximum parallelism will be used, by default True.

  • memory_pool (pyarrow.dataset.MemoryPool, optional) – For memory allocations, if required. By default, uses the default pool.

Returns:

An iterator of record batches.

Return type:

Iterator[pyarrow.RecordBatch]

to_table(self, columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, bool cache_metadata=True, MemoryPool memory_pool=None)

Read the dataset to an Arrow table.

Note that this method reads all the selected data from the dataset into memory.

Parameters:
  • columns (list of str, default None) –

    The columns to project. This can be a list of column names to include (order and duplicates will be preserved), or a dictionary with {new_column_name: expression} values for more advanced projections.

    The list of columns or expressions may use the special fields __batch_index (the index of the batch within the fragment), __fragment_index (the index of the fragment within the dataset), __last_in_fragment (whether the batch is the last one in its fragment), and __filename (the name of the source file or a description of the source fragment).

    The columns will be passed down to Datasets and corresponding data fragments to avoid loading, copying, and deserializing columns that will not be required further down the compute chain. By default all of the available columns are projected. Raises an exception if any of the referenced column names does not exist in the dataset’s Schema.

  • filter (Expression, default None) – Scan will return only the rows matching the filter. If possible, the predicate will be pushed down to exploit the partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise, the loaded RecordBatches are filtered before they are yielded.

  • batch_size (int, default 131_072) – The maximum row count for scanned record batches. If scanned record batches are overflowing memory, lower this value to reduce their size.

  • batch_readahead (int, default 16) – The number of batches to read ahead in a file. This might not work for all file formats. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_readahead (int, default 4) – The number of files to read ahead. Increasing this number will increase RAM usage but could also improve IO utilization.

  • fragment_scan_options (FragmentScanOptions, default None) – Options specific to a particular scan and fragment type, which can change between different scans of the same dataset.

  • use_threads (bool, default True) – If enabled, maximum parallelism will be used, as determined by the number of available CPU cores.

  • cache_metadata (bool, default True) – If enabled, metadata may be cached when scanning to speed up repeated scans.

  • memory_pool (MemoryPool, default None) – For memory allocations, if required. If not specified, uses the default pool.

Returns:

table

Return type:

Table
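A plain-Python model of to_table() shows what materializing means: every selected row from every fragment ends up in one in-memory structure. The sketch also fills in __fragment_index, __batch_index and __last_in_fragment according to the meanings listed for the columns parameter; __filename is omitted because the model has no source files. Whether these special fields are populated for a BatchReaderDataset depends on its scanner and is not confirmed here; batches are column-oriented dicts standing in for pyarrow.RecordBatch.

```python
from typing import Optional

Batch = dict[str, list]


def to_table(fragments: list[list[Batch]],
             columns: Optional[list[str]] = None) -> dict[str, list]:
    """Materialize every selected row into one column-oriented dict.

    Unlike iterating over batches, the whole result is held in memory at once.
    The special fields are filled in per batch following their documented meanings.
    """
    table: dict[str, list] = {}
    for f_idx, fragment in enumerate(fragments):
        for b_idx, batch in enumerate(fragment):
            n = len(next(iter(batch.values()), []))
            source = {
                **batch,
                "__fragment_index": [f_idx] * n,
                "__batch_index": [b_idx] * n,
                "__last_in_fragment": [b_idx == len(fragment) - 1] * n,
            }
            # A dict cannot hold duplicate keys, so repeated names are collapsed here.
            names = dict.fromkeys(columns) if columns is not None else batch
            for name in names:
                table.setdefault(name, []).extend(source[name])
    return table


fragments = [[{"pos": [100, 250]}, {"pos": [75]}], [{"pos": [300]}]]
print(to_table(fragments, columns=["pos", "__fragment_index", "__batch_index", "__last_in_fragment"]))
# {'pos': [100, 250, 75, 300], '__fragment_index': [0, 0, 0, 1],
#  '__batch_index': [0, 0, 1, 0], '__last_in_fragment': [False, False, True, True]}
```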