Module chronicle::queue

Chronicle Queue

Chronicle Queue is a “record everything” message queue providing sub-microsecond write-to-read latencies. It can be used for rapid Interprocess Communication (IPC), and is tailored to the transfer and storage of large amounts of data in low-latency environments where rapid recording and sharing of information is essential, eg High Frequency Trading systems.

Chronicle Queue is designed from the ground up to provide low latencies, and demonstrates extremely well controlled outlier performance. Example write-to-read latencies for 256 byte messages written at 100,000 msgs/s on a commodity Ryzen 9 7950X are (all times in nanoseconds):

Percentile                    50%   90%   97%   99%   99.7%   99.9%   99.97%   99.99%   99.997%   99.999%
Write-to-Read Latency (ns)    241   261   271   281   311     331     461      490      501       521

Chronicle Queues are backed by memory-mapped files (either physical or tmpfs), with regions of the file mapped on demand according to the configured block size. There are some important things to keep in mind when selecting the block size, as described more fully in the queue::SingleChronicleQueueBuilder::blocksize section. A good starting point for typical use cases is to select a block size of 256MB.

Getting Started

Chronicle Queue is provided as a tar bundle which includes two crates:

  • lib: the Queue library
  • tests: a set of demos and examples to illustrate syntax and recommended use

The library core is pre-packaged and does not need any additional actions.

To build and run the tests:

$> cd chronicle-queue-rust/tests
$> export CHRONICLE_KEY=<see below>
$> CHRONICLE_CPUSET=<see below> cargo test --release -- --nocapture --test-threads=1

Chronicle Queue requires a valid license key to be set in the CHRONICLE_KEY environment variable. If this has not already been provided to you please contact sales@chronicle.software.

The CHRONICLE_CPUSET environment variable can be used to indicate reserved core ranges (eg if some cores have been isolated using Chronicle Tune, isolcpus, cgroups or similar). For example, if cores 0-2 are for general purpose use, and cores 3-7 are reserved, then setting:

CHRONICLE_CPUSET=0-2:3-7

will help ensure appropriate thread placement. See Affinity for more details.
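
As a rough illustration of how a value such as 0-2:3-7 breaks down, the sketch below splits it into a general-purpose range and a reserved range using only the standard library. The CpuSet type and the parse_cpuset function are hypothetical names for this example and are not part of the Chronicle API; the sketch also assumes exactly two inclusive ranges separated by a colon, which may be narrower than what the library accepts.

```rust
/// Hypothetical result type for illustration only; not part of the Chronicle API.
#[derive(Debug, PartialEq)]
pub struct CpuSet {
    /// Cores left for general purpose use, eg 0, 1, 2
    pub general: Vec<usize>,
    /// Cores reserved (isolated) for latency-sensitive threads, eg 3..=7
    pub reserved: Vec<usize>,
}

/// Parse one inclusive range such as "3-7" into the list of cores it covers.
fn parse_range(text: &str) -> Option<Vec<usize>> {
    let (lo, hi) = text.split_once('-')?;
    let lo = lo.trim().parse::<usize>().ok()?;
    let hi = hi.trim().parse::<usize>().ok()?;
    if lo > hi {
        return None;
    }
    Some((lo..=hi).collect())
}

/// Parse "general:reserved", eg "0-2:3-7" (assumed format, see lead-in).
pub fn parse_cpuset(text: &str) -> Option<CpuSet> {
    let (general, reserved) = text.split_once(':')?;
    Some(CpuSet {
        general: parse_range(general)?,
        reserved: parse_range(reserved)?,
    })
}
```

Returning None for anything that does not match the assumed shape keeps the sketch small; a real parser would likely report which part of the value was rejected.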

Note: The bundled crates use relative paths between tests and lib. If the library crate is moved relative to the tests crate please be sure to update tests/build.rs and tests/Cargo.toml accordingly.

Hello World with Chronicle Queue

Once the supplied tar bundle has been unpacked, the following short example can be used to create a first queue and try appending and reading messages from it. This demo is also included in the tests crate as hello_world.rs.

Creating a New Queue

A new Chronicle Queue is obtained using a SingleChronicleQueueBuilder by passing the path to the directory where the queue files are to be stored.

// build() turns the builder into a queue handle
let queue = queue::SingleChronicleQueueBuilder::new("hello-world").build().unwrap();

The queue files will reside in the directory hello-world and are named with the current date using the following format:

./hello-world/{date}.cq4

In addition, a .cq4t metastore file is created. This file contains various auxiliary data about the queue and is kept separate from the actual queue data files.

Writing to a Queue

Messages are written, or appended, to a queue using an ExcerptAppender. The appender always writes messages to the end of the queue; there is no way to insert messages at specific locations or offsets within a queue.

Once the appender has been obtained, a string can be written as follows:

let appender = queue.acquire_appender().unwrap();
appender.write_string("Hello World!");

Reading from a Queue

Messages are read from the queue using an ExcerptTailer. The term “tailing” stems from reading from the queue’s tail - ie the end of the queue, where new messages arrive. Tailers can, however, be repositioned arbitrarily anywhere from the start to the end of the queue as required. For more details see ExcerptTailer.

let tailer = queue.create_tailer().unwrap();
let msg = tailer.read_string();

Cleaning Up

The queue handle obtained from SingleChronicleQueueBuilder is designed for RAII-style use and automatically cleans up resources when the handle goes out of scope.

Putting It All Together

Putting these steps together, the complete example is as follows:

use chronicle::*;

#[test]
fn test_hello_world() {
    let queue = queue::SingleChronicleQueueBuilder::new("hello-world").build().unwrap();

    let appender = queue.acquire_appender().unwrap();
    appender.write_string("Hello World!");

    let tailer = queue.create_tailer().unwrap();
    assert!( "Hello World!" == tailer.read_string() );

    // queue automatically closed and resources released when queue is dropped
}

Configuration

In addition to creating the Queue handle, SingleChronicleQueueBuilder can be used to set some configuration items for a queue.

These are discussed in the following sections.

Configuration - Roll Cycle

A Chronicle Queue is a logical view of a directory on the file system. The queue data is split across multiple files, each of which contains data belonging to a single cycle. The length of the cycle is determined by the rollcycle parameter passed to the queue builder.

Example options are:

  • RollCycle::Daily: events stored in the queue will be grouped into 24-hour periods
  • RollCycle::Hourly: a new queue file will be created every hour

As new files are created (the queue is rolled) to accommodate events being written to the queue, a persisted data-structure (the .cq4t file) is updated with the lowest and highest cycle numbers present in the directory. Maintaining this table allows an ExcerptTailer to busy-spin waiting for new data to be appended to the queue, without the need to make costly calls to the file system to check for the existence of new queue files.

The interval for rolling is configured using the queue’s rollcycle attribute, for example:

let queue = queue::SingleChronicleQueueBuilder::new("example")
                .rollcycle(RollCycle::Daily)
                .build()
                .unwrap();

The roll cycle is fixed once the first data is appended to the queue. If an attempt is made to open an existing queue with a different rollcycle passed to the SingleChronicleQueueBuilder then the queue’s existing roll cycle will take priority and be used instead. (A corresponding warning message will also be logged).

The default roll cycle is FastDaily. See RollCycle for a complete list of available roll cycles.

Note: file rolling is a relatively expensive operation and it is recommended to schedule rolling during quiet periods wherever possible.
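
To make the idea of a cycle concrete, the sketch below maps a timestamp to a cycle number and detects when a write would fall into a new cycle. It is an illustration only: RollPeriod, cycle_for and needs_roll are hypothetical names, and the sketch assumes cycles are aligned to the Unix epoch in UTC, which may not match how the library numbers its cycles.

```rust
/// Hypothetical roll periods for illustration; the real RollCycle type offers more options.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RollPeriod {
    Hourly,
    Daily,
}

impl RollPeriod {
    /// Length of one cycle in seconds.
    pub fn length_secs(self) -> u64 {
        match self {
            RollPeriod::Hourly => 3_600,
            RollPeriod::Daily => 86_400,
        }
    }

    /// Cycle number containing the given time (seconds since the Unix epoch, UTC).
    pub fn cycle_for(self, epoch_secs: u64) -> u64 {
        epoch_secs / self.length_secs()
    }
}

/// True when a write at `now` falls into a later cycle than the previous write,
/// ie the point at which a new queue file would be needed.
pub fn needs_roll(period: RollPeriod, last_write: u64, now: u64) -> bool {
    period.cycle_for(now) > period.cycle_for(last_write)
}
```

With an hourly period, writes at 3,599 and 3,600 seconds land in different cycles, whereas with a daily period both belong to the same cycle and no roll is needed.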

Configuration - Block Size

Chronicle Queue content is backed by memory mapped files, and Queue maps a series of segments on demand rather than attempting to map the whole file at once. The blocksize configures the size of each individually mapped segment (in bytes).

Memory mapping a new segment is a relatively expensive operation, so larger segment sizes are generally favoured in order to keep mapping reasonably infrequent. That said, very large segments can put pressure on memory resources. A blocksize of around 256MB is generally a good compromise, although for systems with large amounts of memory multi-GB segments can be a sensible choice in some cases.

Note: The largest single message which is guaranteed to be written to a segment is 25% of the segment size. This is rarely a practical issue, but where very large individual messages are written to a Queue, the blocksize should be at least 4x the largest expected message size.

The default block size is 64MB.
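
The 25% rule can be turned into a quick sizing check. The helpers below are a sketch and not part of the Chronicle API: max_guaranteed_message and min_block_size are hypothetical names, and MB is assumed here to mean 1024 * 1024 bytes.

```rust
/// One megabyte in bytes (assumed to be 1024 * 1024 for this sketch).
pub const MIB: u64 = 1024 * 1024;

/// Largest message guaranteed to fit in a segment: 25% of the block size.
pub fn max_guaranteed_message(block_size: u64) -> u64 {
    block_size / 4
}

/// Smallest block size (in bytes) satisfying the 4x rule for the largest
/// expected message, never going below the supplied floor.
pub fn min_block_size(largest_message: u64, floor: u64) -> u64 {
    largest_message.saturating_mul(4).max(floor)
}
```

For example, with the 64MB default the guaranteed maximum message size is 16MB, while an application expecting 100MB messages would need a block size of at least 400MB.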

Using Queue

This section describes how to perform the most common Chronicle Queue operations. All data is stored in a queue as a sequence of bytes, and several options are available for accessing it, ranging from direct reading and writing of raw bytes through to more structured formats which carry additional embedded controls for greater flexibility (eg to relax requirements for binary compatibility between different reader and writer versions).

A brief note on terminology before continuing: Chronicle refers to the act of writing as “appending”, and reading as “tailing”. Alongside this, the messages written to a queue are referred to as “excerpts”. Following from this terminology, writing to a queue is done using an ExcerptAppender, and reading from a queue is performed using an ExcerptTailer.

Using Queue - Appending

Data is appended (written) to the end of a Chronicle Queue using an ExcerptAppender. For any given appender, messages are guaranteed to be written to the queue in the order the appender sent them. Multiple appenders can be in use at the same time with the same queue, but there are no guarantees about the order in which messages from different appenders are interleaved: only that messages for one appender will always be written in order.
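
The ordering guarantee can be illustrated without Chronicle at all. In the sketch below several threads append (appender id, sequence number) pairs to a shared in-memory log: the interleaving between threads varies from run to run, but each thread's own entries always appear in the order it sent them. The function names are hypothetical, and the Mutex-protected Vec is only a stand-in for the queue.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Run `appenders` threads, each appending `per_appender` sequence numbers
/// to a shared log, and return the resulting interleaved log.
pub fn interleaved_append(appenders: usize, per_appender: usize) -> Vec<(usize, usize)> {
    let log = Arc::new(Mutex::new(Vec::new()));

    let handles: Vec<_> = (0..appenders)
        .map(|id| {
            let log = Arc::clone(&log);
            thread::spawn(move || {
                for seq in 0..per_appender {
                    // each append is one atomic update to the shared log
                    log.lock().unwrap().push((id, seq));
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    let entries = log.lock().unwrap().clone();
    entries
}

/// Check that every appender's entries appear in the order they were sent.
pub fn per_appender_order_holds(log: &[(usize, usize)], appenders: usize) -> bool {
    let mut next = vec![0usize; appenders];
    for &(id, seq) in log {
        if seq != next[id] {
            return false;
        }
        next[id] += 1;
    }
    true
}
```

Calling per_appender_order_holds on the output of interleaved_append(4, 100) returns true on every run, even though the relative order of entries from different threads is not predictable.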

An appender is acquired from a queue handle as follows:

let queue = queue::SingleChronicleQueueBuilder::new("example").build().unwrap();
let appender = queue.acquire_appender().unwrap();

Once an appender is obtained there are both high-level and low-level options for writing the actual message content.

The high-level API methods are (see the links for full details):

These each write the corresponding slice or string as a single atomic update to the queue, and fully abstract away the details of writing content (cf below). The methods are just as efficient as the more explicit calls discussed below; their only limitation is that the data to be written must be fully formed by the time the call is made. In cases where this limitation is not intrusive these calls can be preferred for their syntactic simplicity.

The lower level API uses a DocumentContext to formally bracket a series of updates to the queue content. The document context ensures the series of updates are applied atomically to the queue when the context is completed (dropped). Alternatively, a context can be rolled back to discard any changes when the context is dropped.

Note: A write lock is held for the duration of a document context’s scope. The overhead of actions performed with an open context should be minimised where possible to avoid contention with other writes. Importantly, reads from the queue do not use any locks, nor do they cause any contention with writes.
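
The commit-on-drop behaviour of a document context follows a common Rust guard pattern, sketched below against a toy in-memory queue. ToyQueue and ToyWriteContext are hypothetical types written for this example; the sketch does not model the write lock or the on-disk format, and only borrows the rollback_on_close name for familiarity.

```rust
/// Toy in-memory "queue": each committed message is one Vec<u8>.
#[derive(Default)]
pub struct ToyQueue {
    messages: Vec<Vec<u8>>,
}

/// Hypothetical write context: buffers bytes and commits them as a single
/// message when dropped, unless a rollback was requested.
pub struct ToyWriteContext<'a> {
    queue: &'a mut ToyQueue,
    pending: Vec<u8>,
    rollback: bool,
}

impl ToyQueue {
    /// Open a context; the exclusive borrow plays the role of the write lock.
    pub fn writing_document(&mut self) -> ToyWriteContext<'_> {
        ToyWriteContext { queue: self, pending: Vec::new(), rollback: false }
    }

    pub fn len(&self) -> usize {
        self.messages.len()
    }

    pub fn message(&self, index: usize) -> Option<&[u8]> {
        self.messages.get(index).map(|m| m.as_slice())
    }
}

impl ToyWriteContext<'_> {
    /// Add bytes to the pending message; nothing is visible in the queue yet.
    pub fn write(&mut self, bytes: &[u8]) -> &mut Self {
        self.pending.extend_from_slice(bytes);
        self
    }

    /// Discard the pending message when the context is dropped.
    pub fn rollback_on_close(&mut self) {
        self.rollback = true;
    }
}

impl Drop for ToyWriteContext<'_> {
    fn drop(&mut self) {
        if !self.rollback {
            // commit all buffered updates as one message
            let message = std::mem::take(&mut self.pending);
            self.queue.messages.push(message);
        }
    }
}
```

Because the context holds an exclusive borrow of the queue, the compiler rejects a second open context on the same handle, which loosely mirrors the advice to keep the scope of an open context short.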

Within a document context, a handle to a Wire or Bytes is required to modify the sequence of bytes. The difference is that while Bytes manipulates the raw byte content, Wire embeds additional controls within the data which make it more flexible, for example across machine architectures or across different versions of readers and writers (eg with differing struct layouts).

All data written to a Queue has a unique, strictly increasing index which can be used by tailers, for example, to look up specific items.

let appender = queue.acquire_appender().unwrap();
{
    let context = appender.scoped_writing_document();
    context.wire().value_out().string("data");
    println!("data was written to index {}", context.index());
}

Optionally, data can be written to the message with an associated label. An important benefit of this approach is the data can be read back (via labels) in any order:

let appender = queue.acquire_appender().unwrap();
{
    let context = appender.scoped_writing_document();
    context.wire().write("key1").string("value1");
    context.wire().write("key2").string("value2");
}

The above label mechanism can be extended further to support fully self-describing messages which are completely flexible - for example to remove constraints on data schema changes.

The syntax chains two closure-based calls, each taking a Wire reference. For example, to write a self-describing trade object to a document:

let appender = queue.acquire_appender().unwrap();
{
    appender.write_document(|wire| {
        wire.write("trade").marshallable(|w| {
            w.write("symbol").string("EURUSD")
             .write("price").float64(1.234)
             .write("quantity").float64(15e6)
             // ...
        });
    });
}

Messages can also be written without labels, where now the position is the only indication as to the meaning of values. This is less flexible but faster than the above:

let appender = queue.acquire_appender().unwrap();
{
    appender.write_document(|w| {
        w.value_out().int32(1234)
         .value_out().int64(5678)
         .value_out().string("data");
    });
}

The above example used a Wire-based closure, which supports flexible byte-level output (by including some additional control data). Raw byte access via Bytes is also available, this time using a Bytes-based closure:

let appender = queue.acquire_appender().unwrap();
{
    appender.write_bytes(|b| {
        b.write_i8(123)
         .write_f32(4.56)
         .write_string("data");
    });
}

Finally, the lowest-level, least flexible, but most efficient option for appending data to a queue is to reserve a region of bytes which can then be manipulated directly using a &mut [u8] slice.

let mut appender = queue.acquire_appender().unwrap();
let mut bytes = appender.scoped_bytes_writer(MSGSIZE);
let slice = bytes.as_mut().as_slice();

// write directly to the &mut [u8] slice

See ExcerptAppender::reserve_bytes and BytesWriter for further details.
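
When working with a raw slice the application owns the byte layout entirely. The sketch below shows one way a fixed-layout record could be encoded into, and decoded from, a byte slice using only the standard library. The encode_trade and decode_trade functions and the 12-byte little-endian layout are assumptions made for this example; they are not defined by Chronicle Queue.

```rust
/// Size of the example record: 8 bytes for the price, 4 for the quantity.
pub const TRADE_LEN: usize = 12;

/// Write a (price, quantity) record into the start of `slice`.
/// Returns the number of bytes written, or None if the slice is too small.
pub fn encode_trade(slice: &mut [u8], price: f64, quantity: u32) -> Option<usize> {
    if slice.len() < TRADE_LEN {
        return None;
    }
    slice[0..8].copy_from_slice(&price.to_le_bytes());
    slice[8..12].copy_from_slice(&quantity.to_le_bytes());
    Some(TRADE_LEN)
}

/// Read a (price, quantity) record back from the start of `slice`.
pub fn decode_trade(slice: &[u8]) -> Option<(f64, u32)> {
    if slice.len() < TRADE_LEN {
        return None;
    }
    let mut price = [0u8; 8];
    price.copy_from_slice(&slice[0..8]);
    let mut quantity = [0u8; 4];
    quantity.copy_from_slice(&slice[8..12]);
    Some((f64::from_le_bytes(price), u32::from_le_bytes(quantity)))
}
```

Readers and writers must agree on the layout and byte order, which is the flexibility traded away in return for avoiding any intermediate encoding step.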

Using Queue - Tailing

Reading data from the queue follows a similar pattern as writing. Reads from queue are non-blocking, and in cases where there is no new data to read either an empty result is returned, or the corresponding read document context returns false for the is_present check (see further below).

Messages are passively viewed when read (as opposed to being removed from the queue). Each tailer will see each message, and also the same message can be re-read by a given tailer where this is required.

Data is read from a Chronicle Queue using an ExcerptTailer. A new tailer attaches to the first available message in a queue by default, but can be repositioned anywhere along the queue any number of times as needed.

A tailer is obtained from a queue handle as follows:

let queue = queue::SingleChronicleQueueBuilder::new("example").build().unwrap();
let tailer = queue.create_tailer().unwrap();

Once a tailer is obtained there are both high-level and low-level options for reading the actual content, in direct analogue to appending.

The high-level API methods are (see the links for full details):

As with the corresponding appender calls, these each read a slice or string as a single atomic read from the queue, and fully abstract away the details of reading content (cf below). The methods are just as efficient as the more explicit calls discussed below, and can be preferred for their syntactic simplicity when available.

The lower level API uses a DocumentContext to formally bracket a series of reads from the content. A read context can optionally be rolled back to undo a read (ie the tailer state and position will be unchanged) when the context is dropped.

Note: Reading from a queue is an entirely passive action: no locks are held, and there is no contention or back-pressure with other readers or writers. Importantly the performance of any one reader has no impact on any other part of the system - so for example if a reader is slow or stalled for any reason, there is no impact on other readers or writers.
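
The passive nature of reads can be pictured with a toy model in which each reader is nothing more than a private position over a shared, append-only log. SharedLog and LogReader below are hypothetical types for illustration; they say nothing about how Chronicle Queue implements tailers internally.

```rust
/// Toy append-only log; reading never removes entries.
#[derive(Default)]
pub struct SharedLog {
    entries: Vec<String>,
}

impl SharedLog {
    pub fn append(&mut self, message: &str) {
        self.entries.push(message.to_string());
    }
}

/// Toy reader holding only its own private read position.
#[derive(Default)]
pub struct LogReader {
    next: usize,
}

impl LogReader {
    /// Non-blocking read: returns None when there is nothing new for this reader.
    pub fn read<'a>(&mut self, log: &'a SharedLog) -> Option<&'a str> {
        let entry = log.entries.get(self.next)?;
        self.next += 1;
        Some(entry.as_str())
    }

    /// Step back one message so that it can be read again.
    pub fn rewind_one(&mut self) {
        self.next = self.next.saturating_sub(1);
    }
}
```

Since the only state a reader changes is its own position, a slow reader cannot hold up a fast one, and any reader can revisit a message it has already seen.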

The following examples directly complement those given in the appending section above (in the same order).

To read from a queue and obtain the message’s index:

let tailer = queue.create_tailer().unwrap();
{
    let context = tailer.scoped_reading_document();
    let str = context.wire().value_in().string();
    println!("data read at index {}", context.index());
}

Optionally, data can be retrieved via labels. Note this example demonstrates the flexibility to read labelled data back in any order (cf the appending code):

let tailer = queue.create_tailer().unwrap();
{
    let context = tailer.scoped_reading_document();
    let str1 = context.wire().read("key2").string();
    let str2 = context.wire().read("key1").string();
}
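
Why labels allow values to be read back in any order can be seen in a toy model where each value is stored alongside its label. LabelledMessage below is a hypothetical type for illustration and does not reflect the actual Wire encoding.

```rust
/// Toy labelled message: fields are kept in write order as (label, value) pairs.
/// Hypothetical type for illustration; not the Chronicle Wire format.
#[derive(Default)]
pub struct LabelledMessage {
    fields: Vec<(String, String)>,
}

impl LabelledMessage {
    /// Append a labelled value, returning self so that calls can be chained.
    pub fn write(&mut self, label: &str, value: &str) -> &mut Self {
        self.fields.push((label.to_string(), value.to_string()));
        self
    }

    /// Look a value up by label, independent of the order it was written in.
    pub fn read(&self, label: &str) -> Option<&str> {
        self.fields
            .iter()
            .find(|(l, _)| l.as_str() == label)
            .map(|(_, v)| v.as_str())
    }

    /// Positional access: the only option when no labels are stored.
    pub fn value_at(&self, position: usize) -> Option<&str> {
        self.fields.get(position).map(|(_, v)| v.as_str())
    }
}
```

The lookup by label costs a search and the labels take up space in the message, which is the trade-off against purely positional access.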

The label mechanism can be extended to read fully self-describing messages.

The syntax chains two closure-based calls, each taking a Wire reference. For example, to read back the self-describing trade object from the earlier append example:

let tailer = queue.create_tailer().unwrap();
{
    tailer.read_document(|wire| {
        wire.read("trade").marshallable(|w| {
            let symbol = w.read("symbol").string();
            let price = w.read("price").float64();
            let quantity = w.read("quantity").float64();
            // ...
        });
    });
}

Messages can be read without labels, where now the position is the only indication as to the meaning of values:

let tailer = queue.create_tailer().unwrap();
{
    tailer.read_document(|w| {
        // `in` is a reserved keyword in Rust, so use a different binding name
        let input = w.value_in();
        let num1 = input.int32();
        let num2 = input.int64();
        let str = input.string();
    });
}

The above example used a Wire-based closure, which supports flexible byte-level input (by including some additional control data). Raw byte access via Bytes is also available, this time using a Bytes-based closure:

let tailer = queue.create_tailer().unwrap();
{
    tailer.read_bytes(|b| {
        let code = b.read_i8();
        // matches the write_f32 call in the appending example; read_f32 is assumed by symmetry
        let num = b.read_f32();
        let str = b.read_string();
    });
}

Finally, the lowest-level option for reading data from a queue is to obtain a slice over a region which can then be read directly:

let bytes = tailer.scoped_bytes_reader();
if !bytes.is_empty() {
    let slice = bytes.as_slice();
}

Regardless of the approach used to read data from a queue, each tailer sees every message.

Using Queue - Tailer Directions

The default behaviour of a tailer is to advance to the next (higher) message after each read. The ExcerptTailer::direction method allows the direction of movement to be controlled using the following options:

  • None: don’t advance the read position
  • Forward: advance the read position forwards to the next (higher) message
  • Backward: advance the read position backwards to the previous (lower) message

The direction of movement can be altered at any time while a Tailer is in use.
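
The effect of each direction on the read position can be summarised in a few lines. The sketch below treats indices as simple consecutive integers, which is a simplification (it ignores rolling between cycles), and the Movement enum and next_index function are hypothetical names rather than the library's TailerDirection API.

```rust
/// Hypothetical mirror of the three direction options described above.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Movement {
    None,
    Forward,
    Backward,
}

/// Position of the tailer after reading the message at `index`.
/// Returns Option::None when the move would go before index 0
/// (or, for Forward, past the largest representable index).
pub fn next_index(index: u64, movement: Movement) -> Option<u64> {
    match movement {
        Movement::None => Some(index),
        Movement::Forward => index.checked_add(1),
        Movement::Backward => index.checked_sub(1),
    }
}
```

With Movement::None a tailer keeps returning the same message on every read, which can be useful for polling a single slot.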

Using Queue - Positioning Tailers

The default behaviour of a tailer is to attach to the first available message in a queue. If queue files have rolled, this means a tailer may attach to an older file. In addition, if some of the older files have been removed after rolling then the tailer will attach to the oldest available cycle.

In some applications it may be preferred to start reading only new messages from the end of a queue when attaching a tailer. For this the ExcerptTailer provides the to_end() method, which positions the tailer at the end of the queue (ie as if the last message had just been read). A new reading document will only set the is_present flag once a new message has been added by an appender:

// this will be false until a new message is appended to the queue
let available = tailer.to_end().scoped_reading_document().is_present();

If a Tailer has been set to use a Backward direction then “start” and “end” still refer to the usual positions (it’s only the movement between reads which changes). Specifically, to_end() will place the tailer at the end of the queue such that the next read returns the last message in the queue (then advances to the last-but-one, and so on). to_start() will place the tailer at the start such that the next read returns the first message in the queue; subsequent reads will then see no messages, as the tailer is now positioned before the start of the queue.

In addition to the start and end, a tailer may be explicitly positioned at any valid index (see move_to_index). Once repositioned, the next read by the tailer will return the message at the specified position.
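
The interaction between positioning and direction can be modelled with a toy tailer over an in-memory slice. PositionedTailer and ReadDirection are hypothetical types; the method names are borrowed from the text above for readability, but the bodies are a sketch of the described behaviour under simplified integer indices, not the library's implementation.

```rust
#[derive(Clone, Copy, PartialEq)]
pub enum ReadDirection {
    Forward,
    Backward,
}

/// Toy tailer over an in-memory log.
pub struct PositionedTailer<'a> {
    log: &'a [&'a str],
    /// Index of the next message to read; -1 means "before the start".
    next: isize,
    direction: ReadDirection,
}

impl<'a> PositionedTailer<'a> {
    pub fn new(log: &'a [&'a str], direction: ReadDirection) -> Self {
        PositionedTailer { log, next: 0, direction }
    }

    /// The next read returns the first message, whatever the direction.
    pub fn to_start(&mut self) -> &mut Self {
        self.next = 0;
        self
    }

    /// Forward: as if the last message had just been read.
    /// Backward: the next read returns the last message.
    pub fn to_end(&mut self) -> &mut Self {
        let len = self.log.len() as isize;
        self.next = match self.direction {
            ReadDirection::Forward => len,
            ReadDirection::Backward => len - 1,
        };
        self
    }

    /// Jump to a specific index; returns false if the index is not valid.
    pub fn move_to_index(&mut self, index: usize) -> bool {
        if index < self.log.len() {
            self.next = index as isize;
            true
        } else {
            false
        }
    }

    /// Non-blocking read which then moves in the configured direction.
    pub fn read(&mut self) -> Option<&'a str> {
        if self.next < 0 {
            return None;
        }
        let message = *self.log.get(self.next as usize)?;
        self.next += match self.direction {
            ReadDirection::Forward => 1,
            ReadDirection::Backward => -1,
        };
        Some(message)
    }
}
```

Over a log of "a", "b", "c", a Backward tailer sent to the end reads "c" then "b"; after to_start() it reads "a" once and then nothing further, while a Forward tailer sent to the end reads nothing until new data arrives.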

Using Queue - Named (or Restartable) Tailers

A basic tailer only maintains state while in scope. This means if a tailer is dropped for any reason then when re-attached it will start again from the beginning of the queue (unless explicitly repositioned).

It can be useful to have a type of tailer which retains a memory of where it last read, and so will resume from where it left off when restarted. These are referred to as “restartable” or “named” tailers, and aside from maintaining position state across restarts are used exactly the same as basic tailers.

// add some data to a queue
{
    let queue = queue::SingleChronicleQueueBuilder::new("example").build().unwrap();
    {
        let appender = queue.acquire_appender().unwrap();

        for i in 0..10 {
            // format! builds the String directly, so std::fmt::Write is not needed
            let msg = format!("test {}", i);
            appender.write_string(&msg);
        }
    }
}

// read part of the data, using two named tailers up to different points
{
    let queue = queue::SingleChronicleQueueBuilder::new("example").build().unwrap();
    {
        let tailer1 = queue.create_named_tailer("tailer1");
        assert!( "test 0" == tailer1.read_string() );
        assert!( "test 1" == tailer1.read_string() );
        assert!( "test 2" == tailer1.read_string() );

        let tailer2 = queue.create_named_tailer("tailer2");
        assert!( "test 0" == tailer2.read_string() );
    }
}

// resume reading after dropping the tailers
// as these are named tailers they will each resume from their last read position
{
    let queue = queue::SingleChronicleQueueBuilder::new("example").build().unwrap();
    {
        let tailer1 = queue.create_named_tailer("tailer1");
        assert!( "test 3" == tailer1.read_string() );
        assert!( "test 4" == tailer1.read_string() );
        assert!( "test 5" == tailer1.read_string() );

        let tailer2 = queue.create_named_tailer("tailer2");
        assert!( "test 1" == tailer2.read_string() );
    }
}

A named tailer’s index is maintained regardless of how the tailer’s position is moved around the queue - eg manually jumping to given positions. Simply, the tailer resumes from whichever message it read last.
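
Conceptually, a named tailer behaves as if its last-read position were stored alongside the queue under the tailer's name. The toy model below keeps those positions in a map that outlives any individual tailer. PositionStore and read_named are hypothetical names, and how the library actually persists positions is not described here.

```rust
use std::collections::HashMap;

/// Toy stand-in for persisted queue state: the messages plus the
/// next read position recorded for each tailer name.
#[derive(Default)]
pub struct PositionStore {
    messages: Vec<String>,
    positions: HashMap<String, usize>,
}

impl PositionStore {
    pub fn append(&mut self, message: &str) {
        self.messages.push(message.to_string());
    }

    /// Read the next message for the named tailer and record the new
    /// position in the store, so that a later call resumes from there.
    pub fn read_named(&mut self, name: &str) -> Option<String> {
        let position = self.positions.entry(name.to_string()).or_insert(0);
        let message = self.messages.get(*position)?.clone();
        *position += 1;
        Some(message)
    }
}
```

Each call stands in for creating the named tailer, reading once and dropping it again: the position belongs to the store, not to the tailer object, so "tailer1" and "tailer2" progress independently and resume where they left off.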

Note: A tailer’s position is only advanced on normal closure of a reading DocumentContext. If rollback_on_close is called on a reading DocumentContext prior to it being dropped then the read position is not changed (so a subsequent read attempt will re-read the same message).

Using Queue - Error Handling

The error-handling philosophy of Chronicle Queue adheres closely to the Rust approach: ordinary, recoverable errors are handled using the Result type, whereas anything which departs significantly from reasonably expected behaviour, and which either cannot be meaningfully recovered from or indicates a critical low-level abnormality or corruption, will result in a panic.

Examples of errors which would be covered by Result include an invalid directory for a queue, incorrect permissions for accessing a queue, insufficient disk space to open a queue etc. Examples of errors which would result in a panic include corrupted or unrecoverable queue data, invalid memory access and similar.

Queue uses Result in a handful of places, mostly early on in a queue lifecycle: creating or opening a queue, attaching appenders, creating tailers etc.

Once the queue environment is established, appenders and tailers attached etc, then the vast majority of “soft” error cases (eg requesting new data when none is available) are covered by safe, empty, null-type return values, or no-op actions, without needing the more formal Result machinery.

Any more severe error at this point in a queue lifecycle would only arise in a highly abnormal situation, for which a Panic response is appropriate.
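
The split between Result at setup time and empty return values in steady state can be sketched as follows. SetupError, ToyReader and open_reader are hypothetical names for this illustration, and the empty-path check merely stands in for the kinds of recoverable setup failures listed above.

```rust
use std::collections::VecDeque;
use std::fmt;

/// Hypothetical recoverable setup error.
#[derive(Debug, PartialEq)]
pub enum SetupError {
    EmptyPath,
}

impl fmt::Display for SetupError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SetupError::EmptyPath => write!(f, "queue path must not be empty"),
        }
    }
}

impl std::error::Error for SetupError {}

/// Toy reader over messages held in memory.
pub struct ToyReader {
    pending: VecDeque<String>,
}

/// Setup phase: failures are ordinary and recoverable, so return a Result.
pub fn open_reader(path: &str, initial: &[&str]) -> Result<ToyReader, SetupError> {
    if path.is_empty() {
        return Err(SetupError::EmptyPath);
    }
    Ok(ToyReader {
        pending: initial.iter().map(|s| s.to_string()).collect(),
    })
}

impl ToyReader {
    /// Steady state: "no data available" is an empty value, not an error.
    pub fn read_string(&mut self) -> String {
        self.pending.pop_front().unwrap_or_default()
    }
}
```

The caller deals with Result once, when the reader is opened; after that a read loop only needs to check for an empty string.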

Structs

  • BytesReader: A BytesReader enables a reader to directly access bytes within a reserved section written by a BytesWriter. In some cases this can result in reduced read latency by eliminating a copy. It also enables Queue to be directly integrated with components which read directly from a region of memory.
  • BytesWriter: A BytesWriter enables a writer to directly access mutable bytes within a reserved section with explicit size at the current end of a Queue. In some cases this can result in reduced write latency by eliminating a copy. It also enables Queue to be directly integrated with components which write directly to memory.
  • DocumentContext: A DocumentContext encapsulates a single, atomic message in Queue, with a read/write context handle obtained from an ExcerptTailer or ExcerptAppender respectively.
  • ExcerptAppender: Data written to a Queue is always appended to the end; by construction Queue does not permit data to be changed once it has been committed. For this reason we interchangeably refer to “writers” as “appenders”.
  • ExcerptTailer: Once data is written to a Queue it is persisted and cannot be changed. Alongside this, readers of messages view rather than consume content: any number of readers can read each message; and where appropriate any one reader can re-read the same message multiple times. To make it explicit that readers do not consume content we interchangeably refer to “readers” as “tailers”.
  • RollCycle: A Chronicle Queue is a logical view of a directory on the file system. The queue data is split across multiple files, each of which contains data belonging to a single cycle. The length of the cycle is determined by the rollcycle parameter passed to the queue builder.
  • SingleChronicleQueue: A SingleChronicleQueue instance provides a handle to a Chronicle Queue, which in turn represents a single, ordered, logical queue of persisted data. A single logical queue may be backed by multiple memory-mapped files depending on the associated RollCycle (eg use a new file for each day). The SingleChronicleQueue handle abstracts the physical layout of the queue, segmentation, rolling etc.
  • SingleChronicleQueueBuilder: The SingleChronicleQueueBuilder is a convenience type used to assemble a SingleChronicleQueue handle.
  • TailerDirection: The default action for a tailer after reading a message is to advance forwards to the next (higher) message. The TailerDirection allows the direction of movement to be controlled, and the options are:
  • ValueIn: The ValueIn type provides a normalised interface for reading content from a Chronicle Wire, which in turn abstracts underlying binary resources such as Queue documents.
  • ValueOut: The ValueOut type provides a normalised interface for writing content to a Chronicle Wire, which in turn abstracts underlying binary resources such as Queue documents.
  • Wire: Wire is a Chronicle abstraction over binary resources such as Queue. There are two access patterns depending on user preferences: