use std::collections::VecDeque;
use std::future::pending;
use std::mem::replace;
use std::pin::Pin;

use anyhow::anyhow;
use futures::future::{BoxFuture, Either, select};
use futures::stream::StreamFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::must_match;
use risingwave_connector::sink::log_store::{ChunkId, LogStoreResult};
use risingwave_storage::StateStore;
use risingwave_storage::store::{
    LocalStateStore, NewLocalOptions, OpConsistencyLevel, StateStoreRead,
};
use rw_futures_util::drop_either_future;
use tokio::time::{Duration, Instant, Sleep, sleep_until};
use tokio_stream::adapters::Peekable;

use crate::common::log_store_impl::kv_log_store::buffer::LogStoreBufferItem;
use crate::common::log_store_impl::kv_log_store::reader::LogStoreReadStateStreamRangeStart;
use crate::common::log_store_impl::kv_log_store::reader::timeout_auto_rebuild::TimeoutAutoRebuildIter;
use crate::common::log_store_impl::kv_log_store::serde::{
    KvLogStoreItem, LogStoreItemMergeStream, LogStoreRowSerde,
};
use crate::common::log_store_impl::kv_log_store::state::{
    LogStorePostSealCurrentEpoch, LogStoreReadState, LogStoreStateWriteChunkFuture,
    LogStoreWriteState, new_log_store_state,
};
use crate::common::log_store_impl::kv_log_store::{
    Epoch, FIRST_SEQ_ID, FlushInfo, LogStoreVnodeProgress, SeqId,
};
use crate::executor::prelude::*;
use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
use crate::executor::{
    Barrier, BoxedMessageStream, Message, StreamExecutorError, StreamExecutorResult,
};

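/// Metrics reported by the synced kv log store executor: clean/dirty state transitions,
/// storage writes, in-memory buffer occupancy, and the different read paths.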
pub mod metrics {
    use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};

    use crate::common::log_store_impl::kv_log_store::KvLogStoreReadMetrics;
    use crate::executor::monitor::StreamingMetrics;
    use crate::task::ActorId;

    #[derive(Clone)]
    pub struct SyncedKvLogStoreMetrics {
        pub unclean_state: LabelGuardedIntCounter,
        pub clean_state: LabelGuardedIntCounter,
        pub wait_next_poll_ns: LabelGuardedIntCounter,

        pub storage_write_count: LabelGuardedIntCounter,
        pub storage_write_size: LabelGuardedIntCounter,
        pub pause_duration_ns: LabelGuardedIntCounter,

        pub buffer_unconsumed_item_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_row_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_epoch_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_min_epoch: LabelGuardedIntGauge,
        pub buffer_read_count: LabelGuardedIntCounter,
        pub buffer_read_size: LabelGuardedIntCounter,

        pub total_read_count: LabelGuardedIntCounter,
        pub total_read_size: LabelGuardedIntCounter,
        pub persistent_log_read_metrics: KvLogStoreReadMetrics,
        pub flushed_buffer_read_metrics: KvLogStoreReadMetrics,
    }

    impl SyncedKvLogStoreMetrics {
        pub(crate) fn new(
            metrics: &StreamingMetrics,
            actor_id: ActorId,
            id: u32,
            name: &str,
            target: &'static str,
        ) -> Self {
            let actor_id_str = actor_id.to_string();
            let id_str = id.to_string();
            let labels = &[&actor_id_str, target, &id_str, name];

            let unclean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "dirty",
                &actor_id_str,
                target,
                &id_str,
                name,
            ]);
            let clean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "clean",
                &actor_id_str,
                target,
                &id_str,
                name,
            ]);
            let wait_next_poll_ns = metrics
                .sync_kv_log_store_wait_next_poll_ns
                .with_guarded_label_values(labels);

            let storage_write_size = metrics
                .sync_kv_log_store_storage_write_size
                .with_guarded_label_values(labels);
            let storage_write_count = metrics
                .sync_kv_log_store_storage_write_count
                .with_guarded_label_values(labels);
            let storage_pause_duration_ns = metrics
                .sync_kv_log_store_write_pause_duration_ns
                .with_guarded_label_values(labels);

            let buffer_unconsumed_item_count = metrics
                .sync_kv_log_store_buffer_unconsumed_item_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_row_count = metrics
                .sync_kv_log_store_buffer_unconsumed_row_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_epoch_count = metrics
                .sync_kv_log_store_buffer_unconsumed_epoch_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_min_epoch = metrics
                .sync_kv_log_store_buffer_unconsumed_min_epoch
                .with_guarded_label_values(labels);
            let buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&["buffer", &actor_id_str, target, &id_str, name]);

            let buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&["buffer", &actor_id_str, target, &id_str, name]);

            let total_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&["total", &actor_id_str, target, &id_str, name]);

            let total_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&["total", &actor_id_str, target, &id_str, name]);

            const READ_PERSISTENT_LOG: &str = "persistent_log";
            const READ_FLUSHED_BUFFER: &str = "flushed_buffer";

            let persistent_log_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &id_str,
                    name,
                ]);

            let persistent_log_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &id_str,
                    name,
                ]);

            let flushed_buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &id_str,
                    name,
                ]);

            let flushed_buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &id_str,
                    name,
                ]);

            Self {
                unclean_state,
                clean_state,
                wait_next_poll_ns,
                storage_write_size,
                storage_write_count,
                pause_duration_ns: storage_pause_duration_ns,
                buffer_unconsumed_item_count,
                buffer_unconsumed_row_count,
                buffer_unconsumed_epoch_count,
                buffer_unconsumed_min_epoch,
                buffer_read_count,
                buffer_read_size,
                total_read_count,
                total_read_size,
                persistent_log_read_metrics: KvLogStoreReadMetrics {
                    storage_read_size: persistent_log_read_size,
                    storage_read_count: persistent_log_read_count,
                },
                flushed_buffer_read_metrics: KvLogStoreReadMetrics {
                    storage_read_count: flushed_buffer_read_count,
                    storage_read_size: flushed_buffer_read_size,
                },
            }
        }

        #[cfg(test)]
        pub(crate) fn for_test() -> Self {
            SyncedKvLogStoreMetrics {
                unclean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
                clean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
                wait_next_poll_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
                storage_write_count: LabelGuardedIntCounter::test_int_counter::<4>(),
                storage_write_size: LabelGuardedIntCounter::test_int_counter::<4>(),
                pause_duration_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
                buffer_unconsumed_item_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_row_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_epoch_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_min_epoch: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
                buffer_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
                total_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
                total_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
                persistent_log_read_metrics: KvLogStoreReadMetrics::for_test(),
                flushed_buffer_read_metrics: KvLogStoreReadMetrics::for_test(),
            }
        }
    }
}

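/// Future resolving to a chunk read back from the log store, together with its chunk id
/// and the epoch it was written in.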
type ReadFlushedChunkFuture = BoxFuture<'static, LogStoreResult<(ChunkId, StreamChunk, Epoch)>>;

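/// Executor that writes upstream chunks and barriers into a kv log store while
/// concurrently replaying buffered or persisted entries to its downstream.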
pub struct SyncedKvLogStoreExecutor<S: StateStore> {
    actor_context: ActorContextRef,
    table_id: TableId,
    metrics: SyncedKvLogStoreMetrics,
    serde: LogStoreRowSerde,

    upstream: Executor,

    state_store: S,
    max_buffer_size: usize,

    chunk_size: u32,

    pause_duration_ms: Duration,
}

impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn new(
        actor_context: ActorContextRef,
        table_id: u32,
        metrics: SyncedKvLogStoreMetrics,
        serde: LogStoreRowSerde,
        state_store: S,
        buffer_size: usize,
        chunk_size: u32,
        upstream: Executor,
        pause_duration_ms: Duration,
    ) -> Self {
        Self {
            actor_context,
            table_id: TableId::new(table_id),
            metrics,
            serde,
            state_store,
            upstream,
            max_buffer_size: buffer_size,
            chunk_size,
            pause_duration_ms,
        }
    }
}

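/// Bookkeeping returned when an asynchronous chunk flush to the log store completes.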
struct FlushedChunkInfo {
    epoch: u64,
    start_seq_id: SeqId,
    end_seq_id: SeqId,
    flush_info: FlushInfo,
    vnode_bitmap: Bitmap,
}

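/// State machine for the write side of the executor:
/// - `Paused`: writes are paused for a bounded duration after a barrier, holding the
///   barrier until the sleep elapses (or is cancelled once the read side catches up).
/// - `ReceiveFromUpstream`: waiting for the next upstream message.
/// - `FlushingChunk`: a chunk that does not fit into the in-memory buffer is being
///   flushed to the log store.
/// - `Empty`: transient placeholder used while ownership is moved out in `next_event`.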
enum WriteFuture<S: LocalStateStore> {
    Paused {
        start_instant: Instant,
        sleep_future: Option<Pin<Box<Sleep>>>,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    },
    ReceiveFromUpstream {
        future: StreamFuture<BoxedMessageStream>,
        write_state: LogStoreWriteState<S>,
    },
    FlushingChunk {
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        future: Pin<Box<LogStoreStateWriteChunkFuture<S>>>,
        stream: BoxedMessageStream,
    },
    Empty,
}

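/// Event produced when a [`WriteFuture`] resolves.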
enum WriteFutureEvent {
    UpstreamMessageReceived(Message),
    ChunkFlushed(FlushedChunkInfo),
}

impl<S: LocalStateStore> WriteFuture<S> {
    fn flush_chunk(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
        chunk: StreamChunk,
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
    ) -> Self {
        tracing::trace!(
            start_seq_id,
            end_seq_id,
            epoch,
            cardinality = chunk.cardinality(),
            "write_future: flushing chunk"
        );
        Self::FlushingChunk {
            epoch,
            start_seq_id,
            end_seq_id,
            future: Box::pin(write_state.into_write_chunk_future(
                chunk,
                epoch,
                start_seq_id,
                end_seq_id,
            )),
            stream,
        }
    }

    fn receive_from_upstream(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        Self::ReceiveFromUpstream {
            future: stream.into_future(),
            write_state,
        }
    }

    fn paused(
        duration: Duration,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        let now = Instant::now();
        tracing::trace!(?now, ?duration, "write_future_pause");
        Self::Paused {
            start_instant: now,
            sleep_future: Some(Box::pin(sleep_until(now + duration))),
            barrier,
            stream,
            write_state,
        }
    }

    async fn next_event(
        &mut self,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<(BoxedMessageStream, LogStoreWriteState<S>, WriteFutureEvent)> {
        match self {
            WriteFuture::Paused {
                start_instant,
                sleep_future,
                ..
            } => {
                if let Some(sleep_future) = sleep_future {
                    sleep_future.await;
                    metrics
                        .pause_duration_ns
                        .inc_by(start_instant.elapsed().as_nanos() as _);
                    tracing::trace!("resuming write future");
                }
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::Paused { stream, write_state, barrier, .. } => {
                    Ok((stream, write_state, WriteFutureEvent::UpstreamMessageReceived(Message::Barrier(barrier))))
                })
            }
            WriteFuture::ReceiveFromUpstream { future, .. } => {
                let (opt, stream) = future.await;
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::ReceiveFromUpstream { write_state, .. } => {
                    opt
                        .ok_or_else(|| anyhow!("end of upstream input").into())
                        .and_then(|result| result.map(|item| {
                            (stream, write_state, WriteFutureEvent::UpstreamMessageReceived(item))
                        }))
                })
            }
            WriteFuture::FlushingChunk { future, .. } => {
                let (write_state, result) = future.await;
                let result = must_match!(replace(self, WriteFuture::Empty), WriteFuture::FlushingChunk { epoch, start_seq_id, end_seq_id, stream, .. } => {
                    result.map(|(flush_info, vnode_bitmap)| {
                        (stream, write_state, WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                            epoch,
                            start_seq_id,
                            end_seq_id,
                            flush_info,
                            vnode_bitmap,
                        }))
                    })
                });
                result.map_err(Into::into)
            }
            WriteFuture::Empty => {
                unreachable!("should not be polled after ready")
            }
        }
    }
}

impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    #[try_stream(ok = Message, error = StreamExecutorError)]
    pub async fn execute_monitored(self) {
        let wait_next_poll_ns = self.metrics.wait_next_poll_ns.clone();
        #[for_await]
        for message in self.execute_inner() {
            let current_time = Instant::now();
            yield message?;
            wait_next_poll_ns.inc_by(current_time.elapsed().as_nanos() as _);
        }
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(self) {
        let mut input = self.upstream.execute();

        let first_barrier = expect_first_barrier(&mut input).await?;
        let first_write_epoch = first_barrier.epoch;
        yield Message::Barrier(first_barrier.clone());

        let local_state_store = self
            .state_store
            .new_local(NewLocalOptions {
                table_id: self.table_id,
                op_consistency_level: OpConsistencyLevel::Inconsistent,
                table_option: TableOption {
                    retention_seconds: None,
                },
                is_replicated: false,
                vnodes: self.serde.vnodes().clone(),
            })
            .await;

        let (mut read_state, mut initial_write_state) =
            new_log_store_state(self.table_id, local_state_store, self.serde);
        initial_write_state.init(first_write_epoch).await?;

        let mut pause_stream = first_barrier.is_pause_on_startup();
        let mut initial_write_epoch = first_write_epoch;

        'recreate_consume_stream: loop {
            let mut seq_id = FIRST_SEQ_ID;
            let mut buffer = SyncedLogStoreBuffer {
                buffer: VecDeque::new(),
                current_size: 0,
                max_size: self.max_buffer_size,
                max_chunk_size: self.chunk_size,
                next_chunk_id: 0,
                metrics: self.metrics.clone(),
                flushed_count: 0,
            };

            let log_store_stream = read_state
                .read_persisted_log_store(
                    self.metrics.persistent_log_read_metrics.clone(),
                    initial_write_epoch.curr,
                    LogStoreReadStateStreamRangeStart::Unbounded,
                )
                .await?;

            let mut log_store_stream = tokio_stream::StreamExt::peekable(log_store_stream);
            let mut clean_state = log_store_stream.peek().await.is_none();
            tracing::trace!(?clean_state);

            let mut read_future_state = ReadFuture::ReadingPersistedStream(log_store_stream);

            let mut write_future_state =
                WriteFuture::receive_from_upstream(input, initial_write_state);

            let mut progress = LogStoreVnodeProgress::None;

            loop {
                let select_result = {
                    let read_future = async {
                        if pause_stream {
                            pending().await
                        } else {
                            read_future_state
                                .next_chunk(&mut progress, &read_state, &mut buffer, &self.metrics)
                                .await
                        }
                    };
                    pin_mut!(read_future);
                    let write_future = write_future_state.next_event(&self.metrics);
                    pin_mut!(write_future);
                    let output = select(write_future, read_future).await;
                    drop_either_future(output)
                };
                match select_result {
                    Either::Left(result) => {
                        drop(write_future_state);
                        let (stream, mut write_state, either) = result?;
                        match either {
                            WriteFutureEvent::UpstreamMessageReceived(msg) => {
                                match msg {
                                    Message::Barrier(barrier) => {
                                        if clean_state
                                            && barrier.kind.is_checkpoint()
                                            && !buffer.is_empty()
                                        {
                                            write_future_state = WriteFuture::paused(
                                                self.pause_duration_ms,
                                                barrier,
                                                stream,
                                                write_state,
                                            );
                                            clean_state = false;
                                            self.metrics.unclean_state.inc();
                                        } else {
                                            if let Some(mutation) = barrier.mutation.as_deref() {
                                                match mutation {
                                                    Mutation::Pause => {
                                                        pause_stream = true;
                                                    }
                                                    Mutation::Resume => {
                                                        pause_stream = false;
                                                    }
                                                    _ => {}
                                                }
                                            }
                                            let write_state_post_write_barrier =
                                                Self::write_barrier(
                                                    self.actor_context.id,
                                                    &mut write_state,
                                                    barrier.clone(),
                                                    &self.metrics,
                                                    progress.take(),
                                                    &mut buffer,
                                                )
                                                .await?;
                                            seq_id = FIRST_SEQ_ID;
                                            let update_vnode_bitmap = barrier
                                                .as_update_vnode_bitmap(self.actor_context.id);
                                            let barrier_epoch = barrier.epoch;
                                            tracing::trace!(
                                                ?update_vnode_bitmap,
                                                actor_id = self.actor_context.id,
                                                "update vnode bitmap"
                                            );

                                            yield Message::Barrier(barrier);

                                            write_state_post_write_barrier
                                                .post_yield_barrier(update_vnode_bitmap.clone())
                                                .await?;
                                            if let Some(vnode_bitmap) = update_vnode_bitmap {
                                                read_state.update_vnode_bitmap(vnode_bitmap);
                                                initial_write_epoch = barrier_epoch;
                                                input = stream;
                                                initial_write_state = write_state;
                                                continue 'recreate_consume_stream;
                                            } else {
                                                write_future_state =
                                                    WriteFuture::receive_from_upstream(
                                                        stream,
                                                        write_state,
                                                    );
                                            }
                                        }
                                    }
                                    Message::Chunk(chunk) => {
                                        let start_seq_id = seq_id;
                                        let new_seq_id = seq_id + chunk.cardinality() as SeqId;
                                        let end_seq_id = new_seq_id - 1;
                                        let epoch = write_state.epoch().curr;
                                        tracing::trace!(
                                            start_seq_id,
                                            end_seq_id,
                                            new_seq_id,
                                            epoch,
                                            cardinality = chunk.cardinality(),
                                            "received chunk"
                                        );
                                        if let Some(chunk_to_flush) = buffer.add_or_flush_chunk(
                                            start_seq_id,
                                            end_seq_id,
                                            chunk,
                                            epoch,
                                        ) {
                                            seq_id = new_seq_id;
                                            write_future_state = WriteFuture::flush_chunk(
                                                stream,
                                                write_state,
                                                chunk_to_flush,
                                                epoch,
                                                start_seq_id,
                                                end_seq_id,
                                            );
                                        } else {
                                            seq_id = new_seq_id;
                                            write_future_state = WriteFuture::receive_from_upstream(
                                                stream,
                                                write_state,
                                            );
                                        }
                                    }
                                    Message::Watermark(_watermark) => {
                                        write_future_state =
                                            WriteFuture::receive_from_upstream(stream, write_state);
                                    }
                                }
                            }
                            WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                                start_seq_id,
                                end_seq_id,
                                epoch,
                                flush_info,
                                vnode_bitmap,
                            }) => {
                                buffer.add_flushed_item_to_buffer(
                                    start_seq_id,
                                    end_seq_id,
                                    vnode_bitmap,
                                    epoch,
                                );
                                self.metrics
                                    .storage_write_count
                                    .inc_by(flush_info.flush_count as _);
                                self.metrics
                                    .storage_write_size
                                    .inc_by(flush_info.flush_size as _);
                                write_future_state =
                                    WriteFuture::receive_from_upstream(stream, write_state);
                            }
                        }
                    }
                    Either::Right(result) => {
                        if !clean_state
                            && matches!(read_future_state, ReadFuture::Idle)
                            && buffer.is_empty()
                        {
                            clean_state = true;
                            self.metrics.clean_state.inc();

                            if let WriteFuture::Paused { sleep_future, .. } =
                                &mut write_future_state
                            {
                                tracing::trace!("resuming paused future");
                                assert!(buffer.current_size < self.max_buffer_size);
                                *sleep_future = None;
                            }
                        }
                        let chunk = result?;
                        self.metrics
                            .total_read_count
                            .inc_by(chunk.cardinality() as _);

                        yield Message::Chunk(chunk);
                    }
                }
            }
        }
    }
}

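/// Peekable stream over items previously persisted in the kv log store.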
type PersistedStream<S> = Peekable<Pin<Box<LogStoreItemMergeStream<TimeoutAutoRebuildIter<S>>>>>;

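/// State machine for the read side of the executor:
/// - `ReadingPersistedStream`: replaying items already persisted in the log store.
/// - `ReadingFlushedChunk`: reading back a chunk that was flushed from the buffer to storage.
/// - `Idle`: serving the next item directly from the in-memory buffer.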
enum ReadFuture<S: StateStoreRead> {
    ReadingPersistedStream(PersistedStream<S>),
    ReadingFlushedChunk {
        future: ReadFlushedChunkFuture,
        end_seq_id: SeqId,
    },
    Idle,
}

impl<S: StateStoreRead> ReadFuture<S> {
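    /// Returns the next chunk to emit downstream: drains the persisted log store stream
    /// first, then serves from the in-memory buffer, reading flushed chunks back from
    /// storage when necessary. Barrier items only advance `progress` and are skipped.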
    async fn next_chunk(
        &mut self,
        progress: &mut LogStoreVnodeProgress,
        read_state: &LogStoreReadState<S>,
        buffer: &mut SyncedLogStoreBuffer,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<StreamChunk> {
        match self {
            ReadFuture::ReadingPersistedStream(stream) => {
                while let Some((epoch, item)) = stream.try_next().await? {
                    match item {
                        KvLogStoreItem::Barrier { vnodes, .. } => {
                            tracing::trace!(epoch, "read logstore barrier");
                            progress.apply_aligned(vnodes, epoch, None);
                            continue;
                        }
                        KvLogStoreItem::StreamChunk {
                            chunk,
                            progress: chunk_progress,
                        } => {
                            tracing::trace!("read logstore chunk of size: {}", chunk.cardinality());
                            progress.apply_per_vnode(epoch, chunk_progress);
                            return Ok(chunk);
                        }
                    }
                }
                *self = ReadFuture::Idle;
            }
            ReadFuture::ReadingFlushedChunk { .. } | ReadFuture::Idle => {}
        }
        match self {
            ReadFuture::ReadingPersistedStream(_) => {
                unreachable!("must have finished read persisted stream when reaching here")
            }
            ReadFuture::ReadingFlushedChunk { .. } => {}
            ReadFuture::Idle => loop {
                let Some((item_epoch, item)) = buffer.pop_front() else {
                    return pending().await;
                };
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        metrics.buffer_read_count.inc_by(chunk.cardinality() as _);
                        tracing::trace!(
                            start_seq_id,
                            end_seq_id,
                            flushed,
                            cardinality = chunk.cardinality(),
                            "read buffered chunk of size"
                        );
                        return Ok(chunk);
                    }
                    LogStoreBufferItem::Flushed {
                        vnode_bitmap,
                        start_seq_id,
                        end_seq_id,
                        chunk_id,
                    } => {
                        tracing::trace!(start_seq_id, end_seq_id, chunk_id, "read flushed chunk");
                        let read_metrics = metrics.flushed_buffer_read_metrics.clone();
                        let future = read_state
                            .read_flushed_chunk(
                                vnode_bitmap,
                                chunk_id,
                                start_seq_id,
                                end_seq_id,
                                item_epoch,
                                read_metrics,
                            )
                            .boxed();
                        *self = ReadFuture::ReadingFlushedChunk { future, end_seq_id };
                        break;
                    }
                    LogStoreBufferItem::Barrier { .. } => {
                        tracing::trace!(item_epoch, "read buffer barrier");
                        progress.apply_aligned(read_state.vnodes().clone(), item_epoch, None);
                        continue;
                    }
                }
            },
        }

        let (future, end_seq_id) = match self {
            ReadFuture::ReadingPersistedStream(_) | ReadFuture::Idle => {
                unreachable!("should be at ReadingFlushedChunk")
            }
            ReadFuture::ReadingFlushedChunk { future, end_seq_id } => (future, *end_seq_id),
        };

        let (_, chunk, epoch) = future.await?;
        progress.apply_aligned(read_state.vnodes().clone(), epoch, Some(end_seq_id));
        tracing::trace!(
            end_seq_id,
            "read flushed chunk of size: {}",
            chunk.cardinality()
        );
        *self = ReadFuture::Idle;
        Ok(chunk)
    }
}

impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
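    /// Handles a barrier on the write path: writes the barrier row, flushes any still
    /// unflushed buffered chunks on checkpoint barriers, bumps write metrics, seals the
    /// current epoch with the collected truncation progress, and records the barrier in
    /// the in-memory buffer.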
    async fn write_barrier<'a>(
        actor_id: u32,
        write_state: &'a mut LogStoreWriteState<S::Local>,
        barrier: Barrier,
        metrics: &SyncedKvLogStoreMetrics,
        progress: LogStoreVnodeProgress,
        buffer: &mut SyncedLogStoreBuffer,
    ) -> StreamExecutorResult<LogStorePostSealCurrentEpoch<'a, S::Local>> {
        tracing::trace!(actor_id, ?progress, "applying truncation");
        let epoch = barrier.epoch.prev;
        let mut writer = write_state.start_writer(false);
        writer.write_barrier(epoch, barrier.is_checkpoint())?;

        if barrier.is_checkpoint() {
            for (epoch, item) in buffer.buffer.iter_mut().rev() {
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        if !*flushed {
                            writer.write_chunk(chunk, *epoch, *start_seq_id, *end_seq_id)?;
                            *flushed = true;
                        } else {
                            break;
                        }
                    }
                    LogStoreBufferItem::Flushed { .. } | LogStoreBufferItem::Barrier { .. } => {}
                }
            }
        }

        let (flush_info, _) = writer.finish().await?;
        metrics
            .storage_write_count
            .inc_by(flush_info.flush_count as _);
        metrics
            .storage_write_size
            .inc_by(flush_info.flush_size as _);
        let post_seal = write_state.seal_current_epoch(barrier.epoch.curr, progress);

        buffer.buffer.push_back((
            epoch,
            LogStoreBufferItem::Barrier {
                is_checkpoint: barrier.is_checkpoint(),
                next_epoch: barrier.epoch.curr,
            },
        ));
        buffer.next_chunk_id = 0;
        buffer.update_unconsumed_buffer_metrics();

        Ok(post_seal)
    }
}

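/// In-memory buffer of log store items not yet emitted downstream. Buffered chunk rows
/// are bounded by `max_size`; chunks that would overflow the bound are flushed to the
/// state store and tracked here as `Flushed` entries instead.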
struct SyncedLogStoreBuffer {
    buffer: VecDeque<(u64, LogStoreBufferItem)>,
    current_size: usize,
    max_size: usize,
    max_chunk_size: u32,
    next_chunk_id: ChunkId,
    metrics: SyncedKvLogStoreMetrics,
    flushed_count: usize,
}

impl SyncedLogStoreBuffer {
    fn is_empty(&self) -> bool {
        self.current_size == 0
    }

    fn add_or_flush_chunk(
        &mut self,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        chunk: StreamChunk,
        epoch: u64,
    ) -> Option<StreamChunk> {
        let current_size = self.current_size;
        let chunk_size = chunk.cardinality();

        tracing::trace!(
            current_size,
            chunk_size,
            max_size = self.max_size,
            "checking chunk size"
        );
        let should_flush_chunk = current_size + chunk_size > self.max_size;
        if should_flush_chunk {
            tracing::trace!(start_seq_id, end_seq_id, epoch, "flushing chunk");
            Some(chunk)
        } else {
            tracing::trace!(start_seq_id, end_seq_id, epoch, "buffering chunk");
            self.add_chunk_to_buffer(chunk, start_seq_id, end_seq_id, epoch);
            None
        }
    }

    fn add_flushed_item_to_buffer(
        &mut self,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        new_vnode_bitmap: Bitmap,
        epoch: u64,
    ) {
        let new_chunk_size = end_seq_id - start_seq_id + 1;

        if let Some((
            item_epoch,
            LogStoreBufferItem::Flushed {
                start_seq_id: prev_start_seq_id,
                end_seq_id: prev_end_seq_id,
                vnode_bitmap,
                ..
            },
        )) = self.buffer.back_mut()
            && let flushed_chunk_size = *prev_end_seq_id - *prev_start_seq_id + 1
            && let projected_flushed_chunk_size = flushed_chunk_size + new_chunk_size
            && projected_flushed_chunk_size as u32 <= self.max_chunk_size
        {
            assert!(
                *prev_end_seq_id < start_seq_id,
                "prev end_seq_id {} should be smaller than current start_seq_id {}",
                *prev_end_seq_id,
                start_seq_id
            );
            assert_eq!(
                epoch, *item_epoch,
                "epoch of newly added flushed item must be the same as the last flushed item"
            );
            *prev_end_seq_id = end_seq_id;
            *vnode_bitmap |= new_vnode_bitmap;
        } else {
            let chunk_id = self.next_chunk_id;
            self.next_chunk_id += 1;
            self.buffer.push_back((
                epoch,
                LogStoreBufferItem::Flushed {
                    start_seq_id,
                    end_seq_id,
                    vnode_bitmap: new_vnode_bitmap,
                    chunk_id,
                },
            ));
            self.flushed_count += 1;
            tracing::trace!(
                "adding flushed item to buffer: start_seq_id: {start_seq_id}, end_seq_id: {end_seq_id}, chunk_id: {chunk_id}"
            );
        }
        self.update_unconsumed_buffer_metrics();
    }

    fn add_chunk_to_buffer(
        &mut self,
        chunk: StreamChunk,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        epoch: u64,
    ) {
        let chunk_id = self.next_chunk_id;
        self.next_chunk_id += 1;
        self.current_size += chunk.cardinality();
        self.buffer.push_back((
            epoch,
            LogStoreBufferItem::StreamChunk {
                chunk,
                start_seq_id,
                end_seq_id,
                flushed: false,
                chunk_id,
            },
        ));
        self.update_unconsumed_buffer_metrics();
    }

    fn pop_front(&mut self) -> Option<(u64, LogStoreBufferItem)> {
        let item = self.buffer.pop_front();
        match &item {
            Some((_, LogStoreBufferItem::Flushed { .. })) => {
                self.flushed_count -= 1;
            }
            Some((_, LogStoreBufferItem::StreamChunk { chunk, .. })) => {
                self.current_size -= chunk.cardinality();
            }
            _ => {}
        }
        self.update_unconsumed_buffer_metrics();
        item
    }

    fn update_unconsumed_buffer_metrics(&self) {
        let mut epoch_count = 0;
        let mut row_count = 0;
        for (_, item) in &self.buffer {
            match item {
                LogStoreBufferItem::StreamChunk { chunk, .. } => {
                    row_count += chunk.cardinality();
                }
                LogStoreBufferItem::Flushed {
                    start_seq_id,
                    end_seq_id,
                    ..
                } => {
                    row_count += (end_seq_id - start_seq_id) as usize;
                }
                LogStoreBufferItem::Barrier { .. } => {
                    epoch_count += 1;
                }
            }
        }
        self.metrics.buffer_unconsumed_epoch_count.set(epoch_count);
        self.metrics.buffer_unconsumed_row_count.set(row_count as _);
        self.metrics
            .buffer_unconsumed_item_count
            .set(self.buffer.len() as _);
        self.metrics.buffer_unconsumed_min_epoch.set(
            self.buffer
                .front()
                .map(|(epoch, _)| *epoch)
                .unwrap_or_default() as _,
        );
    }
}

impl<S> Execute for SyncedKvLogStoreExecutor<S>
where
    S: StateStore,
{
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_monitored().boxed()
    }
}

#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use pretty_assertions::assert_eq;
    use risingwave_common::catalog::Field;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::test_prelude::*;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::assert_stream_chunk_eq;
    use crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V2_INFO;
    use crate::common::log_store_impl::kv_log_store::test_utils::{
        check_stream_chunk_eq, gen_test_log_store_table, test_payload_schema,
    };
    use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
    use crate::executor::test_utils::MockSource;

    fn init_logger() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .with_ansi(false)
            .try_init();
    }

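    // Chunks that fit into the in-memory buffer should be replayed downstream in order,
    // between the surrounding barriers.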
    #[tokio::test]
    async fn test_read_write_buffer() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type.clone()))
            .collect_vec();
        let schema = Schema { fields };
        let pk_indices = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
        )
        .boxed();

        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            " I T
            + 5 10
            + 6 10
            + 8 10
            + 9 10
            + 10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            " I T
            - 5 10
            - 6 10
            - 8 10
            U- 9 10
            U+ 10 11",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        tx.push_barrier(test_epoch(2), false);

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

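    // Chunks buffered before the next barrier are still emitted in their original order,
    // ahead of that barrier.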
    #[tokio::test]
    async fn test_barrier_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type.clone()))
            .collect_vec();
        let schema = Schema { fields };
        let pk_indices = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
        )
        .boxed();

        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            " I T
            + 5 10
            + 6 10
            + 8 10
            + 9 10
            + 10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            " I T
            - 5 10
            - 6 10
            - 8 10
            U- 10 11
            U+ 10 10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

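    // With a zero-capacity buffer every chunk is flushed to the log store immediately;
    // both barriers are emitted first, and the flushed chunks are read back merged into a
    // single chunk (bounded by `chunk_size`).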
    #[tokio::test]
    async fn test_max_chunk_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type.clone()))
            .collect_vec();
        let schema = Schema { fields };
        let pk_indices = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            0,
            256,
            source,
            Duration::from_millis(256),
        )
        .boxed();

        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            " I T
            + 5 10
            + 6 10
            + 8 10
            + 9 10
            + 10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            " I T
            - 5 10
            - 6 10
            - 8 10
            U- 10 11
            U+ 10 10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        for i in 1..=2 {
            match stream.next().await {
                Some(Ok(Message::Barrier(barrier))) => {
                    assert_eq!(barrier.epoch.curr, test_epoch(i));
                }
                other => panic!("Expected a barrier message, got {:?}", other),
            }
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(actual))) => {
                let expected = StreamChunk::from_pretty(
                    " I T
                    + 5 10
                    + 6 10
                    + 8 10
                    + 9 10
                    + 10 11
                    - 5 10
                    - 6 10
                    - 8 10
                    U- 10 11
                    U+ 10 10",
                );
                assert_stream_chunk_eq!(actual, expected);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }
    }
}