use std::collections::VecDeque;
use std::future::pending;
use std::mem::replace;
use std::pin::Pin;

use anyhow::anyhow;
use futures::future::{BoxFuture, Either, select};
use futures::stream::StreamFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::must_match;
use risingwave_connector::sink::log_store::{ChunkId, LogStoreResult};
use risingwave_storage::StateStore;
use risingwave_storage::store::{
    LocalStateStore, NewLocalOptions, OpConsistencyLevel, StateStoreRead,
};
use rw_futures_util::drop_either_future;
use tokio::time::{Duration, Instant, Sleep, sleep_until};
use tokio_stream::adapters::Peekable;

use crate::common::log_store_impl::kv_log_store::buffer::LogStoreBufferItem;
use crate::common::log_store_impl::kv_log_store::reader::LogStoreReadStateStreamRangeStart;
use crate::common::log_store_impl::kv_log_store::reader::timeout_auto_rebuild::TimeoutAutoRebuildIter;
use crate::common::log_store_impl::kv_log_store::serde::{
    KvLogStoreItem, LogStoreItemMergeStream, LogStoreRowSerde,
};
use crate::common::log_store_impl::kv_log_store::state::{
    LogStorePostSealCurrentEpoch, LogStoreReadState, LogStoreStateWriteChunkFuture,
    LogStoreWriteState, new_log_store_state,
};
use crate::common::log_store_impl::kv_log_store::{
    Epoch, FIRST_SEQ_ID, FlushInfo, LogStoreVnodeProgress, SeqId,
};
use crate::executor::prelude::*;
use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
use crate::executor::{
    Barrier, BoxedMessageStream, Message, StreamExecutorError, StreamExecutorResult,
};

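/// Metrics for [`SyncedKvLogStoreExecutor`], labelled by actor id, target, fragment id
/// and executor name.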
pub mod metrics {
    use risingwave_common::id::FragmentId;
    use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};

    use crate::common::log_store_impl::kv_log_store::KvLogStoreReadMetrics;
    use crate::executor::monitor::StreamingMetrics;
    use crate::task::ActorId;

    #[derive(Clone)]
    pub struct SyncedKvLogStoreMetrics {
        pub unclean_state: LabelGuardedIntCounter,
        pub clean_state: LabelGuardedIntCounter,
        pub wait_next_poll_ns: LabelGuardedIntCounter,

        pub storage_write_count: LabelGuardedIntCounter,
        pub storage_write_size: LabelGuardedIntCounter,
        pub pause_duration_ns: LabelGuardedIntCounter,

        pub buffer_unconsumed_item_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_row_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_epoch_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_min_epoch: LabelGuardedIntGauge,
        pub buffer_read_count: LabelGuardedIntCounter,
        pub buffer_read_size: LabelGuardedIntCounter,

        pub total_read_count: LabelGuardedIntCounter,
        pub total_read_size: LabelGuardedIntCounter,
        pub persistent_log_read_metrics: KvLogStoreReadMetrics,
        pub flushed_buffer_read_metrics: KvLogStoreReadMetrics,
    }

    impl SyncedKvLogStoreMetrics {
        pub(crate) fn new(
            metrics: &StreamingMetrics,
            actor_id: ActorId,
            fragment_id: FragmentId,
            name: &str,
            target: &'static str,
        ) -> Self {
            let actor_id_str = actor_id.to_string();
            let fragment_id_str = fragment_id.to_string();
            let labels = &[&actor_id_str, target, &fragment_id_str, name];

            let unclean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "dirty",
                &actor_id_str,
                target,
                &fragment_id_str,
                name,
            ]);
            let clean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "clean",
                &actor_id_str,
                target,
                &fragment_id_str,
                name,
            ]);
            let wait_next_poll_ns = metrics
                .sync_kv_log_store_wait_next_poll_ns
                .with_guarded_label_values(labels);

            let storage_write_size = metrics
                .sync_kv_log_store_storage_write_size
                .with_guarded_label_values(labels);
            let storage_write_count = metrics
                .sync_kv_log_store_storage_write_count
                .with_guarded_label_values(labels);
            let storage_pause_duration_ns = metrics
                .sync_kv_log_store_write_pause_duration_ns
                .with_guarded_label_values(labels);

            let buffer_unconsumed_item_count = metrics
                .sync_kv_log_store_buffer_unconsumed_item_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_row_count = metrics
                .sync_kv_log_store_buffer_unconsumed_row_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_epoch_count = metrics
                .sync_kv_log_store_buffer_unconsumed_epoch_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_min_epoch = metrics
                .sync_kv_log_store_buffer_unconsumed_min_epoch
                .with_guarded_label_values(labels);
            let buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    "buffer",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    "buffer",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let total_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    "total",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let total_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    "total",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            const READ_PERSISTENT_LOG: &str = "persistent_log";
            const READ_FLUSHED_BUFFER: &str = "flushed_buffer";

            let persistent_log_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let persistent_log_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let flushed_buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let flushed_buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            Self {
                unclean_state,
                clean_state,
                wait_next_poll_ns,
                storage_write_size,
                storage_write_count,
                pause_duration_ns: storage_pause_duration_ns,
                buffer_unconsumed_item_count,
                buffer_unconsumed_row_count,
                buffer_unconsumed_epoch_count,
                buffer_unconsumed_min_epoch,
                buffer_read_count,
                buffer_read_size,
                total_read_count,
                total_read_size,
                persistent_log_read_metrics: KvLogStoreReadMetrics {
                    storage_read_size: persistent_log_read_size,
                    storage_read_count: persistent_log_read_count,
                },
                flushed_buffer_read_metrics: KvLogStoreReadMetrics {
                    storage_read_count: flushed_buffer_read_count,
                    storage_read_size: flushed_buffer_read_size,
                },
            }
        }

        #[cfg(test)]
        pub(crate) fn for_test() -> Self {
            SyncedKvLogStoreMetrics {
                unclean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
                clean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
                wait_next_poll_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
                storage_write_count: LabelGuardedIntCounter::test_int_counter::<4>(),
                storage_write_size: LabelGuardedIntCounter::test_int_counter::<4>(),
                pause_duration_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
                buffer_unconsumed_item_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_row_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_epoch_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_min_epoch: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
                buffer_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
                total_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
                total_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
                persistent_log_read_metrics: KvLogStoreReadMetrics::for_test(),
                flushed_buffer_read_metrics: KvLogStoreReadMetrics::for_test(),
            }
        }
    }
}

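/// Future resolving to a flushed chunk read back from the state store, along with its
/// chunk id and the epoch it was written in.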
type ReadFlushedChunkFuture = BoxFuture<'static, LogStoreResult<(ChunkId, StreamChunk, Epoch)>>;

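/// Executor that writes upstream messages into a KV log store while replaying them
/// back out downstream, so that the write side and the read side can make progress
/// independently of each other.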
pub struct SyncedKvLogStoreExecutor<S: StateStore> {
    actor_context: ActorContextRef,
    table_id: TableId,
    metrics: SyncedKvLogStoreMetrics,
    serde: LogStoreRowSerde,

    upstream: Executor,

    state_store: S,
    max_buffer_size: usize,

    chunk_size: usize,

    pause_duration_ms: Duration,

    aligned: bool,
}

impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn new(
        actor_context: ActorContextRef,
        table_id: TableId,
        metrics: SyncedKvLogStoreMetrics,
        serde: LogStoreRowSerde,
        state_store: S,
        buffer_size: usize,
        chunk_size: usize,
        upstream: Executor,
        pause_duration_ms: Duration,
        aligned: bool,
    ) -> Self {
        Self {
            actor_context,
            table_id,
            metrics,
            serde,
            state_store,
            upstream,
            max_buffer_size: buffer_size,
            chunk_size,
            pause_duration_ms,
            aligned,
        }
    }
}

struct FlushedChunkInfo {
    epoch: u64,
    start_seq_id: SeqId,
    end_seq_id: SeqId,
    flush_info: FlushInfo,
    vnode_bitmap: Bitmap,
}

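/// State machine driving the write side: pausing, receiving from upstream, or
/// flushing a chunk to the state store.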
enum WriteFuture<S: LocalStateStore> {
    /// Writing is paused for up to the configured pause duration after a checkpoint
    /// barrier is received in clean state with a non-empty buffer, giving the read
    /// side time to catch up. The sleep is cancelled early once the state becomes
    /// clean again.
    Paused {
        start_instant: Instant,
        sleep_future: Option<Pin<Box<Sleep>>>,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    },
    ReceiveFromUpstream {
        future: StreamFuture<BoxedMessageStream>,
        write_state: LogStoreWriteState<S>,
    },
    FlushingChunk {
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        future: Pin<Box<LogStoreStateWriteChunkFuture<S>>>,
        stream: BoxedMessageStream,
    },
    Empty,
}

enum WriteFutureEvent {
    UpstreamMessageReceived(Message),
    ChunkFlushed(FlushedChunkInfo),
}

impl<S: LocalStateStore> WriteFuture<S> {
    fn flush_chunk(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
        chunk: StreamChunk,
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
    ) -> Self {
        tracing::trace!(
            start_seq_id,
            end_seq_id,
            epoch,
            cardinality = chunk.cardinality(),
            "write_future: flushing chunk"
        );
        Self::FlushingChunk {
            epoch,
            start_seq_id,
            end_seq_id,
            future: Box::pin(write_state.into_write_chunk_future(
                chunk,
                epoch,
                start_seq_id,
                end_seq_id,
            )),
            stream,
        }
    }

    fn receive_from_upstream(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        Self::ReceiveFromUpstream {
            future: stream.into_future(),
            write_state,
        }
    }

    fn paused(
        duration: Duration,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        let now = Instant::now();
        tracing::trace!(?now, ?duration, "write_future_pause");
        Self::Paused {
            start_instant: now,
            sleep_future: Some(Box::pin(sleep_until(now + duration))),
            barrier,
            stream,
            write_state,
        }
    }

    async fn next_event(
        &mut self,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<(BoxedMessageStream, LogStoreWriteState<S>, WriteFutureEvent)> {
        match self {
            WriteFuture::Paused {
                start_instant,
                sleep_future,
                ..
            } => {
                if let Some(sleep_future) = sleep_future {
                    sleep_future.await;
                    metrics
                        .pause_duration_ns
                        .inc_by(start_instant.elapsed().as_nanos() as _);
                    tracing::trace!("resuming write future");
                }
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::Paused { stream, write_state, barrier, .. } => {
                    Ok((stream, write_state, WriteFutureEvent::UpstreamMessageReceived(Message::Barrier(barrier))))
                })
            }
            WriteFuture::ReceiveFromUpstream { future, .. } => {
                let (opt, stream) = future.await;
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::ReceiveFromUpstream { write_state, .. } => {
                    opt
                        .ok_or_else(|| anyhow!("end of upstream input").into())
                        .and_then(|result| result.map(|item| {
                            (stream, write_state, WriteFutureEvent::UpstreamMessageReceived(item))
                        }))
                })
            }
            WriteFuture::FlushingChunk { future, .. } => {
                let (write_state, result) = future.await;
                let result = must_match!(replace(self, WriteFuture::Empty), WriteFuture::FlushingChunk { epoch, start_seq_id, end_seq_id, stream, .. } => {
                    result.map(|(flush_info, vnode_bitmap)| {
                        (stream, write_state, WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                            epoch,
                            start_seq_id,
                            end_seq_id,
                            flush_info,
                            vnode_bitmap,
                        }))
                    })
                });
                result.map_err(Into::into)
            }
            WriteFuture::Empty => {
                unreachable!("should not be polled after ready")
            }
        }
    }
}

impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
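    /// Drives [`Self::execute_inner`] and records in `wait_next_poll_ns` how long the
    /// downstream takes to poll again after each yielded message.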
    #[try_stream(ok = Message, error = StreamExecutorError)]
    pub async fn execute_monitored(self) {
        let wait_next_poll_ns = self.metrics.wait_next_poll_ns.clone();
        #[for_await]
        for message in self.execute_inner() {
            let current_time = Instant::now();
            yield message?;
            wait_next_poll_ns.inc_by(current_time.elapsed().as_nanos() as _);
        }
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(self) {
        let mut input = self.upstream.execute();

        let first_barrier = expect_first_barrier(&mut input).await?;
        let first_write_epoch = first_barrier.epoch;
        yield Message::Barrier(first_barrier.clone());

        let local_state_store = self
            .state_store
            .new_local(NewLocalOptions {
                table_id: self.table_id,
                op_consistency_level: OpConsistencyLevel::Inconsistent,
                table_option: TableOption {
                    retention_seconds: None,
                },
                is_replicated: false,
                vnodes: self.serde.vnodes().clone(),
                upload_on_flush: false,
            })
            .await;

        let (mut read_state, mut initial_write_state) = new_log_store_state(
            self.table_id,
            local_state_store,
            self.serde,
            self.chunk_size,
        );
        initial_write_state.init(first_write_epoch).await?;

        let mut pause_stream = first_barrier.is_pause_on_startup();
        let mut initial_write_epoch = first_write_epoch;

        if self.aligned {
            tracing::info!("aligned mode");
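            // In aligned mode the executor first drains anything left in the
            // persisted log store, then simply forwards upstream messages while
            // sealing one epoch per barrier. The log store counts as realigned
            // once the first checkpoint barrier is processed.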
            let log_store_stream = read_state
                .read_persisted_log_store(
                    self.metrics.persistent_log_read_metrics.clone(),
                    initial_write_epoch.curr,
                    LogStoreReadStateStreamRangeStart::Unbounded,
                )
                .await?;

            #[for_await]
            for message in log_store_stream {
                let (_epoch, message) = message?;
                match message {
                    KvLogStoreItem::Barrier { .. } => {
                        continue;
                    }
                    KvLogStoreItem::StreamChunk { chunk, .. } => {
                        yield Message::Chunk(chunk);
                    }
                }
            }

            let mut realigned_logstore = false;

            #[for_await]
            for message in input {
                match message? {
                    Message::Barrier(barrier) => {
                        let is_checkpoint = barrier.is_checkpoint();
                        let mut progress = LogStoreVnodeProgress::None;
                        progress.apply_aligned(
                            read_state.vnodes().clone(),
                            barrier.epoch.prev,
                            None,
                        );
                        let post_seal = initial_write_state
                            .seal_current_epoch(barrier.epoch.curr, progress.take());
                        let update_vnode_bitmap =
                            barrier.as_update_vnode_bitmap(self.actor_context.id);
                        yield Message::Barrier(barrier);
                        post_seal.post_yield_barrier(update_vnode_bitmap).await?;
                        if !realigned_logstore && is_checkpoint {
                            realigned_logstore = true;
                            tracing::info!("realigned logstore");
                        }
                    }
                    Message::Chunk(chunk) => {
                        yield Message::Chunk(chunk);
                    }
                    Message::Watermark(watermark) => {
                        yield Message::Watermark(watermark);
                    }
                }
            }

            return Ok(());
        }

        'recreate_consume_stream: loop {
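            // The consume stream is torn down and recreated whenever a barrier
            // carries a vnode bitmap update, so the persisted log store is replayed
            // under the new bitmap.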
            let mut seq_id = FIRST_SEQ_ID;
            let mut buffer = SyncedLogStoreBuffer {
                buffer: VecDeque::new(),
                current_size: 0,
                max_size: self.max_buffer_size,
                max_chunk_size: self.chunk_size,
                next_chunk_id: 0,
                metrics: self.metrics.clone(),
                flushed_count: 0,
            };

            let log_store_stream = read_state
                .read_persisted_log_store(
                    self.metrics.persistent_log_read_metrics.clone(),
                    initial_write_epoch.curr,
                    LogStoreReadStateStreamRangeStart::Unbounded,
                )
                .await?;

            let mut log_store_stream = tokio_stream::StreamExt::peekable(log_store_stream);
            let mut clean_state = log_store_stream.peek().await.is_none();
            tracing::trace!(?clean_state);

            let mut read_future_state = ReadFuture::ReadingPersistedStream(log_store_stream);

            let mut write_future_state =
                WriteFuture::receive_from_upstream(input, initial_write_state);

            let mut progress = LogStoreVnodeProgress::None;

            loop {
                let select_result = {
                    let read_future = async {
                        if pause_stream {
                            pending().await
                        } else {
                            read_future_state
                                .next_chunk(&mut progress, &read_state, &mut buffer, &self.metrics)
                                .await
                        }
                    };
                    pin_mut!(read_future);
                    let write_future = write_future_state.next_event(&self.metrics);
                    pin_mut!(write_future);
                    let output = select(write_future, read_future).await;
                    drop_either_future(output)
                };
                match select_result {
                    Either::Left(result) => {
                        drop(write_future_state);
                        let (stream, mut write_state, either) = result?;
                        match either {
                            WriteFutureEvent::UpstreamMessageReceived(msg) => {
                                match msg {
                                    Message::Barrier(barrier) => {
                                        if clean_state
                                            && barrier.kind.is_checkpoint()
                                            && !buffer.is_empty()
                                        {
                                            write_future_state = WriteFuture::paused(
                                                self.pause_duration_ms,
                                                barrier,
                                                stream,
                                                write_state,
                                            );
                                            clean_state = false;
                                            self.metrics.unclean_state.inc();
                                        } else {
                                            if let Some(mutation) = barrier.mutation.as_deref() {
                                                match mutation {
                                                    Mutation::Pause => {
                                                        pause_stream = true;
                                                    }
                                                    Mutation::Resume => {
                                                        pause_stream = false;
                                                    }
                                                    _ => {}
                                                }
                                            }
                                            let write_state_post_write_barrier =
                                                Self::write_barrier(
                                                    self.actor_context.id,
                                                    &mut write_state,
                                                    barrier.clone(),
                                                    &self.metrics,
                                                    progress.take(),
                                                    &mut buffer,
                                                )
                                                .await?;
                                            seq_id = FIRST_SEQ_ID;
                                            let update_vnode_bitmap = barrier
                                                .as_update_vnode_bitmap(self.actor_context.id);
                                            let barrier_epoch = barrier.epoch;
                                            tracing::trace!(
                                                ?update_vnode_bitmap,
                                                actor_id = %self.actor_context.id,
                                                "update vnode bitmap"
                                            );

                                            yield Message::Barrier(barrier);

                                            write_state_post_write_barrier
                                                .post_yield_barrier(update_vnode_bitmap.clone())
                                                .await?;
                                            if let Some(vnode_bitmap) = update_vnode_bitmap {
                                                read_state.update_vnode_bitmap(vnode_bitmap);
                                                initial_write_epoch = barrier_epoch;
                                                input = stream;
                                                initial_write_state = write_state;
                                                continue 'recreate_consume_stream;
                                            } else {
                                                write_future_state =
                                                    WriteFuture::receive_from_upstream(
                                                        stream,
                                                        write_state,
                                                    );
                                            }
                                        }
                                    }
                                    Message::Chunk(chunk) => {
                                        let start_seq_id = seq_id;
                                        let new_seq_id = seq_id + chunk.cardinality() as SeqId;
                                        let end_seq_id = new_seq_id - 1;
                                        let epoch = write_state.epoch().curr;
                                        tracing::trace!(
                                            start_seq_id,
                                            end_seq_id,
                                            new_seq_id,
                                            epoch,
                                            cardinality = chunk.cardinality(),
                                            "received chunk"
                                        );
                                        if let Some(chunk_to_flush) = buffer.add_or_flush_chunk(
                                            start_seq_id,
                                            end_seq_id,
                                            chunk,
                                            epoch,
                                        ) {
                                            seq_id = new_seq_id;
                                            write_future_state = WriteFuture::flush_chunk(
                                                stream,
                                                write_state,
                                                chunk_to_flush,
                                                epoch,
                                                start_seq_id,
                                                end_seq_id,
                                            );
                                        } else {
                                            seq_id = new_seq_id;
                                            write_future_state =
                                                WriteFuture::receive_from_upstream(
                                                    stream,
                                                    write_state,
                                                );
                                        }
                                    }
                                    Message::Watermark(_watermark) => {
                                        write_future_state =
                                            WriteFuture::receive_from_upstream(stream, write_state);
                                    }
                                }
                            }
                            WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                                start_seq_id,
                                end_seq_id,
                                epoch,
                                flush_info,
                                vnode_bitmap,
                            }) => {
                                buffer.add_flushed_item_to_buffer(
                                    start_seq_id,
                                    end_seq_id,
                                    vnode_bitmap,
                                    epoch,
                                );
                                self.metrics
                                    .storage_write_count
                                    .inc_by(flush_info.flush_count as _);
                                self.metrics
                                    .storage_write_size
                                    .inc_by(flush_info.flush_size as _);
                                write_future_state =
                                    WriteFuture::receive_from_upstream(stream, write_state);
                            }
                        }
                    }
                    Either::Right(result) => {
                        if !clean_state
                            && matches!(read_future_state, ReadFuture::Idle)
                            && buffer.is_empty()
                        {
                            clean_state = true;
                            self.metrics.clean_state.inc();

                            if let WriteFuture::Paused { sleep_future, .. } =
                                &mut write_future_state
                            {
                                tracing::trace!("resuming paused future");
                                assert!(buffer.current_size < self.max_buffer_size);
                                *sleep_future = None;
                            }
                        }
                        let chunk = result?;
                        self.metrics
                            .total_read_count
                            .inc_by(chunk.cardinality() as _);

                        yield Message::Chunk(chunk);
                    }
                }
            }
        }
    }
}

type PersistedStream<S> = Peekable<Pin<Box<LogStoreItemMergeStream<TimeoutAutoRebuildIter<S>>>>>;

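/// State machine driving the read side: drain the persisted log store first, then
/// serve items from the in-memory buffer, reading flushed chunks back from storage
/// as needed.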
enum ReadFuture<S: StateStoreRead> {
    ReadingPersistedStream(PersistedStream<S>),
    ReadingFlushedChunk {
        future: ReadFlushedChunkFuture,
        end_seq_id: SeqId,
    },
    Idle,
}

impl<S: StateStoreRead> ReadFuture<S> {
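    /// Returns the next chunk to emit, updating `progress` for every item consumed.
    /// Pends when there is currently nothing to read.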
    async fn next_chunk(
        &mut self,
        progress: &mut LogStoreVnodeProgress,
        read_state: &LogStoreReadState<S>,
        buffer: &mut SyncedLogStoreBuffer,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<StreamChunk> {
        match self {
            ReadFuture::ReadingPersistedStream(stream) => {
                while let Some((epoch, item)) = stream.try_next().await? {
                    match item {
                        KvLogStoreItem::Barrier { vnodes, .. } => {
                            tracing::trace!(epoch, "read logstore barrier");
                            progress.apply_aligned(vnodes, epoch, None);
                            continue;
                        }
                        KvLogStoreItem::StreamChunk {
                            chunk,
                            progress: chunk_progress,
                        } => {
                            tracing::trace!("read logstore chunk of size: {}", chunk.cardinality());
                            progress.apply_per_vnode(epoch, chunk_progress);
                            return Ok(chunk);
                        }
                    }
                }
                *self = ReadFuture::Idle;
            }
            ReadFuture::ReadingFlushedChunk { .. } | ReadFuture::Idle => {}
        }
        match self {
            ReadFuture::ReadingPersistedStream(_) => {
                unreachable!("must have finished read persisted stream when reaching here")
            }
            ReadFuture::ReadingFlushedChunk { .. } => {}
            ReadFuture::Idle => loop {
                let Some((item_epoch, item)) = buffer.pop_front() else {
                    return pending().await;
                };
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        metrics.buffer_read_count.inc_by(chunk.cardinality() as _);
                        tracing::trace!(
                            start_seq_id,
                            end_seq_id,
                            flushed,
                            cardinality = chunk.cardinality(),
                            "read buffered chunk of size"
                        );
                        progress.apply_aligned(
                            read_state.vnodes().clone(),
                            item_epoch,
                            Some(end_seq_id),
                        );
                        return Ok(chunk);
                    }
                    LogStoreBufferItem::Flushed {
                        vnode_bitmap,
                        start_seq_id,
                        end_seq_id,
                        chunk_id,
                    } => {
                        tracing::trace!(start_seq_id, end_seq_id, chunk_id, "read flushed chunk");
                        let read_metrics = metrics.flushed_buffer_read_metrics.clone();
                        let future = read_state
                            .read_flushed_chunk(
                                vnode_bitmap,
                                chunk_id,
                                start_seq_id,
                                end_seq_id,
                                item_epoch,
                                read_metrics,
                            )
                            .boxed();
                        *self = ReadFuture::ReadingFlushedChunk { future, end_seq_id };
                        break;
                    }
                    LogStoreBufferItem::Barrier { .. } => {
                        tracing::trace!(item_epoch, "read buffer barrier");
                        progress.apply_aligned(read_state.vnodes().clone(), item_epoch, None);
                        continue;
                    }
                }
            },
        }

        let (future, end_seq_id) = match self {
            ReadFuture::ReadingPersistedStream(_) | ReadFuture::Idle => {
                unreachable!("should be at ReadingFlushedChunk")
            }
            ReadFuture::ReadingFlushedChunk { future, end_seq_id } => (future, *end_seq_id),
        };

        let (_, chunk, epoch) = future.await?;
        progress.apply_aligned(read_state.vnodes().clone(), epoch, Some(end_seq_id));
        tracing::trace!(
            end_seq_id,
            "read flushed chunk of size: {}",
            chunk.cardinality()
        );
        *self = ReadFuture::Idle;
        Ok(chunk)
    }
}

impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
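    /// Writes a barrier to the log store and seals the current epoch. On checkpoint
    /// barriers, all not-yet-flushed chunks in the buffer are flushed first, so the
    /// checkpoint covers the whole buffer.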
    async fn write_barrier<'a>(
        actor_id: ActorId,
        write_state: &'a mut LogStoreWriteState<S::Local>,
        barrier: Barrier,
        metrics: &SyncedKvLogStoreMetrics,
        progress: LogStoreVnodeProgress,
        buffer: &mut SyncedLogStoreBuffer,
    ) -> StreamExecutorResult<LogStorePostSealCurrentEpoch<'a, S::Local>> {
        tracing::trace!(%actor_id, ?progress, "applying truncation");
        let epoch = barrier.epoch.prev;
        let mut writer = write_state.start_writer(false);
        writer.write_barrier(epoch, barrier.is_checkpoint())?;

        if barrier.is_checkpoint() {
            for (epoch, item) in buffer.buffer.iter_mut().rev() {
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        if !*flushed {
                            writer.write_chunk(chunk, *epoch, *start_seq_id, *end_seq_id)?;
                            *flushed = true;
                        } else {
                            break;
                        }
                    }
                    LogStoreBufferItem::Flushed { .. } | LogStoreBufferItem::Barrier { .. } => {}
                }
            }
        }

        let (flush_info, _) = writer.finish().await?;
        metrics
            .storage_write_count
            .inc_by(flush_info.flush_count as _);
        metrics
            .storage_write_size
            .inc_by(flush_info.flush_size as _);
        let post_seal = write_state.seal_current_epoch(barrier.epoch.curr, progress);

        buffer.buffer.push_back((
            epoch,
            LogStoreBufferItem::Barrier {
                is_checkpoint: barrier.is_checkpoint(),
                next_epoch: barrier.epoch.curr,
            },
        ));
        buffer.next_chunk_id = 0;
        buffer.update_unconsumed_buffer_metrics();

        Ok(post_seal)
    }
}

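/// In-memory buffer of unconsumed items, bounded to `max_size` rows. Chunks that do
/// not fit are handed back to the caller to be flushed to the state store and are
/// later tracked here as `Flushed` items.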
struct SyncedLogStoreBuffer {
    buffer: VecDeque<(u64, LogStoreBufferItem)>,
    current_size: usize,
    max_size: usize,
    max_chunk_size: usize,
    next_chunk_id: ChunkId,
    metrics: SyncedKvLogStoreMetrics,
    flushed_count: usize,
}

impl SyncedLogStoreBuffer {
    fn is_empty(&self) -> bool {
        self.current_size == 0
    }

    fn add_or_flush_chunk(
        &mut self,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        chunk: StreamChunk,
        epoch: u64,
    ) -> Option<StreamChunk> {
        let current_size = self.current_size;
        let chunk_size = chunk.cardinality();

        tracing::trace!(
            current_size,
            chunk_size,
            max_size = self.max_size,
            "checking chunk size"
        );
        let should_flush_chunk = current_size + chunk_size > self.max_size;
        if should_flush_chunk {
            tracing::trace!(start_seq_id, end_seq_id, epoch, "flushing chunk");
            Some(chunk)
        } else {
            tracing::trace!(start_seq_id, end_seq_id, epoch, "buffering chunk");
            self.add_chunk_to_buffer(chunk, start_seq_id, end_seq_id, epoch);
            None
        }
    }

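    /// Records a chunk that has been flushed to the state store. Consecutive flushed
    /// items of the same epoch are merged, as long as the combined row count stays
    /// within `max_chunk_size`.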
    fn add_flushed_item_to_buffer(
        &mut self,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        new_vnode_bitmap: Bitmap,
        epoch: u64,
    ) {
        let new_chunk_size = (end_seq_id - start_seq_id + 1) as usize;

        if let Some((
            item_epoch,
            LogStoreBufferItem::Flushed {
                start_seq_id: prev_start_seq_id,
                end_seq_id: prev_end_seq_id,
                vnode_bitmap,
                ..
            },
        )) = self.buffer.back_mut()
            && let flushed_chunk_size = (*prev_end_seq_id - *prev_start_seq_id + 1) as usize
            && let projected_flushed_chunk_size = flushed_chunk_size + new_chunk_size
            && projected_flushed_chunk_size <= self.max_chunk_size
        {
            assert!(
                *prev_end_seq_id < start_seq_id,
                "prev end_seq_id {} should be smaller than current start_seq_id {}",
                prev_end_seq_id,
                start_seq_id
            );
            assert_eq!(
                epoch, *item_epoch,
                "epoch of newly added flushed item must be the same as the last flushed item"
            );
            *prev_end_seq_id = end_seq_id;
            *vnode_bitmap |= new_vnode_bitmap;
        } else {
            let chunk_id = self.next_chunk_id;
            self.next_chunk_id += 1;
            self.buffer.push_back((
                epoch,
                LogStoreBufferItem::Flushed {
                    start_seq_id,
                    end_seq_id,
                    vnode_bitmap: new_vnode_bitmap,
                    chunk_id,
                },
            ));
            self.flushed_count += 1;
            tracing::trace!(
                "adding flushed item to buffer: start_seq_id: {start_seq_id}, end_seq_id: {end_seq_id}, chunk_id: {chunk_id}"
            );
        }
        self.update_unconsumed_buffer_metrics();
    }

    fn add_chunk_to_buffer(
        &mut self,
        chunk: StreamChunk,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        epoch: u64,
    ) {
        let chunk_id = self.next_chunk_id;
        self.next_chunk_id += 1;
        self.current_size += chunk.cardinality();
        self.buffer.push_back((
            epoch,
            LogStoreBufferItem::StreamChunk {
                chunk,
                start_seq_id,
                end_seq_id,
                flushed: false,
                chunk_id,
            },
        ));
        self.update_unconsumed_buffer_metrics();
    }

    fn pop_front(&mut self) -> Option<(u64, LogStoreBufferItem)> {
        let item = self.buffer.pop_front();
        match &item {
            Some((_, LogStoreBufferItem::Flushed { .. })) => {
                self.flushed_count -= 1;
            }
            Some((_, LogStoreBufferItem::StreamChunk { chunk, .. })) => {
                self.current_size -= chunk.cardinality();
            }
            _ => {}
        }
        self.update_unconsumed_buffer_metrics();
        item
    }

    fn update_unconsumed_buffer_metrics(&self) {
        let mut epoch_count = 0;
        let mut row_count = 0;
        for (_, item) in &self.buffer {
            match item {
                LogStoreBufferItem::StreamChunk { chunk, .. } => {
                    row_count += chunk.cardinality();
                }
                LogStoreBufferItem::Flushed {
                    start_seq_id,
                    end_seq_id,
                    ..
                } => {
                    // The seq id range is inclusive, hence the `+ 1` (matching
                    // the size computation in `add_flushed_item_to_buffer`).
                    row_count += (end_seq_id - start_seq_id + 1) as usize;
                }
                LogStoreBufferItem::Barrier { .. } => {
                    epoch_count += 1;
                }
            }
        }
        self.metrics.buffer_unconsumed_epoch_count.set(epoch_count);
        self.metrics.buffer_unconsumed_row_count.set(row_count as _);
        self.metrics
            .buffer_unconsumed_item_count
            .set(self.buffer.len() as _);
        self.metrics.buffer_unconsumed_min_epoch.set(
            self.buffer
                .front()
                .map(|(epoch, _)| *epoch)
                .unwrap_or_default() as _,
        );
    }
}

impl<S> Execute for SyncedKvLogStoreExecutor<S>
where
    S: StateStore,
{
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_monitored().boxed()
    }
}

#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use pretty_assertions::assert_eq;
    use risingwave_common::catalog::Field;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::test_prelude::*;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::assert_stream_chunk_eq;
    use crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V2_INFO;
    use crate::common::log_store_impl::kv_log_store::test_utils::{
        check_stream_chunk_eq, gen_test_log_store_table, test_payload_schema,
    };
    use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
    use crate::executor::test_utils::MockSource;

    fn init_logger() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .with_ansi(false)
            .try_init();
    }

    #[tokio::test]
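    // Writes two chunks, then reads them back from the in-memory buffer before the
    // next barrier arrives.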
    async fn test_read_write_buffer() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
            .collect_vec();
        let schema = Schema { fields };
        let stream_key = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            "  I T
            +  5 10
            +  6 10
            +  8 10
            +  9 10
            + 10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "  I T
            -  5 10
            -  6 10
            -  8 10
            U-  9 10
            U+ 10 11",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        tx.push_barrier(test_epoch(2), false);

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

    #[tokio::test]
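    // Pushes both chunks and the sealing barrier before the executor starts, then
    // checks that the chunks are still replayed in order between the two barriers.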
    async fn test_barrier_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
            .collect_vec();
        let schema = Schema { fields };
        let stream_key = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            "  I T
            +  5 10
            +  6 10
            +  8 10
            +  9 10
            + 10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "  I T
            -  5 10
            -  6 10
            -  8 10
            U- 10 11
            U+ 10 10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

    #[tokio::test]
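    // With a buffer size of 0 every chunk is flushed to the state store; the flushed
    // chunks are merged and replayed as one chunk once both barriers are processed.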
    async fn test_max_chunk_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
            .collect_vec();
        let schema = Schema { fields };
        let stream_key = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            0,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            "  I T
            +  5 10
            +  6 10
            +  8 10
            +  9 10
            + 10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "  I T
            -  5 10
            -  6 10
            -  8 10
            U- 10 11
            U+ 10 10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        for i in 1..=2 {
            match stream.next().await {
                Some(Ok(Message::Barrier(barrier))) => {
                    assert_eq!(barrier.epoch.curr, test_epoch(i));
                }
                other => panic!("Expected a barrier message, got {:?}", other),
            }
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(actual))) => {
                let expected = StreamChunk::from_pretty(
                    "  I T
                    +  5 10
                    +  6 10
                    +  8 10
                    +  9 10
                    + 10 11
                    -  5 10
                    -  6 10
                    -  8 10
                    U- 10 11
                    U+ 10 10",
                );
                assert_stream_chunk_eq!(actual, expected);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }
    }
}