use std::collections::VecDeque;
use std::future::pending;
use std::mem::replace;
use std::pin::Pin;

use anyhow::anyhow;
use futures::future::{BoxFuture, Either, select};
use futures::stream::StreamFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::must_match;
use risingwave_connector::sink::log_store::{ChunkId, LogStoreResult};
use risingwave_storage::StateStore;
use risingwave_storage::store::{
    LocalStateStore, NewLocalOptions, OpConsistencyLevel, StateStoreRead,
};
use rw_futures_util::drop_either_future;
use tokio::time::{Duration, Instant, Sleep, sleep_until};
use tokio_stream::adapters::Peekable;

use crate::common::log_store_impl::kv_log_store::buffer::LogStoreBufferItem;
use crate::common::log_store_impl::kv_log_store::reader::LogStoreReadStateStreamRangeStart;
use crate::common::log_store_impl::kv_log_store::reader::timeout_auto_rebuild::TimeoutAutoRebuildIter;
use crate::common::log_store_impl::kv_log_store::serde::{
    KvLogStoreItem, LogStoreItemMergeStream, LogStoreRowSerde,
};
use crate::common::log_store_impl::kv_log_store::state::{
    LogStorePostSealCurrentEpoch, LogStoreReadState, LogStoreStateWriteChunkFuture,
    LogStoreWriteState, new_log_store_state,
};
use crate::common::log_store_impl::kv_log_store::{
    Epoch, FIRST_SEQ_ID, FlushInfo, LogStoreVnodeProgress, SeqId,
};
use crate::executor::prelude::*;
use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
use crate::executor::{
    Barrier, BoxedMessageStream, Message, StreamExecutorError, StreamExecutorResult,
};

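/// Metrics for [`SyncedKvLogStoreExecutor`].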
pub mod metrics {
    use risingwave_common::id::FragmentId;
    use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};

    use crate::common::log_store_impl::kv_log_store::KvLogStoreReadMetrics;
    use crate::executor::monitor::StreamingMetrics;
    use crate::task::ActorId;

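    /// Metrics for one synced kv log store instance, labelled by actor id,
    /// target, fragment id and name.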
    #[derive(Clone)]
    pub struct SyncedKvLogStoreMetrics {
        pub unclean_state: LabelGuardedIntCounter,
        pub clean_state: LabelGuardedIntCounter,
        pub wait_next_poll_ns: LabelGuardedIntCounter,

        pub storage_write_count: LabelGuardedIntCounter,
        pub storage_write_size: LabelGuardedIntCounter,
        pub pause_duration_ns: LabelGuardedIntCounter,

        pub buffer_unconsumed_item_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_row_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_epoch_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_min_epoch: LabelGuardedIntGauge,
        pub buffer_read_count: LabelGuardedIntCounter,
        pub buffer_read_size: LabelGuardedIntCounter,

        pub total_read_count: LabelGuardedIntCounter,
        pub total_read_size: LabelGuardedIntCounter,
        pub persistent_log_read_metrics: KvLogStoreReadMetrics,
        pub flushed_buffer_read_metrics: KvLogStoreReadMetrics,
    }

    impl SyncedKvLogStoreMetrics {
        pub(crate) fn new(
            metrics: &StreamingMetrics,
            actor_id: ActorId,
            fragment_id: FragmentId,
            name: &str,
            target: &'static str,
        ) -> Self {
            let actor_id_str = actor_id.to_string();
            let fragment_id_str = fragment_id.to_string();
            let labels = &[&actor_id_str, target, &fragment_id_str, name];

            let unclean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "dirty",
                &actor_id_str,
                target,
                &fragment_id_str,
                name,
            ]);
            let clean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "clean",
                &actor_id_str,
                target,
                &fragment_id_str,
                name,
            ]);
            let wait_next_poll_ns = metrics
                .sync_kv_log_store_wait_next_poll_ns
                .with_guarded_label_values(labels);

            let storage_write_size = metrics
                .sync_kv_log_store_storage_write_size
                .with_guarded_label_values(labels);
            let storage_write_count = metrics
                .sync_kv_log_store_storage_write_count
                .with_guarded_label_values(labels);
            let storage_pause_duration_ns = metrics
                .sync_kv_log_store_write_pause_duration_ns
                .with_guarded_label_values(labels);

            let buffer_unconsumed_item_count = metrics
                .sync_kv_log_store_buffer_unconsumed_item_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_row_count = metrics
                .sync_kv_log_store_buffer_unconsumed_row_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_epoch_count = metrics
                .sync_kv_log_store_buffer_unconsumed_epoch_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_min_epoch = metrics
                .sync_kv_log_store_buffer_unconsumed_min_epoch
                .with_guarded_label_values(labels);
            let buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    "buffer",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    "buffer",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let total_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    "total",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let total_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    "total",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            const READ_PERSISTENT_LOG: &str = "persistent_log";
            const READ_FLUSHED_BUFFER: &str = "flushed_buffer";

            let persistent_log_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let persistent_log_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let flushed_buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let flushed_buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            Self {
                unclean_state,
                clean_state,
                wait_next_poll_ns,
                storage_write_size,
                storage_write_count,
                pause_duration_ns: storage_pause_duration_ns,
                buffer_unconsumed_item_count,
                buffer_unconsumed_row_count,
                buffer_unconsumed_epoch_count,
                buffer_unconsumed_min_epoch,
                buffer_read_count,
                buffer_read_size,
                total_read_count,
                total_read_size,
                persistent_log_read_metrics: KvLogStoreReadMetrics {
                    storage_read_size: persistent_log_read_size,
                    storage_read_count: persistent_log_read_count,
                },
                flushed_buffer_read_metrics: KvLogStoreReadMetrics {
                    storage_read_count: flushed_buffer_read_count,
                    storage_read_size: flushed_buffer_read_size,
                },
            }
        }

        #[cfg(test)]
        pub(crate) fn for_test() -> Self {
            SyncedKvLogStoreMetrics {
                unclean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
                clean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
                wait_next_poll_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
                storage_write_count: LabelGuardedIntCounter::test_int_counter::<4>(),
                storage_write_size: LabelGuardedIntCounter::test_int_counter::<4>(),
                pause_duration_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
                buffer_unconsumed_item_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_row_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_epoch_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_min_epoch: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
                buffer_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
                total_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
                total_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
                persistent_log_read_metrics: KvLogStoreReadMetrics::for_test(),
                flushed_buffer_read_metrics: KvLogStoreReadMetrics::for_test(),
            }
        }
    }
}

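/// Future that reads a flushed chunk back from storage, resolving to its chunk
/// id, the chunk itself, and the epoch it belongs to.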
type ReadFlushedChunkFuture = BoxFuture<'static, LogStoreResult<(ChunkId, StreamChunk, Epoch)>>;

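/// An executor that persists upstream chunks into a kv log store and replays
/// them downstream. Reads and writes are driven concurrently, so the upstream
/// can keep making progress while the downstream catches up: chunks stay in an
/// in-memory buffer and are flushed to the state store when the buffer is full.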
pub struct SyncedKvLogStoreExecutor<S: StateStore> {
    actor_context: ActorContextRef,
    table_id: TableId,
    metrics: SyncedKvLogStoreMetrics,
    serde: LogStoreRowSerde,

    upstream: Executor,

    state_store: S,
    max_buffer_size: usize,

    chunk_size: usize,

    pause_duration_ms: Duration,

    aligned: bool,
}

impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn new(
        actor_context: ActorContextRef,
        table_id: TableId,
        metrics: SyncedKvLogStoreMetrics,
        serde: LogStoreRowSerde,
        state_store: S,
        buffer_size: usize,
        chunk_size: usize,
        upstream: Executor,
        pause_duration_ms: Duration,
        aligned: bool,
    ) -> Self {
        Self {
            actor_context,
            table_id,
            metrics,
            serde,
            state_store,
            upstream,
            max_buffer_size: buffer_size,
            chunk_size,
            pause_duration_ms,
            aligned,
        }
    }
}

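/// Result of flushing a chunk to the state store: the sequence id range and
/// epoch it covers, flush statistics, and the vnodes it was written to.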
struct FlushedChunkInfo {
    epoch: u64,
    start_seq_id: SeqId,
    end_seq_id: SeqId,
    flush_info: FlushInfo,
    vnode_bitmap: Bitmap,
}

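/// State machine for the write half of the executor. `Paused` throttles writes
/// for a bounded duration after a checkpoint barrier arrives in clean state;
/// `Empty` is a transient placeholder used while the inner state is moved out
/// with `mem::replace`.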
enum WriteFuture<S: LocalStateStore> {
    Paused {
        start_instant: Instant,
        sleep_future: Option<Pin<Box<Sleep>>>,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    },
    ReceiveFromUpstream {
        future: StreamFuture<BoxedMessageStream>,
        write_state: LogStoreWriteState<S>,
    },
    FlushingChunk {
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        future: Pin<Box<LogStoreStateWriteChunkFuture<S>>>,
        stream: BoxedMessageStream,
    },
    Empty,
}

enum WriteFutureEvent {
    UpstreamMessageReceived(Message),
    ChunkFlushed(FlushedChunkInfo),
}

impl<S: LocalStateStore> WriteFuture<S> {
    fn flush_chunk(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
        chunk: StreamChunk,
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
    ) -> Self {
        tracing::trace!(
            start_seq_id,
            end_seq_id,
            epoch,
            cardinality = chunk.cardinality(),
            "write_future: flushing chunk"
        );
        Self::FlushingChunk {
            epoch,
            start_seq_id,
            end_seq_id,
            future: Box::pin(write_state.into_write_chunk_future(
                chunk,
                epoch,
                start_seq_id,
                end_seq_id,
            )),
            stream,
        }
    }

    fn receive_from_upstream(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        Self::ReceiveFromUpstream {
            future: stream.into_future(),
            write_state,
        }
    }

    fn paused(
        duration: Duration,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        let now = Instant::now();
        tracing::trace!(?now, ?duration, "write_future_pause");
        Self::Paused {
            start_instant: now,
            sleep_future: Some(Box::pin(sleep_until(now + duration))),
            barrier,
            stream,
            write_state,
        }
    }

    async fn next_event(
        &mut self,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<(BoxedMessageStream, LogStoreWriteState<S>, WriteFutureEvent)> {
        match self {
            WriteFuture::Paused {
                start_instant,
                sleep_future,
                ..
            } => {
                if let Some(sleep_future) = sleep_future {
                    sleep_future.await;
                    metrics
                        .pause_duration_ns
                        .inc_by(start_instant.elapsed().as_nanos() as _);
                    tracing::trace!("resuming write future");
                }
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::Paused { stream, write_state, barrier, .. } => {
                    Ok((stream, write_state, WriteFutureEvent::UpstreamMessageReceived(Message::Barrier(barrier))))
                })
            }
            WriteFuture::ReceiveFromUpstream { future, .. } => {
                let (opt, stream) = future.await;
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::ReceiveFromUpstream { write_state, .. } => {
                    opt
                        .ok_or_else(|| anyhow!("end of upstream input").into())
                        .and_then(|result| result.map(|item| {
                            (stream, write_state, WriteFutureEvent::UpstreamMessageReceived(item))
                        }))
                })
            }
            WriteFuture::FlushingChunk { future, .. } => {
                let (write_state, result) = future.await;
                let result = must_match!(replace(self, WriteFuture::Empty), WriteFuture::FlushingChunk { epoch, start_seq_id, end_seq_id, stream, .. } => {
                    result.map(|(flush_info, vnode_bitmap)| {
                        (stream, write_state, WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                            epoch,
                            start_seq_id,
                            end_seq_id,
                            flush_info,
                            vnode_bitmap,
                        }))
                    })
                });
                result.map_err(Into::into)
            }
            WriteFuture::Empty => {
                unreachable!("should not be polled after ready")
            }
        }
    }
}

impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    #[try_stream(ok = Message, error = StreamExecutorError)]
    pub async fn execute_monitored(self) {
        let wait_next_poll_ns = self.metrics.wait_next_poll_ns.clone();
        #[for_await]
        for message in self.execute_inner() {
            let current_time = Instant::now();
            yield message?;
            wait_next_poll_ns.inc_by(current_time.elapsed().as_nanos() as _);
        }
    }

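    /// Main body of the executor: forwards the first barrier, initializes the
    /// log store state, then runs either the aligned fast path or the
    /// decoupled read/write select loop.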
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(self) {
        let mut input = self.upstream.execute();

        let first_barrier = expect_first_barrier(&mut input).await?;
        let first_write_epoch = first_barrier.epoch;
        yield Message::Barrier(first_barrier.clone());

        let local_state_store = self
            .state_store
            .new_local(NewLocalOptions {
                table_id: self.table_id,
                op_consistency_level: OpConsistencyLevel::Inconsistent,
                table_option: TableOption {
                    retention_seconds: None,
                },
                is_replicated: false,
                vnodes: self.serde.vnodes().clone(),
                upload_on_flush: false,
            })
            .await;

        let (mut read_state, mut initial_write_state) = new_log_store_state(
            self.table_id,
            local_state_store,
            self.serde,
            self.chunk_size,
        );
        initial_write_state.init(first_write_epoch).await?;

        let mut pause_stream = first_barrier.is_pause_on_startup();
        let mut initial_write_epoch = first_write_epoch;

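        // Aligned mode: drain whatever is left in the persisted log store, then
        // forward upstream messages directly, truncating the log on every barrier.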
        if self.aligned {
            tracing::info!("aligned mode");
            let log_store_stream = read_state
                .read_persisted_log_store(
                    self.metrics.persistent_log_read_metrics.clone(),
                    initial_write_epoch.curr,
                    LogStoreReadStateStreamRangeStart::Unbounded,
                )
                .await?;

            #[for_await]
            for message in log_store_stream {
                let (_epoch, message) = message?;
                match message {
                    KvLogStoreItem::Barrier { .. } => {
                        continue;
                    }
                    KvLogStoreItem::StreamChunk { chunk, .. } => {
                        yield Message::Chunk(chunk);
                    }
                }
            }

            let mut realigned_logstore = false;

            #[for_await]
            for message in input {
                match message? {
                    Message::Barrier(barrier) => {
                        let is_checkpoint = barrier.is_checkpoint();
                        let mut progress = LogStoreVnodeProgress::None;
                        progress.apply_aligned(
                            read_state.vnodes().clone(),
                            barrier.epoch.prev,
                            None,
                        );
                        let post_seal = initial_write_state
                            .seal_current_epoch(barrier.epoch.curr, progress.take());
                        let update_vnode_bitmap =
                            barrier.as_update_vnode_bitmap(self.actor_context.id);
                        yield Message::Barrier(barrier);
                        post_seal.post_yield_barrier(update_vnode_bitmap).await?;
                        if !realigned_logstore && is_checkpoint {
                            realigned_logstore = true;
                            tracing::info!("realigned logstore");
                        }
                    }
                    Message::Chunk(chunk) => {
                        yield Message::Chunk(chunk);
                    }
                    Message::Watermark(watermark) => {
                        yield Message::Watermark(watermark);
                    }
                }
            }

            return Ok(());
        }

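        // The consume stream is recreated whenever the vnode bitmap changes
        // (see `continue 'recreate_consume_stream` below), starting over with a
        // fresh buffer and a new persisted log store read stream.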
        'recreate_consume_stream: loop {
            let mut seq_id = FIRST_SEQ_ID;
            let mut buffer = SyncedLogStoreBuffer {
                buffer: VecDeque::new(),
                current_size: 0,
                max_size: self.max_buffer_size,
                max_chunk_size: self.chunk_size,
                next_chunk_id: 0,
                metrics: self.metrics.clone(),
                flushed_count: 0,
            };

            let log_store_stream = read_state
                .read_persisted_log_store(
                    self.metrics.persistent_log_read_metrics.clone(),
                    initial_write_epoch.curr,
                    LogStoreReadStateStreamRangeStart::Unbounded,
                )
                .await?;

            let mut log_store_stream = tokio_stream::StreamExt::peekable(log_store_stream);
            let mut clean_state = log_store_stream.peek().await.is_none();
            tracing::trace!(?clean_state);

            let mut read_future_state = ReadFuture::ReadingPersistedStream(log_store_stream);

            let mut write_future_state =
                WriteFuture::receive_from_upstream(input, initial_write_state);

            let mut progress = LogStoreVnodeProgress::None;

            loop {
                let select_result = {
                    let read_future = async {
                        if pause_stream {
                            pending().await
                        } else {
                            read_future_state
                                .next_chunk(&mut progress, &read_state, &mut buffer, &self.metrics)
                                .await
                        }
                    };
                    pin_mut!(read_future);
                    let write_future = write_future_state.next_event(&self.metrics);
                    pin_mut!(write_future);
                    let output = select(write_future, read_future).await;
                    drop_either_future(output)
                };
                match select_result {
                    Either::Left(result) => {
                        drop(write_future_state);
                        let (stream, mut write_state, either) = result?;
                        match either {
                            WriteFutureEvent::UpstreamMessageReceived(msg) => {
                                match msg {
                                    Message::Barrier(barrier) => {
                                        if clean_state
                                            && barrier.kind.is_checkpoint()
                                            && !buffer.is_empty()
                                        {
                                            write_future_state = WriteFuture::paused(
                                                self.pause_duration_ms,
                                                barrier,
                                                stream,
                                                write_state,
                                            );
                                            clean_state = false;
                                            self.metrics.unclean_state.inc();
                                        } else {
                                            if let Some(mutation) = barrier.mutation.as_deref() {
                                                match mutation {
                                                    Mutation::Pause => {
                                                        pause_stream = true;
                                                    }
                                                    Mutation::Resume => {
                                                        pause_stream = false;
                                                    }
                                                    _ => {}
                                                }
                                            }
                                            let write_state_post_write_barrier =
                                                Self::write_barrier(
                                                    self.actor_context.id,
                                                    &mut write_state,
                                                    barrier.clone(),
                                                    &self.metrics,
                                                    progress.take(),
                                                    &mut buffer,
                                                )
                                                .await?;
                                            seq_id = FIRST_SEQ_ID;
                                            let update_vnode_bitmap = barrier
                                                .as_update_vnode_bitmap(self.actor_context.id);
                                            let barrier_epoch = barrier.epoch;
                                            tracing::trace!(
                                                ?update_vnode_bitmap,
                                                actor_id = %self.actor_context.id,
                                                "update vnode bitmap"
                                            );

                                            yield Message::Barrier(barrier);

                                            write_state_post_write_barrier
                                                .post_yield_barrier(update_vnode_bitmap.clone())
                                                .await?;
                                            if let Some(vnode_bitmap) = update_vnode_bitmap {
                                                read_state.update_vnode_bitmap(vnode_bitmap);
                                                initial_write_epoch = barrier_epoch;
                                                input = stream;
                                                initial_write_state = write_state;
                                                continue 'recreate_consume_stream;
                                            } else {
                                                write_future_state =
                                                    WriteFuture::receive_from_upstream(
                                                        stream,
                                                        write_state,
                                                    );
                                            }
                                        }
                                    }
                                    Message::Chunk(chunk) => {
                                        let start_seq_id = seq_id;
                                        let new_seq_id = seq_id + chunk.cardinality() as SeqId;
                                        let end_seq_id = new_seq_id - 1;
                                        let epoch = write_state.epoch().curr;
                                        tracing::trace!(
                                            start_seq_id,
                                            end_seq_id,
                                            new_seq_id,
                                            epoch,
                                            cardinality = chunk.cardinality(),
                                            "received chunk"
                                        );
                                        if let Some(chunk_to_flush) = buffer.add_or_flush_chunk(
                                            start_seq_id,
                                            end_seq_id,
                                            chunk,
                                            epoch,
                                        ) {
                                            seq_id = new_seq_id;
                                            write_future_state = WriteFuture::flush_chunk(
                                                stream,
                                                write_state,
                                                chunk_to_flush,
                                                epoch,
                                                start_seq_id,
                                                end_seq_id,
                                            );
                                        } else {
                                            seq_id = new_seq_id;
                                            write_future_state =
                                                WriteFuture::receive_from_upstream(
                                                    stream,
                                                    write_state,
                                                );
                                        }
                                    }
                                    Message::Watermark(_watermark) => {
                                        write_future_state =
                                            WriteFuture::receive_from_upstream(stream, write_state);
                                    }
                                }
                            }
                            WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                                start_seq_id,
                                end_seq_id,
                                epoch,
                                flush_info,
                                vnode_bitmap,
                            }) => {
                                buffer.add_flushed_item_to_buffer(
                                    start_seq_id,
                                    end_seq_id,
                                    vnode_bitmap,
                                    epoch,
                                );
                                self.metrics
                                    .storage_write_count
                                    .inc_by(flush_info.flush_count as _);
                                self.metrics
                                    .storage_write_size
                                    .inc_by(flush_info.flush_size as _);
                                write_future_state =
                                    WriteFuture::receive_from_upstream(stream, write_state);
                            }
                        }
                    }
                    Either::Right(result) => {
                        if !clean_state
                            && matches!(read_future_state, ReadFuture::Idle)
                            && buffer.is_empty()
                        {
                            clean_state = true;
                            self.metrics.clean_state.inc();

                            if let WriteFuture::Paused { sleep_future, .. } =
                                &mut write_future_state
                            {
                                tracing::trace!("resuming paused future");
                                assert!(buffer.current_size < self.max_buffer_size);
                                *sleep_future = None;
                            }
                        }
                        let chunk = result?;
                        self.metrics
                            .total_read_count
                            .inc_by(chunk.cardinality() as _);

                        yield Message::Chunk(chunk);
                    }
                }
            }
        }
    }
}

type PersistedStream<S> = Peekable<Pin<Box<LogStoreItemMergeStream<TimeoutAutoRebuildIter<S>>>>>;

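/// State of the log store reader: first drain the persisted stream, then serve
/// items from the in-memory buffer, reading flushed chunks back from storage
/// when needed.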
enum ReadFuture<S: StateStoreRead> {
    ReadingPersistedStream(PersistedStream<S>),
    ReadingFlushedChunk {
        future: ReadFlushedChunkFuture,
        end_seq_id: SeqId,
    },
    Idle,
}

impl<S: StateStoreRead> ReadFuture<S> {
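    /// Returns the next chunk to emit downstream, advancing `progress` with the
    /// offsets that may be truncated. Items are served from the persisted stream
    /// first, then from the in-memory buffer; flushed buffer items are read back
    /// from storage. Pends forever when there is nothing to read, so it can be
    /// raced against the write future.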
    async fn next_chunk(
        &mut self,
        progress: &mut LogStoreVnodeProgress,
        read_state: &LogStoreReadState<S>,
        buffer: &mut SyncedLogStoreBuffer,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<StreamChunk> {
        match self {
            ReadFuture::ReadingPersistedStream(stream) => {
                while let Some((epoch, item)) = stream.try_next().await? {
                    match item {
                        KvLogStoreItem::Barrier { vnodes, .. } => {
                            tracing::trace!(epoch, "read logstore barrier");
                            progress.apply_aligned(vnodes, epoch, None);
                            continue;
                        }
                        KvLogStoreItem::StreamChunk {
                            chunk,
                            progress: chunk_progress,
                        } => {
                            tracing::trace!("read logstore chunk of size: {}", chunk.cardinality());
                            progress.apply_per_vnode(epoch, chunk_progress);
                            return Ok(chunk);
                        }
                    }
                }
                *self = ReadFuture::Idle;
            }
            ReadFuture::ReadingFlushedChunk { .. } | ReadFuture::Idle => {}
        }
        match self {
            ReadFuture::ReadingPersistedStream(_) => {
                unreachable!("must have finished read persisted stream when reaching here")
            }
            ReadFuture::ReadingFlushedChunk { .. } => {}
            ReadFuture::Idle => loop {
                let Some((item_epoch, item)) = buffer.pop_front() else {
                    return pending().await;
                };
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        metrics.buffer_read_count.inc_by(chunk.cardinality() as _);
                        tracing::trace!(
                            start_seq_id,
                            end_seq_id,
                            flushed,
                            cardinality = chunk.cardinality(),
                            "read buffered chunk"
                        );
                        progress.apply_aligned(
                            read_state.vnodes().clone(),
                            item_epoch,
                            Some(end_seq_id),
                        );
                        return Ok(chunk);
                    }
                    LogStoreBufferItem::Flushed {
                        vnode_bitmap,
                        start_seq_id,
                        end_seq_id,
                        chunk_id,
                    } => {
                        tracing::trace!(start_seq_id, end_seq_id, chunk_id, "read flushed chunk");
                        let read_metrics = metrics.flushed_buffer_read_metrics.clone();
                        let future = read_state
                            .read_flushed_chunk(
                                vnode_bitmap,
                                chunk_id,
                                start_seq_id,
                                end_seq_id,
                                item_epoch,
                                read_metrics,
                            )
                            .boxed();
                        *self = ReadFuture::ReadingFlushedChunk { future, end_seq_id };
                        break;
                    }
                    LogStoreBufferItem::Barrier { .. } => {
                        tracing::trace!(item_epoch, "read buffer barrier");
                        progress.apply_aligned(read_state.vnodes().clone(), item_epoch, None);
                        continue;
                    }
                }
            },
        }

        let (future, end_seq_id) = match self {
            ReadFuture::ReadingPersistedStream(_) | ReadFuture::Idle => {
                unreachable!("should be at ReadingFlushedChunk")
            }
            ReadFuture::ReadingFlushedChunk { future, end_seq_id } => (future, *end_seq_id),
        };

        let (_, chunk, epoch) = future.await?;
        progress.apply_aligned(read_state.vnodes().clone(), epoch, Some(end_seq_id));
        tracing::trace!(
            end_seq_id,
            "read flushed chunk of size: {}",
            chunk.cardinality()
        );
        *self = ReadFuture::Idle;
        Ok(chunk)
    }
}

impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
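    /// Writes a barrier to the log store and seals the current epoch. On
    /// checkpoint barriers, any unflushed chunks remaining in the buffer are
    /// flushed first so the persisted log covers the whole epoch.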
    async fn write_barrier<'a>(
        actor_id: ActorId,
        write_state: &'a mut LogStoreWriteState<S::Local>,
        barrier: Barrier,
        metrics: &SyncedKvLogStoreMetrics,
        progress: LogStoreVnodeProgress,
        buffer: &mut SyncedLogStoreBuffer,
    ) -> StreamExecutorResult<LogStorePostSealCurrentEpoch<'a, S::Local>> {
        tracing::trace!(%actor_id, ?progress, "applying truncation");
        let epoch = barrier.epoch.prev;
        let mut writer = write_state.start_writer(false);
        writer.write_barrier(epoch, barrier.is_checkpoint())?;

        if barrier.is_checkpoint() {
            for (epoch, item) in buffer.buffer.iter_mut().rev() {
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        if !*flushed {
                            writer.write_chunk(chunk, *epoch, *start_seq_id, *end_seq_id)?;
                            *flushed = true;
                        } else {
                            break;
                        }
                    }
                    LogStoreBufferItem::Flushed { .. } | LogStoreBufferItem::Barrier { .. } => {}
                }
            }
        }

        let (flush_info, _) = writer.finish().await?;
        metrics
            .storage_write_count
            .inc_by(flush_info.flush_count as _);
        metrics
            .storage_write_size
            .inc_by(flush_info.flush_size as _);
        let post_seal = write_state.seal_current_epoch(barrier.epoch.curr, progress);

        buffer.buffer.push_back((
            epoch,
            LogStoreBufferItem::Barrier {
                is_checkpoint: barrier.is_checkpoint(),
                next_epoch: barrier.epoch.curr,
                add_columns: None,
            },
        ));
        buffer.next_chunk_id = 0;
        buffer.update_unconsumed_buffer_metrics();

        Ok(post_seal)
    }
}

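/// In-memory buffer between the write and read halves of the log store.
/// `current_size` tracks rows buffered in memory against `max_size`, while
/// `flushed_count` counts `Flushed` placeholders whose payload lives in storage.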
struct SyncedLogStoreBuffer {
    buffer: VecDeque<(u64, LogStoreBufferItem)>,
    current_size: usize,
    max_size: usize,
    max_chunk_size: usize,
    next_chunk_id: ChunkId,
    metrics: SyncedKvLogStoreMetrics,
    flushed_count: usize,
}

impl SyncedLogStoreBuffer {
    fn is_empty(&self) -> bool {
        self.current_size == 0
    }

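    /// Tries to buffer `chunk`. If adding it would exceed `max_size`, returns
    /// the chunk back to the caller, which is then expected to flush it to
    /// storage and record it via [`Self::add_flushed_item_to_buffer`].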
    fn add_or_flush_chunk(
        &mut self,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        chunk: StreamChunk,
        epoch: u64,
    ) -> Option<StreamChunk> {
        let current_size = self.current_size;
        let chunk_size = chunk.cardinality();

        tracing::trace!(
            current_size,
            chunk_size,
            max_size = self.max_size,
            "checking chunk size"
        );
        let should_flush_chunk = current_size + chunk_size > self.max_size;
        if should_flush_chunk {
            tracing::trace!(start_seq_id, end_seq_id, epoch, "flushing chunk");
            Some(chunk)
        } else {
            tracing::trace!(start_seq_id, end_seq_id, epoch, "buffering chunk");
            self.add_chunk_to_buffer(chunk, start_seq_id, end_seq_id, epoch);
            None
        }
    }

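    /// Records a chunk that has been flushed to storage. Consecutive flushed
    /// items of the same epoch are merged, as long as the merged sequence id
    /// range stays within `max_chunk_size`.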
    fn add_flushed_item_to_buffer(
        &mut self,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        new_vnode_bitmap: Bitmap,
        epoch: u64,
    ) {
        let new_chunk_size = (end_seq_id - start_seq_id + 1) as usize;

        if let Some((
            item_epoch,
            LogStoreBufferItem::Flushed {
                start_seq_id: prev_start_seq_id,
                end_seq_id: prev_end_seq_id,
                vnode_bitmap,
                ..
            },
        )) = self.buffer.back_mut()
            && let flushed_chunk_size = (*prev_end_seq_id - *prev_start_seq_id + 1) as usize
            && let projected_flushed_chunk_size = flushed_chunk_size + new_chunk_size
            && projected_flushed_chunk_size <= self.max_chunk_size
        {
            assert!(
                *prev_end_seq_id < start_seq_id,
                "prev end_seq_id {} should be smaller than current start_seq_id {}",
                prev_end_seq_id,
                start_seq_id
            );
            assert_eq!(
                epoch, *item_epoch,
                "epoch of newly added flushed item must be the same as the last flushed item"
            );
            *prev_end_seq_id = end_seq_id;
            *vnode_bitmap |= new_vnode_bitmap;
        } else {
            let chunk_id = self.next_chunk_id;
            self.next_chunk_id += 1;
            self.buffer.push_back((
                epoch,
                LogStoreBufferItem::Flushed {
                    start_seq_id,
                    end_seq_id,
                    vnode_bitmap: new_vnode_bitmap,
                    chunk_id,
                },
            ));
            self.flushed_count += 1;
            tracing::trace!(
                "adding flushed item to buffer: start_seq_id: {start_seq_id}, end_seq_id: {end_seq_id}, chunk_id: {chunk_id}"
            );
        }
        self.update_unconsumed_buffer_metrics();
    }

    fn add_chunk_to_buffer(
        &mut self,
        chunk: StreamChunk,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        epoch: u64,
    ) {
        let chunk_id = self.next_chunk_id;
        self.next_chunk_id += 1;
        self.current_size += chunk.cardinality();
        self.buffer.push_back((
            epoch,
            LogStoreBufferItem::StreamChunk {
                chunk,
                start_seq_id,
                end_seq_id,
                flushed: false,
                chunk_id,
            },
        ));
        self.update_unconsumed_buffer_metrics();
    }

    fn pop_front(&mut self) -> Option<(u64, LogStoreBufferItem)> {
        let item = self.buffer.pop_front();
        match &item {
            Some((_, LogStoreBufferItem::Flushed { .. })) => {
                self.flushed_count -= 1;
            }
            Some((_, LogStoreBufferItem::StreamChunk { chunk, .. })) => {
                self.current_size -= chunk.cardinality();
            }
            _ => {}
        }
        self.update_unconsumed_buffer_metrics();
        item
    }

    fn update_unconsumed_buffer_metrics(&self) {
        let mut epoch_count = 0;
        let mut row_count = 0;
        for (_, item) in &self.buffer {
            match item {
                LogStoreBufferItem::StreamChunk { chunk, .. } => {
                    row_count += chunk.cardinality();
                }
                LogStoreBufferItem::Flushed {
                    start_seq_id,
                    end_seq_id,
                    ..
                } => {
                    // The sequence id range is inclusive on both ends.
                    row_count += (end_seq_id - start_seq_id + 1) as usize;
                }
                LogStoreBufferItem::Barrier { .. } => {
                    epoch_count += 1;
                }
            }
        }
        self.metrics.buffer_unconsumed_epoch_count.set(epoch_count);
        self.metrics.buffer_unconsumed_row_count.set(row_count as _);
        self.metrics
            .buffer_unconsumed_item_count
            .set(self.buffer.len() as _);
        self.metrics.buffer_unconsumed_min_epoch.set(
            self.buffer
                .front()
                .map(|(epoch, _)| *epoch)
                .unwrap_or_default() as _,
        );
    }
}

impl<S> Execute for SyncedKvLogStoreExecutor<S>
where
    S: StateStore,
{
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_monitored().boxed()
    }
}

#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use pretty_assertions::assert_eq;
    use risingwave_common::catalog::Field;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::test_prelude::*;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::assert_stream_chunk_eq;
    use crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V2_INFO;
    use crate::common::log_store_impl::kv_log_store::test_utils::{
        check_stream_chunk_eq, gen_test_log_store_table, test_payload_schema,
    };
    use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
    use crate::executor::test_utils::MockSource;

    fn init_logger() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .with_ansi(false)
            .try_init();
    }

    #[tokio::test]
    async fn test_read_write_buffer() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
            .collect_vec();
        let schema = Schema { fields };
        let stream_key = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            " I T
            + 5 10
            + 6 10
            + 8 10
            + 9 10
            + 10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            " I T
            - 5 10
            - 6 10
            - 8 10
            U- 9 10
            U+ 10 11",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        tx.push_barrier(test_epoch(2), false);

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_barrier_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
            .collect_vec();
        let schema = Schema { fields };
        let stream_key = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            " I T
            + 5 10
            + 6 10
            + 8 10
            + 9 10
            + 10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            " I T
            - 5 10
            - 6 10
            - 8 10
            U- 10 11
            U+ 10 10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_max_chunk_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
            .collect_vec();
        let schema = Schema { fields };
        let stream_key = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            0,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            " I T
            + 5 10
            + 6 10
            + 8 10
            + 9 10
            + 10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            " I T
            - 5 10
            - 6 10
            - 8 10
            U- 10 11
            U+ 10 10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        for i in 1..=2 {
            match stream.next().await {
                Some(Ok(Message::Barrier(barrier))) => {
                    assert_eq!(barrier.epoch.curr, test_epoch(i));
                }
                other => panic!("Expected a barrier message, got {:?}", other),
            }
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(actual))) => {
                let expected = StreamChunk::from_pretty(
                    " I T
                    + 5 10
                    + 6 10
                    + 8 10
                    + 9 10
                    + 10 11
                    - 5 10
                    - 6 10
                    - 8 10
                    U- 10 11
                    U+ 10 10",
                );
                assert_stream_chunk_eq!(actual, expected);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }
    }
}