risingwave_stream/executor/
dispatch.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::iter::repeat_with;
17use std::ops::{Deref, DerefMut};
18use std::time::Duration;
19
20use anyhow::anyhow;
21use futures::FutureExt;
22use itertools::Itertools;
23use risingwave_common::array::Op;
24use risingwave_common::bitmap::BitmapBuilder;
25use risingwave_common::config::StreamingConfig;
26use risingwave_common::hash::{ActorMapping, ExpandedActorMapping, VirtualNode};
27use risingwave_common::metrics::LabelGuardedIntCounter;
28use risingwave_common::row::RowExt;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
31use risingwave_pb::stream_plan::{self, PbDispatcher};
32use smallvec::{SmallVec, smallvec};
33use thiserror_ext::AsReport;
34use tokio::sync::mpsc::UnboundedReceiver;
35use tokio::task::consume_budget;
36use tokio::time::Instant;
37use tokio_stream::StreamExt;
38use tokio_stream::adapters::Peekable;
39use tracing::{Instrument, event};
40
41use super::exchange::output::Output;
42use super::{
43    AddMutation, DispatcherBarriers, DispatcherMessageBatch, MessageBatch, TroublemakerExecutor,
44    UpdateMutation,
45};
46use crate::executor::prelude::*;
47use crate::executor::{StopMutation, StreamConsumer};
48use crate::task::{DispatcherId, NewOutputRequest};
49
50mod dispatch_sync_log_store;
51mod output_mapping;
52
53pub use dispatch_sync_log_store::SyncLogStoreDispatchExecutor;
54pub use output_mapping::DispatchOutputMapping;
55use risingwave_common::id::FragmentId;
56
57use crate::error::StreamError;
58
/// [`DispatchExecutor`] consumes messages and sends them to downstream actors. Usually,
/// data chunks are dispatched according to some specified policy, while control messages
/// such as barriers are distributed to all receivers.
pub struct DispatchExecutor {
    /// The upstream executor whose messages are dispatched.
    input: Executor,
    /// The dispatchers, plus the state needed to create and update them.
    inner: DispatchExecutorInner,
}
66
/// A [`DispatcherImpl`] paired with the counter that accumulates the time spent awaiting its
/// dispatch futures.
struct DispatcherWithMetrics {
    /// The wrapped dispatcher; also reachable through `Deref`/`DerefMut`.
    dispatcher: DispatcherImpl,
    /// Nanoseconds spent waiting for this dispatcher to finish dispatching messages.
    pub actor_output_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}
71
72impl Debug for DispatcherWithMetrics {
73    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
74        self.dispatcher.fmt(f)
75    }
76}
77
78impl Deref for DispatcherWithMetrics {
79    type Target = DispatcherImpl;
80
81    fn deref(&self) -> &Self::Target {
82        &self.dispatcher
83    }
84}
85
86impl DerefMut for DispatcherWithMetrics {
87    fn deref_mut(&mut self) -> &mut Self::Target {
88        &mut self.dispatcher
89    }
90}
91
/// Metric handles and label values shared by all dispatchers of one [`DispatchExecutor`].
struct DispatchExecutorMetrics {
    /// The actor id rendered as a string, used as a metric label value.
    actor_id_str: String,
    /// The fragment id rendered as a string, used as a metric label value.
    fragment_id_str: String,
    /// The streaming metrics from which per-dispatcher counters are derived.
    metrics: Arc<StreamingMetrics>,
    /// Incremented by the cardinality of every dispatched data chunk.
    actor_out_record_cnt: LabelGuardedIntCounter,
}
98
99impl DispatchExecutorMetrics {
100    fn monitor_dispatcher(&self, dispatcher: DispatcherImpl) -> DispatcherWithMetrics {
101        DispatcherWithMetrics {
102            actor_output_buffer_blocking_duration_ns: self
103                .metrics
104                .actor_output_buffer_blocking_duration_ns
105                .with_guarded_label_values(&[
106                    self.actor_id_str.as_str(),
107                    self.fragment_id_str.as_str(),
108                    dispatcher.dispatcher_id().to_string().as_str(),
109                ]),
110            dispatcher,
111        }
112    }
113}
114
/// The dispatching state of a [`DispatchExecutor`]: its dispatchers and everything needed to
/// add, update and remove them when barriers carry mutations.
pub struct DispatchExecutorInner {
    /// The active dispatchers, each with its own blocking-duration counter.
    dispatchers: Vec<DispatcherWithMetrics>,
    /// The id of the actor this executor belongs to; used to look up the mutations that
    /// concern this actor.
    actor_id: ActorId,
    /// Streaming configuration; supplies the dispatcher concurrency limit, the barrier batch
    /// size and the chunk size.
    actor_config: Arc<StreamingConfig>,
    /// Metric handles and label values for this executor.
    metrics: DispatchExecutorMetrics,
    /// Channel delivering new output requests, keyed by the downstream actor id.
    new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    /// Requests received while waiting for a different downstream actor, kept until that
    /// actor's output is collected.
    pending_new_output_requests: HashMap<ActorId, NewOutputRequest>,
}
123
124impl DispatchExecutorInner {
125    async fn collect_outputs(
126        &mut self,
127        downstream_actors: &[ActorId],
128    ) -> StreamResult<Vec<Output>> {
129        fn resolve_output(downstream_actor: ActorId, request: NewOutputRequest) -> Output {
130            let tx = match request {
131                NewOutputRequest::Local(tx) | NewOutputRequest::Remote(tx) => tx,
132            };
133            Output::new(downstream_actor, tx)
134        }
135        let mut outputs = Vec::with_capacity(downstream_actors.len());
136        for &downstream_actor in downstream_actors {
137            let output =
138                if let Some(request) = self.pending_new_output_requests.remove(&downstream_actor) {
139                    resolve_output(downstream_actor, request)
140                } else {
141                    loop {
142                        let (requested_actor, request) = self
143                            .new_output_request_rx
144                            .recv()
145                            .await
146                            .ok_or_else(|| anyhow!("end of new output request"))?;
147                        if requested_actor == downstream_actor {
148                            break resolve_output(requested_actor, request);
149                        } else {
150                            assert!(
151                                self.pending_new_output_requests
152                                    .insert(requested_actor, request)
153                                    .is_none(),
154                                "duplicated inflight new output requests from actor {}",
155                                requested_actor
156                            );
157                        }
158                    }
159                };
160            outputs.push(output);
161        }
162        Ok(outputs)
163    }
164
165    async fn dispatch(&mut self, msg: MessageBatch) -> StreamResult<()> {
166        if self.dispatchers.is_empty() {
167            // Our dispatcher internal implementation calls tokio method, which already involves cooperative scheduling
168            // with tokio runtime.
169            //
170            // When there is no dispatcher, we should do the cooperative scheduling together.
171            consume_budget().await;
172        }
173        async fn await_with_metrics(
174            future: impl Future<Output = ()>,
175            metrics: &LabelGuardedIntCounter,
176        ) {
177            let mut start_time = Instant::now();
178            let interval_duration = Duration::from_secs(15);
179            let mut interval =
180                tokio::time::interval_at(start_time + interval_duration, interval_duration);
181
182            let mut fut = pin!(future);
183
184            loop {
185                tokio::select! {
186                    biased;
187                    _res = &mut fut => {
188                        let ns = start_time.elapsed().as_nanos() as u64;
189                        metrics.inc_by(ns);
190                        break;
191                    }
192                    _ = interval.tick() => {
193                        start_time = Instant::now();
194                        metrics.inc_by(interval_duration.as_nanos() as u64);
195                    }
196                };
197            }
198        }
199
200        use futures::StreamExt;
201
202        let limit = self.actor_config.developer.exchange_concurrent_dispatchers;
203        // Only barrier can be batched for now.
204        match msg {
205            MessageBatch::BarrierBatch(barrier_batch) => {
206                if barrier_batch.is_empty() {
207                    return Ok(());
208                }
209                // Only the first barrier in a batch can be mutation.
210                let mutation = barrier_batch[0].mutation.clone();
211                self.pre_mutate_dispatchers(&mutation).await?;
212                futures::stream::iter(self.dispatchers.iter_mut())
213                    .for_each_concurrent(limit, |dispatcher| async {
214                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
215                        let dispatcher_output = &mut dispatcher.dispatcher;
216                        let fut = dispatcher_output.dispatch_barriers(
217                            barrier_batch
218                                .iter()
219                                .cloned()
220                                .map(|b| b.into_dispatcher())
221                                .collect(),
222                        );
223                        await_with_metrics(fut, metrics).await;
224                    })
225                    .await;
226                self.post_mutate_dispatchers(&mutation)?;
227            }
228            MessageBatch::Watermark(watermark) => {
229                futures::stream::iter(self.dispatchers.iter_mut())
230                    .for_each_concurrent(limit, |dispatcher| async {
231                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
232                        let dispatcher_output = &mut dispatcher.dispatcher;
233                        let fut = dispatcher_output.dispatch_watermark(watermark.clone());
234                        await_with_metrics(fut, metrics).await;
235                    })
236                    .await;
237            }
238            MessageBatch::Chunk(chunk) => {
239                futures::stream::iter(self.dispatchers.iter_mut())
240                    .for_each_concurrent(limit, |dispatcher| async {
241                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
242                        let dispatcher_output = &mut dispatcher.dispatcher;
243                        let fut = dispatcher_output.dispatch_data(chunk.clone());
244                        await_with_metrics(fut, metrics).await;
245                    })
246                    .await;
247
248                self.metrics
249                    .actor_out_record_cnt
250                    .inc_by(chunk.cardinality() as _);
251            }
252        }
253        self.dispatchers.retain(|dispatcher| match &**dispatcher {
254            DispatcherImpl::Failed(dispatcher_id, e) => {
255                warn!(%dispatcher_id, err = %e.as_report(), actor_id = %self.actor_id, "dispatch fail");
256                false
257            }
258            _ => true,
259        });
260        Ok(())
261    }
262
263    /// Add new dispatchers to the executor. Will check whether their ids are unique.
264    async fn add_dispatchers<'a>(
265        &mut self,
266        new_dispatchers: impl IntoIterator<Item = &'a PbDispatcher>,
267    ) -> StreamResult<()> {
268        for dispatcher in new_dispatchers {
269            let outputs = self
270                .collect_outputs(&dispatcher.downstream_actor_id)
271                .await?;
272            let dispatcher = DispatcherImpl::new(outputs, dispatcher)?;
273            let dispatcher = self.metrics.monitor_dispatcher(dispatcher);
274            self.dispatchers.push(dispatcher);
275        }
276
277        assert!(
278            self.dispatchers
279                .iter()
280                .map(|d| d.dispatcher_id())
281                .all_unique(),
282            "dispatcher ids must be unique: {:?}",
283            self.dispatchers
284        );
285
286        Ok(())
287    }
288
289    fn find_dispatcher(&mut self, dispatcher_id: DispatcherId) -> Option<&mut DispatcherImpl> {
290        self.dispatchers
291            .iter_mut()
292            .find(|d| d.dispatcher_id() == dispatcher_id)
293            .map(DerefMut::deref_mut)
294    }
295
296    /// Update the dispatcher BEFORE we actually dispatch this barrier. We'll only add the new
297    /// outputs.
298    async fn pre_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
299        let outputs = self
300            .collect_outputs(&update.added_downstream_actor_id)
301            .await?;
302
303        let Some(dispatcher) = self.find_dispatcher(update.dispatcher_id) else {
304            warn!(dispatcher_id = %update.dispatcher_id, added = ?update.added_downstream_actor_id, removed = ?update.removed_downstream_actor_id, actor_id = %self.actor_id, "ignore dispatcher update");
305            return Ok(());
306        };
307        dispatcher.add_outputs(outputs);
308
309        Ok(())
310    }
311
312    /// Update the dispatcher AFTER we dispatch this barrier. We'll remove some outputs and finally
313    /// update the hash mapping.
314    fn post_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
315        let ids = update.removed_downstream_actor_id.iter().copied().collect();
316
317        let Some(dispatcher) = self.find_dispatcher(update.dispatcher_id) else {
318            warn!(dispatcher_id = %update.dispatcher_id, added = ?update.added_downstream_actor_id, removed = ?update.removed_downstream_actor_id, actor_id = %self.actor_id, "ignore dispatcher update");
319            return Ok(());
320        };
321        dispatcher.remove_outputs(&ids);
322
323        // The hash mapping is only used by the hash dispatcher.
324        //
325        // We specify a single upstream hash mapping for scaling the downstream fragment. However,
326        // it's possible that there're multiple upstreams with different exchange types, for
327        // example, the `Broadcast` inner side of the dynamic filter. There're too many combinations
328        // to handle here, so we just ignore the `hash_mapping` field for any other exchange types.
329        if let DispatcherImpl::Hash(dispatcher) = dispatcher {
330            dispatcher.hash_mapping =
331                ActorMapping::from_protobuf(update.get_hash_mapping()?).to_expanded();
332        }
333
334        Ok(())
335    }
336
337    /// For `Add` and `Update`, update the dispatchers before we dispatch the barrier.
338    async fn pre_mutate_dispatchers(
339        &mut self,
340        mutation: &Option<Arc<Mutation>>,
341    ) -> StreamResult<()> {
342        let Some(mutation) = mutation.as_deref() else {
343            return Ok(());
344        };
345
346        match mutation {
347            Mutation::Add(AddMutation { adds, .. }) => {
348                if let Some(new_dispatchers) = adds.get(&self.actor_id) {
349                    self.add_dispatchers(new_dispatchers).await?;
350                }
351            }
352            Mutation::Update(UpdateMutation {
353                dispatchers,
354                actor_new_dispatchers: actor_dispatchers,
355                ..
356            }) => {
357                if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
358                    self.add_dispatchers(new_dispatchers).await?;
359                }
360
361                if let Some(updates) = dispatchers.get(&self.actor_id) {
362                    for update in updates {
363                        self.pre_update_dispatcher(update).await?;
364                    }
365                }
366            }
367            _ => {}
368        }
369
370        Ok(())
371    }
372
373    /// For `Stop` and `Update`, update the dispatchers after we dispatch the barrier.
374    fn post_mutate_dispatchers(&mut self, mutation: &Option<Arc<Mutation>>) -> StreamResult<()> {
375        let Some(mutation) = mutation.as_deref() else {
376            return Ok(());
377        };
378
379        match mutation {
380            Mutation::Stop(StopMutation { dropped_actors, .. }) => {
381                // Remove outputs only if this actor itself is not to be stopped.
382                if !dropped_actors.contains(&self.actor_id) {
383                    for dispatcher in &mut self.dispatchers {
384                        dispatcher.remove_outputs(dropped_actors);
385                    }
386                }
387            }
388            Mutation::Update(UpdateMutation {
389                dispatchers,
390                dropped_actors,
391                ..
392            }) => {
393                if let Some(updates) = dispatchers.get(&self.actor_id) {
394                    for update in updates {
395                        self.post_update_dispatcher(update)?;
396                    }
397                }
398
399                if !dropped_actors.contains(&self.actor_id) {
400                    for dispatcher in &mut self.dispatchers {
401                        dispatcher.remove_outputs(dropped_actors);
402                    }
403                }
404            }
405            _ => {}
406        };
407
408        // After stopping the downstream mview, the outputs of some dispatcher might be empty and we
409        // should clean up them.
410        self.dispatchers.retain(|d| !d.is_empty());
411
412        Ok(())
413    }
414}
415
416impl DispatchExecutor {
417    pub(crate) async fn new(
418        input: Executor,
419        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
420        dispatchers: Vec<stream_plan::Dispatcher>,
421        actor_context: &ActorContextRef,
422    ) -> StreamResult<Self> {
423        let mut executor = Self::new_inner(
424            input,
425            new_output_request_rx,
426            vec![],
427            actor_context.id,
428            actor_context.fragment_id,
429            actor_context.config.clone(),
430            actor_context.streaming_metrics.clone(),
431        );
432        let inner = &mut executor.inner;
433        for dispatcher in dispatchers {
434            let outputs = inner
435                .collect_outputs(&dispatcher.downstream_actor_id)
436                .await?;
437            let dispatcher = DispatcherImpl::new(outputs, &dispatcher)?;
438            let dispatcher = inner.metrics.monitor_dispatcher(dispatcher);
439            inner.dispatchers.push(dispatcher);
440        }
441        Ok(executor)
442    }
443
444    #[cfg(test)]
445    pub(crate) fn for_test(
446        input: Executor,
447        dispatchers: Vec<DispatcherImpl>,
448        actor_id: ActorId,
449        fragment_id: FragmentId,
450        actor_config: Arc<StreamingConfig>,
451        metrics: Arc<StreamingMetrics>,
452    ) -> (
453        Self,
454        tokio::sync::mpsc::UnboundedSender<(ActorId, NewOutputRequest)>,
455    ) {
456        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
457
458        (
459            Self::new_inner(
460                input,
461                rx,
462                dispatchers,
463                actor_id,
464                fragment_id,
465                actor_config,
466                metrics,
467            ),
468            tx,
469        )
470    }
471
472    fn new_inner(
473        mut input: Executor,
474        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
475        dispatchers: Vec<DispatcherImpl>,
476        actor_id: ActorId,
477        fragment_id: FragmentId,
478        actor_config: Arc<StreamingConfig>,
479        metrics: Arc<StreamingMetrics>,
480    ) -> Self {
481        if crate::consistency::insane() {
482            // make some trouble before dispatching to avoid generating invalid dist key.
483            let mut info = input.info().clone();
484            info.identity = format!("{} (embedded trouble)", info.identity);
485            let troublemaker = TroublemakerExecutor::new(input, actor_config.developer.chunk_size);
486            input = (info, troublemaker).into();
487        }
488
489        let actor_id_str = actor_id.to_string();
490        let fragment_id_str = fragment_id.to_string();
491        let actor_out_record_cnt = metrics
492            .actor_out_record_cnt
493            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
494        let metrics = DispatchExecutorMetrics {
495            actor_id_str,
496            fragment_id_str,
497            metrics,
498            actor_out_record_cnt,
499        };
500        let dispatchers = dispatchers
501            .into_iter()
502            .map(|dispatcher| metrics.monitor_dispatcher(dispatcher))
503            .collect();
504
505        Self {
506            input,
507            inner: DispatchExecutorInner {
508                dispatchers,
509                actor_id,
510                actor_config,
511                metrics,
512                new_output_request_rx,
513                pending_new_output_requests: Default::default(),
514            },
515        }
516    }
517}
518
519/// Dispatches a message batch downstream and returns the barriers that should be yielded by the
520/// barrier stream.
521async fn dispatch_message_batch(
522    inner: &mut DispatchExecutorInner,
523    batch: MessageBatch,
524) -> StreamResult<Option<Vec<Barrier>>> {
525    match batch {
526        MessageBatch::Chunk(chunk) => {
527            inner
528                .dispatch(MessageBatch::Chunk(chunk))
529                .instrument(tracing::info_span!("dispatch_chunk"))
530                .instrument_await("dispatch_chunk")
531                .await?;
532            Ok(None)
533        }
534        MessageBatch::BarrierBatch(barrier_batch) => {
535            assert!(!barrier_batch.is_empty());
536            let yielded_barriers = barrier_batch.clone();
537            inner
538                .dispatch(MessageBatch::BarrierBatch(barrier_batch))
539                .instrument(tracing::info_span!("dispatch_barrier_batch"))
540                .instrument_await("dispatch_barrier_batch")
541                .await?;
542            inner
543                .metrics
544                .metrics
545                .barrier_batch_size
546                .observe(yielded_barriers.len() as f64);
547            Ok(Some(yielded_barriers))
548        }
549        MessageBatch::Watermark(watermark) => {
550            inner
551                .dispatch(MessageBatch::Watermark(watermark))
552                .instrument(tracing::info_span!("dispatch_watermark"))
553                .instrument_await("dispatch_watermark")
554                .await?;
555            Ok(None)
556        }
557    }
558}
559
560impl StreamConsumer for DispatchExecutor {
561    type BarrierStream = impl Stream<Item = StreamResult<Barrier>> + Send;
562
563    fn execute(mut self: Box<Self>) -> Self::BarrierStream {
564        let max_barrier_count_per_batch = self.inner.actor_config.developer.max_barrier_batch_size;
565        #[try_stream]
566        async move {
567            let mut input = self.input.execute().peekable();
568            loop {
569                let Some(message) =
570                    try_batch_barriers(max_barrier_count_per_batch, &mut input).await?
571                else {
572                    // end_of_stream
573                    break;
574                };
575                if let Some(barrier_batch) =
576                    dispatch_message_batch(&mut self.inner, message).await?
577                {
578                    for barrier in barrier_batch {
579                        yield barrier;
580                    }
581                }
582            }
583        }
584    }
585}
586
587/// Tries to batch up to `max_barrier_count_per_batch` consecutive barriers within a single message batch.
588///
589/// Returns the message batch.
590///
591/// Returns None if end of stream.
592async fn try_batch_barriers(
593    max_barrier_count_per_batch: u32,
594    input: &mut Peekable<BoxedMessageStream>,
595) -> StreamResult<Option<MessageBatch>> {
596    let Some(msg) = input.next().await else {
597        // end_of_stream
598        return Ok(None);
599    };
600    let mut barrier_batch = vec![];
601    let msg: Message = msg?;
602    let max_peek_attempts = match msg {
603        Message::Chunk(c) => {
604            return Ok(Some(MessageBatch::Chunk(c)));
605        }
606        Message::Watermark(w) => {
607            return Ok(Some(MessageBatch::Watermark(w)));
608        }
609        Message::Barrier(b) => {
610            let peek_more_barrier = b.mutation.is_none();
611            barrier_batch.push(b);
612            if peek_more_barrier {
613                max_barrier_count_per_batch.saturating_sub(1)
614            } else {
615                0
616            }
617        }
618    };
619    // Try to peek more consecutive non-mutation barriers.
620    for _ in 0..max_peek_attempts {
621        let peek = input.peek().now_or_never();
622        let Some(peek) = peek else {
623            break;
624        };
625        let Some(msg) = peek else {
626            // end_of_stream
627            break;
628        };
629        let Ok(Message::Barrier(barrier)) = msg else {
630            break;
631        };
632        if barrier.mutation.is_some() {
633            break;
634        }
635        let msg: Message = input.next().now_or_never().unwrap().unwrap()?;
636        let Message::Barrier(ref barrier) = msg else {
637            unreachable!("must be a barrier");
638        };
639        barrier_batch.push(barrier.clone());
640    }
641    Ok(Some(MessageBatch::BarrierBatch(barrier_batch)))
642}
643
/// The concrete dispatcher behind a [`DispatchExecutor`] output, selected by dispatcher type.
#[derive(Debug)]
pub(crate) enum DispatcherImpl {
    Hash(HashDataDispatcher),
    Broadcast(BroadcastDispatcher),
    Simple(SimpleDispatcher),
    #[cfg_attr(not(test), expect(dead_code))]
    RoundRobin(RoundRobinDataDispatcher),
    /// The dispatcher has failed to dispatch to downstream. It carries the id of the failed
    /// dispatcher and the error, and should be cleaned up later.
    Failed(DispatcherId, StreamError),
}
655
656impl DispatcherImpl {
657    pub fn new(outputs: Vec<Output>, dispatcher: &PbDispatcher) -> StreamResult<Self> {
658        let output_mapping =
659            DispatchOutputMapping::from_protobuf(dispatcher.output_mapping.clone().unwrap());
660
661        use risingwave_pb::stream_plan::DispatcherType::*;
662        let dispatcher_impl = match dispatcher.get_type()? {
663            Hash => {
664                assert!(!outputs.is_empty());
665                let dist_key_indices = dispatcher
666                    .dist_key_indices
667                    .iter()
668                    .map(|i| *i as usize)
669                    .collect();
670
671                let hash_mapping =
672                    ActorMapping::from_protobuf(dispatcher.get_hash_mapping()?).to_expanded();
673
674                DispatcherImpl::Hash(HashDataDispatcher::new(
675                    outputs,
676                    dist_key_indices,
677                    output_mapping,
678                    hash_mapping,
679                    dispatcher.dispatcher_id,
680                ))
681            }
682            Broadcast => DispatcherImpl::Broadcast(BroadcastDispatcher::new(
683                outputs,
684                output_mapping,
685                dispatcher.dispatcher_id,
686            )),
687            Simple | NoShuffle => {
688                let [output]: [_; 1] = outputs.try_into().unwrap();
689                DispatcherImpl::Simple(SimpleDispatcher::new(
690                    output,
691                    output_mapping,
692                    dispatcher.dispatcher_id,
693                ))
694            }
695            Unspecified => unreachable!(),
696        };
697
698        Ok(dispatcher_impl)
699    }
700}
701
// Generates the methods of `DispatcherImpl` that forward to the concrete dispatcher held by
// each listed variant. The `Failed` variant is handled explicitly in every method.
macro_rules! impl_dispatcher {
    ($( { $variant_name:ident } ),*) => {
        impl DispatcherImpl {
            // Dispatches a data chunk. On error the dispatcher replaces itself with `Failed`,
            // keeping its id. Must not be called on an already failed dispatcher.
            pub async fn dispatch_data(&mut self, chunk: StreamChunk) {
                if let Err(e) = match self {
                    $( Self::$variant_name(inner) => inner.dispatch_data(chunk).await, )*
                    Self::Failed(..) => unreachable!(),
                } {
                    *self = Self::Failed(self.dispatcher_id(), e);
                }
            }

            // Dispatches barriers. On error the dispatcher replaces itself with `Failed`,
            // keeping its id. Must not be called on an already failed dispatcher.
            pub async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) {
                if let Err(e) = match self {
                    $( Self::$variant_name(inner) => inner.dispatch_barriers(barriers).await, )*
                    Self::Failed(..) => unreachable!(),
                } {
                    *self = Self::Failed(self.dispatcher_id(), e);
                }
            }

            // Dispatches a watermark. On error the dispatcher replaces itself with `Failed`,
            // keeping its id. Must not be called on an already failed dispatcher.
            pub async fn dispatch_watermark(&mut self, watermark: Watermark) {
                if let Err(e) = match self {
                    $( Self::$variant_name(inner) => inner.dispatch_watermark(watermark).await, )*
                    Self::Failed(..) => unreachable!(),
                } {
                    *self = Self::Failed(self.dispatcher_id(), e);
                }
            }

            // Adds outputs to the dispatcher; a no-op for a failed dispatcher.
            pub fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
                match self {
                    $(Self::$variant_name(inner) => inner.add_outputs(outputs), )*
                    Self::Failed(..) => {},
                }
            }

            // Removes the outputs to `actor_ids`; a no-op for a failed dispatcher.
            pub fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
                match self {
                    $(Self::$variant_name(inner) => inner.remove_outputs(actor_ids), )*
                    Self::Failed(..) => {},
                }
            }

            // Returns the dispatcher id; a failed dispatcher still reports its original id.
            pub fn dispatcher_id(&self) -> DispatcherId {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id(), )*
                    Self::Failed(dispatcher_id, ..) => *dispatcher_id,
                }
            }

            // Whether the dispatcher has no outputs. A failed dispatcher always counts as
            // empty, so it is removed by any cleanup of empty dispatchers.
            pub fn is_empty(&self) -> bool {
                match self {
                    $(Self::$variant_name(inner) => inner.is_empty(), )*
                    Self::Failed(..) => true,
                }
            }
        }
    }
}
762
// Invokes `$macro` with the list of `DispatcherImpl` variants that wrap a concrete
// dispatcher, i.e. every variant except `Failed`.
macro_rules! for_all_dispatcher_variants {
    ($macro:ident) => {
        $macro! {
            { Hash },
            { Broadcast },
            { Simple },
            { RoundRobin }
        }
    };
}

// Generate the forwarding methods of `DispatcherImpl` for all of these variants.
for_all_dispatcher_variants! { impl_dispatcher }
775
/// The future returned by the dispatch methods of a [`Dispatcher`]: it resolves to
/// `StreamResult<()>` and is `Send`.
pub trait DispatchFuture<'a> = Future<Output = StreamResult<()>> + Send;
777
/// The interface implemented by a concrete dispatcher: sending data chunks, barriers and
/// watermarks to its outputs, and managing the set of outputs.
trait Dispatcher: Debug + 'static {
    /// Dispatch a data chunk to downstream actors.
    fn dispatch_data(&mut self, chunk: StreamChunk) -> impl DispatchFuture<'_>;
    /// Dispatch barriers to downstream actors, generally by broadcasting them.
    fn dispatch_barriers(&mut self, barrier: DispatcherBarriers) -> impl DispatchFuture<'_>;
    /// Dispatch a watermark to downstream actors, generally by broadcasting it.
    fn dispatch_watermark(&mut self, watermark: Watermark) -> impl DispatchFuture<'_>;

    /// Add new outputs to the dispatcher.
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>);
    /// Remove outputs to `actor_ids` from the dispatcher.
    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>);

    /// The ID of the dispatcher. A [`DispatchExecutor`] may have multiple dispatchers with
    /// different IDs.
    ///
    /// Note that the dispatcher id is always equal to the downstream fragment id.
    /// See also `proto/stream_plan.proto`.
    fn dispatcher_id(&self) -> DispatcherId;

    /// Whether the dispatcher has no outputs. If so, it'll be cleaned up from the
    /// [`DispatchExecutor`].
    fn is_empty(&self) -> bool;
}
802
803/// Concurrently broadcast a message to all outputs.
804///
805/// Note that this does not follow `concurrent_dispatchers` in the config and the concurrency is
806/// always unlimited.
807async fn broadcast_concurrent(
808    outputs: impl IntoIterator<Item = &'_ mut Output>,
809    message: DispatcherMessageBatch,
810) -> StreamResult<()> {
811    futures::future::try_join_all(
812        outputs
813            .into_iter()
814            .map(|output| output.send(message.clone())),
815    )
816    .await?;
817    Ok(())
818}
819
820#[derive(Debug)]
821pub struct RoundRobinDataDispatcher {
822    outputs: Vec<Output>,
823    output_mapping: DispatchOutputMapping,
824    cur: usize,
825    dispatcher_id: DispatcherId,
826}
827
828impl RoundRobinDataDispatcher {
829    #[cfg_attr(not(test), expect(dead_code))]
830    pub fn new(
831        outputs: Vec<Output>,
832        output_mapping: DispatchOutputMapping,
833        dispatcher_id: DispatcherId,
834    ) -> Self {
835        Self {
836            outputs,
837            output_mapping,
838            cur: 0,
839            dispatcher_id,
840        }
841    }
842}
843
844impl Dispatcher for RoundRobinDataDispatcher {
845    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
846        let chunk = self.output_mapping.apply(chunk);
847
848        self.outputs[self.cur]
849            .send(DispatcherMessageBatch::Chunk(chunk))
850            .await?;
851        self.cur += 1;
852        self.cur %= self.outputs.len();
853        Ok(())
854    }
855
856    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
857        // always broadcast barrier
858        broadcast_concurrent(
859            &mut self.outputs,
860            DispatcherMessageBatch::BarrierBatch(barriers),
861        )
862        .await
863    }
864
865    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
866        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
867            // always broadcast watermark
868            broadcast_concurrent(
869                &mut self.outputs,
870                DispatcherMessageBatch::Watermark(watermark),
871            )
872            .await?;
873        }
874        Ok(())
875    }
876
877    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
878        self.outputs.extend(outputs);
879    }
880
881    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
882        self.outputs
883            .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
884            .count();
885        self.cur = self.cur.min(self.outputs.len() - 1);
886    }
887
888    fn dispatcher_id(&self) -> DispatcherId {
889        self.dispatcher_id
890    }
891
892    fn is_empty(&self) -> bool {
893        self.outputs.is_empty()
894    }
895}
896
897pub struct HashDataDispatcher {
898    outputs: Vec<Output>,
899    keys: Vec<usize>,
900    output_mapping: DispatchOutputMapping,
901    /// Mapping from virtual node to actor id, used for hash data dispatcher to dispatch tasks to
902    /// different downstream actors.
903    hash_mapping: ExpandedActorMapping,
904    dispatcher_id: DispatcherId,
905}
906
907impl Debug for HashDataDispatcher {
908    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
909        f.debug_struct("HashDataDispatcher")
910            .field("outputs", &self.outputs)
911            .field("keys", &self.keys)
912            .field("dispatcher_id", &self.dispatcher_id)
913            .finish_non_exhaustive()
914    }
915}
916
917impl HashDataDispatcher {
918    pub fn new(
919        outputs: Vec<Output>,
920        keys: Vec<usize>,
921        output_mapping: DispatchOutputMapping,
922        hash_mapping: ExpandedActorMapping,
923        dispatcher_id: DispatcherId,
924    ) -> Self {
925        Self {
926            outputs,
927            keys,
928            output_mapping,
929            hash_mapping,
930            dispatcher_id,
931        }
932    }
933}
934
935impl Dispatcher for HashDataDispatcher {
936    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
937        self.outputs.extend(outputs);
938    }
939
940    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
941        // always broadcast barrier
942        broadcast_concurrent(
943            &mut self.outputs,
944            DispatcherMessageBatch::BarrierBatch(barriers),
945        )
946        .await
947    }
948
949    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
950        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
951            // always broadcast watermark
952            broadcast_concurrent(
953                &mut self.outputs,
954                DispatcherMessageBatch::Watermark(watermark),
955            )
956            .await?;
957        }
958        Ok(())
959    }
960
961    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
962        // A chunk can be shuffled into multiple output chunks that to be sent to downstreams.
963        // In these output chunks, the only difference are visibility map, which is calculated
964        // by the hash value of each line in the input chunk.
965        let num_outputs = self.outputs.len();
966
967        // get hash value of every line by its key
968        let vnode_count = self.hash_mapping.len();
969        let vnodes = VirtualNode::compute_chunk(chunk.data_chunk(), &self.keys, vnode_count);
970
971        tracing::debug!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty(), self.keys, vnodes);
972
973        let mut vis_maps = repeat_with(|| BitmapBuilder::with_capacity(chunk.capacity()))
974            .take(num_outputs)
975            .collect_vec();
976        let mut last_update_delete_row_idx = None;
977        let mut new_ops: Vec<Op> = Vec::with_capacity(chunk.capacity());
978
979        for (row_idx, ((vnode, &op), visible)) in vnodes
980            .iter()
981            .copied()
982            .zip_eq_fast(chunk.ops())
983            .zip_eq_fast(chunk.visibility().iter())
984            .enumerate()
985        {
986            // Build visibility map for every output chunk.
987            for (output, vis_map) in self.outputs.iter().zip_eq_fast(vis_maps.iter_mut()) {
988                vis_map.append(visible && self.hash_mapping[vnode.to_index()] == output.actor_id());
989            }
990
991            if !visible {
992                new_ops.push(op);
993                continue;
994            }
995
996            // The `Update` message, noted by an `UpdateDelete` and a successive `UpdateInsert`,
997            // need to be rewritten to common `Delete` and `Insert` if the distribution key
998            // columns are changed, since the distribution key will eventually be part of the
999            // stream key of the downstream executor, and there's an invariant that stream key
1000            // must be the same for rows within an `Update` pair.
1001            if op == Op::UpdateDelete {
1002                last_update_delete_row_idx = Some(row_idx);
1003            } else if op == Op::UpdateInsert {
1004                let delete_row_idx = last_update_delete_row_idx
1005                    .take()
1006                    .expect("missing U- before U+");
1007
1008                // Check if any distribution key column value changed
1009                let dist_key_changed = chunk.row_at(delete_row_idx).1.project(&self.keys)
1010                    != chunk.row_at(row_idx).1.project(&self.keys);
1011
1012                if dist_key_changed {
1013                    new_ops.push(Op::Delete);
1014                    new_ops.push(Op::Insert);
1015                } else {
1016                    new_ops.push(Op::UpdateDelete);
1017                    new_ops.push(Op::UpdateInsert);
1018                }
1019            } else {
1020                new_ops.push(op);
1021            }
1022        }
1023        assert!(last_update_delete_row_idx.is_none(), "missing U+ after U-");
1024
1025        // Apply output mapping after calculating the vnode and new visibility maps.
1026        // The output mapping may project columns and eliminate noop updates.
1027        let chunk = self.output_mapping.apply(chunk);
1028        // Get the visibility after noop update elimination to incorporate into the final visibility.
1029        let chunk_vis = chunk.visibility();
1030
1031        // The noop elimination in `output_mapping.apply()` may normalize partially-visible
1032        // update pairs (e.g., U- → Delete, U+ → Insert). We must adopt these normalized ops,
1033        // otherwise the downstream will see an orphan U+ and panic.
1034        // Meanwhile, `new_ops` contains dist-key-changed rewrites (U-/U+ → Delete/Insert)
1035        // that are not reflected in `chunk.ops()`. Merge both: prefer `new_ops` when it already
1036        // did a rewrite, otherwise take the (possibly normalized) op from the chunk.
1037        let chunk_ops = chunk.ops();
1038        let ops: Vec<Op> = new_ops
1039            .iter()
1040            .zip_eq_fast(chunk_ops.iter())
1041            .map(|(&new_op, &elim_op)| {
1042                // If new_ops already rewrote U-/U+ to Delete/Insert (dist_key_changed), keep it.
1043                // Otherwise, use the post-elimination op which may have been normalized.
1044                match (new_op, elim_op) {
1045                    // dist_key_changed rewrite: new_ops turned U- into Delete
1046                    (Op::Delete, Op::UpdateDelete) => Op::Delete,
1047                    // dist_key_changed rewrite: new_ops turned U+ into Insert
1048                    (Op::Insert, Op::UpdateInsert) => Op::Insert,
1049                    // For all other cases, use the post-elimination op (which includes normalize).
1050                    _ => elim_op,
1051                }
1052            })
1053            .collect();
1054
1055        // individually output StreamChunk integrated with vis_map
1056        futures::future::try_join_all(
1057            vis_maps
1058                .into_iter()
1059                .zip_eq_fast(self.outputs.iter_mut())
1060                .map(|(vis_map, output)| async {
1061                    let vis_map = vis_map.finish();
1062                    // Combine hash routing visibility with noop update elimination visibility.
1063                    // A row is visible only if it passes BOTH:
1064                    // 1. Hash routing (the row should go to this output)
1065                    // 2. Noop update elimination (the row was not eliminated as a noop)
1066                    let combined_vis = &vis_map & chunk_vis;
1067                    let new_stream_chunk = StreamChunk::with_visibility(
1068                        ops.clone(),
1069                        chunk.columns().into(),
1070                        combined_vis,
1071                    );
1072                    if new_stream_chunk.cardinality() > 0 {
1073                        event!(
1074                            tracing::Level::TRACE,
1075                            msg = "chunk",
1076                            downstream = %output.actor_id(),
1077                            "send = \n{:#?}",
1078                            new_stream_chunk
1079                        );
1080                        output
1081                            .send(DispatcherMessageBatch::Chunk(new_stream_chunk))
1082                            .await?;
1083                    }
1084                    StreamResult::Ok(())
1085                }),
1086        )
1087        .await?;
1088
1089        Ok(())
1090    }
1091
1092    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
1093        self.outputs
1094            .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
1095            .count();
1096    }
1097
1098    fn dispatcher_id(&self) -> DispatcherId {
1099        self.dispatcher_id
1100    }
1101
1102    fn is_empty(&self) -> bool {
1103        self.outputs.is_empty()
1104    }
1105}
1106
1107/// `BroadcastDispatcher` dispatches message to all outputs.
1108#[derive(Debug)]
1109pub struct BroadcastDispatcher {
1110    outputs: HashMap<ActorId, Output>,
1111    output_mapping: DispatchOutputMapping,
1112    dispatcher_id: DispatcherId,
1113}
1114
1115impl BroadcastDispatcher {
1116    pub fn new(
1117        outputs: impl IntoIterator<Item = Output>,
1118        output_mapping: DispatchOutputMapping,
1119        dispatcher_id: DispatcherId,
1120    ) -> Self {
1121        Self {
1122            outputs: Self::into_pairs(outputs).collect(),
1123            output_mapping,
1124            dispatcher_id,
1125        }
1126    }
1127
1128    fn into_pairs(
1129        outputs: impl IntoIterator<Item = Output>,
1130    ) -> impl Iterator<Item = (ActorId, Output)> {
1131        outputs
1132            .into_iter()
1133            .map(|output| (output.actor_id(), output))
1134    }
1135}
1136
1137impl Dispatcher for BroadcastDispatcher {
1138    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
1139        let chunk = self.output_mapping.apply(chunk);
1140        broadcast_concurrent(
1141            self.outputs.values_mut(),
1142            DispatcherMessageBatch::Chunk(chunk),
1143        )
1144        .await
1145    }
1146
1147    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
1148        // always broadcast barrier
1149        broadcast_concurrent(
1150            self.outputs.values_mut(),
1151            DispatcherMessageBatch::BarrierBatch(barriers),
1152        )
1153        .await
1154    }
1155
1156    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
1157        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
1158            // always broadcast watermark
1159            broadcast_concurrent(
1160                self.outputs.values_mut(),
1161                DispatcherMessageBatch::Watermark(watermark),
1162            )
1163            .await?;
1164        }
1165        Ok(())
1166    }
1167
1168    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
1169        self.outputs.extend(Self::into_pairs(outputs));
1170    }
1171
1172    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
1173        self.outputs
1174            .extract_if(|actor_id, _| actor_ids.contains(actor_id))
1175            .count();
1176    }
1177
1178    fn dispatcher_id(&self) -> DispatcherId {
1179        self.dispatcher_id
1180    }
1181
1182    fn is_empty(&self) -> bool {
1183        self.outputs.is_empty()
1184    }
1185}
1186
1187/// `SimpleDispatcher` dispatches message to a single output.
1188#[derive(Debug)]
1189pub struct SimpleDispatcher {
1190    /// In most cases, there is exactly one output. However, in some cases of configuration change,
1191    /// the field needs to be temporarily set to 0 or 2 outputs.
1192    ///
1193    /// - When dropping a materialized view, the output will be removed and this field becomes
1194    ///   empty. The [`DispatchExecutor`] will immediately clean-up this empty dispatcher before
1195    ///   finishing processing the current mutation.
1196    /// - When migrating a singleton fragment, the new output will be temporarily added in `pre`
1197    ///   stage and this field becomes multiple, which is for broadcasting this configuration
1198    ///   change barrier to both old and new downstream actors. In `post` stage, the old output
1199    ///   will be removed and this field becomes single again.
1200    ///
1201    /// Therefore, when dispatching data, we assert that there's exactly one output by
1202    /// `Self::output`.
1203    output: SmallVec<[Output; 2]>,
1204    output_mapping: DispatchOutputMapping,
1205    dispatcher_id: DispatcherId,
1206}
1207
1208impl SimpleDispatcher {
1209    pub fn new(
1210        output: Output,
1211        output_mapping: DispatchOutputMapping,
1212        dispatcher_id: DispatcherId,
1213    ) -> Self {
1214        Self {
1215            output: smallvec![output],
1216            output_mapping,
1217            dispatcher_id,
1218        }
1219    }
1220}
1221
1222impl Dispatcher for SimpleDispatcher {
1223    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
1224        self.output.extend(outputs);
1225        assert!(self.output.len() <= 2);
1226    }
1227
1228    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
1229        // Only barrier is allowed to be dispatched to multiple outputs during migration.
1230        for output in &mut self.output {
1231            output
1232                .send(DispatcherMessageBatch::BarrierBatch(barriers.clone()))
1233                .await?;
1234        }
1235        Ok(())
1236    }
1237
1238    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
1239        let output = self
1240            .output
1241            .iter_mut()
1242            .exactly_one()
1243            .expect("expect exactly one output");
1244
1245        let chunk = self.output_mapping.apply(chunk);
1246        output.send(DispatcherMessageBatch::Chunk(chunk)).await
1247    }
1248
1249    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
1250        let output = self
1251            .output
1252            .iter_mut()
1253            .exactly_one()
1254            .expect("expect exactly one output");
1255
1256        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
1257            output
1258                .send(DispatcherMessageBatch::Watermark(watermark))
1259                .await?;
1260        }
1261        Ok(())
1262    }
1263
1264    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
1265        self.output
1266            .retain(|output| !actor_ids.contains(&output.actor_id()));
1267    }
1268
1269    fn dispatcher_id(&self) -> DispatcherId {
1270        self.dispatcher_id
1271    }
1272
1273    fn is_empty(&self) -> bool {
1274        self.output.is_empty()
1275    }
1276}
1277
1278#[cfg(test)]
1279mod tests {
1280    use std::hash::{BuildHasher, Hasher};
1281
1282    use futures::pin_mut;
1283    use multimap::MultiMap;
1284    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
1285    use risingwave_common::array::{Array, ArrayBuilder, I32ArrayBuilder};
1286    use risingwave_common::catalog::{Field, Schema};
1287    use risingwave_common::types::DataType;
1288    use risingwave_common::util::epoch::test_epoch;
1289    use risingwave_common::util::hash_util::Crc32FastBuilder;
1290    use risingwave_expr::expr::{InputRefExpression, NonStrictExpression};
1291    use risingwave_pb::stream_plan::{DispatcherType, PbDispatchOutputMapping};
1292    use tokio::sync::mpsc::unbounded_channel;
1293
1294    use super::*;
1295    use crate::executor::exchange::output::Output;
1296    use crate::executor::exchange::permit::channel_for_test;
1297    use crate::executor::project::ProjectExecutor;
1298    use crate::executor::receiver::ReceiverExecutor;
1299    use crate::executor::test_utils::{MockSource, StreamExecutorTestExt};
1300    use crate::executor::{ActorContext, BarrierInner as Barrier, MessageInner as Message};
1301    use crate::task::barrier_test_utils::LocalBarrierTestEnv;
1302
1303    #[tokio::test]
1304    async fn test_hash_dispatcher_complex() {
1305        // This test only works when vnode count is 256.
1306        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);
1307
1308        let num_outputs = 2; // actor id ranges from 1 to 2
1309        let key_indices = &[0, 2];
1310        let (output_tx_vecs, mut output_rx_vecs): (Vec<_>, Vec<_>) =
1311            (0..num_outputs).map(|_| channel_for_test()).collect();
1312        let outputs = output_tx_vecs
1313            .into_iter()
1314            .enumerate()
1315            .map(|(actor_id, tx)| Output::new(ActorId::new(actor_id as u32 + 1), tx))
1316            .collect::<Vec<_>>();
1317        let mut hash_mapping = (1..num_outputs + 1)
1318            .flat_map(|id| vec![ActorId::new(id as u32); VirtualNode::COUNT_FOR_TEST / num_outputs])
1319            .collect_vec();
1320        hash_mapping.resize(
1321            VirtualNode::COUNT_FOR_TEST,
1322            ActorId::new(num_outputs as u32),
1323        );
1324        let mut hash_dispatcher = HashDataDispatcher::new(
1325            outputs,
1326            key_indices.to_vec(),
1327            DispatchOutputMapping::Simple(vec![0, 1, 2]),
1328            hash_mapping,
1329            0.into(),
1330        );
1331
1332        let chunk = StreamChunk::from_pretty(
1333            "  I I I
1334            +  4 6 8
1335            +  5 7 9
1336            +  0 0 0
1337            -  1 1 1 D
1338            U- 2 0 2
1339            U+ 2 0 2
1340            U- 3 3 2
1341            U+ 3 3 4",
1342        );
1343        hash_dispatcher.dispatch_data(chunk).await.unwrap();
1344
1345        assert_eq!(
1346            *output_rx_vecs[0].recv().await.unwrap().as_chunk().unwrap(),
1347            StreamChunk::from_pretty(
1348                "  I I I
1349                +  4 6 8
1350                +  5 7 9
1351                +  0 0 0
1352                -  1 1 1 D
1353                U- 2 0 2
1354                U+ 2 0 2
1355                -  3 3 2 D  // Should rewrite UpdateDelete to Delete
1356                +  3 3 4    // Should rewrite UpdateInsert to Insert",
1357            )
1358        );
1359        assert_eq!(
1360            *output_rx_vecs[1].recv().await.unwrap().as_chunk().unwrap(),
1361            StreamChunk::from_pretty(
1362                "  I I I
1363                +  4 6 8 D
1364                +  5 7 9 D
1365                +  0 0 0 D
1366                -  1 1 1 D  // Should keep original invisible mark
1367                U- 2 0 2 D  // Should keep UpdateDelete
1368                U+ 2 0 2 D  // Should keep UpdateInsert
1369                -  3 3 2    // Should rewrite UpdateDelete to Delete
1370                +  3 3 4 D  // Should rewrite UpdateInsert to Insert",
1371            )
1372        );
1373    }
1374
    /// Drives a `DispatchExecutor` with one broadcast and one simple dispatcher through two
    /// configuration changes (each replacing a downstream actor) and checks what every
    /// downstream actor receives before, at, and after the update barriers.
    #[tokio::test]
    async fn test_configuration_change() {
        let _schema = Schema { fields: vec![] };
        let (tx, rx) = channel_for_test();
        let actor_id = 233.into();
        let barrier_test_env = LocalBarrierTestEnv::for_test().await;

        let (untouched, old, new) = (234.into(), 235.into(), 238.into()); // broadcast downstream actors
        let (old_simple, new_simple) = (114.into(), 514.into()); // simple downstream actors

        // actor_id -> untouched, old, new, old_simple, new_simple

        // 1. Describe the two dispatchers and the first update barrier, which replaces `old`
        //    with `new` in the broadcast dispatcher.
        let broadcast_dispatcher_id = 666.into();
        let broadcast_dispatcher = PbDispatcher {
            r#type: DispatcherType::Broadcast as _,
            dispatcher_id: broadcast_dispatcher_id,
            downstream_actor_id: vec![untouched, old],
            output_mapping: PbDispatchOutputMapping::identical(0).into(), /* dummy length as it's not used */
            ..Default::default()
        };

        let simple_dispatcher_id = 888.into();
        let simple_dispatcher = PbDispatcher {
            r#type: DispatcherType::Simple as _,
            dispatcher_id: simple_dispatcher_id,
            downstream_actor_id: vec![old_simple],
            output_mapping: PbDispatchOutputMapping::identical(0).into(), /* dummy length as it's not used */
            ..Default::default()
        };

        let dispatcher_updates = maplit::hashmap! {
            actor_id => vec![PbDispatcherUpdate {
                actor_id,
                dispatcher_id: broadcast_dispatcher_id,
                added_downstream_actor_id: vec![new],
                removed_downstream_actor_id: vec![old],
                hash_mapping: Default::default(),
            }]
        };
        let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: dispatcher_updates,
                ..Default::default()
            },
        ));
        barrier_test_env.inject_barrier(&b1, [actor_id]);
        barrier_test_env.flush_all_events().await;

        // 2. Build the input and the dispatch executor. An output request is queued up front
        //    for every downstream actor, including the ones that are only added later.
        let input = Executor::new(
            Default::default(),
            ReceiverExecutor::for_test(
                actor_id,
                rx,
                barrier_test_env.local_barrier_manager.clone(),
            )
            .boxed(),
        );

        let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
        let mut rxs = [untouched, old, new, old_simple, new_simple]
            .into_iter()
            .map(|id| {
                (id, {
                    let (tx, rx) = channel_for_test();
                    new_output_request_tx
                        .send((id, NewOutputRequest::Local(tx)))
                        .unwrap();
                    rx
                })
            })
            .collect::<HashMap<_, _>>();
        let executor = Box::new(
            DispatchExecutor::new(
                input,
                new_output_request_rx,
                vec![broadcast_dispatcher, simple_dispatcher],
                &ActorContext::for_test(actor_id),
            )
            .await
            .unwrap(),
        )
        .execute();

        pin_mut!(executor);

        // Non-blocking receive from the channel of the given downstream actor.
        macro_rules! try_recv {
            ($down_id:expr) => {
                rxs.get_mut(&$down_id).unwrap().try_recv()
            };
        }

        // 3. Send a chunk.
        tx.send(Message::Chunk(StreamChunk::default()).into())
            .await
            .unwrap();

        // 4. Send the configuration change barrier for the broadcast dispatcher.
        tx.send(Message::Barrier(b1.clone().into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 5. Check downstream.
        try_recv!(untouched).unwrap().as_chunk().unwrap();
        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();

        try_recv!(old).unwrap().as_chunk().unwrap();
        try_recv!(old).unwrap().as_barrier_batch().unwrap(); // It should still receive the barrier even if it's to be removed.

        try_recv!(new).unwrap().as_barrier_batch().unwrap(); // Since it's just added, it won't receive the chunk.

        try_recv!(old_simple).unwrap().as_chunk().unwrap();
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // Untouched.

        // 6. Send another barrier.
        let b2 = Barrier::new_test_barrier(test_epoch(2));
        barrier_test_env.inject_barrier(&b2, [actor_id]);
        tx.send(Message::Barrier(b2.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 7. Check downstream.
        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();
        try_recv!(old).unwrap_err(); // Since it's stopped, we can't receive the new messages.
        try_recv!(new).unwrap().as_barrier_batch().unwrap();

        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // Untouched.
        try_recv!(new_simple).unwrap_err(); // Untouched.

        // 8. Send another chunk.
        tx.send(Message::Chunk(StreamChunk::default()).into())
            .await
            .unwrap();

        // 9. Send a configuration change barrier for simple dispatcher.
        let dispatcher_updates = maplit::hashmap! {
            actor_id => vec![PbDispatcherUpdate {
                actor_id,
                dispatcher_id: simple_dispatcher_id,
                added_downstream_actor_id: vec![new_simple],
                removed_downstream_actor_id: vec![old_simple],
                hash_mapping: Default::default(),
            }]
        };
        let b3 = Barrier::new_test_barrier(test_epoch(3)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: dispatcher_updates,
                ..Default::default()
            },
        ));
        barrier_test_env.inject_barrier(&b3, [actor_id]);
        tx.send(Message::Barrier(b3.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 10. Check downstream.
        try_recv!(old_simple).unwrap().as_chunk().unwrap();
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); // It should still receive the barrier even if it's to be removed.

        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap(); // Since it's just added, it won't receive the chunk.

        // 11. Send another barrier.
        let b4 = Barrier::new_test_barrier(test_epoch(4));
        barrier_test_env.inject_barrier(&b4, [actor_id]);
        tx.send(Message::Barrier(b4.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 12. Check downstream.
        try_recv!(old_simple).unwrap_err(); // Since it's stopped, we can't receive the new messages.
        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap();
    }
1549
1550    #[tokio::test]
1551    async fn test_hash_dispatcher() {
1552        // This test only works when vnode count is 256.
1553        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);
1554
1555        let num_outputs = 5; // actor id ranges from 1 to 5
1556        let cardinality = 10;
1557        let dimension = 4;
1558        let key_indices = &[0, 2];
1559        let (output_tx_vecs, output_rx_vecs): (Vec<_>, Vec<_>) =
1560            (0..num_outputs).map(|_| channel_for_test()).collect();
1561        let outputs = output_tx_vecs
1562            .into_iter()
1563            .enumerate()
1564            .map(|(actor_id, tx)| Output::new(ActorId::new(1 + actor_id as u32), tx))
1565            .collect::<Vec<_>>();
1566        let mut hash_mapping = (1..num_outputs + 1)
1567            .flat_map(|id| vec![ActorId::new(id as _); VirtualNode::COUNT_FOR_TEST / num_outputs])
1568            .collect_vec();
1569        hash_mapping.resize(
1570            VirtualNode::COUNT_FOR_TEST,
1571            ActorId::new(num_outputs as u32),
1572        );
1573        let mut hash_dispatcher = HashDataDispatcher::new(
1574            outputs,
1575            key_indices.to_vec(),
1576            DispatchOutputMapping::Simple((0..dimension).collect()),
1577            hash_mapping.clone(),
1578            0.into(),
1579        );
1580
1581        let mut ops = Vec::new();
1582        for idx in 0..cardinality {
1583            if idx % 2 == 0 {
1584                ops.push(Op::Insert);
1585            } else {
1586                ops.push(Op::Delete);
1587            }
1588        }
1589
1590        let mut start = 19260817i32..;
1591        let mut builders = (0..dimension)
1592            .map(|_| I32ArrayBuilder::new(cardinality))
1593            .collect_vec();
1594        let mut output_cols = vec![vec![vec![]; dimension]; num_outputs];
1595        let mut output_ops = vec![vec![]; num_outputs];
1596        for op in &ops {
1597            let hash_builder = Crc32FastBuilder;
1598            let mut hasher = hash_builder.build_hasher();
1599            let one_row = (0..dimension).map(|_| start.next().unwrap()).collect_vec();
1600            for key_idx in key_indices {
1601                let val = one_row[*key_idx];
1602                let bytes = val.to_le_bytes();
1603                hasher.update(&bytes);
1604            }
1605            let output_idx = hash_mapping[hasher.finish() as usize % VirtualNode::COUNT_FOR_TEST]
1606                .as_raw_id() as usize
1607                - 1;
1608            for (builder, val) in builders.iter_mut().zip_eq_fast(one_row.iter()) {
1609                builder.append(Some(*val));
1610            }
1611            output_cols[output_idx]
1612                .iter_mut()
1613                .zip_eq_fast(one_row.iter())
1614                .for_each(|(each_column, val)| each_column.push(*val));
1615            output_ops[output_idx].push(op);
1616        }
1617
1618        let columns = builders
1619            .into_iter()
1620            .map(|builder| {
1621                let array = builder.finish();
1622                array.into_ref()
1623            })
1624            .collect();
1625
1626        let chunk = StreamChunk::new(ops, columns);
1627        hash_dispatcher.dispatch_data(chunk).await.unwrap();
1628
1629        for (output_idx, mut rx) in output_rx_vecs.into_iter().enumerate() {
1630            let mut output = vec![];
1631            while let Some(Some(msg)) = rx.recv().now_or_never() {
1632                output.push(msg);
1633            }
1634            // It is possible that there is no chunks, as a key doesn't belong to any hash bucket.
1635            assert!(output.len() <= 1);
1636            if output.is_empty() {
1637                assert!(output_cols[output_idx].iter().all(|x| { x.is_empty() }));
1638            } else {
1639                let message = output.first().unwrap();
1640                let real_chunk = match message {
1641                    DispatcherMessageBatch::Chunk(chunk) => chunk,
1642                    _ => panic!(),
1643                };
1644                real_chunk
1645                    .columns()
1646                    .iter()
1647                    .zip_eq_fast(output_cols[output_idx].iter())
1648                    .for_each(|(real_col, expect_col)| {
1649                        let real_vals = real_chunk
1650                            .visibility()
1651                            .iter_ones()
1652                            .map(|row_idx| real_col.as_int32().value_at(row_idx).unwrap())
1653                            .collect::<Vec<_>>();
1654                        assert_eq!(real_vals.len(), expect_col.len());
1655                        assert_eq!(real_vals, *expect_col);
1656                    });
1657            }
1658        }
1659    }
1660
1661    #[tokio::test]
1662    async fn test_hash_dispatcher_non_adjacent_update_after_project() {
1663        // This test only works when vnode count is 256.
1664        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);
1665
1666        let schema = Schema {
1667            fields: vec![
1668                Field::unnamed(DataType::Int64),
1669                Field::unnamed(DataType::Int64),
1670            ],
1671        };
1672        let (mut tx, source) = MockSource::channel();
1673        let source = source.into_executor(schema, vec![0]);
1674
1675        let proj = ProjectExecutor::new(
1676            ActorContext::for_test(123),
1677            source,
1678            vec![
1679                NonStrictExpression::for_test(InputRefExpression::new(DataType::Int64, 0)),
1680                NonStrictExpression::for_test(InputRefExpression::new(DataType::Int64, 1)),
1681            ],
1682            MultiMap::new(),
1683            vec![],
1684            true,
1685        );
1686        let mut proj = proj.boxed().execute();
1687
1688        tx.push_barrier(test_epoch(1), false);
1689        proj.expect_barrier().await;
1690
1691        tx.push_chunk(StreamChunk::from_pretty(
1692            "  I I
1693            U- 1 1
1694            U+ 1 2
1695            U- 1 2
1696            U+ 1 3",
1697        ));
1698
1699        let projected = proj.expect_chunk().await;
1700        assert_eq!(
1701            projected,
1702            StreamChunk::from_pretty(
1703                "  I I
1704                - 1 1
1705                U+ 1 2 D
1706                U- 1 2 D
1707                + 1 3",
1708            )
1709        );
1710
1711        let (output_tx, _output_rx) = channel_for_test();
1712        let outputs = vec![Output::new(ActorId::new(1), output_tx)];
1713        let hash_mapping = vec![ActorId::new(1); VirtualNode::COUNT_FOR_TEST];
1714        let mut hash_dispatcher = HashDataDispatcher::new(
1715            outputs,
1716            vec![0],
1717            DispatchOutputMapping::Simple(vec![0]),
1718            hash_mapping,
1719            0.into(),
1720        );
1721
1722        hash_dispatcher.dispatch_data(projected).await.unwrap();
1723    }
1724
1725    #[tokio::test]
1726    async fn test_hash_dispatcher_missing_update_delete_after_project() {
1727        let schema = Schema {
1728            fields: vec![
1729                Field::unnamed(DataType::Int64),
1730                Field::unnamed(DataType::Int64),
1731            ],
1732        };
1733        let (mut tx, source) = MockSource::channel();
1734        let source = source.into_executor(schema, vec![0]);
1735
1736        let proj = ProjectExecutor::new(
1737            ActorContext::for_test(123),
1738            source,
1739            vec![NonStrictExpression::for_test(InputRefExpression::new(
1740                DataType::Int64,
1741                0,
1742            ))],
1743            MultiMap::new(),
1744            vec![],
1745            true,
1746        );
1747        let mut proj = proj.boxed().execute();
1748
1749        tx.push_barrier(test_epoch(1), false);
1750        proj.expect_barrier().await;
1751
1752        tx.push_chunk(StreamChunk::from_pretty(
1753            "  I I
1754            + 1 10
1755            U- 1 10
1756            U+ 1 30",
1757        ));
1758
1759        let projected = proj.expect_chunk().await;
1760        assert_eq!(
1761            projected,
1762            StreamChunk::from_pretty(
1763                "  I
1764                + 1 D
1765                U- 1 D
1766                + 1",
1767            )
1768        );
1769
1770        let (output_tx, _output_rx) = channel_for_test();
1771        let outputs = vec![Output::new(ActorId::new(1), output_tx)];
1772        let hash_mapping = vec![ActorId::new(1); VirtualNode::COUNT_FOR_TEST];
1773        let mut hash_dispatcher = HashDataDispatcher::new(
1774            outputs,
1775            vec![0],
1776            DispatchOutputMapping::Simple(vec![0]),
1777            hash_mapping,
1778            0.into(),
1779        );
1780
1781        hash_dispatcher.dispatch_data(projected).await.unwrap();
1782    }
1783
1784    #[tokio::test]
1785    async fn test_hash_dispatcher_internal_projection_normalize_single_u_plus() {
1786        let (output_tx, mut output_rx) = channel_for_test();
1787        let outputs = vec![Output::new(ActorId::new(1), output_tx)];
1788        let hash_mapping = vec![ActorId::new(1); VirtualNode::COUNT_FOR_TEST];
1789        let mut hash_dispatcher = HashDataDispatcher::new(
1790            outputs,
1791            vec![0],
1792            DispatchOutputMapping::Simple(vec![0]),
1793            hash_mapping,
1794            0.into(),
1795        );
1796
1797        let input = StreamChunk::from_pretty(
1798            "  I I
1799            + 1 10
1800            U- 1 10
1801            U+ 1 30",
1802        );
1803
1804        hash_dispatcher.dispatch_data(input).await.unwrap();
1805
1806        let output = output_rx.recv().await.unwrap();
1807        assert_eq!(
1808            *output.as_chunk().unwrap(),
1809            StreamChunk::from_pretty(
1810                "  I
1811                + 1 D
1812                U- 1 D
1813                + 1",
1814            )
1815        );
1816    }
1817}