risingwave_stream/executor/dispatch.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::iter::repeat_with;
use std::ops::{Deref, DerefMut};
use std::time::Duration;

use anyhow::anyhow;
use futures::{FutureExt, TryStreamExt};
use itertools::Itertools;
use risingwave_common::array::Op;
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::hash::{ActorMapping, ExpandedActorMapping, VirtualNode};
use risingwave_common::metrics::LabelGuardedIntCounter;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
use risingwave_pb::stream_plan::{self, PbDispatcher};
use smallvec::{SmallVec, smallvec};
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::time::Instant;
use tokio_stream::StreamExt;
use tokio_stream::adapters::Peekable;
use tracing::{Instrument, event};

use super::exchange::output::Output;
use super::{
    AddMutation, DispatcherBarriers, DispatcherMessageBatch, MessageBatch, TroublemakerExecutor,
    UpdateMutation,
};
use crate::executor::StreamConsumer;
use crate::executor::prelude::*;
use crate::task::{DispatcherId, LocalBarrierManager, NewOutputRequest};

mod output_mapping;
pub use output_mapping::DispatchOutputMapping;

/// [`DispatchExecutor`] consumes messages and sends them to downstream actors. Usually,
/// data chunks are dispatched according to some specified policy, while control messages
/// such as barriers are broadcast to all receivers.
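///
/// A rough sketch of the flow (simplified; see the dispatcher implementations below for the
/// exact routing policies):
///
/// ```text
/// input executor -> DispatchExecutor -+-> dispatcher #1 -> outputs (downstream actors)
///                                     +-> dispatcher #2 -> outputs (downstream actors)
/// ```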
pub struct DispatchExecutor {
    input: Executor,
    inner: DispatchExecutorInner,
}

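/// A [`DispatcherImpl`] bundled with its metric: the cumulative time this actor spends
/// blocked on the dispatcher's output buffers.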
struct DispatcherWithMetrics {
    dispatcher: DispatcherImpl,
    actor_output_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}

impl DispatcherWithMetrics {
    pub fn record_output_buffer_blocking_duration(&self, duration: Duration) {
        let ns = duration.as_nanos() as u64;
        self.actor_output_buffer_blocking_duration_ns.inc_by(ns);
    }
}

impl Debug for DispatcherWithMetrics {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        self.dispatcher.fmt(f)
    }
}

impl Deref for DispatcherWithMetrics {
    type Target = DispatcherImpl;

    fn deref(&self) -> &Self::Target {
        &self.dispatcher
    }
}

impl DerefMut for DispatcherWithMetrics {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.dispatcher
    }
}

struct DispatchExecutorMetrics {
    actor_id_str: String,
    fragment_id_str: String,
    metrics: Arc<StreamingMetrics>,
    actor_out_record_cnt: LabelGuardedIntCounter,
}

impl DispatchExecutorMetrics {
    fn monitor_dispatcher(&self, dispatcher: DispatcherImpl) -> DispatcherWithMetrics {
        DispatcherWithMetrics {
            actor_output_buffer_blocking_duration_ns: self
                .metrics
                .actor_output_buffer_blocking_duration_ns
                .with_guarded_label_values(&[
                    self.actor_id_str.as_str(),
                    self.fragment_id_str.as_str(),
                    dispatcher.dispatcher_id_str(),
                ]),
            dispatcher,
        }
    }
}

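/// The mutable state of [`DispatchExecutor`]: the set of active dispatchers, plus the channel
/// through which downstream actors request new outputs from this actor.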
struct DispatchExecutorInner {
    dispatchers: Vec<DispatcherWithMetrics>,
    actor_id: u32,
    local_barrier_manager: LocalBarrierManager,
    metrics: DispatchExecutorMetrics,
    new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    pending_new_output_requests: HashMap<ActorId, NewOutputRequest>,
}

impl DispatchExecutorInner {
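    /// Collect one [`Output`] for each actor in `downstream_actors`, in order. Requests may
    /// arrive on `new_output_request_rx` in any order, so requests for actors other than the
    /// one currently awaited are buffered in `pending_new_output_requests`.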
    async fn collect_outputs(
        &mut self,
        downstream_actors: &[ActorId],
    ) -> StreamResult<Vec<Output>> {
        fn resolve_output(downstream_actor: ActorId, request: NewOutputRequest) -> Output {
            let tx = match request {
                NewOutputRequest::Local(tx) | NewOutputRequest::Remote(tx) => tx,
            };
            Output::new(downstream_actor, tx)
        }
        let mut outputs = Vec::with_capacity(downstream_actors.len());
        for downstream_actor in downstream_actors {
            let output =
                if let Some(request) = self.pending_new_output_requests.remove(downstream_actor) {
                    resolve_output(*downstream_actor, request)
                } else {
                    loop {
                        let (requested_actor, request) = self
                            .new_output_request_rx
                            .recv()
                            .await
                            .ok_or_else(|| anyhow!("end of new output request"))?;
                        if requested_actor == *downstream_actor {
                            break resolve_output(requested_actor, request);
                        } else {
                            assert!(
                                self.pending_new_output_requests
                                    .insert(requested_actor, request)
                                    .is_none(),
                                "duplicated inflight new output requests from actor {}",
                                requested_actor
                            );
                        }
                    }
                };
            outputs.push(output);
        }
        Ok(outputs)
    }

    async fn dispatch(&mut self, msg: MessageBatch) -> StreamResult<()> {
        let limit = self
            .local_barrier_manager
            .env
            .config()
            .developer
            .exchange_concurrent_dispatchers;
        // Only barriers can be batched for now.
        match msg {
            MessageBatch::BarrierBatch(barrier_batch) => {
                if barrier_batch.is_empty() {
                    return Ok(());
                }
                // Only the first barrier in a batch can carry a mutation.
                let mutation = barrier_batch[0].mutation.clone();
                self.pre_mutate_dispatchers(&mutation).await?;
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let start_time = Instant::now();
                        dispatcher
                            .dispatch_barriers(
                                barrier_batch
                                    .iter()
                                    .cloned()
                                    .map(|b| b.into_dispatcher())
                                    .collect(),
                            )
                            .await?;
                        dispatcher.record_output_buffer_blocking_duration(start_time.elapsed());
                        StreamResult::Ok(())
                    })
                    .await?;
                self.post_mutate_dispatchers(&mutation)?;
            }
            MessageBatch::Watermark(watermark) => {
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let start_time = Instant::now();
                        dispatcher.dispatch_watermark(watermark.clone()).await?;
                        dispatcher.record_output_buffer_blocking_duration(start_time.elapsed());
                        StreamResult::Ok(())
                    })
                    .await?;
            }
            MessageBatch::Chunk(chunk) => {
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let start_time = Instant::now();
                        dispatcher.dispatch_data(chunk.clone()).await?;
                        dispatcher.record_output_buffer_blocking_duration(start_time.elapsed());
                        StreamResult::Ok(())
                    })
                    .await?;

                self.metrics
                    .actor_out_record_cnt
                    .inc_by(chunk.cardinality() as _);
            }
        }
        Ok(())
    }

    /// Add new dispatchers to the executor, checking afterwards that the dispatcher IDs are
    /// unique.
    async fn add_dispatchers<'a>(
        &mut self,
        new_dispatchers: impl IntoIterator<Item = &'a PbDispatcher>,
    ) -> StreamResult<()> {
        for dispatcher in new_dispatchers {
            let outputs = self
                .collect_outputs(&dispatcher.downstream_actor_id)
                .await?;
            let dispatcher = DispatcherImpl::new(outputs, dispatcher)?;
            let dispatcher = self.metrics.monitor_dispatcher(dispatcher);
            self.dispatchers.push(dispatcher);
        }

        assert!(
            self.dispatchers
                .iter()
                .map(|d| d.dispatcher_id())
                .all_unique(),
            "dispatcher ids must be unique: {:?}",
            self.dispatchers
        );

        Ok(())
    }

    fn find_dispatcher(&mut self, dispatcher_id: DispatcherId) -> &mut DispatcherImpl {
        self.dispatchers
            .iter_mut()
            .find(|d| d.dispatcher_id() == dispatcher_id)
            .unwrap_or_else(|| panic!("dispatcher {}:{} not found", self.actor_id, dispatcher_id))
    }

    /// Update the dispatcher BEFORE we actually dispatch this barrier. We'll only add the new
    /// outputs.
    async fn pre_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
        let outputs = self
            .collect_outputs(&update.added_downstream_actor_id)
            .await?;

        let dispatcher = self.find_dispatcher(update.dispatcher_id);
        dispatcher.add_outputs(outputs);

        Ok(())
    }

    /// Update the dispatcher AFTER we dispatch this barrier. We'll remove some outputs and finally
    /// update the hash mapping.
    fn post_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
        let ids = update.removed_downstream_actor_id.iter().copied().collect();

        let dispatcher = self.find_dispatcher(update.dispatcher_id);
        dispatcher.remove_outputs(&ids);

        // The hash mapping is only used by the hash dispatcher.
        //
        // We specify a single upstream hash mapping for scaling the downstream fragment. However,
        // there may be multiple upstreams with different exchange types, for example, the
        // `Broadcast` inner side of the dynamic filter. There are too many combinations to handle
        // here, so we just ignore the `hash_mapping` field for any other exchange types.
        if let DispatcherImpl::Hash(dispatcher) = dispatcher {
            dispatcher.hash_mapping =
                ActorMapping::from_protobuf(update.get_hash_mapping()?).to_expanded();
        }

        Ok(())
    }

    /// For `Add` and `Update`, update the dispatchers before we dispatch the barrier.
    async fn pre_mutate_dispatchers(
        &mut self,
        mutation: &Option<Arc<Mutation>>,
    ) -> StreamResult<()> {
        let Some(mutation) = mutation.as_deref() else {
            return Ok(());
        };

        match mutation {
            Mutation::Add(AddMutation { adds, .. }) => {
                if let Some(new_dispatchers) = adds.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }
            }
            Mutation::Update(UpdateMutation {
                dispatchers,
                actor_new_dispatchers: actor_dispatchers,
                ..
            }) => {
                if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }

                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.pre_update_dispatcher(update).await?;
                    }
                }
            }
            Mutation::AddAndUpdate(
                AddMutation { adds, .. },
                UpdateMutation {
                    dispatchers,
                    actor_new_dispatchers: actor_dispatchers,
                    ..
                },
            ) => {
                if let Some(new_dispatchers) = adds.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }

                if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }

                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.pre_update_dispatcher(update).await?;
                    }
                }
            }
            _ => {}
        }

        Ok(())
    }

    /// For `Stop` and `Update`, update the dispatchers after we dispatch the barrier.
    fn post_mutate_dispatchers(&mut self, mutation: &Option<Arc<Mutation>>) -> StreamResult<()> {
        let Some(mutation) = mutation.as_deref() else {
            return Ok(());
        };

        match mutation {
            Mutation::Stop(stops) => {
                // Remove outputs only if this actor itself is not to be stopped.
                if !stops.contains(&self.actor_id) {
                    for dispatcher in &mut self.dispatchers {
                        dispatcher.remove_outputs(stops);
                    }
                }
            }
            Mutation::Update(UpdateMutation {
                dispatchers,
                dropped_actors,
                ..
            })
            | Mutation::AddAndUpdate(
                _,
                UpdateMutation {
                    dispatchers,
                    dropped_actors,
                    ..
                },
            ) => {
                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.post_update_dispatcher(update)?;
                    }
                }

                if !dropped_actors.contains(&self.actor_id) {
                    for dispatcher in &mut self.dispatchers {
                        dispatcher.remove_outputs(dropped_actors);
                    }
                }
            }
            _ => {}
        };

        // After stopping the downstream mview, the outputs of some dispatchers might be empty,
        // so we should clean them up.
        self.dispatchers.retain(|d| !d.is_empty());

        Ok(())
    }
}

impl DispatchExecutor {
    pub(crate) async fn new(
        input: Executor,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
        dispatchers: Vec<stream_plan::Dispatcher>,
        actor_id: u32,
        fragment_id: u32,
        local_barrier_manager: LocalBarrierManager,
        metrics: Arc<StreamingMetrics>,
    ) -> StreamResult<Self> {
        let mut executor = Self::new_inner(
            input,
            new_output_request_rx,
            vec![],
            actor_id,
            fragment_id,
            local_barrier_manager,
            metrics,
        );
        let inner = &mut executor.inner;
        for dispatcher in dispatchers {
            let outputs = inner
                .collect_outputs(&dispatcher.downstream_actor_id)
                .await?;
            let dispatcher = DispatcherImpl::new(outputs, &dispatcher)?;
            let dispatcher = inner.metrics.monitor_dispatcher(dispatcher);
            inner.dispatchers.push(dispatcher);
        }
        Ok(executor)
    }

    #[cfg(test)]
    pub(crate) fn for_test(
        input: Executor,
        dispatchers: Vec<DispatcherImpl>,
        actor_id: u32,
        fragment_id: u32,
        local_barrier_manager: LocalBarrierManager,
        metrics: Arc<StreamingMetrics>,
    ) -> (
        Self,
        tokio::sync::mpsc::UnboundedSender<(ActorId, NewOutputRequest)>,
    ) {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

        (
            Self::new_inner(
                input,
                rx,
                dispatchers,
                actor_id,
                fragment_id,
                local_barrier_manager,
                metrics,
            ),
            tx,
        )
    }

    fn new_inner(
        mut input: Executor,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
        dispatchers: Vec<DispatcherImpl>,
        actor_id: u32,
        fragment_id: u32,
        local_barrier_manager: LocalBarrierManager,
        metrics: Arc<StreamingMetrics>,
    ) -> Self {
        let chunk_size = local_barrier_manager.env.config().developer.chunk_size;
        if crate::consistency::insane() {
            // Make some trouble before dispatching to avoid generating an invalid dist key.
            let mut info = input.info().clone();
            info.identity = format!("{} (embedded trouble)", info.identity);
            let troublemaker = TroublemakerExecutor::new(input, chunk_size);
            input = (info, troublemaker).into();
        }

        let actor_id_str = actor_id.to_string();
        let fragment_id_str = fragment_id.to_string();
        let actor_out_record_cnt = metrics
            .actor_out_record_cnt
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let metrics = DispatchExecutorMetrics {
            actor_id_str,
            fragment_id_str,
            metrics,
            actor_out_record_cnt,
        };
        let dispatchers = dispatchers
            .into_iter()
            .map(|dispatcher| metrics.monitor_dispatcher(dispatcher))
            .collect();
        Self {
            input,
            inner: DispatchExecutorInner {
                dispatchers,
                actor_id,
                local_barrier_manager,
                metrics,
                new_output_request_rx,
                pending_new_output_requests: Default::default(),
            },
        }
    }
}

impl StreamConsumer for DispatchExecutor {
    type BarrierStream = impl Stream<Item = StreamResult<Barrier>> + Send;

    fn execute(mut self: Box<Self>) -> Self::BarrierStream {
        let max_barrier_count_per_batch = self
            .inner
            .local_barrier_manager
            .env
            .config()
            .developer
            .max_barrier_batch_size;
        #[try_stream]
        async move {
            let mut input = self.input.execute().peekable();
            loop {
                let Some(message) =
                    try_batch_barriers(max_barrier_count_per_batch, &mut input).await?
                else {
                    // end_of_stream
                    break;
                };
                match message {
                    chunk @ MessageBatch::Chunk(_) => {
                        self.inner
                            .dispatch(chunk)
                            .instrument(tracing::info_span!("dispatch_chunk"))
                            .instrument_await("dispatch_chunk")
                            .await?;
                    }
                    MessageBatch::BarrierBatch(barrier_batch) => {
                        assert!(!barrier_batch.is_empty());
                        self.inner
                            .dispatch(MessageBatch::BarrierBatch(barrier_batch.clone()))
                            .instrument(tracing::info_span!("dispatch_barrier_batch"))
                            .instrument_await("dispatch_barrier_batch")
                            .await?;
                        self.inner
                            .metrics
                            .metrics
                            .barrier_batch_size
                            .observe(barrier_batch.len() as f64);
                        for barrier in barrier_batch {
                            yield barrier;
                        }
                    }
                    watermark @ MessageBatch::Watermark(_) => {
                        self.inner
                            .dispatch(watermark)
                            .instrument(tracing::info_span!("dispatch_watermark"))
                            .instrument_await("dispatch_watermark")
                            .await?;
                    }
                }
            }
        }
    }
}

/// Tries to batch up to `max_barrier_count_per_batch` consecutive barriers into a single message
/// batch.
///
/// Returns the message batch, or `None` upon end of stream.
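///
/// For example (a hypothetical input), given the pending input `[barrier, barrier, chunk, ..]`
/// where neither barrier carries a mutation, this returns a `BarrierBatch` of length 2:
/// batching stops at the chunk. A barrier carrying a mutation is never merged into a previous
/// batch, and a batch starting with a mutation barrier contains only that barrier.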
async fn try_batch_barriers(
    max_barrier_count_per_batch: u32,
    input: &mut Peekable<BoxedMessageStream>,
) -> StreamResult<Option<MessageBatch>> {
    let Some(msg) = input.next().await else {
        // end_of_stream
        return Ok(None);
    };
    let mut barrier_batch = vec![];
    let msg: Message = msg?;
    let max_peek_attempts = match msg {
        Message::Chunk(c) => {
            return Ok(Some(MessageBatch::Chunk(c)));
        }
        Message::Watermark(w) => {
            return Ok(Some(MessageBatch::Watermark(w)));
        }
        Message::Barrier(b) => {
            let peek_more_barrier = b.mutation.is_none();
            barrier_batch.push(b);
            if peek_more_barrier {
                max_barrier_count_per_batch.saturating_sub(1)
            } else {
                0
            }
        }
    };
    // Try to peek more consecutive non-mutation barriers.
    for _ in 0..max_peek_attempts {
        let peek = input.peek().now_or_never();
        let Some(peek) = peek else {
            break;
        };
        let Some(msg) = peek else {
            // end_of_stream
            break;
        };
        let Ok(Message::Barrier(barrier)) = msg else {
            break;
        };
        if barrier.mutation.is_some() {
            break;
        }
        let msg: Message = input.next().now_or_never().unwrap().unwrap()?;
        let Message::Barrier(ref barrier) = msg else {
            unreachable!("must be a barrier");
        };
        barrier_batch.push(barrier.clone());
    }
    Ok(Some(MessageBatch::BarrierBatch(barrier_batch)))
}

#[derive(Debug)]
pub enum DispatcherImpl {
    Hash(HashDataDispatcher),
    Broadcast(BroadcastDispatcher),
    Simple(SimpleDispatcher),
    RoundRobin(RoundRobinDataDispatcher),
}

impl DispatcherImpl {
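    /// Construct a dispatcher from its protobuf representation, wiring it to the given
    /// `outputs`. `Simple` and `NoShuffle` dispatchers must be given exactly one output.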
    pub fn new(outputs: Vec<Output>, dispatcher: &PbDispatcher) -> StreamResult<Self> {
        let output_mapping =
            DispatchOutputMapping::from_protobuf(dispatcher.output_mapping.clone().unwrap());

        use risingwave_pb::stream_plan::DispatcherType::*;
        let dispatcher_impl = match dispatcher.get_type()? {
            Hash => {
                assert!(!outputs.is_empty());
                let dist_key_indices = dispatcher
                    .dist_key_indices
                    .iter()
                    .map(|i| *i as usize)
                    .collect();

                let hash_mapping =
                    ActorMapping::from_protobuf(dispatcher.get_hash_mapping()?).to_expanded();

                DispatcherImpl::Hash(HashDataDispatcher::new(
                    outputs,
                    dist_key_indices,
                    output_mapping,
                    hash_mapping,
                    dispatcher.dispatcher_id,
                ))
            }
            Broadcast => DispatcherImpl::Broadcast(BroadcastDispatcher::new(
                outputs,
                output_mapping,
                dispatcher.dispatcher_id,
            )),
            Simple | NoShuffle => {
                let [output]: [_; 1] = outputs.try_into().unwrap();
                DispatcherImpl::Simple(SimpleDispatcher::new(
                    output,
                    output_mapping,
                    dispatcher.dispatcher_id,
                ))
            }
            Unspecified => unreachable!(),
        };

        Ok(dispatcher_impl)
    }
}

macro_rules! impl_dispatcher {
    ($( { $variant_name:ident } ),*) => {
        impl DispatcherImpl {
            pub async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_data(chunk).await, )*
                }
            }

            pub async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_barriers(barriers).await, )*
                }
            }

            pub async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_watermark(watermark).await, )*
                }
            }

            pub fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
                match self {
                    $(Self::$variant_name(inner) => inner.add_outputs(outputs), )*
                }
            }

            pub fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
                match self {
                    $(Self::$variant_name(inner) => inner.remove_outputs(actor_ids), )*
                }
            }

            pub fn dispatcher_id(&self) -> DispatcherId {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id(), )*
                }
            }

            pub fn dispatcher_id_str(&self) -> &str {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id_str(), )*
                }
            }

            pub fn is_empty(&self) -> bool {
                match self {
                    $(Self::$variant_name(inner) => inner.is_empty(), )*
                }
            }
        }
    }
}

macro_rules! for_all_dispatcher_variants {
    ($macro:ident) => {
        $macro! {
            { Hash },
            { Broadcast },
            { Simple },
            { RoundRobin }
        }
    };
}

for_all_dispatcher_variants! { impl_dispatcher }

pub trait DispatchFuture<'a> = Future<Output = StreamResult<()>> + Send;

pub trait Dispatcher: Debug + 'static {
    /// Dispatch a data chunk to downstream actors.
    fn dispatch_data(&mut self, chunk: StreamChunk) -> impl DispatchFuture<'_>;
    /// Dispatch barriers to downstream actors, generally by broadcasting them.
    fn dispatch_barriers(&mut self, barrier: DispatcherBarriers) -> impl DispatchFuture<'_>;
    /// Dispatch a watermark to downstream actors, generally by broadcasting it.
    fn dispatch_watermark(&mut self, watermark: Watermark) -> impl DispatchFuture<'_>;

    /// Add new outputs to the dispatcher.
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>);
    /// Remove the outputs with actor IDs in `actor_ids` from the dispatcher.
    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>);

    /// The ID of the dispatcher. A [`DispatchExecutor`] may have multiple dispatchers with
    /// different IDs.
    ///
    /// Note that the dispatcher id is always equal to the downstream fragment id.
    /// See also `proto/stream_plan.proto`.
    fn dispatcher_id(&self) -> DispatcherId;

    /// Dispatcher id in string. See [`Dispatcher::dispatcher_id`].
    fn dispatcher_id_str(&self) -> &str;

    /// Whether the dispatcher has no outputs. If so, it'll be cleaned up from the
    /// [`DispatchExecutor`].
    fn is_empty(&self) -> bool;
}

/// Concurrently broadcast a message to all outputs.
///
/// Note that this does not follow `concurrent_dispatchers` in the config and the concurrency is
/// always unlimited.
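///
/// Fails fast: if sending to any output fails, the first error is returned, though other sends
/// in the same batch may have already completed.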
async fn broadcast_concurrent(
    outputs: impl IntoIterator<Item = &'_ mut Output>,
    message: DispatcherMessageBatch,
) -> StreamResult<()> {
    futures::future::try_join_all(
        outputs
            .into_iter()
            .map(|output| output.send(message.clone())),
    )
    .await?;
    Ok(())
}

#[derive(Debug)]
pub struct RoundRobinDataDispatcher {
    outputs: Vec<Output>,
    output_mapping: DispatchOutputMapping,
    cur: usize,
    dispatcher_id: DispatcherId,
    dispatcher_id_str: String,
}

impl RoundRobinDataDispatcher {
    pub fn new(
        outputs: Vec<Output>,
        output_mapping: DispatchOutputMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            outputs,
            output_mapping,
            cur: 0,
            dispatcher_id,
            dispatcher_id_str: dispatcher_id.to_string(),
        }
    }
}

impl Dispatcher for RoundRobinDataDispatcher {
    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let chunk = self.output_mapping.apply(chunk);

        self.outputs[self.cur]
            .send(DispatcherMessageBatch::Chunk(chunk))
            .await?;
        self.cur += 1;
        self.cur %= self.outputs.len();
        Ok(())
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // always broadcast barrier
        broadcast_concurrent(
            &mut self.outputs,
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            // always broadcast watermark
            broadcast_concurrent(
                &mut self.outputs,
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(outputs);
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
            .count();
        // Clamp the cursor; `saturating_sub` avoids an underflow when all outputs are removed
        // (the dispatcher will then be cleaned up as empty).
        self.cur = self.cur.min(self.outputs.len().saturating_sub(1));
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn dispatcher_id_str(&self) -> &str {
        &self.dispatcher_id_str
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}

pub struct HashDataDispatcher {
    outputs: Vec<Output>,
    keys: Vec<usize>,
    output_mapping: DispatchOutputMapping,
    /// Mapping from virtual node to actor id, used by the hash dispatcher to route rows to
    /// different downstream actors.
    hash_mapping: ExpandedActorMapping,
    dispatcher_id: DispatcherId,
    dispatcher_id_str: String,
}

impl Debug for HashDataDispatcher {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HashDataDispatcher")
            .field("outputs", &self.outputs)
            .field("keys", &self.keys)
            .field("dispatcher_id", &self.dispatcher_id)
            .finish_non_exhaustive()
    }
}

impl HashDataDispatcher {
    pub fn new(
        outputs: Vec<Output>,
        keys: Vec<usize>,
        output_mapping: DispatchOutputMapping,
        hash_mapping: ExpandedActorMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            outputs,
            keys,
            output_mapping,
            hash_mapping,
            dispatcher_id,
            dispatcher_id_str: dispatcher_id.to_string(),
        }
    }
}

impl Dispatcher for HashDataDispatcher {
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(outputs);
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // always broadcast barrier
        broadcast_concurrent(
            &mut self.outputs,
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            // always broadcast watermark
            broadcast_concurrent(
                &mut self.outputs,
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        // A chunk can be shuffled into multiple output chunks to be sent downstream. These
        // output chunks differ only in their visibility maps, which are computed from the
        // hash value of each row in the input chunk.
        let num_outputs = self.outputs.len();

        // Compute the vnode of every row from its key columns.
        let vnode_count = self.hash_mapping.len();
        let vnodes = VirtualNode::compute_chunk(chunk.data_chunk(), &self.keys, vnode_count);

        tracing::debug!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty(), self.keys, vnodes);

        let mut vis_maps = repeat_with(|| BitmapBuilder::with_capacity(chunk.capacity()))
            .take(num_outputs)
            .collect_vec();
        let mut last_vnode_when_update_delete = None;
        let mut new_ops: Vec<Op> = Vec::with_capacity(chunk.capacity());

        // Apply output indices after calculating the vnode.
        let chunk = self.output_mapping.apply(chunk);

        for ((vnode, &op), visible) in vnodes
            .iter()
            .copied()
            .zip_eq_fast(chunk.ops())
            .zip_eq_fast(chunk.visibility().iter())
        {
            // Build visibility map for every output chunk.
            for (output, vis_map) in self.outputs.iter().zip_eq_fast(vis_maps.iter_mut()) {
                vis_map.append(visible && self.hash_mapping[vnode.to_index()] == output.actor_id());
            }

            if !visible {
                assert!(
                    last_vnode_when_update_delete.is_none(),
                    "invisible row between U- and U+, op = {op:?}",
                );
                new_ops.push(op);
                continue;
            }

            // An 'update' message, denoted by an `UpdateDelete` and a successive `UpdateInsert`,
            // needs to be rewritten into a plain `Delete` and `Insert` if the two halves are
            // dispatched to different actors.
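            //
            // For example (hypothetical rows): if the `U-` row hashes to a vnode owned by
            // actor A while the following `U+` row hashes to a vnode owned by actor B, the
            // pair is rewritten to a plain `Delete` (to A) and `Insert` (to B). Note that we
            // compare vnodes rather than actors, which is slightly conservative but correct.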
            if op == Op::UpdateDelete {
                last_vnode_when_update_delete = Some(vnode);
            } else if op == Op::UpdateInsert {
                if vnode
                    != last_vnode_when_update_delete
                        .take()
                        .expect("missing U- before U+")
                {
                    new_ops.push(Op::Delete);
                    new_ops.push(Op::Insert);
                } else {
                    new_ops.push(Op::UpdateDelete);
                    new_ops.push(Op::UpdateInsert);
                }
            } else {
                new_ops.push(op);
            }
        }
        assert!(
            last_vnode_when_update_delete.is_none(),
            "missing U+ after U-"
        );

        let ops = new_ops;

        // Send each output its own `StreamChunk`, with the corresponding visibility map applied.
        futures::future::try_join_all(
            vis_maps
                .into_iter()
                .zip_eq_fast(self.outputs.iter_mut())
                .map(|(vis_map, output)| async {
                    let vis_map = vis_map.finish();
                    // The columns are not changed in this function.
                    let new_stream_chunk =
                        StreamChunk::with_visibility(ops.clone(), chunk.columns().into(), vis_map);
                    if new_stream_chunk.cardinality() > 0 {
                        event!(
                            tracing::Level::TRACE,
                            msg = "chunk",
                            downstream = output.actor_id(),
                            "send = \n{:#?}",
                            new_stream_chunk
                        );
                        output
                            .send(DispatcherMessageBatch::Chunk(new_stream_chunk))
                            .await?;
                    }
                    StreamResult::Ok(())
                }),
        )
        .await?;

        Ok(())
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
            .count();
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn dispatcher_id_str(&self) -> &str {
        &self.dispatcher_id_str
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}

/// `BroadcastDispatcher` dispatches messages to all outputs.
#[derive(Debug)]
pub struct BroadcastDispatcher {
    outputs: HashMap<ActorId, Output>,
    output_mapping: DispatchOutputMapping,
    dispatcher_id: DispatcherId,
    dispatcher_id_str: String,
}

impl BroadcastDispatcher {
    pub fn new(
        outputs: impl IntoIterator<Item = Output>,
        output_mapping: DispatchOutputMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            outputs: Self::into_pairs(outputs).collect(),
            output_mapping,
            dispatcher_id,
            dispatcher_id_str: dispatcher_id.to_string(),
        }
    }

    fn into_pairs(
        outputs: impl IntoIterator<Item = Output>,
    ) -> impl Iterator<Item = (ActorId, Output)> {
        outputs
            .into_iter()
            .map(|output| (output.actor_id(), output))
    }
}

impl Dispatcher for BroadcastDispatcher {
    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let chunk = self.output_mapping.apply(chunk);
        broadcast_concurrent(
            self.outputs.values_mut(),
            DispatcherMessageBatch::Chunk(chunk),
        )
        .await
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // always broadcast barrier
        broadcast_concurrent(
            self.outputs.values_mut(),
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            // always broadcast watermark
            broadcast_concurrent(
                self.outputs.values_mut(),
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(Self::into_pairs(outputs));
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(|actor_id, _| actor_ids.contains(actor_id))
            .count();
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn dispatcher_id_str(&self) -> &str {
        &self.dispatcher_id_str
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}

/// `SimpleDispatcher` dispatches messages to a single output.
#[derive(Debug)]
pub struct SimpleDispatcher {
    /// In most cases, there is exactly one output. However, during some configuration changes,
    /// this field may temporarily hold zero or two outputs.
    ///
    /// - When dropping a materialized view, the output will be removed and this field becomes
    ///   empty. The [`DispatchExecutor`] will immediately clean up this empty dispatcher before
    ///   finishing processing the current mutation.
    /// - When migrating a singleton fragment, the new output will be temporarily added in the
    ///   `pre` stage, so this field holds two outputs and the configuration-change barrier is
    ///   broadcast to both the old and the new downstream actors. In the `post` stage, the old
    ///   output is removed and this field becomes a single output again.
    ///
    /// Therefore, when dispatching data, we assert that there's exactly one output.
    output: SmallVec<[Output; 2]>,
    output_mapping: DispatchOutputMapping,
    dispatcher_id: DispatcherId,
    dispatcher_id_str: String,
}

impl SimpleDispatcher {
    pub fn new(
        output: Output,
        output_mapping: DispatchOutputMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            output: smallvec![output],
            output_mapping,
            dispatcher_id,
            dispatcher_id_str: dispatcher_id.to_string(),
        }
    }
}

impl Dispatcher for SimpleDispatcher {
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.output.extend(outputs);
        assert!(self.output.len() <= 2);
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // Only barriers are allowed to be dispatched to multiple outputs during migration.
        for output in &mut self.output {
            output
                .send(DispatcherMessageBatch::BarrierBatch(barriers.clone()))
                .await?;
        }
        Ok(())
    }

    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let output = self
            .output
            .iter_mut()
            .exactly_one()
            .expect("expect exactly one output");

        let chunk = self.output_mapping.apply(chunk);
        output.send(DispatcherMessageBatch::Chunk(chunk)).await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        let output = self
            .output
            .iter_mut()
            .exactly_one()
            .expect("expect exactly one output");

        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            output
                .send(DispatcherMessageBatch::Watermark(watermark))
                .await?;
        }
        Ok(())
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.output
            .retain(|output| !actor_ids.contains(&output.actor_id()));
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn dispatcher_id_str(&self) -> &str {
        &self.dispatcher_id_str
    }

    fn is_empty(&self) -> bool {
        self.output.is_empty()
    }
}

#[cfg(test)]
mod tests {
    use std::hash::{BuildHasher, Hasher};

    use futures::pin_mut;
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::array::{Array, ArrayBuilder, I32ArrayBuilder};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::hash_util::Crc32FastBuilder;
    use risingwave_pb::stream_plan::{DispatcherType, PbDispatchOutputMapping};
    use tokio::sync::mpsc::unbounded_channel;

    use super::*;
    use crate::executor::exchange::output::Output;
    use crate::executor::exchange::permit::channel_for_test;
    use crate::executor::receiver::ReceiverExecutor;
    use crate::executor::{BarrierInner as Barrier, MessageInner as Message};
    use crate::task::barrier_test_utils::LocalBarrierTestEnv;

    // TODO: this test contains updates being shuffled to different partitions, which is not
    // supported yet.
    #[tokio::test]
    async fn test_hash_dispatcher_complex() {
        test_hash_dispatcher_complex_inner().await
    }

    async fn test_hash_dispatcher_complex_inner() {
        // This test only works when vnode count is 256.
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let num_outputs = 2; // actor id ranges from 1 to 2
        let key_indices = &[0, 2];
        let (output_tx_vecs, mut output_rx_vecs): (Vec<_>, Vec<_>) =
            (0..num_outputs).map(|_| channel_for_test()).collect();
        let outputs = output_tx_vecs
            .into_iter()
            .enumerate()
            .map(|(actor_id, tx)| Output::new(1 + actor_id as u32, tx))
            .collect::<Vec<_>>();
        let mut hash_mapping = (1..num_outputs + 1)
            .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT_FOR_TEST / num_outputs])
            .collect_vec();
        hash_mapping.resize(VirtualNode::COUNT_FOR_TEST, num_outputs as u32);
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            key_indices.to_vec(),
            DispatchOutputMapping::Simple(vec![0, 1, 2]),
            hash_mapping,
            0,
        );

        let chunk = StreamChunk::from_pretty(
            "  I I I
            +  4 6 8
            +  5 7 9
            +  0 0 0
            -  1 1 1 D
            U- 2 0 2
            U+ 2 0 2
            U- 3 3 2
            U+ 3 3 4",
        );
        hash_dispatcher.dispatch_data(chunk).await.unwrap();

        assert_eq!(
            *output_rx_vecs[0].recv().await.unwrap().as_chunk().unwrap(),
            StreamChunk::from_pretty(
                "  I I I
                +  4 6 8
                +  5 7 9
                +  0 0 0
                -  1 1 1 D
                U- 2 0 2
                U+ 2 0 2
                -  3 3 2 D  // Should rewrite UpdateDelete to Delete
                +  3 3 4    // Should rewrite UpdateInsert to Insert",
            )
        );
        assert_eq!(
            *output_rx_vecs[1].recv().await.unwrap().as_chunk().unwrap(),
            StreamChunk::from_pretty(
                "  I I I
                +  4 6 8 D
                +  5 7 9 D
                +  0 0 0 D
                -  1 1 1 D  // Should keep original invisible mark
                U- 2 0 2 D  // Should keep UpdateDelete
                U+ 2 0 2 D  // Should keep UpdateInsert
                -  3 3 2    // Should rewrite UpdateDelete to Delete
                +  3 3 4 D  // Should rewrite UpdateInsert to Insert",
            )
        );
    }

    #[tokio::test]
    async fn test_configuration_change() {
        let _schema = Schema { fields: vec![] };
        let (tx, rx) = channel_for_test();
        let actor_id = 233;
        let fragment_id = 666;
        let barrier_test_env = LocalBarrierTestEnv::for_test().await;
        let metrics = Arc::new(StreamingMetrics::unused());

        let (untouched, old, new) = (234, 235, 238); // broadcast downstream actors
        let (old_simple, new_simple) = (114, 514); // simple downstream actors

        // actor_id -> untouched, old, new, old_simple, new_simple

        let broadcast_dispatcher_id = 666;
        let broadcast_dispatcher = PbDispatcher {
            r#type: DispatcherType::Broadcast as _,
            dispatcher_id: broadcast_dispatcher_id,
            downstream_actor_id: vec![untouched, old],
            output_mapping: PbDispatchOutputMapping::identical(0).into(), /* dummy length as it's not used */
            ..Default::default()
        };

        let simple_dispatcher_id = 888;
        let simple_dispatcher = PbDispatcher {
            r#type: DispatcherType::Simple as _,
            dispatcher_id: simple_dispatcher_id,
            downstream_actor_id: vec![old_simple],
            output_mapping: PbDispatchOutputMapping::identical(0).into(), /* dummy length as it's not used */
            ..Default::default()
        };

        let dispatcher_updates = maplit::hashmap! {
            actor_id => vec![PbDispatcherUpdate {
                actor_id,
                dispatcher_id: broadcast_dispatcher_id,
                added_downstream_actor_id: vec![new],
                removed_downstream_actor_id: vec![old],
                hash_mapping: Default::default(),
            }]
        };
        let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: dispatcher_updates,
                merges: Default::default(),
                vnode_bitmaps: Default::default(),
                dropped_actors: Default::default(),
                actor_splits: Default::default(),
                actor_new_dispatchers: Default::default(),
            },
        ));
        barrier_test_env.inject_barrier(&b1, [actor_id]);
        barrier_test_env.flush_all_events().await;

        let input = Executor::new(
            Default::default(),
            ReceiverExecutor::for_test(
                actor_id,
                rx,
                barrier_test_env.local_barrier_manager.clone(),
            )
            .boxed(),
        );

        let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
        let mut rxs = [untouched, old, new, old_simple, new_simple]
            .into_iter()
            .map(|id| {
                (id, {
                    let (tx, rx) = channel_for_test();
                    new_output_request_tx
                        .send((id, NewOutputRequest::Local(tx)))
                        .unwrap();
                    rx
                })
            })
            .collect::<HashMap<_, _>>();
        let executor = Box::new(
            DispatchExecutor::new(
                input,
                new_output_request_rx,
                vec![broadcast_dispatcher, simple_dispatcher],
                actor_id,
                fragment_id,
                barrier_test_env.local_barrier_manager.clone(),
                metrics,
            )
            .await
            .unwrap(),
        )
        .execute();

        pin_mut!(executor);

        macro_rules! try_recv {
            ($down_id:expr) => {
                rxs.get_mut(&$down_id).unwrap().try_recv()
            };
        }

        // 1. Send a chunk, then the config-change barrier.
        tx.send(Message::Chunk(StreamChunk::default()).into())
            .await
            .unwrap();

        tx.send(Message::Barrier(b1.clone().into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 2. Check downstream.
        try_recv!(untouched).unwrap().as_chunk().unwrap();
        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();

        try_recv!(old).unwrap().as_chunk().unwrap();
        try_recv!(old).unwrap().as_barrier_batch().unwrap(); // It should still receive the barrier even if it's to be removed.

        try_recv!(new).unwrap().as_barrier_batch().unwrap(); // Since it's just added, it won't receive the chunk.

        try_recv!(old_simple).unwrap().as_chunk().unwrap();
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // Untouched.

        // 3. Send another barrier.
        let b2 = Barrier::new_test_barrier(test_epoch(2));
        barrier_test_env.inject_barrier(&b2, [actor_id]);
        tx.send(Message::Barrier(b2.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 4. Check downstream.
        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();
        try_recv!(old).unwrap_err(); // Since it's stopped, we can't receive the new messages.
        try_recv!(new).unwrap().as_barrier_batch().unwrap();

        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // Untouched.
        try_recv!(new_simple).unwrap_err(); // Untouched.

        // 5. Send another chunk.
        tx.send(Message::Chunk(StreamChunk::default()).into())
            .await
            .unwrap();

        // 6. Send a configuration change barrier for the simple dispatcher.
        let dispatcher_updates = maplit::hashmap! {
            actor_id => vec![PbDispatcherUpdate {
                actor_id,
                dispatcher_id: simple_dispatcher_id,
                added_downstream_actor_id: vec![new_simple],
                removed_downstream_actor_id: vec![old_simple],
                hash_mapping: Default::default(),
            }]
        };
        let b3 = Barrier::new_test_barrier(test_epoch(3)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: dispatcher_updates,
                merges: Default::default(),
                vnode_bitmaps: Default::default(),
                dropped_actors: Default::default(),
                actor_splits: Default::default(),
                actor_new_dispatchers: Default::default(),
            },
        ));
        barrier_test_env.inject_barrier(&b3, [actor_id]);
        tx.send(Message::Barrier(b3.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 7. Check downstream.
        try_recv!(old_simple).unwrap().as_chunk().unwrap();
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // It should still receive the barrier even if it's to be removed.

        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap(); // Since it's just added, it won't receive the chunk.

        // 8. Send another barrier.
        let b4 = Barrier::new_test_barrier(test_epoch(4));
        barrier_test_env.inject_barrier(&b4, [actor_id]);
        tx.send(Message::Barrier(b4.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 9. Check downstream.
        try_recv!(old_simple).unwrap_err(); // Since it's stopped, we can't receive the new messages.
        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap();
    }

    #[tokio::test]
    async fn test_hash_dispatcher() {
        // This test only works when vnode count is 256.
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let num_outputs = 5; // actor id ranges from 1 to 5
        let cardinality = 10;
        let dimension = 4;
        let key_indices = &[0, 2];
        let (output_tx_vecs, output_rx_vecs): (Vec<_>, Vec<_>) =
            (0..num_outputs).map(|_| channel_for_test()).collect();
        let outputs = output_tx_vecs
            .into_iter()
            .enumerate()
            .map(|(actor_id, tx)| Output::new(1 + actor_id as u32, tx))
            .collect::<Vec<_>>();
        let mut hash_mapping = (1..num_outputs + 1)
            .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT_FOR_TEST / num_outputs])
            .collect_vec();
        hash_mapping.resize(VirtualNode::COUNT_FOR_TEST, num_outputs as u32);
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            key_indices.to_vec(),
            DispatchOutputMapping::Simple((0..dimension).collect()),
            hash_mapping.clone(),
            0,
        );

        let mut ops = Vec::new();
        for idx in 0..cardinality {
            if idx % 2 == 0 {
                ops.push(Op::Insert);
            } else {
                ops.push(Op::Delete);
            }
        }

        let mut start = 19260817i32..;
        let mut builders = (0..dimension)
            .map(|_| I32ArrayBuilder::new(cardinality))
            .collect_vec();
        let mut output_cols = vec![vec![vec![]; dimension]; num_outputs];
        let mut output_ops = vec![vec![]; num_outputs];
        for op in &ops {
            let hash_builder = Crc32FastBuilder;
            let mut hasher = hash_builder.build_hasher();
            let one_row = (0..dimension).map(|_| start.next().unwrap()).collect_vec();
            for key_idx in key_indices {
                let val = one_row[*key_idx];
                let bytes = val.to_le_bytes();
                hasher.update(&bytes);
            }
            let output_idx =
                hash_mapping[hasher.finish() as usize % VirtualNode::COUNT_FOR_TEST] as usize - 1;
            for (builder, val) in builders.iter_mut().zip_eq_fast(one_row.iter()) {
                builder.append(Some(*val));
            }
            output_cols[output_idx]
                .iter_mut()
                .zip_eq_fast(one_row.iter())
                .for_each(|(each_column, val)| each_column.push(*val));
            output_ops[output_idx].push(op);
        }

        let columns = builders
            .into_iter()
            .map(|builder| {
                let array = builder.finish();
                array.into_ref()
            })
            .collect();

        let chunk = StreamChunk::new(ops, columns);
        hash_dispatcher.dispatch_data(chunk).await.unwrap();

        for (output_idx, mut rx) in output_rx_vecs.into_iter().enumerate() {
            let mut output = vec![];
            while let Some(Some(msg)) = rx.recv().now_or_never() {
                output.push(msg);
            }
            // It is possible that there are no chunks, when no key belongs to this output's
            // hash buckets.
            assert!(output.len() <= 1);
            if output.is_empty() {
                assert!(output_cols[output_idx].iter().all(|x| { x.is_empty() }));
            } else {
                let message = output.first().unwrap();
                let real_chunk = match message {
                    DispatcherMessageBatch::Chunk(chunk) => chunk,
                    _ => panic!(),
                };
                real_chunk
                    .columns()
                    .iter()
                    .zip_eq_fast(output_cols[output_idx].iter())
                    .for_each(|(real_col, expect_col)| {
                        let real_vals = real_chunk
                            .visibility()
                            .iter_ones()
                            .map(|row_idx| real_col.as_int32().value_at(row_idx).unwrap())
                            .collect::<Vec<_>>();
                        assert_eq!(real_vals.len(), expect_col.len());
                        assert_eq!(real_vals, *expect_col);
                    });
            }
        }
    }
}
1631}