Chronicle Queue
Chronicle Queue is a “record everything” message queue providing sub-microsecond write-to-read latencies. It can be used for rapid Interprocess Communication (IPC), and is tailored to the transfer and storage of large amounts of data in low-latency environments where rapid recording and sharing of information is essential, eg High Frequency Trading systems.
Chronicle Queue is designed from the ground up to provide low latencies, and demonstrates extremely well controlled outlier performance. Example write-to-read latencies for 256 byte messages written at 100,000 msgs/s on a commodity Ryzen 9 7950X are (all times in nanoseconds):
| Percentile | 50% | 90% | 97% | 99% | 99.7% | 99.9% | 99.97% | 99.99% | 99.997% | 99.999% |
|---|---|---|---|---|---|---|---|---|---|---|
| Write-to-Read Latency (ns) | 241 | 261 | 271 | 281 | 311 | 331 | 461 | 490 | 501 | 521 |
Chronicle Queues are backed by memory-mapped files (either physical or tmpfs), with regions of the file
mapped on demand according to the configured block size. There are some important things to keep in
mind when selecting the block size, as described more fully in the queue::SingleChronicleQueueBuilder::blocksize
section. A good starting point for typical use cases is to select a block size of 256MB.
Getting Started
Chronicle Queue is provided as a tar bundle which includes two crates:
lib: the Queue library
tests: a set of demos and examples to illustrate syntax and recommended use
The library core is pre-packaged and does not need any additional actions.
To build and run the tests:
$> cd chronicle-queue-rust/tests
$> export CHRONICLE_KEY=<see below>
$> CHRONICLE_CPUSET=<see below> cargo test --release -- --nocapture --test-threads=1
Chronicle Queue requires a valid license key to be set in the CHRONICLE_KEY
environment variable.
If this has not already been provided to you please contact
sales@chronicle.software.
The CHRONICLE_CPUSET
environment variable can be used to indicate reserved core ranges
(eg if some cores have been isolated using Chronicle Tune, isolcpus, cgroups or similar). For
example, if cores 0-2 are for general purpose use, and cores 3-7 are reserved, then setting:
CHRONICLE_CPUSET=0-2:3-7
will help ensure appropriate thread placement. See Affinity for more details.
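As a rough illustration of the CHRONICLE_CPUSET format, the sketch below parses a value such as 0-2:3-7 into a general-purpose range and a reserved range. The general:reserved layout is inferred only from the example above, and parse_range and parse_cpuset are hypothetical helpers, not part of the Chronicle API.

```rust
use std::ops::RangeInclusive;

/// Hypothetical helper: parse one core range such as "3-7", or a single core "4".
fn parse_range(s: &str) -> Option<RangeInclusive<usize>> {
    match s.split_once('-') {
        Some((lo, hi)) => {
            let lo: usize = lo.trim().parse().ok()?;
            let hi: usize = hi.trim().parse().ok()?;
            // Reject inverted ranges such as "5-1".
            if lo <= hi { Some(lo..=hi) } else { None }
        }
        None => {
            let core: usize = s.trim().parse().ok()?;
            Some(core..=core)
        }
    }
}

/// Split "general:reserved" into two ranges.
/// ASSUMPTION: this layout is inferred from the single example "0-2:3-7".
fn parse_cpuset(spec: &str) -> Option<(RangeInclusive<usize>, RangeInclusive<usize>)> {
    let (general, reserved) = spec.split_once(':')?;
    Some((parse_range(general)?, parse_range(reserved)?))
}
```

A check of this kind can be useful in a launch script to catch a malformed value before the tests start.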
Note: The bundled crates use relative paths between tests and lib. If the library crate is moved relative to the tests crate, please be sure to update tests/build.rs and tests/Cargo.toml accordingly.
Hello World with Chronicle Queue
Once the supplied tar bundle has been unpacked, the following short example can be used
to create a first queue and try appending and reading messages from it. This demo is
also included in the tests crate as hello_world.rs.
Creating a New Queue
A new Chronicle Queue is obtained using a SingleChronicleQueueBuilder
by passing the path to the directory where the queue files are to be stored.
let queue = queue::SingleChronicleQueueBuilder::new("hello-world").build().unwrap();
The queue files will reside in the directory hello-world
and are named with the
current date using the following format:
./hello-world/{date}.cq4
In addition, a .cq4t
metastore file is created. This file contains various auxiliary
data about the queue and is kept separate from the actual queue data files.
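To make the on-disk layout concrete, the following sketch lists the files in a queue directory and separates data files from metastore files. It relies only on the .cq4 and .cq4t extensions named above and uses the standard library; list_queue_files is a hypothetical helper and is not part of the Chronicle API.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Returns (data files, metastore files) found directly inside `dir`,
/// classified purely by the `.cq4` / `.cq4t` extensions.
/// Hypothetical helper for inspection only; it never opens the files.
fn list_queue_files(dir: &Path) -> io::Result<(Vec<PathBuf>, Vec<PathBuf>)> {
    let mut data = Vec::new();
    let mut meta = Vec::new();
    for entry in fs::read_dir(dir)? {
        let path = entry?.path();
        match path.extension().and_then(|e| e.to_str()) {
            Some("cq4") => data.push(path),
            Some("cq4t") => meta.push(path),
            _ => {} // ignore anything else in the directory
        }
    }
    // read_dir order is unspecified, so sort for stable output.
    data.sort();
    meta.sort();
    Ok((data, meta))
}
```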
Writing to a Queue
Messages are written, or appended, to a queue using an ExcerptAppender
.
The appender always writes messages to the end of the queue; there is no way to insert
messages at specific locations or offsets within a queue.
Once the appender has been obtained, a string can be written as follows:
let appender = queue.acquire_appender().unwrap();
appender.write_string("Hello World!");
Reading from a Queue
Messages are read from the queue using an ExcerptTailer
. The term
“tailing” stems from reading from the queue’s tail - ie the end of the queue - which is the
default read mode. However, tailers can be repositioned arbitrarily anywhere from the start
to the end of the queue as required. For more details see ExcerptTailer
.
let tailer = queue.create_tailer().unwrap();
let msg = tailer.read_string();
Cleaning Up
The queue
handle obtained from SingleChronicleQueueBuilder
is designed for RAII-style
use and automatically cleans up resources when the handle goes out of scope.
Putting It All Together
Putting these steps together, the complete example is as follows:
use chronicle::*;
#[test]
fn test_hello_world() {
let queue = queue::SingleChronicleQueueBuilder::new("hello-world").build().unwrap();
let appender = queue.acquire_appender().unwrap();
appender.write_string("Hello World!");
let tailer = queue.create_tailer().unwrap();
assert!( "Hello World!" == tailer.read_string() );
// queue automatically closed and resources released when queue is dropped
}
Configuration
In addition to creating the Queue handle, SingleChronicleQueueBuilder
can be used to set some configuration items for a queue.
These are discussed in the following sections.
Configuration - Roll Cycle
A Chronicle Queue is a logical view of a directory on the file system. The queue data is
split across multiple files, each of which contains data belonging to a single cycle
.
The length of the cycle is determined by the rollcycle
parameter passed to the queue builder.
Example options are:
RollCycle::Daily: events stored in the queue will be grouped into 24-hour periods
RollCycle::Hourly: a new queue file will be created every hour
As new files are created (the queue is rolled) to accommodate events being written to the
queue, a persisted data-structure (the .cq4t
file) is updated with the lowest and highest
cycle
numbers present in the directory. Maintaining this table allows an ExcerptTailer
to busy-spin waiting for new data to be appended to the queue, without the need to make
costly calls to the file system to check for the existence of new queue files.
The interval for rolling is configured using the queue’s rollcycle
attribute, for example:
let queue = queue::SingleChronicleQueueBuilder::new("example")
.rollcycle(RollCycle::Daily)
.build()
.unwrap();
The roll cycle is fixed once the first data is appended to the queue. If an attempt is made
to open an existing queue with a different rollcycle
passed to the SingleChronicleQueueBuilder
then the queue’s existing roll cycle will take priority and be used instead.
(A corresponding warning message will also be logged).
The default roll cycle is FastDaily
. See RollCycle
for a complete list of available roll cycles.
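The precedence rule described above (an existing queue's roll cycle wins over the one requested by the builder, with a warning) can be modelled in a few lines. This is only a sketch: ToyRollCycle and effective_roll_cycle are hypothetical names, and the real library performs this check internally.

```rust
/// Stand-in for the library's roll cycle type (hypothetical, three variants only).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ToyRollCycle {
    FastDaily,
    Daily,
    Hourly,
}

/// Returns the roll cycle that would be used, plus a flag saying whether
/// a warning should be logged because the request was overridden.
/// `existing` is None for a queue with no data appended yet.
fn effective_roll_cycle(
    existing: Option<ToyRollCycle>,
    requested: ToyRollCycle,
) -> (ToyRollCycle, bool) {
    match existing {
        // Existing cycle differs from the request: keep it and warn.
        Some(current) if current != requested => (current, true),
        // Existing cycle matches the request: nothing to report.
        Some(current) => (current, false),
        // New queue: the requested cycle becomes the fixed cycle.
        None => (requested, false),
    }
}
```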
Note: file rolling is a relatively expensive operation and it is recommended to schedule rolling during quiet periods wherever possible.
Configuration - Block Size
Chronicle Queue content is backed by memory mapped files, and Queue maps a series of segments
on demand rather than attempting to map the whole file at once. The blocksize
configures the
size of each individually mapped segment (in bytes).
Memory mapping a new segment is a relatively expensive operation, so larger segment sizes are
generally favoured in order to keep mapping reasonably infrequent. That said, very large
segments can put pressure on memory resources. A blocksize
of around 256MB is generally
a good compromise, although for systems with large amounts of memory multi-GB segments
can be a sensible choice in some cases.
Note: The largest single message which is guaranteed to be written to a segment is 25%
of the segment size. This is rarely a practical issue, but where very large individual messages
are written to a Queue, the blocksize should be at least 4x the largest expected message size.
The default block size is 64MB.
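The sizing rule in the note above is simple arithmetic, sketched below. The helper names are hypothetical, and treating "MB" as 1024 * 1024 bytes is an assumption; the function only applies the 4x rule with the documented 64MB default as a floor.

```rust
const MIB: u64 = 1024 * 1024; // ASSUMPTION: "MB" in the text means 2^20 bytes
const DEFAULT_BLOCKSIZE: u64 = 64 * MIB; // documented default block size

/// Largest single message guaranteed to fit: 25% of the segment size.
fn largest_guaranteed_message(blocksize: u64) -> u64 {
    blocksize / 4
}

/// Smallest block size satisfying the 4x rule, never below the default.
/// Hypothetical helper; the result is a lower bound, not a tuned value.
fn suggested_blocksize(largest_message: u64) -> u64 {
    largest_message.saturating_mul(4).max(DEFAULT_BLOCKSIZE)
}
```

For example, a 256MB block size guarantees room for a single message of up to 64MB, while a workload with 32MB messages needs at least 128MB blocks.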
Using Queue
This section describes how to perform the most common Chronicle Queue operations. All data is stored in a queue as a sequence of bytes, and a range of options is available, from direct reading and writing of raw bytes through to more structured formats which carry additional embedded controls for greater flexibility (eg to relax requirements for binary compatibility between different reader and writer versions).
Before continuing, note that Chronicle refers to the act of writing as “appending”,
and reading as “tailing”. Alongside this, the messages written to a queue are referred to as
“excerpts”. Following from this terminology, writing to a queue is done using an
ExcerptAppender, and reading from a queue is performed using an ExcerptTailer.
Using Queue - Appending
Data is appended (written) to the end of a Chronicle Queue using an
ExcerptAppender
. For any given appender, messages are guaranteed
to be written to the queue in the order the appender sent them. Multiple appenders can be
in use at the same time with the same queue, but there are no guarantees about the order in
which messages from different appenders are interleaved: only that messages for one appender
will always be written in order.
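The ordering guarantee can be illustrated with plain threads and a shared log. This sketch does not use Chronicle Queue at all: each thread plays the role of an appender, and a Mutex stands in for the queue's write serialisation. The interleaving between threads varies from run to run, but each thread's own messages always appear in the order it sent them.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Each toy "appender" thread pushes (appender_id, sequence) pairs onto one shared log.
fn run_appenders(appenders: usize, messages_each: usize) -> Vec<(usize, usize)> {
    let log = Arc::new(Mutex::new(Vec::<(usize, usize)>::new()));
    let handles: Vec<_> = (0..appenders)
        .map(|id| {
            let log = Arc::clone(&log);
            thread::spawn(move || {
                for seq in 0..messages_each {
                    // One lock acquisition per message: an atomic append.
                    log.lock().unwrap().push((id, seq));
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let result = log.lock().unwrap().clone();
    result
}

/// True if every appender's messages appear in the order it sent them.
fn per_appender_order_holds(log: &[(usize, usize)], appenders: usize) -> bool {
    let mut next = vec![0usize; appenders];
    log.iter().all(|&(id, seq)| {
        let in_order = seq == next[id];
        next[id] += 1;
        in_order
    })
}
```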
An appender is acquired from a queue handle as follows:
let queue = queue::SingleChronicleQueueBuilder::new("example").build().unwrap();
let appender = queue.acquire_appender().unwrap();
Once an appender is obtained there are both high-level and low-level options for writing the actual message content.
The high-level API methods are (see the links for full details):
These each write the corresponding slice or string as a single atomic update to the queue, and fully abstract away the details of writing content (cf below). The methods are just as efficient as the more explicit calls discussed below; their only limitation is that the data to be written must be fully formed by the time the call is made. In cases where this limitation is not intrusive these calls can be preferred for their syntactic simplicity.
The lower level API uses a DocumentContext
to formally bracket
a series of updates to the queue content. The document context ensures the series of updates
are applied atomically to the queue when the context is completed (dropped). Alternatively, a context
can be rolled back to discard any changes when
the context is dropped.
Note: A write lock is held for the duration of a document context’s scope. The overhead of actions performed with an open context should be minimised where possible to avoid contention with other writes. Importantly, reads from the queue do not use any locks, nor do they cause any contention with writes.
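The commit-on-drop and rollback behaviour of a writing document context follows a common Rust RAII pattern, sketched here with a toy in-memory queue. ToyQueue and ToyWriteContext are hypothetical stand-ins, not the Chronicle types; the exclusive &mut borrow plays the role of the write lock held while the context is open.

```rust
/// Toy in-memory queue; NOT the Chronicle API.
struct ToyQueue {
    messages: Vec<Vec<u8>>,
}

/// Toy write context: buffers bytes and publishes them as one message on drop.
struct ToyWriteContext<'q> {
    queue: &'q mut ToyQueue, // exclusive borrow models the write lock
    buffer: Vec<u8>,
    rollback: bool,
}

impl ToyQueue {
    fn new() -> Self {
        ToyQueue { messages: Vec::new() }
    }

    fn writing_document(&mut self) -> ToyWriteContext<'_> {
        ToyWriteContext { queue: self, buffer: Vec::new(), rollback: false }
    }
}

impl ToyWriteContext<'_> {
    /// Stage more bytes; nothing is visible in the queue yet.
    fn write(&mut self, bytes: &[u8]) -> &mut Self {
        self.buffer.extend_from_slice(bytes);
        self
    }

    /// Mark the context so that its staged bytes are discarded on drop.
    fn rollback_on_close(&mut self) {
        self.rollback = true;
    }

    /// Index the message will occupy if it is committed.
    fn index(&self) -> usize {
        self.queue.messages.len()
    }
}

impl Drop for ToyWriteContext<'_> {
    fn drop(&mut self) {
        if !self.rollback {
            // All staged writes become visible together, as a single message.
            self.queue.messages.push(std::mem::take(&mut self.buffer));
        }
    }
}
```

Keeping the context in a tight inner scope, as the examples in this section do, releases the lock as early as possible.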
Within a document context, a handle to a Wire or Bytes is required to modify the
sequence of bytes. The difference is that while Bytes manipulates the raw byte content,
Wire embeds additional controls within the data which make it more flexible, for example
across machine architectures or across different versions of readers and writers (eg with
differing struct layouts).
Every message written to a Queue is assigned a unique, strictly increasing index, which
tailers can use, for example, to look up specific items.
let appender = queue.acquire_appender().unwrap();
{
let context = appender.scoped_writing_document();
context.wire().value_out().string("data");
println!("data was written to index {}", context.index());
}
Optionally, data can be written to the message with an associated label. An important benefit of this approach is the data can be read back (via labels) in any order:
let appender = queue.acquire_appender().unwrap();
{
let context = appender.scoped_writing_document();
context.wire().write("key1").string("value1");
context.wire().write("key2").string("value2");
}
The above label mechanism can be extended further to support fully self-describing messages which are completely flexible - for example to remove constraints on data schema changes.
The syntax chains two closure-based calls, each taking a Wire
reference. For example,
to write a self-describing trade
object to a document:
let appender = queue.acquire_appender().unwrap();
{
appender.write_document(|wire| {
wire.write("trade").marshallable(|w| {
w.write("symbol").string("EURUSD")
.write("price").float64(1.234)
.write("quantity").float64(15e6)
// ...
});
});
}
Messages can also be written without labels, where now the position is the only indication as to the meaning of values. This is less flexible but faster than the above:
let appender = queue.acquire_appender().unwrap();
{
appender.write_document(|w| {
w.value_out().int32(1234)
.value_out().int64(5678)
.value_out().string("data");
});
}
The above example used a Wire
-based closure, which supports flexible byte-level output
(by including some additional control data). Raw byte access via Bytes
is
also available, this time using a Bytes
-based closure:
let appender = queue.acquire_appender().unwrap();
{
appender.write_bytes(|b| {
b.write_i8(123)
.write_f32(4.56)
.write_string("data");
});
}
Finally, the lowest-level, least flexible, but most efficient option for appending data to a
queue is to reserve a region of bytes which can then be manipulated directly using
a &mut [u8]
slice.
let mut appender = queue.acquire_appender().unwrap();
let mut bytes = appender.scoped_bytes_writer(MSGSIZE);
let slice = bytes.as_mut().as_slice();
// write directly to the &mut [u8] slice
See ExcerptAppender::reserve_bytes
and
BytesWriter
for further details.
Using Queue - Tailing
Reading data from the queue follows a similar pattern as writing. Reads from queue are
non-blocking, and in cases where there is no new data to read either an empty result is
returned, or the corresponding read document context returns false
for the is_present
check
(see further below).
Messages are passively viewed when read (as opposed to being removed from the queue). Each tailer will see each message, and also the same message can be re-read by a given tailer where this is required.
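The passive, non-destructive read model can be pictured as each tailer holding nothing more than its own cursor into a shared, append-only log. The sketch below uses hypothetical ToyLog and ToyTailer types (not the Chronicle API) to show that reads never remove messages, that tailers are independent, and that a read with no new data returns immediately with an empty result.

```rust
/// Toy append-only log; reads never remove messages.
struct ToyLog {
    messages: Vec<String>,
}

/// Toy tailer: just a private cursor into the shared log.
struct ToyTailer {
    next: usize,
}

impl ToyTailer {
    fn new() -> Self {
        ToyTailer { next: 0 }
    }

    /// Non-blocking read: returns None immediately when there is no new message.
    fn read<'a>(&mut self, log: &'a ToyLog) -> Option<&'a str> {
        let msg = log.messages.get(self.next)?;
        self.next += 1;
        Some(msg.as_str())
    }

    /// Reposition the cursor, for example to re-read an earlier message.
    fn move_to_index(&mut self, index: usize) {
        self.next = index;
    }
}
```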
Data is read from a Chronicle Queue using an ExcerptTailer. A new
tailer attaches to the start of a queue by default (see Positioning Tailers), but can be
repositioned anywhere along the queue any number of times as needed.
A tailer is obtained from a queue handle as follows:
let queue = queue::SingleChronicleQueueBuilder::new("example").build().unwrap();
let tailer = queue.create_tailer().unwrap();
Once a tailer is obtained there are both high-level and low-level options for reading the actual content, in direct analogue to appending.
The high-level API methods are (see the links for full details):
As with the corresponding appender calls, these each read a slice or string as a single atomic read from the queue, and fully abstract away the details of reading content (cf below). The methods are just as efficient as the more explicit calls discussed below, and can be preferred for their syntactic simplicity when available.
The lower level API uses a DocumentContext
to formally bracket a
series of reads from the content. A read context can optionally be
rolled back to undo a read (ie the tailer state
and position will be unchanged) when the context is dropped.
Note: Reading from a queue is an entirely passive action: no locks are held, and there is no contention or back-pressure with other readers or writers. Importantly the performance of any one reader has no impact on any other part of the system - so for example if a reader is slow or stalled for any reason, there is no impact on other readers or writers.
The following examples directly complement those given in the appending section above (in the same order).
To read from a queue and obtain the message’s index:
let tailer = queue.create_tailer().unwrap();
{
let context = tailer.scoped_reading_document();
let str = context.wire().value_in().string();
println!("data read at index {}", context.index());
}
Optionally, data can be retrieved via labels. Note this example demonstrates the flexibility to read labelled data back in any order (cf the appending code):
let tailer = queue.create_tailer().unwrap();
{
let context = tailer.scoped_reading_document();
let str1 = context.wire().read("key2").string();
let str2 = context.wire().read("key1").string();
}
The label mechanism can be extended to read fully self-describing messages.
The syntax chains two closure-based calls, each taking a Wire reference. For example, to
read back the self-describing trade object from the earlier append example:
let tailer = queue.create_tailer().unwrap();
{
tailer.read_document(|wire| {
wire.read("trade").marshallable(|w| {
let symbol = w.read("symbol").string();
let price = w.read("price").float64();
let quantity = w.read("quantity").float64();
// ...
});
});
}
Messages can be read without labels, where now the position is the only indication as to the meaning of values:
let tailer = queue.create_tailer().unwrap();
{
tailer.read_document(|w| {
// `in` is a reserved keyword in Rust, so a different binding name is used
let input = w.value_in();
let num1 = input.int32();
let num2 = input.int64();
let str = input.string();
});
}
The above example used a Wire
-based closure, which supports flexible byte-level input
(by including some additional control data). Raw byte access via Bytes
is
also available, this time using a Bytes
-based closure:
let tailer = queue.create_tailer().unwrap();
{
tailer.read_bytes(|b| {
let code = b.read_i8();
// matches the write_f32 call in the appending example (read_f32 is assumed to mirror write_f32)
let num = b.read_f32();
let str = b.read_string();
});
}
Finally, the lowest-level option for reading data from a queue is to obtain a slice over a region which can then be read directly:
let bytes = tailer.scoped_bytes_reader();
if !bytes.is_empty() {
let slice = bytes.as_slice();
}
Regardless of the approach used to read data from a queue, each tailer sees every message.
Using Queue - Tailer Directions
The default behaviour of a tailer is to advance to the next (higher) message after each read.
The ExcerptTailer::direction
method allows the direction
of movement to be controlled using the following options:
None: don’t advance the read position
Forward: advance the read position forwards to the next (higher) message
Backward: advance the read position backwards to the previous (lower) message
The direction of movement can be altered at any time while a Tailer is in use.
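The effect of the three directions can be sketched with a toy cursor over a slice of messages. ToyDirection and ToyCursor are hypothetical illustrations of the movement rule only; they are not the library's TailerDirection or ExcerptTailer.

```rust
/// Toy counterpart of the three documented directions.
#[derive(Clone, Copy, PartialEq, Debug)]
enum ToyDirection {
    None,
    Forward,
    Backward,
}

/// Toy cursor; a signed position lets it step to before the first message.
struct ToyCursor {
    position: isize,
    direction: ToyDirection,
}

impl ToyCursor {
    /// Read the message at the current position, then move according to the direction.
    fn read<'a>(&mut self, messages: &'a [&'a str]) -> Option<&'a str> {
        if self.position < 0 {
            return None; // moved before the start of the queue
        }
        let msg = *messages.get(self.position as usize)?;
        match self.direction {
            ToyDirection::None => {} // stay put: the next read sees the same message
            ToyDirection::Forward => self.position += 1,
            ToyDirection::Backward => self.position -= 1,
        }
        Some(msg)
    }
}
```

With direction None the same message is returned on every read, which can be handy for peeking at a message before deciding how to process it.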
Using Queue - Positioning Tailers
The default behaviour of a tailer is to attach to the first available message in a queue. If queue files have rolled, this means a tailer may attach to an older file. In addition, if some of the older files have been removed after rolling then the tailer will attach to the oldest available cycle.
In some applications it may be preferred to start reading only new messages from the end
of a queue when attaching a tailer. For this the ExcerptTailer
provides the
to_end()
method, which positions the tailer at the end of
the queue (ie as if the last message had just been read). A new reading document will only
set the is_present
flag once a new message has been added
by an appender:
// this will be false until a new message is appended to the queue
let available = tailer.to_end().scoped_reading_document().is_present();
If a Tailer has been set to use a Backward direction then
“start” and “end” still refer to the usual positions (it’s only the movement between reads which
changes). Specifically, a to_end() will place the tailer at the end of the queue
such that the next read returns the last message in the queue (then advances to the last-but-one, etc).
A to_start() will place the tailer at the start such that
the next read will return the first message in the queue. After that, subsequent reads will see
no messages (as the tailer is now pointing before the start of the queue).
In addition to the start and end, a tailer may be explicitly positioned at any valid
index (see move_to_index). Once repositioned, the next read by the tailer
will return the message at the specified position.
Using Queue - Named (or Restartable) Tailers
A basic tailer only maintains state while in scope. This means if a tailer is dropped for any reason then when re-attached it will start again from the beginning of the queue (unless explicitly repositioned).
It can be useful to have a type of tailer which retains a memory of where it last read, and so will resume from where it left off when restarted. These are referred to as “restartable” or “named” tailers, and aside from maintaining position state across restarts are used exactly the same as basic tailers.
// add some data to a queue
{
let queue = queue::SingleChronicleQueueBuilder::new("example").build().unwrap();
{
let appender = queue.acquire_appender().unwrap();
for i in 0..10 {
// format! avoids the extra `use std::fmt::Write` import that write! on a String needs
appender.write_string(&format!("test {}", i));
}
}
}
// read part of the data, using two named tailers up to different points
{
let queue = queue::SingleChronicleQueueBuilder::new("example").build().unwrap();
{
let tailer1 = queue.create_named_tailer("tailer1");
assert!( "test 0" == tailer1.read_string() );
assert!( "test 1" == tailer1.read_string() );
assert!( "test 2" == tailer1.read_string() );
let tailer2 = queue.create_named_tailer("tailer2");
assert!( "test 0" == tailer2.read_string() );
}
}
// resume reading after dropping the tailers
// as these are named tailers they will each resume from their last read position
{
let queue = queue::SingleChronicleQueueBuilder::new("example").build().unwrap();
{
let tailer1 = queue.create_named_tailer("tailer1");
assert!( "test 3" == tailer1.read_string() );
assert!( "test 4" == tailer1.read_string() );
assert!( "test 5" == tailer1.read_string() );
let tailer2 = queue.create_named_tailer("tailer2");
assert!( "test 1" == tailer2.read_string() );
}
}
A named tailer’s index is maintained regardless of how the tailer’s position is moved around the queue - eg manually jumping to given positions. Simply, the tailer resumes from whichever message it read last.
Note: A tailer’s position is only advanced on normal closure of a reading DocumentContext.
If rollback_on_close is called on a reading DocumentContext prior to it being dropped then
the read position is not changed (so a subsequent read attempt will re-read the same message).
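The essence of a named tailer is that its read position is stored against its name, outside the tailer object itself. The sketch below models this with a HashMap held by a toy store; all names are hypothetical, and the in-memory map merely stands in for whatever persistent state the library maintains.

```rust
use std::collections::HashMap;

/// Toy store: messages plus a saved read position per tailer name.
struct ToyStore {
    messages: Vec<String>,
    positions: HashMap<String, usize>, // stands in for persisted tailer state
}

/// Toy named tailer: holds only its name; the position lives in the store.
struct ToyNamedTailer<'s> {
    store: &'s mut ToyStore,
    name: String,
}

impl ToyStore {
    fn named_tailer(&mut self, name: &str) -> ToyNamedTailer<'_> {
        ToyNamedTailer { store: self, name: name.to_string() }
    }
}

impl ToyNamedTailer<'_> {
    /// Read the next message and record the new position under this tailer's name.
    fn read_string(&mut self) -> Option<String> {
        let pos = self.store.positions.entry(self.name.clone()).or_insert(0);
        let msg = self.store.messages.get(*pos)?.clone();
        *pos += 1; // only advanced after a successful read
        Some(msg)
    }
}
```

Dropping a ToyNamedTailer loses nothing, because the position was never held by the tailer; re-creating a tailer with the same name resumes from the saved position.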
Using Queue - Error Handling
The error-handling philosophy of Chronicle Queue adheres closely to the Rust approach: ordinary,
recoverable errors are handled using the Result type, whereas anything which departs significantly
from reasonable expected behaviour, and which cannot be meaningfully recovered from and/or
indicates a critical low-level abnormality or corruption, results in a panic.
Examples of errors which would be covered by Result include an invalid directory for
a queue, incorrect permissions for accessing a queue, insufficient disk space to open a queue etc.
Examples of errors which would result in a panic would be corrupted/unrecoverable queue
data, invalid memory access and similar.
Queue uses Result
in a handful of places, mostly early on in a queue lifecycle: creating or
opening a queue, attaching appenders, creating tailers etc.
Once the queue environment is established, appenders and tailers attached etc, then the vast
majority of “soft” error cases (eg requesting new data when none is available) are covered by
safe, empty, null-type return values, or no-op actions, without needing the more formal Result
machinery.
Any more severe error at this point in a queue lifecycle would only arise in a highly
abnormal situation, for which a Panic
response is appropriate.
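The split between Result for setup-time failures and empty return values for "soft" cases can be sketched with the standard library alone. Both functions below are hypothetical illustrations of the convention, not Chronicle calls.

```rust
use std::io;
use std::path::Path;

/// Setup-time failure is an ordinary, recoverable error, so it is reported via Result.
fn open_queue_dir(path: &Path) -> io::Result<()> {
    if path.is_dir() {
        Ok(())
    } else {
        Err(io::Error::new(
            io::ErrorKind::NotFound,
            "queue directory does not exist",
        ))
    }
}

/// "No data yet" is a soft case: an empty value is returned rather than an error.
fn try_read(pending: &mut Vec<String>) -> Option<String> {
    if pending.is_empty() {
        None
    } else {
        Some(pending.remove(0)) // oldest message first
    }
}
```

Callers handle the Result once while establishing the queue environment, after which the steady-state read loop only needs to check for an empty value.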
Structs
BytesReader: enables a reader to directly access bytes within a reserved section written by a BytesWriter. In some cases this can result in reduced read latency by eliminating a copy. It also enables Queue to be directly integrated with components which read directly from a region of memory.
BytesWriter: enables a writer to directly access mutable bytes within a reserved section with explicit size at the current end of a Queue. In some cases this can result in reduced write latency by eliminating a copy. It also enables Queue to be directly integrated with components which write directly to memory.
DocumentContext: encapsulates a single, atomic message in Queue, with a read/write context handle obtained from an ExcerptTailer or ExcerptAppender respectively.
RollCycle: A Chronicle Queue is a logical view of a directory on the file system. The queue data is split across multiple files, each of which contains data belonging to a single cycle. The length of the cycle is determined by the rollcycle parameter passed to the queue builder.
SingleChronicleQueue: instance provides a handle to a Chronicle Queue, which in turn represents a single, ordered, logical queue of persisted data. A single logical queue may be backed by multiple memory-mapped files depending on the associated RollCycle (eg use a new file for each day). The SingleChronicleQueue handle abstracts the physical layout of the queue, segmentation, rolling etc.
SingleChronicleQueueBuilder: a convenience type used to assemble a SingleChronicleQueue handle.
TailerDirection: allows the direction of movement to be controlled, and the options are:
ValueIn: type provides a normalised interface for reading content from a Chronicle Wire, which in turn abstracts underlying binary resources such as Queue documents.
ValueOut: type provides a normalised interface for writing content to a Chronicle Wire, which in turn abstracts underlying binary resources such as Queue documents.
Wire: a Chronicle abstraction over binary resources such as Queue. There are two access patterns depending on user preferences: