risingwave_stream/executor/dispatch.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::iter::repeat_with;
use std::ops::{Deref, DerefMut};
use std::time::Duration;

use anyhow::anyhow;
use futures::FutureExt;
use itertools::Itertools;
use risingwave_common::array::Op;
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::{ActorMapping, ExpandedActorMapping, VirtualNode};
use risingwave_common::metrics::LabelGuardedIntCounter;
use risingwave_common::row::RowExt;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
use risingwave_pb::stream_plan::{self, PbDispatcher};
use smallvec::{SmallVec, smallvec};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::task::consume_budget;
use tokio::time::Instant;
use tokio_stream::StreamExt;
use tokio_stream::adapters::Peekable;
use tracing::{Instrument, event};

use super::exchange::output::Output;
use super::{
    AddMutation, DispatcherBarriers, DispatcherMessageBatch, MessageBatch, TroublemakerExecutor,
    UpdateMutation,
};
use crate::executor::prelude::*;
use crate::executor::{StopMutation, StreamConsumer};
use crate::task::{DispatcherId, NewOutputRequest};

mod output_mapping;
pub use output_mapping::DispatchOutputMapping;
use risingwave_common::id::FragmentId;

use crate::error::StreamError;

/// [`DispatchExecutor`] consumes messages and sends them to downstream actors. Usually,
/// data chunks will be dispatched with some specified policy, while control messages
/// such as barriers will be distributed to all receivers.
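///
/// The concrete policies implemented in this file are [`HashDataDispatcher`],
/// [`BroadcastDispatcher`], [`SimpleDispatcher`] and [`RoundRobinDataDispatcher`]; see
/// [`DispatcherImpl`] below for how a [`PbDispatcher`] plan node is resolved into one of them.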
pub struct DispatchExecutor {
    input: Executor,
    inner: DispatchExecutorInner,
}

struct DispatcherWithMetrics {
    dispatcher: DispatcherImpl,
    pub actor_output_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}

impl Debug for DispatcherWithMetrics {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        self.dispatcher.fmt(f)
    }
}

impl Deref for DispatcherWithMetrics {
    type Target = DispatcherImpl;

    fn deref(&self) -> &Self::Target {
        &self.dispatcher
    }
}

impl DerefMut for DispatcherWithMetrics {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.dispatcher
    }
}

struct DispatchExecutorMetrics {
    actor_id_str: String,
    fragment_id_str: String,
    metrics: Arc<StreamingMetrics>,
    actor_out_record_cnt: LabelGuardedIntCounter,
}

impl DispatchExecutorMetrics {
    fn monitor_dispatcher(&self, dispatcher: DispatcherImpl) -> DispatcherWithMetrics {
        DispatcherWithMetrics {
            actor_output_buffer_blocking_duration_ns: self
                .metrics
                .actor_output_buffer_blocking_duration_ns
                .with_guarded_label_values(&[
                    self.actor_id_str.as_str(),
                    self.fragment_id_str.as_str(),
                    dispatcher.dispatcher_id().to_string().as_str(),
                ]),
            dispatcher,
        }
    }
}

struct DispatchExecutorInner {
    dispatchers: Vec<DispatcherWithMetrics>,
    actor_id: ActorId,
    actor_config: Arc<StreamingConfig>,
    metrics: DispatchExecutorMetrics,
    new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    pending_new_output_requests: HashMap<ActorId, NewOutputRequest>,
}

impl DispatchExecutorInner {
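    /// Pair each downstream actor in `downstream_actors` with an output request, taking it
    /// either from the pending map or from `new_output_request_rx`. Requests may arrive out
    /// of order, so requests for other actors are stashed in `pending_new_output_requests`
    /// until their turn comes.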
    async fn collect_outputs(
        &mut self,
        downstream_actors: &[ActorId],
    ) -> StreamResult<Vec<Output>> {
        fn resolve_output(downstream_actor: ActorId, request: NewOutputRequest) -> Output {
            let tx = match request {
                NewOutputRequest::Local(tx) | NewOutputRequest::Remote(tx) => tx,
            };
            Output::new(downstream_actor, tx)
        }
        let mut outputs = Vec::with_capacity(downstream_actors.len());
        for &downstream_actor in downstream_actors {
            let output =
                if let Some(request) = self.pending_new_output_requests.remove(&downstream_actor) {
                    resolve_output(downstream_actor, request)
                } else {
                    loop {
                        let (requested_actor, request) = self
                            .new_output_request_rx
                            .recv()
                            .await
                            .ok_or_else(|| anyhow!("end of new output request"))?;
                        if requested_actor == downstream_actor {
                            break resolve_output(requested_actor, request);
                        } else {
                            assert!(
                                self.pending_new_output_requests
                                    .insert(requested_actor, request)
                                    .is_none(),
                                "duplicated inflight new output requests from actor {}",
                                requested_actor
                            );
                        }
                    }
                };
            outputs.push(output);
        }
        Ok(outputs)
    }

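    /// Dispatch a message batch to all dispatchers. For a barrier batch, the dispatchers are
    /// first updated with the mutation of the leading barrier (`pre_mutate_dispatchers`), then
    /// the batch is sent to every dispatcher concurrently, and finally the mutation is applied
    /// again to remove outdated outputs (`post_mutate_dispatchers`).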
    async fn dispatch(&mut self, msg: MessageBatch) -> StreamResult<()> {
        if self.dispatchers.is_empty() {
            // The dispatchers' internal implementations call tokio methods, which already
            // involve cooperative scheduling with the tokio runtime.
            //
            // When there is no dispatcher, we have to do the cooperative scheduling ourselves.
            consume_budget().await;
        }
        async fn await_with_metrics(
            future: impl Future<Output = ()>,
            metrics: &LabelGuardedIntCounter,
        ) {
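            // Instead of recording the blocked duration only when the future completes, flush
            // the elapsed time into the counter every 15 seconds, so that a long-blocking send
            // already shows up in the metrics while it is still in progress.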
            let mut start_time = Instant::now();
            let interval_duration = Duration::from_secs(15);
            let mut interval =
                tokio::time::interval_at(start_time + interval_duration, interval_duration);

            let mut fut = pin!(future);

            loop {
                tokio::select! {
                    biased;
                    _res = &mut fut => {
                        let ns = start_time.elapsed().as_nanos() as u64;
                        metrics.inc_by(ns);
                        break;
                    }
                    _ = interval.tick() => {
                        start_time = Instant::now();
                        metrics.inc_by(interval_duration.as_nanos() as u64);
                    }
                };
            }
        }

        use futures::StreamExt;

        let limit = self.actor_config.developer.exchange_concurrent_dispatchers;
        // Only barriers can be batched for now.
        match msg {
            MessageBatch::BarrierBatch(barrier_batch) => {
                if barrier_batch.is_empty() {
                    return Ok(());
                }
                // Only the first barrier in a batch can carry a mutation.
                let mutation = barrier_batch[0].mutation.clone();
                self.pre_mutate_dispatchers(&mutation).await?;
                futures::stream::iter(self.dispatchers.iter_mut())
                    .for_each_concurrent(limit, |dispatcher| async {
                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
                        let dispatcher_output = &mut dispatcher.dispatcher;
                        let fut = dispatcher_output.dispatch_barriers(
                            barrier_batch
                                .iter()
                                .cloned()
                                .map(|b| b.into_dispatcher())
                                .collect(),
                        );
                        await_with_metrics(fut, metrics).await;
                    })
                    .await;
                self.post_mutate_dispatchers(&mutation)?;
            }
            MessageBatch::Watermark(watermark) => {
                futures::stream::iter(self.dispatchers.iter_mut())
                    .for_each_concurrent(limit, |dispatcher| async {
                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
                        let dispatcher_output = &mut dispatcher.dispatcher;
                        let fut = dispatcher_output.dispatch_watermark(watermark.clone());
                        await_with_metrics(fut, metrics).await;
                    })
                    .await;
            }
            MessageBatch::Chunk(chunk) => {
                futures::stream::iter(self.dispatchers.iter_mut())
                    .for_each_concurrent(limit, |dispatcher| async {
                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
                        let dispatcher_output = &mut dispatcher.dispatcher;
                        let fut = dispatcher_output.dispatch_data(chunk.clone());
                        await_with_metrics(fut, metrics).await;
                    })
                    .await;

                self.metrics
                    .actor_out_record_cnt
                    .inc_by(chunk.cardinality() as _);
            }
        }
        self.dispatchers.retain(|dispatcher| match &**dispatcher {
            DispatcherImpl::Failed(dispatcher_id, e) => {
                warn!(%dispatcher_id, err = %e.as_report(), actor_id = %self.actor_id, "dispatch fail");
                false
            }
            _ => true,
        });
        Ok(())
    }

    /// Add new dispatchers to the executor. Will check whether their ids are unique.
    async fn add_dispatchers<'a>(
        &mut self,
        new_dispatchers: impl IntoIterator<Item = &'a PbDispatcher>,
    ) -> StreamResult<()> {
        for dispatcher in new_dispatchers {
            let outputs = self
                .collect_outputs(&dispatcher.downstream_actor_id)
                .await?;
            let dispatcher = DispatcherImpl::new(outputs, dispatcher)?;
            let dispatcher = self.metrics.monitor_dispatcher(dispatcher);
            self.dispatchers.push(dispatcher);
        }

        assert!(
            self.dispatchers
                .iter()
                .map(|d| d.dispatcher_id())
                .all_unique(),
            "dispatcher ids must be unique: {:?}",
            self.dispatchers
        );

        Ok(())
    }

    fn find_dispatcher(&mut self, dispatcher_id: DispatcherId) -> Option<&mut DispatcherImpl> {
        self.dispatchers
            .iter_mut()
            .find(|d| d.dispatcher_id() == dispatcher_id)
            .map(DerefMut::deref_mut)
    }

    /// Update the dispatcher BEFORE we actually dispatch this barrier. We'll only add the new
    /// outputs.
    async fn pre_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
        let outputs = self
            .collect_outputs(&update.added_downstream_actor_id)
            .await?;

        let Some(dispatcher) = self.find_dispatcher(update.dispatcher_id) else {
            warn!(dispatcher_id = %update.dispatcher_id, added = ?update.added_downstream_actor_id, removed = ?update.removed_downstream_actor_id, actor_id = %self.actor_id, "ignore dispatcher update");
            return Ok(());
        };
        dispatcher.add_outputs(outputs);

        Ok(())
    }

    /// Update the dispatcher AFTER we dispatch this barrier. We'll remove some outputs and finally
    /// update the hash mapping.
    fn post_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
        let ids = update.removed_downstream_actor_id.iter().copied().collect();

        let Some(dispatcher) = self.find_dispatcher(update.dispatcher_id) else {
            warn!(dispatcher_id = %update.dispatcher_id, added = ?update.added_downstream_actor_id, removed = ?update.removed_downstream_actor_id, actor_id = %self.actor_id, "ignore dispatcher update");
            return Ok(());
        };
        dispatcher.remove_outputs(&ids);

        // The hash mapping is only used by the hash dispatcher.
        //
        // We specify a single upstream hash mapping for scaling the downstream fragment. However,
        // it's possible that there are multiple upstreams with different exchange types, for
        // example, the `Broadcast` inner side of the dynamic filter. There are too many
        // combinations to handle here, so we just ignore the `hash_mapping` field for any other
        // exchange types.
        if let DispatcherImpl::Hash(dispatcher) = dispatcher {
            dispatcher.hash_mapping =
                ActorMapping::from_protobuf(update.get_hash_mapping()?).to_expanded();
        }

        Ok(())
    }

    /// For `Add` and `Update`, update the dispatchers before we dispatch the barrier.
    async fn pre_mutate_dispatchers(
        &mut self,
        mutation: &Option<Arc<Mutation>>,
    ) -> StreamResult<()> {
        let Some(mutation) = mutation.as_deref() else {
            return Ok(());
        };

        match mutation {
            Mutation::Add(AddMutation { adds, .. }) => {
                if let Some(new_dispatchers) = adds.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }
            }
            Mutation::Update(UpdateMutation {
                dispatchers,
                actor_new_dispatchers: actor_dispatchers,
                ..
            }) => {
                if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }

                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.pre_update_dispatcher(update).await?;
                    }
                }
            }
            _ => {}
        }

        Ok(())
    }

    /// For `Stop` and `Update`, update the dispatchers after we dispatch the barrier.
    fn post_mutate_dispatchers(&mut self, mutation: &Option<Arc<Mutation>>) -> StreamResult<()> {
        let Some(mutation) = mutation.as_deref() else {
            return Ok(());
        };

        match mutation {
            Mutation::Stop(StopMutation { dropped_actors, .. }) => {
                // Remove outputs only if this actor itself is not to be stopped.
                if !dropped_actors.contains(&self.actor_id) {
                    for dispatcher in &mut self.dispatchers {
                        dispatcher.remove_outputs(dropped_actors);
                    }
                }
            }
            Mutation::Update(UpdateMutation {
                dispatchers,
                dropped_actors,
                ..
            }) => {
                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.post_update_dispatcher(update)?;
                    }
                }

                if !dropped_actors.contains(&self.actor_id) {
                    for dispatcher in &mut self.dispatchers {
                        dispatcher.remove_outputs(dropped_actors);
                    }
                }
            }
            _ => {}
        };

        // After stopping the downstream mview, the outputs of some dispatchers might become
        // empty, and we should clean them up.
        self.dispatchers.retain(|d| !d.is_empty());

        Ok(())
    }
}

impl DispatchExecutor {
    pub(crate) async fn new(
        input: Executor,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
        dispatchers: Vec<stream_plan::Dispatcher>,
        actor_context: &ActorContextRef,
    ) -> StreamResult<Self> {
        let mut executor = Self::new_inner(
            input,
            new_output_request_rx,
            vec![],
            actor_context.id,
            actor_context.fragment_id,
            actor_context.config.clone(),
            actor_context.streaming_metrics.clone(),
        );
        let inner = &mut executor.inner;
        for dispatcher in dispatchers {
            let outputs = inner
                .collect_outputs(&dispatcher.downstream_actor_id)
                .await?;
            let dispatcher = DispatcherImpl::new(outputs, &dispatcher)?;
            let dispatcher = inner.metrics.monitor_dispatcher(dispatcher);
            inner.dispatchers.push(dispatcher);
        }
        Ok(executor)
    }

    #[cfg(test)]
    pub(crate) fn for_test(
        input: Executor,
        dispatchers: Vec<DispatcherImpl>,
        actor_id: ActorId,
        fragment_id: FragmentId,
        actor_config: Arc<StreamingConfig>,
        metrics: Arc<StreamingMetrics>,
    ) -> (
        Self,
        tokio::sync::mpsc::UnboundedSender<(ActorId, NewOutputRequest)>,
    ) {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

        (
            Self::new_inner(
                input,
                rx,
                dispatchers,
                actor_id,
                fragment_id,
                actor_config,
                metrics,
            ),
            tx,
        )
    }

    fn new_inner(
        mut input: Executor,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
        dispatchers: Vec<DispatcherImpl>,
        actor_id: ActorId,
        fragment_id: FragmentId,
        actor_config: Arc<StreamingConfig>,
        metrics: Arc<StreamingMetrics>,
    ) -> Self {
        if crate::consistency::insane() {
            // make some trouble before dispatching to avoid generating invalid dist key.
            let mut info = input.info().clone();
            info.identity = format!("{} (embedded trouble)", info.identity);
            let troublemaker = TroublemakerExecutor::new(input, actor_config.developer.chunk_size);
            input = (info, troublemaker).into();
        }

        let actor_id_str = actor_id.to_string();
        let fragment_id_str = fragment_id.to_string();
        let actor_out_record_cnt = metrics
            .actor_out_record_cnt
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let metrics = DispatchExecutorMetrics {
            actor_id_str,
            fragment_id_str,
            metrics,
            actor_out_record_cnt,
        };
        let dispatchers = dispatchers
            .into_iter()
            .map(|dispatcher| metrics.monitor_dispatcher(dispatcher))
            .collect();

        Self {
            input,
            inner: DispatchExecutorInner {
                dispatchers,
                actor_id,
                actor_config,
                metrics,
                new_output_request_rx,
                pending_new_output_requests: Default::default(),
            },
        }
    }
}

impl StreamConsumer for DispatchExecutor {
    type BarrierStream = impl Stream<Item = StreamResult<Barrier>> + Send;

    fn execute(mut self: Box<Self>) -> Self::BarrierStream {
        let max_barrier_count_per_batch = self.inner.actor_config.developer.max_barrier_batch_size;
        #[try_stream]
        async move {
            let mut input = self.input.execute().peekable();
            loop {
                let Some(message) =
                    try_batch_barriers(max_barrier_count_per_batch, &mut input).await?
                else {
                    // end_of_stream
                    break;
                };
                match message {
                    chunk @ MessageBatch::Chunk(_) => {
                        self.inner
                            .dispatch(chunk)
                            .instrument(tracing::info_span!("dispatch_chunk"))
                            .instrument_await("dispatch_chunk")
                            .await?;
                    }
                    MessageBatch::BarrierBatch(barrier_batch) => {
                        assert!(!barrier_batch.is_empty());
                        self.inner
                            .dispatch(MessageBatch::BarrierBatch(barrier_batch.clone()))
                            .instrument(tracing::info_span!("dispatch_barrier_batch"))
                            .instrument_await("dispatch_barrier_batch")
                            .await?;
                        self.inner
                            .metrics
                            .metrics
                            .barrier_batch_size
                            .observe(barrier_batch.len() as f64);
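                        // Note that only barriers are yielded to the caller; chunks and
                        // watermarks are fully consumed by dispatching.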
                        for barrier in barrier_batch {
                            yield barrier;
                        }
                    }
                    watermark @ MessageBatch::Watermark(_) => {
                        self.inner
                            .dispatch(watermark)
                            .instrument(tracing::info_span!("dispatch_watermark"))
                            .instrument_await("dispatch_watermark")
                            .await?;
                    }
                }
            }
        }
    }
}

/// Tries to batch up to `max_barrier_count_per_batch` consecutive barriers into a single message
/// batch.
///
/// Returns the message batch, or `None` upon the end of the stream.
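///
/// For example (illustrative): if the messages `[B1, B2, B3(mutation), C1]` are already buffered
/// in the input stream, the first call returns the batch `[B1, B2]`. Batching stops before `B3`,
/// since a barrier carrying a mutation is never merged into a preceding batch; also, only
/// already-buffered messages are batched, as peeking never awaits (`now_or_never`).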
async fn try_batch_barriers(
    max_barrier_count_per_batch: u32,
    input: &mut Peekable<BoxedMessageStream>,
) -> StreamResult<Option<MessageBatch>> {
    let Some(msg) = input.next().await else {
        // end_of_stream
        return Ok(None);
    };
    let mut barrier_batch = vec![];
    let msg: Message = msg?;
    let max_peek_attempts = match msg {
        Message::Chunk(c) => {
            return Ok(Some(MessageBatch::Chunk(c)));
        }
        Message::Watermark(w) => {
            return Ok(Some(MessageBatch::Watermark(w)));
        }
        Message::Barrier(b) => {
            let peek_more_barrier = b.mutation.is_none();
            barrier_batch.push(b);
            if peek_more_barrier {
                max_barrier_count_per_batch.saturating_sub(1)
            } else {
                0
            }
        }
    };
    // Try to peek more consecutive non-mutation barriers.
    for _ in 0..max_peek_attempts {
        let peek = input.peek().now_or_never();
        let Some(peek) = peek else {
            break;
        };
        let Some(msg) = peek else {
            // end_of_stream
            break;
        };
        let Ok(Message::Barrier(barrier)) = msg else {
            break;
        };
        if barrier.mutation.is_some() {
            break;
        }
        let msg: Message = input.next().now_or_never().unwrap().unwrap()?;
        let Message::Barrier(ref barrier) = msg else {
            unreachable!("must be a barrier");
        };
        barrier_batch.push(barrier.clone());
    }
    Ok(Some(MessageBatch::BarrierBatch(barrier_batch)))
}

#[derive(Debug)]
pub(crate) enum DispatcherImpl {
    Hash(HashDataDispatcher),
    Broadcast(BroadcastDispatcher),
    Simple(SimpleDispatcher),
    #[cfg_attr(not(test), expect(dead_code))]
    RoundRobin(RoundRobinDataDispatcher),
    /// The dispatcher has failed to dispatch to downstream actors and is pending removal.
    /// It should be cleaned up later.
    Failed(DispatcherId, StreamError),
}

impl DispatcherImpl {
    pub fn new(outputs: Vec<Output>, dispatcher: &PbDispatcher) -> StreamResult<Self> {
        let output_mapping =
            DispatchOutputMapping::from_protobuf(dispatcher.output_mapping.clone().unwrap());

        use risingwave_pb::stream_plan::DispatcherType::*;
        let dispatcher_impl = match dispatcher.get_type()? {
            Hash => {
                assert!(!outputs.is_empty());
                let dist_key_indices = dispatcher
                    .dist_key_indices
                    .iter()
                    .map(|i| *i as usize)
                    .collect();

                let hash_mapping =
                    ActorMapping::from_protobuf(dispatcher.get_hash_mapping()?).to_expanded();

                DispatcherImpl::Hash(HashDataDispatcher::new(
                    outputs,
                    dist_key_indices,
                    output_mapping,
                    hash_mapping,
                    dispatcher.dispatcher_id,
                ))
            }
            Broadcast => DispatcherImpl::Broadcast(BroadcastDispatcher::new(
                outputs,
                output_mapping,
                dispatcher.dispatcher_id,
            )),
            Simple | NoShuffle => {
                let [output]: [_; 1] = outputs.try_into().unwrap();
                DispatcherImpl::Simple(SimpleDispatcher::new(
                    output,
                    output_mapping,
                    dispatcher.dispatcher_id,
                ))
            }
            Unspecified => unreachable!(),
        };

        Ok(dispatcher_impl)
    }
}

macro_rules! impl_dispatcher {
    ($( { $variant_name:ident } ),*) => {
        impl DispatcherImpl {
            pub async fn dispatch_data(&mut self, chunk: StreamChunk) {
                if let Err(e) = match self {
                    $( Self::$variant_name(inner) => inner.dispatch_data(chunk).await, )*
                    Self::Failed(..) => unreachable!(),
                } {
                    *self = Self::Failed(self.dispatcher_id(), e);
                }
            }

            pub async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) {
                if let Err(e) = match self {
                    $( Self::$variant_name(inner) => inner.dispatch_barriers(barriers).await, )*
                    Self::Failed(..) => unreachable!(),
                } {
                    *self = Self::Failed(self.dispatcher_id(), e);
                }
            }

            pub async fn dispatch_watermark(&mut self, watermark: Watermark) {
                if let Err(e) = match self {
                    $( Self::$variant_name(inner) => inner.dispatch_watermark(watermark).await, )*
                    Self::Failed(..) => unreachable!(),
                } {
                    *self = Self::Failed(self.dispatcher_id(), e);
                }
            }

            pub fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
                match self {
                    $(Self::$variant_name(inner) => inner.add_outputs(outputs), )*
                    Self::Failed(..) => {},
                }
            }

            pub fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
                match self {
                    $(Self::$variant_name(inner) => inner.remove_outputs(actor_ids), )*
                    Self::Failed(..) => {},
                }
            }

            pub fn dispatcher_id(&self) -> DispatcherId {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id(), )*
                    Self::Failed(dispatcher_id, ..) => *dispatcher_id,
                }
            }

            pub fn is_empty(&self) -> bool {
                match self {
                    $(Self::$variant_name(inner) => inner.is_empty(), )*
                    Self::Failed(..) => true,
                }
            }
        }
    }
}

macro_rules! for_all_dispatcher_variants {
    ($macro:ident) => {
        $macro! {
            { Hash },
            { Broadcast },
            { Simple },
            { RoundRobin }
        }
    };
}

for_all_dispatcher_variants! { impl_dispatcher }

pub trait DispatchFuture<'a> = Future<Output = StreamResult<()>> + Send;

trait Dispatcher: Debug + 'static {
    /// Dispatch a data chunk to downstream actors.
    fn dispatch_data(&mut self, chunk: StreamChunk) -> impl DispatchFuture<'_>;
    /// Dispatch barriers to downstream actors, generally by broadcasting them.
    fn dispatch_barriers(&mut self, barrier: DispatcherBarriers) -> impl DispatchFuture<'_>;
    /// Dispatch a watermark to downstream actors, generally by broadcasting it.
    fn dispatch_watermark(&mut self, watermark: Watermark) -> impl DispatchFuture<'_>;

    /// Add new outputs to the dispatcher.
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>);
    /// Remove the outputs to the given `actor_ids` from the dispatcher.
    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>);

    /// The ID of the dispatcher. A [`DispatchExecutor`] may have multiple dispatchers with
    /// different IDs.
    ///
    /// Note that the dispatcher id is always equal to the downstream fragment id.
    /// See also `proto/stream_plan.proto`.
    fn dispatcher_id(&self) -> DispatcherId;

    /// Whether the dispatcher has no outputs. If so, it'll be cleaned up from the
    /// [`DispatchExecutor`].
    fn is_empty(&self) -> bool;
}

/// Concurrently broadcast a message to all outputs.
///
/// Note that this does not follow `concurrent_dispatchers` in the config and the concurrency is
/// always unlimited.
async fn broadcast_concurrent(
    outputs: impl IntoIterator<Item = &'_ mut Output>,
    message: DispatcherMessageBatch,
) -> StreamResult<()> {
    futures::future::try_join_all(
        outputs
            .into_iter()
            .map(|output| output.send(message.clone())),
    )
    .await?;
    Ok(())
}

#[derive(Debug)]
pub struct RoundRobinDataDispatcher {
    outputs: Vec<Output>,
    output_mapping: DispatchOutputMapping,
    cur: usize,
    dispatcher_id: DispatcherId,
}

impl RoundRobinDataDispatcher {
    #[cfg_attr(not(test), expect(dead_code))]
    pub fn new(
        outputs: Vec<Output>,
        output_mapping: DispatchOutputMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            outputs,
            output_mapping,
            cur: 0,
            dispatcher_id,
        }
    }
}

impl Dispatcher for RoundRobinDataDispatcher {
    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let chunk = self.output_mapping.apply(chunk);

        self.outputs[self.cur]
            .send(DispatcherMessageBatch::Chunk(chunk))
            .await?;
        self.cur += 1;
        self.cur %= self.outputs.len();
        Ok(())
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // always broadcast barrier
        broadcast_concurrent(
            &mut self.outputs,
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            // always broadcast watermark
            broadcast_concurrent(
                &mut self.outputs,
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(outputs);
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
            .count();
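        // Keep the round-robin cursor within bounds after removals.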
        self.cur = self.cur.min(self.outputs.len() - 1);
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}

pub struct HashDataDispatcher {
    outputs: Vec<Output>,
    keys: Vec<usize>,
    output_mapping: DispatchOutputMapping,
    /// Mapping from virtual node to actor id, used for hash data dispatcher to dispatch tasks to
    /// different downstream actors.
    hash_mapping: ExpandedActorMapping,
    dispatcher_id: DispatcherId,
}

impl Debug for HashDataDispatcher {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HashDataDispatcher")
            .field("outputs", &self.outputs)
            .field("keys", &self.keys)
            .field("dispatcher_id", &self.dispatcher_id)
            .finish_non_exhaustive()
    }
}

impl HashDataDispatcher {
    pub fn new(
        outputs: Vec<Output>,
        keys: Vec<usize>,
        output_mapping: DispatchOutputMapping,
        hash_mapping: ExpandedActorMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            outputs,
            keys,
            output_mapping,
            hash_mapping,
            dispatcher_id,
        }
    }
}

impl Dispatcher for HashDataDispatcher {
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(outputs);
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // always broadcast barrier
        broadcast_concurrent(
            &mut self.outputs,
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            // always broadcast watermark
            broadcast_concurrent(
                &mut self.outputs,
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        // A chunk can be shuffled into multiple output chunks to be sent to downstream actors.
        // The only difference among these output chunks is the visibility map, which is
        // calculated from the hash value of each row in the input chunk.
        let num_outputs = self.outputs.len();

        // Get the vnode of every row by its distribution key.
        let vnode_count = self.hash_mapping.len();
        let vnodes = VirtualNode::compute_chunk(chunk.data_chunk(), &self.keys, vnode_count);

        tracing::debug!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty(), self.keys, vnodes);

        let mut vis_maps = repeat_with(|| BitmapBuilder::with_capacity(chunk.capacity()))
            .take(num_outputs)
            .collect_vec();
        let mut last_update_delete_row_idx = None;
        let mut new_ops: Vec<Op> = Vec::with_capacity(chunk.capacity());

        for (row_idx, ((vnode, &op), visible)) in vnodes
            .iter()
            .copied()
            .zip_eq_fast(chunk.ops())
            .zip_eq_fast(chunk.visibility().iter())
            .enumerate()
        {
            // Build visibility map for every output chunk.
            for (output, vis_map) in self.outputs.iter().zip_eq_fast(vis_maps.iter_mut()) {
                vis_map.append(visible && self.hash_mapping[vnode.to_index()] == output.actor_id());
            }

            if !visible {
                new_ops.push(op);
                continue;
            }

            // An `Update` message, denoted by an `UpdateDelete` and a successive `UpdateInsert`,
            // needs to be rewritten to plain `Delete` and `Insert` if the distribution key
            // columns are changed, since the distribution key will eventually be part of the
            // stream key of the downstream executor, and there's an invariant that the stream
            // key must be the same for both rows within an `Update` pair.
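            //
            // For example (illustrative), with the distribution key on the first column, the
            // pair `U- (1, 2), U+ (3, 2)` is rewritten to `- (1, 2), + (3, 2)` since the two
            // rows may be dispatched to different downstream actors, while the pair
            // `U- (1, 2), U+ (1, 3)` is kept as-is.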
            if op == Op::UpdateDelete {
                last_update_delete_row_idx = Some(row_idx);
            } else if op == Op::UpdateInsert {
                let delete_row_idx = last_update_delete_row_idx
                    .take()
                    .expect("missing U- before U+");
                assert!(delete_row_idx + 1 == row_idx, "U- and U+ are not adjacent");

                // Check if any distribution key column value changed.
                let dist_key_changed = chunk.row_at(delete_row_idx).1.project(&self.keys)
                    != chunk.row_at(row_idx).1.project(&self.keys);

                if dist_key_changed {
                    new_ops.push(Op::Delete);
                    new_ops.push(Op::Insert);
                } else {
                    new_ops.push(Op::UpdateDelete);
                    new_ops.push(Op::UpdateInsert);
                }
            } else {
                new_ops.push(op);
            }
        }
        assert!(last_update_delete_row_idx.is_none(), "missing U+ after U-");

        let ops = new_ops;
        // Apply the output mapping after calculating the vnodes and new visibility maps.
        let chunk = self.output_mapping.apply(chunk);

        // Send each output its own `StreamChunk` with the corresponding visibility map.
        futures::future::try_join_all(
            vis_maps
                .into_iter()
                .zip_eq_fast(self.outputs.iter_mut())
                .map(|(vis_map, output)| async {
                    let vis_map = vis_map.finish();
                    // The columns are not changed in this function.
                    let new_stream_chunk =
                        StreamChunk::with_visibility(ops.clone(), chunk.columns().into(), vis_map);
                    if new_stream_chunk.cardinality() > 0 {
                        event!(
                            tracing::Level::TRACE,
                            msg = "chunk",
                            downstream = %output.actor_id(),
                            "send = \n{:#?}",
                            new_stream_chunk
                        );
                        output
                            .send(DispatcherMessageBatch::Chunk(new_stream_chunk))
                            .await?;
                    }
                    StreamResult::Ok(())
                }),
        )
        .await?;

        Ok(())
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
            .count();
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}

/// `BroadcastDispatcher` dispatches messages to all outputs.
#[derive(Debug)]
pub struct BroadcastDispatcher {
    outputs: HashMap<ActorId, Output>,
    output_mapping: DispatchOutputMapping,
    dispatcher_id: DispatcherId,
}

impl BroadcastDispatcher {
    pub fn new(
        outputs: impl IntoIterator<Item = Output>,
        output_mapping: DispatchOutputMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            outputs: Self::into_pairs(outputs).collect(),
            output_mapping,
            dispatcher_id,
        }
    }

    fn into_pairs(
        outputs: impl IntoIterator<Item = Output>,
    ) -> impl Iterator<Item = (ActorId, Output)> {
        outputs
            .into_iter()
            .map(|output| (output.actor_id(), output))
    }
}

impl Dispatcher for BroadcastDispatcher {
    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let chunk = self.output_mapping.apply(chunk);
        broadcast_concurrent(
            self.outputs.values_mut(),
            DispatcherMessageBatch::Chunk(chunk),
        )
        .await
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // always broadcast barrier
        broadcast_concurrent(
            self.outputs.values_mut(),
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            // always broadcast watermark
            broadcast_concurrent(
                self.outputs.values_mut(),
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(Self::into_pairs(outputs));
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(|actor_id, _| actor_ids.contains(actor_id))
            .count();
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}

/// `SimpleDispatcher` dispatches messages to a single output.
#[derive(Debug)]
pub struct SimpleDispatcher {
    /// In most cases, there is exactly one output. However, in some cases of configuration
    /// change, the field may temporarily hold 0 or 2 outputs.
    ///
    /// - When dropping a materialized view, the output will be removed and this field becomes
    ///   empty. The [`DispatchExecutor`] will immediately clean up this empty dispatcher before
    ///   finishing processing the current mutation.
    /// - When migrating a singleton fragment, the new output will be temporarily added in the
    ///   `pre` stage and this field holds multiple outputs, so that the configuration-change
    ///   barrier is broadcast to both the old and new downstream actors. In the `post` stage,
    ///   the old output will be removed and this field becomes single again.
    ///
    /// Therefore, when dispatching data, we assert that there's exactly one output in
    /// `Self::output`.
    output: SmallVec<[Output; 2]>,
    output_mapping: DispatchOutputMapping,
    dispatcher_id: DispatcherId,
}

impl SimpleDispatcher {
    pub fn new(
        output: Output,
        output_mapping: DispatchOutputMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            output: smallvec![output],
            output_mapping,
            dispatcher_id,
        }
    }
}

impl Dispatcher for SimpleDispatcher {
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.output.extend(outputs);
        assert!(self.output.len() <= 2);
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // Only barrier is allowed to be dispatched to multiple outputs during migration.
        for output in &mut self.output {
            output
                .send(DispatcherMessageBatch::BarrierBatch(barriers.clone()))
                .await?;
        }
        Ok(())
    }

    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let output = self
            .output
            .iter_mut()
            .exactly_one()
            .expect("expect exactly one output");

        let chunk = self.output_mapping.apply(chunk);
        output.send(DispatcherMessageBatch::Chunk(chunk)).await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        let output = self
            .output
            .iter_mut()
            .exactly_one()
            .expect("expect exactly one output");

        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            output
                .send(DispatcherMessageBatch::Watermark(watermark))
                .await?;
        }
        Ok(())
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.output
            .retain(|output| !actor_ids.contains(&output.actor_id()));
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn is_empty(&self) -> bool {
        self.output.is_empty()
    }
}

#[cfg(test)]
mod tests {
    use std::hash::{BuildHasher, Hasher};

    use futures::pin_mut;
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::array::{Array, ArrayBuilder, I32ArrayBuilder};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::hash_util::Crc32FastBuilder;
    use risingwave_pb::stream_plan::{DispatcherType, PbDispatchOutputMapping};
    use tokio::sync::mpsc::unbounded_channel;

    use super::*;
    use crate::executor::exchange::output::Output;
    use crate::executor::exchange::permit::channel_for_test;
    use crate::executor::receiver::ReceiverExecutor;
    use crate::executor::{BarrierInner as Barrier, MessageInner as Message};
    use crate::task::barrier_test_utils::LocalBarrierTestEnv;

    #[tokio::test]
    async fn test_hash_dispatcher_complex() {
        // This test only works when vnode count is 256.
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let num_outputs = 2; // actor id ranges from 1 to 2
        let key_indices = &[0, 2];
        let (output_tx_vecs, mut output_rx_vecs): (Vec<_>, Vec<_>) =
            (0..num_outputs).map(|_| channel_for_test()).collect();
        let outputs = output_tx_vecs
            .into_iter()
            .enumerate()
            .map(|(actor_id, tx)| Output::new(ActorId::new(actor_id as u32 + 1), tx))
            .collect::<Vec<_>>();
        let mut hash_mapping = (1..num_outputs + 1)
            .flat_map(|id| vec![ActorId::new(id as u32); VirtualNode::COUNT_FOR_TEST / num_outputs])
            .collect_vec();
        hash_mapping.resize(
            VirtualNode::COUNT_FOR_TEST,
            ActorId::new(num_outputs as u32),
        );
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            key_indices.to_vec(),
            DispatchOutputMapping::Simple(vec![0, 1, 2]),
            hash_mapping,
            0.into(),
        );

        let chunk = StreamChunk::from_pretty(
            "  I I I
            +  4 6 8
            +  5 7 9
            +  0 0 0
            -  1 1 1 D
            U- 2 0 2
            U+ 2 0 2
            U- 3 3 2
            U+ 3 3 4",
        );
        hash_dispatcher.dispatch_data(chunk).await.unwrap();

        assert_eq!(
            *output_rx_vecs[0].recv().await.unwrap().as_chunk().unwrap(),
            StreamChunk::from_pretty(
                "  I I I
                +  4 6 8
                +  5 7 9
                +  0 0 0
                -  1 1 1 D
                U- 2 0 2
                U+ 2 0 2
                -  3 3 2 D  // Should rewrite UpdateDelete to Delete
                +  3 3 4    // Should rewrite UpdateInsert to Insert",
            )
        );
        assert_eq!(
            *output_rx_vecs[1].recv().await.unwrap().as_chunk().unwrap(),
            StreamChunk::from_pretty(
                "  I I I
                +  4 6 8 D
                +  5 7 9 D
                +  0 0 0 D
                -  1 1 1 D  // Should keep original invisible mark
                U- 2 0 2 D  // Should keep UpdateDelete
                U+ 2 0 2 D  // Should keep UpdateInsert
                -  3 3 2    // Should rewrite UpdateDelete to Delete
                +  3 3 4 D  // Should rewrite UpdateInsert to Insert",
            )
        );
    }

    #[tokio::test]
    async fn test_configuration_change() {
        let _schema = Schema { fields: vec![] };
        let (tx, rx) = channel_for_test();
        let actor_id = 233.into();
        let barrier_test_env = LocalBarrierTestEnv::for_test().await;

        let (untouched, old, new) = (234.into(), 235.into(), 238.into()); // broadcast downstream actors
        let (old_simple, new_simple) = (114.into(), 514.into()); // simple downstream actors

        // actor_id -> untouched, old, new, old_simple, new_simple

        let broadcast_dispatcher_id = 666.into();
        let broadcast_dispatcher = PbDispatcher {
            r#type: DispatcherType::Broadcast as _,
            dispatcher_id: broadcast_dispatcher_id,
            downstream_actor_id: vec![untouched, old],
            output_mapping: PbDispatchOutputMapping::identical(0).into(), /* dummy length as it's not used */
            ..Default::default()
        };

        let simple_dispatcher_id = 888.into();
        let simple_dispatcher = PbDispatcher {
            r#type: DispatcherType::Simple as _,
            dispatcher_id: simple_dispatcher_id,
            downstream_actor_id: vec![old_simple],
            output_mapping: PbDispatchOutputMapping::identical(0).into(), /* dummy length as it's not used */
            ..Default::default()
        };

        let dispatcher_updates = maplit::hashmap! {
            actor_id => vec![PbDispatcherUpdate {
                actor_id,
                dispatcher_id: broadcast_dispatcher_id,
                added_downstream_actor_id: vec![new],
                removed_downstream_actor_id: vec![old],
                hash_mapping: Default::default(),
            }]
        };
        let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: dispatcher_updates,
                ..Default::default()
            },
        ));
        barrier_test_env.inject_barrier(&b1, [actor_id]);
        barrier_test_env.flush_all_events().await;

        let input = Executor::new(
            Default::default(),
            ReceiverExecutor::for_test(
                actor_id,
                rx,
                barrier_test_env.local_barrier_manager.clone(),
            )
            .boxed(),
        );

        let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
        let mut rxs = [untouched, old, new, old_simple, new_simple]
            .into_iter()
            .map(|id| {
                (id, {
                    let (tx, rx) = channel_for_test();
                    new_output_request_tx
                        .send((id, NewOutputRequest::Local(tx)))
                        .unwrap();
                    rx
                })
            })
            .collect::<HashMap<_, _>>();
        let executor = Box::new(
            DispatchExecutor::new(
                input,
                new_output_request_rx,
                vec![broadcast_dispatcher, simple_dispatcher],
                &ActorContext::for_test(actor_id),
            )
            .await
            .unwrap(),
        )
        .execute();

        pin_mut!(executor);

        macro_rules! try_recv {
            ($down_id:expr) => {
                rxs.get_mut(&$down_id).unwrap().try_recv()
            };
        }

        // 1. Send a chunk.
        tx.send(Message::Chunk(StreamChunk::default()).into())
            .await
            .unwrap();

        // 2. Send the configuration change barrier for the broadcast dispatcher.
        tx.send(Message::Barrier(b1.clone().into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 3. Check downstream.
        try_recv!(untouched).unwrap().as_chunk().unwrap();
        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();

        try_recv!(old).unwrap().as_chunk().unwrap();
        try_recv!(old).unwrap().as_barrier_batch().unwrap(); // It should still receive the barrier even if it's to be removed.

        try_recv!(new).unwrap().as_barrier_batch().unwrap(); // Since it's just added, it won't receive the chunk.

        try_recv!(old_simple).unwrap().as_chunk().unwrap();
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // Untouched.

        // 4. Send another barrier.
        let b2 = Barrier::new_test_barrier(test_epoch(2));
        barrier_test_env.inject_barrier(&b2, [actor_id]);
        tx.send(Message::Barrier(b2.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 5. Check downstream.
        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();
        try_recv!(old).unwrap_err(); // Since it's stopped, we can't receive new messages.
        try_recv!(new).unwrap().as_barrier_batch().unwrap();

        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // Untouched.
        try_recv!(new_simple).unwrap_err(); // Untouched.

        // 6. Send another chunk.
        tx.send(Message::Chunk(StreamChunk::default()).into())
            .await
            .unwrap();

        // 7. Send a configuration change barrier for the simple dispatcher.
        let dispatcher_updates = maplit::hashmap! {
            actor_id => vec![PbDispatcherUpdate {
                actor_id,
                dispatcher_id: simple_dispatcher_id,
                added_downstream_actor_id: vec![new_simple],
                removed_downstream_actor_id: vec![old_simple],
                hash_mapping: Default::default(),
            }]
        };
        let b3 = Barrier::new_test_barrier(test_epoch(3)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: dispatcher_updates,
                ..Default::default()
            },
        ));
        barrier_test_env.inject_barrier(&b3, [actor_id]);
        tx.send(Message::Barrier(b3.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 8. Check downstream.
        try_recv!(old_simple).unwrap().as_chunk().unwrap();
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // It should still receive the barrier even if it's to be removed.

        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap(); // Since it's just added, it won't receive the chunk.

        // 9. Send another barrier.
        let b4 = Barrier::new_test_barrier(test_epoch(4));
        barrier_test_env.inject_barrier(&b4, [actor_id]);
        tx.send(Message::Barrier(b4.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 10. Check downstream.
        try_recv!(old_simple).unwrap_err(); // Since it's stopped, we can't receive new messages.
        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap();
    }

    #[tokio::test]
    async fn test_hash_dispatcher() {
        // This test only works when vnode count is 256.
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let num_outputs = 5; // actor id ranges from 1 to 5
        let cardinality = 10;
        let dimension = 4;
        let key_indices = &[0, 2];
        let (output_tx_vecs, output_rx_vecs): (Vec<_>, Vec<_>) =
            (0..num_outputs).map(|_| channel_for_test()).collect();
        let outputs = output_tx_vecs
            .into_iter()
            .enumerate()
            .map(|(actor_id, tx)| Output::new(ActorId::new(1 + actor_id as u32), tx))
            .collect::<Vec<_>>();
        let mut hash_mapping = (1..num_outputs + 1)
            .flat_map(|id| vec![ActorId::new(id as _); VirtualNode::COUNT_FOR_TEST / num_outputs])
            .collect_vec();
        hash_mapping.resize(
            VirtualNode::COUNT_FOR_TEST,
            ActorId::new(num_outputs as u32),
        );
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            key_indices.to_vec(),
            DispatchOutputMapping::Simple((0..dimension).collect()),
            hash_mapping.clone(),
            0.into(),
        );

        let mut ops = Vec::new();
        for idx in 0..cardinality {
            if idx % 2 == 0 {
                ops.push(Op::Insert);
            } else {
                ops.push(Op::Delete);
            }
        }

        let mut start = 19260817i32..;
        let mut builders = (0..dimension)
            .map(|_| I32ArrayBuilder::new(cardinality))
            .collect_vec();
        let mut output_cols = vec![vec![vec![]; dimension]; num_outputs];
        let mut output_ops = vec![vec![]; num_outputs];
        for op in &ops {
            let hash_builder = Crc32FastBuilder;
            let mut hasher = hash_builder.build_hasher();
            let one_row = (0..dimension).map(|_| start.next().unwrap()).collect_vec();
            for key_idx in key_indices {
                let val = one_row[*key_idx];
                let bytes = val.to_le_bytes();
                hasher.update(&bytes);
            }
            let output_idx = hash_mapping[hasher.finish() as usize % VirtualNode::COUNT_FOR_TEST]
                .as_raw_id() as usize
                - 1;
            for (builder, val) in builders.iter_mut().zip_eq_fast(one_row.iter()) {
                builder.append(Some(*val));
            }
            output_cols[output_idx]
                .iter_mut()
                .zip_eq_fast(one_row.iter())
                .for_each(|(each_column, val)| each_column.push(*val));
            output_ops[output_idx].push(op);
        }

        let columns = builders
            .into_iter()
            .map(|builder| {
                let array = builder.finish();
                array.into_ref()
            })
            .collect();

        let chunk = StreamChunk::new(ops, columns);
        hash_dispatcher.dispatch_data(chunk).await.unwrap();

        for (output_idx, mut rx) in output_rx_vecs.into_iter().enumerate() {
            let mut output = vec![];
            while let Some(Some(msg)) = rx.recv().now_or_never() {
                output.push(msg);
            }
            // It is possible that there are no chunks, as no key was hashed to this output.
            assert!(output.len() <= 1);
            if output.is_empty() {
                assert!(output_cols[output_idx].iter().all(|x| { x.is_empty() }));
            } else {
                let message = output.first().unwrap();
                let real_chunk = match message {
                    DispatcherMessageBatch::Chunk(chunk) => chunk,
                    _ => panic!(),
                };
                real_chunk
                    .columns()
                    .iter()
                    .zip_eq_fast(output_cols[output_idx].iter())
                    .for_each(|(real_col, expect_col)| {
                        let real_vals = real_chunk
                            .visibility()
                            .iter_ones()
                            .map(|row_idx| real_col.as_int32().value_at(row_idx).unwrap())
                            .collect::<Vec<_>>();
                        assert_eq!(real_vals.len(), expect_col.len());
                        assert_eq!(real_vals, *expect_col);
                    });
            }
        }
    }
}