risingwave_stream/executor/dispatch.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::iter::repeat_with;
use std::ops::{Deref, DerefMut};
use std::time::Duration;

use anyhow::anyhow;
use futures::FutureExt;
use itertools::Itertools;
use risingwave_common::array::Op;
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::{ActorMapping, ExpandedActorMapping, VirtualNode};
use risingwave_common::metrics::LabelGuardedIntCounter;
use risingwave_common::row::RowExt;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
use risingwave_pb::stream_plan::{self, PbDispatcher};
use smallvec::{SmallVec, smallvec};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::task::consume_budget;
use tokio::time::Instant;
use tokio_stream::StreamExt;
use tokio_stream::adapters::Peekable;
use tracing::{Instrument, event};

use super::exchange::output::Output;
use super::{
    AddMutation, DispatcherBarriers, DispatcherMessageBatch, MessageBatch, TroublemakerExecutor,
    UpdateMutation,
};
use crate::executor::prelude::*;
use crate::executor::{StopMutation, StreamConsumer};
use crate::task::{DispatcherId, NewOutputRequest};

mod output_mapping;
pub use output_mapping::DispatchOutputMapping;
use risingwave_common::id::FragmentId;

use crate::error::StreamError;

/// [`DispatchExecutor`] consumes messages and sends them to downstream actors. Usually,
/// data chunks are dispatched with some specified policy, while control messages
/// such as barriers are distributed to all receivers.
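///
/// The concrete policy is determined by the [`DispatcherImpl`] variant: `Hash` routes each
/// row by the hash of its distribution key, `Broadcast` sends every message to all outputs,
/// and `Simple`/`NoShuffle` forwards to a single downstream actor.
///
/// A minimal construction sketch (see [`DispatchExecutor::new`]; the bindings here are
/// placeholders, not real values):
///
/// ```ignore
/// let executor = DispatchExecutor::new(
///     input,                  // the upstream `Executor` to consume
///     new_output_request_rx,  // channel of `NewOutputRequest`s from downstream actors
///     dispatchers,            // `stream_plan::Dispatcher` protos to instantiate
///     &actor_context,
/// )
/// .await?;
/// ```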
pub struct DispatchExecutor {
    input: Executor,
    inner: DispatchExecutorInner,
}

struct DispatcherWithMetrics {
    dispatcher: DispatcherImpl,
    pub actor_output_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}

impl Debug for DispatcherWithMetrics {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        self.dispatcher.fmt(f)
    }
}

impl Deref for DispatcherWithMetrics {
    type Target = DispatcherImpl;

    fn deref(&self) -> &Self::Target {
        &self.dispatcher
    }
}

impl DerefMut for DispatcherWithMetrics {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.dispatcher
    }
}

struct DispatchExecutorMetrics {
    actor_id_str: String,
    fragment_id_str: String,
    metrics: Arc<StreamingMetrics>,
    actor_out_record_cnt: LabelGuardedIntCounter,
}

impl DispatchExecutorMetrics {
    fn monitor_dispatcher(&self, dispatcher: DispatcherImpl) -> DispatcherWithMetrics {
        DispatcherWithMetrics {
            actor_output_buffer_blocking_duration_ns: self
                .metrics
                .actor_output_buffer_blocking_duration_ns
                .with_guarded_label_values(&[
                    self.actor_id_str.as_str(),
                    self.fragment_id_str.as_str(),
                    dispatcher.dispatcher_id().to_string().as_str(),
                ]),
            dispatcher,
        }
    }
}

struct DispatchExecutorInner {
    dispatchers: Vec<DispatcherWithMetrics>,
    actor_id: ActorId,
    actor_config: Arc<StreamingConfig>,
    metrics: DispatchExecutorMetrics,
    new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    pending_new_output_requests: HashMap<ActorId, NewOutputRequest>,
}

impl DispatchExecutorInner {
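    /// Collects an output for each downstream actor, in order. Requests that arrive for
    /// other actors in the meantime are buffered in `pending_new_output_requests` until
    /// their turn comes.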
    async fn collect_outputs(
        &mut self,
        downstream_actors: &[ActorId],
    ) -> StreamResult<Vec<Output>> {
        fn resolve_output(downstream_actor: ActorId, request: NewOutputRequest) -> Output {
            let tx = match request {
                NewOutputRequest::Local(tx) | NewOutputRequest::Remote(tx) => tx,
            };
            Output::new(downstream_actor, tx)
        }
        let mut outputs = Vec::with_capacity(downstream_actors.len());
        for &downstream_actor in downstream_actors {
            let output =
                if let Some(request) = self.pending_new_output_requests.remove(&downstream_actor) {
                    resolve_output(downstream_actor, request)
                } else {
                    loop {
                        let (requested_actor, request) = self
                            .new_output_request_rx
                            .recv()
                            .await
                            .ok_or_else(|| anyhow!("end of new output request"))?;
                        if requested_actor == downstream_actor {
                            break resolve_output(requested_actor, request);
                        } else {
                            assert!(
                                self.pending_new_output_requests
                                    .insert(requested_actor, request)
                                    .is_none(),
                                "duplicated inflight new output requests from actor {}",
                                requested_actor
                            );
                        }
                    }
                };
            outputs.push(output);
        }
        Ok(outputs)
    }

    async fn dispatch(&mut self, msg: MessageBatch) -> StreamResult<()> {
        if self.dispatchers.is_empty() {
            // The dispatcher implementations internally call tokio methods, which already
            // participate in cooperative scheduling with the tokio runtime.
            //
            // When there is no dispatcher, we should do the cooperative scheduling ourselves.
            consume_budget().await;
        }
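        // Runs `future` to completion while accumulating the time it stays pending into
        // `metrics`. Elapsed time is flushed into the counter every 15 seconds rather than
        // only on completion, so the metric keeps advancing while a dispatch is blocked on
        // a congested output buffer.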
        async fn await_with_metrics(
            future: impl Future<Output = ()>,
            metrics: &LabelGuardedIntCounter,
        ) {
            let mut start_time = Instant::now();
            let interval_duration = Duration::from_secs(15);
            let mut interval =
                tokio::time::interval_at(start_time + interval_duration, interval_duration);

            let mut fut = pin!(future);

            loop {
                tokio::select! {
                    biased;
                    _res = &mut fut => {
                        let ns = start_time.elapsed().as_nanos() as u64;
                        metrics.inc_by(ns);
                        break;
                    }
                    _ = interval.tick() => {
                        start_time = Instant::now();
                        metrics.inc_by(interval_duration.as_nanos() as u64);
                    }
                };
            }
        }

        use futures::StreamExt;

        let limit = self.actor_config.developer.exchange_concurrent_dispatchers;
        // Only barriers can be batched for now.
        match msg {
            MessageBatch::BarrierBatch(barrier_batch) => {
                if barrier_batch.is_empty() {
                    return Ok(());
                }
                // Only the first barrier in a batch can carry a mutation.
                let mutation = barrier_batch[0].mutation.clone();
                self.pre_mutate_dispatchers(&mutation).await?;
                futures::stream::iter(self.dispatchers.iter_mut())
                    .for_each_concurrent(limit, |dispatcher| async {
                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
                        let dispatcher_output = &mut dispatcher.dispatcher;
                        let fut = dispatcher_output.dispatch_barriers(
                            barrier_batch
                                .iter()
                                .cloned()
                                .map(|b| b.into_dispatcher())
                                .collect(),
                        );
                        await_with_metrics(fut, metrics).await;
                    })
                    .await;
                self.post_mutate_dispatchers(&mutation)?;
            }
            MessageBatch::Watermark(watermark) => {
                futures::stream::iter(self.dispatchers.iter_mut())
                    .for_each_concurrent(limit, |dispatcher| async {
                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
                        let dispatcher_output = &mut dispatcher.dispatcher;
                        let fut = dispatcher_output.dispatch_watermark(watermark.clone());
                        await_with_metrics(fut, metrics).await;
                    })
                    .await;
            }
            MessageBatch::Chunk(chunk) => {
                futures::stream::iter(self.dispatchers.iter_mut())
                    .for_each_concurrent(limit, |dispatcher| async {
                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
                        let dispatcher_output = &mut dispatcher.dispatcher;
                        let fut = dispatcher_output.dispatch_data(chunk.clone());
                        await_with_metrics(fut, metrics).await;
                    })
                    .await;

                self.metrics
                    .actor_out_record_cnt
                    .inc_by(chunk.cardinality() as _);
            }
        }
        self.dispatchers.retain(|dispatcher| match &**dispatcher {
            DispatcherImpl::Failed(dispatcher_id, e) => {
                warn!(%dispatcher_id, err = %e.as_report(), actor_id = %self.actor_id, "dispatch failed");
                false
            }
            _ => true,
        });
        Ok(())
    }

    /// Adds new dispatchers to the executor, checking that their IDs are unique.
    async fn add_dispatchers<'a>(
        &mut self,
        new_dispatchers: impl IntoIterator<Item = &'a PbDispatcher>,
    ) -> StreamResult<()> {
        for dispatcher in new_dispatchers {
            let outputs = self
                .collect_outputs(&dispatcher.downstream_actor_id)
                .await?;
            let dispatcher = DispatcherImpl::new(outputs, dispatcher)?;
            let dispatcher = self.metrics.monitor_dispatcher(dispatcher);
            self.dispatchers.push(dispatcher);
        }

        assert!(
            self.dispatchers
                .iter()
                .map(|d| d.dispatcher_id())
                .all_unique(),
            "dispatcher ids must be unique: {:?}",
            self.dispatchers
        );

        Ok(())
    }

    fn find_dispatcher(&mut self, dispatcher_id: DispatcherId) -> Option<&mut DispatcherImpl> {
        self.dispatchers
            .iter_mut()
            .find(|d| d.dispatcher_id() == dispatcher_id)
            .map(DerefMut::deref_mut)
    }

    /// Update the dispatcher BEFORE we actually dispatch this barrier. We'll only add the new
    /// outputs.
    async fn pre_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
        let outputs = self
            .collect_outputs(&update.added_downstream_actor_id)
            .await?;

        let Some(dispatcher) = self.find_dispatcher(update.dispatcher_id) else {
            warn!(dispatcher_id = %update.dispatcher_id, added = ?update.added_downstream_actor_id, removed = ?update.removed_downstream_actor_id, actor_id = %self.actor_id, "ignore dispatcher update");
            return Ok(());
        };
        dispatcher.add_outputs(outputs);

        Ok(())
    }

    /// Update the dispatcher AFTER we dispatch this barrier. We'll remove some outputs and finally
    /// update the hash mapping.
    fn post_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
        let ids = update.removed_downstream_actor_id.iter().copied().collect();

        let Some(dispatcher) = self.find_dispatcher(update.dispatcher_id) else {
            warn!(dispatcher_id = %update.dispatcher_id, added = ?update.added_downstream_actor_id, removed = ?update.removed_downstream_actor_id, actor_id = %self.actor_id, "ignore dispatcher update");
            return Ok(());
        };
        dispatcher.remove_outputs(&ids);

        // The hash mapping is only used by the hash dispatcher.
        //
        // We specify a single upstream hash mapping for scaling the downstream fragment. However,
        // it's possible that there are multiple upstreams with different exchange types, for
        // example, the `Broadcast` inner side of the dynamic filter. There are too many
        // combinations to handle here, so we just ignore the `hash_mapping` field for any other
        // exchange types.
        if let DispatcherImpl::Hash(dispatcher) = dispatcher {
            dispatcher.hash_mapping =
                ActorMapping::from_protobuf(update.get_hash_mapping()?).to_expanded();
        }

        Ok(())
    }

    /// For `Add` and `Update`, update the dispatchers before we dispatch the barrier.
    async fn pre_mutate_dispatchers(
        &mut self,
        mutation: &Option<Arc<Mutation>>,
    ) -> StreamResult<()> {
        let Some(mutation) = mutation.as_deref() else {
            return Ok(());
        };

        match mutation {
            Mutation::Add(AddMutation { adds, .. }) => {
                if let Some(new_dispatchers) = adds.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }
            }
            Mutation::Update(UpdateMutation {
                dispatchers,
                actor_new_dispatchers: actor_dispatchers,
                ..
            }) => {
                if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }

                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.pre_update_dispatcher(update).await?;
                    }
                }
            }
            _ => {}
        }

        Ok(())
    }

    /// For `Stop` and `Update`, update the dispatchers after we dispatch the barrier.
    fn post_mutate_dispatchers(&mut self, mutation: &Option<Arc<Mutation>>) -> StreamResult<()> {
        let Some(mutation) = mutation.as_deref() else {
            return Ok(());
        };

        match mutation {
            Mutation::Stop(StopMutation { dropped_actors, .. }) => {
                // Remove outputs only if this actor itself is not to be stopped.
                if !dropped_actors.contains(&self.actor_id) {
                    for dispatcher in &mut self.dispatchers {
                        dispatcher.remove_outputs(dropped_actors);
                    }
                }
            }
            Mutation::Update(UpdateMutation {
                dispatchers,
                dropped_actors,
                ..
            }) => {
                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.post_update_dispatcher(update)?;
                    }
                }

                if !dropped_actors.contains(&self.actor_id) {
                    for dispatcher in &mut self.dispatchers {
                        dispatcher.remove_outputs(dropped_actors);
                    }
                }
            }
            _ => {}
        };

        // After stopping the downstream mview, the outputs of some dispatchers might become
        // empty, and we should clean them up.
        self.dispatchers.retain(|d| !d.is_empty());

        Ok(())
    }
}

impl DispatchExecutor {
    pub(crate) async fn new(
        input: Executor,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
        dispatchers: Vec<stream_plan::Dispatcher>,
        actor_context: &ActorContextRef,
    ) -> StreamResult<Self> {
        let mut executor = Self::new_inner(
            input,
            new_output_request_rx,
            vec![],
            actor_context.id,
            actor_context.fragment_id,
            actor_context.config.clone(),
            actor_context.streaming_metrics.clone(),
        );
        let inner = &mut executor.inner;
        for dispatcher in dispatchers {
            let outputs = inner
                .collect_outputs(&dispatcher.downstream_actor_id)
                .await?;
            let dispatcher = DispatcherImpl::new(outputs, &dispatcher)?;
            let dispatcher = inner.metrics.monitor_dispatcher(dispatcher);
            inner.dispatchers.push(dispatcher);
        }
        Ok(executor)
    }

    #[cfg(test)]
    pub(crate) fn for_test(
        input: Executor,
        dispatchers: Vec<DispatcherImpl>,
        actor_id: ActorId,
        fragment_id: FragmentId,
        actor_config: Arc<StreamingConfig>,
        metrics: Arc<StreamingMetrics>,
    ) -> (
        Self,
        tokio::sync::mpsc::UnboundedSender<(ActorId, NewOutputRequest)>,
    ) {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

        (
            Self::new_inner(
                input,
                rx,
                dispatchers,
                actor_id,
                fragment_id,
                actor_config,
                metrics,
            ),
            tx,
        )
    }

    fn new_inner(
        mut input: Executor,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
        dispatchers: Vec<DispatcherImpl>,
        actor_id: ActorId,
        fragment_id: FragmentId,
        actor_config: Arc<StreamingConfig>,
        metrics: Arc<StreamingMetrics>,
    ) -> Self {
        if crate::consistency::insane() {
            // Make some trouble before dispatching, to avoid generating an invalid dist key.
            let mut info = input.info().clone();
            info.identity = format!("{} (embedded trouble)", info.identity);
            let troublemaker = TroublemakerExecutor::new(input, actor_config.developer.chunk_size);
            input = (info, troublemaker).into();
        }

        let actor_id_str = actor_id.to_string();
        let fragment_id_str = fragment_id.to_string();
        let actor_out_record_cnt = metrics
            .actor_out_record_cnt
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let metrics = DispatchExecutorMetrics {
            actor_id_str,
            fragment_id_str,
            metrics,
            actor_out_record_cnt,
        };
        let dispatchers = dispatchers
            .into_iter()
            .map(|dispatcher| metrics.monitor_dispatcher(dispatcher))
            .collect();

        Self {
            input,
            inner: DispatchExecutorInner {
                dispatchers,
                actor_id,
                actor_config,
                metrics,
                new_output_request_rx,
                pending_new_output_requests: Default::default(),
            },
        }
    }
}

impl StreamConsumer for DispatchExecutor {
    type BarrierStream = impl Stream<Item = StreamResult<Barrier>> + Send;

    fn execute(mut self: Box<Self>) -> Self::BarrierStream {
        let max_barrier_count_per_batch = self.inner.actor_config.developer.max_barrier_batch_size;
        #[try_stream]
        async move {
            let mut input = self.input.execute().peekable();
            loop {
                let Some(message) =
                    try_batch_barriers(max_barrier_count_per_batch, &mut input).await?
                else {
                    // end_of_stream
                    break;
                };
                match message {
                    chunk @ MessageBatch::Chunk(_) => {
                        self.inner
                            .dispatch(chunk)
                            .instrument(tracing::info_span!("dispatch_chunk"))
                            .instrument_await("dispatch_chunk")
                            .await?;
                    }
                    MessageBatch::BarrierBatch(barrier_batch) => {
                        assert!(!barrier_batch.is_empty());
                        self.inner
                            .dispatch(MessageBatch::BarrierBatch(barrier_batch.clone()))
                            .instrument(tracing::info_span!("dispatch_barrier_batch"))
                            .instrument_await("dispatch_barrier_batch")
                            .await?;
                        self.inner
                            .metrics
                            .metrics
                            .barrier_batch_size
                            .observe(barrier_batch.len() as f64);
                        for barrier in barrier_batch {
                            yield barrier;
                        }
                    }
                    watermark @ MessageBatch::Watermark(_) => {
                        self.inner
                            .dispatch(watermark)
                            .instrument(tracing::info_span!("dispatch_watermark"))
                            .instrument_await("dispatch_watermark")
                            .await?;
                    }
                }
            }
        }
    }
}

/// Tries to batch up to `max_barrier_count_per_batch` consecutive barriers into a single
/// message batch.
///
/// Returns the message batch, or `None` upon end of stream.
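///
/// For example, with `max_barrier_count_per_batch >= 2`, if the next buffered messages are
/// `[Barrier (no mutation), Barrier (no mutation), Chunk]`, both barriers are returned as a
/// single `BarrierBatch` and the chunk is left for the next call. A barrier carrying a
/// mutation is never batched together with other barriers.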
async fn try_batch_barriers(
    max_barrier_count_per_batch: u32,
    input: &mut Peekable<BoxedMessageStream>,
) -> StreamResult<Option<MessageBatch>> {
    let Some(msg) = input.next().await else {
        // end_of_stream
        return Ok(None);
    };
    let mut barrier_batch = vec![];
    let msg: Message = msg?;
    let max_peek_attempts = match msg {
        Message::Chunk(c) => {
            return Ok(Some(MessageBatch::Chunk(c)));
        }
        Message::Watermark(w) => {
            return Ok(Some(MessageBatch::Watermark(w)));
        }
        Message::Barrier(b) => {
            let peek_more_barrier = b.mutation.is_none();
            barrier_batch.push(b);
            if peek_more_barrier {
                max_barrier_count_per_batch.saturating_sub(1)
            } else {
                0
            }
        }
    };
    // Try to peek more consecutive non-mutation barriers.
    for _ in 0..max_peek_attempts {
        let peek = input.peek().now_or_never();
        let Some(peek) = peek else {
            break;
        };
        let Some(msg) = peek else {
            // end_of_stream
            break;
        };
        let Ok(Message::Barrier(barrier)) = msg else {
            break;
        };
        if barrier.mutation.is_some() {
            break;
        }
        let msg: Message = input.next().now_or_never().unwrap().unwrap()?;
        let Message::Barrier(ref barrier) = msg else {
            unreachable!("must be a barrier");
        };
        barrier_batch.push(barrier.clone());
    }
    Ok(Some(MessageBatch::BarrierBatch(barrier_batch)))
}

#[derive(Debug)]
pub(crate) enum DispatcherImpl {
    Hash(HashDataDispatcher),
    Broadcast(BroadcastDispatcher),
    Simple(SimpleDispatcher),
    #[cfg_attr(not(test), expect(dead_code))]
    RoundRobin(RoundRobinDataDispatcher),
    /// The dispatcher has failed to dispatch to the downstream, and is kept only so that
    /// the failure can be reported. It should be cleaned up later.
    Failed(DispatcherId, StreamError),
}

impl DispatcherImpl {
    pub fn new(outputs: Vec<Output>, dispatcher: &PbDispatcher) -> StreamResult<Self> {
        let output_mapping =
            DispatchOutputMapping::from_protobuf(dispatcher.output_mapping.clone().unwrap());

        use risingwave_pb::stream_plan::DispatcherType::*;
        let dispatcher_impl = match dispatcher.get_type()? {
            Hash => {
                assert!(!outputs.is_empty());
                let dist_key_indices = dispatcher
                    .dist_key_indices
                    .iter()
                    .map(|i| *i as usize)
                    .collect();

                let hash_mapping =
                    ActorMapping::from_protobuf(dispatcher.get_hash_mapping()?).to_expanded();

                DispatcherImpl::Hash(HashDataDispatcher::new(
                    outputs,
                    dist_key_indices,
                    output_mapping,
                    hash_mapping,
                    dispatcher.dispatcher_id,
                ))
            }
            Broadcast => DispatcherImpl::Broadcast(BroadcastDispatcher::new(
                outputs,
                output_mapping,
                dispatcher.dispatcher_id,
            )),
            Simple | NoShuffle => {
                let [output]: [_; 1] = outputs.try_into().unwrap();
                DispatcherImpl::Simple(SimpleDispatcher::new(
                    output,
                    output_mapping,
                    dispatcher.dispatcher_id,
                ))
            }
            Unspecified => unreachable!(),
        };

        Ok(dispatcher_impl)
    }
}

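// Delegates each dispatcher method to the concrete variant. When a dispatch method fails,
// the dispatcher is replaced with `DispatcherImpl::Failed` so that the failure can be
// reported and the dispatcher removed in `DispatchExecutorInner::dispatch`.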
macro_rules! impl_dispatcher {
    ($( { $variant_name:ident } ),*) => {
        impl DispatcherImpl {
            pub async fn dispatch_data(&mut self, chunk: StreamChunk) {
                if let Err(e) = match self {
                    $( Self::$variant_name(inner) => inner.dispatch_data(chunk).await, )*
                    Self::Failed(..) => unreachable!(),
                } {
                    *self = Self::Failed(self.dispatcher_id(), e);
                }
            }

            pub async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) {
                if let Err(e) = match self {
                    $( Self::$variant_name(inner) => inner.dispatch_barriers(barriers).await, )*
                    Self::Failed(..) => unreachable!(),
                } {
                    *self = Self::Failed(self.dispatcher_id(), e);
                }
            }

            pub async fn dispatch_watermark(&mut self, watermark: Watermark) {
                if let Err(e) = match self {
                    $( Self::$variant_name(inner) => inner.dispatch_watermark(watermark).await, )*
                    Self::Failed(..) => unreachable!(),
                } {
                    *self = Self::Failed(self.dispatcher_id(), e);
                }
            }

            pub fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
                match self {
                    $(Self::$variant_name(inner) => inner.add_outputs(outputs), )*
                    Self::Failed(..) => {},
                }
            }

            pub fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
                match self {
                    $(Self::$variant_name(inner) => inner.remove_outputs(actor_ids), )*
                    Self::Failed(..) => {},
                }
            }

            pub fn dispatcher_id(&self) -> DispatcherId {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id(), )*
                    Self::Failed(dispatcher_id, ..) => *dispatcher_id,
                }
            }

            pub fn is_empty(&self) -> bool {
                match self {
                    $(Self::$variant_name(inner) => inner.is_empty(), )*
                    Self::Failed(..) => true,
                }
            }
        }
    }
}

macro_rules! for_all_dispatcher_variants {
    ($macro:ident) => {
        $macro! {
            { Hash },
            { Broadcast },
            { Simple },
            { RoundRobin }
        }
    };
}

for_all_dispatcher_variants! { impl_dispatcher }

pub trait DispatchFuture<'a> = Future<Output = StreamResult<()>> + Send;

trait Dispatcher: Debug + 'static {
    /// Dispatch a data chunk to downstream actors.
    fn dispatch_data(&mut self, chunk: StreamChunk) -> impl DispatchFuture<'_>;
    /// Dispatch barriers to downstream actors, generally by broadcasting them.
    fn dispatch_barriers(&mut self, barrier: DispatcherBarriers) -> impl DispatchFuture<'_>;
    /// Dispatch a watermark to downstream actors, generally by broadcasting it.
    fn dispatch_watermark(&mut self, watermark: Watermark) -> impl DispatchFuture<'_>;

    /// Add new outputs to the dispatcher.
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>);
    /// Remove the outputs with the given `actor_ids` from the dispatcher.
    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>);

    /// The ID of the dispatcher. A [`DispatchExecutor`] may have multiple dispatchers with
    /// different IDs.
    ///
    /// Note that the dispatcher id is always equal to the downstream fragment id.
    /// See also `proto/stream_plan.proto`.
    fn dispatcher_id(&self) -> DispatcherId;

    /// Whether the dispatcher has no outputs. If so, it'll be cleaned up from the
    /// [`DispatchExecutor`].
    fn is_empty(&self) -> bool;
}

/// Concurrently broadcast a message to all outputs.
///
/// Note that this does not respect `exchange_concurrent_dispatchers` in the config, and the
/// concurrency is always unlimited.
async fn broadcast_concurrent(
    outputs: impl IntoIterator<Item = &'_ mut Output>,
    message: DispatcherMessageBatch,
) -> StreamResult<()> {
    futures::future::try_join_all(
        outputs
            .into_iter()
            .map(|output| output.send(message.clone())),
    )
    .await?;
    Ok(())
}

#[derive(Debug)]
pub struct RoundRobinDataDispatcher {
    outputs: Vec<Output>,
    output_mapping: DispatchOutputMapping,
    cur: usize,
    dispatcher_id: DispatcherId,
}

impl RoundRobinDataDispatcher {
    #[cfg_attr(not(test), expect(dead_code))]
    pub fn new(
        outputs: Vec<Output>,
        output_mapping: DispatchOutputMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            outputs,
            output_mapping,
            cur: 0,
            dispatcher_id,
        }
    }
}

impl Dispatcher for RoundRobinDataDispatcher {
    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let chunk = self.output_mapping.apply(chunk);

        self.outputs[self.cur]
            .send(DispatcherMessageBatch::Chunk(chunk))
            .await?;
        self.cur += 1;
        self.cur %= self.outputs.len();
        Ok(())
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // always broadcast barrier
        broadcast_concurrent(
            &mut self.outputs,
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            // always broadcast watermark
            broadcast_concurrent(
                &mut self.outputs,
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(outputs);
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
            .count();
        // `saturating_sub` guards against underflow when all outputs have been removed;
        // an empty dispatcher will be cleaned up by the executor afterwards.
        self.cur = self.cur.min(self.outputs.len().saturating_sub(1));
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}

pub struct HashDataDispatcher {
    outputs: Vec<Output>,
    keys: Vec<usize>,
    output_mapping: DispatchOutputMapping,
    /// Mapping from virtual node to actor ID, used by the hash data dispatcher to route
    /// rows to different downstream actors.
    hash_mapping: ExpandedActorMapping,
    dispatcher_id: DispatcherId,
}

impl Debug for HashDataDispatcher {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HashDataDispatcher")
            .field("outputs", &self.outputs)
            .field("keys", &self.keys)
            .field("dispatcher_id", &self.dispatcher_id)
            .finish_non_exhaustive()
    }
}

impl HashDataDispatcher {
    pub fn new(
        outputs: Vec<Output>,
        keys: Vec<usize>,
        output_mapping: DispatchOutputMapping,
        hash_mapping: ExpandedActorMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            outputs,
            keys,
            output_mapping,
            hash_mapping,
            dispatcher_id,
        }
    }
}

impl Dispatcher for HashDataDispatcher {
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(outputs);
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // always broadcast barrier
        broadcast_concurrent(
            &mut self.outputs,
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            // always broadcast watermark
            broadcast_concurrent(
                &mut self.outputs,
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        // A chunk can be shuffled into multiple output chunks to be sent to the downstream
        // actors. These output chunks differ only in their visibility maps, which are
        // computed from the hash value of each row in the input chunk.
        let num_outputs = self.outputs.len();

        // Compute the vnode of every row from its distribution key.
        let vnode_count = self.hash_mapping.len();
        let vnodes = VirtualNode::compute_chunk(chunk.data_chunk(), &self.keys, vnode_count);

        tracing::debug!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty(), self.keys, vnodes);

        let mut vis_maps = repeat_with(|| BitmapBuilder::with_capacity(chunk.capacity()))
            .take(num_outputs)
            .collect_vec();
        let mut last_update_delete_row_idx = None;
        let mut new_ops: Vec<Op> = Vec::with_capacity(chunk.capacity());

        for (row_idx, ((vnode, &op), visible)) in vnodes
            .iter()
            .copied()
            .zip_eq_fast(chunk.ops())
            .zip_eq_fast(chunk.visibility().iter())
            .enumerate()
        {
            // Build visibility map for every output chunk.
            for (output, vis_map) in self.outputs.iter().zip_eq_fast(vis_maps.iter_mut()) {
                vis_map.append(visible && self.hash_mapping[vnode.to_index()] == output.actor_id());
            }

            if !visible {
                new_ops.push(op);
                continue;
            }

            // The `Update` message, denoted by an `UpdateDelete` and a successive `UpdateInsert`,
            // needs to be rewritten to plain `Delete` and `Insert` if the distribution key
            // columns are changed, since the distribution key will eventually be part of the
            // stream key of the downstream executor, and there's an invariant that the stream
            // key must be the same for the rows within an `Update` pair.
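            //
            // For example, with distribution key `[0]`, the pair `U- (1, x)` / `U+ (2, x)` may
            // be routed to two different downstream actors, so it is emitted as the plain pair
            // `- (1, x)` / `+ (2, x)` instead.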
            if op == Op::UpdateDelete {
                last_update_delete_row_idx = Some(row_idx);
            } else if op == Op::UpdateInsert {
                let delete_row_idx = last_update_delete_row_idx
                    .take()
                    .expect("missing U- before U+");

                // Check if any distribution key column value changed
                let dist_key_changed = chunk.row_at(delete_row_idx).1.project(&self.keys)
                    != chunk.row_at(row_idx).1.project(&self.keys);

                if dist_key_changed {
                    new_ops.push(Op::Delete);
                    new_ops.push(Op::Insert);
                } else {
                    new_ops.push(Op::UpdateDelete);
                    new_ops.push(Op::UpdateInsert);
                }
            } else {
                new_ops.push(op);
            }
        }
        assert!(last_update_delete_row_idx.is_none(), "missing U+ after U-");

        let ops = new_ops;
        // Apply the output mapping after calculating the vnodes and new visibility maps.
        // The output mapping may project columns and eliminate noop updates.
        let chunk = self.output_mapping.apply(chunk);
        // Get the visibility after noop update elimination to incorporate into the final visibility.
        let chunk_vis = chunk.visibility();

        // Send each output its own `StreamChunk`, combined with its visibility map.
        futures::future::try_join_all(
            vis_maps
                .into_iter()
                .zip_eq_fast(self.outputs.iter_mut())
                .map(|(vis_map, output)| async {
                    let vis_map = vis_map.finish();
                    // Combine hash routing visibility with noop update elimination visibility.
                    // A row is visible only if it passes BOTH:
                    // 1. Hash routing (the row should go to this output)
                    // 2. Noop update elimination (the row was not eliminated as a noop)
                    let combined_vis = &vis_map & chunk_vis;
                    let new_stream_chunk = StreamChunk::with_visibility(
                        ops.clone(),
                        chunk.columns().into(),
                        combined_vis,
                    );
                    if new_stream_chunk.cardinality() > 0 {
                        event!(
                            tracing::Level::TRACE,
                            msg = "chunk",
                            downstream = %output.actor_id(),
                            "send = \n{:#?}",
                            new_stream_chunk
                        );
                        output
                            .send(DispatcherMessageBatch::Chunk(new_stream_chunk))
                            .await?;
                    }
                    StreamResult::Ok(())
                }),
        )
        .await?;

        Ok(())
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
            .count();
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}

/// `BroadcastDispatcher` dispatches messages to all outputs.
#[derive(Debug)]
pub struct BroadcastDispatcher {
    outputs: HashMap<ActorId, Output>,
    output_mapping: DispatchOutputMapping,
    dispatcher_id: DispatcherId,
}

impl BroadcastDispatcher {
    pub fn new(
        outputs: impl IntoIterator<Item = Output>,
        output_mapping: DispatchOutputMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            outputs: Self::into_pairs(outputs).collect(),
            output_mapping,
            dispatcher_id,
        }
    }

    fn into_pairs(
        outputs: impl IntoIterator<Item = Output>,
    ) -> impl Iterator<Item = (ActorId, Output)> {
        outputs
            .into_iter()
            .map(|output| (output.actor_id(), output))
    }
}

impl Dispatcher for BroadcastDispatcher {
    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let chunk = self.output_mapping.apply(chunk);
        broadcast_concurrent(
            self.outputs.values_mut(),
            DispatcherMessageBatch::Chunk(chunk),
        )
        .await
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // always broadcast barrier
        broadcast_concurrent(
            self.outputs.values_mut(),
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            // always broadcast watermark
            broadcast_concurrent(
                self.outputs.values_mut(),
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(Self::into_pairs(outputs));
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(|actor_id, _| actor_ids.contains(actor_id))
            .count();
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}

/// `SimpleDispatcher` dispatches messages to a single output.
#[derive(Debug)]
pub struct SimpleDispatcher {
    /// In most cases, there is exactly one output. However, during some configuration changes,
    /// this field may temporarily hold zero or two outputs.
    ///
    /// - When dropping a materialized view, the output will be removed and this field becomes
    ///   empty. The [`DispatchExecutor`] will immediately clean up this empty dispatcher before
    ///   finishing processing the current mutation.
    /// - When migrating a singleton fragment, the new output will be temporarily added in the
    ///   `pre` stage and this field holds two outputs, so that the configuration change barrier
    ///   is broadcast to both the old and the new downstream actors. In the `post` stage, the
    ///   old output is removed and this field becomes a single output again.
    ///
    /// Therefore, when dispatching data, we assert that there's exactly one output by
    /// `Self::output`.
    output: SmallVec<[Output; 2]>,
    output_mapping: DispatchOutputMapping,
    dispatcher_id: DispatcherId,
}

impl SimpleDispatcher {
    pub fn new(
        output: Output,
        output_mapping: DispatchOutputMapping,
        dispatcher_id: DispatcherId,
    ) -> Self {
        Self {
            output: smallvec![output],
            output_mapping,
            dispatcher_id,
        }
    }
}

impl Dispatcher for SimpleDispatcher {
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.output.extend(outputs);
        assert!(self.output.len() <= 2);
    }

    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        // Only barriers are allowed to be dispatched to multiple outputs during migration.
        for output in &mut self.output {
            output
                .send(DispatcherMessageBatch::BarrierBatch(barriers.clone()))
                .await?;
        }
        Ok(())
    }

    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let output = self
            .output
            .iter_mut()
            .exactly_one()
            .expect("expect exactly one output");

        let chunk = self.output_mapping.apply(chunk);
        output.send(DispatcherMessageBatch::Chunk(chunk)).await
    }

    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        let output = self
            .output
            .iter_mut()
            .exactly_one()
            .expect("expect exactly one output");

        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            output
                .send(DispatcherMessageBatch::Watermark(watermark))
                .await?;
        }
        Ok(())
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.output
            .retain(|output| !actor_ids.contains(&output.actor_id()));
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn is_empty(&self) -> bool {
        self.output.is_empty()
    }
}

#[cfg(test)]
mod tests {
    use std::hash::{BuildHasher, Hasher};

    use futures::pin_mut;
    use multimap::MultiMap;
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::array::{Array, ArrayBuilder, I32ArrayBuilder};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::hash_util::Crc32FastBuilder;
    use risingwave_expr::expr::{InputRefExpression, NonStrictExpression};
    use risingwave_pb::stream_plan::{DispatcherType, PbDispatchOutputMapping};
    use tokio::sync::mpsc::unbounded_channel;

    use super::*;
    use crate::executor::exchange::output::Output;
    use crate::executor::exchange::permit::channel_for_test;
    use crate::executor::project::ProjectExecutor;
    use crate::executor::receiver::ReceiverExecutor;
    use crate::executor::test_utils::{MockSource, StreamExecutorTestExt};
    use crate::executor::{ActorContext, BarrierInner as Barrier, MessageInner as Message};
    use crate::task::barrier_test_utils::LocalBarrierTestEnv;

    #[tokio::test]
    async fn test_hash_dispatcher_complex() {
        // This test only works when vnode count is 256.
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let num_outputs = 2; // actor id ranges from 1 to 2
        let key_indices = &[0, 2];
        let (output_tx_vecs, mut output_rx_vecs): (Vec<_>, Vec<_>) =
            (0..num_outputs).map(|_| channel_for_test()).collect();
        let outputs = output_tx_vecs
            .into_iter()
            .enumerate()
            .map(|(actor_id, tx)| Output::new(ActorId::new(actor_id as u32 + 1), tx))
            .collect::<Vec<_>>();
        let mut hash_mapping = (1..num_outputs + 1)
            .flat_map(|id| vec![ActorId::new(id as u32); VirtualNode::COUNT_FOR_TEST / num_outputs])
            .collect_vec();
        hash_mapping.resize(
            VirtualNode::COUNT_FOR_TEST,
            ActorId::new(num_outputs as u32),
        );
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            key_indices.to_vec(),
            DispatchOutputMapping::Simple(vec![0, 1, 2]),
            hash_mapping,
            0.into(),
        );

        let chunk = StreamChunk::from_pretty(
            "  I I I
            +  4 6 8
            +  5 7 9
            +  0 0 0
            -  1 1 1 D
            U- 2 0 2
            U+ 2 0 2
            U- 3 3 2
            U+ 3 3 4",
        );
        hash_dispatcher.dispatch_data(chunk).await.unwrap();

        assert_eq!(
            *output_rx_vecs[0].recv().await.unwrap().as_chunk().unwrap(),
            StreamChunk::from_pretty(
                "  I I I
                +  4 6 8
                +  5 7 9
                +  0 0 0
                -  1 1 1 D
                U- 2 0 2
                U+ 2 0 2
                -  3 3 2 D  // Should rewrite UpdateDelete to Delete
                +  3 3 4    // Should rewrite UpdateInsert to Insert",
            )
        );
        assert_eq!(
            *output_rx_vecs[1].recv().await.unwrap().as_chunk().unwrap(),
            StreamChunk::from_pretty(
                "  I I I
                +  4 6 8 D
                +  5 7 9 D
                +  0 0 0 D
                -  1 1 1 D  // Should keep original invisible mark
                U- 2 0 2 D  // Should keep UpdateDelete
                U+ 2 0 2 D  // Should keep UpdateInsert
                -  3 3 2    // Should rewrite UpdateDelete to Delete
                +  3 3 4 D  // Should rewrite UpdateInsert to Insert",
            )
        );
    }

    #[tokio::test]
    async fn test_configuration_change() {
        let _schema = Schema { fields: vec![] };
        let (tx, rx) = channel_for_test();
        let actor_id = 233.into();
        let barrier_test_env = LocalBarrierTestEnv::for_test().await;

        let (untouched, old, new) = (234.into(), 235.into(), 238.into()); // broadcast downstream actors
        let (old_simple, new_simple) = (114.into(), 514.into()); // simple downstream actors

        // actor_id -> untouched, old, new, old_simple, new_simple

        let broadcast_dispatcher_id = 666.into();
        let broadcast_dispatcher = PbDispatcher {
            r#type: DispatcherType::Broadcast as _,
            dispatcher_id: broadcast_dispatcher_id,
            downstream_actor_id: vec![untouched, old],
            output_mapping: PbDispatchOutputMapping::identical(0).into(), /* dummy length as it's not used */
            ..Default::default()
        };

        let simple_dispatcher_id = 888.into();
        let simple_dispatcher = PbDispatcher {
            r#type: DispatcherType::Simple as _,
            dispatcher_id: simple_dispatcher_id,
            downstream_actor_id: vec![old_simple],
            output_mapping: PbDispatchOutputMapping::identical(0).into(), /* dummy length as it's not used */
            ..Default::default()
        };

        let dispatcher_updates = maplit::hashmap! {
            actor_id => vec![PbDispatcherUpdate {
                actor_id,
                dispatcher_id: broadcast_dispatcher_id,
                added_downstream_actor_id: vec![new],
                removed_downstream_actor_id: vec![old],
                hash_mapping: Default::default(),
            }]
        };
        let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: dispatcher_updates,
                ..Default::default()
            },
        ));
        barrier_test_env.inject_barrier(&b1, [actor_id]);
        barrier_test_env.flush_all_events().await;

        let input = Executor::new(
            Default::default(),
            ReceiverExecutor::for_test(
                actor_id,
                rx,
                barrier_test_env.local_barrier_manager.clone(),
            )
            .boxed(),
        );

        let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
        let mut rxs = [untouched, old, new, old_simple, new_simple]
            .into_iter()
            .map(|id| {
                (id, {
                    let (tx, rx) = channel_for_test();
                    new_output_request_tx
                        .send((id, NewOutputRequest::Local(tx)))
                        .unwrap();
                    rx
                })
            })
            .collect::<HashMap<_, _>>();
        let executor = Box::new(
            DispatchExecutor::new(
                input,
                new_output_request_rx,
                vec![broadcast_dispatcher, simple_dispatcher],
                &ActorContext::for_test(actor_id),
            )
            .await
            .unwrap(),
        )
        .execute();

        pin_mut!(executor);

        macro_rules! try_recv {
            ($down_id:expr) => {
                rxs.get_mut(&$down_id).unwrap().try_recv()
            };
        }

        // 3. Send a chunk.
        tx.send(Message::Chunk(StreamChunk::default()).into())
            .await
            .unwrap();

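        // 4. Send the configuration change barrier for the broadcast dispatcher.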
1430        tx.send(Message::Barrier(b1.clone().into_dispatcher()).into())
1431            .await
1432            .unwrap();
1433        executor.next().await.unwrap().unwrap();
1434
1435        // 5. Check downstream.
1436        try_recv!(untouched).unwrap().as_chunk().unwrap();
1437        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();
1438
1439        try_recv!(old).unwrap().as_chunk().unwrap();
1440        try_recv!(old).unwrap().as_barrier_batch().unwrap(); // It should still receive the barrier even if it's to be removed.
1441
1442        try_recv!(new).unwrap().as_barrier_batch().unwrap(); // Since it's just added, it won't receive the chunk.
1443
1444        try_recv!(old_simple).unwrap().as_chunk().unwrap();
1445        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // Untouched.
1446
1447        // 6. Send another barrier.
1448        let b2 = Barrier::new_test_barrier(test_epoch(2));
1449        barrier_test_env.inject_barrier(&b2, [actor_id]);
1450        tx.send(Message::Barrier(b2.into_dispatcher()).into())
1451            .await
1452            .unwrap();
1453        executor.next().await.unwrap().unwrap();
1454
1455        // 7. Check downstream.
1456        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();
1457        try_recv!(old).unwrap_err(); // Removed by the update, so it receives no further messages.
1458        try_recv!(new).unwrap().as_barrier_batch().unwrap();
1459
1460        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // Untouched.
1461        try_recv!(new_simple).unwrap_err(); // Not yet added as a downstream, so it receives nothing.
1462
1463        // 8. Send another chunk.
1464        tx.send(Message::Chunk(StreamChunk::default()).into())
1465            .await
1466            .unwrap();
1467
1468        // 9. Send a configuration change barrier for the simple dispatcher.
1469        let dispatcher_updates = maplit::hashmap! {
1470            actor_id => vec![PbDispatcherUpdate {
1471                actor_id,
1472                dispatcher_id: simple_dispatcher_id,
1473                added_downstream_actor_id: vec![new_simple],
1474                removed_downstream_actor_id: vec![old_simple],
1475                hash_mapping: Default::default(),
1476            }]
1477        };
1478        let b3 = Barrier::new_test_barrier(test_epoch(3)).with_mutation(Mutation::Update(
1479            UpdateMutation {
1480                dispatchers: dispatcher_updates,
1481                ..Default::default()
1482            },
1483        ));
1484        barrier_test_env.inject_barrier(&b3, [actor_id]);
1485        tx.send(Message::Barrier(b3.into_dispatcher()).into())
1486            .await
1487            .unwrap();
1488        executor.next().await.unwrap().unwrap();
1489
1490        // 10. Check downstream.
1491        try_recv!(old_simple).unwrap().as_chunk().unwrap();
1492        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // It should still receive the barrier even if it's to be removed.
1493
1494        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap(); // Since it's just added, it won't receive the chunk.
1495
1496        // 11. Send another barrier.
1497        let b4 = Barrier::new_test_barrier(test_epoch(4));
1498        barrier_test_env.inject_barrier(&b4, [actor_id]);
1499        tx.send(Message::Barrier(b4.into_dispatcher()).into())
1500            .await
1501            .unwrap();
1502        executor.next().await.unwrap().unwrap();
1503
1504        // 12. Check downstream.
1505        try_recv!(old_simple).unwrap_err(); // Removed by the update, so it receives no further messages.
1506        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap();
1507    }
1508
1509    #[tokio::test]
1510    async fn test_hash_dispatcher() {
1511        // This test only works when vnode count is 256.
1512        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);
1513
1514        let num_outputs = 5; // actor ids range from 1 to 5
1515        let cardinality = 10;
1516        let dimension = 4;
1517        let key_indices = &[0, 2];
1518        let (output_tx_vecs, output_rx_vecs): (Vec<_>, Vec<_>) =
1519            (0..num_outputs).map(|_| channel_for_test()).collect();
1520        let outputs = output_tx_vecs
1521            .into_iter()
1522            .enumerate()
1523            .map(|(actor_id, tx)| Output::new(ActorId::new(1 + actor_id as u32), tx))
1524            .collect::<Vec<_>>();
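        // Build the vnode-to-actor mapping: each actor owns an equal slice of
        // the vnodes, and `resize` assigns the remainder (256 % 5) to the last.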
1525        let mut hash_mapping = (1..num_outputs + 1)
1526            .flat_map(|id| vec![ActorId::new(id as _); VirtualNode::COUNT_FOR_TEST / num_outputs])
1527            .collect_vec();
1528        hash_mapping.resize(
1529            VirtualNode::COUNT_FOR_TEST,
1530            ActorId::new(num_outputs as u32),
1531        );
1532        let mut hash_dispatcher = HashDataDispatcher::new(
1533            outputs,
1534            key_indices.to_vec(),
1535            DispatchOutputMapping::Simple((0..dimension).collect()),
1536            hash_mapping.clone(),
1537            0.into(),
1538        );
1539
1540        let mut ops = Vec::new();
1541        for idx in 0..cardinality {
1542            if idx % 2 == 0 {
1543                ops.push(Op::Insert);
1544            } else {
1545                ops.push(Op::Delete);
1546            }
1547        }
1548
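        // Arbitrary starting value; each row gets distinct cell values so the
        // keys spread across outputs.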
1549        let mut start = 19260817i32..;
1550        let mut builders = (0..dimension)
1551            .map(|_| I32ArrayBuilder::new(cardinality))
1552            .collect_vec();
1553        let mut output_cols = vec![vec![vec![]; dimension]; num_outputs];
1554        let mut output_ops = vec![vec![]; num_outputs];
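        // Compute the expected routing by mirroring the dispatcher: hash the
        // key columns with CRC32, map the hash to a vnode, then to an actor.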
1555        for op in &ops {
1556            let hash_builder = Crc32FastBuilder;
1557            let mut hasher = hash_builder.build_hasher();
1558            let one_row = (0..dimension).map(|_| start.next().unwrap()).collect_vec();
1559            for key_idx in key_indices {
1560                let val = one_row[*key_idx];
1561                let bytes = val.to_le_bytes();
1562                hasher.update(&bytes);
1563            }
1564            let output_idx = hash_mapping[hasher.finish() as usize % VirtualNode::COUNT_FOR_TEST]
1565                .as_raw_id() as usize
1566                - 1;
1567            for (builder, val) in builders.iter_mut().zip_eq_fast(one_row.iter()) {
1568                builder.append(Some(*val));
1569            }
1570            output_cols[output_idx]
1571                .iter_mut()
1572                .zip_eq_fast(one_row.iter())
1573                .for_each(|(each_column, val)| each_column.push(*val));
1574            output_ops[output_idx].push(op);
1575        }
1576
1577        let columns = builders
1578            .into_iter()
1579            .map(|builder| {
1580                let array = builder.finish();
1581                array.into_ref()
1582            })
1583            .collect();
1584
1585        let chunk = StreamChunk::new(ops, columns);
1586        hash_dispatcher.dispatch_data(chunk).await.unwrap();
1587
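        // Each output should have received at most one chunk, whose visible
        // rows must match the expectation computed above.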
1588        for (output_idx, mut rx) in output_rx_vecs.into_iter().enumerate() {
1589            let mut output = vec![];
1590            while let Some(Some(msg)) = rx.recv().now_or_never() {
1591                output.push(msg);
1592            }
1593            // It is possible that an output receives no chunk at all, since no key may hash to it.
1594            assert!(output.len() <= 1);
1595            if output.is_empty() {
1596            assert!(output_cols[output_idx].iter().all(|x| x.is_empty()));
1597            } else {
1598                let message = output.first().unwrap();
1599                let real_chunk = match message {
1600                    DispatcherMessageBatch::Chunk(chunk) => chunk,
1601                    _ => panic!(),
1602                };
1603                real_chunk
1604                    .columns()
1605                    .iter()
1606                    .zip_eq_fast(output_cols[output_idx].iter())
1607                    .for_each(|(real_col, expect_col)| {
1608                        let real_vals = real_chunk
1609                            .visibility()
1610                            .iter_ones()
1611                            .map(|row_idx| real_col.as_int32().value_at(row_idx).unwrap())
1612                            .collect::<Vec<_>>();
1613                        assert_eq!(real_vals.len(), expect_col.len());
1614                        assert_eq!(real_vals, *expect_col);
1615                    });
1616            }
1617        }
1618    }
1619
1620    #[tokio::test]
1621    async fn test_hash_dispatcher_non_adjacent_update_after_project() {
1622        // This test only works when vnode count is 256.
1623        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);
1624
1625        let schema = Schema {
1626            fields: vec![
1627                Field::unnamed(DataType::Int64),
1628                Field::unnamed(DataType::Int64),
1629            ],
1630        };
1631        let (mut tx, source) = MockSource::channel();
1632        let source = source.into_executor(schema, vec![0]);
1633
1634        let proj = ProjectExecutor::new(
1635            ActorContext::for_test(123),
1636            source,
1637            vec![
1638                NonStrictExpression::for_test(InputRefExpression::new(DataType::Int64, 0)),
1639                NonStrictExpression::for_test(InputRefExpression::new(DataType::Int64, 1)),
1640            ],
1641            MultiMap::new(),
1642            vec![],
1643            true,
1644        );
1645        let mut proj = proj.boxed().execute();
1646
1647        tx.push_barrier(test_epoch(1), false);
1648        proj.expect_barrier().await;
1649
1650        tx.push_chunk(StreamChunk::from_pretty(
1651            "  I I
1652            U- 1 1
1653            U+ 1 2
1654            U- 1 2
1655            U+ 1 3",
1656        ));
1657
1658        let projected = proj.expect_chunk().await;
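        // Rows marked `D` are hidden by the visibility bitmap: the middle
        // `U+ 1 2` / `U- 1 2` pair cancels out, leaving a `-` / `+` pair that
        // is no longer adjacent in the chunk.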
1659        assert_eq!(
1660            projected,
1661            StreamChunk::from_pretty(
1662                "  I I
1663                - 1 1
1664                U+ 1 2 D
1665                U- 1 2 D
1666                + 1 3",
1667            )
1668        );
1669
1670        let (output_tx, _output_rx) = channel_for_test();
1671        let outputs = vec![Output::new(ActorId::new(1), output_tx)];
1672        let hash_mapping = vec![ActorId::new(1); VirtualNode::COUNT_FOR_TEST];
1673        let mut hash_dispatcher = HashDataDispatcher::new(
1674            outputs,
1675            vec![0],
1676            DispatchOutputMapping::Simple(vec![0]),
1677            hash_mapping,
1678            0.into(),
1679        );
1680
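        // The visible `-` / `+` halves are non-adjacent; dispatching must
        // still succeed (the `unwrap` below would panic otherwise).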
1681        hash_dispatcher.dispatch_data(projected).await.unwrap();
1682    }
1683
1684    #[tokio::test]
1685    async fn test_hash_dispatcher_missing_update_delete_after_project() {
1686        let schema = Schema {
1687            fields: vec![
1688                Field::unnamed(DataType::Int64),
1689                Field::unnamed(DataType::Int64),
1690            ],
1691        };
1692        let (mut tx, source) = MockSource::channel();
1693        let source = source.into_executor(schema, vec![0]);
1694
1695        let proj = ProjectExecutor::new(
1696            ActorContext::for_test(123),
1697            source,
1698            vec![NonStrictExpression::for_test(InputRefExpression::new(
1699                DataType::Int64,
1700                0,
1701            ))],
1702            MultiMap::new(),
1703            vec![],
1704            true,
1705        );
1706        let mut proj = proj.boxed().execute();
1707
1708        tx.push_barrier(test_epoch(1), false);
1709        proj.expect_barrier().await;
1710
1711        tx.push_chunk(StreamChunk::from_pretty(
1712            "  I I
1713            + 1 10
1714            U- 1 10
1715            U+ 1 30",
1716        ));
1717
1718        let projected = proj.expect_chunk().await;
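        // With column 1 projected away, `+ 1 10` and `U- 1 10` collapse into a
        // hidden no-op pair, and only the `U+` survives as a plain `+`.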
1719        assert_eq!(
1720            projected,
1721            StreamChunk::from_pretty(
1722                "  I
1723                + 1 D
1724                U- 1 D
1725                + 1",
1726            )
1727        );
1728
1729        let (output_tx, _output_rx) = channel_for_test();
1730        let outputs = vec![Output::new(ActorId::new(1), output_tx)];
1731        let hash_mapping = vec![ActorId::new(1); VirtualNode::COUNT_FOR_TEST];
1732        let mut hash_dispatcher = HashDataDispatcher::new(
1733            outputs,
1734            vec![0],
1735            DispatchOutputMapping::Simple(vec![0]),
1736            hash_mapping,
1737            0.into(),
1738        );
1739
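        // The update-delete half is hidden by visibility; dispatching must
        // still succeed.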
1740        hash_dispatcher.dispatch_data(projected).await.unwrap();
1741    }
1742}