use std::collections::VecDeque;
use std::future::pending;
use std::mem::replace;
use std::pin::Pin;

use anyhow::anyhow;
use futures::future::{BoxFuture, Either, select};
use futures::stream::StreamFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::must_match;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_connector::sink::log_store::{ChunkId, LogStoreResult};
use risingwave_storage::StateStore;
use risingwave_storage::store::timeout_auto_rebuild::TimeoutAutoRebuildIter;
use risingwave_storage::store::{
    LocalStateStore, NewLocalOptions, OpConsistencyLevel, StateStoreRead,
};
use rw_futures_util::drop_either_future;
use tokio::time::{Duration, Instant, Sleep, sleep_until};
use tokio_stream::adapters::Peekable;

use crate::common::log_store_impl::kv_log_store::buffer::LogStoreBufferItem;
use crate::common::log_store_impl::kv_log_store::reader::LogStoreReadStateStreamRangeStart;
use crate::common::log_store_impl::kv_log_store::serde::{
    KvLogStoreItem, LogStoreItemMergeStream, LogStoreRowSerde,
};
use crate::common::log_store_impl::kv_log_store::state::{
    LogStorePostSealCurrentEpoch, LogStoreReadState, LogStoreStateWriteChunkFuture,
    LogStoreWriteState, new_log_store_state,
};
use crate::common::log_store_impl::kv_log_store::{
    Epoch, FIRST_SEQ_ID, FlushInfo, LogStoreVnodeProgress, SeqId,
};
use crate::executor::prelude::*;
use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
use crate::executor::{
    Barrier, BoxedMessageStream, Message, StreamExecutorError, StreamExecutorResult,
};

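/// Metrics for the synced kv log store executor: clean/unclean state transitions,
/// storage and buffer read/write counters, and gauges over the unconsumed
/// in-memory buffer.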
pub mod metrics {
    use risingwave_common::id::FragmentId;
    use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};

    use crate::common::log_store_impl::kv_log_store::KvLogStoreReadMetrics;
    use crate::executor::monitor::StreamingMetrics;
    use crate::task::ActorId;

    #[derive(Clone)]
    pub struct SyncedKvLogStoreMetrics {
        pub unclean_state: LabelGuardedIntCounter,
        pub clean_state: LabelGuardedIntCounter,
        pub wait_next_poll_ns: LabelGuardedIntCounter,

        pub storage_write_count: LabelGuardedIntCounter,
        pub storage_write_size: LabelGuardedIntCounter,
        pub pause_duration_ns: LabelGuardedIntCounter,

        pub buffer_unconsumed_item_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_row_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_epoch_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_min_epoch: LabelGuardedIntGauge,
        pub buffer_memory_bytes: LabelGuardedIntGauge,
        pub buffer_read_count: LabelGuardedIntCounter,
        pub buffer_read_size: LabelGuardedIntCounter,

        pub total_read_count: LabelGuardedIntCounter,
        pub total_read_size: LabelGuardedIntCounter,
        pub persistent_log_read_metrics: KvLogStoreReadMetrics,
        pub flushed_buffer_read_metrics: KvLogStoreReadMetrics,
    }

    impl SyncedKvLogStoreMetrics {
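        /// Registers all metrics against the base label set
        /// `[actor_id, target, fragment_id, name]`. The state counters and the
        /// per-source read counters prepend a discriminating label
        /// (`"dirty"`/`"clean"`, or `"buffer"`/`"total"`/`"persistent_log"`/
        /// `"flushed_buffer"`).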
        pub(crate) fn new(
            metrics: &StreamingMetrics,
            actor_id: ActorId,
            fragment_id: FragmentId,
            name: &str,
            target: &'static str,
        ) -> Self {
            let actor_id_str = actor_id.to_string();
            let fragment_id_str = fragment_id.to_string();
            let labels = &[&actor_id_str, target, &fragment_id_str, name];

            let unclean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "dirty",
                &actor_id_str,
                target,
                &fragment_id_str,
                name,
            ]);
            let clean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "clean",
                &actor_id_str,
                target,
                &fragment_id_str,
                name,
            ]);
            let wait_next_poll_ns = metrics
                .sync_kv_log_store_wait_next_poll_ns
                .with_guarded_label_values(labels);

            let storage_write_size = metrics
                .sync_kv_log_store_storage_write_size
                .with_guarded_label_values(labels);
            let storage_write_count = metrics
                .sync_kv_log_store_storage_write_count
                .with_guarded_label_values(labels);
            let storage_pause_duration_ns = metrics
                .sync_kv_log_store_write_pause_duration_ns
                .with_guarded_label_values(labels);

            let buffer_unconsumed_item_count = metrics
                .sync_kv_log_store_buffer_unconsumed_item_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_row_count = metrics
                .sync_kv_log_store_buffer_unconsumed_row_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_epoch_count = metrics
                .sync_kv_log_store_buffer_unconsumed_epoch_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_min_epoch = metrics
                .sync_kv_log_store_buffer_unconsumed_min_epoch
                .with_guarded_label_values(labels);
            let buffer_memory_bytes = metrics
                .sync_kv_log_store_buffer_memory_bytes
                .with_guarded_label_values(labels);
            let buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    "buffer",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    "buffer",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let total_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    "total",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let total_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    "total",
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            const READ_PERSISTENT_LOG: &str = "persistent_log";
            const READ_FLUSHED_BUFFER: &str = "flushed_buffer";

            let persistent_log_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let persistent_log_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let flushed_buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            let flushed_buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &fragment_id_str,
                    name,
                ]);

            Self {
                unclean_state,
                clean_state,
                wait_next_poll_ns,
                storage_write_size,
                storage_write_count,
                pause_duration_ns: storage_pause_duration_ns,
                buffer_unconsumed_item_count,
                buffer_unconsumed_row_count,
                buffer_unconsumed_epoch_count,
                buffer_unconsumed_min_epoch,
                buffer_memory_bytes,
                buffer_read_count,
                buffer_read_size,
                total_read_count,
                total_read_size,
                persistent_log_read_metrics: KvLogStoreReadMetrics {
                    storage_read_size: persistent_log_read_size,
                    storage_read_count: persistent_log_read_count,
                },
                flushed_buffer_read_metrics: KvLogStoreReadMetrics {
                    storage_read_count: flushed_buffer_read_count,
                    storage_read_size: flushed_buffer_read_size,
                },
            }
        }

        #[cfg(test)]
        pub(crate) fn for_test() -> Self {
            SyncedKvLogStoreMetrics {
                unclean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
                clean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
                wait_next_poll_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
                storage_write_count: LabelGuardedIntCounter::test_int_counter::<4>(),
                storage_write_size: LabelGuardedIntCounter::test_int_counter::<4>(),
                pause_duration_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
                buffer_unconsumed_item_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_row_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_epoch_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_min_epoch: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_memory_bytes: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
                buffer_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
                total_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
                total_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
                persistent_log_read_metrics: KvLogStoreReadMetrics::for_test(),
                flushed_buffer_read_metrics: KvLogStoreReadMetrics::for_test(),
            }
        }
    }
}

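/// Future resolving to a chunk read back from a flushed buffer item, along with
/// its chunk id and the epoch it belongs to.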
type ReadFlushedChunkFuture = BoxFuture<'static, LogStoreResult<(ChunkId, StreamChunk, Epoch)>>;

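/// An executor that persists upstream messages into a kv log store while
/// re-emitting them downstream. Chunks are staged in an in-memory buffer of at
/// most `max_buffer_size` rows; chunks that would overflow it are flushed to the
/// state store and read back later. The read side first replays any log entries
/// persisted by a previous run, then drains the buffer.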
pub struct SyncedKvLogStoreExecutor<S: StateStore> {
    actor_context: ActorContextRef,
    table_id: TableId,
    metrics: SyncedKvLogStoreMetrics,
    serde: LogStoreRowSerde,

    upstream: Executor,

    state_store: S,
    max_buffer_size: usize,

    chunk_size: usize,

    pause_duration_ms: Duration,

    aligned: bool,
}

impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
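    /// Creates the executor. `buffer_size` bounds the rows staged in memory before
    /// chunks spill to the log store; `chunk_size` bounds how many rows a flushed
    /// item may cover when merged for read-back.
    ///
    /// A minimal construction sketch, mirroring the test setup at the bottom of
    /// this file (the table, serde inputs, and source are placeholders):
    ///
    /// ```rust,ignore
    /// let executor = SyncedKvLogStoreExecutor::new(
    ///     ActorContext::for_test(123),
    ///     table.id,
    ///     SyncedKvLogStoreMetrics::for_test(),
    ///     LogStoreRowSerde::new(&table, vnodes, pk_info),
    ///     MemoryStateStore::new(),
    ///     10,                         // max buffer size, in rows
    ///     256,                        // max chunk size for read-back
    ///     source,                     // upstream executor
    ///     Duration::from_millis(256), // pause duration on dirty checkpoint
    ///     false,                      // aligned mode
    /// )
    /// .boxed();
    /// ```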
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn new(
        actor_context: ActorContextRef,
        table_id: TableId,
        metrics: SyncedKvLogStoreMetrics,
        serde: LogStoreRowSerde,
        state_store: S,
        buffer_size: usize,
        chunk_size: usize,
        upstream: Executor,
        pause_duration_ms: Duration,
        aligned: bool,
    ) -> Self {
        Self {
            actor_context,
            table_id,
            metrics,
            serde,
            state_store,
            upstream,
            max_buffer_size: buffer_size,
            chunk_size,
            pause_duration_ms,
            aligned,
        }
    }
}

struct FlushedChunkInfo {
    epoch: u64,
    start_seq_id: SeqId,
    end_seq_id: SeqId,
    flush_info: FlushInfo,
    vnode_bitmap: Bitmap,
}

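/// Write-side state machine.
///
/// - `Paused`: a checkpoint barrier arrived while the state was clean but the
///   buffer was non-empty; the barrier is held back until the sleep elapses, or
///   until the read side drains the buffer and clears `sleep_future`.
/// - `ReceiveFromUpstream`: waiting on the next upstream message.
/// - `FlushingChunk`: writing an overflowing chunk to the log store.
/// - `Empty`: transient placeholder used while `next_event` moves state out.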
enum WriteFuture<S: LocalStateStore> {
    Paused {
        start_instant: Instant,
        sleep_future: Option<Pin<Box<Sleep>>>,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    },
    ReceiveFromUpstream {
        future: StreamFuture<BoxedMessageStream>,
        write_state: LogStoreWriteState<S>,
    },
    FlushingChunk {
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        future: Pin<Box<LogStoreStateWriteChunkFuture<S>>>,
        stream: BoxedMessageStream,
    },
    Empty,
}

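/// Outcome of polling a [`WriteFuture`]: either a fresh upstream message, or the
/// completion of an in-flight chunk flush.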
enum WriteFutureEvent {
    UpstreamMessageReceived(Message),
    ChunkFlushed(FlushedChunkInfo),
}

impl<S: LocalStateStore> WriteFuture<S> {
    fn flush_chunk(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
        chunk: StreamChunk,
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
    ) -> Self {
        tracing::trace!(
            start_seq_id,
            end_seq_id,
            epoch,
            cardinality = chunk.cardinality(),
            "write_future: flushing chunk"
        );
        Self::FlushingChunk {
            epoch,
            start_seq_id,
            end_seq_id,
            future: Box::pin(write_state.into_write_chunk_future(
                chunk,
                epoch,
                start_seq_id,
                end_seq_id,
            )),
            stream,
        }
    }

    fn receive_from_upstream(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        Self::ReceiveFromUpstream {
            future: stream.into_future(),
            write_state,
        }
    }

    fn paused(
        duration: Duration,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        let now = Instant::now();
        tracing::trace!(?now, ?duration, "write_future_pause");
        Self::Paused {
            start_instant: now,
            sleep_future: Some(Box::pin(sleep_until(now + duration))),
            barrier,
            stream,
            write_state,
        }
    }

    async fn next_event(
        &mut self,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<(BoxedMessageStream, LogStoreWriteState<S>, WriteFutureEvent)> {
        match self {
            WriteFuture::Paused {
                start_instant,
                sleep_future,
                ..
            } => {
                if let Some(sleep_future) = sleep_future {
                    sleep_future.await;
                    metrics
                        .pause_duration_ns
                        .inc_by(start_instant.elapsed().as_nanos() as _);
                    tracing::trace!("resuming write future");
                }
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::Paused { stream, write_state, barrier, .. } => {
                    Ok((stream, write_state, WriteFutureEvent::UpstreamMessageReceived(Message::Barrier(barrier))))
                })
            }
            WriteFuture::ReceiveFromUpstream { future, .. } => {
                let (opt, stream) = future.await;
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::ReceiveFromUpstream { write_state, .. } => {
                    opt
                        .ok_or_else(|| anyhow!("end of upstream input").into())
                        .and_then(|result| result.map(|item| {
                            (stream, write_state, WriteFutureEvent::UpstreamMessageReceived(item))
                        }))
                })
            }
            WriteFuture::FlushingChunk { future, .. } => {
                let (write_state, result) = future.await;
                let result = must_match!(replace(self, WriteFuture::Empty), WriteFuture::FlushingChunk { epoch, start_seq_id, end_seq_id, stream, .. } => {
                    result.map(|(flush_info, vnode_bitmap)| {
                        (stream, write_state, WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                            epoch,
                            start_seq_id,
                            end_seq_id,
                            flush_info,
                            vnode_bitmap,
                        }))
                    })
                });
                result.map_err(Into::into)
            }
            WriteFuture::Empty => {
                unreachable!("should not be polled after ready")
            }
        }
    }
}

impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    #[try_stream(ok = Message, error = StreamExecutorError)]
    pub async fn execute_monitored(self) {
        let wait_next_poll_ns = self.metrics.wait_next_poll_ns.clone();
        #[for_await]
        for message in self.execute_inner() {
            let current_time = Instant::now();
            yield message?;
            wait_next_poll_ns.inc_by(current_time.elapsed().as_nanos() as _);
        }
    }

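    /// Main loop. After the first barrier initializes the log store state, the
    /// executor either runs in aligned mode (replay the persisted log, then pass
    /// messages through while recording per-barrier truncation progress), or
    /// enters the synced read/write loop below, which is recreated whenever a
    /// barrier updates the vnode bitmap.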
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(self) {
        let mut input = self.upstream.execute();

        let first_barrier = expect_first_barrier(&mut input).await?;
        let first_write_epoch = first_barrier.epoch;
        yield Message::Barrier(first_barrier.clone());

        let local_state_store = self
            .state_store
            .new_local(NewLocalOptions {
                table_id: self.table_id,
                fragment_id: self.actor_context.fragment_id,
                op_consistency_level: OpConsistencyLevel::Inconsistent,
                table_option: TableOption {
                    retention_seconds: None,
                },
                is_replicated: false,
                vnodes: self.serde.vnodes().clone(),
                upload_on_flush: false,
            })
            .await;

        let (mut read_state, mut initial_write_state) = new_log_store_state(
            self.table_id,
            local_state_store,
            self.serde,
            self.chunk_size,
        );
        initial_write_state.init(first_write_epoch).await?;

        let mut pause_stream = first_barrier.is_pause_on_startup();
        let mut initial_write_epoch = first_write_epoch;

        if self.aligned {
            tracing::info!("aligned mode");
            let log_store_stream = read_state
                .read_persisted_log_store(
                    self.metrics.persistent_log_read_metrics.clone(),
                    initial_write_epoch.curr,
                    LogStoreReadStateStreamRangeStart::Unbounded,
                )
                .await?;

            #[for_await]
            for message in log_store_stream {
                let (_epoch, message) = message?;
                match message {
                    KvLogStoreItem::Barrier { .. } => {
                        continue;
                    }
                    KvLogStoreItem::StreamChunk { chunk, .. } => {
                        yield Message::Chunk(chunk);
                    }
                }
            }

            let mut realigned_logstore = false;

            #[for_await]
            for message in input {
                match message? {
                    Message::Barrier(barrier) => {
                        let is_checkpoint = barrier.is_checkpoint();
                        let mut progress = LogStoreVnodeProgress::None;
                        progress.apply_aligned(
                            read_state.vnodes().clone(),
                            barrier.epoch.prev,
                            None,
                        );
                        let post_seal = initial_write_state
                            .seal_current_epoch(barrier.epoch.curr, progress.take());
                        let update_vnode_bitmap =
                            barrier.as_update_vnode_bitmap(self.actor_context.id);
                        yield Message::Barrier(barrier);
                        post_seal.post_yield_barrier(update_vnode_bitmap).await?;
                        if !realigned_logstore && is_checkpoint {
                            realigned_logstore = true;
                            tracing::info!("realigned logstore");
                        }
                    }
                    Message::Chunk(chunk) => {
                        yield Message::Chunk(chunk);
                    }
                    Message::Watermark(watermark) => {
                        yield Message::Watermark(watermark);
                    }
                }
            }

            return Ok(());
        }

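        // The consume stream is rebuilt from scratch (fresh buffer, fresh read of
        // the persisted log) whenever a barrier carries a vnode bitmap update.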
        'recreate_consume_stream: loop {
            let mut seq_id = FIRST_SEQ_ID;
            let mut buffer = SyncedLogStoreBuffer {
                buffer: VecDeque::new(),
                current_size: 0,
                max_size: self.max_buffer_size,
                max_chunk_size: self.chunk_size,
                next_chunk_id: 0,
                metrics: self.metrics.clone(),
                flushed_count: 0,
            };

            let log_store_stream = read_state
                .read_persisted_log_store(
                    self.metrics.persistent_log_read_metrics.clone(),
                    initial_write_epoch.curr,
                    LogStoreReadStateStreamRangeStart::Unbounded,
                )
                .await?;

            let mut log_store_stream = tokio_stream::StreamExt::peekable(log_store_stream);
            let mut clean_state = log_store_stream.peek().await.is_none();
            tracing::trace!(?clean_state);

            let mut read_future_state = ReadFuture::ReadingPersistedStream(log_store_stream);

            let mut write_future_state =
                WriteFuture::receive_from_upstream(input, initial_write_state);

            let mut progress = LogStoreVnodeProgress::None;

            loop {
                let select_result = {
                    let read_future = async {
                        if pause_stream {
                            pending().await
                        } else {
                            read_future_state
                                .next_chunk(&mut progress, &read_state, &mut buffer, &self.metrics)
                                .await
                        }
                    };
                    pin_mut!(read_future);
                    let write_future = write_future_state.next_event(&self.metrics);
                    pin_mut!(write_future);
                    let output = select(write_future, read_future).await;
                    drop_either_future(output)
                };
                match select_result {
                    Either::Left(result) => {
                        drop(write_future_state);
                        let (stream, mut write_state, either) = result?;
                        match either {
                            WriteFutureEvent::UpstreamMessageReceived(msg) => {
                                match msg {
                                    Message::Barrier(barrier) => {
                                        if clean_state
                                            && barrier.kind.is_checkpoint()
                                            && !buffer.is_empty()
                                        {
                                            write_future_state = WriteFuture::paused(
                                                self.pause_duration_ms,
                                                barrier,
                                                stream,
                                                write_state,
                                            );
                                            clean_state = false;
                                            self.metrics.unclean_state.inc();
                                        } else {
                                            if let Some(mutation) = barrier.mutation.as_deref() {
                                                match mutation {
                                                    Mutation::Pause => {
                                                        pause_stream = true;
                                                    }
                                                    Mutation::Resume => {
                                                        pause_stream = false;
                                                    }
                                                    _ => {}
                                                }
                                            }
                                            let write_state_post_write_barrier =
                                                Self::write_barrier(
                                                    self.actor_context.id,
                                                    &mut write_state,
                                                    barrier.clone(),
                                                    &self.metrics,
                                                    progress.take(),
                                                    &mut buffer,
                                                )
                                                .await?;
                                            seq_id = FIRST_SEQ_ID;
                                            let update_vnode_bitmap = barrier
                                                .as_update_vnode_bitmap(self.actor_context.id);
                                            let barrier_epoch = barrier.epoch;
                                            tracing::trace!(
                                                ?update_vnode_bitmap,
                                                actor_id = %self.actor_context.id,
                                                "update vnode bitmap"
                                            );

                                            yield Message::Barrier(barrier);

                                            write_state_post_write_barrier
                                                .post_yield_barrier(update_vnode_bitmap.clone())
                                                .await?;
                                            if let Some(vnode_bitmap) = update_vnode_bitmap {
                                                read_state.update_vnode_bitmap(vnode_bitmap);
                                                initial_write_epoch = barrier_epoch;
                                                input = stream;
                                                initial_write_state = write_state;
                                                continue 'recreate_consume_stream;
                                            } else {
                                                write_future_state =
                                                    WriteFuture::receive_from_upstream(
                                                        stream,
                                                        write_state,
                                                    );
                                            }
                                        }
                                    }
                                    Message::Chunk(chunk) => {
                                        let start_seq_id = seq_id;
                                        let new_seq_id = seq_id + chunk.cardinality() as SeqId;
                                        let end_seq_id = new_seq_id - 1;
                                        let epoch = write_state.epoch().curr;
                                        tracing::trace!(
                                            start_seq_id,
                                            end_seq_id,
                                            new_seq_id,
                                            epoch,
                                            cardinality = chunk.cardinality(),
                                            "received chunk"
                                        );
                                        if let Some(chunk_to_flush) = buffer.add_or_flush_chunk(
                                            start_seq_id,
                                            end_seq_id,
                                            chunk,
                                            epoch,
                                        ) {
                                            seq_id = new_seq_id;
                                            write_future_state = WriteFuture::flush_chunk(
                                                stream,
                                                write_state,
                                                chunk_to_flush,
                                                epoch,
                                                start_seq_id,
                                                end_seq_id,
                                            );
                                        } else {
                                            seq_id = new_seq_id;
                                            write_future_state =
                                                WriteFuture::receive_from_upstream(
                                                    stream,
                                                    write_state,
                                                );
                                        }
                                    }
                                    Message::Watermark(_watermark) => {
                                        write_future_state =
                                            WriteFuture::receive_from_upstream(stream, write_state);
                                    }
                                }
                            }
                            WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                                start_seq_id,
                                end_seq_id,
                                epoch,
                                flush_info,
                                vnode_bitmap,
                            }) => {
                                buffer.add_flushed_item_to_buffer(
                                    start_seq_id,
                                    end_seq_id,
                                    vnode_bitmap,
                                    epoch,
                                );
                                self.metrics
                                    .storage_write_count
                                    .inc_by(flush_info.flush_count as _);
                                self.metrics
                                    .storage_write_size
                                    .inc_by(flush_info.flush_size as _);
                                write_future_state =
                                    WriteFuture::receive_from_upstream(stream, write_state);
                            }
                        }
                    }
                    Either::Right(result) => {
                        if !clean_state
                            && matches!(read_future_state, ReadFuture::Idle)
                            && buffer.is_empty()
                        {
                            clean_state = true;
                            self.metrics.clean_state.inc();

                            if let WriteFuture::Paused { sleep_future, .. } =
                                &mut write_future_state
                            {
                                tracing::trace!("resuming paused future");
                                assert!(buffer.current_size < self.max_buffer_size);
                                *sleep_future = None;
                            }
                        }
                        let chunk = result?;
                        self.metrics
                            .total_read_count
                            .inc_by(chunk.cardinality() as _);

                        yield Message::Chunk(chunk);
                    }
                }
            }
        }
    }
}

type PersistedStream<S> = Peekable<Pin<Box<LogStoreItemMergeStream<TimeoutAutoRebuildIter<S>>>>>;
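
/// Read-side state machine: replay the persisted log store stream first, then
/// serve items out of the in-memory buffer, reading flushed items back from
/// storage as they are encountered.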
879
880enum ReadFuture<S: StateStoreRead> {
881 ReadingPersistedStream(PersistedStream<S>),
882 ReadingFlushedChunk {
883 future: ReadFlushedChunkFuture,
884 end_seq_id: SeqId,
885 },
886 Idle,
887}
888
889impl<S: StateStoreRead> ReadFuture<S> {
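    /// Returns the next chunk to emit downstream, recording the truncation offset
    /// of everything consumed on the way in `progress`. Pends forever when the
    /// buffer is empty, so the caller's `select` falls through to the write side.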
    async fn next_chunk(
        &mut self,
        progress: &mut LogStoreVnodeProgress,
        read_state: &LogStoreReadState<S>,
        buffer: &mut SyncedLogStoreBuffer,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<StreamChunk> {
        match self {
            ReadFuture::ReadingPersistedStream(stream) => {
                while let Some((epoch, item)) = stream.try_next().await? {
                    match item {
                        KvLogStoreItem::Barrier { vnodes, .. } => {
                            tracing::trace!(epoch, "read logstore barrier");
                            progress.apply_aligned(vnodes, epoch, None);
                            continue;
                        }
                        KvLogStoreItem::StreamChunk {
                            chunk,
                            progress: chunk_progress,
                        } => {
                            tracing::trace!("read logstore chunk of size: {}", chunk.cardinality());
                            progress.apply_per_vnode(epoch, chunk_progress);
                            return Ok(chunk);
                        }
                    }
                }
                *self = ReadFuture::Idle;
            }
            ReadFuture::ReadingFlushedChunk { .. } | ReadFuture::Idle => {}
        }
        match self {
            ReadFuture::ReadingPersistedStream(_) => {
                unreachable!("must have finished read persisted stream when reaching here")
            }
            ReadFuture::ReadingFlushedChunk { .. } => {}
            ReadFuture::Idle => loop {
                let Some((item_epoch, item)) = buffer.pop_front() else {
                    return pending().await;
                };
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        metrics.buffer_read_count.inc_by(chunk.cardinality() as _);
                        tracing::trace!(
                            start_seq_id,
                            end_seq_id,
                            flushed,
                            cardinality = chunk.cardinality(),
                            "read buffered chunk"
                        );
                        progress.apply_aligned(
                            read_state.vnodes().clone(),
                            item_epoch,
                            Some(end_seq_id),
                        );
                        return Ok(chunk);
                    }
                    LogStoreBufferItem::Flushed {
                        vnode_bitmap,
                        start_seq_id,
                        end_seq_id,
                        chunk_id,
                    } => {
                        tracing::trace!(start_seq_id, end_seq_id, chunk_id, "read flushed chunk");
                        let read_metrics = metrics.flushed_buffer_read_metrics.clone();
                        let future = read_state
                            .read_flushed_chunk(
                                vnode_bitmap,
                                chunk_id,
                                start_seq_id,
                                end_seq_id,
                                item_epoch,
                                read_metrics,
                            )
                            .boxed();
                        *self = ReadFuture::ReadingFlushedChunk { future, end_seq_id };
                        break;
                    }
                    LogStoreBufferItem::Barrier { .. } => {
                        tracing::trace!(item_epoch, "read buffer barrier");
                        progress.apply_aligned(read_state.vnodes().clone(), item_epoch, None);
                        continue;
                    }
                }
            },
        }

        let (future, end_seq_id) = match self {
            ReadFuture::ReadingPersistedStream(_) | ReadFuture::Idle => {
                unreachable!("should be at ReadingFlushedChunk")
            }
            ReadFuture::ReadingFlushedChunk { future, end_seq_id } => (future, *end_seq_id),
        };

        let (_, chunk, epoch) = future.await?;
        progress.apply_aligned(read_state.vnodes().clone(), epoch, Some(end_seq_id));
        tracing::trace!(
            end_seq_id,
            "read flushed chunk of size: {}",
            chunk.cardinality()
        );
        *self = ReadFuture::Idle;
        Ok(chunk)
    }
}

impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
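    /// Handles a barrier on the write path. On checkpoint barriers, all
    /// still-unflushed chunks in the buffer are flushed (scanning newest-first and
    /// stopping at the first already-flushed chunk), the epoch is sealed with the
    /// accumulated truncation progress, and the barrier itself is appended to the
    /// buffer for the read side to consume.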
    async fn write_barrier<'a>(
        actor_id: ActorId,
        write_state: &'a mut LogStoreWriteState<S::Local>,
        barrier: Barrier,
        metrics: &SyncedKvLogStoreMetrics,
        progress: LogStoreVnodeProgress,
        buffer: &mut SyncedLogStoreBuffer,
    ) -> StreamExecutorResult<LogStorePostSealCurrentEpoch<'a, S::Local>> {
        tracing::trace!(%actor_id, ?progress, "applying truncation");
        let epoch = barrier.epoch.prev;
        let mut writer = write_state.start_writer(false);
        writer.write_barrier(epoch, barrier.is_checkpoint())?;

        if barrier.is_checkpoint() {
            for (epoch, item) in buffer.buffer.iter_mut().rev() {
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        if !*flushed {
                            writer.write_chunk(chunk, *epoch, *start_seq_id, *end_seq_id)?;
                            *flushed = true;
                        } else {
                            break;
                        }
                    }
                    LogStoreBufferItem::Flushed { .. } | LogStoreBufferItem::Barrier { .. } => {}
                }
            }
        }

        let (flush_info, _) = writer.finish().await?;
        metrics
            .storage_write_count
            .inc_by(flush_info.flush_count as _);
        metrics
            .storage_write_size
            .inc_by(flush_info.flush_size as _);
        let post_seal = write_state.seal_current_epoch(barrier.epoch.curr, progress);

        buffer.buffer.push_back((
            epoch,
            LogStoreBufferItem::Barrier {
                is_checkpoint: barrier.is_checkpoint(),
                next_epoch: barrier.epoch.curr,
                schema_change: None,
                is_stop: false,
            },
        ));
        buffer.next_chunk_id = 0;
        buffer.update_buffer_metrics();

        Ok(post_seal)
    }
}

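/// In-memory staging buffer shared by the write and read halves of the loop.
/// `current_size` counts buffered (not yet flushed) rows against `max_size`,
/// while `flushed_count` tracks how many `Flushed` placeholders await read-back.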
struct SyncedLogStoreBuffer {
    buffer: VecDeque<(u64, LogStoreBufferItem)>,
    current_size: usize,
    max_size: usize,
    max_chunk_size: usize,
    next_chunk_id: ChunkId,
    metrics: SyncedKvLogStoreMetrics,
    flushed_count: usize,
}

impl SyncedLogStoreBuffer {
    fn is_empty(&self) -> bool {
        self.current_size == 0
    }

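    /// Buffers `chunk` if it fits under `max_size`; otherwise returns it to the
    /// caller, which flushes it to the log store and later records it via
    /// [`Self::add_flushed_item_to_buffer`].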
    fn add_or_flush_chunk(
        &mut self,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        chunk: StreamChunk,
        epoch: u64,
    ) -> Option<StreamChunk> {
        let current_size = self.current_size;
        let chunk_size = chunk.cardinality();

        tracing::trace!(
            current_size,
            chunk_size,
            max_size = self.max_size,
            "checking chunk size"
        );
        let should_flush_chunk = current_size + chunk_size > self.max_size;
        if should_flush_chunk {
            tracing::trace!(start_seq_id, end_seq_id, epoch, "flushing chunk");
            Some(chunk)
        } else {
            tracing::trace!(start_seq_id, end_seq_id, epoch, "buffering chunk");
            self.add_chunk_to_buffer(chunk, start_seq_id, end_seq_id, epoch);
            None
        }
    }

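    /// Records a chunk flushed to storage. If the newest buffer entry is a
    /// `Flushed` item of the same epoch and the combined seq-id span stays within
    /// `max_chunk_size`, the entries are merged (extending the seq-id range and
    /// OR-ing the vnode bitmaps) so they can be read back as a single chunk;
    /// otherwise a new `Flushed` entry is appended.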
    fn add_flushed_item_to_buffer(
        &mut self,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        new_vnode_bitmap: Bitmap,
        epoch: u64,
    ) {
        let new_chunk_size = (end_seq_id - start_seq_id + 1) as usize;

        if let Some((
            item_epoch,
            LogStoreBufferItem::Flushed {
                start_seq_id: prev_start_seq_id,
                end_seq_id: prev_end_seq_id,
                vnode_bitmap,
                ..
            },
        )) = self.buffer.back_mut()
            && let flushed_chunk_size = (*prev_end_seq_id - *prev_start_seq_id + 1) as usize
            && let projected_flushed_chunk_size = flushed_chunk_size + new_chunk_size
            && projected_flushed_chunk_size <= self.max_chunk_size
        {
            assert!(
                *prev_end_seq_id < start_seq_id,
                "prev end_seq_id {} should be smaller than current start_seq_id {}",
                prev_end_seq_id,
                start_seq_id
            );
            assert_eq!(
                epoch, *item_epoch,
                "epoch of newly added flushed item must be the same as the last flushed item"
            );
            *prev_end_seq_id = end_seq_id;
            *vnode_bitmap |= new_vnode_bitmap;
        } else {
            let chunk_id = self.next_chunk_id;
            self.next_chunk_id += 1;
            self.buffer.push_back((
                epoch,
                LogStoreBufferItem::Flushed {
                    start_seq_id,
                    end_seq_id,
                    vnode_bitmap: new_vnode_bitmap,
                    chunk_id,
                },
            ));
            self.flushed_count += 1;
            tracing::trace!(
                "adding flushed item to buffer: start_seq_id: {start_seq_id}, end_seq_id: {end_seq_id}, chunk_id: {chunk_id}"
            );
        }
        self.update_buffer_metrics();
    }

    fn add_chunk_to_buffer(
        &mut self,
        chunk: StreamChunk,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        epoch: u64,
    ) {
        let chunk_id = self.next_chunk_id;
        self.next_chunk_id += 1;
        self.current_size += chunk.cardinality();
        self.buffer.push_back((
            epoch,
            LogStoreBufferItem::StreamChunk {
                chunk,
                start_seq_id,
                end_seq_id,
                flushed: false,
                chunk_id,
            },
        ));
        self.update_buffer_metrics();
    }

    fn pop_front(&mut self) -> Option<(u64, LogStoreBufferItem)> {
        let item = self.buffer.pop_front();
        match &item {
            Some((_, LogStoreBufferItem::Flushed { .. })) => {
                self.flushed_count -= 1;
            }
            Some((_, LogStoreBufferItem::StreamChunk { chunk, .. })) => {
                self.current_size -= chunk.cardinality();
            }
            _ => {}
        }
        self.update_buffer_metrics();
        item
    }

    fn update_buffer_metrics(&self) {
        let mut epoch_count = 0;
        let mut row_count = 0;
        let mut memory_bytes = 0;
        for (_, item) in &self.buffer {
            match item {
                LogStoreBufferItem::StreamChunk { chunk, .. } => {
                    row_count += chunk.cardinality();
                }
                LogStoreBufferItem::Flushed {
                    start_seq_id,
                    end_seq_id,
                    ..
                } => {
                    // Seq id ranges are inclusive on both ends.
                    row_count += (end_seq_id - start_seq_id + 1) as usize;
                }
                LogStoreBufferItem::Barrier { .. } => {
                    epoch_count += 1;
                }
            }
            memory_bytes += item.estimated_size();
        }
        self.metrics.buffer_unconsumed_epoch_count.set(epoch_count);
        self.metrics.buffer_unconsumed_row_count.set(row_count as _);
        self.metrics
            .buffer_unconsumed_item_count
            .set(self.buffer.len() as _);
        self.metrics.buffer_unconsumed_min_epoch.set(
            self.buffer
                .front()
                .map(|(epoch, _)| *epoch)
                .unwrap_or_default() as _,
        );
        self.metrics.buffer_memory_bytes.set(memory_bytes as _);
    }
}

impl<S> Execute for SyncedKvLogStoreExecutor<S>
where
    S: StateStore,
{
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_monitored().boxed()
    }
}

#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use pretty_assertions::assert_eq;
    use risingwave_common::catalog::Field;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::test_prelude::*;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::assert_stream_chunk_eq;
    use crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V2_INFO;
    use crate::common::log_store_impl::kv_log_store::test_utils::{
        check_stream_chunk_eq, gen_test_log_store_table, test_payload_schema,
    };
    use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
    use crate::executor::test_utils::MockSource;

    fn init_logger() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .with_ansi(false)
            .try_init();
    }

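    // Chunks that fit under the buffer limit are echoed straight from the
    // in-memory buffer, bracketed by the surrounding barriers.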
    #[tokio::test]
    async fn test_read_write_buffer() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
            .collect_vec();
        let schema = Schema { fields };
        let stream_key = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            " I T
            + 5 10
            + 6 10
            + 8 10
            + 9 10
            + 10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            " I T
            - 5 10
            - 6 10
            - 8 10
            U- 9 10
            U+ 10 11",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        tx.push_barrier(test_epoch(2), false);

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

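    // Like `test_read_write_buffer`, but the sealing barrier is pushed before the
    // executor starts, exercising reads after the epoch has been sealed.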
    #[tokio::test]
    async fn test_barrier_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
            .collect_vec();
        let schema = Schema { fields };
        let stream_key = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            " I T
            + 5 10
            + 6 10
            + 8 10
            + 9 10
            + 10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            " I T
            - 5 10
            - 6 10
            - 8 10
            U- 10 11
            U+ 10 10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

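    // With a zero `max_buffer_size`, every chunk is flushed to the state store;
    // on read-back the flushed items are merged (bounded by `chunk_size`) into a
    // single chunk.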
    #[tokio::test]
    async fn test_max_chunk_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type))
            .collect_vec();
        let schema = Schema { fields };
        let stream_key = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), stream_key.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            0,
            256,
            source,
            Duration::from_millis(256),
            false,
        )
        .boxed();

        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            " I T
            + 5 10
            + 6 10
            + 8 10
            + 9 10
            + 10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            " I T
            - 5 10
            - 6 10
            - 8 10
            U- 10 11
            U+ 10 10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        for i in 1..=2 {
            match stream.next().await {
                Some(Ok(Message::Barrier(barrier))) => {
                    assert_eq!(barrier.epoch.curr, test_epoch(i));
                }
                other => panic!("Expected a barrier message, got {:?}", other),
            }
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(actual))) => {
                let expected = StreamChunk::from_pretty(
                    " I T
                    + 5 10
                    + 6 10
                    + 8 10
                    + 9 10
                    + 10 11
                    - 5 10
                    - 6 10
                    - 8 10
                    U- 10 11
                    U+ 10 10",
                );
                assert_stream_chunk_eq!(actual, expected);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }
    }
}