risingwave_stream/executor/dispatch.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::iter::repeat_with;
use std::ops::{Deref, DerefMut};
use std::time::Duration;

use anyhow::anyhow;
use futures::{FutureExt, TryStreamExt};
use itertools::Itertools;
use risingwave_common::array::Op;
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::{ActorMapping, ExpandedActorMapping, VirtualNode};
use risingwave_common::metrics::LabelGuardedIntCounter;
use risingwave_common::row::RowExt;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
use risingwave_pb::stream_plan::{self, PbDispatcher};
use smallvec::{SmallVec, smallvec};
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::time::Instant;
use tokio_stream::StreamExt;
use tokio_stream::adapters::Peekable;
use tracing::{Instrument, event};

use super::exchange::output::Output;
use super::{
    AddMutation, DispatcherBarriers, DispatcherMessageBatch, MessageBatch, TroublemakerExecutor,
    UpdateMutation,
};
use crate::executor::prelude::*;
use crate::executor::{StopMutation, StreamConsumer};
use crate::task::{DispatcherId, NewOutputRequest};

mod output_mapping;
pub use output_mapping::DispatchOutputMapping;
use risingwave_common::id::FragmentId;

/// [`DispatchExecutor`] consumes messages and sends them to downstream actors. Usually,
/// data chunks are dispatched according to some specified policy, while control messages
/// such as barriers are broadcast to all receivers.
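///
/// A rough sketch of the message flow (illustrative only):
///
/// ```text
/// input --Chunk-----> DispatchExecutor --(policy: hash/broadcast/simple/...)--> some outputs
/// input --Barrier---> DispatchExecutor --(always broadcast)------------------> all outputs
/// ```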
pub struct DispatchExecutor {
    input: Executor,
    inner: DispatchExecutorInner,
}

struct DispatcherWithMetrics {
    dispatcher: DispatcherImpl,
    pub actor_output_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}

impl Debug for DispatcherWithMetrics {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        self.dispatcher.fmt(f)
    }
}

impl Deref for DispatcherWithMetrics {
    type Target = DispatcherImpl;

    fn deref(&self) -> &Self::Target {
        &self.dispatcher
    }
}

impl DerefMut for DispatcherWithMetrics {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.dispatcher
    }
}

struct DispatchExecutorMetrics {
    actor_id_str: String,
    fragment_id_str: String,
    metrics: Arc<StreamingMetrics>,
    actor_out_record_cnt: LabelGuardedIntCounter,
}

impl DispatchExecutorMetrics {
    fn monitor_dispatcher(&self, dispatcher: DispatcherImpl) -> DispatcherWithMetrics {
        DispatcherWithMetrics {
            actor_output_buffer_blocking_duration_ns: self
                .metrics
                .actor_output_buffer_blocking_duration_ns
                .with_guarded_label_values(&[
                    self.actor_id_str.as_str(),
                    self.fragment_id_str.as_str(),
                    dispatcher.dispatcher_id_str(),
                ]),
            dispatcher,
        }
    }
}

struct DispatchExecutorInner {
    dispatchers: Vec<DispatcherWithMetrics>,
    actor_id: ActorId,
    actor_config: Arc<StreamingConfig>,
    metrics: DispatchExecutorMetrics,
    new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    pending_new_output_requests: HashMap<ActorId, NewOutputRequest>,
}

impl DispatchExecutorInner {
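    /// Resolves one [`Output`] for each downstream actor, taking requests already buffered in
    /// `pending_new_output_requests` first and otherwise awaiting `new_output_request_rx`.
    /// Requests arriving for other actors in the meantime are buffered for later calls.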
    async fn collect_outputs(
        &mut self,
        downstream_actors: &[ActorId],
    ) -> StreamResult<Vec<Output>> {
        fn resolve_output(downstream_actor: ActorId, request: NewOutputRequest) -> Output {
            let tx = match request {
                NewOutputRequest::Local(tx) | NewOutputRequest::Remote(tx) => tx,
            };
            Output::new(downstream_actor, tx)
        }
        let mut outputs = Vec::with_capacity(downstream_actors.len());
        for &downstream_actor in downstream_actors {
            let output =
                if let Some(request) = self.pending_new_output_requests.remove(&downstream_actor) {
                    resolve_output(downstream_actor, request)
                } else {
                    loop {
                        let (requested_actor, request) = self
                            .new_output_request_rx
                            .recv()
                            .await
                            .ok_or_else(|| anyhow!("end of new output request"))?;
                        if requested_actor == downstream_actor {
                            break resolve_output(requested_actor, request);
                        } else {
                            assert!(
                                self.pending_new_output_requests
                                    .insert(requested_actor, request)
                                    .is_none(),
                                "duplicated inflight new output requests from actor {}",
                                requested_actor
                            );
                        }
                    }
                };
            outputs.push(output);
        }
        Ok(outputs)
    }

    async fn dispatch(&mut self, msg: MessageBatch) -> StreamResult<()> {
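        // `await_with_metrics!` awaits `$fut` while flushing the elapsed blocked time into
        // `$metrics` every 15 seconds, so that long waits on backpressured outputs show up
        // in the metrics promptly instead of only after the send completes.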
        macro_rules! await_with_metrics {
            ($fut:expr, $metrics:expr) => {{
                let mut start_time = Instant::now();
                let interval_duration = Duration::from_secs(15);
                let mut interval =
                    tokio::time::interval_at(start_time + interval_duration, interval_duration);

                let mut fut = std::pin::pin!($fut);

                loop {
                    tokio::select! {
                        biased;
                        res = &mut fut => {
                            res?;
                            let ns = start_time.elapsed().as_nanos() as u64;
                            $metrics.inc_by(ns);
                            break;
                        }
                        _ = interval.tick() => {
                            start_time = Instant::now();
                            $metrics.inc_by(interval_duration.as_nanos() as u64);
                        }
                    };
                }
                StreamResult::Ok(())
            }};
        }

        let limit = self.actor_config.developer.exchange_concurrent_dispatchers;
        // Only barriers can be batched for now.
        match msg {
            MessageBatch::BarrierBatch(barrier_batch) => {
                if barrier_batch.is_empty() {
                    return Ok(());
                }
                // Only the first barrier in a batch can carry a mutation, as guaranteed by
                // `try_batch_barriers`.
                let mutation = barrier_batch[0].mutation.clone();
                self.pre_mutate_dispatchers(&mutation).await?;
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
                        let dispatcher_output = &mut dispatcher.dispatcher;
                        let fut = dispatcher_output.dispatch_barriers(
                            barrier_batch
                                .iter()
                                .cloned()
                                .map(|b| b.into_dispatcher())
                                .collect(),
                        );
                        await_with_metrics!(std::pin::pin!(fut), metrics)
                    })
                    .await?;
                self.post_mutate_dispatchers(&mutation)?;
            }
            MessageBatch::Watermark(watermark) => {
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
                        let dispatcher_output = &mut dispatcher.dispatcher;
                        let fut = dispatcher_output.dispatch_watermark(watermark.clone());
                        await_with_metrics!(std::pin::pin!(fut), metrics)
                    })
                    .await?;
            }
            MessageBatch::Chunk(chunk) => {
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
                        let dispatcher_output = &mut dispatcher.dispatcher;
                        let fut = dispatcher_output.dispatch_data(chunk.clone());
                        await_with_metrics!(std::pin::pin!(fut), metrics)
                    })
                    .await?;

                self.metrics
                    .actor_out_record_cnt
                    .inc_by(chunk.cardinality() as _);
            }
        }
        Ok(())
    }

    /// Add new dispatchers to the executor, checking that their ids remain unique.
    async fn add_dispatchers<'a>(
        &mut self,
        new_dispatchers: impl IntoIterator<Item = &'a PbDispatcher>,
    ) -> StreamResult<()> {
        for dispatcher in new_dispatchers {
            let outputs = self
                .collect_outputs(&dispatcher.downstream_actor_id)
                .await?;
            let dispatcher = DispatcherImpl::new(outputs, dispatcher)?;
            let dispatcher = self.metrics.monitor_dispatcher(dispatcher);
            self.dispatchers.push(dispatcher);
        }

        assert!(
            self.dispatchers
                .iter()
                .map(|d| d.dispatcher_id())
                .all_unique(),
            "dispatcher ids must be unique: {:?}",
            self.dispatchers
        );

        Ok(())
    }

    fn find_dispatcher(&mut self, dispatcher_id: DispatcherId) -> &mut DispatcherImpl {
        self.dispatchers
            .iter_mut()
            .find(|d| d.dispatcher_id() == dispatcher_id)
            .unwrap_or_else(|| panic!("dispatcher {}:{} not found", self.actor_id, dispatcher_id))
    }

    /// Update the dispatcher BEFORE we actually dispatch this barrier. We'll only add the new
    /// outputs.
    async fn pre_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
        let outputs = self
            .collect_outputs(&update.added_downstream_actor_id)
            .await?;

        let dispatcher = self.find_dispatcher(update.dispatcher_id);
        dispatcher.add_outputs(outputs);

        Ok(())
    }

    /// Update the dispatcher AFTER we dispatch this barrier. We'll remove some outputs and finally
    /// update the hash mapping.
    fn post_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
        let ids = update.removed_downstream_actor_id.iter().copied().collect();

        let dispatcher = self.find_dispatcher(update.dispatcher_id);
        dispatcher.remove_outputs(&ids);

        // The hash mapping is only used by the hash dispatcher.
        //
        // We specify a single upstream hash mapping for scaling the downstream fragment. However,
        // it's possible that there are multiple upstreams with different exchange types, for
        // example, the `Broadcast` inner side of the dynamic filter. There are too many
        // combinations to handle here, so we just ignore the `hash_mapping` field for any other
        // exchange types.
        if let DispatcherImpl::Hash(dispatcher) = dispatcher {
            dispatcher.hash_mapping =
                ActorMapping::from_protobuf(update.get_hash_mapping()?).to_expanded();
        }

        Ok(())
    }

    /// For `Add` and `Update`, update the dispatchers before we dispatch the barrier.
    async fn pre_mutate_dispatchers(
        &mut self,
        mutation: &Option<Arc<Mutation>>,
    ) -> StreamResult<()> {
        let Some(mutation) = mutation.as_deref() else {
            return Ok(());
        };

        match mutation {
            Mutation::Add(AddMutation { adds, .. }) => {
                if let Some(new_dispatchers) = adds.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }
            }
            Mutation::Update(UpdateMutation {
                dispatchers,
                actor_new_dispatchers: actor_dispatchers,
                ..
            }) => {
                if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }

                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.pre_update_dispatcher(update).await?;
                    }
                }
            }
            _ => {}
        }

        Ok(())
    }

    /// For `Stop` and `Update`, update the dispatchers after we dispatch the barrier.
    fn post_mutate_dispatchers(&mut self, mutation: &Option<Arc<Mutation>>) -> StreamResult<()> {
        let Some(mutation) = mutation.as_deref() else {
            return Ok(());
        };

        match mutation {
            Mutation::Stop(StopMutation { dropped_actors, .. }) => {
                // Remove outputs only if this actor itself is not to be stopped.
                if !dropped_actors.contains(&self.actor_id) {
                    for dispatcher in &mut self.dispatchers {
                        dispatcher.remove_outputs(dropped_actors);
                    }
                }
            }
            Mutation::Update(UpdateMutation {
                dispatchers,
                dropped_actors,
                ..
            }) => {
                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.post_update_dispatcher(update)?;
                    }
                }

                if !dropped_actors.contains(&self.actor_id) {
                    for dispatcher in &mut self.dispatchers {
                        dispatcher.remove_outputs(dropped_actors);
                    }
                }
            }
            _ => {}
        };

        // After stopping the downstream mview, the outputs of some dispatchers might become
        // empty, in which case we should clean them up.
        self.dispatchers.retain(|d| !d.is_empty());

        Ok(())
    }
}

impl DispatchExecutor {
    pub(crate) async fn new(
        input: Executor,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
        dispatchers: Vec<stream_plan::Dispatcher>,
        actor_context: &ActorContextRef,
    ) -> StreamResult<Self> {
        let mut executor = Self::new_inner(
            input,
            new_output_request_rx,
            vec![],
            actor_context.id,
            actor_context.fragment_id,
            actor_context.config.clone(),
            actor_context.streaming_metrics.clone(),
        );
        let inner = &mut executor.inner;
        for dispatcher in dispatchers {
            let outputs = inner
                .collect_outputs(&dispatcher.downstream_actor_id)
                .await?;
            let dispatcher = DispatcherImpl::new(outputs, &dispatcher)?;
            let dispatcher = inner.metrics.monitor_dispatcher(dispatcher);
            inner.dispatchers.push(dispatcher);
        }
        Ok(executor)
    }

    #[cfg(test)]
    pub(crate) fn for_test(
        input: Executor,
        dispatchers: Vec<DispatcherImpl>,
        actor_id: ActorId,
        fragment_id: FragmentId,
        actor_config: Arc<StreamingConfig>,
        metrics: Arc<StreamingMetrics>,
    ) -> (
        Self,
        tokio::sync::mpsc::UnboundedSender<(ActorId, NewOutputRequest)>,
    ) {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

        (
            Self::new_inner(
                input,
                rx,
                dispatchers,
                actor_id,
                fragment_id,
                actor_config,
                metrics,
            ),
            tx,
        )
    }

    fn new_inner(
        mut input: Executor,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
        dispatchers: Vec<DispatcherImpl>,
        actor_id: ActorId,
        fragment_id: FragmentId,
        actor_config: Arc<StreamingConfig>,
        metrics: Arc<StreamingMetrics>,
    ) -> Self {
        if crate::consistency::insane() {
            // Make some trouble before dispatching to avoid generating invalid dist key.
            let mut info = input.info().clone();
            info.identity = format!("{} (embedded trouble)", info.identity);
            let troublemaker = TroublemakerExecutor::new(input, actor_config.developer.chunk_size);
            input = (info, troublemaker).into();
        }

        let actor_id_str = actor_id.to_string();
        let fragment_id_str = fragment_id.to_string();
        let actor_out_record_cnt = metrics
            .actor_out_record_cnt
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let metrics = DispatchExecutorMetrics {
            actor_id_str,
            fragment_id_str,
            metrics,
            actor_out_record_cnt,
        };
        let dispatchers = dispatchers
            .into_iter()
            .map(|dispatcher| metrics.monitor_dispatcher(dispatcher))
            .collect();

        Self {
            input,
            inner: DispatchExecutorInner {
                dispatchers,
                actor_id,
                actor_config,
                metrics,
                new_output_request_rx,
                pending_new_output_requests: Default::default(),
            },
        }
    }
}

impl StreamConsumer for DispatchExecutor {
    type BarrierStream = impl Stream<Item = StreamResult<Barrier>> + Send;

    fn execute(mut self: Box<Self>) -> Self::BarrierStream {
        let max_barrier_count_per_batch = self.inner.actor_config.developer.max_barrier_batch_size;
        #[try_stream]
        async move {
            let mut input = self.input.execute().peekable();
            loop {
                let Some(message) =
                    try_batch_barriers(max_barrier_count_per_batch, &mut input).await?
                else {
                    // end_of_stream
                    break;
                };
                match message {
                    chunk @ MessageBatch::Chunk(_) => {
                        self.inner
                            .dispatch(chunk)
                            .instrument(tracing::info_span!("dispatch_chunk"))
                            .instrument_await("dispatch_chunk")
                            .await?;
                    }
                    MessageBatch::BarrierBatch(barrier_batch) => {
                        assert!(!barrier_batch.is_empty());
                        self.inner
                            .dispatch(MessageBatch::BarrierBatch(barrier_batch.clone()))
                            .instrument(tracing::info_span!("dispatch_barrier_batch"))
                            .instrument_await("dispatch_barrier_batch")
                            .await?;
                        self.inner
                            .metrics
                            .metrics
                            .barrier_batch_size
                            .observe(barrier_batch.len() as f64);
                        for barrier in barrier_batch {
                            yield barrier;
                        }
                    }
                    watermark @ MessageBatch::Watermark(_) => {
                        self.inner
                            .dispatch(watermark)
                            .instrument(tracing::info_span!("dispatch_watermark"))
                            .instrument_await("dispatch_watermark")
                            .await?;
                    }
                }
            }
        }
    }
}

/// Tries to batch up to `max_barrier_count_per_batch` consecutive barriers within a single
/// message batch.
///
/// Returns the message batch, or `None` if the input stream has ended.
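///
/// Only consecutive barriers that carry no mutation are batched after the first one; batching
/// stops at the first non-barrier message, at a barrier with a mutation, or when the next
/// message is not immediately available.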
async fn try_batch_barriers(
    max_barrier_count_per_batch: u32,
    input: &mut Peekable<BoxedMessageStream>,
) -> StreamResult<Option<MessageBatch>> {
    let Some(msg) = input.next().await else {
        // end_of_stream
        return Ok(None);
    };
    let mut barrier_batch = vec![];
    let msg: Message = msg?;
    let max_peek_attempts = match msg {
        Message::Chunk(c) => {
            return Ok(Some(MessageBatch::Chunk(c)));
        }
        Message::Watermark(w) => {
            return Ok(Some(MessageBatch::Watermark(w)));
        }
        Message::Barrier(b) => {
            let peek_more_barrier = b.mutation.is_none();
            barrier_batch.push(b);
            if peek_more_barrier {
                max_barrier_count_per_batch.saturating_sub(1)
            } else {
                0
            }
        }
    };
    // Try to peek more consecutive non-mutation barriers.
    for _ in 0..max_peek_attempts {
        let peek = input.peek().now_or_never();
        let Some(peek) = peek else {
            break;
        };
        let Some(msg) = peek else {
            // end_of_stream
            break;
        };
        let Ok(Message::Barrier(barrier)) = msg else {
            break;
        };
        if barrier.mutation.is_some() {
            break;
        }
        let msg: Message = input.next().now_or_never().unwrap().unwrap()?;
        let Message::Barrier(ref barrier) = msg else {
            unreachable!("must be a barrier");
        };
        barrier_batch.push(barrier.clone());
    }
    Ok(Some(MessageBatch::BarrierBatch(barrier_batch)))
}

#[derive(Debug)]
pub enum DispatcherImpl {
    Hash(HashDataDispatcher),
    Broadcast(BroadcastDispatcher),
    Simple(SimpleDispatcher),
    RoundRobin(RoundRobinDataDispatcher),
}

impl DispatcherImpl {
    pub fn new(outputs: Vec<Output>, dispatcher: &PbDispatcher) -> StreamResult<Self> {
        let output_mapping =
            DispatchOutputMapping::from_protobuf(dispatcher.output_mapping.clone().unwrap());

        use risingwave_pb::stream_plan::DispatcherType::*;
        let dispatcher_impl = match dispatcher.get_type()? {
            Hash => {
                assert!(!outputs.is_empty());
                let dist_key_indices = dispatcher
                    .dist_key_indices
                    .iter()
                    .map(|i| *i as usize)
                    .collect();

                let hash_mapping =
                    ActorMapping::from_protobuf(dispatcher.get_hash_mapping()?).to_expanded();

                DispatcherImpl::Hash(HashDataDispatcher::new(
                    outputs,
                    dist_key_indices,
                    output_mapping,
                    hash_mapping,
                    dispatcher.dispatcher_id,
                ))
            }
            Broadcast => DispatcherImpl::Broadcast(BroadcastDispatcher::new(
                outputs,
                output_mapping,
                dispatcher.dispatcher_id,
            )),
            Simple | NoShuffle => {
                let [output]: [_; 1] = outputs.try_into().unwrap();
                DispatcherImpl::Simple(SimpleDispatcher::new(
                    output,
                    output_mapping,
                    dispatcher.dispatcher_id,
                ))
            }
            Unspecified => unreachable!(),
        };

        Ok(dispatcher_impl)
    }
}

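// Delegates each method of `DispatcherImpl` to the implementation of the matching variant.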
macro_rules! impl_dispatcher {
    ($( { $variant_name:ident } ),*) => {
        impl DispatcherImpl {
            pub async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_data(chunk).await, )*
                }
            }

            pub async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_barriers(barriers).await, )*
                }
            }

            pub async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_watermark(watermark).await, )*
                }
            }

            pub fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
                match self {
                    $(Self::$variant_name(inner) => inner.add_outputs(outputs), )*
                }
            }

            pub fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
                match self {
                    $(Self::$variant_name(inner) => inner.remove_outputs(actor_ids), )*
                }
            }

            pub fn dispatcher_id(&self) -> DispatcherId {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id(), )*
                }
            }

            pub fn dispatcher_id_str(&self) -> &str {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id_str(), )*
                }
            }

            pub fn is_empty(&self) -> bool {
                match self {
                    $(Self::$variant_name(inner) => inner.is_empty(), )*
                }
            }
        }
    }
}

macro_rules! for_all_dispatcher_variants {
    ($macro:ident) => {
        $macro! {
            { Hash },
            { Broadcast },
            { Simple },
            { RoundRobin }
        }
    };
}

for_all_dispatcher_variants! { impl_dispatcher }

pub trait DispatchFuture<'a> = Future<Output = StreamResult<()>> + Send;

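/// The common interface of all dispatcher implementations: data chunks are routed according to
/// the policy of the concrete dispatcher, while barriers and watermarks are generally broadcast
/// to all outputs.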
pub trait Dispatcher: Debug + 'static {
    /// Dispatch a data chunk to downstream actors.
    fn dispatch_data(&mut self, chunk: StreamChunk) -> impl DispatchFuture<'_>;
    /// Dispatch barriers to downstream actors, generally by broadcasting them.
    fn dispatch_barriers(&mut self, barrier: DispatcherBarriers) -> impl DispatchFuture<'_>;
    /// Dispatch a watermark to downstream actors, generally by broadcasting it.
    fn dispatch_watermark(&mut self, watermark: Watermark) -> impl DispatchFuture<'_>;

    /// Add new outputs to the dispatcher.
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>);
    /// Remove the outputs to actors listed in `actor_ids` from the dispatcher.
    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>);

    /// The ID of the dispatcher. A [`DispatchExecutor`] may have multiple dispatchers with
    /// different IDs.
    ///
    /// Note that the dispatcher id is always equal to the downstream fragment id.
    /// See also `proto/stream_plan.proto`.
    fn dispatcher_id(&self) -> DispatcherId;

    /// Dispatcher id in string. See [`Dispatcher::dispatcher_id`].
    fn dispatcher_id_str(&self) -> &str;

    /// Whether the dispatcher has no outputs. If so, it'll be cleaned up from the
    /// [`DispatchExecutor`].
    fn is_empty(&self) -> bool;
}

/// Concurrently broadcast a message to all outputs.
///
/// Note that this does not follow `concurrent_dispatchers` in the config and the concurrency is
/// always unlimited.
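///
/// This is used for messages that must reach every output, e.g., barriers and watermarks.
/// An illustrative call site, as used by the dispatchers below:
///
/// ```ignore
/// broadcast_concurrent(&mut self.outputs, DispatcherMessageBatch::BarrierBatch(barriers)).await
/// ```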
async fn broadcast_concurrent(
    outputs: impl IntoIterator<Item = &'_ mut Output>,
    message: DispatcherMessageBatch,
) -> StreamResult<()> {
    futures::future::try_join_all(
        outputs
            .into_iter()
            .map(|output| output.send(message.clone())),
    )
    .await?;
    Ok(())
}

#[derive(Debug)]
pub struct RoundRobinDataDispatcher {
    outputs: Vec<Output>,
    output_mapping: DispatchOutputMapping,
    cur: usize,
    dispatcher_id: DispatcherId,
    dispatcher_id_str: String,
}

impl RoundRobinDataDispatcher {
    pub fn new(
        outputs: Vec<Output>,
        output_mapping: DispatchOutputMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            outputs,
            output_mapping,
            cur: 0,
            dispatcher_id,
            dispatcher_id_str: dispatcher_id.to_string(),
        }
    }
}

impl Dispatcher for RoundRobinDataDispatcher {
    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let chunk = self.output_mapping.apply(chunk);

        self.outputs[self.cur]
            .send(DispatcherMessageBatch::Chunk(chunk))
            .await?;
        self.cur += 1;
        self.cur %= self.outputs.len();
        Ok(())
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // always broadcast barrier
        broadcast_concurrent(
            &mut self.outputs,
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            // always broadcast watermark
            broadcast_concurrent(
                &mut self.outputs,
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(outputs);
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
            .count();
        // Clamp the cursor; `saturating_sub` avoids underflow when all outputs were removed.
        self.cur = self.cur.min(self.outputs.len().saturating_sub(1));
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn dispatcher_id_str(&self) -> &str {
        &self.dispatcher_id_str
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}

pub struct HashDataDispatcher {
    outputs: Vec<Output>,
    keys: Vec<usize>,
    output_mapping: DispatchOutputMapping,
    /// Mapping from virtual node to actor id, used for hash data dispatcher to dispatch tasks to
    /// different downstream actors.
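    ///
    /// For example (illustrative only), with 4 vnodes and 2 downstream actors, the expanded
    /// mapping could look like `[actor_1, actor_1, actor_2, actor_2]`, indexed by
    /// `VirtualNode::to_index()`.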
    hash_mapping: ExpandedActorMapping,
    dispatcher_id: DispatcherId,
    dispatcher_id_str: String,
}

impl Debug for HashDataDispatcher {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HashDataDispatcher")
            .field("outputs", &self.outputs)
            .field("keys", &self.keys)
            .field("dispatcher_id", &self.dispatcher_id)
            .finish_non_exhaustive()
    }
}

impl HashDataDispatcher {
    pub fn new(
        outputs: Vec<Output>,
        keys: Vec<usize>,
        output_mapping: DispatchOutputMapping,
        hash_mapping: ExpandedActorMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            outputs,
            keys,
            output_mapping,
            hash_mapping,
            dispatcher_id,
            dispatcher_id_str: dispatcher_id.to_string(),
        }
    }
}

impl Dispatcher for HashDataDispatcher {
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(outputs);
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // always broadcast barrier
        broadcast_concurrent(
            &mut self.outputs,
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            // always broadcast watermark
            broadcast_concurrent(
                &mut self.outputs,
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        // A chunk can be shuffled into multiple output chunks to be sent to downstream actors.
        // Among these output chunks, the only difference is the visibility map, which is
        // calculated from the hash value of each row in the input chunk.
        let num_outputs = self.outputs.len();

        // Get the vnode of every row by hashing its distribution key.
        let vnode_count = self.hash_mapping.len();
        let vnodes = VirtualNode::compute_chunk(chunk.data_chunk(), &self.keys, vnode_count);

        tracing::debug!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty(), self.keys, vnodes);

        let mut vis_maps = repeat_with(|| BitmapBuilder::with_capacity(chunk.capacity()))
            .take(num_outputs)
            .collect_vec();
        let mut last_update_delete_row_idx = None;
        let mut new_ops: Vec<Op> = Vec::with_capacity(chunk.capacity());

        for (row_idx, ((vnode, &op), visible)) in vnodes
            .iter()
            .copied()
            .zip_eq_fast(chunk.ops())
            .zip_eq_fast(chunk.visibility().iter())
            .enumerate()
        {
            // Build visibility map for every output chunk.
            for (output, vis_map) in self.outputs.iter().zip_eq_fast(vis_maps.iter_mut()) {
                vis_map.append(visible && self.hash_mapping[vnode.to_index()] == output.actor_id());
            }

            if !visible {
                new_ops.push(op);
                continue;
            }

            // The `Update` message, noted by an `UpdateDelete` and a successive `UpdateInsert`,
            // needs to be rewritten to a common `Delete` and `Insert` if the distribution key
            // columns are changed, since the distribution key will eventually be part of the
            // stream key of the downstream executor, and there's an invariant that the stream
            // key must be the same for rows within an `Update` pair.
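            //
            // An illustrative example (not from the original source), with the distribution
            // key on column 0:
            //   U- (1, a), U+ (2, a)  =>  - (1, a), + (2, a)    // key changed: rewritten
            //   U- (1, a), U+ (1, b)  =>  U- (1, a), U+ (1, b)  // key unchanged: kept as-is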
            if op == Op::UpdateDelete {
                last_update_delete_row_idx = Some(row_idx);
            } else if op == Op::UpdateInsert {
                let delete_row_idx = last_update_delete_row_idx
                    .take()
                    .expect("missing U- before U+");
                assert!(delete_row_idx + 1 == row_idx, "U- and U+ are not adjacent");

                // Check if any distribution key column value changed
                let dist_key_changed = chunk.row_at(delete_row_idx).1.project(&self.keys)
                    != chunk.row_at(row_idx).1.project(&self.keys);

                if dist_key_changed {
                    new_ops.push(Op::Delete);
                    new_ops.push(Op::Insert);
                } else {
                    new_ops.push(Op::UpdateDelete);
                    new_ops.push(Op::UpdateInsert);
                }
            } else {
                new_ops.push(op);
            }
        }
        assert!(last_update_delete_row_idx.is_none(), "missing U+ after U-");

        let ops = new_ops;
        // Apply output mapping after calculating the vnode and new visibility maps.
        let chunk = self.output_mapping.apply(chunk);

        // Send each output chunk, with its own visibility map applied, to the corresponding
        // downstream actor.
        futures::future::try_join_all(
            vis_maps
                .into_iter()
                .zip_eq_fast(self.outputs.iter_mut())
                .map(|(vis_map, output)| async {
                    let vis_map = vis_map.finish();
                    // The columns are not changed in this function.
                    let new_stream_chunk =
                        StreamChunk::with_visibility(ops.clone(), chunk.columns().into(), vis_map);
                    if new_stream_chunk.cardinality() > 0 {
                        event!(
                            tracing::Level::TRACE,
                            msg = "chunk",
                            downstream = %output.actor_id(),
                            "send = \n{:#?}",
                            new_stream_chunk
                        );
                        output
                            .send(DispatcherMessageBatch::Chunk(new_stream_chunk))
                            .await?;
                    }
                    StreamResult::Ok(())
                }),
        )
        .await?;

        Ok(())
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
            .count();
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn dispatcher_id_str(&self) -> &str {
        &self.dispatcher_id_str
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}

/// `BroadcastDispatcher` dispatches messages to all outputs.
#[derive(Debug)]
pub struct BroadcastDispatcher {
    outputs: HashMap<ActorId, Output>,
    output_mapping: DispatchOutputMapping,
    dispatcher_id: DispatcherId,
    dispatcher_id_str: String,
}

impl BroadcastDispatcher {
    pub fn new(
        outputs: impl IntoIterator<Item = Output>,
        output_mapping: DispatchOutputMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            outputs: Self::into_pairs(outputs).collect(),
            output_mapping,
            dispatcher_id,
            dispatcher_id_str: dispatcher_id.to_string(),
        }
    }

    fn into_pairs(
        outputs: impl IntoIterator<Item = Output>,
    ) -> impl Iterator<Item = (ActorId, Output)> {
        outputs
            .into_iter()
            .map(|output| (output.actor_id(), output))
    }
}

impl Dispatcher for BroadcastDispatcher {
    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let chunk = self.output_mapping.apply(chunk);
        broadcast_concurrent(
            self.outputs.values_mut(),
            DispatcherMessageBatch::Chunk(chunk),
        )
        .await
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // always broadcast barrier
        broadcast_concurrent(
            self.outputs.values_mut(),
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            // always broadcast watermark
            broadcast_concurrent(
                self.outputs.values_mut(),
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(Self::into_pairs(outputs));
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(|actor_id, _| actor_ids.contains(actor_id))
            .count();
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn dispatcher_id_str(&self) -> &str {
        &self.dispatcher_id_str
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}

/// `SimpleDispatcher` dispatches messages to a single output.
#[derive(Debug)]
pub struct SimpleDispatcher {
    /// In most cases, there is exactly one output. However, during some configuration changes,
    /// this field may temporarily hold 0 or 2 outputs.
    ///
    /// - When dropping a materialized view, the output will be removed and this field becomes
    ///   empty. The [`DispatchExecutor`] will immediately clean up this empty dispatcher before
    ///   it finishes processing the current mutation.
    /// - When migrating a singleton fragment, the new output will be temporarily added in the
    ///   `pre` stage so that this field holds two outputs, allowing the configuration change
    ///   barrier to be broadcast to both the old and new downstream actors. In the `post` stage,
    ///   the old output will be removed and this field becomes single again.
    ///
    /// Therefore, when dispatching data, we assert that there's exactly one output via
    /// `Self::output`.
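    ///
    /// An illustrative timeline of the outputs during singleton migration:
    /// `[old]` -> (pre: add new) -> `[old, new]` -> (post: remove old) -> `[new]`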
    output: SmallVec<[Output; 2]>,
    output_mapping: DispatchOutputMapping,
    dispatcher_id: DispatcherId,
    dispatcher_id_str: String,
}

impl SimpleDispatcher {
    pub fn new(
        output: Output,
        output_mapping: DispatchOutputMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            output: smallvec![output],
            output_mapping,
            dispatcher_id,
            dispatcher_id_str: dispatcher_id.to_string(),
        }
    }
}

impl Dispatcher for SimpleDispatcher {
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.output.extend(outputs);
        assert!(self.output.len() <= 2);
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // Only barrier is allowed to be dispatched to multiple outputs during migration.
        for output in &mut self.output {
            output
                .send(DispatcherMessageBatch::BarrierBatch(barriers.clone()))
                .await?;
        }
        Ok(())
    }

    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let output = self
            .output
            .iter_mut()
            .exactly_one()
            .expect("expect exactly one output");

        let chunk = self.output_mapping.apply(chunk);
        output.send(DispatcherMessageBatch::Chunk(chunk)).await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        let output = self
            .output
            .iter_mut()
            .exactly_one()
            .expect("expect exactly one output");

        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            output
                .send(DispatcherMessageBatch::Watermark(watermark))
                .await?;
        }
        Ok(())
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.output
            .retain(|output| !actor_ids.contains(&output.actor_id()));
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn dispatcher_id_str(&self) -> &str {
        &self.dispatcher_id_str
    }

    fn is_empty(&self) -> bool {
        self.output.is_empty()
    }
}

#[cfg(test)]
mod tests {
    use std::hash::{BuildHasher, Hasher};

    use futures::pin_mut;
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::array::{Array, ArrayBuilder, I32ArrayBuilder};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::hash_util::Crc32FastBuilder;
    use risingwave_pb::stream_plan::{DispatcherType, PbDispatchOutputMapping};
    use tokio::sync::mpsc::unbounded_channel;

    use super::*;
    use crate::executor::exchange::output::Output;
    use crate::executor::exchange::permit::channel_for_test;
    use crate::executor::receiver::ReceiverExecutor;
    use crate::executor::{BarrierInner as Barrier, MessageInner as Message};
    use crate::task::barrier_test_utils::LocalBarrierTestEnv;

    #[tokio::test]
    async fn test_hash_dispatcher_complex() {
        // This test only works when vnode count is 256.
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let num_outputs = 2; // actor id ranges from 1 to 2
        let key_indices = &[0, 2];
        let (output_tx_vecs, mut output_rx_vecs): (Vec<_>, Vec<_>) =
            (0..num_outputs).map(|_| channel_for_test()).collect();
        let outputs = output_tx_vecs
            .into_iter()
            .enumerate()
            .map(|(actor_id, tx)| Output::new(ActorId::new(actor_id as u32 + 1), tx))
            .collect::<Vec<_>>();
        let mut hash_mapping = (1..num_outputs + 1)
            .flat_map(|id| vec![ActorId::new(id as u32); VirtualNode::COUNT_FOR_TEST / num_outputs])
            .collect_vec();
        hash_mapping.resize(
            VirtualNode::COUNT_FOR_TEST,
            ActorId::new(num_outputs as u32),
        );
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            key_indices.to_vec(),
            DispatchOutputMapping::Simple(vec![0, 1, 2]),
            hash_mapping,
            0,
        );

        let chunk = StreamChunk::from_pretty(
            "  I I I
            +  4 6 8
            +  5 7 9
            +  0 0 0
            -  1 1 1 D
            U- 2 0 2
            U+ 2 0 2
            U- 3 3 2
            U+ 3 3 4",
        );
        hash_dispatcher.dispatch_data(chunk).await.unwrap();

        assert_eq!(
            *output_rx_vecs[0].recv().await.unwrap().as_chunk().unwrap(),
            StreamChunk::from_pretty(
                "  I I I
                +  4 6 8
                +  5 7 9
                +  0 0 0
                -  1 1 1 D
                U- 2 0 2
                U+ 2 0 2
                -  3 3 2 D  // Should rewrite UpdateDelete to Delete
                +  3 3 4    // Should rewrite UpdateInsert to Insert",
            )
        );
        assert_eq!(
            *output_rx_vecs[1].recv().await.unwrap().as_chunk().unwrap(),
            StreamChunk::from_pretty(
                "  I I I
                +  4 6 8 D
                +  5 7 9 D
                +  0 0 0 D
                -  1 1 1 D  // Should keep original invisible mark
                U- 2 0 2 D  // Should keep UpdateDelete
                U+ 2 0 2 D  // Should keep UpdateInsert
                -  3 3 2    // Should rewrite UpdateDelete to Delete
                +  3 3 4 D  // Should rewrite UpdateInsert to Insert",
            )
        );
    }

    #[tokio::test]
    async fn test_configuration_change() {
        let _schema = Schema { fields: vec![] };
        let (tx, rx) = channel_for_test();
        let actor_id = 233.into();
        let barrier_test_env = LocalBarrierTestEnv::for_test().await;

        let (untouched, old, new) = (234.into(), 235.into(), 238.into()); // broadcast downstream actors
        let (old_simple, new_simple) = (114.into(), 514.into()); // simple downstream actors

        // actor_id -> untouched, old, new, old_simple, new_simple

        let broadcast_dispatcher_id = 666;
        let broadcast_dispatcher = PbDispatcher {
            r#type: DispatcherType::Broadcast as _,
            dispatcher_id: broadcast_dispatcher_id,
            downstream_actor_id: vec![untouched, old],
            output_mapping: PbDispatchOutputMapping::identical(0).into(), /* dummy length as it's not used */
            ..Default::default()
        };

        let simple_dispatcher_id = 888;
        let simple_dispatcher = PbDispatcher {
            r#type: DispatcherType::Simple as _,
            dispatcher_id: simple_dispatcher_id,
            downstream_actor_id: vec![old_simple],
            output_mapping: PbDispatchOutputMapping::identical(0).into(), /* dummy length as it's not used */
            ..Default::default()
        };

        let dispatcher_updates = maplit::hashmap! {
            actor_id => vec![PbDispatcherUpdate {
                actor_id,
                dispatcher_id: broadcast_dispatcher_id,
                added_downstream_actor_id: vec![new],
                removed_downstream_actor_id: vec![old],
                hash_mapping: Default::default(),
            }]
        };
        let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: dispatcher_updates,
                ..Default::default()
            },
        ));
        barrier_test_env.inject_barrier(&b1, [actor_id]);
        barrier_test_env.flush_all_events().await;

        let input = Executor::new(
            Default::default(),
            ReceiverExecutor::for_test(
                actor_id,
                rx,
                barrier_test_env.local_barrier_manager.clone(),
            )
            .boxed(),
        );

        let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
        let mut rxs = [untouched, old, new, old_simple, new_simple]
            .into_iter()
            .map(|id| {
                (id, {
                    let (tx, rx) = channel_for_test();
                    new_output_request_tx
                        .send((id, NewOutputRequest::Local(tx)))
                        .unwrap();
                    rx
                })
            })
            .collect::<HashMap<_, _>>();
        let executor = Box::new(
            DispatchExecutor::new(
                input,
                new_output_request_rx,
                vec![broadcast_dispatcher, simple_dispatcher],
                &ActorContext::for_test(actor_id),
            )
            .await
            .unwrap(),
        )
        .execute();

        pin_mut!(executor);

        macro_rules! try_recv {
            ($down_id:expr) => {
                rxs.get_mut(&$down_id).unwrap().try_recv()
            };
        }

        // 3. Send a chunk.
        tx.send(Message::Chunk(StreamChunk::default()).into())
            .await
            .unwrap();

        tx.send(Message::Barrier(b1.clone().into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 5. Check downstream.
        try_recv!(untouched).unwrap().as_chunk().unwrap();
        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();

        try_recv!(old).unwrap().as_chunk().unwrap();
        try_recv!(old).unwrap().as_barrier_batch().unwrap(); // It should still receive the barrier even if it's to be removed.

        try_recv!(new).unwrap().as_barrier_batch().unwrap(); // Since it's just added, it won't receive the chunk.

        try_recv!(old_simple).unwrap().as_chunk().unwrap();
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // Untouched.

        // 6. Send another barrier.
        let b2 = Barrier::new_test_barrier(test_epoch(2));
        barrier_test_env.inject_barrier(&b2, [actor_id]);
        tx.send(Message::Barrier(b2.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 7. Check downstream.
        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();
        try_recv!(old).unwrap_err(); // Since it's stopped, we can't receive the new messages.
        try_recv!(new).unwrap().as_barrier_batch().unwrap();

        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // Untouched.
        try_recv!(new_simple).unwrap_err(); // Untouched.

        // 8. Send another chunk.
        tx.send(Message::Chunk(StreamChunk::default()).into())
            .await
            .unwrap();

        // 9. Send a configuration change barrier for simple dispatcher.
        let dispatcher_updates = maplit::hashmap! {
            actor_id => vec![PbDispatcherUpdate {
                actor_id,
                dispatcher_id: simple_dispatcher_id,
                added_downstream_actor_id: vec![new_simple],
                removed_downstream_actor_id: vec![old_simple],
                hash_mapping: Default::default(),
            }]
        };
        let b3 = Barrier::new_test_barrier(test_epoch(3)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: dispatcher_updates,
                ..Default::default()
            },
        ));
        barrier_test_env.inject_barrier(&b3, [actor_id]);
        tx.send(Message::Barrier(b3.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 10. Check downstream.
        try_recv!(old_simple).unwrap().as_chunk().unwrap();
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // It should still receive the barrier even if it's to be removed.

        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap(); // Since it's just added, it won't receive the chunk.

        // 11. Send another barrier.
        let b4 = Barrier::new_test_barrier(test_epoch(4));
        barrier_test_env.inject_barrier(&b4, [actor_id]);
        tx.send(Message::Barrier(b4.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 12. Check downstream.
        try_recv!(old_simple).unwrap_err(); // Since it's stopped, we can't receive the new messages.
        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap();
    }

    #[tokio::test]
    async fn test_hash_dispatcher() {
        // This test only works when vnode count is 256.
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let num_outputs = 5; // actor id ranges from 1 to 5
        let cardinality = 10;
        let dimension = 4;
        let key_indices = &[0, 2];
        let (output_tx_vecs, output_rx_vecs): (Vec<_>, Vec<_>) =
            (0..num_outputs).map(|_| channel_for_test()).collect();
        let outputs = output_tx_vecs
            .into_iter()
            .enumerate()
            .map(|(actor_id, tx)| Output::new(ActorId::new(1 + actor_id as u32), tx))
            .collect::<Vec<_>>();
        let mut hash_mapping = (1..num_outputs + 1)
            .flat_map(|id| vec![ActorId::new(id as _); VirtualNode::COUNT_FOR_TEST / num_outputs])
            .collect_vec();
        hash_mapping.resize(
            VirtualNode::COUNT_FOR_TEST,
            ActorId::new(num_outputs as u32),
        );
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            key_indices.to_vec(),
            DispatchOutputMapping::Simple((0..dimension).collect()),
            hash_mapping.clone(),
            0,
        );

        let mut ops = Vec::new();
        for idx in 0..cardinality {
            if idx % 2 == 0 {
                ops.push(Op::Insert);
            } else {
                ops.push(Op::Delete);
            }
        }

        let mut start = 19260817i32..;
        let mut builders = (0..dimension)
            .map(|_| I32ArrayBuilder::new(cardinality))
            .collect_vec();
        let mut output_cols = vec![vec![vec![]; dimension]; num_outputs];
        let mut output_ops = vec![vec![]; num_outputs];
        for op in &ops {
            let hash_builder = Crc32FastBuilder;
            let mut hasher = hash_builder.build_hasher();
            let one_row = (0..dimension).map(|_| start.next().unwrap()).collect_vec();
            for key_idx in key_indices {
                let val = one_row[*key_idx];
                let bytes = val.to_le_bytes();
                hasher.update(&bytes);
            }
            let output_idx = hash_mapping[hasher.finish() as usize % VirtualNode::COUNT_FOR_TEST]
                .as_raw_id() as usize
                - 1;
            for (builder, val) in builders.iter_mut().zip_eq_fast(one_row.iter()) {
                builder.append(Some(*val));
            }
            output_cols[output_idx]
                .iter_mut()
                .zip_eq_fast(one_row.iter())
                .for_each(|(each_column, val)| each_column.push(*val));
            output_ops[output_idx].push(op);
        }

        let columns = builders
            .into_iter()
            .map(|builder| {
                let array = builder.finish();
                array.into_ref()
            })
            .collect();

        let chunk = StreamChunk::new(ops, columns);
        hash_dispatcher.dispatch_data(chunk).await.unwrap();

        for (output_idx, mut rx) in output_rx_vecs.into_iter().enumerate() {
            let mut output = vec![];
            while let Some(Some(msg)) = rx.recv().now_or_never() {
                output.push(msg);
            }
            // It is possible that there are no chunks, as no key was hashed to this output's buckets.
            assert!(output.len() <= 1);
            if output.is_empty() {
                assert!(output_cols[output_idx].iter().all(|x| { x.is_empty() }));
            } else {
                let message = output.first().unwrap();
                let real_chunk = match message {
                    DispatcherMessageBatch::Chunk(chunk) => chunk,
                    _ => panic!(),
                };
                real_chunk
                    .columns()
                    .iter()
                    .zip_eq_fast(output_cols[output_idx].iter())
                    .for_each(|(real_col, expect_col)| {
                        let real_vals = real_chunk
                            .visibility()
                            .iter_ones()
                            .map(|row_idx| real_col.as_int32().value_at(row_idx).unwrap())
                            .collect::<Vec<_>>();
                        assert_eq!(real_vals.len(), expect_col.len());
                        assert_eq!(real_vals, *expect_col);
                    });
            }
        }
    }
}