1use std::collections::VecDeque;
63use std::future::pending;
64use std::mem::replace;
65use std::pin::Pin;
66
67use anyhow::anyhow;
68use futures::future::{BoxFuture, Either, select};
69use futures::stream::StreamFuture;
70use futures::{FutureExt, StreamExt, TryStreamExt};
71use futures_async_stream::try_stream;
72use risingwave_common::array::StreamChunk;
73use risingwave_common::bitmap::Bitmap;
74use risingwave_common::catalog::{TableId, TableOption};
75use risingwave_common::must_match;
76use risingwave_common::util::epoch::EpochPair;
77use risingwave_common_estimate_size::EstimateSize;
78use risingwave_connector::sink::log_store::{ChunkId, LogStoreResult};
79use risingwave_pb::id::FragmentId;
80use risingwave_storage::StateStore;
81use risingwave_storage::store::timeout_auto_rebuild::TimeoutAutoRebuildIter;
82use risingwave_storage::store::{
83 LocalStateStore, NewLocalOptions, OpConsistencyLevel, StateStoreRead,
84};
85use rw_futures_util::drop_either_future;
86use tokio::time::{Duration, Instant, Sleep, sleep_until};
87use tokio_stream::adapters::Peekable;
88
89use crate::common::log_store_impl::kv_log_store::buffer::LogStoreBufferItem;
90use crate::common::log_store_impl::kv_log_store::reader::LogStoreReadStateStreamRangeStart;
91use crate::common::log_store_impl::kv_log_store::serde::{
92 KvLogStoreItem, LogStoreItemMergeStream, LogStoreRowSerde,
93};
94use crate::common::log_store_impl::kv_log_store::state::{
95 LogStorePostSealCurrentEpoch, LogStoreReadState, LogStoreStateWriteChunkFuture,
96 LogStoreWriteState, new_log_store_state,
97};
98use crate::common::log_store_impl::kv_log_store::{
99 Epoch, FIRST_SEQ_ID, FlushInfo, LogStoreVnodeProgress, SeqId,
100};
101use crate::executor::prelude::*;
102use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
103use crate::executor::{
104 Barrier, BoxedMessageStream, Message, StreamExecutorError, StreamExecutorResult,
105};
106
107pub mod metrics {
108 use risingwave_common::id::FragmentId;
109 use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};
110
111 use crate::common::log_store_impl::kv_log_store::KvLogStoreReadMetrics;
112 use crate::executor::monitor::StreamingMetrics;
113 use crate::task::ActorId;
114
115 #[derive(Clone)]
116 pub struct SyncedKvLogStoreMetrics {
117 pub unclean_state: LabelGuardedIntCounter,
119 pub clean_state: LabelGuardedIntCounter,
120 pub wait_next_poll_ns: LabelGuardedIntCounter,
121
122 pub storage_write_count: LabelGuardedIntCounter,
124 pub storage_write_size: LabelGuardedIntCounter,
125 pub pause_duration_ns: LabelGuardedIntCounter,
126
127 pub buffer_unconsumed_item_count: LabelGuardedIntGauge,
129 pub buffer_unconsumed_row_count: LabelGuardedIntGauge,
130 pub buffer_unconsumed_epoch_count: LabelGuardedIntGauge,
131 pub buffer_unconsumed_min_epoch: LabelGuardedIntGauge,
132 pub buffer_memory_bytes: LabelGuardedIntGauge,
133 pub buffer_read_count: LabelGuardedIntCounter,
134 pub buffer_read_size: LabelGuardedIntCounter,
135
136 pub total_read_count: LabelGuardedIntCounter,
138 pub total_read_size: LabelGuardedIntCounter,
139 pub persistent_log_read_metrics: KvLogStoreReadMetrics,
140 pub flushed_buffer_read_metrics: KvLogStoreReadMetrics,
141 }
142
143 impl SyncedKvLogStoreMetrics {
144 pub(crate) fn new(
151 metrics: &StreamingMetrics,
152 actor_id: ActorId,
153 fragment_id: FragmentId,
154 name: &str,
155 target: &'static str,
156 ) -> Self {
157 let actor_id_str = actor_id.to_string();
158 let fragment_id_str = fragment_id.to_string();
159 let labels = &[&actor_id_str, target, &fragment_id_str, name];
160
161 let unclean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
162 "dirty",
163 &actor_id_str,
164 target,
165 &fragment_id_str,
166 name,
167 ]);
168 let clean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
169 "clean",
170 &actor_id_str,
171 target,
172 &fragment_id_str,
173 name,
174 ]);
175 let wait_next_poll_ns = metrics
176 .sync_kv_log_store_wait_next_poll_ns
177 .with_guarded_label_values(labels);
178
179 let storage_write_size = metrics
180 .sync_kv_log_store_storage_write_size
181 .with_guarded_label_values(labels);
182 let storage_write_count = metrics
183 .sync_kv_log_store_storage_write_count
184 .with_guarded_label_values(labels);
185 let storage_pause_duration_ns = metrics
186 .sync_kv_log_store_write_pause_duration_ns
187 .with_guarded_label_values(labels);
188
189 let buffer_unconsumed_item_count = metrics
190 .sync_kv_log_store_buffer_unconsumed_item_count
191 .with_guarded_label_values(labels);
192 let buffer_unconsumed_row_count = metrics
193 .sync_kv_log_store_buffer_unconsumed_row_count
194 .with_guarded_label_values(labels);
195 let buffer_unconsumed_epoch_count = metrics
196 .sync_kv_log_store_buffer_unconsumed_epoch_count
197 .with_guarded_label_values(labels);
198 let buffer_unconsumed_min_epoch = metrics
199 .sync_kv_log_store_buffer_unconsumed_min_epoch
200 .with_guarded_label_values(labels);
201 let buffer_memory_bytes = metrics
202 .sync_kv_log_store_buffer_memory_bytes
203 .with_guarded_label_values(labels);
204 let buffer_read_count = metrics
205 .sync_kv_log_store_read_count
206 .with_guarded_label_values(&[
207 "buffer",
208 &actor_id_str,
209 target,
210 &fragment_id_str,
211 name,
212 ]);
213
214 let buffer_read_size = metrics
215 .sync_kv_log_store_read_size
216 .with_guarded_label_values(&[
217 "buffer",
218 &actor_id_str,
219 target,
220 &fragment_id_str,
221 name,
222 ]);
223
224 let total_read_count = metrics
225 .sync_kv_log_store_read_count
226 .with_guarded_label_values(&[
227 "total",
228 &actor_id_str,
229 target,
230 &fragment_id_str,
231 name,
232 ]);
233
234 let total_read_size = metrics
235 .sync_kv_log_store_read_size
236 .with_guarded_label_values(&[
237 "total",
238 &actor_id_str,
239 target,
240 &fragment_id_str,
241 name,
242 ]);
243
244 const READ_PERSISTENT_LOG: &str = "persistent_log";
245 const READ_FLUSHED_BUFFER: &str = "flushed_buffer";
246
247 let persistent_log_read_size = metrics
248 .sync_kv_log_store_read_size
249 .with_guarded_label_values(&[
250 READ_PERSISTENT_LOG,
251 &actor_id_str,
252 target,
253 &fragment_id_str,
254 name,
255 ]);
256
257 let persistent_log_read_count = metrics
258 .sync_kv_log_store_read_count
259 .with_guarded_label_values(&[
260 READ_PERSISTENT_LOG,
261 &actor_id_str,
262 target,
263 &fragment_id_str,
264 name,
265 ]);
266
267 let flushed_buffer_read_size = metrics
268 .sync_kv_log_store_read_size
269 .with_guarded_label_values(&[
270 READ_FLUSHED_BUFFER,
271 &actor_id_str,
272 target,
273 &fragment_id_str,
274 name,
275 ]);
276
277 let flushed_buffer_read_count = metrics
278 .sync_kv_log_store_read_count
279 .with_guarded_label_values(&[
280 READ_FLUSHED_BUFFER,
281 &actor_id_str,
282 target,
283 &fragment_id_str,
284 name,
285 ]);
286
287 Self {
288 unclean_state,
289 clean_state,
290 wait_next_poll_ns,
291 storage_write_size,
292 storage_write_count,
293 pause_duration_ns: storage_pause_duration_ns,
294 buffer_unconsumed_item_count,
295 buffer_unconsumed_row_count,
296 buffer_unconsumed_epoch_count,
297 buffer_unconsumed_min_epoch,
298 buffer_memory_bytes,
299 buffer_read_count,
300 buffer_read_size,
301 total_read_count,
302 total_read_size,
303 persistent_log_read_metrics: KvLogStoreReadMetrics {
304 storage_read_size: persistent_log_read_size,
305 storage_read_count: persistent_log_read_count,
306 },
307 flushed_buffer_read_metrics: KvLogStoreReadMetrics {
308 storage_read_count: flushed_buffer_read_count,
309 storage_read_size: flushed_buffer_read_size,
310 },
311 }
312 }
313
314 #[cfg(test)]
315 pub(crate) fn for_test() -> Self {
316 SyncedKvLogStoreMetrics {
317 unclean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
318 clean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
319 wait_next_poll_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
320 storage_write_count: LabelGuardedIntCounter::test_int_counter::<4>(),
321 storage_write_size: LabelGuardedIntCounter::test_int_counter::<4>(),
322 pause_duration_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
323 buffer_unconsumed_item_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
324 buffer_unconsumed_row_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
325 buffer_unconsumed_epoch_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
326 buffer_unconsumed_min_epoch: LabelGuardedIntGauge::test_int_gauge::<4>(),
327 buffer_memory_bytes: LabelGuardedIntGauge::test_int_gauge::<4>(),
328 buffer_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
329 buffer_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
330 total_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
331 total_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
332 persistent_log_read_metrics: KvLogStoreReadMetrics::for_test(),
333 flushed_buffer_read_metrics: KvLogStoreReadMetrics::for_test(),
334 }
335 }
336 }
337}
338
339pub(crate) type ReadFlushedChunkFuture =
340 BoxFuture<'static, LogStoreResult<(ChunkId, StreamChunk, Epoch)>>;
341
342pub struct SyncKvLogStoreContext<S: StateStore> {
343 pub table_id: TableId,
344 pub fragment_id: FragmentId,
345 pub metrics: SyncedKvLogStoreMetrics,
346 pub serde: LogStoreRowSerde,
347 pub state_store: S,
348 pub max_buffer_size: usize,
349 pub chunk_size: usize,
350 pub pause_duration_ms: Duration,
351 pub aligned: bool,
352}
353
354pub struct SyncedKvLogStoreExecutor<S: StateStore> {
355 actor_context: ActorContextRef,
356 upstream: Executor,
357 logstore_context: SyncKvLogStoreContext<S>,
358}
359
360impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
362 #[expect(clippy::too_many_arguments)]
363 pub(crate) fn new(
364 actor_context: ActorContextRef,
365 table_id: TableId,
366 metrics: SyncedKvLogStoreMetrics,
367 serde: LogStoreRowSerde,
368 state_store: S,
369 buffer_size: usize,
370 chunk_size: usize,
371 upstream: Executor,
372 pause_duration_ms: Duration,
373 aligned: bool,
374 ) -> Self {
375 let logstore_context = SyncKvLogStoreContext {
376 table_id,
377 fragment_id: actor_context.fragment_id,
378 metrics,
379 serde,
380 state_store,
381 max_buffer_size: buffer_size,
382 chunk_size,
383 pause_duration_ms,
384 aligned,
385 };
386 Self {
387 actor_context,
388 upstream,
389 logstore_context,
390 }
391 }
392}
393
394pub(crate) struct FlushedChunkInfo {
395 epoch: u64,
396 start_seq_id: SeqId,
397 end_seq_id: SeqId,
398 flush_info: FlushInfo,
399 vnode_bitmap: Bitmap,
400}
401
402pub(crate) enum WriteFuture<S: LocalStateStore> {
403 Paused {
413 start_instant: Instant,
414 sleep_future: Option<Pin<Box<Sleep>>>,
415 barrier: Barrier,
416 stream: BoxedMessageStream,
417 write_state: LogStoreWriteState<S>, },
419 ReceiveFromUpstream {
420 future: StreamFuture<BoxedMessageStream>,
421 write_state: LogStoreWriteState<S>,
422 },
423 FlushingChunk {
424 epoch: u64,
425 start_seq_id: SeqId,
426 end_seq_id: SeqId,
427 future: Pin<Box<LogStoreStateWriteChunkFuture<S>>>,
428 stream: BoxedMessageStream,
429 },
430 Empty,
431}
432
433pub(crate) enum WriteFutureEvent {
434 UpstreamMessageReceived(Message),
435 ChunkFlushed(FlushedChunkInfo),
436}
437
438impl<S: LocalStateStore> WriteFuture<S> {
439 fn flush_chunk(
440 stream: BoxedMessageStream,
441 write_state: LogStoreWriteState<S>,
442 chunk: StreamChunk,
443 epoch: u64,
444 start_seq_id: SeqId,
445 end_seq_id: SeqId,
446 ) -> Self {
447 tracing::trace!(
448 start_seq_id,
449 end_seq_id,
450 epoch,
451 cardinality = chunk.cardinality(),
452 "write_future: flushing chunk"
453 );
454 Self::FlushingChunk {
455 epoch,
456 start_seq_id,
457 end_seq_id,
458 future: Box::pin(write_state.into_write_chunk_future(
459 chunk,
460 epoch,
461 start_seq_id,
462 end_seq_id,
463 )),
464 stream,
465 }
466 }
467
468 pub(crate) fn receive_from_upstream(
469 stream: BoxedMessageStream,
470 write_state: LogStoreWriteState<S>,
471 ) -> Self {
472 Self::ReceiveFromUpstream {
473 future: stream.into_future(),
474 write_state,
475 }
476 }
477
478 pub(crate) fn paused(
479 duration: Duration,
480 barrier: Barrier,
481 stream: BoxedMessageStream,
482 write_state: LogStoreWriteState<S>,
483 ) -> Self {
484 let now = Instant::now();
485 tracing::trace!(?now, ?duration, "write_future_pause");
486 Self::Paused {
487 start_instant: now,
488 sleep_future: Some(Box::pin(sleep_until(now + duration))),
489 barrier,
490 stream,
491 write_state,
492 }
493 }
494
495 pub(crate) async fn next_event(
496 &mut self,
497 metrics: &SyncedKvLogStoreMetrics,
498 ) -> StreamExecutorResult<(BoxedMessageStream, LogStoreWriteState<S>, WriteFutureEvent)> {
499 match self {
500 WriteFuture::Paused {
501 start_instant,
502 sleep_future,
503 ..
504 } => {
505 if let Some(sleep_future) = sleep_future {
506 sleep_future.await;
507 metrics
508 .pause_duration_ns
509 .inc_by(start_instant.elapsed().as_nanos() as _);
510 tracing::trace!("resuming write future");
511 }
512 must_match!(replace(self, WriteFuture::Empty), WriteFuture::Paused { stream, write_state, barrier, .. } => {
513 Ok((stream, write_state, WriteFutureEvent::UpstreamMessageReceived(Message::Barrier(barrier))))
514 })
515 }
516 WriteFuture::ReceiveFromUpstream { future, .. } => {
517 let (opt, stream) = future.await;
518 must_match!(replace(self, WriteFuture::Empty), WriteFuture::ReceiveFromUpstream { write_state, .. } => {
519 opt
520 .ok_or_else(|| anyhow!("end of upstream input").into())
521 .and_then(|result| result.map(|item| {
522 (stream, write_state, WriteFutureEvent::UpstreamMessageReceived(item))
523 }))
524 })
525 }
526 WriteFuture::FlushingChunk { future, .. } => {
527 let (write_state, result) = future.await;
528 let result = must_match!(replace(self, WriteFuture::Empty), WriteFuture::FlushingChunk { epoch, start_seq_id, end_seq_id, stream, .. } => {
529 result.map(|(flush_info, vnode_bitmap)| {
530 (stream, write_state, WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
531 epoch,
532 start_seq_id,
533 end_seq_id,
534 flush_info,
535 vnode_bitmap,
536 }))
537 })
538 });
539 result.map_err(Into::into)
540 }
541 WriteFuture::Empty => {
542 unreachable!("should not be polled after ready")
543 }
544 }
545 }
546}
547
548pub(crate) type LocalLogStoreReadState<S> =
549 LogStoreReadState<<<S as StateStore>::Local as LocalStateStore>::FlushedSnapshotReader>;
550pub(crate) type LocalLogStoreWriteState<S> = LogStoreWriteState<<S as StateStore>::Local>;
551
552impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
554 pub(crate) async fn init_local_log_store_state(
555 context: &SyncKvLogStoreContext<S>,
556 first_write_epoch: EpochPair,
557 ) -> StreamExecutorResult<(LocalLogStoreReadState<S>, LocalLogStoreWriteState<S>)> {
558 let local_state_store = context
559 .state_store
560 .new_local(NewLocalOptions {
561 table_id: context.table_id,
562 fragment_id: context.fragment_id,
563 op_consistency_level: OpConsistencyLevel::Inconsistent,
564 table_option: TableOption {
565 retention_seconds: None,
566 },
567 is_replicated: false,
568 vnodes: context.serde.vnodes().clone(),
569 upload_on_flush: false,
570 })
571 .await;
572
573 let (read_state, mut initial_write_state) = new_log_store_state(
574 context.table_id,
575 local_state_store,
576 context.serde.clone(),
577 context.chunk_size,
578 );
579 initial_write_state.init(first_write_epoch).await?;
580 Ok((read_state, initial_write_state))
581 }
582
583 #[try_stream(ok = Message, error = StreamExecutorError)]
584 pub(crate) async fn aligned_message_stream(
585 actor_id: ActorId,
586 input: BoxedMessageStream,
587 read_state: LocalLogStoreReadState<S>,
588 mut initial_write_state: LocalLogStoreWriteState<S>,
589 metrics: SyncedKvLogStoreMetrics,
590 initial_write_epoch: EpochPair,
591 ) {
592 tracing::info!("aligned mode");
593 let log_store_stream = read_state
598 .read_persisted_log_store(
599 metrics.persistent_log_read_metrics.clone(),
600 initial_write_epoch.curr,
601 LogStoreReadStateStreamRangeStart::Unbounded,
602 )
603 .await?;
604
605 #[for_await]
606 for message in log_store_stream {
607 let (_epoch, message) = message?;
608 match message {
609 KvLogStoreItem::Barrier { .. } => {
610 continue;
611 }
612 KvLogStoreItem::StreamChunk { chunk, .. } => {
613 yield Message::Chunk(chunk);
614 }
615 }
616 }
617
618 let mut realigned_logstore = false;
619
620 #[for_await]
621 for message in input {
622 match message? {
623 Message::Barrier(barrier) => {
624 let is_checkpoint = barrier.is_checkpoint();
625 let mut progress = LogStoreVnodeProgress::None;
626 progress.apply_aligned(read_state.vnodes().clone(), barrier.epoch.prev, None);
627 let post_seal =
629 initial_write_state.seal_current_epoch(barrier.epoch.curr, progress.take());
630 barrier.assume_no_update_vnode_bitmap(actor_id)?;
631 yield Message::Barrier(barrier);
632 post_seal.post_yield_barrier(None).await?;
633 if !realigned_logstore && is_checkpoint {
634 realigned_logstore = true;
635 tracing::info!("realigned logstore");
636 }
637 }
638 Message::Chunk(chunk) => {
639 yield Message::Chunk(chunk);
640 }
641 Message::Watermark(watermark) => {
642 yield Message::Watermark(watermark);
643 }
644 }
645 }
646 }
647
648 pub(crate) fn apply_pause_resume_mutation(barrier: &Barrier, pause_stream: &mut bool) {
649 if let Some(mutation) = barrier.mutation.as_deref() {
650 match mutation {
651 Mutation::Pause => {
652 *pause_stream = true;
653 }
654 Mutation::Resume => {
655 *pause_stream = false;
656 }
657 _ => {}
658 }
659 }
660 }
661
662 pub(crate) fn process_upstream_chunk(
663 seq_id: SeqId,
664 stream: BoxedMessageStream,
665 write_state: LogStoreWriteState<S::Local>,
666 chunk: StreamChunk,
667 buffer: &mut SyncedLogStoreBuffer,
668 ) -> (SeqId, WriteFuture<S::Local>) {
669 let cardinality = chunk.cardinality();
670 if cardinality == 0 {
671 tracing::warn!(
672 epoch = write_state.epoch().curr,
673 "received empty chunk (cardinality=0), skipping"
674 );
675 return (
676 seq_id,
677 WriteFuture::receive_from_upstream(stream, write_state),
678 );
679 }
680
681 let start_seq_id = seq_id;
682 let new_seq_id = seq_id + cardinality as SeqId;
683 let end_seq_id = new_seq_id - 1;
684 let epoch = write_state.epoch().curr;
685 tracing::trace!(
686 start_seq_id,
687 end_seq_id,
688 new_seq_id,
689 epoch,
690 cardinality,
691 "received chunk"
692 );
693 let next_write_future = if let Some(chunk_to_flush) =
694 buffer.add_or_flush_chunk(start_seq_id, end_seq_id, chunk, epoch)
695 {
696 WriteFuture::flush_chunk(
697 stream,
698 write_state,
699 chunk_to_flush,
700 epoch,
701 start_seq_id,
702 end_seq_id,
703 )
704 } else {
705 WriteFuture::receive_from_upstream(stream, write_state)
706 };
707 (new_seq_id, next_write_future)
708 }
709
710 pub(crate) fn process_flushed_chunk(
711 stream: BoxedMessageStream,
712 write_state: LogStoreWriteState<S::Local>,
713 info: FlushedChunkInfo,
714 buffer: &mut SyncedLogStoreBuffer,
715 metrics: &SyncedKvLogStoreMetrics,
716 ) -> WriteFuture<S::Local> {
717 buffer.add_flushed_item_to_buffer(
718 info.start_seq_id,
719 info.end_seq_id,
720 info.vnode_bitmap,
721 info.epoch,
722 );
723 metrics
724 .storage_write_count
725 .inc_by(info.flush_info.flush_count as _);
726 metrics
727 .storage_write_size
728 .inc_by(info.flush_info.flush_size as _);
729 WriteFuture::receive_from_upstream(stream, write_state)
730 }
731
732 #[try_stream(ok= Message, error = StreamExecutorError)]
733 pub async fn execute_monitored(self) {
734 let wait_next_poll_ns = self.logstore_context.metrics.wait_next_poll_ns.clone();
735 #[for_await]
736 for message in self.execute_inner() {
737 let current_time = Instant::now();
738 yield message?;
739 wait_next_poll_ns.inc_by(current_time.elapsed().as_nanos() as _);
740 }
741 }
742
743 #[try_stream(ok = Message, error = StreamExecutorError)]
744 async fn execute_inner(self) {
745 let mut input = self.upstream.execute();
746
747 let first_barrier = expect_first_barrier(&mut input).await?;
749 let first_write_epoch = first_barrier.epoch;
750 yield Message::Barrier(first_barrier.clone());
751
752 let (read_state, initial_write_state) =
753 Self::init_local_log_store_state(&self.logstore_context, first_write_epoch).await?;
754
755 let mut pause_stream = first_barrier.is_pause_on_startup();
756 let initial_write_epoch = first_write_epoch;
757
758 if self.logstore_context.aligned {
759 let aligned_stream = Self::aligned_message_stream(
760 self.actor_context.id,
761 input,
762 read_state,
763 initial_write_state,
764 self.logstore_context.metrics.clone(),
765 initial_write_epoch,
766 );
767 #[for_await]
768 for message in aligned_stream {
769 yield message?;
770 }
771 return Ok(());
772 }
773
774 let mut seq_id = FIRST_SEQ_ID;
775 let mut buffer = SyncedLogStoreBuffer::new(
776 self.logstore_context.max_buffer_size,
777 self.logstore_context.chunk_size,
778 &self.logstore_context.metrics,
779 );
780
781 let log_store_stream = read_state
782 .read_persisted_log_store(
783 self.logstore_context
784 .metrics
785 .persistent_log_read_metrics
786 .clone(),
787 initial_write_epoch.curr,
788 LogStoreReadStateStreamRangeStart::Unbounded,
789 )
790 .await?;
791
792 let mut log_store_stream = tokio_stream::StreamExt::peekable(log_store_stream);
793 let mut clean_state = log_store_stream.peek().await.is_none();
794 tracing::trace!(?clean_state);
795
796 let mut read_future_state = ReadFuture::ReadingPersistedStream(log_store_stream);
797
798 let mut write_future_state = WriteFuture::receive_from_upstream(input, initial_write_state);
799
800 let mut progress = LogStoreVnodeProgress::None;
801
802 loop {
803 let select_result = {
804 let read_future = async {
805 if pause_stream {
806 pending().await
807 } else {
808 read_future_state
809 .next_chunk(
810 &mut progress,
811 &read_state,
812 &mut buffer,
813 &self.logstore_context.metrics,
814 )
815 .await
816 }
817 };
818 pin_mut!(read_future);
819 let write_future = write_future_state.next_event(&self.logstore_context.metrics);
820 pin_mut!(write_future);
821 let output = select(write_future, read_future).await;
822 drop_either_future(output)
823 };
824 match select_result {
825 Either::Left(result) => {
826 drop(write_future_state);
828 let (stream, mut write_state, either) = result?;
829 match either {
830 WriteFutureEvent::UpstreamMessageReceived(msg) => match msg {
831 Message::Barrier(barrier) => {
832 if clean_state && barrier.kind.is_checkpoint() && !buffer.is_empty()
833 {
834 write_future_state = WriteFuture::paused(
835 self.logstore_context.pause_duration_ms,
836 barrier,
837 stream,
838 write_state,
839 );
840 clean_state = false;
841 self.logstore_context.metrics.unclean_state.inc();
842 } else {
843 Self::apply_pause_resume_mutation(&barrier, &mut pause_stream);
844 let write_state_post_write_barrier = Self::write_barrier(
845 self.actor_context.id,
846 &mut write_state,
847 barrier.clone(),
848 &self.logstore_context.metrics,
849 progress.take(),
850 &mut buffer,
851 )
852 .await?;
853 seq_id = FIRST_SEQ_ID;
854 barrier.assume_no_update_vnode_bitmap(self.actor_context.id)?;
855
856 yield Message::Barrier(barrier);
857
858 write_state_post_write_barrier
859 .post_yield_barrier(None)
860 .await?;
861
862 write_future_state =
863 WriteFuture::receive_from_upstream(stream, write_state);
864 }
865 }
866 Message::Chunk(chunk) => {
867 let (new_seq_id, next_write_future) = Self::process_upstream_chunk(
868 seq_id,
869 stream,
870 write_state,
871 chunk,
872 &mut buffer,
873 );
874 seq_id = new_seq_id;
875 write_future_state = next_write_future;
876 }
877 Message::Watermark(_watermark) => {
879 write_future_state =
880 WriteFuture::receive_from_upstream(stream, write_state);
881 }
882 },
883 WriteFutureEvent::ChunkFlushed(info) => {
884 write_future_state = Self::process_flushed_chunk(
885 stream,
886 write_state,
887 info,
888 &mut buffer,
889 &self.logstore_context.metrics,
890 );
891 }
892 }
893 }
894 Either::Right(result) => {
895 let clean_state_reached = read_future_state.mark_clean_state(
896 &mut clean_state,
897 &buffer,
898 &self.logstore_context.metrics,
899 );
900
901 if clean_state_reached {
902 if let WriteFuture::Paused { sleep_future, .. } = &mut write_future_state {
904 tracing::trace!("resuming paused future");
905 assert!(buffer.has_available_capacity());
906 *sleep_future = None;
907 }
908 }
909 let chunk = result?;
910 self.logstore_context
911 .metrics
912 .total_read_count
913 .inc_by(chunk.cardinality() as _);
914
915 yield Message::Chunk(chunk);
916 }
917 }
918 }
919 }
920}
921
922pub(crate) type PersistedStream<S> =
923 Peekable<Pin<Box<LogStoreItemMergeStream<TimeoutAutoRebuildIter<S>>>>>;
924
925pub(crate) enum ReadFuture<S: StateStoreRead> {
926 ReadingPersistedStream(PersistedStream<S>),
927 ReadingFlushedChunk {
928 future: ReadFlushedChunkFuture,
929 end_seq_id: SeqId,
930 },
931 Idle,
932}
933
934impl<S: StateStoreRead> ReadFuture<S> {
936 pub(crate) fn mark_clean_state(
937 &self,
938 clean_state: &mut bool,
939 buffer: &SyncedLogStoreBuffer,
940 metrics: &SyncedKvLogStoreMetrics,
941 ) -> bool {
942 if !*clean_state && matches!(self, ReadFuture::Idle) && buffer.is_empty() {
943 *clean_state = true;
944 metrics.clean_state.inc();
945 true
946 } else {
947 false
948 }
949 }
950
951 pub(crate) async fn next_chunk(
952 &mut self,
953 progress: &mut LogStoreVnodeProgress,
954 read_state: &LogStoreReadState<S>,
955 buffer: &mut SyncedLogStoreBuffer,
956 metrics: &SyncedKvLogStoreMetrics,
957 ) -> StreamExecutorResult<StreamChunk> {
958 match self {
959 ReadFuture::ReadingPersistedStream(stream) => {
960 while let Some((epoch, item)) = stream.try_next().await? {
961 match item {
962 KvLogStoreItem::Barrier { vnodes, .. } => {
963 tracing::trace!(epoch, "read logstore barrier");
964 progress.apply_aligned(vnodes, epoch, None);
966 continue;
967 }
968 KvLogStoreItem::StreamChunk {
969 chunk,
970 progress: chunk_progress,
971 } => {
972 tracing::trace!("read logstore chunk of size: {}", chunk.cardinality());
973 progress.apply_per_vnode(epoch, chunk_progress);
974 return Ok(chunk);
975 }
976 }
977 }
978 *self = ReadFuture::Idle;
979 }
980 ReadFuture::ReadingFlushedChunk { .. } | ReadFuture::Idle => {}
981 }
982 match self {
983 ReadFuture::ReadingPersistedStream(_) => {
984 unreachable!("must have finished read persisted stream when reaching here")
985 }
986 ReadFuture::ReadingFlushedChunk { .. } => {}
987 ReadFuture::Idle => loop {
988 let Some((item_epoch, item)) = buffer.pop_front() else {
989 return pending().await;
990 };
991 match item {
992 LogStoreBufferItem::StreamChunk {
993 chunk,
994 start_seq_id,
995 end_seq_id,
996 flushed,
997 ..
998 } => {
999 metrics.buffer_read_count.inc_by(chunk.cardinality() as _);
1000 tracing::trace!(
1001 start_seq_id,
1002 end_seq_id,
1003 flushed,
1004 cardinality = chunk.cardinality(),
1005 "read buffered chunk of size"
1006 );
1007 progress.apply_aligned(
1008 read_state.vnodes().clone(),
1009 item_epoch,
1010 Some(end_seq_id),
1011 );
1012 return Ok(chunk);
1013 }
1014 LogStoreBufferItem::Flushed {
1015 vnode_bitmap,
1016 start_seq_id,
1017 end_seq_id,
1018 chunk_id,
1019 } => {
1020 tracing::trace!(start_seq_id, end_seq_id, chunk_id, "read flushed chunk");
1021 let read_metrics = metrics.flushed_buffer_read_metrics.clone();
1022 let future = read_state
1023 .read_flushed_chunk(
1024 vnode_bitmap,
1025 chunk_id,
1026 start_seq_id,
1027 end_seq_id,
1028 item_epoch,
1029 read_metrics,
1030 )
1031 .boxed();
1032 *self = ReadFuture::ReadingFlushedChunk { future, end_seq_id };
1033 break;
1034 }
1035 LogStoreBufferItem::Barrier { .. } => {
1036 tracing::trace!(item_epoch, "read buffer barrier");
1037 progress.apply_aligned(read_state.vnodes().clone(), item_epoch, None);
1038 continue;
1039 }
1040 }
1041 },
1042 }
1043
1044 let (future, end_seq_id) = match self {
1045 ReadFuture::ReadingPersistedStream(_) | ReadFuture::Idle => {
1046 unreachable!("should be at ReadingFlushedChunk")
1047 }
1048 ReadFuture::ReadingFlushedChunk { future, end_seq_id } => (future, *end_seq_id),
1049 };
1050
1051 let (_, chunk, epoch) = future.await?;
1052 progress.apply_aligned(read_state.vnodes().clone(), epoch, Some(end_seq_id));
1053 tracing::trace!(
1054 end_seq_id,
1055 "read flushed chunk of size: {}",
1056 chunk.cardinality()
1057 );
1058 *self = ReadFuture::Idle;
1059 Ok(chunk)
1060 }
1061}
1062
1063impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
1065 pub(crate) async fn write_barrier<'a>(
1066 actor_id: ActorId,
1067 write_state: &'a mut LogStoreWriteState<S::Local>,
1068 barrier: Barrier,
1069 metrics: &SyncedKvLogStoreMetrics,
1070 progress: LogStoreVnodeProgress,
1071 buffer: &mut SyncedLogStoreBuffer,
1072 ) -> StreamExecutorResult<LogStorePostSealCurrentEpoch<'a, S::Local>> {
1073 tracing::trace!(%actor_id, ?progress, "applying truncation");
1074 let epoch = barrier.epoch.prev;
1078 let mut writer = write_state.start_writer(false);
1079 writer.write_barrier(epoch, barrier.is_checkpoint())?;
1080
1081 if barrier.is_checkpoint() {
1082 for (epoch, item) in buffer.buffer.iter_mut().rev() {
1083 match item {
1084 LogStoreBufferItem::StreamChunk {
1085 chunk,
1086 start_seq_id,
1087 end_seq_id,
1088 flushed,
1089 ..
1090 } => {
1091 if !*flushed {
1092 writer.write_chunk(chunk, *epoch, *start_seq_id, *end_seq_id)?;
1093 *flushed = true;
1094 } else {
1095 break;
1096 }
1097 }
1098 LogStoreBufferItem::Flushed { .. } | LogStoreBufferItem::Barrier { .. } => {}
1099 }
1100 }
1101 }
1102
1103 let (flush_info, _) = writer.finish().await?;
1105 metrics
1106 .storage_write_count
1107 .inc_by(flush_info.flush_count as _);
1108 metrics
1109 .storage_write_size
1110 .inc_by(flush_info.flush_size as _);
1111 let post_seal = write_state.seal_current_epoch(barrier.epoch.curr, progress);
1112
1113 buffer.buffer.push_back((
1115 epoch,
1116 LogStoreBufferItem::Barrier {
1117 is_checkpoint: barrier.is_checkpoint(),
1118 next_epoch: barrier.epoch.curr,
1119 schema_change: None,
1120 is_stop: false,
1121 },
1122 ));
1123 buffer.next_chunk_id = 0;
1124 buffer.update_buffer_metrics();
1125
1126 Ok(post_seal)
1127 }
1128}
1129
1130pub(crate) struct SyncedLogStoreBuffer {
1131 buffer: VecDeque<(u64, LogStoreBufferItem)>,
1132 current_size: usize,
1133 max_size: usize,
1134 max_chunk_size: usize,
1135 next_chunk_id: ChunkId,
1136 metrics: SyncedKvLogStoreMetrics,
1137 flushed_count: usize,
1138}
1139
1140impl SyncedLogStoreBuffer {
1141 pub fn new(
1142 max_buffer_size: usize,
1143 chunk_size: usize,
1144 metrics: &SyncedKvLogStoreMetrics,
1145 ) -> Self {
1146 SyncedLogStoreBuffer {
1147 buffer: VecDeque::new(),
1148 current_size: 0,
1149 max_size: max_buffer_size,
1150 max_chunk_size: chunk_size,
1151 next_chunk_id: 0,
1152 metrics: metrics.clone(),
1153 flushed_count: 0,
1154 }
1155 }
1156
1157 pub fn is_empty(&self) -> bool {
1158 self.current_size == 0
1159 }
1160
1161 pub fn has_available_capacity(&self) -> bool {
1162 self.current_size < self.max_size
1163 }
1164
1165 fn add_or_flush_chunk(
1166 &mut self,
1167 start_seq_id: SeqId,
1168 end_seq_id: SeqId,
1169 chunk: StreamChunk,
1170 epoch: u64,
1171 ) -> Option<StreamChunk> {
1172 let current_size = self.current_size;
1173 let chunk_size = chunk.cardinality();
1174
1175 tracing::trace!(
1176 current_size,
1177 chunk_size,
1178 max_size = self.max_size,
1179 "checking chunk size"
1180 );
1181 let should_flush_chunk = current_size + chunk_size > self.max_size;
1182 if should_flush_chunk {
1183 tracing::trace!(start_seq_id, end_seq_id, epoch, "flushing chunk",);
1184 Some(chunk)
1185 } else {
1186 tracing::trace!(start_seq_id, end_seq_id, epoch, "buffering chunk",);
1187 self.add_chunk_to_buffer(chunk, start_seq_id, end_seq_id, epoch);
1188 None
1189 }
1190 }
1191
1192 fn add_flushed_item_to_buffer(
1195 &mut self,
1196 start_seq_id: SeqId,
1197 end_seq_id: SeqId,
1198 new_vnode_bitmap: Bitmap,
1199 epoch: u64,
1200 ) {
1201 let new_chunk_size = (end_seq_id - start_seq_id + 1) as usize;
1202
1203 if let Some((
1204 item_epoch,
1205 LogStoreBufferItem::Flushed {
1206 start_seq_id: prev_start_seq_id,
1207 end_seq_id: prev_end_seq_id,
1208 vnode_bitmap,
1209 ..
1210 },
1211 )) = self.buffer.back_mut()
1212 && let flushed_chunk_size = (*prev_end_seq_id - *prev_start_seq_id + 1) as usize
1213 && let projected_flushed_chunk_size = flushed_chunk_size + new_chunk_size
1214 && projected_flushed_chunk_size <= self.max_chunk_size
1215 {
1216 assert!(
1217 *prev_end_seq_id < start_seq_id,
1218 "prev end_seq_id {} should be smaller than current start_seq_id {}",
1219 end_seq_id,
1220 start_seq_id
1221 );
1222 assert_eq!(
1223 epoch, *item_epoch,
1224 "epoch of newly added flushed item must be the same as the last flushed item"
1225 );
1226 *prev_end_seq_id = end_seq_id;
1227 *vnode_bitmap |= new_vnode_bitmap;
1228 } else {
1229 let chunk_id = self.next_chunk_id;
1230 self.next_chunk_id += 1;
1231 self.buffer.push_back((
1232 epoch,
1233 LogStoreBufferItem::Flushed {
1234 start_seq_id,
1235 end_seq_id,
1236 vnode_bitmap: new_vnode_bitmap,
1237 chunk_id,
1238 },
1239 ));
1240 self.flushed_count += 1;
1241 tracing::trace!(
1242 "adding flushed item to buffer: start_seq_id: {start_seq_id}, end_seq_id: {end_seq_id}, chunk_id: {chunk_id}"
1243 );
1244 }
1245 self.update_buffer_metrics();
1247 }
1248
1249 fn add_chunk_to_buffer(
1250 &mut self,
1251 chunk: StreamChunk,
1252 start_seq_id: SeqId,
1253 end_seq_id: SeqId,
1254 epoch: u64,
1255 ) {
1256 let chunk_id = self.next_chunk_id;
1257 self.next_chunk_id += 1;
1258 self.current_size += chunk.cardinality();
1259 self.buffer.push_back((
1260 epoch,
1261 LogStoreBufferItem::StreamChunk {
1262 chunk,
1263 start_seq_id,
1264 end_seq_id,
1265 flushed: false,
1266 chunk_id,
1267 },
1268 ));
1269 self.update_buffer_metrics();
1270 }
1271
1272 fn pop_front(&mut self) -> Option<(u64, LogStoreBufferItem)> {
1273 let item = self.buffer.pop_front();
1274 match &item {
1275 Some((_, LogStoreBufferItem::Flushed { .. })) => {
1276 self.flushed_count -= 1;
1277 }
1278 Some((_, LogStoreBufferItem::StreamChunk { chunk, .. })) => {
1279 self.current_size -= chunk.cardinality();
1280 }
1281 _ => {}
1282 }
1283 self.update_buffer_metrics();
1284 item
1285 }
1286
1287 fn update_buffer_metrics(&self) {
1288 let mut epoch_count = 0;
1289 let mut row_count = 0;
1290 let mut memory_bytes = 0;
1291 for (_, item) in &self.buffer {
1292 match item {
1293 LogStoreBufferItem::StreamChunk { chunk, .. } => {
1294 row_count += chunk.cardinality();
1295 }
1296 LogStoreBufferItem::Flushed {
1297 start_seq_id,
1298 end_seq_id,
1299 ..
1300 } => {
1301 row_count += (end_seq_id - start_seq_id) as usize;
1302 }
1303 LogStoreBufferItem::Barrier { .. } => {
1304 epoch_count += 1;
1305 }
1306 }
1307 memory_bytes += item.estimated_size();
1308 }
1309 self.metrics.buffer_unconsumed_epoch_count.set(epoch_count);
1310 self.metrics.buffer_unconsumed_row_count.set(row_count as _);
1311 self.metrics
1312 .buffer_unconsumed_item_count
1313 .set(self.buffer.len() as _);
1314 self.metrics.buffer_unconsumed_min_epoch.set(
1315 self.buffer
1316 .front()
1317 .map(|(epoch, _)| *epoch)
1318 .unwrap_or_default() as _,
1319 );
1320 self.metrics.buffer_memory_bytes.set(memory_bytes as _);
1321 }
1322}
1323
1324impl<S> Execute for SyncedKvLogStoreExecutor<S>
1325where
1326 S: StateStore,
1327{
1328 fn execute(self: Box<Self>) -> BoxedMessageStream {
1329 self.execute_monitored().boxed()
1330 }
1331}
1332
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use pretty_assertions::assert_eq;
    use risingwave_common::catalog::Field;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::test_prelude::*;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::assert_stream_chunk_eq;
    use crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V2_INFO;
    use crate::common::log_store_impl::kv_log_store::test_utils::{
        check_stream_chunk_eq, gen_test_log_store_table, test_payload_schema,
    };
    use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
    use crate::executor::test_utils::MockSource;

    fn init_logger() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .with_ansi(false)
            .try_init();
    }

    /// Wraps `source` in a `SyncedKvLogStoreExecutor` over a fresh `MemoryStateStore`.
    ///
    /// The upstream schema is derived from the v2 log store payload schema with stream
    /// key `[0]`, all test vnodes are owned, and every other constructor argument uses
    /// the constants shared by all tests in this module; only the buffer size varies.
    fn build_log_store_executor(
        source: MockSource,
        max_buffer_size: usize,
    ) -> SyncedKvLogStoreExecutor<MemoryStateStore> {
        let pk_info = &KV_LOG_STORE_V2_INFO;
        let fields = test_payload_schema(pk_info)
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
            .collect_vec();
        let upstream = source.into_executor(Schema { fields }, vec![0]);

        let table = gen_test_log_store_table(pk_info);
        let owned_vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, owned_vnodes, pk_info),
            MemoryStateStore::new(),
            max_buffer_size,
            256,
            upstream,
            Duration::from_millis(256),
            false,
        )
    }

    /// Pulls the next message and asserts it is a barrier at `expected_epoch`.
    async fn expect_barrier(stream: &mut BoxedMessageStream, expected_epoch: u64) {
        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, expected_epoch);
            }
            unexpected => panic!("Expected a barrier message, got {:?}", unexpected),
        }
    }

    /// Pulls the next message and asserts it is a chunk equal to `expected`.
    async fn expect_chunk(stream: &mut BoxedMessageStream, expected: StreamChunk) {
        match stream.next().await {
            Some(Ok(Message::Chunk(actual))) => {
                assert_stream_chunk_eq!(actual, expected);
            }
            unexpected => panic!("Expected a chunk message, got {:?}", unexpected),
        }
    }

    #[tokio::test]
    async fn test_read_write_buffer() {
        init_logger();

        let (mut tx, source) = MockSource::channel();
        let log_store_executor = build_log_store_executor(source, 10).boxed();

        tx.push_barrier(test_epoch(1), false);

        let inserts = StreamChunk::from_pretty(
            "  I   T
            +  5  10
            +  6  10
            +  8  10
            +  9  10
            + 10  11",
        );
        let deletes_and_updates = StreamChunk::from_pretty(
            "   I   T
            -   5  10
            -   6  10
            -   8  10
            U-  9  10
            U+ 10  11",
        );

        tx.push_chunk(inserts.clone());
        tx.push_chunk(deletes_and_updates.clone());

        let mut stream = log_store_executor.execute();

        expect_barrier(&mut stream, test_epoch(1)).await;
        expect_chunk(&mut stream, inserts).await;
        expect_chunk(&mut stream, deletes_and_updates).await;

        tx.push_barrier(test_epoch(2), false);
        expect_barrier(&mut stream, test_epoch(2)).await;
    }

    #[tokio::test]
    async fn test_barrier_persisted_read() {
        init_logger();

        let (mut tx, source) = MockSource::channel();
        let log_store_executor = build_log_store_executor(source, 10).boxed();

        tx.push_barrier(test_epoch(1), false);

        let inserts = StreamChunk::from_pretty(
            "  I   T
            +  5  10
            +  6  10
            +  8  10
            +  9  10
            + 10  11",
        );
        let deletes_and_updates = StreamChunk::from_pretty(
            "   I   T
            -   5  10
            -   6  10
            -   8  10
            U- 10  11
            U+ 10  10",
        );

        tx.push_chunk(inserts.clone());
        tx.push_chunk(deletes_and_updates.clone());
        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        expect_barrier(&mut stream, test_epoch(1)).await;
        expect_chunk(&mut stream, inserts).await;
        expect_chunk(&mut stream, deletes_and_updates).await;
        expect_barrier(&mut stream, test_epoch(2)).await;
    }

    #[tokio::test]
    async fn test_max_chunk_persisted_read() {
        init_logger();

        let (mut tx, source) = MockSource::channel();
        // A zero-sized buffer forces every chunk out of memory.
        let log_store_executor = build_log_store_executor(source, 0).boxed();

        tx.push_barrier(test_epoch(1), false);

        let inserts = StreamChunk::from_pretty(
            "  I   T
            +  5  10
            +  6  10
            +  8  10
            +  9  10
            + 10  11",
        );
        let deletes_and_updates = StreamChunk::from_pretty(
            "   I   T
            -   5  10
            -   6  10
            -   8  10
            U- 10  11
            U+ 10  10",
        );

        tx.push_chunk(inserts.clone());
        tx.push_chunk(deletes_and_updates.clone());
        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        for epoch_index in 1..=2 {
            expect_barrier(&mut stream, test_epoch(epoch_index)).await;
        }

        let merged = StreamChunk::from_pretty(
            "   I   T
            +   5  10
            +   6  10
            +   8  10
            +   9  10
            +  10  11
            -   5  10
            -   6  10
            -   8  10
            U- 10  11
            U+ 10  10",
        );
        expect_chunk(&mut stream, merged).await;
    }
}