1use std::collections::{HashMap, HashSet};
16use std::future::Future;
17use std::iter::repeat_with;
18use std::ops::{Deref, DerefMut};
19use std::time::Duration;
20
21use anyhow::anyhow;
22use futures::{FutureExt, TryStreamExt};
23use itertools::Itertools;
24use risingwave_common::array::Op;
25use risingwave_common::bitmap::BitmapBuilder;
26use risingwave_common::hash::{ActorMapping, ExpandedActorMapping, VirtualNode};
27use risingwave_common::metrics::LabelGuardedIntCounter;
28use risingwave_common::util::iter_util::ZipEqFast;
29use risingwave_pb::stream_plan;
30use risingwave_pb::stream_plan::PbDispatcher;
31use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
32use smallvec::{SmallVec, smallvec};
33use tokio::sync::mpsc::UnboundedReceiver;
34use tokio::time::Instant;
35use tokio_stream::StreamExt;
36use tokio_stream::adapters::Peekable;
37use tracing::{Instrument, event};
38
39use super::exchange::output::Output;
40use super::{
41 AddMutation, DispatcherBarriers, DispatcherMessageBatch, MessageBatch, TroublemakerExecutor,
42 UpdateMutation,
43};
44use crate::executor::StreamConsumer;
45use crate::executor::prelude::*;
46use crate::task::{DispatcherId, LocalBarrierManager, NewOutputRequest};
47
/// Consumes the stream of its upstream `input` executor and forwards every message to the
/// configured dispatchers, which send it on to downstream actors.
pub struct DispatchExecutor {
    /// Upstream executor whose output is dispatched.
    input: Executor,
    /// Dispatch state: dispatchers, pending new-output requests and metrics.
    inner: DispatchExecutorInner,
}
55
/// A dispatcher paired with its own output-buffer blocking-time counter.
struct DispatcherWithMetrics {
    dispatcher: DispatcherImpl,
    /// Accumulated nanoseconds spent awaiting dispatch calls on this dispatcher
    /// (incremented by `record_output_buffer_blocking_duration`).
    actor_output_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}
60
61impl DispatcherWithMetrics {
62 pub fn record_output_buffer_blocking_duration(&self, duration: Duration) {
63 let ns = duration.as_nanos() as u64;
64 self.actor_output_buffer_blocking_duration_ns.inc_by(ns);
65 }
66}
67
68impl Debug for DispatcherWithMetrics {
69 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70 self.dispatcher.fmt(f)
71 }
72}
73
74impl Deref for DispatcherWithMetrics {
75 type Target = DispatcherImpl;
76
77 fn deref(&self) -> &Self::Target {
78 &self.dispatcher
79 }
80}
81
82impl DerefMut for DispatcherWithMetrics {
83 fn deref_mut(&mut self) -> &mut Self::Target {
84 &mut self.dispatcher
85 }
86}
87
/// Metrics shared by all dispatchers of one dispatch executor.
struct DispatchExecutorMetrics {
    /// Actor id rendered once as a string, used as a metric label.
    actor_id_str: String,
    /// Fragment id rendered once as a string, used as a metric label.
    fragment_id_str: String,
    /// Shared streaming metrics from which per-dispatcher counters are created.
    metrics: Arc<StreamingMetrics>,
    /// Number of records (chunk cardinality) sent out by this actor.
    actor_out_record_cnt: LabelGuardedIntCounter,
}
94
95impl DispatchExecutorMetrics {
96 fn monitor_dispatcher(&self, dispatcher: DispatcherImpl) -> DispatcherWithMetrics {
97 DispatcherWithMetrics {
98 actor_output_buffer_blocking_duration_ns: self
99 .metrics
100 .actor_output_buffer_blocking_duration_ns
101 .with_guarded_label_values(&[
102 self.actor_id_str.as_str(),
103 self.fragment_id_str.as_str(),
104 dispatcher.dispatcher_id_str(),
105 ]),
106 dispatcher,
107 }
108 }
109}
110
/// Mutable state of a [`DispatchExecutor`]: the dispatchers and everything needed to apply
/// barrier mutations to them.
struct DispatchExecutorInner {
    /// Current dispatchers, each wrapped with its blocking-time metric.
    dispatchers: Vec<DispatcherWithMetrics>,
    /// Id of the owning actor; used to look up this actor's entries in barrier mutations.
    actor_id: u32,
    /// Source of the configuration read while dispatching (`env.config()`).
    local_barrier_manager: LocalBarrierManager,
    metrics: DispatchExecutorMetrics,
    /// Incoming `(downstream actor, request)` pairs; each request carries the channel used to
    /// build an output to that actor.
    new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    /// Requests received while waiting for a different actor's request; consumed by later
    /// `collect_outputs` calls.
    pending_new_output_requests: HashMap<ActorId, NewOutputRequest>,
}
119
120impl DispatchExecutorInner {
121 async fn collect_outputs(
122 &mut self,
123 downstream_actors: &[ActorId],
124 ) -> StreamResult<Vec<Output>> {
125 fn resolve_output(downstream_actor: ActorId, request: NewOutputRequest) -> Output {
126 let tx = match request {
127 NewOutputRequest::Local(tx) | NewOutputRequest::Remote(tx) => tx,
128 };
129 Output::new(downstream_actor, tx)
130 }
131 let mut outputs = Vec::with_capacity(downstream_actors.len());
132 for downstream_actor in downstream_actors {
133 let output =
134 if let Some(request) = self.pending_new_output_requests.remove(downstream_actor) {
135 resolve_output(*downstream_actor, request)
136 } else {
137 loop {
138 let (requested_actor, request) = self
139 .new_output_request_rx
140 .recv()
141 .await
142 .ok_or_else(|| anyhow!("end of new output request"))?;
143 if requested_actor == *downstream_actor {
144 break resolve_output(requested_actor, request);
145 } else {
146 assert!(
147 self.pending_new_output_requests
148 .insert(requested_actor, request)
149 .is_none(),
150 "duplicated inflight new output requests from actor {}",
151 requested_actor
152 );
153 }
154 }
155 };
156 outputs.push(output);
157 }
158 Ok(outputs)
159 }
160
161 async fn dispatch(&mut self, msg: MessageBatch) -> StreamResult<()> {
162 let limit = self
163 .local_barrier_manager
164 .env
165 .config()
166 .developer
167 .exchange_concurrent_dispatchers;
168 match msg {
170 MessageBatch::BarrierBatch(barrier_batch) => {
171 if barrier_batch.is_empty() {
172 return Ok(());
173 }
174 let mutation = barrier_batch[0].mutation.clone();
176 self.pre_mutate_dispatchers(&mutation).await?;
177 futures::stream::iter(self.dispatchers.iter_mut())
178 .map(Ok)
179 .try_for_each_concurrent(limit, |dispatcher| async {
180 let start_time = Instant::now();
181 dispatcher
182 .dispatch_barriers(
183 barrier_batch
184 .iter()
185 .cloned()
186 .map(|b| b.into_dispatcher())
187 .collect(),
188 )
189 .await?;
190 dispatcher.record_output_buffer_blocking_duration(start_time.elapsed());
191 StreamResult::Ok(())
192 })
193 .await?;
194 self.post_mutate_dispatchers(&mutation)?;
195 }
196 MessageBatch::Watermark(watermark) => {
197 futures::stream::iter(self.dispatchers.iter_mut())
198 .map(Ok)
199 .try_for_each_concurrent(limit, |dispatcher| async {
200 let start_time = Instant::now();
201 dispatcher.dispatch_watermark(watermark.clone()).await?;
202 dispatcher.record_output_buffer_blocking_duration(start_time.elapsed());
203 StreamResult::Ok(())
204 })
205 .await?;
206 }
207 MessageBatch::Chunk(chunk) => {
208 futures::stream::iter(self.dispatchers.iter_mut())
209 .map(Ok)
210 .try_for_each_concurrent(limit, |dispatcher| async {
211 let start_time = Instant::now();
212 dispatcher.dispatch_data(chunk.clone()).await?;
213 dispatcher.record_output_buffer_blocking_duration(start_time.elapsed());
214 StreamResult::Ok(())
215 })
216 .await?;
217
218 self.metrics
219 .actor_out_record_cnt
220 .inc_by(chunk.cardinality() as _);
221 }
222 }
223 Ok(())
224 }
225
226 async fn add_dispatchers<'a>(
228 &mut self,
229 new_dispatchers: impl IntoIterator<Item = &'a PbDispatcher>,
230 ) -> StreamResult<()> {
231 for dispatcher in new_dispatchers {
232 let outputs = self
233 .collect_outputs(&dispatcher.downstream_actor_id)
234 .await?;
235 let dispatcher = DispatcherImpl::new(outputs, dispatcher)?;
236 let dispatcher = self.metrics.monitor_dispatcher(dispatcher);
237 self.dispatchers.push(dispatcher);
238 }
239
240 assert!(
241 self.dispatchers
242 .iter()
243 .map(|d| d.dispatcher_id())
244 .all_unique(),
245 "dispatcher ids must be unique: {:?}",
246 self.dispatchers
247 );
248
249 Ok(())
250 }
251
252 fn find_dispatcher(&mut self, dispatcher_id: DispatcherId) -> &mut DispatcherImpl {
253 self.dispatchers
254 .iter_mut()
255 .find(|d| d.dispatcher_id() == dispatcher_id)
256 .unwrap_or_else(|| panic!("dispatcher {}:{} not found", self.actor_id, dispatcher_id))
257 }
258
259 async fn pre_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
262 let outputs = self
263 .collect_outputs(&update.added_downstream_actor_id)
264 .await?;
265
266 let dispatcher = self.find_dispatcher(update.dispatcher_id);
267 dispatcher.add_outputs(outputs);
268
269 Ok(())
270 }
271
272 fn post_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
275 let ids = update.removed_downstream_actor_id.iter().copied().collect();
276
277 let dispatcher = self.find_dispatcher(update.dispatcher_id);
278 dispatcher.remove_outputs(&ids);
279
280 if let DispatcherImpl::Hash(dispatcher) = dispatcher {
287 dispatcher.hash_mapping =
288 ActorMapping::from_protobuf(update.get_hash_mapping()?).to_expanded();
289 }
290
291 Ok(())
292 }
293
294 async fn pre_mutate_dispatchers(
296 &mut self,
297 mutation: &Option<Arc<Mutation>>,
298 ) -> StreamResult<()> {
299 let Some(mutation) = mutation.as_deref() else {
300 return Ok(());
301 };
302
303 match mutation {
304 Mutation::Add(AddMutation { adds, .. }) => {
305 if let Some(new_dispatchers) = adds.get(&self.actor_id) {
306 self.add_dispatchers(new_dispatchers).await?;
307 }
308 }
309 Mutation::Update(UpdateMutation {
310 dispatchers,
311 actor_new_dispatchers: actor_dispatchers,
312 ..
313 }) => {
314 if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
315 self.add_dispatchers(new_dispatchers).await?;
316 }
317
318 if let Some(updates) = dispatchers.get(&self.actor_id) {
319 for update in updates {
320 self.pre_update_dispatcher(update).await?;
321 }
322 }
323 }
324 Mutation::AddAndUpdate(
325 AddMutation { adds, .. },
326 UpdateMutation {
327 dispatchers,
328 actor_new_dispatchers: actor_dispatchers,
329 ..
330 },
331 ) => {
332 if let Some(new_dispatchers) = adds.get(&self.actor_id) {
333 self.add_dispatchers(new_dispatchers).await?;
334 }
335
336 if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
337 self.add_dispatchers(new_dispatchers).await?;
338 }
339
340 if let Some(updates) = dispatchers.get(&self.actor_id) {
341 for update in updates {
342 self.pre_update_dispatcher(update).await?;
343 }
344 }
345 }
346 _ => {}
347 }
348
349 Ok(())
350 }
351
352 fn post_mutate_dispatchers(&mut self, mutation: &Option<Arc<Mutation>>) -> StreamResult<()> {
354 let Some(mutation) = mutation.as_deref() else {
355 return Ok(());
356 };
357
358 match mutation {
359 Mutation::Stop(stops) => {
360 if !stops.contains(&self.actor_id) {
362 for dispatcher in &mut self.dispatchers {
363 dispatcher.remove_outputs(stops);
364 }
365 }
366 }
367 Mutation::Update(UpdateMutation {
368 dispatchers,
369 dropped_actors,
370 ..
371 })
372 | Mutation::AddAndUpdate(
373 _,
374 UpdateMutation {
375 dispatchers,
376 dropped_actors,
377 ..
378 },
379 ) => {
380 if let Some(updates) = dispatchers.get(&self.actor_id) {
381 for update in updates {
382 self.post_update_dispatcher(update)?;
383 }
384 }
385
386 if !dropped_actors.contains(&self.actor_id) {
387 for dispatcher in &mut self.dispatchers {
388 dispatcher.remove_outputs(dropped_actors);
389 }
390 }
391 }
392 _ => {}
393 };
394
395 self.dispatchers.retain(|d| !d.is_empty());
398
399 Ok(())
400 }
401}
402
403impl DispatchExecutor {
404 pub(crate) async fn new(
405 input: Executor,
406 new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
407 dispatchers: Vec<stream_plan::Dispatcher>,
408 actor_id: u32,
409 fragment_id: u32,
410 local_barrier_manager: LocalBarrierManager,
411 metrics: Arc<StreamingMetrics>,
412 ) -> StreamResult<Self> {
413 let mut executor = Self::new_inner(
414 input,
415 new_output_request_rx,
416 vec![],
417 actor_id,
418 fragment_id,
419 local_barrier_manager,
420 metrics,
421 );
422 let inner = &mut executor.inner;
423 for dispatcher in dispatchers {
424 let outputs = inner
425 .collect_outputs(&dispatcher.downstream_actor_id)
426 .await?;
427 let dispatcher = DispatcherImpl::new(outputs, &dispatcher)?;
428 let dispatcher = inner.metrics.monitor_dispatcher(dispatcher);
429 inner.dispatchers.push(dispatcher);
430 }
431 Ok(executor)
432 }
433
434 #[cfg(test)]
435 pub(crate) fn for_test(
436 input: Executor,
437 dispatchers: Vec<DispatcherImpl>,
438 actor_id: u32,
439 fragment_id: u32,
440 local_barrier_manager: LocalBarrierManager,
441 metrics: Arc<StreamingMetrics>,
442 ) -> (
443 Self,
444 tokio::sync::mpsc::UnboundedSender<(ActorId, NewOutputRequest)>,
445 ) {
446 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
447
448 (
449 Self::new_inner(
450 input,
451 rx,
452 dispatchers,
453 actor_id,
454 fragment_id,
455 local_barrier_manager,
456 metrics,
457 ),
458 tx,
459 )
460 }
461
462 fn new_inner(
463 mut input: Executor,
464 new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
465 dispatchers: Vec<DispatcherImpl>,
466 actor_id: u32,
467 fragment_id: u32,
468 local_barrier_manager: LocalBarrierManager,
469 metrics: Arc<StreamingMetrics>,
470 ) -> Self {
471 let chunk_size = local_barrier_manager.env.config().developer.chunk_size;
472 if crate::consistency::insane() {
473 let mut info = input.info().clone();
475 info.identity = format!("{} (embedded trouble)", info.identity);
476 let troublemaker = TroublemakerExecutor::new(input, chunk_size);
477 input = (info, troublemaker).into();
478 }
479
480 let actor_id_str = actor_id.to_string();
481 let fragment_id_str = fragment_id.to_string();
482 let actor_out_record_cnt = metrics
483 .actor_out_record_cnt
484 .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
485 let metrics = DispatchExecutorMetrics {
486 actor_id_str,
487 fragment_id_str,
488 metrics,
489 actor_out_record_cnt,
490 };
491 let dispatchers = dispatchers
492 .into_iter()
493 .map(|dispatcher| metrics.monitor_dispatcher(dispatcher))
494 .collect();
495 Self {
496 input,
497 inner: DispatchExecutorInner {
498 dispatchers,
499 actor_id,
500 local_barrier_manager,
501 metrics,
502 new_output_request_rx,
503 pending_new_output_requests: Default::default(),
504 },
505 }
506 }
507}
508
impl StreamConsumer for DispatchExecutor {
    type BarrierStream = impl Stream<Item = StreamResult<Barrier>> + Send;

    /// Drives the input stream: every chunk, watermark and barrier batch is dispatched, and
    /// each barrier is yielded after its batch has been dispatched.
    ///
    /// Consecutive barriers are grouped by `try_batch_barriers`, with at most
    /// `max_barrier_batch_size` barriers per batch. The stream ends when the input ends, and
    /// the first error (from input or dispatch) is propagated through `?`.
    fn execute(mut self: Box<Self>) -> Self::BarrierStream {
        let max_barrier_count_per_batch = self
            .inner
            .local_barrier_manager
            .env
            .config()
            .developer
            .max_barrier_batch_size;
        #[try_stream]
        async move {
            let mut input = self.input.execute().peekable();
            loop {
                // `None` means the input stream is exhausted.
                let Some(message) =
                    try_batch_barriers(max_barrier_count_per_batch, &mut input).await?
                else {
                    break;
                };
                match message {
                    chunk @ MessageBatch::Chunk(_) => {
                        self.inner
                            .dispatch(chunk)
                            .instrument(tracing::info_span!("dispatch_chunk"))
                            .instrument_await("dispatch_chunk")
                            .await?;
                    }
                    MessageBatch::BarrierBatch(barrier_batch) => {
                        assert!(!barrier_batch.is_empty());
                        // Cloned because the same barriers are yielded below after dispatch.
                        self.inner
                            .dispatch(MessageBatch::BarrierBatch(barrier_batch.clone()))
                            .instrument(tracing::info_span!("dispatch_barrier_batch"))
                            .instrument_await("dispatch_barrier_batch")
                            .await?;
                        self.inner
                            .metrics
                            .metrics
                            .barrier_batch_size
                            .observe(barrier_batch.len() as f64);
                        // Barriers are only yielded once they have been dispatched downstream.
                        for barrier in barrier_batch {
                            yield barrier;
                        }
                    }
                    watermark @ MessageBatch::Watermark(_) => {
                        self.inner
                            .dispatch(watermark)
                            .instrument(tracing::info_span!("dispatch_watermark"))
                            .instrument_await("dispatch_watermark")
                            .await?;
                    }
                }
            }
        }
    }
}
566
567async fn try_batch_barriers(
573 max_barrier_count_per_batch: u32,
574 input: &mut Peekable<BoxedMessageStream>,
575) -> StreamResult<Option<MessageBatch>> {
576 let Some(msg) = input.next().await else {
577 return Ok(None);
579 };
580 let mut barrier_batch = vec![];
581 let msg: Message = msg?;
582 let max_peek_attempts = match msg {
583 Message::Chunk(c) => {
584 return Ok(Some(MessageBatch::Chunk(c)));
585 }
586 Message::Watermark(w) => {
587 return Ok(Some(MessageBatch::Watermark(w)));
588 }
589 Message::Barrier(b) => {
590 let peek_more_barrier = b.mutation.is_none();
591 barrier_batch.push(b);
592 if peek_more_barrier {
593 max_barrier_count_per_batch.saturating_sub(1)
594 } else {
595 0
596 }
597 }
598 };
599 for _ in 0..max_peek_attempts {
601 let peek = input.peek().now_or_never();
602 let Some(peek) = peek else {
603 break;
604 };
605 let Some(msg) = peek else {
606 break;
608 };
609 let Ok(Message::Barrier(barrier)) = msg else {
610 break;
611 };
612 if barrier.mutation.is_some() {
613 break;
614 }
615 let msg: Message = input.next().now_or_never().unwrap().unwrap()?;
616 let Message::Barrier(ref barrier) = msg else {
617 unreachable!("must be a barrier");
618 };
619 barrier_batch.push(barrier.clone());
620 }
621 Ok(Some(MessageBatch::BarrierBatch(barrier_batch)))
622}
623
/// All supported dispatcher kinds.
///
/// The methods that forward to the wrapped dispatcher are generated by `impl_dispatcher!`.
#[derive(Debug)]
pub enum DispatcherImpl {
    /// Routes each row to the output owning the row's vnode.
    Hash(HashDataDispatcher),
    /// Sends every message to all outputs.
    Broadcast(BroadcastDispatcher),
    /// Sends every message to a single output; built for both `Simple` and `NoShuffle` types.
    Simple(SimpleDispatcher),
    /// Sends chunks to the outputs in turn; barriers and watermarks go to all outputs.
    RoundRobin(RoundRobinDataDispatcher),
}
631
632impl DispatcherImpl {
633 pub fn new(outputs: Vec<Output>, dispatcher: &PbDispatcher) -> StreamResult<Self> {
634 let output_indices = dispatcher
635 .output_indices
636 .iter()
637 .map(|&i| i as usize)
638 .collect_vec();
639
640 use risingwave_pb::stream_plan::DispatcherType::*;
641 let dispatcher_impl = match dispatcher.get_type()? {
642 Hash => {
643 assert!(!outputs.is_empty());
644 let dist_key_indices = dispatcher
645 .dist_key_indices
646 .iter()
647 .map(|i| *i as usize)
648 .collect();
649
650 let hash_mapping =
651 ActorMapping::from_protobuf(dispatcher.get_hash_mapping()?).to_expanded();
652
653 DispatcherImpl::Hash(HashDataDispatcher::new(
654 outputs,
655 dist_key_indices,
656 output_indices,
657 hash_mapping,
658 dispatcher.dispatcher_id,
659 ))
660 }
661 Broadcast => DispatcherImpl::Broadcast(BroadcastDispatcher::new(
662 outputs,
663 output_indices,
664 dispatcher.dispatcher_id,
665 )),
666 Simple | NoShuffle => {
667 let [output]: [_; 1] = outputs.try_into().unwrap();
668 DispatcherImpl::Simple(SimpleDispatcher::new(
669 output,
670 output_indices,
671 dispatcher.dispatcher_id,
672 ))
673 }
674 Unspecified => unreachable!(),
675 };
676
677 Ok(dispatcher_impl)
678 }
679}
680
// Generates `DispatcherImpl` methods that, for each listed variant, forward to the
// corresponding `Dispatcher` method of the wrapped dispatcher.
macro_rules! impl_dispatcher {
    ($( { $variant_name:ident } ),*) => {
        impl DispatcherImpl {
            /// Forwards to [`Dispatcher::dispatch_data`] of the wrapped dispatcher.
            pub async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_data(chunk).await, )*
                }
            }

            /// Forwards to [`Dispatcher::dispatch_barriers`] of the wrapped dispatcher.
            pub async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_barriers(barriers).await, )*
                }
            }

            /// Forwards to [`Dispatcher::dispatch_watermark`] of the wrapped dispatcher.
            pub async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_watermark(watermark).await, )*
                }
            }

            /// Forwards to [`Dispatcher::add_outputs`] of the wrapped dispatcher.
            pub fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
                match self {
                    $(Self::$variant_name(inner) => inner.add_outputs(outputs), )*
                }
            }

            /// Forwards to [`Dispatcher::remove_outputs`] of the wrapped dispatcher.
            pub fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
                match self {
                    $(Self::$variant_name(inner) => inner.remove_outputs(actor_ids), )*
                }
            }

            /// Forwards to [`Dispatcher::dispatcher_id`] of the wrapped dispatcher.
            pub fn dispatcher_id(&self) -> DispatcherId {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id(), )*
                }
            }

            /// Forwards to [`Dispatcher::dispatcher_id_str`] of the wrapped dispatcher.
            pub fn dispatcher_id_str(&self) -> &str {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id_str(), )*
                }
            }

            /// Forwards to [`Dispatcher::is_empty`] of the wrapped dispatcher.
            pub fn is_empty(&self) -> bool {
                match self {
                    $(Self::$variant_name(inner) => inner.is_empty(), )*
                }
            }
        }
    }
}
734
// Invokes `$macro` with the names of every `DispatcherImpl` variant, so that per-variant
// code is generated from a single list.
macro_rules! for_all_dispatcher_variants {
    ($macro:ident) => {
        $macro! {
            { Hash },
            { Broadcast },
            { Simple },
            { RoundRobin }
        }
    };
}

// Generate the forwarding methods of `DispatcherImpl` for all variants.
for_all_dispatcher_variants! { impl_dispatcher }
747
/// Shorthand for the `Send` future returned by the dispatch methods of [`Dispatcher`].
/// The `'a` parameter does not appear in the bound itself.
pub trait DispatchFuture<'a> = Future<Output = StreamResult<()>> + Send;
749
/// Common interface of all dispatchers: sending messages to a set of downstream outputs and
/// changing that set.
pub trait Dispatcher: Debug + 'static {
    /// Dispatches a data chunk to the outputs, according to the dispatcher's strategy.
    fn dispatch_data(&mut self, chunk: StreamChunk) -> impl DispatchFuture<'_>;
    /// Dispatches a batch of barriers to the outputs.
    fn dispatch_barriers(&mut self, barrier: DispatcherBarriers) -> impl DispatchFuture<'_>;
    /// Dispatches a watermark to the outputs.
    fn dispatch_watermark(&mut self, watermark: Watermark) -> impl DispatchFuture<'_>;

    /// Adds new downstream outputs.
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>);
    /// Removes every output whose downstream actor id is in `actor_ids`.
    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>);

    /// Id of this dispatcher.
    fn dispatcher_id(&self) -> DispatcherId;

    /// The dispatcher id rendered as a string (used as a metric label).
    fn dispatcher_id_str(&self) -> &str;

    /// Returns `true` when the dispatcher has no outputs left.
    fn is_empty(&self) -> bool;
}
777
778async fn broadcast_concurrent(
783 outputs: impl IntoIterator<Item = &'_ mut Output>,
784 message: DispatcherMessageBatch,
785) -> StreamResult<()> {
786 futures::future::try_join_all(
787 outputs
788 .into_iter()
789 .map(|output| output.send(message.clone())),
790 )
791 .await?;
792 Ok(())
793}
794
/// Sends each data chunk to a single output, cycling through the outputs in order.
/// Barriers and watermarks are sent to all outputs.
#[derive(Debug)]
pub struct RoundRobinDataDispatcher {
    outputs: Vec<Output>,
    /// Input column indices to forward.
    output_indices: Vec<usize>,
    /// Index into `outputs` of the output receiving the next chunk.
    cur: usize,
    dispatcher_id: DispatcherId,
    /// `dispatcher_id` rendered once as a string.
    dispatcher_id_str: String,
}
803
804impl RoundRobinDataDispatcher {
805 pub fn new(
806 outputs: Vec<Output>,
807 output_indices: Vec<usize>,
808 dispatcher_id: DispatcherId,
809 ) -> Self {
810 Self {
811 outputs,
812 output_indices,
813 cur: 0,
814 dispatcher_id,
815 dispatcher_id_str: dispatcher_id.to_string(),
816 }
817 }
818}
819
820impl Dispatcher for RoundRobinDataDispatcher {
821 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
822 let chunk = if self.output_indices.len() < chunk.columns().len() {
823 chunk
824 .project(&self.output_indices)
825 .eliminate_adjacent_noop_update()
826 } else {
827 chunk.project(&self.output_indices)
828 };
829
830 self.outputs[self.cur]
831 .send(DispatcherMessageBatch::Chunk(chunk))
832 .await?;
833 self.cur += 1;
834 self.cur %= self.outputs.len();
835 Ok(())
836 }
837
838 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
839 broadcast_concurrent(
841 &mut self.outputs,
842 DispatcherMessageBatch::BarrierBatch(barriers),
843 )
844 .await
845 }
846
847 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
848 if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) {
849 broadcast_concurrent(
851 &mut self.outputs,
852 DispatcherMessageBatch::Watermark(watermark),
853 )
854 .await?;
855 }
856 Ok(())
857 }
858
859 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
860 self.outputs.extend(outputs);
861 }
862
863 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
864 self.outputs
865 .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
866 .count();
867 self.cur = self.cur.min(self.outputs.len() - 1);
868 }
869
870 fn dispatcher_id(&self) -> DispatcherId {
871 self.dispatcher_id
872 }
873
874 fn dispatcher_id_str(&self) -> &str {
875 &self.dispatcher_id_str
876 }
877
878 fn is_empty(&self) -> bool {
879 self.outputs.is_empty()
880 }
881}
882
/// Routes each row to the output whose actor owns the row's vnode.
///
/// The vnode is computed from the distribution key columns `keys`.
pub struct HashDataDispatcher {
    outputs: Vec<Output>,
    /// Distribution key column indices into the unprojected input chunk.
    keys: Vec<usize>,
    /// Input column indices to forward.
    output_indices: Vec<usize>,
    /// Owning actor id for every vnode, indexed by vnode; its length is the vnode count.
    hash_mapping: ExpandedActorMapping,
    dispatcher_id: DispatcherId,
    /// `dispatcher_id` rendered once as a string.
    dispatcher_id_str: String,
}
893
894impl Debug for HashDataDispatcher {
895 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
896 f.debug_struct("HashDataDispatcher")
897 .field("outputs", &self.outputs)
898 .field("keys", &self.keys)
899 .field("dispatcher_id", &self.dispatcher_id)
900 .finish_non_exhaustive()
901 }
902}
903
904impl HashDataDispatcher {
905 pub fn new(
906 outputs: Vec<Output>,
907 keys: Vec<usize>,
908 output_indices: Vec<usize>,
909 hash_mapping: ExpandedActorMapping,
910 dispatcher_id: DispatcherId,
911 ) -> Self {
912 Self {
913 outputs,
914 keys,
915 output_indices,
916 hash_mapping,
917 dispatcher_id,
918 dispatcher_id_str: dispatcher_id.to_string(),
919 }
920 }
921}
922
923impl Dispatcher for HashDataDispatcher {
924 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
925 self.outputs.extend(outputs);
926 }
927
928 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
929 broadcast_concurrent(
931 &mut self.outputs,
932 DispatcherMessageBatch::BarrierBatch(barriers),
933 )
934 .await
935 }
936
937 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
938 if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) {
939 broadcast_concurrent(
941 &mut self.outputs,
942 DispatcherMessageBatch::Watermark(watermark),
943 )
944 .await?;
945 }
946 Ok(())
947 }
948
949 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
950 let num_outputs = self.outputs.len();
954
955 let vnode_count = self.hash_mapping.len();
957 let vnodes = VirtualNode::compute_chunk(chunk.data_chunk(), &self.keys, vnode_count);
958
959 tracing::debug!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty(), self.keys, vnodes);
960
961 let mut vis_maps = repeat_with(|| BitmapBuilder::with_capacity(chunk.capacity()))
962 .take(num_outputs)
963 .collect_vec();
964 let mut last_vnode_when_update_delete = None;
965 let mut new_ops: Vec<Op> = Vec::with_capacity(chunk.capacity());
966
967 let chunk = if self.output_indices.len() < chunk.columns().len() {
969 chunk
970 .project(&self.output_indices)
971 .eliminate_adjacent_noop_update()
972 } else {
973 chunk.project(&self.output_indices)
974 };
975
976 for ((vnode, &op), visible) in vnodes
977 .iter()
978 .copied()
979 .zip_eq_fast(chunk.ops())
980 .zip_eq_fast(chunk.visibility().iter())
981 {
982 for (output, vis_map) in self.outputs.iter().zip_eq_fast(vis_maps.iter_mut()) {
984 vis_map.append(visible && self.hash_mapping[vnode.to_index()] == output.actor_id());
985 }
986
987 if !visible {
988 assert!(
989 last_vnode_when_update_delete.is_none(),
990 "invisible row between U- and U+, op = {op:?}",
991 );
992 new_ops.push(op);
993 continue;
994 }
995
996 if op == Op::UpdateDelete {
1000 last_vnode_when_update_delete = Some(vnode);
1001 } else if op == Op::UpdateInsert {
1002 if vnode
1003 != last_vnode_when_update_delete
1004 .take()
1005 .expect("missing U- before U+")
1006 {
1007 new_ops.push(Op::Delete);
1008 new_ops.push(Op::Insert);
1009 } else {
1010 new_ops.push(Op::UpdateDelete);
1011 new_ops.push(Op::UpdateInsert);
1012 }
1013 } else {
1014 new_ops.push(op);
1015 }
1016 }
1017 assert!(
1018 last_vnode_when_update_delete.is_none(),
1019 "missing U+ after U-"
1020 );
1021
1022 let ops = new_ops;
1023
1024 futures::future::try_join_all(
1026 vis_maps
1027 .into_iter()
1028 .zip_eq_fast(self.outputs.iter_mut())
1029 .map(|(vis_map, output)| async {
1030 let vis_map = vis_map.finish();
1031 let new_stream_chunk =
1033 StreamChunk::with_visibility(ops.clone(), chunk.columns().into(), vis_map);
1034 if new_stream_chunk.cardinality() > 0 {
1035 event!(
1036 tracing::Level::TRACE,
1037 msg = "chunk",
1038 downstream = output.actor_id(),
1039 "send = \n{:#?}",
1040 new_stream_chunk
1041 );
1042 output
1043 .send(DispatcherMessageBatch::Chunk(new_stream_chunk))
1044 .await?;
1045 }
1046 StreamResult::Ok(())
1047 }),
1048 )
1049 .await?;
1050
1051 Ok(())
1052 }
1053
1054 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
1055 self.outputs
1056 .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
1057 .count();
1058 }
1059
1060 fn dispatcher_id(&self) -> DispatcherId {
1061 self.dispatcher_id
1062 }
1063
1064 fn dispatcher_id_str(&self) -> &str {
1065 &self.dispatcher_id_str
1066 }
1067
1068 fn is_empty(&self) -> bool {
1069 self.outputs.is_empty()
1070 }
1071}
1072
/// Sends every chunk, barrier and watermark to all outputs.
#[derive(Debug)]
pub struct BroadcastDispatcher {
    /// Outputs keyed by their downstream actor id.
    outputs: HashMap<ActorId, Output>,
    /// Input column indices to forward.
    output_indices: Vec<usize>,
    dispatcher_id: DispatcherId,
    /// `dispatcher_id` rendered once as a string.
    dispatcher_id_str: String,
}
1081
1082impl BroadcastDispatcher {
1083 pub fn new(
1084 outputs: impl IntoIterator<Item = Output>,
1085 output_indices: Vec<usize>,
1086 dispatcher_id: DispatcherId,
1087 ) -> Self {
1088 Self {
1089 outputs: Self::into_pairs(outputs).collect(),
1090 output_indices,
1091 dispatcher_id,
1092 dispatcher_id_str: dispatcher_id.to_string(),
1093 }
1094 }
1095
1096 fn into_pairs(
1097 outputs: impl IntoIterator<Item = Output>,
1098 ) -> impl Iterator<Item = (ActorId, Output)> {
1099 outputs
1100 .into_iter()
1101 .map(|output| (output.actor_id(), output))
1102 }
1103}
1104
1105impl Dispatcher for BroadcastDispatcher {
1106 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
1107 let chunk = if self.output_indices.len() < chunk.columns().len() {
1108 chunk
1109 .project(&self.output_indices)
1110 .eliminate_adjacent_noop_update()
1111 } else {
1112 chunk.project(&self.output_indices)
1113 };
1114 broadcast_concurrent(
1115 self.outputs.values_mut(),
1116 DispatcherMessageBatch::Chunk(chunk),
1117 )
1118 .await
1119 }
1120
1121 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
1122 broadcast_concurrent(
1124 self.outputs.values_mut(),
1125 DispatcherMessageBatch::BarrierBatch(barriers),
1126 )
1127 .await
1128 }
1129
1130 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
1131 if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) {
1132 broadcast_concurrent(
1134 self.outputs.values_mut(),
1135 DispatcherMessageBatch::Watermark(watermark),
1136 )
1137 .await?;
1138 }
1139 Ok(())
1140 }
1141
1142 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
1143 self.outputs.extend(Self::into_pairs(outputs));
1144 }
1145
1146 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
1147 self.outputs
1148 .extract_if(|actor_id, _| actor_ids.contains(actor_id))
1149 .count();
1150 }
1151
1152 fn dispatcher_id(&self) -> DispatcherId {
1153 self.dispatcher_id
1154 }
1155
1156 fn dispatcher_id_str(&self) -> &str {
1157 &self.dispatcher_id_str
1158 }
1159
1160 fn is_empty(&self) -> bool {
1161 self.outputs.is_empty()
1162 }
1163}
1164
/// Sends every message to a single downstream output.
///
/// At most two outputs are held (`add_outputs` asserts this). Chunks and watermarks require
/// exactly one output, while barriers are sent to every output held. Presumably the second
/// output exists only briefly during an update: the new output is added before the barrier
/// (`pre_update_dispatcher`) and the old one is removed after it. TODO confirm.
#[derive(Debug)]
pub struct SimpleDispatcher {
    output: SmallVec<[Output; 2]>,
    /// Input column indices to forward.
    output_indices: Vec<usize>,
    dispatcher_id: DispatcherId,
    /// `dispatcher_id` rendered once as a string.
    dispatcher_id_str: String,
}
1186
1187impl SimpleDispatcher {
1188 pub fn new(output: Output, output_indices: Vec<usize>, dispatcher_id: DispatcherId) -> Self {
1189 Self {
1190 output: smallvec![output],
1191 output_indices,
1192 dispatcher_id,
1193 dispatcher_id_str: dispatcher_id.to_string(),
1194 }
1195 }
1196}
1197
1198impl Dispatcher for SimpleDispatcher {
1199 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
1200 self.output.extend(outputs);
1201 assert!(self.output.len() <= 2);
1202 }
1203
1204 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
1205 for output in &mut self.output {
1207 output
1208 .send(DispatcherMessageBatch::BarrierBatch(barriers.clone()))
1209 .await?;
1210 }
1211 Ok(())
1212 }
1213
1214 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
1215 let output = self
1216 .output
1217 .iter_mut()
1218 .exactly_one()
1219 .expect("expect exactly one output");
1220
1221 let chunk = if self.output_indices.len() < chunk.columns().len() {
1222 chunk
1223 .project(&self.output_indices)
1224 .eliminate_adjacent_noop_update()
1225 } else {
1226 chunk.project(&self.output_indices)
1227 };
1228 output.send(DispatcherMessageBatch::Chunk(chunk)).await
1229 }
1230
1231 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
1232 let output = self
1233 .output
1234 .iter_mut()
1235 .exactly_one()
1236 .expect("expect exactly one output");
1237
1238 if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) {
1239 output
1240 .send(DispatcherMessageBatch::Watermark(watermark))
1241 .await?;
1242 }
1243 Ok(())
1244 }
1245
1246 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
1247 self.output
1248 .retain(|output| !actor_ids.contains(&output.actor_id()));
1249 }
1250
1251 fn dispatcher_id(&self) -> DispatcherId {
1252 self.dispatcher_id
1253 }
1254
1255 fn dispatcher_id_str(&self) -> &str {
1256 &self.dispatcher_id_str
1257 }
1258
1259 fn is_empty(&self) -> bool {
1260 self.output.is_empty()
1261 }
1262}
1263
#[cfg(test)]
mod tests {
    use std::hash::{BuildHasher, Hasher};

    use futures::pin_mut;
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::array::{Array, ArrayBuilder, I32ArrayBuilder};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::hash_util::Crc32FastBuilder;
    use risingwave_pb::stream_plan::DispatcherType;
    use tokio::sync::mpsc::unbounded_channel;

    use super::*;
    use crate::executor::exchange::output::Output;
    use crate::executor::exchange::permit::channel_for_test;
    use crate::executor::receiver::ReceiverExecutor;
    use crate::executor::{BarrierInner as Barrier, MessageInner as Message};
    use crate::task::barrier_test_utils::LocalBarrierTestEnv;

    /// Hash-dispatches a chunk mixing inserts, deletes, an invisible row and update pairs to two
    /// outputs, and checks per-output visibility and the rewriting of split update pairs.
    #[tokio::test]
    async fn test_hash_dispatcher_complex() {
        test_hash_dispatcher_complex_inner().await
    }

    async fn test_hash_dispatcher_complex_inner() {
        // The expected routing below was computed for this vnode count.
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        // Downstream actor ids are `1..=num_outputs`.
        let num_outputs = 2;
        let key_indices = &[0, 2];
        let (output_tx_vecs, mut output_rx_vecs): (Vec<_>, Vec<_>) =
            (0..num_outputs).map(|_| channel_for_test()).collect();
        let outputs = output_tx_vecs
            .into_iter()
            .enumerate()
            .map(|(actor_id, tx)| Output::new(1 + actor_id as u32, tx))
            .collect::<Vec<_>>();
        // Split the vnodes evenly among the actors; any remainder goes to the last actor.
        let mut hash_mapping = (1..num_outputs + 1)
            .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT_FOR_TEST / num_outputs])
            .collect_vec();
        hash_mapping.resize(VirtualNode::COUNT_FOR_TEST, num_outputs as u32);
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            key_indices.to_vec(),
            vec![0, 1, 2],
            hash_mapping,
            0,
        );

        let chunk = StreamChunk::from_pretty(
            " I I I
            + 4 6 8
            + 5 7 9
            + 0 0 0
            - 1 1 1 D
            U- 2 0 2
            U+ 2 0 2
            U- 3 3 2
            U+ 3 3 4",
        );
        hash_dispatcher.dispatch_data(chunk).await.unwrap();

        assert_eq!(
            *output_rx_vecs[0].recv().await.unwrap().as_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I I I
                + 4 6 8
                + 5 7 9
                + 0 0 0
                - 1 1 1 D
                U- 2 0 2
                U+ 2 0 2
                - 3 3 2 D // Should rewrite UpdateDelete to Delete
                + 3 3 4 // Should rewrite UpdateInsert to Insert",
            )
        );
        assert_eq!(
            *output_rx_vecs[1].recv().await.unwrap().as_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I I I
                + 4 6 8 D
                + 5 7 9 D
                + 0 0 0 D
                - 1 1 1 D // Should keep original invisible mark
                U- 2 0 2 D // Should keep UpdateDelete
                U+ 2 0 2 D // Should keep UpdateInsert
                - 3 3 2 // Should rewrite UpdateDelete to Delete
                + 3 3 4 D // Should rewrite UpdateInsert to Insert",
            )
        );
    }

    /// Drives a `DispatchExecutor` through barriers whose `UpdateMutation`s swap downstream
    /// actors of a broadcast dispatcher and a simple dispatcher, checking after every barrier
    /// which downstream channels received which messages.
    #[tokio::test]
    async fn test_configuration_change() {
        let (tx, rx) = channel_for_test();
        let actor_id = 233;
        let fragment_id = 666;
        let barrier_test_env = LocalBarrierTestEnv::for_test().await;
        let metrics = Arc::new(StreamingMetrics::unused());

        // Broadcast downstreams: `old` is swapped for `new` by barrier 1; `untouched` stays.
        let (untouched, old, new) = (234, 235, 238);
        // Simple downstreams: `old_simple` is swapped for `new_simple` by barrier 3.
        let (old_simple, new_simple) = (114, 514);

        let broadcast_dispatcher_id = 666;
        let broadcast_dispatcher = PbDispatcher {
            r#type: DispatcherType::Broadcast as _,
            dispatcher_id: broadcast_dispatcher_id,
            downstream_actor_id: vec![untouched, old],
            ..Default::default()
        };

        let simple_dispatcher_id = 888;
        let simple_dispatcher = PbDispatcher {
            r#type: DispatcherType::Simple as _,
            dispatcher_id: simple_dispatcher_id,
            downstream_actor_id: vec![old_simple],
            ..Default::default()
        };

        // Barrier 1: replace `old` with `new` in the broadcast dispatcher.
        let dispatcher_updates = maplit::hashmap! {
            actor_id => vec![PbDispatcherUpdate {
                actor_id,
                dispatcher_id: broadcast_dispatcher_id,
                added_downstream_actor_id: vec![new],
                removed_downstream_actor_id: vec![old],
                hash_mapping: Default::default(),
            }]
        };
        let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: dispatcher_updates,
                merges: Default::default(),
                vnode_bitmaps: Default::default(),
                dropped_actors: Default::default(),
                actor_splits: Default::default(),
                actor_new_dispatchers: Default::default(),
            },
        ));
        barrier_test_env.inject_barrier(&b1, [actor_id]);
        barrier_test_env.flush_all_events().await;

        let input = Executor::new(
            Default::default(),
            ReceiverExecutor::for_test(
                actor_id,
                rx,
                barrier_test_env.local_barrier_manager.clone(),
            )
            .boxed(),
        );

        // Pre-register a local channel for every downstream actor any dispatcher may connect
        // to, and keep the receiving ends keyed by actor id.
        let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
        let mut rxs = [untouched, old, new, old_simple, new_simple]
            .into_iter()
            .map(|id| {
                (id, {
                    let (tx, rx) = channel_for_test();
                    new_output_request_tx
                        .send((id, NewOutputRequest::Local(tx)))
                        .unwrap();
                    rx
                })
            })
            .collect::<HashMap<_, _>>();
        let executor = Box::new(
            DispatchExecutor::new(
                input,
                new_output_request_rx,
                vec![broadcast_dispatcher, simple_dispatcher],
                actor_id,
                fragment_id,
                barrier_test_env.local_barrier_manager.clone(),
                metrics,
            )
            .await
            .unwrap(),
        )
        .execute();

        pin_mut!(executor);

        macro_rules! try_recv {
            ($down_id:expr) => {
                rxs.get_mut(&$down_id).unwrap().try_recv()
            };
        }

        // Send a chunk, then barrier 1.
        tx.send(Message::Chunk(StreamChunk::default()).into())
            .await
            .unwrap();

        tx.send(Message::Barrier(b1.clone().into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // The chunk and barrier 1 reach both original broadcast downstreams...
        try_recv!(untouched).unwrap().as_chunk().unwrap();
        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();

        try_recv!(old).unwrap().as_chunk().unwrap();
        try_recv!(old).unwrap().as_barrier_batch().unwrap();

        // ...while the newly added downstream starts from barrier 1.
        try_recv!(new).unwrap().as_barrier_batch().unwrap();

        // The simple dispatcher is unaffected by barrier 1.
        try_recv!(old_simple).unwrap().as_chunk().unwrap();
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap();

        // Barrier 2 carries no mutation.
        let b2 = Barrier::new_test_barrier(test_epoch(2));
        barrier_test_env.inject_barrier(&b2, [actor_id]);
        tx.send(Message::Barrier(b2.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // `old` was removed by barrier 1 and receives nothing more.
        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();
        try_recv!(old).unwrap_err();
        try_recv!(new).unwrap().as_barrier_batch().unwrap();

        // `new_simple` is not connected yet.
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap();
        try_recv!(new_simple).unwrap_err();

        // Send another chunk, then barrier 3, which replaces `old_simple` with `new_simple`
        // in the simple dispatcher.
        tx.send(Message::Chunk(StreamChunk::default()).into())
            .await
            .unwrap();

        let dispatcher_updates = maplit::hashmap! {
            actor_id => vec![PbDispatcherUpdate {
                actor_id,
                dispatcher_id: simple_dispatcher_id,
                added_downstream_actor_id: vec![new_simple],
                removed_downstream_actor_id: vec![old_simple],
                hash_mapping: Default::default(),
            }]
        };
        let b3 = Barrier::new_test_barrier(test_epoch(3)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: dispatcher_updates,
                merges: Default::default(),
                vnode_bitmaps: Default::default(),
                dropped_actors: Default::default(),
                actor_splits: Default::default(),
                actor_new_dispatchers: Default::default(),
            },
        ));
        barrier_test_env.inject_barrier(&b3, [actor_id]);
        tx.send(Message::Barrier(b3.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // The chunk and barrier 3 still go to `old_simple`; `new_simple` starts from barrier 3.
        try_recv!(old_simple).unwrap().as_chunk().unwrap();
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap();

        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap();

        // Barrier 4 carries no mutation.
        let b4 = Barrier::new_test_barrier(test_epoch(4));
        barrier_test_env.inject_barrier(&b4, [actor_id]);
        tx.send(Message::Barrier(b4.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // `old_simple` was removed by barrier 3 and receives nothing more.
        try_recv!(old_simple).unwrap_err();
        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap();
    }

    /// Hash-dispatches a chunk of insert/delete rows to five outputs and checks that every
    /// output sees exactly the rows (column values and ops) whose key hashes to it.
    #[tokio::test]
    async fn test_hash_dispatcher() {
        // The expected routing below is computed for this vnode count.
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        // Downstream actor ids are `1..=num_outputs`.
        let num_outputs = 5;
        let cardinality = 10;
        let dimension = 4;
        let key_indices = &[0, 2];
        let (output_tx_vecs, output_rx_vecs): (Vec<_>, Vec<_>) =
            (0..num_outputs).map(|_| channel_for_test()).collect();
        let outputs = output_tx_vecs
            .into_iter()
            .enumerate()
            .map(|(actor_id, tx)| Output::new(1 + actor_id as u32, tx))
            .collect::<Vec<_>>();
        // Split the vnodes evenly among the actors; any remainder goes to the last actor.
        let mut hash_mapping = (1..num_outputs + 1)
            .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT_FOR_TEST / num_outputs])
            .collect_vec();
        hash_mapping.resize(VirtualNode::COUNT_FOR_TEST, num_outputs as u32);
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            key_indices.to_vec(),
            (0..dimension).collect(),
            hash_mapping.clone(),
            0,
        );

        // Alternate inserts and deletes.
        let ops = (0..cardinality)
            .map(|idx| if idx % 2 == 0 { Op::Insert } else { Op::Delete })
            .collect_vec();

        let mut start = 19260817i32..;
        let mut builders = (0..dimension)
            .map(|_| I32ArrayBuilder::new(cardinality))
            .collect_vec();
        // Expected visible column values and ops, per output.
        let mut output_cols = vec![vec![vec![]; dimension]; num_outputs];
        let mut output_ops = vec![vec![]; num_outputs];
        for op in &ops {
            let hash_builder = Crc32FastBuilder;
            let mut hasher = hash_builder.build_hasher();
            let one_row = (0..dimension).map(|_| start.next().unwrap()).collect_vec();
            for key_idx in key_indices {
                let val = one_row[*key_idx];
                let bytes = val.to_le_bytes();
                hasher.update(&bytes);
            }
            let output_idx =
                hash_mapping[hasher.finish() as usize % VirtualNode::COUNT_FOR_TEST] as usize - 1;
            for (builder, val) in builders.iter_mut().zip_eq_fast(one_row.iter()) {
                builder.append(Some(*val));
            }
            output_cols[output_idx]
                .iter_mut()
                .zip_eq_fast(one_row.iter())
                .for_each(|(each_column, val)| each_column.push(*val));
            // Copy the op so the expectations stay usable after `ops` moves into the chunk.
            output_ops[output_idx].push(*op);
        }

        let columns = builders
            .into_iter()
            .map(|builder| {
                let array = builder.finish();
                array.into_ref()
            })
            .collect();

        let chunk = StreamChunk::new(ops, columns);
        hash_dispatcher.dispatch_data(chunk).await.unwrap();

        for (output_idx, mut rx) in output_rx_vecs.into_iter().enumerate() {
            let mut output = vec![];
            while let Some(Some(msg)) = rx.recv().now_or_never() {
                output.push(msg);
            }
            // An output may receive no chunk at all (presumably when no row hashes to it),
            // but never more than one.
            assert!(output.len() <= 1);
            if output.is_empty() {
                assert!(output_cols[output_idx].iter().all(|x| x.is_empty()));
                assert!(output_ops[output_idx].is_empty());
            } else {
                let message = output.first().unwrap();
                let real_chunk = match message {
                    DispatcherMessageBatch::Chunk(chunk) => chunk,
                    _ => panic!(),
                };
                real_chunk
                    .columns()
                    .iter()
                    .zip_eq_fast(output_cols[output_idx].iter())
                    .for_each(|(real_col, expect_col)| {
                        let real_vals = real_chunk
                            .visibility()
                            .iter_ones()
                            .map(|row_idx| real_col.as_int32().value_at(row_idx).unwrap())
                            .collect::<Vec<_>>();
                        assert_eq!(real_vals.len(), expect_col.len());
                        assert_eq!(real_vals, *expect_col);
                    });

                // Plain inserts/deletes must keep their op on the output they are visible in.
                let real_ops = real_chunk
                    .visibility()
                    .iter_ones()
                    .map(|row_idx| real_chunk.ops()[row_idx])
                    .collect::<Vec<_>>();
                assert_eq!(real_ops, output_ops[output_idx]);
            }
        }
    }
}