risingwave_stream/executor/dispatch.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::iter::repeat_with;
use std::ops::{Deref, DerefMut};
use std::time::Duration;

use anyhow::anyhow;
use futures::FutureExt;
use itertools::Itertools;
use risingwave_common::array::Op;
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::{ActorMapping, ExpandedActorMapping, VirtualNode};
use risingwave_common::metrics::LabelGuardedIntCounter;
use risingwave_common::row::RowExt;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
use risingwave_pb::stream_plan::{self, PbDispatcher};
use smallvec::{SmallVec, smallvec};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::task::consume_budget;
use tokio::time::Instant;
use tokio_stream::StreamExt;
use tokio_stream::adapters::Peekable;
use tracing::{Instrument, event};

use super::exchange::output::Output;
use super::{
    AddMutation, DispatcherBarriers, DispatcherMessageBatch, MessageBatch, TroublemakerExecutor,
    UpdateMutation,
};
use crate::executor::prelude::*;
use crate::executor::{StopMutation, StreamConsumer};
use crate::task::{DispatcherId, NewOutputRequest};

mod output_mapping;
pub use output_mapping::DispatchOutputMapping;
use risingwave_common::id::FragmentId;

use crate::error::StreamError;

/// [`DispatchExecutor`] consumes messages and sends them to downstream actors. Usually,
/// data chunks are dispatched with some specified policy, while control messages
/// such as barriers are distributed to all receivers.
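///
/// For example, a chunk passing through a [`HashDataDispatcher`] is routed to
/// downstream actors by the hash of its distribution key, while the same chunk
/// passing through a [`BroadcastDispatcher`] is cloned to every output; a barrier
/// is always sent to all outputs of all dispatchers.
///
/// A minimal wiring sketch (types elided; see [`DispatchExecutor::new`] below):
///
/// ```ignore
/// let (request_tx, request_rx) = tokio::sync::mpsc::unbounded_channel();
/// // `input`, `dispatchers` and `actor_context` come from the actor being built.
/// let executor = DispatchExecutor::new(input, request_rx, dispatchers, &actor_context).await?;
/// ```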
pub struct DispatchExecutor {
    input: Executor,
    inner: DispatchExecutorInner,
}

struct DispatcherWithMetrics {
    dispatcher: DispatcherImpl,
    pub actor_output_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}

impl Debug for DispatcherWithMetrics {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        self.dispatcher.fmt(f)
    }
}

impl Deref for DispatcherWithMetrics {
    type Target = DispatcherImpl;

    fn deref(&self) -> &Self::Target {
        &self.dispatcher
    }
}

impl DerefMut for DispatcherWithMetrics {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.dispatcher
    }
}

struct DispatchExecutorMetrics {
    actor_id_str: String,
    fragment_id_str: String,
    metrics: Arc<StreamingMetrics>,
    actor_out_record_cnt: LabelGuardedIntCounter,
}

impl DispatchExecutorMetrics {
    fn monitor_dispatcher(&self, dispatcher: DispatcherImpl) -> DispatcherWithMetrics {
        DispatcherWithMetrics {
            actor_output_buffer_blocking_duration_ns: self
                .metrics
                .actor_output_buffer_blocking_duration_ns
                .with_guarded_label_values(&[
                    self.actor_id_str.as_str(),
                    self.fragment_id_str.as_str(),
                    dispatcher.dispatcher_id().to_string().as_str(),
                ]),
            dispatcher,
        }
    }
}

struct DispatchExecutorInner {
    dispatchers: Vec<DispatcherWithMetrics>,
    actor_id: ActorId,
    actor_config: Arc<StreamingConfig>,
    metrics: DispatchExecutorMetrics,
    new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    pending_new_output_requests: HashMap<ActorId, NewOutputRequest>,
}

impl DispatchExecutorInner {
    async fn collect_outputs(
        &mut self,
        downstream_actors: &[ActorId],
    ) -> StreamResult<Vec<Output>> {
        fn resolve_output(downstream_actor: ActorId, request: NewOutputRequest) -> Output {
            let tx = match request {
                NewOutputRequest::Local(tx) | NewOutputRequest::Remote(tx) => tx,
            };
            Output::new(downstream_actor, tx)
        }
        let mut outputs = Vec::with_capacity(downstream_actors.len());
        for &downstream_actor in downstream_actors {
            let output =
                if let Some(request) = self.pending_new_output_requests.remove(&downstream_actor) {
                    resolve_output(downstream_actor, request)
                } else {
                    loop {
                        let (requested_actor, request) = self
                            .new_output_request_rx
                            .recv()
                            .await
                            .ok_or_else(|| anyhow!("end of new output request"))?;
                        if requested_actor == downstream_actor {
                            break resolve_output(requested_actor, request);
                        } else {
                            assert!(
                                self.pending_new_output_requests
                                    .insert(requested_actor, request)
                                    .is_none(),
                                "duplicated inflight new output requests from actor {}",
                                requested_actor
                            );
                        }
                    }
                };
            outputs.push(output);
        }
        Ok(outputs)
    }

    async fn dispatch(&mut self, msg: MessageBatch) -> StreamResult<()> {
        if self.dispatchers.is_empty() {
            // Our dispatchers' internal implementations call tokio methods that already
            // participate in cooperative scheduling with the tokio runtime.
            //
            // When there is no dispatcher, we should yield to the scheduler ourselves.
            consume_budget().await;
        }
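        // Awaits `future` while accumulating the time it stays pending into `metrics`.
        // The elapsed time is flushed into the counter every 15 seconds on the interval
        // tick, so a send that blocks on backpressure for a long time shows up in the
        // metric before it completes.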
        async fn await_with_metrics(
            future: impl Future<Output = ()>,
            metrics: &LabelGuardedIntCounter,
        ) {
            let mut start_time = Instant::now();
            let interval_duration = Duration::from_secs(15);
            let mut interval =
                tokio::time::interval_at(start_time + interval_duration, interval_duration);

            let mut fut = pin!(future);

            loop {
                tokio::select! {
                    biased;
                    _res = &mut fut => {
                        let ns = start_time.elapsed().as_nanos() as u64;
                        metrics.inc_by(ns);
                        break;
                    }
                    _ = interval.tick() => {
                        start_time = Instant::now();
                        metrics.inc_by(interval_duration.as_nanos() as u64);
                    }
                };
            }
        }

        use futures::StreamExt;

        let limit = self.actor_config.developer.exchange_concurrent_dispatchers;
        // Only barriers can be batched for now.
        match msg {
            MessageBatch::BarrierBatch(barrier_batch) => {
                if barrier_batch.is_empty() {
                    return Ok(());
                }
                // Only the first barrier in a batch can carry a mutation.
                let mutation = barrier_batch[0].mutation.clone();
                self.pre_mutate_dispatchers(&mutation).await?;
                futures::stream::iter(self.dispatchers.iter_mut())
                    .for_each_concurrent(limit, |dispatcher| async {
                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
                        let dispatcher_output = &mut dispatcher.dispatcher;
                        let fut = dispatcher_output.dispatch_barriers(
                            barrier_batch
                                .iter()
                                .cloned()
                                .map(|b| b.into_dispatcher())
                                .collect(),
                        );
                        await_with_metrics(fut, metrics).await;
                    })
                    .await;
                self.post_mutate_dispatchers(&mutation)?;
            }
            MessageBatch::Watermark(watermark) => {
                futures::stream::iter(self.dispatchers.iter_mut())
                    .for_each_concurrent(limit, |dispatcher| async {
                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
                        let dispatcher_output = &mut dispatcher.dispatcher;
                        let fut = dispatcher_output.dispatch_watermark(watermark.clone());
                        await_with_metrics(fut, metrics).await;
                    })
                    .await;
            }
            MessageBatch::Chunk(chunk) => {
                futures::stream::iter(self.dispatchers.iter_mut())
                    .for_each_concurrent(limit, |dispatcher| async {
                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
                        let dispatcher_output = &mut dispatcher.dispatcher;
                        let fut = dispatcher_output.dispatch_data(chunk.clone());
                        await_with_metrics(fut, metrics).await;
                    })
                    .await;

                self.metrics
                    .actor_out_record_cnt
                    .inc_by(chunk.cardinality() as _);
            }
        }
        self.dispatchers.retain(|dispatcher| match &**dispatcher {
            DispatcherImpl::Failed(dispatcher_id, e) => {
                warn!(%dispatcher_id, err = %e.as_report(), actor_id = %self.actor_id, "dispatch fail");
                false
            }
            _ => true,
        });
        Ok(())
    }

    /// Add new dispatchers to the executor, checking that their IDs are unique.
    async fn add_dispatchers<'a>(
        &mut self,
        new_dispatchers: impl IntoIterator<Item = &'a PbDispatcher>,
    ) -> StreamResult<()> {
        for dispatcher in new_dispatchers {
            let outputs = self
                .collect_outputs(&dispatcher.downstream_actor_id)
                .await?;
            let dispatcher = DispatcherImpl::new(outputs, dispatcher)?;
            let dispatcher = self.metrics.monitor_dispatcher(dispatcher);
            self.dispatchers.push(dispatcher);
        }

        assert!(
            self.dispatchers
                .iter()
                .map(|d| d.dispatcher_id())
                .all_unique(),
            "dispatcher ids must be unique: {:?}",
            self.dispatchers
        );

        Ok(())
    }

    fn find_dispatcher(&mut self, dispatcher_id: DispatcherId) -> Option<&mut DispatcherImpl> {
        self.dispatchers
            .iter_mut()
            .find(|d| d.dispatcher_id() == dispatcher_id)
            .map(DerefMut::deref_mut)
    }

    /// Update the dispatcher BEFORE we actually dispatch this barrier. We'll only add the new
    /// outputs.
    async fn pre_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
        let outputs = self
            .collect_outputs(&update.added_downstream_actor_id)
            .await?;

        let Some(dispatcher) = self.find_dispatcher(update.dispatcher_id) else {
            warn!(dispatcher_id = %update.dispatcher_id, added = ?update.added_downstream_actor_id, removed = ?update.removed_downstream_actor_id, actor_id = %self.actor_id, "ignore dispatcher update");
            return Ok(());
        };
        dispatcher.add_outputs(outputs);

        Ok(())
    }

    /// Update the dispatcher AFTER we dispatch this barrier. We'll remove some outputs and finally
    /// update the hash mapping.
    fn post_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
        let ids = update.removed_downstream_actor_id.iter().copied().collect();

        let Some(dispatcher) = self.find_dispatcher(update.dispatcher_id) else {
            warn!(dispatcher_id = %update.dispatcher_id, added = ?update.added_downstream_actor_id, removed = ?update.removed_downstream_actor_id, actor_id = %self.actor_id, "ignore dispatcher update");
            return Ok(());
        };
        dispatcher.remove_outputs(&ids);

        // The hash mapping is only used by the hash dispatcher.
        //
        // We specify a single upstream hash mapping for scaling the downstream fragment. However,
        // it's possible that there are multiple upstreams with different exchange types, for
        // example, the `Broadcast` inner side of the dynamic filter. There are too many
        // combinations to handle here, so we just ignore the `hash_mapping` field for any other
        // exchange types.
        if let DispatcherImpl::Hash(dispatcher) = dispatcher {
            dispatcher.hash_mapping =
                ActorMapping::from_protobuf(update.get_hash_mapping()?).to_expanded();
        }

        Ok(())
    }

    /// For `Add` and `Update`, update the dispatchers before we dispatch the barrier.
    async fn pre_mutate_dispatchers(
        &mut self,
        mutation: &Option<Arc<Mutation>>,
    ) -> StreamResult<()> {
        let Some(mutation) = mutation.as_deref() else {
            return Ok(());
        };

        match mutation {
            Mutation::Add(AddMutation { adds, .. }) => {
                if let Some(new_dispatchers) = adds.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }
            }
            Mutation::Update(UpdateMutation {
                dispatchers,
                actor_new_dispatchers: actor_dispatchers,
                ..
            }) => {
                if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }

                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.pre_update_dispatcher(update).await?;
                    }
                }
            }
            _ => {}
        }

        Ok(())
    }

    /// For `Stop` and `Update`, update the dispatchers after we dispatch the barrier.
    fn post_mutate_dispatchers(&mut self, mutation: &Option<Arc<Mutation>>) -> StreamResult<()> {
        let Some(mutation) = mutation.as_deref() else {
            return Ok(());
        };

        match mutation {
            Mutation::Stop(StopMutation { dropped_actors, .. }) => {
                // Remove outputs only if this actor itself is not to be stopped.
                if !dropped_actors.contains(&self.actor_id) {
                    for dispatcher in &mut self.dispatchers {
                        dispatcher.remove_outputs(dropped_actors);
                    }
                }
            }
            Mutation::Update(UpdateMutation {
                dispatchers,
                dropped_actors,
                ..
            }) => {
                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.post_update_dispatcher(update)?;
                    }
                }

                if !dropped_actors.contains(&self.actor_id) {
                    for dispatcher in &mut self.dispatchers {
                        dispatcher.remove_outputs(dropped_actors);
                    }
                }
            }
            _ => {}
        };

        // After stopping the downstream mview, the outputs of some dispatchers might be empty
        // and we should clean them up.
        self.dispatchers.retain(|d| !d.is_empty());

        Ok(())
    }
}

impl DispatchExecutor {
    pub(crate) async fn new(
        input: Executor,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
        dispatchers: Vec<stream_plan::Dispatcher>,
        actor_context: &ActorContextRef,
    ) -> StreamResult<Self> {
        let mut executor = Self::new_inner(
            input,
            new_output_request_rx,
            vec![],
            actor_context.id,
            actor_context.fragment_id,
            actor_context.config.clone(),
            actor_context.streaming_metrics.clone(),
        );
        let inner = &mut executor.inner;
        for dispatcher in dispatchers {
            let outputs = inner
                .collect_outputs(&dispatcher.downstream_actor_id)
                .await?;
            let dispatcher = DispatcherImpl::new(outputs, &dispatcher)?;
            let dispatcher = inner.metrics.monitor_dispatcher(dispatcher);
            inner.dispatchers.push(dispatcher);
        }
        Ok(executor)
    }

    #[cfg(test)]
    pub(crate) fn for_test(
        input: Executor,
        dispatchers: Vec<DispatcherImpl>,
        actor_id: ActorId,
        fragment_id: FragmentId,
        actor_config: Arc<StreamingConfig>,
        metrics: Arc<StreamingMetrics>,
    ) -> (
        Self,
        tokio::sync::mpsc::UnboundedSender<(ActorId, NewOutputRequest)>,
    ) {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

        (
            Self::new_inner(
                input,
                rx,
                dispatchers,
                actor_id,
                fragment_id,
                actor_config,
                metrics,
            ),
            tx,
        )
    }

    fn new_inner(
        mut input: Executor,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
        dispatchers: Vec<DispatcherImpl>,
        actor_id: ActorId,
        fragment_id: FragmentId,
        actor_config: Arc<StreamingConfig>,
        metrics: Arc<StreamingMetrics>,
    ) -> Self {
        if crate::consistency::insane() {
            // Make some trouble before dispatching to avoid generating invalid dist keys.
            let mut info = input.info().clone();
            info.identity = format!("{} (embedded trouble)", info.identity);
            let troublemaker = TroublemakerExecutor::new(input, actor_config.developer.chunk_size);
            input = (info, troublemaker).into();
        }

        let actor_id_str = actor_id.to_string();
        let fragment_id_str = fragment_id.to_string();
        let actor_out_record_cnt = metrics
            .actor_out_record_cnt
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let metrics = DispatchExecutorMetrics {
            actor_id_str,
            fragment_id_str,
            metrics,
            actor_out_record_cnt,
        };
        let dispatchers = dispatchers
            .into_iter()
            .map(|dispatcher| metrics.monitor_dispatcher(dispatcher))
            .collect();

        Self {
            input,
            inner: DispatchExecutorInner {
                dispatchers,
                actor_id,
                actor_config,
                metrics,
                new_output_request_rx,
                pending_new_output_requests: Default::default(),
            },
        }
    }
}

impl StreamConsumer for DispatchExecutor {
    type BarrierStream = impl Stream<Item = StreamResult<Barrier>> + Send;

    fn execute(mut self: Box<Self>) -> Self::BarrierStream {
        let max_barrier_count_per_batch = self.inner.actor_config.developer.max_barrier_batch_size;
        #[try_stream]
        async move {
            let mut input = self.input.execute().peekable();
            loop {
                let Some(message) =
                    try_batch_barriers(max_barrier_count_per_batch, &mut input).await?
                else {
                    // end_of_stream
                    break;
                };
                match message {
                    chunk @ MessageBatch::Chunk(_) => {
                        self.inner
                            .dispatch(chunk)
                            .instrument(tracing::info_span!("dispatch_chunk"))
                            .instrument_await("dispatch_chunk")
                            .await?;
                    }
                    MessageBatch::BarrierBatch(barrier_batch) => {
                        assert!(!barrier_batch.is_empty());
                        self.inner
                            .dispatch(MessageBatch::BarrierBatch(barrier_batch.clone()))
                            .instrument(tracing::info_span!("dispatch_barrier_batch"))
                            .instrument_await("dispatch_barrier_batch")
                            .await?;
                        self.inner
                            .metrics
                            .metrics
                            .barrier_batch_size
                            .observe(barrier_batch.len() as f64);
                        for barrier in barrier_batch {
                            yield barrier;
                        }
                    }
                    watermark @ MessageBatch::Watermark(_) => {
                        self.inner
                            .dispatch(watermark)
                            .instrument(tracing::info_span!("dispatch_watermark"))
                            .instrument_await("dispatch_watermark")
                            .await?;
                    }
                }
            }
        }
    }
}

/// Tries to batch up to `max_barrier_count_per_batch` consecutive barriers into a single message batch.
///
/// Returns the message batch, or `None` upon end of stream.
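///
/// For example, if the input currently buffers `[Barrier, Barrier, Chunk, ...]` and
/// neither barrier carries a mutation, the two barriers are returned as a single
/// `BarrierBatch` and the chunk stays in the stream. Batching only consumes messages
/// that are already available (`now_or_never`) and never waits for further barriers,
/// and a barrier carrying a mutation always ends up in a batch of its own.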
async fn try_batch_barriers(
    max_barrier_count_per_batch: u32,
    input: &mut Peekable<BoxedMessageStream>,
) -> StreamResult<Option<MessageBatch>> {
    let Some(msg) = input.next().await else {
        // end_of_stream
        return Ok(None);
    };
    let mut barrier_batch = vec![];
    let msg: Message = msg?;
    let max_peek_attempts = match msg {
        Message::Chunk(c) => {
            return Ok(Some(MessageBatch::Chunk(c)));
        }
        Message::Watermark(w) => {
            return Ok(Some(MessageBatch::Watermark(w)));
        }
        Message::Barrier(b) => {
            let peek_more_barrier = b.mutation.is_none();
            barrier_batch.push(b);
            if peek_more_barrier {
                max_barrier_count_per_batch.saturating_sub(1)
            } else {
                0
            }
        }
    };
    // Try to peek more consecutive non-mutation barriers.
    for _ in 0..max_peek_attempts {
        let peek = input.peek().now_or_never();
        let Some(peek) = peek else {
            break;
        };
        let Some(msg) = peek else {
            // end_of_stream
            break;
        };
        let Ok(Message::Barrier(barrier)) = msg else {
            break;
        };
        if barrier.mutation.is_some() {
            break;
        }
        let msg: Message = input.next().now_or_never().unwrap().unwrap()?;
        let Message::Barrier(ref barrier) = msg else {
            unreachable!("must be a barrier");
        };
        barrier_batch.push(barrier.clone());
    }
    Ok(Some(MessageBatch::BarrierBatch(barrier_batch)))
}

#[derive(Debug)]
pub(crate) enum DispatcherImpl {
    Hash(HashDataDispatcher),
    Broadcast(BroadcastDispatcher),
    Simple(SimpleDispatcher),
    #[cfg_attr(not(test), expect(dead_code))]
    RoundRobin(RoundRobinDataDispatcher),
    /// The dispatcher has failed to dispatch to the downstream and is now pending.
    /// It should be cleaned up later.
    Failed(DispatcherId, StreamError),
}

impl DispatcherImpl {
    pub fn new(outputs: Vec<Output>, dispatcher: &PbDispatcher) -> StreamResult<Self> {
        let output_mapping =
            DispatchOutputMapping::from_protobuf(dispatcher.output_mapping.clone().unwrap());

        use risingwave_pb::stream_plan::DispatcherType::*;
        let dispatcher_impl = match dispatcher.get_type()? {
            Hash => {
                assert!(!outputs.is_empty());
                let dist_key_indices = dispatcher
                    .dist_key_indices
                    .iter()
                    .map(|i| *i as usize)
                    .collect();

                let hash_mapping =
                    ActorMapping::from_protobuf(dispatcher.get_hash_mapping()?).to_expanded();

                DispatcherImpl::Hash(HashDataDispatcher::new(
                    outputs,
                    dist_key_indices,
                    output_mapping,
                    hash_mapping,
                    dispatcher.dispatcher_id,
                ))
            }
            Broadcast => DispatcherImpl::Broadcast(BroadcastDispatcher::new(
                outputs,
                output_mapping,
                dispatcher.dispatcher_id,
            )),
            Simple | NoShuffle => {
                let [output]: [_; 1] = outputs.try_into().unwrap();
                DispatcherImpl::Simple(SimpleDispatcher::new(
                    output,
                    output_mapping,
                    dispatcher.dispatcher_id,
                ))
            }
            Unspecified => unreachable!(),
        };

        Ok(dispatcher_impl)
    }
}

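// Generates the delegating methods on `DispatcherImpl`: each call is forwarded to
// the concrete variant, and an error returned from a dispatch method replaces the
// dispatcher with `DispatcherImpl::Failed`, to be logged and dropped by the next
// `dispatch` call on `DispatchExecutorInner`.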
macro_rules! impl_dispatcher {
    ($( { $variant_name:ident } ),*) => {
        impl DispatcherImpl {
            pub async fn dispatch_data(&mut self, chunk: StreamChunk) {
                if let Err(e) = match self {
                    $( Self::$variant_name(inner) => inner.dispatch_data(chunk).await, )*
                    Self::Failed(..) => unreachable!(),
                } {
                    *self = Self::Failed(self.dispatcher_id(), e);
                }
            }

            pub async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) {
                if let Err(e) = match self {
                    $( Self::$variant_name(inner) => inner.dispatch_barriers(barriers).await, )*
                    Self::Failed(..) => unreachable!(),
                } {
                    *self = Self::Failed(self.dispatcher_id(), e);
                }
            }

            pub async fn dispatch_watermark(&mut self, watermark: Watermark) {
                if let Err(e) = match self {
                    $( Self::$variant_name(inner) => inner.dispatch_watermark(watermark).await, )*
                    Self::Failed(..) => unreachable!(),
                } {
                    *self = Self::Failed(self.dispatcher_id(), e);
                }
            }

            pub fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
                match self {
                    $(Self::$variant_name(inner) => inner.add_outputs(outputs), )*
                    Self::Failed(..) => {},
                }
            }

            pub fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
                match self {
                    $(Self::$variant_name(inner) => inner.remove_outputs(actor_ids), )*
                    Self::Failed(..) => {},
                }
            }

            pub fn dispatcher_id(&self) -> DispatcherId {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id(), )*
                    Self::Failed(dispatcher_id, ..) => *dispatcher_id,
                }
            }

            pub fn is_empty(&self) -> bool {
                match self {
                    $(Self::$variant_name(inner) => inner.is_empty(), )*
                    Self::Failed(..) => true,
                }
            }
        }
    }
}

macro_rules! for_all_dispatcher_variants {
    ($macro:ident) => {
        $macro! {
            { Hash },
            { Broadcast },
            { Simple },
            { RoundRobin }
        }
    };
}

for_all_dispatcher_variants! { impl_dispatcher }

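/// Trait alias for the futures returned by `Dispatcher` methods.
/// (Trait aliases require the nightly `trait_alias` feature.)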
pub trait DispatchFuture<'a> = Future<Output = StreamResult<()>> + Send;

trait Dispatcher: Debug + 'static {
    /// Dispatch a data chunk to downstream actors.
    fn dispatch_data(&mut self, chunk: StreamChunk) -> impl DispatchFuture<'_>;
    /// Dispatch barriers to downstream actors, generally by broadcasting them.
    fn dispatch_barriers(&mut self, barrier: DispatcherBarriers) -> impl DispatchFuture<'_>;
    /// Dispatch a watermark to downstream actors, generally by broadcasting it.
    fn dispatch_watermark(&mut self, watermark: Watermark) -> impl DispatchFuture<'_>;

    /// Add new outputs to the dispatcher.
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>);
    /// Remove the outputs for `actor_ids` from the dispatcher.
    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>);

    /// The ID of the dispatcher. A [`DispatchExecutor`] may have multiple dispatchers with
    /// different IDs.
    ///
    /// Note that the dispatcher id is always equal to the downstream fragment id.
    /// See also `proto/stream_plan.proto`.
    fn dispatcher_id(&self) -> DispatcherId;

    /// Whether the dispatcher has no outputs. If so, it'll be cleaned up from the
    /// [`DispatchExecutor`].
    fn is_empty(&self) -> bool;
}

/// Concurrently broadcast a message to all outputs.
///
/// Note that this does not follow `concurrent_dispatchers` in the config and the concurrency is
/// always unlimited.
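///
/// Implemented with `try_join_all`, so the first send error aborts the broadcast and
/// is returned to the caller.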
async fn broadcast_concurrent(
    outputs: impl IntoIterator<Item = &'_ mut Output>,
    message: DispatcherMessageBatch,
) -> StreamResult<()> {
    futures::future::try_join_all(
        outputs
            .into_iter()
            .map(|output| output.send(message.clone())),
    )
    .await?;
    Ok(())
}

#[derive(Debug)]
pub struct RoundRobinDataDispatcher {
    outputs: Vec<Output>,
    output_mapping: DispatchOutputMapping,
    cur: usize,
    dispatcher_id: DispatcherId,
}

impl RoundRobinDataDispatcher {
    #[cfg_attr(not(test), expect(dead_code))]
    pub fn new(
        outputs: Vec<Output>,
        output_mapping: DispatchOutputMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            outputs,
            output_mapping,
            cur: 0,
            dispatcher_id,
        }
    }
}

impl Dispatcher for RoundRobinDataDispatcher {
    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let chunk = self.output_mapping.apply(chunk);

        self.outputs[self.cur]
            .send(DispatcherMessageBatch::Chunk(chunk))
            .await?;
        self.cur += 1;
        self.cur %= self.outputs.len();
        Ok(())
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // always broadcast barrier
        broadcast_concurrent(
            &mut self.outputs,
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            // always broadcast watermark
            broadcast_concurrent(
                &mut self.outputs,
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(outputs);
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
            .count();
        // `saturating_sub` avoids an underflow when all outputs have been removed;
        // the now-empty dispatcher will be cleaned up afterwards.
        self.cur = self.cur.min(self.outputs.len().saturating_sub(1));
    }
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}

pub struct HashDataDispatcher {
    outputs: Vec<Output>,
    keys: Vec<usize>,
    output_mapping: DispatchOutputMapping,
    /// Mapping from virtual node to actor id, used by the hash data dispatcher to route rows
    /// to different downstream actors.
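    /// For example, with 4 virtual nodes and downstream actors `A` and `B`, an
    /// expanded mapping of `[A, A, B, B]` routes rows whose keys hash to vnode 0 or 1
    /// to actor `A`, and the rest to actor `B`.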
    hash_mapping: ExpandedActorMapping,
    dispatcher_id: DispatcherId,
}

impl Debug for HashDataDispatcher {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HashDataDispatcher")
            .field("outputs", &self.outputs)
            .field("keys", &self.keys)
            .field("dispatcher_id", &self.dispatcher_id)
            .finish_non_exhaustive()
    }
}

impl HashDataDispatcher {
    pub fn new(
        outputs: Vec<Output>,
        keys: Vec<usize>,
        output_mapping: DispatchOutputMapping,
        hash_mapping: ExpandedActorMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            outputs,
            keys,
            output_mapping,
            hash_mapping,
            dispatcher_id,
        }
    }
}

impl Dispatcher for HashDataDispatcher {
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(outputs);
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // always broadcast barrier
        broadcast_concurrent(
            &mut self.outputs,
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            // always broadcast watermark
            broadcast_concurrent(
                &mut self.outputs,
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        // A chunk can be shuffled into multiple output chunks to be sent downstream.
        // The only difference between these output chunks is the visibility map, which
        // is calculated from the hash value of each row in the input chunk.
        let num_outputs = self.outputs.len();

        // Compute the hash value of every row by its key columns.
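        // A row's vnode is derived from the hash of its key columns (modulo the vnode
        // count); the vnode then selects the downstream actor via `hash_mapping`.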
        let vnode_count = self.hash_mapping.len();
        let vnodes = VirtualNode::compute_chunk(chunk.data_chunk(), &self.keys, vnode_count);

        tracing::debug!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty(), self.keys, vnodes);

        let mut vis_maps = repeat_with(|| BitmapBuilder::with_capacity(chunk.capacity()))
            .take(num_outputs)
            .collect_vec();
        let mut last_update_delete_row_idx = None;
        let mut new_ops: Vec<Op> = Vec::with_capacity(chunk.capacity());

        for (row_idx, ((vnode, &op), visible)) in vnodes
            .iter()
            .copied()
            .zip_eq_fast(chunk.ops())
            .zip_eq_fast(chunk.visibility().iter())
            .enumerate()
        {
            // Build visibility map for every output chunk.
            for (output, vis_map) in self.outputs.iter().zip_eq_fast(vis_maps.iter_mut()) {
                vis_map.append(visible && self.hash_mapping[vnode.to_index()] == output.actor_id());
            }

            if !visible {
                new_ops.push(op);
                continue;
            }

            // The `Update` message, denoted by an `UpdateDelete` and a successive `UpdateInsert`,
            // needs to be rewritten to plain `Delete` and `Insert` if the distribution key
            // columns are changed, since the distribution key will eventually be part of the
            // stream key of the downstream executor, and there's an invariant that the stream
            // key must be the same for both rows of an `Update` pair.
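            //
            // For example, with the distribution key on the first column, the pair
            // `U- (1, 2)` / `U+ (3, 2)` may hash to different vnodes, so it is rewritten
            // to `- (1, 2)` / `+ (3, 2)`; otherwise the two halves of the update pair
            // could be routed to different downstream actors.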
            if op == Op::UpdateDelete {
                last_update_delete_row_idx = Some(row_idx);
            } else if op == Op::UpdateInsert {
                let delete_row_idx = last_update_delete_row_idx
                    .take()
                    .expect("missing U- before U+");
                assert!(delete_row_idx + 1 == row_idx, "U- and U+ are not adjacent");

                // Check if any distribution key column value changed
                let dist_key_changed = chunk.row_at(delete_row_idx).1.project(&self.keys)
                    != chunk.row_at(row_idx).1.project(&self.keys);

                if dist_key_changed {
                    new_ops.push(Op::Delete);
                    new_ops.push(Op::Insert);
                } else {
                    new_ops.push(Op::UpdateDelete);
                    new_ops.push(Op::UpdateInsert);
                }
            } else {
                new_ops.push(op);
            }
        }
        assert!(last_update_delete_row_idx.is_none(), "missing U+ after U-");

        let ops = new_ops;
        // Apply output mapping after calculating the vnode and new visibility maps.
        // The output mapping may project columns and eliminate noop updates.
        let chunk = self.output_mapping.apply(chunk);
        // Get the visibility after noop update elimination to incorporate into the final visibility.
        let chunk_vis = chunk.visibility();

        // Send each output its own `StreamChunk`, with the corresponding visibility map applied.
        futures::future::try_join_all(
            vis_maps
                .into_iter()
                .zip_eq_fast(self.outputs.iter_mut())
                .map(|(vis_map, output)| async {
                    let vis_map = vis_map.finish();
                    // Combine hash routing visibility with noop update elimination visibility.
                    // A row is visible only if it passes BOTH:
                    // 1. Hash routing (the row should go to this output)
                    // 2. Noop update elimination (the row was not eliminated as a noop)
                    let combined_vis = &vis_map & chunk_vis;
                    let new_stream_chunk = StreamChunk::with_visibility(
                        ops.clone(),
                        chunk.columns().into(),
                        combined_vis,
                    );
                    if new_stream_chunk.cardinality() > 0 {
                        event!(
                            tracing::Level::TRACE,
                            msg = "chunk",
                            downstream = %output.actor_id(),
                            "send = \n{:#?}",
                            new_stream_chunk
                        );
                        output
                            .send(DispatcherMessageBatch::Chunk(new_stream_chunk))
                            .await?;
                    }
                    StreamResult::Ok(())
                }),
        )
        .await?;

        Ok(())
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
            .count();
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}

/// `BroadcastDispatcher` dispatches messages to all outputs.
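///
/// Outputs are keyed by downstream actor id, so adding an output for an actor that is
/// already present replaces the previous one.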
#[derive(Debug)]
pub struct BroadcastDispatcher {
    outputs: HashMap<ActorId, Output>,
    output_mapping: DispatchOutputMapping,
    dispatcher_id: DispatcherId,
}

impl BroadcastDispatcher {
    pub fn new(
        outputs: impl IntoIterator<Item = Output>,
        output_mapping: DispatchOutputMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            outputs: Self::into_pairs(outputs).collect(),
            output_mapping,
            dispatcher_id,
        }
    }

    fn into_pairs(
        outputs: impl IntoIterator<Item = Output>,
    ) -> impl Iterator<Item = (ActorId, Output)> {
        outputs
            .into_iter()
            .map(|output| (output.actor_id(), output))
    }
}

impl Dispatcher for BroadcastDispatcher {
    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let chunk = self.output_mapping.apply(chunk);
        broadcast_concurrent(
            self.outputs.values_mut(),
            DispatcherMessageBatch::Chunk(chunk),
        )
        .await
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // always broadcast barrier
        broadcast_concurrent(
            self.outputs.values_mut(),
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            // always broadcast watermark
            broadcast_concurrent(
                self.outputs.values_mut(),
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(Self::into_pairs(outputs));
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(|actor_id, _| actor_ids.contains(actor_id))
            .count();
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}

/// `SimpleDispatcher` dispatches messages to a single output.
#[derive(Debug)]
pub struct SimpleDispatcher {
    /// In most cases, there is exactly one output. However, during some configuration changes,
    /// this field may temporarily hold zero or two outputs.
    ///
    /// - When dropping a materialized view, the output will be removed and this field becomes
    ///   empty. The [`DispatchExecutor`] will immediately clean up this empty dispatcher before
    ///   finishing processing the current mutation.
    /// - When migrating a singleton fragment, the new output will be temporarily added in the
    ///   `pre` stage and this field holds multiple outputs, so that the configuration-change
    ///   barrier is broadcast to both the old and new downstream actors. In the `post` stage,
    ///   the old output will be removed and this field becomes single again.
    ///
    /// Therefore, when dispatching data, we assert that there's exactly one output by
    /// `Self::output`.
    output: SmallVec<[Output; 2]>,
    output_mapping: DispatchOutputMapping,
    dispatcher_id: DispatcherId,
}

impl SimpleDispatcher {
    pub fn new(
        output: Output,
        output_mapping: DispatchOutputMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            output: smallvec![output],
            output_mapping,
            dispatcher_id,
        }
    }
}

impl Dispatcher for SimpleDispatcher {
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.output.extend(outputs);
        assert!(self.output.len() <= 2);
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // Only barriers are allowed to be dispatched to multiple outputs during migration.
1190        for output in &mut self.output {
1191            output
1192                .send(DispatcherMessageBatch::BarrierBatch(barriers.clone()))
1193                .await?;
1194        }
1195        Ok(())
1196    }
1197
1198    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
1199        let output = self
1200            .output
1201            .iter_mut()
1202            .exactly_one()
1203            .expect("expect exactly one output");
1204
1205        let chunk = self.output_mapping.apply(chunk);
1206        output.send(DispatcherMessageBatch::Chunk(chunk)).await
1207    }
1208
1209    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
1210        let output = self
1211            .output
1212            .iter_mut()
1213            .exactly_one()
1214            .expect("expect exactly one output");
1215
1216        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
1217            output
1218                .send(DispatcherMessageBatch::Watermark(watermark))
1219                .await?;
1220        }
1221        Ok(())
1222    }
1223
1224    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
1225        self.output
1226            .retain(|output| !actor_ids.contains(&output.actor_id()));
1227    }
1228
1229    fn dispatcher_id(&self) -> DispatcherId {
1230        self.dispatcher_id
1231    }
1232
1233    fn is_empty(&self) -> bool {
1234        self.output.is_empty()
1235    }
1236}
1237
1238#[cfg(test)]
1239mod tests {
1240    use std::hash::{BuildHasher, Hasher};
1241
1242    use futures::pin_mut;
1243    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
1244    use risingwave_common::array::{Array, ArrayBuilder, I32ArrayBuilder};
1245    use risingwave_common::util::epoch::test_epoch;
1246    use risingwave_common::util::hash_util::Crc32FastBuilder;
1247    use risingwave_pb::stream_plan::{DispatcherType, PbDispatchOutputMapping};
1248    use tokio::sync::mpsc::unbounded_channel;
1249
1250    use super::*;
1251    use crate::executor::exchange::output::Output;
1252    use crate::executor::exchange::permit::channel_for_test;
1253    use crate::executor::receiver::ReceiverExecutor;
1254    use crate::executor::{BarrierInner as Barrier, MessageInner as Message};
1255    use crate::task::barrier_test_utils::LocalBarrierTestEnv;
1256
1257    #[tokio::test]
1258    async fn test_hash_dispatcher_complex() {
1259        // This test only works when vnode count is 256.
1260        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);
1261
1262        let num_outputs = 2; // actor id ranges from 1 to 2
1263        let key_indices = &[0, 2];
1264        let (output_tx_vecs, mut output_rx_vecs): (Vec<_>, Vec<_>) =
1265            (0..num_outputs).map(|_| channel_for_test()).collect();
1266        let outputs = output_tx_vecs
1267            .into_iter()
1268            .enumerate()
1269            .map(|(actor_id, tx)| Output::new(ActorId::new(actor_id as u32 + 1), tx))
1270            .collect::<Vec<_>>();
1271        let mut hash_mapping = (1..num_outputs + 1)
1272            .flat_map(|id| vec![ActorId::new(id as u32); VirtualNode::COUNT_FOR_TEST / num_outputs])
1273            .collect_vec();
1274        hash_mapping.resize(
1275            VirtualNode::COUNT_FOR_TEST,
1276            ActorId::new(num_outputs as u32),
1277        );
1278        let mut hash_dispatcher = HashDataDispatcher::new(
1279            outputs,
1280            key_indices.to_vec(),
1281            DispatchOutputMapping::Simple(vec![0, 1, 2]),
1282            hash_mapping,
1283            0.into(),
1284        );
1285
1286        let chunk = StreamChunk::from_pretty(
1287            "  I I I
1288            +  4 6 8
1289            +  5 7 9
1290            +  0 0 0
1291            -  1 1 1 D
1292            U- 2 0 2
1293            U+ 2 0 2
1294            U- 3 3 2
1295            U+ 3 3 4",
1296        );
1297        hash_dispatcher.dispatch_data(chunk).await.unwrap();
1298
1299        assert_eq!(
1300            *output_rx_vecs[0].recv().await.unwrap().as_chunk().unwrap(),
1301            StreamChunk::from_pretty(
1302                "  I I I
1303                +  4 6 8
1304                +  5 7 9
1305                +  0 0 0
1306                -  1 1 1 D
1307                U- 2 0 2
1308                U+ 2 0 2
1309                -  3 3 2 D  // Should rewrite UpdateDelete to Delete
1310                +  3 3 4    // Should rewrite UpdateInsert to Insert",
1311            )
1312        );
1313        assert_eq!(
1314            *output_rx_vecs[1].recv().await.unwrap().as_chunk().unwrap(),
1315            StreamChunk::from_pretty(
1316                "  I I I
1317                +  4 6 8 D
1318                +  5 7 9 D
1319                +  0 0 0 D
1320                -  1 1 1 D  // Should keep original invisible mark
1321                U- 2 0 2 D  // Should keep UpdateDelete
1322                U+ 2 0 2 D  // Should keep UpdateInsert
1323                -  3 3 2    // Should rewrite UpdateDelete to Delete
1324                +  3 3 4 D  // Should rewrite UpdateInsert to Insert",
1325            )
1326        );
1327    }
1328
1329    #[tokio::test]
1330    async fn test_configuration_change() {
1331        let _schema = Schema { fields: vec![] };
1332        let (tx, rx) = channel_for_test();
1333        let actor_id = 233.into();
1334        let barrier_test_env = LocalBarrierTestEnv::for_test().await;
1335
1336        let (untouched, old, new) = (234.into(), 235.into(), 238.into()); // broadcast downstream actors
1337        let (old_simple, new_simple) = (114.into(), 514.into()); // simple downstream actors
1338
1339        // actor_id -> untouched, old, new, old_simple, new_simple
1340
1341        let broadcast_dispatcher_id = 666.into();
1342        let broadcast_dispatcher = PbDispatcher {
1343            r#type: DispatcherType::Broadcast as _,
1344            dispatcher_id: broadcast_dispatcher_id,
1345            downstream_actor_id: vec![untouched, old],
1346            output_mapping: PbDispatchOutputMapping::identical(0).into(), /* dummy length as it's not used */
1347            ..Default::default()
1348        };
1349
1350        let simple_dispatcher_id = 888.into();
1351        let simple_dispatcher = PbDispatcher {
1352            r#type: DispatcherType::Simple as _,
1353            dispatcher_id: simple_dispatcher_id,
1354            downstream_actor_id: vec![old_simple],
1355            output_mapping: PbDispatchOutputMapping::identical(0).into(), /* dummy length as it's not used */
1356            ..Default::default()
1357        };
1358
1359        let dispatcher_updates = maplit::hashmap! {
1360            actor_id => vec![PbDispatcherUpdate {
1361                actor_id,
1362                dispatcher_id: broadcast_dispatcher_id,
1363                added_downstream_actor_id: vec![new],
1364                removed_downstream_actor_id: vec![old],
1365                hash_mapping: Default::default(),
1366            }]
1367        };
1368        let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
1369            UpdateMutation {
1370                dispatchers: dispatcher_updates,
1371                ..Default::default()
1372            },
1373        ));
1374        barrier_test_env.inject_barrier(&b1, [actor_id]);
1375        barrier_test_env.flush_all_events().await;
1376
1377        let input = Executor::new(
1378            Default::default(),
1379            ReceiverExecutor::for_test(
1380                actor_id,
1381                rx,
1382                barrier_test_env.local_barrier_manager.clone(),
1383            )
1384            .boxed(),
1385        );
1386
1387        let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
1388        let mut rxs = [untouched, old, new, old_simple, new_simple]
1389            .into_iter()
1390            .map(|id| {
1391                (id, {
1392                    let (tx, rx) = channel_for_test();
1393                    new_output_request_tx
1394                        .send((id, NewOutputRequest::Local(tx)))
1395                        .unwrap();
1396                    rx
1397                })
1398            })
1399            .collect::<HashMap<_, _>>();
1400        let executor = Box::new(
1401            DispatchExecutor::new(
1402                input,
1403                new_output_request_rx,
1404                vec![broadcast_dispatcher, simple_dispatcher],
1405                &ActorContext::for_test(actor_id),
1406            )
1407            .await
1408            .unwrap(),
1409        )
1410        .execute();
1411
1412        pin_mut!(executor);
1413
1414        macro_rules! try_recv {
1415            ($down_id:expr) => {
1416                rxs.get_mut(&$down_id).unwrap().try_recv()
1417            };
1418        }
1419
1420        // 3. Send a chunk.
1421        tx.send(Message::Chunk(StreamChunk::default()).into())
1422            .await
1423            .unwrap();
1424
        tx.send(Message::Barrier(b1.clone().into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 5. Check downstream.
        try_recv!(untouched).unwrap().as_chunk().unwrap();
        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();

        try_recv!(old).unwrap().as_chunk().unwrap();
        try_recv!(old).unwrap().as_barrier_batch().unwrap(); // It should still receive the barrier even if it's to be removed.

        try_recv!(new).unwrap().as_barrier_batch().unwrap(); // Since it's just added, it won't receive the chunk.

        try_recv!(old_simple).unwrap().as_chunk().unwrap();
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // Untouched.

        // 6. Send another barrier.
        let b2 = Barrier::new_test_barrier(test_epoch(2));
        barrier_test_env.inject_barrier(&b2, [actor_id]);
        tx.send(Message::Barrier(b2.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 7. Check downstream.
        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();
        try_recv!(old).unwrap_err(); // Since it has been removed, it no longer receives messages.
        try_recv!(new).unwrap().as_barrier_batch().unwrap();

        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // Untouched.
        try_recv!(new_simple).unwrap_err(); // Not added to the simple dispatcher yet, so nothing is received.

        // 8. Send another chunk.
        tx.send(Message::Chunk(StreamChunk::default()).into())
            .await
            .unwrap();

        // 9. Send a configuration change barrier for the simple dispatcher.
        let dispatcher_updates = maplit::hashmap! {
            actor_id => vec![PbDispatcherUpdate {
                actor_id,
                dispatcher_id: simple_dispatcher_id,
                added_downstream_actor_id: vec![new_simple],
                removed_downstream_actor_id: vec![old_simple],
                hash_mapping: Default::default(),
            }]
        };
        let b3 = Barrier::new_test_barrier(test_epoch(3)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: dispatcher_updates,
                ..Default::default()
            },
        ));
        barrier_test_env.inject_barrier(&b3, [actor_id]);
        tx.send(Message::Barrier(b3.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 10. Check downstream.
        try_recv!(old_simple).unwrap().as_chunk().unwrap();
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // It should still receive the barrier even if it's to be removed.

        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap(); // Since it's just added, it won't receive the chunk.

        // 11. Send another barrier.
        let b4 = Barrier::new_test_barrier(test_epoch(4));
        barrier_test_env.inject_barrier(&b4, [actor_id]);
        tx.send(Message::Barrier(b4.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 12. Check downstream.
        try_recv!(old_simple).unwrap_err(); // Since it has been removed, it no longer receives messages.
        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap();
    }

    #[tokio::test]
    async fn test_hash_dispatcher() {
        // This test only works when vnode count is 256.
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let num_outputs = 5; // actor ids range from 1 to 5
        let cardinality = 10;
        let dimension = 4;
        let key_indices = &[0, 2];
        let (output_tx_vecs, output_rx_vecs): (Vec<_>, Vec<_>) =
            (0..num_outputs).map(|_| channel_for_test()).collect();
        let outputs = output_tx_vecs
            .into_iter()
            .enumerate()
            .map(|(actor_id, tx)| Output::new(ActorId::new(1 + actor_id as u32), tx))
            .collect::<Vec<_>>();
        let mut hash_mapping = (1..num_outputs + 1)
            .flat_map(|id| vec![ActorId::new(id as _); VirtualNode::COUNT_FOR_TEST / num_outputs])
            .collect_vec();
        hash_mapping.resize(
            VirtualNode::COUNT_FOR_TEST,
            ActorId::new(num_outputs as u32),
        );
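        // Each of the 5 actors owns 256 / 5 = 51 consecutive vnodes; the single
        // leftover vnode is assigned to the last actor by the `resize` above.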
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            key_indices.to_vec(),
            DispatchOutputMapping::Simple((0..dimension).collect()),
            hash_mapping.clone(),
            0.into(),
        );

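        // Alternate Insert and Delete ops across the `cardinality` test rows.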
        let mut ops = Vec::new();
        for idx in 0..cardinality {
            if idx % 2 == 0 {
                ops.push(Op::Insert);
            } else {
                ops.push(Op::Delete);
            }
        }

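        // Compute the expected routing by hand, mirroring what the hash dispatcher is
        // expected to do: CRC32 over the little-endian bytes of the key columns, then
        // `hash % vnode_count` selects a vnode whose owning actor is looked up in
        // `hash_mapping`. Actor ids start at 1, hence the `- 1` when indexing outputs.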
        let mut start = 19260817i32..;
        let mut builders = (0..dimension)
            .map(|_| I32ArrayBuilder::new(cardinality))
            .collect_vec();
        let mut output_cols = vec![vec![vec![]; dimension]; num_outputs];
        let mut output_ops = vec![vec![]; num_outputs];
        for op in &ops {
            let hash_builder = Crc32FastBuilder;
            let mut hasher = hash_builder.build_hasher();
            let one_row = (0..dimension).map(|_| start.next().unwrap()).collect_vec();
            for key_idx in key_indices {
                let val = one_row[*key_idx];
                let bytes = val.to_le_bytes();
                hasher.update(&bytes);
            }
            let output_idx = hash_mapping[hasher.finish() as usize % VirtualNode::COUNT_FOR_TEST]
                .as_raw_id() as usize
                - 1;
            for (builder, val) in builders.iter_mut().zip_eq_fast(one_row.iter()) {
                builder.append(Some(*val));
            }
            output_cols[output_idx]
                .iter_mut()
                .zip_eq_fast(one_row.iter())
                .for_each(|(each_column, val)| each_column.push(*val));
            output_ops[output_idx].push(op);
        }

        let columns = builders
            .into_iter()
            .map(|builder| {
                let array = builder.finish();
                array.into_ref()
            })
            .collect();

        let chunk = StreamChunk::new(ops, columns);
        hash_dispatcher.dispatch_data(chunk).await.unwrap();

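        // Drain each output channel and compare what was actually dispatched against
        // the locally computed expectation.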
        for (output_idx, mut rx) in output_rx_vecs.into_iter().enumerate() {
            let mut output = vec![];
            while let Some(Some(msg)) = rx.recv().now_or_never() {
                output.push(msg);
            }
            // It is possible that an output receives no chunk at all, if none of the keys hashes to its vnodes.
            assert!(output.len() <= 1);
            if output.is_empty() {
                assert!(output_cols[output_idx].iter().all(|x| x.is_empty()));
            } else {
                let message = output.first().unwrap();
                let real_chunk = match message {
                    DispatcherMessageBatch::Chunk(chunk) => chunk,
                    _ => panic!(),
                };
                real_chunk
                    .columns()
                    .iter()
                    .zip_eq_fast(output_cols[output_idx].iter())
                    .for_each(|(real_col, expect_col)| {
                        let real_vals = real_chunk
                            .visibility()
                            .iter_ones()
                            .map(|row_idx| real_col.as_int32().value_at(row_idx).unwrap())
                            .collect::<Vec<_>>();
                        assert_eq!(real_vals.len(), expect_col.len());
                        assert_eq!(real_vals, *expect_col);
                    });
            }
        }
    }
}