risingwave_stream/executor/sync_kv_log_store.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! This module contains the synced kv log store implementation.
//! It's meant to buffer a large number of records emitted from upstream,
//! to avoid overwhelming the downstream executor.
//!
//! The synced kv log store polls two futures:
//!
//! 1. Upstream: upstream message source
//!
//!   It will write stream messages to the log store buffer, e.g. `Message::Barrier`, `Message::Chunk`, ...
//!   When writing a stream chunk, if the log store buffer is full, it will:
//!     a. Flush the buffer to the log store.
//!     b. Convert the stream chunk into a reference (`LogStoreBufferItem::Flushed`)
//!       which can be used to read the corresponding chunks from the log store.
//!       We compact adjacent references,
//!       so a single reference can cover multiple chunks if there's a build-up.
//!
//!   On receiving barriers, it will:
//!     a. Apply truncation to historical data in the logstore.
//!     b. Flush and checkpoint the logstore data.
//!
//! 2. State store + buffer + recently flushed chunks: the storage components of the logstore.
//!
//!   It will read all historical data from the logstore first. This can be done just by
//!   constructing a state store stream, which will read all data up to the latest epoch.
//!   This is a static snapshot of data.
//!   Any subsequently flushed chunks are read via
//!   `flushed_chunk_future`; see the next paragraph below.
//!
//!   Next, we poll `flushed_chunk_future` (if a pre-existing one is in flight); see below for
//!   what it is and how it's constructed.
//!
//!   Finally, we pop the earliest item in the buffer:
//!   - If it's a chunk, yield it.
//!   - If it's a watermark, yield it.
//!   - If it's a flushed chunk reference (`LogStoreBufferItem::Flushed`),
//!     we read the corresponding chunks from the log store.
//!     This is done by constructing a `flushed_chunk_future` which reads the log store
//!     using the `seq_id`.
//!   - If it's a barrier, skip it,
//!     because barriers are directly propagated from the upstream when polling it.
//!
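//! A rough sketch of the polling loop described above, as simplified
//! pseudocode (the helper names are made up for illustration; state
//! transitions, flushing, and error handling are elided):
//!
//! ```ignore
//! loop {
//!     select! {
//!         // Write side: buffer the message, flush to the log store when the
//!         // buffer is full, and persist + yield barriers.
//!         msg = upstream.next() => handle_upstream_message(msg),
//!         // Read side: drain historical data, then flushed chunks,
//!         // then the in-memory buffer.
//!         chunk = next_chunk_from_log_store() => yield chunk,
//!     }
//! }
//! ```
//!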
//! TODO(kwannoel):
//! - [ ] Add dedicated metrics for sync log store, namespaced according to the upstream.
//! - [ ] Add tests
//! - [ ] Handle watermark r/w
//! - [ ] Handle paused stream

use std::collections::VecDeque;
use std::future::pending;
use std::mem::replace;
use std::pin::Pin;

use anyhow::anyhow;
use futures::future::{BoxFuture, Either, select};
use futures::stream::StreamFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::must_match;
use risingwave_connector::sink::log_store::{ChunkId, LogStoreResult};
use risingwave_storage::StateStore;
use risingwave_storage::store::{
    LocalStateStore, NewLocalOptions, OpConsistencyLevel, StateStoreRead,
};
use rw_futures_util::drop_either_future;
use tokio::time::{Duration, Instant, Sleep, sleep_until};
use tokio_stream::adapters::Peekable;

use crate::common::log_store_impl::kv_log_store::buffer::LogStoreBufferItem;
use crate::common::log_store_impl::kv_log_store::reader::LogStoreReadStateStreamRangeStart;
use crate::common::log_store_impl::kv_log_store::reader::timeout_auto_rebuild::TimeoutAutoRebuildIter;
use crate::common::log_store_impl::kv_log_store::serde::{
    KvLogStoreItem, LogStoreItemMergeStream, LogStoreRowSerde,
};
use crate::common::log_store_impl::kv_log_store::state::{
    LogStorePostSealCurrentEpoch, LogStoreReadState, LogStoreStateWriteChunkFuture,
    LogStoreWriteState, new_log_store_state,
};
use crate::common::log_store_impl::kv_log_store::{
    FIRST_SEQ_ID, FlushInfo, ReaderTruncationOffsetType, SeqIdType,
};
use crate::executor::prelude::*;
use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
use crate::executor::{
    Barrier, BoxedMessageStream, Message, StreamExecutorError, StreamExecutorResult,
};

pub mod metrics {
    use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};

    use crate::common::log_store_impl::kv_log_store::KvLogStoreReadMetrics;
    use crate::executor::monitor::StreamingMetrics;
    use crate::task::ActorId;

    #[derive(Clone)]
    pub struct SyncedKvLogStoreMetrics {
        // State of the log store
        pub unclean_state: LabelGuardedIntCounter<5>,
        pub clean_state: LabelGuardedIntCounter<5>,
        pub wait_next_poll_ns: LabelGuardedIntCounter<4>,

        // Write metrics
        pub storage_write_count: LabelGuardedIntCounter<4>,
        pub storage_write_size: LabelGuardedIntCounter<4>,
        pub pause_duration_ns: LabelGuardedIntCounter<4>,

        // Buffer metrics
        pub buffer_unconsumed_item_count: LabelGuardedIntGauge<4>,
        pub buffer_unconsumed_row_count: LabelGuardedIntGauge<4>,
        pub buffer_unconsumed_epoch_count: LabelGuardedIntGauge<4>,
        pub buffer_unconsumed_min_epoch: LabelGuardedIntGauge<4>,
        pub buffer_read_count: LabelGuardedIntCounter<5>,
        pub buffer_read_size: LabelGuardedIntCounter<5>,

        // Read metrics
        pub total_read_count: LabelGuardedIntCounter<5>,
        pub total_read_size: LabelGuardedIntCounter<5>,
        pub persistent_log_read_metrics: KvLogStoreReadMetrics,
        pub flushed_buffer_read_metrics: KvLogStoreReadMetrics,
    }

    impl SyncedKvLogStoreMetrics {
        /// `id`: a unique way to identify the log store. This can be the sink id,
        ///       or, for joins, the `fragment_id`.
        /// `name`: the MV / sink that the log store is associated with.
        /// `target`: the target of the log store,
        ///           for instance a MySQL sink, a PG sink, etc.,
        ///           or an unaligned join.
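        ///
        /// For example (hypothetical label values, purely for illustration): a
        /// sink log store might be labeled with `actor_id = 1`, `id = 42` (the
        /// sink id), `name = "my_sink"`, and `target = "mysql"`.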
        pub(crate) fn new(
            metrics: &StreamingMetrics,
            actor_id: ActorId,
            id: u32,
            name: &str,
            target: &'static str,
        ) -> Self {
            let actor_id_str = actor_id.to_string();
            let id_str = id.to_string();
            let labels = &[&actor_id_str, target, &id_str, name];

            let unclean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "dirty",
                &actor_id_str,
                target,
                &id_str,
                name,
            ]);
            let clean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "clean",
                &actor_id_str,
                target,
                &id_str,
                name,
            ]);
            let wait_next_poll_ns = metrics
                .sync_kv_log_store_wait_next_poll_ns
                .with_guarded_label_values(labels);

            let storage_write_size = metrics
                .sync_kv_log_store_storage_write_size
                .with_guarded_label_values(labels);
            let storage_write_count = metrics
                .sync_kv_log_store_storage_write_count
                .with_guarded_label_values(labels);
            let storage_pause_duration_ns = metrics
                .sync_kv_log_store_write_pause_duration_ns
                .with_guarded_label_values(labels);

            let buffer_unconsumed_item_count = metrics
                .sync_kv_log_store_buffer_unconsumed_item_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_row_count = metrics
                .sync_kv_log_store_buffer_unconsumed_row_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_epoch_count = metrics
                .sync_kv_log_store_buffer_unconsumed_epoch_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_min_epoch = metrics
                .sync_kv_log_store_buffer_unconsumed_min_epoch
                .with_guarded_label_values(labels);
            let buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&["buffer", &actor_id_str, target, &id_str, name]);

            let buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&["buffer", &actor_id_str, target, &id_str, name]);

            let total_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&["total", &actor_id_str, target, &id_str, name]);

            let total_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&["total", &actor_id_str, target, &id_str, name]);

            const READ_PERSISTENT_LOG: &str = "persistent_log";
            const READ_FLUSHED_BUFFER: &str = "flushed_buffer";

            let persistent_log_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &id_str,
                    name,
                ]);

            let persistent_log_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &id_str,
                    name,
                ]);

            let flushed_buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &id_str,
                    name,
                ]);

            let flushed_buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &id_str,
                    name,
                ]);

            Self {
                unclean_state,
                clean_state,
                wait_next_poll_ns,
                storage_write_size,
                storage_write_count,
                pause_duration_ns: storage_pause_duration_ns,
                buffer_unconsumed_item_count,
                buffer_unconsumed_row_count,
                buffer_unconsumed_epoch_count,
                buffer_unconsumed_min_epoch,
                buffer_read_count,
                buffer_read_size,
                total_read_count,
                total_read_size,
                persistent_log_read_metrics: KvLogStoreReadMetrics {
                    storage_read_size: persistent_log_read_size,
                    storage_read_count: persistent_log_read_count,
                },
                flushed_buffer_read_metrics: KvLogStoreReadMetrics {
                    storage_read_count: flushed_buffer_read_count,
                    storage_read_size: flushed_buffer_read_size,
                },
            }
        }

        #[cfg(test)]
        pub(crate) fn for_test() -> Self {
            SyncedKvLogStoreMetrics {
                unclean_state: LabelGuardedIntCounter::test_int_counter(),
                clean_state: LabelGuardedIntCounter::test_int_counter(),
                wait_next_poll_ns: LabelGuardedIntCounter::test_int_counter(),
                storage_write_count: LabelGuardedIntCounter::test_int_counter(),
                storage_write_size: LabelGuardedIntCounter::test_int_counter(),
                pause_duration_ns: LabelGuardedIntCounter::test_int_counter(),
                buffer_unconsumed_item_count: LabelGuardedIntGauge::test_int_gauge(),
                buffer_unconsumed_row_count: LabelGuardedIntGauge::test_int_gauge(),
                buffer_unconsumed_epoch_count: LabelGuardedIntGauge::test_int_gauge(),
                buffer_unconsumed_min_epoch: LabelGuardedIntGauge::test_int_gauge(),
                buffer_read_count: LabelGuardedIntCounter::test_int_counter(),
                buffer_read_size: LabelGuardedIntCounter::test_int_counter(),
                total_read_count: LabelGuardedIntCounter::test_int_counter(),
                total_read_size: LabelGuardedIntCounter::test_int_counter(),
                persistent_log_read_metrics: KvLogStoreReadMetrics::for_test(),
                flushed_buffer_read_metrics: KvLogStoreReadMetrics::for_test(),
            }
        }
    }
}

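/// Future resolving to a flushed chunk read back from the log store, as
/// `(chunk_id, chunk, epoch)` (polled in `ReadFuture::ReadingFlushedChunk`).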
type ReadFlushedChunkFuture = BoxFuture<'static, LogStoreResult<(ChunkId, StreamChunk, u64)>>;

pub struct SyncedKvLogStoreExecutor<S: StateStore> {
    actor_context: ActorContextRef,
    table_id: TableId,
    metrics: SyncedKvLogStoreMetrics,
    serde: LogStoreRowSerde,

    // Upstream
    upstream: Executor,

    // Log store state
    state_store: S,
    max_buffer_size: usize,

    // Max chunk size when reading from the logstore / buffer
    chunk_size: u32,

    pause_duration_ms: Duration,
}

// Constructor
impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn new(
        actor_context: ActorContextRef,
        table_id: u32,
        metrics: SyncedKvLogStoreMetrics,
        serde: LogStoreRowSerde,
        state_store: S,
        buffer_size: usize,
        chunk_size: u32,
        upstream: Executor,
        pause_duration_ms: Duration,
    ) -> Self {
        Self {
            actor_context,
            table_id: TableId::new(table_id),
            metrics,
            serde,
            state_store,
            upstream,
            max_buffer_size: buffer_size,
            chunk_size,
            pause_duration_ms,
        }
    }
}

struct FlushedChunkInfo {
    epoch: u64,
    start_seq_id: SeqIdType,
    end_seq_id: SeqIdType,
    flush_info: FlushInfo,
    vnode_bitmap: Bitmap,
}

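/// The write-side future of the executor. A sketch of its state transitions,
/// derived from the code below:
/// - `ReceiveFromUpstream` -> `FlushingChunk` when an incoming chunk overflows
///   the buffer and must be flushed, then back to `ReceiveFromUpstream`.
/// - `ReceiveFromUpstream` -> `Paused` on a checkpoint barrier while the state
///   is clean and the buffer is non-empty, then back to `ReceiveFromUpstream`.
/// - `Empty` is a transient placeholder used while the inner state is moved
///   out in `next_event`; it must never be polled.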
enum WriteFuture<S: LocalStateStore> {
    /// We trigger a brief pause, to let the `ReadFuture` be polled, in the following scenarios:
    /// - When we see an upstream data chunk, the buffer becomes full, and the state is clean.
    /// - When we see a checkpoint barrier, the buffer is not empty, and the state is clean.
    ///
    /// On pausing, we transition to a dirty state.
    ///
    /// We resume the write future in the following scenarios:
    /// - After the pause duration elapses.
    /// - After the read future consumes a chunk.
    Paused {
        start_instant: Instant,
        sleep_future: Option<Pin<Box<Sleep>>>,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>, // Just used to hold the state
    },
    ReceiveFromUpstream {
        future: StreamFuture<BoxedMessageStream>,
        write_state: LogStoreWriteState<S>,
    },
    FlushingChunk {
        epoch: u64,
        start_seq_id: SeqIdType,
        end_seq_id: SeqIdType,
        future: Pin<Box<LogStoreStateWriteChunkFuture<S>>>,
        stream: BoxedMessageStream,
    },
    Empty,
}

enum WriteFutureEvent {
    UpstreamMessageReceived(Message),
    ChunkFlushed(FlushedChunkInfo),
}

impl<S: LocalStateStore> WriteFuture<S> {
    fn flush_chunk(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
        chunk: StreamChunk,
        epoch: u64,
        start_seq_id: SeqIdType,
        end_seq_id: SeqIdType,
    ) -> Self {
        tracing::trace!(
            start_seq_id,
            end_seq_id,
            epoch,
            cardinality = chunk.cardinality(),
            "write_future: flushing chunk"
        );
        Self::FlushingChunk {
            epoch,
            start_seq_id,
            end_seq_id,
            future: Box::pin(write_state.into_write_chunk_future(
                chunk,
                epoch,
                start_seq_id,
                end_seq_id,
            )),
            stream,
        }
    }

    fn receive_from_upstream(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        Self::ReceiveFromUpstream {
            future: stream.into_future(),
            write_state,
        }
    }

    fn paused(
        duration: Duration,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        let now = Instant::now();
        tracing::trace!(?now, ?duration, "write_future_pause");
        Self::Paused {
            start_instant: now,
            sleep_future: Some(Box::pin(sleep_until(now + duration))),
            barrier,
            stream,
            write_state,
        }
    }

    async fn next_event(
        &mut self,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<(BoxedMessageStream, LogStoreWriteState<S>, WriteFutureEvent)> {
        match self {
            WriteFuture::Paused {
                start_instant,
                sleep_future,
                ..
            } => {
                if let Some(sleep_future) = sleep_future {
                    sleep_future.await;
                    metrics
                        .pause_duration_ns
                        .inc_by(start_instant.elapsed().as_nanos() as _);
                    tracing::trace!("resuming write future");
                }
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::Paused { stream, write_state, barrier, .. } => {
                    Ok((stream, write_state, WriteFutureEvent::UpstreamMessageReceived(Message::Barrier(barrier))))
                })
            }
            WriteFuture::ReceiveFromUpstream { future, .. } => {
                let (opt, stream) = future.await;
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::ReceiveFromUpstream { write_state, .. } => {
                    opt
                    .ok_or_else(|| anyhow!("end of upstream input").into())
                    .and_then(|result| result.map(|item| {
                        (stream, write_state, WriteFutureEvent::UpstreamMessageReceived(item))
                    }))
                })
            }
            WriteFuture::FlushingChunk { future, .. } => {
                let (write_state, result) = future.await;
                let result = must_match!(replace(self, WriteFuture::Empty), WriteFuture::FlushingChunk { epoch, start_seq_id, end_seq_id, stream, .. } => {
                    result.map(|(flush_info, vnode_bitmap)| {
                        (stream, write_state, WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                            epoch,
                            start_seq_id,
                            end_seq_id,
                            flush_info,
                            vnode_bitmap,
                        }))
                    })
                });
                result.map_err(Into::into)
            }
            WriteFuture::Empty => {
                unreachable!("should not be polled after ready")
            }
        }
    }
}

// Stream interface
impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    #[try_stream(ok = Message, error = StreamExecutorError)]
    pub async fn execute_monitored(self) {
        let wait_next_poll_ns = self.metrics.wait_next_poll_ns.clone();
        #[for_await]
        for message in self.execute_inner() {
            let current_time = Instant::now();
            yield message?;
            wait_next_poll_ns.inc_by(current_time.elapsed().as_nanos() as _);
        }
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(self) {
        let mut input = self.upstream.execute();

        // Init the first epoch and the local state store
        let first_barrier = expect_first_barrier(&mut input).await?;
        let first_write_epoch = first_barrier.epoch;
        yield Message::Barrier(first_barrier.clone());

        let local_state_store = self
            .state_store
            .new_local(NewLocalOptions {
                table_id: self.table_id,
                op_consistency_level: OpConsistencyLevel::Inconsistent,
                table_option: TableOption {
                    retention_seconds: None,
                },
                is_replicated: false,
                vnodes: self.serde.vnodes().clone(),
            })
            .await;

        let (mut read_state, mut initial_write_state) =
            new_log_store_state(self.table_id, local_state_store, self.serde);
        initial_write_state.init(first_write_epoch).await?;

        let mut pause_stream = first_barrier.is_pause_on_startup();
        let mut initial_write_epoch = first_write_epoch;

        // We only (re)create the consume stream:
        // 1. On bootstrap
        // 2. On vnode update
        'recreate_consume_stream: loop {
            let mut seq_id = FIRST_SEQ_ID;
            let mut truncation_offset = None;
            let mut buffer = SyncedLogStoreBuffer {
                buffer: VecDeque::new(),
                current_size: 0,
                max_size: self.max_buffer_size,
                max_chunk_size: self.chunk_size,
                next_chunk_id: 0,
                metrics: self.metrics.clone(),
                flushed_count: 0,
            };

            let log_store_stream = read_state
                .read_persisted_log_store(
                    self.metrics.persistent_log_read_metrics.clone(),
                    initial_write_epoch.prev,
                    LogStoreReadStateStreamRangeStart::Unbounded,
                )
                .await?;

            let mut log_store_stream = tokio_stream::StreamExt::peekable(log_store_stream);
            let mut clean_state = log_store_stream.peek().await.is_none();

            let mut read_future_state = ReadFuture::ReadingPersistedStream(log_store_stream);

            let mut write_future_state =
                WriteFuture::receive_from_upstream(input, initial_write_state);

            loop {
                let select_result = {
                    let read_future = async {
                        if pause_stream {
                            pending().await
                        } else {
                            read_future_state
                                .next_chunk(&read_state, &mut buffer, &self.metrics)
                                .await
                        }
                    };
                    pin_mut!(read_future);
                    let write_future = write_future_state.next_event(&self.metrics);
                    pin_mut!(write_future);
                    let output = select(write_future, read_future).await;
                    drop_either_future(output)
                };
                match select_result {
                    Either::Left(result) => {
                        // Drop the future to ensure that it is reset later
                        drop(write_future_state);
                        let (stream, mut write_state, either) = result?;
                        match either {
                            WriteFutureEvent::UpstreamMessageReceived(msg) => {
                                match msg {
                                    Message::Barrier(barrier) => {
                                        if clean_state
                                            && barrier.kind.is_checkpoint()
                                            && !buffer.is_empty()
                                        {
                                            write_future_state = WriteFuture::paused(
                                                self.pause_duration_ms,
                                                barrier,
                                                stream,
                                                write_state,
                                            );
                                            clean_state = false;
                                            self.metrics.unclean_state.inc();
                                        } else {
                                            if let Some(mutation) = barrier.mutation.as_deref() {
                                                match mutation {
                                                    Mutation::Pause => {
                                                        pause_stream = true;
                                                    }
                                                    Mutation::Resume => {
                                                        pause_stream = false;
                                                    }
                                                    _ => {}
                                                }
                                            }
                                            let write_state_post_write_barrier =
                                                Self::write_barrier(
                                                    &mut write_state,
                                                    barrier.clone(),
                                                    &self.metrics,
                                                    truncation_offset,
                                                    &mut buffer,
                                                )
                                                .await?;
                                            seq_id = FIRST_SEQ_ID;
                                            let update_vnode_bitmap = barrier
                                                .as_update_vnode_bitmap(self.actor_context.id);
                                            let barrier_epoch = barrier.epoch;

                                            yield Message::Barrier(barrier);

                                            write_state_post_write_barrier
                                                .post_yield_barrier(update_vnode_bitmap.clone())
                                                .await?;
                                            if let Some(vnode_bitmap) = update_vnode_bitmap {
                                                // Apply the vnode update
                                                read_state.update_vnode_bitmap(vnode_bitmap);
                                                initial_write_epoch = barrier_epoch;
                                                input = stream;
                                                initial_write_state = write_state;
                                                continue 'recreate_consume_stream;
                                            } else {
                                                write_future_state =
                                                    WriteFuture::receive_from_upstream(
                                                        stream,
                                                        write_state,
                                                    );
                                            }
                                        }
                                    }
                                    Message::Chunk(chunk) => {
                                        let start_seq_id = seq_id;
                                        let new_seq_id = seq_id + chunk.cardinality() as SeqIdType;
                                        let end_seq_id = new_seq_id - 1;
                                        let epoch = write_state.epoch().curr;
                                        tracing::trace!(
                                            start_seq_id,
                                            end_seq_id,
                                            new_seq_id,
                                            epoch,
                                            cardinality = chunk.cardinality(),
                                            "received chunk"
                                        );
                                        if let Some(chunk_to_flush) = buffer.add_or_flush_chunk(
                                            start_seq_id,
                                            end_seq_id,
                                            chunk,
                                            epoch,
                                        ) {
                                            seq_id = new_seq_id;
                                            write_future_state = WriteFuture::flush_chunk(
                                                stream,
                                                write_state,
                                                chunk_to_flush,
                                                epoch,
                                                start_seq_id,
                                                end_seq_id,
                                            );
                                        } else {
                                            seq_id = new_seq_id;
                                            write_future_state = WriteFuture::receive_from_upstream(
                                                stream,
                                                write_state,
                                            );
                                        }
                                    }
                                    // FIXME(kwannoel): This should truncate the logstore;
                                    // watermarks will not bypass it like barriers do.
                                    Message::Watermark(_watermark) => {
                                        write_future_state =
                                            WriteFuture::receive_from_upstream(stream, write_state);
                                    }
                                }
                            }
                            WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                                start_seq_id,
                                end_seq_id,
                                epoch,
                                flush_info,
                                vnode_bitmap,
                            }) => {
                                buffer.add_flushed_item_to_buffer(
                                    start_seq_id,
                                    end_seq_id,
                                    vnode_bitmap,
                                    epoch,
                                );
                                self.metrics
                                    .storage_write_count
                                    .inc_by(flush_info.flush_count as _);
                                self.metrics
                                    .storage_write_size
                                    .inc_by(flush_info.flush_size as _);
                                write_future_state =
                                    WriteFuture::receive_from_upstream(stream, write_state);
                            }
                        }
                    }
                    Either::Right(result) => {
                        if !clean_state
                            && matches!(read_future_state, ReadFuture::Idle)
                            && buffer.is_empty()
                        {
                            clean_state = true;
                            self.metrics.clean_state.inc();

                            // Let the write future resume immediately
                            if let WriteFuture::Paused { sleep_future, .. } =
                                &mut write_future_state
                            {
                                tracing::trace!("resuming paused future");
                                assert!(buffer.current_size < self.max_buffer_size);
                                *sleep_future = None;
                            }
                        }
                        let (chunk, new_truncate_offset) = result?;
                        if let Some(new_truncate_offset) = new_truncate_offset {
                            truncation_offset = Some(new_truncate_offset);
                        }
                        self.metrics
                            .total_read_count
                            .inc_by(chunk.cardinality() as _);

                        yield Message::Chunk(chunk);
                    }
                }
            }
        }
    }
}

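/// The read-side future of the executor. A sketch of its state transitions,
/// derived from `next_chunk` below:
/// - Starts in `ReadingPersistedStream`, which drains the historical data in
///   the log store, then becomes `Idle` once the stream is exhausted.
/// - `Idle` pops items from the in-memory buffer; popping a
///   `LogStoreBufferItem::Flushed` reference transitions to
///   `ReadingFlushedChunk`, which returns to `Idle` once the chunk has been
///   read back from the log store.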
enum ReadFuture<S: StateStoreRead> {
    ReadingPersistedStream(Peekable<Pin<Box<LogStoreItemMergeStream<TimeoutAutoRebuildIter<S>>>>>),
    ReadingFlushedChunk {
        future: ReadFlushedChunkFuture,
        truncate_offset: ReaderTruncationOffsetType,
    },
    Idle,
}

// Read methods
impl<S: StateStoreRead> ReadFuture<S> {
    // TODO: should change to always return a truncate offset, to ensure that each stream chunk has one
    async fn next_chunk(
        &mut self,
        read_state: &LogStoreReadState<S>,
        buffer: &mut SyncedLogStoreBuffer,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<(StreamChunk, Option<ReaderTruncationOffsetType>)> {
        match self {
            ReadFuture::ReadingPersistedStream(stream) => {
                while let Some((_, item)) = stream.try_next().await? {
                    match item {
                        KvLogStoreItem::Barrier { .. } => {
                            continue;
                        }
                        KvLogStoreItem::StreamChunk(chunk) => {
                            // TODO: should have a truncate offset when consuming historical data
                            tracing::trace!("read logstore chunk of size: {}", chunk.cardinality());
                            return Ok((chunk, None));
                        }
                    }
                }
                *self = ReadFuture::Idle;
            }
            ReadFuture::ReadingFlushedChunk { .. } | ReadFuture::Idle => {}
        }
        match self {
            ReadFuture::ReadingPersistedStream(_) => {
                unreachable!("must have finished reading the persisted stream when reaching here")
            }
            ReadFuture::ReadingFlushedChunk { .. } => {}
            ReadFuture::Idle => loop {
                let Some((item_epoch, item)) = buffer.pop_front() else {
                    return pending().await;
                };
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        metrics.buffer_read_count.inc_by(chunk.cardinality() as _);
                        tracing::trace!(
                            start_seq_id,
                            end_seq_id,
                            flushed,
                            cardinality = chunk.cardinality(),
                            "read buffered chunk"
                        );
                        return Ok((chunk, Some((item_epoch, Some(end_seq_id)))));
                    }
                    LogStoreBufferItem::Flushed {
                        vnode_bitmap,
                        start_seq_id,
                        end_seq_id,
                        chunk_id,
                    } => {
                        tracing::trace!(start_seq_id, end_seq_id, chunk_id, "read flushed chunk");
                        let truncate_offset = (item_epoch, Some(end_seq_id));
                        let read_metrics = metrics.flushed_buffer_read_metrics.clone();
                        let future = read_state
                            .read_flushed_chunk(
                                vnode_bitmap,
                                chunk_id,
                                start_seq_id,
                                end_seq_id,
                                item_epoch,
                                read_metrics,
                            )
                            .boxed();
                        *self = ReadFuture::ReadingFlushedChunk {
                            future,
                            truncate_offset,
                        };
                        break;
                    }
                    LogStoreBufferItem::Barrier { .. } => {
                        continue;
                    }
                }
            },
        }

        let (future, truncate_offset) = match self {
            ReadFuture::ReadingPersistedStream(_) | ReadFuture::Idle => {
                unreachable!("should be at ReadingFlushedChunk")
            }
            ReadFuture::ReadingFlushedChunk {
                future,
                truncate_offset,
            } => (future, *truncate_offset),
        };

        let (_, chunk, _) = future.await?;
        tracing::trace!("read flushed chunk of size: {}", chunk.cardinality());
        *self = ReadFuture::Idle;
        Ok((chunk, Some(truncate_offset)))
    }
}

// Write methods
impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    async fn write_barrier<'a>(
        write_state: &'a mut LogStoreWriteState<S::Local>,
        barrier: Barrier,
        metrics: &SyncedKvLogStoreMetrics,
        truncation_offset: Option<ReaderTruncationOffsetType>,
        buffer: &mut SyncedLogStoreBuffer,
    ) -> StreamExecutorResult<LogStorePostSealCurrentEpoch<'a, S::Local>> {
        // TODO(kwannoel): As an optimization, we can also change flushed chunks into flushed items
        // to reduce the memory consumption of the logstore.

        let epoch = barrier.epoch.prev;
        let mut writer = write_state.start_writer(false);
        writer.write_barrier(epoch, barrier.is_checkpoint())?;

        if barrier.is_checkpoint() {
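            // Iterate from the newest buffered item backwards, and stop at the
            // first chunk that is already flushed: chunks are flushed in order,
            // so every chunk before it must have been flushed as well.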
            for (epoch, item) in buffer.buffer.iter_mut().rev() {
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        if !*flushed {
                            writer.write_chunk(chunk, *epoch, *start_seq_id, *end_seq_id)?;
                            *flushed = true;
                        } else {
                            break;
                        }
                    }
                    LogStoreBufferItem::Flushed { .. } | LogStoreBufferItem::Barrier { .. } => {}
                }
            }
        }

        // Apply truncation
        let (flush_info, _) = writer.finish().await?;
        metrics
            .storage_write_count
            .inc_by(flush_info.flush_count as _);
        metrics
            .storage_write_size
            .inc_by(flush_info.flush_size as _);
        let post_seal = write_state.seal_current_epoch(barrier.epoch.curr, truncation_offset);

        // Add the barrier to the buffer
        buffer.buffer.push_back((
            epoch,
            LogStoreBufferItem::Barrier {
                is_checkpoint: barrier.is_checkpoint(),
                next_epoch: barrier.epoch.curr,
            },
        ));
        buffer.next_chunk_id = 0;
        buffer.update_unconsumed_buffer_metrics();

        Ok(post_seal)
    }
}

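/// In-memory buffer of unconsumed log store items.
/// `current_size` tracks the total cardinality of buffered, unflushed stream
/// chunks; `LogStoreBufferItem::Flushed` placeholders are tracked separately
/// via `flushed_count` and do not count towards `current_size`.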
struct SyncedLogStoreBuffer {
    buffer: VecDeque<(u64, LogStoreBufferItem)>,
    current_size: usize,
    max_size: usize,
    max_chunk_size: u32,
    next_chunk_id: ChunkId,
    metrics: SyncedKvLogStoreMetrics,
    flushed_count: usize,
}

impl SyncedLogStoreBuffer {
    fn is_empty(&self) -> bool {
        self.current_size == 0
    }

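    /// Decide whether an incoming chunk fits into the in-memory buffer.
    /// Returns `None` if the chunk was buffered, or `Some(chunk)` if it must be
    /// flushed to the state store instead. For example, with `max_size = 10`
    /// and 8 rows already buffered, an incoming 5-row chunk is returned for
    /// flushing, since 8 + 5 > 10.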
    fn add_or_flush_chunk(
        &mut self,
        start_seq_id: SeqIdType,
        end_seq_id: SeqIdType,
        chunk: StreamChunk,
        epoch: u64,
    ) -> Option<StreamChunk> {
        let current_size = self.current_size;
        let chunk_size = chunk.cardinality();

        tracing::trace!(
            current_size,
            chunk_size,
            max_size = self.max_size,
            "checking chunk size"
        );
        let should_flush_chunk = current_size + chunk_size > self.max_size;
        if should_flush_chunk {
            tracing::trace!(start_seq_id, end_seq_id, epoch, "flushing chunk");
            Some(chunk)
        } else {
            tracing::trace!(start_seq_id, end_seq_id, epoch, "buffering chunk");
            self.add_chunk_to_buffer(chunk, start_seq_id, end_seq_id, epoch);
            None
        }
    }

    /// After flushing a chunk, we preserve a `LogStoreBufferItem::Flushed` placeholder in the buffer.
    /// It doesn't contain any data, only the metadata needed to read the flushed chunk back.
    fn add_flushed_item_to_buffer(
        &mut self,
        start_seq_id: SeqIdType,
        end_seq_id: SeqIdType,
        new_vnode_bitmap: Bitmap,
        epoch: u64,
    ) {
        let new_chunk_size = end_seq_id - start_seq_id + 1;

        if let Some((
            item_epoch,
            LogStoreBufferItem::Flushed {
                start_seq_id: prev_start_seq_id,
                end_seq_id: prev_end_seq_id,
                vnode_bitmap,
                ..
            },
        )) = self.buffer.back_mut()
            && let flushed_chunk_size = *prev_end_seq_id - *prev_start_seq_id + 1
            && let projected_flushed_chunk_size = flushed_chunk_size + new_chunk_size
            && projected_flushed_chunk_size as u32 <= self.max_chunk_size
        {
            assert!(
                *prev_end_seq_id < start_seq_id,
                "prev end_seq_id {} should be smaller than current start_seq_id {}",
                prev_end_seq_id,
                start_seq_id
            );
            assert_eq!(
                epoch, *item_epoch,
                "epoch of the newly added flushed item must be the same as the last flushed item"
            );
            *prev_end_seq_id = end_seq_id;
            *vnode_bitmap |= new_vnode_bitmap;
        } else {
            let chunk_id = self.next_chunk_id;
            self.next_chunk_id += 1;
            self.buffer.push_back((
                epoch,
                LogStoreBufferItem::Flushed {
                    start_seq_id,
                    end_seq_id,
                    vnode_bitmap: new_vnode_bitmap,
                    chunk_id,
                },
            ));
            self.flushed_count += 1;
            tracing::trace!(
                "adding flushed item to buffer: start_seq_id: {start_seq_id}, end_seq_id: {end_seq_id}, chunk_id: {chunk_id}"
            );
        }
        // FIXME(kwannoel): It seems these metrics are updated _after_ the flush info is reported.
        self.update_unconsumed_buffer_metrics();
    }

    fn add_chunk_to_buffer(
        &mut self,
        chunk: StreamChunk,
        start_seq_id: SeqIdType,
        end_seq_id: SeqIdType,
        epoch: u64,
    ) {
        let chunk_id = self.next_chunk_id;
        self.next_chunk_id += 1;
        self.current_size += chunk.cardinality();
        self.buffer.push_back((
            epoch,
            LogStoreBufferItem::StreamChunk {
                chunk,
                start_seq_id,
                end_seq_id,
                flushed: false,
                chunk_id,
            },
        ));
        self.update_unconsumed_buffer_metrics();
    }

    fn pop_front(&mut self) -> Option<(u64, LogStoreBufferItem)> {
        let item = self.buffer.pop_front();
        match &item {
            Some((_, LogStoreBufferItem::Flushed { .. })) => {
                self.flushed_count -= 1;
            }
            Some((_, LogStoreBufferItem::StreamChunk { chunk, .. })) => {
                self.current_size -= chunk.cardinality();
            }
            _ => {}
        }
        self.update_unconsumed_buffer_metrics();
        item
    }

    fn update_unconsumed_buffer_metrics(&self) {
        let mut epoch_count = 0;
        let mut row_count = 0;
        for (_, item) in &self.buffer {
            match item {
                LogStoreBufferItem::StreamChunk { chunk, .. } => {
                    row_count += chunk.cardinality();
                }
                LogStoreBufferItem::Flushed {
                    start_seq_id,
                    end_seq_id,
                    ..
                } => {
                    // A flushed item covers `start_seq_id..=end_seq_id`, inclusive.
                    row_count += (end_seq_id - start_seq_id + 1) as usize;
                }
                LogStoreBufferItem::Barrier { .. } => {
                    epoch_count += 1;
                }
            }
        }
        self.metrics.buffer_unconsumed_epoch_count.set(epoch_count);
        self.metrics.buffer_unconsumed_row_count.set(row_count as _);
        self.metrics
            .buffer_unconsumed_item_count
            .set(self.buffer.len() as _);
        self.metrics.buffer_unconsumed_min_epoch.set(
            self.buffer
                .front()
                .map(|(epoch, _)| *epoch)
                .unwrap_or_default() as _,
        );
    }
}

impl<S> Execute for SyncedKvLogStoreExecutor<S>
where
    S: StateStore,
{
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_monitored().boxed()
    }
}

#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use pretty_assertions::assert_eq;
    use risingwave_common::catalog::Field;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::test_prelude::*;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::assert_stream_chunk_eq;
    use crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V2_INFO;
    use crate::common::log_store_impl::kv_log_store::test_utils::{
        check_stream_chunk_eq, gen_test_log_store_table, test_payload_schema,
    };
    use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
    use crate::executor::test_utils::MockSource;

    fn init_logger() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .with_ansi(false)
            .try_init();
    }

    // test read/write buffer
    #[tokio::test]
    async fn test_read_write_buffer() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type.clone()))
            .collect_vec();
        let schema = Schema { fields };
        let pk_indices = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
        )
        .boxed();

        // Init
        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            "  I   T
            +  5  10
            +  6  10
            +  8  10
            +  9  10
            +  10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "   I   T
            -   5  10
            -   6  10
            -   8  10
            U-  9  10
            U+ 10  11",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        tx.push_barrier(test_epoch(2), false);

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

    // test barrier persisted read
    //
    // sequence of events (earliest -> latest):
    // barrier(1) -> chunk(1) -> chunk(2) -> poll(3) items -> barrier(2) -> poll(1) item
    // * poll just means we read from the executor stream.
    #[tokio::test]
    async fn test_barrier_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type.clone()))
            .collect_vec();
        let schema = Schema { fields };
        let pk_indices = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
        )
        .boxed();

        // Init
        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            "  I   T
            +  5  10
            +  6  10
            +  8  10
            +  9  10
            +  10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "   I   T
            -   5  10
            -   6  10
            -   8  10
            U- 10  11
            U+ 10  10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

    // When the buffer is at max capacity, we only store a placeholder `LogStoreBufferItem::Flushed`.
    // So we set the capacity to 0, so that incoming chunks are always flushed to the state store.
    #[tokio::test]
    async fn test_max_chunk_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type.clone()))
            .collect_vec();
        let schema = Schema { fields };
        let pk_indices = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            0,
            256,
            source,
            Duration::from_millis(256),
        )
        .boxed();

        // Init
        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            "  I   T
            +  5  10
            +  6  10
            +  8  10
            +  9  10
            +  10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "   I   T
            -   5  10
            -   6  10
            -   8  10
            U- 10  11
            U+ 10  10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        for i in 1..=2 {
            match stream.next().await {
                Some(Ok(Message::Barrier(barrier))) => {
                    assert_eq!(barrier.epoch.curr, test_epoch(i));
                }
                other => panic!("Expected a barrier message, got {:?}", other),
            }
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(actual))) => {
                let expected = StreamChunk::from_pretty(
                    "   I   T
                    +   5  10
                    +   6  10
                    +   8  10
                    +   9  10
                    +  10  11
                    -   5  10
                    -   6  10
                    -   8  10
                    U- 10  11
                    U+ 10  10",
                );
                assert_stream_chunk_eq!(actual, expected);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }
    }
}
1423}