risingwave_stream/executor/dispatch.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::iter::repeat_with;
use std::ops::{Deref, DerefMut};
use std::time::Duration;

use anyhow::anyhow;
use futures::{FutureExt, TryStreamExt};
use itertools::Itertools;
use risingwave_common::array::Op;
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::hash::{ActorMapping, ExpandedActorMapping, VirtualNode};
use risingwave_common::metrics::LabelGuardedIntCounter;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
use risingwave_pb::stream_plan::{self, PbDispatcher};
use smallvec::{SmallVec, smallvec};
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::time::Instant;
use tokio_stream::StreamExt;
use tokio_stream::adapters::Peekable;
use tracing::{Instrument, event};

use super::exchange::output::Output;
use super::{
    AddMutation, DispatcherBarriers, DispatcherMessageBatch, MessageBatch, TroublemakerExecutor,
    UpdateMutation,
};
use crate::executor::prelude::*;
use crate::executor::{StopMutation, StreamConsumer};
use crate::task::{DispatcherId, LocalBarrierManager, NewOutputRequest};

mod output_mapping;
pub use output_mapping::DispatchOutputMapping;

/// [`DispatchExecutor`] consumes messages and sends them to downstream actors. Usually,
/// data chunks will be dispatched with some specified policy, while control messages
/// such as barriers will be distributed to all receivers.
pub struct DispatchExecutor {
    input: Executor,
    inner: DispatchExecutorInner,
}

struct DispatcherWithMetrics {
    dispatcher: DispatcherImpl,
    pub actor_output_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}

impl Debug for DispatcherWithMetrics {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        self.dispatcher.fmt(f)
    }
}

impl Deref for DispatcherWithMetrics {
    type Target = DispatcherImpl;

    fn deref(&self) -> &Self::Target {
        &self.dispatcher
    }
}

impl DerefMut for DispatcherWithMetrics {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.dispatcher
    }
}

struct DispatchExecutorMetrics {
    actor_id_str: String,
    fragment_id_str: String,
    metrics: Arc<StreamingMetrics>,
    actor_out_record_cnt: LabelGuardedIntCounter,
}

impl DispatchExecutorMetrics {
    fn monitor_dispatcher(&self, dispatcher: DispatcherImpl) -> DispatcherWithMetrics {
        DispatcherWithMetrics {
            actor_output_buffer_blocking_duration_ns: self
                .metrics
                .actor_output_buffer_blocking_duration_ns
                .with_guarded_label_values(&[
                    self.actor_id_str.as_str(),
                    self.fragment_id_str.as_str(),
                    dispatcher.dispatcher_id_str(),
                ]),
            dispatcher,
        }
    }
}

struct DispatchExecutorInner {
    dispatchers: Vec<DispatcherWithMetrics>,
    actor_id: u32,
    local_barrier_manager: LocalBarrierManager,
    metrics: DispatchExecutorMetrics,
    new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    pending_new_output_requests: HashMap<ActorId, NewOutputRequest>,
}

impl DispatchExecutorInner {
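    /// Resolve an [`Output`] for each downstream actor in `downstream_actors`, in order.
    ///
    /// Requests from downstream actors may arrive on `new_output_request_rx` in any order,
    /// so a request for an actor other than the one currently awaited is parked in
    /// `pending_new_output_requests` until its turn comes.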
    async fn collect_outputs(
        &mut self,
        downstream_actors: &[ActorId],
    ) -> StreamResult<Vec<Output>> {
        fn resolve_output(downstream_actor: ActorId, request: NewOutputRequest) -> Output {
            let tx = match request {
                NewOutputRequest::Local(tx) | NewOutputRequest::Remote(tx) => tx,
            };
            Output::new(downstream_actor, tx)
        }
        let mut outputs = Vec::with_capacity(downstream_actors.len());
        for downstream_actor in downstream_actors {
            let output =
                if let Some(request) = self.pending_new_output_requests.remove(downstream_actor) {
                    resolve_output(*downstream_actor, request)
                } else {
                    loop {
                        let (requested_actor, request) = self
                            .new_output_request_rx
                            .recv()
                            .await
                            .ok_or_else(|| anyhow!("end of new output request"))?;
                        if requested_actor == *downstream_actor {
                            break resolve_output(requested_actor, request);
                        } else {
                            assert!(
                                self.pending_new_output_requests
                                    .insert(requested_actor, request)
                                    .is_none(),
                                "duplicated inflight new output requests from actor {}",
                                requested_actor
                            );
                        }
                    }
                };
            outputs.push(output);
        }
        Ok(outputs)
    }

    async fn dispatch(&mut self, msg: MessageBatch) -> StreamResult<()> {
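        // The macro below drives a dispatch future to completion while periodically
        // (every 15 seconds) flushing the elapsed time into the per-dispatcher
        // `actor_output_buffer_blocking_duration_ns` counter, so that a send blocked
        // on downstream backpressure shows up in metrics before it completes. The
        // `biased` select polls the future first, so a finished dispatch is never
        // delayed by a metric tick.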
        macro_rules! await_with_metrics {
            ($fut:expr, $metrics:expr) => {{
                let mut start_time = Instant::now();
                let interval_duration = Duration::from_secs(15);
                let mut interval =
                    tokio::time::interval_at(start_time + interval_duration, interval_duration);

                let mut fut = std::pin::pin!($fut);

                loop {
                    tokio::select! {
                        biased;
                        res = &mut fut => {
                            res?;
                            let ns = start_time.elapsed().as_nanos() as u64;
                            $metrics.inc_by(ns);
                            break;
                        }
                        _ = interval.tick() => {
                            start_time = Instant::now();
                            $metrics.inc_by(interval_duration.as_nanos() as u64);
                        }
                    };
                }
                StreamResult::Ok(())
            }};
        }

        let limit = self
            .local_barrier_manager
            .env
            .config()
            .developer
            .exchange_concurrent_dispatchers;
        // Only barriers can be batched for now.
        match msg {
            MessageBatch::BarrierBatch(barrier_batch) => {
                if barrier_batch.is_empty() {
                    return Ok(());
                }
                // Only the first barrier in a batch can carry a mutation.
                let mutation = barrier_batch[0].mutation.clone();
                self.pre_mutate_dispatchers(&mutation).await?;
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
                        let dispatcher_output = &mut dispatcher.dispatcher;
                        let fut = dispatcher_output.dispatch_barriers(
                            barrier_batch
                                .iter()
                                .cloned()
                                .map(|b| b.into_dispatcher())
                                .collect(),
                        );
                        await_with_metrics!(std::pin::pin!(fut), metrics)
                    })
                    .await?;
                self.post_mutate_dispatchers(&mutation)?;
            }
            MessageBatch::Watermark(watermark) => {
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
                        let dispatcher_output = &mut dispatcher.dispatcher;
                        let fut = dispatcher_output.dispatch_watermark(watermark.clone());
                        await_with_metrics!(std::pin::pin!(fut), metrics)
                    })
                    .await?;
            }
            MessageBatch::Chunk(chunk) => {
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
                        let dispatcher_output = &mut dispatcher.dispatcher;
                        let fut = dispatcher_output.dispatch_data(chunk.clone());
                        await_with_metrics!(std::pin::pin!(fut), metrics)
                    })
                    .await?;

                self.metrics
                    .actor_out_record_cnt
                    .inc_by(chunk.cardinality() as _);
            }
        }
        Ok(())
    }

    /// Add new dispatchers to the executor. Will check whether their IDs are unique.
    async fn add_dispatchers<'a>(
        &mut self,
        new_dispatchers: impl IntoIterator<Item = &'a PbDispatcher>,
    ) -> StreamResult<()> {
        for dispatcher in new_dispatchers {
            let outputs = self
                .collect_outputs(&dispatcher.downstream_actor_id)
                .await?;
            let dispatcher = DispatcherImpl::new(outputs, dispatcher)?;
            let dispatcher = self.metrics.monitor_dispatcher(dispatcher);
            self.dispatchers.push(dispatcher);
        }

        assert!(
            self.dispatchers
                .iter()
                .map(|d| d.dispatcher_id())
                .all_unique(),
            "dispatcher ids must be unique: {:?}",
            self.dispatchers
        );

        Ok(())
    }

    fn find_dispatcher(&mut self, dispatcher_id: DispatcherId) -> &mut DispatcherImpl {
        self.dispatchers
            .iter_mut()
            .find(|d| d.dispatcher_id() == dispatcher_id)
            .unwrap_or_else(|| panic!("dispatcher {}:{} not found", self.actor_id, dispatcher_id))
    }

    /// Update the dispatcher BEFORE we actually dispatch this barrier. We'll only add the new
    /// outputs.
    async fn pre_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
        let outputs = self
            .collect_outputs(&update.added_downstream_actor_id)
            .await?;

        let dispatcher = self.find_dispatcher(update.dispatcher_id);
        dispatcher.add_outputs(outputs);

        Ok(())
    }

    /// Update the dispatcher AFTER we dispatch this barrier. We'll remove some outputs and finally
    /// update the hash mapping.
    fn post_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
        let ids = update.removed_downstream_actor_id.iter().copied().collect();

        let dispatcher = self.find_dispatcher(update.dispatcher_id);
        dispatcher.remove_outputs(&ids);

        // The hash mapping is only used by the hash dispatcher.
        //
        // We specify a single upstream hash mapping for scaling the downstream fragment. However,
        // it's possible that there are multiple upstreams with different exchange types, for
        // example, the `Broadcast` inner side of the dynamic filter. There are too many
        // combinations to handle here, so we just ignore the `hash_mapping` field for any other
        // exchange types.
        if let DispatcherImpl::Hash(dispatcher) = dispatcher {
            dispatcher.hash_mapping =
                ActorMapping::from_protobuf(update.get_hash_mapping()?).to_expanded();
        }

        Ok(())
    }

    /// For `Add` and `Update`, update the dispatchers before we dispatch the barrier.
    async fn pre_mutate_dispatchers(
        &mut self,
        mutation: &Option<Arc<Mutation>>,
    ) -> StreamResult<()> {
        let Some(mutation) = mutation.as_deref() else {
            return Ok(());
        };

        match mutation {
            Mutation::Add(AddMutation { adds, .. }) => {
                if let Some(new_dispatchers) = adds.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }
            }
            Mutation::Update(UpdateMutation {
                dispatchers,
                actor_new_dispatchers: actor_dispatchers,
                ..
            }) => {
                if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }

                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.pre_update_dispatcher(update).await?;
                    }
                }
            }
            Mutation::AddAndUpdate(
                AddMutation { adds, .. },
                UpdateMutation {
                    dispatchers,
                    actor_new_dispatchers: actor_dispatchers,
                    ..
                },
            ) => {
                if let Some(new_dispatchers) = adds.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }

                if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }

                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.pre_update_dispatcher(update).await?;
                    }
                }
            }
            _ => {}
        }

        Ok(())
    }

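    // Note on the pre/post split above and below: outputs are added *before* a
    // barrier is dispatched so that newly-added downstream actors receive the very
    // barrier that introduces them, and removed *after* it is dispatched so that
    // actors about to be dropped still receive their final barrier.
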
    /// For `Stop` and `Update`, update the dispatchers after we dispatch the barrier.
    fn post_mutate_dispatchers(&mut self, mutation: &Option<Arc<Mutation>>) -> StreamResult<()> {
        let Some(mutation) = mutation.as_deref() else {
            return Ok(());
        };

        match mutation {
            Mutation::Stop(StopMutation { dropped_actors, .. }) => {
                // Remove outputs only if this actor itself is not to be stopped.
                if !dropped_actors.contains(&self.actor_id) {
                    for dispatcher in &mut self.dispatchers {
                        dispatcher.remove_outputs(dropped_actors);
                    }
                }
            }
            Mutation::Update(UpdateMutation {
                dispatchers,
                dropped_actors,
                ..
            })
            | Mutation::AddAndUpdate(
                _,
                UpdateMutation {
                    dispatchers,
                    dropped_actors,
                    ..
                },
            ) => {
                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.post_update_dispatcher(update)?;
                    }
                }

                if !dropped_actors.contains(&self.actor_id) {
                    for dispatcher in &mut self.dispatchers {
                        dispatcher.remove_outputs(dropped_actors);
                    }
                }
            }
            _ => {}
        };

        // After stopping the downstream mview, the outputs of some dispatchers might be empty
        // and we should clean them up.
        self.dispatchers.retain(|d| !d.is_empty());

        Ok(())
    }
}

impl DispatchExecutor {
    pub(crate) async fn new(
        input: Executor,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
        dispatchers: Vec<stream_plan::Dispatcher>,
        actor_id: u32,
        fragment_id: u32,
        local_barrier_manager: LocalBarrierManager,
        metrics: Arc<StreamingMetrics>,
    ) -> StreamResult<Self> {
        let mut executor = Self::new_inner(
            input,
            new_output_request_rx,
            vec![],
            actor_id,
            fragment_id,
            local_barrier_manager,
            metrics,
        );
        let inner = &mut executor.inner;
        for dispatcher in dispatchers {
            let outputs = inner
                .collect_outputs(&dispatcher.downstream_actor_id)
                .await?;
            let dispatcher = DispatcherImpl::new(outputs, &dispatcher)?;
            let dispatcher = inner.metrics.monitor_dispatcher(dispatcher);
            inner.dispatchers.push(dispatcher);
        }
        Ok(executor)
    }

    #[cfg(test)]
    pub(crate) fn for_test(
        input: Executor,
        dispatchers: Vec<DispatcherImpl>,
        actor_id: u32,
        fragment_id: u32,
        local_barrier_manager: LocalBarrierManager,
        metrics: Arc<StreamingMetrics>,
    ) -> (
        Self,
        tokio::sync::mpsc::UnboundedSender<(ActorId, NewOutputRequest)>,
    ) {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

        (
            Self::new_inner(
                input,
                rx,
                dispatchers,
                actor_id,
                fragment_id,
                local_barrier_manager,
                metrics,
            ),
            tx,
        )
    }

    fn new_inner(
        mut input: Executor,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
        dispatchers: Vec<DispatcherImpl>,
        actor_id: u32,
        fragment_id: u32,
        local_barrier_manager: LocalBarrierManager,
        metrics: Arc<StreamingMetrics>,
    ) -> Self {
        let chunk_size = local_barrier_manager.env.config().developer.chunk_size;
        if crate::consistency::insane() {
            // Make some trouble before dispatching, to avoid generating an invalid dist key.
            let mut info = input.info().clone();
            info.identity = format!("{} (embedded trouble)", info.identity);
            let troublemaker = TroublemakerExecutor::new(input, chunk_size);
            input = (info, troublemaker).into();
        }

        let actor_id_str = actor_id.to_string();
        let fragment_id_str = fragment_id.to_string();
        let actor_out_record_cnt = metrics
            .actor_out_record_cnt
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let metrics = DispatchExecutorMetrics {
            actor_id_str,
            fragment_id_str,
            metrics,
            actor_out_record_cnt,
        };
        let dispatchers = dispatchers
            .into_iter()
            .map(|dispatcher| metrics.monitor_dispatcher(dispatcher))
            .collect();
        Self {
            input,
            inner: DispatchExecutorInner {
                dispatchers,
                actor_id,
                local_barrier_manager,
                metrics,
                new_output_request_rx,
                pending_new_output_requests: Default::default(),
            },
        }
    }
}

impl StreamConsumer for DispatchExecutor {
    type BarrierStream = impl Stream<Item = StreamResult<Barrier>> + Send;

    fn execute(mut self: Box<Self>) -> Self::BarrierStream {
        let max_barrier_count_per_batch = self
            .inner
            .local_barrier_manager
            .env
            .config()
            .developer
            .max_barrier_batch_size;
        #[try_stream]
        async move {
            let mut input = self.input.execute().peekable();
            loop {
                let Some(message) =
                    try_batch_barriers(max_barrier_count_per_batch, &mut input).await?
                else {
                    // end_of_stream
                    break;
                };
                match message {
                    chunk @ MessageBatch::Chunk(_) => {
                        self.inner
                            .dispatch(chunk)
                            .instrument(tracing::info_span!("dispatch_chunk"))
                            .instrument_await("dispatch_chunk")
                            .await?;
                    }
                    MessageBatch::BarrierBatch(barrier_batch) => {
                        assert!(!barrier_batch.is_empty());
                        self.inner
                            .dispatch(MessageBatch::BarrierBatch(barrier_batch.clone()))
                            .instrument(tracing::info_span!("dispatch_barrier_batch"))
                            .instrument_await("dispatch_barrier_batch")
                            .await?;
                        self.inner
                            .metrics
                            .metrics
                            .barrier_batch_size
                            .observe(barrier_batch.len() as f64);
                        for barrier in barrier_batch {
                            yield barrier;
                        }
                    }
                    watermark @ MessageBatch::Watermark(_) => {
                        self.inner
                            .dispatch(watermark)
                            .instrument(tracing::info_span!("dispatch_watermark"))
                            .instrument_await("dispatch_watermark")
                            .await?;
                    }
                }
            }
        }
    }
}

/// Tries to batch up to `max_barrier_count_per_batch` consecutive barriers within a single
/// message batch.
///
/// Returns the message batch, or `None` upon end of stream.
async fn try_batch_barriers(
    max_barrier_count_per_batch: u32,
    input: &mut Peekable<BoxedMessageStream>,
) -> StreamResult<Option<MessageBatch>> {
    let Some(msg) = input.next().await else {
        // end_of_stream
        return Ok(None);
    };
    let mut barrier_batch = vec![];
    let msg: Message = msg?;
    let max_peek_attempts = match msg {
        Message::Chunk(c) => {
            return Ok(Some(MessageBatch::Chunk(c)));
        }
        Message::Watermark(w) => {
            return Ok(Some(MessageBatch::Watermark(w)));
        }
        Message::Barrier(b) => {
            let peek_more_barrier = b.mutation.is_none();
            barrier_batch.push(b);
            if peek_more_barrier {
                max_barrier_count_per_batch.saturating_sub(1)
            } else {
                0
            }
        }
    };
    // Try to peek more consecutive non-mutation barriers.
    for _ in 0..max_peek_attempts {
        let peek = input.peek().now_or_never();
        let Some(peek) = peek else {
            break;
        };
        let Some(msg) = peek else {
            // end_of_stream
            break;
        };
        let Ok(Message::Barrier(barrier)) = msg else {
            break;
        };
        if barrier.mutation.is_some() {
            break;
        }
        let msg: Message = input.next().now_or_never().unwrap().unwrap()?;
        let Message::Barrier(ref barrier) = msg else {
            unreachable!("must be a barrier");
        };
        barrier_batch.push(barrier.clone());
    }
    Ok(Some(MessageBatch::BarrierBatch(barrier_batch)))
}
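
// An illustrative trace of `try_batch_barriers` (not a doc test): assuming barriers
// `b1` and `b2` carry no mutation while `b3` carries one, `max_barrier_count_per_batch`
// is at least 2, and all messages are already buffered in the stream (peeking uses
// `now_or_never` and never awaits), an input of
//     [Barrier(b1), Barrier(b2), Barrier(b3), Chunk(c)]
// yields `BarrierBatch([b1, b2])` on the first call (batching stops before the
// mutation barrier), then `BarrierBatch([b3])`, then `Chunk(c)`.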

#[derive(Debug)]
pub enum DispatcherImpl {
    Hash(HashDataDispatcher),
    Broadcast(BroadcastDispatcher),
    Simple(SimpleDispatcher),
    RoundRobin(RoundRobinDataDispatcher),
}

impl DispatcherImpl {
    pub fn new(outputs: Vec<Output>, dispatcher: &PbDispatcher) -> StreamResult<Self> {
        let output_mapping =
            DispatchOutputMapping::from_protobuf(dispatcher.output_mapping.clone().unwrap());

        use risingwave_pb::stream_plan::DispatcherType::*;
        let dispatcher_impl = match dispatcher.get_type()? {
            Hash => {
                assert!(!outputs.is_empty());
                let dist_key_indices = dispatcher
                    .dist_key_indices
                    .iter()
                    .map(|i| *i as usize)
                    .collect();

                let hash_mapping =
                    ActorMapping::from_protobuf(dispatcher.get_hash_mapping()?).to_expanded();

                DispatcherImpl::Hash(HashDataDispatcher::new(
                    outputs,
                    dist_key_indices,
                    output_mapping,
                    hash_mapping,
                    dispatcher.dispatcher_id,
                ))
            }
            Broadcast => DispatcherImpl::Broadcast(BroadcastDispatcher::new(
                outputs,
                output_mapping,
                dispatcher.dispatcher_id,
            )),
            Simple | NoShuffle => {
                let [output]: [_; 1] = outputs.try_into().unwrap();
                DispatcherImpl::Simple(SimpleDispatcher::new(
                    output,
                    output_mapping,
                    dispatcher.dispatcher_id,
                ))
            }
            Unspecified => unreachable!(),
        };

        Ok(dispatcher_impl)
    }
}

macro_rules! impl_dispatcher {
    ($( { $variant_name:ident } ),*) => {
        impl DispatcherImpl {
            pub async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_data(chunk).await, )*
                }
            }

            pub async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_barriers(barriers).await, )*
                }
            }

            pub async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_watermark(watermark).await, )*
                }
            }

            pub fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
                match self {
                    $(Self::$variant_name(inner) => inner.add_outputs(outputs), )*
                }
            }

            pub fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
                match self {
                    $(Self::$variant_name(inner) => inner.remove_outputs(actor_ids), )*
                }
            }

            pub fn dispatcher_id(&self) -> DispatcherId {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id(), )*
                }
            }

            pub fn dispatcher_id_str(&self) -> &str {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id_str(), )*
                }
            }

            pub fn is_empty(&self) -> bool {
                match self {
                    $(Self::$variant_name(inner) => inner.is_empty(), )*
                }
            }
        }
    }
}

macro_rules! for_all_dispatcher_variants {
    ($macro:ident) => {
        $macro! {
            { Hash },
            { Broadcast },
            { Simple },
            { RoundRobin }
        }
    };
}

for_all_dispatcher_variants! { impl_dispatcher }
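
// The two macros above implement static dispatch over the enum variants: each
// method expands to a `match` that forwards to the inner dispatcher. For example,
// `dispatch_data` expands to (a sketch of the generated code)
//
//     match self {
//         Self::Hash(inner) => inner.dispatch_data(chunk).await,
//         Self::Broadcast(inner) => inner.dispatch_data(chunk).await,
//         Self::Simple(inner) => inner.dispatch_data(chunk).await,
//         Self::RoundRobin(inner) => inner.dispatch_data(chunk).await,
//     }
//
// A new dispatcher kind needs an enum variant above plus an entry in
// `for_all_dispatcher_variants!`.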

pub trait DispatchFuture<'a> = Future<Output = StreamResult<()>> + Send;

pub trait Dispatcher: Debug + 'static {
    /// Dispatch a data chunk to downstream actors.
    fn dispatch_data(&mut self, chunk: StreamChunk) -> impl DispatchFuture<'_>;
    /// Dispatch barriers to downstream actors, generally by broadcasting them.
    fn dispatch_barriers(&mut self, barrier: DispatcherBarriers) -> impl DispatchFuture<'_>;
    /// Dispatch a watermark to downstream actors, generally by broadcasting it.
    fn dispatch_watermark(&mut self, watermark: Watermark) -> impl DispatchFuture<'_>;

    /// Add new outputs to the dispatcher.
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>);
    /// Remove the outputs with `actor_ids` from the dispatcher.
    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>);

    /// The ID of the dispatcher. A [`DispatchExecutor`] may have multiple dispatchers with
    /// different IDs.
    ///
    /// Note that the dispatcher id is always equal to the downstream fragment id.
    /// See also `proto/stream_plan.proto`.
    fn dispatcher_id(&self) -> DispatcherId;

    /// Dispatcher id in string. See [`Dispatcher::dispatcher_id`].
    fn dispatcher_id_str(&self) -> &str;

    /// Whether the dispatcher has no outputs. If so, it'll be cleaned up from the
    /// [`DispatchExecutor`].
    fn is_empty(&self) -> bool;
}

/// Concurrently broadcast a message to all outputs.
///
/// Note that this does not follow `concurrent_dispatchers` in the config and the concurrency is
/// always unlimited.
async fn broadcast_concurrent(
    outputs: impl IntoIterator<Item = &'_ mut Output>,
    message: DispatcherMessageBatch,
) -> StreamResult<()> {
    futures::future::try_join_all(
        outputs
            .into_iter()
            .map(|output| output.send(message.clone())),
    )
    .await?;
    Ok(())
}
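
// This helper backs the barrier and watermark paths of the dispatchers below:
// control messages are always broadcast to every output, regardless of how data
// chunks are partitioned.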

#[derive(Debug)]
pub struct RoundRobinDataDispatcher {
    outputs: Vec<Output>,
    output_mapping: DispatchOutputMapping,
    cur: usize,
    dispatcher_id: DispatcherId,
    dispatcher_id_str: String,
}

impl RoundRobinDataDispatcher {
    pub fn new(
        outputs: Vec<Output>,
        output_mapping: DispatchOutputMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            outputs,
            output_mapping,
            cur: 0,
            dispatcher_id,
            dispatcher_id_str: dispatcher_id.to_string(),
        }
    }
}

impl Dispatcher for RoundRobinDataDispatcher {
    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let chunk = self.output_mapping.apply(chunk);

        self.outputs[self.cur]
            .send(DispatcherMessageBatch::Chunk(chunk))
            .await?;
        self.cur += 1;
        self.cur %= self.outputs.len();
        Ok(())
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // always broadcast barrier
        broadcast_concurrent(
            &mut self.outputs,
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            // always broadcast watermark
            broadcast_concurrent(
                &mut self.outputs,
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(outputs);
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
            .count();
        // `saturating_sub` guards against underflow when all outputs are removed;
        // an empty dispatcher is then cleaned up by the `DispatchExecutor`.
        self.cur = self.cur.min(self.outputs.len().saturating_sub(1));
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn dispatcher_id_str(&self) -> &str {
        &self.dispatcher_id_str
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}

pub struct HashDataDispatcher {
    outputs: Vec<Output>,
    keys: Vec<usize>,
    output_mapping: DispatchOutputMapping,
    /// Mapping from virtual node to actor id, used by the hash data dispatcher to dispatch
    /// rows to different downstream actors.
    hash_mapping: ExpandedActorMapping,
    dispatcher_id: DispatcherId,
    dispatcher_id_str: String,
}

impl Debug for HashDataDispatcher {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HashDataDispatcher")
            .field("outputs", &self.outputs)
            .field("keys", &self.keys)
            .field("dispatcher_id", &self.dispatcher_id)
            .finish_non_exhaustive()
    }
}

impl HashDataDispatcher {
    pub fn new(
        outputs: Vec<Output>,
        keys: Vec<usize>,
        output_mapping: DispatchOutputMapping,
        hash_mapping: ExpandedActorMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            outputs,
            keys,
            output_mapping,
            hash_mapping,
            dispatcher_id,
            dispatcher_id_str: dispatcher_id.to_string(),
        }
    }
}

impl Dispatcher for HashDataDispatcher {
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(outputs);
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // always broadcast barrier
        broadcast_concurrent(
            &mut self.outputs,
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            // always broadcast watermark
            broadcast_concurrent(
                &mut self.outputs,
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        // A chunk can be shuffled into multiple output chunks to be sent to downstream actors.
        // These output chunks differ only in their visibility maps, which are computed from the
        // hash value of each row in the input chunk.
        let num_outputs = self.outputs.len();

        // Compute the vnode of every row from its distribution key.
        let vnode_count = self.hash_mapping.len();
        let vnodes = VirtualNode::compute_chunk(chunk.data_chunk(), &self.keys, vnode_count);

        tracing::debug!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty(), self.keys, vnodes);

        let mut vis_maps = repeat_with(|| BitmapBuilder::with_capacity(chunk.capacity()))
            .take(num_outputs)
            .collect_vec();
        let mut last_vnode_when_update_delete = None;
        let mut new_ops: Vec<Op> = Vec::with_capacity(chunk.capacity());

        // Apply output indices after calculating the vnode.
        let chunk = self.output_mapping.apply(chunk);

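        // A tiny worked example (illustrative numbers only): with two outputs for
        // actors 1 and 2, suppose rows r0 and r2 hash to vnodes owned by actor 1
        // and r1 to a vnode owned by actor 2. The two output chunks then share ops
        // and columns, but carry visibility maps [1, 0, 1] and [0, 1, 0].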
        for ((vnode, &op), visible) in vnodes
            .iter()
            .copied()
            .zip_eq_fast(chunk.ops())
            .zip_eq_fast(chunk.visibility().iter())
        {
            // Build visibility map for every output chunk.
            for (output, vis_map) in self.outputs.iter().zip_eq_fast(vis_maps.iter_mut()) {
                vis_map.append(visible && self.hash_mapping[vnode.to_index()] == output.actor_id());
            }

            if !visible {
                assert!(
                    last_vnode_when_update_delete.is_none(),
                    "invisible row between U- and U+, op = {op:?}",
                );
                new_ops.push(op);
                continue;
            }

            // An 'update' message, denoted by an `UpdateDelete` and a successive `UpdateInsert`,
            // needs to be rewritten to a plain `Delete` and `Insert` if its two halves are
            // dispatched to different actors.
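            // For example (illustrative): if the U- row of an update hashes to a
            // vnode owned by actor 1 but the U+ row hashes to a vnode owned by
            // actor 2, the pair is emitted as a plain `-` / `+` so that each side
            // is a self-contained change for the actor receiving it.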
            if op == Op::UpdateDelete {
                last_vnode_when_update_delete = Some(vnode);
            } else if op == Op::UpdateInsert {
                if vnode
                    != last_vnode_when_update_delete
                        .take()
                        .expect("missing U- before U+")
                {
                    new_ops.push(Op::Delete);
                    new_ops.push(Op::Insert);
                } else {
                    new_ops.push(Op::UpdateDelete);
                    new_ops.push(Op::UpdateInsert);
                }
            } else {
                new_ops.push(op);
            }
        }
        assert!(
            last_vnode_when_update_delete.is_none(),
            "missing U+ after U-"
        );

        let ops = new_ops;

        // Send each output its own `StreamChunk`, with the corresponding visibility map applied.
        futures::future::try_join_all(
            vis_maps
                .into_iter()
                .zip_eq_fast(self.outputs.iter_mut())
                .map(|(vis_map, output)| async {
                    let vis_map = vis_map.finish();
                    // The columns are not changed in this function.
                    let new_stream_chunk =
                        StreamChunk::with_visibility(ops.clone(), chunk.columns().into(), vis_map);
                    if new_stream_chunk.cardinality() > 0 {
                        event!(
                            tracing::Level::TRACE,
                            msg = "chunk",
                            downstream = output.actor_id(),
                            "send = \n{:#?}",
                            new_stream_chunk
                        );
                        output
                            .send(DispatcherMessageBatch::Chunk(new_stream_chunk))
                            .await?;
                    }
                    StreamResult::Ok(())
                }),
        )
        .await?;

        Ok(())
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
            .count();
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn dispatcher_id_str(&self) -> &str {
        &self.dispatcher_id_str
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}

/// `BroadcastDispatcher` dispatches messages to all outputs.
#[derive(Debug)]
pub struct BroadcastDispatcher {
    outputs: HashMap<ActorId, Output>,
    output_mapping: DispatchOutputMapping,
    dispatcher_id: DispatcherId,
    dispatcher_id_str: String,
}

impl BroadcastDispatcher {
    pub fn new(
        outputs: impl IntoIterator<Item = Output>,
        output_mapping: DispatchOutputMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            outputs: Self::into_pairs(outputs).collect(),
            output_mapping,
            dispatcher_id,
            dispatcher_id_str: dispatcher_id.to_string(),
        }
    }

    fn into_pairs(
        outputs: impl IntoIterator<Item = Output>,
    ) -> impl Iterator<Item = (ActorId, Output)> {
        outputs
            .into_iter()
            .map(|output| (output.actor_id(), output))
    }
}

impl Dispatcher for BroadcastDispatcher {
    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let chunk = self.output_mapping.apply(chunk);
        broadcast_concurrent(
            self.outputs.values_mut(),
            DispatcherMessageBatch::Chunk(chunk),
        )
        .await
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // always broadcast barrier
        broadcast_concurrent(
            self.outputs.values_mut(),
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            // always broadcast watermark
            broadcast_concurrent(
                self.outputs.values_mut(),
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(Self::into_pairs(outputs));
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(|actor_id, _| actor_ids.contains(actor_id))
            .count();
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn dispatcher_id_str(&self) -> &str {
        &self.dispatcher_id_str
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}

/// `SimpleDispatcher` dispatches messages to a single output.
#[derive(Debug)]
pub struct SimpleDispatcher {
    /// In most cases, there is exactly one output. However, in some cases of configuration
    /// change, this field may temporarily hold 0 or 2 outputs.
    ///
    /// - When dropping a materialized view, the output will be removed and this field becomes
    ///   empty. The [`DispatchExecutor`] will immediately clean up this empty dispatcher before
    ///   finishing processing the current mutation.
    /// - When migrating a singleton fragment, the new output will be temporarily added in the
    ///   `pre` stage and this field becomes multiple, which is for broadcasting this
    ///   configuration change barrier to both old and new downstream actors. In the `post`
    ///   stage, the old output will be removed and this field becomes single again.
    ///
    /// Therefore, when dispatching data, we assert that there's exactly one output by
    /// `Self::output`.
    output: SmallVec<[Output; 2]>,
    output_mapping: DispatchOutputMapping,
    dispatcher_id: DispatcherId,
    dispatcher_id_str: String,
}

impl SimpleDispatcher {
    pub fn new(
        output: Output,
        output_mapping: DispatchOutputMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            output: smallvec![output],
            output_mapping,
            dispatcher_id,
            dispatcher_id_str: dispatcher_id.to_string(),
        }
    }
}

impl Dispatcher for SimpleDispatcher {
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.output.extend(outputs);
        assert!(self.output.len() <= 2);
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // Only barrier is allowed to be dispatched to multiple outputs during migration.
        for output in &mut self.output {
            output
                .send(DispatcherMessageBatch::BarrierBatch(barriers.clone()))
                .await?;
        }
        Ok(())
    }

    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let output = self
            .output
            .iter_mut()
            .exactly_one()
            .expect("expect exactly one output");

        let chunk = self.output_mapping.apply(chunk);
        output.send(DispatcherMessageBatch::Chunk(chunk)).await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        let output = self
            .output
            .iter_mut()
            .exactly_one()
            .expect("expect exactly one output");

        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            output
                .send(DispatcherMessageBatch::Watermark(watermark))
                .await?;
        }
        Ok(())
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.output
            .retain(|output| !actor_ids.contains(&output.actor_id()));
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn dispatcher_id_str(&self) -> &str {
        &self.dispatcher_id_str
    }

    fn is_empty(&self) -> bool {
        self.output.is_empty()
    }
}

#[cfg(test)]
mod tests {
    use std::hash::{BuildHasher, Hasher};

    use futures::pin_mut;
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::array::{Array, ArrayBuilder, I32ArrayBuilder};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::hash_util::Crc32FastBuilder;
    use risingwave_pb::stream_plan::{DispatcherType, PbDispatchOutputMapping};
    use tokio::sync::mpsc::unbounded_channel;

    use super::*;
    use crate::executor::exchange::output::Output;
    use crate::executor::exchange::permit::channel_for_test;
    use crate::executor::receiver::ReceiverExecutor;
    use crate::executor::{BarrierInner as Barrier, MessageInner as Message};
    use crate::task::barrier_test_utils::LocalBarrierTestEnv;

    // TODO: this test contains an update being shuffled to different partitions, which is not
    // supported for now.
    #[tokio::test]
    async fn test_hash_dispatcher_complex() {
        test_hash_dispatcher_complex_inner().await
    }

    async fn test_hash_dispatcher_complex_inner() {
        // This test only works when vnode count is 256.
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let num_outputs = 2; // actor id ranges from 1 to 2
        let key_indices = &[0, 2];
        let (output_tx_vecs, mut output_rx_vecs): (Vec<_>, Vec<_>) =
            (0..num_outputs).map(|_| channel_for_test()).collect();
        let outputs = output_tx_vecs
            .into_iter()
            .enumerate()
            .map(|(actor_id, tx)| Output::new(1 + actor_id as u32, tx))
            .collect::<Vec<_>>();
        let mut hash_mapping = (1..num_outputs + 1)
            .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT_FOR_TEST / num_outputs])
            .collect_vec();
        hash_mapping.resize(VirtualNode::COUNT_FOR_TEST, num_outputs as u32);
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            key_indices.to_vec(),
            DispatchOutputMapping::Simple(vec![0, 1, 2]),
            hash_mapping,
            0,
        );

        let chunk = StreamChunk::from_pretty(
            "  I I I
            +  4 6 8
            +  5 7 9
            +  0 0 0
            -  1 1 1 D
            U- 2 0 2
            U+ 2 0 2
            U- 3 3 2
            U+ 3 3 4",
        );
        hash_dispatcher.dispatch_data(chunk).await.unwrap();

        assert_eq!(
            *output_rx_vecs[0].recv().await.unwrap().as_chunk().unwrap(),
            StreamChunk::from_pretty(
                "  I I I
                +  4 6 8
                +  5 7 9
                +  0 0 0
                -  1 1 1 D
                U- 2 0 2
                U+ 2 0 2
                -  3 3 2 D  // Should rewrite UpdateDelete to Delete
                +  3 3 4    // Should rewrite UpdateInsert to Insert",
            )
        );
        assert_eq!(
            *output_rx_vecs[1].recv().await.unwrap().as_chunk().unwrap(),
            StreamChunk::from_pretty(
                "  I I I
                +  4 6 8 D
                +  5 7 9 D
                +  0 0 0 D
                -  1 1 1 D  // Should keep original invisible mark
                U- 2 0 2 D  // Should keep UpdateDelete
                U+ 2 0 2 D  // Should keep UpdateInsert
                -  3 3 2    // Should rewrite UpdateDelete to Delete
                +  3 3 4 D  // Should rewrite UpdateInsert to Insert",
            )
        );
    }
1355
1356    #[tokio::test]
1357    async fn test_configuration_change() {
1358        let _schema = Schema { fields: vec![] };
1359        let (tx, rx) = channel_for_test();
1360        let actor_id = 233;
1361        let fragment_id = 666;
1362        let barrier_test_env = LocalBarrierTestEnv::for_test().await;
1363        let metrics = Arc::new(StreamingMetrics::unused());
1364
1365        let (untouched, old, new) = (234, 235, 238); // broadcast downstream actors
1366        let (old_simple, new_simple) = (114, 514); // simple downstream actors
1367
1368        // actor_id -> untouched, old, new, old_simple, new_simple
1369
1370        let broadcast_dispatcher_id = 666;
1371        let broadcast_dispatcher = PbDispatcher {
1372            r#type: DispatcherType::Broadcast as _,
1373            dispatcher_id: broadcast_dispatcher_id,
1374            downstream_actor_id: vec![untouched, old],
1375            output_mapping: PbDispatchOutputMapping::identical(0).into(), /* dummy length as it's not used */
1376            ..Default::default()
1377        };
1378
1379        let simple_dispatcher_id = 888;
1380        let simple_dispatcher = PbDispatcher {
1381            r#type: DispatcherType::Simple as _,
1382            dispatcher_id: simple_dispatcher_id,
1383            downstream_actor_id: vec![old_simple],
1384            output_mapping: PbDispatchOutputMapping::identical(0).into(), /* dummy length as it's not used */
1385            ..Default::default()
1386        };
1387
1388        let dispatcher_updates = maplit::hashmap! {
1389            actor_id => vec![PbDispatcherUpdate {
1390                actor_id,
1391                dispatcher_id: broadcast_dispatcher_id,
1392                added_downstream_actor_id: vec![new],
1393                removed_downstream_actor_id: vec![old],
1394                hash_mapping: Default::default(),
1395            }]
1396        };
1397        let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
1398            UpdateMutation {
1399                dispatchers: dispatcher_updates,
1400                ..Default::default()
1401            },
1402        ));
1403        barrier_test_env.inject_barrier(&b1, [actor_id]);
1404        barrier_test_env.flush_all_events().await;
1405
1406        let input = Executor::new(
1407            Default::default(),
1408            ReceiverExecutor::for_test(
1409                actor_id,
1410                rx,
1411                barrier_test_env.local_barrier_manager.clone(),
1412            )
1413            .boxed(),
1414        );
1415
1416        let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
1417        let mut rxs = [untouched, old, new, old_simple, new_simple]
1418            .into_iter()
1419            .map(|id| {
1420                (id, {
1421                    let (tx, rx) = channel_for_test();
1422                    new_output_request_tx
1423                        .send((id, NewOutputRequest::Local(tx)))
1424                        .unwrap();
1425                    rx
1426                })
1427            })
1428            .collect::<HashMap<_, _>>();
1429        let executor = Box::new(
1430            DispatchExecutor::new(
1431                input,
1432                new_output_request_rx,
1433                vec![broadcast_dispatcher, simple_dispatcher],
1434                actor_id,
1435                fragment_id,
1436                barrier_test_env.local_barrier_manager.clone(),
1437                metrics,
1438            )
1439            .await
1440            .unwrap(),
1441        )
1442        .execute();
1443
1444        pin_mut!(executor);
1445
1446        macro_rules! try_recv {
1447            ($down_id:expr) => {
1448                rxs.get_mut(&$down_id).unwrap().try_recv()
1449            };
1450        }
1451
1452        // 3. Send a chunk.
1453        tx.send(Message::Chunk(StreamChunk::default()).into())
1454            .await
1455            .unwrap();
1456
1457        tx.send(Message::Barrier(b1.clone().into_dispatcher()).into())
1458            .await
1459            .unwrap();
1460        executor.next().await.unwrap().unwrap();

        // 5. Check downstream.
        try_recv!(untouched).unwrap().as_chunk().unwrap();
        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();

        try_recv!(old).unwrap().as_chunk().unwrap();
        try_recv!(old).unwrap().as_barrier_batch().unwrap(); // It should still receive the barrier even if it's to be removed.

        try_recv!(new).unwrap().as_barrier_batch().unwrap(); // Since it's just added, it won't receive the chunk.

        try_recv!(old_simple).unwrap().as_chunk().unwrap();
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // Untouched.

        // 6. Send another barrier.
        let b2 = Barrier::new_test_barrier(test_epoch(2));
        barrier_test_env.inject_barrier(&b2, [actor_id]);
        tx.send(Message::Barrier(b2.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 7. Check downstream.
        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();
        try_recv!(old).unwrap_err(); // Since it's stopped, it no longer receives new messages.
        try_recv!(new).unwrap().as_barrier_batch().unwrap();

        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // Untouched.
        try_recv!(new_simple).unwrap_err(); // Untouched.

        // 8. Send another chunk.
        tx.send(Message::Chunk(StreamChunk::default()).into())
            .await
            .unwrap();

        // 9. Send a configuration change barrier for simple dispatcher.
        let dispatcher_updates = maplit::hashmap! {
            actor_id => vec![PbDispatcherUpdate {
                actor_id,
                dispatcher_id: simple_dispatcher_id,
                added_downstream_actor_id: vec![new_simple],
                removed_downstream_actor_id: vec![old_simple],
                hash_mapping: Default::default(),
            }]
        };
        let b3 = Barrier::new_test_barrier(test_epoch(3)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: dispatcher_updates,
                ..Default::default()
            },
        ));
        barrier_test_env.inject_barrier(&b3, [actor_id]);
        tx.send(Message::Barrier(b3.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 10. Check downstream.
        try_recv!(old_simple).unwrap().as_chunk().unwrap();
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // It should still receive the barrier even if it's to be removed.

        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap(); // Since it's just added, it won't receive the chunk.

        // 11. Send another barrier.
        let b4 = Barrier::new_test_barrier(test_epoch(4));
        barrier_test_env.inject_barrier(&b4, [actor_id]);
        tx.send(Message::Barrier(b4.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 12. Check downstream.
        try_recv!(old_simple).unwrap_err(); // Since it's stopped, it no longer receives new messages.
        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap();
    }

    #[tokio::test]
    async fn test_hash_dispatcher() {
        // This test only works when vnode count is 256.
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let num_outputs = 5; // actor ids range from 1 to 5
        let cardinality = 10;
        let dimension = 4;
        let key_indices = &[0, 2];
        let (output_tx_vecs, output_rx_vecs): (Vec<_>, Vec<_>) =
            (0..num_outputs).map(|_| channel_for_test()).collect();
        let outputs = output_tx_vecs
            .into_iter()
            .enumerate()
            .map(|(actor_id, tx)| Output::new(1 + actor_id as u32, tx))
            .collect::<Vec<_>>();
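        // Build a uniform vnode-to-actor mapping: each of the 5 actors owns
        // 256 / 5 = 51 consecutive vnodes, and the one leftover vnode
        // (256 % 5 = 1) is assigned to the last actor via `resize`.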
        let mut hash_mapping = (1..num_outputs + 1)
            .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT_FOR_TEST / num_outputs])
            .collect_vec();
        hash_mapping.resize(VirtualNode::COUNT_FOR_TEST, num_outputs as u32);
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            key_indices.to_vec(),
            DispatchOutputMapping::Simple((0..dimension).collect()),
            hash_mapping.clone(),
            0,
        );

        let mut ops = Vec::new();
        for idx in 0..cardinality {
            if idx % 2 == 0 {
                ops.push(Op::Insert);
            } else {
                ops.push(Op::Delete);
            }
        }

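        // Generate `cardinality` rows of `dimension` i32 columns from a
        // deterministic counter, while computing by hand which output each row
        // should be routed to.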
        let mut start = 19260817i32..;
        let mut builders = (0..dimension)
            .map(|_| I32ArrayBuilder::new(cardinality))
            .collect_vec();
        let mut output_cols = vec![vec![vec![]; dimension]; num_outputs];
        let mut output_ops = vec![vec![]; num_outputs];
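        // Expected routing: CRC32-hash the key columns of each row, map the
        // hash to a vnode by modulo, and look up the owning actor in
        // `hash_mapping` (actor ids are 1-based, hence the `- 1`).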
        for op in &ops {
            let hash_builder = Crc32FastBuilder;
            let mut hasher = hash_builder.build_hasher();
            let one_row = (0..dimension).map(|_| start.next().unwrap()).collect_vec();
            for key_idx in key_indices {
                let val = one_row[*key_idx];
                let bytes = val.to_le_bytes();
                hasher.update(&bytes);
            }
            let output_idx =
                hash_mapping[hasher.finish() as usize % VirtualNode::COUNT_FOR_TEST] as usize - 1;
            for (builder, val) in builders.iter_mut().zip_eq_fast(one_row.iter()) {
                builder.append(Some(*val));
            }
            output_cols[output_idx]
                .iter_mut()
                .zip_eq_fast(one_row.iter())
                .for_each(|(each_column, val)| each_column.push(*val));
            output_ops[output_idx].push(op);
        }

        let columns = builders
            .into_iter()
            .map(|builder| {
                let array = builder.finish();
                array.into_ref()
            })
            .collect();

        let chunk = StreamChunk::new(ops, columns);
        hash_dispatcher.dispatch_data(chunk).await.unwrap();

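        // Drain each output channel and compare what was received against the
        // expected per-actor columns. The dispatched chunk keeps all rows and
        // hides foreign rows via the visibility bitmap, hence the
        // `visibility().iter_ones()` projection below.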
        for (output_idx, mut rx) in output_rx_vecs.into_iter().enumerate() {
            let mut output = vec![];
            while let Some(Some(msg)) = rx.recv().now_or_never() {
                output.push(msg);
            }
            // It is possible that there is no chunk at all, since no key may hash to this output.
            assert!(output.len() <= 1);
            if output.is_empty() {
                assert!(output_cols[output_idx].iter().all(|x| x.is_empty()));
            } else {
                let message = output.first().unwrap();
                let real_chunk = match message {
                    DispatcherMessageBatch::Chunk(chunk) => chunk,
                    _ => panic!(),
                };
                real_chunk
                    .columns()
                    .iter()
                    .zip_eq_fast(output_cols[output_idx].iter())
                    .for_each(|(real_col, expect_col)| {
                        let real_vals = real_chunk
                            .visibility()
                            .iter_ones()
                            .map(|row_idx| real_col.as_int32().value_at(row_idx).unwrap())
                            .collect::<Vec<_>>();
                        assert_eq!(real_vals.len(), expect_col.len());
                        assert_eq!(real_vals, *expect_col);
                    });
            }
        }
    }
}