1use std::collections::{BTreeMap, VecDeque};
16use std::pin::Pin;
17use std::task::{Context, Poll};
18
19use anyhow::Context as _;
20use futures::future::try_join_all;
21use futures::stream::{FusedStream, FuturesUnordered, StreamFuture};
22use prometheus::Histogram;
23use risingwave_common::array::StreamChunkBuilder;
24use risingwave_common::config::MetricLevel;
25use risingwave_common::metrics::LabelGuardedMetric;
26use tokio::sync::mpsc;
27use tokio::time::Instant;
28
29use super::exchange::input::BoxedInput;
30use super::watermark::*;
31use super::*;
32use crate::executor::exchange::input::{
33 assert_equal_dispatcher_barrier, new_input, process_dispatcher_msg,
34};
35use crate::executor::prelude::*;
36use crate::task::LocalBarrierManager;
37
/// The upstream side of a [`MergeExecutorInput`].
///
/// [`MergeExecutorInput::into_executor`] turns it into a `ReceiverExecutor` or a
/// [`MergeExecutor`], depending on the variant.
pub(crate) enum MergeExecutorUpstream {
    /// A single upstream input; consumed by a `ReceiverExecutor`.
    Singleton(BoxedInput),
    /// A set of upstream inputs merged by [`SelectReceivers`]; consumed by a [`MergeExecutor`].
    Merge(SelectReceivers),
}
42
/// An upstream together with everything needed to build the executor that consumes it
/// (see [`MergeExecutorInput::into_executor`]).
///
/// It can also be polled directly as a stream of dispatcher messages.
pub(crate) struct MergeExecutorInput {
    upstream: MergeExecutorUpstream,
    actor_context: ActorContextRef,
    upstream_fragment_id: UpstreamFragmentId,
    local_barrier_manager: LocalBarrierManager,
    executor_stats: Arc<StreamingMetrics>,
    /// Executor info; in the merge case its schema is also used to re-batch chunks.
    pub(crate) info: ExecutorInfo,
    /// Chunk size used for re-batching in the merge case.
    chunk_size: usize,
}
52
53impl MergeExecutorInput {
54 pub(crate) fn new(
55 upstream: MergeExecutorUpstream,
56 actor_context: ActorContextRef,
57 upstream_fragment_id: UpstreamFragmentId,
58 local_barrier_manager: LocalBarrierManager,
59 executor_stats: Arc<StreamingMetrics>,
60 info: ExecutorInfo,
61 chunk_size: usize,
62 ) -> Self {
63 Self {
64 upstream,
65 actor_context,
66 upstream_fragment_id,
67 local_barrier_manager,
68 executor_stats,
69 info,
70 chunk_size,
71 }
72 }
73
74 pub(crate) fn into_executor(self, barrier_rx: mpsc::UnboundedReceiver<Barrier>) -> Executor {
75 let fragment_id = self.actor_context.fragment_id;
76 let executor = match self.upstream {
77 MergeExecutorUpstream::Singleton(input) => ReceiverExecutor::new(
78 self.actor_context,
79 fragment_id,
80 self.upstream_fragment_id,
81 input,
82 self.local_barrier_manager,
83 self.executor_stats,
84 barrier_rx,
85 )
86 .boxed(),
87 MergeExecutorUpstream::Merge(inputs) => MergeExecutor::new(
88 self.actor_context,
89 fragment_id,
90 self.upstream_fragment_id,
91 inputs,
92 self.local_barrier_manager,
93 self.executor_stats,
94 barrier_rx,
95 self.chunk_size,
96 self.info.schema.clone(),
97 )
98 .boxed(),
99 };
100 (self.info, executor).into()
101 }
102}
103
104impl Stream for MergeExecutorInput {
105 type Item = DispatcherMessageStreamItem;
106
107 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
108 match &mut self.get_mut().upstream {
109 MergeExecutorUpstream::Singleton(input) => input.poll_next_unpin(cx),
110 MergeExecutorUpstream::Merge(inputs) => inputs.poll_next_unpin(cx),
111 }
112 }
113}
114
/// Merges the messages of several upstream actors into one stream for this actor.
///
/// Barrier alignment across upstreams is done by [`SelectReceivers`]. Chunks are re-batched to
/// `chunk_size` before being forwarded.
pub struct MergeExecutor {
    /// Context of the actor running this executor.
    actor_context: ActorContextRef,

    /// The merged upstream receivers.
    upstreams: SelectReceivers,

    /// Fragment of the actor running this executor.
    fragment_id: FragmentId,

    /// Fragment of the upstream actors; replaced when a merge update switches fragments.
    upstream_fragment_id: FragmentId,

    /// Used to create new upstream inputs when a merge update adds upstream actors.
    local_barrier_manager: LocalBarrierManager,

    /// Streaming metrics; also passed to newly created upstream inputs.
    metrics: Arc<StreamingMetrics>,

    /// Passed to `process_dispatcher_msg` for every message received from upstream.
    barrier_rx: mpsc::UnboundedReceiver<Barrier>,

    /// Size passed to the chunk builder that re-batches incoming chunks.
    chunk_size: usize,

    /// Schema of the chunks; its data types configure the chunk builder.
    schema: Schema,
}
143
144impl MergeExecutor {
145 #[allow(clippy::too_many_arguments)]
146 pub fn new(
147 ctx: ActorContextRef,
148 fragment_id: FragmentId,
149 upstream_fragment_id: FragmentId,
150 upstreams: SelectReceivers,
151 local_barrier_manager: LocalBarrierManager,
152 metrics: Arc<StreamingMetrics>,
153 barrier_rx: mpsc::UnboundedReceiver<Barrier>,
154 chunk_size: usize,
155 schema: Schema,
156 ) -> Self {
157 Self {
158 actor_context: ctx,
159 upstreams,
160 fragment_id,
161 upstream_fragment_id,
162 local_barrier_manager,
163 metrics,
164 barrier_rx,
165 chunk_size,
166 schema,
167 }
168 }
169
170 #[cfg(test)]
171 pub fn for_test(
172 actor_id: ActorId,
173 inputs: Vec<super::exchange::permit::Receiver>,
174 local_barrier_manager: crate::task::LocalBarrierManager,
175 schema: Schema,
176 ) -> Self {
177 use super::exchange::input::LocalInput;
178 use crate::executor::exchange::input::Input;
179
180 let barrier_rx = local_barrier_manager.subscribe_barrier(actor_id);
181
182 let metrics = StreamingMetrics::unused();
183 let actor_ctx = ActorContext::for_test(actor_id);
184 let upstream = Self::new_select_receiver(
185 inputs
186 .into_iter()
187 .enumerate()
188 .map(|(idx, input)| LocalInput::new(input, idx as ActorId).boxed_input())
189 .collect(),
190 &metrics,
191 &actor_ctx,
192 );
193
194 Self::new(
195 actor_ctx,
196 514,
197 1919,
198 upstream,
199 local_barrier_manager,
200 metrics.into(),
201 barrier_rx,
202 100,
203 schema,
204 )
205 }
206
207 pub(crate) fn new_select_receiver(
208 upstreams: Vec<BoxedInput>,
209 metrics: &StreamingMetrics,
210 actor_context: &ActorContext,
211 ) -> SelectReceivers {
212 let merge_barrier_align_duration = if metrics.level >= MetricLevel::Debug {
213 Some(
214 metrics
215 .merge_barrier_align_duration
216 .with_guarded_label_values(&[
217 &actor_context.id.to_string(),
218 &actor_context.fragment_id.to_string(),
219 ]),
220 )
221 } else {
222 None
223 };
224
225 SelectReceivers::new(
227 actor_context.id,
228 upstreams,
229 merge_barrier_align_duration.clone(),
230 )
231 }
232
233 #[try_stream(ok = Message, error = StreamExecutorError)]
234 async fn execute_inner(mut self: Box<Self>) {
235 let select_all = self.upstreams;
236 let select_all = BufferChunks::new(select_all, self.chunk_size, self.schema);
237 let actor_id = self.actor_context.id;
238
239 let mut metrics = self.metrics.new_actor_input_metrics(
240 actor_id,
241 self.fragment_id,
242 self.upstream_fragment_id,
243 );
244
245 let mut start_time = Instant::now();
247 pin_mut!(select_all);
248 while let Some(msg) = select_all.next().await {
249 metrics
250 .actor_input_buffer_blocking_duration_ns
251 .inc_by(start_time.elapsed().as_nanos() as u64);
252 let msg: DispatcherMessage = msg?;
253 let mut msg: Message = process_dispatcher_msg(msg, &mut self.barrier_rx).await?;
254
255 match &mut msg {
256 Message::Watermark(_) => {
257 }
259 Message::Chunk(chunk) => {
260 metrics.actor_in_record_cnt.inc_by(chunk.cardinality() as _);
261 }
262 Message::Barrier(barrier) => {
263 tracing::debug!(
264 target: "events::stream::barrier::path",
265 actor_id = actor_id,
266 "receiver receives barrier from path: {:?}",
267 barrier.passed_actors
268 );
269 barrier.passed_actors.push(actor_id);
270
271 if let Some(Mutation::Update(UpdateMutation { dispatchers, .. })) =
272 barrier.mutation.as_deref()
273 {
274 if select_all
275 .upstream_actor_ids()
276 .iter()
277 .any(|actor_id| dispatchers.contains_key(actor_id))
278 {
279 select_all
281 .buffered_watermarks
282 .values_mut()
283 .for_each(|buffers| buffers.clear());
284 }
285 }
286
287 if let Some(update) =
288 barrier.as_update_merge(self.actor_context.id, self.upstream_fragment_id)
289 {
290 let new_upstream_fragment_id = update
291 .new_upstream_fragment_id
292 .unwrap_or(self.upstream_fragment_id);
293 let removed_upstream_actor_id: HashSet<_> =
294 if update.new_upstream_fragment_id.is_some() {
295 select_all.upstream_actor_ids().iter().copied().collect()
296 } else {
297 update.removed_upstream_actor_id.iter().copied().collect()
298 };
299
300 select_all
302 .buffered_watermarks
303 .values_mut()
304 .for_each(|buffers| buffers.clear());
305
306 if !update.added_upstream_actors.is_empty() {
307 let new_upstreams: Vec<_> = try_join_all(
309 update.added_upstream_actors.iter().map(|upstream_actor| {
310 new_input(
311 &self.local_barrier_manager,
312 self.metrics.clone(),
313 self.actor_context.id,
314 self.fragment_id,
315 upstream_actor,
316 new_upstream_fragment_id,
317 )
318 }),
319 )
320 .await
321 .context("failed to create upstream receivers")?;
322
323 let mut select_new = SelectReceivers::new(
326 self.actor_context.id,
327 new_upstreams,
328 select_all.merge_barrier_align_duration(),
329 );
330 let new_barrier = expect_first_barrier(&mut select_new).await?;
331 assert_equal_dispatcher_barrier(barrier, &new_barrier);
332
333 select_all.add_upstreams_from(select_new);
335
336 select_all
338 .buffered_watermarks
339 .values_mut()
340 .for_each(|buffers| {
341 buffers.add_buffers(
342 update
343 .added_upstream_actors
344 .iter()
345 .map(|actor| actor.actor_id),
346 )
347 });
348 }
349
350 if !removed_upstream_actor_id.is_empty() {
351 select_all.remove_upstreams(&removed_upstream_actor_id);
353
354 for buffers in select_all.buffered_watermarks.values_mut() {
355 buffers.remove_buffer(removed_upstream_actor_id.clone());
358 }
359 }
360
361 self.upstream_fragment_id = new_upstream_fragment_id;
362 metrics = self.metrics.new_actor_input_metrics(
363 actor_id,
364 self.fragment_id,
365 self.upstream_fragment_id,
366 );
367
368 select_all.update_actor_ids();
369 }
370
371 if barrier.is_stop(actor_id) {
372 yield msg;
373 break;
374 }
375 }
376 }
377
378 yield msg;
379 start_time = Instant::now();
380 }
381 }
382}
383
384impl Execute for MergeExecutor {
385 fn execute(self: Box<Self>) -> BoxedMessageStream {
386 self.execute_inner().boxed()
387 }
388}
389
390pub struct SelectReceivers {
392 barrier: Option<DispatcherBarrier>,
394 blocked: Vec<BoxedInput>,
396 active: FuturesUnordered<StreamFuture<BoxedInput>>,
398 upstream_actor_ids: Vec<ActorId>,
400
401 actor_id: u32,
403 buffered_watermarks: BTreeMap<usize, BufferedWatermarks<ActorId>>,
405 merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
407}
408
409impl Stream for SelectReceivers {
410 type Item = DispatcherMessageStreamItem;
411
412 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
413 if self.active.is_terminated() {
414 assert!(self.blocked.is_empty());
416 return Poll::Ready(None);
417 }
418
419 let mut start = None;
420 loop {
421 match futures::ready!(self.active.poll_next_unpin(cx)) {
422 Some((Some(Err(e)), _)) => {
424 return Poll::Ready(Some(Err(e)));
425 }
426 Some((Some(Ok(message)), remaining)) => {
428 let actor_id = remaining.actor_id();
429 match message {
430 DispatcherMessage::Chunk(chunk) => {
431 self.active.push(remaining.into_future());
433 return Poll::Ready(Some(Ok(DispatcherMessage::Chunk(chunk))));
434 }
435 DispatcherMessage::Watermark(watermark) => {
436 self.active.push(remaining.into_future());
438 if let Some(watermark) = self.handle_watermark(actor_id, watermark) {
439 return Poll::Ready(Some(Ok(DispatcherMessage::Watermark(
440 watermark,
441 ))));
442 }
443 }
444 DispatcherMessage::Barrier(barrier) => {
445 if self.blocked.is_empty()
447 && self.merge_barrier_align_duration.is_some()
448 {
449 start = Some(Instant::now());
450 }
451 self.blocked.push(remaining);
452 if let Some(current_barrier) = self.barrier.as_ref() {
453 if current_barrier.epoch != barrier.epoch {
454 return Poll::Ready(Some(Err(
455 StreamExecutorError::align_barrier(
456 current_barrier.clone().map_mutation(|_| None),
457 barrier.map_mutation(|_| None),
458 ),
459 )));
460 }
461 } else {
462 self.barrier = Some(barrier);
463 }
464 }
465 }
466 }
467 Some((None, _)) => unreachable!(),
475 None => {
477 if let Some(start) = start
478 && let Some(merge_barrier_align_duration) =
479 &self.merge_barrier_align_duration
480 {
481 merge_barrier_align_duration.observe(start.elapsed().as_secs_f64())
483 }
484 break;
485 }
486 }
487 }
488
489 assert!(self.active.is_terminated());
490 let barrier = self.barrier.take().unwrap();
491
492 let upstreams = std::mem::take(&mut self.blocked);
493 self.extend_active(upstreams);
494 assert!(!self.active.is_terminated());
495
496 Poll::Ready(Some(Ok(DispatcherMessage::Barrier(barrier))))
497 }
498}
499
500impl SelectReceivers {
501 fn new(
502 actor_id: u32,
503 upstreams: Vec<BoxedInput>,
504 merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
505 ) -> Self {
506 assert!(!upstreams.is_empty());
507 let upstream_actor_ids = upstreams.iter().map(|input| input.actor_id()).collect();
508 let mut this = Self {
509 blocked: Vec::with_capacity(upstreams.len()),
510 active: Default::default(),
511 actor_id,
512 barrier: None,
513 upstream_actor_ids,
514 buffered_watermarks: Default::default(),
515 merge_barrier_align_duration,
516 };
517 this.extend_active(upstreams);
518 this
519 }
520
521 fn extend_active(&mut self, upstreams: impl IntoIterator<Item = BoxedInput>) {
524 assert!(self.blocked.is_empty() && self.barrier.is_none());
525
526 self.active
527 .extend(upstreams.into_iter().map(|s| s.into_future()));
528 }
529
530 fn upstream_actor_ids(&self) -> &[ActorId] {
531 &self.upstream_actor_ids
532 }
533
534 fn update_actor_ids(&mut self) {
535 self.upstream_actor_ids = self
536 .blocked
537 .iter()
538 .map(|input| input.actor_id())
539 .chain(
540 self.active
541 .iter()
542 .map(|input| input.get_ref().unwrap().actor_id()),
543 )
544 .collect();
545 }
546
547 fn handle_watermark(&mut self, actor_id: ActorId, watermark: Watermark) -> Option<Watermark> {
549 let col_idx = watermark.col_idx;
550 let watermarks = self
552 .buffered_watermarks
553 .entry(col_idx)
554 .or_insert_with(|| BufferedWatermarks::with_ids(self.upstream_actor_ids.clone()));
555 watermarks.handle_watermark(actor_id, watermark)
556 }
557
558 fn add_upstreams_from(&mut self, other: Self) {
561 assert!(self.blocked.is_empty() && self.barrier.is_none());
562 assert!(other.blocked.is_empty() && other.barrier.is_none());
563 assert_eq!(self.actor_id, other.actor_id);
564
565 self.active.extend(other.active);
566 }
567
568 fn remove_upstreams(&mut self, upstream_actor_ids: &HashSet<ActorId>) {
571 assert!(self.blocked.is_empty() && self.barrier.is_none());
572
573 let new_upstreams = std::mem::take(&mut self.active)
574 .into_iter()
575 .map(|s| s.into_inner().unwrap())
576 .filter(|u| !upstream_actor_ids.contains(&u.actor_id()));
577 self.extend_active(new_upstreams);
578 }
579
580 fn merge_barrier_align_duration(&self) -> Option<LabelGuardedMetric<Histogram>> {
581 self.merge_barrier_align_duration.clone()
582 }
583}
584
/// Re-batches the chunks of an inner dispatcher-message stream using a `StreamChunkBuilder`
/// built with `chunk_size`.
///
/// Non-chunk items are passed through in order, after any rows buffered before them are
/// flushed.
struct BufferChunks<S: Stream> {
    /// The wrapped stream.
    inner: S,
    /// Accumulates rows of incoming chunks.
    chunk_builder: StreamChunkBuilder,

    /// Items ready to be returned before `inner` is polled again.
    pending_items: VecDeque<S::Item>,
}
595
596impl<S: Stream> BufferChunks<S> {
597 pub(super) fn new(inner: S, chunk_size: usize, schema: Schema) -> Self {
598 assert!(chunk_size > 0);
599 let chunk_builder = StreamChunkBuilder::new(chunk_size, schema.data_types());
600 Self {
601 inner,
602 chunk_builder,
603 pending_items: VecDeque::new(),
604 }
605 }
606}
607
608impl<S: Stream> std::ops::Deref for BufferChunks<S> {
609 type Target = S;
610
611 fn deref(&self) -> &Self::Target {
612 &self.inner
613 }
614}
615
616impl<S: Stream> std::ops::DerefMut for BufferChunks<S> {
617 fn deref_mut(&mut self) -> &mut Self::Target {
618 &mut self.inner
619 }
620}
621
622impl<S: Stream> Stream for BufferChunks<S>
623where
624 S: Stream<Item = DispatcherMessageStreamItem> + Unpin,
625{
626 type Item = S::Item;
627
628 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
629 loop {
630 if let Some(item) = self.pending_items.pop_front() {
631 return Poll::Ready(Some(item));
632 }
633
634 match self.inner.poll_next_unpin(cx) {
635 Poll::Pending => {
636 return if let Some(chunk_out) = self.chunk_builder.take() {
637 Poll::Ready(Some(Ok(MessageInner::Chunk(chunk_out))))
638 } else {
639 Poll::Pending
640 };
641 }
642
643 Poll::Ready(Some(result)) => {
644 if let Ok(MessageInner::Chunk(chunk)) = result {
645 for row in chunk.records() {
646 if let Some(chunk_out) = self.chunk_builder.append_record(row) {
647 self.pending_items
648 .push_back(Ok(MessageInner::Chunk(chunk_out)));
649 }
650 }
651 } else {
652 return if let Some(chunk_out) = self.chunk_builder.take() {
653 self.pending_items.push_back(result);
654 Poll::Ready(Some(Ok(MessageInner::Chunk(chunk_out))))
655 } else {
656 Poll::Ready(Some(result))
657 };
658 }
659 }
660
661 Poll::Ready(None) => {
662 unreachable!("SelectReceivers should never return None");
664 }
665 }
666 }
667 }
668}
669
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::time::Duration;

    use assert_matches::assert_matches;
    use futures::FutureExt;
    use futures::future::try_join_all;
    use risingwave_common::array::Op;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_pb::task_service::exchange_service_server::{
        ExchangeService, ExchangeServiceServer,
    };
    use risingwave_pb::task_service::{
        GetDataRequest, GetDataResponse, GetStreamRequest, GetStreamResponse, PbPermits,
    };
    use tokio::time::sleep;
    use tokio_stream::wrappers::ReceiverStream;
    use tonic::{Request, Response, Status, Streaming};

    use super::*;
    use crate::executor::exchange::input::{Input, LocalInput, RemoteInput};
    use crate::executor::exchange::permit::channel_for_test;
    use crate::executor::{BarrierInner as Barrier, MessageInner as Message};
    use crate::task::NewOutputRequest;
    use crate::task::barrier_test_utils::LocalBarrierTestEnv;
    use crate::task::test_utils::helper_make_local_actor;

    /// Builds a chunk of `size` insert ops and no columns.
    fn build_test_chunk(size: u64) -> StreamChunk {
        let ops = vec![Op::Insert; size as usize];
        StreamChunk::new(ops, vec![])
    }

    /// `BufferChunks` concatenates chunks that are already available and flushes buffered rows
    /// before forwarding a watermark or a barrier.
    #[tokio::test]
    async fn test_buffer_chunks() {
        let test_env = LocalBarrierTestEnv::for_test().await;

        let (tx, rx) = channel_for_test();
        let input = LocalInput::new(rx, 1).boxed_input();
        let mut buffer = BufferChunks::new(input, 100, Schema::new(vec![]));

        // A lone chunk is flushed as-is once the input has nothing more ready.
        tx.send(Message::Chunk(build_test_chunk(10)).into())
            .await
            .unwrap();
        assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Chunk(chunk) => {
            assert_eq!(chunk.ops().len() as u64, 10);
        });

        // Two chunks sent back to back come out as one 20-row chunk.
        tx.send(Message::Chunk(build_test_chunk(10)).into())
            .await
            .unwrap();
        tx.send(Message::Chunk(build_test_chunk(10)).into())
            .await
            .unwrap();
        assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Chunk(chunk) => {
            assert_eq!(chunk.ops().len() as u64, 20);
        });

        // A lone watermark is passed through.
        tx.send(
            Message::Watermark(Watermark {
                col_idx: 0,
                data_type: DataType::Int64,
                val: ScalarImpl::Int64(233),
            })
            .into(),
        )
        .await
        .unwrap();
        assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Watermark(watermark) => {
            assert_eq!(watermark.val, ScalarImpl::Int64(233));
        });

        // Buffered rows are flushed before the watermark that follows them.
        tx.send(Message::Chunk(build_test_chunk(10)).into())
            .await
            .unwrap();
        tx.send(Message::Chunk(build_test_chunk(10)).into())
            .await
            .unwrap();
        tx.send(
            Message::Watermark(Watermark {
                col_idx: 0,
                data_type: DataType::Int64,
                val: ScalarImpl::Int64(233),
            })
            .into(),
        )
        .await
        .unwrap();
        assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Chunk(chunk) => {
            assert_eq!(chunk.ops().len() as u64, 20);
        });
        assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Watermark(watermark) => {
            assert_eq!(watermark.val, ScalarImpl::Int64(233));
        });

        // A lone barrier is passed through.
        let barrier = Barrier::new_test_barrier(test_epoch(1));
        test_env.inject_barrier(&barrier, [2]);
        tx.send(Message::Barrier(barrier.clone().into_dispatcher()).into())
            .await
            .unwrap();
        assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Barrier(Barrier { epoch: barrier_epoch, mutation: _, .. }) => {
            assert_eq!(barrier_epoch.curr, test_epoch(1));
        });

        // Buffered rows are flushed before the barrier that follows them.
        tx.send(Message::Chunk(build_test_chunk(10)).into())
            .await
            .unwrap();
        tx.send(Message::Chunk(build_test_chunk(10)).into())
            .await
            .unwrap();
        let barrier = Barrier::new_test_barrier(test_epoch(2));
        test_env.inject_barrier(&barrier, [2]);
        tx.send(Message::Barrier(barrier.clone().into_dispatcher()).into())
            .await
            .unwrap();
        assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Chunk(chunk) => {
            assert_eq!(chunk.ops().len() as u64, 20);
        });
        assert_matches!(buffer.next().await.unwrap().unwrap(), Message::Barrier(Barrier { epoch: barrier_epoch, mutation: _, .. }) => {
            assert_eq!(barrier_epoch.curr, test_epoch(2));
        });
    }

    /// Each of `CHANNEL_NUMBER` channels sends, per epoch, either a 10-row chunk or a watermark,
    /// followed by a barrier. Finally each sends a `Stop` barrier.
    ///
    /// The merger must combine the chunks, emit the merged watermarks, and forward each barrier
    /// exactly once.
    #[tokio::test]
    async fn test_merger() {
        const CHANNEL_NUMBER: usize = 10;
        let mut txs = Vec::with_capacity(CHANNEL_NUMBER);
        let mut rxs = Vec::with_capacity(CHANNEL_NUMBER);
        for _i in 0..CHANNEL_NUMBER {
            let (tx, rx) = channel_for_test();
            txs.push(tx);
            rxs.push(rx);
        }
        let barrier_test_env = LocalBarrierTestEnv::for_test().await;
        let actor_id = 233;
        let mut handles = Vec::with_capacity(CHANNEL_NUMBER);

        let epochs = (10..1000u64)
            .step_by(10)
            .map(|idx| (idx, test_epoch(idx)))
            .collect_vec();
        let mut prev_epoch = 0;
        let prev_epoch = &mut prev_epoch;
        // Chain the barriers (each one's prev epoch is the previous one's epoch) and inject them
        // for the merging actor.
        let barriers: HashMap<_, _> = epochs
            .iter()
            .map(|(_, epoch)| {
                let barrier = Barrier::with_prev_epoch_for_test(*epoch, *prev_epoch);
                *prev_epoch = *epoch;
                barrier_test_env.inject_barrier(&barrier, [actor_id]);
                (*epoch, barrier)
            })
            .collect();
        let b2 = Barrier::with_prev_epoch_for_test(test_epoch(1000), *prev_epoch)
            .with_mutation(Mutation::Stop(HashSet::default()));
        barrier_test_env.inject_barrier(&b2, [actor_id]);
        barrier_test_env.flush_all_events().await;

        for (tx_id, tx) in txs.into_iter().enumerate() {
            let epochs = epochs.clone();
            let barriers = barriers.clone();
            let b2 = b2.clone();
            let handle = tokio::spawn(async move {
                for (idx, epoch) in epochs {
                    if idx % 20 == 0 {
                        tx.send(Message::Chunk(build_test_chunk(10)).into())
                            .await
                            .unwrap();
                    } else {
                        // The watermark column rotates with both the round and the channel id.
                        tx.send(
                            Message::Watermark(Watermark {
                                col_idx: (idx as usize / 20 + tx_id) % CHANNEL_NUMBER,
                                data_type: DataType::Int64,
                                val: ScalarImpl::Int64(idx as i64),
                            })
                            .into(),
                        )
                        .await
                        .unwrap();
                    }
                    tx.send(Message::Barrier(barriers[&epoch].clone().into_dispatcher()).into())
                        .await
                        .unwrap();
                    sleep(Duration::from_millis(1)).await;
                }
                tx.send(Message::Barrier(b2.clone().into_dispatcher()).into())
                    .await
                    .unwrap();
            });
            handles.push(handle);
        }

        let merger = MergeExecutor::for_test(
            actor_id,
            rxs,
            barrier_test_env.local_barrier_manager.clone(),
            Schema::new(vec![]),
        );
        let mut merger = merger.boxed().execute();
        for (idx, epoch) in epochs {
            if idx % 20 == 0 {
                // All channels' chunks of this epoch (10 rows each) arrive before the barrier,
                // possibly split across several output chunks.
                let mut count = 0usize;
                while count < 100 {
                    assert_matches!(merger.next().await.unwrap().unwrap(), Message::Chunk(chunk) => {
                        count += chunk.ops().len();
                    });
                }
                assert_eq!(count, 100);
            } else if idx as usize / 20 >= CHANNEL_NUMBER - 1 {
                // Merged watermarks are only expected from this round on, once the rotating
                // columns have been covered by every channel.
                for _ in 0..CHANNEL_NUMBER {
                    assert_matches!(merger.next().await.unwrap().unwrap(), Message::Watermark(watermark) => {
                        assert_eq!(watermark.val, ScalarImpl::Int64((idx - 20 * (CHANNEL_NUMBER as u64 - 1)) as i64));
                    });
                }
            }
            // Exactly one aligned barrier per epoch.
            assert_matches!(merger.next().await.unwrap().unwrap(), Message::Barrier(Barrier{epoch:barrier_epoch,mutation:_,..}) => {
                assert_eq!(barrier_epoch.curr, epoch);
            });
        }
        assert_matches!(
            merger.next().await.unwrap().unwrap(),
            Message::Barrier(Barrier {
                mutation,
                ..
            }) if mutation.as_deref().unwrap().is_stop()
        );

        for handle in handles {
            handle.await.unwrap();
        }
    }

    /// An `Update` mutation replaces upstream `old` with `new` at barrier `b1`.
    ///
    /// The barrier is held until `new` delivers it too. Afterwards the merger receives data
    /// from `untouched` and `new`.
    #[tokio::test]
    async fn test_configuration_change() {
        let actor_id = 233;
        let (untouched, old, new) = (234, 235, 238);
        let barrier_test_env = LocalBarrierTestEnv::for_test().await;
        let metrics = Arc::new(StreamingMetrics::unused());

        let (upstream_fragment_id, fragment_id) = (10, 18);

        let inputs: Vec<_> =
            try_join_all([untouched, old].into_iter().map(async |upstream_actor_id| {
                new_input(
                    &barrier_test_env.local_barrier_manager,
                    metrics.clone(),
                    actor_id,
                    fragment_id,
                    &helper_make_local_actor(upstream_actor_id),
                    upstream_fragment_id,
                )
                .await
            }))
            .await
            .unwrap();

        let merge_updates = maplit::hashmap! {
            (actor_id, upstream_fragment_id) => MergeUpdate {
                actor_id,
                upstream_fragment_id,
                new_upstream_fragment_id: None,
                added_upstream_actors: vec![helper_make_local_actor(new)],
                removed_upstream_actor_id: vec![old],
            }
        };

        let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: Default::default(),
                merges: merge_updates,
                vnode_bitmaps: Default::default(),
                dropped_actors: Default::default(),
                actor_splits: Default::default(),
                actor_new_dispatchers: Default::default(),
            },
        ));
        barrier_test_env.inject_barrier(&b1, [actor_id]);
        barrier_test_env.flush_all_events().await;

        let barrier_rx = barrier_test_env
            .local_barrier_manager
            .subscribe_barrier(actor_id);
        let actor_ctx = ActorContext::for_test(actor_id);
        let upstream = MergeExecutor::new_select_receiver(inputs, &metrics, &actor_ctx);

        let mut merge = MergeExecutor::new(
            actor_ctx,
            fragment_id,
            upstream_fragment_id,
            upstream,
            barrier_test_env.local_barrier_manager.clone(),
            metrics.clone(),
            barrier_rx,
            100,
            Schema::new(vec![]),
        )
        .boxed()
        .execute();

        // Senders of the upstream actors, keyed by upstream actor id.
        let mut txs = HashMap::new();
        // Sends `$msg` from every upstream actor in `$actors`.
        macro_rules! send {
            ($actors:expr, $msg:expr) => {
                for actor in $actors {
                    txs.get(&actor).unwrap().send($msg).await.unwrap();
                }
            };
        }

        // Asserts that the merger yields nothing without waiting (panics on an error item).
        macro_rules! assert_recv_pending {
            () => {
                assert!(
                    merge
                        .next()
                        .now_or_never()
                        .flatten()
                        .transpose()
                        .unwrap()
                        .is_none()
                );
            };
        }
        // Awaits the next item from the merger, panicking on an error item.
        macro_rules! recv {
            () => {
                merge.next().await.transpose().unwrap()
            };
        }

        // For each upstream actor, takes the single pending local output request addressed to
        // `actor_id` and stores its sender in `txs`.
        macro_rules! collect_upstream_tx {
            ($actors:expr) => {
                for upstream_id in $actors {
                    let mut output_requests = barrier_test_env
                        .take_pending_new_output_requests(upstream_id)
                        .await;
                    assert_eq!(output_requests.len(), 1);
                    let (downstream_actor_id, request) = output_requests.pop().unwrap();
                    assert_eq!(actor_id, downstream_actor_id);
                    let NewOutputRequest::Local(tx) = request else {
                        unreachable!()
                    };
                    txs.insert(upstream_id, tx);
                }
            };
        }

        assert_recv_pending!();
        barrier_test_env.flush_all_events().await;

        collect_upstream_tx!([untouched, old]);

        // Data from both initial upstreams is merged into one 2-row chunk.
        send!([untouched, old], Message::Chunk(build_test_chunk(1)).into());
        assert_eq!(2, recv!().unwrap().as_chunk().unwrap().cardinality());
        assert_recv_pending!();

        // `b1` is held back: the new upstream has not delivered it yet.
        send!(
            [untouched, old],
            Message::Barrier(b1.clone().into_dispatcher()).into()
        );
        assert_recv_pending!();
        collect_upstream_tx!([new]);

        // Once `new` delivers `b1`, the barrier is forwarded.
        send!([new], Message::Barrier(b1.clone().into_dispatcher()).into());
        recv!().unwrap().as_barrier().unwrap();

        // After the update, data from `untouched` and `new` is merged.
        send!([untouched, new], Message::Chunk(build_test_chunk(1)).into());
        assert_eq!(2, recv!().unwrap().as_chunk().unwrap().cardinality());
        assert_recv_pending!();
    }

    /// Exchange service stub whose `get_stream` replies with one empty chunk followed by one
    /// barrier, and records that it was called.
    struct FakeExchangeService {
        rpc_called: Arc<AtomicBool>,
    }

    /// The barrier sent by `FakeExchangeService` and injected by the client test.
    fn exchange_client_test_barrier() -> crate::executor::Barrier {
        Barrier::new_test_barrier(test_epoch(1))
    }

    #[async_trait::async_trait]
    impl ExchangeService for FakeExchangeService {
        type GetDataStream = ReceiverStream<std::result::Result<GetDataResponse, Status>>;
        type GetStreamStream = ReceiverStream<std::result::Result<GetStreamResponse, Status>>;

        async fn get_data(
            &self,
            _: Request<GetDataRequest>,
        ) -> std::result::Result<Response<Self::GetDataStream>, Status> {
            unimplemented!()
        }

        async fn get_stream(
            &self,
            _request: Request<Streaming<GetStreamRequest>>,
        ) -> std::result::Result<Response<Self::GetStreamStream>, Status> {
            let (tx, rx) = tokio::sync::mpsc::channel(10);
            self.rpc_called.store(true, Ordering::SeqCst);
            // First message: an empty stream chunk.
            let stream_chunk = StreamChunk::default().to_protobuf();
            tx.send(Ok(GetStreamResponse {
                message: Some(PbStreamMessageBatch {
                    stream_message_batch: Some(
                        risingwave_pb::stream_plan::stream_message_batch::StreamMessageBatch::StreamChunk(
                            stream_chunk,
                        ),
                    ),
                }),
                permits: Some(PbPermits::default()),
            }))
            .await
            .unwrap();
            // Second message: a batch holding the single test barrier.
            let barrier = exchange_client_test_barrier();
            tx.send(Ok(GetStreamResponse {
                message: Some(PbStreamMessageBatch {
                    stream_message_batch: Some(
                        risingwave_pb::stream_plan::stream_message_batch::StreamMessageBatch::BarrierBatch(
                            BarrierBatch {
                                barriers: vec![barrier.to_protobuf()],
                            },
                        ),
                    ),
                }),
                permits: Some(PbPermits::default()),
            }))
            .await
            .unwrap();
            Ok(Response::new(ReceiverStream::new(rx)))
        }
    }

    /// Serves `FakeExchangeService` locally and checks that `RemoteInput` decodes its chunk
    /// and barrier.
    ///
    /// NOTE(review): the fixed port and the fixed 1s startup sleep may make this test flaky
    /// on busy machines; consider an ephemeral port and a readiness signal.
    #[tokio::test]
    async fn test_stream_exchange_client() {
        let rpc_called = Arc::new(AtomicBool::new(false));
        let server_run = Arc::new(AtomicBool::new(false));
        let addr = "127.0.0.1:12348".parse().unwrap();

        // Start the fake exchange server; it shuts down when `shutdown_send` fires.
        let (shutdown_send, shutdown_recv) = tokio::sync::oneshot::channel();
        let exchange_svc = ExchangeServiceServer::new(FakeExchangeService {
            rpc_called: rpc_called.clone(),
        });
        let cp_server_run = server_run.clone();
        let join_handle = tokio::spawn(async move {
            cp_server_run.store(true, Ordering::SeqCst);
            tonic::transport::Server::builder()
                .add_service(exchange_svc)
                .serve_with_shutdown(addr, async move {
                    shutdown_recv.await.unwrap();
                })
                .await
                .unwrap();
        });

        sleep(Duration::from_secs(1)).await;
        assert!(server_run.load(Ordering::SeqCst));

        let test_env = LocalBarrierTestEnv::for_test().await;

        let remote_input = {
            RemoteInput::new(
                &test_env.local_barrier_manager,
                addr.into(),
                (0, 0),
                (0, 0),
                Arc::new(StreamingMetrics::unused()),
            )
            .await
            .unwrap()
        };

        test_env.inject_barrier(&exchange_client_test_barrier(), [remote_input.actor_id()]);

        pin_mut!(remote_input);

        assert_matches!(remote_input.next().await.unwrap().unwrap(), Message::Chunk(chunk) => {
            let (ops, columns, visibility) = chunk.into_inner();
            assert!(ops.is_empty());
            assert!(columns.is_empty());
            assert!(visibility.is_empty());
        });
        assert_matches!(remote_input.next().await.unwrap().unwrap(), Message::Barrier(Barrier { epoch: barrier_epoch, mutation: _, .. }) => {
            assert_eq!(barrier_epoch.curr, test_epoch(1));
        });
        assert!(rpc_called.load(Ordering::SeqCst));

        shutdown_send.send(()).unwrap();
        join_handle.await.unwrap();
    }
}