1use std::collections::VecDeque;
16use std::future::{Future, pending};
17use std::pin::Pin;
18use std::time::Duration;
19
20use anyhow::anyhow;
21use futures::future::{Either, select};
22use pin_project::pin_project;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::must_match;
25use risingwave_pb::stream_plan;
26use risingwave_storage::StateStore;
27use risingwave_storage::store::StateStoreRead;
28use rw_futures_util::drop_either_future;
29use tokio::sync::mpsc::UnboundedReceiver;
30
31use super::{DispatchExecutor, DispatchExecutorInner, dispatch_message_batch};
32use crate::common::log_store_impl::kv_log_store::reader::LogStoreReadStateStreamRangeStart;
33use crate::common::log_store_impl::kv_log_store::serde::LogStoreRowSerde;
34use crate::common::log_store_impl::kv_log_store::state::LogStoreReadState;
35use crate::common::log_store_impl::kv_log_store::{
36 FIRST_SEQ_ID, KV_LOG_STORE_V2_INFO, LogStoreVnodeProgress,
37};
38use crate::executor::prelude::*;
39use crate::executor::sync_kv_log_store::{
40 ReadFuture, SyncKvLogStoreContext, SyncedKvLogStoreExecutor, SyncedLogStoreBuffer, WriteFuture,
41 WriteFutureEvent,
42};
43use crate::executor::{MessageBatch, StreamConsumer, SyncedKvLogStoreMetrics};
44use crate::task::NewOutputRequest;
45
/// Dispatch executor that routes its upstream input through a synced kv log store before
/// sending messages to its dispatchers.
///
/// Built by [`SyncLogStoreDispatchExecutor::new`] and driven through
/// [`StreamConsumer::execute`], whose stream yields each barrier after it has been dispatched.
pub struct SyncLogStoreDispatchExecutor<S: StateStore> {
    /// Upstream executor providing the messages to dispatch.
    pub(super) input: Executor,
    /// Dispatcher state; every message is sent downstream through `dispatch_message_batch`
    /// with this value.
    pub(super) inner: DispatchExecutorInner,
    /// Log store settings: table, serde, state store, buffer/chunk sizes, pause duration,
    /// aligned flag and metrics.
    pub(super) log_store_context: SyncKvLogStoreContext<S>,
}
56
57impl<S: StateStore> SyncLogStoreDispatchExecutor<S> {
58 pub(crate) async fn new(
59 input: Executor,
60 new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
61 dispatchers: Vec<stream_plan::Dispatcher>,
62 actor_context: &ActorContextRef,
63 sync: &stream_plan::SyncLogStoreNode,
64 vnode_bitmap: Option<Bitmap>,
65 state_store: S,
66 ) -> StreamResult<Self> {
67 let chunk_size = actor_context.config.developer.chunk_size;
68 let fragment_id = actor_context.fragment_id;
69 let log_store_metrics = SyncedKvLogStoreMetrics::new(
70 &actor_context.streaming_metrics,
71 actor_context.id,
72 fragment_id,
73 "sync_log_store_dispatch",
74 "sync_log_store_dispatch",
75 );
76
77 let table = sync
78 .log_store_table
79 .as_ref()
80 .ok_or_else(|| anyhow!("missing log_store_table in SyncLogStoreNode"))?;
81
82 let pause_duration_ms = actor_context
83 .config
84 .developer
85 .sync_log_store_pause_duration_ms;
86 let max_buffer_size = actor_context.config.developer.sync_log_store_buffer_size;
87
88 let serde =
89 LogStoreRowSerde::new(table, vnode_bitmap.map(Into::into), &KV_LOG_STORE_V2_INFO);
90 let log_store_context = SyncKvLogStoreContext {
91 table_id: table.id,
92 fragment_id,
93 serde,
94 state_store,
95 max_buffer_size,
96 pause_duration_ms: Duration::from_millis(pause_duration_ms as _),
97 aligned: sync.aligned,
98 chunk_size,
99 metrics: log_store_metrics,
100 };
101
102 Self::new_with_log_store_context(
103 input,
104 new_output_request_rx,
105 dispatchers,
106 actor_context,
107 log_store_context,
108 )
109 .await
110 }
111
112 async fn new_with_log_store_context(
113 input: Executor,
114 new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
115 dispatchers: Vec<stream_plan::Dispatcher>,
116 actor_context: &ActorContextRef,
117 log_store_context: SyncKvLogStoreContext<S>,
118 ) -> StreamResult<Self> {
119 let DispatchExecutor { input, inner } =
120 DispatchExecutor::new(input, new_output_request_rx, dispatchers, actor_context).await?;
121
122 tracing::info!(
123 actor_id = %actor_context.id,
124 "synclogstore dispatch executor info"
125 );
126
127 Ok(Self {
128 input,
129 inner,
130 log_store_context,
131 })
132 }
133}
134
/// Future that dispatches a single message through a [`DispatchExecutorInner`] and hands the
/// dispatcher state back together with the outcome.
///
/// On success the outcome is the barrier reported by `dispatch_message_batch`, if any
/// (presumably only for barrier messages — confirm). The hidden type is defined by
/// `dispatching_future`.
type DispatchingFuture =
    impl Future<Output = (DispatchExecutorInner, StreamResult<Option<Barrier>>)> + 'static;
137
138#[define_opaque(DispatchingFuture)]
139fn dispatching_future(mut inner: DispatchExecutorInner, message: Message) -> DispatchingFuture {
140 async move {
141 let batch: MessageBatch = message.into();
142 let r = dispatch_message_batch(&mut inner, batch)
143 .await
144 .map(|barrier_batch| {
145 barrier_batch.map(|mut barrier_batch| {
146 debug_assert_eq!(barrier_batch.len(), 1);
147 barrier_batch
148 .pop()
149 .expect("barrier batch should contain one barrier")
150 })
151 });
152 (inner, r)
153 }
154}
155
/// Consumer-side state machine of the unaligned sync log store dispatcher.
///
/// At any moment it is either about to read the next chunk from the log store or in the
/// middle of dispatching a message. The `DispatchExecutorInner` is owned by exactly one
/// variant and is moved between states; `PlaceHolder` only exists transiently while `inner`
/// is being moved out with `project_replace`.
#[pin_project(project = ConsumerFutureProj, project_replace = ConsumerFutureProjReplace)]
enum ConsumerFuture {
    /// Idle: the next step is to read a chunk from the log store.
    ReadingChunk { inner: DispatchExecutorInner },
    /// A message (chunk or barrier) is being dispatched downstream.
    Dispatching {
        /// The in-flight dispatch. It is stored here, pinned, rather than inside the
        /// `next_event` future, so dropping a `next_event` future does not drop the dispatch.
        #[pin]
        future: DispatchingFuture,
        /// Barriers that arrived while `future` was running. They are pushed at the front and
        /// popped from the back, so they are dispatched in arrival order.
        barrier_queue: VecDeque<Message>,
    },
    /// Transient value used while `inner` is moved out; it must never be observed by
    /// `push_barrier` or `next_event`.
    PlaceHolder,
}
173
/// Events that [`ConsumerFuture::next_event`] reports to the executor's main loop.
enum ConsumerFutureEvent {
    /// A barrier has been dispatched downstream and should now be yielded to the caller.
    BarrierDispatched(Barrier),
    /// `ReadFuture::mark_clean_state` reported that a clean state was reached.
    CleanStateReached,
}
178
impl ConsumerFuture {
    /// Starts dispatching `message` through `inner`, with an empty barrier queue.
    fn dispatch(inner: DispatchExecutorInner, message: Message) -> Self {
        tracing::trace!("consumer_future: dispatching future created");
        Self::Dispatching {
            future: dispatching_future(inner, message),
            barrier_queue: VecDeque::new(),
        }
    }

    /// Returns to the idle state, where the next step is reading a chunk.
    fn read_chunk(inner: DispatchExecutorInner) -> Self {
        tracing::trace!("consumer_future: reading chunk future created");
        Self::ReadingChunk { inner }
    }

    /// Hands a barrier to the consumer.
    ///
    /// When idle, the barrier is dispatched right away, without waiting for any chunks that
    /// have not been read yet. When a dispatch is in flight, the barrier is queued and
    /// dispatched by `next_event` after the current dispatch and any earlier queued barriers.
    ///
    /// # Panics
    ///
    /// Panics if called while in the transient `PlaceHolder` state.
    fn push_barrier(mut self: Pin<&mut Self>, barrier: Barrier) {
        let message = Message::Barrier(barrier);
        match self.as_mut().project() {
            ConsumerFutureProj::ReadingChunk { .. } => {
                // Move `inner` out (leaving a placeholder) so it can be handed to the new
                // dispatch future.
                let inner = must_match!(
                    self.as_mut().project_replace(ConsumerFuture::PlaceHolder),
                    ConsumerFutureProjReplace::ReadingChunk { inner } => inner
                );
                self.set(Self::dispatch(inner, message));
            }
            ConsumerFutureProj::Dispatching { barrier_queue, .. } => {
                // Front-push / back-pop (in `next_event`) keeps barriers in FIFO order.
                barrier_queue.push_front(message);
            }
            ConsumerFutureProj::PlaceHolder => {
                unreachable!("ConsumerFuture::PlaceHolder should be handled!")
            }
        }
    }

    /// Drives the state machine until it has an event to report.
    ///
    /// - `ReadingChunk`: never completes while `read_paused` is true. Otherwise it reads the
    ///   next chunk via `read_future`, updates `total_read_count`, checks for a clean state,
    ///   and switches to dispatching that chunk. It returns `CleanStateReached` immediately
    ///   when a clean state was reached (the chunk dispatch continues on the next call).
    /// - `Dispatching`: awaits the in-flight dispatch. It then starts the next queued barrier,
    ///   or goes back to `ReadingChunk`. It returns `BarrierDispatched` if the finished
    ///   dispatch produced a barrier, and keeps looping otherwise.
    ///
    /// The in-flight dispatch future lives in `self`, so dropping this future mid-dispatch (for
    /// example when it loses a `select`) lets a later call resume the same dispatch. There is
    /// no `.await` between that dispatch completing and the state transition, so a completed
    /// dispatch future is not polled again on the success path. Cancel safety of the
    /// `ReadingChunk` branch depends on `ReadFuture::next_chunk`, which is not visible here.
    ///
    /// # Errors
    ///
    /// Propagates errors from reading the log store or from dispatching. After an error the
    /// state is left as-is, so the caller is expected to stop driving it.
    #[expect(clippy::too_many_arguments)]
    async fn next_event<S: StateStoreRead>(
        mut self: Pin<&mut Self>,
        read_future: &mut ReadFuture<S>,
        read_paused: bool,
        clean_state: &mut bool,
        progress: &mut LogStoreVnodeProgress,
        read_state: &LogStoreReadState<S>,
        buffer: &mut SyncedLogStoreBuffer,
        metrics: &SyncedKvLogStoreMetrics,
    ) -> StreamResult<ConsumerFutureEvent> {
        loop {
            match self.as_mut().project() {
                ConsumerFutureProj::ReadingChunk { .. } => {
                    // While paused, never progress; the caller's `select` keeps the write side
                    // running and re-creates this future with the updated flag.
                    if read_paused {
                        pending().await
                    }

                    let chunk = read_future
                        .next_chunk(progress, read_state, buffer, metrics)
                        .await?;
                    metrics.total_read_count.inc_by(chunk.cardinality() as _);

                    let clean_state_reached =
                        read_future.mark_clean_state(clean_state, buffer, metrics);
                    let inner = must_match!(
                        self.as_mut().project_replace(ConsumerFuture::PlaceHolder),
                        ConsumerFutureProjReplace::ReadingChunk { inner } => inner
                    );
                    self.set(Self::dispatch(inner, Message::Chunk(chunk)));

                    if clean_state_reached {
                        return Ok(ConsumerFutureEvent::CleanStateReached);
                    }
                    continue;
                }
                ConsumerFutureProj::Dispatching {
                    future,
                    barrier_queue,
                } => {
                    let (inner, result) = future.await;
                    let barrier = result?;

                    if let Some(next_barrier) = barrier_queue.pop_back() {
                        tracing::trace!("consumer_future: dispatching future created");
                        // Stay in `Dispatching` and only swap the future, keeping the
                        // remaining queued barriers in place.
                        let ConsumerFutureProj::Dispatching { mut future, .. } =
                            self.as_mut().project()
                        else {
                            unreachable!("ConsumerFuture::ReadingChunk should be handled!")
                        };
                        future.set(dispatching_future(inner, next_barrier));
                    } else {
                        self.set(Self::read_chunk(inner));
                    }

                    if let Some(barrier) = barrier {
                        return Ok(ConsumerFutureEvent::BarrierDispatched(barrier));
                    }
                }
                ConsumerFutureProj::PlaceHolder => {
                    unreachable!("ConsumerFuture::PlaceHolder should be handled!")
                }
            }
        }
    }
}
278
impl<S: StateStore> StreamConsumer for SyncLogStoreDispatchExecutor<S> {
    /// Stream of barriers, each yielded after it has been dispatched downstream.
    type BarrierStream = impl Stream<Item = StreamResult<Barrier>> + Send;

    /// Runs the executor.
    ///
    /// The first barrier is dispatched and yielded directly, and its epoch initializes the
    /// local log store state. After that:
    /// - aligned mode: messages from `SyncedKvLogStoreExecutor::aligned_message_stream` are
    ///   dispatched in order, and each dispatched barrier is yielded;
    /// - unaligned mode: a write side (upstream -> log store, `write_future_state`) and a
    ///   consumer side (log store -> dispatchers, `ConsumerFuture`) are raced with `select`
    ///   on every iteration. A barrier handed to the consumer is dispatched after the
    ///   in-flight dispatch and earlier queued barriers, without waiting for unread chunks.
    ///
    /// NOTE(review): the unaligned loop only exits on error. After a stop barrier the write
    /// side is parked on `pending()`, so the caller presumably stops polling once the stop
    /// barrier is yielded — confirm.
    fn execute(mut self: Box<Self>) -> Self::BarrierStream {
        #[try_stream]
        async move {
            let actor_id = self.inner.actor_id;
            let log_store_config = self.log_store_context;

            let mut input = self.input.execute();

            // The first barrier bypasses the log store: it is dispatched and yielded before
            // the log store state is initialized from its epoch.
            let first_barrier = expect_first_barrier(&mut input).await?;
            let first_write_epoch = first_barrier.epoch;

            let first_barrier_batch = dispatch_message_batch(
                &mut self.inner,
                Message::Barrier(first_barrier.clone()).into(),
            )
            .await?;
            debug_assert_eq!(
                first_barrier_batch
                    .as_ref()
                    .map(|barrier_batch| barrier_batch.len()),
                Some(1)
            );
            yield first_barrier.clone();

            let (read_state, initial_write_state) =
                SyncedKvLogStoreExecutor::<S>::init_local_log_store_state(
                    &log_store_config,
                    first_write_epoch,
                )
                .await?;

            let initial_write_epoch = first_write_epoch;
            let mut pause_stream = first_barrier.is_pause_on_startup();

            // Aligned mode: the aligned message stream decides the output order; this loop
            // only dispatches and yields barriers.
            if log_store_config.aligned {
                let aligned_stream = SyncedKvLogStoreExecutor::<S>::aligned_message_stream(
                    actor_id,
                    input,
                    read_state,
                    initial_write_state,
                    log_store_config.metrics.clone(),
                    initial_write_epoch,
                );

                #[for_await]
                for message in aligned_stream {
                    if let Some(barrier_batch) =
                        dispatch_message_batch(&mut self.inner, message?.into()).await?
                    {
                        debug_assert_eq!(barrier_batch.len(), 1);
                        for barrier in barrier_batch {
                            yield barrier;
                        }
                    }
                }
                return Ok(());
            }

            // Unaligned mode setup.
            let mut seq_id = FIRST_SEQ_ID;
            let mut buffer = SyncedLogStoreBuffer::new(
                log_store_config.max_buffer_size,
                log_store_config.chunk_size,
                &log_store_config.metrics,
            );

            let log_store_stream = read_state
                .read_persisted_log_store(
                    log_store_config.metrics.persistent_log_read_metrics.clone(),
                    initial_write_epoch.curr,
                    LogStoreReadStateStreamRangeStart::Unbounded,
                )
                .await?;

            // Start in the clean state only if the persisted log has nothing to replay.
            let mut log_store_stream = tokio_stream::StreamExt::peekable(log_store_stream);
            let mut clean_state = log_store_stream.peek().await.is_none();
            tracing::trace!(?clean_state);

            let mut progress = LogStoreVnodeProgress::None;
            let mut read_future_state = ReadFuture::ReadingPersistedStream(log_store_stream);
            let consumer_future_state = ConsumerFuture::ReadingChunk { inner: self.inner };
            pin_mut!(consumer_future_state);

            let mut write_future_state =
                WriteFuture::receive_from_upstream(input, initial_write_state);
            let mut end_of_stream = false;

            loop {
                // Race the write side against the consumer side. Both sides keep their state
                // outside these short-lived futures (`write_future_state`,
                // `consumer_future_state`); only the completed side's output is kept.
                let select_result = {
                    let consumer_future = async {
                        consumer_future_state
                            .as_mut()
                            .next_event(
                                &mut read_future_state,
                                pause_stream,
                                &mut clean_state,
                                &mut progress,
                                &read_state,
                                &mut buffer,
                                &log_store_config.metrics,
                            )
                            .await
                    };
                    pin_mut!(consumer_future);
                    let write_future = async {
                        // After a stop barrier, the write side is never polled again.
                        if end_of_stream {
                            pending().await
                        } else {
                            write_future_state
                                .next_event(&log_store_config.metrics)
                                .await
                        }
                    };
                    pin_mut!(write_future);
                    let output = select(write_future, consumer_future).await;
                    drop_either_future(output)
                };

                match select_result {
                    Either::Left(_write_result) => {
                        // Discard the previous write state explicitly; every arm below either
                        // re-assigns `write_future_state` or returns early via `?`.
                        drop(write_future_state);
                        let (stream, mut write_state, either) = _write_result?;
                        match either {
                            WriteFutureEvent::UpstreamMessageReceived(msg) => match msg {
                                Message::Chunk(chunk) => {
                                    let (new_seq_id, next_write_future) =
                                        SyncedKvLogStoreExecutor::<S>::process_upstream_chunk(
                                            seq_id,
                                            stream,
                                            write_state,
                                            chunk,
                                            &mut buffer,
                                        );
                                    seq_id = new_seq_id;
                                    write_future_state = next_write_future;
                                }
                                Message::Barrier(barrier) => {
                                    // A checkpoint barrier that arrives in the clean state while
                                    // data is still buffered is held back for up to
                                    // `pause_duration_ms`, and the clean state is left.
                                    // Presumably this gives the reader time to drain the buffer.
                                    if clean_state
                                        && barrier.kind.is_checkpoint()
                                        && !buffer.is_empty()
                                    {
                                        write_future_state = WriteFuture::paused(
                                            log_store_config.pause_duration_ms,
                                            barrier,
                                            stream,
                                            write_state,
                                        );
                                        clean_state = false;
                                        log_store_config.metrics.unclean_state.inc();
                                    } else {
                                        SyncedKvLogStoreExecutor::<S>::apply_pause_resume_mutation(
                                            &barrier,
                                            &mut pause_stream,
                                        );
                                        let write_state_post_write_barrier =
                                            SyncedKvLogStoreExecutor::<S>::write_barrier(
                                                actor_id,
                                                &mut write_state,
                                                barrier.clone(),
                                                &log_store_config.metrics,
                                                progress.take(),
                                                &mut buffer,
                                            )
                                            .await?;
                                        // Sequence ids restart for every epoch.
                                        seq_id = FIRST_SEQ_ID;
                                        // Presumably errors if this barrier carries a vnode
                                        // bitmap update for this actor — confirm.
                                        barrier.assume_no_update_vnode_bitmap(actor_id)?;

                                        write_state_post_write_barrier
                                            .post_yield_barrier(None)
                                            .await?;

                                        let is_stop_barrier = barrier.is_stop(actor_id);
                                        if is_stop_barrier {
                                            // Stop reading upstream; the consumer still
                                            // dispatches what is left, including this barrier.
                                            end_of_stream = true;
                                            write_future_state = WriteFuture::Empty;
                                        } else {
                                            write_future_state = WriteFuture::receive_from_upstream(
                                                stream,
                                                write_state,
                                            );
                                        }
                                        // Yielded later via `BarrierDispatched` once dispatched.
                                        consumer_future_state.as_mut().push_barrier(barrier);
                                    }
                                }
                                Message::Watermark(_watermark) => {
                                    // Watermarks are dropped: neither logged nor dispatched.
                                    write_future_state =
                                        WriteFuture::receive_from_upstream(stream, write_state);
                                }
                            },
                            WriteFutureEvent::ChunkFlushed(info) => {
                                write_future_state =
                                    SyncedKvLogStoreExecutor::<S>::process_flushed_chunk(
                                        stream,
                                        write_state,
                                        info,
                                        &mut buffer,
                                        &log_store_config.metrics,
                                    );
                            }
                        }
                    }
                    Either::Right(consumer_result) => {
                        let event = consumer_result?;
                        match event {
                            ConsumerFutureEvent::CleanStateReached => {
                                // The reader caught up. If a barrier is being held back, clear
                                // its sleep so that (presumably) it is released on the next poll.
                                if let WriteFuture::Paused { sleep_future, .. } =
                                    &mut write_future_state
                                {
                                    assert!(buffer.has_available_capacity());
                                    *sleep_future = None;
                                }
                            }
                            ConsumerFutureEvent::BarrierDispatched(barrier) => {
                                yield barrier;
                            }
                        }
                    }
                }
            }
        }
    }
}
508
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use futures::StreamExt;
    use risingwave_common::array::{StreamChunk, StreamChunkTestExt};
    use risingwave_common::bitmap::Bitmap;
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::types::DataType;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_pb::stream_plan::{DispatcherType, PbDispatchOutputMapping};
    use risingwave_storage::memory::MemoryStateStore;
    use tokio::sync::mpsc::unbounded_channel;
    use tokio::time::{Duration, timeout};

    use super::*;
    use crate::assert_stream_chunk_eq;
    use crate::common::log_store_impl::kv_log_store::KV_LOG_STORE_V2_INFO;
    use crate::common::log_store_impl::kv_log_store::serde::LogStoreRowSerde;
    use crate::common::log_store_impl::kv_log_store::test_utils::{
        check_stream_chunk_eq, gen_test_log_store_table,
    };
    use crate::executor::exchange::permit::channel_for_test;
    use crate::executor::test_utils::MockSource;
    use crate::executor::{ActorContext, BarrierInner as Barrier};
    use crate::task::ActorId;

    const ACTOR_ID: u32 = 4242;
    const DOWNSTREAM_ACTOR_ID: u32 = 5252;
    /// Upper bound on how long any single receive in these tests may block.
    const RECV_TIMEOUT: Duration = Duration::from_secs(1);

    fn init_logger() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .with_ansi(false)
            .try_init();
    }

    /// Awaits `fut`, panicking if it does not resolve within [`RECV_TIMEOUT`].
    async fn within_timeout<F: std::future::Future>(fut: F) -> F::Output {
        timeout(RECV_TIMEOUT, fut).await.unwrap()
    }

    /// Feeds `barrier(1)`, two chunks and `barrier(2)` through the executor. Checks that
    /// downstream receives them in that order and that both barriers are yielded to the caller.
    async fn run_barrier_chunk_ordering_test(aligned: bool) {
        init_logger();

        let actor_id = ActorId::new(ACTOR_ID);
        let downstream_actor = ActorId::new(DOWNSTREAM_ACTOR_ID);

        // Register a single local downstream output before the executor is built.
        let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
        let (down_tx, mut down_rx) = channel_for_test();
        new_output_request_tx
            .send((downstream_actor, NewOutputRequest::Local(down_tx)))
            .unwrap();

        let barrier1 = Barrier::new_test_barrier(test_epoch(1));
        let barrier2 = Barrier::new_test_barrier(test_epoch(2));
        let chunk_1 = StreamChunk::from_pretty(
            " I T
            + 5 10
            + 6 10
            + 8 10
            + 9 10
            + 10 11",
        );
        let chunk_2 = StreamChunk::from_pretty(
            " I T
            - 5 10
            - 6 10
            - 8 10
            U- 10 11
            U+ 10 10",
        );

        let dispatcher = stream_plan::Dispatcher {
            r#type: DispatcherType::Simple as _,
            dispatcher_id: 7.into(),
            downstream_actor_id: vec![DOWNSTREAM_ACTOR_ID.into()],
            output_mapping: PbDispatchOutputMapping::identical(2).into(),
            ..Default::default()
        };

        let pk_info = &KV_LOG_STORE_V2_INFO;
        let table = gen_test_log_store_table(pk_info);
        let vnodes = Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)));
        let log_store_config = SyncKvLogStoreContext {
            table_id: table.id,
            fragment_id: 0.into(),
            serde: LogStoreRowSerde::new(&table, vnodes, pk_info),
            state_store: MemoryStateStore::new(),
            max_buffer_size: 1024,
            pause_duration_ms: Duration::from_millis(10),
            aligned,
            chunk_size: 1024,
            metrics: SyncedKvLogStoreMetrics::for_test(),
        };

        let (mut input_tx, source) = MockSource::channel();
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Varchar),
            ],
        };
        let input = source.into_executor(schema, vec![0]);

        let executor = SyncLogStoreDispatchExecutor::new_with_log_store_context(
            input,
            new_output_request_rx,
            vec![dispatcher],
            &ActorContext::for_test(actor_id),
            log_store_config,
        )
        .await
        .unwrap();

        // Forward everything the executor yields so the test can observe it with a timeout.
        let (barrier_out_tx, mut barrier_out_rx) = unbounded_channel();
        let barrier_driver = tokio::spawn(async move {
            let mut barrier_stream = std::pin::pin!(Box::new(executor).execute());
            while let Some(item) = barrier_stream.next().await {
                let _ = barrier_out_tx.send(item);
            }
        });

        // Barrier 1 is yielded to the caller and forwarded downstream.
        input_tx.send_barrier(barrier1.clone());
        let yielded = within_timeout(barrier_out_rx.recv()).await.unwrap().unwrap();
        assert_eq!(yielded.epoch.curr, test_epoch(1));

        let delivered = within_timeout(down_rx.recv())
            .await
            .expect("downstream should receive barrier(1)");
        let barriers = delivered.as_barrier_batch().unwrap();
        assert_eq!(barriers.len(), 1);
        assert_eq!(barriers[0].epoch.curr, test_epoch(1));

        // Chunks pushed after barrier 1 reach downstream in order.
        input_tx.push_chunk(chunk_1.clone());
        input_tx.push_chunk(chunk_2.clone());

        let delivered = within_timeout(down_rx.recv())
            .await
            .expect("downstream should receive chunk(1)");
        assert_stream_chunk_eq!(delivered.as_chunk().unwrap(), chunk_1);

        let delivered = within_timeout(down_rx.recv())
            .await
            .expect("downstream should receive chunk(2)");
        assert_stream_chunk_eq!(delivered.as_chunk().unwrap(), chunk_2);

        // Barrier 2 is forwarded downstream after the chunks, then yielded to the caller.
        input_tx.send_barrier(barrier2.clone());
        let delivered = within_timeout(down_rx.recv())
            .await
            .expect("downstream should receive barrier(2)");
        let barriers = delivered.as_barrier_batch().unwrap();
        assert_eq!(barriers.len(), 1);
        assert_eq!(barriers[0].epoch.curr, test_epoch(2));

        let yielded = within_timeout(barrier_out_rx.recv()).await.unwrap().unwrap();
        assert_eq!(yielded.epoch.curr, test_epoch(2));

        barrier_driver.abort();
    }

    #[tokio::test]
    async fn test_barrier_chunk_ordering_in_dispatch() {
        run_barrier_chunk_ordering_test(false).await;
    }

    #[tokio::test]
    async fn test_aligned_barrier_chunk_ordering_in_dispatch() {
        run_barrier_chunk_ordering_test(true).await;
    }
}