risingwave_stream/executor/sync_kv_log_store.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! This module contains the synced kv log store implementation.
//! It's meant to buffer a large number of records emitted from upstream,
//! to avoid overwhelming the downstream executor.
//!
//! The synced kv log store polls two futures:
//!
//! 1. Upstream: the upstream message source.
//!
//!   It will write stream messages to the log store buffer, e.g. `Message::Barrier`, `Message::Chunk`, ...
//!   When writing a stream chunk, if the log store buffer is full, it will:
//!     a. Flush the buffer to the log store.
//!     b. Convert the stream chunk into a reference (`LogStoreBufferItem::Flushed`)
//!       which can be used to read the corresponding chunks back from the log store.
//!       We will compact adjacent references,
//!       so a single reference can cover multiple chunks if there's a buildup.
//!
//!   On receiving barriers, it will:
//!     a. Apply truncation to historical data in the logstore.
//!     b. Flush and checkpoint the logstore data.
//!
//! 2. State store + buffer + recently flushed chunks: the storage components of the logstore.
//!
//!   It will read all historical data from the logstore first. This can be done just by
//!   constructing a state store stream, which will read all data up to the latest epoch.
//!   This is a static snapshot of data.
//!   Any subsequently flushed chunks are read via a `flushed_chunk_future` instead
//!   (see below for how it's constructed and what it is).
//!
//!   We will next poll the pre-existing `flushed_chunk_future`, if there is one.
//!
//!   Finally we will pop the earliest item in the buffer:
//!   - If it's a chunk, yield it.
//!   - If it's a watermark, yield it.
//!   - If it's a flushed chunk reference (`LogStoreBufferItem::Flushed`),
//!     we will read the corresponding chunks in the log store.
//!     This is done by constructing a `flushed_chunk_future` which will read the log store
//!     using the `seq_id`.
//!   - If it's a barrier, skip it: barriers are never yielded from this path,
//!     because they are directly propagated from the upstream when polling it.
//!
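//! A simplified sketch of the main poll loop in `execute_inner` below
//! (illustrative only; `handle_write_event` stands in for the inline `match`
//! on the write event, and the real loop also handles pause/resume,
//! clean/dirty state transitions and vnode updates):
//!
//! ```ignore
//! loop {
//!     // Poll the write side (upstream) and the read side (logstore) together.
//!     match select(write_future.next_event(), read_future.next_chunk()).await {
//!         // Upstream is ready first: buffer/flush the chunk, or handle the barrier.
//!         Either::Left(event) => handle_write_event(event),
//!         // Read side is ready first: yield the chunk read from the persisted
//!         // log store, a flushed-chunk future, or the in-memory buffer.
//!         Either::Right(chunk) => yield Message::Chunk(chunk),
//!     }
//! }
//! ```
//!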
//! TODO(kwannoel):
//! - [ ] Add dedicated metrics for sync log store, namespaced according to the upstream.
//! - [ ] Add tests
//! - [ ] Handle watermark r/w
//! - [ ] Handle paused stream

use std::collections::VecDeque;
use std::future::pending;
use std::mem::replace;
use std::pin::Pin;

use anyhow::anyhow;
use futures::future::{BoxFuture, Either, select};
use futures::stream::StreamFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::must_match;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_connector::sink::log_store::{ChunkId, LogStoreResult};
use risingwave_storage::StateStore;
use risingwave_storage::store::timeout_auto_rebuild::TimeoutAutoRebuildIter;
use risingwave_storage::store::{
    LocalStateStore, NewLocalOptions, OpConsistencyLevel, StateStoreRead,
};
use rw_futures_util::drop_either_future;
use tokio::time::{Duration, Instant, Sleep, sleep_until};
use tokio_stream::adapters::Peekable;

use crate::common::log_store_impl::kv_log_store::buffer::LogStoreBufferItem;
use crate::common::log_store_impl::kv_log_store::reader::LogStoreReadStateStreamRangeStart;
use crate::common::log_store_impl::kv_log_store::serde::{
    KvLogStoreItem, LogStoreItemMergeStream, LogStoreRowSerde,
};
use crate::common::log_store_impl::kv_log_store::state::{
    LogStorePostSealCurrentEpoch, LogStoreReadState, LogStoreStateWriteChunkFuture,
    LogStoreWriteState, new_log_store_state,
};
use crate::common::log_store_impl::kv_log_store::{
    Epoch, FIRST_SEQ_ID, FlushInfo, LogStoreVnodeProgress, SeqId,
};
use crate::executor::prelude::*;
use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
use crate::executor::{
    Barrier, BoxedMessageStream, Message, StreamExecutorError, StreamExecutorResult,
};

pub mod metrics {
    use risingwave_common::id::FragmentId;
    use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};

    use crate::common::log_store_impl::kv_log_store::KvLogStoreReadMetrics;
    use crate::executor::monitor::StreamingMetrics;
    use crate::task::ActorId;

    #[derive(Clone)]
    pub struct SyncedKvLogStoreMetrics {
        // state of the log store
        pub unclean_state: LabelGuardedIntCounter,
        pub clean_state: LabelGuardedIntCounter,
        pub wait_next_poll_ns: LabelGuardedIntCounter,

        // Write metrics
        pub storage_write_count: LabelGuardedIntCounter,
        pub storage_write_size: LabelGuardedIntCounter,
        pub pause_duration_ns: LabelGuardedIntCounter,

        // Buffer metrics
        pub buffer_unconsumed_item_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_row_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_epoch_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_min_epoch: LabelGuardedIntGauge,
        pub buffer_memory_bytes: LabelGuardedIntGauge,
        pub buffer_read_count: LabelGuardedIntCounter,
        pub buffer_read_size: LabelGuardedIntCounter,

        // Read metrics
        pub total_read_count: LabelGuardedIntCounter,
        pub total_read_size: LabelGuardedIntCounter,
        pub persistent_log_read_metrics: KvLogStoreReadMetrics,
        pub flushed_buffer_read_metrics: KvLogStoreReadMetrics,
    }

    impl SyncedKvLogStoreMetrics {
        /// `id`: a unique identifier for the logstore. This can be the sink id,
        ///       or, for joins, the `fragment_id`.
        /// `name`: the MV / Sink that the log store is associated with.
        /// `target`: the target of the log store,
        ///           for instance a `MySql` sink, a PG sink, etc.,
        ///           or an unaligned join.
        pub(crate) fn new(
            metrics: &StreamingMetrics,
            actor_id: ActorId,
            fragment_id: FragmentId,
            name: &str,
            target: &'static str,
        ) -> Self {
            let actor_id_str = actor_id.to_string();
            let fragment_id_str = fragment_id.to_string();
            let labels = &[&actor_id_str, target, &fragment_id_str, name];

            let unclean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "dirty",
                &actor_id_str,
                target,
                &fragment_id_str,
                name,
            ]);
            let clean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "clean",
                &actor_id_str,
                target,
                &fragment_id_str,
                name,
            ]);
            let wait_next_poll_ns = metrics
                .sync_kv_log_store_wait_next_poll_ns
                .with_guarded_label_values(labels);

            let storage_write_size = metrics
                .sync_kv_log_store_storage_write_size
                .with_guarded_label_values(labels);
            let storage_write_count = metrics
                .sync_kv_log_store_storage_write_count
                .with_guarded_label_values(labels);
            let storage_pause_duration_ns = metrics
                .sync_kv_log_store_write_pause_duration_ns
                .with_guarded_label_values(labels);

            let buffer_unconsumed_item_count = metrics
                .sync_kv_log_store_buffer_unconsumed_item_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_row_count = metrics
                .sync_kv_log_store_buffer_unconsumed_row_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_epoch_count = metrics
                .sync_kv_log_store_buffer_unconsumed_epoch_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_min_epoch = metrics
                .sync_kv_log_store_buffer_unconsumed_min_epoch
                .with_guarded_label_values(labels);
            let buffer_memory_bytes = metrics
                .sync_kv_log_store_buffer_memory_bytes
                .with_guarded_label_values(labels);
            let buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    "buffer",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    "buffer",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let total_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    "total",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let total_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    "total",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            const READ_PERSISTENT_LOG: &str = "persistent_log";
            const READ_FLUSHED_BUFFER: &str = "flushed_buffer";

            let persistent_log_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let persistent_log_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let flushed_buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let flushed_buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            Self {
                unclean_state,
                clean_state,
                wait_next_poll_ns,
                storage_write_size,
                storage_write_count,
                pause_duration_ns: storage_pause_duration_ns,
                buffer_unconsumed_item_count,
                buffer_unconsumed_row_count,
                buffer_unconsumed_epoch_count,
                buffer_unconsumed_min_epoch,
                buffer_memory_bytes,
                buffer_read_count,
                buffer_read_size,
                total_read_count,
                total_read_size,
                persistent_log_read_metrics: KvLogStoreReadMetrics {
                    storage_read_size: persistent_log_read_size,
                    storage_read_count: persistent_log_read_count,
                },
                flushed_buffer_read_metrics: KvLogStoreReadMetrics {
                    storage_read_count: flushed_buffer_read_count,
                    storage_read_size: flushed_buffer_read_size,
                },
            }
        }

        #[cfg(test)]
        pub(crate) fn for_test() -> Self {
            SyncedKvLogStoreMetrics {
                unclean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
                clean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
                wait_next_poll_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
                storage_write_count: LabelGuardedIntCounter::test_int_counter::<4>(),
                storage_write_size: LabelGuardedIntCounter::test_int_counter::<4>(),
                pause_duration_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
                buffer_unconsumed_item_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_row_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_epoch_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_min_epoch: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_memory_bytes: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
                buffer_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
                total_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
                total_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
                persistent_log_read_metrics: KvLogStoreReadMetrics::for_test(),
                flushed_buffer_read_metrics: KvLogStoreReadMetrics::for_test(),
            }
        }
    }
}

type ReadFlushedChunkFuture = BoxFuture<'static, LogStoreResult<(ChunkId, StreamChunk, Epoch)>>;

pub struct SyncedKvLogStoreExecutor<S: StateStore> {
    actor_context: ActorContextRef,
    table_id: TableId,
    metrics: SyncedKvLogStoreMetrics,
    serde: LogStoreRowSerde,

    // Upstream
    upstream: Executor,

    // Log store state
    state_store: S,
    max_buffer_size: usize,

    // Max chunk size when reading from logstore / buffer
    chunk_size: usize,

    pause_duration_ms: Duration,

    aligned: bool,
}

// Constructor
impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn new(
        actor_context: ActorContextRef,
        table_id: TableId,
        metrics: SyncedKvLogStoreMetrics,
        serde: LogStoreRowSerde,
        state_store: S,
        buffer_size: usize,
        chunk_size: usize,
        upstream: Executor,
        pause_duration_ms: Duration,
        aligned: bool,
    ) -> Self {
        Self {
            actor_context,
            table_id,
            metrics,
            serde,
            state_store,
            upstream,
            max_buffer_size: buffer_size,
            chunk_size,
            pause_duration_ms,
            aligned,
        }
    }
}

struct FlushedChunkInfo {
    epoch: u64,
    start_seq_id: SeqId,
    end_seq_id: SeqId,
    flush_info: FlushInfo,
    vnode_bitmap: Bitmap,
}

enum WriteFuture<S: LocalStateStore> {
    /// We trigger a brief pause to let the `ReadFuture` be polled in the following scenarios:
    /// - When seeing an upstream data chunk, when the buffer becomes full, and the state is clean.
    /// - When seeing a checkpoint barrier, when the buffer is not empty, and the state is clean.
    ///
    /// On pausing, we will transition to a dirty state.
    ///
    /// We resume the write future in the following scenarios:
    /// - After the pause duration has elapsed.
    /// - After the read side has caught up (the buffer is drained and the state is clean again).
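    ///
    /// A sketch of the write-side state transitions, as driven by `execute_inner` below:
    ///
    /// ```text
    /// ReceiveFromUpstream --(chunk, buffer full)----------------------> FlushingChunk
    /// FlushingChunk ------(flush completed)---------------------------> ReceiveFromUpstream
    /// ReceiveFromUpstream --(checkpoint barrier, clean, non-empty)----> Paused
    /// Paused -------------(sleep elapsed / read side caught up)-------> ReceiveFromUpstream
    /// ```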
    Paused {
        start_instant: Instant,
        sleep_future: Option<Pin<Box<Sleep>>>,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>, // Just used to hold the state
    },
    ReceiveFromUpstream {
        future: StreamFuture<BoxedMessageStream>,
        write_state: LogStoreWriteState<S>,
    },
    FlushingChunk {
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        future: Pin<Box<LogStoreStateWriteChunkFuture<S>>>,
        stream: BoxedMessageStream,
    },
    Empty,
}

enum WriteFutureEvent {
    UpstreamMessageReceived(Message),
    ChunkFlushed(FlushedChunkInfo),
}

impl<S: LocalStateStore> WriteFuture<S> {
    fn flush_chunk(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
        chunk: StreamChunk,
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
    ) -> Self {
        tracing::trace!(
            start_seq_id,
            end_seq_id,
            epoch,
            cardinality = chunk.cardinality(),
            "write_future: flushing chunk"
        );
        Self::FlushingChunk {
            epoch,
            start_seq_id,
            end_seq_id,
            future: Box::pin(write_state.into_write_chunk_future(
                chunk,
                epoch,
                start_seq_id,
                end_seq_id,
            )),
            stream,
        }
    }

    fn receive_from_upstream(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        Self::ReceiveFromUpstream {
            future: stream.into_future(),
            write_state,
        }
    }

    fn paused(
        duration: Duration,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        let now = Instant::now();
        tracing::trace!(?now, ?duration, "write_future_pause");
        Self::Paused {
            start_instant: now,
            sleep_future: Some(Box::pin(sleep_until(now + duration))),
            barrier,
            stream,
            write_state,
        }
    }

    async fn next_event(
        &mut self,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<(BoxedMessageStream, LogStoreWriteState<S>, WriteFutureEvent)> {
        match self {
            WriteFuture::Paused {
                start_instant,
                sleep_future,
                ..
            } => {
                if let Some(sleep_future) = sleep_future {
                    sleep_future.await;
                    metrics
                        .pause_duration_ns
                        .inc_by(start_instant.elapsed().as_nanos() as _);
                    tracing::trace!("resuming write future");
                }
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::Paused { stream, write_state, barrier, .. } => {
                    Ok((stream, write_state, WriteFutureEvent::UpstreamMessageReceived(Message::Barrier(barrier))))
                })
            }
            WriteFuture::ReceiveFromUpstream { future, .. } => {
                let (opt, stream) = future.await;
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::ReceiveFromUpstream { write_state, .. } => {
                    opt
                    .ok_or_else(|| anyhow!("end of upstream input").into())
                    .and_then(|result| result.map(|item| {
                        (stream, write_state, WriteFutureEvent::UpstreamMessageReceived(item))
                    }))
                })
            }
            WriteFuture::FlushingChunk { future, .. } => {
                let (write_state, result) = future.await;
                let result = must_match!(replace(self, WriteFuture::Empty), WriteFuture::FlushingChunk { epoch, start_seq_id, end_seq_id, stream, ..  } => {
                    result.map(|(flush_info, vnode_bitmap)| {
                        (stream, write_state, WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                            epoch,
                            start_seq_id,
                            end_seq_id,
                            flush_info,
                            vnode_bitmap,
                        }))
                    })
                });
                result.map_err(Into::into)
            }
            WriteFuture::Empty => {
                unreachable!("should not be polled after ready")
            }
        }
    }
}

// Stream interface
impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    #[try_stream(ok = Message, error = StreamExecutorError)]
    pub async fn execute_monitored(self) {
        let wait_next_poll_ns = self.metrics.wait_next_poll_ns.clone();
        #[for_await]
        for message in self.execute_inner() {
            let current_time = Instant::now();
            yield message?;
            wait_next_poll_ns.inc_by(current_time.elapsed().as_nanos() as _);
        }
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(self) {
        let mut input = self.upstream.execute();

        // init first epoch + local state store
        let first_barrier = expect_first_barrier(&mut input).await?;
        let first_write_epoch = first_barrier.epoch;
        yield Message::Barrier(first_barrier.clone());

        let local_state_store = self
            .state_store
            .new_local(NewLocalOptions {
                table_id: self.table_id,
                fragment_id: self.actor_context.fragment_id,
                op_consistency_level: OpConsistencyLevel::Inconsistent,
                table_option: TableOption {
                    retention_seconds: None,
                },
                is_replicated: false,
                vnodes: self.serde.vnodes().clone(),
                upload_on_flush: false,
            })
            .await;

        let (mut read_state, mut initial_write_state) = new_log_store_state(
            self.table_id,
            local_state_store,
            self.serde,
            self.chunk_size,
        );
        initial_write_state.init(first_write_epoch).await?;

        let mut pause_stream = first_barrier.is_pause_on_startup();
        let mut initial_write_epoch = first_write_epoch;

        if self.aligned {
            tracing::info!("aligned mode");
            // We want to realign the buffer and the stream.
            // We block the upstream input stream,
            // and wait until the persisted logstore has been fully drained.
            // Only then do we start consuming the input stream.
            let log_store_stream = read_state
                .read_persisted_log_store(
                    self.metrics.persistent_log_read_metrics.clone(),
                    initial_write_epoch.curr,
                    LogStoreReadStateStreamRangeStart::Unbounded,
                )
                .await?;

            #[for_await]
            for message in log_store_stream {
                let (_epoch, message) = message?;
                match message {
                    KvLogStoreItem::Barrier { .. } => {
                        continue;
                    }
                    KvLogStoreItem::StreamChunk { chunk, .. } => {
                        yield Message::Chunk(chunk);
                    }
                }
            }

            let mut realigned_logstore = false;

            #[for_await]
            for message in input {
                match message? {
                    Message::Barrier(barrier) => {
                        let is_checkpoint = barrier.is_checkpoint();
                        let mut progress = LogStoreVnodeProgress::None;
                        progress.apply_aligned(
                            read_state.vnodes().clone(),
                            barrier.epoch.prev,
                            None,
                        );
                        // Truncate the logstore
                        let post_seal = initial_write_state
                            .seal_current_epoch(barrier.epoch.curr, progress.take());
                        let update_vnode_bitmap =
                            barrier.as_update_vnode_bitmap(self.actor_context.id);
                        yield Message::Barrier(barrier);
                        post_seal.post_yield_barrier(update_vnode_bitmap).await?;
                        if !realigned_logstore && is_checkpoint {
                            realigned_logstore = true;
                            tracing::info!("realigned logstore");
                        }
                    }
                    Message::Chunk(chunk) => {
                        yield Message::Chunk(chunk);
                    }
                    Message::Watermark(watermark) => {
                        yield Message::Watermark(watermark);
                    }
                }
            }

            return Ok(());
        }

        // We only recreate the consume stream in two cases:
        // 1. On bootstrap.
        // 2. On vnode update.
        'recreate_consume_stream: loop {
            let mut seq_id = FIRST_SEQ_ID;
            let mut buffer = SyncedLogStoreBuffer {
                buffer: VecDeque::new(),
                current_size: 0,
                max_size: self.max_buffer_size,
                max_chunk_size: self.chunk_size,
                next_chunk_id: 0,
                metrics: self.metrics.clone(),
                flushed_count: 0,
            };

            let log_store_stream = read_state
                .read_persisted_log_store(
                    self.metrics.persistent_log_read_metrics.clone(),
                    initial_write_epoch.curr,
                    LogStoreReadStateStreamRangeStart::Unbounded,
                )
                .await?;

            let mut log_store_stream = tokio_stream::StreamExt::peekable(log_store_stream);
            let mut clean_state = log_store_stream.peek().await.is_none();
            tracing::trace!(?clean_state);

            let mut read_future_state = ReadFuture::ReadingPersistedStream(log_store_stream);

            let mut write_future_state =
                WriteFuture::receive_from_upstream(input, initial_write_state);

            let mut progress = LogStoreVnodeProgress::None;

            loop {
                let select_result = {
                    let read_future = async {
                        if pause_stream {
                            pending().await
                        } else {
                            read_future_state
                                .next_chunk(&mut progress, &read_state, &mut buffer, &self.metrics)
                                .await
                        }
                    };
                    pin_mut!(read_future);
                    let write_future = write_future_state.next_event(&self.metrics);
                    pin_mut!(write_future);
                    let output = select(write_future, read_future).await;
                    drop_either_future(output)
                };
                match select_result {
                    Either::Left(result) => {
                        // drop the future to ensure that it must be reset later
                        drop(write_future_state);
                        let (stream, mut write_state, either) = result?;
                        match either {
                            WriteFutureEvent::UpstreamMessageReceived(msg) => {
                                match msg {
                                    Message::Barrier(barrier) => {
                                        if clean_state
                                            && barrier.kind.is_checkpoint()
                                            && !buffer.is_empty()
                                        {
                                            write_future_state = WriteFuture::paused(
                                                self.pause_duration_ms,
                                                barrier,
                                                stream,
                                                write_state,
                                            );
                                            clean_state = false;
                                            self.metrics.unclean_state.inc();
                                        } else {
                                            if let Some(mutation) = barrier.mutation.as_deref() {
                                                match mutation {
                                                    Mutation::Pause => {
                                                        pause_stream = true;
                                                    }
                                                    Mutation::Resume => {
                                                        pause_stream = false;
                                                    }
                                                    _ => {}
                                                }
                                            }
                                            let write_state_post_write_barrier =
                                                Self::write_barrier(
                                                    self.actor_context.id,
                                                    &mut write_state,
                                                    barrier.clone(),
                                                    &self.metrics,
                                                    progress.take(),
                                                    &mut buffer,
                                                )
                                                .await?;
                                            seq_id = FIRST_SEQ_ID;
                                            let update_vnode_bitmap = barrier
                                                .as_update_vnode_bitmap(self.actor_context.id);
                                            let barrier_epoch = barrier.epoch;
                                            tracing::trace!(
                                                ?update_vnode_bitmap,
                                                actor_id = %self.actor_context.id,
                                                "update vnode bitmap"
                                            );

                                            yield Message::Barrier(barrier);

                                            write_state_post_write_barrier
                                                .post_yield_barrier(update_vnode_bitmap.clone())
                                                .await?;
                                            if let Some(vnode_bitmap) = update_vnode_bitmap {
                                                // Apply Vnode Update
                                                read_state.update_vnode_bitmap(vnode_bitmap);
                                                initial_write_epoch = barrier_epoch;
                                                input = stream;
                                                initial_write_state = write_state;
                                                continue 'recreate_consume_stream;
                                            } else {
                                                write_future_state =
                                                    WriteFuture::receive_from_upstream(
                                                        stream,
                                                        write_state,
                                                    );
                                            }
                                        }
                                    }
                                    Message::Chunk(chunk) => {
                                        let start_seq_id = seq_id;
                                        let new_seq_id = seq_id + chunk.cardinality() as SeqId;
                                        let end_seq_id = new_seq_id - 1;
                                        let epoch = write_state.epoch().curr;
                                        tracing::trace!(
                                            start_seq_id,
                                            end_seq_id,
                                            new_seq_id,
                                            epoch,
                                            cardinality = chunk.cardinality(),
                                            "received chunk"
                                        );
                                        if let Some(chunk_to_flush) = buffer.add_or_flush_chunk(
                                            start_seq_id,
                                            end_seq_id,
                                            chunk,
                                            epoch,
                                        ) {
                                            seq_id = new_seq_id;
                                            write_future_state = WriteFuture::flush_chunk(
                                                stream,
                                                write_state,
                                                chunk_to_flush,
                                                epoch,
                                                start_seq_id,
                                                end_seq_id,
                                            );
                                        } else {
                                            seq_id = new_seq_id;
                                            write_future_state = WriteFuture::receive_from_upstream(
                                                stream,
                                                write_state,
                                            );
                                        }
                                    }
                                    // FIXME(kwannoel): This should truncate the logstore;
                                    // unlike barriers, watermarks cannot simply bypass it.
                                    Message::Watermark(_watermark) => {
                                        write_future_state =
                                            WriteFuture::receive_from_upstream(stream, write_state);
                                    }
                                }
                            }
                            WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                                start_seq_id,
                                end_seq_id,
                                epoch,
                                flush_info,
                                vnode_bitmap,
                            }) => {
                                buffer.add_flushed_item_to_buffer(
                                    start_seq_id,
                                    end_seq_id,
                                    vnode_bitmap,
                                    epoch,
                                );
                                self.metrics
                                    .storage_write_count
                                    .inc_by(flush_info.flush_count as _);
                                self.metrics
                                    .storage_write_size
                                    .inc_by(flush_info.flush_size as _);
                                write_future_state =
                                    WriteFuture::receive_from_upstream(stream, write_state);
                            }
                        }
                    }
                    Either::Right(result) => {
                        if !clean_state
                            && matches!(read_future_state, ReadFuture::Idle)
                            && buffer.is_empty()
                        {
                            clean_state = true;
                            self.metrics.clean_state.inc();

                            // Let write future resume immediately
                            if let WriteFuture::Paused { sleep_future, .. } =
                                &mut write_future_state
                            {
                                tracing::trace!("resuming paused future");
                                assert!(buffer.current_size < self.max_buffer_size);
                                *sleep_future = None;
                            }
                        }
                        let chunk = result?;
                        self.metrics
                            .total_read_count
                            .inc_by(chunk.cardinality() as _);

                        yield Message::Chunk(chunk);
                    }
                }
            }
        }
    }
}

type PersistedStream<S> = Peekable<Pin<Box<LogStoreItemMergeStream<TimeoutAutoRebuildIter<S>>>>>;

enum ReadFuture<S: StateStoreRead> {
    ReadingPersistedStream(PersistedStream<S>),
    ReadingFlushedChunk {
        future: ReadFlushedChunkFuture,
        end_seq_id: SeqId,
    },
    Idle,
}

// Read methods
impl<S: StateStoreRead> ReadFuture<S> {
    async fn next_chunk(
        &mut self,
        progress: &mut LogStoreVnodeProgress,
        read_state: &LogStoreReadState<S>,
        buffer: &mut SyncedLogStoreBuffer,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<StreamChunk> {
        match self {
            ReadFuture::ReadingPersistedStream(stream) => {
                while let Some((epoch, item)) = stream.try_next().await? {
                    match item {
                        KvLogStoreItem::Barrier { vnodes, .. } => {
                            tracing::trace!(epoch, "read logstore barrier");
                            // update the progress
                            progress.apply_aligned(vnodes, epoch, None);
                            continue;
                        }
                        KvLogStoreItem::StreamChunk {
                            chunk,
                            progress: chunk_progress,
                        } => {
                            tracing::trace!("read logstore chunk of size: {}", chunk.cardinality());
                            progress.apply_per_vnode(epoch, chunk_progress);
                            return Ok(chunk);
                        }
                    }
                }
                *self = ReadFuture::Idle;
            }
            ReadFuture::ReadingFlushedChunk { .. } | ReadFuture::Idle => {}
        }
        match self {
            ReadFuture::ReadingPersistedStream(_) => {
                unreachable!("must have finished read persisted stream when reaching here")
            }
            ReadFuture::ReadingFlushedChunk { .. } => {}
            ReadFuture::Idle => loop {
                let Some((item_epoch, item)) = buffer.pop_front() else {
                    return pending().await;
                };
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        metrics.buffer_read_count.inc_by(chunk.cardinality() as _);
                        tracing::trace!(
                            start_seq_id,
                            end_seq_id,
                            flushed,
                            cardinality = chunk.cardinality(),
                            "read buffered chunk"
                        );
                        progress.apply_aligned(
                            read_state.vnodes().clone(),
                            item_epoch,
                            Some(end_seq_id),
                        );
                        return Ok(chunk);
                    }
                    LogStoreBufferItem::Flushed {
                        vnode_bitmap,
                        start_seq_id,
                        end_seq_id,
                        chunk_id,
                    } => {
                        tracing::trace!(start_seq_id, end_seq_id, chunk_id, "read flushed chunk");
                        let read_metrics = metrics.flushed_buffer_read_metrics.clone();
                        let future = read_state
                            .read_flushed_chunk(
                                vnode_bitmap,
                                chunk_id,
                                start_seq_id,
                                end_seq_id,
                                item_epoch,
                                read_metrics,
                            )
                            .boxed();
                        *self = ReadFuture::ReadingFlushedChunk { future, end_seq_id };
                        break;
                    }
                    LogStoreBufferItem::Barrier { .. } => {
                        tracing::trace!(item_epoch, "read buffer barrier");
                        progress.apply_aligned(read_state.vnodes().clone(), item_epoch, None);
                        continue;
                    }
                }
            },
        }

        let (future, end_seq_id) = match self {
            ReadFuture::ReadingPersistedStream(_) | ReadFuture::Idle => {
                unreachable!("should be at ReadingFlushedChunk")
            }
            ReadFuture::ReadingFlushedChunk { future, end_seq_id } => (future, *end_seq_id),
        };

        let (_, chunk, epoch) = future.await?;
        progress.apply_aligned(read_state.vnodes().clone(), epoch, Some(end_seq_id));
        tracing::trace!(
            end_seq_id,
            "read flushed chunk of size: {}",
            chunk.cardinality()
        );
        *self = ReadFuture::Idle;
        Ok(chunk)
    }
}

// Write methods
impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    async fn write_barrier<'a>(
        actor_id: ActorId,
        write_state: &'a mut LogStoreWriteState<S::Local>,
        barrier: Barrier,
        metrics: &SyncedKvLogStoreMetrics,
        progress: LogStoreVnodeProgress,
        buffer: &mut SyncedLogStoreBuffer,
    ) -> StreamExecutorResult<LogStorePostSealCurrentEpoch<'a, S::Local>> {
        tracing::trace!(%actor_id, ?progress, "applying truncation");
        // TODO(kwannoel): As an optimization we can also change flushed chunks to be flushed items
        // to reduce memory consumption of logstore.

        let epoch = barrier.epoch.prev;
        let mut writer = write_state.start_writer(false);
        writer.write_barrier(epoch, barrier.is_checkpoint())?;

        if barrier.is_checkpoint() {
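            // On a checkpoint, persist all buffered chunks that are still memory-only.
            // We scan from the back and stop at the first already-flushed chunk:
            // everything before it must have been flushed by an earlier checkpoint.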
            for (epoch, item) in buffer.buffer.iter_mut().rev() {
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        if !*flushed {
                            writer.write_chunk(chunk, *epoch, *start_seq_id, *end_seq_id)?;
                            *flushed = true;
                        } else {
                            break;
                        }
                    }
                    LogStoreBufferItem::Flushed { .. } | LogStoreBufferItem::Barrier { .. } => {}
                }
            }
        }

        // Apply truncation
        let (flush_info, _) = writer.finish().await?;
        metrics
            .storage_write_count
            .inc_by(flush_info.flush_count as _);
        metrics
            .storage_write_size
            .inc_by(flush_info.flush_size as _);
        let post_seal = write_state.seal_current_epoch(barrier.epoch.curr, progress);

        // Add to buffer
        buffer.buffer.push_back((
            epoch,
            LogStoreBufferItem::Barrier {
                is_checkpoint: barrier.is_checkpoint(),
                next_epoch: barrier.epoch.curr,
                schema_change: None,
                is_stop: false,
            },
        ));
        buffer.next_chunk_id = 0;
        buffer.update_buffer_metrics();

        Ok(post_seal)
    }
}

struct SyncedLogStoreBuffer {
    buffer: VecDeque<(u64, LogStoreBufferItem)>,
    /// Total cardinality of the unflushed chunks currently in `buffer`.
    current_size: usize,
    max_size: usize,
    max_chunk_size: usize,
    next_chunk_id: ChunkId,
    metrics: SyncedKvLogStoreMetrics,
    /// Number of `LogStoreBufferItem::Flushed` entries currently in `buffer`.
    flushed_count: usize,
}

impl SyncedLogStoreBuffer {
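    /// Note that `current_size` only tracks rows of in-memory chunks, so a buffer
    /// holding only flushed references or barriers is considered empty.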
    fn is_empty(&self) -> bool {
        self.current_size == 0
    }

    fn add_or_flush_chunk(
        &mut self,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        chunk: StreamChunk,
        epoch: u64,
    ) -> Option<StreamChunk> {
        let current_size = self.current_size;
        let chunk_size = chunk.cardinality();

        tracing::trace!(
            current_size,
            chunk_size,
            max_size = self.max_size,
            "checking chunk size"
        );
        let should_flush_chunk = current_size + chunk_size > self.max_size;
        if should_flush_chunk {
            tracing::trace!(start_seq_id, end_seq_id, epoch, "flushing chunk",);
            Some(chunk)
        } else {
            tracing::trace!(start_seq_id, end_seq_id, epoch, "buffering chunk",);
            self.add_chunk_to_buffer(chunk, start_seq_id, end_seq_id, epoch);
            None
        }
    }

    /// After flushing a chunk, we will preserve a `LogStoreBufferItem::Flushed` entry
    /// inside the buffer. It doesn't contain any data, only the metadata needed to read
    /// the flushed chunk back from the log store.
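    ///
    /// For example (illustrative): if the buffer tail is a flushed entry for rows
    /// `1..=3` and rows `4..=5` of the same epoch are flushed next, the two entries
    /// are merged into a single `Flushed { start_seq_id: 1, end_seq_id: 5, .. }`,
    /// provided the merged size stays within `max_chunk_size`.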
    fn add_flushed_item_to_buffer(
        &mut self,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        new_vnode_bitmap: Bitmap,
        epoch: u64,
    ) {
        let new_chunk_size = (end_seq_id - start_seq_id + 1) as usize;

        if let Some((
            item_epoch,
            LogStoreBufferItem::Flushed {
                start_seq_id: prev_start_seq_id,
                end_seq_id: prev_end_seq_id,
                vnode_bitmap,
                ..
            },
        )) = self.buffer.back_mut()
            && let flushed_chunk_size = (*prev_end_seq_id - *prev_start_seq_id + 1) as usize
            && let projected_flushed_chunk_size = flushed_chunk_size + new_chunk_size
            && projected_flushed_chunk_size <= self.max_chunk_size
        {
            assert!(
                *prev_end_seq_id < start_seq_id,
                "prev end_seq_id {} should be smaller than current start_seq_id {}",
                prev_end_seq_id,
                start_seq_id
            );
            assert_eq!(
                epoch, *item_epoch,
                "epoch of newly added flushed item must be the same as the last flushed item"
            );
            *prev_end_seq_id = end_seq_id;
            *vnode_bitmap |= new_vnode_bitmap;
        } else {
            let chunk_id = self.next_chunk_id;
            self.next_chunk_id += 1;
            self.buffer.push_back((
                epoch,
                LogStoreBufferItem::Flushed {
                    start_seq_id,
                    end_seq_id,
                    vnode_bitmap: new_vnode_bitmap,
                    chunk_id,
                },
            ));
            self.flushed_count += 1;
            tracing::trace!(
                "adding flushed item to buffer: start_seq_id: {start_seq_id}, end_seq_id: {end_seq_id}, chunk_id: {chunk_id}"
            );
        }
        // FIXME(kwannoel): Seems these metrics are updated _after_ the flush info is reported.
        self.update_buffer_metrics();
    }

    fn add_chunk_to_buffer(
        &mut self,
        chunk: StreamChunk,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        epoch: u64,
    ) {
        let chunk_id = self.next_chunk_id;
        self.next_chunk_id += 1;
        self.current_size += chunk.cardinality();
        self.buffer.push_back((
            epoch,
            LogStoreBufferItem::StreamChunk {
                chunk,
                start_seq_id,
                end_seq_id,
                flushed: false,
                chunk_id,
            },
        ));
        self.update_buffer_metrics();
    }

    fn pop_front(&mut self) -> Option<(u64, LogStoreBufferItem)> {
        let item = self.buffer.pop_front();
        match &item {
            Some((_, LogStoreBufferItem::Flushed { .. })) => {
                self.flushed_count -= 1;
            }
            Some((_, LogStoreBufferItem::StreamChunk { chunk, .. })) => {
                self.current_size -= chunk.cardinality();
            }
            _ => {}
        }
        self.update_buffer_metrics();
        item
    }

    fn update_buffer_metrics(&self) {
        let mut epoch_count = 0;
        let mut row_count = 0;
        let mut memory_bytes = 0;
        for (_, item) in &self.buffer {
            match item {
                LogStoreBufferItem::StreamChunk { chunk, .. } => {
                    row_count += chunk.cardinality();
                }
                LogStoreBufferItem::Flushed {
                    start_seq_id,
                    end_seq_id,
                    ..
                } => {
                    // seq ids are inclusive, so the row count is `end - start + 1`.
                    row_count += (end_seq_id - start_seq_id + 1) as usize;
                }
                LogStoreBufferItem::Barrier { .. } => {
                    epoch_count += 1;
                }
            }
            memory_bytes += item.estimated_size();
        }
        self.metrics.buffer_unconsumed_epoch_count.set(epoch_count);
        self.metrics.buffer_unconsumed_row_count.set(row_count as _);
        self.metrics
            .buffer_unconsumed_item_count
            .set(self.buffer.len() as _);
        self.metrics.buffer_unconsumed_min_epoch.set(
            self.buffer
                .front()
                .map(|(epoch, _)| *epoch)
                .unwrap_or_default() as _,
        );
        self.metrics.buffer_memory_bytes.set(memory_bytes as _);
    }
}

1244impl<S> Execute for SyncedKvLogStoreExecutor<S>
1245where
1246    S: StateStore,
1247{
1248    fn execute(self: Box<Self>) -> BoxedMessageStream {
1249        self.execute_monitored().boxed()
1250    }
1251}
1252
1253#[cfg(test)]
1254mod tests {
1255    use itertools::Itertools;
1256    use pretty_assertions::assert_eq;
1257    use risingwave_common::catalog::Field;
1258    use risingwave_common::hash::VirtualNode;
1259    use risingwave_common::test_prelude::*;
1260    use risingwave_common::util::epoch::test_epoch;
1261    use risingwave_storage::memory::MemoryStateStore;
1262
1263    use super::*;
1264    use crate::assert_stream_chunk_eq;
1265    use crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V2_INFO;
1266    use crate::common::log_store_impl::kv_log_store::test_utils::{
1267        check_stream_chunk_eq, gen_test_log_store_table, test_payload_schema,
1268    };
1269    use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
1270    use crate::executor::test_utils::MockSource;
1271
1272    fn init_logger() {
1273        let _ = tracing_subscriber::fmt()
1274            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1275            .with_ansi(false)
1276            .try_init();
1277    }
1278
1279    // test read/write buffer
1280    #[tokio::test]
1281    async fn test_read_write_buffer() {
1282        init_logger();
1283
1284        let pk_info = &KV_LOG_STORE_V2_INFO;
1285        let column_descs = test_payload_schema(pk_info);
1286        let fields = column_descs
1287            .into_iter()
1288            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
1289            .collect_vec();
1290        let schema = Schema { fields };
1291        let stream_key = vec![0];
1292        let (mut tx, source) = MockSource::channel();
1293        let source = source.into_executor(schema.clone(), stream_key.clone());
1294
1295        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));
1296
1297        let table = gen_test_log_store_table(pk_info);
1298
1299        let log_store_executor = SyncedKvLogStoreExecutor::new(
1300            ActorContext::for_test(123),
1301            table.id,
1302            SyncedKvLogStoreMetrics::for_test(),
1303            LogStoreRowSerde::new(&table, vnodes, pk_info),
1304            MemoryStateStore::new(),
1305            10,
1306            256,
1307            source,
1308            Duration::from_millis(256),
1309            false,
1310        )
1311        .boxed();

        // Initialize the executor with the first barrier.
        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            "  I   T
            +  5  10
            +  6  10
            +  8  10
            +  9  10
            +  10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "   I   T
            -   5  10
            -   6  10
            -   8  10
            U-  9  10
            U+ 10  11",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        let mut stream = log_store_executor.execute();

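        // With capacity 10, both chunks stay in the in-memory buffer and are yielded
        // back in arrival order right after the first barrier.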
        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        tx.push_barrier(test_epoch(2), false);

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

    // Test persisted reads across a barrier.
    //
    // Sequence of events (earliest -> latest):
    // barrier(1) -> chunk(1) -> chunk(2) -> barrier(2) -> poll(4) items
    // * "poll" just means reading from the executor stream.
    #[tokio::test]
    async fn test_barrier_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
            .collect_vec();
        let schema = Schema { fields };
        let stream_key = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        // Initialize the executor with the first barrier.
        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            "  I   T
            +  5  10
            +  6  10
            +  8  10
            +  9  10
            +  10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "   I   T
            -   5  10
            -   6  10
            -   8  10
            U- 10  11
            U+ 10  10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

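        // Everything, including the second barrier, was pushed before polling began;
        // the executor still yields the messages in push order.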
        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

    // When the buffer hits `max_chunk`, only a placeholder (`LogStoreBufferItem::Flushed`)
    // is kept in memory. Here we set the capacity to 0, so every incoming chunk is
    // flushed to the state store and read back from it.
    #[tokio::test]
    async fn test_max_chunk_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
            .collect_vec();
        let schema = Schema { fields };
        let stream_key = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            0, // buffer capacity: force every incoming chunk to be flushed
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        // Initialize the executor with the first barrier.
        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            "  I   T
            +  5  10
            +  6  10
            +  8  10
            +  9  10
            +  10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "   I   T
            -   5  10
            -   6  10
            -   8  10
            U- 10  11
            U+ 10  10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

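        // With capacity 0, both chunks were flushed to the log store, so the two
        // barriers are yielded first; the flushed chunks are then replayed as a
        // single compacted chunk (adjacent flushed items are merged on read).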
        for i in 1..=2 {
            match stream.next().await {
                Some(Ok(Message::Barrier(barrier))) => {
                    assert_eq!(barrier.epoch.curr, test_epoch(i));
                }
                other => panic!("Expected a barrier message, got {:?}", other),
            }
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(actual))) => {
                let expected = StreamChunk::from_pretty(
                    "   I   T
                    +   5  10
                    +   6  10
                    +   8  10
                    +   9  10
                    +  10  11
                    -   5  10
                    -   6  10
                    -   8  10
                    U- 10  11
                    U+ 10  10",
                );
                assert_stream_chunk_eq!(actual, expected);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }
    }
}