1use std::collections::VecDeque;
63use std::future::pending;
64use std::mem::replace;
65use std::pin::Pin;
66
67use anyhow::anyhow;
68use futures::future::{BoxFuture, Either, select};
69use futures::stream::StreamFuture;
70use futures::{FutureExt, StreamExt, TryStreamExt};
71use futures_async_stream::try_stream;
72use risingwave_common::array::StreamChunk;
73use risingwave_common::bitmap::Bitmap;
74use risingwave_common::catalog::{TableId, TableOption};
75use risingwave_common::must_match;
76use risingwave_common_estimate_size::EstimateSize;
77use risingwave_connector::sink::log_store::{ChunkId, LogStoreResult};
78use risingwave_storage::StateStore;
79use risingwave_storage::store::timeout_auto_rebuild::TimeoutAutoRebuildIter;
80use risingwave_storage::store::{
81 LocalStateStore, NewLocalOptions, OpConsistencyLevel, StateStoreRead,
82};
83use rw_futures_util::drop_either_future;
84use tokio::time::{Duration, Instant, Sleep, sleep_until};
85use tokio_stream::adapters::Peekable;
86
87use crate::common::log_store_impl::kv_log_store::buffer::LogStoreBufferItem;
88use crate::common::log_store_impl::kv_log_store::reader::LogStoreReadStateStreamRangeStart;
89use crate::common::log_store_impl::kv_log_store::serde::{
90 KvLogStoreItem, LogStoreItemMergeStream, LogStoreRowSerde,
91};
92use crate::common::log_store_impl::kv_log_store::state::{
93 LogStorePostSealCurrentEpoch, LogStoreReadState, LogStoreStateWriteChunkFuture,
94 LogStoreWriteState, new_log_store_state,
95};
96use crate::common::log_store_impl::kv_log_store::{
97 Epoch, FIRST_SEQ_ID, FlushInfo, LogStoreVnodeProgress, SeqId,
98};
99use crate::executor::prelude::*;
100use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
101use crate::executor::{
102 Barrier, BoxedMessageStream, Message, StreamExecutorError, StreamExecutorResult,
103};
104
/// Prometheus metric handles for the synced kv log store executor.
pub mod metrics {
    use risingwave_common::id::FragmentId;
    use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};

    use crate::common::log_store_impl::kv_log_store::KvLogStoreReadMetrics;
    use crate::executor::monitor::StreamingMetrics;
    use crate::task::ActorId;

    /// Per-actor metric handles, cloned into the executor at creation time.
    #[derive(Clone)]
    pub struct SyncedKvLogStoreMetrics {
        /// Times the executor entered the "dirty" state (a checkpoint barrier
        /// arrived while the buffer was non-empty in clean state).
        pub unclean_state: LabelGuardedIntCounter,
        /// Times the executor transitioned back to the "clean" state.
        pub clean_state: LabelGuardedIntCounter,
        /// Nanoseconds spent by downstream between polls of this executor's output.
        pub wait_next_poll_ns: LabelGuardedIntCounter,

        /// Rows written to the backing state store.
        pub storage_write_count: LabelGuardedIntCounter,
        /// Bytes written to the backing state store.
        pub storage_write_size: LabelGuardedIntCounter,
        /// Nanoseconds spent with the write side paused.
        pub pause_duration_ns: LabelGuardedIntCounter,

        // Gauges describing the current contents of the in-memory buffer.
        pub buffer_unconsumed_item_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_row_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_epoch_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_min_epoch: LabelGuardedIntGauge,
        pub buffer_memory_bytes: LabelGuardedIntGauge,
        /// Rows served to downstream directly from the in-memory buffer.
        pub buffer_read_count: LabelGuardedIntCounter,
        pub buffer_read_size: LabelGuardedIntCounter,

        /// Rows/bytes served to downstream from any source (buffer or storage).
        pub total_read_count: LabelGuardedIntCounter,
        pub total_read_size: LabelGuardedIntCounter,
        /// Read metrics for the persisted (on-storage) log stream.
        pub persistent_log_read_metrics: KvLogStoreReadMetrics,
        /// Read metrics for chunks flushed to storage and later read back.
        pub flushed_buffer_read_metrics: KvLogStoreReadMetrics,
    }

    impl SyncedKvLogStoreMetrics {
        /// Register/fetch all metric series for one actor.
        ///
        /// Label order must match the metric family declarations in
        /// `StreamingMetrics`: state metrics take a leading state label, and
        /// read metrics take a leading source label, both followed by
        /// `[actor_id, target, fragment_id, name]`.
        pub(crate) fn new(
            metrics: &StreamingMetrics,
            actor_id: ActorId,
            fragment_id: FragmentId,
            name: &str,
            target: &'static str,
        ) -> Self {
            let actor_id_str = actor_id.to_string();
            let fragment_id_str = fragment_id.to_string();
            // Common 4-label suffix shared by most series below.
            let labels = &[&actor_id_str, target, &fragment_id_str, name];

            let unclean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "dirty",
                &actor_id_str,
                target,
                &fragment_id_str,
                name,
            ]);
            let clean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "clean",
                &actor_id_str,
                target,
                &fragment_id_str,
                name,
            ]);
            let wait_next_poll_ns = metrics
                .sync_kv_log_store_wait_next_poll_ns
                .with_guarded_label_values(labels);

            let storage_write_size = metrics
                .sync_kv_log_store_storage_write_size
                .with_guarded_label_values(labels);
            let storage_write_count = metrics
                .sync_kv_log_store_storage_write_count
                .with_guarded_label_values(labels);
            let storage_pause_duration_ns = metrics
                .sync_kv_log_store_write_pause_duration_ns
                .with_guarded_label_values(labels);

            let buffer_unconsumed_item_count = metrics
                .sync_kv_log_store_buffer_unconsumed_item_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_row_count = metrics
                .sync_kv_log_store_buffer_unconsumed_row_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_epoch_count = metrics
                .sync_kv_log_store_buffer_unconsumed_epoch_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_min_epoch = metrics
                .sync_kv_log_store_buffer_unconsumed_min_epoch
                .with_guarded_label_values(labels);
            let buffer_memory_bytes = metrics
                .sync_kv_log_store_buffer_memory_bytes
                .with_guarded_label_values(labels);
            let buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    "buffer",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    "buffer",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let total_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    "total",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let total_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    "total",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            // Source labels distinguishing the two storage read paths.
            const READ_PERSISTENT_LOG: &str = "persistent_log";
            const READ_FLUSHED_BUFFER: &str = "flushed_buffer";

            let persistent_log_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let persistent_log_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let flushed_buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let flushed_buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            Self {
                unclean_state,
                clean_state,
                wait_next_poll_ns,
                storage_write_size,
                storage_write_count,
                pause_duration_ns: storage_pause_duration_ns,
                buffer_unconsumed_item_count,
                buffer_unconsumed_row_count,
                buffer_unconsumed_epoch_count,
                buffer_unconsumed_min_epoch,
                buffer_memory_bytes,
                buffer_read_count,
                buffer_read_size,
                total_read_count,
                total_read_size,
                persistent_log_read_metrics: KvLogStoreReadMetrics {
                    storage_read_size: persistent_log_read_size,
                    storage_read_count: persistent_log_read_count,
                },
                flushed_buffer_read_metrics: KvLogStoreReadMetrics {
                    storage_read_count: flushed_buffer_read_count,
                    storage_read_size: flushed_buffer_read_size,
                },
            }
        }

        /// Unregistered metric handles for unit tests; the const generic is the
        /// label arity of the corresponding real series (5 with a leading
        /// state/source label, otherwise 4).
        #[cfg(test)]
        pub(crate) fn for_test() -> Self {
            SyncedKvLogStoreMetrics {
                unclean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
                clean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
                wait_next_poll_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
                storage_write_count: LabelGuardedIntCounter::test_int_counter::<4>(),
                storage_write_size: LabelGuardedIntCounter::test_int_counter::<4>(),
                pause_duration_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
                buffer_unconsumed_item_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_row_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_epoch_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_min_epoch: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_memory_bytes: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
                buffer_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
                total_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
                total_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
                persistent_log_read_metrics: KvLogStoreReadMetrics::for_test(),
                flushed_buffer_read_metrics: KvLogStoreReadMetrics::for_test(),
            }
        }
    }
}
336
/// Future resolving to a chunk read back from storage: `(chunk_id, chunk, epoch)`.
type ReadFlushedChunkFuture = BoxFuture<'static, LogStoreResult<(ChunkId, StreamChunk, Epoch)>>;

/// Executor that persists its upstream into a kv log store while concurrently
/// serving buffered/persisted data to downstream, decoupling the two sides.
pub struct SyncedKvLogStoreExecutor<S: StateStore> {
    actor_context: ActorContextRef,
    table_id: TableId,
    metrics: SyncedKvLogStoreMetrics,
    serde: LogStoreRowSerde,

    /// Upstream executor whose output is logged.
    upstream: Executor,

    /// Backing state store used to persist log entries.
    state_store: S,
    /// Max number of unflushed rows kept in memory before spilling to storage.
    max_buffer_size: usize,

    /// Target chunk size used for reads and for merging flushed items.
    chunk_size: usize,

    /// How long writes pause when a checkpoint barrier arrives in clean state
    /// with a non-empty buffer.
    pause_duration_ms: Duration,

    /// Aligned mode: drain persisted data once, then act as a pass-through.
    aligned: bool,
}
359impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
361 #[expect(clippy::too_many_arguments)]
362 pub(crate) fn new(
363 actor_context: ActorContextRef,
364 table_id: TableId,
365 metrics: SyncedKvLogStoreMetrics,
366 serde: LogStoreRowSerde,
367 state_store: S,
368 buffer_size: usize,
369 chunk_size: usize,
370 upstream: Executor,
371 pause_duration_ms: Duration,
372 aligned: bool,
373 ) -> Self {
374 Self {
375 actor_context,
376 table_id,
377 metrics,
378 serde,
379 state_store,
380 upstream,
381 max_buffer_size: buffer_size,
382 chunk_size,
383 pause_duration_ms,
384 aligned,
385 }
386 }
387}
388
/// Bookkeeping produced when a chunk finishes flushing to the state store.
struct FlushedChunkInfo {
    epoch: u64,
    start_seq_id: SeqId,
    end_seq_id: SeqId,
    /// Row/byte counters for the flush, fed into the write metrics.
    flush_info: FlushInfo,
    /// Vnodes touched by the flushed chunk; needed to read it back later.
    vnode_bitmap: Bitmap,
}
396
/// State machine for the write side of the executor.
enum WriteFuture<S: LocalStateStore> {
    /// A checkpoint barrier arrived while the state was clean but the buffer
    /// non-empty; the barrier is held back until the sleep elapses, or the
    /// read side drains the buffer and clears `sleep_future` to resume early.
    Paused {
        start_instant: Instant,
        /// `None` once the pause has been cancelled by the read side.
        sleep_future: Option<Pin<Box<Sleep>>>,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    },
    /// Waiting for the next message from upstream.
    ReceiveFromUpstream {
        future: StreamFuture<BoxedMessageStream>,
        write_state: LogStoreWriteState<S>,
    },
    /// Flushing a chunk to the state store; upstream is not polled meanwhile.
    FlushingChunk {
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        future: Pin<Box<LogStoreStateWriteChunkFuture<S>>>,
        stream: BoxedMessageStream,
    },
    /// Transient placeholder used while extracting owned state via `replace`;
    /// must never be polled.
    Empty,
}
427
/// Outcome of driving `WriteFuture::next_event` to completion.
enum WriteFutureEvent {
    /// A message arrived from upstream (or a held-back barrier was released).
    UpstreamMessageReceived(Message),
    /// A chunk finished flushing to the state store.
    ChunkFlushed(FlushedChunkInfo),
}
432
impl<S: LocalStateStore> WriteFuture<S> {
    /// Transition into `FlushingChunk`: asynchronously write `chunk` covering
    /// `[start_seq_id, end_seq_id]` at `epoch`, parking the upstream `stream`
    /// until the flush completes.
    fn flush_chunk(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
        chunk: StreamChunk,
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
    ) -> Self {
        tracing::trace!(
            start_seq_id,
            end_seq_id,
            epoch,
            cardinality = chunk.cardinality(),
            "write_future: flushing chunk"
        );
        Self::FlushingChunk {
            epoch,
            start_seq_id,
            end_seq_id,
            future: Box::pin(write_state.into_write_chunk_future(
                chunk,
                epoch,
                start_seq_id,
                end_seq_id,
            )),
            stream,
        }
    }

    /// Transition into `ReceiveFromUpstream`: wait for the next upstream message.
    fn receive_from_upstream(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        Self::ReceiveFromUpstream {
            future: stream.into_future(),
            write_state,
        }
    }

    /// Transition into `Paused`: hold `barrier` back for `duration` to give the
    /// read side a chance to catch up before the barrier is forwarded.
    fn paused(
        duration: Duration,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        let now = Instant::now();
        tracing::trace!(?now, ?duration, "write_future_pause");
        Self::Paused {
            start_instant: now,
            sleep_future: Some(Box::pin(sleep_until(now + duration))),
            barrier,
            stream,
            write_state,
        }
    }

    /// Drive the current state to completion and return the reclaimed upstream
    /// stream, the write state, and the event that occurred.
    ///
    /// Leaves `self` as `Empty` via `mem::replace`; the caller must install the
    /// next state before this is polled again.
    async fn next_event(
        &mut self,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<(BoxedMessageStream, LogStoreWriteState<S>, WriteFutureEvent)> {
        match self {
            WriteFuture::Paused {
                start_instant,
                sleep_future,
                ..
            } => {
                if let Some(sleep_future) = sleep_future {
                    sleep_future.await;
                    metrics
                        .pause_duration_ns
                        .inc_by(start_instant.elapsed().as_nanos() as _);
                    tracing::trace!("resuming write future");
                }
                // Pause elapsed (or was cancelled): release the held-back barrier.
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::Paused { stream, write_state, barrier, .. } => {
                    Ok((stream, write_state, WriteFutureEvent::UpstreamMessageReceived(Message::Barrier(barrier))))
                })
            }
            WriteFuture::ReceiveFromUpstream { future, .. } => {
                let (opt, stream) = future.await;
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::ReceiveFromUpstream { write_state, .. } => {
                    // `None` means upstream terminated unexpectedly.
                    opt
                        .ok_or_else(|| anyhow!("end of upstream input").into())
                        .and_then(|result| result.map(|item| {
                            (stream, write_state, WriteFutureEvent::UpstreamMessageReceived(item))
                        }))
                })
            }
            WriteFuture::FlushingChunk { future, .. } => {
                // The flush future yields back ownership of the write state.
                let (write_state, result) = future.await;
                let result = must_match!(replace(self, WriteFuture::Empty), WriteFuture::FlushingChunk { epoch, start_seq_id, end_seq_id, stream, .. } => {
                    result.map(|(flush_info, vnode_bitmap)| {
                        (stream, write_state, WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                            epoch,
                            start_seq_id,
                            end_seq_id,
                            flush_info,
                            vnode_bitmap,
                        }))
                    })
                });
                result.map_err(Into::into)
            }
            WriteFuture::Empty => {
                unreachable!("should not be polled after ready")
            }
        }
    }
}
542
impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    /// Wraps `execute_inner`, additionally recording how long downstream takes
    /// between polls of this executor's output stream.
    #[try_stream(ok= Message, error = StreamExecutorError)]
    pub async fn execute_monitored(self) {
        let wait_next_poll_ns = self.metrics.wait_next_poll_ns.clone();
        #[for_await]
        for message in self.execute_inner() {
            let current_time = Instant::now();
            yield message?;
            // Time between yielding and being polled again is downstream latency.
            wait_next_poll_ns.inc_by(current_time.elapsed().as_nanos() as _);
        }
    }

    /// Main loop: persists upstream messages into the log store while
    /// concurrently serving buffered/persisted data downstream.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(self) {
        let mut input = self.upstream.execute();

        // The first barrier carries the initial epoch used to initialize the
        // write state; it is yielded before any state-store writes happen.
        let first_barrier = expect_first_barrier(&mut input).await?;
        let first_write_epoch = first_barrier.epoch;
        yield Message::Barrier(first_barrier.clone());

        let local_state_store = self
            .state_store
            .new_local(NewLocalOptions {
                table_id: self.table_id,
                fragment_id: self.actor_context.fragment_id,
                // The log store tolerates inconsistent ops (append-only usage).
                op_consistency_level: OpConsistencyLevel::Inconsistent,
                table_option: TableOption {
                    retention_seconds: None,
                },
                is_replicated: false,
                vnodes: self.serde.vnodes().clone(),
                upload_on_flush: false,
            })
            .await;

        let (mut read_state, mut initial_write_state) = new_log_store_state(
            self.table_id,
            local_state_store,
            self.serde,
            self.chunk_size,
        );
        initial_write_state.init(first_write_epoch).await?;

        let mut pause_stream = first_barrier.is_pause_on_startup();
        let mut initial_write_epoch = first_write_epoch;

        if self.aligned {
            // Aligned mode: drain whatever is still persisted in the log store,
            // then degrade to a pass-through executor that only truncates the
            // log on each barrier.
            tracing::info!("aligned mode");
            let log_store_stream = read_state
                .read_persisted_log_store(
                    self.metrics.persistent_log_read_metrics.clone(),
                    initial_write_epoch.curr,
                    LogStoreReadStateStreamRangeStart::Unbounded,
                )
                .await?;

            #[for_await]
            for message in log_store_stream {
                let (_epoch, message) = message?;
                match message {
                    KvLogStoreItem::Barrier { .. } => {
                        continue;
                    }
                    KvLogStoreItem::StreamChunk { chunk, .. } => {
                        yield Message::Chunk(chunk);
                    }
                }
            }

            let mut realigned_logstore = false;

            #[for_await]
            for message in input {
                match message? {
                    Message::Barrier(barrier) => {
                        let is_checkpoint = barrier.is_checkpoint();
                        // Everything up to the previous epoch has already been
                        // forwarded, so truncate the log store to there.
                        let mut progress = LogStoreVnodeProgress::None;
                        progress.apply_aligned(
                            read_state.vnodes().clone(),
                            barrier.epoch.prev,
                            None,
                        );
                        let post_seal = initial_write_state
                            .seal_current_epoch(barrier.epoch.curr, progress.take());
                        let update_vnode_bitmap =
                            barrier.as_update_vnode_bitmap(self.actor_context.id);
                        yield Message::Barrier(barrier);
                        post_seal.post_yield_barrier(update_vnode_bitmap).await?;
                        if !realigned_logstore && is_checkpoint {
                            realigned_logstore = true;
                            tracing::info!("realigned logstore");
                        }
                    }
                    Message::Chunk(chunk) => {
                        yield Message::Chunk(chunk);
                    }
                    Message::Watermark(watermark) => {
                        yield Message::Watermark(watermark);
                    }
                }
            }

            return Ok(());
        }

        // Unaligned mode. The consume stream is recreated whenever the vnode
        // bitmap changes (scaling), restarting from a fresh persisted read.
        'recreate_consume_stream: loop {
            let mut seq_id = FIRST_SEQ_ID;
            let mut buffer = SyncedLogStoreBuffer {
                buffer: VecDeque::new(),
                current_size: 0,
                max_size: self.max_buffer_size,
                max_chunk_size: self.chunk_size,
                next_chunk_id: 0,
                metrics: self.metrics.clone(),
                flushed_count: 0,
            };

            let log_store_stream = read_state
                .read_persisted_log_store(
                    self.metrics.persistent_log_read_metrics.clone(),
                    initial_write_epoch.curr,
                    LogStoreReadStateStreamRangeStart::Unbounded,
                )
                .await?;

            // Peek to learn whether any persisted backlog exists: an empty
            // persisted log means we start in the "clean" state.
            let mut log_store_stream = tokio_stream::StreamExt::peekable(log_store_stream);
            let mut clean_state = log_store_stream.peek().await.is_none();
            tracing::trace!(?clean_state);

            let mut read_future_state = ReadFuture::ReadingPersistedStream(log_store_stream);

            let mut write_future_state =
                WriteFuture::receive_from_upstream(input, initial_write_state);

            let mut progress = LogStoreVnodeProgress::None;

            loop {
                // Race the write side against the read side. Reads are gated
                // off entirely while the stream is paused by a mutation.
                let select_result = {
                    let read_future = async {
                        if pause_stream {
                            pending().await
                        } else {
                            read_future_state
                                .next_chunk(&mut progress, &read_state, &mut buffer, &self.metrics)
                                .await
                        }
                    };
                    pin_mut!(read_future);
                    let write_future = write_future_state.next_event(&self.metrics);
                    pin_mut!(write_future);
                    let output = select(write_future, read_future).await;
                    drop_either_future(output)
                };
                match select_result {
                    Either::Left(result) => {
                        // The write future completed and left itself `Empty`;
                        // drop it before installing the next state.
                        drop(write_future_state);
                        let (stream, mut write_state, either) = result?;
                        match either {
                            WriteFutureEvent::UpstreamMessageReceived(msg) => {
                                match msg {
                                    Message::Barrier(barrier) => {
                                        // Clean state + pending buffer at a checkpoint:
                                        // hold the barrier back (paused) so the read
                                        // side can drain and keep the state clean.
                                        if clean_state
                                            && barrier.kind.is_checkpoint()
                                            && !buffer.is_empty()
                                        {
                                            write_future_state = WriteFuture::paused(
                                                self.pause_duration_ms,
                                                barrier,
                                                stream,
                                                write_state,
                                            );
                                            clean_state = false;
                                            self.metrics.unclean_state.inc();
                                        } else {
                                            if let Some(mutation) = barrier.mutation.as_deref() {
                                                match mutation {
                                                    Mutation::Pause => {
                                                        pause_stream = true;
                                                    }
                                                    Mutation::Resume => {
                                                        pause_stream = false;
                                                    }
                                                    _ => {}
                                                }
                                            }
                                            let write_state_post_write_barrier =
                                                Self::write_barrier(
                                                    self.actor_context.id,
                                                    &mut write_state,
                                                    barrier.clone(),
                                                    &self.metrics,
                                                    progress.take(),
                                                    &mut buffer,
                                                )
                                                .await?;
                                            // Seq ids restart at every barrier.
                                            seq_id = FIRST_SEQ_ID;
                                            let update_vnode_bitmap = barrier
                                                .as_update_vnode_bitmap(self.actor_context.id);
                                            let barrier_epoch = barrier.epoch;
                                            tracing::trace!(
                                                ?update_vnode_bitmap,
                                                actor_id = %self.actor_context.id,
                                                "update vnode bitmap"
                                            );

                                            yield Message::Barrier(barrier);

                                            write_state_post_write_barrier
                                                .post_yield_barrier(update_vnode_bitmap.clone())
                                                .await?;
                                            if let Some(vnode_bitmap) = update_vnode_bitmap {
                                                // Scaling: rebuild the consume
                                                // stream with the new vnodes.
                                                read_state.update_vnode_bitmap(vnode_bitmap);
                                                initial_write_epoch = barrier_epoch;
                                                input = stream;
                                                initial_write_state = write_state;
                                                continue 'recreate_consume_stream;
                                            } else {
                                                write_future_state =
                                                    WriteFuture::receive_from_upstream(
                                                        stream,
                                                        write_state,
                                                    );
                                            }
                                        }
                                    }
                                    Message::Chunk(chunk) => {
                                        if chunk.cardinality() == 0 {
                                            // Empty chunks carry no rows and would
                                            // corrupt seq-id accounting; skip them.
                                            tracing::warn!(
                                                epoch = write_state.epoch().curr,
                                                "received empty chunk (cardinality=0), skipping"
                                            );
                                            write_future_state = WriteFuture::receive_from_upstream(
                                                stream,
                                                write_state,
                                            );
                                        } else {
                                            let start_seq_id = seq_id;
                                            let new_seq_id = seq_id + chunk.cardinality() as SeqId;
                                            // Seq-id range is inclusive on both ends.
                                            let end_seq_id = new_seq_id - 1;
                                            let epoch = write_state.epoch().curr;
                                            tracing::trace!(
                                                start_seq_id,
                                                end_seq_id,
                                                new_seq_id,
                                                epoch,
                                                cardinality = chunk.cardinality(),
                                                "received chunk"
                                            );
                                            if let Some(chunk_to_flush) = buffer.add_or_flush_chunk(
                                                start_seq_id,
                                                end_seq_id,
                                                chunk,
                                                epoch,
                                            ) {
                                                // Buffer is full: spill to storage.
                                                seq_id = new_seq_id;
                                                write_future_state = WriteFuture::flush_chunk(
                                                    stream,
                                                    write_state,
                                                    chunk_to_flush,
                                                    epoch,
                                                    start_seq_id,
                                                    end_seq_id,
                                                );
                                            } else {
                                                seq_id = new_seq_id;
                                                write_future_state =
                                                    WriteFuture::receive_from_upstream(
                                                        stream,
                                                        write_state,
                                                    );
                                            }
                                        }
                                    }
                                    Message::Watermark(_watermark) => {
                                        // Watermarks are dropped by the log store.
                                        write_future_state =
                                            WriteFuture::receive_from_upstream(stream, write_state);
                                    }
                                }
                            }
                            WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                                start_seq_id,
                                end_seq_id,
                                epoch,
                                flush_info,
                                vnode_bitmap,
                            }) => {
                                buffer.add_flushed_item_to_buffer(
                                    start_seq_id,
                                    end_seq_id,
                                    vnode_bitmap,
                                    epoch,
                                );
                                self.metrics
                                    .storage_write_count
                                    .inc_by(flush_info.flush_count as _);
                                self.metrics
                                    .storage_write_size
                                    .inc_by(flush_info.flush_size as _);
                                write_future_state =
                                    WriteFuture::receive_from_upstream(stream, write_state);
                            }
                        }
                    }
                    Either::Right(result) => {
                        // The read side produced a chunk. If it has fully caught
                        // up, flip back to clean state and cancel any pause.
                        if !clean_state
                            && matches!(read_future_state, ReadFuture::Idle)
                            && buffer.is_empty()
                        {
                            clean_state = true;
                            self.metrics.clean_state.inc();

                            if let WriteFuture::Paused { sleep_future, .. } =
                                &mut write_future_state
                            {
                                tracing::trace!("resuming paused future");
                                assert!(buffer.current_size < self.max_buffer_size);
                                // Clearing the sleep makes `next_event` release
                                // the held barrier immediately.
                                *sleep_future = None;
                            }
                        }
                        let chunk = result?;
                        self.metrics
                            .total_read_count
                            .inc_by(chunk.cardinality() as _);

                        yield Message::Chunk(chunk);
                    }
                }
            }
        }
    }
}
893
/// Peekable stream over items already persisted in the log store.
type PersistedStream<S> = Peekable<Pin<Box<LogStoreItemMergeStream<TimeoutAutoRebuildIter<S>>>>>;

/// State machine for the read side of the executor.
enum ReadFuture<S: StateStoreRead> {
    /// Still draining data persisted before this consume stream was created.
    ReadingPersistedStream(PersistedStream<S>),
    /// Reading back a chunk that the write side flushed to storage.
    ReadingFlushedChunk {
        future: ReadFlushedChunkFuture,
        end_seq_id: SeqId,
    },
    /// Nothing in flight; the next chunk comes from the in-memory buffer.
    Idle,
}
904
impl<S: StateStoreRead> ReadFuture<S> {
    /// Return the next chunk to forward downstream, advancing truncation
    /// `progress` as items are consumed.
    ///
    /// Read order: (1) the persisted log store stream, (2) the in-memory
    /// buffer, reading flushed items back from storage as needed. Pends
    /// forever when the buffer is empty (the caller races this with the
    /// write side).
    async fn next_chunk(
        &mut self,
        progress: &mut LogStoreVnodeProgress,
        read_state: &LogStoreReadState<S>,
        buffer: &mut SyncedLogStoreBuffer,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<StreamChunk> {
        // Phase 1: drain the persisted stream before touching the buffer.
        match self {
            ReadFuture::ReadingPersistedStream(stream) => {
                while let Some((epoch, item)) = stream.try_next().await? {
                    match item {
                        KvLogStoreItem::Barrier { vnodes, .. } => {
                            tracing::trace!(epoch, "read logstore barrier");
                            // A consumed barrier allows truncation up to `epoch`.
                            progress.apply_aligned(vnodes, epoch, None);
                            continue;
                        }
                        KvLogStoreItem::StreamChunk {
                            chunk,
                            progress: chunk_progress,
                        } => {
                            tracing::trace!("read logstore chunk of size: {}", chunk.cardinality());
                            progress.apply_per_vnode(epoch, chunk_progress);
                            return Ok(chunk);
                        }
                    }
                }
                // Persisted backlog exhausted; fall through to the buffer.
                *self = ReadFuture::Idle;
            }
            ReadFuture::ReadingFlushedChunk { .. } | ReadFuture::Idle => {}
        }
        // Phase 2: from Idle, pop buffered items until a chunk is found or a
        // flushed item needs to be read back from storage.
        match self {
            ReadFuture::ReadingPersistedStream(_) => {
                unreachable!("must have finished read persisted stream when reaching here")
            }
            ReadFuture::ReadingFlushedChunk { .. } => {}
            ReadFuture::Idle => loop {
                let Some((item_epoch, item)) = buffer.pop_front() else {
                    // Nothing buffered: park until the select loop is re-entered.
                    return pending().await;
                };
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        metrics.buffer_read_count.inc_by(chunk.cardinality() as _);
                        tracing::trace!(
                            start_seq_id,
                            end_seq_id,
                            flushed,
                            cardinality = chunk.cardinality(),
                            "read buffered chunk of size"
                        );
                        progress.apply_aligned(
                            read_state.vnodes().clone(),
                            item_epoch,
                            Some(end_seq_id),
                        );
                        return Ok(chunk);
                    }
                    LogStoreBufferItem::Flushed {
                        vnode_bitmap,
                        start_seq_id,
                        end_seq_id,
                        chunk_id,
                    } => {
                        tracing::trace!(start_seq_id, end_seq_id, chunk_id, "read flushed chunk");
                        let read_metrics = metrics.flushed_buffer_read_metrics.clone();
                        let future = read_state
                            .read_flushed_chunk(
                                vnode_bitmap,
                                chunk_id,
                                start_seq_id,
                                end_seq_id,
                                item_epoch,
                                read_metrics,
                            )
                            .boxed();
                        *self = ReadFuture::ReadingFlushedChunk { future, end_seq_id };
                        break;
                    }
                    LogStoreBufferItem::Barrier { .. } => {
                        tracing::trace!(item_epoch, "read buffer barrier");
                        progress.apply_aligned(read_state.vnodes().clone(), item_epoch, None);
                        continue;
                    }
                }
            },
        }

        // Phase 3: await the in-flight flushed-chunk read.
        let (future, end_seq_id) = match self {
            ReadFuture::ReadingPersistedStream(_) | ReadFuture::Idle => {
                unreachable!("should be at ReadingFlushedChunk")
            }
            ReadFuture::ReadingFlushedChunk { future, end_seq_id } => (future, *end_seq_id),
        };

        let (_, chunk, epoch) = future.await?;
        progress.apply_aligned(read_state.vnodes().clone(), epoch, Some(end_seq_id));
        tracing::trace!(
            end_seq_id,
            "read flushed chunk of size: {}",
            chunk.cardinality()
        );
        *self = ReadFuture::Idle;
        Ok(chunk)
    }
}
1018
impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    /// Persist a barrier into the log store, spilling any still-unflushed
    /// buffered chunks on checkpoints, applying the accumulated truncation
    /// `progress`, and appending a barrier item to the in-memory buffer.
    ///
    /// Returns the post-seal handle that must be driven after the barrier has
    /// been yielded downstream.
    async fn write_barrier<'a>(
        actor_id: ActorId,
        write_state: &'a mut LogStoreWriteState<S::Local>,
        barrier: Barrier,
        metrics: &SyncedKvLogStoreMetrics,
        progress: LogStoreVnodeProgress,
        buffer: &mut SyncedLogStoreBuffer,
    ) -> StreamExecutorResult<LogStorePostSealCurrentEpoch<'a, S::Local>> {
        tracing::trace!(%actor_id, ?progress, "applying truncation");
        // The barrier is written under the epoch it closes.
        let epoch = barrier.epoch.prev;
        let mut writer = write_state.start_writer(false);
        writer.write_barrier(epoch, barrier.is_checkpoint())?;

        if barrier.is_checkpoint() {
            // Unflushed chunks form a suffix of the buffer, so walk backwards
            // and stop at the first already-flushed chunk.
            for (epoch, item) in buffer.buffer.iter_mut().rev() {
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        if !*flushed {
                            writer.write_chunk(chunk, *epoch, *start_seq_id, *end_seq_id)?;
                            *flushed = true;
                        } else {
                            break;
                        }
                    }
                    LogStoreBufferItem::Flushed { .. } | LogStoreBufferItem::Barrier { .. } => {}
                }
            }
        }

        let (flush_info, _) = writer.finish().await?;
        metrics
            .storage_write_count
            .inc_by(flush_info.flush_count as _);
        metrics
            .storage_write_size
            .inc_by(flush_info.flush_size as _);
        let post_seal = write_state.seal_current_epoch(barrier.epoch.curr, progress);

        // Mirror the barrier into the buffer so the read side observes epoch
        // boundaries and can advance truncation.
        buffer.buffer.push_back((
            epoch,
            LogStoreBufferItem::Barrier {
                is_checkpoint: barrier.is_checkpoint(),
                next_epoch: barrier.epoch.curr,
                schema_change: None,
                is_stop: false,
            },
        ));
        // Chunk ids restart per epoch.
        buffer.next_chunk_id = 0;
        buffer.update_buffer_metrics();

        Ok(post_seal)
    }
}
1085
/// In-memory buffer of log store items awaiting consumption by the read side.
struct SyncedLogStoreBuffer {
    /// Items tagged with the epoch they belong to, oldest first.
    buffer: VecDeque<(u64, LogStoreBufferItem)>,
    /// Total cardinality of unflushed `StreamChunk` items currently buffered.
    current_size: usize,
    /// Row threshold above which new chunks are spilled to storage.
    max_size: usize,
    /// Max row count for a single (merged) flushed item.
    max_chunk_size: usize,
    /// Next chunk id to assign; reset to 0 at every barrier.
    next_chunk_id: ChunkId,
    metrics: SyncedKvLogStoreMetrics,
    /// Number of `Flushed` items currently in the buffer.
    flushed_count: usize,
}
1095
1096impl SyncedLogStoreBuffer {
1097 fn is_empty(&self) -> bool {
1098 self.current_size == 0
1099 }
1100
1101 fn add_or_flush_chunk(
1102 &mut self,
1103 start_seq_id: SeqId,
1104 end_seq_id: SeqId,
1105 chunk: StreamChunk,
1106 epoch: u64,
1107 ) -> Option<StreamChunk> {
1108 let current_size = self.current_size;
1109 let chunk_size = chunk.cardinality();
1110
1111 tracing::trace!(
1112 current_size,
1113 chunk_size,
1114 max_size = self.max_size,
1115 "checking chunk size"
1116 );
1117 let should_flush_chunk = current_size + chunk_size > self.max_size;
1118 if should_flush_chunk {
1119 tracing::trace!(start_seq_id, end_seq_id, epoch, "flushing chunk",);
1120 Some(chunk)
1121 } else {
1122 tracing::trace!(start_seq_id, end_seq_id, epoch, "buffering chunk",);
1123 self.add_chunk_to_buffer(chunk, start_seq_id, end_seq_id, epoch);
1124 None
1125 }
1126 }
1127
1128 fn add_flushed_item_to_buffer(
1131 &mut self,
1132 start_seq_id: SeqId,
1133 end_seq_id: SeqId,
1134 new_vnode_bitmap: Bitmap,
1135 epoch: u64,
1136 ) {
1137 let new_chunk_size = (end_seq_id - start_seq_id + 1) as usize;
1138
1139 if let Some((
1140 item_epoch,
1141 LogStoreBufferItem::Flushed {
1142 start_seq_id: prev_start_seq_id,
1143 end_seq_id: prev_end_seq_id,
1144 vnode_bitmap,
1145 ..
1146 },
1147 )) = self.buffer.back_mut()
1148 && let flushed_chunk_size = (*prev_end_seq_id - *prev_start_seq_id + 1) as usize
1149 && let projected_flushed_chunk_size = flushed_chunk_size + new_chunk_size
1150 && projected_flushed_chunk_size <= self.max_chunk_size
1151 {
1152 assert!(
1153 *prev_end_seq_id < start_seq_id,
1154 "prev end_seq_id {} should be smaller than current start_seq_id {}",
1155 end_seq_id,
1156 start_seq_id
1157 );
1158 assert_eq!(
1159 epoch, *item_epoch,
1160 "epoch of newly added flushed item must be the same as the last flushed item"
1161 );
1162 *prev_end_seq_id = end_seq_id;
1163 *vnode_bitmap |= new_vnode_bitmap;
1164 } else {
1165 let chunk_id = self.next_chunk_id;
1166 self.next_chunk_id += 1;
1167 self.buffer.push_back((
1168 epoch,
1169 LogStoreBufferItem::Flushed {
1170 start_seq_id,
1171 end_seq_id,
1172 vnode_bitmap: new_vnode_bitmap,
1173 chunk_id,
1174 },
1175 ));
1176 self.flushed_count += 1;
1177 tracing::trace!(
1178 "adding flushed item to buffer: start_seq_id: {start_seq_id}, end_seq_id: {end_seq_id}, chunk_id: {chunk_id}"
1179 );
1180 }
1181 self.update_buffer_metrics();
1183 }
1184
1185 fn add_chunk_to_buffer(
1186 &mut self,
1187 chunk: StreamChunk,
1188 start_seq_id: SeqId,
1189 end_seq_id: SeqId,
1190 epoch: u64,
1191 ) {
1192 let chunk_id = self.next_chunk_id;
1193 self.next_chunk_id += 1;
1194 self.current_size += chunk.cardinality();
1195 self.buffer.push_back((
1196 epoch,
1197 LogStoreBufferItem::StreamChunk {
1198 chunk,
1199 start_seq_id,
1200 end_seq_id,
1201 flushed: false,
1202 chunk_id,
1203 },
1204 ));
1205 self.update_buffer_metrics();
1206 }
1207
1208 fn pop_front(&mut self) -> Option<(u64, LogStoreBufferItem)> {
1209 let item = self.buffer.pop_front();
1210 match &item {
1211 Some((_, LogStoreBufferItem::Flushed { .. })) => {
1212 self.flushed_count -= 1;
1213 }
1214 Some((_, LogStoreBufferItem::StreamChunk { chunk, .. })) => {
1215 self.current_size -= chunk.cardinality();
1216 }
1217 _ => {}
1218 }
1219 self.update_buffer_metrics();
1220 item
1221 }
1222
1223 fn update_buffer_metrics(&self) {
1224 let mut epoch_count = 0;
1225 let mut row_count = 0;
1226 let mut memory_bytes = 0;
1227 for (_, item) in &self.buffer {
1228 match item {
1229 LogStoreBufferItem::StreamChunk { chunk, .. } => {
1230 row_count += chunk.cardinality();
1231 }
1232 LogStoreBufferItem::Flushed {
1233 start_seq_id,
1234 end_seq_id,
1235 ..
1236 } => {
1237 row_count += (end_seq_id - start_seq_id) as usize;
1238 }
1239 LogStoreBufferItem::Barrier { .. } => {
1240 epoch_count += 1;
1241 }
1242 }
1243 memory_bytes += item.estimated_size();
1244 }
1245 self.metrics.buffer_unconsumed_epoch_count.set(epoch_count);
1246 self.metrics.buffer_unconsumed_row_count.set(row_count as _);
1247 self.metrics
1248 .buffer_unconsumed_item_count
1249 .set(self.buffer.len() as _);
1250 self.metrics.buffer_unconsumed_min_epoch.set(
1251 self.buffer
1252 .front()
1253 .map(|(epoch, _)| *epoch)
1254 .unwrap_or_default() as _,
1255 );
1256 self.metrics.buffer_memory_bytes.set(memory_bytes as _);
1257 }
1258}
1259
1260impl<S> Execute for SyncedKvLogStoreExecutor<S>
1261where
1262 S: StateStore,
1263{
1264 fn execute(self: Box<Self>) -> BoxedMessageStream {
1265 self.execute_monitored().boxed()
1266 }
1267}
1268
#[cfg(test)]
mod tests {
    //! Unit tests for the synced kv log store executor, driven by a
    //! `MockSource` upstream and an in-memory state store. The ordering of
    //! `tx.push_*` calls relative to polling the stream is significant: it
    //! decides whether data is served from the in-memory buffer or read back
    //! after being persisted.
    use itertools::Itertools;
    use pretty_assertions::assert_eq;
    use risingwave_common::catalog::Field;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::test_prelude::*;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::assert_stream_chunk_eq;
    use crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V2_INFO;
    use crate::common::log_store_impl::kv_log_store::test_utils::{
        check_stream_chunk_eq, gen_test_log_store_table, test_payload_schema,
    };
    use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
    use crate::executor::test_utils::MockSource;

    /// Best-effort tracing setup; `try_init` so repeated calls across tests
    /// in the same process don't panic.
    fn init_logger() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .with_ansi(false)
            .try_init();
    }

    /// Chunks pushed before the stream is polled should be replayed from the
    /// in-memory buffer, preserving the original chunk boundaries, with
    /// barriers interleaved in epoch order.
    #[tokio::test]
    async fn test_read_write_buffer() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
            .collect_vec();
        let schema = Schema { fields };
        let stream_key = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        // NOTE(review): the positional args after the state store look like
        // buffer/chunk size limits and a pause duration — confirm against
        // `SyncedKvLogStoreExecutor::new`.
        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        // First barrier initializes the executor's epoch.
        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            "  I T
            + 5 10
            + 6 10
            + 8 10
            + 9 10
            + 10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "  I T
            -  5 10
            -  6 10
            -  8 10
            U- 9 10
            U+ 10 11",
        );

        // Both chunks are queued before the stream is first polled, so they
        // land in the buffer and should come back unchanged.
        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        // Barrier pushed only after the chunks were consumed; it must be
        // forwarded downstream as-is.
        tx.push_barrier(test_epoch(2), false);

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

    /// Same as above, but the second barrier is pushed BEFORE the stream is
    /// polled, so the chunks are sealed by a checkpoint before being read.
    /// They should still be emitted individually, between the two barriers.
    #[tokio::test]
    async fn test_barrier_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
            .collect_vec();
        let schema = Schema { fields };
        let stream_key = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            "  I T
            + 5 10
            + 6 10
            + 8 10
            + 9 10
            + 10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "  I T
            -  5 10
            -  6 10
            -  8 10
            U- 10 11
            U+ 10 10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        // Barrier arrives before the executor runs: the chunks are consumed
        // under a sealed epoch rather than live from the buffer.
        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

    /// With the buffer limit set to 0, chunks cannot be held in memory: they
    /// are persisted to the log store. Both barriers are emitted first, and
    /// the persisted rows are then read back as one merged chunk.
    #[tokio::test]
    async fn test_max_chunk_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
            .collect_vec();
        let schema = Schema { fields };
        let stream_key = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        // NOTE(review): 0 here (where other tests pass 10) presumably forces
        // every chunk to spill to the persisted log store — confirm against
        // `SyncedKvLogStoreExecutor::new`.
        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            0,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            "  I T
            + 5 10
            + 6 10
            + 8 10
            + 9 10
            + 10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            "  I T
            -  5 10
            -  6 10
            -  8 10
            U- 10 11
            U+ 10 10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        // Both barriers come through before any data is replayed.
        for i in 1..=2 {
            match stream.next().await {
                Some(Ok(Message::Barrier(barrier))) => {
                    assert_eq!(barrier.epoch.curr, test_epoch(i));
                }
                other => panic!("Expected a barrier message, got {:?}", other),
            }
        }

        // The persisted rows of both input chunks are read back merged into
        // a single chunk, in their original order.
        match stream.next().await {
            Some(Ok(Message::Chunk(actual))) => {
                let expected = StreamChunk::from_pretty(
                    "   I T
                    +  5 10
                    +  6 10
                    +  8 10
                    +  9 10
                    + 10 11
                    -  5 10
                    -  6 10
                    -  8 10
                    U- 10 11
                    U+ 10 10",
                );
                assert_stream_chunk_eq!(actual, expected);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }
    }
}