use std::collections::VecDeque;
use std::future::pending;
use std::mem::replace;
use std::pin::Pin;

use anyhow::anyhow;
use futures::future::{BoxFuture, Either, select};
use futures::stream::StreamFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::must_match;
use risingwave_connector::sink::log_store::{ChunkId, LogStoreResult};
use risingwave_storage::StateStore;
use risingwave_storage::store::{
    LocalStateStore, NewLocalOptions, OpConsistencyLevel, StateStoreRead,
};
use rw_futures_util::drop_either_future;
use tokio::time::{Duration, Instant, Sleep, sleep_until};
use tokio_stream::adapters::Peekable;

use crate::common::log_store_impl::kv_log_store::buffer::LogStoreBufferItem;
use crate::common::log_store_impl::kv_log_store::reader::LogStoreReadStateStreamRangeStart;
use crate::common::log_store_impl::kv_log_store::reader::timeout_auto_rebuild::TimeoutAutoRebuildIter;
use crate::common::log_store_impl::kv_log_store::serde::{
    KvLogStoreItem, LogStoreItemMergeStream, LogStoreRowSerde,
};
use crate::common::log_store_impl::kv_log_store::state::{
    LogStorePostSealCurrentEpoch, LogStoreReadState, LogStoreStateWriteChunkFuture,
    LogStoreWriteState, new_log_store_state,
};
use crate::common::log_store_impl::kv_log_store::{
    FIRST_SEQ_ID, FlushInfo, ReaderTruncationOffsetType, SeqIdType,
};
use crate::executor::prelude::*;
use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
use crate::executor::{
    Barrier, BoxedMessageStream, Message, StreamExecutorError, StreamExecutorResult,
};

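/// Metrics for the synced KV log store executor: counters for clean/dirty
/// state transitions and storage reads/writes, plus gauges describing the
/// unconsumed in-memory buffer.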
pub mod metrics {
    use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};

    use crate::common::log_store_impl::kv_log_store::KvLogStoreReadMetrics;
    use crate::executor::monitor::StreamingMetrics;
    use crate::task::ActorId;

    #[derive(Clone)]
    pub struct SyncedKvLogStoreMetrics {
        pub unclean_state: LabelGuardedIntCounter<5>,
        pub clean_state: LabelGuardedIntCounter<5>,
        pub wait_next_poll_ns: LabelGuardedIntCounter<4>,

        pub storage_write_count: LabelGuardedIntCounter<4>,
        pub storage_write_size: LabelGuardedIntCounter<4>,
        pub pause_duration_ns: LabelGuardedIntCounter<4>,

        pub buffer_unconsumed_item_count: LabelGuardedIntGauge<4>,
        pub buffer_unconsumed_row_count: LabelGuardedIntGauge<4>,
        pub buffer_unconsumed_epoch_count: LabelGuardedIntGauge<4>,
        pub buffer_unconsumed_min_epoch: LabelGuardedIntGauge<4>,
        pub buffer_read_count: LabelGuardedIntCounter<5>,
        pub buffer_read_size: LabelGuardedIntCounter<5>,

        pub total_read_count: LabelGuardedIntCounter<5>,
        pub total_read_size: LabelGuardedIntCounter<5>,
        pub persistent_log_read_metrics: KvLogStoreReadMetrics,
        pub flushed_buffer_read_metrics: KvLogStoreReadMetrics,
    }

    impl SyncedKvLogStoreMetrics {
        pub(crate) fn new(
            metrics: &StreamingMetrics,
            actor_id: ActorId,
            id: u32,
            name: &str,
            target: &'static str,
        ) -> Self {
            let actor_id_str = actor_id.to_string();
            let id_str = id.to_string();
            let labels = &[&actor_id_str, target, &id_str, name];

            let unclean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "dirty",
                &actor_id_str,
                target,
                &id_str,
                name,
            ]);
            let clean_state = metrics.sync_kv_log_store_state.with_guarded_label_values(&[
                "clean",
                &actor_id_str,
                target,
                &id_str,
                name,
            ]);
            let wait_next_poll_ns = metrics
                .sync_kv_log_store_wait_next_poll_ns
                .with_guarded_label_values(labels);

            let storage_write_size = metrics
                .sync_kv_log_store_storage_write_size
                .with_guarded_label_values(labels);
            let storage_write_count = metrics
                .sync_kv_log_store_storage_write_count
                .with_guarded_label_values(labels);
            let storage_pause_duration_ns = metrics
                .sync_kv_log_store_write_pause_duration_ns
                .with_guarded_label_values(labels);

            let buffer_unconsumed_item_count = metrics
                .sync_kv_log_store_buffer_unconsumed_item_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_row_count = metrics
                .sync_kv_log_store_buffer_unconsumed_row_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_epoch_count = metrics
                .sync_kv_log_store_buffer_unconsumed_epoch_count
                .with_guarded_label_values(labels);
            let buffer_unconsumed_min_epoch = metrics
                .sync_kv_log_store_buffer_unconsumed_min_epoch
                .with_guarded_label_values(labels);
            let buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&["buffer", &actor_id_str, target, &id_str, name]);

            let buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&["buffer", &actor_id_str, target, &id_str, name]);

            let total_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&["total", &actor_id_str, target, &id_str, name]);

            let total_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&["total", &actor_id_str, target, &id_str, name]);

            const READ_PERSISTENT_LOG: &str = "persistent_log";
            const READ_FLUSHED_BUFFER: &str = "flushed_buffer";

            let persistent_log_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &id_str,
                    name,
                ]);

            let persistent_log_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_PERSISTENT_LOG,
                    &actor_id_str,
                    target,
                    &id_str,
                    name,
                ]);

            let flushed_buffer_read_size = metrics
                .sync_kv_log_store_read_size
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &id_str,
                    name,
                ]);

            let flushed_buffer_read_count = metrics
                .sync_kv_log_store_read_count
                .with_guarded_label_values(&[
                    READ_FLUSHED_BUFFER,
                    &actor_id_str,
                    target,
                    &id_str,
                    name,
                ]);

            Self {
                unclean_state,
                clean_state,
                wait_next_poll_ns,
                storage_write_size,
                storage_write_count,
                pause_duration_ns: storage_pause_duration_ns,
                buffer_unconsumed_item_count,
                buffer_unconsumed_row_count,
                buffer_unconsumed_epoch_count,
                buffer_unconsumed_min_epoch,
                buffer_read_count,
                buffer_read_size,
                total_read_count,
                total_read_size,
                persistent_log_read_metrics: KvLogStoreReadMetrics {
                    storage_read_size: persistent_log_read_size,
                    storage_read_count: persistent_log_read_count,
                },
                flushed_buffer_read_metrics: KvLogStoreReadMetrics {
                    storage_read_count: flushed_buffer_read_count,
                    storage_read_size: flushed_buffer_read_size,
                },
            }
        }

        #[cfg(test)]
        pub(crate) fn for_test() -> Self {
            SyncedKvLogStoreMetrics {
                unclean_state: LabelGuardedIntCounter::test_int_counter(),
                clean_state: LabelGuardedIntCounter::test_int_counter(),
                wait_next_poll_ns: LabelGuardedIntCounter::test_int_counter(),
                storage_write_count: LabelGuardedIntCounter::test_int_counter(),
                storage_write_size: LabelGuardedIntCounter::test_int_counter(),
                pause_duration_ns: LabelGuardedIntCounter::test_int_counter(),
                buffer_unconsumed_item_count: LabelGuardedIntGauge::test_int_gauge(),
                buffer_unconsumed_row_count: LabelGuardedIntGauge::test_int_gauge(),
                buffer_unconsumed_epoch_count: LabelGuardedIntGauge::test_int_gauge(),
                buffer_unconsumed_min_epoch: LabelGuardedIntGauge::test_int_gauge(),
                buffer_read_count: LabelGuardedIntCounter::test_int_counter(),
                buffer_read_size: LabelGuardedIntCounter::test_int_counter(),
                total_read_count: LabelGuardedIntCounter::test_int_counter(),
                total_read_size: LabelGuardedIntCounter::test_int_counter(),
                persistent_log_read_metrics: KvLogStoreReadMetrics::for_test(),
                flushed_buffer_read_metrics: KvLogStoreReadMetrics::for_test(),
            }
        }
    }
}

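/// Future resolving to a chunk read back from flushed log store state,
/// together with its chunk id and epoch.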
type ReadFlushedChunkFuture = BoxFuture<'static, LogStoreResult<(ChunkId, StreamChunk, u64)>>;

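/// Executor wrapping a KV log store: upstream messages are written to the
/// log store while buffered (or, when the buffer overflows, flushed and
/// later re-read) chunks are concurrently replayed to the downstream.
///
/// A minimal construction sketch, mirroring the test setup at the bottom of
/// this file; the sizes and the pause duration here are illustrative
/// assumptions, not recommended defaults:
///
/// ```ignore
/// let executor = SyncedKvLogStoreExecutor::new(
///     actor_context,               // ActorContextRef
///     table_id,                    // u32 id of the log store table
///     metrics,                     // SyncedKvLogStoreMetrics
///     serde,                       // LogStoreRowSerde for the table
///     state_store,
///     1024,                        // max in-memory buffer size, in rows
///     256,                         // max size of merged flushed chunks
///     upstream,                    // upstream Executor
///     Duration::from_millis(256),  // duration of a write-side pause
/// );
/// ```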
pub struct SyncedKvLogStoreExecutor<S: StateStore> {
    actor_context: ActorContextRef,
    table_id: TableId,
    metrics: SyncedKvLogStoreMetrics,
    serde: LogStoreRowSerde,

    upstream: Executor,

    state_store: S,
    max_buffer_size: usize,

    chunk_size: u32,

    pause_duration_ms: Duration,
}

impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn new(
        actor_context: ActorContextRef,
        table_id: u32,
        metrics: SyncedKvLogStoreMetrics,
        serde: LogStoreRowSerde,
        state_store: S,
        buffer_size: usize,
        chunk_size: u32,
        upstream: Executor,
        pause_duration_ms: Duration,
    ) -> Self {
        Self {
            actor_context,
            table_id: TableId::new(table_id),
            metrics,
            serde,
            state_store,
            upstream,
            max_buffer_size: buffer_size,
            chunk_size,
            pause_duration_ms,
        }
    }
}

struct FlushedChunkInfo {
    epoch: u64,
    start_seq_id: SeqIdType,
    end_seq_id: SeqIdType,
    flush_info: FlushInfo,
    vnode_bitmap: Bitmap,
}

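/// State machine for the write side of the executor. At any time the write
/// path is either paused after a checkpoint barrier, awaiting the next
/// upstream message, or flushing a chunk to the state store. `Empty` is a
/// transient placeholder left behind while a resolved future is consumed.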
enum WriteFuture<S: LocalStateStore> {
    Paused {
        start_instant: Instant,
        sleep_future: Option<Pin<Box<Sleep>>>,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    },
    ReceiveFromUpstream {
        future: StreamFuture<BoxedMessageStream>,
        write_state: LogStoreWriteState<S>,
    },
    FlushingChunk {
        epoch: u64,
        start_seq_id: SeqIdType,
        end_seq_id: SeqIdType,
        future: Pin<Box<LogStoreStateWriteChunkFuture<S>>>,
        stream: BoxedMessageStream,
    },
    Empty,
}

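/// Outcome of polling a [`WriteFuture`]: either a fresh upstream message or
/// the completion of a chunk flush.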
enum WriteFutureEvent {
    UpstreamMessageReceived(Message),
    ChunkFlushed(FlushedChunkInfo),
}

impl<S: LocalStateStore> WriteFuture<S> {
    fn flush_chunk(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
        chunk: StreamChunk,
        epoch: u64,
        start_seq_id: SeqIdType,
        end_seq_id: SeqIdType,
    ) -> Self {
        tracing::trace!(
            start_seq_id,
            end_seq_id,
            epoch,
            cardinality = chunk.cardinality(),
            "write_future: flushing chunk"
        );
        Self::FlushingChunk {
            epoch,
            start_seq_id,
            end_seq_id,
            future: Box::pin(write_state.into_write_chunk_future(
                chunk,
                epoch,
                start_seq_id,
                end_seq_id,
            )),
            stream,
        }
    }

    fn receive_from_upstream(
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        Self::ReceiveFromUpstream {
            future: stream.into_future(),
            write_state,
        }
    }

    fn paused(
        duration: Duration,
        barrier: Barrier,
        stream: BoxedMessageStream,
        write_state: LogStoreWriteState<S>,
    ) -> Self {
        let now = Instant::now();
        tracing::trace!(?now, ?duration, "write_future_pause");
        Self::Paused {
            start_instant: now,
            sleep_future: Some(Box::pin(sleep_until(now + duration))),
            barrier,
            stream,
            write_state,
        }
    }

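    /// Drives the current state to completion and returns the upstream
    /// stream, the write state, and the resulting event. The resolved state
    /// is swapped out for `WriteFuture::Empty`, so the caller must install a
    /// new state before polling again.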
    async fn next_event(
        &mut self,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<(BoxedMessageStream, LogStoreWriteState<S>, WriteFutureEvent)> {
        match self {
            WriteFuture::Paused {
                start_instant,
                sleep_future,
                ..
            } => {
                if let Some(sleep_future) = sleep_future {
                    sleep_future.await;
                    metrics
                        .pause_duration_ns
                        .inc_by(start_instant.elapsed().as_nanos() as _);
                    tracing::trace!("resuming write future");
                }
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::Paused { stream, write_state, barrier, .. } => {
                    Ok((stream, write_state, WriteFutureEvent::UpstreamMessageReceived(Message::Barrier(barrier))))
                })
            }
            WriteFuture::ReceiveFromUpstream { future, .. } => {
                let (opt, stream) = future.await;
                must_match!(replace(self, WriteFuture::Empty), WriteFuture::ReceiveFromUpstream { write_state, .. } => {
                    opt
                        .ok_or_else(|| anyhow!("end of upstream input").into())
                        .and_then(|result| result.map(|item| {
                            (stream, write_state, WriteFutureEvent::UpstreamMessageReceived(item))
                        }))
                })
            }
            WriteFuture::FlushingChunk { future, .. } => {
                let (write_state, result) = future.await;
                let result = must_match!(replace(self, WriteFuture::Empty), WriteFuture::FlushingChunk { epoch, start_seq_id, end_seq_id, stream, .. } => {
                    result.map(|(flush_info, vnode_bitmap)| {
                        (stream, write_state, WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                            epoch,
                            start_seq_id,
                            end_seq_id,
                            flush_info,
                            vnode_bitmap,
                        }))
                    })
                });
                result.map_err(Into::into)
            }
            WriteFuture::Empty => {
                unreachable!("should not be polled after ready")
            }
        }
    }
}

impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
    #[try_stream(ok = Message, error = StreamExecutorError)]
    pub async fn execute_monitored(self) {
        let wait_next_poll_ns = self.metrics.wait_next_poll_ns.clone();
        #[for_await]
        for message in self.execute_inner() {
            let current_time = Instant::now();
            yield message?;
            wait_next_poll_ns.inc_by(current_time.elapsed().as_nanos() as _);
        }
    }

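    /// Main loop: concurrently writes upstream messages into the log store
    /// and replays buffered or persisted chunks to the downstream. The loop
    /// is re-entered from scratch whenever the vnode bitmap is updated.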
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(self) {
        let mut input = self.upstream.execute();

        let first_barrier = expect_first_barrier(&mut input).await?;
        let first_write_epoch = first_barrier.epoch;
        yield Message::Barrier(first_barrier.clone());

        let local_state_store = self
            .state_store
            .new_local(NewLocalOptions {
                table_id: self.table_id,
                op_consistency_level: OpConsistencyLevel::Inconsistent,
                table_option: TableOption {
                    retention_seconds: None,
                },
                is_replicated: false,
                vnodes: self.serde.vnodes().clone(),
            })
            .await;

        let (mut read_state, mut initial_write_state) =
            new_log_store_state(self.table_id, local_state_store, self.serde);
        initial_write_state.init(first_write_epoch).await?;

        let mut pause_stream = first_barrier.is_pause_on_startup();
        let mut initial_write_epoch = first_write_epoch;

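        // On scaling (vnode bitmap update), jump back here to rebuild the
        // buffer and the persisted-log read stream against the new state.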
        'recreate_consume_stream: loop {
            let mut seq_id = FIRST_SEQ_ID;
            let mut truncation_offset = None;
            let mut buffer = SyncedLogStoreBuffer {
                buffer: VecDeque::new(),
                current_size: 0,
                max_size: self.max_buffer_size,
                max_chunk_size: self.chunk_size,
                next_chunk_id: 0,
                metrics: self.metrics.clone(),
                flushed_count: 0,
            };

            let log_store_stream = read_state
                .read_persisted_log_store(
                    self.metrics.persistent_log_read_metrics.clone(),
                    initial_write_epoch.prev,
                    LogStoreReadStateStreamRangeStart::Unbounded,
                )
                .await?;

            let mut log_store_stream = tokio_stream::StreamExt::peekable(log_store_stream);
            let mut clean_state = log_store_stream.peek().await.is_none();

            let mut read_future_state = ReadFuture::ReadingPersistedStream(log_store_stream);

            let mut write_future_state =
                WriteFuture::receive_from_upstream(input, initial_write_state);

            loop {
                let select_result = {
                    let read_future = async {
                        if pause_stream {
                            pending().await
                        } else {
                            read_future_state
                                .next_chunk(&read_state, &mut buffer, &self.metrics)
                                .await
                        }
                    };
                    pin_mut!(read_future);
                    let write_future = write_future_state.next_event(&self.metrics);
                    pin_mut!(write_future);
                    let output = select(write_future, read_future).await;
                    drop_either_future(output)
                };
                match select_result {
                    Either::Left(result) => {
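                        // `next_event` has resolved and left `WriteFuture::Empty`
                        // behind; drop the placeholder before a new write future
                        // is installed below.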
                        drop(write_future_state);
                        let (stream, mut write_state, either) = result?;
                        match either {
                            WriteFutureEvent::UpstreamMessageReceived(msg) => {
                                match msg {
                                    Message::Barrier(barrier) => {
                                        if clean_state
                                            && barrier.kind.is_checkpoint()
                                            && !buffer.is_empty()
                                        {
                                            write_future_state = WriteFuture::paused(
                                                self.pause_duration_ms,
                                                barrier,
                                                stream,
                                                write_state,
                                            );
                                            clean_state = false;
                                            self.metrics.unclean_state.inc();
                                        } else {
                                            if let Some(mutation) = barrier.mutation.as_deref() {
                                                match mutation {
                                                    Mutation::Pause => {
                                                        pause_stream = true;
                                                    }
                                                    Mutation::Resume => {
                                                        pause_stream = false;
                                                    }
                                                    _ => {}
                                                }
                                            }
                                            let write_state_post_write_barrier =
                                                Self::write_barrier(
                                                    &mut write_state,
                                                    barrier.clone(),
                                                    &self.metrics,
                                                    truncation_offset,
                                                    &mut buffer,
                                                )
                                                .await?;
                                            seq_id = FIRST_SEQ_ID;
                                            let update_vnode_bitmap = barrier
                                                .as_update_vnode_bitmap(self.actor_context.id);
                                            let barrier_epoch = barrier.epoch;

                                            yield Message::Barrier(barrier);

                                            write_state_post_write_barrier
                                                .post_yield_barrier(update_vnode_bitmap.clone())
                                                .await?;
                                            if let Some(vnode_bitmap) = update_vnode_bitmap {
                                                read_state.update_vnode_bitmap(vnode_bitmap);
                                                initial_write_epoch = barrier_epoch;
                                                input = stream;
                                                initial_write_state = write_state;
                                                continue 'recreate_consume_stream;
                                            } else {
                                                write_future_state =
                                                    WriteFuture::receive_from_upstream(
                                                        stream,
                                                        write_state,
                                                    );
                                            }
                                        }
                                    }
                                    Message::Chunk(chunk) => {
                                        let start_seq_id = seq_id;
                                        let new_seq_id = seq_id + chunk.cardinality() as SeqIdType;
                                        let end_seq_id = new_seq_id - 1;
                                        let epoch = write_state.epoch().curr;
                                        tracing::trace!(
                                            start_seq_id,
                                            end_seq_id,
                                            new_seq_id,
                                            epoch,
                                            cardinality = chunk.cardinality(),
                                            "received chunk"
                                        );
                                        if let Some(chunk_to_flush) = buffer.add_or_flush_chunk(
                                            start_seq_id,
                                            end_seq_id,
                                            chunk,
                                            epoch,
                                        ) {
                                            seq_id = new_seq_id;
                                            write_future_state = WriteFuture::flush_chunk(
                                                stream,
                                                write_state,
                                                chunk_to_flush,
                                                epoch,
                                                start_seq_id,
                                                end_seq_id,
                                            );
                                        } else {
                                            seq_id = new_seq_id;
                                            write_future_state =
                                                WriteFuture::receive_from_upstream(
                                                    stream,
                                                    write_state,
                                                );
                                        }
                                    }
                                    Message::Watermark(_watermark) => {
                                        write_future_state =
                                            WriteFuture::receive_from_upstream(stream, write_state);
                                    }
                                }
                            }
                            WriteFutureEvent::ChunkFlushed(FlushedChunkInfo {
                                start_seq_id,
                                end_seq_id,
                                epoch,
                                flush_info,
                                vnode_bitmap,
                            }) => {
                                buffer.add_flushed_item_to_buffer(
                                    start_seq_id,
                                    end_seq_id,
                                    vnode_bitmap,
                                    epoch,
                                );
                                self.metrics
                                    .storage_write_count
                                    .inc_by(flush_info.flush_count as _);
                                self.metrics
                                    .storage_write_size
                                    .inc_by(flush_info.flush_size as _);
                                write_future_state =
                                    WriteFuture::receive_from_upstream(stream, write_state);
                            }
                        }
                    }
                    Either::Right(result) => {
                        if !clean_state
                            && matches!(read_future_state, ReadFuture::Idle)
                            && buffer.is_empty()
                        {
                            clean_state = true;
                            self.metrics.clean_state.inc();

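                            // The log store is fully caught up again: cancel any
                            // pending write-side pause so upstream consumption
                            // resumes immediately.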
                            if let WriteFuture::Paused { sleep_future, .. } =
                                &mut write_future_state
                            {
                                tracing::trace!("resuming paused future");
                                assert!(buffer.current_size < self.max_buffer_size);
                                *sleep_future = None;
                            }
                        }
                        let (chunk, new_truncate_offset) = result?;
                        if let Some(new_truncate_offset) = new_truncate_offset {
                            truncation_offset = Some(new_truncate_offset);
                        }
                        self.metrics
                            .total_read_count
                            .inc_by(chunk.cardinality() as _);

                        yield Message::Chunk(chunk);
                    }
                }
            }
        }
    }
}

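/// State machine for the read side of the executor: first drain the log
/// persisted before startup, then serve chunks from the in-memory buffer,
/// reading flushed chunks back from the state store when necessary.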
enum ReadFuture<S: StateStoreRead> {
    ReadingPersistedStream(Peekable<Pin<Box<LogStoreItemMergeStream<TimeoutAutoRebuildIter<S>>>>>),
    ReadingFlushedChunk {
        future: ReadFlushedChunkFuture,
        truncate_offset: ReaderTruncationOffsetType,
    },
    Idle,
}

impl<S: StateStoreRead> ReadFuture<S> {
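    /// Returns the next chunk to emit downstream, together with an optional
    /// new truncation offset. Pends forever when there is nothing to read,
    /// so the surrounding `select` naturally yields to the write side.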
    async fn next_chunk(
        &mut self,
        read_state: &LogStoreReadState<S>,
        buffer: &mut SyncedLogStoreBuffer,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamExecutorResult<(StreamChunk, Option<ReaderTruncationOffsetType>)> {
        match self {
            ReadFuture::ReadingPersistedStream(stream) => {
                while let Some((_, item)) = stream.try_next().await? {
                    match item {
                        KvLogStoreItem::Barrier { .. } => {
                            continue;
                        }
                        KvLogStoreItem::StreamChunk(chunk) => {
                            tracing::trace!("read logstore chunk of size: {}", chunk.cardinality());
                            return Ok((chunk, None));
                        }
                    }
                }
                *self = ReadFuture::Idle;
            }
            ReadFuture::ReadingFlushedChunk { .. } | ReadFuture::Idle => {}
        }
        match self {
            ReadFuture::ReadingPersistedStream(_) => {
                unreachable!("must have finished read persisted stream when reaching here")
            }
            ReadFuture::ReadingFlushedChunk { .. } => {}
            ReadFuture::Idle => loop {
                let Some((item_epoch, item)) = buffer.pop_front() else {
                    return pending().await;
                };
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        metrics.buffer_read_count.inc_by(chunk.cardinality() as _);
                        tracing::trace!(
                            start_seq_id,
                            end_seq_id,
                            flushed,
                            cardinality = chunk.cardinality(),
                            "read buffered chunk"
                        );
                        return Ok((chunk, Some((item_epoch, Some(end_seq_id)))));
                    }
                    LogStoreBufferItem::Flushed {
                        vnode_bitmap,
                        start_seq_id,
                        end_seq_id,
                        chunk_id,
                    } => {
                        tracing::trace!(start_seq_id, end_seq_id, chunk_id, "read flushed chunk");
                        let truncate_offset = (item_epoch, Some(end_seq_id));
                        let read_metrics = metrics.flushed_buffer_read_metrics.clone();
                        let future = read_state
                            .read_flushed_chunk(
                                vnode_bitmap,
                                chunk_id,
                                start_seq_id,
                                end_seq_id,
                                item_epoch,
                                read_metrics,
                            )
                            .boxed();
                        *self = ReadFuture::ReadingFlushedChunk {
                            future,
                            truncate_offset,
                        };
                        break;
                    }
                    LogStoreBufferItem::Barrier { .. } => {
                        continue;
                    }
                }
            },
        }

        let (future, truncate_offset) = match self {
            ReadFuture::ReadingPersistedStream(_) | ReadFuture::Idle => {
                unreachable!("should be at ReadingFlushedChunk")
            }
            ReadFuture::ReadingFlushedChunk {
                future,
                truncate_offset,
            } => (future, *truncate_offset),
        };

        let (_, chunk, _) = future.await?;
        tracing::trace!("read flushed chunk of size: {}", chunk.cardinality());
        *self = ReadFuture::Idle;
        Ok((chunk, Some(truncate_offset)))
    }
}

impl<S: StateStore> SyncedKvLogStoreExecutor<S> {
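    /// Writes a barrier to the log store. On checkpoint barriers, any
    /// still-unflushed buffered chunks are persisted first so the checkpoint
    /// covers them, then the current epoch is sealed with the latest
    /// truncation offset.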
    async fn write_barrier<'a>(
        write_state: &'a mut LogStoreWriteState<S::Local>,
        barrier: Barrier,
        metrics: &SyncedKvLogStoreMetrics,
        truncation_offset: Option<ReaderTruncationOffsetType>,
        buffer: &mut SyncedLogStoreBuffer,
    ) -> StreamExecutorResult<LogStorePostSealCurrentEpoch<'a, S::Local>> {
        let epoch = barrier.epoch.prev;
        let mut writer = write_state.start_writer(false);
        writer.write_barrier(epoch, barrier.is_checkpoint())?;

        if barrier.is_checkpoint() {
            for (epoch, item) in buffer.buffer.iter_mut().rev() {
                match item {
                    LogStoreBufferItem::StreamChunk {
                        chunk,
                        start_seq_id,
                        end_seq_id,
                        flushed,
                        ..
                    } => {
                        if !*flushed {
                            writer.write_chunk(chunk, *epoch, *start_seq_id, *end_seq_id)?;
                            *flushed = true;
                        } else {
                            break;
                        }
                    }
                    LogStoreBufferItem::Flushed { .. } | LogStoreBufferItem::Barrier { .. } => {}
                }
            }
        }

        let (flush_info, _) = writer.finish().await?;
        metrics
            .storage_write_count
            .inc_by(flush_info.flush_count as _);
        metrics
            .storage_write_size
            .inc_by(flush_info.flush_size as _);
        let post_seal = write_state.seal_current_epoch(barrier.epoch.curr, truncation_offset);

        buffer.buffer.push_back((
            epoch,
            LogStoreBufferItem::Barrier {
                is_checkpoint: barrier.is_checkpoint(),
                next_epoch: barrier.epoch.curr,
            },
        ));
        buffer.next_chunk_id = 0;
        buffer.update_unconsumed_buffer_metrics();

        Ok(post_seal)
    }
}

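/// In-memory buffer of log items awaiting downstream consumption, bounded by
/// `max_size` rows. Chunks that do not fit are flushed to the state store
/// and tracked here as lightweight `Flushed` entries instead.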
struct SyncedLogStoreBuffer {
    buffer: VecDeque<(u64, LogStoreBufferItem)>,
    current_size: usize,
    max_size: usize,
    max_chunk_size: u32,
    next_chunk_id: ChunkId,
    metrics: SyncedKvLogStoreMetrics,
    flushed_count: usize,
}

impl SyncedLogStoreBuffer {
    fn is_empty(&self) -> bool {
        self.current_size == 0
    }

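    /// Buffers the chunk if it fits within `max_size`; otherwise returns it
    /// to the caller, which is then responsible for flushing it to storage.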
    fn add_or_flush_chunk(
        &mut self,
        start_seq_id: SeqIdType,
        end_seq_id: SeqIdType,
        chunk: StreamChunk,
        epoch: u64,
    ) -> Option<StreamChunk> {
        let current_size = self.current_size;
        let chunk_size = chunk.cardinality();

        tracing::trace!(
            current_size,
            chunk_size,
            max_size = self.max_size,
            "checking chunk size"
        );
        let should_flush_chunk = current_size + chunk_size > self.max_size;
        if should_flush_chunk {
            tracing::trace!(start_seq_id, end_seq_id, epoch, "flushing chunk");
            Some(chunk)
        } else {
            tracing::trace!(start_seq_id, end_seq_id, epoch, "buffering chunk");
            self.add_chunk_to_buffer(chunk, start_seq_id, end_seq_id, epoch);
            None
        }
    }

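    /// Records a chunk that has been flushed to storage. Consecutive flushed
    /// items of the same epoch are merged (up to `max_chunk_size` rows) so
    /// they can later be read back as a single chunk.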
    fn add_flushed_item_to_buffer(
        &mut self,
        start_seq_id: SeqIdType,
        end_seq_id: SeqIdType,
        new_vnode_bitmap: Bitmap,
        epoch: u64,
    ) {
        let new_chunk_size = end_seq_id - start_seq_id + 1;

        if let Some((
            item_epoch,
            LogStoreBufferItem::Flushed {
                start_seq_id: prev_start_seq_id,
                end_seq_id: prev_end_seq_id,
                vnode_bitmap,
                ..
            },
        )) = self.buffer.back_mut()
            && let flushed_chunk_size = *prev_end_seq_id - *prev_start_seq_id + 1
            && let projected_flushed_chunk_size = flushed_chunk_size + new_chunk_size
            && projected_flushed_chunk_size as u32 <= self.max_chunk_size
        {
            assert!(
                *prev_end_seq_id < start_seq_id,
                "prev end_seq_id {} should be smaller than current start_seq_id {}",
                prev_end_seq_id,
                start_seq_id
            );
            assert_eq!(
                epoch, *item_epoch,
                "epoch of newly added flushed item must be the same as the last flushed item"
            );
            *prev_end_seq_id = end_seq_id;
            *vnode_bitmap |= new_vnode_bitmap;
        } else {
            let chunk_id = self.next_chunk_id;
            self.next_chunk_id += 1;
            self.buffer.push_back((
                epoch,
                LogStoreBufferItem::Flushed {
                    start_seq_id,
                    end_seq_id,
                    vnode_bitmap: new_vnode_bitmap,
                    chunk_id,
                },
            ));
            self.flushed_count += 1;
            tracing::trace!(
                "adding flushed item to buffer: start_seq_id: {start_seq_id}, end_seq_id: {end_seq_id}, chunk_id: {chunk_id}"
            );
        }
        self.update_unconsumed_buffer_metrics();
    }

    fn add_chunk_to_buffer(
        &mut self,
        chunk: StreamChunk,
        start_seq_id: SeqIdType,
        end_seq_id: SeqIdType,
        epoch: u64,
    ) {
        let chunk_id = self.next_chunk_id;
        self.next_chunk_id += 1;
        self.current_size += chunk.cardinality();
        self.buffer.push_back((
            epoch,
            LogStoreBufferItem::StreamChunk {
                chunk,
                start_seq_id,
                end_seq_id,
                flushed: false,
                chunk_id,
            },
        ));
        self.update_unconsumed_buffer_metrics();
    }

    fn pop_front(&mut self) -> Option<(u64, LogStoreBufferItem)> {
        let item = self.buffer.pop_front();
        match &item {
            Some((_, LogStoreBufferItem::Flushed { .. })) => {
                self.flushed_count -= 1;
            }
            Some((_, LogStoreBufferItem::StreamChunk { chunk, .. })) => {
                self.current_size -= chunk.cardinality();
            }
            _ => {}
        }
        self.update_unconsumed_buffer_metrics();
        item
    }

    fn update_unconsumed_buffer_metrics(&self) {
        let mut epoch_count = 0;
        let mut row_count = 0;
        for (_, item) in &self.buffer {
            match item {
                LogStoreBufferItem::StreamChunk { chunk, .. } => {
                    row_count += chunk.cardinality();
                }
                LogStoreBufferItem::Flushed {
                    start_seq_id,
                    end_seq_id,
                    ..
                } => {
                    row_count += (end_seq_id - start_seq_id + 1) as usize;
                }
                LogStoreBufferItem::Barrier { .. } => {
                    epoch_count += 1;
                }
            }
        }
        self.metrics.buffer_unconsumed_epoch_count.set(epoch_count);
        self.metrics.buffer_unconsumed_row_count.set(row_count as _);
        self.metrics
            .buffer_unconsumed_item_count
            .set(self.buffer.len() as _);
        self.metrics.buffer_unconsumed_min_epoch.set(
            self.buffer
                .front()
                .map(|(epoch, _)| *epoch)
                .unwrap_or_default() as _,
        );
    }
}

impl<S> Execute for SyncedKvLogStoreExecutor<S>
where
    S: StateStore,
{
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_monitored().boxed()
    }
}

#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use pretty_assertions::assert_eq;
    use risingwave_common::catalog::Field;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::test_prelude::*;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::assert_stream_chunk_eq;
    use crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V2_INFO;
    use crate::common::log_store_impl::kv_log_store::test_utils::{
        check_stream_chunk_eq, gen_test_log_store_table, test_payload_schema,
    };
    use crate::executor::sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
    use crate::executor::test_utils::MockSource;

    fn init_logger() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .with_ansi(false)
            .try_init();
    }

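    /// Chunks that fit in the in-memory buffer should be replayed to the
    /// downstream as-is, in order, interleaved with barriers.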
    #[tokio::test]
    async fn test_read_write_buffer() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type.clone()))
            .collect_vec();
        let schema = Schema { fields };
        let pk_indices = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
        )
        .boxed();

        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            " I T
            + 5 10
            + 6 10
            + 8 10
            + 9 10
            + 10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            " I T
            - 5 10
            - 6 10
            - 8 10
            U- 9 10
            U+ 10 11",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        tx.push_barrier(test_epoch(2), false);

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

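    /// Chunks and a following barrier are pushed before the executor is
    /// polled; the buffered chunks should still be replayed individually
    /// after the first barrier.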
    #[tokio::test]
    async fn test_barrier_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type.clone()))
            .collect_vec();
        let schema = Schema { fields };
        let pk_indices = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            10,
            256,
            source,
            Duration::from_millis(256),
        )
        .boxed();

        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            " I T
            + 5 10
            + 6 10
            + 8 10
            + 9 10
            + 10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            " I T
            - 5 10
            - 6 10
            - 8 10
            U- 10 11
            U+ 10 10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(1));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_1);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(chunk))) => {
                assert_stream_chunk_eq!(chunk, chunk_2);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }

        match stream.next().await {
            Some(Ok(Message::Barrier(barrier))) => {
                assert_eq!(barrier.epoch.curr, test_epoch(2));
            }
            other => panic!("Expected a barrier message, got {:?}", other),
        }
    }

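    /// With a zero-sized buffer every chunk is flushed to storage as soon as
    /// it arrives; adjacent flushed chunks of the same epoch are merged and
    /// read back as a single chunk once both barriers have been emitted.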
    #[tokio::test]
    async fn test_max_chunk_persisted_read() {
        init_logger();

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let column_descs = test_payload_schema(pk_info);
        let fields = column_descs
            .into_iter()
            .map(|desc| Field::new(desc.name.clone(), desc.data_type.clone()))
            .collect_vec();
        let schema = Schema { fields };
        let pk_indices = vec![0];
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema.clone(), pk_indices.clone());

        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));

        let table = gen_test_log_store_table(pk_info);

        let log_store_executor = SyncedKvLogStoreExecutor::new(
            ActorContext::for_test(123),
            table.id,
            SyncedKvLogStoreMetrics::for_test(),
            LogStoreRowSerde::new(&table, vnodes, pk_info),
            MemoryStateStore::new(),
            0,
            256,
            source,
            Duration::from_millis(256),
        )
        .boxed();

        tx.push_barrier(test_epoch(1), false);

        let chunk_1 = StreamChunk::from_pretty(
            " I T
            + 5 10
            + 6 10
            + 8 10
            + 9 10
            + 10 11",
        );

        let chunk_2 = StreamChunk::from_pretty(
            " I T
            - 5 10
            - 6 10
            - 8 10
            U- 10 11
            U+ 10 10",
        );

        tx.push_chunk(chunk_1.clone());
        tx.push_chunk(chunk_2.clone());

        tx.push_barrier(test_epoch(2), false);

        let mut stream = log_store_executor.execute();

        for i in 1..=2 {
            match stream.next().await {
                Some(Ok(Message::Barrier(barrier))) => {
                    assert_eq!(barrier.epoch.curr, test_epoch(i));
                }
                other => panic!("Expected a barrier message, got {:?}", other),
            }
        }

        match stream.next().await {
            Some(Ok(Message::Chunk(actual))) => {
                let expected = StreamChunk::from_pretty(
                    " I T
                    + 5 10
                    + 6 10
                    + 8 10
                    + 9 10
                    + 10 11
                    - 5 10
                    - 6 10
                    - 8 10
                    U- 10 11
                    U+ 10 10",
                );
                assert_stream_chunk_eq!(actual, expected);
            }
            other => panic!("Expected a chunk message, got {:?}", other),
        }
    }
}
1423}