risingwave_stream/executor/dispatch.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::iter::repeat_with;
use std::ops::{Deref, DerefMut};
use std::time::Duration;

use anyhow::anyhow;
use futures::{FutureExt, TryStreamExt};
use itertools::Itertools;
use risingwave_common::array::Op;
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::hash::{ActorMapping, ExpandedActorMapping, VirtualNode};
use risingwave_common::metrics::LabelGuardedIntCounter;
use risingwave_common::row::RowExt;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
use risingwave_pb::stream_plan::{self, PbDispatcher};
use smallvec::{SmallVec, smallvec};
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::time::Instant;
use tokio_stream::StreamExt;
use tokio_stream::adapters::Peekable;
use tracing::{Instrument, event};

use super::exchange::output::Output;
use super::{
    AddMutation, DispatcherBarriers, DispatcherMessageBatch, MessageBatch, TroublemakerExecutor,
    UpdateMutation,
};
use crate::executor::prelude::*;
use crate::executor::{StopMutation, StreamConsumer};
use crate::task::{DispatcherId, LocalBarrierManager, NewOutputRequest};

mod output_mapping;
pub use output_mapping::DispatchOutputMapping;
use risingwave_common::id::FragmentId;

/// [`DispatchExecutor`] consumes messages and sends them to downstream actors. Data
/// chunks are usually dispatched according to some specified policy, while control
/// messages such as barriers are broadcast to all receivers.
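///
/// A simplified view of the pipeline (illustrative):
///
/// ```text
/// upstream Executor --Message--> DispatchExecutor --DispatcherMessageBatch--> outputs
///                                      |
///                       one or more `DispatcherImpl`s per executor
///                       (Hash / Broadcast / Simple / RoundRobin)
/// ```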
pub struct DispatchExecutor {
    input: Executor,
    inner: DispatchExecutorInner,
}

struct DispatcherWithMetrics {
    dispatcher: DispatcherImpl,
    pub actor_output_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}

impl Debug for DispatcherWithMetrics {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        self.dispatcher.fmt(f)
    }
}

impl Deref for DispatcherWithMetrics {
    type Target = DispatcherImpl;

    fn deref(&self) -> &Self::Target {
        &self.dispatcher
    }
}

impl DerefMut for DispatcherWithMetrics {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.dispatcher
    }
}

struct DispatchExecutorMetrics {
    actor_id_str: String,
    fragment_id_str: String,
    metrics: Arc<StreamingMetrics>,
    actor_out_record_cnt: LabelGuardedIntCounter,
}

impl DispatchExecutorMetrics {
    fn monitor_dispatcher(&self, dispatcher: DispatcherImpl) -> DispatcherWithMetrics {
        DispatcherWithMetrics {
            actor_output_buffer_blocking_duration_ns: self
                .metrics
                .actor_output_buffer_blocking_duration_ns
                .with_guarded_label_values(&[
                    self.actor_id_str.as_str(),
                    self.fragment_id_str.as_str(),
                    dispatcher.dispatcher_id_str(),
                ]),
            dispatcher,
        }
    }
}

struct DispatchExecutorInner {
    dispatchers: Vec<DispatcherWithMetrics>,
    actor_id: ActorId,
    local_barrier_manager: LocalBarrierManager,
    metrics: DispatchExecutorMetrics,
    new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    pending_new_output_requests: HashMap<ActorId, NewOutputRequest>,
}

impl DispatchExecutorInner {
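    /// Collect one output for each of `downstream_actors`, in order.
    ///
    /// Output requests from downstream actors may arrive out of order; requests that
    /// don't match the actor currently being resolved are stashed in
    /// `pending_new_output_requests` and consumed by later iterations.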
    async fn collect_outputs(
        &mut self,
        downstream_actors: &[ActorId],
    ) -> StreamResult<Vec<Output>> {
        fn resolve_output(downstream_actor: ActorId, request: NewOutputRequest) -> Output {
            let tx = match request {
                NewOutputRequest::Local(tx) | NewOutputRequest::Remote(tx) => tx,
            };
            Output::new(downstream_actor, tx)
        }
        let mut outputs = Vec::with_capacity(downstream_actors.len());
        for &downstream_actor in downstream_actors {
            let output =
                if let Some(request) = self.pending_new_output_requests.remove(&downstream_actor) {
                    resolve_output(downstream_actor, request)
                } else {
                    loop {
                        let (requested_actor, request) = self
                            .new_output_request_rx
                            .recv()
                            .await
                            .ok_or_else(|| anyhow!("end of new output request"))?;
                        if requested_actor == downstream_actor {
                            break resolve_output(requested_actor, request);
                        } else {
                            assert!(
                                self.pending_new_output_requests
                                    .insert(requested_actor, request)
                                    .is_none(),
                                "duplicated inflight new output requests from actor {}",
                                requested_actor
                            );
                        }
                    }
                };
            outputs.push(output);
        }
        Ok(outputs)
    }

    async fn dispatch(&mut self, msg: MessageBatch) -> StreamResult<()> {
        macro_rules! await_with_metrics {
            ($fut:expr, $metrics:expr) => {{
                let mut start_time = Instant::now();
                let interval_duration = Duration::from_secs(15);
                let mut interval =
                    tokio::time::interval_at(start_time + interval_duration, interval_duration);

                let mut fut = std::pin::pin!($fut);

                loop {
                    tokio::select! {
                        biased;
                        res = &mut fut => {
                            res?;
                            let ns = start_time.elapsed().as_nanos() as u64;
                            $metrics.inc_by(ns);
                            break;
                        }
                        _ = interval.tick() => {
                            start_time = Instant::now();
                            $metrics.inc_by(interval_duration.as_nanos() as u64);
                        }
                    };
                }
                StreamResult::Ok(())
            }};
        }
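
        // Accounting sketch (illustrative): if a single dispatch blocks for 40 seconds,
        // the counter is bumped by 15s at t=15s and t=30s via the interval arm, and by
        // the remaining ~10s once the future completes, so a long stall is reflected in
        // the metric while it is still in progress, not only after the fact.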

        // TODO(config): use config from actor context, instead of the global one
        let limit = self
            .local_barrier_manager
            .env
            .global_config()
            .developer
            .exchange_concurrent_dispatchers;
        // Only barriers can be batched for now.
        match msg {
            MessageBatch::BarrierBatch(barrier_batch) => {
                if barrier_batch.is_empty() {
                    return Ok(());
                }
                // Only the first barrier in a batch can carry a mutation.
                let mutation = barrier_batch[0].mutation.clone();
                self.pre_mutate_dispatchers(&mutation).await?;
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
                        let dispatcher_output = &mut dispatcher.dispatcher;
                        let fut = dispatcher_output.dispatch_barriers(
                            barrier_batch
                                .iter()
                                .cloned()
                                .map(|b| b.into_dispatcher())
                                .collect(),
                        );
                        await_with_metrics!(std::pin::pin!(fut), metrics)
                    })
                    .await?;
                self.post_mutate_dispatchers(&mutation)?;
            }
            MessageBatch::Watermark(watermark) => {
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
                        let dispatcher_output = &mut dispatcher.dispatcher;
                        let fut = dispatcher_output.dispatch_watermark(watermark.clone());
                        await_with_metrics!(std::pin::pin!(fut), metrics)
                    })
                    .await?;
            }
            MessageBatch::Chunk(chunk) => {
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
                        let dispatcher_output = &mut dispatcher.dispatcher;
                        let fut = dispatcher_output.dispatch_data(chunk.clone());
                        await_with_metrics!(std::pin::pin!(fut), metrics)
                    })
                    .await?;

                self.metrics
                    .actor_out_record_cnt
                    .inc_by(chunk.cardinality() as _);
            }
        }
        Ok(())
    }

    /// Add new dispatchers to the executor. Checks that their IDs are unique.
    async fn add_dispatchers<'a>(
        &mut self,
        new_dispatchers: impl IntoIterator<Item = &'a PbDispatcher>,
    ) -> StreamResult<()> {
        for dispatcher in new_dispatchers {
            let outputs = self
                .collect_outputs(&dispatcher.downstream_actor_id)
                .await?;
            let dispatcher = DispatcherImpl::new(outputs, dispatcher)?;
            let dispatcher = self.metrics.monitor_dispatcher(dispatcher);
            self.dispatchers.push(dispatcher);
        }

        assert!(
            self.dispatchers
                .iter()
                .map(|d| d.dispatcher_id())
                .all_unique(),
            "dispatcher ids must be unique: {:?}",
            self.dispatchers
        );

        Ok(())
    }

    fn find_dispatcher(&mut self, dispatcher_id: DispatcherId) -> &mut DispatcherImpl {
        self.dispatchers
            .iter_mut()
            .find(|d| d.dispatcher_id() == dispatcher_id)
            .unwrap_or_else(|| panic!("dispatcher {}:{} not found", self.actor_id, dispatcher_id))
    }

    /// Update the dispatcher BEFORE we actually dispatch this barrier. We'll only add the new
    /// outputs.
    async fn pre_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
        let outputs = self
            .collect_outputs(&update.added_downstream_actor_id)
            .await?;

        let dispatcher = self.find_dispatcher(update.dispatcher_id);
        dispatcher.add_outputs(outputs);

        Ok(())
    }

    /// Update the dispatcher AFTER we dispatch this barrier. We'll remove some outputs and finally
    /// update the hash mapping.
    fn post_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
        let ids = update.removed_downstream_actor_id.iter().copied().collect();

        let dispatcher = self.find_dispatcher(update.dispatcher_id);
        dispatcher.remove_outputs(&ids);

        // The hash mapping is only used by the hash dispatcher.
        //
        // We specify a single upstream hash mapping for scaling the downstream fragment. However,
        // it's possible that there are multiple upstreams with different exchange types, for
        // example, the `Broadcast` inner side of the dynamic filter. There are too many
        // combinations to handle here, so we just ignore the `hash_mapping` field for any other
        // exchange types.
        if let DispatcherImpl::Hash(dispatcher) = dispatcher {
            dispatcher.hash_mapping =
                ActorMapping::from_protobuf(update.get_hash_mapping()?).to_expanded();
        }

        Ok(())
    }

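    // Reconfiguration is two-phase around the barrier that carries the mutation
    // (illustrative example: moving a downstream from actor A to actor C):
    // - `pre_mutate_dispatchers` runs BEFORE the barrier is dispatched and only adds
    //   outputs, so the new actor C receives the configuration-change barrier;
    // - `post_mutate_dispatchers` runs AFTER the barrier is dispatched and removes
    //   outputs, so the old actor A still receives that final barrier before removal.
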
    /// For `Add` and `Update`, update the dispatchers before we dispatch the barrier.
    async fn pre_mutate_dispatchers(
        &mut self,
        mutation: &Option<Arc<Mutation>>,
    ) -> StreamResult<()> {
        let Some(mutation) = mutation.as_deref() else {
            return Ok(());
        };

        match mutation {
            Mutation::Add(AddMutation { adds, .. }) => {
                if let Some(new_dispatchers) = adds.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }
            }
            Mutation::Update(UpdateMutation {
                dispatchers,
                actor_new_dispatchers: actor_dispatchers,
                ..
            }) => {
                if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }

                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.pre_update_dispatcher(update).await?;
                    }
                }
            }
            _ => {}
        }

        Ok(())
    }

    /// For `Stop` and `Update`, update the dispatchers after we dispatch the barrier.
    fn post_mutate_dispatchers(&mut self, mutation: &Option<Arc<Mutation>>) -> StreamResult<()> {
        let Some(mutation) = mutation.as_deref() else {
            return Ok(());
        };

        match mutation {
            Mutation::Stop(StopMutation { dropped_actors, .. }) => {
                // Remove outputs only if this actor itself is not to be stopped.
                if !dropped_actors.contains(&self.actor_id) {
                    for dispatcher in &mut self.dispatchers {
                        dispatcher.remove_outputs(dropped_actors);
                    }
                }
            }
            Mutation::Update(UpdateMutation {
                dispatchers,
                dropped_actors,
                ..
            }) => {
                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.post_update_dispatcher(update)?;
                    }
                }

                if !dropped_actors.contains(&self.actor_id) {
                    for dispatcher in &mut self.dispatchers {
                        dispatcher.remove_outputs(dropped_actors);
                    }
                }
            }
            _ => {}
        };

        // After stopping the downstream mview, some dispatchers might be left with no
        // outputs, and we should clean them up.
        self.dispatchers.retain(|d| !d.is_empty());

        Ok(())
    }
}

impl DispatchExecutor {
    pub(crate) async fn new(
        input: Executor,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
        dispatchers: Vec<stream_plan::Dispatcher>,
        actor_id: ActorId,
        fragment_id: FragmentId,
        local_barrier_manager: LocalBarrierManager,
        metrics: Arc<StreamingMetrics>,
    ) -> StreamResult<Self> {
        let mut executor = Self::new_inner(
            input,
            new_output_request_rx,
            vec![],
            actor_id,
            fragment_id,
            local_barrier_manager,
            metrics,
        );
        let inner = &mut executor.inner;
        for dispatcher in dispatchers {
            let outputs = inner
                .collect_outputs(&dispatcher.downstream_actor_id)
                .await?;
            let dispatcher = DispatcherImpl::new(outputs, &dispatcher)?;
            let dispatcher = inner.metrics.monitor_dispatcher(dispatcher);
            inner.dispatchers.push(dispatcher);
        }
        Ok(executor)
    }

    #[cfg(test)]
    pub(crate) fn for_test(
        input: Executor,
        dispatchers: Vec<DispatcherImpl>,
        actor_id: ActorId,
        fragment_id: FragmentId,
        local_barrier_manager: LocalBarrierManager,
        metrics: Arc<StreamingMetrics>,
    ) -> (
        Self,
        tokio::sync::mpsc::UnboundedSender<(ActorId, NewOutputRequest)>,
    ) {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

        (
            Self::new_inner(
                input,
                rx,
                dispatchers,
                actor_id,
                fragment_id,
                local_barrier_manager,
                metrics,
            ),
            tx,
        )
    }

    fn new_inner(
        mut input: Executor,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
        dispatchers: Vec<DispatcherImpl>,
        actor_id: ActorId,
        fragment_id: FragmentId,
        local_barrier_manager: LocalBarrierManager,
        metrics: Arc<StreamingMetrics>,
    ) -> Self {
        let chunk_size = local_barrier_manager
            .env
            .global_config()
            .developer
            .chunk_size;
        if crate::consistency::insane() {
            // make some trouble before dispatching to avoid generating invalid dist key.
            let mut info = input.info().clone();
            info.identity = format!("{} (embedded trouble)", info.identity);
            let troublemaker = TroublemakerExecutor::new(input, chunk_size);
            input = (info, troublemaker).into();
        }

        let actor_id_str = actor_id.to_string();
        let fragment_id_str = fragment_id.to_string();
        let actor_out_record_cnt = metrics
            .actor_out_record_cnt
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let metrics = DispatchExecutorMetrics {
            actor_id_str,
            fragment_id_str,
            metrics,
            actor_out_record_cnt,
        };
        let dispatchers = dispatchers
            .into_iter()
            .map(|dispatcher| metrics.monitor_dispatcher(dispatcher))
            .collect();
        Self {
            input,
            inner: DispatchExecutorInner {
                dispatchers,
                actor_id,
                local_barrier_manager,
                metrics,
                new_output_request_rx,
                pending_new_output_requests: Default::default(),
            },
        }
    }
}

impl StreamConsumer for DispatchExecutor {
    type BarrierStream = impl Stream<Item = StreamResult<Barrier>> + Send;

    fn execute(mut self: Box<Self>) -> Self::BarrierStream {
        // TODO(config): use config from actor context, instead of the global one
        let max_barrier_count_per_batch = self
            .inner
            .local_barrier_manager
            .env
            .global_config()
            .developer
            .max_barrier_batch_size;
        #[try_stream]
        async move {
            let mut input = self.input.execute().peekable();
            loop {
                let Some(message) =
                    try_batch_barriers(max_barrier_count_per_batch, &mut input).await?
                else {
                    // end_of_stream
                    break;
                };
                match message {
                    chunk @ MessageBatch::Chunk(_) => {
                        self.inner
                            .dispatch(chunk)
                            .instrument(tracing::info_span!("dispatch_chunk"))
                            .instrument_await("dispatch_chunk")
                            .await?;
                    }
                    MessageBatch::BarrierBatch(barrier_batch) => {
                        assert!(!barrier_batch.is_empty());
                        self.inner
                            .dispatch(MessageBatch::BarrierBatch(barrier_batch.clone()))
                            .instrument(tracing::info_span!("dispatch_barrier_batch"))
                            .instrument_await("dispatch_barrier_batch")
                            .await?;
                        self.inner
                            .metrics
                            .metrics
                            .barrier_batch_size
                            .observe(barrier_batch.len() as f64);
                        for barrier in barrier_batch {
                            yield barrier;
                        }
                    }
                    watermark @ MessageBatch::Watermark(_) => {
                        self.inner
                            .dispatch(watermark)
                            .instrument(tracing::info_span!("dispatch_watermark"))
                            .instrument_await("dispatch_watermark")
                            .await?;
                    }
                }
            }
        }
    }
}

/// Tries to batch up to `max_barrier_count_per_batch` consecutive barriers within a single message batch.
///
/// Returns the message batch.
///
/// Returns `None` upon the end of the stream.
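///
/// Illustrative example: if the channel already buffers `[b1, b2, c1]`, where `b1` and
/// `b2` are barriers without mutations and `max_barrier_count_per_batch >= 2`, the
/// first call returns `BarrierBatch([b1, b2])` and the next call returns `Chunk(c1)`.
/// A barrier that carries a mutation always forms a batch of its own.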
async fn try_batch_barriers(
    max_barrier_count_per_batch: u32,
    input: &mut Peekable<BoxedMessageStream>,
) -> StreamResult<Option<MessageBatch>> {
    let Some(msg) = input.next().await else {
        // end_of_stream
        return Ok(None);
    };
    let mut barrier_batch = vec![];
    let msg: Message = msg?;
    let max_peek_attempts = match msg {
        Message::Chunk(c) => {
            return Ok(Some(MessageBatch::Chunk(c)));
        }
        Message::Watermark(w) => {
            return Ok(Some(MessageBatch::Watermark(w)));
        }
        Message::Barrier(b) => {
            let peek_more_barrier = b.mutation.is_none();
            barrier_batch.push(b);
            if peek_more_barrier {
                max_barrier_count_per_batch.saturating_sub(1)
            } else {
                0
            }
        }
    };
    // Try to peek more consecutive non-mutation barriers.
    for _ in 0..max_peek_attempts {
        // `now_or_never` makes the peek non-blocking: if the next message is not ready
        // yet, stop batching instead of waiting for it.
        let peek = input.peek().now_or_never();
        let Some(peek) = peek else {
            break;
        };
        let Some(msg) = peek else {
            // end_of_stream
            break;
        };
        let Ok(Message::Barrier(barrier)) = msg else {
            break;
        };
        if barrier.mutation.is_some() {
            break;
        }
        let msg: Message = input.next().now_or_never().unwrap().unwrap()?;
        let Message::Barrier(ref barrier) = msg else {
            unreachable!("must be a barrier");
        };
        barrier_batch.push(barrier.clone());
    }
    Ok(Some(MessageBatch::BarrierBatch(barrier_batch)))
}

#[derive(Debug)]
pub enum DispatcherImpl {
    Hash(HashDataDispatcher),
    Broadcast(BroadcastDispatcher),
    Simple(SimpleDispatcher),
    RoundRobin(RoundRobinDataDispatcher),
}

impl DispatcherImpl {
    pub fn new(outputs: Vec<Output>, dispatcher: &PbDispatcher) -> StreamResult<Self> {
        let output_mapping =
            DispatchOutputMapping::from_protobuf(dispatcher.output_mapping.clone().unwrap());

        use risingwave_pb::stream_plan::DispatcherType::*;
        let dispatcher_impl = match dispatcher.get_type()? {
            Hash => {
                assert!(!outputs.is_empty());
                let dist_key_indices = dispatcher
                    .dist_key_indices
                    .iter()
                    .map(|i| *i as usize)
                    .collect();

                let hash_mapping =
                    ActorMapping::from_protobuf(dispatcher.get_hash_mapping()?).to_expanded();

                DispatcherImpl::Hash(HashDataDispatcher::new(
                    outputs,
                    dist_key_indices,
                    output_mapping,
                    hash_mapping,
                    dispatcher.dispatcher_id,
                ))
            }
            Broadcast => DispatcherImpl::Broadcast(BroadcastDispatcher::new(
                outputs,
                output_mapping,
                dispatcher.dispatcher_id,
            )),
            Simple | NoShuffle => {
                let [output]: [_; 1] = outputs.try_into().unwrap();
                DispatcherImpl::Simple(SimpleDispatcher::new(
                    output,
                    output_mapping,
                    dispatcher.dispatcher_id,
                ))
            }
            Unspecified => unreachable!(),
        };

        Ok(dispatcher_impl)
    }
}

macro_rules! impl_dispatcher {
    ($( { $variant_name:ident } ),*) => {
        impl DispatcherImpl {
            pub async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_data(chunk).await, )*
                }
            }

            pub async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_barriers(barriers).await, )*
                }
            }

            pub async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_watermark(watermark).await, )*
                }
            }

            pub fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
                match self {
                    $(Self::$variant_name(inner) => inner.add_outputs(outputs), )*
                }
            }

            pub fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
                match self {
                    $(Self::$variant_name(inner) => inner.remove_outputs(actor_ids), )*
                }
            }

            pub fn dispatcher_id(&self) -> DispatcherId {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id(), )*
                }
            }

            pub fn dispatcher_id_str(&self) -> &str {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id_str(), )*
                }
            }

            pub fn is_empty(&self) -> bool {
                match self {
                    $(Self::$variant_name(inner) => inner.is_empty(), )*
                }
            }
        }
    }
}

macro_rules! for_all_dispatcher_variants {
    ($macro:ident) => {
        $macro! {
            { Hash },
            { Broadcast },
            { Simple },
            { RoundRobin }
        }
    };
}

for_all_dispatcher_variants! { impl_dispatcher }
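
// The two macros above expand to plain enum delegation over all dispatcher variants.
// For example, `dispatch_data` expands to (sketch):
//
//     pub async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
//         match self {
//             Self::Hash(inner) => inner.dispatch_data(chunk).await,
//             Self::Broadcast(inner) => inner.dispatch_data(chunk).await,
//             Self::Simple(inner) => inner.dispatch_data(chunk).await,
//             Self::RoundRobin(inner) => inner.dispatch_data(chunk).await,
//         }
//     }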

pub trait DispatchFuture<'a> = Future<Output = StreamResult<()>> + Send;

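/// Abstraction over the different dispatching policies below.
///
/// A minimal no-op implementation as a sketch of the contract (illustrative only, not
/// part of the actual code):
///
/// ```ignore
/// #[derive(Debug)]
/// struct NullDispatcher;
///
/// impl Dispatcher for NullDispatcher {
///     async fn dispatch_data(&mut self, _chunk: StreamChunk) -> StreamResult<()> { Ok(()) }
///     async fn dispatch_barriers(&mut self, _barriers: DispatcherBarriers) -> StreamResult<()> { Ok(()) }
///     async fn dispatch_watermark(&mut self, _watermark: Watermark) -> StreamResult<()> { Ok(()) }
///     fn add_outputs(&mut self, _outputs: impl IntoIterator<Item = Output>) {}
///     fn remove_outputs(&mut self, _actor_ids: &HashSet<ActorId>) {}
///     fn dispatcher_id(&self) -> DispatcherId { 0 }
///     fn dispatcher_id_str(&self) -> &str { "0" }
///     fn is_empty(&self) -> bool { true }
/// }
/// ```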
pub trait Dispatcher: Debug + 'static {
    /// Dispatch a data chunk to downstream actors.
    fn dispatch_data(&mut self, chunk: StreamChunk) -> impl DispatchFuture<'_>;
    /// Dispatch barriers to downstream actors, generally by broadcasting them.
    fn dispatch_barriers(&mut self, barrier: DispatcherBarriers) -> impl DispatchFuture<'_>;
    /// Dispatch a watermark to downstream actors, generally by broadcasting it.
    fn dispatch_watermark(&mut self, watermark: Watermark) -> impl DispatchFuture<'_>;

    /// Add new outputs to the dispatcher.
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>);
    /// Remove the outputs connected to `actor_ids` from the dispatcher.
    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>);

    /// The ID of the dispatcher. A [`DispatchExecutor`] may have multiple dispatchers with
    /// different IDs.
    ///
    /// Note that the dispatcher id is always equal to the downstream fragment id.
    /// See also `proto/stream_plan.proto`.
    fn dispatcher_id(&self) -> DispatcherId;

    /// Dispatcher id in string. See [`Dispatcher::dispatcher_id`].
    fn dispatcher_id_str(&self) -> &str;

    /// Whether the dispatcher has no outputs. If so, it'll be cleaned up from the
    /// [`DispatchExecutor`].
    fn is_empty(&self) -> bool;
}

/// Concurrently broadcast a message to all outputs.
///
/// Note that this does not follow `concurrent_dispatchers` in the config and the concurrency is
/// always unlimited.
async fn broadcast_concurrent(
    outputs: impl IntoIterator<Item = &'_ mut Output>,
    message: DispatcherMessageBatch,
) -> StreamResult<()> {
    futures::future::try_join_all(
        outputs
            .into_iter()
            .map(|output| output.send(message.clone())),
    )
    .await?;
    Ok(())
}

#[derive(Debug)]
pub struct RoundRobinDataDispatcher {
    outputs: Vec<Output>,
    output_mapping: DispatchOutputMapping,
    cur: usize,
    dispatcher_id: DispatcherId,
    dispatcher_id_str: String,
}

impl RoundRobinDataDispatcher {
    pub fn new(
        outputs: Vec<Output>,
        output_mapping: DispatchOutputMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            outputs,
            output_mapping,
            cur: 0,
            dispatcher_id,
            dispatcher_id_str: dispatcher_id.to_string(),
        }
    }
}

impl Dispatcher for RoundRobinDataDispatcher {
    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let chunk = self.output_mapping.apply(chunk);

        self.outputs[self.cur]
            .send(DispatcherMessageBatch::Chunk(chunk))
            .await?;
        self.cur += 1;
        self.cur %= self.outputs.len();
        Ok(())
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // always broadcast barrier
        broadcast_concurrent(
            &mut self.outputs,
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            // always broadcast watermark
            broadcast_concurrent(
                &mut self.outputs,
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(outputs);
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
            .count();
        // `saturating_sub` avoids an underflow panic when all outputs have been removed;
        // an empty dispatcher will be cleaned up by the `DispatchExecutor` afterwards.
        self.cur = self.cur.min(self.outputs.len().saturating_sub(1));
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn dispatcher_id_str(&self) -> &str {
        &self.dispatcher_id_str
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}

pub struct HashDataDispatcher {
    outputs: Vec<Output>,
    keys: Vec<usize>,
    output_mapping: DispatchOutputMapping,
    /// Mapping from virtual node to actor id, used by the hash data dispatcher to route
    /// rows to different downstream actors.
    hash_mapping: ExpandedActorMapping,
    dispatcher_id: DispatcherId,
    dispatcher_id_str: String,
}

impl Debug for HashDataDispatcher {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HashDataDispatcher")
            .field("outputs", &self.outputs)
            .field("keys", &self.keys)
            .field("dispatcher_id", &self.dispatcher_id)
            .finish_non_exhaustive()
    }
}

impl HashDataDispatcher {
    pub fn new(
        outputs: Vec<Output>,
        keys: Vec<usize>,
        output_mapping: DispatchOutputMapping,
        hash_mapping: ExpandedActorMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            outputs,
            keys,
            output_mapping,
            hash_mapping,
            dispatcher_id,
            dispatcher_id_str: dispatcher_id.to_string(),
        }
    }
}

impl Dispatcher for HashDataDispatcher {
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(outputs);
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // always broadcast barrier
        broadcast_concurrent(
            &mut self.outputs,
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            // always broadcast watermark
            broadcast_concurrent(
                &mut self.outputs,
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        // A chunk can be shuffled into multiple output chunks to be sent downstream.
        // These output chunks differ only in their visibility maps, which are computed
        // from the hash value of each row in the input chunk.
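        //
        // Illustrative example: for a 4-row chunk whose rows hash to actors {1, 2, 1, 2},
        // output 1 receives the chunk with visibility `1010` and output 2 with `0101`.
        // The column data itself is shared between the output chunks, not copied.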
        let num_outputs = self.outputs.len();

        // Compute the vnode of every row from its distribution key.
        let vnode_count = self.hash_mapping.len();
        let vnodes = VirtualNode::compute_chunk(chunk.data_chunk(), &self.keys, vnode_count);

        tracing::debug!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty(), self.keys, vnodes);

        let mut vis_maps = repeat_with(|| BitmapBuilder::with_capacity(chunk.capacity()))
            .take(num_outputs)
            .collect_vec();
        let mut last_update_delete_row_idx = None;
        let mut new_ops: Vec<Op> = Vec::with_capacity(chunk.capacity());

        for (row_idx, ((vnode, &op), visible)) in vnodes
            .iter()
            .copied()
            .zip_eq_fast(chunk.ops())
            .zip_eq_fast(chunk.visibility().iter())
            .enumerate()
        {
            // Build visibility map for every output chunk.
            for (output, vis_map) in self.outputs.iter().zip_eq_fast(vis_maps.iter_mut()) {
                vis_map.append(visible && self.hash_mapping[vnode.to_index()] == output.actor_id());
            }

            if !visible {
                new_ops.push(op);
                continue;
            }

            // An `Update` message, denoted by an `UpdateDelete` and a successive `UpdateInsert`,
            // needs to be rewritten into a plain `Delete` and `Insert` if the distribution key
            // columns are changed, since the distribution key will eventually be part of the
            // stream key of the downstream executor, and there's an invariant that the stream key
            // must be the same for both rows within an `Update` pair.
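            //
            // Illustrative example with distribution key `[a]` on rows `(a, b)`:
            // the pair `U- (1, x), U+ (2, x)` changes the key from 1 to 2, so the two
            // rows may be routed to different downstream actors; it is rewritten to
            // `- (1, x), + (2, x)`. The pair `U- (1, x), U+ (1, y)` keeps the key and
            // stays an `Update`.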
            if op == Op::UpdateDelete {
                last_update_delete_row_idx = Some(row_idx);
            } else if op == Op::UpdateInsert {
                let delete_row_idx = last_update_delete_row_idx
                    .take()
                    .expect("missing U- before U+");
                assert!(delete_row_idx + 1 == row_idx, "U- and U+ are not adjacent");

                // Check if any distribution key column value changed
                let dist_key_changed = chunk.row_at(delete_row_idx).1.project(&self.keys)
                    != chunk.row_at(row_idx).1.project(&self.keys);

                if dist_key_changed {
                    new_ops.push(Op::Delete);
                    new_ops.push(Op::Insert);
                } else {
                    new_ops.push(Op::UpdateDelete);
                    new_ops.push(Op::UpdateInsert);
                }
            } else {
                new_ops.push(op);
            }
        }
        assert!(last_update_delete_row_idx.is_none(), "missing U+ after U-");

        let ops = new_ops;
        // Apply the output mapping after computing the vnodes and new visibility maps.
        let chunk = self.output_mapping.apply(chunk);

        // Send each output its own `StreamChunk` combined with the corresponding visibility map.
        futures::future::try_join_all(
            vis_maps
                .into_iter()
                .zip_eq_fast(self.outputs.iter_mut())
                .map(|(vis_map, output)| async {
                    let vis_map = vis_map.finish();
                    // The columns are not changed in this function.
                    let new_stream_chunk =
                        StreamChunk::with_visibility(ops.clone(), chunk.columns().into(), vis_map);
                    if new_stream_chunk.cardinality() > 0 {
                        event!(
                            tracing::Level::TRACE,
                            msg = "chunk",
                            downstream = %output.actor_id(),
                            "send = \n{:#?}",
                            new_stream_chunk
                        );
                        output
                            .send(DispatcherMessageBatch::Chunk(new_stream_chunk))
                            .await?;
                    }
                    StreamResult::Ok(())
                }),
        )
        .await?;

        Ok(())
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
            .count();
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn dispatcher_id_str(&self) -> &str {
        &self.dispatcher_id_str
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}

/// `BroadcastDispatcher` dispatches messages to all outputs.
#[derive(Debug)]
pub struct BroadcastDispatcher {
    outputs: HashMap<ActorId, Output>,
    output_mapping: DispatchOutputMapping,
    dispatcher_id: DispatcherId,
    dispatcher_id_str: String,
}

impl BroadcastDispatcher {
    pub fn new(
        outputs: impl IntoIterator<Item = Output>,
        output_mapping: DispatchOutputMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            outputs: Self::into_pairs(outputs).collect(),
            output_mapping,
            dispatcher_id,
            dispatcher_id_str: dispatcher_id.to_string(),
        }
    }

    fn into_pairs(
        outputs: impl IntoIterator<Item = Output>,
    ) -> impl Iterator<Item = (ActorId, Output)> {
        outputs
            .into_iter()
            .map(|output| (output.actor_id(), output))
    }
}

impl Dispatcher for BroadcastDispatcher {
    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let chunk = self.output_mapping.apply(chunk);
        broadcast_concurrent(
            self.outputs.values_mut(),
            DispatcherMessageBatch::Chunk(chunk),
        )
        .await
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // always broadcast barrier
        broadcast_concurrent(
            self.outputs.values_mut(),
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            // always broadcast watermark
            broadcast_concurrent(
                self.outputs.values_mut(),
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(Self::into_pairs(outputs));
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(|actor_id, _| actor_ids.contains(actor_id))
            .count();
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn dispatcher_id_str(&self) -> &str {
        &self.dispatcher_id_str
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}

/// `SimpleDispatcher` dispatches messages to a single output.
#[derive(Debug)]
pub struct SimpleDispatcher {
    /// In most cases, there is exactly one output. However, during some configuration changes,
    /// this field may temporarily hold 0 or 2 outputs.
    ///
    /// - When dropping a materialized view, the output will be removed and this field becomes
    ///   empty. The [`DispatchExecutor`] will immediately clean up this empty dispatcher before
    ///   finishing processing the current mutation.
    /// - When migrating a singleton fragment, the new output will be temporarily added in the
    ///   `pre` stage, so this field holds two outputs for broadcasting the configuration-change
    ///   barrier to both the old and new downstream actors. In the `post` stage, the old output
    ///   will be removed and this field becomes single again.
    ///
    /// Therefore, when dispatching data, we assert that there's exactly one output by
    /// `Self::output`.
    output: SmallVec<[Output; 2]>,
    output_mapping: DispatchOutputMapping,
    dispatcher_id: DispatcherId,
    dispatcher_id_str: String,
}

impl SimpleDispatcher {
    pub fn new(
        output: Output,
        output_mapping: DispatchOutputMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            output: smallvec![output],
            output_mapping,
            dispatcher_id,
            dispatcher_id_str: dispatcher_id.to_string(),
        }
    }
}

impl Dispatcher for SimpleDispatcher {
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.output.extend(outputs);
        assert!(self.output.len() <= 2);
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // Only barriers are allowed to be dispatched to multiple outputs during migration.
        for output in &mut self.output {
            output
                .send(DispatcherMessageBatch::BarrierBatch(barriers.clone()))
                .await?;
        }
        Ok(())
    }

    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let output = self
            .output
            .iter_mut()
            .exactly_one()
            .expect("expect exactly one output");

        let chunk = self.output_mapping.apply(chunk);
        output.send(DispatcherMessageBatch::Chunk(chunk)).await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        let output = self
            .output
            .iter_mut()
            .exactly_one()
            .expect("expect exactly one output");

        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            output
                .send(DispatcherMessageBatch::Watermark(watermark))
                .await?;
        }
        Ok(())
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.output
            .retain(|output| !actor_ids.contains(&output.actor_id()));
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn dispatcher_id_str(&self) -> &str {
        &self.dispatcher_id_str
    }

    fn is_empty(&self) -> bool {
        self.output.is_empty()
    }
}

#[cfg(test)]
mod tests {
    use std::hash::{BuildHasher, Hasher};

    use futures::pin_mut;
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::array::{Array, ArrayBuilder, I32ArrayBuilder};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::hash_util::Crc32FastBuilder;
    use risingwave_pb::stream_plan::{DispatcherType, PbDispatchOutputMapping};
    use tokio::sync::mpsc::unbounded_channel;

    use super::*;
    use crate::executor::exchange::output::Output;
    use crate::executor::exchange::permit::channel_for_test;
    use crate::executor::receiver::ReceiverExecutor;
    use crate::executor::{BarrierInner as Barrier, MessageInner as Message};
    use crate::task::barrier_test_utils::LocalBarrierTestEnv;

    #[tokio::test]
    async fn test_hash_dispatcher_complex() {
        // This test only works when vnode count is 256.
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let num_outputs = 2; // actor id ranges from 1 to 2
        let key_indices = &[0, 2];
        let (output_tx_vecs, mut output_rx_vecs): (Vec<_>, Vec<_>) =
            (0..num_outputs).map(|_| channel_for_test()).collect();
        let outputs = output_tx_vecs
            .into_iter()
            .enumerate()
            .map(|(actor_id, tx)| Output::new(ActorId::new(actor_id as u32 + 1), tx))
            .collect::<Vec<_>>();
        let mut hash_mapping = (1..num_outputs + 1)
            .flat_map(|id| vec![ActorId::new(id as u32); VirtualNode::COUNT_FOR_TEST / num_outputs])
            .collect_vec();
        hash_mapping.resize(
            VirtualNode::COUNT_FOR_TEST,
            ActorId::new(num_outputs as u32),
        );
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            key_indices.to_vec(),
            DispatchOutputMapping::Simple(vec![0, 1, 2]),
            hash_mapping,
            0,
        );

        let chunk = StreamChunk::from_pretty(
            "  I I I
            +  4 6 8
            +  5 7 9
            +  0 0 0
            -  1 1 1 D
            U- 2 0 2
            U+ 2 0 2
            U- 3 3 2
            U+ 3 3 4",
        );
        hash_dispatcher.dispatch_data(chunk).await.unwrap();

        assert_eq!(
            *output_rx_vecs[0].recv().await.unwrap().as_chunk().unwrap(),
            StreamChunk::from_pretty(
                "  I I I
                +  4 6 8
                +  5 7 9
                +  0 0 0
                -  1 1 1 D
                U- 2 0 2
                U+ 2 0 2
                -  3 3 2 D  // Should rewrite UpdateDelete to Delete
                +  3 3 4    // Should rewrite UpdateInsert to Insert",
            )
        );
        assert_eq!(
            *output_rx_vecs[1].recv().await.unwrap().as_chunk().unwrap(),
            StreamChunk::from_pretty(
                "  I I I
                +  4 6 8 D
                +  5 7 9 D
                +  0 0 0 D
                -  1 1 1 D  // Should keep original invisible mark
                U- 2 0 2 D  // Should keep UpdateDelete
                U+ 2 0 2 D  // Should keep UpdateInsert
                -  3 3 2    // Should rewrite UpdateDelete to Delete
                +  3 3 4 D  // Should rewrite UpdateInsert to Insert",
            )
        );
    }

    #[tokio::test]
    async fn test_configuration_change() {
        let _schema = Schema { fields: vec![] };
        let (tx, rx) = channel_for_test();
        let actor_id = 233.into();
        let fragment_id = 666.into();
        let barrier_test_env = LocalBarrierTestEnv::for_test().await;
        let metrics = Arc::new(StreamingMetrics::unused());

        let (untouched, old, new) = (234.into(), 235.into(), 238.into()); // broadcast downstream actors
        let (old_simple, new_simple) = (114.into(), 514.into()); // simple downstream actors

        // actor_id -> untouched, old, new, old_simple, new_simple

        let broadcast_dispatcher_id = 666;
        let broadcast_dispatcher = PbDispatcher {
            r#type: DispatcherType::Broadcast as _,
            dispatcher_id: broadcast_dispatcher_id,
            downstream_actor_id: vec![untouched, old],
            output_mapping: PbDispatchOutputMapping::identical(0).into(), /* dummy length as it's not used */
            ..Default::default()
        };

        let simple_dispatcher_id = 888;
        let simple_dispatcher = PbDispatcher {
            r#type: DispatcherType::Simple as _,
            dispatcher_id: simple_dispatcher_id,
            downstream_actor_id: vec![old_simple],
            output_mapping: PbDispatchOutputMapping::identical(0).into(), /* dummy length as it's not used */
            ..Default::default()
        };

        let dispatcher_updates = maplit::hashmap! {
            actor_id => vec![PbDispatcherUpdate {
                actor_id,
                dispatcher_id: broadcast_dispatcher_id,
                added_downstream_actor_id: vec![new],
                removed_downstream_actor_id: vec![old],
                hash_mapping: Default::default(),
            }]
        };
        let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: dispatcher_updates,
                ..Default::default()
            },
        ));
        barrier_test_env.inject_barrier(&b1, [actor_id]);
        barrier_test_env.flush_all_events().await;

        let input = Executor::new(
            Default::default(),
            ReceiverExecutor::for_test(
                actor_id,
                rx,
                barrier_test_env.local_barrier_manager.clone(),
            )
            .boxed(),
        );

        let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
        let mut rxs = [untouched, old, new, old_simple, new_simple]
            .into_iter()
            .map(|id| {
                (id, {
                    let (tx, rx) = channel_for_test();
                    new_output_request_tx
                        .send((id, NewOutputRequest::Local(tx)))
                        .unwrap();
                    rx
                })
            })
            .collect::<HashMap<_, _>>();
        let executor = Box::new(
            DispatchExecutor::new(
                input,
                new_output_request_rx,
                vec![broadcast_dispatcher, simple_dispatcher],
                actor_id,
                fragment_id,
                barrier_test_env.local_barrier_manager.clone(),
                metrics,
            )
            .await
            .unwrap(),
        )
        .execute();

        pin_mut!(executor);

        macro_rules! try_recv {
            ($down_id:expr) => {
                rxs.get_mut(&$down_id).unwrap().try_recv()
            };
        }

        // 3. Send a chunk.
        tx.send(Message::Chunk(StreamChunk::default()).into())
            .await
            .unwrap();

        tx.send(Message::Barrier(b1.clone().into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 5. Check downstream.
        try_recv!(untouched).unwrap().as_chunk().unwrap();
        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();

        try_recv!(old).unwrap().as_chunk().unwrap();
        try_recv!(old).unwrap().as_barrier_batch().unwrap(); // It should still receive the barrier even if it's to be removed.

        try_recv!(new).unwrap().as_barrier_batch().unwrap(); // Since it's just added, it won't receive the chunk.

        try_recv!(old_simple).unwrap().as_chunk().unwrap();
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // Untouched.

        // 6. Send another barrier.
        let b2 = Barrier::new_test_barrier(test_epoch(2));
        barrier_test_env.inject_barrier(&b2, [actor_id]);
        tx.send(Message::Barrier(b2.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 7. Check downstream.
        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();
        try_recv!(old).unwrap_err(); // Since it's stopped, we can't receive the new messages.
        try_recv!(new).unwrap().as_barrier_batch().unwrap();

        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // Untouched.
        try_recv!(new_simple).unwrap_err(); // Untouched.

        // 8. Send another chunk.
        tx.send(Message::Chunk(StreamChunk::default()).into())
            .await
            .unwrap();

        // 9. Send a configuration change barrier for simple dispatcher.
        let dispatcher_updates = maplit::hashmap! {
            actor_id => vec![PbDispatcherUpdate {
                actor_id,
                dispatcher_id: simple_dispatcher_id,
                added_downstream_actor_id: vec![new_simple],
                removed_downstream_actor_id: vec![old_simple],
                hash_mapping: Default::default(),
            }]
        };
        let b3 = Barrier::new_test_barrier(test_epoch(3)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: dispatcher_updates,
                ..Default::default()
            },
        ));
        barrier_test_env.inject_barrier(&b3, [actor_id]);
        tx.send(Message::Barrier(b3.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 10. Check downstream.
        try_recv!(old_simple).unwrap().as_chunk().unwrap();
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // It should still receive the barrier even if it's to be removed.

        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap(); // Since it's just added, it won't receive the chunk.

        // 11. Send another barrier.
        let b4 = Barrier::new_test_barrier(test_epoch(4));
        barrier_test_env.inject_barrier(&b4, [actor_id]);
        tx.send(Message::Barrier(b4.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 12. Check downstream.
        try_recv!(old_simple).unwrap_err(); // Since it's stopped, we can't receive the new messages.
        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap();
    }

    #[tokio::test]
    async fn test_hash_dispatcher() {
        // This test only works when vnode count is 256.
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let num_outputs = 5; // actor id ranges from 1 to 5
        let cardinality = 10;
        let dimension = 4;
        let key_indices = &[0, 2];
        let (output_tx_vecs, output_rx_vecs): (Vec<_>, Vec<_>) =
            (0..num_outputs).map(|_| channel_for_test()).collect();
        let outputs = output_tx_vecs
            .into_iter()
            .enumerate()
            .map(|(actor_id, tx)| Output::new(ActorId::new(1 + actor_id as u32), tx))
            .collect::<Vec<_>>();
        let mut hash_mapping = (1..num_outputs + 1)
            .flat_map(|id| vec![ActorId::new(id as _); VirtualNode::COUNT_FOR_TEST / num_outputs])
            .collect_vec();
        hash_mapping.resize(
            VirtualNode::COUNT_FOR_TEST,
            ActorId::new(num_outputs as u32),
        );
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            key_indices.to_vec(),
            DispatchOutputMapping::Simple((0..dimension).collect()),
            hash_mapping.clone(),
            0,
        );

        let mut ops = Vec::new();
        for idx in 0..cardinality {
            if idx % 2 == 0 {
                ops.push(Op::Insert);
            } else {
                ops.push(Op::Delete);
            }
        }

        let mut start = 19260817i32..;
        let mut builders = (0..dimension)
            .map(|_| I32ArrayBuilder::new(cardinality))
            .collect_vec();
        let mut output_cols = vec![vec![vec![]; dimension]; num_outputs];
        let mut output_ops = vec![vec![]; num_outputs];
        for op in &ops {
            let hash_builder = Crc32FastBuilder;
            let mut hasher = hash_builder.build_hasher();
            let one_row = (0..dimension).map(|_| start.next().unwrap()).collect_vec();
            for key_idx in key_indices {
                let val = one_row[*key_idx];
                let bytes = val.to_le_bytes();
                hasher.update(&bytes);
            }
            let output_idx = hash_mapping[hasher.finish() as usize % VirtualNode::COUNT_FOR_TEST]
                .as_raw_id() as usize
                - 1;
            for (builder, val) in builders.iter_mut().zip_eq_fast(one_row.iter()) {
                builder.append(Some(*val));
            }
            output_cols[output_idx]
                .iter_mut()
                .zip_eq_fast(one_row.iter())
                .for_each(|(each_column, val)| each_column.push(*val));
            output_ops[output_idx].push(op);
        }

        let columns = builders
            .into_iter()
            .map(|builder| {
                let array = builder.finish();
                array.into_ref()
            })
            .collect();

        let chunk = StreamChunk::new(ops, columns);
        hash_dispatcher.dispatch_data(chunk).await.unwrap();

        for (output_idx, mut rx) in output_rx_vecs.into_iter().enumerate() {
            let mut output = vec![];
            while let Some(Some(msg)) = rx.recv().now_or_never() {
                output.push(msg);
            }
            // It is possible that there are no chunks, as a key may not belong to any hash
            // bucket of this output.
            assert!(output.len() <= 1);
            if output.is_empty() {
                assert!(output_cols[output_idx].iter().all(|x| { x.is_empty() }));
            } else {
                let message = output.first().unwrap();
                let real_chunk = match message {
                    DispatcherMessageBatch::Chunk(chunk) => chunk,
                    _ => panic!(),
                };
                real_chunk
                    .columns()
                    .iter()
                    .zip_eq_fast(output_cols[output_idx].iter())
                    .for_each(|(real_col, expect_col)| {
                        let real_vals = real_chunk
                            .visibility()
                            .iter_ones()
                            .map(|row_idx| real_col.as_int32().value_at(row_idx).unwrap())
                            .collect::<Vec<_>>();
                        assert_eq!(real_vals.len(), expect_col.len());
                        assert_eq!(real_vals, *expect_col);
                    });
            }
        }
    }
}