risingwave_stream/executor/sync_kv_log_store.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! This module contains the synced kv log store implementation.
//! It's meant to buffer a large number of records emitted from upstream,
//! to avoid overwhelming the downstream executor.
//!
//! The synced kv log store polls two futures:
//!
//! 1. Upstream: the upstream message source.
//!
//!   It will write stream messages (e.g. `Message::Barrier`, `Message::Chunk`, ...)
//!   to the log store buffer. When writing a stream chunk, if the log store buffer is full, it will:
//!     a. Flush the buffer to the log store.
//!     b. Convert the stream chunk into a reference (`LogStoreBufferItem::Flushed`)
//!       which can be used to read the corresponding chunks back from the log store.
//!       We will compact adjacent references,
//!       so a single reference can read multiple chunks if there's a backlog.
//!
//!   On receiving barriers, it will:
//!     a. Apply truncation to historical data in the logstore.
//!     b. Flush and checkpoint the logstore data.
//!
//! 2. State store + buffer + recently flushed chunks: the storage components of the logstore.
//!
//!   It will read all historical data from the logstore first. This can be done just by
//!   constructing a state store stream, which will read all data up to the latest epoch.
//!   This is a static snapshot of data.
//!   Any subsequently flushed chunks are instead read via
//!   `flushed_chunk_future`; see the next paragraph.
//!
//!   Next, we will poll `flushed_chunk_future`, if a pre-existing one is in flight; see below
//!   for what it is and how it's constructed.
//!
//!   Finally we will pop the earliest item in the buffer.
//!   - If it's a chunk, yield it.
//!   - If it's a watermark, yield it.
//!   - If it's a flushed chunk reference (`LogStoreBufferItem::Flushed`),
//!     we will read the corresponding chunks in the log store.
//!     This is done by constructing a `flushed_chunk_future` which will read the log store
//!     using the `seq_id`.
//!   - If it's a barrier, it is not yielded here,
//!     because barriers are directly propagated from the upstream when polling it.
//!
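//! As a rough sketch (not the exact code; see `execute_inner` for the real
//! control flow), the executor repeatedly races the two futures and reacts to
//! whichever finishes first:
//!
//! ```text
//! loop {
//!     match select(write_future, read_future).await {
//!         // Upstream side: buffer the chunk, or flush it to the log store
//!         // if the buffer is full; barriers also checkpoint/truncate.
//!         Left(upstream_event) => handle_upstream(upstream_event),
//!         // Read side: a chunk from the persisted log store, a flushed
//!         // chunk, or the in-memory buffer is yielded downstream.
//!         Right(chunk) => yield Message::Chunk(chunk),
//!     }
//! }
//! ```
//!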
//! TODO(kwannoel):
//! - [ ] Add dedicated metrics for sync log store, namespace according to the upstream.
//! - [ ] Add tests
//! - [ ] Handle watermark r/w
//! - [ ] Handle paused stream

use std::collections::VecDeque;
use std::future::pending;
use std::mem::replace;
use std::pin::Pin;

use anyhow::anyhow;
use futures::future::{BoxFuture, Either, select};
use futures::stream::StreamFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::must_match;
use risingwave_connector::sink::log_store::{ChunkId, LogStoreResult};
use risingwave_storage::StateStore;
use risingwave_storage::store::{
    LocalStateStore, NewLocalOptions, OpConsistencyLevel, StateStoreRead,
};
use rw_futures_util::drop_either_future;
use tokio::time::{Duration, Instant, Sleep, sleep_until};
use tokio_stream::adapters::Peekable;

use crate::common::log_store_impl::kv_log_store::buffer::LogStoreBufferItem;
use crate::common::log_store_impl::kv_log_store::reader::LogStoreReadStateStreamRangeStart;
use crate::common::log_store_impl::kv_log_store::reader::timeout_auto_rebuild::TimeoutAutoRebuildIter;
use crate::common::log_store_impl::kv_log_store::serde::{
    KvLogStoreItem, LogStoreItemMergeStream, LogStoreRowSerde,
};
use crate::common::log_store_impl::kv_log_store::state::{
    LogStorePostSealCurrentEpoch, LogStoreReadState, LogStoreStateWriteChunkFuture,
    LogStoreWriteState, new_log_store_state,
};
use crate::common::log_store_impl::kv_log_store::{
    Epoch, FIRST_SEQ_ID, FlushInfo, LogStoreVnodeProgress, SeqId,
};
use crate::executor::prelude::*;
use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
use crate::executor::{
    Barrier, BoxedMessageStream, Message, StreamExecutorError, StreamExecutorResult,
};

pub mod metrics {
    use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};

    use crate::common::log_store_impl::kv_log_store::KvLogStoreReadMetrics;
    use crate::executor::monitor::StreamingMetrics;
    use crate::task::ActorId;

    #[derive(Clone)]
    pub struct SyncedKvLogStoreMetrics {
        // State of the log store
        pub unclean_state: LabelGuardedIntCounter,
        pub clean_state: LabelGuardedIntCounter,
        pub wait_next_poll_ns: LabelGuardedIntCounter,

        // Write metrics
        pub storage_write_count: LabelGuardedIntCounter,
        pub storage_write_size: LabelGuardedIntCounter,
        pub pause_duration_ns: LabelGuardedIntCounter,

        // Buffer metrics
        pub buffer_unconsumed_item_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_row_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_epoch_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_min_epoch: LabelGuardedIntGauge,
        pub buffer_read_count: LabelGuardedIntCounter,
        pub buffer_read_size: LabelGuardedIntCounter,

        // Read metrics
        pub total_read_count: LabelGuardedIntCounter,
        pub total_read_size: LabelGuardedIntCounter,
        pub persistent_log_read_metrics: KvLogStoreReadMetrics,
        pub flushed_buffer_read_metrics: KvLogStoreReadMetrics,
    }

    impl SyncedKvLogStoreMetrics {
        /// `id`: a unique identifier for the log store. This can be the sink id,
        ///       or, for joins, the `fragment_id`.
        /// `name`: the MV / sink that the log store is associated with.
        /// `target`: the target of the log store,
        ///           for instance a `MySql` sink, a PG sink, etc.,
        ///           or an unaligned join.
        pub(crate) fn new(
            metrics: &StreamingMetrics,
            actor_id: ActorId,
            id: u32,
            name: &str,
            target: &'static str,
        ) -> Self {
            let actor_id_str = actor_id.to_string();
            let id_str = id.to_string();
            let labels = &[&actor_id_str, target, &id_str, name];

            let unclean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "dirty",
                &actor_id_str,
                target,
                &id_str,
                name,
            ]);
            let clean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "clean",
                &actor_id_str,
                target,
                &id_str,
                name,
            ]);
            let wait_next_poll_ns = metrics
                .sync_kv_log_store_wait_next_poll_ns
                .with_guarded_label_values(labels);

            let storage_write_size = metrics
                .sync_kv_log_store_storage_write_size
                .with_guarded_label_values(labels);
            let storage_write_count = metrics
                .sync_kv_log_store_storage_write_count
                .with_guarded_label_values(labels);
            let storage_pause_duration_ns = metrics
                .sync_kv_log_store_write_pause_duration_ns
                .with_guarded_label_values(labels);

            let buffer_unconsumed_item_count = metrics
                .sync_kv_log_store_buffer_unconsumed_item_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_row_count = metrics
                .sync_kv_log_store_buffer_unconsumed_row_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_epoch_count = metrics
                .sync_kv_log_store_buffer_unconsumed_epoch_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_min_epoch = metrics
                .sync_kv_log_store_buffer_unconsumed_min_epoch
                .with_guarded_label_values(labels);
            let buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&["buffer", &actor_id_str, target, &id_str, name]);

            let buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&["buffer", &actor_id_str, target, &id_str, name]);

            let total_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&["total", &actor_id_str, target, &id_str, name]);

            let total_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&["total", &actor_id_str, target, &id_str, name]);

            const READ_PERSISTENT_LOG: &str = "persistent_log";
            const READ_FLUSHED_BUFFER: &str = "flushed_buffer";

            let persistent_log_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &id_str,
                    name,
                ]);

            let persistent_log_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &id_str,
                    name,
                ]);

            let flushed_buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &id_str,
                    name,
                ]);

            let flushed_buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &id_str,
                    name,
                ]);

            Self {
                unclean_state,
                clean_state,
                wait_next_poll_ns,
                storage_write_size,
                storage_write_count,
                pause_duration_ns: storage_pause_duration_ns,
                buffer_unconsumed_item_count,
                buffer_unconsumed_row_count,
                buffer_unconsumed_epoch_count,
                buffer_unconsumed_min_epoch,
                buffer_read_count,
                buffer_read_size,
                total_read_count,
                total_read_size,
                persistent_log_read_metrics: KvLogStoreReadMetrics {
                    storage_read_size: persistent_log_read_size,
                    storage_read_count: persistent_log_read_count,
                },
                flushed_buffer_read_metrics: KvLogStoreReadMetrics {
                    storage_read_count: flushed_buffer_read_count,
                    storage_read_size: flushed_buffer_read_size,
                },
            }
        }

        #[cfg(test)]
        pub(crate) fn for_test() -> Self {
            SyncedKvLogStoreMetrics {
                unclean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
                clean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
                wait_next_poll_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
                storage_write_count: LabelGuardedIntCounter::test_int_counter::<4>(),
                storage_write_size: LabelGuardedIntCounter::test_int_counter::<4>(),
                pause_duration_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
                buffer_unconsumed_item_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_row_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_epoch_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_min_epoch: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
                buffer_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
                total_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
                total_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
                persistent_log_read_metrics: KvLogStoreReadMetrics::for_test(),
                flushed_buffer_read_metrics: KvLogStoreReadMetrics::for_test(),
            }
        }
    }
}

type ReadFlushedChunkFuture = BoxFuture<'static, LogStoreResult<(ChunkId, StreamChunk, Epoch)>>;

pub struct SyncedKvLogStoreExecutor<S: StateStore> {
    actor_context: ActorContextRef,
    table_id: TableId,
    metrics: SyncedKvLogStoreMetrics,
    serde: LogStoreRowSerde,

    // Upstream
    upstream: Executor,

    // Log store state
    state_store: S,
    max_buffer_size: usize,

    // Max chunk size when reading from logstore / buffer
    chunk_size: usize,

    pause_duration_ms: Duration,

    aligned: bool,
}
// Constructor
impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn new(
        actor_context: ActorContextRef,
        table_id: u32,
        metrics: SyncedKvLogStoreMetrics,
        serde: LogStoreRowSerde,
        state_store: S,
        buffer_size: usize,
        chunk_size: usize,
        upstream: Executor,
        pause_duration_ms: Duration,
        aligned: bool,
    ) -> Self {
        Self {
            actor_context,
            table_id: TableId::new(table_id),
            metrics,
            serde,
            state_store,
            upstream,
            max_buffer_size: buffer_size,
            chunk_size,
            pause_duration_ms,
            aligned,
        }
    }
}

struct FlushedChunkInfo {
    epoch: u64,
    start_seq_id: SeqId,
    end_seq_id: SeqId,
    flush_info: FlushInfo,
    vnode_bitmap: Bitmap,
}

enum WriteFuture<S: LocalStateStore> {
    /// We trigger a brief pause to let the `ReadFuture` be polled in the following scenarios:
    /// - When we see an upstream data chunk, the buffer becomes full, and the state is clean.
    /// - When we see a checkpoint barrier, the buffer is not empty, and the state is clean.
    ///
    /// On pausing, we will transition to a dirty state.
    ///
    /// We trigger a resume to let the `ReadFuture` be polled in the following scenarios:
    /// - After the pause duration.
    /// - After the read future consumes a chunk.
    Paused {
        start_instant: Instant,
        sleep_future: Option<Pin<Box<Sleep>>>,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>, // Just used to hold the state
    },
    ReceiveFromUpstream {
        future: StreamFuture<BoxedMessageStream>,
        write_state: LogStoreWriteState<S>,
    },
    FlushingChunk {
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        future: Pin<Box<LogStoreStateWriteChunkFuture<S>>>,
        stream: BoxedMessageStream,
    },
    Empty,
}
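
// A sketch of the write-side state transitions, as assumed from `next_event`
// and the main loop in `execute_inner` (not exhaustive):
//
//   ReceiveFromUpstream --(chunk arrives, buffer full)---------> FlushingChunk
//   FlushingChunk ------(flush completes)----------------------> ReceiveFromUpstream
//   ReceiveFromUpstream --(checkpoint barrier, clean state,
//                          non-empty buffer)------------------> Paused
//   Paused -------------(sleep elapses, or early resume once
//                          the reader drains the buffer)-------> ReceiveFromUpstream
//
// `Empty` is a transient placeholder used while ownership of the stream and
// write state is moved out of the previous variant.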

enum WriteFutureEvent {
    UpstreamMessageReceived(Message),
    ChunkFlushed(FlushedChunkInfo),
}

impl<S: LocalStateStore> WriteFuture<S> {
    fn flush_chunk(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
        chunk: StreamChunk,
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
    ) -> Self {
        tracing::trace!(
            start_seq_id,
            end_seq_id,
            epoch,
            cardinality = chunk.cardinality(),
            "write_future: flushing chunk"
        );
        Self::FlushingChunk {
            epoch,
            start_seq_id,
            end_seq_id,
            future: Box::pin(write_state.into_write_chunk_future(
                chunk,
                epoch,
                start_seq_id,
                end_seq_id,
            )),
            stream,
        }
    }

    fn receive_from_upstream(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        Self::ReceiveFromUpstream {
            future: stream.into_future(),
            write_state,
        }
    }

    fn paused(
        duration: Duration,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        let now = Instant::now();
        tracing::trace!(?now, ?duration, "write_future_pause");
        Self::Paused {
            start_instant: now,
            sleep_future: Some(Box::pin(sleep_until(now + duration))),
            barrier,
            stream,
            write_state,
        }
    }

    async fn next_event(
        &mut self,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<(BoxedMessageStream, LogStoreWriteState<S>, WriteFutureEvent)> {
        match self {
            WriteFuture::Paused {
                start_instant,
                sleep_future,
                ..
            } => {
                if let Some(sleep_future) = sleep_future {
                    sleep_future.await;
                    metrics
                        .pause_duration_ns
                        .inc_by(start_instant.elapsed().as_nanos() as _);
                    tracing::trace!("resuming write future");
                }
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::Paused { stream, write_state, barrier, .. } => {
                    Ok((stream, write_state, WriteFutureEvent::UpstreamMessageReceived(Message::Barrier(barrier))))
                })
            }
            WriteFuture::ReceiveFromUpstream { future, .. } => {
                let (opt, stream) = future.await;
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::ReceiveFromUpstream { write_state, .. } => {
                    opt
                    .ok_or_else(|| anyhow!("end of upstream input").into())
                    .and_then(|result| result.map(|item| {
                        (stream, write_state, WriteFutureEvent::UpstreamMessageReceived(item))
                    }))
                })
            }
            WriteFuture::FlushingChunk { future, .. } => {
                let (write_state, result) = future.await;
                let result = must_match!(replace(self, WriteFuture::Empty), WriteFuture::FlushingChunk { epoch, start_seq_id, end_seq_id, stream, ..  } => {
                    result.map(|(flush_info, vnode_bitmap)| {
                        (stream, write_state, WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                            epoch,
                            start_seq_id,
                            end_seq_id,
                            flush_info,
                            vnode_bitmap,
                        }))
                    })
                });
                result.map_err(Into::into)
            }
            WriteFuture::Empty => {
                unreachable!("should not be polled after ready")
            }
        }
    }
}

// Stream interface
impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    #[try_stream(ok = Message, error = StreamExecutorError)]
    pub async fn execute_monitored(self) {
        let wait_next_poll_ns = self.metrics.wait_next_poll_ns.clone();
        #[for_await]
        for message in self.execute_inner() {
            let current_time = Instant::now();
            yield message?;
            wait_next_poll_ns.inc_by(current_time.elapsed().as_nanos() as _);
        }
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(self) {
        let mut input = self.upstream.execute();

        // init first epoch + local state store
        let first_barrier = expect_first_barrier(&mut input).await?;
        let first_write_epoch = first_barrier.epoch;
        yield Message::Barrier(first_barrier.clone());

        let local_state_store = self
            .state_store
            .new_local(NewLocalOptions {
                table_id: self.table_id,
                op_consistency_level: OpConsistencyLevel::Inconsistent,
                table_option: TableOption {
                    retention_seconds: None,
                },
                is_replicated: false,
                vnodes: self.serde.vnodes().clone(),
                upload_on_flush: false,
            })
            .await;

        let (mut read_state, mut initial_write_state) = new_log_store_state(
            self.table_id,
            local_state_store,
            self.serde,
            self.chunk_size,
        );
        initial_write_state.init(first_write_epoch).await?;

        let mut pause_stream = first_barrier.is_pause_on_startup();
        let mut initial_write_epoch = first_write_epoch;

        if self.aligned {
            tracing::info!("aligned mode");
            // We want to realign the buffer and the stream.
            // We block the upstream input stream
            // and wait until the persisted logstore is empty.
            // After that we can consume the input stream directly.
            let log_store_stream = read_state
                .read_persisted_log_store(
                    self.metrics.persistent_log_read_metrics.clone(),
                    initial_write_epoch.curr,
                    LogStoreReadStateStreamRangeStart::Unbounded,
                )
                .await?;

            #[for_await]
            for message in log_store_stream {
                let (_epoch, message) = message?;
                match message {
                    KvLogStoreItem::Barrier { .. } => {
                        continue;
                    }
                    KvLogStoreItem::StreamChunk { chunk, .. } => {
                        yield Message::Chunk(chunk);
                    }
                }
            }

            let mut realigned_logstore = false;

            #[for_await]
            for message in input {
                match message? {
                    Message::Barrier(barrier) => {
                        let is_checkpoint = barrier.is_checkpoint();
                        let mut progress = LogStoreVnodeProgress::None;
                        progress.apply_aligned(
                            read_state.vnodes().clone(),
                            barrier.epoch.prev,
                            None,
                        );
                        // Truncate the logstore
                        let post_seal = initial_write_state
                            .seal_current_epoch(barrier.epoch.curr, progress.take());
                        let update_vnode_bitmap =
                            barrier.as_update_vnode_bitmap(self.actor_context.id);
                        yield Message::Barrier(barrier);
                        post_seal.post_yield_barrier(update_vnode_bitmap).await?;
                        if !realigned_logstore && is_checkpoint {
                            realigned_logstore = true;
                            tracing::info!("realigned logstore");
                        }
                    }
                    Message::Chunk(chunk) => {
                        yield Message::Chunk(chunk);
                    }
                    Message::Watermark(watermark) => {
                        yield Message::Watermark(watermark);
                    }
                }
            }

            return Ok(());
        }

        // We only recreate the consume stream:
        // 1. on bootstrap,
        // 2. on vnode update.
        'recreate_consume_stream: loop {
            let mut seq_id = FIRST_SEQ_ID;
            let mut buffer = SyncedLogStoreBuffer {
                buffer: VecDeque::new(),
                current_size: 0,
                max_size: self.max_buffer_size,
                max_chunk_size: self.chunk_size,
                next_chunk_id: 0,
                metrics: self.metrics.clone(),
                flushed_count: 0,
            };

            let log_store_stream = read_state
                .read_persisted_log_store(
                    self.metrics.persistent_log_read_metrics.clone(),
                    initial_write_epoch.curr,
                    LogStoreReadStateStreamRangeStart::Unbounded,
                )
                .await?;

            let mut log_store_stream = tokio_stream::StreamExt::peekable(log_store_stream);
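
            // "Clean" means there is nothing left to replay: no persisted log
            // store data and an empty buffer. We transition to a dirty state
            // whenever the writer is paused to let the reader catch up, and
            // back to clean once both the persisted stream and the buffer are
            // fully drained (see the read branch below).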
            let mut clean_state = log_store_stream.peek().await.is_none();
            tracing::trace!(?clean_state);

            let mut read_future_state = ReadFuture::ReadingPersistedStream(log_store_stream);

            let mut write_future_state =
                WriteFuture::receive_from_upstream(input, initial_write_state);

            let mut progress = LogStoreVnodeProgress::None;

            loop {
                let select_result = {
                    let read_future = async {
                        if pause_stream {
                            pending().await
                        } else {
                            read_future_state
                                .next_chunk(&mut progress, &read_state, &mut buffer, &self.metrics)
                                .await
                        }
                    };
                    pin_mut!(read_future);
                    let write_future = write_future_state.next_event(&self.metrics);
                    pin_mut!(write_future);
                    let output = select(write_future, read_future).await;
                    drop_either_future(output)
                };
                match select_result {
                    Either::Left(result) => {
                        // Drop the future to ensure that it is re-created later.
                        drop(write_future_state);
                        let (stream, mut write_state, either) = result?;
                        match either {
                            WriteFutureEvent::UpstreamMessageReceived(msg) => {
                                match msg {
                                    Message::Barrier(barrier) => {
                                        if clean_state
                                            && barrier.kind.is_checkpoint()
                                            && !buffer.is_empty()
                                        {
                                            write_future_state = WriteFuture::paused(
                                                self.pause_duration_ms,
                                                barrier,
                                                stream,
                                                write_state,
                                            );
                                            clean_state = false;
                                            self.metrics.unclean_state.inc();
                                        } else {
                                            if let Some(mutation) = barrier.mutation.as_deref() {
                                                match mutation {
                                                    Mutation::Pause => {
                                                        pause_stream = true;
                                                    }
                                                    Mutation::Resume => {
                                                        pause_stream = false;
                                                    }
                                                    _ => {}
                                                }
                                            }
                                            let write_state_post_write_barrier =
                                                Self::write_barrier(
                                                    self.actor_context.id,
                                                    &mut write_state,
                                                    barrier.clone(),
                                                    &self.metrics,
                                                    progress.take(),
                                                    &mut buffer,
                                                )
                                                .await?;
                                            seq_id = FIRST_SEQ_ID;
                                            let update_vnode_bitmap = barrier
                                                .as_update_vnode_bitmap(self.actor_context.id);
                                            let barrier_epoch = barrier.epoch;
                                            tracing::trace!(
                                                ?update_vnode_bitmap,
                                                actor_id = self.actor_context.id,
                                                "update vnode bitmap"
                                            );

                                            yield Message::Barrier(barrier);

                                            write_state_post_write_barrier
                                                .post_yield_barrier(update_vnode_bitmap.clone())
                                                .await?;
                                            if let Some(vnode_bitmap) = update_vnode_bitmap {
                                                // Apply Vnode Update
                                                read_state.update_vnode_bitmap(vnode_bitmap);
                                                initial_write_epoch = barrier_epoch;
                                                input = stream;
                                                initial_write_state = write_state;
                                                continue 'recreate_consume_stream;
                                            } else {
                                                write_future_state =
                                                    WriteFuture::receive_from_upstream(
                                                        stream,
                                                        write_state,
                                                    );
                                            }
                                        }
                                    }
                                    Message::Chunk(chunk) => {
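                                        // Sequence ids are assigned per-row
                                        // within an epoch, and `end_seq_id` is
                                        // inclusive. E.g. (assuming
                                        // `FIRST_SEQ_ID` is 1) a 5-row chunk
                                        // arriving first in an epoch covers
                                        // [1, 5], and the next chunk starts
                                        // at 6.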
                                        let start_seq_id = seq_id;
                                        let new_seq_id = seq_id + chunk.cardinality() as SeqId;
                                        let end_seq_id = new_seq_id - 1;
                                        let epoch = write_state.epoch().curr;
                                        tracing::trace!(
                                            start_seq_id,
                                            end_seq_id,
                                            new_seq_id,
                                            epoch,
                                            cardinality = chunk.cardinality(),
                                            "received chunk"
                                        );
                                        if let Some(chunk_to_flush) = buffer.add_or_flush_chunk(
                                            start_seq_id,
                                            end_seq_id,
                                            chunk,
                                            epoch,
                                        ) {
                                            seq_id = new_seq_id;
                                            write_future_state = WriteFuture::flush_chunk(
                                                stream,
                                                write_state,
                                                chunk_to_flush,
                                                epoch,
                                                start_seq_id,
                                                end_seq_id,
                                            );
                                        } else {
                                            seq_id = new_seq_id;
                                            write_future_state = WriteFuture::receive_from_upstream(
                                                stream,
                                                write_state,
                                            );
                                        }
                                    }
                                    // FIXME(kwannoel): This should truncate the logstore;
                                    // it should not bypass it the way barriers do.
                                    Message::Watermark(_watermark) => {
                                        write_future_state =
                                            WriteFuture::receive_from_upstream(stream, write_state);
                                    }
                                }
                            }
                            WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                                start_seq_id,
                                end_seq_id,
                                epoch,
                                flush_info,
                                vnode_bitmap,
                            }) => {
                                buffer.add_flushed_item_to_buffer(
                                    start_seq_id,
                                    end_seq_id,
                                    vnode_bitmap,
                                    epoch,
                                );
                                self.metrics
                                    .storage_write_count
                                    .inc_by(flush_info.flush_count as _);
                                self.metrics
                                    .storage_write_size
                                    .inc_by(flush_info.flush_size as _);
                                write_future_state =
                                    WriteFuture::receive_from_upstream(stream, write_state);
                            }
                        }
                    }
                    Either::Right(result) => {
                        if !clean_state
                            && matches!(read_future_state, ReadFuture::Idle)
                            && buffer.is_empty()
                        {
                            clean_state = true;
                            self.metrics.clean_state.inc();

                            // Let write future resume immediately
                            if let WriteFuture::Paused { sleep_future, .. } =
                                &mut write_future_state
                            {
                                tracing::trace!("resuming paused future");
                                assert!(buffer.current_size < self.max_buffer_size);
                                *sleep_future = None;
                            }
                        }
                        let chunk = result?;
                        self.metrics
                            .total_read_count
                            .inc_by(chunk.cardinality() as _);

                        yield Message::Chunk(chunk);
                    }
                }
            }
        }
    }
}

type PersistedStream<S> = Peekable<Pin<Box<LogStoreItemMergeStream<TimeoutAutoRebuildIter<S>>>>>;

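/// The read-side state machine. A chunk is produced from, in priority order
/// (a summary of `next_chunk` below):
/// 1. the persisted log store stream (historical data),
/// 2. an in-flight `ReadFlushedChunkFuture` reading back a flushed chunk,
/// 3. the in-memory buffer, which may in turn start a new flushed-chunk read.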
enum ReadFuture<S: StateStoreRead> {
    ReadingPersistedStream(PersistedStream<S>),
    ReadingFlushedChunk {
        future: ReadFlushedChunkFuture,
        end_seq_id: SeqId,
    },
    Idle,
}

// Read methods
impl<S: StateStoreRead> ReadFuture<S> {
    async fn next_chunk(
        &mut self,
        progress: &mut LogStoreVnodeProgress,
        read_state: &LogStoreReadState<S>,
        buffer: &mut SyncedLogStoreBuffer,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<StreamChunk> {
        match self {
            ReadFuture::ReadingPersistedStream(stream) => {
                while let Some((epoch, item)) = stream.try_next().await? {
                    match item {
                        KvLogStoreItem::Barrier { vnodes, .. } => {
                            tracing::trace!(epoch, "read logstore barrier");
                            // update the progress
                            progress.apply_aligned(vnodes, epoch, None);
                            continue;
                        }
                        KvLogStoreItem::StreamChunk {
                            chunk,
                            progress: chunk_progress,
                        } => {
                            tracing::trace!("read logstore chunk of size: {}", chunk.cardinality());
                            progress.apply_per_vnode(epoch, chunk_progress);
                            return Ok(chunk);
                        }
                    }
                }
                *self = ReadFuture::Idle;
            }
            ReadFuture::ReadingFlushedChunk { .. } | ReadFuture::Idle => {}
        }
        match self {
            ReadFuture::ReadingPersistedStream(_) => {
                unreachable!("must have finished reading the persisted stream when reaching here")
            }
            ReadFuture::ReadingFlushedChunk { .. } => {}
            ReadFuture::Idle => loop {
                let Some((item_epoch, item)) = buffer.pop_front() else {
                    return pending().await;
                };
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        metrics.buffer_read_count.inc_by(chunk.cardinality() as _);
                        tracing::trace!(
                            start_seq_id,
                            end_seq_id,
                            flushed,
                            cardinality = chunk.cardinality(),
                            "read buffered chunk"
                        );
                        progress.apply_aligned(
                            read_state.vnodes().clone(),
                            item_epoch,
                            Some(end_seq_id),
                        );
                        return Ok(chunk);
                    }
                    LogStoreBufferItem::Flushed {
                        vnode_bitmap,
                        start_seq_id,
                        end_seq_id,
                        chunk_id,
                    } => {
                        tracing::trace!(start_seq_id, end_seq_id, chunk_id, "read flushed chunk");
                        let read_metrics = metrics.flushed_buffer_read_metrics.clone();
                        let future = read_state
                            .read_flushed_chunk(
                                vnode_bitmap,
                                chunk_id,
                                start_seq_id,
                                end_seq_id,
                                item_epoch,
                                read_metrics,
                            )
                            .boxed();
                        *self = ReadFuture::ReadingFlushedChunk { future, end_seq_id };
                        break;
                    }
                    LogStoreBufferItem::Barrier { .. } => {
                        tracing::trace!(item_epoch, "read buffer barrier");
                        progress.apply_aligned(read_state.vnodes().clone(), item_epoch, None);
                        continue;
                    }
                }
            },
        }

        let (future, end_seq_id) = match self {
            ReadFuture::ReadingPersistedStream(_) | ReadFuture::Idle => {
                unreachable!("should be at ReadingFlushedChunk")
            }
            ReadFuture::ReadingFlushedChunk { future, end_seq_id } => (future, *end_seq_id),
        };

        let (_, chunk, epoch) = future.await?;
        progress.apply_aligned(read_state.vnodes().clone(), epoch, Some(end_seq_id));
        tracing::trace!(
            end_seq_id,
            "read flushed chunk of size: {}",
            chunk.cardinality()
        );
        *self = ReadFuture::Idle;
        Ok(chunk)
    }
}

// Write methods
impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    async fn write_barrier<'a>(
        actor_id: u32,
        write_state: &'a mut LogStoreWriteState<S::Local>,
        barrier: Barrier,
        metrics: &SyncedKvLogStoreMetrics,
        progress: LogStoreVnodeProgress,
        buffer: &mut SyncedLogStoreBuffer,
    ) -> StreamExecutorResult<LogStorePostSealCurrentEpoch<'a, S::Local>> {
        tracing::trace!(actor_id, ?progress, "applying truncation");
        // TODO(kwannoel): As an optimization we can also change flushed chunks to be flushed items
        // to reduce memory consumption of logstore.

        let epoch = barrier.epoch.prev;
        let mut writer = write_state.start_writer(false);
        writer.write_barrier(epoch, barrier.is_checkpoint())?;

        if barrier.is_checkpoint() {
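            // Walk the buffer from the back (most recent first): unflushed
            // chunks sit at the tail, so we can stop at the first chunk that
            // is already marked as flushed.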
            for (epoch, item) in buffer.buffer.iter_mut().rev() {
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        if !*flushed {
                            writer.write_chunk(chunk, *epoch, *start_seq_id, *end_seq_id)?;
                            *flushed = true;
                        } else {
                            break;
                        }
                    }
                    LogStoreBufferItem::Flushed { .. } | LogStoreBufferItem::Barrier { .. } => {}
                }
            }
        }

        // Apply truncation
        let (flush_info, _) = writer.finish().await?;
        metrics
            .storage_write_count
            .inc_by(flush_info.flush_count as _);
        metrics
            .storage_write_size
            .inc_by(flush_info.flush_size as _);
        let post_seal = write_state.seal_current_epoch(barrier.epoch.curr, progress);

        // Add to buffer
        buffer.buffer.push_back((
            epoch,
            LogStoreBufferItem::Barrier {
                is_checkpoint: barrier.is_checkpoint(),
                next_epoch: barrier.epoch.curr,
            },
        ));
        buffer.next_chunk_id = 0;
        buffer.update_unconsumed_buffer_metrics();

        Ok(post_seal)
    }
}

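/// The in-memory buffer sitting between the write side (upstream) and the
/// read side (downstream) of the log store. `current_size` tracks the number
/// of buffered rows; chunks that have been flushed to storage are kept only
/// as lightweight `LogStoreBufferItem::Flushed` references.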
struct SyncedLogStoreBuffer {
    buffer: VecDeque<(u64, LogStoreBufferItem)>,
    current_size: usize,
    max_size: usize,
    max_chunk_size: usize,
    next_chunk_id: ChunkId,
    metrics: SyncedKvLogStoreMetrics,
    flushed_count: usize,
}

impl SyncedLogStoreBuffer {
    fn is_empty(&self) -> bool {
        self.current_size == 0
    }

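    /// Either buffers the chunk, or hands it back to the caller for flushing
    /// when it would overflow `max_size`. E.g. with `max_size = 10`,
    /// `current_size = 8` and an incoming chunk of cardinality 5, the chunk
    /// is returned as `Some(chunk)` to be flushed to the state store.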
    fn add_or_flush_chunk(
        &mut self,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        chunk: StreamChunk,
        epoch: u64,
    ) -> Option<StreamChunk> {
        let current_size = self.current_size;
        let chunk_size = chunk.cardinality();

        tracing::trace!(
            current_size,
            chunk_size,
            max_size = self.max_size,
            "checking chunk size"
        );
        let should_flush_chunk = current_size + chunk_size > self.max_size;
        if should_flush_chunk {
            tracing::trace!(start_seq_id, end_seq_id, epoch, "flushing chunk");
            Some(chunk)
        } else {
            tracing::trace!(start_seq_id, end_seq_id, epoch, "buffering chunk");
            self.add_chunk_to_buffer(chunk, start_seq_id, end_seq_id, epoch);
            None
        }
    }

    /// After flushing a chunk, we will preserve a `LogStoreBufferItem::Flushed` entry
    /// inside the buffer. It holds no data, only the metadata needed to read the
    /// flushed chunk back from the log store.
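    ///
    /// Adjacent flushed items in the same epoch are compacted: for example,
    /// flushing rows `[1, 5]` and then `[6, 10]` within one epoch leaves a
    /// single `Flushed { start_seq_id: 1, end_seq_id: 10, .. }` entry, as long
    /// as the combined size stays within `max_chunk_size`.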
    fn add_flushed_item_to_buffer(
        &mut self,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        new_vnode_bitmap: Bitmap,
        epoch: u64,
    ) {
        let new_chunk_size = (end_seq_id - start_seq_id + 1) as usize;

        if let Some((
            item_epoch,
            LogStoreBufferItem::Flushed {
                start_seq_id: prev_start_seq_id,
                end_seq_id: prev_end_seq_id,
                vnode_bitmap,
                ..
            },
        )) = self.buffer.back_mut()
            && let flushed_chunk_size = (*prev_end_seq_id - *prev_start_seq_id + 1) as usize
            && let projected_flushed_chunk_size = flushed_chunk_size + new_chunk_size
            && projected_flushed_chunk_size <= self.max_chunk_size
        {
            assert!(
                *prev_end_seq_id < start_seq_id,
                "prev end_seq_id {} should be smaller than current start_seq_id {}",
                prev_end_seq_id,
                start_seq_id
            );
            assert_eq!(
                epoch, *item_epoch,
                "epoch of the newly added flushed item must be the same as the last flushed item"
            );
            *prev_end_seq_id = end_seq_id;
            *vnode_bitmap |= new_vnode_bitmap;
        } else {
            let chunk_id = self.next_chunk_id;
            self.next_chunk_id += 1;
            self.buffer.push_back((
                epoch,
                LogStoreBufferItem::Flushed {
                    start_seq_id,
                    end_seq_id,
                    vnode_bitmap: new_vnode_bitmap,
                    chunk_id,
                },
            ));
            self.flushed_count += 1;
            tracing::trace!(
                "adding flushed item to buffer: start_seq_id: {start_seq_id}, end_seq_id: {end_seq_id}, chunk_id: {chunk_id}"
            );
        }
        // FIXME(kwannoel): Seems these metrics are updated _after_ the flush info is reported.
        self.update_unconsumed_buffer_metrics();
    }

    fn add_chunk_to_buffer(
        &mut self,
        chunk: StreamChunk,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        epoch: u64,
    ) {
        let chunk_id = self.next_chunk_id;
        self.next_chunk_id += 1;
        self.current_size += chunk.cardinality();
        self.buffer.push_back((
            epoch,
            LogStoreBufferItem::StreamChunk {
                chunk,
                start_seq_id,
                end_seq_id,
                flushed: false,
                chunk_id,
            },
        ));
        self.update_unconsumed_buffer_metrics();
    }

    fn pop_front(&mut self) -> Option<(u64, LogStoreBufferItem)> {
        let item = self.buffer.pop_front();
        match &item {
            Some((_, LogStoreBufferItem::Flushed { .. })) => {
                self.flushed_count -= 1;
            }
            Some((_, LogStoreBufferItem::StreamChunk { chunk, .. })) => {
                self.current_size -= chunk.cardinality();
            }
            _ => {}
        }
        self.update_unconsumed_buffer_metrics();
        item
    }

    fn update_unconsumed_buffer_metrics(&self) {
        let mut epoch_count = 0;
        let mut row_count = 0;
        for (_, item) in &self.buffer {
            match item {
                LogStoreBufferItem::StreamChunk { chunk, .. } => {
                    row_count += chunk.cardinality();
                }
                LogStoreBufferItem::Flushed {
                    start_seq_id,
                    end_seq_id,
                    ..
                } => {
                    // `end_seq_id` is inclusive, so the item covers
                    // `end_seq_id - start_seq_id + 1` rows.
                    row_count += (end_seq_id - start_seq_id + 1) as usize;
                }
                LogStoreBufferItem::Barrier { .. } => {
                    epoch_count += 1;
                }
            }
        }
        self.metrics.buffer_unconsumed_epoch_count.set(epoch_count);
        self.metrics.buffer_unconsumed_row_count.set(row_count as _);
        self.metrics
            .buffer_unconsumed_item_count
            .set(self.buffer.len() as _);
        self.metrics.buffer_unconsumed_min_epoch.set(
            self.buffer
                .front()
                .map(|(epoch, _)| *epoch)
                .unwrap_or_default() as _,
        );
    }
}

impl<S> Execute for SyncedKvLogStoreExecutor<S>
where
    S: StateStore,
{
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_monitored().boxed()
    }
}

#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use pretty_assertions::assert_eq;
    use risingwave_common::catalog::Field;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::test_prelude::*;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::assert_stream_chunk_eq;
    use crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V2_INFO;
    use crate::common::log_store_impl::kv_log_store::test_utils::{
        check_stream_chunk_eq, gen_test_log_store_table, test_payload_schema,
    };
    use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
    use crate::executor::test_utils::MockSource;

    fn init_logger() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .with_ansi(false)
            .try_init();
    }

    // test read/write buffer
    #[tokio::test]
    async fn test_read_write_buffer() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type.clone()))
            .collect_vec();
        let schema = Schema { fields };
        let pk_indices = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        // Init
        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            "  I   T
            +  5  10
            +  6  10
            +  8  10
            +  9  10
            +  10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "   I   T
            -   5  10
            -   6  10
            -   8  10
            U-  9  10
            U+ 10  11",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        tx.push_barrier(test_epoch(2), false);

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

    // test barrier persisted read
    //
    // sequence of events (earliest -> latest):
    // barrier(1) -> chunk(1) -> chunk(2) -> poll(3) items -> barrier(2) -> poll(1) item
    // * poll just means we read from the executor stream.
    #[tokio::test]
    async fn test_barrier_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type.clone()))
            .collect_vec();
        let schema = Schema { fields };
        let pk_indices = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        // Init
        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            "  I   T
            +  5  10
            +  6  10
            +  8  10
            +  9  10
            +  10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "   I   T
            -   5  10
            -   6  10
            -   8  10
            U- 10  11
            U+ 10  10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

    // Once the buffer is full, we only store placeholder `LogStoreBufferItem::Flushed` entries.
    // So we set the capacity to 0, so that every incoming chunk is flushed to the state store.
    #[tokio::test]
    async fn test_max_chunk_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type.clone()))
            .collect_vec();
        let schema = Schema { fields };
        let pk_indices = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            0,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        // Init
        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            "  I   T
            +  5  10
            +  6  10
            +  8  10
            +  9  10
            +  10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "   I   T
            -   5  10
            -   6  10
            -   8  10
            U- 10  11
            U+ 10  10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        for i in 1..=2 {
            match stream.next().await {
                Some(Ok(Message::Barrier(barrier))) => {
                    assert_eq!(barrier.epoch.curr, test_epoch(i));
                }
                other => panic!("Expected a barrier message, got {:?}", other),
            }
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(actual))) => {
                let expected = StreamChunk::from_pretty(
                    "   I   T
                    +   5  10
                    +   6  10
                    +   8  10
                    +   9  10
                    +  10  11
                    -   5  10
                    -   6  10
                    -   8  10
                    U- 10  11
                    U+ 10  10",
                );
                assert_stream_chunk_eq!(actual, expected);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }
    }
}