1use std::collections::{HashMap, HashSet};
16use std::future::Future;
17use std::iter::repeat_with;
18use std::ops::{Deref, DerefMut};
19use std::time::Duration;
20
21use anyhow::anyhow;
22use futures::{FutureExt, TryStreamExt};
23use itertools::Itertools;
24use risingwave_common::array::Op;
25use risingwave_common::bitmap::BitmapBuilder;
26use risingwave_common::hash::{ActorMapping, ExpandedActorMapping, VirtualNode};
27use risingwave_common::metrics::LabelGuardedIntCounter;
28use risingwave_common::util::iter_util::ZipEqFast;
29use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
30use risingwave_pb::stream_plan::{self, PbDispatcher};
31use smallvec::{SmallVec, smallvec};
32use tokio::sync::mpsc::UnboundedReceiver;
33use tokio::time::Instant;
34use tokio_stream::StreamExt;
35use tokio_stream::adapters::Peekable;
36use tracing::{Instrument, event};
37
38use super::exchange::output::Output;
39use super::{
40 AddMutation, DispatcherBarriers, DispatcherMessageBatch, MessageBatch, TroublemakerExecutor,
41 UpdateMutation,
42};
43use crate::executor::StreamConsumer;
44use crate::executor::prelude::*;
45use crate::task::{DispatcherId, LocalBarrierManager, NewOutputRequest};
46
47mod output_mapping;
48pub use output_mapping::DispatchOutputMapping;
49
/// Stream consumer that reads messages from `input` and forwards them through the
/// dispatchers held in `inner`.
pub struct DispatchExecutor {
    /// Upstream executor whose message stream is consumed by `execute`.
    input: Executor,
    /// Dispatchers plus the state required to add, update and remove them.
    inner: DispatchExecutorInner,
}
57
/// A [`DispatcherImpl`] bundled with the counter that accumulates the time spent in its
/// dispatch calls.
struct DispatcherWithMetrics {
    /// The wrapped dispatcher; reachable transparently through `Deref`/`DerefMut`.
    dispatcher: DispatcherImpl,
    /// Counter increased (in nanoseconds) by `record_output_buffer_blocking_duration`.
    actor_output_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}
62
63impl DispatcherWithMetrics {
64 pub fn record_output_buffer_blocking_duration(&self, duration: Duration) {
65 let ns = duration.as_nanos() as u64;
66 self.actor_output_buffer_blocking_duration_ns.inc_by(ns);
67 }
68}
69
70impl Debug for DispatcherWithMetrics {
71 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
72 self.dispatcher.fmt(f)
73 }
74}
75
/// Lets a `DispatcherWithMetrics` be used wherever a shared `DispatcherImpl` is expected.
impl Deref for DispatcherWithMetrics {
    type Target = DispatcherImpl;

    /// Returns a shared reference to the wrapped dispatcher.
    fn deref(&self) -> &Self::Target {
        &self.dispatcher
    }
}
83
/// Lets a `DispatcherWithMetrics` be used wherever a mutable `DispatcherImpl` is expected.
impl DerefMut for DispatcherWithMetrics {
    /// Returns a mutable reference to the wrapped dispatcher.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.dispatcher
    }
}
89
/// Metric handles and label strings shared by all dispatchers of one dispatch executor.
struct DispatchExecutorMetrics {
    /// Actor id rendered as a string, used as a metric label.
    actor_id_str: String,
    /// Fragment id rendered as a string, used as a metric label.
    fragment_id_str: String,
    /// Metrics registry from which per-dispatcher counters are created.
    metrics: Arc<StreamingMetrics>,
    /// Counter increased by the cardinality of every dispatched chunk.
    actor_out_record_cnt: LabelGuardedIntCounter,
}
96
97impl DispatchExecutorMetrics {
98 fn monitor_dispatcher(&self, dispatcher: DispatcherImpl) -> DispatcherWithMetrics {
99 DispatcherWithMetrics {
100 actor_output_buffer_blocking_duration_ns: self
101 .metrics
102 .actor_output_buffer_blocking_duration_ns
103 .with_guarded_label_values(&[
104 self.actor_id_str.as_str(),
105 self.fragment_id_str.as_str(),
106 dispatcher.dispatcher_id_str(),
107 ]),
108 dispatcher,
109 }
110 }
111}
112
/// Mutable state of a dispatch executor: its dispatchers and the channel through which
/// downstream actors hand over their output requests.
struct DispatchExecutorInner {
    /// Active dispatchers; entries that become empty are dropped after a mutation is applied.
    dispatchers: Vec<DispatcherWithMetrics>,
    /// Id of the actor this executor belongs to; used to look up mutations addressed to it.
    actor_id: u32,
    /// Source of the configuration values read in `dispatch`.
    local_barrier_manager: LocalBarrierManager,
    /// Metric handles and label strings for this executor.
    metrics: DispatchExecutorMetrics,
    /// Receives `(downstream actor id, request)` pairs used to build new outputs.
    new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    /// Requests that were received while waiting for a different actor and are kept for later.
    pending_new_output_requests: HashMap<ActorId, NewOutputRequest>,
}
121
122impl DispatchExecutorInner {
123 async fn collect_outputs(
124 &mut self,
125 downstream_actors: &[ActorId],
126 ) -> StreamResult<Vec<Output>> {
127 fn resolve_output(downstream_actor: ActorId, request: NewOutputRequest) -> Output {
128 let tx = match request {
129 NewOutputRequest::Local(tx) | NewOutputRequest::Remote(tx) => tx,
130 };
131 Output::new(downstream_actor, tx)
132 }
133 let mut outputs = Vec::with_capacity(downstream_actors.len());
134 for downstream_actor in downstream_actors {
135 let output =
136 if let Some(request) = self.pending_new_output_requests.remove(downstream_actor) {
137 resolve_output(*downstream_actor, request)
138 } else {
139 loop {
140 let (requested_actor, request) = self
141 .new_output_request_rx
142 .recv()
143 .await
144 .ok_or_else(|| anyhow!("end of new output request"))?;
145 if requested_actor == *downstream_actor {
146 break resolve_output(requested_actor, request);
147 } else {
148 assert!(
149 self.pending_new_output_requests
150 .insert(requested_actor, request)
151 .is_none(),
152 "duplicated inflight new output requests from actor {}",
153 requested_actor
154 );
155 }
156 }
157 };
158 outputs.push(output);
159 }
160 Ok(outputs)
161 }
162
    /// Sends one message batch through every dispatcher.
    ///
    /// Dispatchers are driven concurrently, bounded by the
    /// `developer.exchange_concurrent_dispatchers` configuration value. For each dispatcher
    /// the wall-clock time of the whole dispatch call is added to its blocking-duration
    /// counter.
    ///
    /// * Barrier batch: an empty batch is a no-op. Otherwise the mutation of the first
    ///   barrier is applied in two phases: `pre_mutate_dispatchers` before the barriers are
    ///   sent and `post_mutate_dispatchers` afterwards.
    /// * Watermark: forwarded to every dispatcher.
    /// * Chunk: forwarded to every dispatcher, then `actor_out_record_cnt` is increased by
    ///   the chunk's cardinality.
    ///
    /// # Errors
    /// Returns the first error reported by a dispatcher or by mutation handling.
    async fn dispatch(&mut self, msg: MessageBatch) -> StreamResult<()> {
        // Upper bound on how many dispatchers are polled concurrently.
        let limit = self
            .local_barrier_manager
            .env
            .config()
            .developer
            .exchange_concurrent_dispatchers;
        match msg {
            MessageBatch::BarrierBatch(barrier_batch) => {
                if barrier_batch.is_empty() {
                    return Ok(());
                }
                // Only the first barrier's mutation is inspected for the whole batch.
                let mutation = barrier_batch[0].mutation.clone();
                // Add new dispatchers/outputs first so that they receive this barrier too.
                self.pre_mutate_dispatchers(&mutation).await?;
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let start_time = Instant::now();
                        dispatcher
                            .dispatch_barriers(
                                // Each dispatcher gets its own converted copy of the batch.
                                barrier_batch
                                    .iter()
                                    .cloned()
                                    .map(|b| b.into_dispatcher())
                                    .collect(),
                            )
                            .await?;
                        dispatcher.record_output_buffer_blocking_duration(start_time.elapsed());
                        StreamResult::Ok(())
                    })
                    .await?;
                // Removals happen only after the barrier has been sent to the old outputs.
                self.post_mutate_dispatchers(&mutation)?;
            }
            MessageBatch::Watermark(watermark) => {
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let start_time = Instant::now();
                        dispatcher.dispatch_watermark(watermark.clone()).await?;
                        dispatcher.record_output_buffer_blocking_duration(start_time.elapsed());
                        StreamResult::Ok(())
                    })
                    .await?;
            }
            MessageBatch::Chunk(chunk) => {
                futures::stream::iter(self.dispatchers.iter_mut())
                    .map(Ok)
                    .try_for_each_concurrent(limit, |dispatcher| async {
                        let start_time = Instant::now();
                        dispatcher.dispatch_data(chunk.clone()).await?;
                        dispatcher.record_output_buffer_blocking_duration(start_time.elapsed());
                        StreamResult::Ok(())
                    })
                    .await?;

                // Counted once per chunk, independent of the number of dispatchers.
                self.metrics
                    .actor_out_record_cnt
                    .inc_by(chunk.cardinality() as _);
            }
        }
        Ok(())
    }
227
228 async fn add_dispatchers<'a>(
230 &mut self,
231 new_dispatchers: impl IntoIterator<Item = &'a PbDispatcher>,
232 ) -> StreamResult<()> {
233 for dispatcher in new_dispatchers {
234 let outputs = self
235 .collect_outputs(&dispatcher.downstream_actor_id)
236 .await?;
237 let dispatcher = DispatcherImpl::new(outputs, dispatcher)?;
238 let dispatcher = self.metrics.monitor_dispatcher(dispatcher);
239 self.dispatchers.push(dispatcher);
240 }
241
242 assert!(
243 self.dispatchers
244 .iter()
245 .map(|d| d.dispatcher_id())
246 .all_unique(),
247 "dispatcher ids must be unique: {:?}",
248 self.dispatchers
249 );
250
251 Ok(())
252 }
253
254 fn find_dispatcher(&mut self, dispatcher_id: DispatcherId) -> &mut DispatcherImpl {
255 self.dispatchers
256 .iter_mut()
257 .find(|d| d.dispatcher_id() == dispatcher_id)
258 .unwrap_or_else(|| panic!("dispatcher {}:{} not found", self.actor_id, dispatcher_id))
259 }
260
261 async fn pre_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
264 let outputs = self
265 .collect_outputs(&update.added_downstream_actor_id)
266 .await?;
267
268 let dispatcher = self.find_dispatcher(update.dispatcher_id);
269 dispatcher.add_outputs(outputs);
270
271 Ok(())
272 }
273
274 fn post_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
277 let ids = update.removed_downstream_actor_id.iter().copied().collect();
278
279 let dispatcher = self.find_dispatcher(update.dispatcher_id);
280 dispatcher.remove_outputs(&ids);
281
282 if let DispatcherImpl::Hash(dispatcher) = dispatcher {
289 dispatcher.hash_mapping =
290 ActorMapping::from_protobuf(update.get_hash_mapping()?).to_expanded();
291 }
292
293 Ok(())
294 }
295
296 async fn pre_mutate_dispatchers(
298 &mut self,
299 mutation: &Option<Arc<Mutation>>,
300 ) -> StreamResult<()> {
301 let Some(mutation) = mutation.as_deref() else {
302 return Ok(());
303 };
304
305 match mutation {
306 Mutation::Add(AddMutation { adds, .. }) => {
307 if let Some(new_dispatchers) = adds.get(&self.actor_id) {
308 self.add_dispatchers(new_dispatchers).await?;
309 }
310 }
311 Mutation::Update(UpdateMutation {
312 dispatchers,
313 actor_new_dispatchers: actor_dispatchers,
314 ..
315 }) => {
316 if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
317 self.add_dispatchers(new_dispatchers).await?;
318 }
319
320 if let Some(updates) = dispatchers.get(&self.actor_id) {
321 for update in updates {
322 self.pre_update_dispatcher(update).await?;
323 }
324 }
325 }
326 Mutation::AddAndUpdate(
327 AddMutation { adds, .. },
328 UpdateMutation {
329 dispatchers,
330 actor_new_dispatchers: actor_dispatchers,
331 ..
332 },
333 ) => {
334 if let Some(new_dispatchers) = adds.get(&self.actor_id) {
335 self.add_dispatchers(new_dispatchers).await?;
336 }
337
338 if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
339 self.add_dispatchers(new_dispatchers).await?;
340 }
341
342 if let Some(updates) = dispatchers.get(&self.actor_id) {
343 for update in updates {
344 self.pre_update_dispatcher(update).await?;
345 }
346 }
347 }
348 _ => {}
349 }
350
351 Ok(())
352 }
353
354 fn post_mutate_dispatchers(&mut self, mutation: &Option<Arc<Mutation>>) -> StreamResult<()> {
356 let Some(mutation) = mutation.as_deref() else {
357 return Ok(());
358 };
359
360 match mutation {
361 Mutation::Stop(stops) => {
362 if !stops.contains(&self.actor_id) {
364 for dispatcher in &mut self.dispatchers {
365 dispatcher.remove_outputs(stops);
366 }
367 }
368 }
369 Mutation::Update(UpdateMutation {
370 dispatchers,
371 dropped_actors,
372 ..
373 })
374 | Mutation::AddAndUpdate(
375 _,
376 UpdateMutation {
377 dispatchers,
378 dropped_actors,
379 ..
380 },
381 ) => {
382 if let Some(updates) = dispatchers.get(&self.actor_id) {
383 for update in updates {
384 self.post_update_dispatcher(update)?;
385 }
386 }
387
388 if !dropped_actors.contains(&self.actor_id) {
389 for dispatcher in &mut self.dispatchers {
390 dispatcher.remove_outputs(dropped_actors);
391 }
392 }
393 }
394 _ => {}
395 };
396
397 self.dispatchers.retain(|d| !d.is_empty());
400
401 Ok(())
402 }
403}
404
405impl DispatchExecutor {
406 pub(crate) async fn new(
407 input: Executor,
408 new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
409 dispatchers: Vec<stream_plan::Dispatcher>,
410 actor_id: u32,
411 fragment_id: u32,
412 local_barrier_manager: LocalBarrierManager,
413 metrics: Arc<StreamingMetrics>,
414 ) -> StreamResult<Self> {
415 let mut executor = Self::new_inner(
416 input,
417 new_output_request_rx,
418 vec![],
419 actor_id,
420 fragment_id,
421 local_barrier_manager,
422 metrics,
423 );
424 let inner = &mut executor.inner;
425 for dispatcher in dispatchers {
426 let outputs = inner
427 .collect_outputs(&dispatcher.downstream_actor_id)
428 .await?;
429 let dispatcher = DispatcherImpl::new(outputs, &dispatcher)?;
430 let dispatcher = inner.metrics.monitor_dispatcher(dispatcher);
431 inner.dispatchers.push(dispatcher);
432 }
433 Ok(executor)
434 }
435
436 #[cfg(test)]
437 pub(crate) fn for_test(
438 input: Executor,
439 dispatchers: Vec<DispatcherImpl>,
440 actor_id: u32,
441 fragment_id: u32,
442 local_barrier_manager: LocalBarrierManager,
443 metrics: Arc<StreamingMetrics>,
444 ) -> (
445 Self,
446 tokio::sync::mpsc::UnboundedSender<(ActorId, NewOutputRequest)>,
447 ) {
448 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
449
450 (
451 Self::new_inner(
452 input,
453 rx,
454 dispatchers,
455 actor_id,
456 fragment_id,
457 local_barrier_manager,
458 metrics,
459 ),
460 tx,
461 )
462 }
463
464 fn new_inner(
465 mut input: Executor,
466 new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
467 dispatchers: Vec<DispatcherImpl>,
468 actor_id: u32,
469 fragment_id: u32,
470 local_barrier_manager: LocalBarrierManager,
471 metrics: Arc<StreamingMetrics>,
472 ) -> Self {
473 let chunk_size = local_barrier_manager.env.config().developer.chunk_size;
474 if crate::consistency::insane() {
475 let mut info = input.info().clone();
477 info.identity = format!("{} (embedded trouble)", info.identity);
478 let troublemaker = TroublemakerExecutor::new(input, chunk_size);
479 input = (info, troublemaker).into();
480 }
481
482 let actor_id_str = actor_id.to_string();
483 let fragment_id_str = fragment_id.to_string();
484 let actor_out_record_cnt = metrics
485 .actor_out_record_cnt
486 .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
487 let metrics = DispatchExecutorMetrics {
488 actor_id_str,
489 fragment_id_str,
490 metrics,
491 actor_out_record_cnt,
492 };
493 let dispatchers = dispatchers
494 .into_iter()
495 .map(|dispatcher| metrics.monitor_dispatcher(dispatcher))
496 .collect();
497 Self {
498 input,
499 inner: DispatchExecutorInner {
500 dispatchers,
501 actor_id,
502 local_barrier_manager,
503 metrics,
504 new_output_request_rx,
505 pending_new_output_requests: Default::default(),
506 },
507 }
508 }
509}
510
/// Drives the dispatch executor: every upstream message is dispatched downstream, and
/// barriers are additionally yielded to the caller.
impl StreamConsumer for DispatchExecutor {
    type BarrierStream = impl Stream<Item = StreamResult<Barrier>> + Send;

    /// Consumes the executor and returns the stream of barriers it has dispatched.
    ///
    /// Messages are pulled through `try_batch_barriers`, with the batch limit taken from
    /// the `developer.max_barrier_batch_size` configuration value. Chunks and watermarks
    /// are dispatched and produce no output item. A barrier batch is dispatched as a whole,
    /// its size is observed in the `barrier_batch_size` metric, and then each barrier is
    /// yielded in order. The stream ends when the input ends and fails on the first error.
    fn execute(mut self: Box<Self>) -> Self::BarrierStream {
        let max_barrier_count_per_batch = self
            .inner
            .local_barrier_manager
            .env
            .config()
            .developer
            .max_barrier_batch_size;
        #[try_stream]
        async move {
            // Peekable so that `try_batch_barriers` can look ahead without consuming.
            let mut input = self.input.execute().peekable();
            loop {
                let Some(message) =
                    try_batch_barriers(max_barrier_count_per_batch, &mut input).await?
                else {
                    // End of the upstream stream.
                    break;
                };
                match message {
                    chunk @ MessageBatch::Chunk(_) => {
                        self.inner
                            .dispatch(chunk)
                            .instrument(tracing::info_span!("dispatch_chunk"))
                            .instrument_await("dispatch_chunk")
                            .await?;
                    }
                    MessageBatch::BarrierBatch(barrier_batch) => {
                        assert!(!barrier_batch.is_empty());
                        // `dispatch` takes ownership, so a copy is sent and the original
                        // batch is kept for yielding below.
                        self.inner
                            .dispatch(MessageBatch::BarrierBatch(barrier_batch.clone()))
                            .instrument(tracing::info_span!("dispatch_barrier_batch"))
                            .instrument_await("dispatch_barrier_batch")
                            .await?;
                        self.inner
                            .metrics
                            .metrics
                            .barrier_batch_size
                            .observe(barrier_batch.len() as f64);
                        for barrier in barrier_batch {
                            yield barrier;
                        }
                    }
                    watermark @ MessageBatch::Watermark(_) => {
                        self.inner
                            .dispatch(watermark)
                            .instrument(tracing::info_span!("dispatch_watermark"))
                            .instrument_await("dispatch_watermark")
                            .await?;
                    }
                }
            }
        }
    }
}
568
569async fn try_batch_barriers(
575 max_barrier_count_per_batch: u32,
576 input: &mut Peekable<BoxedMessageStream>,
577) -> StreamResult<Option<MessageBatch>> {
578 let Some(msg) = input.next().await else {
579 return Ok(None);
581 };
582 let mut barrier_batch = vec![];
583 let msg: Message = msg?;
584 let max_peek_attempts = match msg {
585 Message::Chunk(c) => {
586 return Ok(Some(MessageBatch::Chunk(c)));
587 }
588 Message::Watermark(w) => {
589 return Ok(Some(MessageBatch::Watermark(w)));
590 }
591 Message::Barrier(b) => {
592 let peek_more_barrier = b.mutation.is_none();
593 barrier_batch.push(b);
594 if peek_more_barrier {
595 max_barrier_count_per_batch.saturating_sub(1)
596 } else {
597 0
598 }
599 }
600 };
601 for _ in 0..max_peek_attempts {
603 let peek = input.peek().now_or_never();
604 let Some(peek) = peek else {
605 break;
606 };
607 let Some(msg) = peek else {
608 break;
610 };
611 let Ok(Message::Barrier(barrier)) = msg else {
612 break;
613 };
614 if barrier.mutation.is_some() {
615 break;
616 }
617 let msg: Message = input.next().now_or_never().unwrap().unwrap()?;
618 let Message::Barrier(ref barrier) = msg else {
619 unreachable!("must be a barrier");
620 };
621 barrier_batch.push(barrier.clone());
622 }
623 Ok(Some(MessageBatch::BarrierBatch(barrier_batch)))
624}
625
/// The concrete dispatcher kinds; the common operations are generated by
/// `impl_dispatcher` and forward to the wrapped value.
#[derive(Debug)]
pub enum DispatcherImpl {
    /// Routes each row by the hash of its distribution key.
    Hash(HashDataDispatcher),
    /// Sends every message to all outputs.
    Broadcast(BroadcastDispatcher),
    /// Sends data to a single output.
    Simple(SimpleDispatcher),
    /// Sends each chunk to one output, rotating through them.
    RoundRobin(RoundRobinDataDispatcher),
}
633
634impl DispatcherImpl {
635 pub fn new(outputs: Vec<Output>, dispatcher: &PbDispatcher) -> StreamResult<Self> {
636 let output_mapping =
637 DispatchOutputMapping::from_protobuf(dispatcher.output_mapping.clone().unwrap());
638
639 use risingwave_pb::stream_plan::DispatcherType::*;
640 let dispatcher_impl = match dispatcher.get_type()? {
641 Hash => {
642 assert!(!outputs.is_empty());
643 let dist_key_indices = dispatcher
644 .dist_key_indices
645 .iter()
646 .map(|i| *i as usize)
647 .collect();
648
649 let hash_mapping =
650 ActorMapping::from_protobuf(dispatcher.get_hash_mapping()?).to_expanded();
651
652 DispatcherImpl::Hash(HashDataDispatcher::new(
653 outputs,
654 dist_key_indices,
655 output_mapping,
656 hash_mapping,
657 dispatcher.dispatcher_id,
658 ))
659 }
660 Broadcast => DispatcherImpl::Broadcast(BroadcastDispatcher::new(
661 outputs,
662 output_mapping,
663 dispatcher.dispatcher_id,
664 )),
665 Simple | NoShuffle => {
666 let [output]: [_; 1] = outputs.try_into().unwrap();
667 DispatcherImpl::Simple(SimpleDispatcher::new(
668 output,
669 output_mapping,
670 dispatcher.dispatcher_id,
671 ))
672 }
673 Unspecified => unreachable!(),
674 };
675
676 Ok(dispatcher_impl)
677 }
678}
679
// Generates the inherent methods of `DispatcherImpl`. Every generated method matches on
// the variants passed to the macro and forwards the call unchanged to the wrapped
// dispatcher's method of the same name.
macro_rules! impl_dispatcher {
    ($( { $variant_name:ident } ),*) => {
        impl DispatcherImpl {
            /// Forwards a data chunk to the wrapped dispatcher.
            pub async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_data(chunk).await, )*
                }
            }

            /// Forwards a batch of barriers to the wrapped dispatcher.
            pub async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_barriers(barriers).await, )*
                }
            }

            /// Forwards a watermark to the wrapped dispatcher.
            pub async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_watermark(watermark).await, )*
                }
            }

            /// Adds `outputs` to the wrapped dispatcher.
            pub fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
                match self {
                    $(Self::$variant_name(inner) => inner.add_outputs(outputs), )*
                }
            }

            /// Removes the outputs of the given actors from the wrapped dispatcher.
            pub fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
                match self {
                    $(Self::$variant_name(inner) => inner.remove_outputs(actor_ids), )*
                }
            }

            /// Returns the id of the wrapped dispatcher.
            pub fn dispatcher_id(&self) -> DispatcherId {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id(), )*
                }
            }

            /// Returns the id of the wrapped dispatcher as a string slice.
            pub fn dispatcher_id_str(&self) -> &str {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id_str(), )*
                }
            }

            /// Returns whether the wrapped dispatcher has no outputs.
            pub fn is_empty(&self) -> bool {
                match self {
                    $(Self::$variant_name(inner) => inner.is_empty(), )*
                }
            }
        }
    }
}
733
// Invokes the given macro with the list of all `DispatcherImpl` variants, so the list is
// written down in a single place.
macro_rules! for_all_dispatcher_variants {
    ($macro:ident) => {
        $macro! {
            { Hash },
            { Broadcast },
            { Simple },
            { RoundRobin }
        }
    };
}

// Generate the forwarding methods of `DispatcherImpl` for every variant.
for_all_dispatcher_variants! { impl_dispatcher }
746
/// Alias for the futures returned by the dispatch methods of [`Dispatcher`]: a `Send`
/// future resolving to `StreamResult<()>`. The lifetime parameter does not appear in the
/// bound itself.
pub trait DispatchFuture<'a> = Future<Output = StreamResult<()>> + Send;
748
/// Common interface of all dispatcher kinds wrapped by `DispatcherImpl`.
pub trait Dispatcher: Debug + 'static {
    /// Dispatches a data chunk to the outputs.
    fn dispatch_data(&mut self, chunk: StreamChunk) -> impl DispatchFuture<'_>;
    /// Dispatches a batch of barriers to the outputs.
    fn dispatch_barriers(&mut self, barrier: DispatcherBarriers) -> impl DispatchFuture<'_>;
    /// Dispatches a watermark to the outputs.
    fn dispatch_watermark(&mut self, watermark: Watermark) -> impl DispatchFuture<'_>;

    /// Adds new outputs to the dispatcher.
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>);
    /// Removes the outputs whose actor id is contained in `actor_ids`.
    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>);

    /// Returns the id of this dispatcher.
    fn dispatcher_id(&self) -> DispatcherId;

    /// Returns the id of this dispatcher as a string slice (used as a metric label).
    fn dispatcher_id_str(&self) -> &str;

    /// Returns whether the dispatcher has no outputs left.
    fn is_empty(&self) -> bool;
}
776
777async fn broadcast_concurrent(
782 outputs: impl IntoIterator<Item = &'_ mut Output>,
783 message: DispatcherMessageBatch,
784) -> StreamResult<()> {
785 futures::future::try_join_all(
786 outputs
787 .into_iter()
788 .map(|output| output.send(message.clone())),
789 )
790 .await?;
791 Ok(())
792}
793
/// Dispatcher that sends each data chunk to a single output, advancing to the next output
/// after every chunk. Barriers and watermarks go to all outputs.
#[derive(Debug)]
pub struct RoundRobinDataDispatcher {
    /// Downstream outputs, visited in order.
    outputs: Vec<Output>,
    /// Mapping applied to chunks and watermarks before they are sent.
    output_mapping: DispatchOutputMapping,
    /// Index into `outputs` of the output that receives the next chunk.
    cur: usize,
    dispatcher_id: DispatcherId,
    /// `dispatcher_id` rendered as a string once at construction.
    dispatcher_id_str: String,
}
802
803impl RoundRobinDataDispatcher {
804 pub fn new(
805 outputs: Vec<Output>,
806 output_mapping: DispatchOutputMapping,
807 dispatcher_id: DispatcherId,
808 ) -> Self {
809 Self {
810 outputs,
811 output_mapping,
812 cur: 0,
813 dispatcher_id,
814 dispatcher_id_str: dispatcher_id.to_string(),
815 }
816 }
817}
818
819impl Dispatcher for RoundRobinDataDispatcher {
820 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
821 let chunk = self.output_mapping.apply(chunk);
822
823 self.outputs[self.cur]
824 .send(DispatcherMessageBatch::Chunk(chunk))
825 .await?;
826 self.cur += 1;
827 self.cur %= self.outputs.len();
828 Ok(())
829 }
830
831 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
832 broadcast_concurrent(
834 &mut self.outputs,
835 DispatcherMessageBatch::BarrierBatch(barriers),
836 )
837 .await
838 }
839
840 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
841 if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
842 broadcast_concurrent(
844 &mut self.outputs,
845 DispatcherMessageBatch::Watermark(watermark),
846 )
847 .await?;
848 }
849 Ok(())
850 }
851
852 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
853 self.outputs.extend(outputs);
854 }
855
856 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
857 self.outputs
858 .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
859 .count();
860 self.cur = self.cur.min(self.outputs.len() - 1);
861 }
862
863 fn dispatcher_id(&self) -> DispatcherId {
864 self.dispatcher_id
865 }
866
867 fn dispatcher_id_str(&self) -> &str {
868 &self.dispatcher_id_str
869 }
870
871 fn is_empty(&self) -> bool {
872 self.outputs.is_empty()
873 }
874}
875
/// Dispatcher that routes every row to the output owning the row's virtual node.
pub struct HashDataDispatcher {
    /// Downstream outputs.
    outputs: Vec<Output>,
    /// Column indices of the input chunk used to compute each row's virtual node.
    keys: Vec<usize>,
    /// Mapping applied to chunks and watermarks before they are sent.
    output_mapping: DispatchOutputMapping,
    /// Indexed by virtual node; holds the id of the actor that receives rows of that node.
    hash_mapping: ExpandedActorMapping,
    dispatcher_id: DispatcherId,
    /// `dispatcher_id` rendered as a string once at construction.
    dispatcher_id_str: String,
}
886
/// Hand-written `Debug` that prints only the outputs, the keys and the dispatcher id; the
/// remaining fields are elided.
impl Debug for HashDataDispatcher {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HashDataDispatcher")
            .field("outputs", &self.outputs)
            .field("keys", &self.keys)
            .field("dispatcher_id", &self.dispatcher_id)
            // Marks the output as incomplete ("..") for the fields left out.
            .finish_non_exhaustive()
    }
}
896
897impl HashDataDispatcher {
898 pub fn new(
899 outputs: Vec<Output>,
900 keys: Vec<usize>,
901 output_mapping: DispatchOutputMapping,
902 hash_mapping: ExpandedActorMapping,
903 dispatcher_id: DispatcherId,
904 ) -> Self {
905 Self {
906 outputs,
907 keys,
908 output_mapping,
909 hash_mapping,
910 dispatcher_id,
911 dispatcher_id_str: dispatcher_id.to_string(),
912 }
913 }
914}
915
impl Dispatcher for HashDataDispatcher {
    /// Appends `outputs` to the dispatcher. The hash mapping is not touched here.
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(outputs);
    }

    /// Sends the barriers to all outputs concurrently.
    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        broadcast_concurrent(
            &mut self.outputs,
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    /// Sends the mapped watermark to all outputs concurrently; nothing is sent when the
    /// output mapping yields no watermark.
    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            broadcast_concurrent(
                &mut self.outputs,
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    /// Splits `chunk` by virtual node and sends each output the rows it owns.
    ///
    /// Every output receives the same columns and ops; only the visibility bitmap differs.
    /// A row is visible for an output when it was visible in the input and the hash mapping
    /// assigns its virtual node to that output's actor. Outputs for which no row is visible
    /// receive nothing.
    ///
    /// An `UpdateDelete`/`UpdateInsert` pair is kept as an update only if both rows fall
    /// into the same virtual node; otherwise the pair is rewritten to `Delete` + `Insert`.
    ///
    /// # Panics
    /// Panics if a visible `UpdateInsert` is not preceded by an `UpdateDelete`, if an
    /// invisible row sits between the two halves of an update, or if the chunk ends with a
    /// dangling `UpdateDelete`.
    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let num_outputs = self.outputs.len();

        // One virtual node per row, computed on the chunk before the output mapping is
        // applied, so `keys` refer to the input columns.
        let vnode_count = self.hash_mapping.len();
        let vnodes = VirtualNode::compute_chunk(chunk.data_chunk(), &self.keys, vnode_count);

        tracing::debug!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty(), self.keys, vnodes);

        // One visibility bitmap per output, filled row by row below.
        let mut vis_maps = repeat_with(|| BitmapBuilder::with_capacity(chunk.capacity()))
            .take(num_outputs)
            .collect_vec();
        // Virtual node of a pending `UpdateDelete` whose `UpdateInsert` is still to come.
        let mut last_vnode_when_update_delete = None;
        let mut new_ops: Vec<Op> = Vec::with_capacity(chunk.capacity());

        let chunk = self.output_mapping.apply(chunk);

        for ((vnode, &op), visible) in vnodes
            .iter()
            .copied()
            .zip_eq_fast(chunk.ops())
            .zip_eq_fast(chunk.visibility().iter())
        {
            // Mark the row visible only for the output that owns its virtual node.
            for (output, vis_map) in self.outputs.iter().zip_eq_fast(vis_maps.iter_mut()) {
                vis_map.append(visible && self.hash_mapping[vnode.to_index()] == output.actor_id());
            }

            if !visible {
                assert!(
                    last_vnode_when_update_delete.is_none(),
                    "invisible row between U- and U+, op = {op:?}",
                );
                // Invisible rows keep their op unchanged.
                new_ops.push(op);
                continue;
            }

            if op == Op::UpdateDelete {
                // Defer the decision: the op for this row is pushed together with the op
                // of the matching `UpdateInsert`.
                last_vnode_when_update_delete = Some(vnode);
            } else if op == Op::UpdateInsert {
                if vnode
                    != last_vnode_when_update_delete
                        .take()
                        .expect("missing U- before U+")
                {
                    // The two halves land in different virtual nodes: split the update.
                    new_ops.push(Op::Delete);
                    new_ops.push(Op::Insert);
                } else {
                    new_ops.push(Op::UpdateDelete);
                    new_ops.push(Op::UpdateInsert);
                }
            } else {
                new_ops.push(op);
            }
        }
        assert!(
            last_vnode_when_update_delete.is_none(),
            "missing U+ after U-"
        );

        let ops = new_ops;

        // Send to all outputs concurrently; the call fails if any send fails.
        futures::future::try_join_all(
            vis_maps
                .into_iter()
                .zip_eq_fast(self.outputs.iter_mut())
                .map(|(vis_map, output)| async {
                    let vis_map = vis_map.finish();
                    let new_stream_chunk =
                        StreamChunk::with_visibility(ops.clone(), chunk.columns().into(), vis_map);
                    // Skip outputs that would receive no visible row.
                    if new_stream_chunk.cardinality() > 0 {
                        event!(
                            tracing::Level::TRACE,
                            msg = "chunk",
                            downstream = output.actor_id(),
                            "send = \n{:#?}",
                            new_stream_chunk
                        );
                        output
                            .send(DispatcherMessageBatch::Chunk(new_stream_chunk))
                            .await?;
                    }
                    StreamResult::Ok(())
                }),
        )
        .await?;

        Ok(())
    }

    /// Removes the outputs whose actor id is contained in `actor_ids`.
    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        // `count()` drives the iterator to completion so that all matches are removed.
        self.outputs
            .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
            .count();
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn dispatcher_id_str(&self) -> &str {
        &self.dispatcher_id_str
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}
1059
/// Dispatcher that sends every chunk, barrier and watermark to all of its outputs.
#[derive(Debug)]
pub struct BroadcastDispatcher {
    /// Outputs keyed by the id of the downstream actor.
    outputs: HashMap<ActorId, Output>,
    /// Mapping applied to chunks and watermarks before they are sent.
    output_mapping: DispatchOutputMapping,
    dispatcher_id: DispatcherId,
    /// `dispatcher_id` rendered as a string once at construction.
    dispatcher_id_str: String,
}
1068
1069impl BroadcastDispatcher {
1070 pub fn new(
1071 outputs: impl IntoIterator<Item = Output>,
1072 output_mapping: DispatchOutputMapping,
1073 dispatcher_id: DispatcherId,
1074 ) -> Self {
1075 Self {
1076 outputs: Self::into_pairs(outputs).collect(),
1077 output_mapping,
1078 dispatcher_id,
1079 dispatcher_id_str: dispatcher_id.to_string(),
1080 }
1081 }
1082
1083 fn into_pairs(
1084 outputs: impl IntoIterator<Item = Output>,
1085 ) -> impl Iterator<Item = (ActorId, Output)> {
1086 outputs
1087 .into_iter()
1088 .map(|output| (output.actor_id(), output))
1089 }
1090}
1091
1092impl Dispatcher for BroadcastDispatcher {
1093 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
1094 let chunk = self.output_mapping.apply(chunk);
1095 broadcast_concurrent(
1096 self.outputs.values_mut(),
1097 DispatcherMessageBatch::Chunk(chunk),
1098 )
1099 .await
1100 }
1101
1102 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
1103 broadcast_concurrent(
1105 self.outputs.values_mut(),
1106 DispatcherMessageBatch::BarrierBatch(barriers),
1107 )
1108 .await
1109 }
1110
1111 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
1112 if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
1113 broadcast_concurrent(
1115 self.outputs.values_mut(),
1116 DispatcherMessageBatch::Watermark(watermark),
1117 )
1118 .await?;
1119 }
1120 Ok(())
1121 }
1122
1123 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
1124 self.outputs.extend(Self::into_pairs(outputs));
1125 }
1126
1127 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
1128 self.outputs
1129 .extract_if(|actor_id, _| actor_ids.contains(actor_id))
1130 .count();
1131 }
1132
1133 fn dispatcher_id(&self) -> DispatcherId {
1134 self.dispatcher_id
1135 }
1136
1137 fn dispatcher_id_str(&self) -> &str {
1138 &self.dispatcher_id_str
1139 }
1140
1141 fn is_empty(&self) -> bool {
1142 self.outputs.is_empty()
1143 }
1144}
1145
/// Dispatcher that sends data to exactly one output.
#[derive(Debug)]
pub struct SimpleDispatcher {
    /// The output(s) of this dispatcher. `add_outputs` allows at most two entries, while
    /// dispatching data or watermarks requires exactly one; barriers are sent to every
    /// entry.
    output: SmallVec<[Output; 2]>,
    /// Mapping applied to chunks and watermarks before they are sent.
    output_mapping: DispatchOutputMapping,
    dispatcher_id: DispatcherId,
    /// `dispatcher_id` rendered as a string once at construction.
    dispatcher_id_str: String,
}
1167
1168impl SimpleDispatcher {
1169 pub fn new(
1170 output: Output,
1171 output_mapping: DispatchOutputMapping,
1172 dispatcher_id: DispatcherId,
1173 ) -> Self {
1174 Self {
1175 output: smallvec![output],
1176 output_mapping,
1177 dispatcher_id,
1178 dispatcher_id_str: dispatcher_id.to_string(),
1179 }
1180 }
1181}
1182
1183impl Dispatcher for SimpleDispatcher {
1184 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
1185 self.output.extend(outputs);
1186 assert!(self.output.len() <= 2);
1187 }
1188
1189 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
1190 for output in &mut self.output {
1192 output
1193 .send(DispatcherMessageBatch::BarrierBatch(barriers.clone()))
1194 .await?;
1195 }
1196 Ok(())
1197 }
1198
1199 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
1200 let output = self
1201 .output
1202 .iter_mut()
1203 .exactly_one()
1204 .expect("expect exactly one output");
1205
1206 let chunk = self.output_mapping.apply(chunk);
1207 output.send(DispatcherMessageBatch::Chunk(chunk)).await
1208 }
1209
1210 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
1211 let output = self
1212 .output
1213 .iter_mut()
1214 .exactly_one()
1215 .expect("expect exactly one output");
1216
1217 if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
1218 output
1219 .send(DispatcherMessageBatch::Watermark(watermark))
1220 .await?;
1221 }
1222 Ok(())
1223 }
1224
1225 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
1226 self.output
1227 .retain(|output| !actor_ids.contains(&output.actor_id()));
1228 }
1229
1230 fn dispatcher_id(&self) -> DispatcherId {
1231 self.dispatcher_id
1232 }
1233
1234 fn dispatcher_id_str(&self) -> &str {
1235 &self.dispatcher_id_str
1236 }
1237
1238 fn is_empty(&self) -> bool {
1239 self.output.is_empty()
1240 }
1241}
1242
1243#[cfg(test)]
1244mod tests {
1245 use std::hash::{BuildHasher, Hasher};
1246
1247 use futures::pin_mut;
1248 use risingwave_common::array::stream_chunk::StreamChunkTestExt;
1249 use risingwave_common::array::{Array, ArrayBuilder, I32ArrayBuilder};
1250 use risingwave_common::util::epoch::test_epoch;
1251 use risingwave_common::util::hash_util::Crc32FastBuilder;
1252 use risingwave_pb::stream_plan::{DispatcherType, PbDispatchOutputMapping};
1253 use tokio::sync::mpsc::unbounded_channel;
1254
1255 use super::*;
1256 use crate::executor::exchange::output::Output;
1257 use crate::executor::exchange::permit::channel_for_test;
1258 use crate::executor::receiver::ReceiverExecutor;
1259 use crate::executor::{BarrierInner as Barrier, MessageInner as Message};
1260 use crate::task::barrier_test_utils::LocalBarrierTestEnv;
1261
1262 #[tokio::test]
1265 async fn test_hash_dispatcher_complex() {
1266 test_hash_dispatcher_complex_inner().await
1267 }
1268
1269 async fn test_hash_dispatcher_complex_inner() {
1270 assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);
1272
1273 let num_outputs = 2; let key_indices = &[0, 2];
1275 let (output_tx_vecs, mut output_rx_vecs): (Vec<_>, Vec<_>) =
1276 (0..num_outputs).map(|_| channel_for_test()).collect();
1277 let outputs = output_tx_vecs
1278 .into_iter()
1279 .enumerate()
1280 .map(|(actor_id, tx)| Output::new(1 + actor_id as u32, tx))
1281 .collect::<Vec<_>>();
1282 let mut hash_mapping = (1..num_outputs + 1)
1283 .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT_FOR_TEST / num_outputs])
1284 .collect_vec();
1285 hash_mapping.resize(VirtualNode::COUNT_FOR_TEST, num_outputs as u32);
1286 let mut hash_dispatcher = HashDataDispatcher::new(
1287 outputs,
1288 key_indices.to_vec(),
1289 DispatchOutputMapping::Simple(vec![0, 1, 2]),
1290 hash_mapping,
1291 0,
1292 );
1293
1294 let chunk = StreamChunk::from_pretty(
1295 " I I I
1296 + 4 6 8
1297 + 5 7 9
1298 + 0 0 0
1299 - 1 1 1 D
1300 U- 2 0 2
1301 U+ 2 0 2
1302 U- 3 3 2
1303 U+ 3 3 4",
1304 );
1305 hash_dispatcher.dispatch_data(chunk).await.unwrap();
1306
1307 assert_eq!(
1308 *output_rx_vecs[0].recv().await.unwrap().as_chunk().unwrap(),
1309 StreamChunk::from_pretty(
1310 " I I I
1311 + 4 6 8
1312 + 5 7 9
1313 + 0 0 0
1314 - 1 1 1 D
1315 U- 2 0 2
1316 U+ 2 0 2
1317 - 3 3 2 D // Should rewrite UpdateDelete to Delete
1318 + 3 3 4 // Should rewrite UpdateInsert to Insert",
1319 )
1320 );
1321 assert_eq!(
1322 *output_rx_vecs[1].recv().await.unwrap().as_chunk().unwrap(),
1323 StreamChunk::from_pretty(
1324 " I I I
1325 + 4 6 8 D
1326 + 5 7 9 D
1327 + 0 0 0 D
1328 - 1 1 1 D // Should keep original invisible mark
1329 U- 2 0 2 D // Should keep UpdateDelete
1330 U+ 2 0 2 D // Should keep UpdateInsert
1331 - 3 3 2 // Should rewrite UpdateDelete to Delete
1332 + 3 3 4 D // Should rewrite UpdateInsert to Insert",
1333 )
1334 );
1335 }
1336
/// Drives a `DispatchExecutor` with one broadcast and one simple dispatcher through two
/// `Update` mutations and checks, barrier by barrier, which downstream actors receive
/// chunks and barriers.
#[tokio::test]
async fn test_configuration_change() {
    let _schema = Schema { fields: vec![] };
    let (tx, rx) = channel_for_test();
    let actor_id = 233;
    let fragment_id = 666;
    let barrier_test_env = LocalBarrierTestEnv::for_test().await;
    let metrics = Arc::new(StreamingMetrics::unused());

    // Downstream actors of the broadcast dispatcher.
    let (untouched, old, new) = (234, 235, 238);
    // Downstream actors of the simple dispatcher.
    let (old_simple, new_simple) = (114, 514);

    // Broadcast dispatcher initially targets `untouched` and `old`.
    let broadcast_dispatcher_id = 666;
    let broadcast_dispatcher = PbDispatcher {
        r#type: DispatcherType::Broadcast as _,
        dispatcher_id: broadcast_dispatcher_id,
        downstream_actor_id: vec![untouched, old],
        output_mapping: PbDispatchOutputMapping::identical(0).into(),
        ..Default::default()
    };

    // Simple dispatcher initially targets `old_simple` only.
    let simple_dispatcher_id = 888;
    let simple_dispatcher = PbDispatcher {
        r#type: DispatcherType::Simple as _,
        dispatcher_id: simple_dispatcher_id,
        downstream_actor_id: vec![old_simple],
        output_mapping: PbDispatchOutputMapping::identical(0).into(),
        ..Default::default()
    };

    // Barrier 1 carries an update for the broadcast dispatcher: add `new`, remove `old`.
    let dispatcher_updates = maplit::hashmap! {
        actor_id => vec![PbDispatcherUpdate {
            actor_id,
            dispatcher_id: broadcast_dispatcher_id,
            added_downstream_actor_id: vec![new],
            removed_downstream_actor_id: vec![old],
            hash_mapping: Default::default(),
        }]
    };
    let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
        UpdateMutation {
            dispatchers: dispatcher_updates,
            merges: Default::default(),
            vnode_bitmaps: Default::default(),
            dropped_actors: Default::default(),
            actor_splits: Default::default(),
            actor_new_dispatchers: Default::default(),
        },
    ));
    // NOTE(review): barrier 1 is injected and flushed before the executor is built —
    // presumably so the barrier manager already knows it when the actor starts; confirm.
    barrier_test_env.inject_barrier(&b1, [actor_id]);
    barrier_test_env.flush_all_events().await;

    let input = Executor::new(
        Default::default(),
        ReceiverExecutor::for_test(
            actor_id,
            rx,
            barrier_test_env.local_barrier_manager.clone(),
        )
        .boxed(),
    );

    // Pre-register a local output channel for every downstream actor that will ever be
    // used, and keep the receiving halves keyed by actor id.
    let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
    let mut rxs = [untouched, old, new, old_simple, new_simple]
        .into_iter()
        .map(|id| {
            (id, {
                let (tx, rx) = channel_for_test();
                new_output_request_tx
                    .send((id, NewOutputRequest::Local(tx)))
                    .unwrap();
                rx
            })
        })
        .collect::<HashMap<_, _>>();
    let executor = Box::new(
        DispatchExecutor::new(
            input,
            new_output_request_rx,
            vec![broadcast_dispatcher, simple_dispatcher],
            actor_id,
            fragment_id,
            barrier_test_env.local_barrier_manager.clone(),
            metrics,
        )
        .await
        .unwrap(),
    )
    .execute();

    pin_mut!(executor);

    // Non-blocking receive on the channel of the given downstream actor.
    macro_rules! try_recv {
        ($down_id:expr) => {
            rxs.get_mut(&$down_id).unwrap().try_recv()
        };
    }

    // 1. Send a test chunk.
    tx.send(Message::Chunk(StreamChunk::default()).into())
        .await
        .unwrap();

    // 2. Send barrier 1 (the broadcast dispatcher update) and let the executor process it.
    tx.send(Message::Barrier(b1.clone().into_dispatcher()).into())
        .await
        .unwrap();
    executor.next().await.unwrap().unwrap();

    // 3. Check downstream after barrier 1.
    try_recv!(untouched).unwrap().as_chunk().unwrap();
    try_recv!(untouched).unwrap().as_barrier_batch().unwrap();

    try_recv!(old).unwrap().as_chunk().unwrap();
    // `old` still receives the barrier that removes it.
    try_recv!(old).unwrap().as_barrier_batch().unwrap();
    // `new` was just added: its first message is the barrier, not the earlier chunk.
    try_recv!(new).unwrap().as_barrier_batch().unwrap();
    // The simple dispatcher was not part of this update.
    try_recv!(old_simple).unwrap().as_chunk().unwrap();
    try_recv!(old_simple).unwrap().as_barrier_batch().unwrap();

    // 4. Send a plain barrier without mutation.
    let b2 = Barrier::new_test_barrier(test_epoch(2));
    barrier_test_env.inject_barrier(&b2, [actor_id]);
    tx.send(Message::Barrier(b2.into_dispatcher()).into())
        .await
        .unwrap();
    executor.next().await.unwrap().unwrap();

    // 5. Check downstream after barrier 2.
    try_recv!(untouched).unwrap().as_barrier_batch().unwrap();
    // `old` has been removed and receives nothing any more.
    try_recv!(old).unwrap_err();
    try_recv!(new).unwrap().as_barrier_batch().unwrap();

    try_recv!(old_simple).unwrap().as_barrier_batch().unwrap();
    // `new_simple` has not been added yet.
    try_recv!(new_simple).unwrap_err();

    // 6. Send another test chunk.
    tx.send(Message::Chunk(StreamChunk::default()).into())
        .await
        .unwrap();

    // 7. Send barrier 3 with an update for the simple dispatcher:
    //    add `new_simple`, remove `old_simple`.
    let dispatcher_updates = maplit::hashmap! {
        actor_id => vec![PbDispatcherUpdate {
            actor_id,
            dispatcher_id: simple_dispatcher_id,
            added_downstream_actor_id: vec![new_simple],
            removed_downstream_actor_id: vec![old_simple],
            hash_mapping: Default::default(),
        }]
    };
    let b3 = Barrier::new_test_barrier(test_epoch(3)).with_mutation(Mutation::Update(
        UpdateMutation {
            dispatchers: dispatcher_updates,
            merges: Default::default(),
            vnode_bitmaps: Default::default(),
            dropped_actors: Default::default(),
            actor_splits: Default::default(),
            actor_new_dispatchers: Default::default(),
        },
    ));
    barrier_test_env.inject_barrier(&b3, [actor_id]);
    tx.send(Message::Barrier(b3.into_dispatcher()).into())
        .await
        .unwrap();
    executor.next().await.unwrap().unwrap();

    // 8. Check downstream after barrier 3.
    try_recv!(old_simple).unwrap().as_chunk().unwrap();
    // `old_simple` still receives the barrier that removes it.
    try_recv!(old_simple).unwrap().as_barrier_batch().unwrap();
    // `new_simple` was just added: it gets the barrier but not the earlier chunk.
    try_recv!(new_simple).unwrap().as_barrier_batch().unwrap();

    // 9. Send a plain barrier without mutation.
    let b4 = Barrier::new_test_barrier(test_epoch(4));
    barrier_test_env.inject_barrier(&b4, [actor_id]);
    tx.send(Message::Barrier(b4.into_dispatcher()).into())
        .await
        .unwrap();
    executor.next().await.unwrap().unwrap();

    // 10. Check downstream after barrier 4: only `new_simple` is still connected.
    try_recv!(old_simple).unwrap_err();
    try_recv!(new_simple).unwrap().as_barrier_batch().unwrap();
}
1524
1525 #[tokio::test]
1526 async fn test_hash_dispatcher() {
1527 assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);
1529
1530 let num_outputs = 5; let cardinality = 10;
1532 let dimension = 4;
1533 let key_indices = &[0, 2];
1534 let (output_tx_vecs, output_rx_vecs): (Vec<_>, Vec<_>) =
1535 (0..num_outputs).map(|_| channel_for_test()).collect();
1536 let outputs = output_tx_vecs
1537 .into_iter()
1538 .enumerate()
1539 .map(|(actor_id, tx)| Output::new(1 + actor_id as u32, tx))
1540 .collect::<Vec<_>>();
1541 let mut hash_mapping = (1..num_outputs + 1)
1542 .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT_FOR_TEST / num_outputs])
1543 .collect_vec();
1544 hash_mapping.resize(VirtualNode::COUNT_FOR_TEST, num_outputs as u32);
1545 let mut hash_dispatcher = HashDataDispatcher::new(
1546 outputs,
1547 key_indices.to_vec(),
1548 DispatchOutputMapping::Simple((0..dimension).collect()),
1549 hash_mapping.clone(),
1550 0,
1551 );
1552
1553 let mut ops = Vec::new();
1554 for idx in 0..cardinality {
1555 if idx % 2 == 0 {
1556 ops.push(Op::Insert);
1557 } else {
1558 ops.push(Op::Delete);
1559 }
1560 }
1561
1562 let mut start = 19260817i32..;
1563 let mut builders = (0..dimension)
1564 .map(|_| I32ArrayBuilder::new(cardinality))
1565 .collect_vec();
1566 let mut output_cols = vec![vec![vec![]; dimension]; num_outputs];
1567 let mut output_ops = vec![vec![]; num_outputs];
1568 for op in &ops {
1569 let hash_builder = Crc32FastBuilder;
1570 let mut hasher = hash_builder.build_hasher();
1571 let one_row = (0..dimension).map(|_| start.next().unwrap()).collect_vec();
1572 for key_idx in key_indices {
1573 let val = one_row[*key_idx];
1574 let bytes = val.to_le_bytes();
1575 hasher.update(&bytes);
1576 }
1577 let output_idx =
1578 hash_mapping[hasher.finish() as usize % VirtualNode::COUNT_FOR_TEST] as usize - 1;
1579 for (builder, val) in builders.iter_mut().zip_eq_fast(one_row.iter()) {
1580 builder.append(Some(*val));
1581 }
1582 output_cols[output_idx]
1583 .iter_mut()
1584 .zip_eq_fast(one_row.iter())
1585 .for_each(|(each_column, val)| each_column.push(*val));
1586 output_ops[output_idx].push(op);
1587 }
1588
1589 let columns = builders
1590 .into_iter()
1591 .map(|builder| {
1592 let array = builder.finish();
1593 array.into_ref()
1594 })
1595 .collect();
1596
1597 let chunk = StreamChunk::new(ops, columns);
1598 hash_dispatcher.dispatch_data(chunk).await.unwrap();
1599
1600 for (output_idx, mut rx) in output_rx_vecs.into_iter().enumerate() {
1601 let mut output = vec![];
1602 while let Some(Some(msg)) = rx.recv().now_or_never() {
1603 output.push(msg);
1604 }
1605 assert!(output.len() <= 1);
1607 if output.is_empty() {
1608 assert!(output_cols[output_idx].iter().all(|x| { x.is_empty() }));
1609 } else {
1610 let message = output.first().unwrap();
1611 let real_chunk = match message {
1612 DispatcherMessageBatch::Chunk(chunk) => chunk,
1613 _ => panic!(),
1614 };
1615 real_chunk
1616 .columns()
1617 .iter()
1618 .zip_eq_fast(output_cols[output_idx].iter())
1619 .for_each(|(real_col, expect_col)| {
1620 let real_vals = real_chunk
1621 .visibility()
1622 .iter_ones()
1623 .map(|row_idx| real_col.as_int32().value_at(row_idx).unwrap())
1624 .collect::<Vec<_>>();
1625 assert_eq!(real_vals.len(), expect_col.len());
1626 assert_eq!(real_vals, *expect_col);
1627 });
1628 }
1629 }
1630 }
1631}