risingwave_stream/executor/
dispatch.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::future::Future;
17use std::iter::repeat_with;
18use std::ops::{Deref, DerefMut};
19use std::time::Duration;
20
21use futures::{FutureExt, TryStreamExt};
22use itertools::Itertools;
23use risingwave_common::array::Op;
24use risingwave_common::bitmap::BitmapBuilder;
25use risingwave_common::hash::{ActorMapping, ExpandedActorMapping, VirtualNode};
26use risingwave_common::metrics::LabelGuardedIntCounter;
27use risingwave_common::util::iter_util::ZipEqFast;
28use risingwave_pb::stream_plan::PbDispatcher;
29use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
30use smallvec::{SmallVec, smallvec};
31use tokio::time::Instant;
32use tokio_stream::StreamExt;
33use tokio_stream::adapters::Peekable;
34use tracing::{Instrument, event};
35
36use super::exchange::output::{BoxedOutput, new_output};
37use super::{
38    AddMutation, DispatcherBarriers, DispatcherMessageBatch, MessageBatch, TroublemakerExecutor,
39    UpdateMutation,
40};
41use crate::executor::StreamConsumer;
42use crate::executor::prelude::*;
43use crate::task::{DispatcherId, SharedContext};
44
/// [`DispatchExecutor`] consumes messages and send them into downstream actors. Usually,
/// data chunks will be dispatched with some specified policy, while control message
/// such as barriers will be distributed to all receivers.
pub struct DispatchExecutor {
    // The upstream executor whose output messages are dispatched downstream.
    input: Executor,
    // Dispatching state (dispatchers, metrics, context), kept separate from
    // `input` so it can be mutated while the input stream is being consumed.
    inner: DispatchExecutorInner,
}
52
/// A [`DispatcherImpl`] bundled with the metric that records how long this
/// dispatcher blocks while sending to its output buffers.
struct DispatcherWithMetrics {
    dispatcher: DispatcherImpl,
    // Accumulated blocking time (in nanoseconds) spent sending to outputs.
    actor_output_buffer_blocking_duration_ns: LabelGuardedIntCounter<3>,
}
57
58impl DispatcherWithMetrics {
59    pub fn record_output_buffer_blocking_duration(&self, duration: Duration) {
60        let ns = duration.as_nanos() as u64;
61        self.actor_output_buffer_blocking_duration_ns.inc_by(ns);
62    }
63}
64
impl Debug for DispatcherWithMetrics {
    // Delegate formatting to the inner dispatcher; the metric handle carries
    // no useful debug information.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        self.dispatcher.fmt(f)
    }
}
70
// Let the metrics wrapper be used transparently as a `DispatcherImpl`.
impl Deref for DispatcherWithMetrics {
    type Target = DispatcherImpl;

    fn deref(&self) -> &Self::Target {
        &self.dispatcher
    }
}
78
// Mutable counterpart of the `Deref` impl: mutations (e.g. adding/removing
// outputs) go straight to the wrapped dispatcher.
impl DerefMut for DispatcherWithMetrics {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.dispatcher
    }
}
84
/// Metrics context shared by all dispatchers of one [`DispatchExecutor`].
struct DispatchExecutorMetrics {
    // Pre-stringified actor id, reused as a metric label value.
    actor_id_str: String,
    // Pre-stringified fragment id, reused as a metric label value.
    fragment_id_str: String,
    metrics: Arc<StreamingMetrics>,
    // Counts rows (chunk cardinality) emitted by this actor.
    actor_out_record_cnt: LabelGuardedIntCounter<2>,
}
91
impl DispatchExecutorMetrics {
    /// Attach the per-dispatcher output-buffer blocking-duration metric to a
    /// dispatcher, labeled by actor id, fragment id and dispatcher id.
    fn monitor_dispatcher(&self, dispatcher: DispatcherImpl) -> DispatcherWithMetrics {
        DispatcherWithMetrics {
            actor_output_buffer_blocking_duration_ns: self
                .metrics
                .actor_output_buffer_blocking_duration_ns
                .with_guarded_label_values(&[
                    &self.actor_id_str,
                    &self.fragment_id_str,
                    dispatcher.dispatcher_id_str(),
                ]),
            dispatcher,
        }
    }
}
107
/// The mutable state of a [`DispatchExecutor`]: the currently active
/// dispatchers plus the context needed to create or update them when a
/// barrier carries a mutation.
struct DispatchExecutorInner {
    dispatchers: Vec<DispatcherWithMetrics>,
    actor_id: u32,
    context: Arc<SharedContext>,
    metrics: DispatchExecutorMetrics,
}
114
impl DispatchExecutorInner {
    /// Dispatch one message batch to every dispatcher, running at most
    /// `exchange_concurrent_dispatchers` dispatchers concurrently.
    ///
    /// For a barrier batch, the dispatchers are additionally updated from the
    /// first barrier's mutation: additions before the dispatch, removals
    /// after it (see `pre_mutate_dispatchers` / `post_mutate_dispatchers`).
    async fn dispatch(&mut self, msg: MessageBatch) -> StreamResult<()> {
        let limit = (self.context.config.developer).exchange_concurrent_dispatchers;
        // Only barrier can be batched for now.
        match msg {
            MessageBatch::BarrierBatch(barrier_batch) => {
                if barrier_batch.is_empty() {
                    return Ok(());
                }
                // Only the first barrier in a batch can be mutation.
                let mutation = barrier_batch[0].mutation.clone();
                self.pre_mutate_dispatchers(&mutation)?;
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        // Time spent here is blocking on downstream buffers.
                        let start_time = Instant::now();
                        dispatcher
                            .dispatch_barriers(
                                barrier_batch
                                    .iter()
                                    .cloned()
                                    .map(|b| b.into_dispatcher())
                                    .collect(),
                            )
                            .await?;
                        dispatcher.record_output_buffer_blocking_duration(start_time.elapsed());
                        StreamResult::Ok(())
                    })
                    .await?;
                self.post_mutate_dispatchers(&mutation)?;
            }
            MessageBatch::Watermark(watermark) => {
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let start_time = Instant::now();
                        dispatcher.dispatch_watermark(watermark.clone()).await?;
                        dispatcher.record_output_buffer_blocking_duration(start_time.elapsed());
                        StreamResult::Ok(())
                    })
                    .await?;
            }
            MessageBatch::Chunk(chunk) => {
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let start_time = Instant::now();
                        dispatcher.dispatch_data(chunk.clone()).await?;
                        dispatcher.record_output_buffer_blocking_duration(start_time.elapsed());
                        StreamResult::Ok(())
                    })
                    .await?;

                // Count rows only once per chunk, not once per dispatcher.
                self.metrics
                    .actor_out_record_cnt
                    .inc_by(chunk.cardinality() as _);
            }
        }
        Ok(())
    }

    /// Add new dispatchers to the executor. Will check whether their ids are unique.
    fn add_dispatchers<'a>(
        &mut self,
        new_dispatchers: impl IntoIterator<Item = &'a PbDispatcher>,
    ) -> StreamResult<()> {
        let new_dispatchers: Vec<_> = new_dispatchers
            .into_iter()
            .map(|d| {
                DispatcherImpl::new(&self.context, self.actor_id, d)
                    .map(|dispatcher| self.metrics.monitor_dispatcher(dispatcher))
            })
            .try_collect()?;

        self.dispatchers.extend(new_dispatchers);

        // Duplicated ids would make `find_dispatcher` ambiguous.
        assert!(
            self.dispatchers
                .iter()
                .map(|d| d.dispatcher_id())
                .all_unique(),
            "dispatcher ids must be unique: {:?}",
            self.dispatchers
        );

        Ok(())
    }

    /// Find the dispatcher with the given id.
    ///
    /// # Panics
    /// Panics if no dispatcher with that id exists on this actor.
    fn find_dispatcher(&mut self, dispatcher_id: DispatcherId) -> &mut DispatcherImpl {
        self.dispatchers
            .iter_mut()
            .find(|d| d.dispatcher_id() == dispatcher_id)
            .unwrap_or_else(|| panic!("dispatcher {}:{} not found", self.actor_id, dispatcher_id))
    }

    /// Update the dispatcher BEFORE we actually dispatch this barrier. We'll only add the new
    /// outputs.
    fn pre_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
        let outputs: Vec<_> = update
            .added_downstream_actor_id
            .iter()
            .map(|&id| new_output(&self.context, self.actor_id, id))
            .try_collect()?;

        let dispatcher = self.find_dispatcher(update.dispatcher_id);
        dispatcher.add_outputs(outputs);

        Ok(())
    }

    /// Update the dispatcher AFTER we dispatch this barrier. We'll remove some outputs and finally
    /// update the hash mapping.
    fn post_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
        let ids = update.removed_downstream_actor_id.iter().copied().collect();

        let dispatcher = self.find_dispatcher(update.dispatcher_id);
        dispatcher.remove_outputs(&ids);

        // The hash mapping is only used by the hash dispatcher.
        //
        // We specify a single upstream hash mapping for scaling the downstream fragment. However,
        // it's possible that there're multiple upstreams with different exchange types, for
        // example, the `Broadcast` inner side of the dynamic filter. There're too many combinations
        // to handle here, so we just ignore the `hash_mapping` field for any other exchange types.
        if let DispatcherImpl::Hash(dispatcher) = dispatcher {
            dispatcher.hash_mapping =
                ActorMapping::from_protobuf(update.get_hash_mapping()?).to_expanded();
        }

        Ok(())
    }

    /// For `Add` and `Update`, update the dispatchers before we dispatch the barrier.
    fn pre_mutate_dispatchers(&mut self, mutation: &Option<Arc<Mutation>>) -> StreamResult<()> {
        let Some(mutation) = mutation.as_deref() else {
            return Ok(());
        };

        match mutation {
            Mutation::Add(AddMutation { adds, .. }) => {
                if let Some(new_dispatchers) = adds.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers)?;
                }
            }
            Mutation::Update(UpdateMutation {
                dispatchers,
                actor_new_dispatchers: actor_dispatchers,
                ..
            }) => {
                if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers)?;
                }

                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.pre_update_dispatcher(update)?;
                    }
                }
            }
            // `AddAndUpdate` applies both the `Add` and the `Update` parts.
            Mutation::AddAndUpdate(
                AddMutation { adds, .. },
                UpdateMutation {
                    dispatchers,
                    actor_new_dispatchers: actor_dispatchers,
                    ..
                },
            ) => {
                if let Some(new_dispatchers) = adds.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers)?;
                }

                if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers)?;
                }

                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.pre_update_dispatcher(update)?;
                    }
                }
            }
            // Other mutations don't change dispatchers before the barrier.
            _ => {}
        }

        Ok(())
    }

    /// For `Stop` and `Update`, update the dispatchers after we dispatch the barrier.
    fn post_mutate_dispatchers(&mut self, mutation: &Option<Arc<Mutation>>) -> StreamResult<()> {
        let Some(mutation) = mutation.as_deref() else {
            return Ok(());
        };

        match mutation {
            Mutation::Stop(stops) => {
                // Remove outputs only if this actor itself is not to be stopped.
                if !stops.contains(&self.actor_id) {
                    for dispatcher in &mut self.dispatchers {
                        dispatcher.remove_outputs(stops);
                    }
                }
            }
            Mutation::Update(UpdateMutation {
                dispatchers,
                dropped_actors,
                ..
            })
            | Mutation::AddAndUpdate(
                _,
                UpdateMutation {
                    dispatchers,
                    dropped_actors,
                    ..
                },
            ) => {
                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.post_update_dispatcher(update)?;
                    }
                }

                if !dropped_actors.contains(&self.actor_id) {
                    for dispatcher in &mut self.dispatchers {
                        dispatcher.remove_outputs(dropped_actors);
                    }
                }
            }
            _ => {}
        };

        // After stopping the downstream mview, the outputs of some dispatcher might be empty and we
        // should clean up them.
        self.dispatchers.retain(|d| !d.is_empty());

        Ok(())
    }
}
352
impl DispatchExecutor {
    /// Create a new [`DispatchExecutor`] wrapping `input` with the given
    /// dispatchers, hooking each dispatcher up to its metrics.
    ///
    /// When insane-mode consistency testing is enabled, the input is first
    /// wrapped in a [`TroublemakerExecutor`].
    pub fn new(
        mut input: Executor,
        dispatchers: Vec<DispatcherImpl>,
        actor_id: u32,
        fragment_id: u32,
        context: Arc<SharedContext>,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
    ) -> Self {
        if crate::consistency::insane() {
            // make some trouble before dispatching to avoid generating invalid dist key.
            let mut info = input.info().clone();
            info.identity = format!("{} (embedded trouble)", info.identity);
            let troublemaker = TroublemakerExecutor::new(input, chunk_size);
            input = (info, troublemaker).into();
        }

        // Stringify label values once so every metric lookup reuses them.
        let actor_id_str = actor_id.to_string();
        let fragment_id_str = fragment_id.to_string();
        let actor_out_record_cnt = metrics
            .actor_out_record_cnt
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let metrics = DispatchExecutorMetrics {
            actor_id_str,
            fragment_id_str,
            metrics,
            actor_out_record_cnt,
        };
        let dispatchers = dispatchers
            .into_iter()
            .map(|dispatcher| metrics.monitor_dispatcher(dispatcher))
            .collect();
        Self {
            input,
            inner: DispatchExecutorInner {
                dispatchers,
                actor_id,
                context,
                metrics,
            },
        }
    }
}
397
impl StreamConsumer for DispatchExecutor {
    type BarrierStream = impl Stream<Item = StreamResult<Barrier>> + Send;

    /// Drive the input stream to completion, dispatching every message to the
    /// downstream actors and yielding barriers back to the actor runtime.
    fn execute(mut self: Box<Self>) -> Self::BarrierStream {
        let max_barrier_count_per_batch =
            self.inner.context.config.developer.max_barrier_batch_size;
        #[try_stream]
        async move {
            // Peekable so that consecutive barriers can be batched together.
            let mut input = self.input.execute().peekable();
            loop {
                let Some(message) =
                    try_batch_barriers(max_barrier_count_per_batch, &mut input).await?
                else {
                    // end_of_stream
                    break;
                };
                match message {
                    chunk @ MessageBatch::Chunk(_) => {
                        self.inner
                            .dispatch(chunk)
                            .instrument(tracing::info_span!("dispatch_chunk"))
                            .instrument_await("dispatch_chunk")
                            .await?;
                    }
                    MessageBatch::BarrierBatch(barrier_batch) => {
                        assert!(!barrier_batch.is_empty());
                        self.inner
                            .dispatch(MessageBatch::BarrierBatch(barrier_batch.clone()))
                            .instrument(tracing::info_span!("dispatch_barrier_batch"))
                            .instrument_await("dispatch_barrier_batch")
                            .await?;
                        self.inner
                            .metrics
                            .metrics
                            .barrier_batch_size
                            .observe(barrier_batch.len() as f64);
                        // Yield barriers only after they have been dispatched.
                        for barrier in barrier_batch {
                            yield barrier;
                        }
                    }
                    watermark @ MessageBatch::Watermark(_) => {
                        self.inner
                            .dispatch(watermark)
                            .instrument(tracing::info_span!("dispatch_watermark"))
                            .instrument_await("dispatch_watermark")
                            .await?;
                    }
                }
            }
        }
    }
}
450
451/// Tries to batch up to `max_barrier_count_per_batch` consecutive barriers within a single message batch.
452///
453/// Returns the message batch.
454///
455/// Returns None if end of stream.
/// Tries to batch up to `max_barrier_count_per_batch` consecutive barriers within a single message batch.
///
/// Returns the message batch.
///
/// Returns None if end of stream.
async fn try_batch_barriers(
    max_barrier_count_per_batch: u32,
    input: &mut Peekable<BoxedMessageStream>,
) -> StreamResult<Option<MessageBatch>> {
    let Some(msg) = input.next().await else {
        // end_of_stream
        return Ok(None);
    };
    let mut barrier_batch = vec![];
    let msg: Message = msg?;
    let max_peek_attempts = match msg {
        // Chunks and watermarks are never batched: forward them as-is.
        Message::Chunk(c) => {
            return Ok(Some(MessageBatch::Chunk(c)));
        }
        Message::Watermark(w) => {
            return Ok(Some(MessageBatch::Watermark(w)));
        }
        Message::Barrier(b) => {
            // A barrier carrying a mutation must be dispatched on its own, so
            // only try to batch further barriers when this one has none.
            let peek_more_barrier = b.mutation.is_none();
            barrier_batch.push(b);
            if peek_more_barrier {
                max_barrier_count_per_batch.saturating_sub(1)
            } else {
                0
            }
        }
    };
    // Try to peek more consecutive non-mutation barriers.
    for _ in 0..max_peek_attempts {
        // `now_or_never` makes the peek non-blocking: if the next message is
        // not immediately ready, dispatch what we already have.
        let peek = input.peek().now_or_never();
        let Some(peek) = peek else {
            break;
        };
        let Some(msg) = peek else {
            // end_of_stream
            break;
        };
        let Ok(Message::Barrier(barrier)) = msg else {
            break;
        };
        if barrier.mutation.is_some() {
            break;
        }
        // The peeked item is a ready, Ok, non-mutation barrier, so consuming
        // it here cannot block and yields that same message.
        let msg: Message = input.next().now_or_never().unwrap().unwrap()?;
        let Message::Barrier(ref barrier) = msg else {
            unreachable!("must be a barrier");
        };
        barrier_batch.push(barrier.clone());
    }
    Ok(Some(MessageBatch::BarrierBatch(barrier_batch)))
}
507
/// The concrete dispatcher implementations, statically dispatched over via
/// the `impl_dispatcher!` macro.
#[derive(Debug)]
pub enum DispatcherImpl {
    // Routes each row by the hash of its distribution key.
    Hash(HashDataDispatcher),
    // Sends every message to all outputs.
    Broadcast(BroadcastDispatcher),
    // Sends to a single output (used for `Simple` and `NoShuffle`).
    Simple(SimpleDispatcher),
    // Rotates chunks across outputs.
    RoundRobin(RoundRobinDataDispatcher),
}
515
516impl DispatcherImpl {
517    pub fn new(
518        context: &SharedContext,
519        actor_id: ActorId,
520        dispatcher: &PbDispatcher,
521    ) -> StreamResult<Self> {
522        let outputs = dispatcher
523            .downstream_actor_id
524            .iter()
525            .map(|&down_id| new_output(context, actor_id, down_id))
526            .collect::<StreamResult<Vec<_>>>()?;
527
528        let output_indices = dispatcher
529            .output_indices
530            .iter()
531            .map(|&i| i as usize)
532            .collect_vec();
533
534        use risingwave_pb::stream_plan::DispatcherType::*;
535        let dispatcher_impl = match dispatcher.get_type()? {
536            Hash => {
537                assert!(!outputs.is_empty());
538                let dist_key_indices = dispatcher
539                    .dist_key_indices
540                    .iter()
541                    .map(|i| *i as usize)
542                    .collect();
543
544                let hash_mapping =
545                    ActorMapping::from_protobuf(dispatcher.get_hash_mapping()?).to_expanded();
546
547                DispatcherImpl::Hash(HashDataDispatcher::new(
548                    outputs,
549                    dist_key_indices,
550                    output_indices,
551                    hash_mapping,
552                    dispatcher.dispatcher_id,
553                ))
554            }
555            Broadcast => DispatcherImpl::Broadcast(BroadcastDispatcher::new(
556                outputs,
557                output_indices,
558                dispatcher.dispatcher_id,
559            )),
560            Simple | NoShuffle => {
561                let [output]: [_; 1] = outputs.try_into().unwrap();
562                DispatcherImpl::Simple(SimpleDispatcher::new(
563                    output,
564                    output_indices,
565                    dispatcher.dispatcher_id,
566                ))
567            }
568            Unspecified => unreachable!(),
569        };
570
571        Ok(dispatcher_impl)
572    }
573}
574
/// Implements [`DispatcherImpl`]'s methods by delegating each call to the
/// matching inner dispatcher variant — static dispatch, no trait objects.
macro_rules! impl_dispatcher {
    ($( { $variant_name:ident } ),*) => {
        impl DispatcherImpl {
            pub async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_data(chunk).await, )*
                }
            }

            pub async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_barriers(barriers).await, )*
                }
            }

            pub async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_watermark(watermark).await, )*
                }
            }

            pub fn add_outputs(&mut self, outputs: impl IntoIterator<Item = BoxedOutput>) {
                match self {
                    $(Self::$variant_name(inner) => inner.add_outputs(outputs), )*
                }
            }

            pub fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
                match self {
                    $(Self::$variant_name(inner) => inner.remove_outputs(actor_ids), )*
                }
            }

            pub fn dispatcher_id(&self) -> DispatcherId {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id(), )*
                }
            }

            pub fn dispatcher_id_str(&self) -> &str {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id_str(), )*
                }
            }

            pub fn is_empty(&self) -> bool {
                match self {
                    $(Self::$variant_name(inner) => inner.is_empty(), )*
                }
            }
        }
    }
}
628
/// Invokes the given macro with the list of all [`DispatcherImpl`] variants.
/// Keeps the variant list in one place for delegation macros.
macro_rules! for_all_dispatcher_variants {
    ($macro:ident) => {
        $macro! {
            { Hash },
            { Broadcast },
            { Simple },
            { RoundRobin }
        }
    };
}

for_all_dispatcher_variants! { impl_dispatcher }
641
/// Alias for the future type returned by the `dispatch_*` methods of [`Dispatcher`].
pub trait DispatchFuture<'a> = Future<Output = StreamResult<()>> + Send;
643
/// The interface implemented by every dispatcher variant of [`DispatcherImpl`].
pub trait Dispatcher: Debug + 'static {
    /// Dispatch a data chunk to downstream actors.
    fn dispatch_data(&mut self, chunk: StreamChunk) -> impl DispatchFuture<'_>;
    /// Dispatch barriers to downstream actors, generally by broadcasting it.
    fn dispatch_barriers(&mut self, barrier: DispatcherBarriers) -> impl DispatchFuture<'_>;
    /// Dispatch a watermark to downstream actors, generally by broadcasting it.
    fn dispatch_watermark(&mut self, watermark: Watermark) -> impl DispatchFuture<'_>;

    /// Add new outputs to the dispatcher.
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = BoxedOutput>);
    /// Remove outputs to `actor_ids` from the dispatcher.
    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>);

    /// The ID of the dispatcher. A [`DispatchExecutor`] may have multiple dispatchers with
    /// different IDs.
    ///
    /// Note that the dispatcher id is always equal to the downstream fragment id.
    /// See also `proto/stream_plan.proto`.
    fn dispatcher_id(&self) -> DispatcherId;

    /// Dispatcher id in string. See [`Dispatcher::dispatcher_id`].
    fn dispatcher_id_str(&self) -> &str;

    /// Whether the dispatcher has no outputs. If so, it'll be cleaned up from the
    /// [`DispatchExecutor`].
    fn is_empty(&self) -> bool;
}
671
672/// Concurrently broadcast a message to all outputs.
673///
674/// Note that this does not follow `concurrent_dispatchers` in the config and the concurrency is
675/// always unlimited.
676async fn broadcast_concurrent(
677    outputs: impl IntoIterator<Item = &'_ mut BoxedOutput>,
678    message: DispatcherMessageBatch,
679) -> StreamResult<()> {
680    futures::future::try_join_all(
681        outputs
682            .into_iter()
683            .map(|output| output.send(message.clone())),
684    )
685    .await?;
686    Ok(())
687}
688
/// Dispatcher that sends each chunk to exactly one output, rotating through
/// the outputs in turn; barriers and watermarks are broadcast to all outputs.
#[derive(Debug)]
pub struct RoundRobinDataDispatcher {
    outputs: Vec<BoxedOutput>,
    // Columns of the input chunk to forward downstream.
    output_indices: Vec<usize>,
    // Index of the output that receives the next chunk.
    cur: usize,
    dispatcher_id: DispatcherId,
    // Cached string form of `dispatcher_id`, used as a metric label.
    dispatcher_id_str: String,
}
697
698impl RoundRobinDataDispatcher {
699    pub fn new(
700        outputs: Vec<BoxedOutput>,
701        output_indices: Vec<usize>,
702        dispatcher_id: DispatcherId,
703    ) -> Self {
704        Self {
705            outputs,
706            output_indices,
707            cur: 0,
708            dispatcher_id,
709            dispatcher_id_str: dispatcher_id.to_string(),
710        }
711    }
712}
713
714impl Dispatcher for RoundRobinDataDispatcher {
715    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
716        let chunk = if self.output_indices.len() < chunk.columns().len() {
717            chunk
718                .project(&self.output_indices)
719                .eliminate_adjacent_noop_update()
720        } else {
721            chunk.project(&self.output_indices)
722        };
723
724        self.outputs[self.cur]
725            .send(DispatcherMessageBatch::Chunk(chunk))
726            .await?;
727        self.cur += 1;
728        self.cur %= self.outputs.len();
729        Ok(())
730    }
731
732    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
733        // always broadcast barrier
734        broadcast_concurrent(
735            &mut self.outputs,
736            DispatcherMessageBatch::BarrierBatch(barriers),
737        )
738        .await
739    }
740
741    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
742        if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) {
743            // always broadcast watermark
744            broadcast_concurrent(
745                &mut self.outputs,
746                DispatcherMessageBatch::Watermark(watermark),
747            )
748            .await?;
749        }
750        Ok(())
751    }
752
753    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = BoxedOutput>) {
754        self.outputs.extend(outputs);
755    }
756
757    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
758        self.outputs
759            .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
760            .count();
761        self.cur = self.cur.min(self.outputs.len() - 1);
762    }
763
764    fn dispatcher_id(&self) -> DispatcherId {
765        self.dispatcher_id
766    }
767
768    fn dispatcher_id_str(&self) -> &str {
769        &self.dispatcher_id_str
770    }
771
772    fn is_empty(&self) -> bool {
773        self.outputs.is_empty()
774    }
775}
776
/// Dispatcher that shuffles each row to the output owning its virtual node,
/// computed by hashing the distribution key columns (`keys`).
pub struct HashDataDispatcher {
    outputs: Vec<BoxedOutput>,
    // Indices of the distribution key columns used to compute the vnode.
    keys: Vec<usize>,
    // Columns of the input chunk to forward downstream.
    output_indices: Vec<usize>,
    /// Mapping from virtual node to actor id, used for hash data dispatcher to dispatch tasks to
    /// different downstream actors.
    hash_mapping: ExpandedActorMapping,
    dispatcher_id: DispatcherId,
    // Cached string form of `dispatcher_id`, used as a metric label.
    dispatcher_id_str: String,
}
787
788impl Debug for HashDataDispatcher {
789    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
790        f.debug_struct("HashDataDispatcher")
791            .field("outputs", &self.outputs)
792            .field("keys", &self.keys)
793            .field("dispatcher_id", &self.dispatcher_id)
794            .finish_non_exhaustive()
795    }
796}
797
798impl HashDataDispatcher {
799    pub fn new(
800        outputs: Vec<BoxedOutput>,
801        keys: Vec<usize>,
802        output_indices: Vec<usize>,
803        hash_mapping: ExpandedActorMapping,
804        dispatcher_id: DispatcherId,
805    ) -> Self {
806        Self {
807            outputs,
808            keys,
809            output_indices,
810            hash_mapping,
811            dispatcher_id,
812            dispatcher_id_str: dispatcher_id.to_string(),
813        }
814    }
815}
816
817impl Dispatcher for HashDataDispatcher {
    /// Add new outputs. Rows only reach an output once `hash_mapping` maps
    /// some vnode to that output's actor.
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = BoxedOutput>) {
        self.outputs.extend(outputs);
    }
821
    /// Barriers go to every output regardless of hashing.
    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // always broadcast barrier
        broadcast_concurrent(
            &mut self.outputs,
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }
830
    /// Watermarks go to every output, but only if the watermark column
    /// survives the projection to `output_indices`.
    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) {
            // always broadcast watermark
            broadcast_concurrent(
                &mut self.outputs,
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }
842
843    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
844        // A chunk can be shuffled into multiple output chunks that to be sent to downstreams.
845        // In these output chunks, the only difference are visibility map, which is calculated
846        // by the hash value of each line in the input chunk.
847        let num_outputs = self.outputs.len();
848
849        // get hash value of every line by its key
850        let vnode_count = self.hash_mapping.len();
851        let vnodes = VirtualNode::compute_chunk(chunk.data_chunk(), &self.keys, vnode_count);
852
853        tracing::debug!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty(), self.keys, vnodes);
854
855        let mut vis_maps = repeat_with(|| BitmapBuilder::with_capacity(chunk.capacity()))
856            .take(num_outputs)
857            .collect_vec();
858        let mut last_vnode_when_update_delete = None;
859        let mut new_ops: Vec<Op> = Vec::with_capacity(chunk.capacity());
860
861        // Apply output indices after calculating the vnode.
862        let chunk = if self.output_indices.len() < chunk.columns().len() {
863            chunk
864                .project(&self.output_indices)
865                .eliminate_adjacent_noop_update()
866        } else {
867            chunk.project(&self.output_indices)
868        };
869
870        for ((vnode, &op), visible) in vnodes
871            .iter()
872            .copied()
873            .zip_eq_fast(chunk.ops())
874            .zip_eq_fast(chunk.visibility().iter())
875        {
876            // Build visibility map for every output chunk.
877            for (output, vis_map) in self.outputs.iter().zip_eq_fast(vis_maps.iter_mut()) {
878                vis_map.append(visible && self.hash_mapping[vnode.to_index()] == output.actor_id());
879            }
880
881            if !visible {
882                assert!(
883                    last_vnode_when_update_delete.is_none(),
884                    "invisible row between U- and U+, op = {op:?}",
885                );
886                new_ops.push(op);
887                continue;
888            }
889
890            // The 'update' message, noted by an `UpdateDelete` and a successive `UpdateInsert`,
891            // need to be rewritten to common `Delete` and `Insert` if they were dispatched to
892            // different actors.
893            if op == Op::UpdateDelete {
894                last_vnode_when_update_delete = Some(vnode);
895            } else if op == Op::UpdateInsert {
896                if vnode
897                    != last_vnode_when_update_delete
898                        .take()
899                        .expect("missing U- before U+")
900                {
901                    new_ops.push(Op::Delete);
902                    new_ops.push(Op::Insert);
903                } else {
904                    new_ops.push(Op::UpdateDelete);
905                    new_ops.push(Op::UpdateInsert);
906                }
907            } else {
908                new_ops.push(op);
909            }
910        }
911        assert!(
912            last_vnode_when_update_delete.is_none(),
913            "missing U+ after U-"
914        );
915
916        let ops = new_ops;
917
918        // individually output StreamChunk integrated with vis_map
919        futures::future::try_join_all(
920            vis_maps
921                .into_iter()
922                .zip_eq_fast(self.outputs.iter_mut())
923                .map(|(vis_map, output)| async {
924                    let vis_map = vis_map.finish();
925                    // columns is not changed in this function
926                    let new_stream_chunk =
927                        StreamChunk::with_visibility(ops.clone(), chunk.columns().into(), vis_map);
928                    if new_stream_chunk.cardinality() > 0 {
929                        event!(
930                            tracing::Level::TRACE,
931                            msg = "chunk",
932                            downstream = output.actor_id(),
933                            "send = \n{:#?}",
934                            new_stream_chunk
935                        );
936                        output
937                            .send(DispatcherMessageBatch::Chunk(new_stream_chunk))
938                            .await?;
939                    }
940                    StreamResult::Ok(())
941                }),
942        )
943        .await?;
944
945        Ok(())
946    }
947
948    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
949        self.outputs
950            .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
951            .count();
952    }
953
954    fn dispatcher_id(&self) -> DispatcherId {
955        self.dispatcher_id
956    }
957
958    fn dispatcher_id_str(&self) -> &str {
959        &self.dispatcher_id_str
960    }
961
962    fn is_empty(&self) -> bool {
963        self.outputs.is_empty()
964    }
965}
966
/// `BroadcastDispatcher` dispatches message to all outputs.
#[derive(Debug)]
pub struct BroadcastDispatcher {
    /// Downstream outputs, keyed by downstream actor id for easy add/remove.
    outputs: HashMap<ActorId, BoxedOutput>,
    /// Indices used to project the columns of every dispatched chunk/watermark.
    output_indices: Vec<usize>,
    dispatcher_id: DispatcherId,
    /// Cached string form of `dispatcher_id`, computed once at construction.
    dispatcher_id_str: String,
}
975
976impl BroadcastDispatcher {
977    pub fn new(
978        outputs: impl IntoIterator<Item = BoxedOutput>,
979        output_indices: Vec<usize>,
980        dispatcher_id: DispatcherId,
981    ) -> Self {
982        Self {
983            outputs: Self::into_pairs(outputs).collect(),
984            output_indices,
985            dispatcher_id,
986            dispatcher_id_str: dispatcher_id.to_string(),
987        }
988    }
989
990    fn into_pairs(
991        outputs: impl IntoIterator<Item = BoxedOutput>,
992    ) -> impl Iterator<Item = (ActorId, BoxedOutput)> {
993        outputs
994            .into_iter()
995            .map(|output| (output.actor_id(), output))
996    }
997}
998
999impl Dispatcher for BroadcastDispatcher {
1000    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
1001        let chunk = if self.output_indices.len() < chunk.columns().len() {
1002            chunk
1003                .project(&self.output_indices)
1004                .eliminate_adjacent_noop_update()
1005        } else {
1006            chunk.project(&self.output_indices)
1007        };
1008        broadcast_concurrent(
1009            self.outputs.values_mut(),
1010            DispatcherMessageBatch::Chunk(chunk),
1011        )
1012        .await
1013    }
1014
1015    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
1016        // always broadcast barrier
1017        broadcast_concurrent(
1018            self.outputs.values_mut(),
1019            DispatcherMessageBatch::BarrierBatch(barriers),
1020        )
1021        .await
1022    }
1023
1024    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
1025        if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) {
1026            // always broadcast watermark
1027            broadcast_concurrent(
1028                self.outputs.values_mut(),
1029                DispatcherMessageBatch::Watermark(watermark),
1030            )
1031            .await?;
1032        }
1033        Ok(())
1034    }
1035
1036    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = BoxedOutput>) {
1037        self.outputs.extend(Self::into_pairs(outputs));
1038    }
1039
1040    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
1041        self.outputs
1042            .extract_if(|actor_id, _| actor_ids.contains(actor_id))
1043            .count();
1044    }
1045
1046    fn dispatcher_id(&self) -> DispatcherId {
1047        self.dispatcher_id
1048    }
1049
1050    fn dispatcher_id_str(&self) -> &str {
1051        &self.dispatcher_id_str
1052    }
1053
1054    fn is_empty(&self) -> bool {
1055        self.outputs.is_empty()
1056    }
1057}
1058
/// `SimpleDispatcher` dispatches message to a single output.
#[derive(Debug)]
pub struct SimpleDispatcher {
    /// In most cases, there is exactly one output. However, in some cases of configuration change,
    /// the field needs to be temporarily set to 0 or 2 outputs.
    ///
    /// - When dropping a materialized view, the output will be removed and this field becomes
    ///   empty. The [`DispatchExecutor`] will immediately clean-up this empty dispatcher before
    ///   finishing processing the current mutation.
    /// - When migrating a singleton fragment, the new output will be temporarily added in `pre`
    ///   stage and this field becomes multiple, which is for broadcasting this configuration
    ///   change barrier to both old and new downstream actors. In `post` stage, the old output
    ///   will be removed and this field becomes single again.
    ///
    /// Therefore, when dispatching data, we assert that there's exactly one output by
    /// `Self::output`.
    output: SmallVec<[BoxedOutput; 2]>,
    /// Indices used to project the columns of every dispatched chunk/watermark.
    output_indices: Vec<usize>,
    dispatcher_id: DispatcherId,
    /// Cached string form of `dispatcher_id`, computed once at construction.
    dispatcher_id_str: String,
}
1080
1081impl SimpleDispatcher {
1082    pub fn new(
1083        output: BoxedOutput,
1084        output_indices: Vec<usize>,
1085        dispatcher_id: DispatcherId,
1086    ) -> Self {
1087        Self {
1088            output: smallvec![output],
1089            output_indices,
1090            dispatcher_id,
1091            dispatcher_id_str: dispatcher_id.to_string(),
1092        }
1093    }
1094}
1095
1096impl Dispatcher for SimpleDispatcher {
1097    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = BoxedOutput>) {
1098        self.output.extend(outputs);
1099        assert!(self.output.len() <= 2);
1100    }
1101
1102    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
1103        // Only barrier is allowed to be dispatched to multiple outputs during migration.
1104        for output in &mut self.output {
1105            output
1106                .send(DispatcherMessageBatch::BarrierBatch(barriers.clone()))
1107                .await?;
1108        }
1109        Ok(())
1110    }
1111
1112    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
1113        let output = self
1114            .output
1115            .iter_mut()
1116            .exactly_one()
1117            .expect("expect exactly one output");
1118
1119        let chunk = if self.output_indices.len() < chunk.columns().len() {
1120            chunk
1121                .project(&self.output_indices)
1122                .eliminate_adjacent_noop_update()
1123        } else {
1124            chunk.project(&self.output_indices)
1125        };
1126        output.send(DispatcherMessageBatch::Chunk(chunk)).await
1127    }
1128
1129    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
1130        let output = self
1131            .output
1132            .iter_mut()
1133            .exactly_one()
1134            .expect("expect exactly one output");
1135
1136        if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) {
1137            output
1138                .send(DispatcherMessageBatch::Watermark(watermark))
1139                .await?;
1140        }
1141        Ok(())
1142    }
1143
1144    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
1145        self.output
1146            .retain(|output| !actor_ids.contains(&output.actor_id()));
1147    }
1148
1149    fn dispatcher_id(&self) -> DispatcherId {
1150        self.dispatcher_id
1151    }
1152
1153    fn dispatcher_id_str(&self) -> &str {
1154        &self.dispatcher_id_str
1155    }
1156
1157    fn is_empty(&self) -> bool {
1158        self.output.is_empty()
1159    }
1160}
1161
#[cfg(test)]
mod tests {
    use std::hash::{BuildHasher, Hasher};
    use std::sync::Mutex;

    use async_trait::async_trait;
    use futures::pin_mut;
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::array::{Array, ArrayBuilder, I32ArrayBuilder};
    use risingwave_common::config;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::hash_util::Crc32FastBuilder;
    use risingwave_pb::stream_plan::DispatcherType;

    use super::*;
    use crate::executor::exchange::output::Output;
    use crate::executor::exchange::permit::channel_for_test;
    use crate::executor::receiver::ReceiverExecutor;
    use crate::executor::{BarrierInner as Barrier, MessageInner as Message};
    use crate::task::barrier_test_utils::LocalBarrierTestEnv;
    use crate::task::test_utils::helper_make_local_actor;

    /// A mock [`Output`] that simply records every message sent to it, so the
    /// tests can assert on what a dispatcher produced for each downstream.
    #[derive(Debug)]
    pub struct MockOutput {
        actor_id: ActorId,
        data: Arc<Mutex<Vec<DispatcherMessageBatch>>>,
    }

    impl MockOutput {
        pub fn new(actor_id: ActorId, data: Arc<Mutex<Vec<DispatcherMessageBatch>>>) -> Self {
            Self { actor_id, data }
        }
    }

    #[async_trait]
    impl Output for MockOutput {
        async fn send(&mut self, message: DispatcherMessageBatch) -> StreamResult<()> {
            self.data.lock().unwrap().push(message);
            Ok(())
        }

        fn actor_id(&self) -> ActorId {
            self.actor_id
        }
    }

    // TODO: this test contains update being shuffled to different partitions, which is not
    // supported for now.
    #[tokio::test]
    async fn test_hash_dispatcher_complex() {
        test_hash_dispatcher_complex_inner().await
    }

    /// Dispatches a chunk containing invisible rows and `U-`/`U+` pairs, then checks
    /// the visibility maps and the update-op rewriting on both outputs.
    async fn test_hash_dispatcher_complex_inner() {
        // This test only works when vnode count is 256.
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let num_outputs = 2; // actor id ranges from 1 to 2
        let key_indices = &[0, 2];
        let output_data_vecs = (0..num_outputs)
            .map(|_| Arc::new(Mutex::new(Vec::new())))
            .collect::<Vec<_>>();
        let outputs = output_data_vecs
            .iter()
            .enumerate()
            .map(|(actor_id, data)| {
                Box::new(MockOutput::new(1 + actor_id as u32, data.clone())) as BoxedOutput
            })
            .collect::<Vec<_>>();
        // Map the first half of the vnodes to actor 1 and the second half to actor 2.
        let mut hash_mapping = (1..num_outputs + 1)
            .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT_FOR_TEST / num_outputs])
            .collect_vec();
        hash_mapping.resize(VirtualNode::COUNT_FOR_TEST, num_outputs as u32);
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            key_indices.to_vec(),
            vec![0, 1, 2],
            hash_mapping,
            0,
        );

        let chunk = StreamChunk::from_pretty(
            "  I I I
            +  4 6 8
            +  5 7 9
            +  0 0 0
            -  1 1 1 D
            U- 2 0 2
            U+ 2 0 2
            U- 3 3 2
            U+ 3 3 4",
        );
        hash_dispatcher.dispatch_data(chunk).await.unwrap();

        assert_eq!(
            *output_data_vecs[0].lock().unwrap()[0].as_chunk().unwrap(),
            StreamChunk::from_pretty(
                "  I I I
                +  4 6 8
                +  5 7 9
                +  0 0 0
                -  1 1 1 D
                U- 2 0 2
                U+ 2 0 2
                -  3 3 2 D  // Should rewrite UpdateDelete to Delete
                +  3 3 4    // Should rewrite UpdateInsert to Insert",
            )
        );
        assert_eq!(
            *output_data_vecs[1].lock().unwrap()[0].as_chunk().unwrap(),
            StreamChunk::from_pretty(
                "  I I I
                +  4 6 8 D
                +  5 7 9 D
                +  0 0 0 D
                -  1 1 1 D  // Should keep original invisible mark
                U- 2 0 2 D  // Should keep UpdateDelete
                U+ 2 0 2 D  // Should keep UpdateInsert
                -  3 3 2    // Should rewrite UpdateDelete to Delete
                +  3 3 4 D  // Should rewrite UpdateInsert to Insert",
            )
        );
    }

    /// Exercises dispatcher updates carried by barriers: adding/removing downstream
    /// actors on a broadcast dispatcher, then on a simple dispatcher.
    #[tokio::test]
    async fn test_configuration_change() {
        let _schema = Schema { fields: vec![] };
        let (tx, rx) = channel_for_test();
        let actor_id = 233;
        let fragment_id = 666;
        let barrier_test_env = LocalBarrierTestEnv::for_test().await;
        let ctx = Arc::new(SharedContext::for_test());
        let metrics = Arc::new(StreamingMetrics::unused());

        let (untouched, old, new) = (234, 235, 238); // broadcast downstream actors
        let (old_simple, new_simple) = (114, 514); // simple downstream actors

        // 1. Register info in context.
        ctx.add_actors(
            [actor_id, untouched, old, new, old_simple, new_simple]
                .into_iter()
                .map(helper_make_local_actor),
        );
        // actor_id -> untouched, old, new, old_simple, new_simple

        let broadcast_dispatcher_id = 666;
        let broadcast_dispatcher = DispatcherImpl::new(
            &ctx,
            actor_id,
            &PbDispatcher {
                r#type: DispatcherType::Broadcast as _,
                dispatcher_id: broadcast_dispatcher_id,
                downstream_actor_id: vec![untouched, old],
                ..Default::default()
            },
        )
        .unwrap();

        let simple_dispatcher_id = 888;
        let simple_dispatcher = DispatcherImpl::new(
            &ctx,
            actor_id,
            &PbDispatcher {
                r#type: DispatcherType::Simple as _,
                dispatcher_id: simple_dispatcher_id,
                downstream_actor_id: vec![old_simple],
                ..Default::default()
            },
        )
        .unwrap();

        // The update replaces `old` with `new` on the broadcast dispatcher.
        let dispatcher_updates = maplit::hashmap! {
            actor_id => vec![PbDispatcherUpdate {
                actor_id,
                dispatcher_id: broadcast_dispatcher_id,
                added_downstream_actor_id: vec![new],
                removed_downstream_actor_id: vec![old],
                hash_mapping: Default::default(),
            }]
        };
        let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: dispatcher_updates,
                merges: Default::default(),
                vnode_bitmaps: Default::default(),
                dropped_actors: Default::default(),
                actor_splits: Default::default(),
                actor_new_dispatchers: Default::default(),
            },
        ));
        barrier_test_env.inject_barrier(&b1, [actor_id]);
        barrier_test_env.flush_all_events().await;

        let input = Executor::new(
            Default::default(),
            ReceiverExecutor::for_test(
                actor_id,
                rx,
                barrier_test_env.shared_context.clone(),
                barrier_test_env.local_barrier_manager.clone(),
            )
            .boxed(),
        );
        let executor = Box::new(DispatchExecutor::new(
            input,
            vec![broadcast_dispatcher, simple_dispatcher],
            actor_id,
            fragment_id,
            ctx.clone(),
            metrics,
            config::default::developer::stream_chunk_size(),
        ))
        .execute();
        pin_mut!(executor);

        // 2. Take downstream receivers.
        let mut rxs = [untouched, old, new, old_simple, new_simple]
            .into_iter()
            .map(|id| (id, ctx.take_receiver((actor_id, id)).unwrap()))
            .collect::<HashMap<_, _>>();
        macro_rules! try_recv {
            ($down_id:expr) => {
                rxs.get_mut(&$down_id).unwrap().try_recv()
            };
        }

        // 3. Send a chunk.
        tx.send(Message::Chunk(StreamChunk::default()).into())
            .await
            .unwrap();

        tx.send(Message::Barrier(b1.clone().into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 5. Check downstream.
        try_recv!(untouched).unwrap().as_chunk().unwrap();
        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();

        try_recv!(old).unwrap().as_chunk().unwrap();
        try_recv!(old).unwrap().as_barrier_batch().unwrap(); // It should still receive the barrier even if it's to be removed.

        try_recv!(new).unwrap().as_barrier_batch().unwrap(); // Since it's just added, it won't receive the chunk.

        try_recv!(old_simple).unwrap().as_chunk().unwrap();
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // Untouched.

        // 6. Send another barrier.
        let b2 = Barrier::new_test_barrier(test_epoch(2));
        barrier_test_env.inject_barrier(&b2, [actor_id]);
        tx.send(Message::Barrier(b2.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 7. Check downstream.
        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();
        try_recv!(old).unwrap_err(); // Since it's stopped, we can't receive the new messages.
        try_recv!(new).unwrap().as_barrier_batch().unwrap();

        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // Untouched.
        try_recv!(new_simple).unwrap_err(); // Untouched.

        // 8. Send another chunk.
        tx.send(Message::Chunk(StreamChunk::default()).into())
            .await
            .unwrap();

        // 9. Send a configuration change barrier for simple dispatcher.
        let dispatcher_updates = maplit::hashmap! {
            actor_id => vec![PbDispatcherUpdate {
                actor_id,
                dispatcher_id: simple_dispatcher_id,
                added_downstream_actor_id: vec![new_simple],
                removed_downstream_actor_id: vec![old_simple],
                hash_mapping: Default::default(),
            }]
        };
        let b3 = Barrier::new_test_barrier(test_epoch(3)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: dispatcher_updates,
                merges: Default::default(),
                vnode_bitmaps: Default::default(),
                dropped_actors: Default::default(),
                actor_splits: Default::default(),
                actor_new_dispatchers: Default::default(),
            },
        ));
        barrier_test_env.inject_barrier(&b3, [actor_id]);
        tx.send(Message::Barrier(b3.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 10. Check downstream.
        try_recv!(old_simple).unwrap().as_chunk().unwrap();
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // It should still receive the barrier even if it's to be removed.

        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap(); // Since it's just added, it won't receive the chunk.

        // 11. Send another barrier.
        let b4 = Barrier::new_test_barrier(test_epoch(4));
        barrier_test_env.inject_barrier(&b4, [actor_id]);
        tx.send(Message::Barrier(b4.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 12. Check downstream.
        try_recv!(old_simple).unwrap_err(); // Since it's stopped, we can't receive the new messages.
        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap();
    }

    /// Builds a chunk of sequential values, computes the expected owner of each row
    /// with the same CRC32 hashing, and checks the dispatcher routes rows identically.
    #[tokio::test]
    async fn test_hash_dispatcher() {
        // This test only works when vnode count is 256.
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let num_outputs = 5; // actor id ranges from 1 to 5
        let cardinality = 10;
        let dimension = 4;
        let key_indices = &[0, 2];
        let output_data_vecs = (0..num_outputs)
            .map(|_| Arc::new(Mutex::new(Vec::new())))
            .collect::<Vec<_>>();
        let outputs = output_data_vecs
            .iter()
            .enumerate()
            .map(|(actor_id, data)| {
                Box::new(MockOutput::new(1 + actor_id as u32, data.clone())) as BoxedOutput
            })
            .collect::<Vec<_>>();
        let mut hash_mapping = (1..num_outputs + 1)
            .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT_FOR_TEST / num_outputs])
            .collect_vec();
        hash_mapping.resize(VirtualNode::COUNT_FOR_TEST, num_outputs as u32);
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            key_indices.to_vec(),
            (0..dimension).collect(),
            hash_mapping.clone(),
            0,
        );

        let mut ops = Vec::new();
        for idx in 0..cardinality {
            if idx % 2 == 0 {
                ops.push(Op::Insert);
            } else {
                ops.push(Op::Delete);
            }
        }

        // Compute the expected rows/ops per output by hashing the key columns with
        // the same CRC32 hasher the dispatcher uses.
        let mut start = 19260817i32..;
        let mut builders = (0..dimension)
            .map(|_| I32ArrayBuilder::new(cardinality))
            .collect_vec();
        let mut output_cols = vec![vec![vec![]; dimension]; num_outputs];
        let mut output_ops = vec![vec![]; num_outputs];
        for op in &ops {
            let hash_builder = Crc32FastBuilder;
            let mut hasher = hash_builder.build_hasher();
            let one_row = (0..dimension).map(|_| start.next().unwrap()).collect_vec();
            for key_idx in key_indices {
                let val = one_row[*key_idx];
                let bytes = val.to_le_bytes();
                hasher.update(&bytes);
            }
            let output_idx =
                hash_mapping[hasher.finish() as usize % VirtualNode::COUNT_FOR_TEST] as usize - 1;
            for (builder, val) in builders.iter_mut().zip_eq_fast(one_row.iter()) {
                builder.append(Some(*val));
            }
            output_cols[output_idx]
                .iter_mut()
                .zip_eq_fast(one_row.iter())
                .for_each(|(each_column, val)| each_column.push(*val));
            output_ops[output_idx].push(op);
        }

        let columns = builders
            .into_iter()
            .map(|builder| {
                let array = builder.finish();
                array.into_ref()
            })
            .collect();

        let chunk = StreamChunk::new(ops, columns);
        hash_dispatcher.dispatch_data(chunk).await.unwrap();

        for (output_idx, output) in output_data_vecs.into_iter().enumerate() {
            let guard = output.lock().unwrap();
            // It is possible that there is no chunks, as a key doesn't belong to any hash bucket.
            assert!(guard.len() <= 1);
            if guard.is_empty() {
                assert!(output_cols[output_idx].iter().all(|x| { x.is_empty() }));
            } else {
                let message = guard.first().unwrap();
                let real_chunk = match message {
                    DispatcherMessageBatch::Chunk(chunk) => chunk,
                    _ => panic!(),
                };
                // Only compare rows that are visible to this output.
                real_chunk
                    .columns()
                    .iter()
                    .zip_eq_fast(output_cols[output_idx].iter())
                    .for_each(|(real_col, expect_col)| {
                        let real_vals = real_chunk
                            .visibility()
                            .iter_ones()
                            .map(|row_idx| real_col.as_int32().value_at(row_idx).unwrap())
                            .collect::<Vec<_>>();
                        assert_eq!(real_vals.len(), expect_col.len());
                        assert_eq!(real_vals, *expect_col);
                    });
            }
        }
    }
}