This module contains the synced kv log store implementation. It buffers a large number of records emitted from upstream, so that the downstream executor is not overwhelmed.
The synced kv log store polls two futures:
- Upstream: upstream message source
It will write stream messages to the log store buffer, e.g. `Message::Barrier`, `Message::Chunk`, …
When writing a stream chunk, if the log store buffer is full, it will:
a. Flush the buffer to the log store.
b. Convert the stream chunk into a reference (`LogStoreBufferItem::Flushed`) which can read the corresponding chunks in the log store.
We will compact adjacent references, so a single reference can read multiple chunks if there’s a buildup.
On receiving barriers, it will:
a. Apply truncation to historical data in the log store.
b. Flush and checkpoint the log store data.
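The write path above can be sketched as follows. This is a minimal, std-only model and not the actual implementation: `SyncedLogWriter`, `Chunk`, the row-count capacity, the `pending` list, and all field names are assumptions made for illustration. Only the `Flushed` variant name comes from the text.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for a stream chunk; only the row count matters here.
#[derive(Debug, Clone, PartialEq)]
struct Chunk {
    rows: usize,
}

/// Simplified buffer items. The `Flushed` name follows the text; fields are assumed.
#[derive(Debug, Clone, PartialEq)]
enum BufferItem {
    Chunk { seq_id: u64, chunk: Chunk },
    Flushed { start_seq_id: u64, end_seq_id: u64 },
    Barrier { epoch: u64 },
}

/// Toy write path: an in-memory buffer in front of a Vec-backed "log store".
struct SyncedLogWriter {
    buffer: VecDeque<BufferItem>,
    buffered_rows: usize,
    max_buffered_rows: usize,
    next_seq_id: u64,
    /// Writes that have not been flushed to the log store yet.
    pending: Vec<(u64, Chunk)>,
    /// Flushed log store contents as (seq_id, chunk) pairs.
    log_store: Vec<(u64, Chunk)>,
    checkpointed_epoch: Option<u64>,
}

impl SyncedLogWriter {
    fn new(max_buffered_rows: usize) -> Self {
        Self {
            buffer: VecDeque::new(),
            buffered_rows: 0,
            max_buffered_rows,
            next_seq_id: 0,
            pending: Vec::new(),
            log_store: Vec::new(),
            checkpointed_epoch: None,
        }
    }

    /// Move all pending writes into the log store.
    fn flush(&mut self) {
        self.log_store.append(&mut self.pending);
    }

    fn write_chunk(&mut self, chunk: Chunk) {
        let seq_id = self.next_seq_id;
        self.next_seq_id += 1;
        self.pending.push((seq_id, chunk.clone()));
        if self.buffered_rows + chunk.rows <= self.max_buffered_rows {
            // Room left: keep the chunk itself in memory.
            self.buffered_rows += chunk.rows;
            self.buffer.push_back(BufferItem::Chunk { seq_id, chunk });
            return;
        }
        // Buffer full: (a) flush, then (b) keep only a reference to the chunk.
        self.flush();
        if let Some(BufferItem::Flushed { end_seq_id, .. }) = self.buffer.back_mut() {
            // Compact with the adjacent reference by extending its range.
            *end_seq_id = seq_id;
        } else {
            self.buffer.push_back(BufferItem::Flushed {
                start_seq_id: seq_id,
                end_seq_id: seq_id,
            });
        }
    }

    /// On a barrier: (a) truncate historical data, (b) flush and checkpoint.
    fn on_barrier(&mut self, epoch: u64, truncate_up_to: Option<u64>) {
        if let Some(offset) = truncate_up_to {
            self.log_store.retain(|(seq_id, _)| *seq_id > offset);
        }
        self.flush();
        self.checkpointed_epoch = Some(epoch);
        self.buffer.push_back(BufferItem::Barrier { epoch });
    }
}
```

In this sketch a run of overflowing chunks collapses into a single `Flushed` range, so the buffer stays small regardless of how far the reader falls behind.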
- State store + buffer + recently flushed chunks: the storage components of the log store.
It will read all historical data from the log store first. This is done by constructing a state store stream, which reads all data up to the latest epoch.
This is a static snapshot of data.
Any chunks flushed after that snapshot are read via `flushed_chunk_future`; its construction is described with the buffer items below.
Next, we will read `flushed_chunk_future`, if one already exists.
Finally we will pop the earliest item in the buffer.
- If it’s a chunk yield it.
- If it’s a watermark yield it.
- If it’s a flushed chunk reference (`LogStoreBufferItem::Flushed`), we will read the corresponding chunks in the log store. This is done by constructing a `flushed_chunk_future` which will read the log store using the `seq_id`.
- If it’s a barrier, it is not yielded from the buffer, because barriers are directly propagated from the upstream when polling it.
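The read order described above can be sketched as follows. This is a simplified, synchronous model under stated assumptions, not the actual implementation: `LogReader`, `Output`, `next_item`, and the `String` payloads are hypothetical, the `flushed_read` queue stands in for `flushed_chunk_future`, and skipping barriers is one reading of the last bullet.

```rust
use std::collections::VecDeque;

/// Simplified buffer items; payloads are plain strings for illustration.
#[derive(Debug, Clone, PartialEq)]
enum BufferItem {
    Chunk(String),
    Watermark(u64),
    Flushed { start_seq_id: u64, end_seq_id: u64 },
    Barrier,
}

/// What the reader yields downstream in this sketch.
#[derive(Debug, Clone, PartialEq)]
enum Output {
    Chunk(String),
    Watermark(u64),
}

struct LogReader {
    /// Static snapshot of historical data (stand-in for the state store stream).
    historical: VecDeque<String>,
    /// Chunks of the in-progress flushed read (stand-in for `flushed_chunk_future`).
    flushed_read: VecDeque<String>,
    buffer: VecDeque<BufferItem>,
    /// Log store contents as (seq_id, chunk) pairs.
    log_store: Vec<(u64, String)>,
}

impl LogReader {
    fn next_item(&mut self) -> Option<Output> {
        // 1. Drain all historical data first.
        if let Some(chunk) = self.historical.pop_front() {
            return Some(Output::Chunk(chunk));
        }
        loop {
            // 2. Finish any flushed read that is already in progress.
            if let Some(chunk) = self.flushed_read.pop_front() {
                return Some(Output::Chunk(chunk));
            }
            // 3. Pop the earliest item in the buffer.
            match self.buffer.pop_front()? {
                BufferItem::Chunk(chunk) => return Some(Output::Chunk(chunk)),
                BufferItem::Watermark(w) => return Some(Output::Watermark(w)),
                BufferItem::Flushed { start_seq_id, end_seq_id } => {
                    // Read the referenced seq_id range back from the log store.
                    self.flushed_read = self
                        .log_store
                        .iter()
                        .filter(|entry| entry.0 >= start_seq_id && entry.0 <= end_seq_id)
                        .map(|entry| entry.1.clone())
                        .collect();
                }
                // Assumption: barriers reach downstream via the upstream poll,
                // so the buffered copy is skipped here.
                BufferItem::Barrier => continue,
            }
        }
    }
}
```

Calling `next_item` repeatedly yields historical chunks, then chunks from any flushed range, then the remaining buffered chunks and watermarks in order.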
TODO(kwannoel):
- [] Add dedicated metrics for sync log store, namespace according to the upstream.
- [] Add tests
- [] Handle watermark r/w
- [] Handle paused stream