risingwave_stream/executor/sync_kv_log_store.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! This module contains the synced kv log store implementation.
//! It's meant to buffer a large number of records emitted from upstream,
//! to avoid overwhelming the downstream executor.
//!
//! The synced kv log store polls two futures:
//!
//! 1. Upstream: the upstream message source.
//!
//!   It will write stream messages (e.g. `Message::Barrier`, `Message::Chunk`, ...)
//!   to the log store buffer.
//!   When writing a stream chunk, if the log store buffer is full, it will:
//!     a. Flush the buffer to the log store.
//!     b. Convert the stream chunk into a reference (`LogStoreBufferItem::Flushed`),
//!       which can be used to read the corresponding chunks back from the log store.
//!       Adjacent references are compacted,
//!       so a single reference can cover multiple chunks if there's a build-up.
//!
//!   On receiving barriers, it will:
//!     a. Apply truncation to historical data in the logstore.
//!     b. Flush and checkpoint the logstore data.
//!
//! 2. State store + buffer + recently flushed chunks: the storage components of the logstore.
//!
//!   It will read all historical data from the logstore first. This can be done just by
//!   constructing a state store stream, which will read all data up to the latest epoch.
//!   This is a static snapshot of data.
//!   Any subsequently flushed chunks are read via
//!   `flushed_chunk_future`. See the next paragraph below.
//!
//!   Next, we will poll `flushed_chunk_future`, if a pre-existing one is in flight; see below
//!   for how it's constructed and what it does.
//!
//!   Finally we will pop the earliest item in the buffer:
//!   - If it's a chunk, yield it.
//!   - If it's a watermark, yield it.
//!   - If it's a flushed chunk reference (`LogStoreBufferItem::Flushed`),
//!     we will read the corresponding chunks from the log store.
//!     This is done by constructing a `flushed_chunk_future` which will read the log store
//!     using the `seq_id`.
//!   - Barriers are never popped here,
//!     because they are directly propagated from the upstream when polling it.
//!
//! TODO(kwannoel):
//! - [ ] Add dedicated metrics for sync log store, namespaced according to the upstream.
//! - [ ] Add tests
//! - [ ] Handle watermark r/w
//! - [ ] Handle paused stream

use std::collections::VecDeque;
use std::future::pending;
use std::mem::replace;
use std::pin::Pin;

use anyhow::anyhow;
use futures::future::{BoxFuture, Either, select};
use futures::stream::StreamFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::must_match;
use risingwave_connector::sink::log_store::{ChunkId, LogStoreResult};
use risingwave_storage::StateStore;
use risingwave_storage::store::{
    LocalStateStore, NewLocalOptions, OpConsistencyLevel, StateStoreRead,
};
use rw_futures_util::drop_either_future;
use tokio::time::{Duration, Instant, Sleep, sleep_until};
use tokio_stream::adapters::Peekable;

use crate::common::log_store_impl::kv_log_store::buffer::LogStoreBufferItem;
use crate::common::log_store_impl::kv_log_store::reader::LogStoreReadStateStreamRangeStart;
use crate::common::log_store_impl::kv_log_store::reader::timeout_auto_rebuild::TimeoutAutoRebuildIter;
use crate::common::log_store_impl::kv_log_store::serde::{
    KvLogStoreItem, LogStoreItemMergeStream, LogStoreRowSerde,
};
use crate::common::log_store_impl::kv_log_store::state::{
    LogStorePostSealCurrentEpoch, LogStoreReadState, LogStoreStateWriteChunkFuture,
    LogStoreWriteState, new_log_store_state,
};
use crate::common::log_store_impl::kv_log_store::{
    Epoch, FIRST_SEQ_ID, FlushInfo, LogStoreVnodeProgress, SeqId,
};
use crate::executor::prelude::*;
use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
use crate::executor::{
    Barrier, BoxedMessageStream, Message, StreamExecutorError, StreamExecutorResult,
};

pub mod metrics {
    use risingwave_common::id::FragmentId;
    use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};

    use crate::common::log_store_impl::kv_log_store::KvLogStoreReadMetrics;
    use crate::executor::monitor::StreamingMetrics;
    use crate::task::ActorId;

    #[derive(Clone)]
    pub struct SyncedKvLogStoreMetrics {
        // State of the log store
        pub unclean_state: LabelGuardedIntCounter,
        pub clean_state: LabelGuardedIntCounter,
        pub wait_next_poll_ns: LabelGuardedIntCounter,

        // Write metrics
        pub storage_write_count: LabelGuardedIntCounter,
        pub storage_write_size: LabelGuardedIntCounter,
        pub pause_duration_ns: LabelGuardedIntCounter,

        // Buffer metrics
        pub buffer_unconsumed_item_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_row_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_epoch_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_min_epoch: LabelGuardedIntGauge,
        pub buffer_read_count: LabelGuardedIntCounter,
        pub buffer_read_size: LabelGuardedIntCounter,

        // Read metrics
        pub total_read_count: LabelGuardedIntCounter,
        pub total_read_size: LabelGuardedIntCounter,
        pub persistent_log_read_metrics: KvLogStoreReadMetrics,
        pub flushed_buffer_read_metrics: KvLogStoreReadMetrics,
    }

    impl SyncedKvLogStoreMetrics {
        /// `id`: a unique way to identify the log store. This can be the sink id,
        ///       or for joins, it can be the `fragment_id`.
        /// `name`: the MV / sink that the log store is associated with.
        /// `target`: the target of the log store,
        ///           for instance a `MySql` sink, a PG sink, etc.,
        ///           or an unaligned join.
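        ///
        /// All metrics below share the label set `(actor_id, target, fragment_id, name)`;
        /// counters that distinguish a state or read path prepend an extra leading
        /// label such as `"dirty"` / `"clean"`, or `"buffer"` / `"total"` /
        /// `"persistent_log"` / `"flushed_buffer"`.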
        pub(crate) fn new(
            metrics: &StreamingMetrics,
            actor_id: ActorId,
            fragment_id: FragmentId,
            name: &str,
            target: &'static str,
        ) -> Self {
            let actor_id_str = actor_id.to_string();
            let fragment_id_str = fragment_id.to_string();
            let labels = &[&actor_id_str, target, &fragment_id_str, name];

            let unclean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "dirty",
                &actor_id_str,
                target,
                &fragment_id_str,
                name,
            ]);
            let clean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "clean",
                &actor_id_str,
                target,
                &fragment_id_str,
                name,
            ]);
            let wait_next_poll_ns = metrics
                .sync_kv_log_store_wait_next_poll_ns
                .with_guarded_label_values(labels);

            let storage_write_size = metrics
                .sync_kv_log_store_storage_write_size
                .with_guarded_label_values(labels);
            let storage_write_count = metrics
                .sync_kv_log_store_storage_write_count
                .with_guarded_label_values(labels);
            let storage_pause_duration_ns = metrics
                .sync_kv_log_store_write_pause_duration_ns
                .with_guarded_label_values(labels);

            let buffer_unconsumed_item_count = metrics
                .sync_kv_log_store_buffer_unconsumed_item_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_row_count = metrics
                .sync_kv_log_store_buffer_unconsumed_row_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_epoch_count = metrics
                .sync_kv_log_store_buffer_unconsumed_epoch_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_min_epoch = metrics
                .sync_kv_log_store_buffer_unconsumed_min_epoch
                .with_guarded_label_values(labels);
            let buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    "buffer",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    "buffer",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let total_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    "total",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let total_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    "total",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            const READ_PERSISTENT_LOG: &str = "persistent_log";
            const READ_FLUSHED_BUFFER: &str = "flushed_buffer";

            let persistent_log_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let persistent_log_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let flushed_buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let flushed_buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            Self {
                unclean_state,
                clean_state,
                wait_next_poll_ns,
                storage_write_size,
                storage_write_count,
                pause_duration_ns: storage_pause_duration_ns,
                buffer_unconsumed_item_count,
                buffer_unconsumed_row_count,
                buffer_unconsumed_epoch_count,
                buffer_unconsumed_min_epoch,
                buffer_read_count,
                buffer_read_size,
                total_read_count,
                total_read_size,
                persistent_log_read_metrics: KvLogStoreReadMetrics {
                    storage_read_size: persistent_log_read_size,
                    storage_read_count: persistent_log_read_count,
                },
                flushed_buffer_read_metrics: KvLogStoreReadMetrics {
                    storage_read_count: flushed_buffer_read_count,
                    storage_read_size: flushed_buffer_read_size,
                },
            }
        }

        #[cfg(test)]
        pub(crate) fn for_test() -> Self {
            SyncedKvLogStoreMetrics {
                unclean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
                clean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
                wait_next_poll_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
                storage_write_count: LabelGuardedIntCounter::test_int_counter::<4>(),
                storage_write_size: LabelGuardedIntCounter::test_int_counter::<4>(),
                pause_duration_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
                buffer_unconsumed_item_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_row_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_epoch_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_min_epoch: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
                buffer_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
                total_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
                total_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
                persistent_log_read_metrics: KvLogStoreReadMetrics::for_test(),
                flushed_buffer_read_metrics: KvLogStoreReadMetrics::for_test(),
            }
        }
    }
}

type ReadFlushedChunkFuture = BoxFuture<'static, LogStoreResult<(ChunkId, StreamChunk, Epoch)>>;

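/// Executor that buffers upstream messages in memory and spills them to a
/// kv-based log store when the buffer is full.
///
/// A minimal construction sketch (argument values are illustrative; see
/// [`SyncedKvLogStoreExecutor::new`] for the parameter meanings):
///
/// ```ignore
/// let executor = SyncedKvLogStoreExecutor::new(
///     actor_context,
///     table_id,
///     metrics,                    // SyncedKvLogStoreMetrics
///     serde,                      // LogStoreRowSerde
///     state_store,
///     1024,                       // max buffer size, in rows
///     256,                        // max chunk size when reading
///     upstream,
///     Duration::from_millis(100), // pause duration
///     false,                      // aligned mode
/// );
/// ```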
pub struct SyncedKvLogStoreExecutor<S: StateStore> {
    actor_context: ActorContextRef,
    table_id: TableId,
    metrics: SyncedKvLogStoreMetrics,
    serde: LogStoreRowSerde,

    // Upstream
    upstream: Executor,

    // Log store state
    state_store: S,
    max_buffer_size: usize,

    // Max chunk size when reading from logstore / buffer
    chunk_size: usize,

    pause_duration_ms: Duration,

    aligned: bool,
}
// Stream interface
impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn new(
        actor_context: ActorContextRef,
        table_id: TableId,
        metrics: SyncedKvLogStoreMetrics,
        serde: LogStoreRowSerde,
        state_store: S,
        buffer_size: usize,
        chunk_size: usize,
        upstream: Executor,
        pause_duration_ms: Duration,
        aligned: bool,
    ) -> Self {
        Self {
            actor_context,
            table_id,
            metrics,
            serde,
            state_store,
            upstream,
            max_buffer_size: buffer_size,
            chunk_size,
            pause_duration_ms,
            aligned,
        }
    }
}

struct FlushedChunkInfo {
    epoch: u64,
    start_seq_id: SeqId,
    end_seq_id: SeqId,
    flush_info: FlushInfo,
    vnode_bitmap: Bitmap,
}

enum WriteFuture<S: LocalStateStore> {
    /// We trigger a brief pause, to give the `ReadFuture` a chance to be polled,
    /// in the following scenarios:
    /// - We see an upstream data chunk while the buffer is full and the state is clean.
    /// - We see a checkpoint barrier while the buffer is not empty and the state is clean.
    ///
    /// On pausing, we will transition to a dirty state.
    ///
    /// We resume the write path in the following scenarios:
    /// - After the pause duration elapses.
    /// - After the read future drains the buffer, restoring a clean state.
    Paused {
        start_instant: Instant,
        sleep_future: Option<Pin<Box<Sleep>>>,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>, // Just used to hold the state
    },
    ReceiveFromUpstream {
        future: StreamFuture<BoxedMessageStream>,
        write_state: LogStoreWriteState<S>,
    },
    FlushingChunk {
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        future: Pin<Box<LogStoreStateWriteChunkFuture<S>>>,
        stream: BoxedMessageStream,
    },
    Empty,
}
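
// State transitions of `WriteFuture`, as driven by `next_event` and the main
// loop in `execute_inner` (a summary of the code below, for orientation):
//
//   ReceiveFromUpstream --(chunk arrives, buffer full)----------> FlushingChunk
//   ReceiveFromUpstream --(checkpoint barrier, clean state,
//                          non-empty buffer)--------------------> Paused
//   Paused --(sleep elapses, or is cancelled by the read side)--> yields held barrier
//   FlushingChunk --(flush completes)---------------------------> ReceiveFromUpstream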

enum WriteFutureEvent {
    UpstreamMessageReceived(Message),
    ChunkFlushed(FlushedChunkInfo),
}

impl<S: LocalStateStore> WriteFuture<S> {
    fn flush_chunk(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
        chunk: StreamChunk,
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
    ) -> Self {
        tracing::trace!(
            start_seq_id,
            end_seq_id,
            epoch,
            cardinality = chunk.cardinality(),
            "write_future: flushing chunk"
        );
        Self::FlushingChunk {
            epoch,
            start_seq_id,
            end_seq_id,
            future: Box::pin(write_state.into_write_chunk_future(
                chunk,
                epoch,
                start_seq_id,
                end_seq_id,
            )),
            stream,
        }
    }

    fn receive_from_upstream(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        Self::ReceiveFromUpstream {
            future: stream.into_future(),
            write_state,
        }
    }

    fn paused(
        duration: Duration,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        let now = Instant::now();
        tracing::trace!(?now, ?duration, "write_future_pause");
        Self::Paused {
            start_instant: now,
            sleep_future: Some(Box::pin(sleep_until(now + duration))),
            barrier,
            stream,
            write_state,
        }
    }

    async fn next_event(
        &mut self,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<(BoxedMessageStream, LogStoreWriteState<S>, WriteFutureEvent)> {
        match self {
            WriteFuture::Paused {
                start_instant,
                sleep_future,
                ..
            } => {
                if let Some(sleep_future) = sleep_future {
                    sleep_future.await;
                    metrics
                        .pause_duration_ns
                        .inc_by(start_instant.elapsed().as_nanos() as _);
                    tracing::trace!("resuming write future");
                }
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::Paused { stream, write_state, barrier, .. } => {
                    Ok((stream, write_state, WriteFutureEvent::UpstreamMessageReceived(Message::Barrier(barrier))))
                })
            }
            WriteFuture::ReceiveFromUpstream { future, .. } => {
                let (opt, stream) = future.await;
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::ReceiveFromUpstream { write_state, .. } => {
                    opt
                    .ok_or_else(|| anyhow!("end of upstream input").into())
                    .and_then(|result| result.map(|item| {
                        (stream, write_state, WriteFutureEvent::UpstreamMessageReceived(item))
                    }))
                })
            }
            WriteFuture::FlushingChunk { future, .. } => {
                let (write_state, result) = future.await;
                let result = must_match!(replace(self, WriteFuture::Empty), WriteFuture::FlushingChunk { epoch, start_seq_id, end_seq_id, stream, .. } => {
                    result.map(|(flush_info, vnode_bitmap)| {
                        (stream, write_state, WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                            epoch,
                            start_seq_id,
                            end_seq_id,
                            flush_info,
                            vnode_bitmap,
                        }))
                    })
                });
                result.map_err(Into::into)
            }
            WriteFuture::Empty => {
                unreachable!("should not be polled after ready")
            }
        }
    }
}

// Stream interface
impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    #[try_stream(ok = Message, error = StreamExecutorError)]
    pub async fn execute_monitored(self) {
        let wait_next_poll_ns = self.metrics.wait_next_poll_ns.clone();
        #[for_await]
        for message in self.execute_inner() {
            let current_time = Instant::now();
            yield message?;
            wait_next_poll_ns.inc_by(current_time.elapsed().as_nanos() as _);
        }
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(self) {
        let mut input = self.upstream.execute();

        // Init the first epoch + the local state store.
        let first_barrier = expect_first_barrier(&mut input).await?;
        let first_write_epoch = first_barrier.epoch;
        yield Message::Barrier(first_barrier.clone());

        let local_state_store = self
            .state_store
            .new_local(NewLocalOptions {
                table_id: self.table_id,
                op_consistency_level: OpConsistencyLevel::Inconsistent,
                table_option: TableOption {
                    retention_seconds: None,
                },
                is_replicated: false,
                vnodes: self.serde.vnodes().clone(),
                upload_on_flush: false,
            })
            .await;

        let (mut read_state, mut initial_write_state) = new_log_store_state(
            self.table_id,
            local_state_store,
            self.serde,
            self.chunk_size,
        );
        initial_write_state.init(first_write_epoch).await?;

        let mut pause_stream = first_barrier.is_pause_on_startup();
        let mut initial_write_epoch = first_write_epoch;

        if self.aligned {
            tracing::info!("aligned mode");
            // We want to realign the buffer and the stream.
            // We block the upstream input stream,
            // and wait until the persisted logstore is empty.
            // After that we can consume the input stream directly.
            let log_store_stream = read_state
                .read_persisted_log_store(
                    self.metrics.persistent_log_read_metrics.clone(),
                    initial_write_epoch.curr,
                    LogStoreReadStateStreamRangeStart::Unbounded,
                )
                .await?;

            #[for_await]
            for message in log_store_stream {
                let (_epoch, message) = message?;
                match message {
                    KvLogStoreItem::Barrier { .. } => {
                        continue;
                    }
                    KvLogStoreItem::StreamChunk { chunk, .. } => {
                        yield Message::Chunk(chunk);
                    }
                }
            }

            let mut realigned_logstore = false;

            #[for_await]
            for message in input {
                match message? {
                    Message::Barrier(barrier) => {
                        let is_checkpoint = barrier.is_checkpoint();
                        let mut progress = LogStoreVnodeProgress::None;
                        progress.apply_aligned(
                            read_state.vnodes().clone(),
                            barrier.epoch.prev,
                            None,
                        );
                        // Truncate the logstore
                        let post_seal = initial_write_state
                            .seal_current_epoch(barrier.epoch.curr, progress.take());
                        let update_vnode_bitmap =
                            barrier.as_update_vnode_bitmap(self.actor_context.id);
                        yield Message::Barrier(barrier);
                        post_seal.post_yield_barrier(update_vnode_bitmap).await?;
                        if !realigned_logstore && is_checkpoint {
                            realigned_logstore = true;
                            tracing::info!("realigned logstore");
                        }
                    }
                    Message::Chunk(chunk) => {
                        yield Message::Chunk(chunk);
                    }
                    Message::Watermark(watermark) => {
                        yield Message::Watermark(watermark);
                    }
                }
            }

            return Ok(());
        }

        // We only recreate the consume stream:
        // 1. On bootstrap.
        // 2. On vnode update.
        'recreate_consume_stream: loop {
            let mut seq_id = FIRST_SEQ_ID;
            let mut buffer = SyncedLogStoreBuffer {
                buffer: VecDeque::new(),
                current_size: 0,
                max_size: self.max_buffer_size,
                max_chunk_size: self.chunk_size,
                next_chunk_id: 0,
                metrics: self.metrics.clone(),
                flushed_count: 0,
            };

            let log_store_stream = read_state
                .read_persisted_log_store(
                    self.metrics.persistent_log_read_metrics.clone(),
                    initial_write_epoch.curr,
                    LogStoreReadStateStreamRangeStart::Unbounded,
                )
                .await?;

            let mut log_store_stream = tokio_stream::StreamExt::peekable(log_store_stream);
            let mut clean_state = log_store_stream.peek().await.is_none();
            tracing::trace!(?clean_state);

            let mut read_future_state = ReadFuture::ReadingPersistedStream(log_store_stream);

            let mut write_future_state =
                WriteFuture::receive_from_upstream(input, initial_write_state);

            let mut progress = LogStoreVnodeProgress::None;

            loop {
                let select_result = {
                    let read_future = async {
                        if pause_stream {
                            pending().await
                        } else {
                            read_future_state
                                .next_chunk(&mut progress, &read_state, &mut buffer, &self.metrics)
                                .await
                        }
                    };
                    pin_mut!(read_future);
                    let write_future = write_future_state.next_event(&self.metrics);
                    pin_mut!(write_future);
                    let output = select(write_future, read_future).await;
                    drop_either_future(output)
                };
                match select_result {
                    Either::Left(result) => {
                        // Drop the future to ensure that it is reset later.
                        drop(write_future_state);
                        let (stream, mut write_state, either) = result?;
                        match either {
                            WriteFutureEvent::UpstreamMessageReceived(msg) => {
                                match msg {
                                    Message::Barrier(barrier) => {
                                        if clean_state
                                            && barrier.kind.is_checkpoint()
                                            && !buffer.is_empty()
                                        {
                                            write_future_state = WriteFuture::paused(
                                                self.pause_duration_ms,
                                                barrier,
                                                stream,
                                                write_state,
                                            );
                                            clean_state = false;
                                            self.metrics.unclean_state.inc();
                                        } else {
                                            if let Some(mutation) = barrier.mutation.as_deref() {
                                                match mutation {
                                                    Mutation::Pause => {
                                                        pause_stream = true;
                                                    }
                                                    Mutation::Resume => {
                                                        pause_stream = false;
                                                    }
                                                    _ => {}
                                                }
                                            }
                                            let write_state_post_write_barrier =
                                                Self::write_barrier(
                                                    self.actor_context.id,
                                                    &mut write_state,
                                                    barrier.clone(),
                                                    &self.metrics,
                                                    progress.take(),
                                                    &mut buffer,
                                                )
                                                .await?;
                                            seq_id = FIRST_SEQ_ID;
                                            let update_vnode_bitmap = barrier
                                                .as_update_vnode_bitmap(self.actor_context.id);
                                            let barrier_epoch = barrier.epoch;
                                            tracing::trace!(
                                                ?update_vnode_bitmap,
                                                actor_id = %self.actor_context.id,
                                                "update vnode bitmap"
                                            );

                                            yield Message::Barrier(barrier);

                                            write_state_post_write_barrier
                                                .post_yield_barrier(update_vnode_bitmap.clone())
                                                .await?;
                                            if let Some(vnode_bitmap) = update_vnode_bitmap {
                                                // Apply the vnode update.
                                                read_state.update_vnode_bitmap(vnode_bitmap);
                                                initial_write_epoch = barrier_epoch;
                                                input = stream;
                                                initial_write_state = write_state;
                                                continue 'recreate_consume_stream;
                                            } else {
                                                write_future_state =
                                                    WriteFuture::receive_from_upstream(
                                                        stream,
                                                        write_state,
                                                    );
                                            }
                                        }
                                    }
                                    Message::Chunk(chunk) => {
                                        let start_seq_id = seq_id;
                                        let new_seq_id = seq_id + chunk.cardinality() as SeqId;
                                        let end_seq_id = new_seq_id - 1;
                                        let epoch = write_state.epoch().curr;
                                        tracing::trace!(
                                            start_seq_id,
                                            end_seq_id,
                                            new_seq_id,
                                            epoch,
                                            cardinality = chunk.cardinality(),
                                            "received chunk"
                                        );
                                        if let Some(chunk_to_flush) = buffer.add_or_flush_chunk(
                                            start_seq_id,
                                            end_seq_id,
                                            chunk,
                                            epoch,
                                        ) {
                                            seq_id = new_seq_id;
                                            write_future_state = WriteFuture::flush_chunk(
                                                stream,
                                                write_state,
                                                chunk_to_flush,
                                                epoch,
                                                start_seq_id,
                                                end_seq_id,
                                            );
                                        } else {
                                            seq_id = new_seq_id;
                                            write_future_state = WriteFuture::receive_from_upstream(
                                                stream,
                                                write_state,
                                            );
                                        }
                                    }
                                    // FIXME(kwannoel): This should truncate the logstore;
                                    // it should not bypass the log store like barriers do.
                                    Message::Watermark(_watermark) => {
                                        write_future_state =
                                            WriteFuture::receive_from_upstream(stream, write_state);
                                    }
                                }
                            }
                            WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                                start_seq_id,
                                end_seq_id,
                                epoch,
                                flush_info,
                                vnode_bitmap,
                            }) => {
                                buffer.add_flushed_item_to_buffer(
                                    start_seq_id,
                                    end_seq_id,
                                    vnode_bitmap,
                                    epoch,
                                );
                                self.metrics
                                    .storage_write_count
                                    .inc_by(flush_info.flush_count as _);
                                self.metrics
                                    .storage_write_size
                                    .inc_by(flush_info.flush_size as _);
                                write_future_state =
                                    WriteFuture::receive_from_upstream(stream, write_state);
                            }
                        }
                    }
                    Either::Right(result) => {
                        if !clean_state
                            && matches!(read_future_state, ReadFuture::Idle)
                            && buffer.is_empty()
                        {
                            clean_state = true;
                            self.metrics.clean_state.inc();

                            // Let the write future resume immediately.
                            if let WriteFuture::Paused { sleep_future, .. } =
                                &mut write_future_state
                            {
                                tracing::trace!("resuming paused future");
                                assert!(buffer.current_size < self.max_buffer_size);
                                *sleep_future = None;
                            }
                        }
                        let chunk = result?;
                        self.metrics
                            .total_read_count
                            .inc_by(chunk.cardinality() as _);

                        yield Message::Chunk(chunk);
                    }
                }
            }
        }
    }
}

type PersistedStream<S> = Peekable<Pin<Box<LogStoreItemMergeStream<TimeoutAutoRebuildIter<S>>>>>;

enum ReadFuture<S: StateStoreRead> {
    ReadingPersistedStream(PersistedStream<S>),
    ReadingFlushedChunk {
        future: ReadFlushedChunkFuture,
        end_seq_id: SeqId,
    },
    Idle,
}
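
// State transitions of `ReadFuture` (summarized from `next_chunk` below):
//
//   ReadingPersistedStream --(historical stream drained)--> Idle
//   Idle --(pop `LogStoreBufferItem::Flushed`)------------> ReadingFlushedChunk
//   ReadingFlushedChunk --(flushed chunk read back)-------> Idle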

// Read methods
impl<S: StateStoreRead> ReadFuture<S> {
    async fn next_chunk(
        &mut self,
        progress: &mut LogStoreVnodeProgress,
        read_state: &LogStoreReadState<S>,
        buffer: &mut SyncedLogStoreBuffer,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<StreamChunk> {
        match self {
            ReadFuture::ReadingPersistedStream(stream) => {
                while let Some((epoch, item)) = stream.try_next().await? {
                    match item {
                        KvLogStoreItem::Barrier { vnodes, .. } => {
                            tracing::trace!(epoch, "read logstore barrier");
                            // Update the progress.
                            progress.apply_aligned(vnodes, epoch, None);
                            continue;
                        }
                        KvLogStoreItem::StreamChunk {
                            chunk,
                            progress: chunk_progress,
                        } => {
                            tracing::trace!("read logstore chunk of size: {}", chunk.cardinality());
                            progress.apply_per_vnode(epoch, chunk_progress);
                            return Ok(chunk);
                        }
                    }
                }
                *self = ReadFuture::Idle;
            }
            ReadFuture::ReadingFlushedChunk { .. } | ReadFuture::Idle => {}
        }
        match self {
            ReadFuture::ReadingPersistedStream(_) => {
                unreachable!("must have finished read persisted stream when reaching here")
            }
            ReadFuture::ReadingFlushedChunk { .. } => {}
            ReadFuture::Idle => loop {
                let Some((item_epoch, item)) = buffer.pop_front() else {
                    return pending().await;
                };
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        metrics.buffer_read_count.inc_by(chunk.cardinality() as _);
                        tracing::trace!(
                            start_seq_id,
                            end_seq_id,
                            flushed,
                            cardinality = chunk.cardinality(),
                            "read buffered chunk"
                        );
                        progress.apply_aligned(
                            read_state.vnodes().clone(),
                            item_epoch,
                            Some(end_seq_id),
                        );
                        return Ok(chunk);
                    }
                    LogStoreBufferItem::Flushed {
                        vnode_bitmap,
                        start_seq_id,
                        end_seq_id,
                        chunk_id,
                    } => {
                        tracing::trace!(start_seq_id, end_seq_id, chunk_id, "read flushed chunk");
                        let read_metrics = metrics.flushed_buffer_read_metrics.clone();
                        let future = read_state
                            .read_flushed_chunk(
                                vnode_bitmap,
                                chunk_id,
                                start_seq_id,
                                end_seq_id,
                                item_epoch,
                                read_metrics,
                            )
                            .boxed();
                        *self = ReadFuture::ReadingFlushedChunk { future, end_seq_id };
                        break;
                    }
                    LogStoreBufferItem::Barrier { .. } => {
                        tracing::trace!(item_epoch, "read buffer barrier");
                        progress.apply_aligned(read_state.vnodes().clone(), item_epoch, None);
                        continue;
                    }
                }
            },
        }

        let (future, end_seq_id) = match self {
            ReadFuture::ReadingPersistedStream(_) | ReadFuture::Idle => {
                unreachable!("should be at ReadingFlushedChunk")
            }
            ReadFuture::ReadingFlushedChunk { future, end_seq_id } => (future, *end_seq_id),
        };

        let (_, chunk, epoch) = future.await?;
        progress.apply_aligned(read_state.vnodes().clone(), epoch, Some(end_seq_id));
        tracing::trace!(
            end_seq_id,
            "read flushed chunk of size: {}",
            chunk.cardinality()
        );
        *self = ReadFuture::Idle;
        Ok(chunk)
    }
}

// Write methods
impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    async fn write_barrier<'a>(
        actor_id: ActorId,
        write_state: &'a mut LogStoreWriteState<S::Local>,
        barrier: Barrier,
        metrics: &SyncedKvLogStoreMetrics,
        progress: LogStoreVnodeProgress,
        buffer: &mut SyncedLogStoreBuffer,
    ) -> StreamExecutorResult<LogStorePostSealCurrentEpoch<'a, S::Local>> {
        tracing::trace!(%actor_id, ?progress, "applying truncation");
        // TODO(kwannoel): As an optimization, we can also convert flushed chunks into flushed
        // items to reduce the memory consumption of the logstore.

        let epoch = barrier.epoch.prev;
        let mut writer = write_state.start_writer(false);
        writer.write_barrier(epoch, barrier.is_checkpoint())?;

        if barrier.is_checkpoint() {
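            // Flush, from the back of the buffer, every chunk that has not
            // been flushed yet. Stop at the first already-flushed chunk:
            // anything before it must have been flushed earlier.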
            for (epoch, item) in buffer.buffer.iter_mut().rev() {
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        if !*flushed {
                            writer.write_chunk(chunk, *epoch, *start_seq_id, *end_seq_id)?;
                            *flushed = true;
                        } else {
                            break;
                        }
                    }
                    LogStoreBufferItem::Flushed { .. } | LogStoreBufferItem::Barrier { .. } => {}
                }
            }
        }

        // Apply truncation
        let (flush_info, _) = writer.finish().await?;
        metrics
            .storage_write_count
            .inc_by(flush_info.flush_count as _);
        metrics
            .storage_write_size
            .inc_by(flush_info.flush_size as _);
        let post_seal = write_state.seal_current_epoch(barrier.epoch.curr, progress);

        // Add to buffer
        buffer.buffer.push_back((
            epoch,
            LogStoreBufferItem::Barrier {
                is_checkpoint: barrier.is_checkpoint(),
                next_epoch: barrier.epoch.curr,
                add_columns: None,
            },
        ));
        buffer.next_chunk_id = 0;
        buffer.update_unconsumed_buffer_metrics();

        Ok(post_seal)
    }
}

struct SyncedLogStoreBuffer {
    buffer: VecDeque<(u64, LogStoreBufferItem)>,
    current_size: usize,
    max_size: usize,
    max_chunk_size: usize,
    next_chunk_id: ChunkId,
    metrics: SyncedKvLogStoreMetrics,
    flushed_count: usize,
}

impl SyncedLogStoreBuffer {
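    // Note: `current_size` counts only the rows of chunks held in memory.
    // `Flushed` references and barriers do not contribute to it, so the buffer
    // is considered empty once all in-memory chunks have been consumed.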
    fn is_empty(&self) -> bool {
        self.current_size == 0
    }

    fn add_or_flush_chunk(
        &mut self,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        chunk: StreamChunk,
        epoch: u64,
    ) -> Option<StreamChunk> {
        let current_size = self.current_size;
        let chunk_size = chunk.cardinality();

        tracing::trace!(
            current_size,
            chunk_size,
            max_size = self.max_size,
            "checking chunk size"
        );
        let should_flush_chunk = current_size + chunk_size > self.max_size;
        if should_flush_chunk {
            tracing::trace!(start_seq_id, end_seq_id, epoch, "flushing chunk");
            Some(chunk)
        } else {
            tracing::trace!(start_seq_id, end_seq_id, epoch, "buffering chunk");
            self.add_chunk_to_buffer(chunk, start_seq_id, end_seq_id, epoch);
            None
        }
    }

    /// After flushing a chunk, we keep a `LogStoreBufferItem::Flushed` entry in the buffer.
    /// It contains no data, only the metadata needed to read the flushed chunk back.
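    /// Adjacent flushed items from the same epoch are compacted: if the merged
    /// range stays within `max_chunk_size` rows, we extend the previous
    /// `Flushed` item in place instead of pushing a new one.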
    fn add_flushed_item_to_buffer(
        &mut self,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        new_vnode_bitmap: Bitmap,
        epoch: u64,
    ) {
        let new_chunk_size = (end_seq_id - start_seq_id + 1) as usize;

        if let Some((
            item_epoch,
            LogStoreBufferItem::Flushed {
                start_seq_id: prev_start_seq_id,
                end_seq_id: prev_end_seq_id,
                vnode_bitmap,
                ..
            },
        )) = self.buffer.back_mut()
            && let flushed_chunk_size = (*prev_end_seq_id - *prev_start_seq_id + 1) as usize
            && let projected_flushed_chunk_size = flushed_chunk_size + new_chunk_size
            && projected_flushed_chunk_size <= self.max_chunk_size
        {
            assert!(
                *prev_end_seq_id < start_seq_id,
                "prev end_seq_id {} should be smaller than current start_seq_id {}",
                prev_end_seq_id,
                start_seq_id
            );
            assert_eq!(
                epoch, *item_epoch,
                "epoch of newly added flushed item must be the same as the last flushed item"
            );
            *prev_end_seq_id = end_seq_id;
            *vnode_bitmap |= new_vnode_bitmap;
        } else {
            let chunk_id = self.next_chunk_id;
            self.next_chunk_id += 1;
            self.buffer.push_back((
                epoch,
                LogStoreBufferItem::Flushed {
                    start_seq_id,
                    end_seq_id,
                    vnode_bitmap: new_vnode_bitmap,
                    chunk_id,
                },
            ));
            self.flushed_count += 1;
            tracing::trace!(
                "adding flushed item to buffer: start_seq_id: {start_seq_id}, end_seq_id: {end_seq_id}, chunk_id: {chunk_id}"
            );
        }
        // FIXME(kwannoel): It seems these metrics are updated _after_ the flush info is reported.
        self.update_unconsumed_buffer_metrics();
    }

    fn add_chunk_to_buffer(
        &mut self,
        chunk: StreamChunk,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        epoch: u64,
    ) {
        let chunk_id = self.next_chunk_id;
        self.next_chunk_id += 1;
        self.current_size += chunk.cardinality();
        self.buffer.push_back((
            epoch,
            LogStoreBufferItem::StreamChunk {
                chunk,
                start_seq_id,
                end_seq_id,
                flushed: false,
                chunk_id,
            },
        ));
        self.update_unconsumed_buffer_metrics();
    }

    fn pop_front(&mut self) -> Option<(u64, LogStoreBufferItem)> {
        let item = self.buffer.pop_front();
        match &item {
            Some((_, LogStoreBufferItem::Flushed { .. })) => {
                self.flushed_count -= 1;
            }
            Some((_, LogStoreBufferItem::StreamChunk { chunk, .. })) => {
                self.current_size -= chunk.cardinality();
            }
            _ => {}
        }
        self.update_unconsumed_buffer_metrics();
        item
    }

    fn update_unconsumed_buffer_metrics(&self) {
        let mut epoch_count = 0;
        let mut row_count = 0;
        for (_, item) in &self.buffer {
            match item {
                LogStoreBufferItem::StreamChunk { chunk, .. } => {
                    row_count += chunk.cardinality();
                }
                LogStoreBufferItem::Flushed {
                    start_seq_id,
                    end_seq_id,
                    ..
                } => {
                    // The seq id range is inclusive on both ends.
                    row_count += (end_seq_id - start_seq_id + 1) as usize;
                }
                LogStoreBufferItem::Barrier { .. } => {
                    epoch_count += 1;
                }
            }
        }
        self.metrics.buffer_unconsumed_epoch_count.set(epoch_count);
        self.metrics.buffer_unconsumed_row_count.set(row_count as _);
        self.metrics
            .buffer_unconsumed_item_count
            .set(self.buffer.len() as _);
        self.metrics.buffer_unconsumed_min_epoch.set(
            self.buffer
                .front()
                .map(|(epoch, _)| *epoch)
                .unwrap_or_default() as _,
        );
    }
}

impl<S> Execute for SyncedKvLogStoreExecutor<S>
where
    S: StateStore,
{
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_monitored().boxed()
    }
}
1240
1241#[cfg(test)]
1242mod tests {
1243    use itertools::Itertools;
1244    use pretty_assertions::assert_eq;
1245    use risingwave_common::catalog::Field;
1246    use risingwave_common::hash::VirtualNode;
1247    use risingwave_common::test_prelude::*;
1248    use risingwave_common::util::epoch::test_epoch;
1249    use risingwave_storage::memory::MemoryStateStore;
1250
1251    use super::*;
1252    use crate::assert_stream_chunk_eq;
1253    use crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V2_INFO;
1254    use crate::common::log_store_impl::kv_log_store::test_utils::{
1255        check_stream_chunk_eq, gen_test_log_store_table, test_payload_schema,
1256    };
1257    use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
1258    use crate::executor::test_utils::MockSource;
1259
1260    fn init_logger() {
1261        let _ = tracing_subscriber::fmt()
1262            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1263            .with_ansi(false)
1264            .try_init();
1265    }

    // Test reading and writing through the in-memory buffer.
    #[tokio::test]
    async fn test_read_write_buffer() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
            .collect_vec();
        let schema = Schema { fields };
        let stream_key = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10, // buffer capacity: both chunks fit in memory, so neither is flushed
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        // Init
        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            "  I   T
            +  5  10
            +  6  10
            +  8  10
            +  9  10
            +  10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "   I   T
            -   5  10
            -   6  10
            -   8  10
            U-  9  10
            U+ 10  11",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        tx.push_barrier(test_epoch(2), false);

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

    // Test reading data persisted on a barrier.
    //
    // Sequence of events (earliest -> latest):
    // barrier(1) -> chunk(1) -> chunk(2) -> barrier(2) -> poll 4 items
    // * "poll" just means we read from the executor stream.
    #[tokio::test]
    async fn test_barrier_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
            .collect_vec();
        let schema = Schema { fields };
        let stream_key = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10, // buffer capacity
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        // Init
        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            "  I   T
            +  5  10
            +  6  10
            +  8  10
            +  9  10
            +  10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "   I   T
            -   5  10
            -   6  10
            -   8  10
            U- 10  11
            U+ 10  10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

    // When the buffer hits its max chunk count, only a placeholder
    // `FlushedItem` reference is kept in memory. Here we set the capacity to
    // 0, so every incoming chunk is flushed to the state store, and the
    // persisted data is read back as a single merged chunk.
    #[tokio::test]
    async fn test_max_chunk_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
            .collect_vec();
        let schema = Schema { fields };
        let stream_key = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            0, // buffer capacity: 0 forces every chunk to be flushed to the state store
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        // Init
        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            "  I   T
            +  5  10
            +  6  10
            +  8  10
            +  9  10
            +  10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "   I   T
            -   5  10
            -   6  10
            -   8  10
            U- 10  11
            U+ 10  10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        // Both barriers are yielded before the flushed data is replayed.
        for i in 1..=2 {
            match stream.next().await {
                Some(Ok(Message::Barrier(barrier))) => {
                    assert_eq!(barrier.epoch.curr, test_epoch(i));
                }
                other => panic!("Expected a barrier message, got {:?}", other),
            }
        }

        // The two flushed chunks are read back from the log store as one
        // merged chunk.
        match stream.next().await {
            Some(Ok(Message::Chunk(actual))) => {
                let expected = StreamChunk::from_pretty(
                    "   I   T
                    +   5  10
                    +   6  10
                    +   8  10
                    +   9  10
                    +  10  11
                    -   5  10
                    -   6  10
                    -   8  10
                    U- 10  11
                    U+ 10  10",
                );
                assert_stream_chunk_eq!(actual, expected);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }
    }
}