1use std::collections::{HashMap, HashSet};
16use std::future::Future;
17use std::iter::repeat_with;
18use std::ops::{Deref, DerefMut};
19use std::time::Duration;
20
21use anyhow::anyhow;
22use futures::{FutureExt, TryStreamExt};
23use itertools::Itertools;
24use risingwave_common::array::Op;
25use risingwave_common::bitmap::BitmapBuilder;
26use risingwave_common::hash::{ActorMapping, ExpandedActorMapping, VirtualNode};
27use risingwave_common::metrics::LabelGuardedIntCounter;
28use risingwave_common::util::iter_util::ZipEqFast;
29use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
30use risingwave_pb::stream_plan::{self, PbDispatcher};
31use smallvec::{SmallVec, smallvec};
32use tokio::sync::mpsc::UnboundedReceiver;
33use tokio::time::Instant;
34use tokio_stream::StreamExt;
35use tokio_stream::adapters::Peekable;
36use tracing::{Instrument, event};
37
38use super::exchange::output::Output;
39use super::{
40 AddMutation, DispatcherBarriers, DispatcherMessageBatch, MessageBatch, TroublemakerExecutor,
41 UpdateMutation,
42};
43use crate::executor::prelude::*;
44use crate::executor::{StopMutation, StreamConsumer};
45use crate::task::{DispatcherId, LocalBarrierManager, NewOutputRequest};
46
47mod output_mapping;
48pub use output_mapping::DispatchOutputMapping;
49
/// Consumes the message stream of an actor's executor chain and forwards every
/// chunk, watermark and barrier batch to all of the actor's dispatchers.
///
/// Barriers are yielded from [`StreamConsumer::execute`] after their batch has
/// been dispatched.
pub struct DispatchExecutor {
    /// Upstream executor whose messages are dispatched.
    input: Executor,
    /// Dispatchers plus the bookkeeping needed to create and mutate them.
    inner: DispatchExecutorInner,
}
57
58struct DispatcherWithMetrics {
59 dispatcher: DispatcherImpl,
60 pub actor_output_buffer_blocking_duration_ns: LabelGuardedIntCounter,
61}
62
63impl Debug for DispatcherWithMetrics {
64 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
65 self.dispatcher.fmt(f)
66 }
67}
68
69impl Deref for DispatcherWithMetrics {
70 type Target = DispatcherImpl;
71
72 fn deref(&self) -> &Self::Target {
73 &self.dispatcher
74 }
75}
76
77impl DerefMut for DispatcherWithMetrics {
78 fn deref_mut(&mut self) -> &mut Self::Target {
79 &mut self.dispatcher
80 }
81}
82
83struct DispatchExecutorMetrics {
84 actor_id_str: String,
85 fragment_id_str: String,
86 metrics: Arc<StreamingMetrics>,
87 actor_out_record_cnt: LabelGuardedIntCounter,
88}
89
90impl DispatchExecutorMetrics {
91 fn monitor_dispatcher(&self, dispatcher: DispatcherImpl) -> DispatcherWithMetrics {
92 DispatcherWithMetrics {
93 actor_output_buffer_blocking_duration_ns: self
94 .metrics
95 .actor_output_buffer_blocking_duration_ns
96 .with_guarded_label_values(&[
97 self.actor_id_str.as_str(),
98 self.fragment_id_str.as_str(),
99 dispatcher.dispatcher_id_str(),
100 ]),
101 dispatcher,
102 }
103 }
104}
105
/// Mutable state of a dispatch executor: its dispatchers and the channel on
/// which downstream actors' output requests arrive.
struct DispatchExecutorInner {
    /// Active dispatchers, each wrapped with its blocking-duration counter.
    dispatchers: Vec<DispatcherWithMetrics>,
    /// Id of the actor owning this executor.
    actor_id: u32,
    local_barrier_manager: LocalBarrierManager,
    metrics: DispatchExecutorMetrics,
    /// Receives `(downstream actor id, request)` pairs used to build outputs.
    new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    /// Requests received while waiting for a different actor's request; they
    /// are consumed by later `collect_outputs` calls.
    pending_new_output_requests: HashMap<ActorId, NewOutputRequest>,
}
114
impl DispatchExecutorInner {
    /// Builds one [`Output`] per actor in `downstream_actors`, in the same order.
    ///
    /// A request already stashed in `pending_new_output_requests` is used when
    /// present. Otherwise, requests are read from `new_output_request_rx` until
    /// the one for the wanted actor arrives, and requests for other actors are
    /// stashed along the way.
    ///
    /// # Errors
    /// Returns an error if `new_output_request_rx` is closed before every
    /// requested output has been resolved.
    ///
    /// # Panics
    /// Panics if a request arrives for an actor that already has a stashed request.
    async fn collect_outputs(
        &mut self,
        downstream_actors: &[ActorId],
    ) -> StreamResult<Vec<Output>> {
        /// Local and remote requests both carry the sender to use; wrap it in an `Output`.
        fn resolve_output(downstream_actor: ActorId, request: NewOutputRequest) -> Output {
            let tx = match request {
                NewOutputRequest::Local(tx) | NewOutputRequest::Remote(tx) => tx,
            };
            Output::new(downstream_actor, tx)
        }
        let mut outputs = Vec::with_capacity(downstream_actors.len());
        for downstream_actor in downstream_actors {
            let output =
                if let Some(request) = self.pending_new_output_requests.remove(downstream_actor) {
                    resolve_output(*downstream_actor, request)
                } else {
                    loop {
                        let (requested_actor, request) = self
                            .new_output_request_rx
                            .recv()
                            .await
                            .ok_or_else(|| anyhow!("end of new output request"))?;
                        if requested_actor == *downstream_actor {
                            break resolve_output(requested_actor, request);
                        } else {
                            // Request for another actor: stash it for a later lookup.
                            assert!(
                                self.pending_new_output_requests
                                    .insert(requested_actor, request)
                                    .is_none(),
                                "duplicated inflight new output requests from actor {}",
                                requested_actor
                            );
                        }
                    }
                };
            outputs.push(output);
        }
        Ok(outputs)
    }

    /// Sends one message batch to every dispatcher. At most
    /// `developer.exchange_concurrent_dispatchers` (from config) dispatchers run
    /// concurrently.
    ///
    /// For a barrier batch, the first barrier's mutation is applied to the
    /// dispatchers in two steps: `pre_mutate_dispatchers` runs before the send
    /// and `post_mutate_dispatchers` runs after it. For a chunk, the actor's
    /// output record counter is increased by the chunk's cardinality after
    /// every dispatcher has sent it.
    async fn dispatch(&mut self, msg: MessageBatch) -> StreamResult<()> {
        // Awaits `$fut` (propagating its error) and adds the time spent waiting
        // to the `$metrics` counter, in nanoseconds. While the future is still
        // pending, the counter is also bumped by 15s at each 15-second tick and
        // the start time is reset. This makes long stalls visible before they end.
        macro_rules! await_with_metrics {
            ($fut:expr, $metrics:expr) => {{
                let mut start_time = Instant::now();
                let interval_duration = Duration::from_secs(15);
                let mut interval =
                    tokio::time::interval_at(start_time + interval_duration, interval_duration);

                let mut fut = std::pin::pin!($fut);

                loop {
                    tokio::select! {
                        biased;
                        res = &mut fut => {
                            res?;
                            let ns = start_time.elapsed().as_nanos() as u64;
                            $metrics.inc_by(ns);
                            break;
                        }
                        _ = interval.tick() => {
                            start_time = Instant::now();
                            $metrics.inc_by(interval_duration.as_nanos() as u64);
                        }
                    };
                }
                StreamResult::Ok(())
            }};
        }

        let limit = self
            .local_barrier_manager
            .env
            .config()
            .developer
            .exchange_concurrent_dispatchers;
        match msg {
            MessageBatch::BarrierBatch(barrier_batch) => {
                if barrier_batch.is_empty() {
                    return Ok(());
                }
                // Only the first barrier's mutation is applied to the dispatchers.
                let mutation = barrier_batch[0].mutation.clone();
                // Add dispatchers/outputs before sending, so new downstreams receive the barriers.
                self.pre_mutate_dispatchers(&mutation).await?;
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
                        let dispatcher_output = &mut dispatcher.dispatcher;
                        let fut = dispatcher_output.dispatch_barriers(
                            barrier_batch
                                .iter()
                                .cloned()
                                .map(|b| b.into_dispatcher())
                                .collect(),
                        );
                        await_with_metrics!(std::pin::pin!(fut), metrics)
                    })
                    .await?;
                // Remove outputs/dispatchers only after the barriers have been sent.
                self.post_mutate_dispatchers(&mutation)?;
            }
            MessageBatch::Watermark(watermark) => {
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
                        let dispatcher_output = &mut dispatcher.dispatcher;
                        let fut = dispatcher_output.dispatch_watermark(watermark.clone());
                        await_with_metrics!(std::pin::pin!(fut), metrics)
                    })
                    .await?;
            }
            MessageBatch::Chunk(chunk) => {
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
                        let dispatcher_output = &mut dispatcher.dispatcher;
                        let fut = dispatcher_output.dispatch_data(chunk.clone());
                        await_with_metrics!(std::pin::pin!(fut), metrics)
                    })
                    .await?;

                self.metrics
                    .actor_out_record_cnt
                    .inc_by(chunk.cardinality() as _);
            }
        }
        Ok(())
    }

    /// Creates a dispatcher for each protobuf definition, after collecting its
    /// outputs, and appends it with its metrics wrapper.
    ///
    /// # Panics
    /// Panics if the resulting set of dispatchers contains duplicate dispatcher ids.
    async fn add_dispatchers<'a>(
        &mut self,
        new_dispatchers: impl IntoIterator<Item = &'a PbDispatcher>,
    ) -> StreamResult<()> {
        for dispatcher in new_dispatchers {
            let outputs = self
                .collect_outputs(&dispatcher.downstream_actor_id)
                .await?;
            let dispatcher = DispatcherImpl::new(outputs, dispatcher)?;
            let dispatcher = self.metrics.monitor_dispatcher(dispatcher);
            self.dispatchers.push(dispatcher);
        }

        assert!(
            self.dispatchers
                .iter()
                .map(|d| d.dispatcher_id())
                .all_unique(),
            "dispatcher ids must be unique: {:?}",
            self.dispatchers
        );

        Ok(())
    }

    /// Returns the dispatcher with `dispatcher_id`; panics if there is none.
    fn find_dispatcher(&mut self, dispatcher_id: DispatcherId) -> &mut DispatcherImpl {
        self.dispatchers
            .iter_mut()
            .find(|d| d.dispatcher_id() == dispatcher_id)
            .unwrap_or_else(|| panic!("dispatcher {}:{} not found", self.actor_id, dispatcher_id))
    }

    /// Pre-barrier half of a dispatcher update: adds outputs for the newly
    /// added downstream actors to the dispatcher named by `update`.
    async fn pre_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
        let outputs = self
            .collect_outputs(&update.added_downstream_actor_id)
            .await?;

        let dispatcher = self.find_dispatcher(update.dispatcher_id);
        dispatcher.add_outputs(outputs);

        Ok(())
    }

    /// Post-barrier half of a dispatcher update. It removes the outputs of the
    /// removed downstream actors and, for a hash dispatcher, installs the hash
    /// mapping carried by `update`. Errors from `get_hash_mapping` are propagated.
    fn post_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
        let ids = update.removed_downstream_actor_id.iter().copied().collect();

        let dispatcher = self.find_dispatcher(update.dispatcher_id);
        dispatcher.remove_outputs(&ids);

        if let DispatcherImpl::Hash(dispatcher) = dispatcher {
            dispatcher.hash_mapping =
                ActorMapping::from_protobuf(update.get_hash_mapping()?).to_expanded();
        }

        Ok(())
    }

    /// Applies the parts of `mutation` that must happen before barriers are
    /// dispatched. These are creating new dispatchers for this actor
    /// (`Add` / `Update` / `AddAndUpdate`) and adding outputs to existing ones.
    /// Does nothing when there is no mutation.
    async fn pre_mutate_dispatchers(
        &mut self,
        mutation: &Option<Arc<Mutation>>,
    ) -> StreamResult<()> {
        let Some(mutation) = mutation.as_deref() else {
            return Ok(());
        };

        match mutation {
            Mutation::Add(AddMutation { adds, .. }) => {
                if let Some(new_dispatchers) = adds.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }
            }
            Mutation::Update(UpdateMutation {
                dispatchers,
                actor_new_dispatchers: actor_dispatchers,
                ..
            }) => {
                if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }

                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.pre_update_dispatcher(update).await?;
                    }
                }
            }
            Mutation::AddAndUpdate(
                AddMutation { adds, .. },
                UpdateMutation {
                    dispatchers,
                    actor_new_dispatchers: actor_dispatchers,
                    ..
                },
            ) => {
                if let Some(new_dispatchers) = adds.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }

                if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }

                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.pre_update_dispatcher(update).await?;
                    }
                }
            }
            _ => {}
        }

        Ok(())
    }

    /// Applies the parts of `mutation` that must happen after barriers are
    /// dispatched:
    /// - post-update dispatcher changes (removed outputs, new hash mapping);
    /// - removing outputs to dropped actors, skipped when this actor is itself
    ///   among the dropped actors.
    ///
    /// Dispatchers left without outputs are then discarded.
    fn post_mutate_dispatchers(&mut self, mutation: &Option<Arc<Mutation>>) -> StreamResult<()> {
        let Some(mutation) = mutation.as_deref() else {
            return Ok(());
        };

        match mutation {
            Mutation::Stop(StopMutation { dropped_actors, .. }) => {
                if !dropped_actors.contains(&self.actor_id) {
                    for dispatcher in &mut self.dispatchers {
                        dispatcher.remove_outputs(dropped_actors);
                    }
                }
            }
            Mutation::Update(UpdateMutation {
                dispatchers,
                dropped_actors,
                ..
            })
            | Mutation::AddAndUpdate(
                _,
                UpdateMutation {
                    dispatchers,
                    dropped_actors,
                    ..
                },
            ) => {
                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.post_update_dispatcher(update)?;
                    }
                }

                if !dropped_actors.contains(&self.actor_id) {
                    for dispatcher in &mut self.dispatchers {
                        dispatcher.remove_outputs(dropped_actors);
                    }
                }
            }
            _ => {}
        };

        // A dispatcher with no outputs has nothing left to send to.
        self.dispatchers.retain(|d| !d.is_empty());

        Ok(())
    }
}
423
424impl DispatchExecutor {
425 pub(crate) async fn new(
426 input: Executor,
427 new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
428 dispatchers: Vec<stream_plan::Dispatcher>,
429 actor_id: u32,
430 fragment_id: u32,
431 local_barrier_manager: LocalBarrierManager,
432 metrics: Arc<StreamingMetrics>,
433 ) -> StreamResult<Self> {
434 let mut executor = Self::new_inner(
435 input,
436 new_output_request_rx,
437 vec![],
438 actor_id,
439 fragment_id,
440 local_barrier_manager,
441 metrics,
442 );
443 let inner = &mut executor.inner;
444 for dispatcher in dispatchers {
445 let outputs = inner
446 .collect_outputs(&dispatcher.downstream_actor_id)
447 .await?;
448 let dispatcher = DispatcherImpl::new(outputs, &dispatcher)?;
449 let dispatcher = inner.metrics.monitor_dispatcher(dispatcher);
450 inner.dispatchers.push(dispatcher);
451 }
452 Ok(executor)
453 }
454
455 #[cfg(test)]
456 pub(crate) fn for_test(
457 input: Executor,
458 dispatchers: Vec<DispatcherImpl>,
459 actor_id: u32,
460 fragment_id: u32,
461 local_barrier_manager: LocalBarrierManager,
462 metrics: Arc<StreamingMetrics>,
463 ) -> (
464 Self,
465 tokio::sync::mpsc::UnboundedSender<(ActorId, NewOutputRequest)>,
466 ) {
467 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
468
469 (
470 Self::new_inner(
471 input,
472 rx,
473 dispatchers,
474 actor_id,
475 fragment_id,
476 local_barrier_manager,
477 metrics,
478 ),
479 tx,
480 )
481 }
482
483 fn new_inner(
484 mut input: Executor,
485 new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
486 dispatchers: Vec<DispatcherImpl>,
487 actor_id: u32,
488 fragment_id: u32,
489 local_barrier_manager: LocalBarrierManager,
490 metrics: Arc<StreamingMetrics>,
491 ) -> Self {
492 let chunk_size = local_barrier_manager.env.config().developer.chunk_size;
493 if crate::consistency::insane() {
494 let mut info = input.info().clone();
496 info.identity = format!("{} (embedded trouble)", info.identity);
497 let troublemaker = TroublemakerExecutor::new(input, chunk_size);
498 input = (info, troublemaker).into();
499 }
500
501 let actor_id_str = actor_id.to_string();
502 let fragment_id_str = fragment_id.to_string();
503 let actor_out_record_cnt = metrics
504 .actor_out_record_cnt
505 .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
506 let metrics = DispatchExecutorMetrics {
507 actor_id_str,
508 fragment_id_str,
509 metrics,
510 actor_out_record_cnt,
511 };
512 let dispatchers = dispatchers
513 .into_iter()
514 .map(|dispatcher| metrics.monitor_dispatcher(dispatcher))
515 .collect();
516 Self {
517 input,
518 inner: DispatchExecutorInner {
519 dispatchers,
520 actor_id,
521 local_barrier_manager,
522 metrics,
523 new_output_request_rx,
524 pending_new_output_requests: Default::default(),
525 },
526 }
527 }
528}
529
530impl StreamConsumer for DispatchExecutor {
531 type BarrierStream = impl Stream<Item = StreamResult<Barrier>> + Send;
532
533 fn execute(mut self: Box<Self>) -> Self::BarrierStream {
534 let max_barrier_count_per_batch = self
535 .inner
536 .local_barrier_manager
537 .env
538 .config()
539 .developer
540 .max_barrier_batch_size;
541 #[try_stream]
542 async move {
543 let mut input = self.input.execute().peekable();
544 loop {
545 let Some(message) =
546 try_batch_barriers(max_barrier_count_per_batch, &mut input).await?
547 else {
548 break;
550 };
551 match message {
552 chunk @ MessageBatch::Chunk(_) => {
553 self.inner
554 .dispatch(chunk)
555 .instrument(tracing::info_span!("dispatch_chunk"))
556 .instrument_await("dispatch_chunk")
557 .await?;
558 }
559 MessageBatch::BarrierBatch(barrier_batch) => {
560 assert!(!barrier_batch.is_empty());
561 self.inner
562 .dispatch(MessageBatch::BarrierBatch(barrier_batch.clone()))
563 .instrument(tracing::info_span!("dispatch_barrier_batch"))
564 .instrument_await("dispatch_barrier_batch")
565 .await?;
566 self.inner
567 .metrics
568 .metrics
569 .barrier_batch_size
570 .observe(barrier_batch.len() as f64);
571 for barrier in barrier_batch {
572 yield barrier;
573 }
574 }
575 watermark @ MessageBatch::Watermark(_) => {
576 self.inner
577 .dispatch(watermark)
578 .instrument(tracing::info_span!("dispatch_watermark"))
579 .instrument_await("dispatch_watermark")
580 .await?;
581 }
582 }
583 }
584 }
585 }
586}
587
588async fn try_batch_barriers(
594 max_barrier_count_per_batch: u32,
595 input: &mut Peekable<BoxedMessageStream>,
596) -> StreamResult<Option<MessageBatch>> {
597 let Some(msg) = input.next().await else {
598 return Ok(None);
600 };
601 let mut barrier_batch = vec![];
602 let msg: Message = msg?;
603 let max_peek_attempts = match msg {
604 Message::Chunk(c) => {
605 return Ok(Some(MessageBatch::Chunk(c)));
606 }
607 Message::Watermark(w) => {
608 return Ok(Some(MessageBatch::Watermark(w)));
609 }
610 Message::Barrier(b) => {
611 let peek_more_barrier = b.mutation.is_none();
612 barrier_batch.push(b);
613 if peek_more_barrier {
614 max_barrier_count_per_batch.saturating_sub(1)
615 } else {
616 0
617 }
618 }
619 };
620 for _ in 0..max_peek_attempts {
622 let peek = input.peek().now_or_never();
623 let Some(peek) = peek else {
624 break;
625 };
626 let Some(msg) = peek else {
627 break;
629 };
630 let Ok(Message::Barrier(barrier)) = msg else {
631 break;
632 };
633 if barrier.mutation.is_some() {
634 break;
635 }
636 let msg: Message = input.next().now_or_never().unwrap().unwrap()?;
637 let Message::Barrier(ref barrier) = msg else {
638 unreachable!("must be a barrier");
639 };
640 barrier_batch.push(barrier.clone());
641 }
642 Ok(Some(MessageBatch::BarrierBatch(barrier_batch)))
643}
644
/// All dispatcher kinds an actor can own. Calls are forwarded to the concrete
/// dispatcher by the methods generated with `impl_dispatcher`.
#[derive(Debug)]
pub enum DispatcherImpl {
    /// Routes each row to the output that owns its vnode in `hash_mapping`.
    Hash(HashDataDispatcher),
    /// Sends every message to all outputs.
    Broadcast(BroadcastDispatcher),
    /// Sends data to its single output; built for both `Simple` and `NoShuffle` types.
    Simple(SimpleDispatcher),
    /// Sends each chunk to one output, cycling through the outputs.
    RoundRobin(RoundRobinDataDispatcher),
}
652
653impl DispatcherImpl {
654 pub fn new(outputs: Vec<Output>, dispatcher: &PbDispatcher) -> StreamResult<Self> {
655 let output_mapping =
656 DispatchOutputMapping::from_protobuf(dispatcher.output_mapping.clone().unwrap());
657
658 use risingwave_pb::stream_plan::DispatcherType::*;
659 let dispatcher_impl = match dispatcher.get_type()? {
660 Hash => {
661 assert!(!outputs.is_empty());
662 let dist_key_indices = dispatcher
663 .dist_key_indices
664 .iter()
665 .map(|i| *i as usize)
666 .collect();
667
668 let hash_mapping =
669 ActorMapping::from_protobuf(dispatcher.get_hash_mapping()?).to_expanded();
670
671 DispatcherImpl::Hash(HashDataDispatcher::new(
672 outputs,
673 dist_key_indices,
674 output_mapping,
675 hash_mapping,
676 dispatcher.dispatcher_id,
677 ))
678 }
679 Broadcast => DispatcherImpl::Broadcast(BroadcastDispatcher::new(
680 outputs,
681 output_mapping,
682 dispatcher.dispatcher_id,
683 )),
684 Simple | NoShuffle => {
685 let [output]: [_; 1] = outputs.try_into().unwrap();
686 DispatcherImpl::Simple(SimpleDispatcher::new(
687 output,
688 output_mapping,
689 dispatcher.dispatcher_id,
690 ))
691 }
692 Unspecified => unreachable!(),
693 };
694
695 Ok(dispatcher_impl)
696 }
697}
698
/// Generates the methods of [`DispatcherImpl`] that forward each call to the
/// dispatcher held by the matched variant. Each variant named in the
/// invocation must hold a type implementing [`Dispatcher`].
macro_rules! impl_dispatcher {
    ($( { $variant_name:ident } ),*) => {
        impl DispatcherImpl {
            /// Forwards to [`Dispatcher::dispatch_data`].
            pub async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_data(chunk).await, )*
                }
            }

            /// Forwards to [`Dispatcher::dispatch_barriers`].
            pub async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_barriers(barriers).await, )*
                }
            }

            /// Forwards to [`Dispatcher::dispatch_watermark`].
            pub async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_watermark(watermark).await, )*
                }
            }

            /// Forwards to [`Dispatcher::add_outputs`].
            pub fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
                match self {
                    $(Self::$variant_name(inner) => inner.add_outputs(outputs), )*
                }
            }

            /// Forwards to [`Dispatcher::remove_outputs`].
            pub fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
                match self {
                    $(Self::$variant_name(inner) => inner.remove_outputs(actor_ids), )*
                }
            }

            /// Forwards to [`Dispatcher::dispatcher_id`].
            pub fn dispatcher_id(&self) -> DispatcherId {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id(), )*
                }
            }

            /// Forwards to [`Dispatcher::dispatcher_id_str`].
            pub fn dispatcher_id_str(&self) -> &str {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id_str(), )*
                }
            }

            /// Forwards to [`Dispatcher::is_empty`].
            pub fn is_empty(&self) -> bool {
                match self {
                    $(Self::$variant_name(inner) => inner.is_empty(), )*
                }
            }
        }
    }
}
752
/// Invokes `$macro` with every [`DispatcherImpl`] variant, so code that must
/// cover all variants is generated from this single list.
macro_rules! for_all_dispatcher_variants {
    ($macro:ident) => {
        $macro! {
            { Hash },
            { Broadcast },
            { Simple },
            { RoundRobin }
        }
    };
}

// Generate the forwarding methods of `DispatcherImpl` for all variants.
for_all_dispatcher_variants! { impl_dispatcher }
765
/// Future returned by the [`Dispatcher`] send methods: a `Send` future that
/// resolves to `StreamResult<()>`. The lifetime parameter does not appear in the bound.
pub trait DispatchFuture<'a> = Future<Output = StreamResult<()>> + Send;

/// A strategy for distributing an actor's output messages among its
/// downstream outputs.
pub trait Dispatcher: Debug + 'static {
    /// Sends a data chunk to the output(s) this dispatcher selects.
    fn dispatch_data(&mut self, chunk: StreamChunk) -> impl DispatchFuture<'_>;
    /// Sends a batch of barriers. Every implementation in this module sends
    /// it to all outputs.
    fn dispatch_barriers(&mut self, barrier: DispatcherBarriers) -> impl DispatchFuture<'_>;
    /// Sends a watermark. Implementations in this module first pass it through
    /// their output mapping and send nothing if the mapping yields `None`.
    fn dispatch_watermark(&mut self, watermark: Watermark) -> impl DispatchFuture<'_>;

    /// Adds new outputs to this dispatcher.
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>);
    /// Removes every output whose downstream actor id is in `actor_ids`.
    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>);

    /// Identifier of this dispatcher.
    fn dispatcher_id(&self) -> DispatcherId;

    /// String form of the dispatcher id (used, for example, as a metric label).
    fn dispatcher_id_str(&self) -> &str;

    /// Whether this dispatcher has no outputs left.
    fn is_empty(&self) -> bool;
}
795
796async fn broadcast_concurrent(
801 outputs: impl IntoIterator<Item = &'_ mut Output>,
802 message: DispatcherMessageBatch,
803) -> StreamResult<()> {
804 futures::future::try_join_all(
805 outputs
806 .into_iter()
807 .map(|output| output.send(message.clone())),
808 )
809 .await?;
810 Ok(())
811}
812
813#[derive(Debug)]
814pub struct RoundRobinDataDispatcher {
815 outputs: Vec<Output>,
816 output_mapping: DispatchOutputMapping,
817 cur: usize,
818 dispatcher_id: DispatcherId,
819 dispatcher_id_str: String,
820}
821
822impl RoundRobinDataDispatcher {
823 pub fn new(
824 outputs: Vec<Output>,
825 output_mapping: DispatchOutputMapping,
826 dispatcher_id: DispatcherId,
827 ) -> Self {
828 Self {
829 outputs,
830 output_mapping,
831 cur: 0,
832 dispatcher_id,
833 dispatcher_id_str: dispatcher_id.to_string(),
834 }
835 }
836}
837
838impl Dispatcher for RoundRobinDataDispatcher {
839 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
840 let chunk = self.output_mapping.apply(chunk);
841
842 self.outputs[self.cur]
843 .send(DispatcherMessageBatch::Chunk(chunk))
844 .await?;
845 self.cur += 1;
846 self.cur %= self.outputs.len();
847 Ok(())
848 }
849
850 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
851 broadcast_concurrent(
853 &mut self.outputs,
854 DispatcherMessageBatch::BarrierBatch(barriers),
855 )
856 .await
857 }
858
859 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
860 if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
861 broadcast_concurrent(
863 &mut self.outputs,
864 DispatcherMessageBatch::Watermark(watermark),
865 )
866 .await?;
867 }
868 Ok(())
869 }
870
871 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
872 self.outputs.extend(outputs);
873 }
874
875 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
876 self.outputs
877 .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
878 .count();
879 self.cur = self.cur.min(self.outputs.len() - 1);
880 }
881
882 fn dispatcher_id(&self) -> DispatcherId {
883 self.dispatcher_id
884 }
885
886 fn dispatcher_id_str(&self) -> &str {
887 &self.dispatcher_id_str
888 }
889
890 fn is_empty(&self) -> bool {
891 self.outputs.is_empty()
892 }
893}
894
895pub struct HashDataDispatcher {
896 outputs: Vec<Output>,
897 keys: Vec<usize>,
898 output_mapping: DispatchOutputMapping,
899 hash_mapping: ExpandedActorMapping,
902 dispatcher_id: DispatcherId,
903 dispatcher_id_str: String,
904}
905
906impl Debug for HashDataDispatcher {
907 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
908 f.debug_struct("HashDataDispatcher")
909 .field("outputs", &self.outputs)
910 .field("keys", &self.keys)
911 .field("dispatcher_id", &self.dispatcher_id)
912 .finish_non_exhaustive()
913 }
914}
915
916impl HashDataDispatcher {
917 pub fn new(
918 outputs: Vec<Output>,
919 keys: Vec<usize>,
920 output_mapping: DispatchOutputMapping,
921 hash_mapping: ExpandedActorMapping,
922 dispatcher_id: DispatcherId,
923 ) -> Self {
924 Self {
925 outputs,
926 keys,
927 output_mapping,
928 hash_mapping,
929 dispatcher_id,
930 dispatcher_id_str: dispatcher_id.to_string(),
931 }
932 }
933}
934
impl Dispatcher for HashDataDispatcher {
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(outputs);
    }

    /// Barriers go to every output, regardless of the hash mapping.
    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        broadcast_concurrent(
            &mut self.outputs,
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    /// Sends the mapped watermark to every output, or nothing if the output
    /// mapping drops it.
    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            broadcast_concurrent(
                &mut self.outputs,
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    /// Splits `chunk` across outputs by vnode and sends each output a chunk
    /// sharing the same columns and ops, with its own visibility bitmap.
    ///
    /// A row is visible for an output only if it was visible in the input and
    /// its vnode maps to that output's actor. A `U-`/`U+` pair whose rows hash
    /// to different vnodes is rewritten to `Delete`/`Insert`, because the two
    /// halves may be sent to different outputs. Outputs whose chunk would have
    /// zero cardinality are skipped.
    ///
    /// # Panics
    /// Panics on malformed update pairs: `U+` without a preceding `U-`, `U-`
    /// without a following `U+`, or an invisible row between them.
    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let num_outputs = self.outputs.len();

        // Vnodes are computed on the chunk before `output_mapping` is applied.
        // `self.keys` are presumably indices into the unmapped columns — TODO confirm.
        let vnode_count = self.hash_mapping.len();
        let vnodes = VirtualNode::compute_chunk(chunk.data_chunk(), &self.keys, vnode_count);

        tracing::debug!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty(), self.keys, vnodes);

        // One visibility bitmap per output, filled row by row below.
        let mut vis_maps = repeat_with(|| BitmapBuilder::with_capacity(chunk.capacity()))
            .take(num_outputs)
            .collect_vec();
        // Vnode of a pending `U-` row, waiting for its matching `U+`.
        let mut last_vnode_when_update_delete = None;
        let mut new_ops: Vec<Op> = Vec::with_capacity(chunk.capacity());

        let chunk = self.output_mapping.apply(chunk);

        for ((vnode, &op), visible) in vnodes
            .iter()
            .copied()
            .zip_eq_fast(chunk.ops())
            .zip_eq_fast(chunk.visibility().iter())
        {
            for (output, vis_map) in self.outputs.iter().zip_eq_fast(vis_maps.iter_mut()) {
                vis_map.append(visible && self.hash_mapping[vnode.to_index()] == output.actor_id());
            }

            if !visible {
                assert!(
                    last_vnode_when_update_delete.is_none(),
                    "invisible row between U- and U+, op = {op:?}",
                );
                new_ops.push(op);
                continue;
            }

            // The op for a `U-` row is not pushed here. Both ops of the pair are
            // pushed when its `U+` is seen, once it is known whether the pair
            // stays in one vnode.
            if op == Op::UpdateDelete {
                last_vnode_when_update_delete = Some(vnode);
            } else if op == Op::UpdateInsert {
                if vnode
                    != last_vnode_when_update_delete
                        .take()
                        .expect("missing U- before U+")
                {
                    new_ops.push(Op::Delete);
                    new_ops.push(Op::Insert);
                } else {
                    new_ops.push(Op::UpdateDelete);
                    new_ops.push(Op::UpdateInsert);
                }
            } else {
                new_ops.push(op);
            }
        }
        assert!(
            last_vnode_when_update_delete.is_none(),
            "missing U+ after U-"
        );

        let ops = new_ops;

        // Send each output its view of the chunk concurrently; the first error wins.
        futures::future::try_join_all(
            vis_maps
                .into_iter()
                .zip_eq_fast(self.outputs.iter_mut())
                .map(|(vis_map, output)| async {
                    let vis_map = vis_map.finish();
                    let new_stream_chunk =
                        StreamChunk::with_visibility(ops.clone(), chunk.columns().into(), vis_map);
                    if new_stream_chunk.cardinality() > 0 {
                        event!(
                            tracing::Level::TRACE,
                            msg = "chunk",
                            downstream = output.actor_id(),
                            "send = \n{:#?}",
                            new_stream_chunk
                        );
                        output
                            .send(DispatcherMessageBatch::Chunk(new_stream_chunk))
                            .await?;
                    }
                    StreamResult::Ok(())
                }),
        )
        .await?;

        Ok(())
    }

    /// Removes every output whose actor id is in `actor_ids`.
    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
            .count();
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn dispatcher_id_str(&self) -> &str {
        &self.dispatcher_id_str
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}
1078
1079#[derive(Debug)]
1081pub struct BroadcastDispatcher {
1082 outputs: HashMap<ActorId, Output>,
1083 output_mapping: DispatchOutputMapping,
1084 dispatcher_id: DispatcherId,
1085 dispatcher_id_str: String,
1086}
1087
1088impl BroadcastDispatcher {
1089 pub fn new(
1090 outputs: impl IntoIterator<Item = Output>,
1091 output_mapping: DispatchOutputMapping,
1092 dispatcher_id: DispatcherId,
1093 ) -> Self {
1094 Self {
1095 outputs: Self::into_pairs(outputs).collect(),
1096 output_mapping,
1097 dispatcher_id,
1098 dispatcher_id_str: dispatcher_id.to_string(),
1099 }
1100 }
1101
1102 fn into_pairs(
1103 outputs: impl IntoIterator<Item = Output>,
1104 ) -> impl Iterator<Item = (ActorId, Output)> {
1105 outputs
1106 .into_iter()
1107 .map(|output| (output.actor_id(), output))
1108 }
1109}
1110
1111impl Dispatcher for BroadcastDispatcher {
1112 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
1113 let chunk = self.output_mapping.apply(chunk);
1114 broadcast_concurrent(
1115 self.outputs.values_mut(),
1116 DispatcherMessageBatch::Chunk(chunk),
1117 )
1118 .await
1119 }
1120
1121 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
1122 broadcast_concurrent(
1124 self.outputs.values_mut(),
1125 DispatcherMessageBatch::BarrierBatch(barriers),
1126 )
1127 .await
1128 }
1129
1130 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
1131 if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
1132 broadcast_concurrent(
1134 self.outputs.values_mut(),
1135 DispatcherMessageBatch::Watermark(watermark),
1136 )
1137 .await?;
1138 }
1139 Ok(())
1140 }
1141
1142 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
1143 self.outputs.extend(Self::into_pairs(outputs));
1144 }
1145
1146 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
1147 self.outputs
1148 .extract_if(|actor_id, _| actor_ids.contains(actor_id))
1149 .count();
1150 }
1151
1152 fn dispatcher_id(&self) -> DispatcherId {
1153 self.dispatcher_id
1154 }
1155
1156 fn dispatcher_id_str(&self) -> &str {
1157 &self.dispatcher_id_str
1158 }
1159
1160 fn is_empty(&self) -> bool {
1161 self.outputs.is_empty()
1162 }
1163}
1164
1165#[derive(Debug)]
1167pub struct SimpleDispatcher {
1168 output: SmallVec<[Output; 2]>,
1182 output_mapping: DispatchOutputMapping,
1183 dispatcher_id: DispatcherId,
1184 dispatcher_id_str: String,
1185}
1186
1187impl SimpleDispatcher {
1188 pub fn new(
1189 output: Output,
1190 output_mapping: DispatchOutputMapping,
1191 dispatcher_id: DispatcherId,
1192 ) -> Self {
1193 Self {
1194 output: smallvec![output],
1195 output_mapping,
1196 dispatcher_id,
1197 dispatcher_id_str: dispatcher_id.to_string(),
1198 }
1199 }
1200}
1201
1202impl Dispatcher for SimpleDispatcher {
1203 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
1204 self.output.extend(outputs);
1205 assert!(self.output.len() <= 2);
1206 }
1207
1208 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
1209 for output in &mut self.output {
1211 output
1212 .send(DispatcherMessageBatch::BarrierBatch(barriers.clone()))
1213 .await?;
1214 }
1215 Ok(())
1216 }
1217
1218 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
1219 let output = self
1220 .output
1221 .iter_mut()
1222 .exactly_one()
1223 .expect("expect exactly one output");
1224
1225 let chunk = self.output_mapping.apply(chunk);
1226 output.send(DispatcherMessageBatch::Chunk(chunk)).await
1227 }
1228
1229 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
1230 let output = self
1231 .output
1232 .iter_mut()
1233 .exactly_one()
1234 .expect("expect exactly one output");
1235
1236 if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
1237 output
1238 .send(DispatcherMessageBatch::Watermark(watermark))
1239 .await?;
1240 }
1241 Ok(())
1242 }
1243
1244 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
1245 self.output
1246 .retain(|output| !actor_ids.contains(&output.actor_id()));
1247 }
1248
1249 fn dispatcher_id(&self) -> DispatcherId {
1250 self.dispatcher_id
1251 }
1252
1253 fn dispatcher_id_str(&self) -> &str {
1254 &self.dispatcher_id_str
1255 }
1256
1257 fn is_empty(&self) -> bool {
1258 self.output.is_empty()
1259 }
1260}
1261
#[cfg(test)]
mod tests {
    use std::hash::{BuildHasher, Hasher};

    use futures::pin_mut;
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::array::{Array, ArrayBuilder, I32ArrayBuilder};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::hash_util::Crc32FastBuilder;
    use risingwave_pb::stream_plan::{DispatcherType, PbDispatchOutputMapping};
    use tokio::sync::mpsc::unbounded_channel;

    use super::*;
    use crate::executor::exchange::output::Output;
    use crate::executor::exchange::permit::channel_for_test;
    use crate::executor::receiver::ReceiverExecutor;
    use crate::executor::{BarrierInner as Barrier, MessageInner as Message};
    use crate::task::barrier_test_utils::LocalBarrierTestEnv;

    #[tokio::test]
    async fn test_hash_dispatcher_complex() {
        test_hash_dispatcher_complex_inner().await
    }

    /// Dispatches one chunk through a two-output hash dispatcher.
    ///
    /// The chunk mixes inserts, deletes, an already-invisible row, an update pair whose halves
    /// land on the same output, and an update pair whose halves land on different outputs. The
    /// test checks each output's visibility and op rewriting.
    async fn test_hash_dispatcher_complex_inner() {
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let num_outputs = 2;
        let key_indices = &[0, 2];
        let (output_tx_vecs, mut output_rx_vecs): (Vec<_>, Vec<_>) =
            (0..num_outputs).map(|_| channel_for_test()).collect();
        let outputs = output_tx_vecs
            .into_iter()
            .enumerate()
            .map(|(actor_id, tx)| Output::new(1 + actor_id as u32, tx))
            .collect::<Vec<_>>();
        // Actor `i + 1` owns the `i`-th contiguous range of vnodes. Any vnodes left over by the
        // integer division are assigned to the last actor.
        let mut hash_mapping = (1..num_outputs + 1)
            .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT_FOR_TEST / num_outputs])
            .collect_vec();
        hash_mapping.resize(VirtualNode::COUNT_FOR_TEST, num_outputs as u32);
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            key_indices.to_vec(),
            DispatchOutputMapping::Simple(vec![0, 1, 2]),
            hash_mapping,
            0,
        );

        let chunk = StreamChunk::from_pretty(
            "  I I I
            +  4 6 8
            +  5 7 9
            +  0 0 0
            -  1 1 1 D
            U- 2 0 2
            U+ 2 0 2
            U- 3 3 2
            U+ 3 3 4",
        );
        hash_dispatcher.dispatch_data(chunk).await.unwrap();

        assert_eq!(
            *output_rx_vecs[0].recv().await.unwrap().as_chunk().unwrap(),
            StreamChunk::from_pretty(
                "  I I I
                +  4 6 8
                +  5 7 9
                +  0 0 0
                -  1 1 1 D
                U- 2 0 2
                U+ 2 0 2
                -  3 3 2 D  // Should rewrite UpdateDelete to Delete
                +  3 3 4    // Should rewrite UpdateInsert to Insert",
            )
        );
        assert_eq!(
            *output_rx_vecs[1].recv().await.unwrap().as_chunk().unwrap(),
            StreamChunk::from_pretty(
                "  I I I
                +  4 6 8 D
                +  5 7 9 D
                +  0 0 0 D
                -  1 1 1 D  // Should keep original invisible mark
                U- 2 0 2 D  // Should keep UpdateDelete
                U+ 2 0 2 D  // Should keep UpdateInsert
                -  3 3 2    // Should rewrite UpdateDelete to Delete
                +  3 3 4 D  // Should rewrite UpdateInsert to Insert",
            )
        );
    }

    /// Drives a `DispatchExecutor` that has a broadcast dispatcher and a simple dispatcher
    /// through four barriers.
    ///
    /// Two of the barriers carry `UpdateMutation`s that swap downstream actors. The test checks
    /// which downstream channels see which messages before and after each swap.
    #[tokio::test]
    async fn test_configuration_change() {
        let (tx, rx) = channel_for_test();
        let actor_id = 233;
        let fragment_id = 666;
        let barrier_test_env = LocalBarrierTestEnv::for_test().await;
        let metrics = Arc::new(StreamingMetrics::unused());

        // Downstream actors of the broadcast dispatcher.
        let (untouched, old, new) = (234, 235, 238);
        // Downstream actors of the simple dispatcher.
        let (old_simple, new_simple) = (114, 514);

        let broadcast_dispatcher_id = 666;
        let broadcast_dispatcher = PbDispatcher {
            r#type: DispatcherType::Broadcast as _,
            dispatcher_id: broadcast_dispatcher_id,
            downstream_actor_id: vec![untouched, old],
            output_mapping: PbDispatchOutputMapping::identical(0).into(),
            ..Default::default()
        };

        let simple_dispatcher_id = 888;
        let simple_dispatcher = PbDispatcher {
            r#type: DispatcherType::Simple as _,
            dispatcher_id: simple_dispatcher_id,
            downstream_actor_id: vec![old_simple],
            output_mapping: PbDispatchOutputMapping::identical(0).into(),
            ..Default::default()
        };

        // Barrier 1: in the broadcast dispatcher, add `new` and remove `old`.
        let dispatcher_updates = maplit::hashmap! {
            actor_id => vec![PbDispatcherUpdate {
                actor_id,
                dispatcher_id: broadcast_dispatcher_id,
                added_downstream_actor_id: vec![new],
                removed_downstream_actor_id: vec![old],
                hash_mapping: Default::default(),
            }]
        };
        let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: dispatcher_updates,
                ..Default::default()
            },
        ));
        barrier_test_env.inject_barrier(&b1, [actor_id]);
        barrier_test_env.flush_all_events().await;

        let input = Executor::new(
            Default::default(),
            ReceiverExecutor::for_test(
                actor_id,
                rx,
                barrier_test_env.local_barrier_manager.clone(),
            )
            .boxed(),
        );

        // Create a local channel for every downstream actor used in this test. Each sender is
        // queued as a new-output request; the receivers are kept for the assertions below.
        let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
        let mut rxs = [untouched, old, new, old_simple, new_simple]
            .into_iter()
            .map(|id| {
                (id, {
                    let (tx, rx) = channel_for_test();
                    new_output_request_tx
                        .send((id, NewOutputRequest::Local(tx)))
                        .unwrap();
                    rx
                })
            })
            .collect::<HashMap<_, _>>();
        let executor = Box::new(
            DispatchExecutor::new(
                input,
                new_output_request_rx,
                vec![broadcast_dispatcher, simple_dispatcher],
                actor_id,
                fragment_id,
                barrier_test_env.local_barrier_manager.clone(),
                metrics,
            )
            .await
            .unwrap(),
        )
        .execute();

        pin_mut!(executor);

        macro_rules! try_recv {
            ($down_id:expr) => {
                rxs.get_mut(&$down_id).unwrap().try_recv()
            };
        }

        // A chunk followed by barrier 1.
        tx.send(Message::Chunk(StreamChunk::default()).into())
            .await
            .unwrap();

        tx.send(Message::Barrier(b1.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // The chunk and barrier 1 both reach `untouched` and `old`.
        try_recv!(untouched).unwrap().as_chunk().unwrap();
        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();

        try_recv!(old).unwrap().as_chunk().unwrap();
        try_recv!(old).unwrap().as_barrier_batch().unwrap();

        // `new` is added by barrier 1. Its first message is that barrier, not the earlier chunk.
        try_recv!(new).unwrap().as_barrier_batch().unwrap();

        try_recv!(old_simple).unwrap().as_chunk().unwrap();
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap();

        // Barrier 2 carries no mutation.
        let b2 = Barrier::new_test_barrier(test_epoch(2));
        barrier_test_env.inject_barrier(&b2, [actor_id]);
        tx.send(Message::Barrier(b2.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // `old` was removed by barrier 1, so it receives nothing more.
        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();
        try_recv!(old).unwrap_err();
        try_recv!(new).unwrap().as_barrier_batch().unwrap();

        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap();
        try_recv!(new_simple).unwrap_err();

        tx.send(Message::Chunk(StreamChunk::default()).into())
            .await
            .unwrap();

        // Barrier 3: in the simple dispatcher, add `new_simple` and remove `old_simple`.
        let dispatcher_updates = maplit::hashmap! {
            actor_id => vec![PbDispatcherUpdate {
                actor_id,
                dispatcher_id: simple_dispatcher_id,
                added_downstream_actor_id: vec![new_simple],
                removed_downstream_actor_id: vec![old_simple],
                hash_mapping: Default::default(),
            }]
        };
        let b3 = Barrier::new_test_barrier(test_epoch(3)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: dispatcher_updates,
                ..Default::default()
            },
        ));
        barrier_test_env.inject_barrier(&b3, [actor_id]);
        tx.send(Message::Barrier(b3.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // The chunk sent before barrier 3 still goes to `old_simple`. Both simple-dispatcher
        // downstreams then receive barrier 3.
        try_recv!(old_simple).unwrap().as_chunk().unwrap();
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap();
        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap();

        let b4 = Barrier::new_test_barrier(test_epoch(4));
        barrier_test_env.inject_barrier(&b4, [actor_id]);
        tx.send(Message::Barrier(b4.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // After barrier 3 only `new_simple` remains in the simple dispatcher.
        try_recv!(old_simple).unwrap_err();
        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap();
    }

    /// Dispatches a chunk of alternating inserts and deletes through a five-output hash
    /// dispatcher.
    ///
    /// For every output, the test checks that the visible rows have exactly the expected column
    /// values and ops. The expected routing is computed here by hashing the key columns.
    #[tokio::test]
    async fn test_hash_dispatcher() {
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let num_outputs = 5;
        let cardinality = 10;
        let dimension = 4;
        let key_indices = &[0, 2];
        let (output_tx_vecs, output_rx_vecs): (Vec<_>, Vec<_>) =
            (0..num_outputs).map(|_| channel_for_test()).collect();
        let outputs = output_tx_vecs
            .into_iter()
            .enumerate()
            .map(|(actor_id, tx)| Output::new(1 + actor_id as u32, tx))
            .collect::<Vec<_>>();
        // Actor `i + 1` owns the `i`-th contiguous range of vnodes. Any vnodes left over by the
        // integer division are assigned to the last actor.
        let mut hash_mapping = (1..num_outputs + 1)
            .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT_FOR_TEST / num_outputs])
            .collect_vec();
        hash_mapping.resize(VirtualNode::COUNT_FOR_TEST, num_outputs as u32);
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            key_indices.to_vec(),
            DispatchOutputMapping::Simple((0..dimension).collect()),
            hash_mapping.clone(),
            0,
        );

        // Even rows are inserts; odd rows are deletes.
        let ops = (0..cardinality)
            .map(|idx| if idx % 2 == 0 { Op::Insert } else { Op::Delete })
            .collect_vec();

        // Build the input chunk row by row. For each output, also record the rows and ops it is
        // expected to receive.
        let mut start = 19260817i32..;
        let mut builders = (0..dimension)
            .map(|_| I32ArrayBuilder::new(cardinality))
            .collect_vec();
        let mut output_cols = vec![vec![vec![]; dimension]; num_outputs];
        let mut output_ops = vec![vec![]; num_outputs];
        for &op in &ops {
            let hash_builder = Crc32FastBuilder;
            let mut hasher = hash_builder.build_hasher();
            let one_row = (0..dimension).map(|_| start.next().unwrap()).collect_vec();
            for key_idx in key_indices {
                let val = one_row[*key_idx];
                let bytes = val.to_le_bytes();
                hasher.update(&bytes);
            }
            let output_idx =
                hash_mapping[hasher.finish() as usize % VirtualNode::COUNT_FOR_TEST] as usize - 1;
            for (builder, val) in builders.iter_mut().zip_eq_fast(one_row.iter()) {
                builder.append(Some(*val));
            }
            output_cols[output_idx]
                .iter_mut()
                .zip_eq_fast(one_row.iter())
                .for_each(|(each_column, val)| each_column.push(*val));
            output_ops[output_idx].push(op);
        }

        let columns = builders
            .into_iter()
            .map(|builder| builder.finish().into_ref())
            .collect();

        let chunk = StreamChunk::new(ops, columns);
        hash_dispatcher.dispatch_data(chunk).await.unwrap();

        for (output_idx, mut rx) in output_rx_vecs.into_iter().enumerate() {
            let mut output = vec![];
            while let Some(Some(msg)) = rx.recv().now_or_never() {
                output.push(msg);
            }
            // A single `dispatch_data` call sends at most one message per output.
            assert!(output.len() <= 1);
            if output.is_empty() {
                assert!(output_cols[output_idx].iter().all(|x| x.is_empty()));
            } else {
                let real_chunk = output[0].as_chunk().expect("expect a chunk");
                let visible_rows = real_chunk.visibility().iter_ones().collect_vec();

                // Insert/Delete ops of the routed rows must be passed through unchanged.
                let real_ops = visible_rows
                    .iter()
                    .map(|&row_idx| real_chunk.ops()[row_idx])
                    .collect_vec();
                assert_eq!(real_ops, output_ops[output_idx]);

                real_chunk
                    .columns()
                    .iter()
                    .zip_eq_fast(output_cols[output_idx].iter())
                    .for_each(|(real_col, expect_col)| {
                        let real_vals = visible_rows
                            .iter()
                            .map(|&row_idx| real_col.as_int32().value_at(row_idx).unwrap())
                            .collect::<Vec<_>>();
                        assert_eq!(real_vals, *expect_col);
                    });
            }
        }
    }
}