Struct chronicle::queue::ExcerptTailer
#[repr(C)]
pub struct ExcerptTailer { /* private fields */ }
Once data is written to a Queue it is persisted and cannot be changed. Alongside this, readers of messages view rather than consume content: any number of readers can read each message; and where appropriate any one reader can re-read the same message multiple times. To make it explicit that readers do not consume content we interchangeably refer to “readers” as “tailers”.
All reads from a Queue take place via an ExcerptTailer, either using calls directly on the tailer, or (more commonly) via a scoped read DocumentContext obtained from the tailer.
Tailers will automatically attach to the start of a Queue, and increment their read position after each successful read (i.e. one which returns a DocumentContext which returns true when checked with is_present(), and which is closed normally without being rolled back; see DocumentContext::rollback_on_close()).
Each entry to a Queue has a unique, strictly increasing index number, which is a combination of the cycle number and a sequence number within the cycle. Cycles in turn refer to the individual memory-mapped files backing a queue, where again the cycle numbers increase on each file roll (see RollCycle).
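As a rough illustration of how a single index can combine a cycle and a sequence number while remaining strictly increasing, the sketch below packs the cycle into the high bits of an i64 and the sequence number into the low bits. The 32-bit split (CYCLE_SHIFT) and the helper names to_index, to_cycle and to_sequence are assumptions made for this example only; the actual layout depends on the RollCycle in use and is not specified on this page.

```rust
/// Hypothetical number of low bits reserved for the sequence number.
/// The real value depends on the RollCycle in use.
const CYCLE_SHIFT: u32 = 32;
const SEQUENCE_MASK: i64 = (1i64 << CYCLE_SHIFT) - 1;

/// Pack a cycle number and a sequence number into one index.
fn to_index(cycle: i32, sequence: i64) -> i64 {
    ((cycle as i64) << CYCLE_SHIFT) | (sequence & SEQUENCE_MASK)
}

/// Recover the cycle number from an index.
fn to_cycle(index: i64) -> i32 {
    (index >> CYCLE_SHIFT) as i32
}

/// Recover the sequence number within the cycle from an index.
fn to_sequence(index: i64) -> i64 {
    index & SEQUENCE_MASK
}

fn main() {
    let a = to_index(19_000, 0);
    let b = to_index(19_000, 1);
    let c = to_index(19_001, 0); // first entry after a file roll

    // Indices increase within a cycle and across a roll.
    assert!(a < b && b < c);
    assert_eq!(to_cycle(b), 19_000);
    assert_eq!(to_sequence(b), 1);
}
```

Under this assumed layout, comparing two indices also compares their cycles first, which is why an entry in a later cycle always has a higher index than any entry in an earlier one.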
The position of the tailer can be changed and/or queried at any point using the following group of methods:
fn index(self: &ExcerptTailer) -> i64;
fn last_read_index(self: &ExcerptTailer) -> i64;
fn cycle(self: &ExcerptTailer) -> i32;
fn move_to_index(self: &ExcerptTailer, index: i64) -> bool;
fn move_to_cycle(self: &ExcerptTailer, cycle: i32) -> bool;
fn to_start(self: &ExcerptTailer) -> SharedPtr<ExcerptTailer>;
fn to_end(self: &ExcerptTailer) -> SharedPtr<ExcerptTailer>;
Basic ExcerptTailers only maintain state while in scope. This means that if a tailer is dropped for any reason then, when re-attached, it will start again from the beginning of the queue (unless explicitly repositioned).
It can be useful to have a type of tailer which retains a memory of where it last read, and so will resume from where it left off when restarted. These are referred to as “restartable” or “named” tailers, and aside from maintaining position state across restarts they are used exactly the same as basic tailers.
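The difference between the two kinds of tailer can be pictured with a small in-memory model. This is only a sketch of the behaviour described above, not the crate's implementation: ModelQueue, read_basic and read_named are hypothetical names, and the "persisted" positions are simply kept in a map owned by the model queue.

```rust
use std::collections::HashMap;

/// Toy stand-in for a queue: messages plus saved positions for named readers.
struct ModelQueue {
    messages: Vec<String>,
    named_positions: HashMap<String, usize>,
}

impl ModelQueue {
    fn new(messages: &[&str]) -> Self {
        ModelQueue {
            messages: messages.iter().map(|m| m.to_string()).collect(),
            named_positions: HashMap::new(),
        }
    }

    /// Basic reader: the position is owned by the caller and lost with it.
    fn read_basic(&self, pos: &mut usize) -> Option<&str> {
        let msg = self.messages.get(*pos)?;
        *pos += 1;
        Some(msg.as_str())
    }

    /// Named reader: the position is stored with the queue under `name`.
    fn read_named(&mut self, name: &str) -> Option<String> {
        let pos = self.named_positions.entry(name.to_string()).or_insert(0);
        let msg = self.messages.get(*pos)?.clone();
        *pos += 1;
        Some(msg)
    }
}

fn main() {
    let mut queue = ModelQueue::new(&["a", "b", "c"]);

    {
        let mut pos = 0;
        assert_eq!(queue.read_basic(&mut pos), Some("a"));
    } // the basic reader's position is dropped here
    {
        let mut pos = 0; // a new basic reader starts from the beginning
        assert_eq!(queue.read_basic(&mut pos), Some("a"));
    }

    // A named reader resumes where the previous read under that name stopped.
    assert_eq!(queue.read_named("audit").as_deref(), Some("a"));
    assert_eq!(queue.read_named("audit").as_deref(), Some("b"));
}
```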
Example
let queue = queue::SingleChronicleQueueBuilder::new("example").build().unwrap();
let appender = queue.acquire_appender().unwrap();
// ...
// append data
// ...
let tailer = queue.create_tailer().unwrap();
// the tailer can now be used to atomically read from the queue
// either using direct reads, eg:
let data = tailer.read_slice();
let str = tailer.read_string();
// Note: the direct calls return empty strings, slices etc if there is no message
// at the current read position (in which case the read position is not advanced)
// or via a scoped document context, eg:
{
    let context = tailer.scoped_reading_document();
    // contexts should always be checked for valid data
    if context.is_present() {
        let data = context.wire().value_in().string();
    }
}
// A Named tailer will resume from its last read position when re-attached
// To create a Named Tailer use the following
let named_tailer = queue.create_named_tailer("name");
// Aside from the persisted read position, Named Tailers are used exactly the
// same as basic Tailers
Implementations
impl ExcerptTailer
pub fn read_string(&self) -> String
Read the current message as a string
Returns
The data at the current read position, parsed as a string; or an empty string if there is no message available
pub fn read_slice(&self) -> &[u8]
pub fn read_slice_direct(&self) -> &[u8]
pub fn last_read_index(&self) -> i64
Get the index of the last read completed by this ExcerptTailer
pub fn move_to_index(&self, index: i64) -> bool
Move this ExcerptTailer to the given index.
The next read will return the message at the specified position.
pub fn move_to_cycle(&self, cycle: i32) -> bool
Move this ExcerptTailer to the start of the given cycle (which corresponds to a backing file).
The next read will return the first message from the given cycle.
pub fn to_start(&self) -> SharedPtr<ExcerptTailer>
Move this ExcerptTailer to the start of the first available cycle.
Both a Forward and a Backward tailer will return the first message on the next read. After that the Forward tailer will advance along the queue, while the Backward tailer will not see any further messages (as it is now pointing beyond the start of the queue).
pub fn to_end(&self) -> SharedPtr<ExcerptTailer>
Move this ExcerptTailer to the end of the last available cycle (as if the last message had just been read).
A Forward tailer will not return any message until a new message is added after the move. A Backward tailer will return the last message in the queue on the next read, then advance to the last-but-one message, and so on.
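The repositioning rules for to_start and to_end can be checked against a small in-memory model. The sketch below is a toy under stated assumptions, not the crate's implementation: ModelTailer and its Direction enum are hypothetical, and the position is modelled as a plain signed offset into a slice.

```rust
#[derive(Clone, Copy, PartialEq)]
enum Direction {
    None,
    Forward,
    Backward,
}

/// Toy tailer over an in-memory list of messages.
struct ModelTailer<'a> {
    messages: &'a [&'a str],
    pos: isize,
    direction: Direction,
}

impl<'a> ModelTailer<'a> {
    fn new(messages: &'a [&'a str]) -> Self {
        ModelTailer { messages, pos: 0, direction: Direction::Forward }
    }

    /// Point at the first message, whatever the direction.
    fn to_start(&mut self) {
        self.pos = 0;
    }

    /// Forward: point past the last message. Backward: point at the last message.
    fn to_end(&mut self) {
        let len = self.messages.len() as isize;
        self.pos = if self.direction == Direction::Backward { len - 1 } else { len };
    }

    /// Return the message at the current position, then step in the set direction.
    fn read(&mut self) -> Option<&'a str> {
        if self.pos < 0 || self.pos >= self.messages.len() as isize {
            return None; // nothing at this position, so the position is unchanged
        }
        let msg = self.messages[self.pos as usize];
        match self.direction {
            Direction::None => {}
            Direction::Forward => self.pos += 1,
            Direction::Backward => self.pos -= 1,
        }
        Some(msg)
    }
}

fn main() {
    let messages = ["m0", "m1", "m2"];

    let mut forward = ModelTailer::new(&messages);
    forward.to_end();
    assert_eq!(forward.read(), None); // nothing until a new message is appended

    let mut backward = ModelTailer::new(&messages);
    backward.direction = Direction::Backward;
    backward.to_end();
    assert_eq!(backward.read(), Some("m2"));
    assert_eq!(backward.read(), Some("m1"));

    backward.to_start();
    assert_eq!(backward.read(), Some("m0"));
    assert_eq!(backward.read(), None); // now before the start of the queue
}
```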
pub fn direction(&self, direction: TailerDirection) -> &ExcerptTailer
The default action for a tailer after reading a message is to advance forwards to the next (higher) message.
The TailerDirection allows the direction of movement to be controlled, and the options are:
None: don’t advance the read position
Forward: advance the read position forwards to the next (higher) message
Backward: advance the read position backwards to the previous (lower) message
See TailerDirection.
Example
{
    // write messages "test 0", "test 1", ... "test 9" to the queue
    let appender = q.acquire_appender().unwrap();
    for i in 0..10 {
        let str = format!("test {}", i);
        appender.write_string(&str);
    }
}
{
    let queue = queue::SingleChronicleQueueBuilder::new(guard.path()).build().unwrap();
    {
        let tailer = queue.create_tailer().unwrap();
        // tailer starts at 0, and increments read position after each read
        assert_eq!("test 0", tailer.read_string());
        assert_eq!("test 1", tailer.read_string());
        assert_eq!("test 2", tailer.read_string());
        // tailer is now pointing to message "test 3"
        // reverse direction
        // the next read will still be "test 3", but after that the tailer will move backwards
        // to "test 2", then to "test 1"
        tailer.direction(queue::TailerDirection::Backward);
        assert_eq!("test 3", tailer.read_string());
        assert_eq!("test 2", tailer.read_string());
        assert_eq!("test 1", tailer.read_string());
    }
}
pub fn scoped_reading_document(&self) -> Scoped<'_, DocumentContext>
Get an RAII-style scoped DocumentContext for reading the next message from the Queue.
The scoped document automatically calls close() when dropped, which in turn advances the read position of this ExcerptTailer to the next message.
Optionally the advance to the next message can be suppressed by calling rollback_on_close() on the DocumentContext any time prior to the context being dropped.
Example
let tailer = queue.create_tailer().unwrap();
{
    let context = tailer.scoped_reading_document();
    if context.is_present() {
        // access the message data via the context
    }
    // context is dropped when the scope is closed,
    // which advances the tailer to the next read position
}
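The advance-on-drop behaviour, and the way a rollback suppresses it, follows the usual Rust guard pattern. The sketch below shows that pattern in isolation; ModelContext and the shared Cell position are hypothetical stand-ins for illustration and say nothing about how Scoped or DocumentContext are actually implemented.

```rust
use std::cell::Cell;

/// Toy read guard: advances the shared position when dropped,
/// unless a rollback was requested first.
struct ModelContext<'a> {
    position: &'a Cell<usize>,
    rollback: Cell<bool>,
}

impl<'a> ModelContext<'a> {
    fn new(position: &'a Cell<usize>) -> Self {
        ModelContext { position, rollback: Cell::new(false) }
    }

    /// Ask for the read position to be left unchanged on drop.
    fn rollback_on_close(&self) {
        self.rollback.set(true);
    }
}

impl Drop for ModelContext<'_> {
    fn drop(&mut self) {
        if !self.rollback.get() {
            self.position.set(self.position.get() + 1);
        }
    }
}

fn main() {
    let position = Cell::new(0);
    {
        let _context = ModelContext::new(&position);
    } // dropped normally: the position advances
    assert_eq!(position.get(), 1);
    {
        let context = ModelContext::new(&position);
        context.rollback_on_close();
    } // rolled back: the same message would be read again
    assert_eq!(position.get(), 1);
}
```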
pub fn read_document<F>(&self, cb: F) -> bool
where
    F: Fn(&Wire),
The read_document method uses a Wire-based closure to conveniently apply a sequence of reads to a single message object.
An example use case for this is reading self-describing content from a queue in a completely flexible format (e.g. to remove constraints on user data schema changes). The example below uses two closures: the first is the closure passed to read_document itself, and the second uses the marshallable interface of the ValueIn wrapped by the closure’s Wire.
Returns
true if a message was available to read (and had the closure applied)
false if no new message was available on the queue
Example
let tailer = queue.create_tailer().unwrap();
{
    tailer.read_document(|wire| {
        wire.read("trade").marshallable(|w| {
            let symbol = w.read("symbol").string();
            let price = w.read("price").float64();
            let quantity = w.read("quantity").float64();
            // ...
        });
    });
}
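Because the closure-based readers report through their bool return value whether a message was available, a caller can keep reading until the queue is exhausted. The sketch below shows that loop shape with a stand-in closure; drain is a hypothetical helper, not part of this crate, and an in-memory iterator plays the role of the tailer.

```rust
/// Hypothetical helper: call `read_one` until it reports that no message
/// was available, and return how many reads succeeded.
fn drain<F: FnMut() -> bool>(mut read_one: F) -> usize {
    let mut count = 0;
    while read_one() {
        count += 1;
    }
    count
}

fn main() {
    // Stand-in for a tailer with three unread messages.
    let mut pending = vec!["a", "b", "c"].into_iter();
    let mut seen = Vec::new();

    let n = drain(|| match pending.next() {
        Some(msg) => {
            seen.push(msg);
            true // a message was available and was processed
        }
        None => false, // nothing left to read
    });

    assert_eq!(n, 3);
    assert_eq!(seen, ["a", "b", "c"]);
}
```

With a real tailer, the closure passed to drain would wrap a call such as tailer.read_document(...) and return its result.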
pub fn read_bytes<F>(&self, cb: F) -> bool
where
    F: Fn(&Bytes),
Similarly to read_document, the read_bytes method uses a Bytes-based closure to conveniently apply a sequence of reads from the message bytes using the Bytes abstraction, bypassing Wire.
Returns
true if a message was available to read (and had the closure applied)
false if no new message was available on the queue
Example
let tailer = queue.create_tailer().unwrap();
{
    tailer.read_bytes(|b| {
        let num1 = b.read_i8();
        let num2 = b.read_f32();
        let str = b.read_string();
    });
}
pub fn scoped_bytes_reader(&self) -> Scoped<'_, BytesReader>
Get an RAII-style scoped BytesReader which provides direct read access to the bytes of the next message from the Queue.
The scoped reader automatically calls close() when dropped, which in turn advances the read position of this ExcerptTailer to the next message.
Optionally the advance to the next message can be suppressed by calling rollback_on_close() on the BytesReader any time prior to the reader being dropped.
Example
let tailer = queue.create_tailer().unwrap();
{
    let bytes = tailer.scoped_bytes_reader();
    if !bytes.is_empty() {
        // access the message data via the reader
        let slice = bytes.as_slice();
        // ...
    }
    // reader is dropped when the scope is closed,
    // which advances the tailer to the next read position
}