risingwave_stream/executor/
sync_kv_log_store.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This contains the synced kv log store implementation.
16//! It's meant to buffer a large number of records emitted from upstream,
17//! to avoid overwhelming the downstream executor.
18//!
19//! The synced kv log store polls two futures:
20//!
21//! 1. Upstream: upstream message source
22//!
23//!   It will write stream messages to the log store buffer. e.g. `Message::Barrier`, `Message::Chunk`, ...
24//!   When writing a stream chunk, if the log store buffer is full, it will:
25//!     a. Flush the buffer to the log store.
26//!     b. Convert the stream chunk into a reference (`LogStoreBufferItem::Flushed`)
27//!       which can read the corresponding chunks in the log store.
28//!       We will compact adjacent references,
29//!       so it can read multiple chunks if there's a build up.
30//!
31//!   On receiving barriers, it will:
32//!     a. Apply truncation to historical data in the logstore.
33//!     b. Flush and checkpoint the logstore data.
34//!
35//! 2. State store + buffer + recently flushed chunks: the storage components of the logstore.
36//!
37//!   It will read all historical data from the logstore first. This can be done just by
38//!   constructing a state store stream, which will read all data until the latest epoch.
39//!   This is a static snapshot of data.
40//!   For any subsequently flushed chunks, we will read them via
41//!   `flushed_chunk_future`. See the next paragraph below.
42//!
//!   Next, we will read `flushed_chunk_future` if one already exists; see below for how
//!   it is constructed and what it is.
45//!
46//!   Finally we will pop the earliest item in the buffer.
47//!   - If it's a chunk yield it.
48//!   - If it's a watermark yield it.
49//!   - If it's a flushed chunk reference (`LogStoreBufferItem::Flushed`),
50//!     we will read the corresponding chunks in the log store.
51//!     This is done by constructing a `flushed_chunk_future` which will read the log store
52//!     using the `seq_id`.
//!   - If it's a barrier, we do not yield it here,
//!     because barriers are directly propagated from the upstream when polling it.
55//!
56//! TODO(kwannoel):
57//! - [] Add dedicated metrics for sync log store, namespace according to the upstream.
58//! - [] Add tests
59//! - [] Handle watermark r/w
60//! - [] Handle paused stream
61
62use std::collections::VecDeque;
63use std::future::pending;
64use std::mem::replace;
65use std::pin::Pin;
66
67use anyhow::anyhow;
68use futures::future::{BoxFuture, Either, select};
69use futures::stream::StreamFuture;
70use futures::{FutureExt, StreamExt, TryStreamExt};
71use futures_async_stream::try_stream;
72use risingwave_common::array::StreamChunk;
73use risingwave_common::bitmap::Bitmap;
74use risingwave_common::catalog::{TableId, TableOption};
75use risingwave_common::must_match;
76use risingwave_common::util::epoch::EpochPair;
77use risingwave_common_estimate_size::EstimateSize;
78use risingwave_connector::sink::log_store::{ChunkId, LogStoreResult};
79use risingwave_pb::id::FragmentId;
80use risingwave_storage::StateStore;
81use risingwave_storage::store::timeout_auto_rebuild::TimeoutAutoRebuildIter;
82use risingwave_storage::store::{
83    LocalStateStore, NewLocalOptions, OpConsistencyLevel, StateStoreRead,
84};
85use rw_futures_util::drop_either_future;
86use tokio::time::{Duration, Instant, Sleep, sleep_until};
87use tokio_stream::adapters::Peekable;
88
89use crate::common::log_store_impl::kv_log_store::buffer::LogStoreBufferItem;
90use crate::common::log_store_impl::kv_log_store::reader::LogStoreReadStateStreamRangeStart;
91use crate::common::log_store_impl::kv_log_store::serde::{
92    KvLogStoreItem, LogStoreItemMergeStream, LogStoreRowSerde,
93};
94use crate::common::log_store_impl::kv_log_store::state::{
95    LogStorePostSealCurrentEpoch, LogStoreReadState, LogStoreStateWriteChunkFuture,
96    LogStoreWriteState, new_log_store_state,
97};
98use crate::common::log_store_impl::kv_log_store::{
99    Epoch, FIRST_SEQ_ID, FlushInfo, LogStoreVnodeProgress, SeqId,
100};
101use crate::executor::prelude::*;
102use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
103use crate::executor::{
104    Barrier, BoxedMessageStream, Message, StreamExecutorError, StreamExecutorResult,
105};
106
107pub mod metrics {
108    use risingwave_common::id::FragmentId;
109    use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};
110
111    use crate::common::log_store_impl::kv_log_store::KvLogStoreReadMetrics;
112    use crate::executor::monitor::StreamingMetrics;
113    use crate::task::ActorId;
114
115    #[derive(Clone)]
116    pub struct SyncedKvLogStoreMetrics {
117        // state of the log store
118        pub unclean_state: LabelGuardedIntCounter,
119        pub clean_state: LabelGuardedIntCounter,
120        pub wait_next_poll_ns: LabelGuardedIntCounter,
121
122        // Write metrics
123        pub storage_write_count: LabelGuardedIntCounter,
124        pub storage_write_size: LabelGuardedIntCounter,
125        pub pause_duration_ns: LabelGuardedIntCounter,
126
127        // Buffer metrics
128        pub buffer_unconsumed_item_count: LabelGuardedIntGauge,
129        pub buffer_unconsumed_row_count: LabelGuardedIntGauge,
130        pub buffer_unconsumed_epoch_count: LabelGuardedIntGauge,
131        pub buffer_unconsumed_min_epoch: LabelGuardedIntGauge,
132        pub buffer_memory_bytes: LabelGuardedIntGauge,
133        pub buffer_read_count: LabelGuardedIntCounter,
134        pub buffer_read_size: LabelGuardedIntCounter,
135
136        // Read metrics
137        pub total_read_count: LabelGuardedIntCounter,
138        pub total_read_size: LabelGuardedIntCounter,
139        pub persistent_log_read_metrics: KvLogStoreReadMetrics,
140        pub flushed_buffer_read_metrics: KvLogStoreReadMetrics,
141    }
142
143    impl SyncedKvLogStoreMetrics {
144        /// `id`: refers to a unique way to identify the logstore. This can be the sink id,
145        ///       or for joins, it can be the `fragment_id`.
146        /// `name`: refers to the MV / Sink that the log store is associated with.
147        /// `target`: refers to the target of the log store,
148        ///           for instance `MySql` Sink, PG sink, etc...
149        ///           or unaligned join.
150        pub(crate) fn new(
151            metrics: &StreamingMetrics,
152            actor_id: ActorId,
153            fragment_id: FragmentId,
154            name: &str,
155            target: &'static str,
156        ) -> Self {
157            let actor_id_str = actor_id.to_string();
158            let fragment_id_str = fragment_id.to_string();
159            let labels = &[&actor_id_str, target, &fragment_id_str, name];
160
161            let unclean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
162                "dirty",
163                &actor_id_str,
164                target,
165                &fragment_id_str,
166                name,
167            ]);
168            let clean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
169                "clean",
170                &actor_id_str,
171                target,
172                &fragment_id_str,
173                name,
174            ]);
175            let wait_next_poll_ns = metrics
176                .sync_kv_log_store_wait_next_poll_ns
177                .with_guarded_label_values(labels);
178
179            let storage_write_size = metrics
180                .sync_kv_log_store_storage_write_size
181                .with_guarded_label_values(labels);
182            let storage_write_count = metrics
183                .sync_kv_log_store_storage_write_count
184                .with_guarded_label_values(labels);
185            let storage_pause_duration_ns = metrics
186                .sync_kv_log_store_write_pause_duration_ns
187                .with_guarded_label_values(labels);
188
189            let buffer_unconsumed_item_count = metrics
190                .sync_kv_log_store_buffer_unconsumed_item_count
191                .with_guarded_label_values(labels);
192            let buffer_unconsumed_row_count = metrics
193                .sync_kv_log_store_buffer_unconsumed_row_count
194                .with_guarded_label_values(labels);
195            let buffer_unconsumed_epoch_count = metrics
196                .sync_kv_log_store_buffer_unconsumed_epoch_count
197                .with_guarded_label_values(labels);
198            let buffer_unconsumed_min_epoch = metrics
199                .sync_kv_log_store_buffer_unconsumed_min_epoch
200                .with_guarded_label_values(labels);
201            let buffer_memory_bytes = metrics
202                .sync_kv_log_store_buffer_memory_bytes
203                .with_guarded_label_values(labels);
204            let buffer_read_count = metrics
205                .sync_kv_log_store_read_count
206                .with_guarded_label_values(&[
207                    "buffer",
208                    &actor_id_str,
209                    target,
210                    &fragment_id_str,
211                    name,
212                ]);
213
214            let buffer_read_size = metrics
215                .sync_kv_log_store_read_size
216                .with_guarded_label_values(&[
217                    "buffer",
218                    &actor_id_str,
219                    target,
220                    &fragment_id_str,
221                    name,
222                ]);
223
224            let total_read_count = metrics
225                .sync_kv_log_store_read_count
226                .with_guarded_label_values(&[
227                    "total",
228                    &actor_id_str,
229                    target,
230                    &fragment_id_str,
231                    name,
232                ]);
233
234            let total_read_size = metrics
235                .sync_kv_log_store_read_size
236                .with_guarded_label_values(&[
237                    "total",
238                    &actor_id_str,
239                    target,
240                    &fragment_id_str,
241                    name,
242                ]);
243
244            const READ_PERSISTENT_LOG: &str = "persistent_log";
245            const READ_FLUSHED_BUFFER: &str = "flushed_buffer";
246
247            let persistent_log_read_size = metrics
248                .sync_kv_log_store_read_size
249                .with_guarded_label_values(&[
250                    READ_PERSISTENT_LOG,
251                    &actor_id_str,
252                    target,
253                    &fragment_id_str,
254                    name,
255                ]);
256
257            let persistent_log_read_count = metrics
258                .sync_kv_log_store_read_count
259                .with_guarded_label_values(&[
260                    READ_PERSISTENT_LOG,
261                    &actor_id_str,
262                    target,
263                    &fragment_id_str,
264                    name,
265                ]);
266
267            let flushed_buffer_read_size = metrics
268                .sync_kv_log_store_read_size
269                .with_guarded_label_values(&[
270                    READ_FLUSHED_BUFFER,
271                    &actor_id_str,
272                    target,
273                    &fragment_id_str,
274                    name,
275                ]);
276
277            let flushed_buffer_read_count = metrics
278                .sync_kv_log_store_read_count
279                .with_guarded_label_values(&[
280                    READ_FLUSHED_BUFFER,
281                    &actor_id_str,
282                    target,
283                    &fragment_id_str,
284                    name,
285                ]);
286
287            Self {
288                unclean_state,
289                clean_state,
290                wait_next_poll_ns,
291                storage_write_size,
292                storage_write_count,
293                pause_duration_ns: storage_pause_duration_ns,
294                buffer_unconsumed_item_count,
295                buffer_unconsumed_row_count,
296                buffer_unconsumed_epoch_count,
297                buffer_unconsumed_min_epoch,
298                buffer_memory_bytes,
299                buffer_read_count,
300                buffer_read_size,
301                total_read_count,
302                total_read_size,
303                persistent_log_read_metrics: KvLogStoreReadMetrics {
304                    storage_read_size: persistent_log_read_size,
305                    storage_read_count: persistent_log_read_count,
306                },
307                flushed_buffer_read_metrics: KvLogStoreReadMetrics {
308                    storage_read_count: flushed_buffer_read_count,
309                    storage_read_size: flushed_buffer_read_size,
310                },
311            }
312        }
313
314        #[cfg(test)]
315        pub(crate) fn for_test() -> Self {
316            SyncedKvLogStoreMetrics {
317                unclean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
318                clean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
319                wait_next_poll_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
320                storage_write_count: LabelGuardedIntCounter::test_int_counter::<4>(),
321                storage_write_size: LabelGuardedIntCounter::test_int_counter::<4>(),
322                pause_duration_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
323                buffer_unconsumed_item_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
324                buffer_unconsumed_row_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
325                buffer_unconsumed_epoch_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
326                buffer_unconsumed_min_epoch: LabelGuardedIntGauge::test_int_gauge::<4>(),
327                buffer_memory_bytes: LabelGuardedIntGauge::test_int_gauge::<4>(),
328                buffer_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
329                buffer_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
330                total_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
331                total_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
332                persistent_log_read_metrics: KvLogStoreReadMetrics::for_test(),
333                flushed_buffer_read_metrics: KvLogStoreReadMetrics::for_test(),
334            }
335        }
336    }
337}
338
/// Boxed `'static` future resolving to a `(ChunkId, StreamChunk, Epoch)` triple, or a log
/// store error.
///
/// NOTE(review): judging by the name, this reads a flushed chunk back from the log store;
/// its construction is outside this view — confirm.
pub(crate) type ReadFlushedChunkFuture =
    BoxFuture<'static, LogStoreResult<(ChunkId, StreamChunk, Epoch)>>;
341
/// Configuration and shared handles of a synced kv log store executor.
pub struct SyncKvLogStoreContext<S: StateStore> {
    /// Log store table id; used when opening the local state store and its read/write states.
    pub table_id: TableId,
    /// Fragment of the owning actor; `SyncedKvLogStoreExecutor::new` copies it from the
    /// actor context.
    pub fragment_id: FragmentId,
    pub metrics: SyncedKvLogStoreMetrics,
    /// Row (de)serializer of the log store; it also supplies the vnodes of the local store.
    pub serde: LogStoreRowSerde,
    pub state_store: S,
    /// Buffer size limit handed to `SyncedLogStoreBuffer::new` (unit defined by that type).
    pub max_buffer_size: usize,
    /// Chunk size handed to `new_log_store_state` and `SyncedLogStoreBuffer::new`.
    pub chunk_size: usize,
    /// Pause duration of the write side (presumably what `WriteFuture::paused` receives).
    /// NOTE(review): despite the `_ms` suffix this is a `Duration`, which carries its own unit.
    pub pause_duration_ms: Duration,
    /// When `true`, the executor runs `aligned_message_stream` instead of the buffered path.
    pub aligned: bool,
}
353
/// Executor that buffers upstream messages through a kv log store before passing them on.
pub struct SyncedKvLogStoreExecutor<S: StateStore> {
    actor_context: ActorContextRef,
    /// Executor whose output stream is consumed and buffered.
    upstream: Executor,
    /// Log store configuration, metrics and state store handle.
    logstore_context: SyncKvLogStoreContext<S>,
}
359
360// Stream interface
361impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
362    #[expect(clippy::too_many_arguments)]
363    pub(crate) fn new(
364        actor_context: ActorContextRef,
365        table_id: TableId,
366        metrics: SyncedKvLogStoreMetrics,
367        serde: LogStoreRowSerde,
368        state_store: S,
369        buffer_size: usize,
370        chunk_size: usize,
371        upstream: Executor,
372        pause_duration_ms: Duration,
373        aligned: bool,
374    ) -> Self {
375        let logstore_context = SyncKvLogStoreContext {
376            table_id,
377            fragment_id: actor_context.fragment_id,
378            metrics,
379            serde,
380            state_store,
381            max_buffer_size: buffer_size,
382            chunk_size,
383            pause_duration_ms,
384            aligned,
385        };
386        Self {
387            actor_context,
388            upstream,
389            logstore_context,
390        }
391    }
392}
393
/// Outcome of writing one chunk to the log store, reported by `WriteFuture::next_event`
/// through `WriteFutureEvent::ChunkFlushed`.
pub(crate) struct FlushedChunkInfo {
    /// Epoch the chunk was written in.
    epoch: u64,
    /// Seq id of the first flushed row (inclusive).
    start_seq_id: SeqId,
    /// Seq id of the last flushed row (inclusive).
    end_seq_id: SeqId,
    /// Write statistics; `flush_count` / `flush_size` feed the storage write metrics.
    flush_info: FlushInfo,
    /// Vnode bitmap returned by the chunk write together with `flush_info`.
    vnode_bitmap: Bitmap,
}
401
/// State machine of the write (upstream) side of the executor, advanced by
/// `WriteFuture::next_event`.
pub(crate) enum WriteFuture<S: LocalStateStore> {
    /// We trigger a brief pause to let the `ReadFuture` be polled in the following scenarios:
    /// - When seeing an upstream data chunk, when the buffer becomes full, and the state is clean.
    /// - When seeing a checkpoint barrier, when the buffer is not empty, and the state is clean.
    ///
    /// On pausing, we will transition to a dirty state.
    ///
    /// We resume the write future in the following scenarios:
    /// - After the pause duration.
    /// - After the read future consumes a chunk.
    ///
    /// On resuming, the held `barrier` is emitted as an upstream message.
    Paused {
        /// When the pause began; used to compute the `pause_duration_ns` metric.
        start_instant: Instant,
        /// Pause timer; when `None`, `next_event` resumes without waiting.
        sleep_future: Option<Pin<Box<Sleep>>>,
        /// Barrier emitted as `UpstreamMessageReceived` once the pause ends.
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>, // Just used to hold the state
    },
    /// Waiting for the next message of the upstream stream.
    ReceiveFromUpstream {
        future: StreamFuture<BoxedMessageStream>,
        write_state: LogStoreWriteState<S>,
    },
    /// Writing the rows `start_seq_id..=end_seq_id` of `epoch` to the state store.
    /// The write state is owned by `future` and handed back when it completes.
    FlushingChunk {
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        future: Pin<Box<LogStoreStateWriteChunkFuture<S>>>,
        stream: BoxedMessageStream,
    },
    /// Placeholder left by `next_event` after it consumed the previous state;
    /// polling it panics.
    Empty,
}
432
/// Event produced by `WriteFuture::next_event`.
pub(crate) enum WriteFutureEvent {
    /// A message from upstream, or the barrier held by a `Paused` state that has resumed.
    UpstreamMessageReceived(Message),
    /// A chunk finished writing to the state store.
    ChunkFlushed(FlushedChunkInfo),
}
437
impl<S: LocalStateStore> WriteFuture<S> {
    /// Starts writing `chunk` (rows `start_seq_id..=end_seq_id` of `epoch`) to the state store.
    ///
    /// `write_state` is moved into the write future; it is returned by `next_event` once the
    /// write completes. `stream` is kept so upstream can be polled again afterwards.
    fn flush_chunk(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
        chunk: StreamChunk,
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
    ) -> Self {
        tracing::trace!(
            start_seq_id,
            end_seq_id,
            epoch,
            cardinality = chunk.cardinality(),
            "write_future: flushing chunk"
        );
        Self::FlushingChunk {
            epoch,
            start_seq_id,
            end_seq_id,
            future: Box::pin(write_state.into_write_chunk_future(
                chunk,
                epoch,
                start_seq_id,
                end_seq_id,
            )),
            stream,
        }
    }

    /// Waits for the next message of `stream`, holding `write_state` meanwhile.
    pub(crate) fn receive_from_upstream(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        Self::ReceiveFromUpstream {
            future: stream.into_future(),
            write_state,
        }
    }

    /// Pauses the write side for `duration` from now, holding `barrier` until it resumes.
    pub(crate) fn paused(
        duration: Duration,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        let now = Instant::now();
        tracing::trace!(?now, ?duration, "write_future_pause");
        Self::Paused {
            start_instant: now,
            sleep_future: Some(Box::pin(sleep_until(now + duration))),
            barrier,
            stream,
            write_state,
        }
    }

    /// Drives the current state until it produces an event.
    ///
    /// Each arm awaits its inner future *before* taking the state out with `replace`. If this
    /// future is dropped mid-await (e.g. it loses the `select` in the executor loop), `self`
    /// is left in place and the next call continues where it stopped. Once the inner future
    /// completes, `self` becomes `WriteFuture::Empty`, whether the result is `Ok` or `Err`.
    /// The upstream stream and write state are handed back with the event, so the caller
    /// must install a new state before calling again.
    ///
    /// Returns an error when upstream ends (`"end of upstream input"`), when upstream yields
    /// an error, or when a chunk write fails.
    ///
    /// Panics if called on `WriteFuture::Empty`.
    pub(crate) async fn next_event(
        &mut self,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<(BoxedMessageStream, LogStoreWriteState<S>, WriteFutureEvent)> {
        match self {
            WriteFuture::Paused {
                start_instant,
                sleep_future,
                ..
            } => {
                // Without a timer the pause ends immediately and no pause time is recorded.
                if let Some(sleep_future) = sleep_future {
                    sleep_future.await;
                    metrics
                        .pause_duration_ns
                        .inc_by(start_instant.elapsed().as_nanos() as _);
                    tracing::trace!("resuming write future");
                }
                // Resuming re-emits the held barrier as if it had just arrived from upstream.
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::Paused { stream, write_state, barrier, .. } => {
                    Ok((stream, write_state, WriteFutureEvent::UpstreamMessageReceived(Message::Barrier(barrier))))
                })
            }
            WriteFuture::ReceiveFromUpstream { future, .. } => {
                let (opt, stream) = future.await;
                // `None` means the upstream stream is exhausted, which is reported as an error.
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::ReceiveFromUpstream { write_state, .. } => {
                    opt
                    .ok_or_else(|| anyhow!("end of upstream input").into())
                    .and_then(|result| result.map(|item| {
                        (stream, write_state, WriteFutureEvent::UpstreamMessageReceived(item))
                    }))
                })
            }
            WriteFuture::FlushingChunk { future, .. } => {
                // The write future gives the write state back together with the write result.
                let (write_state, result) = future.await;
                let result = must_match!(replace(self, WriteFuture::Empty), WriteFuture::FlushingChunk { epoch, start_seq_id, end_seq_id, stream, ..  } => {
                    result.map(|(flush_info, vnode_bitmap)| {
                        (stream, write_state, WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                            epoch,
                            start_seq_id,
                            end_seq_id,
                            flush_info,
                            vnode_bitmap,
                        }))
                    })
                });
                result.map_err(Into::into)
            }
            WriteFuture::Empty => {
                unreachable!("should not be polled after ready")
            }
        }
    }
}
547
/// Log store read state over the flushed snapshot reader of `S`'s local state store.
pub(crate) type LocalLogStoreReadState<S> =
    LogStoreReadState<<<S as StateStore>::Local as LocalStateStore>::FlushedSnapshotReader>;
/// Log store write state over `S`'s local state store.
pub(crate) type LocalLogStoreWriteState<S> = LogStoreWriteState<<S as StateStore>::Local>;
551
552// Stream interface
553impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
554    pub(crate) async fn init_local_log_store_state(
555        context: &SyncKvLogStoreContext<S>,
556        first_write_epoch: EpochPair,
557    ) -> StreamExecutorResult<(LocalLogStoreReadState<S>, LocalLogStoreWriteState<S>)> {
558        let local_state_store = context
559            .state_store
560            .new_local(NewLocalOptions {
561                table_id: context.table_id,
562                fragment_id: context.fragment_id,
563                op_consistency_level: OpConsistencyLevel::Inconsistent,
564                table_option: TableOption {
565                    retention_seconds: None,
566                },
567                is_replicated: false,
568                vnodes: context.serde.vnodes().clone(),
569                upload_on_flush: false,
570            })
571            .await;
572
573        let (read_state, mut initial_write_state) = new_log_store_state(
574            context.table_id,
575            local_state_store,
576            context.serde.clone(),
577            context.chunk_size,
578        );
579        initial_write_state.init(first_write_epoch).await?;
580        Ok((read_state, initial_write_state))
581    }
582
583    #[try_stream(ok = Message, error = StreamExecutorError)]
584    pub(crate) async fn aligned_message_stream(
585        actor_id: ActorId,
586        input: BoxedMessageStream,
587        read_state: LocalLogStoreReadState<S>,
588        mut initial_write_state: LocalLogStoreWriteState<S>,
589        metrics: SyncedKvLogStoreMetrics,
590        initial_write_epoch: EpochPair,
591    ) {
592        tracing::info!("aligned mode");
593        // We want to realign the buffer and the stream.
594        // We just block the upstream input stream,
595        // and wait until the persisted logstore is empty.
596        // Then after that we can consume the input stream.
597        let log_store_stream = read_state
598            .read_persisted_log_store(
599                metrics.persistent_log_read_metrics.clone(),
600                initial_write_epoch.curr,
601                LogStoreReadStateStreamRangeStart::Unbounded,
602            )
603            .await?;
604
605        #[for_await]
606        for message in log_store_stream {
607            let (_epoch, message) = message?;
608            match message {
609                KvLogStoreItem::Barrier { .. } => {
610                    continue;
611                }
612                KvLogStoreItem::StreamChunk { chunk, .. } => {
613                    yield Message::Chunk(chunk);
614                }
615            }
616        }
617
618        let mut realigned_logstore = false;
619
620        #[for_await]
621        for message in input {
622            match message? {
623                Message::Barrier(barrier) => {
624                    let is_checkpoint = barrier.is_checkpoint();
625                    let mut progress = LogStoreVnodeProgress::None;
626                    progress.apply_aligned(read_state.vnodes().clone(), barrier.epoch.prev, None);
627                    // Truncate the logstore.
628                    let post_seal =
629                        initial_write_state.seal_current_epoch(barrier.epoch.curr, progress.take());
630                    barrier.assume_no_update_vnode_bitmap(actor_id)?;
631                    yield Message::Barrier(barrier);
632                    post_seal.post_yield_barrier(None).await?;
633                    if !realigned_logstore && is_checkpoint {
634                        realigned_logstore = true;
635                        tracing::info!("realigned logstore");
636                    }
637                }
638                Message::Chunk(chunk) => {
639                    yield Message::Chunk(chunk);
640                }
641                Message::Watermark(watermark) => {
642                    yield Message::Watermark(watermark);
643                }
644            }
645        }
646    }
647
648    pub(crate) fn apply_pause_resume_mutation(barrier: &Barrier, pause_stream: &mut bool) {
649        if let Some(mutation) = barrier.mutation.as_deref() {
650            match mutation {
651                Mutation::Pause => {
652                    *pause_stream = true;
653                }
654                Mutation::Resume => {
655                    *pause_stream = false;
656                }
657                _ => {}
658            }
659        }
660    }
661
662    pub(crate) fn process_upstream_chunk(
663        seq_id: SeqId,
664        stream: BoxedMessageStream,
665        write_state: LogStoreWriteState<S::Local>,
666        chunk: StreamChunk,
667        buffer: &mut SyncedLogStoreBuffer,
668    ) -> (SeqId, WriteFuture<S::Local>) {
669        let cardinality = chunk.cardinality();
670        if cardinality == 0 {
671            tracing::warn!(
672                epoch = write_state.epoch().curr,
673                "received empty chunk (cardinality=0), skipping"
674            );
675            return (
676                seq_id,
677                WriteFuture::receive_from_upstream(stream, write_state),
678            );
679        }
680
681        let start_seq_id = seq_id;
682        let new_seq_id = seq_id + cardinality as SeqId;
683        let end_seq_id = new_seq_id - 1;
684        let epoch = write_state.epoch().curr;
685        tracing::trace!(
686            start_seq_id,
687            end_seq_id,
688            new_seq_id,
689            epoch,
690            cardinality,
691            "received chunk"
692        );
693        let next_write_future = if let Some(chunk_to_flush) =
694            buffer.add_or_flush_chunk(start_seq_id, end_seq_id, chunk, epoch)
695        {
696            WriteFuture::flush_chunk(
697                stream,
698                write_state,
699                chunk_to_flush,
700                epoch,
701                start_seq_id,
702                end_seq_id,
703            )
704        } else {
705            WriteFuture::receive_from_upstream(stream, write_state)
706        };
707        (new_seq_id, next_write_future)
708    }
709
710    pub(crate) fn process_flushed_chunk(
711        stream: BoxedMessageStream,
712        write_state: LogStoreWriteState<S::Local>,
713        info: FlushedChunkInfo,
714        buffer: &mut SyncedLogStoreBuffer,
715        metrics: &SyncedKvLogStoreMetrics,
716    ) -> WriteFuture<S::Local> {
717        buffer.add_flushed_item_to_buffer(
718            info.start_seq_id,
719            info.end_seq_id,
720            info.vnode_bitmap,
721            info.epoch,
722        );
723        metrics
724            .storage_write_count
725            .inc_by(info.flush_info.flush_count as _);
726        metrics
727            .storage_write_size
728            .inc_by(info.flush_info.flush_size as _);
729        WriteFuture::receive_from_upstream(stream, write_state)
730    }
731
732    #[try_stream(ok= Message, error = StreamExecutorError)]
733    pub async fn execute_monitored(self) {
734        let wait_next_poll_ns = self.logstore_context.metrics.wait_next_poll_ns.clone();
735        #[for_await]
736        for message in self.execute_inner() {
737            let current_time = Instant::now();
738            yield message?;
739            wait_next_poll_ns.inc_by(current_time.elapsed().as_nanos() as _);
740        }
741    }
742
    /// Main body of the executor.
    ///
    /// The first barrier is forwarded immediately, then the local log store read/write state is
    /// initialized at that barrier's epoch. In aligned mode all work is delegated to
    /// `aligned_message_stream`. Otherwise this runs an event loop that races two futures:
    /// - the write future (`WriteFuture::next_event`), which consumes upstream messages
    ///   (buffering or flushing chunks, persisting barriers) and reports flushed chunks;
    /// - the read future (`ReadFuture::next_chunk`), which emits historical data from the
    ///   persisted log store first, then buffered / flushed chunks.
    ///
    /// In the unaligned loop, barriers are yielded only from the write side and chunks only
    /// from the read side.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(self) {
        let mut input = self.upstream.execute();

        // init first epoch + local state store
        // The first barrier is forwarded before the log store state is initialized.
        let first_barrier = expect_first_barrier(&mut input).await?;
        let first_write_epoch = first_barrier.epoch;
        yield Message::Barrier(first_barrier.clone());

        let (read_state, initial_write_state) =
            Self::init_local_log_store_state(&self.logstore_context, first_write_epoch).await?;

        // While `pause_stream` is set (on startup, or later via a pause/resume mutation applied
        // in the barrier branch below), the read future stays pending.
        let mut pause_stream = first_barrier.is_pause_on_startup();
        let initial_write_epoch = first_write_epoch;

        if self.logstore_context.aligned {
            let aligned_stream = Self::aligned_message_stream(
                self.actor_context.id,
                input,
                read_state,
                initial_write_state,
                self.logstore_context.metrics.clone(),
                initial_write_epoch,
            );
            #[for_await]
            for message in aligned_stream {
                yield message?;
            }
            return Ok(());
        }

        // Seq ids are assigned to upstream rows and reset to `FIRST_SEQ_ID` after every barrier.
        let mut seq_id = FIRST_SEQ_ID;
        let mut buffer = SyncedLogStoreBuffer::new(
            self.logstore_context.max_buffer_size,
            self.logstore_context.chunk_size,
            &self.logstore_context.metrics,
        );

        // Historical data persisted before `initial_write_epoch.curr`, read from the start.
        let log_store_stream = read_state
            .read_persisted_log_store(
                self.logstore_context
                    .metrics
                    .persistent_log_read_metrics
                    .clone(),
                initial_write_epoch.curr,
                LogStoreReadStateStreamRangeStart::Unbounded,
            )
            .await?;

        // We start in a "clean" state only when there is no historical data to replay.
        let mut log_store_stream = tokio_stream::StreamExt::peekable(log_store_stream);
        let mut clean_state = log_store_stream.peek().await.is_none();
        tracing::trace!(?clean_state);

        let mut read_future_state = ReadFuture::ReadingPersistedStream(log_store_stream);

        let mut write_future_state = WriteFuture::receive_from_upstream(input, initial_write_state);

        // Read progress accumulated by the read side; taken and handed to `write_barrier` on
        // every barrier that is written.
        let mut progress = LogStoreVnodeProgress::None;

        loop {
            // Both futures are rebuilt on every iteration. Their persistent state lives in
            // `read_future_state` / `write_future_state`, so dropping the losing future is safe.
            let select_result = {
                let read_future = async {
                    if pause_stream {
                        pending().await
                    } else {
                        read_future_state
                            .next_chunk(
                                &mut progress,
                                &read_state,
                                &mut buffer,
                                &self.logstore_context.metrics,
                            )
                            .await
                    }
                };
                pin_mut!(read_future);
                let write_future = write_future_state.next_event(&self.logstore_context.metrics);
                pin_mut!(write_future);
                let output = select(write_future, read_future).await;
                drop_either_future(output)
            };
            match select_result {
                Either::Left(result) => {
                    // drop the future to ensure that the future must be reset later
                    // (`write_future_state` is moved out here, so the compiler forces every
                    // branch below to assign a new one before the next loop iteration).
                    drop(write_future_state);
                    let (stream, mut write_state, either) = result?;
                    match either {
                        WriteFutureEvent::UpstreamMessageReceived(msg) => match msg {
                            Message::Barrier(barrier) => {
                                // A checkpoint barrier arriving while clean but with rows still
                                // in memory: park the barrier in a paused write future for
                                // `pause_duration_ms` (presumably so the read side can drain the
                                // buffer first) and leave the clean state.
                                if clean_state && barrier.kind.is_checkpoint() && !buffer.is_empty()
                                {
                                    write_future_state = WriteFuture::paused(
                                        self.logstore_context.pause_duration_ms,
                                        barrier,
                                        stream,
                                        write_state,
                                    );
                                    clean_state = false;
                                    self.logstore_context.metrics.unclean_state.inc();
                                } else {
                                    // Persist the barrier, reset the per-epoch seq id, forward the
                                    // barrier downstream, then finish the post-seal step.
                                    Self::apply_pause_resume_mutation(&barrier, &mut pause_stream);
                                    let write_state_post_write_barrier = Self::write_barrier(
                                        self.actor_context.id,
                                        &mut write_state,
                                        barrier.clone(),
                                        &self.logstore_context.metrics,
                                        progress.take(),
                                        &mut buffer,
                                    )
                                    .await?;
                                    seq_id = FIRST_SEQ_ID;
                                    barrier.assume_no_update_vnode_bitmap(self.actor_context.id)?;

                                    yield Message::Barrier(barrier);

                                    write_state_post_write_barrier
                                        .post_yield_barrier(None)
                                        .await?;

                                    write_future_state =
                                        WriteFuture::receive_from_upstream(stream, write_state);
                                }
                            }
                            Message::Chunk(chunk) => {
                                // Buffers or flushes the chunk; returns the next seq id and the
                                // write future to continue with.
                                let (new_seq_id, next_write_future) = Self::process_upstream_chunk(
                                    seq_id,
                                    stream,
                                    write_state,
                                    chunk,
                                    &mut buffer,
                                );
                                seq_id = new_seq_id;
                                write_future_state = next_write_future;
                            }
                            // TODO: handle upstream watermark in sync log store.
                            // Watermarks are currently dropped rather than forwarded.
                            Message::Watermark(_watermark) => {
                                write_future_state =
                                    WriteFuture::receive_from_upstream(stream, write_state);
                            }
                        },
                        WriteFutureEvent::ChunkFlushed(info) => {
                            write_future_state = Self::process_flushed_chunk(
                                stream,
                                write_state,
                                info,
                                &mut buffer,
                                &self.logstore_context.metrics,
                            );
                        }
                    }
                }
                Either::Right(result) => {
                    // The read side produced a chunk (or an error). First check whether it has
                    // caught up (idle with no in-memory rows).
                    let clean_state_reached = read_future_state.mark_clean_state(
                        &mut clean_state,
                        &buffer,
                        &self.logstore_context.metrics,
                    );

                    if clean_state_reached {
                        // Let write future resume immediately
                        if let WriteFuture::Paused { sleep_future, .. } = &mut write_future_state {
                            tracing::trace!("resuming paused future");
                            assert!(buffer.has_available_capacity());
                            *sleep_future = None;
                        }
                    }
                    let chunk = result?;
                    self.logstore_context
                        .metrics
                        .total_read_count
                        .inc_by(chunk.cardinality() as _);

                    yield Message::Chunk(chunk);
                }
            }
        }
    }
920}
921
/// Stream over the data already persisted in the log store, drained first by the read side.
///
/// It is a peekable, pinned `LogStoreItemMergeStream` over `TimeoutAutoRebuildIter`s of `S`;
/// peeking lets the executor check up front whether any historical data exists.
pub(crate) type PersistedStream<S> =
    Peekable<Pin<Box<LogStoreItemMergeStream<TimeoutAutoRebuildIter<S>>>>>;
924
/// State of the read side of the (unaligned) synced log store.
///
/// `next_chunk` moves through the variants in the order
/// `ReadingPersistedStream` -> `Idle` <-> `ReadingFlushedChunk`.
pub(crate) enum ReadFuture<S: StateStoreRead> {
    /// Draining historical data from the persisted log store.
    ReadingPersistedStream(PersistedStream<S>),
    /// Reading back a chunk that was flushed to the state store instead of kept in memory.
    ReadingFlushedChunk {
        /// In-flight read of the flushed chunk.
        future: ReadFlushedChunkFuture,
        /// Last seq id covered by the chunk being read; recorded into the read progress.
        end_seq_id: SeqId,
    },
    /// Serving items from the in-memory buffer.
    Idle,
}
933
// Read methods
impl<S: StateStoreRead> ReadFuture<S> {
    /// Marks the log store as "clean" once the read side has caught up.
    ///
    /// "Caught up" means the read future is `Idle` and `buffer.is_empty()` holds. `Idle`
    /// implies the persisted stream is exhausted and no flushed-chunk read is in flight.
    ///
    /// Only the unclean -> clean transition is reported. It sets `*clean_state = true`, bumps
    /// the `clean_state` metric and returns `true`. In every other case nothing changes and
    /// `false` is returned.
    pub(crate) fn mark_clean_state(
        &self,
        clean_state: &mut bool,
        buffer: &SyncedLogStoreBuffer,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> bool {
        if !*clean_state && matches!(self, ReadFuture::Idle) && buffer.is_empty() {
            *clean_state = true;
            metrics.clean_state.inc();
            true
        } else {
            false
        }
    }

    /// Returns the next chunk from the read side, advancing `progress` along the way.
    ///
    /// Reading proceeds in three phases, tracked by the variant of `self`:
    /// 1. `ReadingPersistedStream`: drain historical data. Barrier items only update
    ///    `progress`, and the first chunk found is returned. When the stream ends, switch
    ///    to `Idle`.
    /// 2. `Idle`: pop items from `buffer`.
    ///    - In-memory chunks are returned directly.
    ///    - Barriers only update `progress`.
    ///    - A `Flushed` placeholder starts a state store read and switches to
    ///      `ReadingFlushedChunk`.
    ///    - If the buffer is empty, this awaits `pending()` forever. The caller
    ///      (`execute_inner`) re-creates this future on every loop iteration, so newly
    ///      buffered items are seen on the next call.
    /// 3. `ReadingFlushedChunk`: await the flushed-chunk read, record its `end_seq_id` in
    ///    `progress`, switch back to `Idle` and return the chunk.
    ///
    /// The persisted stream and the flushed-chunk future are stored in `self`. Dropping the
    /// returned future at an `.await` point (as the caller's `select` loop does) therefore
    /// leaves them in place, and the next call resumes in the same phase.
    pub(crate) async fn next_chunk(
        &mut self,
        progress: &mut LogStoreVnodeProgress,
        read_state: &LogStoreReadState<S>,
        buffer: &mut SyncedLogStoreBuffer,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<StreamChunk> {
        // Phase 1: historical data from the persisted log store.
        match self {
            ReadFuture::ReadingPersistedStream(stream) => {
                while let Some((epoch, item)) = stream.try_next().await? {
                    match item {
                        KvLogStoreItem::Barrier { vnodes, .. } => {
                            tracing::trace!(epoch, "read logstore barrier");
                            // update the progress
                            progress.apply_aligned(vnodes, epoch, None);
                            continue;
                        }
                        KvLogStoreItem::StreamChunk {
                            chunk,
                            progress: chunk_progress,
                        } => {
                            tracing::trace!("read logstore chunk of size: {}", chunk.cardinality());
                            progress.apply_per_vnode(epoch, chunk_progress);
                            return Ok(chunk);
                        }
                    }
                }
                // Persisted stream exhausted: from now on, serve from the buffer.
                *self = ReadFuture::Idle;
            }
            ReadFuture::ReadingFlushedChunk { .. } | ReadFuture::Idle => {}
        }
        // Phase 2: in-memory buffer. Leaves this match either by returning a chunk, pending
        // forever on an empty buffer, or breaking out in `ReadingFlushedChunk`.
        match self {
            ReadFuture::ReadingPersistedStream(_) => {
                unreachable!("must have finished read persisted stream when reaching here")
            }
            ReadFuture::ReadingFlushedChunk { .. } => {}
            ReadFuture::Idle => loop {
                let Some((item_epoch, item)) = buffer.pop_front() else {
                    return pending().await;
                };
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        metrics.buffer_read_count.inc_by(chunk.cardinality() as _);
                        tracing::trace!(
                            start_seq_id,
                            end_seq_id,
                            flushed,
                            cardinality = chunk.cardinality(),
                            "read buffered chunk of size"
                        );
                        progress.apply_aligned(
                            read_state.vnodes().clone(),
                            item_epoch,
                            Some(end_seq_id),
                        );
                        return Ok(chunk);
                    }
                    LogStoreBufferItem::Flushed {
                        vnode_bitmap,
                        start_seq_id,
                        end_seq_id,
                        chunk_id,
                    } => {
                        tracing::trace!(start_seq_id, end_seq_id, chunk_id, "read flushed chunk");
                        let read_metrics = metrics.flushed_buffer_read_metrics.clone();
                        let future = read_state
                            .read_flushed_chunk(
                                vnode_bitmap,
                                chunk_id,
                                start_seq_id,
                                end_seq_id,
                                item_epoch,
                                read_metrics,
                            )
                            .boxed();
                        // Store the read in `self` before awaiting it, so it survives
                        // cancellation of this future.
                        *self = ReadFuture::ReadingFlushedChunk { future, end_seq_id };
                        break;
                    }
                    LogStoreBufferItem::Barrier { .. } => {
                        tracing::trace!(item_epoch, "read buffer barrier");
                        progress.apply_aligned(read_state.vnodes().clone(), item_epoch, None);
                        continue;
                    }
                }
            },
        }

        // Phase 3: finish reading the flushed chunk.
        let (future, end_seq_id) = match self {
            ReadFuture::ReadingPersistedStream(_) | ReadFuture::Idle => {
                unreachable!("should be at ReadingFlushedChunk")
            }
            ReadFuture::ReadingFlushedChunk { future, end_seq_id } => (future, *end_seq_id),
        };

        let (_, chunk, epoch) = future.await?;
        progress.apply_aligned(read_state.vnodes().clone(), epoch, Some(end_seq_id));
        tracing::trace!(
            end_seq_id,
            "read flushed chunk of size: {}",
            chunk.cardinality()
        );
        *self = ReadFuture::Idle;
        Ok(chunk)
    }
}
1062
1063// Write methods
1064impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
1065    pub(crate) async fn write_barrier<'a>(
1066        actor_id: ActorId,
1067        write_state: &'a mut LogStoreWriteState<S::Local>,
1068        barrier: Barrier,
1069        metrics: &SyncedKvLogStoreMetrics,
1070        progress: LogStoreVnodeProgress,
1071        buffer: &mut SyncedLogStoreBuffer,
1072    ) -> StreamExecutorResult<LogStorePostSealCurrentEpoch<'a, S::Local>> {
1073        tracing::trace!(%actor_id, ?progress, "applying truncation");
1074        // TODO(kwannoel): As an optimization we can also change flushed chunks to be flushed items
1075        // to reduce memory consumption of logstore.
1076
1077        let epoch = barrier.epoch.prev;
1078        let mut writer = write_state.start_writer(false);
1079        writer.write_barrier(epoch, barrier.is_checkpoint())?;
1080
1081        if barrier.is_checkpoint() {
1082            for (epoch, item) in buffer.buffer.iter_mut().rev() {
1083                match item {
1084                    LogStoreBufferItem::StreamChunk {
1085                        chunk,
1086                        start_seq_id,
1087                        end_seq_id,
1088                        flushed,
1089                        ..
1090                    } => {
1091                        if !*flushed {
1092                            writer.write_chunk(chunk, *epoch, *start_seq_id, *end_seq_id)?;
1093                            *flushed = true;
1094                        } else {
1095                            break;
1096                        }
1097                    }
1098                    LogStoreBufferItem::Flushed { .. } | LogStoreBufferItem::Barrier { .. } => {}
1099                }
1100            }
1101        }
1102
1103        // Apply truncation
1104        let (flush_info, _) = writer.finish().await?;
1105        metrics
1106            .storage_write_count
1107            .inc_by(flush_info.flush_count as _);
1108        metrics
1109            .storage_write_size
1110            .inc_by(flush_info.flush_size as _);
1111        let post_seal = write_state.seal_current_epoch(barrier.epoch.curr, progress);
1112
1113        // Add to buffer
1114        buffer.buffer.push_back((
1115            epoch,
1116            LogStoreBufferItem::Barrier {
1117                is_checkpoint: barrier.is_checkpoint(),
1118                next_epoch: barrier.epoch.curr,
1119                schema_change: None,
1120                is_stop: false,
1121            },
1122        ));
1123        buffer.next_chunk_id = 0;
1124        buffer.update_buffer_metrics();
1125
1126        Ok(post_seal)
1127    }
1128}
1129
/// In-memory queue of log store items waiting to be consumed by the read side.
///
/// Items are `(epoch, item)` pairs, pushed at the back and popped from the front.
pub(crate) struct SyncedLogStoreBuffer {
    /// Queued items in arrival order, each tagged with its epoch.
    buffer: VecDeque<(u64, LogStoreBufferItem)>,
    /// Rows held by `StreamChunk` items; `Flushed` placeholders are not counted.
    current_size: usize,
    /// Row capacity: a chunk that would push `current_size` above this is flushed instead.
    max_size: usize,
    /// Upper bound on the rows a single merged `Flushed` item may cover.
    max_chunk_size: usize,
    /// Chunk id for the next buffered or flushed item; reset to 0 by `write_barrier`.
    next_chunk_id: ChunkId,
    /// Metrics handle used to publish the buffer gauges.
    metrics: SyncedKvLogStoreMetrics,
    /// Number of `Flushed` items currently in `buffer`.
    flushed_count: usize,
}
1139
1140impl SyncedLogStoreBuffer {
1141    pub fn new(
1142        max_buffer_size: usize,
1143        chunk_size: usize,
1144        metrics: &SyncedKvLogStoreMetrics,
1145    ) -> Self {
1146        SyncedLogStoreBuffer {
1147            buffer: VecDeque::new(),
1148            current_size: 0,
1149            max_size: max_buffer_size,
1150            max_chunk_size: chunk_size,
1151            next_chunk_id: 0,
1152            metrics: metrics.clone(),
1153            flushed_count: 0,
1154        }
1155    }
1156
1157    pub fn is_empty(&self) -> bool {
1158        self.current_size == 0
1159    }
1160
1161    pub fn has_available_capacity(&self) -> bool {
1162        self.current_size < self.max_size
1163    }
1164
1165    fn add_or_flush_chunk(
1166        &mut self,
1167        start_seq_id: SeqId,
1168        end_seq_id: SeqId,
1169        chunk: StreamChunk,
1170        epoch: u64,
1171    ) -> Option<StreamChunk> {
1172        let current_size = self.current_size;
1173        let chunk_size = chunk.cardinality();
1174
1175        tracing::trace!(
1176            current_size,
1177            chunk_size,
1178            max_size = self.max_size,
1179            "checking chunk size"
1180        );
1181        let should_flush_chunk = current_size + chunk_size > self.max_size;
1182        if should_flush_chunk {
1183            tracing::trace!(start_seq_id, end_seq_id, epoch, "flushing chunk",);
1184            Some(chunk)
1185        } else {
1186            tracing::trace!(start_seq_id, end_seq_id, epoch, "buffering chunk",);
1187            self.add_chunk_to_buffer(chunk, start_seq_id, end_seq_id, epoch);
1188            None
1189        }
1190    }
1191
1192    /// After flushing a chunk, we will preserve a `FlushedItem` inside the buffer.
1193    /// This doesn't contain any data, but it contains the metadata to read the flushed chunk.
1194    fn add_flushed_item_to_buffer(
1195        &mut self,
1196        start_seq_id: SeqId,
1197        end_seq_id: SeqId,
1198        new_vnode_bitmap: Bitmap,
1199        epoch: u64,
1200    ) {
1201        let new_chunk_size = (end_seq_id - start_seq_id + 1) as usize;
1202
1203        if let Some((
1204            item_epoch,
1205            LogStoreBufferItem::Flushed {
1206                start_seq_id: prev_start_seq_id,
1207                end_seq_id: prev_end_seq_id,
1208                vnode_bitmap,
1209                ..
1210            },
1211        )) = self.buffer.back_mut()
1212            && let flushed_chunk_size = (*prev_end_seq_id - *prev_start_seq_id + 1) as usize
1213            && let projected_flushed_chunk_size = flushed_chunk_size + new_chunk_size
1214            && projected_flushed_chunk_size <= self.max_chunk_size
1215        {
1216            assert!(
1217                *prev_end_seq_id < start_seq_id,
1218                "prev end_seq_id {} should be smaller than current start_seq_id {}",
1219                end_seq_id,
1220                start_seq_id
1221            );
1222            assert_eq!(
1223                epoch, *item_epoch,
1224                "epoch of newly added flushed item must be the same as the last flushed item"
1225            );
1226            *prev_end_seq_id = end_seq_id;
1227            *vnode_bitmap |= new_vnode_bitmap;
1228        } else {
1229            let chunk_id = self.next_chunk_id;
1230            self.next_chunk_id += 1;
1231            self.buffer.push_back((
1232                epoch,
1233                LogStoreBufferItem::Flushed {
1234                    start_seq_id,
1235                    end_seq_id,
1236                    vnode_bitmap: new_vnode_bitmap,
1237                    chunk_id,
1238                },
1239            ));
1240            self.flushed_count += 1;
1241            tracing::trace!(
1242                "adding flushed item to buffer: start_seq_id: {start_seq_id}, end_seq_id: {end_seq_id}, chunk_id: {chunk_id}"
1243            );
1244        }
1245        // FIXME(kwannoel): Seems these metrics are updated _after_ the flush info is reported.
1246        self.update_buffer_metrics();
1247    }
1248
1249    fn add_chunk_to_buffer(
1250        &mut self,
1251        chunk: StreamChunk,
1252        start_seq_id: SeqId,
1253        end_seq_id: SeqId,
1254        epoch: u64,
1255    ) {
1256        let chunk_id = self.next_chunk_id;
1257        self.next_chunk_id += 1;
1258        self.current_size += chunk.cardinality();
1259        self.buffer.push_back((
1260            epoch,
1261            LogStoreBufferItem::StreamChunk {
1262                chunk,
1263                start_seq_id,
1264                end_seq_id,
1265                flushed: false,
1266                chunk_id,
1267            },
1268        ));
1269        self.update_buffer_metrics();
1270    }
1271
1272    fn pop_front(&mut self) -> Option<(u64, LogStoreBufferItem)> {
1273        let item = self.buffer.pop_front();
1274        match &item {
1275            Some((_, LogStoreBufferItem::Flushed { .. })) => {
1276                self.flushed_count -= 1;
1277            }
1278            Some((_, LogStoreBufferItem::StreamChunk { chunk, .. })) => {
1279                self.current_size -= chunk.cardinality();
1280            }
1281            _ => {}
1282        }
1283        self.update_buffer_metrics();
1284        item
1285    }
1286
1287    fn update_buffer_metrics(&self) {
1288        let mut epoch_count = 0;
1289        let mut row_count = 0;
1290        let mut memory_bytes = 0;
1291        for (_, item) in &self.buffer {
1292            match item {
1293                LogStoreBufferItem::StreamChunk { chunk, .. } => {
1294                    row_count += chunk.cardinality();
1295                }
1296                LogStoreBufferItem::Flushed {
1297                    start_seq_id,
1298                    end_seq_id,
1299                    ..
1300                } => {
1301                    row_count += (end_seq_id - start_seq_id) as usize;
1302                }
1303                LogStoreBufferItem::Barrier { .. } => {
1304                    epoch_count += 1;
1305                }
1306            }
1307            memory_bytes += item.estimated_size();
1308        }
1309        self.metrics.buffer_unconsumed_epoch_count.set(epoch_count);
1310        self.metrics.buffer_unconsumed_row_count.set(row_count as _);
1311        self.metrics
1312            .buffer_unconsumed_item_count
1313            .set(self.buffer.len() as _);
1314        self.metrics.buffer_unconsumed_min_epoch.set(
1315            self.buffer
1316                .front()
1317                .map(|(epoch, _)| *epoch)
1318                .unwrap_or_default() as _,
1319        );
1320        self.metrics.buffer_memory_bytes.set(memory_bytes as _);
1321    }
1322}
1323
impl<S> Execute for SyncedKvLogStoreExecutor<S>
where
    S: StateStore,
{
    /// Runs the executor as a boxed message stream.
    ///
    /// The stream is `execute_monitored`, which forwards `execute_inner`'s messages and adds
    /// the time spent between polls to the `wait_next_poll_ns` metric.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_monitored().boxed()
    }
}
1332
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use pretty_assertions::assert_eq;
    use risingwave_common::catalog::Field;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::test_prelude::*;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::assert_stream_chunk_eq;
    use crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V2_INFO;
    use crate::common::log_store_impl::kv_log_store::test_utils::{
        check_stream_chunk_eq, gen_test_log_store_table, test_payload_schema,
    };
    use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
    use crate::executor::test_utils::MockSource;

    /// Installs a tracing subscriber filtered by the environment (`RUST_LOG`). Errors from
    /// `try_init` (e.g. a subscriber already installed by another test) are ignored.
    fn init_logger() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .with_ansi(false)
            .try_init();
    }

    // test read/write buffer
    //
    // Upstream sends barrier(1) -> chunk(1) -> chunk(2) before the executor is first polled.
    // The 6th constructor argument (10) is the buffer capacity in rows (cf. the capacity-0 case
    // in `test_max_chunk_persisted_read`). Two 5-row chunks never exceed it, so both chunks
    // are served from memory. barrier(2) is only pushed after both chunks have been read.
    #[tokio::test]
    async fn test_read_write_buffer() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
            .collect_vec();
        let schema = Schema { fields };
        let stream_key = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        // NOTE(review): 256 is presumably the max rows per merged flushed chunk, the Duration
        // the pause duration, and `false` the `aligned` flag; confirm against `new`.
        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        // Init
        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            "  I   T
            +  5  10
            +  6  10
            +  8  10
            +  9  10
            +  10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "   I   T
            -   5  10
            -   6  10
            -   8  10
            U-  9  10
            U+ 10  11",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        tx.push_barrier(test_epoch(2), false);

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

    // test barrier persisted read
    //
    // sequence of events (earliest -> latest):
    // barrier(1) -> chunk(1) -> chunk(2) -> barrier(2) -> poll(4) items
    // * poll just means we read from the executor stream.
    // Unlike `test_read_write_buffer`, barrier(2) is already queued upstream before the
    // executor is first polled; the expected output order is barrier(1), chunk(1), chunk(2),
    // barrier(2).
    #[tokio::test]
    async fn test_barrier_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
            .collect_vec();
        let schema = Schema { fields };
        let stream_key = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        // Init
        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            "  I   T
            +  5  10
            +  6  10
            +  8  10
            +  9  10
            +  10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "   I   T
            -   5  10
            -   6  10
            -   8  10
            U- 10  11
            U+ 10  10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

    // When we hit buffer max_chunk, we only store placeholder `FlushedItem`.
    // So we just let capacity = 0, and we will always flush incoming chunks to state store.
    // The two 5-row `Flushed` placeholders are merged into one (10 rows is within the 256
    // limit passed to the constructor), so after both barriers the data is read back as a
    // single chunk containing chunk(1) followed by chunk(2).
    #[tokio::test]
    async fn test_max_chunk_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
            .collect_vec();
        let schema = Schema { fields };
        let stream_key = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            0,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        // Init
        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            "  I   T
            +  5  10
            +  6  10
            +  8  10
            +  9  10
            +  10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "   I   T
            -   5  10
            -   6  10
            -   8  10
            U- 10  11
            U+ 10  10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        // Both barriers are expected before the (flushed) data is read back.
        for i in 1..=2 {
            match stream.next().await {
                Some(Ok(Message::Barrier(barrier))) => {
                    assert_eq!(barrier.epoch.curr, test_epoch(i));
                }
                other => panic!("Expected a barrier message, got {:?}", other),
            }
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(actual))) => {
                let expected = StreamChunk::from_pretty(
                    "   I   T
                    +   5  10
                    +   6  10
                    +   8  10
                    +   9  10
                    +  10  11
                    -   5  10
                    -   6  10
                    -   8  10
                    U- 10  11
                    U+ 10  10",
                );
                assert_stream_chunk_eq!(actual, expected);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }
    }
}