use std::collections::VecDeque;
use std::future::pending;
use std::mem::replace;
use std::pin::Pin;

use anyhow::anyhow;
use futures::future::{BoxFuture, Either, select};
use futures::stream::StreamFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::must_match;
use risingwave_connector::sink::log_store::{ChunkId, LogStoreResult};
use risingwave_storage::StateStore;
use risingwave_storage::store::{
    LocalStateStore, NewLocalOptions, OpConsistencyLevel, StateStoreRead,
};
use rw_futures_util::drop_either_future;
use tokio::time::{Duration, Instant, Sleep, sleep_until};
use tokio_stream::adapters::Peekable;

use crate::common::log_store_impl::kv_log_store::buffer::LogStoreBufferItem;
use crate::common::log_store_impl::kv_log_store::reader::LogStoreReadStateStreamRangeStart;
use crate::common::log_store_impl::kv_log_store::reader::timeout_auto_rebuild::TimeoutAutoRebuildIter;
use crate::common::log_store_impl::kv_log_store::serde::{
    KvLogStoreItem, LogStoreItemMergeStream, LogStoreRowSerde,
};
use crate::common::log_store_impl::kv_log_store::state::{
    LogStorePostSealCurrentEpoch, LogStoreReadState, LogStoreStateWriteChunkFuture,
    LogStoreWriteState, new_log_store_state,
};
use crate::common::log_store_impl::kv_log_store::{
    Epoch, FIRST_SEQ_ID, FlushInfo, LogStoreVnodeProgress, SeqId,
};
use crate::executor::prelude::*;
use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
use crate::executor::{
    Barrier, BoxedMessageStream, Message, StreamExecutorError, StreamExecutorResult,
};
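
/// Metrics for the synced kv log store executor: clean/dirty state transitions, write and
/// read volume against the state store, and gauges describing the unconsumed in-memory
/// buffer. Labels follow the `sync_kv_log_store_*` metric families registered in
/// `StreamingMetrics`.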
pub mod metrics {
    use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};

    use crate::common::log_store_impl::kv_log_store::KvLogStoreReadMetrics;
    use crate::executor::monitor::StreamingMetrics;
    use crate::task::ActorId;

    #[derive(Clone)]
    pub struct SyncedKvLogStoreMetrics {
        pub unclean_state: LabelGuardedIntCounter,
        pub clean_state: LabelGuardedIntCounter,
        pub wait_next_poll_ns: LabelGuardedIntCounter,

        pub storage_write_count: LabelGuardedIntCounter,
        pub storage_write_size: LabelGuardedIntCounter,
        pub pause_duration_ns: LabelGuardedIntCounter,

        pub buffer_unconsumed_item_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_row_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_epoch_count: LabelGuardedIntGauge,
        pub buffer_unconsumed_min_epoch: LabelGuardedIntGauge,
        pub buffer_read_count: LabelGuardedIntCounter,
        pub buffer_read_size: LabelGuardedIntCounter,

        pub total_read_count: LabelGuardedIntCounter,
        pub total_read_size: LabelGuardedIntCounter,
        pub persistent_log_read_metrics: KvLogStoreReadMetrics,
        pub flushed_buffer_read_metrics: KvLogStoreReadMetrics,
    }

    impl SyncedKvLogStoreMetrics {
        pub(crate) fn new(
            metrics: &StreamingMetrics,
            actor_id: ActorId,
            id: u32,
            name: &str,
            target: &'static str,
        ) -> Self {
            let actor_id_str = actor_id.to_string();
            let id_str = id.to_string();
            let labels = &[&actor_id_str, target, &id_str, name];

            let unclean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "dirty",
                &actor_id_str,
                target,
                &id_str,
                name,
            ]);
            let clean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "clean",
                &actor_id_str,
                target,
                &id_str,
                name,
            ]);
            let wait_next_poll_ns = metrics
                .sync_kv_log_store_wait_next_poll_ns
                .with_guarded_label_values(labels);

            let storage_write_size = metrics
                .sync_kv_log_store_storage_write_size
                .with_guarded_label_values(labels);
            let storage_write_count = metrics
                .sync_kv_log_store_storage_write_count
                .with_guarded_label_values(labels);
            let storage_pause_duration_ns = metrics
                .sync_kv_log_store_write_pause_duration_ns
                .with_guarded_label_values(labels);

            let buffer_unconsumed_item_count = metrics
                .sync_kv_log_store_buffer_unconsumed_item_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_row_count = metrics
                .sync_kv_log_store_buffer_unconsumed_row_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_epoch_count = metrics
                .sync_kv_log_store_buffer_unconsumed_epoch_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_min_epoch = metrics
                .sync_kv_log_store_buffer_unconsumed_min_epoch
                .with_guarded_label_values(labels);
            let buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&["buffer", &actor_id_str, target, &id_str, name]);

            let buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&["buffer", &actor_id_str, target, &id_str, name]);

            let total_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&["total", &actor_id_str, target, &id_str, name]);

            let total_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&["total", &actor_id_str, target, &id_str, name]);

            const READ_PERSISTENT_LOG: &str = "persistent_log";
            const READ_FLUSHED_BUFFER: &str = "flushed_buffer";

            let persistent_log_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &id_str,
                    name,
                ]);

            let persistent_log_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &id_str,
                    name,
                ]);

            let flushed_buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &id_str,
                    name,
                ]);

            let flushed_buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &id_str,
                    name,
                ]);

            Self {
                unclean_state,
                clean_state,
                wait_next_poll_ns,
                storage_write_size,
                storage_write_count,
                pause_duration_ns: storage_pause_duration_ns,
                buffer_unconsumed_item_count,
                buffer_unconsumed_row_count,
                buffer_unconsumed_epoch_count,
                buffer_unconsumed_min_epoch,
                buffer_read_count,
                buffer_read_size,
                total_read_count,
                total_read_size,
                persistent_log_read_metrics: KvLogStoreReadMetrics {
                    storage_read_size: persistent_log_read_size,
                    storage_read_count: persistent_log_read_count,
                },
                flushed_buffer_read_metrics: KvLogStoreReadMetrics {
                    storage_read_count: flushed_buffer_read_count,
                    storage_read_size: flushed_buffer_read_size,
                },
            }
        }

        #[cfg(test)]
        pub(crate) fn for_test() -> Self {
            SyncedKvLogStoreMetrics {
                unclean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
                clean_state: LabelGuardedIntCounter::test_int_counter::<5>(),
                wait_next_poll_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
                storage_write_count: LabelGuardedIntCounter::test_int_counter::<4>(),
                storage_write_size: LabelGuardedIntCounter::test_int_counter::<4>(),
                pause_duration_ns: LabelGuardedIntCounter::test_int_counter::<4>(),
                buffer_unconsumed_item_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_row_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_epoch_count: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_unconsumed_min_epoch: LabelGuardedIntGauge::test_int_gauge::<4>(),
                buffer_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
                buffer_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
                total_read_count: LabelGuardedIntCounter::test_int_counter::<5>(),
                total_read_size: LabelGuardedIntCounter::test_int_counter::<5>(),
                persistent_log_read_metrics: KvLogStoreReadMetrics::for_test(),
                flushed_buffer_read_metrics: KvLogStoreReadMetrics::for_test(),
            }
        }
    }
}

type ReadFlushedChunkFuture = BoxFuture<'static, LogStoreResult<(ChunkId, StreamChunk, Epoch)>>;
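
/// Executor that couples the upstream stream with a kv log store: incoming chunks are first
/// kept in an in-memory buffer (bounded by `max_buffer_size`) and spilled to the state store
/// when the buffer would overflow, while the read side replays buffered and persisted data
/// downstream.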
pub struct SyncedKvLogStoreExecutor<S: StateStore> {
    actor_context: ActorContextRef,
    table_id: TableId,
    metrics: SyncedKvLogStoreMetrics,
    serde: LogStoreRowSerde,

    upstream: Executor,

    state_store: S,
    max_buffer_size: usize,

    chunk_size: u32,

    pause_duration_ms: Duration,
}

impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
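    /// Creates a new [`SyncedKvLogStoreExecutor`].
    ///
    /// A rough construction sketch (the argument values below are illustrative only; see the
    /// tests at the bottom of this file for a runnable setup):
    ///
    /// ```ignore
    /// let executor = SyncedKvLogStoreExecutor::new(
    ///     actor_context,
    ///     table_id,                   // u32 id of the log store state table
    ///     metrics,                    // SyncedKvLogStoreMetrics
    ///     serde,                      // LogStoreRowSerde built from the state table catalog
    ///     state_store,
    ///     1024,                       // buffer_size: max rows buffered in memory
    ///     256,                        // chunk_size: max merged size of flushed ranges
    ///     upstream,
    ///     Duration::from_millis(100), // pause_duration_ms
    /// );
    /// ```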
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn new(
        actor_context: ActorContextRef,
        table_id: u32,
        metrics: SyncedKvLogStoreMetrics,
        serde: LogStoreRowSerde,
        state_store: S,
        buffer_size: usize,
        chunk_size: u32,
        upstream: Executor,
        pause_duration_ms: Duration,
    ) -> Self {
        Self {
            actor_context,
            table_id: TableId::new(table_id),
            metrics,
            serde,
            state_store,
            upstream,
            max_buffer_size: buffer_size,
            chunk_size,
            pause_duration_ms,
        }
    }
}

struct FlushedChunkInfo {
    epoch: u64,
    start_seq_id: SeqId,
    end_seq_id: SeqId,
    flush_info: FlushInfo,
    vnode_bitmap: Bitmap,
}
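
/// State machine for the write side of the executor: at any point it is either waiting out a
/// pause, waiting for the next upstream message, or flushing a chunk to the state store.
/// `Empty` is only a transient placeholder while ownership is swapped out in `next_event`.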
enum WriteFuture<S: LocalStateStore> {
    Paused {
        start_instant: Instant,
        sleep_future: Option<Pin<Box<Sleep>>>,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    },
    ReceiveFromUpstream {
        future: StreamFuture<BoxedMessageStream>,
        write_state: LogStoreWriteState<S>,
    },
    FlushingChunk {
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        future: Pin<Box<LogStoreStateWriteChunkFuture<S>>>,
        stream: BoxedMessageStream,
    },
    Empty,
}
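
/// Outcome of polling a [`WriteFuture`]: either a new message arrived from upstream, or an
/// in-flight chunk flush completed.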
enum WriteFutureEvent {
    UpstreamMessageReceived(Message),
    ChunkFlushed(FlushedChunkInfo),
}

impl<S: LocalStateStore> WriteFuture<S> {
    fn flush_chunk(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
        chunk: StreamChunk,
        epoch: u64,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
    ) -> Self {
        tracing::trace!(
            start_seq_id,
            end_seq_id,
            epoch,
            cardinality = chunk.cardinality(),
            "write_future: flushing chunk"
        );
        Self::FlushingChunk {
            epoch,
            start_seq_id,
            end_seq_id,
            future: Box::pin(write_state.into_write_chunk_future(
                chunk,
                epoch,
                start_seq_id,
                end_seq_id,
            )),
            stream,
        }
    }

    fn receive_from_upstream(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        Self::ReceiveFromUpstream {
            future: stream.into_future(),
            write_state,
        }
    }

    fn paused(
        duration: Duration,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        let now = Instant::now();
        tracing::trace!(?now, ?duration, "write_future_pause");
        Self::Paused {
            start_instant: now,
            sleep_future: Some(Box::pin(sleep_until(now + duration))),
            barrier,
            stream,
            write_state,
        }
    }
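
    /// Drives the current state to completion and returns the upstream stream, the write
    /// state, and the resulting [`WriteFutureEvent`]. `self` is left as [`WriteFuture::Empty`],
    /// so the caller is expected to install the next state before polling again.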
    async fn next_event(
        &mut self,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<(BoxedMessageStream, LogStoreWriteState<S>, WriteFutureEvent)> {
        match self {
            WriteFuture::Paused {
                start_instant,
                sleep_future,
                ..
            } => {
                if let Some(sleep_future) = sleep_future {
                    sleep_future.await;
                    metrics
                        .pause_duration_ns
                        .inc_by(start_instant.elapsed().as_nanos() as _);
                    tracing::trace!("resuming write future");
                }
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::Paused { stream, write_state, barrier, .. } => {
                    Ok((stream, write_state, WriteFutureEvent::UpstreamMessageReceived(Message::Barrier(barrier))))
                })
            }
            WriteFuture::ReceiveFromUpstream { future, .. } => {
                let (opt, stream) = future.await;
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::ReceiveFromUpstream { write_state, .. } => {
                    opt
                        .ok_or_else(|| anyhow!("end of upstream input").into())
                        .and_then(|result| result.map(|item| {
                            (stream, write_state, WriteFutureEvent::UpstreamMessageReceived(item))
                        }))
                })
            }
            WriteFuture::FlushingChunk { future, .. } => {
                let (write_state, result) = future.await;
                let result = must_match!(replace(self, WriteFuture::Empty), WriteFuture::FlushingChunk { epoch, start_seq_id, end_seq_id, stream, .. } => {
                    result.map(|(flush_info, vnode_bitmap)| {
                        (stream, write_state, WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                            epoch,
                            start_seq_id,
                            end_seq_id,
                            flush_info,
                            vnode_bitmap,
                        }))
                    })
                });
                result.map_err(Into::into)
            }
            WriteFuture::Empty => {
                unreachable!("should not be polled after ready")
            }
        }
    }
}

impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    #[try_stream(ok = Message, error = StreamExecutorError)]
    pub async fn execute_monitored(self) {
        let wait_next_poll_ns = self.metrics.wait_next_poll_ns.clone();
        #[for_await]
        for message in self.execute_inner() {
            let current_time = Instant::now();
            yield message?;
            wait_next_poll_ns.inc_by(current_time.elapsed().as_nanos() as _);
        }
    }

    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(self) {
        let mut input = self.upstream.execute();

        let first_barrier = expect_first_barrier(&mut input).await?;
        let first_write_epoch = first_barrier.epoch;
        yield Message::Barrier(first_barrier.clone());

        let local_state_store = self
            .state_store
            .new_local(NewLocalOptions {
                table_id: self.table_id,
                op_consistency_level: OpConsistencyLevel::Inconsistent,
                table_option: TableOption {
                    retention_seconds: None,
                },
                is_replicated: false,
                vnodes: self.serde.vnodes().clone(),
            })
            .await;

        let (mut read_state, mut initial_write_state) =
            new_log_store_state(self.table_id, local_state_store, self.serde);
        initial_write_state.init(first_write_epoch).await?;

        let mut pause_stream = first_barrier.is_pause_on_startup();
        let mut initial_write_epoch = first_write_epoch;
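
        // The consume stream is rebuilt from scratch whenever the vnode bitmap of this actor
        // changes (see `continue 'recreate_consume_stream` below): the in-memory buffer is
        // reset and the persisted part of the log store is replayed from the new write epoch.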
        'recreate_consume_stream: loop {
            let mut seq_id = FIRST_SEQ_ID;
            let mut buffer = SyncedLogStoreBuffer {
                buffer: VecDeque::new(),
                current_size: 0,
                max_size: self.max_buffer_size,
                max_chunk_size: self.chunk_size,
                next_chunk_id: 0,
                metrics: self.metrics.clone(),
                flushed_count: 0,
            };

            let log_store_stream = read_state
                .read_persisted_log_store(
                    self.metrics.persistent_log_read_metrics.clone(),
                    initial_write_epoch.curr,
                    LogStoreReadStateStreamRangeStart::Unbounded,
                )
                .await?;

            let mut log_store_stream = tokio_stream::StreamExt::peekable(log_store_stream);
            let mut clean_state = log_store_stream.peek().await.is_none();
            tracing::trace!(?clean_state);

            let mut read_future_state = ReadFuture::ReadingPersistedStream(log_store_stream);

            let mut write_future_state =
                WriteFuture::receive_from_upstream(input, initial_write_state);

            let mut progress = LogStoreVnodeProgress::None;

            loop {
                let select_result = {
                    let read_future = async {
                        if pause_stream {
                            pending().await
                        } else {
                            read_future_state
                                .next_chunk(&mut progress, &read_state, &mut buffer, &self.metrics)
                                .await
                        }
                    };
                    pin_mut!(read_future);
                    let write_future = write_future_state.next_event(&self.metrics);
                    pin_mut!(write_future);
                    let output = select(write_future, read_future).await;
                    drop_either_future(output)
                };
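                // Left: the write future made progress (an upstream message arrived or an
                // in-flight chunk flush finished). Right: the read future produced a chunk to
                // emit downstream.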
                match select_result {
                    Either::Left(result) => {
                        drop(write_future_state);
                        let (stream, mut write_state, either) = result?;
                        match either {
                            WriteFutureEvent::UpstreamMessageReceived(msg) => {
                                match msg {
                                    Message::Barrier(barrier) => {
                                        if clean_state
                                            && barrier.kind.is_checkpoint()
                                            && !buffer.is_empty()
                                        {
                                            write_future_state = WriteFuture::paused(
                                                self.pause_duration_ms,
                                                barrier,
                                                stream,
                                                write_state,
                                            );
                                            clean_state = false;
                                            self.metrics.unclean_state.inc();
                                        } else {
                                            if let Some(mutation) = barrier.mutation.as_deref() {
                                                match mutation {
                                                    Mutation::Pause => {
                                                        pause_stream = true;
                                                    }
                                                    Mutation::Resume => {
                                                        pause_stream = false;
                                                    }
                                                    _ => {}
                                                }
                                            }
                                            let write_state_post_write_barrier =
                                                Self::write_barrier(
                                                    &mut write_state,
                                                    barrier.clone(),
                                                    &self.metrics,
                                                    progress.take(),
                                                    &mut buffer,
                                                )
                                                .await?;
                                            seq_id = FIRST_SEQ_ID;
                                            let update_vnode_bitmap = barrier
                                                .as_update_vnode_bitmap(self.actor_context.id);
                                            let barrier_epoch = barrier.epoch;

                                            yield Message::Barrier(barrier);

                                            write_state_post_write_barrier
                                                .post_yield_barrier(update_vnode_bitmap.clone())
                                                .await?;
                                            if let Some(vnode_bitmap) = update_vnode_bitmap {
                                                read_state.update_vnode_bitmap(vnode_bitmap);
                                                initial_write_epoch = barrier_epoch;
                                                input = stream;
                                                initial_write_state = write_state;
                                                continue 'recreate_consume_stream;
                                            } else {
                                                write_future_state =
                                                    WriteFuture::receive_from_upstream(
                                                        stream,
                                                        write_state,
                                                    );
                                            }
                                        }
                                    }
                                    Message::Chunk(chunk) => {
                                        let start_seq_id = seq_id;
                                        let new_seq_id = seq_id + chunk.cardinality() as SeqId;
                                        let end_seq_id = new_seq_id - 1;
                                        let epoch = write_state.epoch().curr;
                                        tracing::trace!(
                                            start_seq_id,
                                            end_seq_id,
                                            new_seq_id,
                                            epoch,
                                            cardinality = chunk.cardinality(),
                                            "received chunk"
                                        );
                                        if let Some(chunk_to_flush) = buffer.add_or_flush_chunk(
                                            start_seq_id,
                                            end_seq_id,
                                            chunk,
                                            epoch,
                                        ) {
                                            seq_id = new_seq_id;
                                            write_future_state = WriteFuture::flush_chunk(
                                                stream,
                                                write_state,
                                                chunk_to_flush,
                                                epoch,
                                                start_seq_id,
                                                end_seq_id,
                                            );
                                        } else {
                                            seq_id = new_seq_id;
                                            write_future_state =
                                                WriteFuture::receive_from_upstream(
                                                    stream,
                                                    write_state,
                                                );
                                        }
                                    }
                                    Message::Watermark(_watermark) => {
                                        write_future_state =
                                            WriteFuture::receive_from_upstream(stream, write_state);
                                    }
                                }
                            }
                            WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                                start_seq_id,
                                end_seq_id,
                                epoch,
                                flush_info,
                                vnode_bitmap,
                            }) => {
                                buffer.add_flushed_item_to_buffer(
                                    start_seq_id,
                                    end_seq_id,
                                    vnode_bitmap,
                                    epoch,
                                );
                                self.metrics
                                    .storage_write_count
                                    .inc_by(flush_info.flush_count as _);
                                self.metrics
                                    .storage_write_size
                                    .inc_by(flush_info.flush_size as _);
                                write_future_state =
                                    WriteFuture::receive_from_upstream(stream, write_state);
                            }
                        }
                    }
                    Either::Right(result) => {
                        if !clean_state
                            && matches!(read_future_state, ReadFuture::Idle)
                            && buffer.is_empty()
                        {
                            clean_state = true;
                            self.metrics.clean_state.inc();

                            if let WriteFuture::Paused { sleep_future, .. } =
                                &mut write_future_state
                            {
                                tracing::trace!("resuming paused future");
                                assert!(buffer.current_size < self.max_buffer_size);
                                *sleep_future = None;
                            }
                        }
                        let chunk = result?;
                        self.metrics
                            .total_read_count
                            .inc_by(chunk.cardinality() as _);

                        yield Message::Chunk(chunk);
                    }
                }
            }
        }
    }
}

type PersistedStream<S> = Peekable<Pin<Box<LogStoreItemMergeStream<TimeoutAutoRebuildIter<S>>>>>;
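
/// State machine for the read side: first drain the log store entries persisted by previous
/// runs, then serve from the in-memory buffer, reading flushed ranges back from storage on
/// demand.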
enum ReadFuture<S: StateStoreRead> {
    ReadingPersistedStream(PersistedStream<S>),
    ReadingFlushedChunk {
        future: ReadFlushedChunkFuture,
        end_seq_id: SeqId,
    },
    Idle,
}

impl<S: StateStoreRead> ReadFuture<S> {
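    /// Returns the next chunk to emit downstream, updating `progress` along the way. When
    /// there is nothing left to serve, this stays pending forever so that the surrounding
    /// `select` keeps driving the write side.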
    async fn next_chunk(
        &mut self,
        progress: &mut LogStoreVnodeProgress,
        read_state: &LogStoreReadState<S>,
        buffer: &mut SyncedLogStoreBuffer,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<StreamChunk> {
        match self {
            ReadFuture::ReadingPersistedStream(stream) => {
                while let Some((epoch, item)) = stream.try_next().await? {
                    match item {
                        KvLogStoreItem::Barrier { vnodes, .. } => {
                            tracing::trace!(epoch, "read logstore barrier");
                            progress.apply_aligned(vnodes, epoch, None);
                            continue;
                        }
                        KvLogStoreItem::StreamChunk {
                            chunk,
                            progress: chunk_progress,
                        } => {
                            tracing::trace!("read logstore chunk of size: {}", chunk.cardinality());
                            progress.apply_per_vnode(epoch, chunk_progress);
                            return Ok(chunk);
                        }
                    }
                }
                *self = ReadFuture::Idle;
            }
            ReadFuture::ReadingFlushedChunk { .. } | ReadFuture::Idle => {}
        }
        match self {
            ReadFuture::ReadingPersistedStream(_) => {
                unreachable!("must have finished read persisted stream when reaching here")
            }
            ReadFuture::ReadingFlushedChunk { .. } => {}
            ReadFuture::Idle => loop {
                let Some((item_epoch, item)) = buffer.pop_front() else {
                    return pending().await;
                };
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        metrics.buffer_read_count.inc_by(chunk.cardinality() as _);
                        tracing::trace!(
                            start_seq_id,
                            end_seq_id,
                            flushed,
                            cardinality = chunk.cardinality(),
                            "read buffered chunk of size"
                        );
                        return Ok(chunk);
                    }
                    LogStoreBufferItem::Flushed {
                        vnode_bitmap,
                        start_seq_id,
                        end_seq_id,
                        chunk_id,
                    } => {
                        tracing::trace!(start_seq_id, end_seq_id, chunk_id, "read flushed chunk");
                        let read_metrics = metrics.flushed_buffer_read_metrics.clone();
                        let future = read_state
                            .read_flushed_chunk(
                                vnode_bitmap,
                                chunk_id,
                                start_seq_id,
                                end_seq_id,
                                item_epoch,
                                read_metrics,
                            )
                            .boxed();
                        *self = ReadFuture::ReadingFlushedChunk { future, end_seq_id };
                        break;
                    }
                    LogStoreBufferItem::Barrier { .. } => {
                        tracing::trace!(item_epoch, "read buffer barrier");
                        progress.apply_aligned(read_state.vnodes().clone(), item_epoch, None);
                        continue;
                    }
                }
            },
        }

        let (future, end_seq_id) = match self {
            ReadFuture::ReadingPersistedStream(_) | ReadFuture::Idle => {
                unreachable!("should be at ReadingFlushedChunk")
            }
            ReadFuture::ReadingFlushedChunk { future, end_seq_id } => (future, *end_seq_id),
        };

        let (_, chunk, epoch) = future.await?;
        progress.apply_aligned(read_state.vnodes().clone(), epoch, Some(end_seq_id));
        tracing::trace!(
            end_seq_id,
            "read flushed chunk of size: {}",
            chunk.cardinality()
        );
        *self = ReadFuture::Idle;
        Ok(chunk)
    }
}

impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
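    /// Handles a barrier on the write path: writes the barrier to the log store, and on a
    /// checkpoint barrier also flushes any still-unflushed chunks in the buffer before sealing
    /// the current epoch. The returned handle is awaited after the barrier has been yielded
    /// downstream.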
    async fn write_barrier<'a>(
        write_state: &'a mut LogStoreWriteState<S::Local>,
        barrier: Barrier,
        metrics: &SyncedKvLogStoreMetrics,
        progress: LogStoreVnodeProgress,
        buffer: &mut SyncedLogStoreBuffer,
    ) -> StreamExecutorResult<LogStorePostSealCurrentEpoch<'a, S::Local>> {
        tracing::trace!(?progress, "applying truncation");
        let epoch = barrier.epoch.prev;
        let mut writer = write_state.start_writer(false);
        writer.write_barrier(epoch, barrier.is_checkpoint())?;

        if barrier.is_checkpoint() {
            for (epoch, item) in buffer.buffer.iter_mut().rev() {
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        if !*flushed {
                            writer.write_chunk(chunk, *epoch, *start_seq_id, *end_seq_id)?;
                            *flushed = true;
                        } else {
                            break;
                        }
                    }
                    LogStoreBufferItem::Flushed { .. } | LogStoreBufferItem::Barrier { .. } => {}
                }
            }
        }

        let (flush_info, _) = writer.finish().await?;
        metrics
            .storage_write_count
            .inc_by(flush_info.flush_count as _);
        metrics
            .storage_write_size
            .inc_by(flush_info.flush_size as _);
        let post_seal = write_state.seal_current_epoch(barrier.epoch.curr, progress);

        buffer.buffer.push_back((
            epoch,
            LogStoreBufferItem::Barrier {
                is_checkpoint: barrier.is_checkpoint(),
                next_epoch: barrier.epoch.curr,
            },
        ));
        buffer.next_chunk_id = 0;
        buffer.update_unconsumed_buffer_metrics();

        Ok(post_seal)
    }
}
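
/// In-memory buffer of items that have been received from upstream but not yet emitted
/// downstream. `current_size` only counts rows of unflushed `StreamChunk`s; flushed ranges
/// and barriers are tracked separately.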
struct SyncedLogStoreBuffer {
    buffer: VecDeque<(u64, LogStoreBufferItem)>,
    current_size: usize,
    max_size: usize,
    max_chunk_size: u32,
    next_chunk_id: ChunkId,
    metrics: SyncedKvLogStoreMetrics,
    flushed_count: usize,
}

impl SyncedLogStoreBuffer {
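    /// `true` when no unflushed chunk rows are buffered in memory. Flushed ranges and barriers
    /// may still be queued in `buffer`.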
    fn is_empty(&self) -> bool {
        self.current_size == 0
    }

    fn add_or_flush_chunk(
        &mut self,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        chunk: StreamChunk,
        epoch: u64,
    ) -> Option<StreamChunk> {
        let current_size = self.current_size;
        let chunk_size = chunk.cardinality();

        tracing::trace!(
            current_size,
            chunk_size,
            max_size = self.max_size,
            "checking chunk size"
        );
        let should_flush_chunk = current_size + chunk_size > self.max_size;
        if should_flush_chunk {
            tracing::trace!(start_seq_id, end_seq_id, epoch, "flushing chunk");
            Some(chunk)
        } else {
            tracing::trace!(start_seq_id, end_seq_id, epoch, "buffering chunk");
            self.add_chunk_to_buffer(chunk, start_seq_id, end_seq_id, epoch);
            None
        }
    }
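
    /// Records a chunk that has been flushed to storage. Consecutive flushed ranges from the
    /// same epoch are merged in place as long as the combined size stays within
    /// `max_chunk_size`; otherwise a new `Flushed` item is appended.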
    fn add_flushed_item_to_buffer(
        &mut self,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        new_vnode_bitmap: Bitmap,
        epoch: u64,
    ) {
        let new_chunk_size = end_seq_id - start_seq_id + 1;

        if let Some((
            item_epoch,
            LogStoreBufferItem::Flushed {
                start_seq_id: prev_start_seq_id,
                end_seq_id: prev_end_seq_id,
                vnode_bitmap,
                ..
            },
        )) = self.buffer.back_mut()
            && let flushed_chunk_size = *prev_end_seq_id - *prev_start_seq_id + 1
            && let projected_flushed_chunk_size = flushed_chunk_size + new_chunk_size
            && projected_flushed_chunk_size as u32 <= self.max_chunk_size
        {
            assert!(
                *prev_end_seq_id < start_seq_id,
                "prev end_seq_id {} should be smaller than current start_seq_id {}",
                *prev_end_seq_id,
                start_seq_id
            );
            assert_eq!(
                epoch, *item_epoch,
                "epoch of newly added flushed item must be the same as the last flushed item"
            );
            *prev_end_seq_id = end_seq_id;
            *vnode_bitmap |= new_vnode_bitmap;
        } else {
            let chunk_id = self.next_chunk_id;
            self.next_chunk_id += 1;
            self.buffer.push_back((
                epoch,
                LogStoreBufferItem::Flushed {
                    start_seq_id,
                    end_seq_id,
                    vnode_bitmap: new_vnode_bitmap,
                    chunk_id,
                },
            ));
            self.flushed_count += 1;
            tracing::trace!(
                "adding flushed item to buffer: start_seq_id: {start_seq_id}, end_seq_id: {end_seq_id}, chunk_id: {chunk_id}"
            );
        }
        self.update_unconsumed_buffer_metrics();
    }

    fn add_chunk_to_buffer(
        &mut self,
        chunk: StreamChunk,
        start_seq_id: SeqId,
        end_seq_id: SeqId,
        epoch: u64,
    ) {
        let chunk_id = self.next_chunk_id;
        self.next_chunk_id += 1;
        self.current_size += chunk.cardinality();
        self.buffer.push_back((
            epoch,
            LogStoreBufferItem::StreamChunk {
                chunk,
                start_seq_id,
                end_seq_id,
                flushed: false,
                chunk_id,
            },
        ));
        self.update_unconsumed_buffer_metrics();
    }

    fn pop_front(&mut self) -> Option<(u64, LogStoreBufferItem)> {
        let item = self.buffer.pop_front();
        match &item {
            Some((_, LogStoreBufferItem::Flushed { .. })) => {
                self.flushed_count -= 1;
            }
            Some((_, LogStoreBufferItem::StreamChunk { chunk, .. })) => {
                self.current_size -= chunk.cardinality();
            }
            _ => {}
        }
        self.update_unconsumed_buffer_metrics();
        item
    }

    fn update_unconsumed_buffer_metrics(&self) {
        let mut epoch_count = 0;
        let mut row_count = 0;
        for (_, item) in &self.buffer {
            match item {
                LogStoreBufferItem::StreamChunk { chunk, .. } => {
                    row_count += chunk.cardinality();
                }
                LogStoreBufferItem::Flushed {
                    start_seq_id,
                    end_seq_id,
                    ..
                } => {
                    row_count += (end_seq_id - start_seq_id) as usize;
                }
                LogStoreBufferItem::Barrier { .. } => {
                    epoch_count += 1;
                }
            }
        }
        self.metrics.buffer_unconsumed_epoch_count.set(epoch_count);
        self.metrics.buffer_unconsumed_row_count.set(row_count as _);
        self.metrics
            .buffer_unconsumed_item_count
            .set(self.buffer.len() as _);
        self.metrics.buffer_unconsumed_min_epoch.set(
            self.buffer
                .front()
                .map(|(epoch, _)| *epoch)
                .unwrap_or_default() as _,
        );
    }
}

impl<S> Execute for SyncedKvLogStoreExecutor<S>
where
    S: StateStore,
{
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_monitored().boxed()
    }
}

#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use pretty_assertions::assert_eq;
    use risingwave_common::catalog::Field;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::test_prelude::*;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::assert_stream_chunk_eq;
    use crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V2_INFO;
    use crate::common::log_store_impl::kv_log_store::test_utils::{
        check_stream_chunk_eq, gen_test_log_store_table, test_payload_schema,
    };
    use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
    use crate::executor::test_utils::MockSource;

    fn init_logger() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .with_ansi(false)
            .try_init();
    }

    #[tokio::test]
    async fn test_read_write_buffer() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type.clone()))
            .collect_vec();
        let schema = Schema { fields };
        let pk_indices = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
        )
        .boxed();

        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            " I T
            + 5 10
            + 6 10
            + 8 10
            + 9 10
            + 10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            " I T
            - 5 10
            - 6 10
            - 8 10
            U- 9 10
            U+ 10 11",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        tx.push_barrier(test_epoch(2), false);

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_barrier_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type.clone()))
            .collect_vec();
        let schema = Schema { fields };
        let pk_indices = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
        )
        .boxed();

        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            " I T
            + 5 10
            + 6 10
            + 8 10
            + 9 10
            + 10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            " I T
            - 5 10
            - 6 10
            - 8 10
            U- 10 11
            U+ 10 10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_max_chunk_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type.clone()))
            .collect_vec();
        let schema = Schema { fields };
        let pk_indices = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            0,
            256,
            source,
            Duration::from_millis(256),
        )
        .boxed();

        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            " I T
            + 5 10
            + 6 10
            + 8 10
            + 9 10
            + 10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            " I T
            - 5 10
            - 6 10
            - 8 10
            U- 10 11
            U+ 10 10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        for i in 1..=2 {
            match stream.next().await {
                Some(Ok(Message::Barrier(barrier))) => {
                    assert_eq!(barrier.epoch.curr, test_epoch(i));
                }
                other => panic!("Expected a barrier message, got {:?}", other),
            }
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(actual))) => {
                let expected = StreamChunk::from_pretty(
                    " I T
                    + 5 10
                    + 6 10
                    + 8 10
                    + 9 10
                    + 10 11
                    - 5 10
                    - 6 10
                    - 8 10
                    U- 10 11
                    U+ 10 10",
                );
                assert_stream_chunk_eq!(actual, expected);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }
    }
}
1431}