use std::collections::VecDeque;
use std::future::pending;
use std::mem::replace;
use std::pin::Pin;

use anyhow::anyhow;
use futures::future::{BoxFuture, Either, select};
use futures::stream::StreamFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::must_match;
use risingwave_connector::sink::log_store::{ChunkId, LogStoreResult};
use risingwave_storage::StateStore;
use risingwave_storage::store::{
    LocalStateStore, NewLocalOptions, OpConsistencyLevel, StateStoreRead,
};
use rw_futures_util::drop_either_future;
use tokio::time::{Duration, Instant, Sleep, sleep_until};
use tokio_stream::adapters::Peekable;

use crate::common::log_store_impl::kv_log_store::buffer::LogStoreBufferItem;
use crate::common::log_store_impl::kv_log_store::reader::LogStoreReadStateStreamRangeStart;
use crate::common::log_store_impl::kv_log_store::reader::timeout_auto_rebuild::TimeoutAutoRebuildIter;
use crate::common::log_store_impl::kv_log_store::serde::{
    KvLogStoreItem, LogStoreItemMergeStream, LogStoreRowSerde,
};
use crate::common::log_store_impl::kv_log_store::state::{
    LogStorePostSealCurrentEpoch, LogStoreReadState, LogStoreStateWriteChunkFuture,
    LogStoreWriteState, new_log_store_state,
};
use crate::common::log_store_impl::kv_log_store::{
    Epoch, FIRST_SEQ_ID, FlushInfo, LogStoreVnodeProgress, SeqId,
};
use crate::executor::prelude::*;
use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
use crate::executor::{
    Barrier, BoxedMessageStream, Message, StreamExecutorError, StreamExecutorResult,
};

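/// Metrics for the synced kv log store: counters for clean/unclean state
/// transitions, storage reads and writes, and gauges describing the in-memory
/// buffer.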
pub mod metrics {
    use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};

    use crate::common::log_store_impl::kv_log_store::KvLogStoreReadMetrics;
    use crate::executor::monitor::StreamingMetrics;
    use crate::task::ActorId;

    #[derive(Clone)]
    pub struct SyncedKvLogStoreMetrics {
        pub unclean_state: LabelGuardedIntCounter,
        pub clean_state: LabelGuardedIntCounter,
        pub wait_next_poll_ns: LabelGuardedIntCounter,

        pub storage_write_count: LabelGuardedIntCounter,
        pub storage_write_size: LabelGuardedIntCounter,
        pub pause_duration_ns: LabelGuardedIntCounter,

        pub buffer_unconsumed_item_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_row_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_epoch_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_min_epoch: LabelGuardedIntGauge,
        pub buffer_read_count: LabelGuardedIntCounter,
        pub buffer_read_size: LabelGuardedIntCounter,

        pub total_read_count: LabelGuardedIntCounter,
        pub total_read_size: LabelGuardedIntCounter,
        pub persistent_log_read_metrics: KvLogStoreReadMetrics,
        pub flushed_buffer_read_metrics: KvLogStoreReadMetrics,
    }

    impl SyncedKvLogStoreMetrics {
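        /// Builds the metric set for one log store instance. Most metrics
        /// share the label set `(actor_id, target, id, name)`; the state and
        /// read metrics prepend an extra label for the state or read kind.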
        pub(crate) fn new(
            metrics: &StreamingMetrics,
            actor_id: ActorId,
            id: u32,
            name: &str,
            target: &'static str,
        ) -> Self {
            let actor_id_str = actor_id.to_string();
            let id_str = id.to_string();
            let labels = &[&actor_id_str, target, &id_str, name];

            let unclean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "dirty",
                &actor_id_str,
                target,
                &id_str,
                name,
            ]);
            let clean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "clean",
                &actor_id_str,
                target,
                &id_str,
                name,
            ]);
            let wait_next_poll_ns = metrics
                .sync_kv_log_store_wait_next_poll_ns
                .with_guarded_label_values(labels);

            let storage_write_size = metrics
                .sync_kv_log_store_storage_write_size
                .with_guarded_label_values(labels);
            let storage_write_count = metrics
                .sync_kv_log_store_storage_write_count
                .with_guarded_label_values(labels);
            let storage_pause_duration_ns = metrics
                .sync_kv_log_store_write_pause_duration_ns
                .with_guarded_label_values(labels);

            let buffer_unconsumed_item_count = metrics
                .sync_kv_log_store_buffer_unconsumed_item_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_row_count = metrics
                .sync_kv_log_store_buffer_unconsumed_row_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_epoch_count = metrics
                .sync_kv_log_store_buffer_unconsumed_epoch_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_min_epoch = metrics
                .sync_kv_log_store_buffer_unconsumed_min_epoch
                .with_guarded_label_values(labels);
            let buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&["buffer", &actor_id_str, target, &id_str, name]);

            let buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&["buffer", &actor_id_str, target, &id_str, name]);

            let total_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&["total", &actor_id_str, target, &id_str, name]);

            let total_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&["total", &actor_id_str, target, &id_str, name]);

            const READ_PERSISTENT_LOG: &str = "persistent_log";
            const READ_FLUSHED_BUFFER: &str = "flushed_buffer";

            let persistent_log_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &id_str,
                    name,
                ]);

            let persistent_log_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &id_str,
                    name,
                ]);

            let flushed_buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &id_str,
                    name,
                ]);

            let flushed_buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &id_str,
                    name,
                ]);

            Self {
                unclean_state,
                clean_state,
                wait_next_poll_ns,
                storage_write_size,
                storage_write_count,
                pause_duration_ns: storage_pause_duration_ns,
                buffer_unconsumed_item_count,
                buffer_unconsumed_row_count,
                buffer_unconsumed_epoch_count,
                buffer_unconsumed_min_epoch,
                buffer_read_count,
                buffer_read_size,
                total_read_count,
                total_read_size,
                persistent_log_read_metrics: KvLogStoreReadMetrics {
                    storage_read_size: persistent_log_read_size,
                    storage_read_count: persistent_log_read_count,
                },
                flushed_buffer_read_metrics: KvLogStoreReadMetrics {
                    storage_read_count: flushed_buffer_read_count,
                    storage_read_size: flushed_buffer_read_size,
                },
            }
        }

        #[cfg(test)]
        pub(crate) fn for_test() -> Self {
            SyncedKvLogStoreMetrics {
                unclean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
                clean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
                wait_next_poll_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
                storage_write_count: LabelGuardedIntCounter::test_int_counter::<4>(),
                storage_write_size: LabelGuardedIntCounter::test_int_counter::<4>(),
                pause_duration_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
                buffer_unconsumed_item_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_row_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_epoch_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_min_epoch: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
                buffer_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
                total_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
                total_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
                persistent_log_read_metrics: KvLogStoreReadMetrics::for_test(),
                flushed_buffer_read_metrics: KvLogStoreReadMetrics::for_test(),
            }
        }
    }
}

type ReadFlushedChunkFuture = BoxFuture<'static, LogStoreResult<(ChunkId, StreamChunk, Epoch)>>;

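/// An executor that persists upstream messages into a kv-backed log store
/// while concurrently replaying them downstream, decoupling upstream progress
/// from downstream consumption.
///
/// A construction sketch (the surrounding setup is illustrative only, not part
/// of this module):
///
/// ```ignore
/// let executor = SyncedKvLogStoreExecutor::new(
///     actor_context,
///     table_id,
///     SyncedKvLogStoreMetrics::new(&streaming_metrics, actor_id, id, "name", "target"),
///     serde,
///     state_store,
///     1024,                       // max in-memory buffer size, in rows
///     256,                        // max chunk size when reading back
///     upstream,
///     Duration::from_millis(100), // write pause after turning unclean
///     false,                      // aligned mode off
/// );
/// ```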
pub struct SyncedKvLogStoreExecutor<S: StateStore> {
    actor_context: ActorContextRef,
    table_id: TableId,
    metrics: SyncedKvLogStoreMetrics,
    serde: LogStoreRowSerde,

    upstream: Executor,

    state_store: S,
    max_buffer_size: usize,

    chunk_size: usize,

    pause_duration_ms: Duration,

    aligned: bool,
}

impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn new(
        actor_context: ActorContextRef,
        table_id: u32,
        metrics: SyncedKvLogStoreMetrics,
        serde: LogStoreRowSerde,
        state_store: S,
        buffer_size: usize,
        chunk_size: usize,
        upstream: Executor,
        pause_duration_ms: Duration,
        aligned: bool,
    ) -> Self {
        Self {
            actor_context,
            table_id: TableId::new(table_id),
            metrics,
            serde,
            state_store,
            upstream,
            max_buffer_size: buffer_size,
            chunk_size,
            pause_duration_ms,
            aligned,
        }
    }
}

struct FlushedChunkInfo {
    epoch: u64,
    start_seq_id: SeqId,
    end_seq_id: SeqId,
    flush_info: FlushInfo,
    vnode_bitmap: Bitmap,
}

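/// State machine for the write side of the executor: the writer is either
/// paused (holding back a barrier for a fixed backoff), waiting for the next
/// upstream message, or flushing a chunk to the log store.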
enum WriteFuture<S: LocalStateStore> {
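    /// Entered when a checkpoint barrier arrives in the clean state with an
    /// unconsumed buffer. The held barrier is yielded once the sleep elapses,
    /// or immediately if the reader catches up and clears `sleep_future`.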
    Paused {
        start_instant: Instant,
        sleep_future: Option<Pin<Box<Sleep>>>,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    },
    ReceiveFromUpstream {
        future: StreamFuture<BoxedMessageStream>,
        write_state: LogStoreWriteState<S>,
    },
    FlushingChunk {
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        future: Pin<Box<LogStoreStateWriteChunkFuture<S>>>,
        stream: BoxedMessageStream,
    },
    Empty,
}

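/// What the write future resolved to: a new upstream message, or completion of
/// an in-flight chunk flush.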
enum WriteFutureEvent {
    UpstreamMessageReceived(Message),
    ChunkFlushed(FlushedChunkInfo),
}

impl<S: LocalStateStore> WriteFuture<S> {
    fn flush_chunk(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
        chunk: StreamChunk,
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
    ) -> Self {
        tracing::trace!(
            start_seq_id,
            end_seq_id,
            epoch,
            cardinality = chunk.cardinality(),
            "write_future: flushing chunk"
        );
        Self::FlushingChunk {
            epoch,
            start_seq_id,
            end_seq_id,
            future: Box::pin(write_state.into_write_chunk_future(
                chunk,
                epoch,
                start_seq_id,
                end_seq_id,
            )),
            stream,
        }
    }

    fn receive_from_upstream(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        Self::ReceiveFromUpstream {
            future: stream.into_future(),
            write_state,
        }
    }

    fn paused(
        duration: Duration,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        let now = Instant::now();
        tracing::trace!(?now, ?duration, "write_future_pause");
        Self::Paused {
            start_instant: now,
            sleep_future: Some(Box::pin(sleep_until(now + duration))),
            barrier,
            stream,
            write_state,
        }
    }

    async fn next_event(
        &mut self,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<(BoxedMessageStream, LogStoreWriteState<S>, WriteFutureEvent)> {
        match self {
            WriteFuture::Paused {
                start_instant,
                sleep_future,
                ..
            } => {
                if let Some(sleep_future) = sleep_future {
                    sleep_future.await;
                    metrics
                        .pause_duration_ns
                        .inc_by(start_instant.elapsed().as_nanos() as _);
                    tracing::trace!("resuming write future");
                }
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::Paused { stream, write_state, barrier, .. } => {
                    Ok((stream, write_state, WriteFutureEvent::UpstreamMessageReceived(Message::Barrier(barrier))))
                })
            }
            WriteFuture::ReceiveFromUpstream { future, .. } => {
                let (opt, stream) = future.await;
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::ReceiveFromUpstream { write_state, .. } => {
                    opt
                        .ok_or_else(|| anyhow!("end of upstream input").into())
                        .and_then(|result| result.map(|item| {
                            (stream, write_state, WriteFutureEvent::UpstreamMessageReceived(item))
                        }))
                })
            }
            WriteFuture::FlushingChunk { future, .. } => {
                let (write_state, result) = future.await;
                let result = must_match!(replace(self, WriteFuture::Empty), WriteFuture::FlushingChunk { epoch, start_seq_id, end_seq_id, stream, .. } => {
                    result.map(|(flush_info, vnode_bitmap)| {
                        (stream, write_state, WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                            epoch,
                            start_seq_id,
                            end_seq_id,
                            flush_info,
                            vnode_bitmap,
                        }))
                    })
                });
                result.map_err(Into::into)
            }
            WriteFuture::Empty => {
                unreachable!("should not be polled after ready")
            }
        }
    }
}

impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
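    /// Wraps [`Self::execute_inner`], recording the time downstream spends
    /// between polls in `wait_next_poll_ns`.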
    #[try_stream(ok = Message, error = StreamExecutorError)]
    pub async fn execute_monitored(self) {
        let wait_next_poll_ns = self.metrics.wait_next_poll_ns.clone();
        #[for_await]
        for message in self.execute_inner() {
            let current_time = Instant::now();
            yield message?;
            wait_next_poll_ns.inc_by(current_time.elapsed().as_nanos() as _);
        }
    }

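    /// Main loop. In aligned mode, any persisted log is drained once and the
    /// executor then degenerates into a pass-through. Otherwise a write future
    /// and a read future are raced, and the consume stream is recreated
    /// whenever the vnode bitmap changes.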
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(self) {
        let mut input = self.upstream.execute();

        let first_barrier = expect_first_barrier(&mut input).await?;
        let first_write_epoch = first_barrier.epoch;
        yield Message::Barrier(first_barrier.clone());

        let local_state_store = self
            .state_store
            .new_local(NewLocalOptions {
                table_id: self.table_id,
                op_consistency_level: OpConsistencyLevel::Inconsistent,
                table_option: TableOption {
                    retention_seconds: None,
                },
                is_replicated: false,
                vnodes: self.serde.vnodes().clone(),
                upload_on_flush: false,
            })
            .await;

        let (mut read_state, mut initial_write_state) = new_log_store_state(
            self.table_id,
            local_state_store,
            self.serde,
            self.chunk_size,
        );
        initial_write_state.init(first_write_epoch).await?;

        let mut pause_stream = first_barrier.is_pause_on_startup();
        let mut initial_write_epoch = first_write_epoch;

        if self.aligned {
            tracing::info!("aligned mode");
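            // Drain whatever was persisted by previous runs, then forward
            // upstream messages directly. The log store counts as realigned
            // once the first checkpoint barrier completes.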
            let log_store_stream = read_state
                .read_persisted_log_store(
                    self.metrics.persistent_log_read_metrics.clone(),
                    initial_write_epoch.curr,
                    LogStoreReadStateStreamRangeStart::Unbounded,
                )
                .await?;

            #[for_await]
            for message in log_store_stream {
                let (_epoch, message) = message?;
                match message {
                    KvLogStoreItem::Barrier { .. } => {
                        continue;
                    }
                    KvLogStoreItem::StreamChunk { chunk, .. } => {
                        yield Message::Chunk(chunk);
                    }
                }
            }

            let mut realigned_logstore = false;

            #[for_await]
            for message in input {
                match message? {
                    Message::Barrier(barrier) => {
                        let is_checkpoint = barrier.is_checkpoint();
                        let mut progress = LogStoreVnodeProgress::None;
                        progress.apply_aligned(
                            read_state.vnodes().clone(),
                            barrier.epoch.prev,
                            None,
                        );
                        let post_seal = initial_write_state
                            .seal_current_epoch(barrier.epoch.curr, progress.take());
                        let update_vnode_bitmap =
                            barrier.as_update_vnode_bitmap(self.actor_context.id);
                        yield Message::Barrier(barrier);
                        post_seal.post_yield_barrier(update_vnode_bitmap).await?;
                        if !realigned_logstore && is_checkpoint {
                            realigned_logstore = true;
                            tracing::info!("realigned logstore");
                        }
                    }
                    Message::Chunk(chunk) => {
                        yield Message::Chunk(chunk);
                    }
                    Message::Watermark(watermark) => {
                        yield Message::Watermark(watermark);
                    }
                }
            }

            return Ok(());
        }

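        // The persisted-log read stream is bound to the current vnode set, so
        // on every vnode bitmap update the whole consume loop restarts here.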
        'recreate_consume_stream: loop {
            let mut seq_id = FIRST_SEQ_ID;
            let mut buffer = SyncedLogStoreBuffer {
                buffer: VecDeque::new(),
                current_size: 0,
                max_size: self.max_buffer_size,
                max_chunk_size: self.chunk_size,
                next_chunk_id: 0,
                metrics: self.metrics.clone(),
                flushed_count: 0,
            };

            let log_store_stream = read_state
                .read_persisted_log_store(
                    self.metrics.persistent_log_read_metrics.clone(),
                    initial_write_epoch.curr,
                    LogStoreReadStateStreamRangeStart::Unbounded,
                )
                .await?;

            let mut log_store_stream = tokio_stream::StreamExt::peekable(log_store_stream);
            let mut clean_state = log_store_stream.peek().await.is_none();
            tracing::trace!(?clean_state);

            let mut read_future_state = ReadFuture::ReadingPersistedStream(log_store_stream);

            let mut write_future_state =
                WriteFuture::receive_from_upstream(input, initial_write_state);

            let mut progress = LogStoreVnodeProgress::None;

            loop {
                let select_result = {
                    let read_future = async {
                        if pause_stream {
                            pending().await
                        } else {
                            read_future_state
                                .next_chunk(&mut progress, &read_state, &mut buffer, &self.metrics)
                                .await
                        }
                    };
                    pin_mut!(read_future);
                    let write_future = write_future_state.next_event(&self.metrics);
                    pin_mut!(write_future);
                    let output = select(write_future, read_future).await;
                    drop_either_future(output)
                };
                match select_result {
                    Either::Left(result) => {
                        drop(write_future_state);
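                        // `next_event` left the future in the `Empty` state;
                        // drop it before installing the next write future.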
                        let (stream, mut write_state, either) = result?;
                        match either {
                            WriteFutureEvent::UpstreamMessageReceived(msg) => {
                                match msg {
                                    Message::Barrier(barrier) => {
                                        if clean_state
                                            && barrier.kind.is_checkpoint()
                                            && !buffer.is_empty()
                                        {
                                            write_future_state = WriteFuture::paused(
                                                self.pause_duration_ms,
                                                barrier,
                                                stream,
                                                write_state,
                                            );
                                            clean_state = false;
                                            self.metrics.unclean_state.inc();
                                        } else {
                                            if let Some(mutation) = barrier.mutation.as_deref() {
                                                match mutation {
                                                    Mutation::Pause => {
                                                        pause_stream = true;
                                                    }
                                                    Mutation::Resume => {
                                                        pause_stream = false;
                                                    }
                                                    _ => {}
                                                }
                                            }
                                            let write_state_post_write_barrier =
                                                Self::write_barrier(
                                                    self.actor_context.id,
                                                    &mut write_state,
                                                    barrier.clone(),
                                                    &self.metrics,
                                                    progress.take(),
                                                    &mut buffer,
                                                )
                                                .await?;
                                            seq_id = FIRST_SEQ_ID;
                                            let update_vnode_bitmap = barrier
                                                .as_update_vnode_bitmap(self.actor_context.id);
                                            let barrier_epoch = barrier.epoch;
                                            tracing::trace!(
                                                ?update_vnode_bitmap,
                                                actor_id = self.actor_context.id,
                                                "update vnode bitmap"
                                            );

                                            yield Message::Barrier(barrier);

                                            write_state_post_write_barrier
                                                .post_yield_barrier(update_vnode_bitmap.clone())
                                                .await?;
                                            if let Some(vnode_bitmap) = update_vnode_bitmap {
                                                read_state.update_vnode_bitmap(vnode_bitmap);
                                                initial_write_epoch = barrier_epoch;
                                                input = stream;
                                                initial_write_state = write_state;
                                                continue 'recreate_consume_stream;
                                            } else {
                                                write_future_state =
                                                    WriteFuture::receive_from_upstream(
                                                        stream,
                                                        write_state,
                                                    );
                                            }
                                        }
                                    }
                                    Message::Chunk(chunk) => {
                                        let start_seq_id = seq_id;
                                        let new_seq_id = seq_id + chunk.cardinality() as SeqId;
                                        let end_seq_id = new_seq_id - 1;
                                        let epoch = write_state.epoch().curr;
                                        tracing::trace!(
                                            start_seq_id,
                                            end_seq_id,
                                            new_seq_id,
                                            epoch,
                                            cardinality = chunk.cardinality(),
                                            "received chunk"
                                        );
                                        if let Some(chunk_to_flush) = buffer.add_or_flush_chunk(
                                            start_seq_id,
                                            end_seq_id,
                                            chunk,
                                            epoch,
                                        ) {
                                            seq_id = new_seq_id;
                                            write_future_state = WriteFuture::flush_chunk(
                                                stream,
                                                write_state,
                                                chunk_to_flush,
                                                epoch,
                                                start_seq_id,
                                                end_seq_id,
                                            );
                                        } else {
                                            seq_id = new_seq_id;
                                            write_future_state =
                                                WriteFuture::receive_from_upstream(
                                                    stream,
                                                    write_state,
                                                );
                                        }
                                    }
                                    Message::Watermark(_watermark) => {
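                                        // Watermarks are not persisted to the
                                        // log store and are dropped here.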
                                        write_future_state =
                                            WriteFuture::receive_from_upstream(stream, write_state);
                                    }
                                }
                            }
                            WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                                start_seq_id,
                                end_seq_id,
                                epoch,
                                flush_info,
                                vnode_bitmap,
                            }) => {
                                buffer.add_flushed_item_to_buffer(
                                    start_seq_id,
                                    end_seq_id,
                                    vnode_bitmap,
                                    epoch,
                                );
                                self.metrics
                                    .storage_write_count
                                    .inc_by(flush_info.flush_count as _);
                                self.metrics
                                    .storage_write_size
                                    .inc_by(flush_info.flush_size as _);
                                write_future_state =
                                    WriteFuture::receive_from_upstream(stream, write_state);
                            }
                        }
                    }
                    Either::Right(result) => {
                        if !clean_state
                            && matches!(read_future_state, ReadFuture::Idle)
                            && buffer.is_empty()
                        {
                            clean_state = true;
                            self.metrics.clean_state.inc();

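                            // The reader has fully caught up: cancel the
                            // pending write pause so the held barrier is
                            // released immediately.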
                            if let WriteFuture::Paused { sleep_future, .. } =
                                &mut write_future_state
                            {
                                tracing::trace!("resuming paused future");
                                assert!(buffer.current_size < self.max_buffer_size);
                                *sleep_future = None;
                            }
                        }
                        let chunk = result?;
                        self.metrics
                            .total_read_count
                            .inc_by(chunk.cardinality() as _);

                        yield Message::Chunk(chunk);
                    }
                }
            }
        }
    }
}

type PersistedStream<S> = Peekable<Pin<Box<LogStoreItemMergeStream<TimeoutAutoRebuildIter<S>>>>>;

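/// State machine for the read side: first drain the persisted log store, then
/// serve chunks from the in-memory buffer, reading flushed chunks back from
/// storage when necessary.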
enum ReadFuture<S: StateStoreRead> {
    ReadingPersistedStream(PersistedStream<S>),
    ReadingFlushedChunk {
        future: ReadFlushedChunkFuture,
        end_seq_id: SeqId,
    },
    Idle,
}

impl<S: StateStoreRead> ReadFuture<S> {
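    /// Returns the next chunk to emit, updating `progress` with the vnode
    /// watermarks it implies. Pends forever when there is nothing to read, so
    /// it is safe to race against the write future.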
    async fn next_chunk(
        &mut self,
        progress: &mut LogStoreVnodeProgress,
        read_state: &LogStoreReadState<S>,
        buffer: &mut SyncedLogStoreBuffer,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<StreamChunk> {
        match self {
            ReadFuture::ReadingPersistedStream(stream) => {
                while let Some((epoch, item)) = stream.try_next().await? {
                    match item {
                        KvLogStoreItem::Barrier { vnodes, .. } => {
                            tracing::trace!(epoch, "read logstore barrier");
                            progress.apply_aligned(vnodes, epoch, None);
                            continue;
                        }
                        KvLogStoreItem::StreamChunk {
                            chunk,
                            progress: chunk_progress,
                        } => {
                            tracing::trace!("read logstore chunk of size: {}", chunk.cardinality());
                            progress.apply_per_vnode(epoch, chunk_progress);
                            return Ok(chunk);
                        }
                    }
                }
                *self = ReadFuture::Idle;
            }
            ReadFuture::ReadingFlushedChunk { .. } | ReadFuture::Idle => {}
        }
        match self {
            ReadFuture::ReadingPersistedStream(_) => {
                unreachable!("must have finished read persisted stream when reaching here")
            }
            ReadFuture::ReadingFlushedChunk { .. } => {}
            ReadFuture::Idle => loop {
                let Some((item_epoch, item)) = buffer.pop_front() else {
                    return pending().await;
                };
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        metrics.buffer_read_count.inc_by(chunk.cardinality() as _);
                        tracing::trace!(
                            start_seq_id,
                            end_seq_id,
                            flushed,
                            cardinality = chunk.cardinality(),
                            "read buffered chunk of size"
                        );
                        progress.apply_aligned(
                            read_state.vnodes().clone(),
                            item_epoch,
                            Some(end_seq_id),
                        );
                        return Ok(chunk);
                    }
                    LogStoreBufferItem::Flushed {
                        vnode_bitmap,
                        start_seq_id,
                        end_seq_id,
                        chunk_id,
                    } => {
                        tracing::trace!(start_seq_id, end_seq_id, chunk_id, "read flushed chunk");
                        let read_metrics = metrics.flushed_buffer_read_metrics.clone();
                        let future = read_state
                            .read_flushed_chunk(
                                vnode_bitmap,
                                chunk_id,
                                start_seq_id,
                                end_seq_id,
                                item_epoch,
                                read_metrics,
                            )
                            .boxed();
                        *self = ReadFuture::ReadingFlushedChunk { future, end_seq_id };
                        break;
                    }
                    LogStoreBufferItem::Barrier { .. } => {
                        tracing::trace!(item_epoch, "read buffer barrier");
                        progress.apply_aligned(read_state.vnodes().clone(), item_epoch, None);
                        continue;
                    }
                }
            },
        }

        let (future, end_seq_id) = match self {
            ReadFuture::ReadingPersistedStream(_) | ReadFuture::Idle => {
                unreachable!("should be at ReadingFlushedChunk")
            }
            ReadFuture::ReadingFlushedChunk { future, end_seq_id } => (future, *end_seq_id),
        };

        let (_, chunk, epoch) = future.await?;
        progress.apply_aligned(read_state.vnodes().clone(), epoch, Some(end_seq_id));
        tracing::trace!(
            end_seq_id,
            "read flushed chunk of size: {}",
            chunk.cardinality()
        );
        *self = ReadFuture::Idle;
        Ok(chunk)
    }
}

impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
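    /// Seals the epoch that `barrier` closes: writes the barrier marker,
    /// flushes still-unflushed buffered chunks on checkpoints, and records the
    /// barrier in the buffer for the reader.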
    async fn write_barrier<'a>(
        actor_id: u32,
        write_state: &'a mut LogStoreWriteState<S::Local>,
        barrier: Barrier,
        metrics: &SyncedKvLogStoreMetrics,
        progress: LogStoreVnodeProgress,
        buffer: &mut SyncedLogStoreBuffer,
    ) -> StreamExecutorResult<LogStorePostSealCurrentEpoch<'a, S::Local>> {
        tracing::trace!(actor_id, ?progress, "applying truncation");
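        // `progress` carries the reader's consumption watermark; sealing the
        // epoch with it lets the log store truncate rows that have already
        // been consumed downstream.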
        let epoch = barrier.epoch.prev;
        let mut writer = write_state.start_writer(false);
        writer.write_barrier(epoch, barrier.is_checkpoint())?;

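        // On a checkpoint, persist every chunk that is still buffered only in
        // memory, scanning from the back until the first already-flushed item.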
        if barrier.is_checkpoint() {
            for (epoch, item) in buffer.buffer.iter_mut().rev() {
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        if !*flushed {
                            writer.write_chunk(chunk, *epoch, *start_seq_id, *end_seq_id)?;
                            *flushed = true;
                        } else {
                            break;
                        }
                    }
                    LogStoreBufferItem::Flushed { .. } | LogStoreBufferItem::Barrier { .. } => {}
                }
            }
        }

        let (flush_info, _) = writer.finish().await?;
        metrics
            .storage_write_count
            .inc_by(flush_info.flush_count as _);
        metrics
            .storage_write_size
            .inc_by(flush_info.flush_size as _);
        let post_seal = write_state.seal_current_epoch(barrier.epoch.curr, progress);

        buffer.buffer.push_back((
            epoch,
            LogStoreBufferItem::Barrier {
                is_checkpoint: barrier.is_checkpoint(),
                next_epoch: barrier.epoch.curr,
            },
        ));
        buffer.next_chunk_id = 0;
        buffer.update_unconsumed_buffer_metrics();

        Ok(post_seal)
    }
}

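/// In-memory buffer between the write and read halves. `current_size` counts
/// only rows of unflushed in-memory chunks; flushed items and barriers are
/// tracked separately.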
struct SyncedLogStoreBuffer {
    buffer: VecDeque<(u64, LogStoreBufferItem)>,
    current_size: usize,
    max_size: usize,
    max_chunk_size: usize,
    next_chunk_id: ChunkId,
    metrics: SyncedKvLogStoreMetrics,
    flushed_count: usize,
}

impl SyncedLogStoreBuffer {
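    /// "Empty" means no unflushed rows are buffered in memory; flushed items
    /// and barriers may still be queued.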
    fn is_empty(&self) -> bool {
        self.current_size == 0
    }

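    /// Buffers the chunk if it fits within `max_size`; otherwise hands it back
    /// to the caller to be flushed to the log store.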
    fn add_or_flush_chunk(
        &mut self,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        chunk: StreamChunk,
        epoch: u64,
    ) -> Option<StreamChunk> {
        let current_size = self.current_size;
        let chunk_size = chunk.cardinality();

        tracing::trace!(
            current_size,
            chunk_size,
            max_size = self.max_size,
            "checking chunk size"
        );
        let should_flush_chunk = current_size + chunk_size > self.max_size;
        if should_flush_chunk {
            tracing::trace!(start_seq_id, end_seq_id, epoch, "flushing chunk");
            Some(chunk)
        } else {
            tracing::trace!(start_seq_id, end_seq_id, epoch, "buffering chunk");
            self.add_chunk_to_buffer(chunk, start_seq_id, end_seq_id, epoch);
            None
        }
    }

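    /// Records a chunk that has been flushed to storage. Consecutive flushed
    /// items of the same epoch are merged while the combined row count stays
    /// within `max_chunk_size`.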
    fn add_flushed_item_to_buffer(
        &mut self,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        new_vnode_bitmap: Bitmap,
        epoch: u64,
    ) {
        let new_chunk_size = (end_seq_id - start_seq_id + 1) as usize;

        if let Some((
            item_epoch,
            LogStoreBufferItem::Flushed {
                start_seq_id: prev_start_seq_id,
                end_seq_id: prev_end_seq_id,
                vnode_bitmap,
                ..
            },
        )) = self.buffer.back_mut()
            && let flushed_chunk_size = (*prev_end_seq_id - *prev_start_seq_id + 1) as usize
            && let projected_flushed_chunk_size = flushed_chunk_size + new_chunk_size
            && projected_flushed_chunk_size <= self.max_chunk_size
        {
            assert!(
                *prev_end_seq_id < start_seq_id,
                "prev end_seq_id {} should be smaller than current start_seq_id {}",
                prev_end_seq_id,
                start_seq_id
            );
            assert_eq!(
                epoch, *item_epoch,
                "epoch of newly added flushed item must be the same as the last flushed item"
            );
            *prev_end_seq_id = end_seq_id;
            *vnode_bitmap |= new_vnode_bitmap;
        } else {
            let chunk_id = self.next_chunk_id;
            self.next_chunk_id += 1;
            self.buffer.push_back((
                epoch,
                LogStoreBufferItem::Flushed {
                    start_seq_id,
                    end_seq_id,
                    vnode_bitmap: new_vnode_bitmap,
                    chunk_id,
                },
            ));
            self.flushed_count += 1;
            tracing::trace!(
                "adding flushed item to buffer: start_seq_id: {start_seq_id}, end_seq_id: {end_seq_id}, chunk_id: {chunk_id}"
            );
        }
        self.update_unconsumed_buffer_metrics();
    }

    fn add_chunk_to_buffer(
        &mut self,
        chunk: StreamChunk,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        epoch: u64,
    ) {
        let chunk_id = self.next_chunk_id;
        self.next_chunk_id += 1;
        self.current_size += chunk.cardinality();
        self.buffer.push_back((
            epoch,
            LogStoreBufferItem::StreamChunk {
                chunk,
                start_seq_id,
                end_seq_id,
                flushed: false,
                chunk_id,
            },
        ));
        self.update_unconsumed_buffer_metrics();
    }

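    /// Pops the next item, keeping `current_size` and `flushed_count` in sync.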
    fn pop_front(&mut self) -> Option<(u64, LogStoreBufferItem)> {
        let item = self.buffer.pop_front();
        match &item {
            Some((_, LogStoreBufferItem::Flushed { .. })) => {
                self.flushed_count -= 1;
            }
            Some((_, LogStoreBufferItem::StreamChunk { chunk, .. })) => {
                self.current_size -= chunk.cardinality();
            }
            _ => {}
        }
        self.update_unconsumed_buffer_metrics();
        item
    }

    fn update_unconsumed_buffer_metrics(&self) {
        let mut epoch_count = 0;
        let mut row_count = 0;
        for (_, item) in &self.buffer {
            match item {
                LogStoreBufferItem::StreamChunk { chunk, .. } => {
                    row_count += chunk.cardinality();
                }
                LogStoreBufferItem::Flushed {
                    start_seq_id,
                    end_seq_id,
                    ..
                } => {
                    // Seq ids are inclusive, so the flushed range covers
                    // `end_seq_id - start_seq_id + 1` rows.
                    row_count += (end_seq_id - start_seq_id + 1) as usize;
                }
                LogStoreBufferItem::Barrier { .. } => {
                    epoch_count += 1;
                }
            }
        }
        self.metrics.buffer_unconsumed_epoch_count.set(epoch_count);
        self.metrics.buffer_unconsumed_row_count.set(row_count as _);
        self.metrics
            .buffer_unconsumed_item_count
            .set(self.buffer.len() as _);
        self.metrics.buffer_unconsumed_min_epoch.set(
            self.buffer
                .front()
                .map(|(epoch, _)| *epoch)
                .unwrap_or_default() as _,
        );
    }
}

impl<S> Execute for SyncedKvLogStoreExecutor<S>
where
    S: StateStore,
{
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_monitored().boxed()
    }
}

#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use pretty_assertions::assert_eq;
    use risingwave_common::catalog::Field;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::test_prelude::*;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::assert_stream_chunk_eq;
    use crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V2_INFO;
    use crate::common::log_store_impl::kv_log_store::test_utils::{
        check_stream_chunk_eq, gen_test_log_store_table, test_payload_schema,
    };
    use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
    use crate::executor::test_utils::MockSource;

    fn init_logger() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .with_ansi(false)
            .try_init();
    }

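    // Chunks pushed before any checkpoint should be served straight from the
    // in-memory buffer, in order.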
    #[tokio::test]
    async fn test_read_write_buffer() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type.clone()))
            .collect_vec();
        let schema = Schema { fields };
        let pk_indices = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            " I T
            + 5 10
            + 6 10
            + 8 10
            + 9 10
            + 10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            " I T
            - 5 10
            - 6 10
            - 8 10
            U- 9 10
            U+ 10 11",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        tx.push_barrier(test_epoch(2), false);

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

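    // Chunks and the sealing barrier are all pushed before the executor is
    // first polled; the chunks must still be replayed in order.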
    #[tokio::test]
    async fn test_barrier_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type.clone()))
            .collect_vec();
        let schema = Schema { fields };
        let pk_indices = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            " I T
            + 5 10
            + 6 10
            + 8 10
            + 9 10
            + 10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            " I T
            - 5 10
            - 6 10
            - 8 10
            U- 10 11
            U+ 10 10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

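    // With a zero-sized buffer every chunk is flushed to storage; flushed
    // items in the same epoch are merged and read back as a single chunk.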
    #[tokio::test]
    async fn test_max_chunk_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type.clone()))
            .collect_vec();
        let schema = Schema { fields };
        let pk_indices = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            0,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            " I T
            + 5 10
            + 6 10
            + 8 10
            + 9 10
            + 10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            " I T
            - 5 10
            - 6 10
            - 8 10
            U- 10 11
            U+ 10 10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        for i in 1..=2 {
            match stream.next().await {
                Some(Ok(Message::Barrier(barrier))) => {
                    assert_eq!(barrier.epoch.curr, test_epoch(i));
                }
                other => panic!("Expected a barrier message, got {:?}", other),
            }
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(actual))) => {
                let expected = StreamChunk::from_pretty(
                    " I T
                    + 5 10
                    + 6 10
                    + 8 10
                    + 9 10
                    + 10 11
                    - 5 10
                    - 6 10
                    - 8 10
                    U- 10 11
                    U+ 10 10",
                );
                assert_stream_chunk_eq!(actual, expected);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }
    }
}