risingwave_stream/executor/sync_kv_log_store.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! This module contains the synced kv log store implementation.
//! It's meant to buffer a large number of records emitted from upstream,
//! to avoid overwhelming the downstream executor.
//!
//! The synced kv log store polls two futures:
//!
//! 1. Upstream: the upstream message source.
//!
//!    It writes stream messages to the log store buffer, e.g. `Message::Barrier`, `Message::Chunk`, ...
//!    When writing a stream chunk, if the log store buffer is full, it will:
//!      a. Flush the buffer to the log store.
//!      b. Convert the stream chunk into a reference (`LogStoreBufferItem::Flushed`)
//!         through which the corresponding chunks can be read back from the log store.
//!         Adjacent references are compacted, so a single reference can cover
//!         multiple chunks if there's a build-up.
//!
//!    On receiving barriers, it will:
//!      a. Apply truncation to historical data in the logstore.
//!      b. Flush and checkpoint the logstore data.
//!
//! 2. State store + buffer + recently flushed chunks: the storage components of the logstore.
//!
//!    It reads all historical data from the logstore first. This can be done just by
//!    constructing a state store stream, which reads all data up to the latest epoch.
//!    This is a static snapshot of data.
//!    Any subsequently flushed chunks are read via `flushed_chunk_future`;
//!    see below for how it's constructed and what it is.
//!
//!    Next, we poll `flushed_chunk_future`, if a pre-existing one is in flight.
//!
//!    Finally, we pop the earliest item in the buffer:
//!    - If it's a chunk, yield it.
//!    - If it's a watermark, yield it.
//!    - If it's a flushed chunk reference (`LogStoreBufferItem::Flushed`),
//!      read the corresponding chunks from the log store.
//!      This is done by constructing a `flushed_chunk_future` which reads the log store
//!      using the `seq_id`.
//!    - If it's a barrier, skip it and only record its progress, because barriers
//!      are directly propagated downstream when polling the upstream.
//!
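//! A minimal sketch of the select-based polling pattern described above
//! (illustrative only, not the actual executor loop; `poll_once` and the
//! string payloads are hypothetical):
//!
//! ```
//! use futures::future::{Either, select};
//! use futures::pin_mut;
//!
//! // One iteration of the polling loop: race the write side against the read side.
//! async fn poll_once() {
//!     let write_future = async { "upstream message" };
//!     let read_future = async { "buffered chunk" };
//!     pin_mut!(write_future);
//!     pin_mut!(read_future);
//!     match select(write_future, read_future).await {
//!         // The write side is ready first: buffer or flush the upstream message.
//!         Either::Left((msg, _read_unfinished)) => println!("write event: {msg}"),
//!         // The read side is ready first: yield the chunk downstream.
//!         Either::Right((chunk, _write_unfinished)) => println!("read chunk: {chunk}"),
//!     }
//! }
//! ```
//!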
//! TODO(kwannoel):
//! - [ ] Add dedicated metrics for the sync log store, namespaced according to the upstream.
//! - [ ] Add tests
//! - [ ] Handle watermark r/w
//! - [ ] Handle paused stream

use std::collections::VecDeque;
use std::future::pending;
use std::mem::replace;
use std::pin::Pin;

use anyhow::anyhow;
use futures::future::{BoxFuture, Either, select};
use futures::stream::StreamFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::must_match;
use risingwave_connector::sink::log_store::{ChunkId, LogStoreResult};
use risingwave_storage::StateStore;
use risingwave_storage::store::{
    LocalStateStore, NewLocalOptions, OpConsistencyLevel, StateStoreRead,
};
use rw_futures_util::drop_either_future;
use tokio::time::{Duration, Instant, Sleep, sleep_until};
use tokio_stream::adapters::Peekable;

use crate::common::log_store_impl::kv_log_store::buffer::LogStoreBufferItem;
use crate::common::log_store_impl::kv_log_store::reader::LogStoreReadStateStreamRangeStart;
use crate::common::log_store_impl::kv_log_store::reader::timeout_auto_rebuild::TimeoutAutoRebuildIter;
use crate::common::log_store_impl::kv_log_store::serde::{
    KvLogStoreItem, LogStoreItemMergeStream, LogStoreRowSerde,
};
use crate::common::log_store_impl::kv_log_store::state::{
    LogStorePostSealCurrentEpoch, LogStoreReadState, LogStoreStateWriteChunkFuture,
    LogStoreWriteState, new_log_store_state,
};
use crate::common::log_store_impl::kv_log_store::{
    Epoch, FIRST_SEQ_ID, FlushInfo, LogStoreVnodeProgress, SeqId,
};
use crate::executor::prelude::*;
use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
use crate::executor::{
    Barrier, BoxedMessageStream, Message, StreamExecutorError, StreamExecutorResult,
};

pub mod metrics {
    use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};

    use crate::common::log_store_impl::kv_log_store::KvLogStoreReadMetrics;
    use crate::executor::monitor::StreamingMetrics;
    use crate::task::ActorId;

    #[derive(Clone)]
    pub struct SyncedKvLogStoreMetrics {
        // state of the log store
        pub unclean_state: LabelGuardedIntCounter,
        pub clean_state: LabelGuardedIntCounter,
        pub wait_next_poll_ns: LabelGuardedIntCounter,

        // Write metrics
        pub storage_write_count: LabelGuardedIntCounter,
        pub storage_write_size: LabelGuardedIntCounter,
        pub pause_duration_ns: LabelGuardedIntCounter,

        // Buffer metrics
        pub buffer_unconsumed_item_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_row_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_epoch_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_min_epoch: LabelGuardedIntGauge,
        pub buffer_read_count: LabelGuardedIntCounter,
        pub buffer_read_size: LabelGuardedIntCounter,

        // Read metrics
        pub total_read_count: LabelGuardedIntCounter,
        pub total_read_size: LabelGuardedIntCounter,
        pub persistent_log_read_metrics: KvLogStoreReadMetrics,
        pub flushed_buffer_read_metrics: KvLogStoreReadMetrics,
    }

    impl SyncedKvLogStoreMetrics {
        /// `id`: a unique identifier for the logstore. This can be the sink id,
        ///       or, for joins, the `fragment_id`.
        /// `name`: the name of the MV / sink that the log store is associated with.
        /// `target`: the target of the log store,
        ///           for instance a `MySql` sink, a PG sink, etc.,
        ///           or an unaligned join.
        pub(crate) fn new(
            metrics: &StreamingMetrics,
            actor_id: ActorId,
            id: u32,
            name: &str,
            target: &'static str,
        ) -> Self {
            let actor_id_str = actor_id.to_string();
            let id_str = id.to_string();
            let labels = &[&actor_id_str, target, &id_str, name];

            let unclean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "dirty",
                &actor_id_str,
                target,
                &id_str,
                name,
            ]);
            let clean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "clean",
                &actor_id_str,
                target,
                &id_str,
                name,
            ]);
            let wait_next_poll_ns = metrics
                .sync_kv_log_store_wait_next_poll_ns
                .with_guarded_label_values(labels);

            let storage_write_size = metrics
                .sync_kv_log_store_storage_write_size
                .with_guarded_label_values(labels);
            let storage_write_count = metrics
                .sync_kv_log_store_storage_write_count
                .with_guarded_label_values(labels);
            let storage_pause_duration_ns = metrics
                .sync_kv_log_store_write_pause_duration_ns
                .with_guarded_label_values(labels);

            let buffer_unconsumed_item_count = metrics
                .sync_kv_log_store_buffer_unconsumed_item_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_row_count = metrics
                .sync_kv_log_store_buffer_unconsumed_row_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_epoch_count = metrics
                .sync_kv_log_store_buffer_unconsumed_epoch_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_min_epoch = metrics
                .sync_kv_log_store_buffer_unconsumed_min_epoch
                .with_guarded_label_values(labels);
            let buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&["buffer", &actor_id_str, target, &id_str, name]);

            let buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&["buffer", &actor_id_str, target, &id_str, name]);

            let total_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&["total", &actor_id_str, target, &id_str, name]);

            let total_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&["total", &actor_id_str, target, &id_str, name]);

            const READ_PERSISTENT_LOG: &str = "persistent_log";
            const READ_FLUSHED_BUFFER: &str = "flushed_buffer";

            let persistent_log_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &id_str,
                    name,
                ]);

            let persistent_log_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &id_str,
                    name,
                ]);

            let flushed_buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &id_str,
                    name,
                ]);

            let flushed_buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &id_str,
                    name,
                ]);

            Self {
                unclean_state,
                clean_state,
                wait_next_poll_ns,
                storage_write_size,
                storage_write_count,
                pause_duration_ns: storage_pause_duration_ns,
                buffer_unconsumed_item_count,
                buffer_unconsumed_row_count,
                buffer_unconsumed_epoch_count,
                buffer_unconsumed_min_epoch,
                buffer_read_count,
                buffer_read_size,
                total_read_count,
                total_read_size,
                persistent_log_read_metrics: KvLogStoreReadMetrics {
                    storage_read_size: persistent_log_read_size,
                    storage_read_count: persistent_log_read_count,
                },
                flushed_buffer_read_metrics: KvLogStoreReadMetrics {
                    storage_read_count: flushed_buffer_read_count,
                    storage_read_size: flushed_buffer_read_size,
                },
            }
        }

        #[cfg(test)]
        pub(crate) fn for_test() -> Self {
            SyncedKvLogStoreMetrics {
                unclean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
                clean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
                wait_next_poll_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
                storage_write_count: LabelGuardedIntCounter::test_int_counter::<4>(),
                storage_write_size: LabelGuardedIntCounter::test_int_counter::<4>(),
                pause_duration_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
                buffer_unconsumed_item_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_row_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_epoch_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_min_epoch: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
                buffer_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
                total_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
                total_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
                persistent_log_read_metrics: KvLogStoreReadMetrics::for_test(),
                flushed_buffer_read_metrics: KvLogStoreReadMetrics::for_test(),
            }
        }
    }
}

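/// Future resolving to a chunk read back from the log store, together with its
/// `ChunkId` and the epoch it was written in.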
type ReadFlushedChunkFuture = BoxFuture<'static, LogStoreResult<(ChunkId, StreamChunk, Epoch)>>;

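/// Executor that buffers upstream messages in memory and spills them to a kv
/// log store when the buffer is full, replaying them to the downstream.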
pub struct SyncedKvLogStoreExecutor<S: StateStore> {
    actor_context: ActorContextRef,
    table_id: TableId,
    metrics: SyncedKvLogStoreMetrics,
    serde: LogStoreRowSerde,

    // Upstream
    upstream: Executor,

    // Log store state
    state_store: S,
    max_buffer_size: usize,

    // Max chunk size when reading from logstore / buffer
    chunk_size: u32,

    pause_duration_ms: Duration,
}

// Stream interface
impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn new(
        actor_context: ActorContextRef,
        table_id: u32,
        metrics: SyncedKvLogStoreMetrics,
        serde: LogStoreRowSerde,
        state_store: S,
        buffer_size: usize,
        chunk_size: u32,
        upstream: Executor,
        pause_duration_ms: Duration,
    ) -> Self {
        Self {
            actor_context,
            table_id: TableId::new(table_id),
            metrics,
            serde,
            state_store,
            upstream,
            max_buffer_size: buffer_size,
            chunk_size,
            pause_duration_ms,
        }
    }
}

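/// Metadata of a chunk that has just been flushed to the log store, carried by
/// `WriteFutureEvent::ChunkFlushed`.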
struct FlushedChunkInfo {
    epoch: u64,
    start_seq_id: SeqId,
    end_seq_id: SeqId,
    flush_info: FlushInfo,
    vnode_bitmap: Bitmap,
}

enum WriteFuture<S: LocalStateStore> {
    /// We pause the write future briefly, to let the `ReadFuture` be polled, in the
    /// following scenarios:
    /// - On receiving an upstream data chunk, when the buffer becomes full and the state is clean.
    /// - On receiving a checkpoint barrier, when the buffer is not empty and the state is clean.
    ///
    /// On pausing, we transition to a dirty state.
    ///
    /// We resume the write future, so the `ReadFuture` no longer has priority, in the
    /// following scenarios:
    /// - After the pause duration elapses.
    /// - After the read future consumes a chunk.
    Paused {
        start_instant: Instant,
        sleep_future: Option<Pin<Box<Sleep>>>,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>, // Just used to hold the state
    },
    ReceiveFromUpstream {
        future: StreamFuture<BoxedMessageStream>,
        write_state: LogStoreWriteState<S>,
    },
    FlushingChunk {
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        future: Pin<Box<LogStoreStateWriteChunkFuture<S>>>,
        stream: BoxedMessageStream,
    },
    Empty,
}

enum WriteFutureEvent {
    UpstreamMessageReceived(Message),
    ChunkFlushed(FlushedChunkInfo),
}

impl<S: LocalStateStore> WriteFuture<S> {
    fn flush_chunk(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
        chunk: StreamChunk,
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
    ) -> Self {
        tracing::trace!(
            start_seq_id,
            end_seq_id,
            epoch,
            cardinality = chunk.cardinality(),
            "write_future: flushing chunk"
        );
        Self::FlushingChunk {
            epoch,
            start_seq_id,
            end_seq_id,
            future: Box::pin(write_state.into_write_chunk_future(
                chunk,
                epoch,
                start_seq_id,
                end_seq_id,
            )),
            stream,
        }
    }

    fn receive_from_upstream(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        Self::ReceiveFromUpstream {
            future: stream.into_future(),
            write_state,
        }
    }

    fn paused(
        duration: Duration,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        let now = Instant::now();
        tracing::trace!(?now, ?duration, "write_future_pause");
        Self::Paused {
            start_instant: now,
            sleep_future: Some(Box::pin(sleep_until(now + duration))),
            barrier,
            stream,
            write_state,
        }
    }

    async fn next_event(
        &mut self,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<(BoxedMessageStream, LogStoreWriteState<S>, WriteFutureEvent)> {
        match self {
            WriteFuture::Paused {
                start_instant,
                sleep_future,
                ..
            } => {
                if let Some(sleep_future) = sleep_future {
                    sleep_future.await;
                    metrics
                        .pause_duration_ns
                        .inc_by(start_instant.elapsed().as_nanos() as _);
                    tracing::trace!("resuming write future");
                }
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::Paused { stream, write_state, barrier, .. } => {
                    Ok((stream, write_state, WriteFutureEvent::UpstreamMessageReceived(Message::Barrier(barrier))))
                })
            }
            WriteFuture::ReceiveFromUpstream { future, .. } => {
                let (opt, stream) = future.await;
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::ReceiveFromUpstream { write_state, .. } => {
                    opt
                    .ok_or_else(|| anyhow!("end of upstream input").into())
                    .and_then(|result| result.map(|item| {
                        (stream, write_state, WriteFutureEvent::UpstreamMessageReceived(item))
                    }))
                })
            }
            WriteFuture::FlushingChunk { future, .. } => {
                let (write_state, result) = future.await;
                let result = must_match!(replace(self, WriteFuture::Empty), WriteFuture::FlushingChunk { epoch, start_seq_id, end_seq_id, stream, .. } => {
                    result.map(|(flush_info, vnode_bitmap)| {
                        (stream, write_state, WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                            epoch,
                            start_seq_id,
                            end_seq_id,
                            flush_info,
                            vnode_bitmap,
                        }))
                    })
                });
                result.map_err(Into::into)
            }
            WriteFuture::Empty => {
                unreachable!("should not be polled after ready")
            }
        }
    }
}

// Stream interface
impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    #[try_stream(ok = Message, error = StreamExecutorError)]
    pub async fn execute_monitored(self) {
        let wait_next_poll_ns = self.metrics.wait_next_poll_ns.clone();
        #[for_await]
        for message in self.execute_inner() {
            let current_time = Instant::now();
            yield message?;
            wait_next_poll_ns.inc_by(current_time.elapsed().as_nanos() as _);
        }
    }

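    /// Main loop: concurrently polls the write future (upstream messages and
    /// chunk flushes) and the read future (persisted log store, then buffer),
    /// yielding barriers from the write side and chunks from the read side.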
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(self) {
        let mut input = self.upstream.execute();

        // init first epoch + local state store
        let first_barrier = expect_first_barrier(&mut input).await?;
        let first_write_epoch = first_barrier.epoch;
        yield Message::Barrier(first_barrier.clone());

        let local_state_store = self
            .state_store
            .new_local(NewLocalOptions {
                table_id: self.table_id,
                op_consistency_level: OpConsistencyLevel::Inconsistent,
                table_option: TableOption {
                    retention_seconds: None,
                },
                is_replicated: false,
                vnodes: self.serde.vnodes().clone(),
            })
            .await;

        let (mut read_state, mut initial_write_state) =
            new_log_store_state(self.table_id, local_state_store, self.serde);
        initial_write_state.init(first_write_epoch).await?;

        let mut pause_stream = first_barrier.is_pause_on_startup();
        let mut initial_write_epoch = first_write_epoch;

        // We only recreate the consume stream:
        // 1. On bootstrap.
        // 2. On vnode update.
        'recreate_consume_stream: loop {
            let mut seq_id = FIRST_SEQ_ID;
            let mut buffer = SyncedLogStoreBuffer {
                buffer: VecDeque::new(),
                current_size: 0,
                max_size: self.max_buffer_size,
                max_chunk_size: self.chunk_size,
                next_chunk_id: 0,
                metrics: self.metrics.clone(),
                flushed_count: 0,
            };

            let log_store_stream = read_state
                .read_persisted_log_store(
                    self.metrics.persistent_log_read_metrics.clone(),
                    initial_write_epoch.curr,
                    LogStoreReadStateStreamRangeStart::Unbounded,
                )
                .await?;

            let mut log_store_stream = tokio_stream::StreamExt::peekable(log_store_stream);
            let mut clean_state = log_store_stream.peek().await.is_none();
            tracing::trace!(?clean_state);

            let mut read_future_state = ReadFuture::ReadingPersistedStream(log_store_stream);

            let mut write_future_state =
                WriteFuture::receive_from_upstream(input, initial_write_state);

            let mut progress = LogStoreVnodeProgress::None;

            loop {
                let select_result = {
                    let read_future = async {
                        if pause_stream {
                            pending().await
                        } else {
                            read_future_state
                                .next_chunk(&mut progress, &read_state, &mut buffer, &self.metrics)
                                .await
                        }
                    };
                    pin_mut!(read_future);
                    let write_future = write_future_state.next_event(&self.metrics);
                    pin_mut!(write_future);
                    let output = select(write_future, read_future).await;
                    drop_either_future(output)
                };
                match select_result {
                    Either::Left(result) => {
                        // Drop the future, to ensure that it is reset later.
                        drop(write_future_state);
                        let (stream, mut write_state, either) = result?;
                        match either {
                            WriteFutureEvent::UpstreamMessageReceived(msg) => {
                                match msg {
                                    Message::Barrier(barrier) => {
                                        if clean_state
                                            && barrier.kind.is_checkpoint()
                                            && !buffer.is_empty()
                                        {
                                            write_future_state = WriteFuture::paused(
                                                self.pause_duration_ms,
                                                barrier,
                                                stream,
                                                write_state,
                                            );
                                            clean_state = false;
                                            self.metrics.unclean_state.inc();
                                        } else {
                                            if let Some(mutation) = barrier.mutation.as_deref() {
                                                match mutation {
                                                    Mutation::Pause => {
                                                        pause_stream = true;
                                                    }
                                                    Mutation::Resume => {
                                                        pause_stream = false;
                                                    }
                                                    _ => {}
                                                }
                                            }
                                            let write_state_post_write_barrier =
                                                Self::write_barrier(
                                                    &mut write_state,
                                                    barrier.clone(),
                                                    &self.metrics,
                                                    progress.take(),
                                                    &mut buffer,
                                                )
                                                .await?;
                                            seq_id = FIRST_SEQ_ID;
                                            let update_vnode_bitmap = barrier
                                                .as_update_vnode_bitmap(self.actor_context.id);
                                            let barrier_epoch = barrier.epoch;

                                            yield Message::Barrier(barrier);

                                            write_state_post_write_barrier
                                                .post_yield_barrier(update_vnode_bitmap.clone())
                                                .await?;
                                            if let Some(vnode_bitmap) = update_vnode_bitmap {
                                                // Apply the vnode update.
                                                read_state.update_vnode_bitmap(vnode_bitmap);
                                                initial_write_epoch = barrier_epoch;
                                                input = stream;
                                                initial_write_state = write_state;
                                                continue 'recreate_consume_stream;
                                            } else {
                                                write_future_state =
                                                    WriteFuture::receive_from_upstream(
                                                        stream,
                                                        write_state,
                                                    );
                                            }
                                        }
                                    }
                                    Message::Chunk(chunk) => {
                                        let start_seq_id = seq_id;
                                        let new_seq_id = seq_id + chunk.cardinality() as SeqId;
                                        let end_seq_id = new_seq_id - 1;
                                        let epoch = write_state.epoch().curr;
                                        tracing::trace!(
                                            start_seq_id,
                                            end_seq_id,
                                            new_seq_id,
                                            epoch,
                                            cardinality = chunk.cardinality(),
                                            "received chunk"
                                        );
                                        if let Some(chunk_to_flush) = buffer.add_or_flush_chunk(
                                            start_seq_id,
                                            end_seq_id,
                                            chunk,
                                            epoch,
                                        ) {
                                            seq_id = new_seq_id;
                                            write_future_state = WriteFuture::flush_chunk(
                                                stream,
                                                write_state,
                                                chunk_to_flush,
                                                epoch,
                                                start_seq_id,
                                                end_seq_id,
                                            );
                                        } else {
                                            seq_id = new_seq_id;
                                            write_future_state = WriteFuture::receive_from_upstream(
                                                stream,
                                                write_state,
                                            );
                                        }
                                    }
                                    // FIXME(kwannoel): This should truncate the logstore;
                                    // watermarks should not bypass it the way barriers do.
                                    Message::Watermark(_watermark) => {
                                        write_future_state =
                                            WriteFuture::receive_from_upstream(stream, write_state);
                                    }
                                }
                            }
                            WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                                start_seq_id,
                                end_seq_id,
                                epoch,
                                flush_info,
                                vnode_bitmap,
                            }) => {
                                buffer.add_flushed_item_to_buffer(
                                    start_seq_id,
                                    end_seq_id,
                                    vnode_bitmap,
                                    epoch,
                                );
                                self.metrics
                                    .storage_write_count
                                    .inc_by(flush_info.flush_count as _);
                                self.metrics
                                    .storage_write_size
                                    .inc_by(flush_info.flush_size as _);
                                write_future_state =
                                    WriteFuture::receive_from_upstream(stream, write_state);
                            }
                        }
                    }
                    Either::Right(result) => {
                        if !clean_state
                            && matches!(read_future_state, ReadFuture::Idle)
                            && buffer.is_empty()
                        {
                            clean_state = true;
                            self.metrics.clean_state.inc();

                            // Let write future resume immediately
                            if let WriteFuture::Paused { sleep_future, .. } =
                                &mut write_future_state
                            {
                                tracing::trace!("resuming paused future");
                                assert!(buffer.current_size < self.max_buffer_size);
                                *sleep_future = None;
                            }
                        }
                        let chunk = result?;
                        self.metrics
                            .total_read_count
                            .inc_by(chunk.cardinality() as _);

                        yield Message::Chunk(chunk);
                    }
                }
            }
        }
    }
}

type PersistedStream<S> = Peekable<Pin<Box<LogStoreItemMergeStream<TimeoutAutoRebuildIter<S>>>>>;

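/// State machine of the read side: first drain the persisted log store stream,
/// then serve items from the in-memory buffer, reading flushed chunks back
/// from the state store when a `Flushed` reference is popped.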
enum ReadFuture<S: StateStoreRead> {
    ReadingPersistedStream(PersistedStream<S>),
    ReadingFlushedChunk {
        future: ReadFlushedChunkFuture,
        end_seq_id: SeqId,
    },
    Idle,
}

// Read methods
impl<S: StateStoreRead> ReadFuture<S> {
    async fn next_chunk(
        &mut self,
        progress: &mut LogStoreVnodeProgress,
        read_state: &LogStoreReadState<S>,
        buffer: &mut SyncedLogStoreBuffer,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<StreamChunk> {
        match self {
            ReadFuture::ReadingPersistedStream(stream) => {
                while let Some((epoch, item)) = stream.try_next().await? {
                    match item {
                        KvLogStoreItem::Barrier { vnodes, .. } => {
                            tracing::trace!(epoch, "read logstore barrier");
                            // update the progress
                            progress.apply_aligned(vnodes, epoch, None);
                            continue;
                        }
                        KvLogStoreItem::StreamChunk {
                            chunk,
                            progress: chunk_progress,
                        } => {
                            tracing::trace!("read logstore chunk of size: {}", chunk.cardinality());
                            progress.apply_per_vnode(epoch, chunk_progress);
                            return Ok(chunk);
                        }
                    }
                }
                *self = ReadFuture::Idle;
            }
            ReadFuture::ReadingFlushedChunk { .. } | ReadFuture::Idle => {}
        }
        match self {
            ReadFuture::ReadingPersistedStream(_) => {
                unreachable!("must have finished reading the persisted stream when reaching here")
            }
            ReadFuture::ReadingFlushedChunk { .. } => {}
            ReadFuture::Idle => loop {
                let Some((item_epoch, item)) = buffer.pop_front() else {
                    return pending().await;
                };
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        metrics.buffer_read_count.inc_by(chunk.cardinality() as _);
                        tracing::trace!(
                            start_seq_id,
                            end_seq_id,
                            flushed,
                            cardinality = chunk.cardinality(),
                            "read buffered chunk"
                        );
                        return Ok(chunk);
                    }
                    LogStoreBufferItem::Flushed {
                        vnode_bitmap,
                        start_seq_id,
                        end_seq_id,
                        chunk_id,
                    } => {
                        tracing::trace!(start_seq_id, end_seq_id, chunk_id, "read flushed chunk");
                        let read_metrics = metrics.flushed_buffer_read_metrics.clone();
                        let future = read_state
                            .read_flushed_chunk(
                                vnode_bitmap,
                                chunk_id,
                                start_seq_id,
                                end_seq_id,
                                item_epoch,
                                read_metrics,
                            )
                            .boxed();
                        *self = ReadFuture::ReadingFlushedChunk { future, end_seq_id };
                        break;
                    }
                    LogStoreBufferItem::Barrier { .. } => {
                        tracing::trace!(item_epoch, "read buffer barrier");
                        progress.apply_aligned(read_state.vnodes().clone(), item_epoch, None);
                        continue;
                    }
                }
            },
        }

        let (future, end_seq_id) = match self {
            ReadFuture::ReadingPersistedStream(_) | ReadFuture::Idle => {
                unreachable!("should be at ReadingFlushedChunk")
            }
            ReadFuture::ReadingFlushedChunk { future, end_seq_id } => (future, *end_seq_id),
        };

        let (_, chunk, epoch) = future.await?;
        progress.apply_aligned(read_state.vnodes().clone(), epoch, Some(end_seq_id));
        tracing::trace!(
            end_seq_id,
            "read flushed chunk of size: {}",
            chunk.cardinality()
        );
        *self = ReadFuture::Idle;
        Ok(chunk)
    }
}

// Write methods
impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
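    /// On a barrier, write it to the log store and seal the current epoch,
    /// applying the accumulated truncation progress. On a checkpoint barrier,
    /// additionally flush any unflushed chunks remaining in the buffer.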
    async fn write_barrier<'a>(
        write_state: &'a mut LogStoreWriteState<S::Local>,
        barrier: Barrier,
        metrics: &SyncedKvLogStoreMetrics,
        progress: LogStoreVnodeProgress,
        buffer: &mut SyncedLogStoreBuffer,
    ) -> StreamExecutorResult<LogStorePostSealCurrentEpoch<'a, S::Local>> {
        tracing::trace!(?progress, "applying truncation");
        // TODO(kwannoel): As an optimization, we can also convert flushed chunks
        // into flushed items, to reduce the memory consumption of the logstore.

        let epoch = barrier.epoch.prev;
        let mut writer = write_state.start_writer(false);
        writer.write_barrier(epoch, barrier.is_checkpoint())?;

        if barrier.is_checkpoint() {
            for (epoch, item) in buffer.buffer.iter_mut().rev() {
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        if !*flushed {
                            writer.write_chunk(chunk, *epoch, *start_seq_id, *end_seq_id)?;
                            *flushed = true;
                        } else {
                            break;
                        }
                    }
                    LogStoreBufferItem::Flushed { .. } | LogStoreBufferItem::Barrier { .. } => {}
                }
            }
        }

        // Apply truncation
        let (flush_info, _) = writer.finish().await?;
        metrics
            .storage_write_count
            .inc_by(flush_info.flush_count as _);
        metrics
            .storage_write_size
            .inc_by(flush_info.flush_size as _);
        let post_seal = write_state.seal_current_epoch(barrier.epoch.curr, progress);

        // Add to buffer
        buffer.buffer.push_back((
            epoch,
            LogStoreBufferItem::Barrier {
                is_checkpoint: barrier.is_checkpoint(),
                next_epoch: barrier.epoch.curr,
            },
        ));
        buffer.next_chunk_id = 0;
        buffer.update_unconsumed_buffer_metrics();

        Ok(post_seal)
    }
}

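/// In-memory buffer of log store items, bounded by `max_size` (in rows).
/// Chunks that would overflow it are flushed to the state store and tracked
/// here as `Flushed` placeholders instead.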
struct SyncedLogStoreBuffer {
    buffer: VecDeque<(u64, LogStoreBufferItem)>,
    current_size: usize,
    max_size: usize,
    max_chunk_size: u32,
    next_chunk_id: ChunkId,
    metrics: SyncedKvLogStoreMetrics,
    flushed_count: usize,
}

impl SyncedLogStoreBuffer {
    fn is_empty(&self) -> bool {
        self.current_size == 0
    }

    fn add_or_flush_chunk(
        &mut self,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        chunk: StreamChunk,
        epoch: u64,
    ) -> Option<StreamChunk> {
        let current_size = self.current_size;
        let chunk_size = chunk.cardinality();

        tracing::trace!(
            current_size,
            chunk_size,
            max_size = self.max_size,
            "checking chunk size"
        );
        let should_flush_chunk = current_size + chunk_size > self.max_size;
        if should_flush_chunk {
            tracing::trace!(start_seq_id, end_seq_id, epoch, "flushing chunk");
            Some(chunk)
        } else {
            tracing::trace!(start_seq_id, end_seq_id, epoch, "buffering chunk");
            self.add_chunk_to_buffer(chunk, start_seq_id, end_seq_id, epoch);
            None
        }
    }

    /// After flushing a chunk, we keep a placeholder `Flushed` item inside the buffer.
    /// It doesn't contain any data, only the metadata needed to read the flushed
    /// chunk back from the log store.
    fn add_flushed_item_to_buffer(
        &mut self,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        new_vnode_bitmap: Bitmap,
        epoch: u64,
    ) {
        let new_chunk_size = end_seq_id - start_seq_id + 1;

        if let Some((
            item_epoch,
            LogStoreBufferItem::Flushed {
                start_seq_id: prev_start_seq_id,
                end_seq_id: prev_end_seq_id,
                vnode_bitmap,
                ..
            },
        )) = self.buffer.back_mut()
            && let flushed_chunk_size = *prev_end_seq_id - *prev_start_seq_id + 1
            && let projected_flushed_chunk_size = flushed_chunk_size + new_chunk_size
            && projected_flushed_chunk_size as u32 <= self.max_chunk_size
        {
            assert!(
                *prev_end_seq_id < start_seq_id,
                "prev end_seq_id {} should be smaller than current start_seq_id {}",
                prev_end_seq_id,
                start_seq_id
            );
            assert_eq!(
                epoch, *item_epoch,
                "epoch of the newly added flushed item must be the same as the last flushed item"
            );
            *prev_end_seq_id = end_seq_id;
            *vnode_bitmap |= new_vnode_bitmap;
        } else {
            let chunk_id = self.next_chunk_id;
            self.next_chunk_id += 1;
            self.buffer.push_back((
                epoch,
                LogStoreBufferItem::Flushed {
                    start_seq_id,
                    end_seq_id,
                    vnode_bitmap: new_vnode_bitmap,
                    chunk_id,
                },
            ));
            self.flushed_count += 1;
            tracing::trace!(
                "adding flushed item to buffer: start_seq_id: {start_seq_id}, end_seq_id: {end_seq_id}, chunk_id: {chunk_id}"
            );
        }
        // FIXME(kwannoel): It seems these metrics are updated _after_ the flush info is reported.
        self.update_unconsumed_buffer_metrics();
    }

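    /// Buffer a chunk in memory, accounting for its cardinality in `current_size`.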
    fn add_chunk_to_buffer(
        &mut self,
        chunk: StreamChunk,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        epoch: u64,
    ) {
        let chunk_id = self.next_chunk_id;
        self.next_chunk_id += 1;
        self.current_size += chunk.cardinality();
        self.buffer.push_back((
            epoch,
            LogStoreBufferItem::StreamChunk {
                chunk,
                start_seq_id,
                end_seq_id,
                flushed: false,
                chunk_id,
            },
        ));
        self.update_unconsumed_buffer_metrics();
    }

    fn pop_front(&mut self) -> Option<(u64, LogStoreBufferItem)> {
        let item = self.buffer.pop_front();
        match &item {
            Some((_, LogStoreBufferItem::Flushed { .. })) => {
                self.flushed_count -= 1;
            }
            Some((_, LogStoreBufferItem::StreamChunk { chunk, .. })) => {
                self.current_size -= chunk.cardinality();
            }
            _ => {}
        }
        self.update_unconsumed_buffer_metrics();
        item
    }

    fn update_unconsumed_buffer_metrics(&self) {
        let mut epoch_count = 0;
        let mut row_count = 0;
        for (_, item) in &self.buffer {
            match item {
                LogStoreBufferItem::StreamChunk { chunk, .. } => {
                    row_count += chunk.cardinality();
                }
                LogStoreBufferItem::Flushed {
                    start_seq_id,
                    end_seq_id,
                    ..
                } => {
                    // A flushed item covers the inclusive range [start_seq_id, end_seq_id].
                    row_count += (end_seq_id - start_seq_id + 1) as usize;
                }
                LogStoreBufferItem::Barrier { .. } => {
                    epoch_count += 1;
                }
            }
        }
        self.metrics.buffer_unconsumed_epoch_count.set(epoch_count);
        self.metrics.buffer_unconsumed_row_count.set(row_count as _);
        self.metrics
            .buffer_unconsumed_item_count
            .set(self.buffer.len() as _);
        self.metrics.buffer_unconsumed_min_epoch.set(
            self.buffer
                .front()
                .map(|(epoch, _)| *epoch)
                .unwrap_or_default() as _,
        );
    }
}

impl<S> Execute for SyncedKvLogStoreExecutor<S>
where
    S: StateStore,
{
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_monitored().boxed()
    }
}

#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use pretty_assertions::assert_eq;
    use risingwave_common::catalog::Field;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::test_prelude::*;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::assert_stream_chunk_eq;
    use crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V2_INFO;
    use crate::common::log_store_impl::kv_log_store::test_utils::{
        check_stream_chunk_eq, gen_test_log_store_table, test_payload_schema,
    };
    use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
    use crate::executor::test_utils::MockSource;

    fn init_logger() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .with_ansi(false)
            .try_init();
    }

    // test read/write buffer
    #[tokio::test]
    async fn test_read_write_buffer() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type.clone()))
            .collect_vec();
        let schema = Schema { fields };
        let pk_indices = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
        )
        .boxed();

        // Init
        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            "  I   T
            +  5  10
            +  6  10
            +  8  10
            +  9  10
            +  10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "   I   T
            -   5  10
            -   6  10
            -   8  10
            U-  9  10
            U+ 10  11",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        tx.push_barrier(test_epoch(2), false);

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

    // test barrier persisted read
    //
    // sequence of events (earliest -> latest):
    // barrier(1) -> chunk(1) -> chunk(2) -> poll(3) items -> barrier(2) -> poll(1) item
    // * poll just means we read from the executor stream.
    #[tokio::test]
    async fn test_barrier_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type.clone()))
            .collect_vec();
        let schema = Schema { fields };
        let pk_indices = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
        )
        .boxed();

        // Init
        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            "  I   T
            +  5  10
            +  6  10
            +  8  10
            +  9  10
            +  10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "   I   T
            -   5  10
            -   6  10
            -   8  10
            U- 10  11
            U+ 10  10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

    // When the buffer would overflow `max_size`, only a placeholder `Flushed`
    // item is stored in it. Here we set the buffer capacity to 0, so every
    // incoming chunk is flushed to the state store.
    #[tokio::test]
    async fn test_max_chunk_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type.clone()))
            .collect_vec();
        let schema = Schema { fields };
        let pk_indices = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            0,
            256,
            source,
            Duration::from_millis(256),
        )
        .boxed();

        // Init
        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            "  I   T
            +  5  10
            +  6  10
            +  8  10
            +  9  10
            +  10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "   I   T
            -   5  10
            -   6  10
            -   8  10
            U- 10  11
            U+ 10  10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        for i in 1..=2 {
            match stream.next().await {
                Some(Ok(Message::Barrier(barrier))) => {
                    assert_eq!(barrier.epoch.curr, test_epoch(i));
                }
                other => panic!("Expected a barrier message, got {:?}", other),
            }
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(actual))) => {
                let expected = StreamChunk::from_pretty(
                    "   I   T
                    +   5  10
                    +   6  10
                    +   8  10
                    +   9  10
                    +  10  11
                    -   5  10
                    -   6  10
                    -   8  10
                    U- 10  11
                    U+ 10  10",
                );
                assert_stream_chunk_eq!(actual, expected);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }
    }
}