risingwave_stream/executor/dispatch.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::iter::repeat_with;
use std::ops::{Deref, DerefMut};
use std::time::Duration;

use anyhow::anyhow;
use futures::{FutureExt, TryStreamExt};
use itertools::Itertools;
use risingwave_common::array::Op;
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::hash::{ActorMapping, ExpandedActorMapping, VirtualNode};
use risingwave_common::metrics::LabelGuardedIntCounter;
use risingwave_common::row::RowExt;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
use risingwave_pb::stream_plan::{self, PbDispatcher};
use smallvec::{SmallVec, smallvec};
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::time::Instant;
use tokio_stream::StreamExt;
use tokio_stream::adapters::Peekable;
use tracing::{Instrument, event};

use super::exchange::output::Output;
use super::{
    AddMutation, DispatcherBarriers, DispatcherMessageBatch, MessageBatch, TroublemakerExecutor,
    UpdateMutation,
};
use crate::executor::prelude::*;
use crate::executor::{StopMutation, StreamConsumer};
use crate::task::{DispatcherId, LocalBarrierManager, NewOutputRequest};

mod output_mapping;
pub use output_mapping::DispatchOutputMapping;

/// [`DispatchExecutor`] consumes messages and sends them to downstream actors. Usually,
/// data chunks are dispatched with some specified policy, while control messages
/// such as barriers are distributed to all receivers.
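///
/// A minimal sketch of the policy (illustrative only; the real logic lives in
/// `DispatchExecutorInner::dispatch`, which also batches barriers and records metrics):
///
/// ```ignore
/// match msg {
///     // Data chunks follow the dispatcher's policy: hash / broadcast / simple / round-robin.
///     MessageBatch::Chunk(chunk) => dispatcher.dispatch_data(chunk).await?,
///     // Barriers and watermarks are broadcast to all outputs.
///     MessageBatch::BarrierBatch(barriers) => dispatcher.dispatch_barriers(barriers).await?,
///     MessageBatch::Watermark(watermark) => dispatcher.dispatch_watermark(watermark).await?,
/// }
/// ```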
pub struct DispatchExecutor {
    input: Executor,
    inner: DispatchExecutorInner,
}

struct DispatcherWithMetrics {
    dispatcher: DispatcherImpl,
    pub actor_output_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}

impl Debug for DispatcherWithMetrics {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        self.dispatcher.fmt(f)
    }
}

impl Deref for DispatcherWithMetrics {
    type Target = DispatcherImpl;

    fn deref(&self) -> &Self::Target {
        &self.dispatcher
    }
}

impl DerefMut for DispatcherWithMetrics {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.dispatcher
    }
}

struct DispatchExecutorMetrics {
    actor_id_str: String,
    fragment_id_str: String,
    metrics: Arc<StreamingMetrics>,
    actor_out_record_cnt: LabelGuardedIntCounter,
}

impl DispatchExecutorMetrics {
    fn monitor_dispatcher(&self, dispatcher: DispatcherImpl) -> DispatcherWithMetrics {
        DispatcherWithMetrics {
            actor_output_buffer_blocking_duration_ns: self
                .metrics
                .actor_output_buffer_blocking_duration_ns
                .with_guarded_label_values(&[
                    self.actor_id_str.as_str(),
                    self.fragment_id_str.as_str(),
                    dispatcher.dispatcher_id_str(),
                ]),
            dispatcher,
        }
    }
}

struct DispatchExecutorInner {
    dispatchers: Vec<DispatcherWithMetrics>,
    actor_id: u32,
    local_barrier_manager: LocalBarrierManager,
    metrics: DispatchExecutorMetrics,
    new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    pending_new_output_requests: HashMap<ActorId, NewOutputRequest>,
}

impl DispatchExecutorInner {
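    /// Collect one [`Output`] per downstream actor, in order. A pending request is used
    /// if one was stashed earlier; otherwise we wait on the request channel, stashing any
    /// requests that belong to actors other than the one currently being resolved.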
    async fn collect_outputs(
        &mut self,
        downstream_actors: &[ActorId],
    ) -> StreamResult<Vec<Output>> {
        fn resolve_output(downstream_actor: ActorId, request: NewOutputRequest) -> Output {
            let tx = match request {
                NewOutputRequest::Local(tx) | NewOutputRequest::Remote(tx) => tx,
            };
            Output::new(downstream_actor, tx)
        }
        let mut outputs = Vec::with_capacity(downstream_actors.len());
        for downstream_actor in downstream_actors {
            let output =
                if let Some(request) = self.pending_new_output_requests.remove(downstream_actor) {
                    resolve_output(*downstream_actor, request)
                } else {
                    loop {
                        let (requested_actor, request) = self
                            .new_output_request_rx
                            .recv()
                            .await
                            .ok_or_else(|| anyhow!("end of new output request"))?;
                        if requested_actor == *downstream_actor {
                            break resolve_output(requested_actor, request);
                        } else {
                            assert!(
                                self.pending_new_output_requests
                                    .insert(requested_actor, request)
                                    .is_none(),
                                "duplicated inflight new output requests from actor {}",
                                requested_actor
                            );
                        }
                    }
                };
            outputs.push(output);
        }
        Ok(outputs)
    }

    async fn dispatch(&mut self, msg: MessageBatch) -> StreamResult<()> {
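        // Await a dispatch future while attributing the time it stays pending to the
        // given blocking-duration metric: every 15s tick flushes the elapsed interval
        // into the counter, so a long stall shows up incrementally rather than only
        // when the future finally completes.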
        macro_rules! await_with_metrics {
            ($fut:expr, $metrics:expr) => {{
                let mut start_time = Instant::now();
                let interval_duration = Duration::from_secs(15);
                let mut interval =
                    tokio::time::interval_at(start_time + interval_duration, interval_duration);

                let mut fut = std::pin::pin!($fut);

                loop {
                    tokio::select! {
                        biased;
                        res = &mut fut => {
                            res?;
                            let ns = start_time.elapsed().as_nanos() as u64;
                            $metrics.inc_by(ns);
                            break;
                        }
                        _ = interval.tick() => {
                            start_time = Instant::now();
                            $metrics.inc_by(interval_duration.as_nanos() as u64);
                        }
                    };
                }
                StreamResult::Ok(())
            }};
        }

        let limit = self
            .local_barrier_manager
            .env
            .config()
            .developer
            .exchange_concurrent_dispatchers;
        // Only barriers can be batched for now.
        match msg {
            MessageBatch::BarrierBatch(barrier_batch) => {
                if barrier_batch.is_empty() {
                    return Ok(());
                }
                // Only the first barrier in a batch can carry a mutation.
                let mutation = barrier_batch[0].mutation.clone();
                self.pre_mutate_dispatchers(&mutation).await?;
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
                        let dispatcher_output = &mut dispatcher.dispatcher;
                        let fut = dispatcher_output.dispatch_barriers(
                            barrier_batch
                                .iter()
                                .cloned()
                                .map(|b| b.into_dispatcher())
                                .collect(),
                        );
                        await_with_metrics!(std::pin::pin!(fut), metrics)
                    })
                    .await?;
                self.post_mutate_dispatchers(&mutation)?;
            }
            MessageBatch::Watermark(watermark) => {
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
                        let dispatcher_output = &mut dispatcher.dispatcher;
                        let fut = dispatcher_output.dispatch_watermark(watermark.clone());
                        await_with_metrics!(std::pin::pin!(fut), metrics)
                    })
                    .await?;
            }
            MessageBatch::Chunk(chunk) => {
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
                        let dispatcher_output = &mut dispatcher.dispatcher;
                        let fut = dispatcher_output.dispatch_data(chunk.clone());
                        await_with_metrics!(std::pin::pin!(fut), metrics)
                    })
                    .await?;

                self.metrics
                    .actor_out_record_cnt
                    .inc_by(chunk.cardinality() as _);
            }
        }
        Ok(())
    }

    /// Add new dispatchers to the executor, checking that their IDs are unique.
    async fn add_dispatchers<'a>(
        &mut self,
        new_dispatchers: impl IntoIterator<Item = &'a PbDispatcher>,
    ) -> StreamResult<()> {
        for dispatcher in new_dispatchers {
            let outputs = self
                .collect_outputs(&dispatcher.downstream_actor_id)
                .await?;
            let dispatcher = DispatcherImpl::new(outputs, dispatcher)?;
            let dispatcher = self.metrics.monitor_dispatcher(dispatcher);
            self.dispatchers.push(dispatcher);
        }

        assert!(
            self.dispatchers
                .iter()
                .map(|d| d.dispatcher_id())
                .all_unique(),
            "dispatcher ids must be unique: {:?}",
            self.dispatchers
        );

        Ok(())
    }

    fn find_dispatcher(&mut self, dispatcher_id: DispatcherId) -> &mut DispatcherImpl {
        self.dispatchers
            .iter_mut()
            .find(|d| d.dispatcher_id() == dispatcher_id)
            .unwrap_or_else(|| panic!("dispatcher {}:{} not found", self.actor_id, dispatcher_id))
    }

    /// Update the dispatcher BEFORE we actually dispatch this barrier. We'll only add the new
    /// outputs.
    async fn pre_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
        let outputs = self
            .collect_outputs(&update.added_downstream_actor_id)
            .await?;

        let dispatcher = self.find_dispatcher(update.dispatcher_id);
        dispatcher.add_outputs(outputs);

        Ok(())
    }

    /// Update the dispatcher AFTER we dispatch this barrier. We'll remove some outputs and finally
    /// update the hash mapping.
    fn post_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
        let ids = update.removed_downstream_actor_id.iter().copied().collect();

        let dispatcher = self.find_dispatcher(update.dispatcher_id);
        dispatcher.remove_outputs(&ids);

        // The hash mapping is only used by the hash dispatcher.
        //
        // We specify a single upstream hash mapping for scaling the downstream fragment. However,
        // it's possible that there are multiple upstreams with different exchange types, for
        // example, the `Broadcast` inner side of the dynamic filter. There are too many
        // combinations to handle here, so we just ignore the `hash_mapping` field for any other
        // exchange types.
        if let DispatcherImpl::Hash(dispatcher) = dispatcher {
            dispatcher.hash_mapping =
                ActorMapping::from_protobuf(update.get_hash_mapping()?).to_expanded();
        }

        Ok(())
    }

    /// For `Add` and `Update`, update the dispatchers before we dispatch the barrier.
    async fn pre_mutate_dispatchers(
        &mut self,
        mutation: &Option<Arc<Mutation>>,
    ) -> StreamResult<()> {
        let Some(mutation) = mutation.as_deref() else {
            return Ok(());
        };

        match mutation {
            Mutation::Add(AddMutation { adds, .. }) => {
                if let Some(new_dispatchers) = adds.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }
            }
            Mutation::Update(UpdateMutation {
                dispatchers,
                actor_new_dispatchers: actor_dispatchers,
                ..
            }) => {
                if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }

                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.pre_update_dispatcher(update).await?;
                    }
                }
            }
            Mutation::AddAndUpdate(
                AddMutation { adds, .. },
                UpdateMutation {
                    dispatchers,
                    actor_new_dispatchers: actor_dispatchers,
                    ..
                },
            ) => {
                if let Some(new_dispatchers) = adds.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }

                if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }

                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.pre_update_dispatcher(update).await?;
                    }
                }
            }
            _ => {}
        }

        Ok(())
    }

    /// For `Stop` and `Update`, update the dispatchers after we dispatch the barrier.
    fn post_mutate_dispatchers(&mut self, mutation: &Option<Arc<Mutation>>) -> StreamResult<()> {
        let Some(mutation) = mutation.as_deref() else {
            return Ok(());
        };

        match mutation {
            Mutation::Stop(StopMutation { dropped_actors, .. }) => {
                // Remove outputs only if this actor itself is not to be stopped.
                if !dropped_actors.contains(&self.actor_id) {
                    for dispatcher in &mut self.dispatchers {
                        dispatcher.remove_outputs(dropped_actors);
                    }
                }
            }
            Mutation::Update(UpdateMutation {
                dispatchers,
                dropped_actors,
                ..
            })
            | Mutation::AddAndUpdate(
                _,
                UpdateMutation {
                    dispatchers,
                    dropped_actors,
                    ..
                },
            ) => {
                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.post_update_dispatcher(update)?;
                    }
                }

                if !dropped_actors.contains(&self.actor_id) {
                    for dispatcher in &mut self.dispatchers {
                        dispatcher.remove_outputs(dropped_actors);
                    }
                }
            }
            _ => {}
        };

        // After stopping the downstream mview, the outputs of some dispatchers might be empty
        // and we should clean them up.
        self.dispatchers.retain(|d| !d.is_empty());

        Ok(())
    }
}

impl DispatchExecutor {
    pub(crate) async fn new(
        input: Executor,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
        dispatchers: Vec<stream_plan::Dispatcher>,
        actor_id: u32,
        fragment_id: u32,
        local_barrier_manager: LocalBarrierManager,
        metrics: Arc<StreamingMetrics>,
    ) -> StreamResult<Self> {
        let mut executor = Self::new_inner(
            input,
            new_output_request_rx,
            vec![],
            actor_id,
            fragment_id,
            local_barrier_manager,
            metrics,
        );
        let inner = &mut executor.inner;
        for dispatcher in dispatchers {
            let outputs = inner
                .collect_outputs(&dispatcher.downstream_actor_id)
                .await?;
            let dispatcher = DispatcherImpl::new(outputs, &dispatcher)?;
            let dispatcher = inner.metrics.monitor_dispatcher(dispatcher);
            inner.dispatchers.push(dispatcher);
        }
        Ok(executor)
    }

    #[cfg(test)]
    pub(crate) fn for_test(
        input: Executor,
        dispatchers: Vec<DispatcherImpl>,
        actor_id: u32,
        fragment_id: u32,
        local_barrier_manager: LocalBarrierManager,
        metrics: Arc<StreamingMetrics>,
    ) -> (
        Self,
        tokio::sync::mpsc::UnboundedSender<(ActorId, NewOutputRequest)>,
    ) {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

        (
            Self::new_inner(
                input,
                rx,
                dispatchers,
                actor_id,
                fragment_id,
                local_barrier_manager,
                metrics,
            ),
            tx,
        )
    }

    fn new_inner(
        mut input: Executor,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
        dispatchers: Vec<DispatcherImpl>,
        actor_id: u32,
        fragment_id: u32,
        local_barrier_manager: LocalBarrierManager,
        metrics: Arc<StreamingMetrics>,
    ) -> Self {
        let chunk_size = local_barrier_manager.env.config().developer.chunk_size;
        if crate::consistency::insane() {
            // Make some trouble before dispatching, to avoid generating an invalid dist key.
            let mut info = input.info().clone();
            info.identity = format!("{} (embedded trouble)", info.identity);
            let troublemaker = TroublemakerExecutor::new(input, chunk_size);
            input = (info, troublemaker).into();
        }

        let actor_id_str = actor_id.to_string();
        let fragment_id_str = fragment_id.to_string();
        let actor_out_record_cnt = metrics
            .actor_out_record_cnt
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let metrics = DispatchExecutorMetrics {
            actor_id_str,
            fragment_id_str,
            metrics,
            actor_out_record_cnt,
        };
        let dispatchers = dispatchers
            .into_iter()
            .map(|dispatcher| metrics.monitor_dispatcher(dispatcher))
            .collect();
        Self {
            input,
            inner: DispatchExecutorInner {
                dispatchers,
                actor_id,
                local_barrier_manager,
                metrics,
                new_output_request_rx,
                pending_new_output_requests: Default::default(),
            },
        }
    }
}

impl StreamConsumer for DispatchExecutor {
    type BarrierStream = impl Stream<Item = StreamResult<Barrier>> + Send;

    fn execute(mut self: Box<Self>) -> Self::BarrierStream {
        let max_barrier_count_per_batch = self
            .inner
            .local_barrier_manager
            .env
            .config()
            .developer
            .max_barrier_batch_size;
        #[try_stream]
        async move {
            let mut input = self.input.execute().peekable();
            loop {
                let Some(message) =
                    try_batch_barriers(max_barrier_count_per_batch, &mut input).await?
                else {
                    // end_of_stream
                    break;
                };
                match message {
                    chunk @ MessageBatch::Chunk(_) => {
                        self.inner
                            .dispatch(chunk)
                            .instrument(tracing::info_span!("dispatch_chunk"))
                            .instrument_await("dispatch_chunk")
                            .await?;
                    }
                    MessageBatch::BarrierBatch(barrier_batch) => {
                        assert!(!barrier_batch.is_empty());
                        self.inner
                            .dispatch(MessageBatch::BarrierBatch(barrier_batch.clone()))
                            .instrument(tracing::info_span!("dispatch_barrier_batch"))
                            .instrument_await("dispatch_barrier_batch")
                            .await?;
                        self.inner
                            .metrics
                            .metrics
                            .barrier_batch_size
                            .observe(barrier_batch.len() as f64);
                        for barrier in barrier_batch {
                            yield barrier;
                        }
                    }
                    watermark @ MessageBatch::Watermark(_) => {
                        self.inner
                            .dispatch(watermark)
                            .instrument(tracing::info_span!("dispatch_watermark"))
                            .instrument_await("dispatch_watermark")
                            .await?;
                    }
                }
            }
        }
    }
}

/// Tries to batch up to `max_barrier_count_per_batch` consecutive barriers within a single
/// message batch.
///
/// Returns the message batch, or `None` on end of stream.
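///
/// For example (illustrative): with a batch size of 3 and a buffered input of
/// `[Barrier (no mutation), Barrier (no mutation), Chunk, ...]`, the first call returns a
/// `BarrierBatch` containing both barriers and the next call returns the chunk. A barrier
/// that carries a mutation is never merged into a batch with other barriers.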
async fn try_batch_barriers(
    max_barrier_count_per_batch: u32,
    input: &mut Peekable<BoxedMessageStream>,
) -> StreamResult<Option<MessageBatch>> {
    let Some(msg) = input.next().await else {
        // end_of_stream
        return Ok(None);
    };
    let mut barrier_batch = vec![];
    let msg: Message = msg?;
    let max_peek_attempts = match msg {
        Message::Chunk(c) => {
            return Ok(Some(MessageBatch::Chunk(c)));
        }
        Message::Watermark(w) => {
            return Ok(Some(MessageBatch::Watermark(w)));
        }
        Message::Barrier(b) => {
            let peek_more_barrier = b.mutation.is_none();
            barrier_batch.push(b);
            if peek_more_barrier {
                max_barrier_count_per_batch.saturating_sub(1)
            } else {
                0
            }
        }
    };
    // Try to peek more consecutive non-mutation barriers.
    for _ in 0..max_peek_attempts {
        let peek = input.peek().now_or_never();
        let Some(peek) = peek else {
            break;
        };
        let Some(msg) = peek else {
            // end_of_stream
            break;
        };
        let Ok(Message::Barrier(barrier)) = msg else {
            break;
        };
        if barrier.mutation.is_some() {
            break;
        }
        let msg: Message = input.next().now_or_never().unwrap().unwrap()?;
        let Message::Barrier(ref barrier) = msg else {
            unreachable!("must be a barrier");
        };
        barrier_batch.push(barrier.clone());
    }
    Ok(Some(MessageBatch::BarrierBatch(barrier_batch)))
}

#[derive(Debug)]
pub enum DispatcherImpl {
    Hash(HashDataDispatcher),
    Broadcast(BroadcastDispatcher),
    Simple(SimpleDispatcher),
    RoundRobin(RoundRobinDataDispatcher),
}

impl DispatcherImpl {
    pub fn new(outputs: Vec<Output>, dispatcher: &PbDispatcher) -> StreamResult<Self> {
        let output_mapping =
            DispatchOutputMapping::from_protobuf(dispatcher.output_mapping.clone().unwrap());

        use risingwave_pb::stream_plan::DispatcherType::*;
        let dispatcher_impl = match dispatcher.get_type()? {
            Hash => {
                assert!(!outputs.is_empty());
                let dist_key_indices = dispatcher
                    .dist_key_indices
                    .iter()
                    .map(|i| *i as usize)
                    .collect();

                let hash_mapping =
                    ActorMapping::from_protobuf(dispatcher.get_hash_mapping()?).to_expanded();

                DispatcherImpl::Hash(HashDataDispatcher::new(
                    outputs,
                    dist_key_indices,
                    output_mapping,
                    hash_mapping,
                    dispatcher.dispatcher_id,
                ))
            }
            Broadcast => DispatcherImpl::Broadcast(BroadcastDispatcher::new(
                outputs,
                output_mapping,
                dispatcher.dispatcher_id,
            )),
            Simple | NoShuffle => {
                let [output]: [_; 1] = outputs.try_into().unwrap();
                DispatcherImpl::Simple(SimpleDispatcher::new(
                    output,
                    output_mapping,
                    dispatcher.dispatcher_id,
                ))
            }
            Unspecified => unreachable!(),
        };

        Ok(dispatcher_impl)
    }
}

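// `impl_dispatcher` generates the enum-dispatch boilerplate: every method on
// `DispatcherImpl` forwards to the corresponding method of the active variant, and
// `for_all_dispatcher_variants` lists the variants in a single place so a new
// dispatcher only needs to be registered there.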
macro_rules! impl_dispatcher {
    ($( { $variant_name:ident } ),*) => {
        impl DispatcherImpl {
            pub async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_data(chunk).await, )*
                }
            }

            pub async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_barriers(barriers).await, )*
                }
            }

            pub async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_watermark(watermark).await, )*
                }
            }

            pub fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
                match self {
                    $(Self::$variant_name(inner) => inner.add_outputs(outputs), )*
                }
            }

            pub fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
                match self {
                    $(Self::$variant_name(inner) => inner.remove_outputs(actor_ids), )*
                }
            }

            pub fn dispatcher_id(&self) -> DispatcherId {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id(), )*
                }
            }

            pub fn dispatcher_id_str(&self) -> &str {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id_str(), )*
                }
            }

            pub fn is_empty(&self) -> bool {
                match self {
                    $(Self::$variant_name(inner) => inner.is_empty(), )*
                }
            }
        }
    }
}

macro_rules! for_all_dispatcher_variants {
    ($macro:ident) => {
        $macro! {
            { Hash },
            { Broadcast },
            { Simple },
            { RoundRobin }
        }
    };
}

for_all_dispatcher_variants! { impl_dispatcher }

pub trait DispatchFuture<'a> = Future<Output = StreamResult<()>> + Send;

pub trait Dispatcher: Debug + 'static {
    /// Dispatch a data chunk to downstream actors.
    fn dispatch_data(&mut self, chunk: StreamChunk) -> impl DispatchFuture<'_>;
    /// Dispatch barriers to downstream actors, generally by broadcasting them.
    fn dispatch_barriers(&mut self, barrier: DispatcherBarriers) -> impl DispatchFuture<'_>;
    /// Dispatch a watermark to downstream actors, generally by broadcasting it.
    fn dispatch_watermark(&mut self, watermark: Watermark) -> impl DispatchFuture<'_>;

    /// Add new outputs to the dispatcher.
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>);
    /// Remove the outputs to `actor_ids` from the dispatcher.
    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>);

    /// The ID of the dispatcher. A [`DispatchExecutor`] may have multiple dispatchers with
    /// different IDs.
    ///
    /// Note that the dispatcher id is always equal to the downstream fragment id.
    /// See also `proto/stream_plan.proto`.
    fn dispatcher_id(&self) -> DispatcherId;

    /// Dispatcher id in string. See [`Dispatcher::dispatcher_id`].
    fn dispatcher_id_str(&self) -> &str;

    /// Whether the dispatcher has no outputs. If so, it'll be cleaned up from the
    /// [`DispatchExecutor`].
    fn is_empty(&self) -> bool;
}

/// Concurrently broadcast a message to all outputs.
///
/// Note that this does not respect `exchange_concurrent_dispatchers` in the config; the
/// concurrency here is always unlimited.
async fn broadcast_concurrent(
    outputs: impl IntoIterator<Item = &'_ mut Output>,
    message: DispatcherMessageBatch,
) -> StreamResult<()> {
    futures::future::try_join_all(
        outputs
            .into_iter()
            .map(|output| output.send(message.clone())),
    )
    .await?;
    Ok(())
}

#[derive(Debug)]
pub struct RoundRobinDataDispatcher {
    outputs: Vec<Output>,
    output_mapping: DispatchOutputMapping,
    cur: usize,
    dispatcher_id: DispatcherId,
    dispatcher_id_str: String,
}

impl RoundRobinDataDispatcher {
    pub fn new(
        outputs: Vec<Output>,
        output_mapping: DispatchOutputMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            outputs,
            output_mapping,
            cur: 0,
            dispatcher_id,
            dispatcher_id_str: dispatcher_id.to_string(),
        }
    }
}

impl Dispatcher for RoundRobinDataDispatcher {
    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let chunk = self.output_mapping.apply(chunk);

        self.outputs[self.cur]
            .send(DispatcherMessageBatch::Chunk(chunk))
            .await?;
        self.cur += 1;
        self.cur %= self.outputs.len();
        Ok(())
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // always broadcast barrier
        broadcast_concurrent(
            &mut self.outputs,
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            // always broadcast watermark
            broadcast_concurrent(
                &mut self.outputs,
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(outputs);
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
            .count();
        // `saturating_sub` avoids an underflow when all outputs have been removed; the
        // then-empty dispatcher will be cleaned up by the executor afterwards.
        self.cur = self.cur.min(self.outputs.len().saturating_sub(1));
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn dispatcher_id_str(&self) -> &str {
        &self.dispatcher_id_str
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}

pub struct HashDataDispatcher {
    outputs: Vec<Output>,
    keys: Vec<usize>,
    output_mapping: DispatchOutputMapping,
    /// Mapping from virtual node to actor id, used by the hash data dispatcher to route
    /// records to different downstream actors.
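    ///
    /// For example (illustrative): with 4 vnodes mapped to actors `[1, 1, 2, 2]`, a row
    /// whose key hashes to vnode 3 is routed to actor 2.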
    hash_mapping: ExpandedActorMapping,
    dispatcher_id: DispatcherId,
    dispatcher_id_str: String,
}

impl Debug for HashDataDispatcher {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HashDataDispatcher")
            .field("outputs", &self.outputs)
            .field("keys", &self.keys)
            .field("dispatcher_id", &self.dispatcher_id)
            .finish_non_exhaustive()
    }
}

impl HashDataDispatcher {
    pub fn new(
        outputs: Vec<Output>,
        keys: Vec<usize>,
        output_mapping: DispatchOutputMapping,
        hash_mapping: ExpandedActorMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            outputs,
            keys,
            output_mapping,
            hash_mapping,
            dispatcher_id,
            dispatcher_id_str: dispatcher_id.to_string(),
        }
    }
}

impl Dispatcher for HashDataDispatcher {
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(outputs);
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // always broadcast barrier
        broadcast_concurrent(
            &mut self.outputs,
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            // always broadcast watermark
            broadcast_concurrent(
                &mut self.outputs,
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        // A chunk can be shuffled into multiple output chunks to be sent downstream.
        // The only difference among these output chunks is the visibility map, which is
        // calculated from the hash value of each row in the input chunk.
        let num_outputs = self.outputs.len();

        // Get the hash value of every row by its key.
        let vnode_count = self.hash_mapping.len();
        let vnodes = VirtualNode::compute_chunk(chunk.data_chunk(), &self.keys, vnode_count);

        tracing::debug!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty(), self.keys, vnodes);

        let mut vis_maps = repeat_with(|| BitmapBuilder::with_capacity(chunk.capacity()))
            .take(num_outputs)
            .collect_vec();
        let mut last_update_delete_row_idx = None;
        let mut new_ops: Vec<Op> = Vec::with_capacity(chunk.capacity());

        for (row_idx, ((vnode, &op), visible)) in vnodes
            .iter()
            .copied()
            .zip_eq_fast(chunk.ops())
            .zip_eq_fast(chunk.visibility().iter())
            .enumerate()
        {
            // Build visibility map for every output chunk.
            for (output, vis_map) in self.outputs.iter().zip_eq_fast(vis_maps.iter_mut()) {
                vis_map.append(visible && self.hash_mapping[vnode.to_index()] == output.actor_id());
            }

            if !visible {
                new_ops.push(op);
                continue;
            }

            // An `Update` message, denoted by an `UpdateDelete` and a successive `UpdateInsert`,
            // needs to be rewritten to plain `Delete` and `Insert` if the distribution key
            // columns are changed, since the distribution key will eventually be part of the
            // stream key of the downstream executor, and there's an invariant that the stream
            // key must be the same for rows within an `Update` pair.
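            // For example (illustrative): with the dist key on column 0, the pair
            // `U- (1, x)` / `U+ (2, x)` may hash to different vnodes and is therefore
            // emitted as a plain `- (1, x)` / `+ (2, x)` instead.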
            if op == Op::UpdateDelete {
                last_update_delete_row_idx = Some(row_idx);
            } else if op == Op::UpdateInsert {
                let delete_row_idx = last_update_delete_row_idx
                    .take()
                    .expect("missing U- before U+");
                assert!(delete_row_idx + 1 == row_idx, "U- and U+ are not adjacent");

                // Check if any distribution key column value changed.
                let dist_key_changed = chunk.row_at(delete_row_idx).1.project(&self.keys)
                    != chunk.row_at(row_idx).1.project(&self.keys);

                if dist_key_changed {
                    new_ops.push(Op::Delete);
                    new_ops.push(Op::Insert);
                } else {
                    new_ops.push(Op::UpdateDelete);
                    new_ops.push(Op::UpdateInsert);
                }
            } else {
                new_ops.push(op);
            }
        }
        assert!(last_update_delete_row_idx.is_none(), "missing U+ after U-");

        let ops = new_ops;
        // Apply the output mapping after calculating the vnodes and new visibility maps.
        let chunk = self.output_mapping.apply(chunk);

        // Send each output its own `StreamChunk`, integrated with its visibility map.
        futures::future::try_join_all(
            vis_maps
                .into_iter()
                .zip_eq_fast(self.outputs.iter_mut())
                .map(|(vis_map, output)| async {
                    let vis_map = vis_map.finish();
                    // Columns are not changed in this function.
                    let new_stream_chunk =
                        StreamChunk::with_visibility(ops.clone(), chunk.columns().into(), vis_map);
                    if new_stream_chunk.cardinality() > 0 {
                        event!(
                            tracing::Level::TRACE,
                            msg = "chunk",
                            downstream = output.actor_id(),
                            "send = \n{:#?}",
                            new_stream_chunk
                        );
                        output
                            .send(DispatcherMessageBatch::Chunk(new_stream_chunk))
                            .await?;
                    }
                    StreamResult::Ok(())
                }),
        )
        .await?;

        Ok(())
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
            .count();
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn dispatcher_id_str(&self) -> &str {
        &self.dispatcher_id_str
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}

/// `BroadcastDispatcher` dispatches messages to all outputs.
#[derive(Debug)]
pub struct BroadcastDispatcher {
    outputs: HashMap<ActorId, Output>,
    output_mapping: DispatchOutputMapping,
    dispatcher_id: DispatcherId,
    dispatcher_id_str: String,
}

impl BroadcastDispatcher {
    pub fn new(
        outputs: impl IntoIterator<Item = Output>,
        output_mapping: DispatchOutputMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            outputs: Self::into_pairs(outputs).collect(),
            output_mapping,
            dispatcher_id,
            dispatcher_id_str: dispatcher_id.to_string(),
        }
    }

    fn into_pairs(
        outputs: impl IntoIterator<Item = Output>,
    ) -> impl Iterator<Item = (ActorId, Output)> {
        outputs
            .into_iter()
            .map(|output| (output.actor_id(), output))
    }
}

impl Dispatcher for BroadcastDispatcher {
    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let chunk = self.output_mapping.apply(chunk);
        broadcast_concurrent(
            self.outputs.values_mut(),
            DispatcherMessageBatch::Chunk(chunk),
        )
        .await
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // always broadcast barrier
        broadcast_concurrent(
            self.outputs.values_mut(),
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            // always broadcast watermark
            broadcast_concurrent(
                self.outputs.values_mut(),
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(Self::into_pairs(outputs));
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(|actor_id, _| actor_ids.contains(actor_id))
            .count();
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn dispatcher_id_str(&self) -> &str {
        &self.dispatcher_id_str
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}

/// `SimpleDispatcher` dispatches messages to a single output.
#[derive(Debug)]
pub struct SimpleDispatcher {
    /// In most cases, there is exactly one output. However, in some cases of configuration
    /// change, this field may temporarily hold 0 or 2 outputs.
    ///
    /// - When dropping a materialized view, the output will be removed and this field becomes
    ///   empty. The [`DispatchExecutor`] will immediately clean up this empty dispatcher before
    ///   finishing processing the current mutation.
    /// - When migrating a singleton fragment, the new output will be temporarily added in the
    ///   `pre` stage and this field holds two outputs, so that the configuration-change barrier
    ///   is broadcast to both the old and new downstream actors. In the `post` stage, the old
    ///   output is removed and this field becomes single again.
    ///
    /// Therefore, when dispatching data, we assert that there's exactly one output by
    /// `Self::output`.
    output: SmallVec<[Output; 2]>,
    output_mapping: DispatchOutputMapping,
    dispatcher_id: DispatcherId,
    dispatcher_id_str: String,
}

impl SimpleDispatcher {
    pub fn new(
        output: Output,
        output_mapping: DispatchOutputMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            output: smallvec![output],
            output_mapping,
            dispatcher_id,
            dispatcher_id_str: dispatcher_id.to_string(),
        }
    }
}

impl Dispatcher for SimpleDispatcher {
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.output.extend(outputs);
        assert!(self.output.len() <= 2);
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // Only barriers are allowed to be dispatched to multiple outputs during migration.
        for output in &mut self.output {
            output
                .send(DispatcherMessageBatch::BarrierBatch(barriers.clone()))
                .await?;
        }
        Ok(())
    }

    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let output = self
            .output
            .iter_mut()
            .exactly_one()
            .expect("expect exactly one output");

        let chunk = self.output_mapping.apply(chunk);
        output.send(DispatcherMessageBatch::Chunk(chunk)).await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        let output = self
            .output
            .iter_mut()
            .exactly_one()
            .expect("expect exactly one output");

        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            output
                .send(DispatcherMessageBatch::Watermark(watermark))
                .await?;
        }
        Ok(())
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.output
            .retain(|output| !actor_ids.contains(&output.actor_id()));
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn dispatcher_id_str(&self) -> &str {
        &self.dispatcher_id_str
    }

    fn is_empty(&self) -> bool {
        self.output.is_empty()
    }
}

#[cfg(test)]
mod tests {
    use std::hash::{BuildHasher, Hasher};

    use futures::pin_mut;
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::array::{Array, ArrayBuilder, I32ArrayBuilder};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::hash_util::Crc32FastBuilder;
    use risingwave_pb::stream_plan::{DispatcherType, PbDispatchOutputMapping};
    use tokio::sync::mpsc::unbounded_channel;

    use super::*;
    use crate::executor::exchange::output::Output;
    use crate::executor::exchange::permit::channel_for_test;
    use crate::executor::receiver::ReceiverExecutor;
    use crate::executor::{BarrierInner as Barrier, MessageInner as Message};
    use crate::task::barrier_test_utils::LocalBarrierTestEnv;

    #[tokio::test]
    async fn test_hash_dispatcher_complex() {
        // This test only works when the vnode count is 256.
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let num_outputs = 2; // actor id ranges from 1 to 2
        let key_indices = &[0, 2];
        let (output_tx_vecs, mut output_rx_vecs): (Vec<_>, Vec<_>) =
            (0..num_outputs).map(|_| channel_for_test()).collect();
        let outputs = output_tx_vecs
            .into_iter()
            .enumerate()
            .map(|(actor_id, tx)| Output::new(1 + actor_id as u32, tx))
            .collect::<Vec<_>>();
        let mut hash_mapping = (1..num_outputs + 1)
            .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT_FOR_TEST / num_outputs])
            .collect_vec();
        hash_mapping.resize(VirtualNode::COUNT_FOR_TEST, num_outputs as u32);
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            key_indices.to_vec(),
            DispatchOutputMapping::Simple(vec![0, 1, 2]),
            hash_mapping,
            0,
        );

        let chunk = StreamChunk::from_pretty(
            "  I I I
            +  4 6 8
            +  5 7 9
            +  0 0 0
            -  1 1 1 D
            U- 2 0 2
            U+ 2 0 2
            U- 3 3 2
            U+ 3 3 4",
        );
        hash_dispatcher.dispatch_data(chunk).await.unwrap();

        assert_eq!(
            *output_rx_vecs[0].recv().await.unwrap().as_chunk().unwrap(),
            StreamChunk::from_pretty(
                "  I I I
                +  4 6 8
                +  5 7 9
                +  0 0 0
                -  1 1 1 D
                U- 2 0 2
                U+ 2 0 2
                -  3 3 2 D  // Should rewrite UpdateDelete to Delete
                +  3 3 4    // Should rewrite UpdateInsert to Insert",
            )
        );
        assert_eq!(
            *output_rx_vecs[1].recv().await.unwrap().as_chunk().unwrap(),
            StreamChunk::from_pretty(
                "  I I I
                +  4 6 8 D
                +  5 7 9 D
                +  0 0 0 D
                -  1 1 1 D  // Should keep original invisible mark
                U- 2 0 2 D  // Should keep UpdateDelete
                U+ 2 0 2 D  // Should keep UpdateInsert
                -  3 3 2    // Should rewrite UpdateDelete to Delete
                +  3 3 4 D  // Should rewrite UpdateInsert to Insert",
            )
        );
    }

1351    #[tokio::test]
1352    async fn test_configuration_change() {
1353        let _schema = Schema { fields: vec![] };
1354        let (tx, rx) = channel_for_test();
1355        let actor_id = 233;
1356        let fragment_id = 666;
1357        let barrier_test_env = LocalBarrierTestEnv::for_test().await;
1358        let metrics = Arc::new(StreamingMetrics::unused());
1359
1360        let (untouched, old, new) = (234, 235, 238); // broadcast downstream actors
1361        let (old_simple, new_simple) = (114, 514); // simple downstream actors
1362
1363        // actor_id -> untouched, old, new, old_simple, new_simple
1364
1365        let broadcast_dispatcher_id = 666;
1366        let broadcast_dispatcher = PbDispatcher {
1367            r#type: DispatcherType::Broadcast as _,
1368            dispatcher_id: broadcast_dispatcher_id,
1369            downstream_actor_id: vec![untouched, old],
1370            output_mapping: PbDispatchOutputMapping::identical(0).into(), /* dummy length as it's not used */
1371            ..Default::default()
1372        };
1373
1374        let simple_dispatcher_id = 888;
1375        let simple_dispatcher = PbDispatcher {
1376            r#type: DispatcherType::Simple as _,
1377            dispatcher_id: simple_dispatcher_id,
1378            downstream_actor_id: vec![old_simple],
1379            output_mapping: PbDispatchOutputMapping::identical(0).into(), /* dummy length as it's not used */
1380            ..Default::default()
1381        };
1382
1383        let dispatcher_updates = maplit::hashmap! {
1384            actor_id => vec![PbDispatcherUpdate {
1385                actor_id,
1386                dispatcher_id: broadcast_dispatcher_id,
1387                added_downstream_actor_id: vec![new],
1388                removed_downstream_actor_id: vec![old],
1389                hash_mapping: Default::default(),
1390            }]
1391        };
1392        let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
1393            UpdateMutation {
1394                dispatchers: dispatcher_updates,
1395                ..Default::default()
1396            },
1397        ));
1398        barrier_test_env.inject_barrier(&b1, [actor_id]);
1399        barrier_test_env.flush_all_events().await;
1400
1401        let input = Executor::new(
1402            Default::default(),
1403            ReceiverExecutor::for_test(
1404                actor_id,
1405                rx,
1406                barrier_test_env.local_barrier_manager.clone(),
1407            )
1408            .boxed(),
1409        );
1410
1411        let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
1412        let mut rxs = [untouched, old, new, old_simple, new_simple]
1413            .into_iter()
1414            .map(|id| {
1415                (id, {
1416                    let (tx, rx) = channel_for_test();
1417                    new_output_request_tx
1418                        .send((id, NewOutputRequest::Local(tx)))
1419                        .unwrap();
1420                    rx
1421                })
1422            })
1423            .collect::<HashMap<_, _>>();
1424        let executor = Box::new(
1425            DispatchExecutor::new(
1426                input,
1427                new_output_request_rx,
1428                vec![broadcast_dispatcher, simple_dispatcher],
1429                actor_id,
1430                fragment_id,
1431                barrier_test_env.local_barrier_manager.clone(),
1432                metrics,
1433            )
1434            .await
1435            .unwrap(),
1436        )
1437        .execute();
1438
1439        pin_mut!(executor);
1440
1441        macro_rules! try_recv {
1442            ($down_id:expr) => {
1443                rxs.get_mut(&$down_id).unwrap().try_recv()
1444            };
1445        }
1446
1447        // 3. Send a chunk.
1448        tx.send(Message::Chunk(StreamChunk::default()).into())
1449            .await
1450            .unwrap();
1451
1452        tx.send(Message::Barrier(b1.clone().into_dispatcher()).into())
1453            .await
1454            .unwrap();
1455        executor.next().await.unwrap().unwrap();
1456
1457        // 5. Check downstream.
1458        try_recv!(untouched).unwrap().as_chunk().unwrap();
1459        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();
1460
1461        try_recv!(old).unwrap().as_chunk().unwrap();
1462        try_recv!(old).unwrap().as_barrier_batch().unwrap(); // It should still receive the barrier even if it's to be removed.
1463
1464        try_recv!(new).unwrap().as_barrier_batch().unwrap(); // Since it's just added, it won't receive the chunk.
1465
1466        try_recv!(old_simple).unwrap().as_chunk().unwrap();
1467        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // Untouched.
1468
        // 6. Send another barrier.
        let b2 = Barrier::new_test_barrier(test_epoch(2));
        barrier_test_env.inject_barrier(&b2, [actor_id]);
        tx.send(Message::Barrier(b2.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 7. Check downstream.
        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();
        try_recv!(old).unwrap_err(); // Since it's been stopped, it no longer receives new messages.
        try_recv!(new).unwrap().as_barrier_batch().unwrap();

        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // Untouched.
        try_recv!(new_simple).unwrap_err(); // Untouched: not yet added, so it receives nothing.

        // 8. Send another chunk.
        tx.send(Message::Chunk(StreamChunk::default()).into())
            .await
            .unwrap();

        // 9. Send a configuration change barrier for the simple dispatcher.
        let dispatcher_updates = maplit::hashmap! {
            actor_id => vec![PbDispatcherUpdate {
                actor_id,
                dispatcher_id: simple_dispatcher_id,
                added_downstream_actor_id: vec![new_simple],
                removed_downstream_actor_id: vec![old_simple],
                hash_mapping: Default::default(),
            }]
        };
        let b3 = Barrier::new_test_barrier(test_epoch(3)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: dispatcher_updates,
                ..Default::default()
            },
        ));
        barrier_test_env.inject_barrier(&b3, [actor_id]);
        tx.send(Message::Barrier(b3.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 10. Check downstream.
        try_recv!(old_simple).unwrap().as_chunk().unwrap();
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // It should still receive the barrier even if it's to be removed.

        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap(); // Since it's just added, it won't receive the chunk.

        // 11. Send another barrier.
        let b4 = Barrier::new_test_barrier(test_epoch(4));
        barrier_test_env.inject_barrier(&b4, [actor_id]);
        tx.send(Message::Barrier(b4.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 12. Check downstream.
        try_recv!(old_simple).unwrap_err(); // Since it's been stopped, it no longer receives new messages.
        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap();
    }

    #[tokio::test]
    async fn test_hash_dispatcher() {
        // This test only works when the vnode count is 256.
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let num_outputs = 5; // actor ids range from 1 to 5
        let cardinality = 10;
        let dimension = 4;
        let key_indices = &[0, 2];
        let (output_tx_vecs, output_rx_vecs): (Vec<_>, Vec<_>) =
            (0..num_outputs).map(|_| channel_for_test()).collect();
        let outputs = output_tx_vecs
            .into_iter()
            .enumerate()
            .map(|(actor_id, tx)| Output::new(1 + actor_id as u32, tx))
            .collect::<Vec<_>>();
        let mut hash_mapping = (1..num_outputs + 1)
            .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT_FOR_TEST / num_outputs])
            .collect_vec();
        hash_mapping.resize(VirtualNode::COUNT_FOR_TEST, num_outputs as u32);
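        // 256 % 5 leaves one vnode unassigned; the resize pads it with the last actor id so every vnode has an owner.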
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            key_indices.to_vec(),
            DispatchOutputMapping::Simple((0..dimension).collect()),
            hash_mapping.clone(),
            0,
        );

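        // Alternate Insert and Delete ops across the `cardinality` rows.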
        let mut ops = Vec::new();
        for idx in 0..cardinality {
            if idx % 2 == 0 {
                ops.push(Op::Insert);
            } else {
                ops.push(Op::Delete);
            }
        }

        let mut start = 19260817i32..;
        let mut builders = (0..dimension)
            .map(|_| I32ArrayBuilder::new(cardinality))
            .collect_vec();
        let mut output_cols = vec![vec![vec![]; dimension]; num_outputs];
        let mut output_ops = vec![vec![]; num_outputs];
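        // Mirror the dispatcher's routing on the test side: CRC32-hash the key columns,
        // map the hash to a vnode (hash % vnode count), and record each row under the
        // actor owning that vnode, to compare against the actual outputs below.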
        for op in &ops {
            let hash_builder = Crc32FastBuilder;
            let mut hasher = hash_builder.build_hasher();
            let one_row = (0..dimension).map(|_| start.next().unwrap()).collect_vec();
            for key_idx in key_indices {
                let val = one_row[*key_idx];
                let bytes = val.to_le_bytes();
                hasher.update(&bytes);
            }
            let output_idx =
                hash_mapping[hasher.finish() as usize % VirtualNode::COUNT_FOR_TEST] as usize - 1;
            for (builder, val) in builders.iter_mut().zip_eq_fast(one_row.iter()) {
                builder.append(Some(*val));
            }
            output_cols[output_idx]
                .iter_mut()
                .zip_eq_fast(one_row.iter())
                .for_each(|(each_column, val)| each_column.push(*val));
            output_ops[output_idx].push(op);
        }

        let columns = builders
            .into_iter()
            .map(|builder| {
                let array = builder.finish();
                array.into_ref()
            })
            .collect();

        let chunk = StreamChunk::new(ops, columns);
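        // Illustrative sanity check (not part of the original test): a freshly built
        // chunk is fully visible, so all `cardinality` rows should be counted.
        assert_eq!(chunk.cardinality(), cardinality);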
        hash_dispatcher.dispatch_data(chunk).await.unwrap();

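        // Drain whatever each output has received, without blocking, and compare against expectations.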
        for (output_idx, mut rx) in output_rx_vecs.into_iter().enumerate() {
            let mut output = vec![];
            while let Some(Some(msg)) = rx.recv().now_or_never() {
                output.push(msg);
            }
            // An output may receive no chunk at all, if none of the keys hash to a vnode it owns.
            assert!(output.len() <= 1);
            if output.is_empty() {
                assert!(output_cols[output_idx].iter().all(|x| x.is_empty()));
            } else {
                let message = output.first().unwrap();
                let real_chunk = match message {
                    DispatcherMessageBatch::Chunk(chunk) => chunk,
                    _ => panic!(),
                };
                real_chunk
                    .columns()
                    .iter()
                    .zip_eq_fast(output_cols[output_idx].iter())
                    .for_each(|(real_col, expect_col)| {
                        let real_vals = real_chunk
                            .visibility()
                            .iter_ones()
                            .map(|row_idx| real_col.as_int32().value_at(row_idx).unwrap())
                            .collect::<Vec<_>>();
                        assert_eq!(real_vals.len(), expect_col.len());
                        assert_eq!(real_vals, *expect_col);
                    });
            }
        }
    }
}