How To

All uses of the library start with creating either a Streamer object or a Reader object.

Streamer

The Streamer is designed for reading data from the dataset in approximately sequential order, or in strictly sequential order if the ordered option is set.

import multitables
stream = multitables.Streamer(filename="/path/to/h5/file", **kw_args)

Additional flags to pytables’ open_file function can be passed through the optional keyword arguments.
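
As a minimal sketch, a standard open_file argument such as mode can be forwarded this way (the filename is a placeholder):

stream = multitables.Streamer(filename="/path/to/h5/file", mode='r')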

Direct access

multitables allows low-level access to the internal queue buffer. This access is synchronised through a guard object: when the guard is created, an element of the buffer is reserved until the guard is released.

queue = stream.get_queue(
    path='/h5/path/', # Path to dataset within the H5file.
    n_procs=4,        # Number of processes to launch for parallel reads. Defaults to 4.
    read_ahead=5,     # Size of internal buffer in no. of blocks. Defaults to 2*n_procs+1.
    cyclic=False,     # A cyclic reader wraps at the end of the dataset. Defaults to False.
    block_size=32,    # Size (along the outer dimension) of the blocks that will be read.
                      # Defaults to a multiple of the dataset chunk size, or a 128KB block.
                      # Should be left to the default or carefully chosen for chunked arrays,
                      # else performance degradation can occur.
    ordered=False     # Force the stream to return blocks in on-disk order. Useful if two
                      # datasets need to be read synchronously. This option may have a
                      # performance penalty.
)

while True:
    guard = queue.get() # Get the guard object, will block until data is ready.
    if guard is multitables.QueueClosed:
        break # Terminate the loop once the dataset is finished.
    with guard as block: # The guard returns the next block of data in the buffer.
        do_something(block) # Perform actions on the data

Note that block here is a numpy array that references the internal buffer directly. Once the guard is released, block is no longer guaranteed to point to valid data. If the data needs to be saved for later use, make a copy of it with block.copy().
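
As a short sketch, copies can be accumulated for later use, provided each copy is made while the guard is still held (blocks is a hypothetical accumulator):

blocks = []
while True:
    guard = queue.get()
    if guard is multitables.QueueClosed:
        break
    with guard as block:
        blocks.append(block.copy()) # Copy before the guard is released.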

Iterator

A convenience iterator is supplied to make loop termination easier.

for guard in queue.iter():
    with guard as block:
        do_something(block)

Remainder elements

In all the previous cases, if the supplied block_size does not evenly divide the dataset, the remainder elements will not be read. If needed, these remainder elements can be accessed using the following method:

last_block = stream.get_remainder(path, queue.block_size)

Cyclic access

When the cyclic mode is enabled, the readers will wrap around the end of the dataset. The check for the end of the queue is no longer needed in this case.

while True:
    with queue.get() as block:
        do_something(block)

In cyclic access mode, the remainder elements are returned as part of a wrapped block that includes elements from the end and beginning of the dataset.

Once finished, the background processes can be stopped with queue.close().
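
For example, a sketch of consuming a cyclic stream for a fixed number of blocks before shutting it down (n_blocks is a hypothetical bound):

n_blocks = 1000
for _ in range(n_blocks):
    with queue.get() as block:
        do_something(block)
queue.close() # Stop the background reader processes.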

Generator

The generator provides higher level access to the streamed data. Elements from the dataset are returned one row at a time. These rows belong to a copied array, so they can be safely stored for later use. The remainder elements are also included in this mode.

gen = stream.get_generator(path, n_procs, read_ahead, cyclic, block_size)

for row in gen:
    do_something_else(row)

This is intended to be analogous to

dataset = h5_file.get_node(path)

for row in dataset:
    do_something_else(row)

When cyclic mode is enabled, the generator has no end and will continue until the loop is manually broken.
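
A sketch of such manual termination (max_rows is a hypothetical bound):

max_rows = 10000
for i, row in enumerate(gen):
    do_something_else(row)
    if i + 1 >= max_rows:
        break # A cyclic generator never terminates on its own.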

Concurrent access

Python iterators and generators are not thread-safe. The low-level direct access interface, however, is thread-safe.
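
As a sketch, multiple worker threads can therefore share a single queue by calling queue.get() directly rather than sharing an iterator (this assumes the queue keeps returning multitables.QueueClosed once the stream is exhausted):

import threading

def worker():
    while True:
        guard = queue.get() # Safe to call from several threads.
        if guard is multitables.QueueClosed:
            break
        with guard as block:
            do_something(block)

threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()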

Reader

The Reader is designed for random access, using an interface that is as close as possible to numpy indexing operations.

import multitables
reader = multitables.Reader(filename="/path/to/h5/file", **kw_args)

Additional flags to pytables’ open_file function can be passed through the optional keyword arguments.

Dataset and stage

The basic workflow is to first open the desired dataset using the internal HDF5 path.

dataset = reader.get_dataset(path='/internal/h5/path')

Then, a stage must be created to host random access requests. This stage is an area of shared memory that is allocated and shared with the background reader processes. The result of all requests made with this stage must fit inside the allocated memory of the stage.

stage = dataset.create_stage(shape=10)

The provided shape parameter may give the full shape of the stage, which uses the datatype of the dataset. Alternatively, the shape may be left incomplete, in which case the missing dimensions are filled in from the dataset shape. In this example, only the first dimension is specified, and so this stage has room for 10 rows of the dataset.
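
As an illustrative sketch, assuming a dataset of shape (1000, 5, 5), the following two calls would allocate equivalent stages:

stage_full = dataset.create_stage(shape=(10, 5, 5)) # Full shape given explicitly.
stage_part = dataset.create_stage(shape=10)         # Missing dimensions filled from the dataset.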

Requests

Requests happen through three operations. First, the description of a request is made through an indexing operation on the dataset.

req = dataset['col_A'][30:35]

Next, a future is made and a background task scheduled to fetch the requested data and load it into the provided stage.

future = reader.request(req, stage)

Finally, the future is waited upon using a get operation. Four types of get operations are provided. The first and simplest blocks on the task and returns a copy of the data.

data = future.get()

In the next type, a copy is avoided by providing a function that will be run with the data as the only argument. This get operation also blocks until the data is available and the provided function finishes.

def do_something(data):
    pass
future.get_direct(do_something)

The remaining two get operations use context managers to control access to the shared memory resource without creating a copy. The first is unsafe, in that if the resulting reference is not properly disposed of, memory errors may result.

with future.get_unsafe() as data:
    do_something(data)
data = None # Properly dispose of the reference

The final get operation uses a wrapper object around the returned data, so that if the reference is not properly disposed of, an exception is raised safely instead of a memory error.

with future.get_proxy() as data:
    do_something(data)
data = None # Properly dispose of the reference

Cleaning up

Once finished, call the close method on the reader object.

reader.close(wait=True)

If the provided wait parameter is True, the close call will block until all background threads and processes have cleanly shut down.

Concurrent access pattern

The following is an example of launching and reading requests in separate threads. It uses the create_stage_pool method, which creates N_stages separate stages and places them in a rotating pool.

The RequestPool object is then used to create a queue of pending futures that returns futures in the same order that they are inserted.

import threading

N_stages = 10

stage_pool = dataset.create_stage_pool(1, N_stages) # Each stage holds one row.

reqs = multitables.RequestPool()

table_len = dataset.shape[0]
def loader():
    for idx in range(table_len):
        reqs.add(reader.request(dataset[idx:idx+1], stage_pool))

loader_thread = threading.Thread(target=loader)
loader_thread.start()

for idx in range(table_len):
    do_something(reqs.next().get())

reader.close(wait=True)