1use std::collections::{HashMap, HashSet};
16use std::future::Future;
17use std::iter::repeat_with;
18use std::ops::{Deref, DerefMut};
19use std::time::Duration;
20
21use anyhow::anyhow;
22use futures::{FutureExt, TryStreamExt};
23use itertools::Itertools;
24use risingwave_common::array::Op;
25use risingwave_common::bitmap::BitmapBuilder;
26use risingwave_common::hash::{ActorMapping, ExpandedActorMapping, VirtualNode};
27use risingwave_common::metrics::LabelGuardedIntCounter;
28use risingwave_common::row::RowExt;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
31use risingwave_pb::stream_plan::{self, PbDispatcher};
32use smallvec::{SmallVec, smallvec};
33use tokio::sync::mpsc::UnboundedReceiver;
34use tokio::time::Instant;
35use tokio_stream::StreamExt;
36use tokio_stream::adapters::Peekable;
37use tracing::{Instrument, event};
38
39use super::exchange::output::Output;
40use super::{
41 AddMutation, DispatcherBarriers, DispatcherMessageBatch, MessageBatch, TroublemakerExecutor,
42 UpdateMutation,
43};
44use crate::executor::prelude::*;
45use crate::executor::{StopMutation, StreamConsumer};
46use crate::task::{DispatcherId, LocalBarrierManager, NewOutputRequest};
47
48mod output_mapping;
49pub use output_mapping::DispatchOutputMapping;
50
51pub struct DispatchExecutor {
55 input: Executor,
56 inner: DispatchExecutorInner,
57}
58
59struct DispatcherWithMetrics {
60 dispatcher: DispatcherImpl,
61 pub actor_output_buffer_blocking_duration_ns: LabelGuardedIntCounter,
62}
63
64impl Debug for DispatcherWithMetrics {
65 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
66 self.dispatcher.fmt(f)
67 }
68}
69
70impl Deref for DispatcherWithMetrics {
71 type Target = DispatcherImpl;
72
73 fn deref(&self) -> &Self::Target {
74 &self.dispatcher
75 }
76}
77
78impl DerefMut for DispatcherWithMetrics {
79 fn deref_mut(&mut self) -> &mut Self::Target {
80 &mut self.dispatcher
81 }
82}
83
84struct DispatchExecutorMetrics {
85 actor_id_str: String,
86 fragment_id_str: String,
87 metrics: Arc<StreamingMetrics>,
88 actor_out_record_cnt: LabelGuardedIntCounter,
89}
90
91impl DispatchExecutorMetrics {
92 fn monitor_dispatcher(&self, dispatcher: DispatcherImpl) -> DispatcherWithMetrics {
93 DispatcherWithMetrics {
94 actor_output_buffer_blocking_duration_ns: self
95 .metrics
96 .actor_output_buffer_blocking_duration_ns
97 .with_guarded_label_values(&[
98 self.actor_id_str.as_str(),
99 self.fragment_id_str.as_str(),
100 dispatcher.dispatcher_id_str(),
101 ]),
102 dispatcher,
103 }
104 }
105}
106
107struct DispatchExecutorInner {
108 dispatchers: Vec<DispatcherWithMetrics>,
109 actor_id: u32,
110 local_barrier_manager: LocalBarrierManager,
111 metrics: DispatchExecutorMetrics,
112 new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
113 pending_new_output_requests: HashMap<ActorId, NewOutputRequest>,
114}
115
116impl DispatchExecutorInner {
117 async fn collect_outputs(
118 &mut self,
119 downstream_actors: &[ActorId],
120 ) -> StreamResult<Vec<Output>> {
121 fn resolve_output(downstream_actor: ActorId, request: NewOutputRequest) -> Output {
122 let tx = match request {
123 NewOutputRequest::Local(tx) | NewOutputRequest::Remote(tx) => tx,
124 };
125 Output::new(downstream_actor, tx)
126 }
127 let mut outputs = Vec::with_capacity(downstream_actors.len());
128 for downstream_actor in downstream_actors {
129 let output =
130 if let Some(request) = self.pending_new_output_requests.remove(downstream_actor) {
131 resolve_output(*downstream_actor, request)
132 } else {
133 loop {
134 let (requested_actor, request) = self
135 .new_output_request_rx
136 .recv()
137 .await
138 .ok_or_else(|| anyhow!("end of new output request"))?;
139 if requested_actor == *downstream_actor {
140 break resolve_output(requested_actor, request);
141 } else {
142 assert!(
143 self.pending_new_output_requests
144 .insert(requested_actor, request)
145 .is_none(),
146 "duplicated inflight new output requests from actor {}",
147 requested_actor
148 );
149 }
150 }
151 };
152 outputs.push(output);
153 }
154 Ok(outputs)
155 }
156
157 async fn dispatch(&mut self, msg: MessageBatch) -> StreamResult<()> {
158 macro_rules! await_with_metrics {
159 ($fut:expr, $metrics:expr) => {{
160 let mut start_time = Instant::now();
161 let interval_duration = Duration::from_secs(15);
162 let mut interval =
163 tokio::time::interval_at(start_time + interval_duration, interval_duration);
164
165 let mut fut = std::pin::pin!($fut);
166
167 loop {
168 tokio::select! {
169 biased;
170 res = &mut fut => {
171 res?;
172 let ns = start_time.elapsed().as_nanos() as u64;
173 $metrics.inc_by(ns);
174 break;
175 }
176 _ = interval.tick() => {
177 start_time = Instant::now();
178 $metrics.inc_by(interval_duration.as_nanos() as u64);
179 }
180 };
181 }
182 StreamResult::Ok(())
183 }};
184 }
185
186 let limit = self
187 .local_barrier_manager
188 .env
189 .config()
190 .developer
191 .exchange_concurrent_dispatchers;
192 match msg {
194 MessageBatch::BarrierBatch(barrier_batch) => {
195 if barrier_batch.is_empty() {
196 return Ok(());
197 }
198 let mutation = barrier_batch[0].mutation.clone();
200 self.pre_mutate_dispatchers(&mutation).await?;
201 futures::stream::iter(self.dispatchers.iter_mut())
202 .map(Ok)
203 .try_for_each_concurrent(limit, |dispatcher| async {
204 let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
205 let dispatcher_output = &mut dispatcher.dispatcher;
206 let fut = dispatcher_output.dispatch_barriers(
207 barrier_batch
208 .iter()
209 .cloned()
210 .map(|b| b.into_dispatcher())
211 .collect(),
212 );
213 await_with_metrics!(std::pin::pin!(fut), metrics)
214 })
215 .await?;
216 self.post_mutate_dispatchers(&mutation)?;
217 }
218 MessageBatch::Watermark(watermark) => {
219 futures::stream::iter(self.dispatchers.iter_mut())
220 .map(Ok)
221 .try_for_each_concurrent(limit, |dispatcher| async {
222 let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
223 let dispatcher_output = &mut dispatcher.dispatcher;
224 let fut = dispatcher_output.dispatch_watermark(watermark.clone());
225 await_with_metrics!(std::pin::pin!(fut), metrics)
226 })
227 .await?;
228 }
229 MessageBatch::Chunk(chunk) => {
230 futures::stream::iter(self.dispatchers.iter_mut())
231 .map(Ok)
232 .try_for_each_concurrent(limit, |dispatcher| async {
233 let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
234 let dispatcher_output = &mut dispatcher.dispatcher;
235 let fut = dispatcher_output.dispatch_data(chunk.clone());
236 await_with_metrics!(std::pin::pin!(fut), metrics)
237 })
238 .await?;
239
240 self.metrics
241 .actor_out_record_cnt
242 .inc_by(chunk.cardinality() as _);
243 }
244 }
245 Ok(())
246 }
247
248 async fn add_dispatchers<'a>(
250 &mut self,
251 new_dispatchers: impl IntoIterator<Item = &'a PbDispatcher>,
252 ) -> StreamResult<()> {
253 for dispatcher in new_dispatchers {
254 let outputs = self
255 .collect_outputs(&dispatcher.downstream_actor_id)
256 .await?;
257 let dispatcher = DispatcherImpl::new(outputs, dispatcher)?;
258 let dispatcher = self.metrics.monitor_dispatcher(dispatcher);
259 self.dispatchers.push(dispatcher);
260 }
261
262 assert!(
263 self.dispatchers
264 .iter()
265 .map(|d| d.dispatcher_id())
266 .all_unique(),
267 "dispatcher ids must be unique: {:?}",
268 self.dispatchers
269 );
270
271 Ok(())
272 }
273
274 fn find_dispatcher(&mut self, dispatcher_id: DispatcherId) -> &mut DispatcherImpl {
275 self.dispatchers
276 .iter_mut()
277 .find(|d| d.dispatcher_id() == dispatcher_id)
278 .unwrap_or_else(|| panic!("dispatcher {}:{} not found", self.actor_id, dispatcher_id))
279 }
280
281 async fn pre_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
284 let outputs = self
285 .collect_outputs(&update.added_downstream_actor_id)
286 .await?;
287
288 let dispatcher = self.find_dispatcher(update.dispatcher_id);
289 dispatcher.add_outputs(outputs);
290
291 Ok(())
292 }
293
294 fn post_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
297 let ids = update.removed_downstream_actor_id.iter().copied().collect();
298
299 let dispatcher = self.find_dispatcher(update.dispatcher_id);
300 dispatcher.remove_outputs(&ids);
301
302 if let DispatcherImpl::Hash(dispatcher) = dispatcher {
309 dispatcher.hash_mapping =
310 ActorMapping::from_protobuf(update.get_hash_mapping()?).to_expanded();
311 }
312
313 Ok(())
314 }
315
316 async fn pre_mutate_dispatchers(
318 &mut self,
319 mutation: &Option<Arc<Mutation>>,
320 ) -> StreamResult<()> {
321 let Some(mutation) = mutation.as_deref() else {
322 return Ok(());
323 };
324
325 match mutation {
326 Mutation::Add(AddMutation { adds, .. }) => {
327 if let Some(new_dispatchers) = adds.get(&self.actor_id) {
328 self.add_dispatchers(new_dispatchers).await?;
329 }
330 }
331 Mutation::Update(UpdateMutation {
332 dispatchers,
333 actor_new_dispatchers: actor_dispatchers,
334 ..
335 }) => {
336 if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
337 self.add_dispatchers(new_dispatchers).await?;
338 }
339
340 if let Some(updates) = dispatchers.get(&self.actor_id) {
341 for update in updates {
342 self.pre_update_dispatcher(update).await?;
343 }
344 }
345 }
346 Mutation::AddAndUpdate(
347 AddMutation { adds, .. },
348 UpdateMutation {
349 dispatchers,
350 actor_new_dispatchers: actor_dispatchers,
351 ..
352 },
353 ) => {
354 if let Some(new_dispatchers) = adds.get(&self.actor_id) {
355 self.add_dispatchers(new_dispatchers).await?;
356 }
357
358 if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
359 self.add_dispatchers(new_dispatchers).await?;
360 }
361
362 if let Some(updates) = dispatchers.get(&self.actor_id) {
363 for update in updates {
364 self.pre_update_dispatcher(update).await?;
365 }
366 }
367 }
368 _ => {}
369 }
370
371 Ok(())
372 }
373
374 fn post_mutate_dispatchers(&mut self, mutation: &Option<Arc<Mutation>>) -> StreamResult<()> {
376 let Some(mutation) = mutation.as_deref() else {
377 return Ok(());
378 };
379
380 match mutation {
381 Mutation::Stop(StopMutation { dropped_actors, .. }) => {
382 if !dropped_actors.contains(&self.actor_id) {
384 for dispatcher in &mut self.dispatchers {
385 dispatcher.remove_outputs(dropped_actors);
386 }
387 }
388 }
389 Mutation::Update(UpdateMutation {
390 dispatchers,
391 dropped_actors,
392 ..
393 })
394 | Mutation::AddAndUpdate(
395 _,
396 UpdateMutation {
397 dispatchers,
398 dropped_actors,
399 ..
400 },
401 ) => {
402 if let Some(updates) = dispatchers.get(&self.actor_id) {
403 for update in updates {
404 self.post_update_dispatcher(update)?;
405 }
406 }
407
408 if !dropped_actors.contains(&self.actor_id) {
409 for dispatcher in &mut self.dispatchers {
410 dispatcher.remove_outputs(dropped_actors);
411 }
412 }
413 }
414 _ => {}
415 };
416
417 self.dispatchers.retain(|d| !d.is_empty());
420
421 Ok(())
422 }
423}
424
425impl DispatchExecutor {
426 pub(crate) async fn new(
427 input: Executor,
428 new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
429 dispatchers: Vec<stream_plan::Dispatcher>,
430 actor_id: u32,
431 fragment_id: u32,
432 local_barrier_manager: LocalBarrierManager,
433 metrics: Arc<StreamingMetrics>,
434 ) -> StreamResult<Self> {
435 let mut executor = Self::new_inner(
436 input,
437 new_output_request_rx,
438 vec![],
439 actor_id,
440 fragment_id,
441 local_barrier_manager,
442 metrics,
443 );
444 let inner = &mut executor.inner;
445 for dispatcher in dispatchers {
446 let outputs = inner
447 .collect_outputs(&dispatcher.downstream_actor_id)
448 .await?;
449 let dispatcher = DispatcherImpl::new(outputs, &dispatcher)?;
450 let dispatcher = inner.metrics.monitor_dispatcher(dispatcher);
451 inner.dispatchers.push(dispatcher);
452 }
453 Ok(executor)
454 }
455
456 #[cfg(test)]
457 pub(crate) fn for_test(
458 input: Executor,
459 dispatchers: Vec<DispatcherImpl>,
460 actor_id: u32,
461 fragment_id: u32,
462 local_barrier_manager: LocalBarrierManager,
463 metrics: Arc<StreamingMetrics>,
464 ) -> (
465 Self,
466 tokio::sync::mpsc::UnboundedSender<(ActorId, NewOutputRequest)>,
467 ) {
468 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
469
470 (
471 Self::new_inner(
472 input,
473 rx,
474 dispatchers,
475 actor_id,
476 fragment_id,
477 local_barrier_manager,
478 metrics,
479 ),
480 tx,
481 )
482 }
483
484 fn new_inner(
485 mut input: Executor,
486 new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
487 dispatchers: Vec<DispatcherImpl>,
488 actor_id: u32,
489 fragment_id: u32,
490 local_barrier_manager: LocalBarrierManager,
491 metrics: Arc<StreamingMetrics>,
492 ) -> Self {
493 let chunk_size = local_barrier_manager.env.config().developer.chunk_size;
494 if crate::consistency::insane() {
495 let mut info = input.info().clone();
497 info.identity = format!("{} (embedded trouble)", info.identity);
498 let troublemaker = TroublemakerExecutor::new(input, chunk_size);
499 input = (info, troublemaker).into();
500 }
501
502 let actor_id_str = actor_id.to_string();
503 let fragment_id_str = fragment_id.to_string();
504 let actor_out_record_cnt = metrics
505 .actor_out_record_cnt
506 .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
507 let metrics = DispatchExecutorMetrics {
508 actor_id_str,
509 fragment_id_str,
510 metrics,
511 actor_out_record_cnt,
512 };
513 let dispatchers = dispatchers
514 .into_iter()
515 .map(|dispatcher| metrics.monitor_dispatcher(dispatcher))
516 .collect();
517 Self {
518 input,
519 inner: DispatchExecutorInner {
520 dispatchers,
521 actor_id,
522 local_barrier_manager,
523 metrics,
524 new_output_request_rx,
525 pending_new_output_requests: Default::default(),
526 },
527 }
528 }
529}
530
531impl StreamConsumer for DispatchExecutor {
532 type BarrierStream = impl Stream<Item = StreamResult<Barrier>> + Send;
533
534 fn execute(mut self: Box<Self>) -> Self::BarrierStream {
535 let max_barrier_count_per_batch = self
536 .inner
537 .local_barrier_manager
538 .env
539 .config()
540 .developer
541 .max_barrier_batch_size;
542 #[try_stream]
543 async move {
544 let mut input = self.input.execute().peekable();
545 loop {
546 let Some(message) =
547 try_batch_barriers(max_barrier_count_per_batch, &mut input).await?
548 else {
549 break;
551 };
552 match message {
553 chunk @ MessageBatch::Chunk(_) => {
554 self.inner
555 .dispatch(chunk)
556 .instrument(tracing::info_span!("dispatch_chunk"))
557 .instrument_await("dispatch_chunk")
558 .await?;
559 }
560 MessageBatch::BarrierBatch(barrier_batch) => {
561 assert!(!barrier_batch.is_empty());
562 self.inner
563 .dispatch(MessageBatch::BarrierBatch(barrier_batch.clone()))
564 .instrument(tracing::info_span!("dispatch_barrier_batch"))
565 .instrument_await("dispatch_barrier_batch")
566 .await?;
567 self.inner
568 .metrics
569 .metrics
570 .barrier_batch_size
571 .observe(barrier_batch.len() as f64);
572 for barrier in barrier_batch {
573 yield barrier;
574 }
575 }
576 watermark @ MessageBatch::Watermark(_) => {
577 self.inner
578 .dispatch(watermark)
579 .instrument(tracing::info_span!("dispatch_watermark"))
580 .instrument_await("dispatch_watermark")
581 .await?;
582 }
583 }
584 }
585 }
586 }
587}
588
589async fn try_batch_barriers(
595 max_barrier_count_per_batch: u32,
596 input: &mut Peekable<BoxedMessageStream>,
597) -> StreamResult<Option<MessageBatch>> {
598 let Some(msg) = input.next().await else {
599 return Ok(None);
601 };
602 let mut barrier_batch = vec![];
603 let msg: Message = msg?;
604 let max_peek_attempts = match msg {
605 Message::Chunk(c) => {
606 return Ok(Some(MessageBatch::Chunk(c)));
607 }
608 Message::Watermark(w) => {
609 return Ok(Some(MessageBatch::Watermark(w)));
610 }
611 Message::Barrier(b) => {
612 let peek_more_barrier = b.mutation.is_none();
613 barrier_batch.push(b);
614 if peek_more_barrier {
615 max_barrier_count_per_batch.saturating_sub(1)
616 } else {
617 0
618 }
619 }
620 };
621 for _ in 0..max_peek_attempts {
623 let peek = input.peek().now_or_never();
624 let Some(peek) = peek else {
625 break;
626 };
627 let Some(msg) = peek else {
628 break;
630 };
631 let Ok(Message::Barrier(barrier)) = msg else {
632 break;
633 };
634 if barrier.mutation.is_some() {
635 break;
636 }
637 let msg: Message = input.next().now_or_never().unwrap().unwrap()?;
638 let Message::Barrier(ref barrier) = msg else {
639 unreachable!("must be a barrier");
640 };
641 barrier_batch.push(barrier.clone());
642 }
643 Ok(Some(MessageBatch::BarrierBatch(barrier_batch)))
644}
645
646#[derive(Debug)]
647pub enum DispatcherImpl {
648 Hash(HashDataDispatcher),
649 Broadcast(BroadcastDispatcher),
650 Simple(SimpleDispatcher),
651 RoundRobin(RoundRobinDataDispatcher),
652}
653
654impl DispatcherImpl {
655 pub fn new(outputs: Vec<Output>, dispatcher: &PbDispatcher) -> StreamResult<Self> {
656 let output_mapping =
657 DispatchOutputMapping::from_protobuf(dispatcher.output_mapping.clone().unwrap());
658
659 use risingwave_pb::stream_plan::DispatcherType::*;
660 let dispatcher_impl = match dispatcher.get_type()? {
661 Hash => {
662 assert!(!outputs.is_empty());
663 let dist_key_indices = dispatcher
664 .dist_key_indices
665 .iter()
666 .map(|i| *i as usize)
667 .collect();
668
669 let hash_mapping =
670 ActorMapping::from_protobuf(dispatcher.get_hash_mapping()?).to_expanded();
671
672 DispatcherImpl::Hash(HashDataDispatcher::new(
673 outputs,
674 dist_key_indices,
675 output_mapping,
676 hash_mapping,
677 dispatcher.dispatcher_id,
678 ))
679 }
680 Broadcast => DispatcherImpl::Broadcast(BroadcastDispatcher::new(
681 outputs,
682 output_mapping,
683 dispatcher.dispatcher_id,
684 )),
685 Simple | NoShuffle => {
686 let [output]: [_; 1] = outputs.try_into().unwrap();
687 DispatcherImpl::Simple(SimpleDispatcher::new(
688 output,
689 output_mapping,
690 dispatcher.dispatcher_id,
691 ))
692 }
693 Unspecified => unreachable!(),
694 };
695
696 Ok(dispatcher_impl)
697 }
698}
699
/// Generates the inherent methods of `DispatcherImpl`.
///
/// Takes a list of `{ Variant }` entries and, for every `Dispatcher` method,
/// emits a method on the enum that matches on `self` and forwards the call to
/// the dispatcher held by the active variant.
macro_rules! impl_dispatcher {
    ($( { $variant_name:ident } ),*) => {
        impl DispatcherImpl {
            /// Forwards a data chunk to the active dispatcher.
            pub async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_data(chunk).await, )*
                }
            }

            /// Forwards a batch of barriers to the active dispatcher.
            pub async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_barriers(barriers).await, )*
                }
            }

            /// Forwards a watermark to the active dispatcher.
            pub async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_watermark(watermark).await, )*
                }
            }

            /// Attaches additional outputs to the active dispatcher.
            pub fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
                match self {
                    $( Self::$variant_name(inner) => inner.add_outputs(outputs), )*
                }
            }

            /// Detaches the outputs leading to any of `actor_ids`.
            pub fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
                match self {
                    $( Self::$variant_name(inner) => inner.remove_outputs(actor_ids), )*
                }
            }

            /// Id of the active dispatcher.
            pub fn dispatcher_id(&self) -> DispatcherId {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatcher_id(), )*
                }
            }

            /// Id of the active dispatcher as a string.
            pub fn dispatcher_id_str(&self) -> &str {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatcher_id_str(), )*
                }
            }

            /// Whether the active dispatcher has no outputs left.
            pub fn is_empty(&self) -> bool {
                match self {
                    $( Self::$variant_name(inner) => inner.is_empty(), )*
                }
            }
        }
    }
}
753
754macro_rules! for_all_dispatcher_variants {
755 ($macro:ident) => {
756 $macro! {
757 { Hash },
758 { Broadcast },
759 { Simple },
760 { RoundRobin }
761 }
762 };
763}
764
765for_all_dispatcher_variants! { impl_dispatcher }
766
767pub trait DispatchFuture<'a> = Future<Output = StreamResult<()>> + Send;
768
769pub trait Dispatcher: Debug + 'static {
770 fn dispatch_data(&mut self, chunk: StreamChunk) -> impl DispatchFuture<'_>;
772 fn dispatch_barriers(&mut self, barrier: DispatcherBarriers) -> impl DispatchFuture<'_>;
774 fn dispatch_watermark(&mut self, watermark: Watermark) -> impl DispatchFuture<'_>;
776
777 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>);
779 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>);
781
782 fn dispatcher_id(&self) -> DispatcherId;
788
789 fn dispatcher_id_str(&self) -> &str;
791
792 fn is_empty(&self) -> bool;
795}
796
797async fn broadcast_concurrent(
802 outputs: impl IntoIterator<Item = &'_ mut Output>,
803 message: DispatcherMessageBatch,
804) -> StreamResult<()> {
805 futures::future::try_join_all(
806 outputs
807 .into_iter()
808 .map(|output| output.send(message.clone())),
809 )
810 .await?;
811 Ok(())
812}
813
814#[derive(Debug)]
815pub struct RoundRobinDataDispatcher {
816 outputs: Vec<Output>,
817 output_mapping: DispatchOutputMapping,
818 cur: usize,
819 dispatcher_id: DispatcherId,
820 dispatcher_id_str: String,
821}
822
823impl RoundRobinDataDispatcher {
824 pub fn new(
825 outputs: Vec<Output>,
826 output_mapping: DispatchOutputMapping,
827 dispatcher_id: DispatcherId,
828 ) -> Self {
829 Self {
830 outputs,
831 output_mapping,
832 cur: 0,
833 dispatcher_id,
834 dispatcher_id_str: dispatcher_id.to_string(),
835 }
836 }
837}
838
839impl Dispatcher for RoundRobinDataDispatcher {
840 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
841 let chunk = self.output_mapping.apply(chunk);
842
843 self.outputs[self.cur]
844 .send(DispatcherMessageBatch::Chunk(chunk))
845 .await?;
846 self.cur += 1;
847 self.cur %= self.outputs.len();
848 Ok(())
849 }
850
851 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
852 broadcast_concurrent(
854 &mut self.outputs,
855 DispatcherMessageBatch::BarrierBatch(barriers),
856 )
857 .await
858 }
859
860 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
861 if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
862 broadcast_concurrent(
864 &mut self.outputs,
865 DispatcherMessageBatch::Watermark(watermark),
866 )
867 .await?;
868 }
869 Ok(())
870 }
871
872 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
873 self.outputs.extend(outputs);
874 }
875
876 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
877 self.outputs
878 .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
879 .count();
880 self.cur = self.cur.min(self.outputs.len() - 1);
881 }
882
883 fn dispatcher_id(&self) -> DispatcherId {
884 self.dispatcher_id
885 }
886
887 fn dispatcher_id_str(&self) -> &str {
888 &self.dispatcher_id_str
889 }
890
891 fn is_empty(&self) -> bool {
892 self.outputs.is_empty()
893 }
894}
895
896pub struct HashDataDispatcher {
897 outputs: Vec<Output>,
898 keys: Vec<usize>,
899 output_mapping: DispatchOutputMapping,
900 hash_mapping: ExpandedActorMapping,
903 dispatcher_id: DispatcherId,
904 dispatcher_id_str: String,
905}
906
907impl Debug for HashDataDispatcher {
908 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
909 f.debug_struct("HashDataDispatcher")
910 .field("outputs", &self.outputs)
911 .field("keys", &self.keys)
912 .field("dispatcher_id", &self.dispatcher_id)
913 .finish_non_exhaustive()
914 }
915}
916
917impl HashDataDispatcher {
918 pub fn new(
919 outputs: Vec<Output>,
920 keys: Vec<usize>,
921 output_mapping: DispatchOutputMapping,
922 hash_mapping: ExpandedActorMapping,
923 dispatcher_id: DispatcherId,
924 ) -> Self {
925 Self {
926 outputs,
927 keys,
928 output_mapping,
929 hash_mapping,
930 dispatcher_id,
931 dispatcher_id_str: dispatcher_id.to_string(),
932 }
933 }
934}
935
936impl Dispatcher for HashDataDispatcher {
937 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
938 self.outputs.extend(outputs);
939 }
940
941 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
942 broadcast_concurrent(
944 &mut self.outputs,
945 DispatcherMessageBatch::BarrierBatch(barriers),
946 )
947 .await
948 }
949
950 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
951 if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
952 broadcast_concurrent(
954 &mut self.outputs,
955 DispatcherMessageBatch::Watermark(watermark),
956 )
957 .await?;
958 }
959 Ok(())
960 }
961
962 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
963 let num_outputs = self.outputs.len();
967
968 let vnode_count = self.hash_mapping.len();
970 let vnodes = VirtualNode::compute_chunk(chunk.data_chunk(), &self.keys, vnode_count);
971
972 tracing::debug!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty(), self.keys, vnodes);
973
974 let mut vis_maps = repeat_with(|| BitmapBuilder::with_capacity(chunk.capacity()))
975 .take(num_outputs)
976 .collect_vec();
977 let mut last_update_delete_row_idx = None;
978 let mut new_ops: Vec<Op> = Vec::with_capacity(chunk.capacity());
979
980 for (row_idx, ((vnode, &op), visible)) in vnodes
981 .iter()
982 .copied()
983 .zip_eq_fast(chunk.ops())
984 .zip_eq_fast(chunk.visibility().iter())
985 .enumerate()
986 {
987 for (output, vis_map) in self.outputs.iter().zip_eq_fast(vis_maps.iter_mut()) {
989 vis_map.append(visible && self.hash_mapping[vnode.to_index()] == output.actor_id());
990 }
991
992 if !visible {
993 new_ops.push(op);
994 continue;
995 }
996
997 if op == Op::UpdateDelete {
1003 last_update_delete_row_idx = Some(row_idx);
1004 } else if op == Op::UpdateInsert {
1005 let delete_row_idx = last_update_delete_row_idx
1006 .take()
1007 .expect("missing U- before U+");
1008 assert!(delete_row_idx + 1 == row_idx, "U- and U+ are not adjacent");
1009
1010 let dist_key_changed = chunk.row_at(delete_row_idx).1.project(&self.keys)
1012 != chunk.row_at(row_idx).1.project(&self.keys);
1013
1014 if dist_key_changed {
1015 new_ops.push(Op::Delete);
1016 new_ops.push(Op::Insert);
1017 } else {
1018 new_ops.push(Op::UpdateDelete);
1019 new_ops.push(Op::UpdateInsert);
1020 }
1021 } else {
1022 new_ops.push(op);
1023 }
1024 }
1025 assert!(last_update_delete_row_idx.is_none(), "missing U+ after U-");
1026
1027 let ops = new_ops;
1028 let chunk = self.output_mapping.apply(chunk);
1030
1031 futures::future::try_join_all(
1033 vis_maps
1034 .into_iter()
1035 .zip_eq_fast(self.outputs.iter_mut())
1036 .map(|(vis_map, output)| async {
1037 let vis_map = vis_map.finish();
1038 let new_stream_chunk =
1040 StreamChunk::with_visibility(ops.clone(), chunk.columns().into(), vis_map);
1041 if new_stream_chunk.cardinality() > 0 {
1042 event!(
1043 tracing::Level::TRACE,
1044 msg = "chunk",
1045 downstream = output.actor_id(),
1046 "send = \n{:#?}",
1047 new_stream_chunk
1048 );
1049 output
1050 .send(DispatcherMessageBatch::Chunk(new_stream_chunk))
1051 .await?;
1052 }
1053 StreamResult::Ok(())
1054 }),
1055 )
1056 .await?;
1057
1058 Ok(())
1059 }
1060
1061 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
1062 self.outputs
1063 .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
1064 .count();
1065 }
1066
1067 fn dispatcher_id(&self) -> DispatcherId {
1068 self.dispatcher_id
1069 }
1070
1071 fn dispatcher_id_str(&self) -> &str {
1072 &self.dispatcher_id_str
1073 }
1074
1075 fn is_empty(&self) -> bool {
1076 self.outputs.is_empty()
1077 }
1078}
1079
1080#[derive(Debug)]
1082pub struct BroadcastDispatcher {
1083 outputs: HashMap<ActorId, Output>,
1084 output_mapping: DispatchOutputMapping,
1085 dispatcher_id: DispatcherId,
1086 dispatcher_id_str: String,
1087}
1088
1089impl BroadcastDispatcher {
1090 pub fn new(
1091 outputs: impl IntoIterator<Item = Output>,
1092 output_mapping: DispatchOutputMapping,
1093 dispatcher_id: DispatcherId,
1094 ) -> Self {
1095 Self {
1096 outputs: Self::into_pairs(outputs).collect(),
1097 output_mapping,
1098 dispatcher_id,
1099 dispatcher_id_str: dispatcher_id.to_string(),
1100 }
1101 }
1102
1103 fn into_pairs(
1104 outputs: impl IntoIterator<Item = Output>,
1105 ) -> impl Iterator<Item = (ActorId, Output)> {
1106 outputs
1107 .into_iter()
1108 .map(|output| (output.actor_id(), output))
1109 }
1110}
1111
1112impl Dispatcher for BroadcastDispatcher {
1113 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
1114 let chunk = self.output_mapping.apply(chunk);
1115 broadcast_concurrent(
1116 self.outputs.values_mut(),
1117 DispatcherMessageBatch::Chunk(chunk),
1118 )
1119 .await
1120 }
1121
1122 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
1123 broadcast_concurrent(
1125 self.outputs.values_mut(),
1126 DispatcherMessageBatch::BarrierBatch(barriers),
1127 )
1128 .await
1129 }
1130
1131 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
1132 if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
1133 broadcast_concurrent(
1135 self.outputs.values_mut(),
1136 DispatcherMessageBatch::Watermark(watermark),
1137 )
1138 .await?;
1139 }
1140 Ok(())
1141 }
1142
1143 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
1144 self.outputs.extend(Self::into_pairs(outputs));
1145 }
1146
1147 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
1148 self.outputs
1149 .extract_if(|actor_id, _| actor_ids.contains(actor_id))
1150 .count();
1151 }
1152
1153 fn dispatcher_id(&self) -> DispatcherId {
1154 self.dispatcher_id
1155 }
1156
1157 fn dispatcher_id_str(&self) -> &str {
1158 &self.dispatcher_id_str
1159 }
1160
1161 fn is_empty(&self) -> bool {
1162 self.outputs.is_empty()
1163 }
1164}
1165
1166#[derive(Debug)]
1168pub struct SimpleDispatcher {
1169 output: SmallVec<[Output; 2]>,
1183 output_mapping: DispatchOutputMapping,
1184 dispatcher_id: DispatcherId,
1185 dispatcher_id_str: String,
1186}
1187
1188impl SimpleDispatcher {
1189 pub fn new(
1190 output: Output,
1191 output_mapping: DispatchOutputMapping,
1192 dispatcher_id: DispatcherId,
1193 ) -> Self {
1194 Self {
1195 output: smallvec![output],
1196 output_mapping,
1197 dispatcher_id,
1198 dispatcher_id_str: dispatcher_id.to_string(),
1199 }
1200 }
1201}
1202
1203impl Dispatcher for SimpleDispatcher {
1204 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
1205 self.output.extend(outputs);
1206 assert!(self.output.len() <= 2);
1207 }
1208
1209 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
1210 for output in &mut self.output {
1212 output
1213 .send(DispatcherMessageBatch::BarrierBatch(barriers.clone()))
1214 .await?;
1215 }
1216 Ok(())
1217 }
1218
1219 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
1220 let output = self
1221 .output
1222 .iter_mut()
1223 .exactly_one()
1224 .expect("expect exactly one output");
1225
1226 let chunk = self.output_mapping.apply(chunk);
1227 output.send(DispatcherMessageBatch::Chunk(chunk)).await
1228 }
1229
1230 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
1231 let output = self
1232 .output
1233 .iter_mut()
1234 .exactly_one()
1235 .expect("expect exactly one output");
1236
1237 if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
1238 output
1239 .send(DispatcherMessageBatch::Watermark(watermark))
1240 .await?;
1241 }
1242 Ok(())
1243 }
1244
1245 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
1246 self.output
1247 .retain(|output| !actor_ids.contains(&output.actor_id()));
1248 }
1249
1250 fn dispatcher_id(&self) -> DispatcherId {
1251 self.dispatcher_id
1252 }
1253
1254 fn dispatcher_id_str(&self) -> &str {
1255 &self.dispatcher_id_str
1256 }
1257
1258 fn is_empty(&self) -> bool {
1259 self.output.is_empty()
1260 }
1261}
1262
1263#[cfg(test)]
1264mod tests {
1265 use std::hash::{BuildHasher, Hasher};
1266
1267 use futures::pin_mut;
1268 use risingwave_common::array::stream_chunk::StreamChunkTestExt;
1269 use risingwave_common::array::{Array, ArrayBuilder, I32ArrayBuilder};
1270 use risingwave_common::util::epoch::test_epoch;
1271 use risingwave_common::util::hash_util::Crc32FastBuilder;
1272 use risingwave_pb::stream_plan::{DispatcherType, PbDispatchOutputMapping};
1273 use tokio::sync::mpsc::unbounded_channel;
1274
1275 use super::*;
1276 use crate::executor::exchange::output::Output;
1277 use crate::executor::exchange::permit::channel_for_test;
1278 use crate::executor::receiver::ReceiverExecutor;
1279 use crate::executor::{BarrierInner as Barrier, MessageInner as Message};
1280 use crate::task::barrier_test_utils::LocalBarrierTestEnv;
1281
1282 #[tokio::test]
1283 async fn test_hash_dispatcher_complex() {
1284 assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);
1286
1287 let num_outputs = 2; let key_indices = &[0, 2];
1289 let (output_tx_vecs, mut output_rx_vecs): (Vec<_>, Vec<_>) =
1290 (0..num_outputs).map(|_| channel_for_test()).collect();
1291 let outputs = output_tx_vecs
1292 .into_iter()
1293 .enumerate()
1294 .map(|(actor_id, tx)| Output::new(1 + actor_id as u32, tx))
1295 .collect::<Vec<_>>();
1296 let mut hash_mapping = (1..num_outputs + 1)
1297 .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT_FOR_TEST / num_outputs])
1298 .collect_vec();
1299 hash_mapping.resize(VirtualNode::COUNT_FOR_TEST, num_outputs as u32);
1300 let mut hash_dispatcher = HashDataDispatcher::new(
1301 outputs,
1302 key_indices.to_vec(),
1303 DispatchOutputMapping::Simple(vec![0, 1, 2]),
1304 hash_mapping,
1305 0,
1306 );
1307
1308 let chunk = StreamChunk::from_pretty(
1309 " I I I
1310 + 4 6 8
1311 + 5 7 9
1312 + 0 0 0
1313 - 1 1 1 D
1314 U- 2 0 2
1315 U+ 2 0 2
1316 U- 3 3 2
1317 U+ 3 3 4",
1318 );
1319 hash_dispatcher.dispatch_data(chunk).await.unwrap();
1320
1321 assert_eq!(
1322 *output_rx_vecs[0].recv().await.unwrap().as_chunk().unwrap(),
1323 StreamChunk::from_pretty(
1324 " I I I
1325 + 4 6 8
1326 + 5 7 9
1327 + 0 0 0
1328 - 1 1 1 D
1329 U- 2 0 2
1330 U+ 2 0 2
1331 - 3 3 2 D // Should rewrite UpdateDelete to Delete
1332 + 3 3 4 // Should rewrite UpdateInsert to Insert",
1333 )
1334 );
1335 assert_eq!(
1336 *output_rx_vecs[1].recv().await.unwrap().as_chunk().unwrap(),
1337 StreamChunk::from_pretty(
1338 " I I I
1339 + 4 6 8 D
1340 + 5 7 9 D
1341 + 0 0 0 D
1342 - 1 1 1 D // Should keep original invisible mark
1343 U- 2 0 2 D // Should keep UpdateDelete
1344 U+ 2 0 2 D // Should keep UpdateInsert
1345 - 3 3 2 // Should rewrite UpdateDelete to Delete
1346 + 3 3 4 D // Should rewrite UpdateInsert to Insert",
1347 )
1348 );
1349 }
1350
    /// Drives a `DispatchExecutor` that owns one broadcast and one simple dispatcher through
    /// two `Update` barriers (`b1`, `b3`), each followed by a plain barrier (`b2`, `b4`), and
    /// checks after every barrier which downstream actors received which messages.
    #[tokio::test]
    async fn test_configuration_change() {
        let _schema = Schema { fields: vec![] };
        let (tx, rx) = channel_for_test();
        let actor_id = 233;
        let fragment_id = 666;
        let barrier_test_env = LocalBarrierTestEnv::for_test().await;
        let metrics = Arc::new(StreamingMetrics::unused());

        // Downstream actors of the broadcast dispatcher; `b1` replaces `old` with `new`.
        let (untouched, old, new) = (234, 235, 238);
        // Downstream actors of the simple dispatcher; `b3` replaces `old_simple` with `new_simple`.
        let (old_simple, new_simple) = (114, 514);

        // Initial broadcast dispatcher: actor_id -> untouched, old.
        let broadcast_dispatcher_id = 666;
        let broadcast_dispatcher = PbDispatcher {
            r#type: DispatcherType::Broadcast as _,
            dispatcher_id: broadcast_dispatcher_id,
            downstream_actor_id: vec![untouched, old],
            output_mapping: PbDispatchOutputMapping::identical(0).into(),
            ..Default::default()
        };

        // Initial simple dispatcher: actor_id -> old_simple.
        let simple_dispatcher_id = 888;
        let simple_dispatcher = PbDispatcher {
            r#type: DispatcherType::Simple as _,
            dispatcher_id: simple_dispatcher_id,
            downstream_actor_id: vec![old_simple],
            output_mapping: PbDispatchOutputMapping::identical(0).into(),
            ..Default::default()
        };

        // Update carried by `b1`: on the broadcast dispatcher, add `new` and remove `old`.
        let dispatcher_updates = maplit::hashmap! {
            actor_id => vec![PbDispatcherUpdate {
                actor_id,
                dispatcher_id: broadcast_dispatcher_id,
                added_downstream_actor_id: vec![new],
                removed_downstream_actor_id: vec![old],
                hash_mapping: Default::default(),
            }]
        };
        let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: dispatcher_updates,
                ..Default::default()
            },
        ));
        // `b1` is injected, and pending events flushed, before the executor is built.
        barrier_test_env.inject_barrier(&b1, [actor_id]);
        barrier_test_env.flush_all_events().await;

        let input = Executor::new(
            Default::default(),
            ReceiverExecutor::for_test(
                actor_id,
                rx,
                barrier_test_env.local_barrier_manager.clone(),
            )
            .boxed(),
        );

        // Register a local output channel for every downstream actor id up front (including
        // the ones that are only attached later) and keep the receivers keyed by actor id.
        let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
        let mut rxs = [untouched, old, new, old_simple, new_simple]
            .into_iter()
            .map(|id| {
                (id, {
                    let (tx, rx) = channel_for_test();
                    new_output_request_tx
                        .send((id, NewOutputRequest::Local(tx)))
                        .unwrap();
                    rx
                })
            })
            .collect::<HashMap<_, _>>();
        let executor = Box::new(
            DispatchExecutor::new(
                input,
                new_output_request_rx,
                vec![broadcast_dispatcher, simple_dispatcher],
                actor_id,
                fragment_id,
                barrier_test_env.local_barrier_manager.clone(),
                metrics,
            )
            .await
            .unwrap(),
        )
        .execute();

        pin_mut!(executor);

        // Non-blocking receive on the channel of the given downstream actor.
        macro_rules! try_recv {
            ($down_id:expr) => {
                rxs.get_mut(&$down_id).unwrap().try_recv()
            };
        }

        // Send a chunk, followed by the broadcast configuration-change barrier `b1`.
        tx.send(Message::Chunk(StreamChunk::default()).into())
            .await
            .unwrap();

        tx.send(Message::Barrier(b1.clone().into_dispatcher()).into())
            .await
            .unwrap();
        // Wait until the executor yields its next item.
        executor.next().await.unwrap().unwrap();

        // Check downstream after `b1`.
        try_recv!(untouched).unwrap().as_chunk().unwrap();
        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();

        try_recv!(old).unwrap().as_chunk().unwrap();
        // `old` still receives the very barrier that removes it.
        try_recv!(old).unwrap().as_barrier_batch().unwrap();
        // The first message seen by `new` is the barrier that adds it, not the earlier chunk.
        try_recv!(new).unwrap().as_barrier_batch().unwrap();
        // The simple dispatcher is not part of `b1`'s update.
        try_recv!(old_simple).unwrap().as_chunk().unwrap();
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap();

        // Send a plain barrier `b2`.
        let b2 = Barrier::new_test_barrier(test_epoch(2));
        barrier_test_env.inject_barrier(&b2, [actor_id]);
        tx.send(Message::Barrier(b2.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // Check downstream after `b2`.
        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();
        // Nothing further arrives on `old` once it has been removed.
        try_recv!(old).unwrap_err();
        try_recv!(new).unwrap().as_barrier_batch().unwrap();

        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap();
        // `new_simple` has received nothing so far.
        try_recv!(new_simple).unwrap_err();

        // Send another chunk.
        tx.send(Message::Chunk(StreamChunk::default()).into())
            .await
            .unwrap();

        // Update carried by `b3`: on the simple dispatcher, add `new_simple` and remove
        // `old_simple`.
        let dispatcher_updates = maplit::hashmap! {
            actor_id => vec![PbDispatcherUpdate {
                actor_id,
                dispatcher_id: simple_dispatcher_id,
                added_downstream_actor_id: vec![new_simple],
                removed_downstream_actor_id: vec![old_simple],
                hash_mapping: Default::default(),
            }]
        };
        let b3 = Barrier::new_test_barrier(test_epoch(3)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: dispatcher_updates,
                ..Default::default()
            },
        ));
        barrier_test_env.inject_barrier(&b3, [actor_id]);
        tx.send(Message::Barrier(b3.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // Check downstream after `b3`: `old_simple` gets the chunk and the barrier that
        // removes it.
        try_recv!(old_simple).unwrap().as_chunk().unwrap();
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap();
        // The first message seen by `new_simple` is the barrier that adds it, not the chunk.
        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap();

        // Send a plain barrier `b4`.
        let b4 = Barrier::new_test_barrier(test_epoch(4));
        barrier_test_env.inject_barrier(&b4, [actor_id]);
        tx.send(Message::Barrier(b4.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // Check downstream after `b4`: nothing further arrives on `old_simple`, while
        // `new_simple` receives the barrier.
        try_recv!(old_simple).unwrap_err();
        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap();
    }
1530
1531 #[tokio::test]
1532 async fn test_hash_dispatcher() {
1533 assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);
1535
1536 let num_outputs = 5; let cardinality = 10;
1538 let dimension = 4;
1539 let key_indices = &[0, 2];
1540 let (output_tx_vecs, output_rx_vecs): (Vec<_>, Vec<_>) =
1541 (0..num_outputs).map(|_| channel_for_test()).collect();
1542 let outputs = output_tx_vecs
1543 .into_iter()
1544 .enumerate()
1545 .map(|(actor_id, tx)| Output::new(1 + actor_id as u32, tx))
1546 .collect::<Vec<_>>();
1547 let mut hash_mapping = (1..num_outputs + 1)
1548 .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT_FOR_TEST / num_outputs])
1549 .collect_vec();
1550 hash_mapping.resize(VirtualNode::COUNT_FOR_TEST, num_outputs as u32);
1551 let mut hash_dispatcher = HashDataDispatcher::new(
1552 outputs,
1553 key_indices.to_vec(),
1554 DispatchOutputMapping::Simple((0..dimension).collect()),
1555 hash_mapping.clone(),
1556 0,
1557 );
1558
1559 let mut ops = Vec::new();
1560 for idx in 0..cardinality {
1561 if idx % 2 == 0 {
1562 ops.push(Op::Insert);
1563 } else {
1564 ops.push(Op::Delete);
1565 }
1566 }
1567
1568 let mut start = 19260817i32..;
1569 let mut builders = (0..dimension)
1570 .map(|_| I32ArrayBuilder::new(cardinality))
1571 .collect_vec();
1572 let mut output_cols = vec![vec![vec![]; dimension]; num_outputs];
1573 let mut output_ops = vec![vec![]; num_outputs];
1574 for op in &ops {
1575 let hash_builder = Crc32FastBuilder;
1576 let mut hasher = hash_builder.build_hasher();
1577 let one_row = (0..dimension).map(|_| start.next().unwrap()).collect_vec();
1578 for key_idx in key_indices {
1579 let val = one_row[*key_idx];
1580 let bytes = val.to_le_bytes();
1581 hasher.update(&bytes);
1582 }
1583 let output_idx =
1584 hash_mapping[hasher.finish() as usize % VirtualNode::COUNT_FOR_TEST] as usize - 1;
1585 for (builder, val) in builders.iter_mut().zip_eq_fast(one_row.iter()) {
1586 builder.append(Some(*val));
1587 }
1588 output_cols[output_idx]
1589 .iter_mut()
1590 .zip_eq_fast(one_row.iter())
1591 .for_each(|(each_column, val)| each_column.push(*val));
1592 output_ops[output_idx].push(op);
1593 }
1594
1595 let columns = builders
1596 .into_iter()
1597 .map(|builder| {
1598 let array = builder.finish();
1599 array.into_ref()
1600 })
1601 .collect();
1602
1603 let chunk = StreamChunk::new(ops, columns);
1604 hash_dispatcher.dispatch_data(chunk).await.unwrap();
1605
1606 for (output_idx, mut rx) in output_rx_vecs.into_iter().enumerate() {
1607 let mut output = vec![];
1608 while let Some(Some(msg)) = rx.recv().now_or_never() {
1609 output.push(msg);
1610 }
1611 assert!(output.len() <= 1);
1613 if output.is_empty() {
1614 assert!(output_cols[output_idx].iter().all(|x| { x.is_empty() }));
1615 } else {
1616 let message = output.first().unwrap();
1617 let real_chunk = match message {
1618 DispatcherMessageBatch::Chunk(chunk) => chunk,
1619 _ => panic!(),
1620 };
1621 real_chunk
1622 .columns()
1623 .iter()
1624 .zip_eq_fast(output_cols[output_idx].iter())
1625 .for_each(|(real_col, expect_col)| {
1626 let real_vals = real_chunk
1627 .visibility()
1628 .iter_ones()
1629 .map(|row_idx| real_col.as_int32().value_at(row_idx).unwrap())
1630 .collect::<Vec<_>>();
1631 assert_eq!(real_vals.len(), expect_col.len());
1632 assert_eq!(real_vals, *expect_col);
1633 });
1634 }
1635 }
1636 }
1637}