risingwave_stream/executor/sync_kv_log_store.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! This module contains the synced kv log store implementation.
//! It's meant to buffer a large number of records emitted from upstream,
//! to avoid overwhelming the downstream executor.
//!
//! The synced kv log store polls two futures:
//!
//! 1. Upstream: upstream message source
//!
//!    It will write stream messages to the log store buffer, e.g. `Message::Barrier`, `Message::Chunk`, ...
//!    When writing a stream chunk, if the log store buffer is full, it will:
//!      a. Flush the buffer to the log store.
//!      b. Convert the stream chunk into a reference (`LogStoreBufferItem::Flushed`)
//!         which can be used to read the corresponding chunks back from the log store.
//!         Adjacent references are compacted, so a single reference can cover
//!         multiple chunks if there's a backlog.
//!
//!    On receiving barriers, it will:
//!      a. Apply truncation to historical data in the logstore.
//!      b. Flush and checkpoint the logstore data.
//!
//! 2. State store + buffer + recently flushed chunks: the storage components of the logstore.
//!
//!    It will read all historical data from the logstore first. This can be done just by
//!    constructing a state store stream, which will read all data until the latest epoch.
//!    This is a static snapshot of data.
//!    Any subsequently flushed chunks are read via `flushed_chunk_future`;
//!    see below for how it is constructed and what it yields.
//!
//!    Next, we poll `flushed_chunk_future`, if a pre-existing one is in flight.
//!
//!    Finally we will pop the earliest item in the buffer:
//!    - If it's a chunk, yield it.
//!    - If it's a watermark, yield it.
//!    - If it's a flushed chunk reference (`LogStoreBufferItem::Flushed`),
//!      we will read the corresponding chunks in the log store.
//!      This is done by constructing a `flushed_chunk_future` which will read the log store
//!      using the `seq_id`.
//!    - Barriers are skipped here,
//!      because they are directly propagated from the upstream when polling it.
//!
//! TODO(kwannoel):
//! - [ ] Add dedicated metrics for sync log store, namespaced according to the upstream.
//! - [ ] Add tests
//! - [ ] Handle watermark r/w
//! - [ ] Handle paused stream
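//!
//! As a rough sketch of the control flow (not the exact code; the helper name
//! `handle_write_event` is illustrative only), the main loop polls the
//! write-side and read-side futures together and acts on whichever finishes
//! first:
//!
//! ```ignore
//! loop {
//!     match select(write_future.next_event(), read_future.next_chunk()).await {
//!         // Upstream message: buffer/flush the chunk, or propagate the barrier.
//!         Either::Left(event) => handle_write_event(event),
//!         // A chunk surfaced from the logstore or buffer: yield it downstream.
//!         Either::Right(chunk) => yield Message::Chunk(chunk),
//!     }
//! }
//! ```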

use std::collections::VecDeque;
use std::future::pending;
use std::mem::replace;
use std::pin::Pin;

use anyhow::anyhow;
use futures::future::{BoxFuture, Either, select};
use futures::stream::StreamFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::must_match;
use risingwave_connector::sink::log_store::{ChunkId, LogStoreResult};
use risingwave_storage::StateStore;
use risingwave_storage::store::{
    LocalStateStore, NewLocalOptions, OpConsistencyLevel, StateStoreRead,
};
use rw_futures_util::drop_either_future;
use tokio::time::{Duration, Instant, Sleep, sleep_until};
use tokio_stream::adapters::Peekable;

use crate::common::log_store_impl::kv_log_store::buffer::LogStoreBufferItem;
use crate::common::log_store_impl::kv_log_store::reader::LogStoreReadStateStreamRangeStart;
use crate::common::log_store_impl::kv_log_store::reader::timeout_auto_rebuild::TimeoutAutoRebuildIter;
use crate::common::log_store_impl::kv_log_store::serde::{
    KvLogStoreItem, LogStoreItemMergeStream, LogStoreRowSerde,
};
use crate::common::log_store_impl::kv_log_store::state::{
    LogStorePostSealCurrentEpoch, LogStoreReadState, LogStoreStateWriteChunkFuture,
    LogStoreWriteState, new_log_store_state,
};
use crate::common::log_store_impl::kv_log_store::{
    Epoch, FIRST_SEQ_ID, FlushInfo, LogStoreVnodeProgress, SeqId,
};
use crate::executor::prelude::*;
use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
use crate::executor::{
    Barrier, BoxedMessageStream, Message, StreamExecutorError, StreamExecutorResult,
};

pub mod metrics {
    use risingwave_common::id::FragmentId;
    use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};

    use crate::common::log_store_impl::kv_log_store::KvLogStoreReadMetrics;
    use crate::executor::monitor::StreamingMetrics;
    use crate::task::ActorId;

    #[derive(Clone)]
    pub struct SyncedKvLogStoreMetrics {
        // State of the log store
        pub unclean_state: LabelGuardedIntCounter,
        pub clean_state: LabelGuardedIntCounter,
        pub wait_next_poll_ns: LabelGuardedIntCounter,

        // Write metrics
        pub storage_write_count: LabelGuardedIntCounter,
        pub storage_write_size: LabelGuardedIntCounter,
        pub pause_duration_ns: LabelGuardedIntCounter,

        // Buffer metrics
        pub buffer_unconsumed_item_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_row_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_epoch_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_min_epoch: LabelGuardedIntGauge,
        pub buffer_read_count: LabelGuardedIntCounter,
        pub buffer_read_size: LabelGuardedIntCounter,

        // Read metrics
        pub total_read_count: LabelGuardedIntCounter,
        pub total_read_size: LabelGuardedIntCounter,
        pub persistent_log_read_metrics: KvLogStoreReadMetrics,
        pub flushed_buffer_read_metrics: KvLogStoreReadMetrics,
    }

    impl SyncedKvLogStoreMetrics {
        /// `id`: a unique identifier for the logstore. This can be the sink id,
        ///       or for joins, the `fragment_id`.
        /// `name`: the MV / Sink that the log store is associated with.
        /// `target`: the target of the log store,
        ///           for instance a `MySql` sink, a PG sink, etc.,
        ///           or an unaligned join.
        pub(crate) fn new(
            metrics: &StreamingMetrics,
            actor_id: ActorId,
            fragment_id: FragmentId,
            name: &str,
            target: &'static str,
        ) -> Self {
            let actor_id_str = actor_id.to_string();
            let fragment_id_str = fragment_id.to_string();
            let labels = &[&actor_id_str, target, &fragment_id_str, name];

            let unclean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "dirty",
                &actor_id_str,
                target,
                &fragment_id_str,
                name,
            ]);
            let clean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "clean",
                &actor_id_str,
                target,
                &fragment_id_str,
                name,
            ]);
            let wait_next_poll_ns = metrics
                .sync_kv_log_store_wait_next_poll_ns
                .with_guarded_label_values(labels);

            let storage_write_size = metrics
                .sync_kv_log_store_storage_write_size
                .with_guarded_label_values(labels);
            let storage_write_count = metrics
                .sync_kv_log_store_storage_write_count
                .with_guarded_label_values(labels);
            let storage_pause_duration_ns = metrics
                .sync_kv_log_store_write_pause_duration_ns
                .with_guarded_label_values(labels);

            let buffer_unconsumed_item_count = metrics
                .sync_kv_log_store_buffer_unconsumed_item_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_row_count = metrics
                .sync_kv_log_store_buffer_unconsumed_row_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_epoch_count = metrics
                .sync_kv_log_store_buffer_unconsumed_epoch_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_min_epoch = metrics
                .sync_kv_log_store_buffer_unconsumed_min_epoch
                .with_guarded_label_values(labels);
            let buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    "buffer",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    "buffer",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let total_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    "total",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let total_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    "total",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            const READ_PERSISTENT_LOG: &str = "persistent_log";
            const READ_FLUSHED_BUFFER: &str = "flushed_buffer";

            let persistent_log_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let persistent_log_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let flushed_buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let flushed_buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            Self {
                unclean_state,
                clean_state,
                wait_next_poll_ns,
                storage_write_size,
                storage_write_count,
                pause_duration_ns: storage_pause_duration_ns,
                buffer_unconsumed_item_count,
                buffer_unconsumed_row_count,
                buffer_unconsumed_epoch_count,
                buffer_unconsumed_min_epoch,
                buffer_read_count,
                buffer_read_size,
                total_read_count,
                total_read_size,
                persistent_log_read_metrics: KvLogStoreReadMetrics {
                    storage_read_size: persistent_log_read_size,
                    storage_read_count: persistent_log_read_count,
                },
                flushed_buffer_read_metrics: KvLogStoreReadMetrics {
                    storage_read_count: flushed_buffer_read_count,
                    storage_read_size: flushed_buffer_read_size,
                },
            }
        }

        #[cfg(test)]
        pub(crate) fn for_test() -> Self {
            SyncedKvLogStoreMetrics {
                unclean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
                clean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
                wait_next_poll_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
                storage_write_count: LabelGuardedIntCounter::test_int_counter::<4>(),
                storage_write_size: LabelGuardedIntCounter::test_int_counter::<4>(),
                pause_duration_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
                buffer_unconsumed_item_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_row_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_epoch_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_min_epoch: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
                buffer_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
                total_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
                total_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
                persistent_log_read_metrics: KvLogStoreReadMetrics::for_test(),
                flushed_buffer_read_metrics: KvLogStoreReadMetrics::for_test(),
            }
        }
    }
}

type ReadFlushedChunkFuture = BoxFuture<'static, LogStoreResult<(ChunkId, StreamChunk, Epoch)>>;

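/// Executor that interposes a kv log store between an upstream executor and
/// its downstream. Messages are buffered in memory up to `max_buffer_size`
/// rows; overflowing chunks are flushed to the state store and replaced by
/// lightweight references.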
pub struct SyncedKvLogStoreExecutor<S: StateStore> {
    actor_context: ActorContextRef,
    table_id: TableId,
    metrics: SyncedKvLogStoreMetrics,
    serde: LogStoreRowSerde,

    // Upstream
    upstream: Executor,

    // Log store state
    state_store: S,
    max_buffer_size: usize,

    // Max chunk size when reading from logstore / buffer
    chunk_size: usize,

    pause_duration_ms: Duration,

    aligned: bool,
}
// Constructor
impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn new(
        actor_context: ActorContextRef,
        table_id: TableId,
        metrics: SyncedKvLogStoreMetrics,
        serde: LogStoreRowSerde,
        state_store: S,
        buffer_size: usize,
        chunk_size: usize,
        upstream: Executor,
        pause_duration_ms: Duration,
        aligned: bool,
    ) -> Self {
        Self {
            actor_context,
            table_id,
            metrics,
            serde,
            state_store,
            upstream,
            max_buffer_size: buffer_size,
            chunk_size,
            pause_duration_ms,
            aligned,
        }
    }
}

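/// Metadata for a chunk that has just been flushed to the log store, carried
/// from the write future back to the buffer as a `LogStoreBufferItem::Flushed` entry.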
struct FlushedChunkInfo {
    epoch: u64,
    start_seq_id: SeqId,
    end_seq_id: SeqId,
    flush_info: FlushInfo,
    vnode_bitmap: Bitmap,
}

enum WriteFuture<S: LocalStateStore> {
    /// We trigger a brief pause to let the `ReadFuture` be polled in the following scenarios:
    /// - When seeing an upstream data chunk, when the buffer becomes full, and the state is clean.
    /// - When seeing a checkpoint barrier, when the buffer is not empty, and the state is clean.
    ///
    /// On pausing, we will transition to a dirty state.
    ///
    /// We resume polling the write future in the following scenarios:
    /// - After the pause duration has elapsed.
    /// - After the read future consumes a chunk.
    Paused {
        start_instant: Instant,
        sleep_future: Option<Pin<Box<Sleep>>>,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>, // Just used to hold the state
    },
    ReceiveFromUpstream {
        future: StreamFuture<BoxedMessageStream>,
        write_state: LogStoreWriteState<S>,
    },
    FlushingChunk {
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        future: Pin<Box<LogStoreStateWriteChunkFuture<S>>>,
        stream: BoxedMessageStream,
    },
    Empty,
}

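/// Outcome of polling a `WriteFuture` to completion.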
enum WriteFutureEvent {
    UpstreamMessageReceived(Message),
    ChunkFlushed(FlushedChunkInfo),
}

impl<S: LocalStateStore> WriteFuture<S> {
    fn flush_chunk(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
        chunk: StreamChunk,
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
    ) -> Self {
        tracing::trace!(
            start_seq_id,
            end_seq_id,
            epoch,
            cardinality = chunk.cardinality(),
            "write_future: flushing chunk"
        );
        Self::FlushingChunk {
            epoch,
            start_seq_id,
            end_seq_id,
            future: Box::pin(write_state.into_write_chunk_future(
                chunk,
                epoch,
                start_seq_id,
                end_seq_id,
            )),
            stream,
        }
    }

    fn receive_from_upstream(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        Self::ReceiveFromUpstream {
            future: stream.into_future(),
            write_state,
        }
    }

    fn paused(
        duration: Duration,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        let now = Instant::now();
        tracing::trace!(?now, ?duration, "write_future_pause");
        Self::Paused {
            start_instant: now,
            sleep_future: Some(Box::pin(sleep_until(now + duration))),
            barrier,
            stream,
            write_state,
        }
    }

    async fn next_event(
        &mut self,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<(BoxedMessageStream, LogStoreWriteState<S>, WriteFutureEvent)> {
        match self {
            WriteFuture::Paused {
                start_instant,
                sleep_future,
                ..
            } => {
                if let Some(sleep_future) = sleep_future {
                    sleep_future.await;
                    metrics
                        .pause_duration_ns
                        .inc_by(start_instant.elapsed().as_nanos() as _);
                    tracing::trace!("resuming write future");
                }
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::Paused { stream, write_state, barrier, .. } => {
                    Ok((stream, write_state, WriteFutureEvent::UpstreamMessageReceived(Message::Barrier(barrier))))
                })
            }
            WriteFuture::ReceiveFromUpstream { future, .. } => {
                let (opt, stream) = future.await;
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::ReceiveFromUpstream { write_state, .. } => {
                    opt
                    .ok_or_else(|| anyhow!("end of upstream input").into())
                    .and_then(|result| result.map(|item| {
                        (stream, write_state, WriteFutureEvent::UpstreamMessageReceived(item))
                    }))
                })
            }
            WriteFuture::FlushingChunk { future, .. } => {
                let (write_state, result) = future.await;
                let result = must_match!(replace(self, WriteFuture::Empty), WriteFuture::FlushingChunk { epoch, start_seq_id, end_seq_id, stream, ..  } => {
                    result.map(|(flush_info, vnode_bitmap)| {
                        (stream, write_state, WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                            epoch,
                            start_seq_id,
                            end_seq_id,
                            flush_info,
                            vnode_bitmap,
                        }))
                    })
                });
                result.map_err(Into::into)
            }
            WriteFuture::Empty => {
                unreachable!("should not be polled after ready")
            }
        }
    }
}

// Stream interface
impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    #[try_stream(ok = Message, error = StreamExecutorError)]
    pub async fn execute_monitored(self) {
        let wait_next_poll_ns = self.metrics.wait_next_poll_ns.clone();
        #[for_await]
        for message in self.execute_inner() {
            let current_time = Instant::now();
            yield message?;
            wait_next_poll_ns.inc_by(current_time.elapsed().as_nanos() as _);
        }
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(self) {
        let mut input = self.upstream.execute();

        // init first epoch + local state store
        let first_barrier = expect_first_barrier(&mut input).await?;
        let first_write_epoch = first_barrier.epoch;
        yield Message::Barrier(first_barrier.clone());

        let local_state_store = self
            .state_store
            .new_local(NewLocalOptions {
                table_id: self.table_id,
                op_consistency_level: OpConsistencyLevel::Inconsistent,
                table_option: TableOption {
                    retention_seconds: None,
                },
                is_replicated: false,
                vnodes: self.serde.vnodes().clone(),
                upload_on_flush: false,
            })
            .await;

        let (mut read_state, mut initial_write_state) = new_log_store_state(
            self.table_id,
            local_state_store,
            self.serde,
            self.chunk_size,
        );
        initial_write_state.init(first_write_epoch).await?;

        let mut pause_stream = first_barrier.is_pause_on_startup();
        let mut initial_write_epoch = first_write_epoch;

        if self.aligned {
            tracing::info!("aligned mode");
            // We want to realign the buffer and the stream.
            // We block the upstream input stream
            // and wait until the persisted logstore is empty;
            // after that we can consume the input stream.
            let log_store_stream = read_state
                .read_persisted_log_store(
                    self.metrics.persistent_log_read_metrics.clone(),
                    initial_write_epoch.curr,
                    LogStoreReadStateStreamRangeStart::Unbounded,
                )
                .await?;

            #[for_await]
            for message in log_store_stream {
                let (_epoch, message) = message?;
                match message {
                    KvLogStoreItem::Barrier { .. } => {
                        continue;
                    }
                    KvLogStoreItem::StreamChunk { chunk, .. } => {
                        yield Message::Chunk(chunk);
                    }
                }
            }

            let mut realigned_logstore = false;

            #[for_await]
            for message in input {
                match message? {
                    Message::Barrier(barrier) => {
                        let is_checkpoint = barrier.is_checkpoint();
                        let mut progress = LogStoreVnodeProgress::None;
                        progress.apply_aligned(
                            read_state.vnodes().clone(),
                            barrier.epoch.prev,
                            None,
                        );
                        // Truncate the logstore
                        let post_seal = initial_write_state
                            .seal_current_epoch(barrier.epoch.curr, progress.take());
                        let update_vnode_bitmap =
                            barrier.as_update_vnode_bitmap(self.actor_context.id);
                        yield Message::Barrier(barrier);
                        post_seal.post_yield_barrier(update_vnode_bitmap).await?;
                        if !realigned_logstore && is_checkpoint {
                            realigned_logstore = true;
                            tracing::info!("realigned logstore");
                        }
                    }
                    Message::Chunk(chunk) => {
                        yield Message::Chunk(chunk);
                    }
                    Message::Watermark(watermark) => {
                        yield Message::Watermark(watermark);
                    }
                }
            }

            return Ok(());
        }

        // We only recreate the consume stream:
        // 1. On bootstrap
        // 2. On vnode update
        'recreate_consume_stream: loop {
            let mut seq_id = FIRST_SEQ_ID;
            let mut buffer = SyncedLogStoreBuffer {
                buffer: VecDeque::new(),
                current_size: 0,
                max_size: self.max_buffer_size,
                max_chunk_size: self.chunk_size,
                next_chunk_id: 0,
                metrics: self.metrics.clone(),
                flushed_count: 0,
            };

            let log_store_stream = read_state
                .read_persisted_log_store(
                    self.metrics.persistent_log_read_metrics.clone(),
                    initial_write_epoch.curr,
                    LogStoreReadStateStreamRangeStart::Unbounded,
                )
                .await?;

            let mut log_store_stream = tokio_stream::StreamExt::peekable(log_store_stream);
            let mut clean_state = log_store_stream.peek().await.is_none();
            tracing::trace!(?clean_state);

            let mut read_future_state = ReadFuture::ReadingPersistedStream(log_store_stream);

            let mut write_future_state =
                WriteFuture::receive_from_upstream(input, initial_write_state);

            let mut progress = LogStoreVnodeProgress::None;

            loop {
                let select_result = {
                    let read_future = async {
                        if pause_stream {
                            pending().await
                        } else {
                            read_future_state
                                .next_chunk(&mut progress, &read_state, &mut buffer, &self.metrics)
                                .await
                        }
                    };
                    pin_mut!(read_future);
                    let write_future = write_future_state.next_event(&self.metrics);
                    pin_mut!(write_future);
                    let output = select(write_future, read_future).await;
                    drop_either_future(output)
                };
                match select_result {
                    Either::Left(result) => {
                        // drop the future to ensure that the future must be reset later
                        drop(write_future_state);
                        let (stream, mut write_state, either) = result?;
                        match either {
                            WriteFutureEvent::UpstreamMessageReceived(msg) => {
                                match msg {
                                    Message::Barrier(barrier) => {
                                        if clean_state
                                            && barrier.kind.is_checkpoint()
                                            && !buffer.is_empty()
                                        {
                                            write_future_state = WriteFuture::paused(
                                                self.pause_duration_ms,
                                                barrier,
                                                stream,
                                                write_state,
                                            );
                                            clean_state = false;
                                            self.metrics.unclean_state.inc();
                                        } else {
                                            if let Some(mutation) = barrier.mutation.as_deref() {
                                                match mutation {
                                                    Mutation::Pause => {
                                                        pause_stream = true;
                                                    }
                                                    Mutation::Resume => {
                                                        pause_stream = false;
                                                    }
                                                    _ => {}
                                                }
                                            }
                                            let write_state_post_write_barrier =
                                                Self::write_barrier(
                                                    self.actor_context.id,
                                                    &mut write_state,
                                                    barrier.clone(),
                                                    &self.metrics,
                                                    progress.take(),
                                                    &mut buffer,
                                                )
                                                .await?;
                                            seq_id = FIRST_SEQ_ID;
                                            let update_vnode_bitmap = barrier
                                                .as_update_vnode_bitmap(self.actor_context.id);
                                            let barrier_epoch = barrier.epoch;
                                            tracing::trace!(
                                                ?update_vnode_bitmap,
                                                actor_id = %self.actor_context.id,
                                                "update vnode bitmap"
                                            );

                                            yield Message::Barrier(barrier);

                                            write_state_post_write_barrier
                                                .post_yield_barrier(update_vnode_bitmap.clone())
                                                .await?;
                                            if let Some(vnode_bitmap) = update_vnode_bitmap {
                                                // Apply Vnode Update
                                                read_state.update_vnode_bitmap(vnode_bitmap);
                                                initial_write_epoch = barrier_epoch;
                                                input = stream;
                                                initial_write_state = write_state;
                                                continue 'recreate_consume_stream;
                                            } else {
                                                write_future_state =
                                                    WriteFuture::receive_from_upstream(
                                                        stream,
                                                        write_state,
                                                    );
                                            }
                                        }
                                    }
                                    Message::Chunk(chunk) => {
                                        let start_seq_id = seq_id;
                                        let new_seq_id = seq_id + chunk.cardinality() as SeqId;
                                        let end_seq_id = new_seq_id - 1;
                                        let epoch = write_state.epoch().curr;
                                        tracing::trace!(
                                            start_seq_id,
                                            end_seq_id,
                                            new_seq_id,
                                            epoch,
                                            cardinality = chunk.cardinality(),
                                            "received chunk"
                                        );
                                        if let Some(chunk_to_flush) = buffer.add_or_flush_chunk(
                                            start_seq_id,
                                            end_seq_id,
                                            chunk,
                                            epoch,
                                        ) {
                                            seq_id = new_seq_id;
                                            write_future_state = WriteFuture::flush_chunk(
                                                stream,
                                                write_state,
                                                chunk_to_flush,
                                                epoch,
                                                start_seq_id,
                                                end_seq_id,
                                            );
                                        } else {
                                            seq_id = new_seq_id;
                                            write_future_state = WriteFuture::receive_from_upstream(
                                                stream,
                                                write_state,
                                            );
                                        }
                                    }
                                    // FIXME(kwannoel): This should truncate the logstore;
                                    // it should not bypass it the way barriers do.
                                    Message::Watermark(_watermark) => {
                                        write_future_state =
                                            WriteFuture::receive_from_upstream(stream, write_state);
                                    }
                                }
                            }
                            WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                                start_seq_id,
                                end_seq_id,
                                epoch,
                                flush_info,
                                vnode_bitmap,
                            }) => {
                                buffer.add_flushed_item_to_buffer(
                                    start_seq_id,
                                    end_seq_id,
                                    vnode_bitmap,
                                    epoch,
                                );
                                self.metrics
                                    .storage_write_count
                                    .inc_by(flush_info.flush_count as _);
                                self.metrics
                                    .storage_write_size
                                    .inc_by(flush_info.flush_size as _);
                                write_future_state =
                                    WriteFuture::receive_from_upstream(stream, write_state);
                            }
                        }
                    }
                    Either::Right(result) => {
                        if !clean_state
                            && matches!(read_future_state, ReadFuture::Idle)
                            && buffer.is_empty()
                        {
                            clean_state = true;
                            self.metrics.clean_state.inc();

                            // Let write future resume immediately
                            if let WriteFuture::Paused { sleep_future, .. } =
                                &mut write_future_state
                            {
                                tracing::trace!("resuming paused future");
                                assert!(buffer.current_size < self.max_buffer_size);
                                *sleep_future = None;
                            }
                        }
                        let chunk = result?;
                        self.metrics
                            .total_read_count
                            .inc_by(chunk.cardinality() as _);

                        yield Message::Chunk(chunk);
                    }
                }
            }
        }
    }
}

type PersistedStream<S> = Peekable<Pin<Box<LogStoreItemMergeStream<TimeoutAutoRebuildIter<S>>>>>;

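/// State machine for the read side: drain the persisted log store first, then
/// serve items from the in-memory buffer, reading flushed chunk references
/// back from storage as needed.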
enum ReadFuture<S: StateStoreRead> {
    ReadingPersistedStream(PersistedStream<S>),
    ReadingFlushedChunk {
        future: ReadFlushedChunkFuture,
        end_seq_id: SeqId,
    },
    Idle,
}

// Read methods
impl<S: StateStoreRead> ReadFuture<S> {
    async fn next_chunk(
        &mut self,
        progress: &mut LogStoreVnodeProgress,
        read_state: &LogStoreReadState<S>,
        buffer: &mut SyncedLogStoreBuffer,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<StreamChunk> {
        match self {
            ReadFuture::ReadingPersistedStream(stream) => {
                while let Some((epoch, item)) = stream.try_next().await? {
                    match item {
                        KvLogStoreItem::Barrier { vnodes, .. } => {
                            tracing::trace!(epoch, "read logstore barrier");
                            // update the progress
                            progress.apply_aligned(vnodes, epoch, None);
                            continue;
                        }
                        KvLogStoreItem::StreamChunk {
                            chunk,
                            progress: chunk_progress,
                        } => {
                            tracing::trace!("read logstore chunk of size: {}", chunk.cardinality());
                            progress.apply_per_vnode(epoch, chunk_progress);
                            return Ok(chunk);
                        }
                    }
                }
                *self = ReadFuture::Idle;
            }
            ReadFuture::ReadingFlushedChunk { .. } | ReadFuture::Idle => {}
        }
        match self {
            ReadFuture::ReadingPersistedStream(_) => {
                unreachable!("must have finished read persisted stream when reaching here")
            }
            ReadFuture::ReadingFlushedChunk { .. } => {}
            ReadFuture::Idle => loop {
                let Some((item_epoch, item)) = buffer.pop_front() else {
                    return pending().await;
                };
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        metrics.buffer_read_count.inc_by(chunk.cardinality() as _);
                        tracing::trace!(
                            start_seq_id,
                            end_seq_id,
                            flushed,
                            cardinality = chunk.cardinality(),
                            "read buffered chunk"
                        );
                        progress.apply_aligned(
                            read_state.vnodes().clone(),
                            item_epoch,
                            Some(end_seq_id),
                        );
                        return Ok(chunk);
                    }
                    LogStoreBufferItem::Flushed {
                        vnode_bitmap,
                        start_seq_id,
                        end_seq_id,
                        chunk_id,
                    } => {
                        tracing::trace!(start_seq_id, end_seq_id, chunk_id, "read flushed chunk");
                        let read_metrics = metrics.flushed_buffer_read_metrics.clone();
                        let future = read_state
                            .read_flushed_chunk(
                                vnode_bitmap,
                                chunk_id,
                                start_seq_id,
                                end_seq_id,
                                item_epoch,
                                read_metrics,
                            )
                            .boxed();
                        *self = ReadFuture::ReadingFlushedChunk { future, end_seq_id };
                        break;
                    }
                    LogStoreBufferItem::Barrier { .. } => {
                        tracing::trace!(item_epoch, "read buffer barrier");
                        progress.apply_aligned(read_state.vnodes().clone(), item_epoch, None);
                        continue;
                    }
                }
            },
        }

        let (future, end_seq_id) = match self {
            ReadFuture::ReadingPersistedStream(_) | ReadFuture::Idle => {
                unreachable!("should be at ReadingFlushedChunk")
            }
            ReadFuture::ReadingFlushedChunk { future, end_seq_id } => (future, *end_seq_id),
        };

        let (_, chunk, epoch) = future.await?;
        progress.apply_aligned(read_state.vnodes().clone(), epoch, Some(end_seq_id));
        tracing::trace!(
            end_seq_id,
            "read flushed chunk of size: {}",
            chunk.cardinality()
        );
        *self = ReadFuture::Idle;
        Ok(chunk)
    }
}

// Write methods
impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    async fn write_barrier<'a>(
        actor_id: ActorId,
        write_state: &'a mut LogStoreWriteState<S::Local>,
        barrier: Barrier,
        metrics: &SyncedKvLogStoreMetrics,
        progress: LogStoreVnodeProgress,
        buffer: &mut SyncedLogStoreBuffer,
    ) -> StreamExecutorResult<LogStorePostSealCurrentEpoch<'a, S::Local>> {
        tracing::trace!(%actor_id, ?progress, "applying truncation");
        // TODO(kwannoel): As an optimization we can also change flushed chunks to flushed items
        // to reduce memory consumption of the logstore.

        let epoch = barrier.epoch.prev;
        let mut writer = write_state.start_writer(false);
        writer.write_barrier(epoch, barrier.is_checkpoint())?;

        if barrier.is_checkpoint() {
            for (epoch, item) in buffer.buffer.iter_mut().rev() {
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        if !*flushed {
                            writer.write_chunk(chunk, *epoch, *start_seq_id, *end_seq_id)?;
                            *flushed = true;
                        } else {
                            break;
                        }
                    }
                    LogStoreBufferItem::Flushed { .. } | LogStoreBufferItem::Barrier { .. } => {}
                }
            }
        }

        // Apply truncation
        let (flush_info, _) = writer.finish().await?;
        metrics
            .storage_write_count
            .inc_by(flush_info.flush_count as _);
        metrics
            .storage_write_size
            .inc_by(flush_info.flush_size as _);
        let post_seal = write_state.seal_current_epoch(barrier.epoch.curr, progress);

        // Add to buffer
        buffer.buffer.push_back((
            epoch,
            LogStoreBufferItem::Barrier {
                is_checkpoint: barrier.is_checkpoint(),
                next_epoch: barrier.epoch.curr,
            },
        ));
        buffer.next_chunk_id = 0;
        buffer.update_unconsumed_buffer_metrics();

        Ok(post_seal)
    }
}

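/// In-memory buffer of unconsumed log store items, bounded to `max_size` rows.
/// Chunks that would overflow the bound are flushed to storage and tracked
/// here as `LogStoreBufferItem::Flushed` references instead.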
struct SyncedLogStoreBuffer {
    buffer: VecDeque<(u64, LogStoreBufferItem)>,
    current_size: usize,
    max_size: usize,
    max_chunk_size: usize,
    next_chunk_id: ChunkId,
    metrics: SyncedKvLogStoreMetrics,
    flushed_count: usize,
}

impl SyncedLogStoreBuffer {
    fn is_empty(&self) -> bool {
        self.current_size == 0
    }

    fn add_or_flush_chunk(
        &mut self,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        chunk: StreamChunk,
        epoch: u64,
    ) -> Option<StreamChunk> {
        let current_size = self.current_size;
        let chunk_size = chunk.cardinality();

        tracing::trace!(
            current_size,
            chunk_size,
            max_size = self.max_size,
            "checking chunk size"
        );
        let should_flush_chunk = current_size + chunk_size > self.max_size;
        if should_flush_chunk {
            tracing::trace!(start_seq_id, end_seq_id, epoch, "flushing chunk");
            Some(chunk)
        } else {
            tracing::trace!(start_seq_id, end_seq_id, epoch, "buffering chunk");
            self.add_chunk_to_buffer(chunk, start_seq_id, end_seq_id, epoch);
            None
        }
    }

    /// After flushing a chunk, we will preserve a `FlushedItem` inside the buffer.
    /// This doesn't contain any data, but it contains the metadata to read the flushed chunk.
    fn add_flushed_item_to_buffer(
        &mut self,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        new_vnode_bitmap: Bitmap,
        epoch: u64,
    ) {
        let new_chunk_size = (end_seq_id - start_seq_id + 1) as usize;

        if let Some((
            item_epoch,
            LogStoreBufferItem::Flushed {
                start_seq_id: prev_start_seq_id,
                end_seq_id: prev_end_seq_id,
                vnode_bitmap,
                ..
            },
        )) = self.buffer.back_mut()
            && let flushed_chunk_size = (*prev_end_seq_id - *prev_start_seq_id + 1) as usize
            && let projected_flushed_chunk_size = flushed_chunk_size + new_chunk_size
            && projected_flushed_chunk_size <= self.max_chunk_size
        {
            assert!(
                *prev_end_seq_id < start_seq_id,
                "prev end_seq_id {} should be smaller than current start_seq_id {}",
                prev_end_seq_id,
                start_seq_id
            );
            assert_eq!(
                epoch, *item_epoch,
                "epoch of newly added flushed item must be the same as the last flushed item"
            );
            *prev_end_seq_id = end_seq_id;
            *vnode_bitmap |= new_vnode_bitmap;
        } else {
            let chunk_id = self.next_chunk_id;
            self.next_chunk_id += 1;
            self.buffer.push_back((
                epoch,
                LogStoreBufferItem::Flushed {
                    start_seq_id,
                    end_seq_id,
                    vnode_bitmap: new_vnode_bitmap,
                    chunk_id,
                },
            ));
            self.flushed_count += 1;
            tracing::trace!(
                "adding flushed item to buffer: start_seq_id: {start_seq_id}, end_seq_id: {end_seq_id}, chunk_id: {chunk_id}"
            );
        }
        // FIXME(kwannoel): Seems these metrics are updated _after_ the flush info is reported.
        self.update_unconsumed_buffer_metrics();
    }

    fn add_chunk_to_buffer(
        &mut self,
        chunk: StreamChunk,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        epoch: u64,
    ) {
        let chunk_id = self.next_chunk_id;
        self.next_chunk_id += 1;
        self.current_size += chunk.cardinality();
        self.buffer.push_back((
            epoch,
            LogStoreBufferItem::StreamChunk {
                chunk,
                start_seq_id,
                end_seq_id,
                flushed: false,
                chunk_id,
            },
        ));
        self.update_unconsumed_buffer_metrics();
    }

    fn pop_front(&mut self) -> Option<(u64, LogStoreBufferItem)> {
        let item = self.buffer.pop_front();
        match &item {
            Some((_, LogStoreBufferItem::Flushed { .. })) => {
                self.flushed_count -= 1;
            }
            Some((_, LogStoreBufferItem::StreamChunk { chunk, .. })) => {
                self.current_size -= chunk.cardinality();
            }
            _ => {}
        }
        self.update_unconsumed_buffer_metrics();
        item
    }

    fn update_unconsumed_buffer_metrics(&self) {
        let mut epoch_count = 0;
        let mut row_count = 0;
        for (_, item) in &self.buffer {
            match item {
                LogStoreBufferItem::StreamChunk { chunk, .. } => {
                    row_count += chunk.cardinality();
                }
                LogStoreBufferItem::Flushed {
                    start_seq_id,
                    end_seq_id,
                    ..
                } => {
                    row_count += (end_seq_id - start_seq_id + 1) as usize;
                }
                LogStoreBufferItem::Barrier { .. } => {
                    epoch_count += 1;
                }
            }
        }
        self.metrics.buffer_unconsumed_epoch_count.set(epoch_count);
        self.metrics.buffer_unconsumed_row_count.set(row_count as _);
        self.metrics
            .buffer_unconsumed_item_count
            .set(self.buffer.len() as _);
        self.metrics.buffer_unconsumed_min_epoch.set(
            self.buffer
                .front()
                .map(|(epoch, _)| *epoch)
                .unwrap_or_default() as _,
        );
    }
}

impl<S> Execute for SyncedKvLogStoreExecutor<S>
where
    S: StateStore,
{
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_monitored().boxed()
    }
}

1240#[cfg(test)]
1241mod tests {
1242    use itertools::Itertools;
1243    use pretty_assertions::assert_eq;
1244    use risingwave_common::catalog::Field;
1245    use risingwave_common::hash::VirtualNode;
1246    use risingwave_common::test_prelude::*;
1247    use risingwave_common::util::epoch::test_epoch;
1248    use risingwave_storage::memory::MemoryStateStore;
1249
1250    use super::*;
1251    use crate::assert_stream_chunk_eq;
1252    use crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V2_INFO;
1253    use crate::common::log_store_impl::kv_log_store::test_utils::{
1254        check_stream_chunk_eq, gen_test_log_store_table, test_payload_schema,
1255    };
1256    use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
1257    use crate::executor::test_utils::MockSource;
1258
1259    fn init_logger() {
1260        let _ = tracing_subscriber::fmt()
1261            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1262            .with_ansi(false)
1263            .try_init();
1264    }
1265
1266    // test read/write buffer
1267    #[tokio::test]
1268    async fn test_read_write_buffer() {
1269        init_logger();
1270
1271        let pk_info = &KV_LOG_STORE_V2_INFO;
1272        let column_descs = test_payload_schema(pk_info);
1273        let fields = column_descs
1274            .into_iter()
1275            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
1276            .collect_vec();
1277        let schema = Schema { fields };
1278        let stream_key = vec![0];
1279        let (mut tx, source) = MockSource::channel();
1280        let source = source.into_executor(schema.clone(), stream_key.clone());
1281
1282        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));
1283
1284        let table = gen_test_log_store_table(pk_info);
1285
1286        let log_store_executor = SyncedKvLogStoreExecutor::new(
1287            ActorContext::for_test(123),
1288            table.id,
1289            SyncedKvLogStoreMetrics::for_test(),
1290            LogStoreRowSerde::new(&table, vnodes, pk_info),
1291            MemoryStateStore::new(),
1292            10,
1293            256,
1294            source,
1295            Duration::from_millis(256),
1296            false,
1297        )
1298        .boxed();
1299
1300        // Init
1301        tx.push_barrier(test_epoch(1), false);
1302
        let chunk_1 = StreamChunk::from_pretty(
            "  I   T
            +  5  10
            +  6  10
            +  8  10
            +  9  10
            +  10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "   I   T
            -   5  10
            -   6  10
            -   8  10
            U-  9  10
            U+ 10  11",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        tx.push_barrier(test_epoch(2), false);

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

    // test barrier persisted read
    //
    // Sequence of events (earliest -> latest):
    // barrier(1) -> chunk(1) -> chunk(2) -> barrier(2) are all pushed before the
    // stream is first polled; we then poll 4 messages off the executor stream.
    #[tokio::test]
    async fn test_barrier_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
            .collect_vec();
        let schema = Schema { fields };
        let stream_key = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        // Init
        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            "  I   T
            +  5  10
            +  6  10
            +  8  10
            +  9  10
            +  10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "   I   T
            -   5  10
            -   6  10
            -   8  10
            U- 10  11
            U+ 10  10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

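        // Since the epoch-2 barrier was queued before the first poll, the chunks can be
        // persisted at the barrier before they are read back; the expected output order
        // is nevertheless the same as in `test_read_write_buffer`.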
        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

    // When the buffer hits `max_chunk`, only a placeholder (`FlushedItem`) is kept in
    // memory and the chunk itself is flushed to the state store. Setting capacity = 0
    // forces every incoming chunk down this path.
    #[tokio::test]
    async fn test_max_chunk_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
            .collect_vec();
        let schema = Schema { fields };
        let stream_key = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
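            // Zero buffer capacity: every chunk is flushed immediately.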
            0,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        // Init
        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            "  I   T
            +  5  10
            +  6  10
            +  8  10
            +  9  10
            +  10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "   I   T
            -   5  10
            -   6  10
            -   8  10
            U- 10  11
            U+ 10  10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        for i in 1..=2 {
            match stream.next().await {
                Some(Ok(Message::Barrier(barrier))) => {
                    assert_eq!(barrier.epoch.curr, test_epoch(i));
                }
                other => panic!("Expected a barrier message, got {:?}", other),
            }
        }

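        // Both barriers arrive before any data: with capacity 0 the chunks were flushed
        // rather than buffered, and adjacent flushed references are compacted, so the
        // two chunks come back from the state store as a single chunk.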
        match stream.next().await {
            Some(Ok(Message::Chunk(actual))) => {
                let expected = StreamChunk::from_pretty(
                    "   I   T
                    +   5  10
                    +   6  10
                    +   8  10
                    +   9  10
                    +  10  11
                    -   5  10
                    -   6  10
                    -   8  10
                    U- 10  11
                    U+ 10  10",
                );
                assert_stream_chunk_eq!(actual, expected);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }
    }
}