risingwave_stream::executor

Module sync_kv_log_store

This contains the synced kv log store implementation. It’s meant to buffer a large number of records emitted from upstream, to avoid overwhelming the downstream executor.

The synced kv log store polls two futures:

  1. Upstream: upstream message source

    It writes stream messages (e.g. Message::Barrier, Message::Chunk, …) to the log store buffer. When writing a stream chunk, if the log store buffer is full, it will: a. Flush the buffer to the log store. b. Convert the stream chunk into a reference (LogStoreBufferItem::Flushed) that can be used to read the corresponding chunks from the log store. Adjacent references are compacted, so a single reference can cover multiple chunks if there is a buildup.

    On receiving a barrier, it will: a. Apply truncation to historical data in the log store. b. Flush and checkpoint the log store data.

  2. State store + buffer + recently flushed chunks: the storage components of the log store.

    It reads all historical data from the log store first. This is done by constructing a state store stream, which reads all data up to the latest epoch. That stream is a static snapshot of the data; any chunks flushed afterwards are read via flushed_chunk_future instead.

    Next, it reads from flushed_chunk_future if one already exists. How this future is constructed is described below.

    Finally we will pop the earliest item in the buffer.

    • If it’s a chunk, yield it.
    • If it’s a watermark, yield it.
    • If it’s a flushed chunk reference (LogStoreBufferItem::Flushed), we will read the corresponding chunks in the log store. This is done by constructing a flushed_chunk_future which will read the log store using the seq_id.
    • Barriers are not yielded from the buffer, because they are propagated directly from the upstream when it is polled.
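
The write and read paths described above can be sketched with a small in-memory model. This is only an illustration under simplifying assumptions, not the actual implementation: `SyncedLogStore`, `BufferItem`, `ReadItem`, `write_chunk`, `write_watermark`, and `read_next` are hypothetical names, a `BTreeMap` stands in for the persistent log store, only the incoming chunk is flushed when the buffer is full, and barriers, truncation, and the initial historical read are not modeled.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Hypothetical stand-in for a stream chunk; the real type is much richer.
type Chunk = Vec<i64>;
type SeqId = u64;

/// Simplified buffer item (illustrative, not the real `LogStoreBufferItem`).
#[derive(Debug, PartialEq)]
enum BufferItem {
    /// A chunk kept in memory.
    Chunk(Chunk),
    /// A reference to flushed chunks, covering an inclusive seq_id range.
    Flushed { start: SeqId, end: SeqId },
    Watermark(i64),
}

/// What the read side yields downstream in this sketch.
#[derive(Debug, PartialEq)]
enum ReadItem {
    Chunks(Vec<Chunk>),
    Watermark(i64),
}

struct SyncedLogStore {
    buffer: VecDeque<BufferItem>,
    max_buffered_chunks: usize,
    /// Toy replacement for the persistent log store, keyed by seq_id.
    store: BTreeMap<SeqId, Chunk>,
    next_seq_id: SeqId,
}

impl SyncedLogStore {
    fn new(max_buffered_chunks: usize) -> Self {
        Self {
            buffer: VecDeque::new(),
            max_buffered_chunks,
            store: BTreeMap::new(),
            next_seq_id: 0,
        }
    }

    fn in_memory_chunks(&self) -> usize {
        self.buffer
            .iter()
            .filter(|item| matches!(item, BufferItem::Chunk(_)))
            .count()
    }

    /// Write side: keep the chunk in memory, or flush it and leave a reference.
    fn write_chunk(&mut self, chunk: Chunk) {
        let seq_id = self.next_seq_id;
        self.next_seq_id += 1;
        if self.in_memory_chunks() < self.max_buffered_chunks {
            self.buffer.push_back(BufferItem::Chunk(chunk));
            return;
        }
        // Buffer full: persist the chunk and keep only a reference to it.
        self.store.insert(seq_id, chunk);
        // Compact into the previous reference when the seq_ids are adjacent.
        if let Some(BufferItem::Flushed { end, .. }) = self.buffer.back_mut() {
            if *end + 1 == seq_id {
                *end = seq_id;
                return;
            }
        }
        self.buffer
            .push_back(BufferItem::Flushed { start: seq_id, end: seq_id });
    }

    fn write_watermark(&mut self, watermark: i64) {
        self.buffer.push_back(BufferItem::Watermark(watermark));
    }

    /// Read side: pop the earliest item, resolving references via the store.
    fn read_next(&mut self) -> Option<ReadItem> {
        match self.buffer.pop_front()? {
            BufferItem::Chunk(chunk) => Some(ReadItem::Chunks(vec![chunk])),
            BufferItem::Watermark(w) => Some(ReadItem::Watermark(w)),
            BufferItem::Flushed { start, end } => {
                let chunks: Vec<Chunk> =
                    self.store.range(start..=end).map(|(_, c)| c.clone()).collect();
                Some(ReadItem::Chunks(chunks))
            }
        }
    }
}

fn main() {
    let mut log = SyncedLogStore::new(1);
    log.write_chunk(vec![1]); // fits in memory
    log.write_chunk(vec![2]); // buffer full: flushed, referenced by seq_id 1
    log.write_chunk(vec![3]); // flushed and compacted into the same reference
    log.write_watermark(10);
    assert_eq!(log.buffer.len(), 3);
    assert_eq!(log.read_next(), Some(ReadItem::Chunks(vec![vec![1]])));
    assert_eq!(log.read_next(), Some(ReadItem::Chunks(vec![vec![2], vec![3]])));
    assert_eq!(log.read_next(), Some(ReadItem::Watermark(10)));
    assert_eq!(log.read_next(), None);
}
```

In this sketch a single compacted reference resolves to two chunks on the read side, which is the buildup case mentioned for the upstream future.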

TODO(kwannoel):

  • [] Add dedicated metrics for sync log store, namespace according to the upstream.
  • [] Add tests
  • [] Handle watermark r/w
  • [] Handle paused stream