use std::collections::VecDeque;
use std::future::pending;
use std::mem::replace;
use std::pin::Pin;

use anyhow::anyhow;
use futures::future::{BoxFuture, Either, select};
use futures::stream::StreamFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::must_match;
use risingwave_connector::sink::log_store::{ChunkId, LogStoreResult};
use risingwave_storage::StateStore;
use risingwave_storage::store::{
    LocalStateStore, NewLocalOptions, OpConsistencyLevel, StateStoreRead,
};
use rw_futures_util::drop_either_future;
use tokio::time::{Duration, Instant, Sleep, sleep_until};
use tokio_stream::adapters::Peekable;

use crate::common::log_store_impl::kv_log_store::buffer::LogStoreBufferItem;
use crate::common::log_store_impl::kv_log_store::reader::LogStoreReadStateStreamRangeStart;
use crate::common::log_store_impl::kv_log_store::reader::timeout_auto_rebuild::TimeoutAutoRebuildIter;
use crate::common::log_store_impl::kv_log_store::serde::{
    KvLogStoreItem, LogStoreItemMergeStream, LogStoreRowSerde,
};
use crate::common::log_store_impl::kv_log_store::state::{
    LogStorePostSealCurrentEpoch, LogStoreReadState, LogStoreStateWriteChunkFuture,
    LogStoreWriteState, new_log_store_state,
};
use crate::common::log_store_impl::kv_log_store::{
    Epoch, FIRST_SEQ_ID, FlushInfo, LogStoreVnodeProgress, SeqId,
};
use crate::executor::prelude::*;
use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
use crate::executor::{
    Barrier, BoxedMessageStream, Message, StreamExecutorError, StreamExecutorResult,
};

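/// Metrics for the synced kv log store. Read-path counters are labelled by
/// source (`buffer`, `total`, `persistent_log`, `flushed_buffer`), and the
/// state counter tracks transitions between the `clean` and `dirty` states.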
pub mod metrics {
    use risingwave_common::id::FragmentId;
    use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};

    use crate::common::log_store_impl::kv_log_store::KvLogStoreReadMetrics;
    use crate::executor::monitor::StreamingMetrics;
    use crate::task::ActorId;

    #[derive(Clone)]
    pub struct SyncedKvLogStoreMetrics {
        pub unclean_state: LabelGuardedIntCounter,
        pub clean_state: LabelGuardedIntCounter,
        pub wait_next_poll_ns: LabelGuardedIntCounter,

        pub storage_write_count: LabelGuardedIntCounter,
        pub storage_write_size: LabelGuardedIntCounter,
        pub pause_duration_ns: LabelGuardedIntCounter,

        pub buffer_unconsumed_item_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_row_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_epoch_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_min_epoch: LabelGuardedIntGauge,
        pub buffer_read_count: LabelGuardedIntCounter,
        pub buffer_read_size: LabelGuardedIntCounter,

        pub total_read_count: LabelGuardedIntCounter,
        pub total_read_size: LabelGuardedIntCounter,
        pub persistent_log_read_metrics: KvLogStoreReadMetrics,
        pub flushed_buffer_read_metrics: KvLogStoreReadMetrics,
    }

    impl SyncedKvLogStoreMetrics {
        pub(crate) fn new(
            metrics: &StreamingMetrics,
            actor_id: ActorId,
            fragment_id: FragmentId,
            name: &str,
            target: &'static str,
        ) -> Self {
            let actor_id_str = actor_id.to_string();
            let fragment_id_str = fragment_id.to_string();
            let labels = &[&actor_id_str, target, &fragment_id_str, name];

            let unclean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "dirty",
                &actor_id_str,
                target,
                &fragment_id_str,
                name,
            ]);
            let clean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "clean",
                &actor_id_str,
                target,
                &fragment_id_str,
                name,
            ]);
            let wait_next_poll_ns = metrics
                .sync_kv_log_store_wait_next_poll_ns
                .with_guarded_label_values(labels);

            let storage_write_size = metrics
                .sync_kv_log_store_storage_write_size
                .with_guarded_label_values(labels);
            let storage_write_count = metrics
                .sync_kv_log_store_storage_write_count
                .with_guarded_label_values(labels);
            let storage_pause_duration_ns = metrics
                .sync_kv_log_store_write_pause_duration_ns
                .with_guarded_label_values(labels);

            let buffer_unconsumed_item_count = metrics
                .sync_kv_log_store_buffer_unconsumed_item_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_row_count = metrics
                .sync_kv_log_store_buffer_unconsumed_row_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_epoch_count = metrics
                .sync_kv_log_store_buffer_unconsumed_epoch_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_min_epoch = metrics
                .sync_kv_log_store_buffer_unconsumed_min_epoch
                .with_guarded_label_values(labels);
            let buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    "buffer",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    "buffer",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let total_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    "total",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let total_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    "total",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            const READ_PERSISTENT_LOG: &str = "persistent_log";
            const READ_FLUSHED_BUFFER: &str = "flushed_buffer";

            let persistent_log_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let persistent_log_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let flushed_buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let flushed_buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            Self {
                unclean_state,
                clean_state,
                wait_next_poll_ns,
                storage_write_size,
                storage_write_count,
                pause_duration_ns: storage_pause_duration_ns,
                buffer_unconsumed_item_count,
                buffer_unconsumed_row_count,
                buffer_unconsumed_epoch_count,
                buffer_unconsumed_min_epoch,
                buffer_read_count,
                buffer_read_size,
                total_read_count,
                total_read_size,
                persistent_log_read_metrics: KvLogStoreReadMetrics {
                    storage_read_size: persistent_log_read_size,
                    storage_read_count: persistent_log_read_count,
                },
                flushed_buffer_read_metrics: KvLogStoreReadMetrics {
                    storage_read_count: flushed_buffer_read_count,
                    storage_read_size: flushed_buffer_read_size,
                },
            }
        }

        #[cfg(test)]
        pub(crate) fn for_test() -> Self {
            SyncedKvLogStoreMetrics {
                unclean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
                clean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
                wait_next_poll_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
                storage_write_count: LabelGuardedIntCounter::test_int_counter::<4>(),
                storage_write_size: LabelGuardedIntCounter::test_int_counter::<4>(),
                pause_duration_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
                buffer_unconsumed_item_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_row_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_epoch_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_min_epoch: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
                buffer_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
                total_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
                total_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
                persistent_log_read_metrics: KvLogStoreReadMetrics::for_test(),
                flushed_buffer_read_metrics: KvLogStoreReadMetrics::for_test(),
            }
        }
    }
}

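/// Future resolving to a chunk that was previously flushed to storage,
/// together with its chunk id and the epoch it belongs to.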
type ReadFlushedChunkFuture = BoxFuture<'static, LogStoreResult<(ChunkId, StreamChunk, Epoch)>>;

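/// An executor that persists upstream messages to a kv log store while
/// replaying them downstream. Chunks are served from an in-memory buffer when
/// possible; on overflow they are flushed to the state store and read back
/// later. In `aligned` mode the persisted log is drained once at startup and
/// upstream messages are then forwarded directly.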
pub struct SyncedKvLogStoreExecutor<S: StateStore> {
    actor_context: ActorContextRef,
    table_id: TableId,
    metrics: SyncedKvLogStoreMetrics,
    serde: LogStoreRowSerde,

    upstream: Executor,

    state_store: S,
    max_buffer_size: usize,

    chunk_size: usize,

    pause_duration_ms: Duration,

    aligned: bool,
}

impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn new(
        actor_context: ActorContextRef,
        table_id: TableId,
        metrics: SyncedKvLogStoreMetrics,
        serde: LogStoreRowSerde,
        state_store: S,
        buffer_size: usize,
        chunk_size: usize,
        upstream: Executor,
        pause_duration_ms: Duration,
        aligned: bool,
    ) -> Self {
        Self {
            actor_context,
            table_id,
            metrics,
            serde,
            state_store,
            upstream,
            max_buffer_size: buffer_size,
            chunk_size,
            pause_duration_ms,
            aligned,
        }
    }
}

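/// Outcome of a completed chunk flush, carrying what is needed to record the
/// chunk as a `Flushed` buffer item and to update the write metrics.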
struct FlushedChunkInfo {
    epoch: u64,
    start_seq_id: SeqId,
    end_seq_id: SeqId,
    flush_info: FlushInfo,
    vnode_bitmap: Bitmap,
}

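/// State machine for the write side. The write side is either paused (holding
/// back a checkpoint barrier so the reader can drain the buffer), awaiting the
/// next upstream message, or flushing an oversized chunk to storage. `Empty`
/// is a transient placeholder used while state is moved out via `mem::replace`.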
enum WriteFuture<S: LocalStateStore> {
    Paused {
        start_instant: Instant,
        sleep_future: Option<Pin<Box<Sleep>>>,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    },
    ReceiveFromUpstream {
        future: StreamFuture<BoxedMessageStream>,
        write_state: LogStoreWriteState<S>,
    },
    FlushingChunk {
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        future: Pin<Box<LogStoreStateWriteChunkFuture<S>>>,
        stream: BoxedMessageStream,
    },
    Empty,
}

enum WriteFutureEvent {
    UpstreamMessageReceived(Message),
    ChunkFlushed(FlushedChunkInfo),
}

impl<S: LocalStateStore> WriteFuture<S> {
    fn flush_chunk(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
        chunk: StreamChunk,
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
    ) -> Self {
        tracing::trace!(
            start_seq_id,
            end_seq_id,
            epoch,
            cardinality = chunk.cardinality(),
            "write_future: flushing chunk"
        );
        Self::FlushingChunk {
            epoch,
            start_seq_id,
            end_seq_id,
            future: Box::pin(write_state.into_write_chunk_future(
                chunk,
                epoch,
                start_seq_id,
                end_seq_id,
            )),
            stream,
        }
    }

    fn receive_from_upstream(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        Self::ReceiveFromUpstream {
            future: stream.into_future(),
            write_state,
        }
    }

    fn paused(
        duration: Duration,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        let now = Instant::now();
        tracing::trace!(?now, ?duration, "write_future_pause");
        Self::Paused {
            start_instant: now,
            sleep_future: Some(Box::pin(sleep_until(now + duration))),
            barrier,
            stream,
            write_state,
        }
    }

    async fn next_event(
        &mut self,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<(BoxedMessageStream, LogStoreWriteState<S>, WriteFutureEvent)> {
        match self {
            WriteFuture::Paused {
                start_instant,
                sleep_future,
                ..
            } => {
                if let Some(sleep_future) = sleep_future {
                    sleep_future.await;
                    metrics
                        .pause_duration_ns
                        .inc_by(start_instant.elapsed().as_nanos() as _);
                    tracing::trace!("resuming write future");
                }
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::Paused { stream, write_state, barrier, .. } => {
                    Ok((stream, write_state, WriteFutureEvent::UpstreamMessageReceived(Message::Barrier(barrier))))
                })
            }
            WriteFuture::ReceiveFromUpstream { future, .. } => {
                let (opt, stream) = future.await;
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::ReceiveFromUpstream { write_state, .. } => {
                    opt
                        .ok_or_else(|| anyhow!("end of upstream input").into())
                        .and_then(|result| result.map(|item| {
                            (stream, write_state, WriteFutureEvent::UpstreamMessageReceived(item))
                        }))
                })
            }
            WriteFuture::FlushingChunk { future, .. } => {
                let (write_state, result) = future.await;
                let result = must_match!(replace(self, WriteFuture::Empty), WriteFuture::FlushingChunk { epoch, start_seq_id, end_seq_id, stream, .. } => {
                    result.map(|(flush_info, vnode_bitmap)| {
                        (stream, write_state, WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                            epoch,
                            start_seq_id,
                            end_seq_id,
                            flush_info,
                            vnode_bitmap,
                        }))
                    })
                });
                result.map_err(Into::into)
            }
            WriteFuture::Empty => {
                unreachable!("should not be polled after ready")
            }
        }
    }
}

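// Event loop: `execute_inner` drives the write and read sides concurrently
// with `select`, and `execute_monitored` wraps it to record the time spent
// waiting for the next poll.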
impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    #[try_stream(ok = Message, error = StreamExecutorError)]
    pub async fn execute_monitored(self) {
        let wait_next_poll_ns = self.metrics.wait_next_poll_ns.clone();
        #[for_await]
        for message in self.execute_inner() {
            let current_time = Instant::now();
            yield message?;
            wait_next_poll_ns.inc_by(current_time.elapsed().as_nanos() as _);
        }
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(self) {
        let mut input = self.upstream.execute();

        let first_barrier = expect_first_barrier(&mut input).await?;
        let first_write_epoch = first_barrier.epoch;
        yield Message::Barrier(first_barrier.clone());

        let local_state_store = self
            .state_store
            .new_local(NewLocalOptions {
                table_id: self.table_id,
                op_consistency_level: OpConsistencyLevel::Inconsistent,
                table_option: TableOption {
                    retention_seconds: None,
                },
                is_replicated: false,
                vnodes: self.serde.vnodes().clone(),
                upload_on_flush: false,
            })
            .await;

        let (mut read_state, mut initial_write_state) = new_log_store_state(
            self.table_id,
            local_state_store,
            self.serde,
            self.chunk_size,
        );
        initial_write_state.init(first_write_epoch).await?;

        let mut pause_stream = first_barrier.is_pause_on_startup();
        let mut initial_write_epoch = first_write_epoch;

        if self.aligned {
            tracing::info!("aligned mode");
            let log_store_stream = read_state
                .read_persisted_log_store(
                    self.metrics.persistent_log_read_metrics.clone(),
                    initial_write_epoch.curr,
                    LogStoreReadStateStreamRangeStart::Unbounded,
                )
                .await?;

            #[for_await]
            for message in log_store_stream {
                let (_epoch, message) = message?;
                match message {
                    KvLogStoreItem::Barrier { .. } => {
                        continue;
                    }
                    KvLogStoreItem::StreamChunk { chunk, .. } => {
                        yield Message::Chunk(chunk);
                    }
                }
            }

            let mut realigned_logstore = false;

            #[for_await]
            for message in input {
                match message? {
                    Message::Barrier(barrier) => {
                        let is_checkpoint = barrier.is_checkpoint();
                        let mut progress = LogStoreVnodeProgress::None;
                        progress.apply_aligned(
                            read_state.vnodes().clone(),
                            barrier.epoch.prev,
                            None,
                        );
                        let post_seal = initial_write_state
                            .seal_current_epoch(barrier.epoch.curr, progress.take());
                        let update_vnode_bitmap =
                            barrier.as_update_vnode_bitmap(self.actor_context.id);
                        yield Message::Barrier(barrier);
                        post_seal.post_yield_barrier(update_vnode_bitmap).await?;
                        if !realigned_logstore && is_checkpoint {
                            realigned_logstore = true;
                            tracing::info!("realigned logstore");
                        }
                    }
                    Message::Chunk(chunk) => {
                        yield Message::Chunk(chunk);
                    }
                    Message::Watermark(watermark) => {
                        yield Message::Watermark(watermark);
                    }
                }
            }

            return Ok(());
        }

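        // The consume stream is recreated whenever the vnode bitmap is
        // updated: re-read the persisted log from the latest epoch and start
        // over with a fresh buffer.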
        'recreate_consume_stream: loop {
            let mut seq_id = FIRST_SEQ_ID;
            let mut buffer = SyncedLogStoreBuffer {
                buffer: VecDeque::new(),
                current_size: 0,
                max_size: self.max_buffer_size,
                max_chunk_size: self.chunk_size,
                next_chunk_id: 0,
                metrics: self.metrics.clone(),
                flushed_count: 0,
            };

            let log_store_stream = read_state
                .read_persisted_log_store(
                    self.metrics.persistent_log_read_metrics.clone(),
                    initial_write_epoch.curr,
                    LogStoreReadStateStreamRangeStart::Unbounded,
                )
                .await?;

            let mut log_store_stream = tokio_stream::StreamExt::peekable(log_store_stream);
            let mut clean_state = log_store_stream.peek().await.is_none();
            tracing::trace!(?clean_state);

            let mut read_future_state = ReadFuture::ReadingPersistedStream(log_store_stream);

            let mut write_future_state =
                WriteFuture::receive_from_upstream(input, initial_write_state);

            let mut progress = LogStoreVnodeProgress::None;

            loop {
                let select_result = {
                    let read_future = async {
                        if pause_stream {
                            pending().await
                        } else {
                            read_future_state
                                .next_chunk(&mut progress, &read_state, &mut buffer, &self.metrics)
                                .await
                        }
                    };
                    pin_mut!(read_future);
                    let write_future = write_future_state.next_event(&self.metrics);
                    pin_mut!(write_future);
                    let output = select(write_future, read_future).await;
                    drop_either_future(output)
                };
                match select_result {
                    Either::Left(result) => {
                        drop(write_future_state);
                        let (stream, mut write_state, either) = result?;
                        match either {
                            WriteFutureEvent::UpstreamMessageReceived(msg) => {
                                match msg {
                                    Message::Barrier(barrier) => {
                                        if clean_state
                                            && barrier.kind.is_checkpoint()
                                            && !buffer.is_empty()
                                        {
                                            write_future_state = WriteFuture::paused(
                                                self.pause_duration_ms,
                                                barrier,
                                                stream,
                                                write_state,
                                            );
                                            clean_state = false;
                                            self.metrics.unclean_state.inc();
                                        } else {
                                            if let Some(mutation) = barrier.mutation.as_deref() {
                                                match mutation {
                                                    Mutation::Pause => {
                                                        pause_stream = true;
                                                    }
                                                    Mutation::Resume => {
                                                        pause_stream = false;
                                                    }
                                                    _ => {}
                                                }
                                            }
                                            let write_state_post_write_barrier =
                                                Self::write_barrier(
                                                    self.actor_context.id,
                                                    &mut write_state,
                                                    barrier.clone(),
                                                    &self.metrics,
                                                    progress.take(),
                                                    &mut buffer,
                                                )
                                                .await?;
                                            seq_id = FIRST_SEQ_ID;
                                            let update_vnode_bitmap = barrier
                                                .as_update_vnode_bitmap(self.actor_context.id);
                                            let barrier_epoch = barrier.epoch;
                                            tracing::trace!(
                                                ?update_vnode_bitmap,
                                                actor_id = %self.actor_context.id,
                                                "update vnode bitmap"
                                            );

                                            yield Message::Barrier(barrier);

                                            write_state_post_write_barrier
                                                .post_yield_barrier(update_vnode_bitmap.clone())
                                                .await?;
                                            if let Some(vnode_bitmap) = update_vnode_bitmap {
                                                read_state.update_vnode_bitmap(vnode_bitmap);
                                                initial_write_epoch = barrier_epoch;
                                                input = stream;
                                                initial_write_state = write_state;
                                                continue 'recreate_consume_stream;
                                            } else {
                                                write_future_state =
                                                    WriteFuture::receive_from_upstream(
                                                        stream,
                                                        write_state,
                                                    );
                                            }
                                        }
                                    }
                                    Message::Chunk(chunk) => {
                                        let start_seq_id = seq_id;
                                        let new_seq_id = seq_id + chunk.cardinality() as SeqId;
                                        let end_seq_id = new_seq_id - 1;
                                        let epoch = write_state.epoch().curr;
                                        tracing::trace!(
                                            start_seq_id,
                                            end_seq_id,
                                            new_seq_id,
                                            epoch,
                                            cardinality = chunk.cardinality(),
                                            "received chunk"
                                        );
                                        if let Some(chunk_to_flush) = buffer.add_or_flush_chunk(
                                            start_seq_id,
                                            end_seq_id,
                                            chunk,
                                            epoch,
                                        ) {
                                            seq_id = new_seq_id;
                                            write_future_state = WriteFuture::flush_chunk(
                                                stream,
                                                write_state,
                                                chunk_to_flush,
                                                epoch,
                                                start_seq_id,
                                                end_seq_id,
                                            );
                                        } else {
                                            seq_id = new_seq_id;
                                            write_future_state =
                                                WriteFuture::receive_from_upstream(
                                                    stream,
                                                    write_state,
                                                );
                                        }
                                    }
                                    Message::Watermark(_watermark) => {
                                        write_future_state =
                                            WriteFuture::receive_from_upstream(stream, write_state);
                                    }
                                }
                            }
                            WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                                start_seq_id,
                                end_seq_id,
                                epoch,
                                flush_info,
                                vnode_bitmap,
                            }) => {
                                buffer.add_flushed_item_to_buffer(
                                    start_seq_id,
                                    end_seq_id,
                                    vnode_bitmap,
                                    epoch,
                                );
                                self.metrics
                                    .storage_write_count
                                    .inc_by(flush_info.flush_count as _);
                                self.metrics
                                    .storage_write_size
                                    .inc_by(flush_info.flush_size as _);
                                write_future_state =
                                    WriteFuture::receive_from_upstream(stream, write_state);
                            }
                        }
                    }
                    Either::Right(result) => {
                        if !clean_state
                            && matches!(read_future_state, ReadFuture::Idle)
                            && buffer.is_empty()
                        {
                            clean_state = true;
                            self.metrics.clean_state.inc();

                            if let WriteFuture::Paused { sleep_future, .. } =
                                &mut write_future_state
                            {
                                tracing::trace!("resuming paused future");
                                assert!(buffer.current_size < self.max_buffer_size);
                                *sleep_future = None;
                            }
                        }
                        let chunk = result?;
                        self.metrics
                            .total_read_count
                            .inc_by(chunk.cardinality() as _);

                        yield Message::Chunk(chunk);
                    }
                }
            }
        }
    }
}

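/// Peekable stream over items already persisted in the log store; peeking lets
/// the executor detect an empty (clean) log on startup.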
type PersistedStream<S> = Peekable<Pin<Box<LogStoreItemMergeStream<TimeoutAutoRebuildIter<S>>>>>;

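/// State machine for the read side: drain the persisted log first, then serve
/// items from the in-memory buffer, reading back any chunk that was flushed to
/// storage on buffer overflow.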
enum ReadFuture<S: StateStoreRead> {
    ReadingPersistedStream(PersistedStream<S>),
    ReadingFlushedChunk {
        future: ReadFlushedChunkFuture,
        end_seq_id: SeqId,
    },
    Idle,
}

impl<S: StateStoreRead> ReadFuture<S> {
    async fn next_chunk(
        &mut self,
        progress: &mut LogStoreVnodeProgress,
        read_state: &LogStoreReadState<S>,
        buffer: &mut SyncedLogStoreBuffer,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<StreamChunk> {
        match self {
            ReadFuture::ReadingPersistedStream(stream) => {
                while let Some((epoch, item)) = stream.try_next().await? {
                    match item {
                        KvLogStoreItem::Barrier { vnodes, .. } => {
                            tracing::trace!(epoch, "read logstore barrier");
                            progress.apply_aligned(vnodes, epoch, None);
                            continue;
                        }
                        KvLogStoreItem::StreamChunk {
                            chunk,
                            progress: chunk_progress,
                        } => {
                            tracing::trace!("read logstore chunk of size: {}", chunk.cardinality());
                            progress.apply_per_vnode(epoch, chunk_progress);
                            return Ok(chunk);
                        }
                    }
                }
                *self = ReadFuture::Idle;
            }
            ReadFuture::ReadingFlushedChunk { .. } | ReadFuture::Idle => {}
        }
        match self {
            ReadFuture::ReadingPersistedStream(_) => {
                unreachable!("must have finished read persisted stream when reaching here")
            }
            ReadFuture::ReadingFlushedChunk { .. } => {}
            ReadFuture::Idle => loop {
                let Some((item_epoch, item)) = buffer.pop_front() else {
                    return pending().await;
                };
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        metrics.buffer_read_count.inc_by(chunk.cardinality() as _);
                        tracing::trace!(
                            start_seq_id,
                            end_seq_id,
                            flushed,
                            cardinality = chunk.cardinality(),
                            "read buffered chunk of size"
                        );
                        progress.apply_aligned(
                            read_state.vnodes().clone(),
                            item_epoch,
                            Some(end_seq_id),
                        );
                        return Ok(chunk);
                    }
                    LogStoreBufferItem::Flushed {
                        vnode_bitmap,
                        start_seq_id,
                        end_seq_id,
                        chunk_id,
                    } => {
                        tracing::trace!(start_seq_id, end_seq_id, chunk_id, "read flushed chunk");
                        let read_metrics = metrics.flushed_buffer_read_metrics.clone();
                        let future = read_state
                            .read_flushed_chunk(
                                vnode_bitmap,
                                chunk_id,
                                start_seq_id,
                                end_seq_id,
                                item_epoch,
                                read_metrics,
                            )
                            .boxed();
                        *self = ReadFuture::ReadingFlushedChunk { future, end_seq_id };
                        break;
                    }
                    LogStoreBufferItem::Barrier { .. } => {
                        tracing::trace!(item_epoch, "read buffer barrier");
                        progress.apply_aligned(read_state.vnodes().clone(), item_epoch, None);
                        continue;
                    }
                }
            },
        }

        let (future, end_seq_id) = match self {
            ReadFuture::ReadingPersistedStream(_) | ReadFuture::Idle => {
                unreachable!("should be at ReadingFlushedChunk")
            }
            ReadFuture::ReadingFlushedChunk { future, end_seq_id } => (future, *end_seq_id),
        };

        let (_, chunk, epoch) = future.await?;
        progress.apply_aligned(read_state.vnodes().clone(), epoch, Some(end_seq_id));
        tracing::trace!(
            end_seq_id,
            "read flushed chunk of size: {}",
            chunk.cardinality()
        );
        *self = ReadFuture::Idle;
        Ok(chunk)
    }
}

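/// Writes a barrier to the log store. On a checkpoint barrier, unflushed
/// chunks still sitting in the buffer are written out first and marked as
/// flushed in place, so the sealed epoch contains the complete data.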
impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    async fn write_barrier<'a>(
        actor_id: ActorId,
        write_state: &'a mut LogStoreWriteState<S::Local>,
        barrier: Barrier,
        metrics: &SyncedKvLogStoreMetrics,
        progress: LogStoreVnodeProgress,
        buffer: &mut SyncedLogStoreBuffer,
    ) -> StreamExecutorResult<LogStorePostSealCurrentEpoch<'a, S::Local>> {
        tracing::trace!(%actor_id, ?progress, "applying truncation");
        let epoch = barrier.epoch.prev;
        let mut writer = write_state.start_writer(false);
        writer.write_barrier(epoch, barrier.is_checkpoint())?;

        if barrier.is_checkpoint() {
            for (epoch, item) in buffer.buffer.iter_mut().rev() {
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        if !*flushed {
                            writer.write_chunk(chunk, *epoch, *start_seq_id, *end_seq_id)?;
                            *flushed = true;
                        } else {
                            break;
                        }
                    }
                    LogStoreBufferItem::Flushed { .. } | LogStoreBufferItem::Barrier { .. } => {}
                }
            }
        }

        let (flush_info, _) = writer.finish().await?;
        metrics
            .storage_write_count
            .inc_by(flush_info.flush_count as _);
        metrics
            .storage_write_size
            .inc_by(flush_info.flush_size as _);
        let post_seal = write_state.seal_current_epoch(barrier.epoch.curr, progress);

        buffer.buffer.push_back((
            epoch,
            LogStoreBufferItem::Barrier {
                is_checkpoint: barrier.is_checkpoint(),
                next_epoch: barrier.epoch.curr,
                add_columns: None,
                is_stop: false,
            },
        ));
        buffer.next_chunk_id = 0;
        buffer.update_unconsumed_buffer_metrics();

        Ok(post_seal)
    }
}

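/// In-memory buffer of unconsumed log store items. `current_size` counts rows
/// of buffered chunks against `max_size`; an incoming chunk that would exceed
/// the limit is handed back to the caller to be flushed to storage instead.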
struct SyncedLogStoreBuffer {
    buffer: VecDeque<(u64, LogStoreBufferItem)>,
    current_size: usize,
    max_size: usize,
    max_chunk_size: usize,
    next_chunk_id: ChunkId,
    metrics: SyncedKvLogStoreMetrics,
    flushed_count: usize,
}

impl SyncedLogStoreBuffer {
    fn is_empty(&self) -> bool {
        self.current_size == 0
    }

    fn add_or_flush_chunk(
        &mut self,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        chunk: StreamChunk,
        epoch: u64,
    ) -> Option<StreamChunk> {
        let current_size = self.current_size;
        let chunk_size = chunk.cardinality();

        tracing::trace!(
            current_size,
            chunk_size,
            max_size = self.max_size,
            "checking chunk size"
        );
        let should_flush_chunk = current_size + chunk_size > self.max_size;
        if should_flush_chunk {
            tracing::trace!(start_seq_id, end_seq_id, epoch, "flushing chunk");
            Some(chunk)
        } else {
            tracing::trace!(start_seq_id, end_seq_id, epoch, "buffering chunk");
            self.add_chunk_to_buffer(chunk, start_seq_id, end_seq_id, epoch);
            None
        }
    }

    fn add_flushed_item_to_buffer(
        &mut self,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        new_vnode_bitmap: Bitmap,
        epoch: u64,
    ) {
        let new_chunk_size = (end_seq_id - start_seq_id + 1) as usize;

        if let Some((
            item_epoch,
            LogStoreBufferItem::Flushed {
                start_seq_id: prev_start_seq_id,
                end_seq_id: prev_end_seq_id,
                vnode_bitmap,
                ..
            },
        )) = self.buffer.back_mut()
            && let flushed_chunk_size = (*prev_end_seq_id - *prev_start_seq_id + 1) as usize
            && let projected_flushed_chunk_size = flushed_chunk_size + new_chunk_size
            && projected_flushed_chunk_size <= self.max_chunk_size
        {
            assert!(
                *prev_end_seq_id < start_seq_id,
                "prev end_seq_id {} should be smaller than current start_seq_id {}",
                prev_end_seq_id,
                start_seq_id
            );
            assert_eq!(
                epoch, *item_epoch,
                "epoch of newly added flushed item must be the same as the last flushed item"
            );
            *prev_end_seq_id = end_seq_id;
            *vnode_bitmap |= new_vnode_bitmap;
        } else {
            let chunk_id = self.next_chunk_id;
            self.next_chunk_id += 1;
            self.buffer.push_back((
                epoch,
                LogStoreBufferItem::Flushed {
                    start_seq_id,
                    end_seq_id,
                    vnode_bitmap: new_vnode_bitmap,
                    chunk_id,
                },
            ));
            self.flushed_count += 1;
            tracing::trace!(
                "adding flushed item to buffer: start_seq_id: {start_seq_id}, end_seq_id: {end_seq_id}, chunk_id: {chunk_id}"
            );
        }
        self.update_unconsumed_buffer_metrics();
    }

    fn add_chunk_to_buffer(
        &mut self,
        chunk: StreamChunk,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        epoch: u64,
    ) {
        let chunk_id = self.next_chunk_id;
        self.next_chunk_id += 1;
        self.current_size += chunk.cardinality();
        self.buffer.push_back((
            epoch,
            LogStoreBufferItem::StreamChunk {
                chunk,
                start_seq_id,
                end_seq_id,
                flushed: false,
                chunk_id,
            },
        ));
        self.update_unconsumed_buffer_metrics();
    }

    fn pop_front(&mut self) -> Option<(u64, LogStoreBufferItem)> {
        let item = self.buffer.pop_front();
        match &item {
            Some((_, LogStoreBufferItem::Flushed { .. })) => {
                self.flushed_count -= 1;
            }
            Some((_, LogStoreBufferItem::StreamChunk { chunk, .. })) => {
                self.current_size -= chunk.cardinality();
            }
            _ => {}
        }
        self.update_unconsumed_buffer_metrics();
        item
    }

    fn update_unconsumed_buffer_metrics(&self) {
        let mut epoch_count = 0;
        let mut row_count = 0;
        for (_, item) in &self.buffer {
            match item {
                LogStoreBufferItem::StreamChunk { chunk, .. } => {
                    row_count += chunk.cardinality();
                }
                LogStoreBufferItem::Flushed {
                    start_seq_id,
                    end_seq_id,
                    ..
                } => {
                    // Seq id ranges are inclusive, hence the `+ 1`.
                    row_count += (end_seq_id - start_seq_id + 1) as usize;
                }
                LogStoreBufferItem::Barrier { .. } => {
                    epoch_count += 1;
                }
            }
        }
        self.metrics.buffer_unconsumed_epoch_count.set(epoch_count);
        self.metrics.buffer_unconsumed_row_count.set(row_count as _);
        self.metrics
            .buffer_unconsumed_item_count
            .set(self.buffer.len() as _);
        self.metrics.buffer_unconsumed_min_epoch.set(
            self.buffer
                .front()
                .map(|(epoch, _)| *epoch)
                .unwrap_or_default() as _,
        );
    }
}

impl<S> Execute for SyncedKvLogStoreExecutor<S>
where
    S: StateStore,
{
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_monitored().boxed()
    }
}

#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use pretty_assertions::assert_eq;
    use risingwave_common::catalog::Field;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::test_prelude::*;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::assert_stream_chunk_eq;
    use crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V2_INFO;
    use crate::common::log_store_impl::kv_log_store::test_utils::{
        check_stream_chunk_eq, gen_test_log_store_table, test_payload_schema,
    };
    use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
    use crate::executor::test_utils::MockSource;

    fn init_logger() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .with_ansi(false)
            .try_init();
    }

    /// Writes chunks into the log store and expects them to be replayed
    /// downstream, in order, from the in-memory buffer.
    #[tokio::test]
    async fn test_read_write_buffer() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
            .collect_vec();
        let schema = Schema { fields };
        let stream_key = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            " I T
            + 5 10
            + 6 10
            + 8 10
            + 9 10
            + 10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            " I T
            - 5 10
            - 6 10
            - 8 10
            U- 9 10
            U+ 10 11",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        tx.push_barrier(test_epoch(2), false);

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

    /// Pushes two chunks and a second barrier before polling the executor, and
    /// expects barriers and chunks to be replayed in their original order.
    #[tokio::test]
    async fn test_barrier_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
            .collect_vec();
        let schema = Schema { fields };
        let stream_key = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            " I T
            + 5 10
            + 6 10
            + 8 10
            + 9 10
            + 10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            " I T
            - 5 10
            - 6 10
            - 8 10
            U- 10 11
            U+ 10 10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

    /// With a zero-capacity buffer, every chunk is flushed to storage. After
    /// both barriers are consumed, replay merges the flushed chunks into a
    /// single chunk, bounded by the max chunk size.
    #[tokio::test]
    async fn test_max_chunk_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
            .collect_vec();
        let schema = Schema { fields };
        let stream_key = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            0,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            " I T
            + 5 10
            + 6 10
            + 8 10
            + 9 10
            + 10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            " I T
            - 5 10
            - 6 10
            - 8 10
            U- 10 11
            U+ 10 10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        for i in 1..=2 {
            match stream.next().await {
                Some(Ok(Message::Barrier(barrier))) => {
                    assert_eq!(barrier.epoch.curr, test_epoch(i));
                }
                other => panic!("Expected a barrier message, got {:?}", other),
            }
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(actual))) => {
                let expected = StreamChunk::from_pretty(
                    " I T
                    + 5 10
                    + 6 10
                    + 8 10
                    + 9 10
                    + 10 11
                    - 5 10
                    - 6 10
                    - 8 10
                    U- 10 11
                    U+ 10 10",
                );
                assert_stream_chunk_eq!(actual, expected);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }
    }
}
1546}