risingwave_stream/executor/
sync_kv_log_store.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This contains the synced kv log store implementation.
16//! It's meant to buffer a large number of records emitted from upstream,
17//! to avoid overwhelming the downstream executor.
18//!
19//! The synced kv log store polls two futures:
20//!
21//! 1. Upstream: upstream message source
22//!
23//!   It will write stream messages to the log store buffer. e.g. `Message::Barrier`, `Message::Chunk`, ...
24//!   When writing a stream chunk, if the log store buffer is full, it will:
25//!     a. Flush the buffer to the log store.
26//!     b. Convert the stream chunk into a reference (`LogStoreBufferItem::Flushed`)
27//!       which can read the corresponding chunks in the log store.
28//!       We will compact adjacent references,
29//!       so it can read multiple chunks if there's a build up.
30//!
31//!   On receiving barriers, it will:
32//!     a. Apply truncation to historical data in the logstore.
33//!     b. Flush and checkpoint the logstore data.
34//!
35//! 2. State store + buffer + recently flushed chunks: the storage components of the logstore.
36//!
37//!   It will read all historical data from the logstore first. This can be done just by
38//!   constructing a state store stream, which will read all data until the latest epoch.
39//!   This is a static snapshot of data.
40//!   For any subsequently flushed chunks, we will read them via
41//!   `flushed_chunk_future`. See the next paragraph below.
42//!
//!   We will next read `flushed_chunk_future` (if a pre-existing one exists); see below for how
//!   it's constructed and what it is.
45//!
46//!   Finally we will pop the earliest item in the buffer.
47//!   - If it's a chunk yield it.
48//!   - If it's a watermark yield it.
49//!   - If it's a flushed chunk reference (`LogStoreBufferItem::Flushed`),
50//!     we will read the corresponding chunks in the log store.
51//!     This is done by constructing a `flushed_chunk_future` which will read the log store
52//!     using the `seq_id`.
//!   - Barriers are not read from the buffer here,
//!     because they are directly propagated from the upstream when polling it.
55//!
//! TODO(kwannoel):
//! - [ ] Add dedicated metrics for the sync log store, namespaced according to the upstream.
//! - [ ] Add tests
//! - [ ] Handle watermark r/w
//! - [ ] Handle paused stream
61
62use std::collections::VecDeque;
63use std::future::pending;
64use std::mem::replace;
65use std::pin::Pin;
66
67use anyhow::anyhow;
68use futures::future::{BoxFuture, Either, select};
69use futures::stream::StreamFuture;
70use futures::{FutureExt, StreamExt, TryStreamExt};
71use futures_async_stream::try_stream;
72use risingwave_common::array::StreamChunk;
73use risingwave_common::bitmap::Bitmap;
74use risingwave_common::catalog::{TableId, TableOption};
75use risingwave_common::must_match;
76use risingwave_common_estimate_size::EstimateSize;
77use risingwave_connector::sink::log_store::{ChunkId, LogStoreResult};
78use risingwave_storage::StateStore;
79use risingwave_storage::store::timeout_auto_rebuild::TimeoutAutoRebuildIter;
80use risingwave_storage::store::{
81    LocalStateStore, NewLocalOptions, OpConsistencyLevel, StateStoreRead,
82};
83use rw_futures_util::drop_either_future;
84use tokio::time::{Duration, Instant, Sleep, sleep_until};
85use tokio_stream::adapters::Peekable;
86
87use crate::common::log_store_impl::kv_log_store::buffer::LogStoreBufferItem;
88use crate::common::log_store_impl::kv_log_store::reader::LogStoreReadStateStreamRangeStart;
89use crate::common::log_store_impl::kv_log_store::serde::{
90    KvLogStoreItem, LogStoreItemMergeStream, LogStoreRowSerde,
91};
92use crate::common::log_store_impl::kv_log_store::state::{
93    LogStorePostSealCurrentEpoch, LogStoreReadState, LogStoreStateWriteChunkFuture,
94    LogStoreWriteState, new_log_store_state,
95};
96use crate::common::log_store_impl::kv_log_store::{
97    Epoch, FIRST_SEQ_ID, FlushInfo, LogStoreVnodeProgress, SeqId,
98};
99use crate::executor::prelude::*;
100use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
101use crate::executor::{
102    Barrier, BoxedMessageStream, Message, StreamExecutorError, StreamExecutorResult,
103};
104
105pub mod metrics {
106    use risingwave_common::id::FragmentId;
107    use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};
108
109    use crate::common::log_store_impl::kv_log_store::KvLogStoreReadMetrics;
110    use crate::executor::monitor::StreamingMetrics;
111    use crate::task::ActorId;
112
113    #[derive(Clone)]
114    pub struct SyncedKvLogStoreMetrics {
115        // state of the log store
116        pub unclean_state: LabelGuardedIntCounter,
117        pub clean_state: LabelGuardedIntCounter,
118        pub wait_next_poll_ns: LabelGuardedIntCounter,
119
120        // Write metrics
121        pub storage_write_count: LabelGuardedIntCounter,
122        pub storage_write_size: LabelGuardedIntCounter,
123        pub pause_duration_ns: LabelGuardedIntCounter,
124
125        // Buffer metrics
126        pub buffer_unconsumed_item_count: LabelGuardedIntGauge,
127        pub buffer_unconsumed_row_count: LabelGuardedIntGauge,
128        pub buffer_unconsumed_epoch_count: LabelGuardedIntGauge,
129        pub buffer_unconsumed_min_epoch: LabelGuardedIntGauge,
130        pub buffer_memory_bytes: LabelGuardedIntGauge,
131        pub buffer_read_count: LabelGuardedIntCounter,
132        pub buffer_read_size: LabelGuardedIntCounter,
133
134        // Read metrics
135        pub total_read_count: LabelGuardedIntCounter,
136        pub total_read_size: LabelGuardedIntCounter,
137        pub persistent_log_read_metrics: KvLogStoreReadMetrics,
138        pub flushed_buffer_read_metrics: KvLogStoreReadMetrics,
139    }
140
141    impl SyncedKvLogStoreMetrics {
142        /// `id`: refers to a unique way to identify the logstore. This can be the sink id,
143        ///       or for joins, it can be the `fragment_id`.
144        /// `name`: refers to the MV / Sink that the log store is associated with.
145        /// `target`: refers to the target of the log store,
146        ///           for instance `MySql` Sink, PG sink, etc...
147        ///           or unaligned join.
148        pub(crate) fn new(
149            metrics: &StreamingMetrics,
150            actor_id: ActorId,
151            fragment_id: FragmentId,
152            name: &str,
153            target: &'static str,
154        ) -> Self {
155            let actor_id_str = actor_id.to_string();
156            let fragment_id_str = fragment_id.to_string();
157            let labels = &[&actor_id_str, target, &fragment_id_str, name];
158
159            let unclean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
160                "dirty",
161                &actor_id_str,
162                target,
163                &fragment_id_str,
164                name,
165            ]);
166            let clean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
167                "clean",
168                &actor_id_str,
169                target,
170                &fragment_id_str,
171                name,
172            ]);
173            let wait_next_poll_ns = metrics
174                .sync_kv_log_store_wait_next_poll_ns
175                .with_guarded_label_values(labels);
176
177            let storage_write_size = metrics
178                .sync_kv_log_store_storage_write_size
179                .with_guarded_label_values(labels);
180            let storage_write_count = metrics
181                .sync_kv_log_store_storage_write_count
182                .with_guarded_label_values(labels);
183            let storage_pause_duration_ns = metrics
184                .sync_kv_log_store_write_pause_duration_ns
185                .with_guarded_label_values(labels);
186
187            let buffer_unconsumed_item_count = metrics
188                .sync_kv_log_store_buffer_unconsumed_item_count
189                .with_guarded_label_values(labels);
190            let buffer_unconsumed_row_count = metrics
191                .sync_kv_log_store_buffer_unconsumed_row_count
192                .with_guarded_label_values(labels);
193            let buffer_unconsumed_epoch_count = metrics
194                .sync_kv_log_store_buffer_unconsumed_epoch_count
195                .with_guarded_label_values(labels);
196            let buffer_unconsumed_min_epoch = metrics
197                .sync_kv_log_store_buffer_unconsumed_min_epoch
198                .with_guarded_label_values(labels);
199            let buffer_memory_bytes = metrics
200                .sync_kv_log_store_buffer_memory_bytes
201                .with_guarded_label_values(labels);
202            let buffer_read_count = metrics
203                .sync_kv_log_store_read_count
204                .with_guarded_label_values(&[
205                    "buffer",
206                    &actor_id_str,
207                    target,
208                    &fragment_id_str,
209                    name,
210                ]);
211
212            let buffer_read_size = metrics
213                .sync_kv_log_store_read_size
214                .with_guarded_label_values(&[
215                    "buffer",
216                    &actor_id_str,
217                    target,
218                    &fragment_id_str,
219                    name,
220                ]);
221
222            let total_read_count = metrics
223                .sync_kv_log_store_read_count
224                .with_guarded_label_values(&[
225                    "total",
226                    &actor_id_str,
227                    target,
228                    &fragment_id_str,
229                    name,
230                ]);
231
232            let total_read_size = metrics
233                .sync_kv_log_store_read_size
234                .with_guarded_label_values(&[
235                    "total",
236                    &actor_id_str,
237                    target,
238                    &fragment_id_str,
239                    name,
240                ]);
241
242            const READ_PERSISTENT_LOG: &str = "persistent_log";
243            const READ_FLUSHED_BUFFER: &str = "flushed_buffer";
244
245            let persistent_log_read_size = metrics
246                .sync_kv_log_store_read_size
247                .with_guarded_label_values(&[
248                    READ_PERSISTENT_LOG,
249                    &actor_id_str,
250                    target,
251                    &fragment_id_str,
252                    name,
253                ]);
254
255            let persistent_log_read_count = metrics
256                .sync_kv_log_store_read_count
257                .with_guarded_label_values(&[
258                    READ_PERSISTENT_LOG,
259                    &actor_id_str,
260                    target,
261                    &fragment_id_str,
262                    name,
263                ]);
264
265            let flushed_buffer_read_size = metrics
266                .sync_kv_log_store_read_size
267                .with_guarded_label_values(&[
268                    READ_FLUSHED_BUFFER,
269                    &actor_id_str,
270                    target,
271                    &fragment_id_str,
272                    name,
273                ]);
274
275            let flushed_buffer_read_count = metrics
276                .sync_kv_log_store_read_count
277                .with_guarded_label_values(&[
278                    READ_FLUSHED_BUFFER,
279                    &actor_id_str,
280                    target,
281                    &fragment_id_str,
282                    name,
283                ]);
284
285            Self {
286                unclean_state,
287                clean_state,
288                wait_next_poll_ns,
289                storage_write_size,
290                storage_write_count,
291                pause_duration_ns: storage_pause_duration_ns,
292                buffer_unconsumed_item_count,
293                buffer_unconsumed_row_count,
294                buffer_unconsumed_epoch_count,
295                buffer_unconsumed_min_epoch,
296                buffer_memory_bytes,
297                buffer_read_count,
298                buffer_read_size,
299                total_read_count,
300                total_read_size,
301                persistent_log_read_metrics: KvLogStoreReadMetrics {
302                    storage_read_size: persistent_log_read_size,
303                    storage_read_count: persistent_log_read_count,
304                },
305                flushed_buffer_read_metrics: KvLogStoreReadMetrics {
306                    storage_read_count: flushed_buffer_read_count,
307                    storage_read_size: flushed_buffer_read_size,
308                },
309            }
310        }
311
312        #[cfg(test)]
313        pub(crate) fn for_test() -> Self {
314            SyncedKvLogStoreMetrics {
315                unclean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
316                clean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
317                wait_next_poll_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
318                storage_write_count: LabelGuardedIntCounter::test_int_counter::<4>(),
319                storage_write_size: LabelGuardedIntCounter::test_int_counter::<4>(),
320                pause_duration_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
321                buffer_unconsumed_item_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
322                buffer_unconsumed_row_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
323                buffer_unconsumed_epoch_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
324                buffer_unconsumed_min_epoch: LabelGuardedIntGauge::test_int_gauge::<4>(),
325                buffer_memory_bytes: LabelGuardedIntGauge::test_int_gauge::<4>(),
326                buffer_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
327                buffer_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
328                total_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
329                total_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
330                persistent_log_read_metrics: KvLogStoreReadMetrics::for_test(),
331                flushed_buffer_read_metrics: KvLogStoreReadMetrics::for_test(),
332            }
333        }
334    }
335}
336
/// Future that reads back a flushed chunk (`LogStoreBufferItem::Flushed`)
/// from the log store, resolving to `(chunk_id, chunk, epoch)`.
type ReadFlushedChunkFuture = BoxFuture<'static, LogStoreResult<(ChunkId, StreamChunk, Epoch)>>;
338
/// Executor that places a kv log store between an upstream executor and its
/// downstream: upstream messages are buffered (spilling to `state_store` when
/// the buffer fills up) and re-emitted, decoupling the two sides.
/// See the module-level docs for the polling model.
pub struct SyncedKvLogStoreExecutor<S: StateStore> {
    actor_context: ActorContextRef,
    /// Table backing the log store's persisted state.
    table_id: TableId,
    metrics: SyncedKvLogStoreMetrics,
    /// Serde for encoding / decoding log store rows; also carries the vnodes.
    serde: LogStoreRowSerde,

    // Upstream
    upstream: Executor,

    // Log store state
    state_store: S,
    /// Capacity bound of the in-memory buffer; once reached, chunks are
    /// flushed to the log store instead of being kept in memory.
    max_buffer_size: usize,

    // Max chunk size when reading from logstore / buffer
    chunk_size: usize,

    /// How long the write path pauses (to let the read path drain) when the
    /// buffer fills up in a clean state. See `WriteFuture::Paused`.
    pause_duration_ms: Duration,

    /// Aligned mode: on startup, block the upstream until the persisted log
    /// store has been fully drained, then pass messages through directly.
    aligned: bool,
}
359// Stream interface
360impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
361    #[expect(clippy::too_many_arguments)]
362    pub(crate) fn new(
363        actor_context: ActorContextRef,
364        table_id: TableId,
365        metrics: SyncedKvLogStoreMetrics,
366        serde: LogStoreRowSerde,
367        state_store: S,
368        buffer_size: usize,
369        chunk_size: usize,
370        upstream: Executor,
371        pause_duration_ms: Duration,
372        aligned: bool,
373    ) -> Self {
374        Self {
375            actor_context,
376            table_id,
377            metrics,
378            serde,
379            state_store,
380            upstream,
381            max_buffer_size: buffer_size,
382            chunk_size,
383            pause_duration_ms,
384            aligned,
385        }
386    }
387}
388
/// Outcome of flushing a chunk to the log store: everything needed to record
/// a `LogStoreBufferItem::Flushed` reference for later read-back.
struct FlushedChunkInfo {
    /// Epoch the chunk was written under.
    epoch: u64,
    /// First sequence id covered by the flushed chunk.
    start_seq_id: SeqId,
    /// Last sequence id covered by the flushed chunk.
    end_seq_id: SeqId,
    /// Flush statistics (see `FlushInfo`), used for write metrics reporting.
    flush_info: FlushInfo,
    /// Vnodes touched by the flushed chunk.
    vnode_bitmap: Bitmap,
}
396
/// State machine for the write half of the executor (see `next_event`).
enum WriteFuture<S: LocalStateStore> {
    /// We trigger a brief pause to let the `ReadFuture` be polled in the following scenarios:
    /// - When seeing an upstream data chunk, when the buffer becomes full, and the state is clean.
    /// - When seeing a checkpoint barrier, when the buffer is not empty, and the state is clean.
    ///
    /// On pausing, we will transition to a dirty state.
    ///
    /// We trigger resume to let the `ReadFuture` to be polled in the following scenarios:
    /// - After the pause duration.
    /// - After the read future consumes a chunk.
    Paused {
        start_instant: Instant,
        sleep_future: Option<Pin<Box<Sleep>>>,
        /// Barrier held back while paused; re-emitted by `next_event` on resume.
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>, // Just used to hold the state
    },
    /// Waiting for the next message from the upstream executor.
    ReceiveFromUpstream {
        future: StreamFuture<BoxedMessageStream>,
        write_state: LogStoreWriteState<S>,
    },
    /// Writing a chunk spanning `[start_seq_id, end_seq_id]` in `epoch` to the
    /// log store (the buffer was full); the upstream stream is parked here.
    FlushingChunk {
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        future: Pin<Box<LogStoreStateWriteChunkFuture<S>>>,
        stream: BoxedMessageStream,
    },
    /// Placeholder left behind while `next_event` moves ownership out of
    /// `self`; must be replaced via a constructor before the next poll.
    Empty,
}
427
/// What completed when a `WriteFuture` resolved.
enum WriteFutureEvent {
    /// A message arrived from the upstream executor.
    UpstreamMessageReceived(Message),
    /// A chunk finished being flushed to the log store.
    ChunkFlushed(FlushedChunkInfo),
}
432
impl<S: LocalStateStore> WriteFuture<S> {
    /// Transitions into `FlushingChunk`: writes `chunk` (spanning
    /// `[start_seq_id, end_seq_id]` within `epoch`) to the log store, parking
    /// `stream` so `next_event` can return it once the flush completes.
    fn flush_chunk(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
        chunk: StreamChunk,
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
    ) -> Self {
        tracing::trace!(
            start_seq_id,
            end_seq_id,
            epoch,
            cardinality = chunk.cardinality(),
            "write_future: flushing chunk"
        );
        Self::FlushingChunk {
            epoch,
            start_seq_id,
            end_seq_id,
            future: Box::pin(write_state.into_write_chunk_future(
                chunk,
                epoch,
                start_seq_id,
                end_seq_id,
            )),
            stream,
        }
    }

    /// Transitions into `ReceiveFromUpstream`: waits for the next upstream message.
    fn receive_from_upstream(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        Self::ReceiveFromUpstream {
            future: stream.into_future(),
            write_state,
        }
    }

    /// Transitions into `Paused`: holds `barrier` back and sleeps for
    /// `duration` before `next_event` re-emits it, giving the read path a
    /// chance to drain the buffer.
    fn paused(
        duration: Duration,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        let now = Instant::now();
        tracing::trace!(?now, ?duration, "write_future_pause");
        Self::Paused {
            start_instant: now,
            sleep_future: Some(Box::pin(sleep_until(now + duration))),
            barrier,
            stream,
            write_state,
        }
    }

    /// Drives the current state to completion, returning ownership of the
    /// upstream stream and write state together with what happened.
    ///
    /// Ownership is moved out of `self` by swapping in `WriteFuture::Empty`
    /// (via `mem::replace`), so the caller must reset `self` with one of the
    /// constructors above before polling again — polling `Empty` panics.
    async fn next_event(
        &mut self,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<(BoxedMessageStream, LogStoreWriteState<S>, WriteFutureEvent)> {
        match self {
            WriteFuture::Paused {
                start_instant,
                sleep_future,
                ..
            } => {
                if let Some(sleep_future) = sleep_future {
                    sleep_future.await;
                    metrics
                        .pause_duration_ns
                        .inc_by(start_instant.elapsed().as_nanos() as _);
                    tracing::trace!("resuming write future");
                }
                // Pause is over: hand the held-back barrier out as if it had
                // just been received from upstream.
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::Paused { stream, write_state, barrier, .. } => {
                    Ok((stream, write_state, WriteFutureEvent::UpstreamMessageReceived(Message::Barrier(barrier))))
                })
            }
            WriteFuture::ReceiveFromUpstream { future, .. } => {
                let (opt, stream) = future.await;
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::ReceiveFromUpstream { write_state, .. } => {
                    // `None` means the upstream stream ended, which is unexpected here.
                    opt
                    .ok_or_else(|| anyhow!("end of upstream input").into())
                    .and_then(|result| result.map(|item| {
                        (stream, write_state, WriteFutureEvent::UpstreamMessageReceived(item))
                    }))
                })
            }
            WriteFuture::FlushingChunk { future, .. } => {
                // The flush future yields the write state back alongside the result.
                let (write_state, result) = future.await;
                let result = must_match!(replace(self, WriteFuture::Empty), WriteFuture::FlushingChunk { epoch, start_seq_id, end_seq_id, stream, ..  } => {
                    result.map(|(flush_info, vnode_bitmap)| {
                        (stream, write_state, WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                            epoch,
                            start_seq_id,
                            end_seq_id,
                            flush_info,
                            vnode_bitmap,
                        }))
                    })
                });
                result.map_err(Into::into)
            }
            WriteFuture::Empty => {
                unreachable!("should not be polled after ready")
            }
        }
    }
}
542
543// Stream interface
544impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    /// Wraps `execute_inner`, additionally measuring (in `wait_next_poll_ns`)
    /// the time the downstream takes between receiving one message and
    /// polling this stream for the next.
    #[try_stream(ok= Message, error = StreamExecutorError)]
    pub async fn execute_monitored(self) {
        let wait_next_poll_ns = self.metrics.wait_next_poll_ns.clone();
        #[for_await]
        for message in self.execute_inner() {
            let current_time = Instant::now();
            // `elapsed()` after the yield resumes == time spent in the downstream.
            yield message?;
            wait_next_poll_ns.inc_by(current_time.elapsed().as_nanos() as _);
        }
    }
555
556    #[try_stream(ok = Message, error = StreamExecutorError)]
557    async fn execute_inner(self) {
558        let mut input = self.upstream.execute();
559
560        // init first epoch + local state store
561        let first_barrier = expect_first_barrier(&mut input).await?;
562        let first_write_epoch = first_barrier.epoch;
563        yield Message::Barrier(first_barrier.clone());
564
565        let local_state_store = self
566            .state_store
567            .new_local(NewLocalOptions {
568                table_id: self.table_id,
569                fragment_id: self.actor_context.fragment_id,
570                op_consistency_level: OpConsistencyLevel::Inconsistent,
571                table_option: TableOption {
572                    retention_seconds: None,
573                },
574                is_replicated: false,
575                vnodes: self.serde.vnodes().clone(),
576                upload_on_flush: false,
577            })
578            .await;
579
580        let (mut read_state, mut initial_write_state) = new_log_store_state(
581            self.table_id,
582            local_state_store,
583            self.serde,
584            self.chunk_size,
585        );
586        initial_write_state.init(first_write_epoch).await?;
587
588        let mut pause_stream = first_barrier.is_pause_on_startup();
589        let mut initial_write_epoch = first_write_epoch;
590
591        if self.aligned {
592            tracing::info!("aligned mode");
593            // We want to realign the buffer and the stream.
594            // We just block the upstream input stream,
595            // and wait until the persisted logstore is empty.
596            // Then after that we can consume the input stream.
597            let log_store_stream = read_state
598                .read_persisted_log_store(
599                    self.metrics.persistent_log_read_metrics.clone(),
600                    initial_write_epoch.curr,
601                    LogStoreReadStateStreamRangeStart::Unbounded,
602                )
603                .await?;
604
605            #[for_await]
606            for message in log_store_stream {
607                let (_epoch, message) = message?;
608                match message {
609                    KvLogStoreItem::Barrier { .. } => {
610                        continue;
611                    }
612                    KvLogStoreItem::StreamChunk { chunk, .. } => {
613                        yield Message::Chunk(chunk);
614                    }
615                }
616            }
617
618            let mut realigned_logstore = false;
619
620            #[for_await]
621            for message in input {
622                match message? {
623                    Message::Barrier(barrier) => {
624                        let is_checkpoint = barrier.is_checkpoint();
625                        let mut progress = LogStoreVnodeProgress::None;
626                        progress.apply_aligned(
627                            read_state.vnodes().clone(),
628                            barrier.epoch.prev,
629                            None,
630                        );
631                        // Truncate the logstore
632                        let post_seal = initial_write_state
633                            .seal_current_epoch(barrier.epoch.curr, progress.take());
634                        let update_vnode_bitmap =
635                            barrier.as_update_vnode_bitmap(self.actor_context.id);
636                        yield Message::Barrier(barrier);
637                        post_seal.post_yield_barrier(update_vnode_bitmap).await?;
638                        if !realigned_logstore && is_checkpoint {
639                            realigned_logstore = true;
640                            tracing::info!("realigned logstore");
641                        }
642                    }
643                    Message::Chunk(chunk) => {
644                        yield Message::Chunk(chunk);
645                    }
646                    Message::Watermark(watermark) => {
647                        yield Message::Watermark(watermark);
648                    }
649                }
650            }
651
652            return Ok(());
653        }
654
655        // We only recreate the consume stream when:
656        // 1. On bootstrap
657        // 2. On vnode update
658        'recreate_consume_stream: loop {
659            let mut seq_id = FIRST_SEQ_ID;
660            let mut buffer = SyncedLogStoreBuffer {
661                buffer: VecDeque::new(),
662                current_size: 0,
663                max_size: self.max_buffer_size,
664                max_chunk_size: self.chunk_size,
665                next_chunk_id: 0,
666                metrics: self.metrics.clone(),
667                flushed_count: 0,
668            };
669
670            let log_store_stream = read_state
671                .read_persisted_log_store(
672                    self.metrics.persistent_log_read_metrics.clone(),
673                    initial_write_epoch.curr,
674                    LogStoreReadStateStreamRangeStart::Unbounded,
675                )
676                .await?;
677
678            let mut log_store_stream = tokio_stream::StreamExt::peekable(log_store_stream);
679            let mut clean_state = log_store_stream.peek().await.is_none();
680            tracing::trace!(?clean_state);
681
682            let mut read_future_state = ReadFuture::ReadingPersistedStream(log_store_stream);
683
684            let mut write_future_state =
685                WriteFuture::receive_from_upstream(input, initial_write_state);
686
687            let mut progress = LogStoreVnodeProgress::None;
688
689            loop {
690                let select_result = {
691                    let read_future = async {
692                        if pause_stream {
693                            pending().await
694                        } else {
695                            read_future_state
696                                .next_chunk(&mut progress, &read_state, &mut buffer, &self.metrics)
697                                .await
698                        }
699                    };
700                    pin_mut!(read_future);
701                    let write_future = write_future_state.next_event(&self.metrics);
702                    pin_mut!(write_future);
703                    let output = select(write_future, read_future).await;
704                    drop_either_future(output)
705                };
706                match select_result {
707                    Either::Left(result) => {
708                        // drop the future to ensure that the future must be reset later
709                        drop(write_future_state);
710                        let (stream, mut write_state, either) = result?;
711                        match either {
712                            WriteFutureEvent::UpstreamMessageReceived(msg) => {
713                                match msg {
714                                    Message::Barrier(barrier) => {
715                                        if clean_state
716                                            && barrier.kind.is_checkpoint()
717                                            && !buffer.is_empty()
718                                        {
719                                            write_future_state = WriteFuture::paused(
720                                                self.pause_duration_ms,
721                                                barrier,
722                                                stream,
723                                                write_state,
724                                            );
725                                            clean_state = false;
726                                            self.metrics.unclean_state.inc();
727                                        } else {
728                                            if let Some(mutation) = barrier.mutation.as_deref() {
729                                                match mutation {
730                                                    Mutation::Pause => {
731                                                        pause_stream = true;
732                                                    }
733                                                    Mutation::Resume => {
734                                                        pause_stream = false;
735                                                    }
736                                                    _ => {}
737                                                }
738                                            }
739                                            let write_state_post_write_barrier =
740                                                Self::write_barrier(
741                                                    self.actor_context.id,
742                                                    &mut write_state,
743                                                    barrier.clone(),
744                                                    &self.metrics,
745                                                    progress.take(),
746                                                    &mut buffer,
747                                                )
748                                                .await?;
749                                            seq_id = FIRST_SEQ_ID;
750                                            let update_vnode_bitmap = barrier
751                                                .as_update_vnode_bitmap(self.actor_context.id);
752                                            let barrier_epoch = barrier.epoch;
753                                            tracing::trace!(
754                                                ?update_vnode_bitmap,
755                                                actor_id = %self.actor_context.id,
756                                                "update vnode bitmap"
757                                            );
758
759                                            yield Message::Barrier(barrier);
760
761                                            write_state_post_write_barrier
762                                                .post_yield_barrier(update_vnode_bitmap.clone())
763                                                .await?;
764                                            if let Some(vnode_bitmap) = update_vnode_bitmap {
765                                                // Apply Vnode Update
766                                                read_state.update_vnode_bitmap(vnode_bitmap);
767                                                initial_write_epoch = barrier_epoch;
768                                                input = stream;
769                                                initial_write_state = write_state;
770                                                continue 'recreate_consume_stream;
771                                            } else {
772                                                write_future_state =
773                                                    WriteFuture::receive_from_upstream(
774                                                        stream,
775                                                        write_state,
776                                                    );
777                                            }
778                                        }
779                                    }
780                                    Message::Chunk(chunk) => {
781                                        // Skip empty chunks to avoid non-advancing seq_id
782                                        // (end_seq_id = start_seq_id - 1), which would violate the
783                                        // strict progress increase invariant in apply_aligned when
784                                        // consecutive empty chunks produce identical end_seq_id.
785                                        if chunk.cardinality() == 0 {
786                                            tracing::warn!(
787                                                epoch = write_state.epoch().curr,
788                                                "received empty chunk (cardinality=0), skipping"
789                                            );
790                                            write_future_state = WriteFuture::receive_from_upstream(
791                                                stream,
792                                                write_state,
793                                            );
794                                        } else {
795                                            let start_seq_id = seq_id;
796                                            let new_seq_id = seq_id + chunk.cardinality() as SeqId;
797                                            let end_seq_id = new_seq_id - 1;
798                                            let epoch = write_state.epoch().curr;
799                                            tracing::trace!(
800                                                start_seq_id,
801                                                end_seq_id,
802                                                new_seq_id,
803                                                epoch,
804                                                cardinality = chunk.cardinality(),
805                                                "received chunk"
806                                            );
807                                            if let Some(chunk_to_flush) = buffer.add_or_flush_chunk(
808                                                start_seq_id,
809                                                end_seq_id,
810                                                chunk,
811                                                epoch,
812                                            ) {
813                                                seq_id = new_seq_id;
814                                                write_future_state = WriteFuture::flush_chunk(
815                                                    stream,
816                                                    write_state,
817                                                    chunk_to_flush,
818                                                    epoch,
819                                                    start_seq_id,
820                                                    end_seq_id,
821                                                );
822                                            } else {
823                                                seq_id = new_seq_id;
824                                                write_future_state =
825                                                    WriteFuture::receive_from_upstream(
826                                                        stream,
827                                                        write_state,
828                                                    );
829                                            }
830                                        }
831                                    }
832                                    // FIXME(kwannoel): This should truncate the logstore,
833                                    // it will not bypass like barrier.
834                                    Message::Watermark(_watermark) => {
835                                        write_future_state =
836                                            WriteFuture::receive_from_upstream(stream, write_state);
837                                    }
838                                }
839                            }
840                            WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
841                                start_seq_id,
842                                end_seq_id,
843                                epoch,
844                                flush_info,
845                                vnode_bitmap,
846                            }) => {
847                                buffer.add_flushed_item_to_buffer(
848                                    start_seq_id,
849                                    end_seq_id,
850                                    vnode_bitmap,
851                                    epoch,
852                                );
853                                self.metrics
854                                    .storage_write_count
855                                    .inc_by(flush_info.flush_count as _);
856                                self.metrics
857                                    .storage_write_size
858                                    .inc_by(flush_info.flush_size as _);
859                                write_future_state =
860                                    WriteFuture::receive_from_upstream(stream, write_state);
861                            }
862                        }
863                    }
864                    Either::Right(result) => {
865                        if !clean_state
866                            && matches!(read_future_state, ReadFuture::Idle)
867                            && buffer.is_empty()
868                        {
869                            clean_state = true;
870                            self.metrics.clean_state.inc();
871
872                            // Let write future resume immediately
873                            if let WriteFuture::Paused { sleep_future, .. } =
874                                &mut write_future_state
875                            {
876                                tracing::trace!("resuming paused future");
877                                assert!(buffer.current_size < self.max_buffer_size);
878                                *sleep_future = None;
879                            }
880                        }
881                        let chunk = result?;
882                        self.metrics
883                            .total_read_count
884                            .inc_by(chunk.cardinality() as _);
885
886                        yield Message::Chunk(chunk);
887                    }
888                }
889            }
890        }
891    }
892}
893
/// Stream over the historical (persisted) portion of the log store.
/// Wraps a merge stream over state-store iterators (presumably merging
/// per-vnode reads, with iterators auto-rebuilt on timeout — see
/// `TimeoutAutoRebuildIter`); `Peekable` so a reader can inspect the next
/// item without consuming it.
type PersistedStream<S> = Peekable<Pin<Box<LogStoreItemMergeStream<TimeoutAutoRebuildIter<S>>>>>;
895
/// State machine for the read side of the synced log store.
/// Drives `ReadFuture::next_chunk` through its three stages.
enum ReadFuture<S: StateStoreRead> {
    /// Draining the static snapshot of historical data persisted in the
    /// log store. Entered first; once exhausted we transition to `Idle`.
    ReadingPersistedStream(PersistedStream<S>),
    /// Awaiting an in-flight read of a chunk that was flushed to the state
    /// store (a `LogStoreBufferItem::Flushed` placeholder).
    ReadingFlushedChunk {
        future: ReadFlushedChunkFuture,
        /// Inclusive end sequence id of the chunk being read; used to advance
        /// truncation progress once the read completes.
        end_seq_id: SeqId,
    },
    /// No read in flight; the next chunk will be popped from the in-memory
    /// buffer.
    Idle,
}
904
905// Read methods
impl<S: StateStoreRead> ReadFuture<S> {
    /// Produce the next chunk to emit downstream.
    ///
    /// Proceeds in three stages:
    /// 1. `ReadingPersistedStream`: drain the historical snapshot. Barriers
    ///    only advance `progress`; the first chunk found is returned. When the
    ///    stream is exhausted, transition to `Idle`.
    /// 2. `Idle`: pop items from the in-memory `buffer`. In-memory chunks are
    ///    returned directly; a `Flushed` placeholder starts a state-store read
    ///    and transitions to `ReadingFlushedChunk`; barriers advance progress.
    ///    If the buffer is empty, this future stays pending forever so the
    ///    caller can keep polling the write side concurrently.
    /// 3. `ReadingFlushedChunk`: await the state-store read, advance progress
    ///    to the chunk's `end_seq_id`, return the chunk and go back to `Idle`.
    ///
    /// `progress` records how far the reader has consumed, per vnode, so that
    /// truncation can later be applied up to the consumed position.
    async fn next_chunk(
        &mut self,
        progress: &mut LogStoreVnodeProgress,
        read_state: &LogStoreReadState<S>,
        buffer: &mut SyncedLogStoreBuffer,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<StreamChunk> {
        // Stage 1: drain the persisted (historical) stream, if still active.
        match self {
            ReadFuture::ReadingPersistedStream(stream) => {
                while let Some((epoch, item)) = stream.try_next().await? {
                    match item {
                        KvLogStoreItem::Barrier { vnodes, .. } => {
                            tracing::trace!(epoch, "read logstore barrier");
                            // Barrier carries no data; only advance the
                            // truncation progress for all vnodes.
                            progress.apply_aligned(vnodes, epoch, None);
                            continue;
                        }
                        KvLogStoreItem::StreamChunk {
                            chunk,
                            progress: chunk_progress,
                        } => {
                            tracing::trace!("read logstore chunk of size: {}", chunk.cardinality());
                            progress.apply_per_vnode(epoch, chunk_progress);
                            return Ok(chunk);
                        }
                    }
                }
                // Historical snapshot fully consumed.
                *self = ReadFuture::Idle;
            }
            ReadFuture::ReadingFlushedChunk { .. } | ReadFuture::Idle => {}
        }
        // Stage 2: when idle, pull the next item from the in-memory buffer.
        match self {
            ReadFuture::ReadingPersistedStream(_) => {
                unreachable!("must have finished read persisted stream when reaching here")
            }
            ReadFuture::ReadingFlushedChunk { .. } => {}
            ReadFuture::Idle => loop {
                let Some((item_epoch, item)) = buffer.pop_front() else {
                    // Nothing buffered: park forever; the caller polls this
                    // concurrently with the write future.
                    return pending().await;
                };
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        metrics.buffer_read_count.inc_by(chunk.cardinality() as _);
                        tracing::trace!(
                            start_seq_id,
                            end_seq_id,
                            flushed,
                            cardinality = chunk.cardinality(),
                            "read buffered chunk of size"
                        );
                        progress.apply_aligned(
                            read_state.vnodes().clone(),
                            item_epoch,
                            Some(end_seq_id),
                        );
                        return Ok(chunk);
                    }
                    LogStoreBufferItem::Flushed {
                        vnode_bitmap,
                        start_seq_id,
                        end_seq_id,
                        chunk_id,
                    } => {
                        tracing::trace!(start_seq_id, end_seq_id, chunk_id, "read flushed chunk");
                        // Kick off the state-store read for the flushed range
                        // and switch state; the await happens below in stage 3.
                        let read_metrics = metrics.flushed_buffer_read_metrics.clone();
                        let future = read_state
                            .read_flushed_chunk(
                                vnode_bitmap,
                                chunk_id,
                                start_seq_id,
                                end_seq_id,
                                item_epoch,
                                read_metrics,
                            )
                            .boxed();
                        *self = ReadFuture::ReadingFlushedChunk { future, end_seq_id };
                        break;
                    }
                    LogStoreBufferItem::Barrier { .. } => {
                        tracing::trace!(item_epoch, "read buffer barrier");
                        progress.apply_aligned(read_state.vnodes().clone(), item_epoch, None);
                        continue;
                    }
                }
            },
        }

        // Stage 3: we must now be reading a flushed chunk; await it.
        let (future, end_seq_id) = match self {
            ReadFuture::ReadingPersistedStream(_) | ReadFuture::Idle => {
                unreachable!("should be at ReadingFlushedChunk")
            }
            ReadFuture::ReadingFlushedChunk { future, end_seq_id } => (future, *end_seq_id),
        };

        let (_, chunk, epoch) = future.await?;
        progress.apply_aligned(read_state.vnodes().clone(), epoch, Some(end_seq_id));
        tracing::trace!(
            end_seq_id,
            "read flushed chunk of size: {}",
            chunk.cardinality()
        );
        *self = ReadFuture::Idle;
        Ok(chunk)
    }
}
1018
1019// Write methods
impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    /// Persist a barrier into the log store and seal the current epoch.
    ///
    /// On a checkpoint barrier, all not-yet-flushed buffered chunks are
    /// written to the log store first (they stay in the buffer, marked
    /// `flushed`). The accumulated read `progress` is handed to
    /// `seal_current_epoch`, which applies truncation of consumed data.
    /// Finally a barrier item is appended to the in-memory buffer so the
    /// read side observes the epoch boundary, and `next_chunk_id` is reset
    /// for the new epoch.
    ///
    /// Returns the post-seal handle whose `post_yield_barrier` must be called
    /// after the barrier has been yielded downstream.
    async fn write_barrier<'a>(
        actor_id: ActorId,
        write_state: &'a mut LogStoreWriteState<S::Local>,
        barrier: Barrier,
        metrics: &SyncedKvLogStoreMetrics,
        progress: LogStoreVnodeProgress,
        buffer: &mut SyncedLogStoreBuffer,
    ) -> StreamExecutorResult<LogStorePostSealCurrentEpoch<'a, S::Local>> {
        tracing::trace!(%actor_id, ?progress, "applying truncation");
        // TODO(kwannoel): As an optimization we can also change flushed chunks to be flushed items
        // to reduce memory consumption of logstore.

        // The barrier is written at the epoch it closes (`prev`), not `curr`.
        let epoch = barrier.epoch.prev;
        let mut writer = write_state.start_writer(false);
        writer.write_barrier(epoch, barrier.is_checkpoint())?;

        if barrier.is_checkpoint() {
            // Walk newest-to-oldest: unflushed chunks form a suffix of the
            // buffer, so we can stop at the first already-flushed chunk.
            for (epoch, item) in buffer.buffer.iter_mut().rev() {
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        if !*flushed {
                            writer.write_chunk(chunk, *epoch, *start_seq_id, *end_seq_id)?;
                            *flushed = true;
                        } else {
                            break;
                        }
                    }
                    LogStoreBufferItem::Flushed { .. } | LogStoreBufferItem::Barrier { .. } => {}
                }
            }
        }

        // Apply truncation
        let (flush_info, _) = writer.finish().await?;
        metrics
            .storage_write_count
            .inc_by(flush_info.flush_count as _);
        metrics
            .storage_write_size
            .inc_by(flush_info.flush_size as _);
        let post_seal = write_state.seal_current_epoch(barrier.epoch.curr, progress);

        // Add to buffer
        buffer.buffer.push_back((
            epoch,
            LogStoreBufferItem::Barrier {
                is_checkpoint: barrier.is_checkpoint(),
                next_epoch: barrier.epoch.curr,
                schema_change: None,
                is_stop: false,
            },
        ));
        // Chunk ids are scoped per epoch; restart numbering for the new epoch.
        buffer.next_chunk_id = 0;
        buffer.update_buffer_metrics();

        Ok(post_seal)
    }
}
1085
/// In-memory buffer sitting between the log store writer (upstream) and
/// reader (downstream), holding chunks, flushed-chunk placeholders and
/// barrier markers in FIFO order.
struct SyncedLogStoreBuffer {
    // FIFO of (epoch, item) pairs, consumed from the front by the reader.
    buffer: VecDeque<(u64, LogStoreBufferItem)>,
    // Total row count of buffered in-memory chunks. Flushed placeholders and
    // barriers do not contribute (see `add_chunk_to_buffer` / `pop_front`).
    current_size: usize,
    // Row capacity: exceeding it makes `add_or_flush_chunk` return the chunk
    // to be flushed to the state store instead of buffering it.
    max_size: usize,
    // Upper bound on the merged row span of adjacent `Flushed` items.
    max_chunk_size: usize,
    // Next chunk id to assign; reset to 0 at each barrier (see `write_barrier`).
    next_chunk_id: ChunkId,
    metrics: SyncedKvLogStoreMetrics,
    // Number of `Flushed` placeholder items currently in `buffer`.
    flushed_count: usize,
}
1095
1096impl SyncedLogStoreBuffer {
1097    fn is_empty(&self) -> bool {
1098        self.current_size == 0
1099    }
1100
1101    fn add_or_flush_chunk(
1102        &mut self,
1103        start_seq_id: SeqId,
1104        end_seq_id: SeqId,
1105        chunk: StreamChunk,
1106        epoch: u64,
1107    ) -> Option<StreamChunk> {
1108        let current_size = self.current_size;
1109        let chunk_size = chunk.cardinality();
1110
1111        tracing::trace!(
1112            current_size,
1113            chunk_size,
1114            max_size = self.max_size,
1115            "checking chunk size"
1116        );
1117        let should_flush_chunk = current_size + chunk_size > self.max_size;
1118        if should_flush_chunk {
1119            tracing::trace!(start_seq_id, end_seq_id, epoch, "flushing chunk",);
1120            Some(chunk)
1121        } else {
1122            tracing::trace!(start_seq_id, end_seq_id, epoch, "buffering chunk",);
1123            self.add_chunk_to_buffer(chunk, start_seq_id, end_seq_id, epoch);
1124            None
1125        }
1126    }
1127
1128    /// After flushing a chunk, we will preserve a `FlushedItem` inside the buffer.
1129    /// This doesn't contain any data, but it contains the metadata to read the flushed chunk.
1130    fn add_flushed_item_to_buffer(
1131        &mut self,
1132        start_seq_id: SeqId,
1133        end_seq_id: SeqId,
1134        new_vnode_bitmap: Bitmap,
1135        epoch: u64,
1136    ) {
1137        let new_chunk_size = (end_seq_id - start_seq_id + 1) as usize;
1138
1139        if let Some((
1140            item_epoch,
1141            LogStoreBufferItem::Flushed {
1142                start_seq_id: prev_start_seq_id,
1143                end_seq_id: prev_end_seq_id,
1144                vnode_bitmap,
1145                ..
1146            },
1147        )) = self.buffer.back_mut()
1148            && let flushed_chunk_size = (*prev_end_seq_id - *prev_start_seq_id + 1) as usize
1149            && let projected_flushed_chunk_size = flushed_chunk_size + new_chunk_size
1150            && projected_flushed_chunk_size <= self.max_chunk_size
1151        {
1152            assert!(
1153                *prev_end_seq_id < start_seq_id,
1154                "prev end_seq_id {} should be smaller than current start_seq_id {}",
1155                end_seq_id,
1156                start_seq_id
1157            );
1158            assert_eq!(
1159                epoch, *item_epoch,
1160                "epoch of newly added flushed item must be the same as the last flushed item"
1161            );
1162            *prev_end_seq_id = end_seq_id;
1163            *vnode_bitmap |= new_vnode_bitmap;
1164        } else {
1165            let chunk_id = self.next_chunk_id;
1166            self.next_chunk_id += 1;
1167            self.buffer.push_back((
1168                epoch,
1169                LogStoreBufferItem::Flushed {
1170                    start_seq_id,
1171                    end_seq_id,
1172                    vnode_bitmap: new_vnode_bitmap,
1173                    chunk_id,
1174                },
1175            ));
1176            self.flushed_count += 1;
1177            tracing::trace!(
1178                "adding flushed item to buffer: start_seq_id: {start_seq_id}, end_seq_id: {end_seq_id}, chunk_id: {chunk_id}"
1179            );
1180        }
1181        // FIXME(kwannoel): Seems these metrics are updated _after_ the flush info is reported.
1182        self.update_buffer_metrics();
1183    }
1184
1185    fn add_chunk_to_buffer(
1186        &mut self,
1187        chunk: StreamChunk,
1188        start_seq_id: SeqId,
1189        end_seq_id: SeqId,
1190        epoch: u64,
1191    ) {
1192        let chunk_id = self.next_chunk_id;
1193        self.next_chunk_id += 1;
1194        self.current_size += chunk.cardinality();
1195        self.buffer.push_back((
1196            epoch,
1197            LogStoreBufferItem::StreamChunk {
1198                chunk,
1199                start_seq_id,
1200                end_seq_id,
1201                flushed: false,
1202                chunk_id,
1203            },
1204        ));
1205        self.update_buffer_metrics();
1206    }
1207
1208    fn pop_front(&mut self) -> Option<(u64, LogStoreBufferItem)> {
1209        let item = self.buffer.pop_front();
1210        match &item {
1211            Some((_, LogStoreBufferItem::Flushed { .. })) => {
1212                self.flushed_count -= 1;
1213            }
1214            Some((_, LogStoreBufferItem::StreamChunk { chunk, .. })) => {
1215                self.current_size -= chunk.cardinality();
1216            }
1217            _ => {}
1218        }
1219        self.update_buffer_metrics();
1220        item
1221    }
1222
1223    fn update_buffer_metrics(&self) {
1224        let mut epoch_count = 0;
1225        let mut row_count = 0;
1226        let mut memory_bytes = 0;
1227        for (_, item) in &self.buffer {
1228            match item {
1229                LogStoreBufferItem::StreamChunk { chunk, .. } => {
1230                    row_count += chunk.cardinality();
1231                }
1232                LogStoreBufferItem::Flushed {
1233                    start_seq_id,
1234                    end_seq_id,
1235                    ..
1236                } => {
1237                    row_count += (end_seq_id - start_seq_id) as usize;
1238                }
1239                LogStoreBufferItem::Barrier { .. } => {
1240                    epoch_count += 1;
1241                }
1242            }
1243            memory_bytes += item.estimated_size();
1244        }
1245        self.metrics.buffer_unconsumed_epoch_count.set(epoch_count);
1246        self.metrics.buffer_unconsumed_row_count.set(row_count as _);
1247        self.metrics
1248            .buffer_unconsumed_item_count
1249            .set(self.buffer.len() as _);
1250        self.metrics.buffer_unconsumed_min_epoch.set(
1251            self.buffer
1252                .front()
1253                .map(|(epoch, _)| *epoch)
1254                .unwrap_or_default() as _,
1255        );
1256        self.metrics.buffer_memory_bytes.set(memory_bytes as _);
1257    }
1258}
1259
1260impl<S> Execute for SyncedKvLogStoreExecutor<S>
1261where
1262    S: StateStore,
1263{
1264    fn execute(self: Box<Self>) -> BoxedMessageStream {
1265        self.execute_monitored().boxed()
1266    }
1267}
1268
1269#[cfg(test)]
1270mod tests {
1271    use itertools::Itertools;
1272    use pretty_assertions::assert_eq;
1273    use risingwave_common::catalog::Field;
1274    use risingwave_common::hash::VirtualNode;
1275    use risingwave_common::test_prelude::*;
1276    use risingwave_common::util::epoch::test_epoch;
1277    use risingwave_storage::memory::MemoryStateStore;
1278
1279    use super::*;
1280    use crate::assert_stream_chunk_eq;
1281    use crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V2_INFO;
1282    use crate::common::log_store_impl::kv_log_store::test_utils::{
1283        check_stream_chunk_eq, gen_test_log_store_table, test_payload_schema,
1284    };
1285    use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
1286    use crate::executor::test_utils::MockSource;
1287
1288    fn init_logger() {
1289        let _ = tracing_subscriber::fmt()
1290            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1291            .with_ansi(false)
1292            .try_init();
1293    }
1294
1295    // test read/write buffer
    #[tokio::test]
    async fn test_read_write_buffer() {
        init_logger();

        // Build the payload schema and a mock upstream channel.
        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
            .collect_vec();
        let schema = Schema { fields };
        let stream_key = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        // Buffer capacity 10 rows: both 5-row chunks below fit in memory, so
        // this exercises the pure in-memory read/write path (no flush-to-store).
        // NOTE(review): remaining constructor args (256, 256ms, false) are
        // presumably chunk size, pause duration and a flag — confirm against
        // `SyncedKvLogStoreExecutor::new`.
        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        // Init
        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            "  I   T
            +  5  10
            +  6  10
            +  8  10
            +  9  10
            +  10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "   I   T
            -   5  10
            -   6  10
            -   8  10
            U-  9  10
            U+ 10  11",
        );

        // Chunks are pushed before the executor is polled, so they are
        // buffered first and then read back in order.
        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        let mut stream = log_store_executor.execute();

        // The init barrier is forwarded first.
        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        // Buffered chunks come back unchanged and in FIFO order.
        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        // A subsequent barrier passes through after the buffered data.
        tx.push_barrier(test_epoch(2), false);

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }
1385
1386    // test barrier persisted read
1387    //
1388    // sequence of events (earliest -> latest):
1389    // barrier(1) -> chunk(1) -> chunk(2) -> poll(3) items -> barrier(2) -> poll(1) item
1390    // * poll just means we read from the executor stream.
    #[tokio::test]
    async fn test_barrier_persisted_read() {
        init_logger();

        // Build the payload schema and a mock upstream channel.
        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
            .collect_vec();
        let schema = Schema { fields };
        let stream_key = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        // Buffer capacity 10 rows — both chunks fit in memory.
        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        // Init
        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            "  I   T
            +  5  10
            +  6  10
            +  8  10
            +  9  10
            +  10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "   I   T
            -   5  10
            -   6  10
            -   8  10
            U- 10  11
            U+ 10  10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        // Unlike `test_read_write_buffer`, the second barrier is pushed
        // *before* the executor is polled, so the chunks are consumed from a
        // buffer already sealed by a barrier.
        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        // Both chunks are replayed unchanged and in order.
        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        // The second barrier is emitted only after all buffered data.
        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }
1480
1481    // When we hit buffer max_chunk, we only store placeholder `FlushedItem`.
1482    // So we just let capacity = 0, and we will always flush incoming chunks to state store.
1483    #[tokio::test]
1484    async fn test_max_chunk_persisted_read() {
1485        init_logger();
1486
1487        let pk_info = &KV_LOG_STORE_V2_INFO;
1488        let column_descs = test_payload_schema(pk_info);
1489        let fields = column_descs
1490            .into_iter()
1491            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
1492            .collect_vec();
1493        let schema = Schema { fields };
1494        let stream_key = vec![0];
1495        let (mut tx, source) = MockSource::channel();
1496        let source = source.into_executor(schema.clone(), stream_key.clone());
1497
1498        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));
1499
1500        let table = gen_test_log_store_table(pk_info);
1501
1502        let log_store_executor = SyncedKvLogStoreExecutor::new(
1503            ActorContext::for_test(123),
1504            table.id,
1505            SyncedKvLogStoreMetrics::for_test(),
1506            LogStoreRowSerde::new(&table, vnodes, pk_info),
1507            MemoryStateStore::new(),
1508            0,
1509            256,
1510            source,
1511            Duration::from_millis(256),
1512            false,
1513        )
1514        .boxed();
1515
1516        // Init
1517        tx.push_barrier(test_epoch(1), false);
1518
1519        let chunk_1 = StreamChunk::from_pretty(
1520            "  I   T
1521            +  5  10
1522            +  6  10
1523            +  8  10
1524            +  9  10
1525            +  10 11",
1526        );
1527
1528        let chunk_2 = StreamChunk::from_pretty(
1529            "   I   T
1530            -   5  10
1531            -   6  10
1532            -   8  10
1533            U- 10  11
1534            U+ 10  10",
1535        );
1536
1537        tx.push_chunk(chunk_1.clone());
1538        tx.push_chunk(chunk_2.clone());
1539
1540        tx.push_barrier(test_epoch(2), false);
1541
1542        let mut stream = log_store_executor.execute();
1543
1544        for i in 1..=2 {
1545            match stream.next().await {
1546                Some(Ok(Message::Barrier(barrier))) => {
1547                    assert_eq!(barrier.epoch.curr, test_epoch(i));
1548                }
1549                other => panic!("Expected a barrier message, got {:?}", other),
1550            }
1551        }
1552
1553        match stream.next().await {
1554            Some(Ok(Message::Chunk(actual))) => {
1555                let expected = StreamChunk::from_pretty(
1556                    "   I   T
1557                    +   5  10
1558                    +   6  10
1559                    +   8  10
1560                    +   9  10
1561                    +  10  11
1562                    -   5  10
1563                    -   6  10
1564                    -   8  10
1565                    U- 10  11
1566                    U+ 10  10",
1567                );
1568                assert_stream_chunk_eq!(actual, expected);
1569            }
1570            other => panic!("Expected a chunk message, got {:?}", other),
1571        }
1572    }
1573}