1use std::collections::{HashMap, HashSet};
16use std::future::Future;
17use std::iter::repeat_with;
18use std::ops::{Deref, DerefMut};
19use std::time::Duration;
20
21use anyhow::anyhow;
22use futures::{FutureExt, TryStreamExt};
23use itertools::Itertools;
24use risingwave_common::array::Op;
25use risingwave_common::bitmap::BitmapBuilder;
26use risingwave_common::config::StreamingConfig;
27use risingwave_common::hash::{ActorMapping, ExpandedActorMapping, VirtualNode};
28use risingwave_common::metrics::LabelGuardedIntCounter;
29use risingwave_common::row::RowExt;
30use risingwave_common::util::iter_util::ZipEqFast;
31use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
32use risingwave_pb::stream_plan::{self, PbDispatcher};
33use smallvec::{SmallVec, smallvec};
34use tokio::sync::mpsc::UnboundedReceiver;
35use tokio::time::Instant;
36use tokio_stream::StreamExt;
37use tokio_stream::adapters::Peekable;
38use tracing::{Instrument, event};
39
40use super::exchange::output::Output;
41use super::{
42 AddMutation, DispatcherBarriers, DispatcherMessageBatch, MessageBatch, TroublemakerExecutor,
43 UpdateMutation,
44};
45use crate::executor::prelude::*;
46use crate::executor::{StopMutation, StreamConsumer};
47use crate::task::{DispatcherId, NewOutputRequest};
48
49mod output_mapping;
50pub use output_mapping::DispatchOutputMapping;
51use risingwave_common::id::FragmentId;
52
/// Stream consumer that pulls message batches from an upstream executor and hands them to
/// the dispatchers held in `inner`.
pub struct DispatchExecutor {
    /// Upstream executor; its message stream is consumed in `execute`.
    input: Executor,
    /// The dispatchers plus the bookkeeping needed to route batches and apply mutations.
    inner: DispatchExecutorInner,
}
60
/// A [`DispatcherImpl`] bundled with the counter that tracks how long sends to it took.
struct DispatcherWithMetrics {
    /// The wrapped dispatcher; also reachable through `Deref` / `DerefMut`.
    dispatcher: DispatcherImpl,
    /// Accumulated nanoseconds spent awaiting this dispatcher's dispatch futures.
    pub actor_output_buffer_blocking_duration_ns: LabelGuardedIntCounter,
}
65
66impl Debug for DispatcherWithMetrics {
67 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
68 self.dispatcher.fmt(f)
69 }
70}
71
72impl Deref for DispatcherWithMetrics {
73 type Target = DispatcherImpl;
74
75 fn deref(&self) -> &Self::Target {
76 &self.dispatcher
77 }
78}
79
80impl DerefMut for DispatcherWithMetrics {
81 fn deref_mut(&mut self) -> &mut Self::Target {
82 &mut self.dispatcher
83 }
84}
85
/// Metric handles and label strings shared by every dispatcher of one executor.
struct DispatchExecutorMetrics {
    /// Actor id rendered once as a string, used as a metric label.
    actor_id_str: String,
    /// Fragment id rendered once as a string, used as a metric label.
    fragment_id_str: String,
    /// Metrics registry from which per-dispatcher counters are created.
    metrics: Arc<StreamingMetrics>,
    /// Counter increased by the cardinality of every chunk that was dispatched.
    actor_out_record_cnt: LabelGuardedIntCounter,
}
92
93impl DispatchExecutorMetrics {
94 fn monitor_dispatcher(&self, dispatcher: DispatcherImpl) -> DispatcherWithMetrics {
95 DispatcherWithMetrics {
96 actor_output_buffer_blocking_duration_ns: self
97 .metrics
98 .actor_output_buffer_blocking_duration_ns
99 .with_guarded_label_values(&[
100 self.actor_id_str.as_str(),
101 self.fragment_id_str.as_str(),
102 dispatcher.dispatcher_id_str(),
103 ]),
104 dispatcher,
105 }
106 }
107}
108
/// State of a [`DispatchExecutor`] apart from its input: the dispatchers and everything
/// needed to add, update and remove them when barriers carry mutations.
struct DispatchExecutorInner {
    /// Dispatchers currently owned by this actor; every message batch goes to all of them.
    dispatchers: Vec<DispatcherWithMetrics>,
    /// Id of the owning actor, used to look up this actor's entries inside mutations.
    actor_id: ActorId,
    /// Streaming configuration (dispatch concurrency limit, barrier batch size).
    actor_config: Arc<StreamingConfig>,
    /// Metric handles used when dispatching and when wrapping new dispatchers.
    metrics: DispatchExecutorMetrics,
    /// Channel delivering `(downstream actor, request)` pairs from which outputs are built.
    new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    /// Requests received while waiting for a different actor, parked until asked for.
    pending_new_output_requests: HashMap<ActorId, NewOutputRequest>,
}
117
118impl DispatchExecutorInner {
119 async fn collect_outputs(
120 &mut self,
121 downstream_actors: &[ActorId],
122 ) -> StreamResult<Vec<Output>> {
123 fn resolve_output(downstream_actor: ActorId, request: NewOutputRequest) -> Output {
124 let tx = match request {
125 NewOutputRequest::Local(tx) | NewOutputRequest::Remote(tx) => tx,
126 };
127 Output::new(downstream_actor, tx)
128 }
129 let mut outputs = Vec::with_capacity(downstream_actors.len());
130 for &downstream_actor in downstream_actors {
131 let output =
132 if let Some(request) = self.pending_new_output_requests.remove(&downstream_actor) {
133 resolve_output(downstream_actor, request)
134 } else {
135 loop {
136 let (requested_actor, request) = self
137 .new_output_request_rx
138 .recv()
139 .await
140 .ok_or_else(|| anyhow!("end of new output request"))?;
141 if requested_actor == downstream_actor {
142 break resolve_output(requested_actor, request);
143 } else {
144 assert!(
145 self.pending_new_output_requests
146 .insert(requested_actor, request)
147 .is_none(),
148 "duplicated inflight new output requests from actor {}",
149 requested_actor
150 );
151 }
152 }
153 };
154 outputs.push(output);
155 }
156 Ok(outputs)
157 }
158
159 async fn dispatch(&mut self, msg: MessageBatch) -> StreamResult<()> {
160 macro_rules! await_with_metrics {
161 ($fut:expr, $metrics:expr) => {{
162 let mut start_time = Instant::now();
163 let interval_duration = Duration::from_secs(15);
164 let mut interval =
165 tokio::time::interval_at(start_time + interval_duration, interval_duration);
166
167 let mut fut = std::pin::pin!($fut);
168
169 loop {
170 tokio::select! {
171 biased;
172 res = &mut fut => {
173 res?;
174 let ns = start_time.elapsed().as_nanos() as u64;
175 $metrics.inc_by(ns);
176 break;
177 }
178 _ = interval.tick() => {
179 start_time = Instant::now();
180 $metrics.inc_by(interval_duration.as_nanos() as u64);
181 }
182 };
183 }
184 StreamResult::Ok(())
185 }};
186 }
187
188 let limit = self.actor_config.developer.exchange_concurrent_dispatchers;
189 match msg {
191 MessageBatch::BarrierBatch(barrier_batch) => {
192 if barrier_batch.is_empty() {
193 return Ok(());
194 }
195 let mutation = barrier_batch[0].mutation.clone();
197 self.pre_mutate_dispatchers(&mutation).await?;
198 futures::stream::iter(self.dispatchers.iter_mut())
199 .map(Ok)
200 .try_for_each_concurrent(limit, |dispatcher| async {
201 let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
202 let dispatcher_output = &mut dispatcher.dispatcher;
203 let fut = dispatcher_output.dispatch_barriers(
204 barrier_batch
205 .iter()
206 .cloned()
207 .map(|b| b.into_dispatcher())
208 .collect(),
209 );
210 await_with_metrics!(std::pin::pin!(fut), metrics)
211 })
212 .await?;
213 self.post_mutate_dispatchers(&mutation)?;
214 }
215 MessageBatch::Watermark(watermark) => {
216 futures::stream::iter(self.dispatchers.iter_mut())
217 .map(Ok)
218 .try_for_each_concurrent(limit, |dispatcher| async {
219 let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
220 let dispatcher_output = &mut dispatcher.dispatcher;
221 let fut = dispatcher_output.dispatch_watermark(watermark.clone());
222 await_with_metrics!(std::pin::pin!(fut), metrics)
223 })
224 .await?;
225 }
226 MessageBatch::Chunk(chunk) => {
227 futures::stream::iter(self.dispatchers.iter_mut())
228 .map(Ok)
229 .try_for_each_concurrent(limit, |dispatcher| async {
230 let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
231 let dispatcher_output = &mut dispatcher.dispatcher;
232 let fut = dispatcher_output.dispatch_data(chunk.clone());
233 await_with_metrics!(std::pin::pin!(fut), metrics)
234 })
235 .await?;
236
237 self.metrics
238 .actor_out_record_cnt
239 .inc_by(chunk.cardinality() as _);
240 }
241 }
242 Ok(())
243 }
244
245 async fn add_dispatchers<'a>(
247 &mut self,
248 new_dispatchers: impl IntoIterator<Item = &'a PbDispatcher>,
249 ) -> StreamResult<()> {
250 for dispatcher in new_dispatchers {
251 let outputs = self
252 .collect_outputs(&dispatcher.downstream_actor_id)
253 .await?;
254 let dispatcher = DispatcherImpl::new(outputs, dispatcher)?;
255 let dispatcher = self.metrics.monitor_dispatcher(dispatcher);
256 self.dispatchers.push(dispatcher);
257 }
258
259 assert!(
260 self.dispatchers
261 .iter()
262 .map(|d| d.dispatcher_id())
263 .all_unique(),
264 "dispatcher ids must be unique: {:?}",
265 self.dispatchers
266 );
267
268 Ok(())
269 }
270
271 fn find_dispatcher(&mut self, dispatcher_id: DispatcherId) -> &mut DispatcherImpl {
272 self.dispatchers
273 .iter_mut()
274 .find(|d| d.dispatcher_id() == dispatcher_id)
275 .unwrap_or_else(|| panic!("dispatcher {}:{} not found", self.actor_id, dispatcher_id))
276 }
277
278 async fn pre_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
281 let outputs = self
282 .collect_outputs(&update.added_downstream_actor_id)
283 .await?;
284
285 let dispatcher = self.find_dispatcher(update.dispatcher_id);
286 dispatcher.add_outputs(outputs);
287
288 Ok(())
289 }
290
291 fn post_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
294 let ids = update.removed_downstream_actor_id.iter().copied().collect();
295
296 let dispatcher = self.find_dispatcher(update.dispatcher_id);
297 dispatcher.remove_outputs(&ids);
298
299 if let DispatcherImpl::Hash(dispatcher) = dispatcher {
306 dispatcher.hash_mapping =
307 ActorMapping::from_protobuf(update.get_hash_mapping()?).to_expanded();
308 }
309
310 Ok(())
311 }
312
313 async fn pre_mutate_dispatchers(
315 &mut self,
316 mutation: &Option<Arc<Mutation>>,
317 ) -> StreamResult<()> {
318 let Some(mutation) = mutation.as_deref() else {
319 return Ok(());
320 };
321
322 match mutation {
323 Mutation::Add(AddMutation { adds, .. }) => {
324 if let Some(new_dispatchers) = adds.get(&self.actor_id) {
325 self.add_dispatchers(new_dispatchers).await?;
326 }
327 }
328 Mutation::Update(UpdateMutation {
329 dispatchers,
330 actor_new_dispatchers: actor_dispatchers,
331 ..
332 }) => {
333 if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
334 self.add_dispatchers(new_dispatchers).await?;
335 }
336
337 if let Some(updates) = dispatchers.get(&self.actor_id) {
338 for update in updates {
339 self.pre_update_dispatcher(update).await?;
340 }
341 }
342 }
343 _ => {}
344 }
345
346 Ok(())
347 }
348
349 fn post_mutate_dispatchers(&mut self, mutation: &Option<Arc<Mutation>>) -> StreamResult<()> {
351 let Some(mutation) = mutation.as_deref() else {
352 return Ok(());
353 };
354
355 match mutation {
356 Mutation::Stop(StopMutation { dropped_actors, .. }) => {
357 if !dropped_actors.contains(&self.actor_id) {
359 for dispatcher in &mut self.dispatchers {
360 dispatcher.remove_outputs(dropped_actors);
361 }
362 }
363 }
364 Mutation::Update(UpdateMutation {
365 dispatchers,
366 dropped_actors,
367 ..
368 }) => {
369 if let Some(updates) = dispatchers.get(&self.actor_id) {
370 for update in updates {
371 self.post_update_dispatcher(update)?;
372 }
373 }
374
375 if !dropped_actors.contains(&self.actor_id) {
376 for dispatcher in &mut self.dispatchers {
377 dispatcher.remove_outputs(dropped_actors);
378 }
379 }
380 }
381 _ => {}
382 };
383
384 self.dispatchers.retain(|d| !d.is_empty());
387
388 Ok(())
389 }
390}
391
392impl DispatchExecutor {
393 pub(crate) async fn new(
394 input: Executor,
395 new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
396 dispatchers: Vec<stream_plan::Dispatcher>,
397 actor_context: &ActorContextRef,
398 ) -> StreamResult<Self> {
399 let mut executor = Self::new_inner(
400 input,
401 new_output_request_rx,
402 vec![],
403 actor_context.id,
404 actor_context.fragment_id,
405 actor_context.config.clone(),
406 actor_context.streaming_metrics.clone(),
407 );
408 let inner = &mut executor.inner;
409 for dispatcher in dispatchers {
410 let outputs = inner
411 .collect_outputs(&dispatcher.downstream_actor_id)
412 .await?;
413 let dispatcher = DispatcherImpl::new(outputs, &dispatcher)?;
414 let dispatcher = inner.metrics.monitor_dispatcher(dispatcher);
415 inner.dispatchers.push(dispatcher);
416 }
417 Ok(executor)
418 }
419
420 #[cfg(test)]
421 pub(crate) fn for_test(
422 input: Executor,
423 dispatchers: Vec<DispatcherImpl>,
424 actor_id: ActorId,
425 fragment_id: FragmentId,
426 actor_config: Arc<StreamingConfig>,
427 metrics: Arc<StreamingMetrics>,
428 ) -> (
429 Self,
430 tokio::sync::mpsc::UnboundedSender<(ActorId, NewOutputRequest)>,
431 ) {
432 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
433
434 (
435 Self::new_inner(
436 input,
437 rx,
438 dispatchers,
439 actor_id,
440 fragment_id,
441 actor_config,
442 metrics,
443 ),
444 tx,
445 )
446 }
447
448 fn new_inner(
449 mut input: Executor,
450 new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
451 dispatchers: Vec<DispatcherImpl>,
452 actor_id: ActorId,
453 fragment_id: FragmentId,
454 actor_config: Arc<StreamingConfig>,
455 metrics: Arc<StreamingMetrics>,
456 ) -> Self {
457 if crate::consistency::insane() {
458 let mut info = input.info().clone();
460 info.identity = format!("{} (embedded trouble)", info.identity);
461 let troublemaker = TroublemakerExecutor::new(input, actor_config.developer.chunk_size);
462 input = (info, troublemaker).into();
463 }
464
465 let actor_id_str = actor_id.to_string();
466 let fragment_id_str = fragment_id.to_string();
467 let actor_out_record_cnt = metrics
468 .actor_out_record_cnt
469 .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
470 let metrics = DispatchExecutorMetrics {
471 actor_id_str,
472 fragment_id_str,
473 metrics,
474 actor_out_record_cnt,
475 };
476 let dispatchers = dispatchers
477 .into_iter()
478 .map(|dispatcher| metrics.monitor_dispatcher(dispatcher))
479 .collect();
480
481 Self {
482 input,
483 inner: DispatchExecutorInner {
484 dispatchers,
485 actor_id,
486 actor_config,
487 metrics,
488 new_output_request_rx,
489 pending_new_output_requests: Default::default(),
490 },
491 }
492 }
493}
494
495impl StreamConsumer for DispatchExecutor {
496 type BarrierStream = impl Stream<Item = StreamResult<Barrier>> + Send;
497
498 fn execute(mut self: Box<Self>) -> Self::BarrierStream {
499 let max_barrier_count_per_batch = self.inner.actor_config.developer.max_barrier_batch_size;
500 #[try_stream]
501 async move {
502 let mut input = self.input.execute().peekable();
503 loop {
504 let Some(message) =
505 try_batch_barriers(max_barrier_count_per_batch, &mut input).await?
506 else {
507 break;
509 };
510 match message {
511 chunk @ MessageBatch::Chunk(_) => {
512 self.inner
513 .dispatch(chunk)
514 .instrument(tracing::info_span!("dispatch_chunk"))
515 .instrument_await("dispatch_chunk")
516 .await?;
517 }
518 MessageBatch::BarrierBatch(barrier_batch) => {
519 assert!(!barrier_batch.is_empty());
520 self.inner
521 .dispatch(MessageBatch::BarrierBatch(barrier_batch.clone()))
522 .instrument(tracing::info_span!("dispatch_barrier_batch"))
523 .instrument_await("dispatch_barrier_batch")
524 .await?;
525 self.inner
526 .metrics
527 .metrics
528 .barrier_batch_size
529 .observe(barrier_batch.len() as f64);
530 for barrier in barrier_batch {
531 yield barrier;
532 }
533 }
534 watermark @ MessageBatch::Watermark(_) => {
535 self.inner
536 .dispatch(watermark)
537 .instrument(tracing::info_span!("dispatch_watermark"))
538 .instrument_await("dispatch_watermark")
539 .await?;
540 }
541 }
542 }
543 }
544 }
545}
546
547async fn try_batch_barriers(
553 max_barrier_count_per_batch: u32,
554 input: &mut Peekable<BoxedMessageStream>,
555) -> StreamResult<Option<MessageBatch>> {
556 let Some(msg) = input.next().await else {
557 return Ok(None);
559 };
560 let mut barrier_batch = vec![];
561 let msg: Message = msg?;
562 let max_peek_attempts = match msg {
563 Message::Chunk(c) => {
564 return Ok(Some(MessageBatch::Chunk(c)));
565 }
566 Message::Watermark(w) => {
567 return Ok(Some(MessageBatch::Watermark(w)));
568 }
569 Message::Barrier(b) => {
570 let peek_more_barrier = b.mutation.is_none();
571 barrier_batch.push(b);
572 if peek_more_barrier {
573 max_barrier_count_per_batch.saturating_sub(1)
574 } else {
575 0
576 }
577 }
578 };
579 for _ in 0..max_peek_attempts {
581 let peek = input.peek().now_or_never();
582 let Some(peek) = peek else {
583 break;
584 };
585 let Some(msg) = peek else {
586 break;
588 };
589 let Ok(Message::Barrier(barrier)) = msg else {
590 break;
591 };
592 if barrier.mutation.is_some() {
593 break;
594 }
595 let msg: Message = input.next().now_or_never().unwrap().unwrap()?;
596 let Message::Barrier(ref barrier) = msg else {
597 unreachable!("must be a barrier");
598 };
599 barrier_batch.push(barrier.clone());
600 }
601 Ok(Some(MessageBatch::BarrierBatch(barrier_batch)))
602}
603
/// The closed set of dispatcher implementations, wrapped in one enum so that calls are
/// forwarded by `match` (see the methods generated by `impl_dispatcher`).
#[derive(Debug)]
pub enum DispatcherImpl {
    /// Routes each row to the actor that the hash mapping assigns to the row's vnode.
    Hash(HashDataDispatcher),
    /// Sends every message to every output.
    Broadcast(BroadcastDispatcher),
    /// Sends data to a single output; also built for the `NoShuffle` dispatcher type.
    Simple(SimpleDispatcher),
    /// Sends each chunk to one output in turn. `DispatcherImpl::new` in this file never
    /// builds this variant.
    RoundRobin(RoundRobinDataDispatcher),
}
611
612impl DispatcherImpl {
613 pub fn new(outputs: Vec<Output>, dispatcher: &PbDispatcher) -> StreamResult<Self> {
614 let output_mapping =
615 DispatchOutputMapping::from_protobuf(dispatcher.output_mapping.clone().unwrap());
616
617 use risingwave_pb::stream_plan::DispatcherType::*;
618 let dispatcher_impl = match dispatcher.get_type()? {
619 Hash => {
620 assert!(!outputs.is_empty());
621 let dist_key_indices = dispatcher
622 .dist_key_indices
623 .iter()
624 .map(|i| *i as usize)
625 .collect();
626
627 let hash_mapping =
628 ActorMapping::from_protobuf(dispatcher.get_hash_mapping()?).to_expanded();
629
630 DispatcherImpl::Hash(HashDataDispatcher::new(
631 outputs,
632 dist_key_indices,
633 output_mapping,
634 hash_mapping,
635 dispatcher.dispatcher_id,
636 ))
637 }
638 Broadcast => DispatcherImpl::Broadcast(BroadcastDispatcher::new(
639 outputs,
640 output_mapping,
641 dispatcher.dispatcher_id,
642 )),
643 Simple | NoShuffle => {
644 let [output]: [_; 1] = outputs.try_into().unwrap();
645 DispatcherImpl::Simple(SimpleDispatcher::new(
646 output,
647 output_mapping,
648 dispatcher.dispatcher_id,
649 ))
650 }
651 Unspecified => unreachable!(),
652 };
653
654 Ok(dispatcher_impl)
655 }
656}
657
/// Generates the inherent methods of [`DispatcherImpl`]. Each generated method matches on
/// the enum and forwards the call to the same-named method of the wrapped dispatcher.
///
/// Takes a comma-separated list of `{ Variant }` entries, one per enum variant.
macro_rules! impl_dispatcher {
    ($( { $variant_name:ident } ),*) => {
        impl DispatcherImpl {
            /// Forwards a data chunk to the wrapped dispatcher.
            pub async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_data(chunk).await, )*
                }
            }

            /// Forwards a batch of barriers to the wrapped dispatcher.
            pub async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_barriers(barriers).await, )*
                }
            }

            /// Forwards a watermark to the wrapped dispatcher.
            pub async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_watermark(watermark).await, )*
                }
            }

            /// Attaches additional outputs to the wrapped dispatcher.
            pub fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
                match self {
                    $(Self::$variant_name(inner) => inner.add_outputs(outputs), )*
                }
            }

            /// Detaches the outputs that belong to the given actors.
            pub fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
                match self {
                    $(Self::$variant_name(inner) => inner.remove_outputs(actor_ids), )*
                }
            }

            /// Id of the wrapped dispatcher.
            pub fn dispatcher_id(&self) -> DispatcherId {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id(), )*
                }
            }

            /// Id of the wrapped dispatcher as a string slice.
            pub fn dispatcher_id_str(&self) -> &str {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id_str(), )*
                }
            }

            /// Whether the wrapped dispatcher reports itself as empty.
            pub fn is_empty(&self) -> bool {
                match self {
                    $(Self::$variant_name(inner) => inner.is_empty(), )*
                }
            }
        }
    }
}
711
/// Invokes the given macro with the list of all [`DispatcherImpl`] variants, each written as
/// `{ Variant }`. The list must be kept in step with the enum by hand.
macro_rules! for_all_dispatcher_variants {
    ($macro:ident) => {
        $macro! {
            { Hash },
            { Broadcast },
            { Simple },
            { RoundRobin }
        }
    };
}

// Generate the forwarding methods of `DispatcherImpl` for every variant.
for_all_dispatcher_variants! { impl_dispatcher }
724
/// Alias for the futures returned by the dispatch methods of [`Dispatcher`]: a `Send` future
/// resolving to `StreamResult<()>`. The lifetime parameter does not appear in the bound.
pub trait DispatchFuture<'a> = Future<Output = StreamResult<()>> + Send;
726
/// Common interface of all dispatcher implementations wrapped by [`DispatcherImpl`].
pub trait Dispatcher: Debug + 'static {
    /// Sends a data chunk to the output(s) selected by the implementation.
    fn dispatch_data(&mut self, chunk: StreamChunk) -> impl DispatchFuture<'_>;
    /// Sends a batch of barriers; every implementation in this file sends it to all outputs.
    fn dispatch_barriers(&mut self, barrier: DispatcherBarriers) -> impl DispatchFuture<'_>;
    /// Sends a watermark; implementations in this file skip it when the output mapping
    /// yields no watermark.
    fn dispatch_watermark(&mut self, watermark: Watermark) -> impl DispatchFuture<'_>;

    /// Attaches additional outputs.
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>);
    /// Detaches the outputs whose actor id is contained in `actor_ids`.
    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>);

    /// Id of this dispatcher.
    fn dispatcher_id(&self) -> DispatcherId;

    /// Id of this dispatcher as a string slice (used as a metric label in this file).
    fn dispatcher_id_str(&self) -> &str;

    /// Whether the dispatcher has no outputs left.
    fn is_empty(&self) -> bool;
}
754
755async fn broadcast_concurrent(
760 outputs: impl IntoIterator<Item = &'_ mut Output>,
761 message: DispatcherMessageBatch,
762) -> StreamResult<()> {
763 futures::future::try_join_all(
764 outputs
765 .into_iter()
766 .map(|output| output.send(message.clone())),
767 )
768 .await?;
769 Ok(())
770}
771
/// Dispatcher that sends each data chunk to a single output, advancing to the next output
/// after every chunk. Barriers and watermarks go to all outputs.
#[derive(Debug)]
pub struct RoundRobinDataDispatcher {
    /// Outputs in rotation order.
    outputs: Vec<Output>,
    /// Mapping applied to chunks and watermarks before they are sent.
    output_mapping: DispatchOutputMapping,
    /// Index into `outputs` of the output that receives the next chunk.
    cur: usize,
    /// Id of this dispatcher.
    dispatcher_id: DispatcherId,
    /// `dispatcher_id` rendered once as a string.
    dispatcher_id_str: String,
}
780
781impl RoundRobinDataDispatcher {
782 pub fn new(
783 outputs: Vec<Output>,
784 output_mapping: DispatchOutputMapping,
785 dispatcher_id: DispatcherId,
786 ) -> Self {
787 Self {
788 outputs,
789 output_mapping,
790 cur: 0,
791 dispatcher_id,
792 dispatcher_id_str: dispatcher_id.to_string(),
793 }
794 }
795}
796
797impl Dispatcher for RoundRobinDataDispatcher {
798 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
799 let chunk = self.output_mapping.apply(chunk);
800
801 self.outputs[self.cur]
802 .send(DispatcherMessageBatch::Chunk(chunk))
803 .await?;
804 self.cur += 1;
805 self.cur %= self.outputs.len();
806 Ok(())
807 }
808
809 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
810 broadcast_concurrent(
812 &mut self.outputs,
813 DispatcherMessageBatch::BarrierBatch(barriers),
814 )
815 .await
816 }
817
818 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
819 if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
820 broadcast_concurrent(
822 &mut self.outputs,
823 DispatcherMessageBatch::Watermark(watermark),
824 )
825 .await?;
826 }
827 Ok(())
828 }
829
830 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
831 self.outputs.extend(outputs);
832 }
833
834 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
835 self.outputs
836 .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
837 .count();
838 self.cur = self.cur.min(self.outputs.len() - 1);
839 }
840
841 fn dispatcher_id(&self) -> DispatcherId {
842 self.dispatcher_id
843 }
844
845 fn dispatcher_id_str(&self) -> &str {
846 &self.dispatcher_id_str
847 }
848
849 fn is_empty(&self) -> bool {
850 self.outputs.is_empty()
851 }
852}
853
/// Dispatcher that routes each row of a chunk to the output whose actor owns the row's
/// virtual node. Barriers and watermarks go to all outputs.
pub struct HashDataDispatcher {
    /// Downstream outputs.
    outputs: Vec<Output>,
    /// Column indices from which the virtual node of a row is computed.
    keys: Vec<usize>,
    /// Mapping applied to chunks and watermarks before they are sent.
    output_mapping: DispatchOutputMapping,
    /// Indexed by virtual node; the entry is compared with each output's actor id to decide
    /// which output receives a row.
    hash_mapping: ExpandedActorMapping,
    /// Id of this dispatcher.
    dispatcher_id: DispatcherId,
    /// `dispatcher_id` rendered once as a string.
    dispatcher_id_str: String,
}
864
865impl Debug for HashDataDispatcher {
866 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
867 f.debug_struct("HashDataDispatcher")
868 .field("outputs", &self.outputs)
869 .field("keys", &self.keys)
870 .field("dispatcher_id", &self.dispatcher_id)
871 .finish_non_exhaustive()
872 }
873}
874
875impl HashDataDispatcher {
876 pub fn new(
877 outputs: Vec<Output>,
878 keys: Vec<usize>,
879 output_mapping: DispatchOutputMapping,
880 hash_mapping: ExpandedActorMapping,
881 dispatcher_id: DispatcherId,
882 ) -> Self {
883 Self {
884 outputs,
885 keys,
886 output_mapping,
887 hash_mapping,
888 dispatcher_id,
889 dispatcher_id_str: dispatcher_id.to_string(),
890 }
891 }
892}
893
894impl Dispatcher for HashDataDispatcher {
895 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
896 self.outputs.extend(outputs);
897 }
898
899 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
900 broadcast_concurrent(
902 &mut self.outputs,
903 DispatcherMessageBatch::BarrierBatch(barriers),
904 )
905 .await
906 }
907
908 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
909 if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
910 broadcast_concurrent(
912 &mut self.outputs,
913 DispatcherMessageBatch::Watermark(watermark),
914 )
915 .await?;
916 }
917 Ok(())
918 }
919
920 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
921 let num_outputs = self.outputs.len();
925
926 let vnode_count = self.hash_mapping.len();
928 let vnodes = VirtualNode::compute_chunk(chunk.data_chunk(), &self.keys, vnode_count);
929
930 tracing::debug!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty(), self.keys, vnodes);
931
932 let mut vis_maps = repeat_with(|| BitmapBuilder::with_capacity(chunk.capacity()))
933 .take(num_outputs)
934 .collect_vec();
935 let mut last_update_delete_row_idx = None;
936 let mut new_ops: Vec<Op> = Vec::with_capacity(chunk.capacity());
937
938 for (row_idx, ((vnode, &op), visible)) in vnodes
939 .iter()
940 .copied()
941 .zip_eq_fast(chunk.ops())
942 .zip_eq_fast(chunk.visibility().iter())
943 .enumerate()
944 {
945 for (output, vis_map) in self.outputs.iter().zip_eq_fast(vis_maps.iter_mut()) {
947 vis_map.append(visible && self.hash_mapping[vnode.to_index()] == output.actor_id());
948 }
949
950 if !visible {
951 new_ops.push(op);
952 continue;
953 }
954
955 if op == Op::UpdateDelete {
961 last_update_delete_row_idx = Some(row_idx);
962 } else if op == Op::UpdateInsert {
963 let delete_row_idx = last_update_delete_row_idx
964 .take()
965 .expect("missing U- before U+");
966 assert!(delete_row_idx + 1 == row_idx, "U- and U+ are not adjacent");
967
968 let dist_key_changed = chunk.row_at(delete_row_idx).1.project(&self.keys)
970 != chunk.row_at(row_idx).1.project(&self.keys);
971
972 if dist_key_changed {
973 new_ops.push(Op::Delete);
974 new_ops.push(Op::Insert);
975 } else {
976 new_ops.push(Op::UpdateDelete);
977 new_ops.push(Op::UpdateInsert);
978 }
979 } else {
980 new_ops.push(op);
981 }
982 }
983 assert!(last_update_delete_row_idx.is_none(), "missing U+ after U-");
984
985 let ops = new_ops;
986 let chunk = self.output_mapping.apply(chunk);
988
989 futures::future::try_join_all(
991 vis_maps
992 .into_iter()
993 .zip_eq_fast(self.outputs.iter_mut())
994 .map(|(vis_map, output)| async {
995 let vis_map = vis_map.finish();
996 let new_stream_chunk =
998 StreamChunk::with_visibility(ops.clone(), chunk.columns().into(), vis_map);
999 if new_stream_chunk.cardinality() > 0 {
1000 event!(
1001 tracing::Level::TRACE,
1002 msg = "chunk",
1003 downstream = %output.actor_id(),
1004 "send = \n{:#?}",
1005 new_stream_chunk
1006 );
1007 output
1008 .send(DispatcherMessageBatch::Chunk(new_stream_chunk))
1009 .await?;
1010 }
1011 StreamResult::Ok(())
1012 }),
1013 )
1014 .await?;
1015
1016 Ok(())
1017 }
1018
1019 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
1020 self.outputs
1021 .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
1022 .count();
1023 }
1024
1025 fn dispatcher_id(&self) -> DispatcherId {
1026 self.dispatcher_id
1027 }
1028
1029 fn dispatcher_id_str(&self) -> &str {
1030 &self.dispatcher_id_str
1031 }
1032
1033 fn is_empty(&self) -> bool {
1034 self.outputs.is_empty()
1035 }
1036}
1037
/// Dispatcher that sends every chunk, barrier batch and watermark to all of its outputs.
#[derive(Debug)]
pub struct BroadcastDispatcher {
    /// Outputs keyed by the id of the downstream actor they lead to.
    outputs: HashMap<ActorId, Output>,
    /// Mapping applied to chunks and watermarks before they are sent.
    output_mapping: DispatchOutputMapping,
    /// Id of this dispatcher.
    dispatcher_id: DispatcherId,
    /// `dispatcher_id` rendered once as a string.
    dispatcher_id_str: String,
}
1046
1047impl BroadcastDispatcher {
1048 pub fn new(
1049 outputs: impl IntoIterator<Item = Output>,
1050 output_mapping: DispatchOutputMapping,
1051 dispatcher_id: DispatcherId,
1052 ) -> Self {
1053 Self {
1054 outputs: Self::into_pairs(outputs).collect(),
1055 output_mapping,
1056 dispatcher_id,
1057 dispatcher_id_str: dispatcher_id.to_string(),
1058 }
1059 }
1060
1061 fn into_pairs(
1062 outputs: impl IntoIterator<Item = Output>,
1063 ) -> impl Iterator<Item = (ActorId, Output)> {
1064 outputs
1065 .into_iter()
1066 .map(|output| (output.actor_id(), output))
1067 }
1068}
1069
1070impl Dispatcher for BroadcastDispatcher {
1071 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
1072 let chunk = self.output_mapping.apply(chunk);
1073 broadcast_concurrent(
1074 self.outputs.values_mut(),
1075 DispatcherMessageBatch::Chunk(chunk),
1076 )
1077 .await
1078 }
1079
1080 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
1081 broadcast_concurrent(
1083 self.outputs.values_mut(),
1084 DispatcherMessageBatch::BarrierBatch(barriers),
1085 )
1086 .await
1087 }
1088
1089 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
1090 if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
1091 broadcast_concurrent(
1093 self.outputs.values_mut(),
1094 DispatcherMessageBatch::Watermark(watermark),
1095 )
1096 .await?;
1097 }
1098 Ok(())
1099 }
1100
1101 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
1102 self.outputs.extend(Self::into_pairs(outputs));
1103 }
1104
1105 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
1106 self.outputs
1107 .extract_if(|actor_id, _| actor_ids.contains(actor_id))
1108 .count();
1109 }
1110
1111 fn dispatcher_id(&self) -> DispatcherId {
1112 self.dispatcher_id
1113 }
1114
1115 fn dispatcher_id_str(&self) -> &str {
1116 &self.dispatcher_id_str
1117 }
1118
1119 fn is_empty(&self) -> bool {
1120 self.outputs.is_empty()
1121 }
1122}
1123
/// Dispatcher that sends data and watermarks to exactly one output.
#[derive(Debug)]
pub struct SimpleDispatcher {
    /// The output(s). Created with one entry; `add_outputs` allows at most two at a time,
    /// and data or watermarks can only be sent while there is exactly one.
    output: SmallVec<[Output; 2]>,
    /// Mapping applied to chunks and watermarks before they are sent.
    output_mapping: DispatchOutputMapping,
    /// Id of this dispatcher.
    dispatcher_id: DispatcherId,
    /// `dispatcher_id` rendered once as a string.
    dispatcher_id_str: String,
}
1145
1146impl SimpleDispatcher {
1147 pub fn new(
1148 output: Output,
1149 output_mapping: DispatchOutputMapping,
1150 dispatcher_id: DispatcherId,
1151 ) -> Self {
1152 Self {
1153 output: smallvec![output],
1154 output_mapping,
1155 dispatcher_id,
1156 dispatcher_id_str: dispatcher_id.to_string(),
1157 }
1158 }
1159}
1160
1161impl Dispatcher for SimpleDispatcher {
1162 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
1163 self.output.extend(outputs);
1164 assert!(self.output.len() <= 2);
1165 }
1166
1167 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
1168 for output in &mut self.output {
1170 output
1171 .send(DispatcherMessageBatch::BarrierBatch(barriers.clone()))
1172 .await?;
1173 }
1174 Ok(())
1175 }
1176
1177 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
1178 let output = self
1179 .output
1180 .iter_mut()
1181 .exactly_one()
1182 .expect("expect exactly one output");
1183
1184 let chunk = self.output_mapping.apply(chunk);
1185 output.send(DispatcherMessageBatch::Chunk(chunk)).await
1186 }
1187
1188 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
1189 let output = self
1190 .output
1191 .iter_mut()
1192 .exactly_one()
1193 .expect("expect exactly one output");
1194
1195 if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
1196 output
1197 .send(DispatcherMessageBatch::Watermark(watermark))
1198 .await?;
1199 }
1200 Ok(())
1201 }
1202
1203 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
1204 self.output
1205 .retain(|output| !actor_ids.contains(&output.actor_id()));
1206 }
1207
1208 fn dispatcher_id(&self) -> DispatcherId {
1209 self.dispatcher_id
1210 }
1211
1212 fn dispatcher_id_str(&self) -> &str {
1213 &self.dispatcher_id_str
1214 }
1215
1216 fn is_empty(&self) -> bool {
1217 self.output.is_empty()
1218 }
1219}
1220
1221#[cfg(test)]
1222mod tests {
1223 use std::hash::{BuildHasher, Hasher};
1224
1225 use futures::pin_mut;
1226 use risingwave_common::array::stream_chunk::StreamChunkTestExt;
1227 use risingwave_common::array::{Array, ArrayBuilder, I32ArrayBuilder};
1228 use risingwave_common::util::epoch::test_epoch;
1229 use risingwave_common::util::hash_util::Crc32FastBuilder;
1230 use risingwave_pb::stream_plan::{DispatcherType, PbDispatchOutputMapping};
1231 use tokio::sync::mpsc::unbounded_channel;
1232
1233 use super::*;
1234 use crate::executor::exchange::output::Output;
1235 use crate::executor::exchange::permit::channel_for_test;
1236 use crate::executor::receiver::ReceiverExecutor;
1237 use crate::executor::{BarrierInner as Barrier, MessageInner as Message};
1238 use crate::task::barrier_test_utils::LocalBarrierTestEnv;
1239
1240 #[tokio::test]
1241 async fn test_hash_dispatcher_complex() {
1242 assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);
1244
1245 let num_outputs = 2; let key_indices = &[0, 2];
1247 let (output_tx_vecs, mut output_rx_vecs): (Vec<_>, Vec<_>) =
1248 (0..num_outputs).map(|_| channel_for_test()).collect();
1249 let outputs = output_tx_vecs
1250 .into_iter()
1251 .enumerate()
1252 .map(|(actor_id, tx)| Output::new(ActorId::new(actor_id as u32 + 1), tx))
1253 .collect::<Vec<_>>();
1254 let mut hash_mapping = (1..num_outputs + 1)
1255 .flat_map(|id| vec![ActorId::new(id as u32); VirtualNode::COUNT_FOR_TEST / num_outputs])
1256 .collect_vec();
1257 hash_mapping.resize(
1258 VirtualNode::COUNT_FOR_TEST,
1259 ActorId::new(num_outputs as u32),
1260 );
1261 let mut hash_dispatcher = HashDataDispatcher::new(
1262 outputs,
1263 key_indices.to_vec(),
1264 DispatchOutputMapping::Simple(vec![0, 1, 2]),
1265 hash_mapping,
1266 0,
1267 );
1268
1269 let chunk = StreamChunk::from_pretty(
1270 " I I I
1271 + 4 6 8
1272 + 5 7 9
1273 + 0 0 0
1274 - 1 1 1 D
1275 U- 2 0 2
1276 U+ 2 0 2
1277 U- 3 3 2
1278 U+ 3 3 4",
1279 );
1280 hash_dispatcher.dispatch_data(chunk).await.unwrap();
1281
1282 assert_eq!(
1283 *output_rx_vecs[0].recv().await.unwrap().as_chunk().unwrap(),
1284 StreamChunk::from_pretty(
1285 " I I I
1286 + 4 6 8
1287 + 5 7 9
1288 + 0 0 0
1289 - 1 1 1 D
1290 U- 2 0 2
1291 U+ 2 0 2
1292 - 3 3 2 D // Should rewrite UpdateDelete to Delete
1293 + 3 3 4 // Should rewrite UpdateInsert to Insert",
1294 )
1295 );
1296 assert_eq!(
1297 *output_rx_vecs[1].recv().await.unwrap().as_chunk().unwrap(),
1298 StreamChunk::from_pretty(
1299 " I I I
1300 + 4 6 8 D
1301 + 5 7 9 D
1302 + 0 0 0 D
1303 - 1 1 1 D // Should keep original invisible mark
1304 U- 2 0 2 D // Should keep UpdateDelete
1305 U+ 2 0 2 D // Should keep UpdateInsert
1306 - 3 3 2 // Should rewrite UpdateDelete to Delete
1307 + 3 3 4 D // Should rewrite UpdateInsert to Insert",
1308 )
1309 );
1310 }
1311
/// Drives a `DispatchExecutor` holding one broadcast and one simple
/// dispatcher through two `Update` mutations (one per dispatcher) and checks,
/// after every barrier, which downstream receivers got which messages.
#[tokio::test]
async fn test_configuration_change() {
    // Unused by the rest of the test.
    let _schema = Schema { fields: vec![] };
    let (tx, rx) = channel_for_test();
    let actor_id = 233.into();
    let barrier_test_env = LocalBarrierTestEnv::for_test().await;

    // Downstream actors of the broadcast dispatcher: `untouched` and `old`
    // are there from the start, `new` replaces `old` in the first update.
    let (untouched, old, new) = (234.into(), 235.into(), 238.into());
    // Downstream actors of the simple dispatcher: `new_simple` replaces
    // `old_simple` in the second update.
    let (old_simple, new_simple) = (114.into(), 514.into());

    let broadcast_dispatcher_id = 666;
    let broadcast_dispatcher = PbDispatcher {
        r#type: DispatcherType::Broadcast as _,
        dispatcher_id: broadcast_dispatcher_id,
        downstream_actor_id: vec![untouched, old],
        output_mapping: PbDispatchOutputMapping::identical(0).into(),
        ..Default::default()
    };

    let simple_dispatcher_id = 888;
    let simple_dispatcher = PbDispatcher {
        r#type: DispatcherType::Simple as _,
        dispatcher_id: simple_dispatcher_id,
        downstream_actor_id: vec![old_simple],
        output_mapping: PbDispatchOutputMapping::identical(0).into(),
        ..Default::default()
    };

    // First configuration change: on the broadcast dispatcher, add `new` and
    // remove `old`. It is carried by barrier `b1`.
    let dispatcher_updates = maplit::hashmap! {
        actor_id => vec![PbDispatcherUpdate {
            actor_id,
            dispatcher_id: broadcast_dispatcher_id,
            added_downstream_actor_id: vec![new],
            removed_downstream_actor_id: vec![old],
            hash_mapping: Default::default(),
        }]
    };
    let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
        UpdateMutation {
            dispatchers: dispatcher_updates,
            ..Default::default()
        },
    ));
    barrier_test_env.inject_barrier(&b1, [actor_id]);
    barrier_test_env.flush_all_events().await;

    // The dispatch executor reads its input from `rx`; the test feeds it
    // through `tx`.
    let input = Executor::new(
        Default::default(),
        ReceiverExecutor::for_test(
            actor_id,
            rx,
            barrier_test_env.local_barrier_manager.clone(),
        )
        .boxed(),
    );

    // Queue a local output request for every downstream actor (including the
    // ones added later) and keep the receiving ends keyed by actor id.
    let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
    let mut rxs = [untouched, old, new, old_simple, new_simple]
        .into_iter()
        .map(|id| {
            (id, {
                let (tx, rx) = channel_for_test();
                new_output_request_tx
                    .send((id, NewOutputRequest::Local(tx)))
                    .unwrap();
                rx
            })
        })
        .collect::<HashMap<_, _>>();
    let executor = Box::new(
        DispatchExecutor::new(
            input,
            new_output_request_rx,
            vec![broadcast_dispatcher, simple_dispatcher],
            &ActorContext::for_test(actor_id),
        )
        .await
        .unwrap(),
    )
    .execute();

    pin_mut!(executor);

    // Calls `try_recv` on the receiver registered for the given downstream
    // actor.
    macro_rules! try_recv {
        ($down_id:expr) => {
            rxs.get_mut(&$down_id).unwrap().try_recv()
        };
    }

    // Send a chunk, then the barrier carrying the broadcast update.
    tx.send(Message::Chunk(StreamChunk::default()).into())
        .await
        .unwrap();

    tx.send(Message::Barrier(b1.clone().into_dispatcher()).into())
        .await
        .unwrap();
    executor.next().await.unwrap().unwrap();

    // After `b1`: the original broadcast outputs got the chunk and the
    // barrier.
    try_recv!(untouched).unwrap().as_chunk().unwrap();
    try_recv!(untouched).unwrap().as_barrier_batch().unwrap();

    // `old` is expected to receive the barrier that removes it.
    try_recv!(old).unwrap().as_chunk().unwrap();
    try_recv!(old).unwrap().as_barrier_batch().unwrap();
    // The first message `new` sees is the barrier, not the earlier chunk.
    try_recv!(new).unwrap().as_barrier_batch().unwrap();
    // The simple dispatcher is not part of this update.
    try_recv!(old_simple).unwrap().as_chunk().unwrap();
    try_recv!(old_simple).unwrap().as_barrier_batch().unwrap();

    // Send a plain barrier without mutation.
    let b2 = Barrier::new_test_barrier(test_epoch(2));
    barrier_test_env.inject_barrier(&b2, [actor_id]);
    tx.send(Message::Barrier(b2.into_dispatcher()).into())
        .await
        .unwrap();
    executor.next().await.unwrap().unwrap();

    // After `b2`: `old` receives nothing further, `new` is a regular output.
    try_recv!(untouched).unwrap().as_barrier_batch().unwrap();
    try_recv!(old).unwrap_err();
    try_recv!(new).unwrap().as_barrier_batch().unwrap();

    // `new_simple` has not been added yet, so nothing was delivered to it.
    try_recv!(old_simple).unwrap().as_barrier_batch().unwrap();
    try_recv!(new_simple).unwrap_err();

    // Send another chunk ahead of the second configuration change.
    tx.send(Message::Chunk(StreamChunk::default()).into())
        .await
        .unwrap();

    // Second configuration change: on the simple dispatcher, replace
    // `old_simple` with `new_simple`. It is carried by barrier `b3`.
    let dispatcher_updates = maplit::hashmap! {
        actor_id => vec![PbDispatcherUpdate {
            actor_id,
            dispatcher_id: simple_dispatcher_id,
            added_downstream_actor_id: vec![new_simple],
            removed_downstream_actor_id: vec![old_simple],
            hash_mapping: Default::default(),
        }]
    };
    let b3 = Barrier::new_test_barrier(test_epoch(3)).with_mutation(Mutation::Update(
        UpdateMutation {
            dispatchers: dispatcher_updates,
            ..Default::default()
        },
    ));
    barrier_test_env.inject_barrier(&b3, [actor_id]);
    tx.send(Message::Barrier(b3.into_dispatcher()).into())
        .await
        .unwrap();
    executor.next().await.unwrap().unwrap();

    // After `b3`: `old_simple` got the chunk and the barrier that removes
    // it; the first message `new_simple` sees is the barrier.
    try_recv!(old_simple).unwrap().as_chunk().unwrap();
    try_recv!(old_simple).unwrap().as_barrier_batch().unwrap();
    try_recv!(new_simple).unwrap().as_barrier_batch().unwrap();

    // Send a final plain barrier.
    let b4 = Barrier::new_test_barrier(test_epoch(4));
    barrier_test_env.inject_barrier(&b4, [actor_id]);
    tx.send(Message::Barrier(b4.into_dispatcher()).into())
        .await
        .unwrap();
    executor.next().await.unwrap().unwrap();

    // After `b4`: only `new_simple` still receives messages.
    try_recv!(old_simple).unwrap_err();
    try_recv!(new_simple).unwrap().as_barrier_batch().unwrap();
}
1486
1487 #[tokio::test]
1488 async fn test_hash_dispatcher() {
1489 assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);
1491
1492 let num_outputs = 5; let cardinality = 10;
1494 let dimension = 4;
1495 let key_indices = &[0, 2];
1496 let (output_tx_vecs, output_rx_vecs): (Vec<_>, Vec<_>) =
1497 (0..num_outputs).map(|_| channel_for_test()).collect();
1498 let outputs = output_tx_vecs
1499 .into_iter()
1500 .enumerate()
1501 .map(|(actor_id, tx)| Output::new(ActorId::new(1 + actor_id as u32), tx))
1502 .collect::<Vec<_>>();
1503 let mut hash_mapping = (1..num_outputs + 1)
1504 .flat_map(|id| vec![ActorId::new(id as _); VirtualNode::COUNT_FOR_TEST / num_outputs])
1505 .collect_vec();
1506 hash_mapping.resize(
1507 VirtualNode::COUNT_FOR_TEST,
1508 ActorId::new(num_outputs as u32),
1509 );
1510 let mut hash_dispatcher = HashDataDispatcher::new(
1511 outputs,
1512 key_indices.to_vec(),
1513 DispatchOutputMapping::Simple((0..dimension).collect()),
1514 hash_mapping.clone(),
1515 0,
1516 );
1517
1518 let mut ops = Vec::new();
1519 for idx in 0..cardinality {
1520 if idx % 2 == 0 {
1521 ops.push(Op::Insert);
1522 } else {
1523 ops.push(Op::Delete);
1524 }
1525 }
1526
1527 let mut start = 19260817i32..;
1528 let mut builders = (0..dimension)
1529 .map(|_| I32ArrayBuilder::new(cardinality))
1530 .collect_vec();
1531 let mut output_cols = vec![vec![vec![]; dimension]; num_outputs];
1532 let mut output_ops = vec![vec![]; num_outputs];
1533 for op in &ops {
1534 let hash_builder = Crc32FastBuilder;
1535 let mut hasher = hash_builder.build_hasher();
1536 let one_row = (0..dimension).map(|_| start.next().unwrap()).collect_vec();
1537 for key_idx in key_indices {
1538 let val = one_row[*key_idx];
1539 let bytes = val.to_le_bytes();
1540 hasher.update(&bytes);
1541 }
1542 let output_idx = hash_mapping[hasher.finish() as usize % VirtualNode::COUNT_FOR_TEST]
1543 .as_raw_id() as usize
1544 - 1;
1545 for (builder, val) in builders.iter_mut().zip_eq_fast(one_row.iter()) {
1546 builder.append(Some(*val));
1547 }
1548 output_cols[output_idx]
1549 .iter_mut()
1550 .zip_eq_fast(one_row.iter())
1551 .for_each(|(each_column, val)| each_column.push(*val));
1552 output_ops[output_idx].push(op);
1553 }
1554
1555 let columns = builders
1556 .into_iter()
1557 .map(|builder| {
1558 let array = builder.finish();
1559 array.into_ref()
1560 })
1561 .collect();
1562
1563 let chunk = StreamChunk::new(ops, columns);
1564 hash_dispatcher.dispatch_data(chunk).await.unwrap();
1565
1566 for (output_idx, mut rx) in output_rx_vecs.into_iter().enumerate() {
1567 let mut output = vec![];
1568 while let Some(Some(msg)) = rx.recv().now_or_never() {
1569 output.push(msg);
1570 }
1571 assert!(output.len() <= 1);
1573 if output.is_empty() {
1574 assert!(output_cols[output_idx].iter().all(|x| { x.is_empty() }));
1575 } else {
1576 let message = output.first().unwrap();
1577 let real_chunk = match message {
1578 DispatcherMessageBatch::Chunk(chunk) => chunk,
1579 _ => panic!(),
1580 };
1581 real_chunk
1582 .columns()
1583 .iter()
1584 .zip_eq_fast(output_cols[output_idx].iter())
1585 .for_each(|(real_col, expect_col)| {
1586 let real_vals = real_chunk
1587 .visibility()
1588 .iter_ones()
1589 .map(|row_idx| real_col.as_int32().value_at(row_idx).unwrap())
1590 .collect::<Vec<_>>();
1591 assert_eq!(real_vals.len(), expect_col.len());
1592 assert_eq!(real_vals, *expect_col);
1593 });
1594 }
1595 }
1596 }
1597}