1use std::collections::{HashMap, HashSet};
16use std::future::Future;
17use std::iter::repeat_with;
18use std::ops::{Deref, DerefMut};
19use std::time::Duration;
20
21use anyhow::anyhow;
22use futures::{FutureExt, TryStreamExt};
23use itertools::Itertools;
24use risingwave_common::array::Op;
25use risingwave_common::bitmap::BitmapBuilder;
26use risingwave_common::hash::{ActorMapping, ExpandedActorMapping, VirtualNode};
27use risingwave_common::metrics::LabelGuardedIntCounter;
28use risingwave_common::row::RowExt;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
31use risingwave_pb::stream_plan::{self, PbDispatcher};
32use smallvec::{SmallVec, smallvec};
33use tokio::sync::mpsc::UnboundedReceiver;
34use tokio::time::Instant;
35use tokio_stream::StreamExt;
36use tokio_stream::adapters::Peekable;
37use tracing::{Instrument, event};
38
39use super::exchange::output::Output;
40use super::{
41 AddMutation, DispatcherBarriers, DispatcherMessageBatch, MessageBatch, TroublemakerExecutor,
42 UpdateMutation,
43};
44use crate::executor::prelude::*;
45use crate::executor::{StopMutation, StreamConsumer};
46use crate::task::{DispatcherId, LocalBarrierManager, NewOutputRequest};
47
48mod output_mapping;
49pub use output_mapping::DispatchOutputMapping;
50use risingwave_common::id::FragmentId;
51
/// Stream consumer that reads messages from `input` and sends them to downstream actors
/// through the dispatchers held in `inner`, yielding every barrier after it has been
/// dispatched (see the `StreamConsumer` impl).
pub struct DispatchExecutor {
    /// Upstream executor whose output is dispatched.
    input: Executor,
    /// Dispatchers and the state needed to create/update/remove them.
    inner: DispatchExecutorInner,
}
59
/// A dispatcher paired with the counter that accumulates, in nanoseconds, how long its
/// dispatch futures take to complete (see `DispatchExecutorInner::dispatch`).
struct DispatcherWithMetrics {
    dispatcher: DispatcherImpl,
    pub actor_output_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}
64
65impl Debug for DispatcherWithMetrics {
66 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
67 self.dispatcher.fmt(f)
68 }
69}
70
71impl Deref for DispatcherWithMetrics {
72 type Target = DispatcherImpl;
73
74 fn deref(&self) -> &Self::Target {
75 &self.dispatcher
76 }
77}
78
79impl DerefMut for DispatcherWithMetrics {
80 fn deref_mut(&mut self) -> &mut Self::Target {
81 &mut self.dispatcher
82 }
83}
84
/// Metric handles and label values shared by all dispatchers of one dispatch executor.
struct DispatchExecutorMetrics {
    /// The actor id, rendered once and reused as a metric label value.
    actor_id_str: String,
    /// The fragment id, rendered once and reused as a metric label value.
    fragment_id_str: String,
    /// Streaming metrics from which per-dispatcher counters are derived.
    metrics: Arc<StreamingMetrics>,
    /// Incremented by the cardinality of every chunk this executor dispatches.
    actor_out_record_cnt: LabelGuardedIntCounter,
}
91
92impl DispatchExecutorMetrics {
93 fn monitor_dispatcher(&self, dispatcher: DispatcherImpl) -> DispatcherWithMetrics {
94 DispatcherWithMetrics {
95 actor_output_buffer_blocking_duration_ns: self
96 .metrics
97 .actor_output_buffer_blocking_duration_ns
98 .with_guarded_label_values(&[
99 self.actor_id_str.as_str(),
100 self.fragment_id_str.as_str(),
101 dispatcher.dispatcher_id_str(),
102 ]),
103 dispatcher,
104 }
105 }
106}
107
/// Mutable state of a dispatch executor: its dispatchers and what is needed to change them.
struct DispatchExecutorInner {
    /// Active dispatchers; empty ones are dropped after each mutation is applied.
    dispatchers: Vec<DispatcherWithMetrics>,
    actor_id: ActorId,
    /// Used here to read the global developer config.
    local_barrier_manager: LocalBarrierManager,
    metrics: DispatchExecutorMetrics,
    /// Receives `(downstream actor, request)` pairs from which outputs are built.
    new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    /// Requests that arrived before they were needed, keyed by downstream actor.
    pending_new_output_requests: HashMap<ActorId, NewOutputRequest>,
}
116
impl DispatchExecutorInner {
    /// Resolves one [`Output`] per actor in `downstream_actors`, in the same order.
    ///
    /// A request already buffered in `pending_new_output_requests` is used directly.
    /// Otherwise this waits on `new_output_request_rx`, buffering requests for other actors
    /// until the request for the wanted actor arrives.
    ///
    /// # Errors
    /// Fails if `new_output_request_rx` closes before all outputs have been received.
    ///
    /// # Panics
    /// Panics if a second request for an actor arrives while one for it is still buffered.
    async fn collect_outputs(
        &mut self,
        downstream_actors: &[ActorId],
    ) -> StreamResult<Vec<Output>> {
        // Both request kinds carry a sender of the same type, so both become a plain `Output`.
        fn resolve_output(downstream_actor: ActorId, request: NewOutputRequest) -> Output {
            let tx = match request {
                NewOutputRequest::Local(tx) | NewOutputRequest::Remote(tx) => tx,
            };
            Output::new(downstream_actor, tx)
        }
        let mut outputs = Vec::with_capacity(downstream_actors.len());
        for &downstream_actor in downstream_actors {
            let output =
                if let Some(request) = self.pending_new_output_requests.remove(&downstream_actor) {
                    resolve_output(downstream_actor, request)
                } else {
                    loop {
                        let (requested_actor, request) = self
                            .new_output_request_rx
                            .recv()
                            .await
                            .ok_or_else(|| anyhow!("end of new output request"))?;
                        if requested_actor == downstream_actor {
                            break resolve_output(requested_actor, request);
                        } else {
                            // Not the actor we are waiting for: keep it for a later lookup.
                            assert!(
                                self.pending_new_output_requests
                                    .insert(requested_actor, request)
                                    .is_none(),
                                "duplicated inflight new output requests from actor {}",
                                requested_actor
                            );
                        }
                    }
                };
            outputs.push(output);
        }
        Ok(outputs)
    }

    /// Sends one message batch through every dispatcher, running at most
    /// `exchange_concurrent_dispatchers` of them concurrently.
    ///
    /// For a barrier batch, the mutation of the first barrier is applied around the send:
    /// dispatchers/outputs are added before it (`pre_mutate_dispatchers`) and outputs are
    /// removed, hash mappings updated and empty dispatchers dropped after it
    /// (`post_mutate_dispatchers`). For a chunk, its cardinality is added to
    /// `actor_out_record_cnt` once every dispatcher has sent it.
    async fn dispatch(&mut self, msg: MessageBatch) -> StreamResult<()> {
        // Awaits `$fut` and adds the elapsed nanoseconds to the counter `$metrics`. While the
        // future is still pending, the counter is bumped by 15s every 15s (and the start time
        // reset), so a long-blocked send is reflected before it finishes. `biased` makes
        // `select!` poll the future before the timer.
        macro_rules! await_with_metrics {
            ($fut:expr, $metrics:expr) => {{
                let mut start_time = Instant::now();
                let interval_duration = Duration::from_secs(15);
                let mut interval =
                    tokio::time::interval_at(start_time + interval_duration, interval_duration);

                let mut fut = std::pin::pin!($fut);

                loop {
                    tokio::select! {
                        biased;
                        res = &mut fut => {
                            res?;
                            let ns = start_time.elapsed().as_nanos() as u64;
                            $metrics.inc_by(ns);
                            break;
                        }
                        _ = interval.tick() => {
                            start_time = Instant::now();
                            $metrics.inc_by(interval_duration.as_nanos() as u64);
                        }
                    };
                }
                StreamResult::Ok(())
            }};
        }

        // Upper bound on how many dispatchers send concurrently.
        let limit = self
            .local_barrier_manager
            .env
            .global_config()
            .developer
            .exchange_concurrent_dispatchers;
        match msg {
            MessageBatch::BarrierBatch(barrier_batch) => {
                if barrier_batch.is_empty() {
                    return Ok(());
                }
                // Only the first barrier of a batch is consulted for a mutation.
                let mutation = barrier_batch[0].mutation.clone();
                self.pre_mutate_dispatchers(&mutation).await?;
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
                        let dispatcher_output = &mut dispatcher.dispatcher;
                        let fut = dispatcher_output.dispatch_barriers(
                            barrier_batch
                                .iter()
                                .cloned()
                                .map(|b| b.into_dispatcher())
                                .collect(),
                        );
                        await_with_metrics!(std::pin::pin!(fut), metrics)
                    })
                    .await?;
                self.post_mutate_dispatchers(&mutation)?;
            }
            MessageBatch::Watermark(watermark) => {
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
                        let dispatcher_output = &mut dispatcher.dispatcher;
                        let fut = dispatcher_output.dispatch_watermark(watermark.clone());
                        await_with_metrics!(std::pin::pin!(fut), metrics)
                    })
                    .await?;
            }
            MessageBatch::Chunk(chunk) => {
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
                        let dispatcher_output = &mut dispatcher.dispatcher;
                        let fut = dispatcher_output.dispatch_data(chunk.clone());
                        await_with_metrics!(std::pin::pin!(fut), metrics)
                    })
                    .await?;

                self.metrics
                    .actor_out_record_cnt
                    .inc_by(chunk.cardinality() as _);
            }
        }
        Ok(())
    }

    /// Builds and registers a dispatcher for each protobuf description, waiting for the
    /// outputs of all its downstream actors first.
    ///
    /// # Panics
    /// Panics if, afterwards, two registered dispatchers share a dispatcher id.
    async fn add_dispatchers<'a>(
        &mut self,
        new_dispatchers: impl IntoIterator<Item = &'a PbDispatcher>,
    ) -> StreamResult<()> {
        for dispatcher in new_dispatchers {
            let outputs = self
                .collect_outputs(&dispatcher.downstream_actor_id)
                .await?;
            let dispatcher = DispatcherImpl::new(outputs, dispatcher)?;
            let dispatcher = self.metrics.monitor_dispatcher(dispatcher);
            self.dispatchers.push(dispatcher);
        }

        assert!(
            self.dispatchers
                .iter()
                .map(|d| d.dispatcher_id())
                .all_unique(),
            "dispatcher ids must be unique: {:?}",
            self.dispatchers
        );

        Ok(())
    }

    /// Returns the dispatcher with `dispatcher_id`.
    ///
    /// # Panics
    /// Panics if this actor has no such dispatcher.
    fn find_dispatcher(&mut self, dispatcher_id: DispatcherId) -> &mut DispatcherImpl {
        self.dispatchers
            .iter_mut()
            .find(|d| d.dispatcher_id() == dispatcher_id)
            .unwrap_or_else(|| panic!("dispatcher {}:{} not found", self.actor_id, dispatcher_id))
    }

    /// First half of a dispatcher update (before the barrier is sent): adds outputs for the
    /// newly added downstream actors.
    async fn pre_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
        let outputs = self
            .collect_outputs(&update.added_downstream_actor_id)
            .await?;

        let dispatcher = self.find_dispatcher(update.dispatcher_id);
        dispatcher.add_outputs(outputs);

        Ok(())
    }

    /// Second half of a dispatcher update (after the barrier is sent): removes outputs of the
    /// removed downstream actors and, for a hash dispatcher, installs the new hash mapping.
    ///
    /// # Errors
    /// Fails if a hash dispatcher's update carries no hash mapping.
    fn post_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
        let ids = update.removed_downstream_actor_id.iter().copied().collect();

        let dispatcher = self.find_dispatcher(update.dispatcher_id);
        dispatcher.remove_outputs(&ids);

        if let DispatcherImpl::Hash(dispatcher) = dispatcher {
            dispatcher.hash_mapping =
                ActorMapping::from_protobuf(update.get_hash_mapping()?).to_expanded();
        }

        Ok(())
    }

    /// Applies the additive part of a mutation before the barrier is dispatched: new
    /// dispatchers for this actor (`Add`/`Update`) and new outputs of existing dispatchers
    /// (`Update`). Other mutations are ignored here.
    async fn pre_mutate_dispatchers(
        &mut self,
        mutation: &Option<Arc<Mutation>>,
    ) -> StreamResult<()> {
        let Some(mutation) = mutation.as_deref() else {
            return Ok(());
        };

        match mutation {
            Mutation::Add(AddMutation { adds, .. }) => {
                if let Some(new_dispatchers) = adds.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }
            }
            Mutation::Update(UpdateMutation {
                dispatchers,
                actor_new_dispatchers: actor_dispatchers,
                ..
            }) => {
                if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
                    self.add_dispatchers(new_dispatchers).await?;
                }

                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.pre_update_dispatcher(update).await?;
                    }
                }
            }
            _ => {}
        }

        Ok(())
    }

    /// Applies the subtractive part of a mutation after the barrier is dispatched: removes
    /// outputs of dropped actors (`Stop`/`Update`) and finishes dispatcher updates
    /// (`Update`). Whenever a mutation is present, dispatchers left without outputs are then
    /// dropped.
    fn post_mutate_dispatchers(&mut self, mutation: &Option<Arc<Mutation>>) -> StreamResult<()> {
        let Some(mutation) = mutation.as_deref() else {
            return Ok(());
        };

        match mutation {
            Mutation::Stop(StopMutation { dropped_actors, .. }) => {
                // If this actor is itself being dropped, its outputs are left untouched.
                if !dropped_actors.contains(&self.actor_id) {
                    for dispatcher in &mut self.dispatchers {
                        dispatcher.remove_outputs(dropped_actors);
                    }
                }
            }
            Mutation::Update(UpdateMutation {
                dispatchers,
                dropped_actors,
                ..
            }) => {
                if let Some(updates) = dispatchers.get(&self.actor_id) {
                    for update in updates {
                        self.post_update_dispatcher(update)?;
                    }
                }

                // Same rule as for `Stop`: skip when this actor is among the dropped ones.
                if !dropped_actors.contains(&self.actor_id) {
                    for dispatcher in &mut self.dispatchers {
                        dispatcher.remove_outputs(dropped_actors);
                    }
                }
            }
            _ => {}
        };

        self.dispatchers.retain(|d| !d.is_empty());

        Ok(())
    }
}
396
397impl DispatchExecutor {
398 pub(crate) async fn new(
399 input: Executor,
400 new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
401 dispatchers: Vec<stream_plan::Dispatcher>,
402 actor_id: ActorId,
403 fragment_id: FragmentId,
404 local_barrier_manager: LocalBarrierManager,
405 metrics: Arc<StreamingMetrics>,
406 ) -> StreamResult<Self> {
407 let mut executor = Self::new_inner(
408 input,
409 new_output_request_rx,
410 vec![],
411 actor_id,
412 fragment_id,
413 local_barrier_manager,
414 metrics,
415 );
416 let inner = &mut executor.inner;
417 for dispatcher in dispatchers {
418 let outputs = inner
419 .collect_outputs(&dispatcher.downstream_actor_id)
420 .await?;
421 let dispatcher = DispatcherImpl::new(outputs, &dispatcher)?;
422 let dispatcher = inner.metrics.monitor_dispatcher(dispatcher);
423 inner.dispatchers.push(dispatcher);
424 }
425 Ok(executor)
426 }
427
428 #[cfg(test)]
429 pub(crate) fn for_test(
430 input: Executor,
431 dispatchers: Vec<DispatcherImpl>,
432 actor_id: ActorId,
433 fragment_id: FragmentId,
434 local_barrier_manager: LocalBarrierManager,
435 metrics: Arc<StreamingMetrics>,
436 ) -> (
437 Self,
438 tokio::sync::mpsc::UnboundedSender<(ActorId, NewOutputRequest)>,
439 ) {
440 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
441
442 (
443 Self::new_inner(
444 input,
445 rx,
446 dispatchers,
447 actor_id,
448 fragment_id,
449 local_barrier_manager,
450 metrics,
451 ),
452 tx,
453 )
454 }
455
456 fn new_inner(
457 mut input: Executor,
458 new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
459 dispatchers: Vec<DispatcherImpl>,
460 actor_id: ActorId,
461 fragment_id: FragmentId,
462 local_barrier_manager: LocalBarrierManager,
463 metrics: Arc<StreamingMetrics>,
464 ) -> Self {
465 let chunk_size = local_barrier_manager
466 .env
467 .global_config()
468 .developer
469 .chunk_size;
470 if crate::consistency::insane() {
471 let mut info = input.info().clone();
473 info.identity = format!("{} (embedded trouble)", info.identity);
474 let troublemaker = TroublemakerExecutor::new(input, chunk_size);
475 input = (info, troublemaker).into();
476 }
477
478 let actor_id_str = actor_id.to_string();
479 let fragment_id_str = fragment_id.to_string();
480 let actor_out_record_cnt = metrics
481 .actor_out_record_cnt
482 .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
483 let metrics = DispatchExecutorMetrics {
484 actor_id_str,
485 fragment_id_str,
486 metrics,
487 actor_out_record_cnt,
488 };
489 let dispatchers = dispatchers
490 .into_iter()
491 .map(|dispatcher| metrics.monitor_dispatcher(dispatcher))
492 .collect();
493 Self {
494 input,
495 inner: DispatchExecutorInner {
496 dispatchers,
497 actor_id,
498 local_barrier_manager,
499 metrics,
500 new_output_request_rx,
501 pending_new_output_requests: Default::default(),
502 },
503 }
504 }
505}
506
impl StreamConsumer for DispatchExecutor {
    type BarrierStream = impl Stream<Item = StreamResult<Barrier>> + Send;

    /// Drives the input stream and dispatches everything it produces.
    ///
    /// Input is grouped by `try_batch_barriers`: chunks and watermarks are dispatched one at a
    /// time, and runs of consecutive mutation-free barriers (at most
    /// `max_barrier_batch_size`, read once here) are dispatched together. After a barrier
    /// batch is dispatched, its size is recorded in `barrier_batch_size` and each barrier is
    /// yielded in order. The stream ends when the input ends; any input or dispatch error
    /// terminates it.
    fn execute(mut self: Box<Self>) -> Self::BarrierStream {
        let max_barrier_count_per_batch = self
            .inner
            .local_barrier_manager
            .env
            .global_config()
            .developer
            .max_barrier_batch_size;
        #[try_stream]
        async move {
            // Peekable so `try_batch_barriers` can look ahead for more ready barriers.
            let mut input = self.input.execute().peekable();
            loop {
                let Some(message) =
                    try_batch_barriers(max_barrier_count_per_batch, &mut input).await?
                else {
                    break;
                };
                match message {
                    chunk @ MessageBatch::Chunk(_) => {
                        self.inner
                            .dispatch(chunk)
                            .instrument(tracing::info_span!("dispatch_chunk"))
                            .instrument_await("dispatch_chunk")
                            .await?;
                    }
                    MessageBatch::BarrierBatch(barrier_batch) => {
                        assert!(!barrier_batch.is_empty());
                        // Cloned because `dispatch` consumes the batch while the barriers
                        // are still yielded below.
                        self.inner
                            .dispatch(MessageBatch::BarrierBatch(barrier_batch.clone()))
                            .instrument(tracing::info_span!("dispatch_barrier_batch"))
                            .instrument_await("dispatch_barrier_batch")
                            .await?;
                        self.inner
                            .metrics
                            .metrics
                            .barrier_batch_size
                            .observe(barrier_batch.len() as f64);
                        for barrier in barrier_batch {
                            yield barrier;
                        }
                    }
                    watermark @ MessageBatch::Watermark(_) => {
                        self.inner
                            .dispatch(watermark)
                            .instrument(tracing::info_span!("dispatch_watermark"))
                            .instrument_await("dispatch_watermark")
                            .await?;
                    }
                }
            }
        }
    }
}
565
566async fn try_batch_barriers(
572 max_barrier_count_per_batch: u32,
573 input: &mut Peekable<BoxedMessageStream>,
574) -> StreamResult<Option<MessageBatch>> {
575 let Some(msg) = input.next().await else {
576 return Ok(None);
578 };
579 let mut barrier_batch = vec![];
580 let msg: Message = msg?;
581 let max_peek_attempts = match msg {
582 Message::Chunk(c) => {
583 return Ok(Some(MessageBatch::Chunk(c)));
584 }
585 Message::Watermark(w) => {
586 return Ok(Some(MessageBatch::Watermark(w)));
587 }
588 Message::Barrier(b) => {
589 let peek_more_barrier = b.mutation.is_none();
590 barrier_batch.push(b);
591 if peek_more_barrier {
592 max_barrier_count_per_batch.saturating_sub(1)
593 } else {
594 0
595 }
596 }
597 };
598 for _ in 0..max_peek_attempts {
600 let peek = input.peek().now_or_never();
601 let Some(peek) = peek else {
602 break;
603 };
604 let Some(msg) = peek else {
605 break;
607 };
608 let Ok(Message::Barrier(barrier)) = msg else {
609 break;
610 };
611 if barrier.mutation.is_some() {
612 break;
613 }
614 let msg: Message = input.next().now_or_never().unwrap().unwrap()?;
615 let Message::Barrier(ref barrier) = msg else {
616 unreachable!("must be a barrier");
617 };
618 barrier_batch.push(barrier.clone());
619 }
620 Ok(Some(MessageBatch::BarrierBatch(barrier_batch)))
621}
622
/// Closed set of dispatcher kinds. Its inherent methods forward to the [`Dispatcher`] impl
/// of the active variant. `DispatcherImpl::new` never builds `RoundRobin`.
#[derive(Debug)]
pub enum DispatcherImpl {
    Hash(HashDataDispatcher),
    Broadcast(BroadcastDispatcher),
    Simple(SimpleDispatcher),
    RoundRobin(RoundRobinDataDispatcher),
}
630
631impl DispatcherImpl {
632 pub fn new(outputs: Vec<Output>, dispatcher: &PbDispatcher) -> StreamResult<Self> {
633 let output_mapping =
634 DispatchOutputMapping::from_protobuf(dispatcher.output_mapping.clone().unwrap());
635
636 use risingwave_pb::stream_plan::DispatcherType::*;
637 let dispatcher_impl = match dispatcher.get_type()? {
638 Hash => {
639 assert!(!outputs.is_empty());
640 let dist_key_indices = dispatcher
641 .dist_key_indices
642 .iter()
643 .map(|i| *i as usize)
644 .collect();
645
646 let hash_mapping =
647 ActorMapping::from_protobuf(dispatcher.get_hash_mapping()?).to_expanded();
648
649 DispatcherImpl::Hash(HashDataDispatcher::new(
650 outputs,
651 dist_key_indices,
652 output_mapping,
653 hash_mapping,
654 dispatcher.dispatcher_id,
655 ))
656 }
657 Broadcast => DispatcherImpl::Broadcast(BroadcastDispatcher::new(
658 outputs,
659 output_mapping,
660 dispatcher.dispatcher_id,
661 )),
662 Simple | NoShuffle => {
663 let [output]: [_; 1] = outputs.try_into().unwrap();
664 DispatcherImpl::Simple(SimpleDispatcher::new(
665 output,
666 output_mapping,
667 dispatcher.dispatcher_id,
668 ))
669 }
670 Unspecified => unreachable!(),
671 };
672
673 Ok(dispatcher_impl)
674 }
675}
676
/// Generates the inherent methods of [`DispatcherImpl`]. Each method matches on the active
/// variant and forwards to that variant's [`Dispatcher`] implementation. It expects one
/// `{ Variant }` group per enum variant.
macro_rules! impl_dispatcher {
    ($( { $variant_name:ident } ),*) => {
        impl DispatcherImpl {
            pub async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_data(chunk).await, )*
                }
            }

            pub async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_barriers(barriers).await, )*
                }
            }

            pub async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_watermark(watermark).await, )*
                }
            }

            pub fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
                match self {
                    $(Self::$variant_name(inner) => inner.add_outputs(outputs), )*
                }
            }

            pub fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
                match self {
                    $(Self::$variant_name(inner) => inner.remove_outputs(actor_ids), )*
                }
            }

            pub fn dispatcher_id(&self) -> DispatcherId {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id(), )*
                }
            }

            pub fn dispatcher_id_str(&self) -> &str {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id_str(), )*
                }
            }

            pub fn is_empty(&self) -> bool {
                match self {
                    $(Self::$variant_name(inner) => inner.is_empty(), )*
                }
            }
        }
    }
}
730
/// Invokes `$macro` with the list of all [`DispatcherImpl`] variant names.
macro_rules! for_all_dispatcher_variants {
    ($macro:ident) => {
        $macro! {
            { Hash },
            { Broadcast },
            { Simple },
            { RoundRobin }
        }
    };
}

// Generate the forwarding methods of `DispatcherImpl` for every variant.
for_all_dispatcher_variants! { impl_dispatcher }
743
/// Shorthand for the future returned by [`Dispatcher`] send methods: a `Send` future that
/// resolves to `StreamResult<()>`. The lifetime parameter does not appear in the bound.
pub trait DispatchFuture<'a> = Future<Output = StreamResult<()>> + Send;
745
/// Routing policy that sends stream messages to a set of downstream outputs.
pub trait Dispatcher: Debug + 'static {
    /// Sends a data chunk to the outputs chosen by this dispatcher's policy.
    fn dispatch_data(&mut self, chunk: StreamChunk) -> impl DispatchFuture<'_>;
    /// Sends a batch of barriers (every implementation in this file sends to all outputs).
    fn dispatch_barriers(&mut self, barrier: DispatcherBarriers) -> impl DispatchFuture<'_>;
    /// Sends a watermark.
    fn dispatch_watermark(&mut self, watermark: Watermark) -> impl DispatchFuture<'_>;

    /// Attaches outputs for newly added downstream actors.
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>);
    /// Detaches the outputs whose downstream actor id is in `actor_ids`.
    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>);

    /// Identifier of this dispatcher.
    fn dispatcher_id(&self) -> DispatcherId;

    /// `dispatcher_id` as a string; used as a metric label value.
    fn dispatcher_id_str(&self) -> &str;

    /// Whether no outputs remain.
    fn is_empty(&self) -> bool;
}
773
774async fn broadcast_concurrent(
779 outputs: impl IntoIterator<Item = &'_ mut Output>,
780 message: DispatcherMessageBatch,
781) -> StreamResult<()> {
782 futures::future::try_join_all(
783 outputs
784 .into_iter()
785 .map(|output| output.send(message.clone())),
786 )
787 .await?;
788 Ok(())
789}
790
/// Sends each data chunk to one output in turn; barriers and watermarks go to every output.
#[derive(Debug)]
pub struct RoundRobinDataDispatcher {
    outputs: Vec<Output>,
    /// Applied to chunks and watermarks before they are sent.
    output_mapping: DispatchOutputMapping,
    /// Index into `outputs` of the output that receives the next chunk.
    cur: usize,
    dispatcher_id: DispatcherId,
    /// `dispatcher_id` rendered once at construction.
    dispatcher_id_str: String,
}
799
800impl RoundRobinDataDispatcher {
801 pub fn new(
802 outputs: Vec<Output>,
803 output_mapping: DispatchOutputMapping,
804 dispatcher_id: DispatcherId,
805 ) -> Self {
806 Self {
807 outputs,
808 output_mapping,
809 cur: 0,
810 dispatcher_id,
811 dispatcher_id_str: dispatcher_id.to_string(),
812 }
813 }
814}
815
816impl Dispatcher for RoundRobinDataDispatcher {
817 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
818 let chunk = self.output_mapping.apply(chunk);
819
820 self.outputs[self.cur]
821 .send(DispatcherMessageBatch::Chunk(chunk))
822 .await?;
823 self.cur += 1;
824 self.cur %= self.outputs.len();
825 Ok(())
826 }
827
828 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
829 broadcast_concurrent(
831 &mut self.outputs,
832 DispatcherMessageBatch::BarrierBatch(barriers),
833 )
834 .await
835 }
836
837 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
838 if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
839 broadcast_concurrent(
841 &mut self.outputs,
842 DispatcherMessageBatch::Watermark(watermark),
843 )
844 .await?;
845 }
846 Ok(())
847 }
848
849 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
850 self.outputs.extend(outputs);
851 }
852
853 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
854 self.outputs
855 .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
856 .count();
857 self.cur = self.cur.min(self.outputs.len() - 1);
858 }
859
860 fn dispatcher_id(&self) -> DispatcherId {
861 self.dispatcher_id
862 }
863
864 fn dispatcher_id_str(&self) -> &str {
865 &self.dispatcher_id_str
866 }
867
868 fn is_empty(&self) -> bool {
869 self.outputs.is_empty()
870 }
871}
872
/// Routes each visible row of a chunk to the output whose actor owns the row's vnode.
/// Barriers and watermarks go to every output.
pub struct HashDataDispatcher {
    outputs: Vec<Output>,
    /// Column indices of the distribution key; vnodes are computed from these columns.
    keys: Vec<usize>,
    /// Applied to chunks and watermarks before they are sent.
    output_mapping: DispatchOutputMapping,
    /// Vnode-to-actor mapping, indexed by vnode; its length is used as the vnode count.
    hash_mapping: ExpandedActorMapping,
    dispatcher_id: DispatcherId,
    /// `dispatcher_id` rendered once at construction.
    dispatcher_id_str: String,
}
883
884impl Debug for HashDataDispatcher {
885 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
886 f.debug_struct("HashDataDispatcher")
887 .field("outputs", &self.outputs)
888 .field("keys", &self.keys)
889 .field("dispatcher_id", &self.dispatcher_id)
890 .finish_non_exhaustive()
891 }
892}
893
894impl HashDataDispatcher {
895 pub fn new(
896 outputs: Vec<Output>,
897 keys: Vec<usize>,
898 output_mapping: DispatchOutputMapping,
899 hash_mapping: ExpandedActorMapping,
900 dispatcher_id: DispatcherId,
901 ) -> Self {
902 Self {
903 outputs,
904 keys,
905 output_mapping,
906 hash_mapping,
907 dispatcher_id,
908 dispatcher_id_str: dispatcher_id.to_string(),
909 }
910 }
911}
912
impl Dispatcher for HashDataDispatcher {
    /// Appends outputs. The hash mapping is not touched here; it is replaced separately when
    /// a dispatcher update is applied.
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(outputs);
    }

    /// Sends the barriers to every output.
    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        broadcast_concurrent(
            &mut self.outputs,
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    /// Sends the output-mapped watermark to every output. Nothing is sent if the mapping
    /// yields no watermark.
    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            broadcast_concurrent(
                &mut self.outputs,
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    /// Splits `chunk` by vnode and sends each output the rows whose vnode maps to its actor.
    ///
    /// Every output receives the full (output-mapped) columns with its own visibility bitmap:
    /// a row is visible to an output only if it was visible originally and its vnode maps to
    /// that output's actor. Outputs with no visible rows receive nothing.
    ///
    /// A visible `U-`/`U+` pair whose distribution-key values differ is rewritten to
    /// `Delete`/`Insert`, because the two rows can hash to different vnodes and therefore
    /// different outputs.
    ///
    /// # Panics
    /// Panics if a visible `U+` is not immediately preceded by a visible `U-`, or if the chunk
    /// ends after a visible `U-`.
    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let num_outputs = self.outputs.len();

        let vnode_count = self.hash_mapping.len();
        let vnodes = VirtualNode::compute_chunk(chunk.data_chunk(), &self.keys, vnode_count);

        tracing::debug!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty(), self.keys, vnodes);

        // One visibility bitmap per output, each as long as the chunk's capacity.
        let mut vis_maps = repeat_with(|| BitmapBuilder::with_capacity(chunk.capacity()))
            .take(num_outputs)
            .collect_vec();
        let mut last_update_delete_row_idx = None;
        let mut new_ops: Vec<Op> = Vec::with_capacity(chunk.capacity());

        for (row_idx, ((vnode, &op), visible)) in vnodes
            .iter()
            .copied()
            .zip_eq_fast(chunk.ops())
            .zip_eq_fast(chunk.visibility().iter())
            .enumerate()
        {
            // NOTE(review): a row whose vnode maps to an actor that has no output here is not
            // made visible to any output.
            for (output, vis_map) in self.outputs.iter().zip_eq_fast(vis_maps.iter_mut()) {
                vis_map.append(visible && self.hash_mapping[vnode.to_index()] == output.actor_id());
            }

            // Invisible rows keep their op unchanged.
            if !visible {
                new_ops.push(op);
                continue;
            }

            // A `U-` op is emitted together with its `U+`, once both rows can be compared.
            if op == Op::UpdateDelete {
                last_update_delete_row_idx = Some(row_idx);
            } else if op == Op::UpdateInsert {
                let delete_row_idx = last_update_delete_row_idx
                    .take()
                    .expect("missing U- before U+");
                assert!(delete_row_idx + 1 == row_idx, "U- and U+ are not adjacent");

                let dist_key_changed = chunk.row_at(delete_row_idx).1.project(&self.keys)
                    != chunk.row_at(row_idx).1.project(&self.keys);

                if dist_key_changed {
                    new_ops.push(Op::Delete);
                    new_ops.push(Op::Insert);
                } else {
                    new_ops.push(Op::UpdateDelete);
                    new_ops.push(Op::UpdateInsert);
                }
            } else {
                new_ops.push(op);
            }
        }
        assert!(last_update_delete_row_idx.is_none(), "missing U+ after U-");

        let ops = new_ops;
        let chunk = self.output_mapping.apply(chunk);

        // Send to all outputs concurrently; each gets the shared columns, a copy of the
        // rewritten ops and its own visibility bitmap.
        futures::future::try_join_all(
            vis_maps
                .into_iter()
                .zip_eq_fast(self.outputs.iter_mut())
                .map(|(vis_map, output)| async {
                    let vis_map = vis_map.finish();
                    let new_stream_chunk =
                        StreamChunk::with_visibility(ops.clone(), chunk.columns().into(), vis_map);
                    if new_stream_chunk.cardinality() > 0 {
                        event!(
                            tracing::Level::TRACE,
                            msg = "chunk",
                            downstream = %output.actor_id(),
                            "send = \n{:#?}",
                            new_stream_chunk
                        );
                        output
                            .send(DispatcherMessageBatch::Chunk(new_stream_chunk))
                            .await?;
                    }
                    StreamResult::Ok(())
                }),
        )
        .await?;

        Ok(())
    }

    /// Removes the outputs of `actor_ids`.
    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
            .count();
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn dispatcher_id_str(&self) -> &str {
        &self.dispatcher_id_str
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}
1056
/// Sends every message to all outputs.
#[derive(Debug)]
pub struct BroadcastDispatcher {
    /// Outputs keyed by downstream actor id.
    outputs: HashMap<ActorId, Output>,
    /// Applied to chunks and watermarks before they are sent.
    output_mapping: DispatchOutputMapping,
    dispatcher_id: DispatcherId,
    /// `dispatcher_id` rendered once at construction.
    dispatcher_id_str: String,
}
1065
1066impl BroadcastDispatcher {
1067 pub fn new(
1068 outputs: impl IntoIterator<Item = Output>,
1069 output_mapping: DispatchOutputMapping,
1070 dispatcher_id: DispatcherId,
1071 ) -> Self {
1072 Self {
1073 outputs: Self::into_pairs(outputs).collect(),
1074 output_mapping,
1075 dispatcher_id,
1076 dispatcher_id_str: dispatcher_id.to_string(),
1077 }
1078 }
1079
1080 fn into_pairs(
1081 outputs: impl IntoIterator<Item = Output>,
1082 ) -> impl Iterator<Item = (ActorId, Output)> {
1083 outputs
1084 .into_iter()
1085 .map(|output| (output.actor_id(), output))
1086 }
1087}
1088
1089impl Dispatcher for BroadcastDispatcher {
1090 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
1091 let chunk = self.output_mapping.apply(chunk);
1092 broadcast_concurrent(
1093 self.outputs.values_mut(),
1094 DispatcherMessageBatch::Chunk(chunk),
1095 )
1096 .await
1097 }
1098
1099 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
1100 broadcast_concurrent(
1102 self.outputs.values_mut(),
1103 DispatcherMessageBatch::BarrierBatch(barriers),
1104 )
1105 .await
1106 }
1107
1108 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
1109 if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
1110 broadcast_concurrent(
1112 self.outputs.values_mut(),
1113 DispatcherMessageBatch::Watermark(watermark),
1114 )
1115 .await?;
1116 }
1117 Ok(())
1118 }
1119
1120 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
1121 self.outputs.extend(Self::into_pairs(outputs));
1122 }
1123
1124 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
1125 self.outputs
1126 .extract_if(|actor_id, _| actor_ids.contains(actor_id))
1127 .count();
1128 }
1129
1130 fn dispatcher_id(&self) -> DispatcherId {
1131 self.dispatcher_id
1132 }
1133
1134 fn dispatcher_id_str(&self) -> &str {
1135 &self.dispatcher_id_str
1136 }
1137
1138 fn is_empty(&self) -> bool {
1139 self.outputs.is_empty()
1140 }
1141}
1142
/// Sends data to a single output.
///
/// At most two outputs are held at a time (asserted by `add_outputs`). Data chunks and
/// watermarks require exactly one output, while barriers are sent to every output held.
#[derive(Debug)]
pub struct SimpleDispatcher {
    output: SmallVec<[Output; 2]>,
    /// Applied to chunks and watermarks before they are sent.
    output_mapping: DispatchOutputMapping,
    dispatcher_id: DispatcherId,
    /// `dispatcher_id` rendered once at construction.
    dispatcher_id_str: String,
}
1164
1165impl SimpleDispatcher {
1166 pub fn new(
1167 output: Output,
1168 output_mapping: DispatchOutputMapping,
1169 dispatcher_id: DispatcherId,
1170 ) -> Self {
1171 Self {
1172 output: smallvec![output],
1173 output_mapping,
1174 dispatcher_id,
1175 dispatcher_id_str: dispatcher_id.to_string(),
1176 }
1177 }
1178}
1179
1180impl Dispatcher for SimpleDispatcher {
1181 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
1182 self.output.extend(outputs);
1183 assert!(self.output.len() <= 2);
1184 }
1185
1186 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
1187 for output in &mut self.output {
1189 output
1190 .send(DispatcherMessageBatch::BarrierBatch(barriers.clone()))
1191 .await?;
1192 }
1193 Ok(())
1194 }
1195
1196 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
1197 let output = self
1198 .output
1199 .iter_mut()
1200 .exactly_one()
1201 .expect("expect exactly one output");
1202
1203 let chunk = self.output_mapping.apply(chunk);
1204 output.send(DispatcherMessageBatch::Chunk(chunk)).await
1205 }
1206
1207 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
1208 let output = self
1209 .output
1210 .iter_mut()
1211 .exactly_one()
1212 .expect("expect exactly one output");
1213
1214 if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
1215 output
1216 .send(DispatcherMessageBatch::Watermark(watermark))
1217 .await?;
1218 }
1219 Ok(())
1220 }
1221
1222 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
1223 self.output
1224 .retain(|output| !actor_ids.contains(&output.actor_id()));
1225 }
1226
1227 fn dispatcher_id(&self) -> DispatcherId {
1228 self.dispatcher_id
1229 }
1230
1231 fn dispatcher_id_str(&self) -> &str {
1232 &self.dispatcher_id_str
1233 }
1234
1235 fn is_empty(&self) -> bool {
1236 self.output.is_empty()
1237 }
1238}
1239
#[cfg(test)]
mod tests {
    use std::hash::{BuildHasher, Hasher};

    use futures::pin_mut;
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::array::{Array, ArrayBuilder, I32ArrayBuilder};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::hash_util::Crc32FastBuilder;
    use risingwave_pb::stream_plan::{DispatcherType, PbDispatchOutputMapping};
    use tokio::sync::mpsc::unbounded_channel;

    use super::*;
    use crate::executor::exchange::output::Output;
    use crate::executor::exchange::permit::channel_for_test;
    use crate::executor::receiver::ReceiverExecutor;
    use crate::executor::{BarrierInner as Barrier, MessageInner as Message};
    use crate::task::barrier_test_utils::LocalBarrierTestEnv;

    /// Routes one chunk by the hash of columns 0 and 2 to two outputs. A `U-`/`U+`
    /// pair whose halves land on different outputs must be rewritten to `-`/`+`,
    /// while rows that stay together keep their original ops and visibility.
    #[tokio::test]
    async fn test_hash_dispatcher_complex() {
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let num_outputs = 2;
        let key_indices = &[0, 2];
        let (output_tx_vecs, mut output_rx_vecs): (Vec<_>, Vec<_>) =
            (0..num_outputs).map(|_| channel_for_test()).collect();
        let outputs = output_tx_vecs
            .into_iter()
            .enumerate()
            .map(|(actor_id, tx)| Output::new(ActorId::new(actor_id as u32 + 1), tx))
            .collect::<Vec<_>>();
        // Actors are numbered 1..=num_outputs; each owns a contiguous, equal run of
        // vnodes, and any remainder is padded with the last actor.
        let mut hash_mapping = (1..num_outputs + 1)
            .flat_map(|id| {
                vec![ActorId::new(id as u32); VirtualNode::COUNT_FOR_TEST / num_outputs]
            })
            .collect_vec();
        hash_mapping.resize(
            VirtualNode::COUNT_FOR_TEST,
            ActorId::new(num_outputs as u32),
        );
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            key_indices.to_vec(),
            DispatchOutputMapping::Simple(vec![0, 1, 2]),
            hash_mapping,
            0,
        );

        let chunk = StreamChunk::from_pretty(
            "  I I I
            +  4 6 8
            +  5 7 9
            +  0 0 0
            -  1 1 1 D
            U- 2 0 2
            U+ 2 0 2
            U- 3 3 2
            U+ 3 3 4",
        );
        hash_dispatcher.dispatch_data(chunk).await.unwrap();

        assert_eq!(
            *output_rx_vecs[0].recv().await.unwrap().as_chunk().unwrap(),
            StreamChunk::from_pretty(
                "  I I I
                +  4 6 8
                +  5 7 9
                +  0 0 0
                -  1 1 1 D
                U- 2 0 2
                U+ 2 0 2
                -  3 3 2 D  // Should rewrite UpdateDelete to Delete
                +  3 3 4    // Should rewrite UpdateInsert to Insert",
            )
        );
        assert_eq!(
            *output_rx_vecs[1].recv().await.unwrap().as_chunk().unwrap(),
            StreamChunk::from_pretty(
                "  I I I
                +  4 6 8 D
                +  5 7 9 D
                +  0 0 0 D
                -  1 1 1 D  // Should keep original invisible mark
                U- 2 0 2 D  // Should keep UpdateDelete
                U+ 2 0 2 D  // Should keep UpdateInsert
                -  3 3 2    // Should rewrite UpdateDelete to Delete
                +  3 3 4 D  // Should rewrite UpdateInsert to Insert",
            )
        );
    }

    /// Applies dispatcher updates through barrier mutations and checks which
    /// downstream actors receive chunks and barriers before and after each update.
    #[tokio::test]
    async fn test_configuration_change() {
        let (tx, rx) = channel_for_test();
        let actor_id = 233.into();
        let fragment_id = 666.into();
        let barrier_test_env = LocalBarrierTestEnv::for_test().await;
        let metrics = Arc::new(StreamingMetrics::unused());

        // Broadcast downstreams: `untouched` is kept, `old` is replaced by `new` at b1.
        let (untouched, old, new) = (234.into(), 235.into(), 238.into());
        // Simple downstreams: `old_simple` is replaced by `new_simple` at b3.
        let (old_simple, new_simple) = (114.into(), 514.into());

        let broadcast_dispatcher_id = 666;
        let broadcast_dispatcher = PbDispatcher {
            r#type: DispatcherType::Broadcast as _,
            dispatcher_id: broadcast_dispatcher_id,
            downstream_actor_id: vec![untouched, old],
            output_mapping: PbDispatchOutputMapping::identical(0).into(),
            ..Default::default()
        };

        let simple_dispatcher_id = 888;
        let simple_dispatcher = PbDispatcher {
            r#type: DispatcherType::Simple as _,
            dispatcher_id: simple_dispatcher_id,
            downstream_actor_id: vec![old_simple],
            output_mapping: PbDispatchOutputMapping::identical(0).into(),
            ..Default::default()
        };

        // b1 updates the broadcast dispatcher: add `new`, remove `old`.
        let dispatcher_updates = maplit::hashmap! {
            actor_id => vec![PbDispatcherUpdate {
                actor_id,
                dispatcher_id: broadcast_dispatcher_id,
                added_downstream_actor_id: vec![new],
                removed_downstream_actor_id: vec![old],
                hash_mapping: Default::default(),
            }]
        };
        let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: dispatcher_updates,
                ..Default::default()
            },
        ));
        barrier_test_env.inject_barrier(&b1, [actor_id]);
        barrier_test_env.flush_all_events().await;

        let input = Executor::new(
            Default::default(),
            ReceiverExecutor::for_test(
                actor_id,
                rx,
                barrier_test_env.local_barrier_manager.clone(),
            )
            .boxed(),
        );

        // Pre-register a local channel for every downstream actor, including the
        // ones that only join through a later update.
        let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
        let mut rxs = [untouched, old, new, old_simple, new_simple]
            .into_iter()
            .map(|id| {
                (id, {
                    let (tx, rx) = channel_for_test();
                    new_output_request_tx
                        .send((id, NewOutputRequest::Local(tx)))
                        .unwrap();
                    rx
                })
            })
            .collect::<HashMap<_, _>>();
        let executor = Box::new(
            DispatchExecutor::new(
                input,
                new_output_request_rx,
                vec![broadcast_dispatcher, simple_dispatcher],
                actor_id,
                fragment_id,
                barrier_test_env.local_barrier_manager.clone(),
                metrics,
            )
            .await
            .unwrap(),
        )
        .execute();

        pin_mut!(executor);

        macro_rules! try_recv {
            ($down_id:expr) => {
                rxs.get_mut(&$down_id).unwrap().try_recv()
            };
        }

        // Phase 1: a chunk followed by b1, which carries the broadcast update.
        tx.send(Message::Chunk(StreamChunk::default()).into())
            .await
            .unwrap();
        tx.send(Message::Barrier(b1.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // Downstreams present before b1 see both the chunk and the barrier...
        try_recv!(untouched).unwrap().as_chunk().unwrap();
        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();

        try_recv!(old).unwrap().as_chunk().unwrap();
        try_recv!(old).unwrap().as_barrier_batch().unwrap();

        // ...while the newly added downstream only sees the barrier.
        try_recv!(new).unwrap().as_barrier_batch().unwrap();

        try_recv!(old_simple).unwrap().as_chunk().unwrap();
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap();

        // Phase 2: b2 without mutation; `old` has been removed by now.
        let b2 = Barrier::new_test_barrier(test_epoch(2));
        barrier_test_env.inject_barrier(&b2, [actor_id]);
        tx.send(Message::Barrier(b2.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();
        try_recv!(old).unwrap_err();
        try_recv!(new).unwrap().as_barrier_batch().unwrap();

        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap();
        try_recv!(new_simple).unwrap_err();

        // Phase 3: a chunk followed by b3, which swaps the simple dispatcher's
        // downstream from `old_simple` to `new_simple`.
        tx.send(Message::Chunk(StreamChunk::default()).into())
            .await
            .unwrap();

        let dispatcher_updates = maplit::hashmap! {
            actor_id => vec![PbDispatcherUpdate {
                actor_id,
                dispatcher_id: simple_dispatcher_id,
                added_downstream_actor_id: vec![new_simple],
                removed_downstream_actor_id: vec![old_simple],
                hash_mapping: Default::default(),
            }]
        };
        let b3 = Barrier::new_test_barrier(test_epoch(3)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: dispatcher_updates,
                ..Default::default()
            },
        ));
        barrier_test_env.inject_barrier(&b3, [actor_id]);
        tx.send(Message::Barrier(b3.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        try_recv!(old_simple).unwrap().as_chunk().unwrap();
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap();

        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap();

        // Phase 4: b4 without mutation; only `new_simple` is still attached.
        let b4 = Barrier::new_test_barrier(test_epoch(4));
        barrier_test_env.inject_barrier(&b4, [actor_id]);
        tx.send(Message::Barrier(b4.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        try_recv!(old_simple).unwrap_err();
        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap();
    }

    /// Routes a chunk of consecutive integers to five outputs. Each output must
    /// receive exactly the rows (values AND ops) whose key hash maps to it, as
    /// computed independently below.
    #[tokio::test]
    async fn test_hash_dispatcher() {
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let num_outputs = 5;
        let cardinality = 10;
        let dimension = 4;
        let key_indices = &[0, 2];
        let (output_tx_vecs, output_rx_vecs): (Vec<_>, Vec<_>) =
            (0..num_outputs).map(|_| channel_for_test()).collect();
        let outputs = output_tx_vecs
            .into_iter()
            .enumerate()
            .map(|(actor_id, tx)| Output::new(ActorId::new(1 + actor_id as u32), tx))
            .collect::<Vec<_>>();
        let mut hash_mapping = (1..num_outputs + 1)
            .flat_map(|id| {
                vec![ActorId::new(id as _); VirtualNode::COUNT_FOR_TEST / num_outputs]
            })
            .collect_vec();
        hash_mapping.resize(
            VirtualNode::COUNT_FOR_TEST,
            ActorId::new(num_outputs as u32),
        );
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            key_indices.to_vec(),
            DispatchOutputMapping::Simple((0..dimension).collect()),
            hash_mapping.clone(),
            0,
        );

        // Alternate inserts and deletes.
        let ops = (0..cardinality)
            .map(|idx| if idx % 2 == 0 { Op::Insert } else { Op::Delete })
            .collect_vec();

        let mut start = 19260817i32..;
        let mut builders = (0..dimension)
            .map(|_| I32ArrayBuilder::new(cardinality))
            .collect_vec();
        // Expected per-output columns and ops. Ops are stored by value so they stay
        // usable after `ops` is moved into the chunk below.
        let mut output_cols = vec![vec![vec![]; dimension]; num_outputs];
        let mut output_ops: Vec<Vec<Op>> = vec![vec![]; num_outputs];
        for op in &ops {
            let hash_builder = Crc32FastBuilder;
            let mut hasher = hash_builder.build_hasher();
            let one_row = (0..dimension).map(|_| start.next().unwrap()).collect_vec();
            for key_idx in key_indices {
                let val = one_row[*key_idx];
                let bytes = val.to_le_bytes();
                hasher.update(&bytes);
            }
            // Actor ids start at 1, output indices at 0.
            let output_idx = hash_mapping[hasher.finish() as usize % VirtualNode::COUNT_FOR_TEST]
                .as_raw_id() as usize
                - 1;
            for (builder, val) in builders.iter_mut().zip_eq_fast(one_row.iter()) {
                builder.append(Some(*val));
            }
            output_cols[output_idx]
                .iter_mut()
                .zip_eq_fast(one_row.iter())
                .for_each(|(each_column, val)| each_column.push(*val));
            output_ops[output_idx].push(*op);
        }

        let columns = builders
            .into_iter()
            .map(|builder| builder.finish().into_ref())
            .collect();

        let chunk = StreamChunk::new(ops, columns);
        hash_dispatcher.dispatch_data(chunk).await.unwrap();

        for (output_idx, mut rx) in output_rx_vecs.into_iter().enumerate() {
            let mut output = vec![];
            while let Some(Some(msg)) = rx.recv().now_or_never() {
                output.push(msg);
            }
            // One input chunk yields at most one message per output.
            assert!(output.len() <= 1);
            if output.is_empty() {
                assert!(output_cols[output_idx].iter().all(|x| x.is_empty()));
                assert!(output_ops[output_idx].is_empty());
            } else {
                let message = output.first().unwrap();
                let real_chunk = match message {
                    DispatcherMessageBatch::Chunk(chunk) => chunk,
                    _ => panic!(),
                };
                real_chunk
                    .columns()
                    .iter()
                    .zip_eq_fast(output_cols[output_idx].iter())
                    .for_each(|(real_col, expect_col)| {
                        let real_vals = real_chunk
                            .visibility()
                            .iter_ones()
                            .map(|row_idx| real_col.as_int32().value_at(row_idx).unwrap())
                            .collect::<Vec<_>>();
                        assert_eq!(real_vals, *expect_col);
                    });
                // The ops of the visible rows must match the ops routed to this output.
                let real_ops = real_chunk
                    .visibility()
                    .iter_ones()
                    .map(|row_idx| real_chunk.ops()[row_idx])
                    .collect::<Vec<_>>();
                assert_eq!(real_ops, output_ops[output_idx]);
            }
        }
    }
}