risingwave_stream/executor/dispatch.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::iter::repeat_with;
use std::ops::{Deref, DerefMut};
use std::time::Duration;

use anyhow::anyhow;
use futures::{FutureExt, TryStreamExt};
use itertools::Itertools;
use risingwave_common::array::Op;
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::hash::{ActorMapping, ExpandedActorMapping, VirtualNode};
use risingwave_common::metrics::LabelGuardedIntCounter;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::stream_plan;
use risingwave_pb::stream_plan::PbDispatcher;
use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
use smallvec::{SmallVec, smallvec};
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::time::Instant;
use tokio_stream::StreamExt;
use tokio_stream::adapters::Peekable;
use tracing::{Instrument, event};

use super::exchange::output::Output;
use super::{
    AddMutation, DispatcherBarriers, DispatcherMessageBatch, MessageBatch, TroublemakerExecutor,
    UpdateMutation,
};
use crate::executor::StreamConsumer;
use crate::executor::prelude::*;
use crate::task::{DispatcherId, LocalBarrierManager, NewOutputRequest};

/// [`DispatchExecutor`] consumes messages and sends them to downstream actors. Usually,
/// data chunks are dispatched according to some specified policy, while control messages
/// such as barriers are distributed to all receivers.
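///
/// For example (a simplified, illustrative view of one actor's output side):
///
/// ```text
///               +--> Hash dispatcher ------> actor 2 | actor 3   (chunks split by dist key)
/// input ------->|
///               +--> Broadcast dispatcher -> actor 4 & actor 5   (every message to everyone)
/// ```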
pub struct DispatchExecutor {
    input: Executor,
    inner: DispatchExecutorInner,
}

struct DispatcherWithMetrics {
    dispatcher: DispatcherImpl,
    actor_output_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}

impl DispatcherWithMetrics {
    pub fn record_output_buffer_blocking_duration(&self, duration: Duration) {
        let ns = duration.as_nanos() as u64;
        self.actor_output_buffer_blocking_duration_ns.inc_by(ns);
    }
}

impl Debug for DispatcherWithMetrics {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        self.dispatcher.fmt(f)
    }
}

impl Deref for DispatcherWithMetrics {
    type Target = DispatcherImpl;

    fn deref(&self) -> &Self::Target {
        &self.dispatcher
    }
}

impl DerefMut for DispatcherWithMetrics {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.dispatcher
    }
}

struct DispatchExecutorMetrics {
    actor_id_str: String,
    fragment_id_str: String,
    metrics: Arc<StreamingMetrics>,
    actor_out_record_cnt: LabelGuardedIntCounter,
}

impl DispatchExecutorMetrics {
    fn monitor_dispatcher(&self, dispatcher: DispatcherImpl) -> DispatcherWithMetrics {
        DispatcherWithMetrics {
            actor_output_buffer_blocking_duration_ns: self
                .metrics
                .actor_output_buffer_blocking_duration_ns
                .with_guarded_label_values(&[
                    self.actor_id_str.as_str(),
                    self.fragment_id_str.as_str(),
                    dispatcher.dispatcher_id_str(),
                ]),
            dispatcher,
        }
    }
}

struct DispatchExecutorInner {
    dispatchers: Vec<DispatcherWithMetrics>,
    actor_id: u32,
    local_barrier_manager: LocalBarrierManager,
    metrics: DispatchExecutorMetrics,
    new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    pending_new_output_requests: HashMap<ActorId, NewOutputRequest>,
}

impl DispatchExecutorInner {
    async fn collect_outputs(
        &mut self,
        downstream_actors: &[ActorId],
    ) -> StreamResult<Vec<Output>> {
        fn resolve_output(downstream_actor: ActorId, request: NewOutputRequest) -> Output {
            let tx = match request {
                NewOutputRequest::Local(tx) | NewOutputRequest::Remote(tx) => tx,
            };
            Output::new(downstream_actor, tx)
        }
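
        // Note: requests may arrive out of order with respect to `downstream_actors`.
        // A request for an actor other than the one currently awaited is parked in
        // `pending_new_output_requests` and picked up by a later iteration.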
        let mut outputs = Vec::with_capacity(downstream_actors.len());
        for downstream_actor in downstream_actors {
            let output =
                if let Some(request) = self.pending_new_output_requests.remove(downstream_actor) {
                    resolve_output(*downstream_actor, request)
                } else {
                    loop {
                        let (requested_actor, request) = self
                            .new_output_request_rx
                            .recv()
                            .await
                            .ok_or_else(|| anyhow!("end of new output request"))?;
                        if requested_actor == *downstream_actor {
                            break resolve_output(requested_actor, request);
                        } else {
                            assert!(
                                self.pending_new_output_requests
                                    .insert(requested_actor, request)
                                    .is_none(),
                                "duplicated inflight new output requests from actor {}",
                                requested_actor
                            );
                        }
                    }
                };
            outputs.push(output);
        }
        Ok(outputs)
    }

    async fn dispatch(&mut self, msg: MessageBatch) -> StreamResult<()> {
        let limit = self
            .local_barrier_manager
            .env
            .config()
            .developer
            .exchange_concurrent_dispatchers;
        // Only barriers can be batched for now.
        match msg {
            MessageBatch::BarrierBatch(barrier_batch) => {
                if barrier_batch.is_empty() {
                    return Ok(());
                }
                // Only the first barrier in a batch can carry a mutation.
                let mutation = barrier_batch[0].mutation.clone();
                self.pre_mutate_dispatchers(&mutation).await?;
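                // Note the ordering for configuration changes: new outputs are added in
                // `pre_mutate_dispatchers` above so that they receive this barrier, the
                // barrier batch is then sent to all (old and new) outputs, and only
                // afterwards are removed outputs dropped in `post_mutate_dispatchers`.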
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let start_time = Instant::now();
                        dispatcher
                            .dispatch_barriers(
                                barrier_batch
                                    .iter()
                                    .cloned()
                                    .map(|b| b.into_dispatcher())
                                    .collect(),
                            )
                            .await?;
                        dispatcher.record_output_buffer_blocking_duration(start_time.elapsed());
                        StreamResult::Ok(())
                    })
                    .await?;
                self.post_mutate_dispatchers(&mutation)?;
            }
            MessageBatch::Watermark(watermark) => {
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let start_time = Instant::now();
                        dispatcher.dispatch_watermark(watermark.clone()).await?;
                        dispatcher.record_output_buffer_blocking_duration(start_time.elapsed());
                        StreamResult::Ok(())
                    })
                    .await?;
            }
            MessageBatch::Chunk(chunk) => {
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let start_time = Instant::now();
                        dispatcher.dispatch_data(chunk.clone()).await?;
                        dispatcher.record_output_buffer_blocking_duration(start_time.elapsed());
                        StreamResult::Ok(())
                    })
                    .await?;

                self.metrics
                    .actor_out_record_cnt
                    .inc_by(chunk.cardinality() as _);
            }
        }
        Ok(())
    }

    /// Add new dispatchers to the executor. Will check whether their IDs are unique.
    async fn add_dispatchers<'a>(
        &mut self,
        new_dispatchers: impl IntoIterator<Item = &'a PbDispatcher>,
    ) -> StreamResult<()> {
        for dispatcher in new_dispatchers {
            let outputs = self
                .collect_outputs(&dispatcher.downstream_actor_id)
                .await?;
            let dispatcher = DispatcherImpl::new(outputs, dispatcher)?;
            let dispatcher = self.metrics.monitor_dispatcher(dispatcher);
            self.dispatchers.push(dispatcher);
        }

        assert!(
            self.dispatchers
                .iter()
                .map(|d| d.dispatcher_id())
                .all_unique(),
            "dispatcher ids must be unique: {:?}",
            self.dispatchers
        );

        Ok(())
    }

    fn find_dispatcher(&mut self, dispatcher_id: DispatcherId) -> &mut DispatcherImpl {
        self.dispatchers
            .iter_mut()
            .find(|d| d.dispatcher_id() == dispatcher_id)
            .unwrap_or_else(|| panic!("dispatcher {}:{} not found", self.actor_id, dispatcher_id))
    }

    /// Update the dispatcher BEFORE we actually dispatch this barrier. We'll only add the new
    /// outputs.
    async fn pre_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
        let outputs = self
            .collect_outputs(&update.added_downstream_actor_id)
            .await?;

        let dispatcher = self.find_dispatcher(update.dispatcher_id);
        dispatcher.add_outputs(outputs);

        Ok(())
    }

    /// Update the dispatcher AFTER we dispatch this barrier. We'll remove some outputs and finally
    /// update the hash mapping.
    fn post_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
        let ids = update.removed_downstream_actor_id.iter().copied().collect();

        let dispatcher = self.find_dispatcher(update.dispatcher_id);
        dispatcher.remove_outputs(&ids);

        // The hash mapping is only used by the hash dispatcher.
        //
        // We specify a single upstream hash mapping for scaling the downstream fragment. However,
        // it's possible that there are multiple upstreams with different exchange types, for
        // example, the `Broadcast` inner side of the dynamic filter. There are too many
        // combinations to handle here, so we just ignore the `hash_mapping` field for any other
        // exchange types.
        if let DispatcherImpl::Hash(dispatcher) = dispatcher {
            dispatcher.hash_mapping =
                ActorMapping::from_protobuf(update.get_hash_mapping()?).to_expanded();
        }

        Ok(())
    }

    /// For `Add` and `Update`, update the dispatchers before we dispatch the barrier.
    async fn pre_mutate_dispatchers(
        &mut self,
        mutation: &Option<Arc<Mutation>>,
    ) -> StreamResult<()> {
        let Some(mutation) = mutation.as_deref() else {
            return Ok(());
        };

        match mutation {
            Mutation::Add(AddMutation { adds, .. }) => {
                if let Some(new_dispatchers) = adds.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }
            }
            Mutation::Update(UpdateMutation {
                dispatchers,
                actor_new_dispatchers: actor_dispatchers,
                ..
            }) => {
                if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }

                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.pre_update_dispatcher(update).await?;
                    }
                }
            }
            Mutation::AddAndUpdate(
                AddMutation { adds, .. },
                UpdateMutation {
                    dispatchers,
                    actor_new_dispatchers: actor_dispatchers,
                    ..
                },
            ) => {
                if let Some(new_dispatchers) = adds.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }

                if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }

                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.pre_update_dispatcher(update).await?;
                    }
                }
            }
            _ => {}
        }

        Ok(())
    }

    /// For `Stop` and `Update`, update the dispatchers after we dispatch the barrier.
    fn post_mutate_dispatchers(&mut self, mutation: &Option<Arc<Mutation>>) -> StreamResult<()> {
        let Some(mutation) = mutation.as_deref() else {
            return Ok(());
        };

        match mutation {
            Mutation::Stop(stops) => {
                // Remove outputs only if this actor itself is not to be stopped.
                if !stops.contains(&self.actor_id) {
                    for dispatcher in &mut self.dispatchers {
                        dispatcher.remove_outputs(stops);
                    }
                }
            }
            Mutation::Update(UpdateMutation {
                dispatchers,
                dropped_actors,
                ..
            })
            | Mutation::AddAndUpdate(
                _,
                UpdateMutation {
                    dispatchers,
                    dropped_actors,
                    ..
                },
            ) => {
                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.post_update_dispatcher(update)?;
                    }
                }

                if !dropped_actors.contains(&self.actor_id) {
                    for dispatcher in &mut self.dispatchers {
                        dispatcher.remove_outputs(dropped_actors);
                    }
                }
            }
            _ => {}
        };

        // After stopping the downstream mview, the outputs of some dispatchers might be empty
        // and we should clean them up.
        self.dispatchers.retain(|d| !d.is_empty());

        Ok(())
    }
}

impl DispatchExecutor {
    pub(crate) async fn new(
        input: Executor,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
        dispatchers: Vec<stream_plan::Dispatcher>,
        actor_id: u32,
        fragment_id: u32,
        local_barrier_manager: LocalBarrierManager,
        metrics: Arc<StreamingMetrics>,
    ) -> StreamResult<Self> {
        let mut executor = Self::new_inner(
            input,
            new_output_request_rx,
            vec![],
            actor_id,
            fragment_id,
            local_barrier_manager,
            metrics,
        );
        let inner = &mut executor.inner;
        for dispatcher in dispatchers {
            let outputs = inner
                .collect_outputs(&dispatcher.downstream_actor_id)
                .await?;
            let dispatcher = DispatcherImpl::new(outputs, &dispatcher)?;
            let dispatcher = inner.metrics.monitor_dispatcher(dispatcher);
            inner.dispatchers.push(dispatcher);
        }
        Ok(executor)
    }

    #[cfg(test)]
    pub(crate) fn for_test(
        input: Executor,
        dispatchers: Vec<DispatcherImpl>,
        actor_id: u32,
        fragment_id: u32,
        local_barrier_manager: LocalBarrierManager,
        metrics: Arc<StreamingMetrics>,
    ) -> (
        Self,
        tokio::sync::mpsc::UnboundedSender<(ActorId, NewOutputRequest)>,
    ) {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

        (
            Self::new_inner(
                input,
                rx,
                dispatchers,
                actor_id,
                fragment_id,
                local_barrier_manager,
                metrics,
            ),
            tx,
        )
    }

    fn new_inner(
        mut input: Executor,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
        dispatchers: Vec<DispatcherImpl>,
        actor_id: u32,
        fragment_id: u32,
        local_barrier_manager: LocalBarrierManager,
        metrics: Arc<StreamingMetrics>,
    ) -> Self {
        let chunk_size = local_barrier_manager.env.config().developer.chunk_size;
        if crate::consistency::insane() {
            // Make some trouble before dispatching, to avoid generating an invalid dist key.
            let mut info = input.info().clone();
            info.identity = format!("{} (embedded trouble)", info.identity);
            let troublemaker = TroublemakerExecutor::new(input, chunk_size);
            input = (info, troublemaker).into();
        }

        let actor_id_str = actor_id.to_string();
        let fragment_id_str = fragment_id.to_string();
        let actor_out_record_cnt = metrics
            .actor_out_record_cnt
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let metrics = DispatchExecutorMetrics {
            actor_id_str,
            fragment_id_str,
            metrics,
            actor_out_record_cnt,
        };
        let dispatchers = dispatchers
            .into_iter()
            .map(|dispatcher| metrics.monitor_dispatcher(dispatcher))
            .collect();
        Self {
            input,
            inner: DispatchExecutorInner {
                dispatchers,
                actor_id,
                local_barrier_manager,
                metrics,
                new_output_request_rx,
                pending_new_output_requests: Default::default(),
            },
        }
    }
}

impl StreamConsumer for DispatchExecutor {
    type BarrierStream = impl Stream<Item = StreamResult<Barrier>> + Send;

    fn execute(mut self: Box<Self>) -> Self::BarrierStream {
        let max_barrier_count_per_batch = self
            .inner
            .local_barrier_manager
            .env
            .config()
            .developer
            .max_barrier_batch_size;
        #[try_stream]
        async move {
            let mut input = self.input.execute().peekable();
            loop {
                let Some(message) =
                    try_batch_barriers(max_barrier_count_per_batch, &mut input).await?
                else {
                    // end_of_stream
                    break;
                };
                match message {
                    chunk @ MessageBatch::Chunk(_) => {
                        self.inner
                            .dispatch(chunk)
                            .instrument(tracing::info_span!("dispatch_chunk"))
                            .instrument_await("dispatch_chunk")
                            .await?;
                    }
                    MessageBatch::BarrierBatch(barrier_batch) => {
                        assert!(!barrier_batch.is_empty());
                        self.inner
                            .dispatch(MessageBatch::BarrierBatch(barrier_batch.clone()))
                            .instrument(tracing::info_span!("dispatch_barrier_batch"))
                            .instrument_await("dispatch_barrier_batch")
                            .await?;
                        self.inner
                            .metrics
                            .metrics
                            .barrier_batch_size
                            .observe(barrier_batch.len() as f64);
                        for barrier in barrier_batch {
                            yield barrier;
                        }
                    }
                    watermark @ MessageBatch::Watermark(_) => {
                        self.inner
                            .dispatch(watermark)
                            .instrument(tracing::info_span!("dispatch_watermark"))
                            .instrument_await("dispatch_watermark")
                            .await?;
                    }
                }
            }
        }
    }
}

/// Tries to batch up to `max_barrier_count_per_batch` consecutive barriers into a single
/// message batch.
///
/// Returns the message batch, or `None` upon end of stream.
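///
/// For example (an illustrative trace): with `max_barrier_count_per_batch = 2`, if three
/// mutation-free barriers `b1, b2, b3` are already buffered in the input, followed by a
/// chunk `c1`, the first call returns `BarrierBatch([b1, b2])`, the next returns
/// `BarrierBatch([b3])`, and the next returns `Chunk(c1)`.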
async fn try_batch_barriers(
    max_barrier_count_per_batch: u32,
    input: &mut Peekable<BoxedMessageStream>,
) -> StreamResult<Option<MessageBatch>> {
    let Some(msg) = input.next().await else {
        // end_of_stream
        return Ok(None);
    };
    let mut barrier_batch = vec![];
    let msg: Message = msg?;
    let max_peek_attempts = match msg {
        Message::Chunk(c) => {
            return Ok(Some(MessageBatch::Chunk(c)));
        }
        Message::Watermark(w) => {
            return Ok(Some(MessageBatch::Watermark(w)));
        }
        Message::Barrier(b) => {
            let peek_more_barrier = b.mutation.is_none();
            barrier_batch.push(b);
            if peek_more_barrier {
                max_barrier_count_per_batch.saturating_sub(1)
            } else {
                0
            }
        }
    };
    // Try to peek more consecutive non-mutation barriers.
    for _ in 0..max_peek_attempts {
        let peek = input.peek().now_or_never();
        let Some(peek) = peek else {
            break;
        };
        let Some(msg) = peek else {
            // end_of_stream
            break;
        };
        let Ok(Message::Barrier(barrier)) = msg else {
            break;
        };
        if barrier.mutation.is_some() {
            break;
        }
        let msg: Message = input.next().now_or_never().unwrap().unwrap()?;
        let Message::Barrier(ref barrier) = msg else {
            unreachable!("must be a barrier");
        };
        barrier_batch.push(barrier.clone());
    }
    Ok(Some(MessageBatch::BarrierBatch(barrier_batch)))
}

#[derive(Debug)]
pub enum DispatcherImpl {
    Hash(HashDataDispatcher),
    Broadcast(BroadcastDispatcher),
    Simple(SimpleDispatcher),
    RoundRobin(RoundRobinDataDispatcher),
}

impl DispatcherImpl {
    pub fn new(outputs: Vec<Output>, dispatcher: &PbDispatcher) -> StreamResult<Self> {
        let output_indices = dispatcher
            .output_indices
            .iter()
            .map(|&i| i as usize)
            .collect_vec();

        use risingwave_pb::stream_plan::DispatcherType::*;
        let dispatcher_impl = match dispatcher.get_type()? {
            Hash => {
                assert!(!outputs.is_empty());
                let dist_key_indices = dispatcher
                    .dist_key_indices
                    .iter()
                    .map(|i| *i as usize)
                    .collect();

                let hash_mapping =
                    ActorMapping::from_protobuf(dispatcher.get_hash_mapping()?).to_expanded();

                DispatcherImpl::Hash(HashDataDispatcher::new(
                    outputs,
                    dist_key_indices,
                    output_indices,
                    hash_mapping,
                    dispatcher.dispatcher_id,
                ))
            }
            Broadcast => DispatcherImpl::Broadcast(BroadcastDispatcher::new(
                outputs,
                output_indices,
                dispatcher.dispatcher_id,
            )),
            Simple | NoShuffle => {
                let [output]: [_; 1] = outputs.try_into().unwrap();
                DispatcherImpl::Simple(SimpleDispatcher::new(
                    output,
                    output_indices,
                    dispatcher.dispatcher_id,
                ))
            }
            Unspecified => unreachable!(),
        };

        Ok(dispatcher_impl)
    }
}

macro_rules! impl_dispatcher {
    ($( { $variant_name:ident } ),*) => {
        impl DispatcherImpl {
            pub async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_data(chunk).await, )*
                }
            }

            pub async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_barriers(barriers).await, )*
                }
            }

            pub async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_watermark(watermark).await, )*
                }
            }

            pub fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
                match self {
                    $(Self::$variant_name(inner) => inner.add_outputs(outputs), )*
                }
            }

            pub fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
                match self {
                    $(Self::$variant_name(inner) => inner.remove_outputs(actor_ids), )*
                }
            }

            pub fn dispatcher_id(&self) -> DispatcherId {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id(), )*
                }
            }

            pub fn dispatcher_id_str(&self) -> &str {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id_str(), )*
                }
            }

            pub fn is_empty(&self) -> bool {
                match self {
                    $(Self::$variant_name(inner) => inner.is_empty(), )*
                }
            }
        }
    }
}

macro_rules! for_all_dispatcher_variants {
    ($macro:ident) => {
        $macro! {
            { Hash },
            { Broadcast },
            { Simple },
            { RoundRobin }
        }
    };
}

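// Generate the enum-dispatch forwarding methods on `DispatcherImpl` for all four
// dispatcher variants defined above.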
for_all_dispatcher_variants! { impl_dispatcher }

pub trait DispatchFuture<'a> = Future<Output = StreamResult<()>> + Send;

pub trait Dispatcher: Debug + 'static {
    /// Dispatch a data chunk to downstream actors.
    fn dispatch_data(&mut self, chunk: StreamChunk) -> impl DispatchFuture<'_>;
    /// Dispatch barriers to downstream actors, generally by broadcasting them.
    fn dispatch_barriers(&mut self, barrier: DispatcherBarriers) -> impl DispatchFuture<'_>;
    /// Dispatch a watermark to downstream actors, generally by broadcasting it.
    fn dispatch_watermark(&mut self, watermark: Watermark) -> impl DispatchFuture<'_>;

    /// Add new outputs to the dispatcher.
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>);
    /// Remove the outputs with the given `actor_ids` from the dispatcher.
    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>);

    /// The ID of the dispatcher. A [`DispatchExecutor`] may have multiple dispatchers with
    /// different IDs.
    ///
    /// Note that the dispatcher id is always equal to the downstream fragment id.
    /// See also `proto/stream_plan.proto`.
    fn dispatcher_id(&self) -> DispatcherId;

    /// Dispatcher id in string. See [`Dispatcher::dispatcher_id`].
    fn dispatcher_id_str(&self) -> &str;

    /// Whether the dispatcher has no outputs. If so, it'll be cleaned up from the
    /// [`DispatchExecutor`].
    fn is_empty(&self) -> bool;
}

/// Concurrently broadcast a message to all outputs.
///
/// Note that this does not follow `concurrent_dispatchers` in the config and the concurrency is
/// always unlimited.
async fn broadcast_concurrent(
    outputs: impl IntoIterator<Item = &'_ mut Output>,
    message: DispatcherMessageBatch,
) -> StreamResult<()> {
    futures::future::try_join_all(
        outputs
            .into_iter()
            .map(|output| output.send(message.clone())),
    )
    .await?;
    Ok(())
}

#[derive(Debug)]
pub struct RoundRobinDataDispatcher {
    outputs: Vec<Output>,
    output_indices: Vec<usize>,
    cur: usize,
    dispatcher_id: DispatcherId,
    dispatcher_id_str: String,
}

impl RoundRobinDataDispatcher {
    pub fn new(
        outputs: Vec<Output>,
        output_indices: Vec<usize>,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            outputs,
            output_indices,
            cur: 0,
            dispatcher_id,
            dispatcher_id_str: dispatcher_id.to_string(),
        }
    }
}

impl Dispatcher for RoundRobinDataDispatcher {
    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let chunk = if self.output_indices.len() < chunk.columns().len() {
            chunk
                .project(&self.output_indices)
                .eliminate_adjacent_noop_update()
        } else {
            chunk.project(&self.output_indices)
        };

        self.outputs[self.cur]
            .send(DispatcherMessageBatch::Chunk(chunk))
            .await?;
        self.cur += 1;
        self.cur %= self.outputs.len();
        Ok(())
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // always broadcast barrier
        broadcast_concurrent(
            &mut self.outputs,
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) {
            // always broadcast watermark
            broadcast_concurrent(
                &mut self.outputs,
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(outputs);
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
            .count();
        // Clamp the cursor back into range; `saturating_sub` avoids an underflow
        // when all outputs have been removed.
        self.cur = self.cur.min(self.outputs.len().saturating_sub(1));
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn dispatcher_id_str(&self) -> &str {
        &self.dispatcher_id_str
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}

pub struct HashDataDispatcher {
    outputs: Vec<Output>,
    keys: Vec<usize>,
    output_indices: Vec<usize>,
    /// Mapping from virtual node to actor id, used for hash data dispatcher to dispatch tasks to
    /// different downstream actors.
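    ///
    /// For example (illustrative values): with 4 vnodes and 2 downstream actors, an
    /// expanded mapping of `[1, 1, 2, 2]` sends rows hashed to vnode 0 or 1 to actor 1,
    /// and rows hashed to vnode 2 or 3 to actor 2.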
    hash_mapping: ExpandedActorMapping,
    dispatcher_id: DispatcherId,
    dispatcher_id_str: String,
}

impl Debug for HashDataDispatcher {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HashDataDispatcher")
            .field("outputs", &self.outputs)
            .field("keys", &self.keys)
            .field("dispatcher_id", &self.dispatcher_id)
            .finish_non_exhaustive()
    }
}

impl HashDataDispatcher {
    pub fn new(
        outputs: Vec<Output>,
        keys: Vec<usize>,
        output_indices: Vec<usize>,
        hash_mapping: ExpandedActorMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            outputs,
            keys,
            output_indices,
            hash_mapping,
            dispatcher_id,
            dispatcher_id_str: dispatcher_id.to_string(),
        }
    }
}

impl Dispatcher for HashDataDispatcher {
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(outputs);
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // always broadcast barrier
        broadcast_concurrent(
            &mut self.outputs,
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) {
            // always broadcast watermark
            broadcast_concurrent(
                &mut self.outputs,
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        // A chunk can be shuffled into multiple output chunks to be sent downstream.
        // These output chunks differ only in their visibility maps, which are derived
        // from the hash value of each row in the input chunk.
        let num_outputs = self.outputs.len();

        // Get the hash value of every row by its key columns.
        let vnode_count = self.hash_mapping.len();
        let vnodes = VirtualNode::compute_chunk(chunk.data_chunk(), &self.keys, vnode_count);
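        // Conceptually (a simplified view), each row's vnode is
        // `hash(dist key columns) % vnode_count`, and `hash_mapping[vnode]` below names
        // the downstream actor that owns that vnode.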

        tracing::debug!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty(), self.keys, vnodes);

        let mut vis_maps = repeat_with(|| BitmapBuilder::with_capacity(chunk.capacity()))
            .take(num_outputs)
            .collect_vec();
        let mut last_vnode_when_update_delete = None;
        let mut new_ops: Vec<Op> = Vec::with_capacity(chunk.capacity());

        // Apply output indices after calculating the vnode.
        let chunk = if self.output_indices.len() < chunk.columns().len() {
            chunk
                .project(&self.output_indices)
                .eliminate_adjacent_noop_update()
        } else {
            chunk.project(&self.output_indices)
        };

        for ((vnode, &op), visible) in vnodes
            .iter()
            .copied()
            .zip_eq_fast(chunk.ops())
            .zip_eq_fast(chunk.visibility().iter())
        {
            // Build visibility map for every output chunk.
            for (output, vis_map) in self.outputs.iter().zip_eq_fast(vis_maps.iter_mut()) {
                vis_map.append(visible && self.hash_mapping[vnode.to_index()] == output.actor_id());
            }

            if !visible {
                assert!(
                    last_vnode_when_update_delete.is_none(),
                    "invisible row between U- and U+, op = {op:?}",
                );
                new_ops.push(op);
                continue;
            }

            // An 'update' message, denoted by an `UpdateDelete` and a successive `UpdateInsert`,
            // needs to be rewritten into a plain `Delete` and `Insert` if the two halves are
            // dispatched to different actors.
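            // For example (illustrative): if an update to the key moves a row from a
            // vnode owned by actor 1 to one owned by actor 2, the pair `U- old` / `U+ new`
            // is delivered as `- old` to actor 1 and `+ new` to actor 2, so neither actor
            // sees a dangling half of the update pair.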
            if op == Op::UpdateDelete {
                last_vnode_when_update_delete = Some(vnode);
            } else if op == Op::UpdateInsert {
                if vnode
                    != last_vnode_when_update_delete
                        .take()
                        .expect("missing U- before U+")
                {
                    new_ops.push(Op::Delete);
                    new_ops.push(Op::Insert);
                } else {
                    new_ops.push(Op::UpdateDelete);
                    new_ops.push(Op::UpdateInsert);
                }
            } else {
                new_ops.push(op);
            }
        }
        assert!(
            last_vnode_when_update_delete.is_none(),
            "missing U+ after U-"
        );

        let ops = new_ops;

        // Send each downstream actor its own `StreamChunk`, integrated with the
        // visibility map computed above.
        futures::future::try_join_all(
            vis_maps
                .into_iter()
                .zip_eq_fast(self.outputs.iter_mut())
                .map(|(vis_map, output)| async {
                    let vis_map = vis_map.finish();
                    // The columns are not changed in this function.
                    let new_stream_chunk =
                        StreamChunk::with_visibility(ops.clone(), chunk.columns().into(), vis_map);
                    if new_stream_chunk.cardinality() > 0 {
                        event!(
                            tracing::Level::TRACE,
                            msg = "chunk",
                            downstream = output.actor_id(),
                            "send = \n{:#?}",
                            new_stream_chunk
                        );
                        output
                            .send(DispatcherMessageBatch::Chunk(new_stream_chunk))
                            .await?;
                    }
                    StreamResult::Ok(())
                }),
        )
        .await?;

        Ok(())
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
            .count();
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn dispatcher_id_str(&self) -> &str {
        &self.dispatcher_id_str
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}

/// `BroadcastDispatcher` dispatches messages to all outputs.
#[derive(Debug)]
pub struct BroadcastDispatcher {
    outputs: HashMap<ActorId, Output>,
    output_indices: Vec<usize>,
    dispatcher_id: DispatcherId,
    dispatcher_id_str: String,
}

impl BroadcastDispatcher {
    pub fn new(
        outputs: impl IntoIterator<Item = Output>,
        output_indices: Vec<usize>,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            outputs: Self::into_pairs(outputs).collect(),
            output_indices,
            dispatcher_id,
            dispatcher_id_str: dispatcher_id.to_string(),
        }
    }

    fn into_pairs(
        outputs: impl IntoIterator<Item = Output>,
    ) -> impl Iterator<Item = (ActorId, Output)> {
        outputs
            .into_iter()
            .map(|output| (output.actor_id(), output))
    }
}

impl Dispatcher for BroadcastDispatcher {
    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let chunk = if self.output_indices.len() < chunk.columns().len() {
            chunk
                .project(&self.output_indices)
                .eliminate_adjacent_noop_update()
        } else {
            chunk.project(&self.output_indices)
        };
        broadcast_concurrent(
            self.outputs.values_mut(),
            DispatcherMessageBatch::Chunk(chunk),
        )
        .await
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // always broadcast barrier
        broadcast_concurrent(
            self.outputs.values_mut(),
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) {
            // always broadcast watermark
            broadcast_concurrent(
                self.outputs.values_mut(),
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(Self::into_pairs(outputs));
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(|actor_id, _| actor_ids.contains(actor_id))
            .count();
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn dispatcher_id_str(&self) -> &str {
        &self.dispatcher_id_str
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}

/// `SimpleDispatcher` dispatches messages to a single output.
#[derive(Debug)]
pub struct SimpleDispatcher {
    /// In most cases, there is exactly one output. However, in some cases of configuration
    /// change, this field may temporarily hold zero or two outputs.
    ///
    /// - When dropping a materialized view, the output will be removed and this field becomes
    ///   empty. The [`DispatchExecutor`] will immediately clean up this empty dispatcher before
    ///   finishing processing the current mutation.
    /// - When migrating a singleton fragment, the new output will be temporarily added in the
    ///   `pre` stage and this field becomes multiple, which is for broadcasting this
    ///   configuration change barrier to both the old and new downstream actors. In the `post`
    ///   stage, the old output will be removed and this field becomes single again.
    ///
    /// Therefore, when dispatching data, we assert that there's exactly one output by
    /// `Self::output`.
    output: SmallVec<[Output; 2]>,
    output_indices: Vec<usize>,
    dispatcher_id: DispatcherId,
    dispatcher_id_str: String,
}

impl SimpleDispatcher {
    pub fn new(output: Output, output_indices: Vec<usize>, dispatcher_id: DispatcherId) -> Self {
        Self {
            output: smallvec![output],
            output_indices,
            dispatcher_id,
            dispatcher_id_str: dispatcher_id.to_string(),
        }
    }
}

impl Dispatcher for SimpleDispatcher {
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.output.extend(outputs);
        assert!(self.output.len() <= 2);
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // Only barrier is allowed to be dispatched to multiple outputs during migration.
        for output in &mut self.output {
            output
                .send(DispatcherMessageBatch::BarrierBatch(barriers.clone()))
                .await?;
        }
        Ok(())
    }

    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let output = self
            .output
            .iter_mut()
            .exactly_one()
            .expect("expect exactly one output");

        let chunk = if self.output_indices.len() < chunk.columns().len() {
            chunk
                .project(&self.output_indices)
                .eliminate_adjacent_noop_update()
        } else {
            chunk.project(&self.output_indices)
        };
        output.send(DispatcherMessageBatch::Chunk(chunk)).await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        let output = self
            .output
            .iter_mut()
            .exactly_one()
            .expect("expect exactly one output");

        if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) {
            output
                .send(DispatcherMessageBatch::Watermark(watermark))
                .await?;
        }
        Ok(())
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.output
            .retain(|output| !actor_ids.contains(&output.actor_id()));
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn dispatcher_id_str(&self) -> &str {
        &self.dispatcher_id_str
    }

    fn is_empty(&self) -> bool {
        self.output.is_empty()
    }
}

#[cfg(test)]
mod tests {
    use std::hash::{BuildHasher, Hasher};

    use futures::pin_mut;
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::array::{Array, ArrayBuilder, I32ArrayBuilder};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::hash_util::Crc32FastBuilder;
    use risingwave_pb::stream_plan::DispatcherType;
    use tokio::sync::mpsc::unbounded_channel;

    use super::*;
    use crate::executor::exchange::output::Output;
    use crate::executor::exchange::permit::channel_for_test;
    use crate::executor::receiver::ReceiverExecutor;
    use crate::executor::{BarrierInner as Barrier, MessageInner as Message};
    use crate::task::barrier_test_utils::LocalBarrierTestEnv;

    // TODO: this test contains update being shuffled to different partitions, which is not
    // supported for now.
    #[tokio::test]
    async fn test_hash_dispatcher_complex() {
        test_hash_dispatcher_complex_inner().await
    }

    async fn test_hash_dispatcher_complex_inner() {
        // This test only works when vnode count is 256.
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let num_outputs = 2; // actor id ranges from 1 to 2
        let key_indices = &[0, 2];
        let (output_tx_vecs, mut output_rx_vecs): (Vec<_>, Vec<_>) =
            (0..num_outputs).map(|_| channel_for_test()).collect();
        let outputs = output_tx_vecs
            .into_iter()
            .enumerate()
            .map(|(actor_id, tx)| Output::new(1 + actor_id as u32, tx))
            .collect::<Vec<_>>();
        let mut hash_mapping = (1..num_outputs + 1)
            .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT_FOR_TEST / num_outputs])
            .collect_vec();
        hash_mapping.resize(VirtualNode::COUNT_FOR_TEST, num_outputs as u32);
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            key_indices.to_vec(),
            vec![0, 1, 2],
            hash_mapping,
            0,
        );

        let chunk = StreamChunk::from_pretty(
            "  I I I
            +  4 6 8
            +  5 7 9
            +  0 0 0
            -  1 1 1 D
            U- 2 0 2
            U+ 2 0 2
            U- 3 3 2
            U+ 3 3 4",
        );
        hash_dispatcher.dispatch_data(chunk).await.unwrap();

        assert_eq!(
            *output_rx_vecs[0].recv().await.unwrap().as_chunk().unwrap(),
            StreamChunk::from_pretty(
                "  I I I
                +  4 6 8
                +  5 7 9
                +  0 0 0
                -  1 1 1 D
                U- 2 0 2
                U+ 2 0 2
                -  3 3 2 D  // Should rewrite UpdateDelete to Delete
                +  3 3 4    // Should rewrite UpdateInsert to Insert",
            )
        );
        assert_eq!(
            *output_rx_vecs[1].recv().await.unwrap().as_chunk().unwrap(),
            StreamChunk::from_pretty(
                "  I I I
                +  4 6 8 D
                +  5 7 9 D
                +  0 0 0 D
                -  1 1 1 D  // Should keep original invisible mark
                U- 2 0 2 D  // Should keep UpdateDelete
                U+ 2 0 2 D  // Should keep UpdateInsert
                -  3 3 2    // Should rewrite UpdateDelete to Delete
                +  3 3 4 D  // Should rewrite UpdateInsert to Insert",
            )
        );
    }
1357
1358    #[tokio::test]
1359    async fn test_configuration_change() {
1360        let _schema = Schema { fields: vec![] };
1361        let (tx, rx) = channel_for_test();
1362        let actor_id = 233;
1363        let fragment_id = 666;
1364        let barrier_test_env = LocalBarrierTestEnv::for_test().await;
1365        let metrics = Arc::new(StreamingMetrics::unused());
1366
1367        let (untouched, old, new) = (234, 235, 238); // broadcast downstream actors
1368        let (old_simple, new_simple) = (114, 514); // simple downstream actors
1369
1370        // actor_id -> untouched, old, new, old_simple, new_simple
1371
1372        let broadcast_dispatcher_id = 666;
1373        let broadcast_dispatcher = PbDispatcher {
1374            r#type: DispatcherType::Broadcast as _,
1375            dispatcher_id: broadcast_dispatcher_id,
1376            downstream_actor_id: vec![untouched, old],
1377            ..Default::default()
1378        };
1379
1380        let simple_dispatcher_id = 888;
1381        let simple_dispatcher = PbDispatcher {
1382            r#type: DispatcherType::Simple as _,
1383            dispatcher_id: simple_dispatcher_id,
1384            downstream_actor_id: vec![old_simple],
1385            ..Default::default()
1386        };
1387
1388        let dispatcher_updates = maplit::hashmap! {
1389            actor_id => vec![PbDispatcherUpdate {
1390                actor_id,
1391                dispatcher_id: broadcast_dispatcher_id,
1392                added_downstream_actor_id: vec![new],
1393                removed_downstream_actor_id: vec![old],
1394                hash_mapping: Default::default(),
1395            }]
1396        };
1397        let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
1398            UpdateMutation {
1399                dispatchers: dispatcher_updates,
1400                merges: Default::default(),
1401                vnode_bitmaps: Default::default(),
1402                dropped_actors: Default::default(),
1403                actor_splits: Default::default(),
1404                actor_new_dispatchers: Default::default(),
1405            },
1406        ));
1407        barrier_test_env.inject_barrier(&b1, [actor_id]);
1408        barrier_test_env.flush_all_events().await;
1409
1410        let input = Executor::new(
1411            Default::default(),
1412            ReceiverExecutor::for_test(
1413                actor_id,
1414                rx,
1415                barrier_test_env.local_barrier_manager.clone(),
1416            )
1417            .boxed(),
1418        );
1419
1420        let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
1421        let mut rxs = [untouched, old, new, old_simple, new_simple]
1422            .into_iter()
1423            .map(|id| {
1424                (id, {
1425                    let (tx, rx) = channel_for_test();
1426                    new_output_request_tx
1427                        .send((id, NewOutputRequest::Local(tx)))
1428                        .unwrap();
1429                    rx
1430                })
1431            })
1432            .collect::<HashMap<_, _>>();
1433        let executor = Box::new(
1434            DispatchExecutor::new(
1435                input,
1436                new_output_request_rx,
1437                vec![broadcast_dispatcher, simple_dispatcher],
1438                actor_id,
1439                fragment_id,
1440                barrier_test_env.local_barrier_manager.clone(),
1441                metrics,
1442            )
1443            .await
1444            .unwrap(),
1445        )
1446        .execute();
1447
1448        pin_mut!(executor);
1449
1450        macro_rules! try_recv {
1451            ($down_id:expr) => {
1452                rxs.get_mut(&$down_id).unwrap().try_recv()
1453            };
1454        }
1455
1456        // 3. Send a chunk.
1457        tx.send(Message::Chunk(StreamChunk::default()).into())
1458            .await
1459            .unwrap();
1460
1461        tx.send(Message::Barrier(b1.clone().into_dispatcher()).into())
1462            .await
1463            .unwrap();
1464        executor.next().await.unwrap().unwrap();
1465
1466        // 5. Check downstream.
1467        try_recv!(untouched).unwrap().as_chunk().unwrap();
1468        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();
1469
1470        try_recv!(old).unwrap().as_chunk().unwrap();
1471        try_recv!(old).unwrap().as_barrier_batch().unwrap(); // It should still receive the barrier even if it's to be removed.
1472
1473        try_recv!(new).unwrap().as_barrier_batch().unwrap(); // Since it's just added, it won't receive the chunk.
1474
1475        try_recv!(old_simple).unwrap().as_chunk().unwrap();
1476        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // Untouched.
1477
1478        // 6. Send another barrier.
1479        let b2 = Barrier::new_test_barrier(test_epoch(2));
1480        barrier_test_env.inject_barrier(&b2, [actor_id]);
1481        tx.send(Message::Barrier(b2.into_dispatcher()).into())
1482            .await
1483            .unwrap();
1484        executor.next().await.unwrap().unwrap();
1485
1486        // 7. Check downstream.
1487        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();
        try_recv!(old).unwrap_err(); // Since it's stopped, no new messages can be received.
        try_recv!(new).unwrap().as_barrier_batch().unwrap();

        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // Untouched.
        try_recv!(new_simple).unwrap_err(); // Untouched.

        // 8. Send another chunk.
        tx.send(Message::Chunk(StreamChunk::default()).into())
            .await
            .unwrap();

        // 9. Send a configuration change barrier for the simple dispatcher.
        let dispatcher_updates = maplit::hashmap! {
            actor_id => vec![PbDispatcherUpdate {
                actor_id,
                dispatcher_id: simple_dispatcher_id,
                added_downstream_actor_id: vec![new_simple],
                removed_downstream_actor_id: vec![old_simple],
                hash_mapping: Default::default(),
            }]
        };
        let b3 = Barrier::new_test_barrier(test_epoch(3)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: dispatcher_updates,
                merges: Default::default(),
                vnode_bitmaps: Default::default(),
                dropped_actors: Default::default(),
                actor_splits: Default::default(),
                actor_new_dispatchers: Default::default(),
            },
        ));
        barrier_test_env.inject_barrier(&b3, [actor_id]);
        tx.send(Message::Barrier(b3.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 10. Check downstream.
        try_recv!(old_simple).unwrap().as_chunk().unwrap();
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // It should still receive the barrier even if it's to be removed.

        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap(); // Since it's just added, it won't receive the chunk.

        // 11. Send another barrier.
        let b4 = Barrier::new_test_barrier(test_epoch(4));
        barrier_test_env.inject_barrier(&b4, [actor_id]);
        tx.send(Message::Barrier(b4.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 12. Check downstream.
        try_recv!(old_simple).unwrap_err(); // Since it's stopped, no new messages can be received.
        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap();
    }
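    /// Verifies that [`HashDataDispatcher`] routes each row to exactly one
    /// downstream output according to the vnode-to-actor hash mapping.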
    #[tokio::test]
    async fn test_hash_dispatcher() {
        // This test only works when the vnode count is 256.
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let num_outputs = 5; // actor ids range from 1 to 5
        let cardinality = 10;
        let dimension = 4;
        let key_indices = &[0, 2];
        let (output_tx_vecs, output_rx_vecs): (Vec<_>, Vec<_>) =
            (0..num_outputs).map(|_| channel_for_test()).collect();
        let outputs = output_tx_vecs
            .into_iter()
            .enumerate()
            .map(|(actor_id, tx)| Output::new(1 + actor_id as u32, tx))
            .collect::<Vec<_>>();
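        // Build the vnode-to-actor mapping: the 256 vnodes are divided evenly among
        // the 5 actors (51 each), and the one remaining vnode goes to the last actor.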
        let mut hash_mapping = (1..num_outputs + 1)
            .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT_FOR_TEST / num_outputs])
            .collect_vec();
        hash_mapping.resize(VirtualNode::COUNT_FOR_TEST, num_outputs as u32);
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            key_indices.to_vec(),
            (0..dimension).collect(),
            hash_mapping.clone(),
            0,
        );

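        // Alternate Insert and Delete ops across the rows of the input chunk.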
        let mut ops = Vec::new();
        for idx in 0..cardinality {
            if idx % 2 == 0 {
                ops.push(Op::Insert);
            } else {
                ops.push(Op::Delete);
            }
        }

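        // Mirror the dispatcher's routing logic to compute the expected output:
        // CRC32-hash the key columns of each row (in little-endian byte order),
        // take the hash modulo the vnode count, and look up the owning actor in
        // the hash mapping.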
        let mut start = 19260817i32..;
        let mut builders = (0..dimension)
            .map(|_| I32ArrayBuilder::new(cardinality))
            .collect_vec();
        let mut output_cols = vec![vec![vec![]; dimension]; num_outputs];
        let mut output_ops = vec![vec![]; num_outputs];
        for op in &ops {
            let hash_builder = Crc32FastBuilder;
            let mut hasher = hash_builder.build_hasher();
            let one_row = (0..dimension).map(|_| start.next().unwrap()).collect_vec();
            for key_idx in key_indices {
                let val = one_row[*key_idx];
                let bytes = val.to_le_bytes();
                hasher.update(&bytes);
            }
            let output_idx =
                hash_mapping[hasher.finish() as usize % VirtualNode::COUNT_FOR_TEST] as usize - 1;
            for (builder, val) in builders.iter_mut().zip_eq_fast(one_row.iter()) {
                builder.append(Some(*val));
            }
            output_cols[output_idx]
                .iter_mut()
                .zip_eq_fast(one_row.iter())
                .for_each(|(each_column, val)| each_column.push(*val));
            output_ops[output_idx].push(op);
        }

        let columns = builders
            .into_iter()
            .map(|builder| {
                let array = builder.finish();
                array.into_ref()
            })
            .collect();

        let chunk = StreamChunk::new(ops, columns);
        hash_dispatcher.dispatch_data(chunk).await.unwrap();

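        // Each output should receive at most one chunk; its visible rows must
        // match the columns computed for that actor above.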
        for (output_idx, mut rx) in output_rx_vecs.into_iter().enumerate() {
            let mut output = vec![];
            while let Some(Some(msg)) = rx.recv().now_or_never() {
                output.push(msg);
            }
            // It is possible that no chunk is received, as none of the keys happened to hash to this output's vnodes.
            assert!(output.len() <= 1);
            if output.is_empty() {
                assert!(output_cols[output_idx].iter().all(|x| x.is_empty()));
            } else {
                let message = output.first().unwrap();
                let real_chunk = match message {
                    DispatcherMessageBatch::Chunk(chunk) => chunk,
                    _ => panic!(),
                };
                real_chunk
                    .columns()
                    .iter()
                    .zip_eq_fast(output_cols[output_idx].iter())
                    .for_each(|(real_col, expect_col)| {
                        let real_vals = real_chunk
                            .visibility()
                            .iter_ones()
                            .map(|row_idx| real_col.as_int32().value_at(row_idx).unwrap())
                            .collect::<Vec<_>>();
                        assert_eq!(real_vals.len(), expect_col.len());
                        assert_eq!(real_vals, *expect_col);
                    });
            }
        }
    }
}