1use std::collections::{HashMap, HashSet};
16use std::future::Future;
17use std::iter::repeat_with;
18use std::ops::{Deref, DerefMut};
19use std::time::Duration;
20
21use futures::{FutureExt, TryStreamExt};
22use itertools::Itertools;
23use risingwave_common::array::Op;
24use risingwave_common::bitmap::BitmapBuilder;
25use risingwave_common::hash::{ActorMapping, ExpandedActorMapping, VirtualNode};
26use risingwave_common::metrics::LabelGuardedIntCounter;
27use risingwave_common::util::iter_util::ZipEqFast;
28use risingwave_pb::stream_plan::PbDispatcher;
29use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
30use smallvec::{SmallVec, smallvec};
31use tokio::time::Instant;
32use tokio_stream::StreamExt;
33use tokio_stream::adapters::Peekable;
34use tracing::{Instrument, event};
35
36use super::exchange::output::{BoxedOutput, new_output};
37use super::{
38 AddMutation, DispatcherBarriers, DispatcherMessageBatch, MessageBatch, TroublemakerExecutor,
39 UpdateMutation,
40};
41use crate::executor::StreamConsumer;
42use crate::executor::prelude::*;
43use crate::task::{DispatcherId, SharedContext};
44
/// Stream consumer that pulls messages from `input` and forwards them to the
/// dispatchers held by `inner`.
pub struct DispatchExecutor {
    /// Upstream executor whose message stream is consumed in `execute`.
    input: Executor,
    /// Dispatchers together with the context and metrics used to drive them.
    inner: DispatchExecutorInner,
}
52
/// A [`DispatcherImpl`] paired with the counter that accumulates the time spent
/// inside its dispatch calls.
struct DispatcherWithMetrics {
    /// The wrapped dispatcher; reachable through `Deref` / `DerefMut`.
    dispatcher: DispatcherImpl,
    /// Nanosecond counter labelled with actor id, fragment id and dispatcher id.
    actor_output_buffer_blocking_duration_ns: LabelGuardedIntCounter<3>,
}
57
58impl DispatcherWithMetrics {
59 pub fn record_output_buffer_blocking_duration(&self, duration: Duration) {
60 let ns = duration.as_nanos() as u64;
61 self.actor_output_buffer_blocking_duration_ns.inc_by(ns);
62 }
63}
64
65impl Debug for DispatcherWithMetrics {
66 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
67 self.dispatcher.fmt(f)
68 }
69}
70
71impl Deref for DispatcherWithMetrics {
72 type Target = DispatcherImpl;
73
74 fn deref(&self) -> &Self::Target {
75 &self.dispatcher
76 }
77}
78
79impl DerefMut for DispatcherWithMetrics {
80 fn deref_mut(&mut self) -> &mut Self::Target {
81 &mut self.dispatcher
82 }
83}
84
/// Metric handles and label strings shared by all dispatchers of one actor.
struct DispatchExecutorMetrics {
    /// Actor id rendered as a string, used as a metric label value.
    actor_id_str: String,
    /// Fragment id rendered as a string, used as a metric label value.
    fragment_id_str: String,
    /// The streaming metrics registry from which labelled counters are created.
    metrics: Arc<StreamingMetrics>,
    /// Counter increased by the cardinality of every dispatched chunk.
    actor_out_record_cnt: LabelGuardedIntCounter<2>,
}
91
92impl DispatchExecutorMetrics {
93 fn monitor_dispatcher(&self, dispatcher: DispatcherImpl) -> DispatcherWithMetrics {
94 DispatcherWithMetrics {
95 actor_output_buffer_blocking_duration_ns: self
96 .metrics
97 .actor_output_buffer_blocking_duration_ns
98 .with_guarded_label_values(&[
99 &self.actor_id_str,
100 &self.fragment_id_str,
101 dispatcher.dispatcher_id_str(),
102 ]),
103 dispatcher,
104 }
105 }
106}
107
/// State needed to dispatch messages: the dispatchers themselves plus the
/// identity, shared context and metrics of the owning actor.
struct DispatchExecutorInner {
    /// All dispatchers of this actor; dispatchers left without outputs are
    /// dropped after a mutation has been applied.
    dispatchers: Vec<DispatcherWithMetrics>,
    /// Id of the actor that owns these dispatchers.
    actor_id: u32,
    /// Shared context, used to create outputs and to read configuration.
    context: Arc<SharedContext>,
    /// Metric handles for this actor.
    metrics: DispatchExecutorMetrics,
}
114
115impl DispatchExecutorInner {
116 async fn dispatch(&mut self, msg: MessageBatch) -> StreamResult<()> {
117 let limit = (self.context.config.developer).exchange_concurrent_dispatchers;
118 match msg {
120 MessageBatch::BarrierBatch(barrier_batch) => {
121 if barrier_batch.is_empty() {
122 return Ok(());
123 }
124 let mutation = barrier_batch[0].mutation.clone();
126 self.pre_mutate_dispatchers(&mutation)?;
127 futures::stream::iter(self.dispatchers.iter_mut())
128 .map(Ok)
129 .try_for_each_concurrent(limit, |dispatcher| async {
130 let start_time = Instant::now();
131 dispatcher
132 .dispatch_barriers(
133 barrier_batch
134 .iter()
135 .cloned()
136 .map(|b| b.into_dispatcher())
137 .collect(),
138 )
139 .await?;
140 dispatcher.record_output_buffer_blocking_duration(start_time.elapsed());
141 StreamResult::Ok(())
142 })
143 .await?;
144 self.post_mutate_dispatchers(&mutation)?;
145 }
146 MessageBatch::Watermark(watermark) => {
147 futures::stream::iter(self.dispatchers.iter_mut())
148 .map(Ok)
149 .try_for_each_concurrent(limit, |dispatcher| async {
150 let start_time = Instant::now();
151 dispatcher.dispatch_watermark(watermark.clone()).await?;
152 dispatcher.record_output_buffer_blocking_duration(start_time.elapsed());
153 StreamResult::Ok(())
154 })
155 .await?;
156 }
157 MessageBatch::Chunk(chunk) => {
158 futures::stream::iter(self.dispatchers.iter_mut())
159 .map(Ok)
160 .try_for_each_concurrent(limit, |dispatcher| async {
161 let start_time = Instant::now();
162 dispatcher.dispatch_data(chunk.clone()).await?;
163 dispatcher.record_output_buffer_blocking_duration(start_time.elapsed());
164 StreamResult::Ok(())
165 })
166 .await?;
167
168 self.metrics
169 .actor_out_record_cnt
170 .inc_by(chunk.cardinality() as _);
171 }
172 }
173 Ok(())
174 }
175
176 fn add_dispatchers<'a>(
178 &mut self,
179 new_dispatchers: impl IntoIterator<Item = &'a PbDispatcher>,
180 ) -> StreamResult<()> {
181 let new_dispatchers: Vec<_> = new_dispatchers
182 .into_iter()
183 .map(|d| {
184 DispatcherImpl::new(&self.context, self.actor_id, d)
185 .map(|dispatcher| self.metrics.monitor_dispatcher(dispatcher))
186 })
187 .try_collect()?;
188
189 self.dispatchers.extend(new_dispatchers);
190
191 assert!(
192 self.dispatchers
193 .iter()
194 .map(|d| d.dispatcher_id())
195 .all_unique(),
196 "dispatcher ids must be unique: {:?}",
197 self.dispatchers
198 );
199
200 Ok(())
201 }
202
203 fn find_dispatcher(&mut self, dispatcher_id: DispatcherId) -> &mut DispatcherImpl {
204 self.dispatchers
205 .iter_mut()
206 .find(|d| d.dispatcher_id() == dispatcher_id)
207 .unwrap_or_else(|| panic!("dispatcher {}:{} not found", self.actor_id, dispatcher_id))
208 }
209
210 fn pre_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
213 let outputs: Vec<_> = update
214 .added_downstream_actor_id
215 .iter()
216 .map(|&id| new_output(&self.context, self.actor_id, id))
217 .try_collect()?;
218
219 let dispatcher = self.find_dispatcher(update.dispatcher_id);
220 dispatcher.add_outputs(outputs);
221
222 Ok(())
223 }
224
225 fn post_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
228 let ids = update.removed_downstream_actor_id.iter().copied().collect();
229
230 let dispatcher = self.find_dispatcher(update.dispatcher_id);
231 dispatcher.remove_outputs(&ids);
232
233 if let DispatcherImpl::Hash(dispatcher) = dispatcher {
240 dispatcher.hash_mapping =
241 ActorMapping::from_protobuf(update.get_hash_mapping()?).to_expanded();
242 }
243
244 Ok(())
245 }
246
247 fn pre_mutate_dispatchers(&mut self, mutation: &Option<Arc<Mutation>>) -> StreamResult<()> {
249 let Some(mutation) = mutation.as_deref() else {
250 return Ok(());
251 };
252
253 match mutation {
254 Mutation::Add(AddMutation { adds, .. }) => {
255 if let Some(new_dispatchers) = adds.get(&self.actor_id) {
256 self.add_dispatchers(new_dispatchers)?;
257 }
258 }
259 Mutation::Update(UpdateMutation {
260 dispatchers,
261 actor_new_dispatchers: actor_dispatchers,
262 ..
263 }) => {
264 if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
265 self.add_dispatchers(new_dispatchers)?;
266 }
267
268 if let Some(updates) = dispatchers.get(&self.actor_id) {
269 for update in updates {
270 self.pre_update_dispatcher(update)?;
271 }
272 }
273 }
274 Mutation::AddAndUpdate(
275 AddMutation { adds, .. },
276 UpdateMutation {
277 dispatchers,
278 actor_new_dispatchers: actor_dispatchers,
279 ..
280 },
281 ) => {
282 if let Some(new_dispatchers) = adds.get(&self.actor_id) {
283 self.add_dispatchers(new_dispatchers)?;
284 }
285
286 if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
287 self.add_dispatchers(new_dispatchers)?;
288 }
289
290 if let Some(updates) = dispatchers.get(&self.actor_id) {
291 for update in updates {
292 self.pre_update_dispatcher(update)?;
293 }
294 }
295 }
296 _ => {}
297 }
298
299 Ok(())
300 }
301
302 fn post_mutate_dispatchers(&mut self, mutation: &Option<Arc<Mutation>>) -> StreamResult<()> {
304 let Some(mutation) = mutation.as_deref() else {
305 return Ok(());
306 };
307
308 match mutation {
309 Mutation::Stop(stops) => {
310 if !stops.contains(&self.actor_id) {
312 for dispatcher in &mut self.dispatchers {
313 dispatcher.remove_outputs(stops);
314 }
315 }
316 }
317 Mutation::Update(UpdateMutation {
318 dispatchers,
319 dropped_actors,
320 ..
321 })
322 | Mutation::AddAndUpdate(
323 _,
324 UpdateMutation {
325 dispatchers,
326 dropped_actors,
327 ..
328 },
329 ) => {
330 if let Some(updates) = dispatchers.get(&self.actor_id) {
331 for update in updates {
332 self.post_update_dispatcher(update)?;
333 }
334 }
335
336 if !dropped_actors.contains(&self.actor_id) {
337 for dispatcher in &mut self.dispatchers {
338 dispatcher.remove_outputs(dropped_actors);
339 }
340 }
341 }
342 _ => {}
343 };
344
345 self.dispatchers.retain(|d| !d.is_empty());
348
349 Ok(())
350 }
351}
352
353impl DispatchExecutor {
354 pub fn new(
355 mut input: Executor,
356 dispatchers: Vec<DispatcherImpl>,
357 actor_id: u32,
358 fragment_id: u32,
359 context: Arc<SharedContext>,
360 metrics: Arc<StreamingMetrics>,
361 chunk_size: usize,
362 ) -> Self {
363 if crate::consistency::insane() {
364 let mut info = input.info().clone();
366 info.identity = format!("{} (embedded trouble)", info.identity);
367 let troublemaker = TroublemakerExecutor::new(input, chunk_size);
368 input = (info, troublemaker).into();
369 }
370
371 let actor_id_str = actor_id.to_string();
372 let fragment_id_str = fragment_id.to_string();
373 let actor_out_record_cnt = metrics
374 .actor_out_record_cnt
375 .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
376 let metrics = DispatchExecutorMetrics {
377 actor_id_str,
378 fragment_id_str,
379 metrics,
380 actor_out_record_cnt,
381 };
382 let dispatchers = dispatchers
383 .into_iter()
384 .map(|dispatcher| metrics.monitor_dispatcher(dispatcher))
385 .collect();
386 Self {
387 input,
388 inner: DispatchExecutorInner {
389 dispatchers,
390 actor_id,
391 context,
392 metrics,
393 },
394 }
395 }
396}
397
398impl StreamConsumer for DispatchExecutor {
399 type BarrierStream = impl Stream<Item = StreamResult<Barrier>> + Send;
400
401 fn execute(mut self: Box<Self>) -> Self::BarrierStream {
402 let max_barrier_count_per_batch =
403 self.inner.context.config.developer.max_barrier_batch_size;
404 #[try_stream]
405 async move {
406 let mut input = self.input.execute().peekable();
407 loop {
408 let Some(message) =
409 try_batch_barriers(max_barrier_count_per_batch, &mut input).await?
410 else {
411 break;
413 };
414 match message {
415 chunk @ MessageBatch::Chunk(_) => {
416 self.inner
417 .dispatch(chunk)
418 .instrument(tracing::info_span!("dispatch_chunk"))
419 .instrument_await("dispatch_chunk")
420 .await?;
421 }
422 MessageBatch::BarrierBatch(barrier_batch) => {
423 assert!(!barrier_batch.is_empty());
424 self.inner
425 .dispatch(MessageBatch::BarrierBatch(barrier_batch.clone()))
426 .instrument(tracing::info_span!("dispatch_barrier_batch"))
427 .instrument_await("dispatch_barrier_batch")
428 .await?;
429 self.inner
430 .metrics
431 .metrics
432 .barrier_batch_size
433 .observe(barrier_batch.len() as f64);
434 for barrier in barrier_batch {
435 yield barrier;
436 }
437 }
438 watermark @ MessageBatch::Watermark(_) => {
439 self.inner
440 .dispatch(watermark)
441 .instrument(tracing::info_span!("dispatch_watermark"))
442 .instrument_await("dispatch_watermark")
443 .await?;
444 }
445 }
446 }
447 }
448 }
449}
450
451async fn try_batch_barriers(
457 max_barrier_count_per_batch: u32,
458 input: &mut Peekable<BoxedMessageStream>,
459) -> StreamResult<Option<MessageBatch>> {
460 let Some(msg) = input.next().await else {
461 return Ok(None);
463 };
464 let mut barrier_batch = vec![];
465 let msg: Message = msg?;
466 let max_peek_attempts = match msg {
467 Message::Chunk(c) => {
468 return Ok(Some(MessageBatch::Chunk(c)));
469 }
470 Message::Watermark(w) => {
471 return Ok(Some(MessageBatch::Watermark(w)));
472 }
473 Message::Barrier(b) => {
474 let peek_more_barrier = b.mutation.is_none();
475 barrier_batch.push(b);
476 if peek_more_barrier {
477 max_barrier_count_per_batch.saturating_sub(1)
478 } else {
479 0
480 }
481 }
482 };
483 for _ in 0..max_peek_attempts {
485 let peek = input.peek().now_or_never();
486 let Some(peek) = peek else {
487 break;
488 };
489 let Some(msg) = peek else {
490 break;
492 };
493 let Ok(Message::Barrier(barrier)) = msg else {
494 break;
495 };
496 if barrier.mutation.is_some() {
497 break;
498 }
499 let msg: Message = input.next().now_or_never().unwrap().unwrap()?;
500 let Message::Barrier(ref barrier) = msg else {
501 unreachable!("must be a barrier");
502 };
503 barrier_batch.push(barrier.clone());
504 }
505 Ok(Some(MessageBatch::BarrierBatch(barrier_batch)))
506}
507
/// The concrete dispatcher kinds; the forwarding methods are generated by the
/// `impl_dispatcher` macro.
#[derive(Debug)]
pub enum DispatcherImpl {
    /// Routes each row to the output that owns the row's virtual node.
    Hash(HashDataDispatcher),
    /// Sends every message to all outputs.
    Broadcast(BroadcastDispatcher),
    /// Sends data to its single output.
    Simple(SimpleDispatcher),
    /// Sends each chunk to one output, cycling through the outputs.
    RoundRobin(RoundRobinDataDispatcher),
}
515
516impl DispatcherImpl {
517 pub fn new(
518 context: &SharedContext,
519 actor_id: ActorId,
520 dispatcher: &PbDispatcher,
521 ) -> StreamResult<Self> {
522 let outputs = dispatcher
523 .downstream_actor_id
524 .iter()
525 .map(|&down_id| new_output(context, actor_id, down_id))
526 .collect::<StreamResult<Vec<_>>>()?;
527
528 let output_indices = dispatcher
529 .output_indices
530 .iter()
531 .map(|&i| i as usize)
532 .collect_vec();
533
534 use risingwave_pb::stream_plan::DispatcherType::*;
535 let dispatcher_impl = match dispatcher.get_type()? {
536 Hash => {
537 assert!(!outputs.is_empty());
538 let dist_key_indices = dispatcher
539 .dist_key_indices
540 .iter()
541 .map(|i| *i as usize)
542 .collect();
543
544 let hash_mapping =
545 ActorMapping::from_protobuf(dispatcher.get_hash_mapping()?).to_expanded();
546
547 DispatcherImpl::Hash(HashDataDispatcher::new(
548 outputs,
549 dist_key_indices,
550 output_indices,
551 hash_mapping,
552 dispatcher.dispatcher_id,
553 ))
554 }
555 Broadcast => DispatcherImpl::Broadcast(BroadcastDispatcher::new(
556 outputs,
557 output_indices,
558 dispatcher.dispatcher_id,
559 )),
560 Simple | NoShuffle => {
561 let [output]: [_; 1] = outputs.try_into().unwrap();
562 DispatcherImpl::Simple(SimpleDispatcher::new(
563 output,
564 output_indices,
565 dispatcher.dispatcher_id,
566 ))
567 }
568 Unspecified => unreachable!(),
569 };
570
571 Ok(dispatcher_impl)
572 }
573}
574
// Generates the inherent methods of `DispatcherImpl`. Each generated method
// matches on the enum variant and forwards the call to the method of the same
// name on the wrapped dispatcher.
//
// The macro takes a comma-separated list of `{ VariantName }` entries.
macro_rules! impl_dispatcher {
    ($( { $variant_name:ident } ),*) => {
        impl DispatcherImpl {
            // Forwards a data chunk to the wrapped dispatcher.
            pub async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_data(chunk).await, )*
                }
            }

            // Forwards a batch of barriers to the wrapped dispatcher.
            pub async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_barriers(barriers).await, )*
                }
            }

            // Forwards a watermark to the wrapped dispatcher.
            pub async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
                match self {
                    $( Self::$variant_name(inner) => inner.dispatch_watermark(watermark).await, )*
                }
            }

            // Attaches additional outputs to the wrapped dispatcher.
            pub fn add_outputs(&mut self, outputs: impl IntoIterator<Item = BoxedOutput>) {
                match self {
                    $(Self::$variant_name(inner) => inner.add_outputs(outputs), )*
                }
            }

            // Detaches the outputs whose actor id is contained in `actor_ids`.
            pub fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
                match self {
                    $(Self::$variant_name(inner) => inner.remove_outputs(actor_ids), )*
                }
            }

            // Returns the id of the wrapped dispatcher.
            pub fn dispatcher_id(&self) -> DispatcherId {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id(), )*
                }
            }

            // Returns the id of the wrapped dispatcher as a string slice.
            pub fn dispatcher_id_str(&self) -> &str {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id_str(), )*
                }
            }

            // Returns whether the wrapped dispatcher has no outputs left.
            pub fn is_empty(&self) -> bool {
                match self {
                    $(Self::$variant_name(inner) => inner.is_empty(), )*
                }
            }
        }
    }
}
628
// Invokes the macro named `$macro` with the list of all `DispatcherImpl`
// variants, each wrapped in braces.
macro_rules! for_all_dispatcher_variants {
    ($macro:ident) => {
        $macro! {
            { Hash },
            { Broadcast },
            { Simple },
            { RoundRobin }
        }
    };
}

// Generate the forwarding methods of `DispatcherImpl` for every variant.
for_all_dispatcher_variants! { impl_dispatcher }
641
/// Alias for a `Send` future that resolves to `StreamResult<()>`; used as the
/// return type of the dispatch methods of [`Dispatcher`].
pub trait DispatchFuture<'a> = Future<Output = StreamResult<()>> + Send;
643
/// Common interface of all dispatcher kinds wrapped by `DispatcherImpl`.
pub trait Dispatcher: Debug + 'static {
    /// Dispatches a data chunk to the outputs.
    fn dispatch_data(&mut self, chunk: StreamChunk) -> impl DispatchFuture<'_>;
    /// Dispatches a batch of barriers to the outputs.
    fn dispatch_barriers(&mut self, barrier: DispatcherBarriers) -> impl DispatchFuture<'_>;
    /// Dispatches a watermark to the outputs.
    fn dispatch_watermark(&mut self, watermark: Watermark) -> impl DispatchFuture<'_>;

    /// Attaches additional outputs to this dispatcher.
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = BoxedOutput>);
    /// Detaches the outputs whose actor id is contained in `actor_ids`.
    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>);

    /// Returns the id of this dispatcher.
    fn dispatcher_id(&self) -> DispatcherId;

    /// Returns the id of this dispatcher as a string slice (used as a metric
    /// label value by `DispatchExecutorMetrics::monitor_dispatcher`).
    fn dispatcher_id_str(&self) -> &str;

    /// Returns whether this dispatcher has no outputs left; such dispatchers
    /// are removed in `DispatchExecutorInner::post_mutate_dispatchers`.
    fn is_empty(&self) -> bool;
}
671
672async fn broadcast_concurrent(
677 outputs: impl IntoIterator<Item = &'_ mut BoxedOutput>,
678 message: DispatcherMessageBatch,
679) -> StreamResult<()> {
680 futures::future::try_join_all(
681 outputs
682 .into_iter()
683 .map(|output| output.send(message.clone())),
684 )
685 .await?;
686 Ok(())
687}
688
/// Dispatcher that sends each data chunk to a single output, cycling through
/// the outputs; barriers and watermarks go to all outputs.
#[derive(Debug)]
pub struct RoundRobinDataDispatcher {
    /// Downstream outputs.
    outputs: Vec<BoxedOutput>,
    /// Column indices used to project chunks and transform watermarks.
    output_indices: Vec<usize>,
    /// Index into `outputs` of the output that receives the next chunk.
    cur: usize,
    /// Id of this dispatcher.
    dispatcher_id: DispatcherId,
    /// `dispatcher_id` rendered as a string.
    dispatcher_id_str: String,
}
697
698impl RoundRobinDataDispatcher {
699 pub fn new(
700 outputs: Vec<BoxedOutput>,
701 output_indices: Vec<usize>,
702 dispatcher_id: DispatcherId,
703 ) -> Self {
704 Self {
705 outputs,
706 output_indices,
707 cur: 0,
708 dispatcher_id,
709 dispatcher_id_str: dispatcher_id.to_string(),
710 }
711 }
712}
713
714impl Dispatcher for RoundRobinDataDispatcher {
715 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
716 let chunk = if self.output_indices.len() < chunk.columns().len() {
717 chunk
718 .project(&self.output_indices)
719 .eliminate_adjacent_noop_update()
720 } else {
721 chunk.project(&self.output_indices)
722 };
723
724 self.outputs[self.cur]
725 .send(DispatcherMessageBatch::Chunk(chunk))
726 .await?;
727 self.cur += 1;
728 self.cur %= self.outputs.len();
729 Ok(())
730 }
731
732 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
733 broadcast_concurrent(
735 &mut self.outputs,
736 DispatcherMessageBatch::BarrierBatch(barriers),
737 )
738 .await
739 }
740
741 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
742 if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) {
743 broadcast_concurrent(
745 &mut self.outputs,
746 DispatcherMessageBatch::Watermark(watermark),
747 )
748 .await?;
749 }
750 Ok(())
751 }
752
753 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = BoxedOutput>) {
754 self.outputs.extend(outputs);
755 }
756
757 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
758 self.outputs
759 .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
760 .count();
761 self.cur = self.cur.min(self.outputs.len() - 1);
762 }
763
764 fn dispatcher_id(&self) -> DispatcherId {
765 self.dispatcher_id
766 }
767
768 fn dispatcher_id_str(&self) -> &str {
769 &self.dispatcher_id_str
770 }
771
772 fn is_empty(&self) -> bool {
773 self.outputs.is_empty()
774 }
775}
776
/// Dispatcher that routes every row to the output owning the row's virtual
/// node; barriers and watermarks go to all outputs.
pub struct HashDataDispatcher {
    /// Downstream outputs.
    outputs: Vec<BoxedOutput>,
    /// Indices of the columns from which the virtual node of a row is computed.
    keys: Vec<usize>,
    /// Column indices used to project chunks and transform watermarks.
    output_indices: Vec<usize>,
    /// Maps a virtual node index to the id of the downstream actor owning it.
    hash_mapping: ExpandedActorMapping,
    /// Id of this dispatcher.
    dispatcher_id: DispatcherId,
    /// `dispatcher_id` rendered as a string.
    dispatcher_id_str: String,
}
787
788impl Debug for HashDataDispatcher {
789 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
790 f.debug_struct("HashDataDispatcher")
791 .field("outputs", &self.outputs)
792 .field("keys", &self.keys)
793 .field("dispatcher_id", &self.dispatcher_id)
794 .finish_non_exhaustive()
795 }
796}
797
798impl HashDataDispatcher {
799 pub fn new(
800 outputs: Vec<BoxedOutput>,
801 keys: Vec<usize>,
802 output_indices: Vec<usize>,
803 hash_mapping: ExpandedActorMapping,
804 dispatcher_id: DispatcherId,
805 ) -> Self {
806 Self {
807 outputs,
808 keys,
809 output_indices,
810 hash_mapping,
811 dispatcher_id,
812 dispatcher_id_str: dispatcher_id.to_string(),
813 }
814 }
815}
816
817impl Dispatcher for HashDataDispatcher {
818 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = BoxedOutput>) {
819 self.outputs.extend(outputs);
820 }
821
822 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
823 broadcast_concurrent(
825 &mut self.outputs,
826 DispatcherMessageBatch::BarrierBatch(barriers),
827 )
828 .await
829 }
830
831 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
832 if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) {
833 broadcast_concurrent(
835 &mut self.outputs,
836 DispatcherMessageBatch::Watermark(watermark),
837 )
838 .await?;
839 }
840 Ok(())
841 }
842
843 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
844 let num_outputs = self.outputs.len();
848
849 let vnode_count = self.hash_mapping.len();
851 let vnodes = VirtualNode::compute_chunk(chunk.data_chunk(), &self.keys, vnode_count);
852
853 tracing::debug!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty(), self.keys, vnodes);
854
855 let mut vis_maps = repeat_with(|| BitmapBuilder::with_capacity(chunk.capacity()))
856 .take(num_outputs)
857 .collect_vec();
858 let mut last_vnode_when_update_delete = None;
859 let mut new_ops: Vec<Op> = Vec::with_capacity(chunk.capacity());
860
861 let chunk = if self.output_indices.len() < chunk.columns().len() {
863 chunk
864 .project(&self.output_indices)
865 .eliminate_adjacent_noop_update()
866 } else {
867 chunk.project(&self.output_indices)
868 };
869
870 for ((vnode, &op), visible) in vnodes
871 .iter()
872 .copied()
873 .zip_eq_fast(chunk.ops())
874 .zip_eq_fast(chunk.visibility().iter())
875 {
876 for (output, vis_map) in self.outputs.iter().zip_eq_fast(vis_maps.iter_mut()) {
878 vis_map.append(visible && self.hash_mapping[vnode.to_index()] == output.actor_id());
879 }
880
881 if !visible {
882 assert!(
883 last_vnode_when_update_delete.is_none(),
884 "invisible row between U- and U+, op = {op:?}",
885 );
886 new_ops.push(op);
887 continue;
888 }
889
890 if op == Op::UpdateDelete {
894 last_vnode_when_update_delete = Some(vnode);
895 } else if op == Op::UpdateInsert {
896 if vnode
897 != last_vnode_when_update_delete
898 .take()
899 .expect("missing U- before U+")
900 {
901 new_ops.push(Op::Delete);
902 new_ops.push(Op::Insert);
903 } else {
904 new_ops.push(Op::UpdateDelete);
905 new_ops.push(Op::UpdateInsert);
906 }
907 } else {
908 new_ops.push(op);
909 }
910 }
911 assert!(
912 last_vnode_when_update_delete.is_none(),
913 "missing U+ after U-"
914 );
915
916 let ops = new_ops;
917
918 futures::future::try_join_all(
920 vis_maps
921 .into_iter()
922 .zip_eq_fast(self.outputs.iter_mut())
923 .map(|(vis_map, output)| async {
924 let vis_map = vis_map.finish();
925 let new_stream_chunk =
927 StreamChunk::with_visibility(ops.clone(), chunk.columns().into(), vis_map);
928 if new_stream_chunk.cardinality() > 0 {
929 event!(
930 tracing::Level::TRACE,
931 msg = "chunk",
932 downstream = output.actor_id(),
933 "send = \n{:#?}",
934 new_stream_chunk
935 );
936 output
937 .send(DispatcherMessageBatch::Chunk(new_stream_chunk))
938 .await?;
939 }
940 StreamResult::Ok(())
941 }),
942 )
943 .await?;
944
945 Ok(())
946 }
947
948 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
949 self.outputs
950 .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
951 .count();
952 }
953
954 fn dispatcher_id(&self) -> DispatcherId {
955 self.dispatcher_id
956 }
957
958 fn dispatcher_id_str(&self) -> &str {
959 &self.dispatcher_id_str
960 }
961
962 fn is_empty(&self) -> bool {
963 self.outputs.is_empty()
964 }
965}
966
/// Dispatcher that sends every message to all of its outputs.
#[derive(Debug)]
pub struct BroadcastDispatcher {
    /// Downstream outputs, keyed by the id of the downstream actor.
    outputs: HashMap<ActorId, BoxedOutput>,
    /// Column indices used to project chunks and transform watermarks.
    output_indices: Vec<usize>,
    /// Id of this dispatcher.
    dispatcher_id: DispatcherId,
    /// `dispatcher_id` rendered as a string.
    dispatcher_id_str: String,
}
975
976impl BroadcastDispatcher {
977 pub fn new(
978 outputs: impl IntoIterator<Item = BoxedOutput>,
979 output_indices: Vec<usize>,
980 dispatcher_id: DispatcherId,
981 ) -> Self {
982 Self {
983 outputs: Self::into_pairs(outputs).collect(),
984 output_indices,
985 dispatcher_id,
986 dispatcher_id_str: dispatcher_id.to_string(),
987 }
988 }
989
990 fn into_pairs(
991 outputs: impl IntoIterator<Item = BoxedOutput>,
992 ) -> impl Iterator<Item = (ActorId, BoxedOutput)> {
993 outputs
994 .into_iter()
995 .map(|output| (output.actor_id(), output))
996 }
997}
998
999impl Dispatcher for BroadcastDispatcher {
1000 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
1001 let chunk = if self.output_indices.len() < chunk.columns().len() {
1002 chunk
1003 .project(&self.output_indices)
1004 .eliminate_adjacent_noop_update()
1005 } else {
1006 chunk.project(&self.output_indices)
1007 };
1008 broadcast_concurrent(
1009 self.outputs.values_mut(),
1010 DispatcherMessageBatch::Chunk(chunk),
1011 )
1012 .await
1013 }
1014
1015 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
1016 broadcast_concurrent(
1018 self.outputs.values_mut(),
1019 DispatcherMessageBatch::BarrierBatch(barriers),
1020 )
1021 .await
1022 }
1023
1024 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
1025 if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) {
1026 broadcast_concurrent(
1028 self.outputs.values_mut(),
1029 DispatcherMessageBatch::Watermark(watermark),
1030 )
1031 .await?;
1032 }
1033 Ok(())
1034 }
1035
1036 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = BoxedOutput>) {
1037 self.outputs.extend(Self::into_pairs(outputs));
1038 }
1039
1040 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
1041 self.outputs
1042 .extract_if(|actor_id, _| actor_ids.contains(actor_id))
1043 .count();
1044 }
1045
1046 fn dispatcher_id(&self) -> DispatcherId {
1047 self.dispatcher_id
1048 }
1049
1050 fn dispatcher_id_str(&self) -> &str {
1051 &self.dispatcher_id_str
1052 }
1053
1054 fn is_empty(&self) -> bool {
1055 self.outputs.is_empty()
1056 }
1057}
1058
/// Dispatcher that sends data to a single output.
#[derive(Debug)]
pub struct SimpleDispatcher {
    /// The output(s). `add_outputs` asserts that there are never more than two,
    /// and `dispatch_data` / `dispatch_watermark` expect exactly one, while
    /// `dispatch_barriers` sends to every output present.
    output: SmallVec<[BoxedOutput; 2]>,
    /// Column indices used to project chunks and transform watermarks.
    output_indices: Vec<usize>,
    /// Id of this dispatcher.
    dispatcher_id: DispatcherId,
    /// `dispatcher_id` rendered as a string.
    dispatcher_id_str: String,
}
1080
1081impl SimpleDispatcher {
1082 pub fn new(
1083 output: BoxedOutput,
1084 output_indices: Vec<usize>,
1085 dispatcher_id: DispatcherId,
1086 ) -> Self {
1087 Self {
1088 output: smallvec![output],
1089 output_indices,
1090 dispatcher_id,
1091 dispatcher_id_str: dispatcher_id.to_string(),
1092 }
1093 }
1094}
1095
1096impl Dispatcher for SimpleDispatcher {
1097 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = BoxedOutput>) {
1098 self.output.extend(outputs);
1099 assert!(self.output.len() <= 2);
1100 }
1101
1102 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
1103 for output in &mut self.output {
1105 output
1106 .send(DispatcherMessageBatch::BarrierBatch(barriers.clone()))
1107 .await?;
1108 }
1109 Ok(())
1110 }
1111
1112 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
1113 let output = self
1114 .output
1115 .iter_mut()
1116 .exactly_one()
1117 .expect("expect exactly one output");
1118
1119 let chunk = if self.output_indices.len() < chunk.columns().len() {
1120 chunk
1121 .project(&self.output_indices)
1122 .eliminate_adjacent_noop_update()
1123 } else {
1124 chunk.project(&self.output_indices)
1125 };
1126 output.send(DispatcherMessageBatch::Chunk(chunk)).await
1127 }
1128
1129 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
1130 let output = self
1131 .output
1132 .iter_mut()
1133 .exactly_one()
1134 .expect("expect exactly one output");
1135
1136 if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) {
1137 output
1138 .send(DispatcherMessageBatch::Watermark(watermark))
1139 .await?;
1140 }
1141 Ok(())
1142 }
1143
1144 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
1145 self.output
1146 .retain(|output| !actor_ids.contains(&output.actor_id()));
1147 }
1148
1149 fn dispatcher_id(&self) -> DispatcherId {
1150 self.dispatcher_id
1151 }
1152
1153 fn dispatcher_id_str(&self) -> &str {
1154 &self.dispatcher_id_str
1155 }
1156
1157 fn is_empty(&self) -> bool {
1158 self.output.is_empty()
1159 }
1160}
1161
1162#[cfg(test)]
1163mod tests {
1164 use std::hash::{BuildHasher, Hasher};
1165 use std::sync::Mutex;
1166
1167 use async_trait::async_trait;
1168 use futures::pin_mut;
1169 use risingwave_common::array::stream_chunk::StreamChunkTestExt;
1170 use risingwave_common::array::{Array, ArrayBuilder, I32ArrayBuilder};
1171 use risingwave_common::config;
1172 use risingwave_common::util::epoch::test_epoch;
1173 use risingwave_common::util::hash_util::Crc32FastBuilder;
1174 use risingwave_pb::stream_plan::DispatcherType;
1175
1176 use super::*;
1177 use crate::executor::exchange::output::Output;
1178 use crate::executor::exchange::permit::channel_for_test;
1179 use crate::executor::receiver::ReceiverExecutor;
1180 use crate::executor::{BarrierInner as Barrier, MessageInner as Message};
1181 use crate::task::barrier_test_utils::LocalBarrierTestEnv;
1182 use crate::task::test_utils::helper_make_local_actor;
1183
    /// Test output that records every message it is sent.
    #[derive(Debug)]
    pub struct MockOutput {
        /// Actor id reported by `actor_id()`.
        actor_id: ActorId,
        /// Shared buffer that collects the messages passed to `send`.
        data: Arc<Mutex<Vec<DispatcherMessageBatch>>>,
    }
1189
1190 impl MockOutput {
1191 pub fn new(actor_id: ActorId, data: Arc<Mutex<Vec<DispatcherMessageBatch>>>) -> Self {
1192 Self { actor_id, data }
1193 }
1194 }
1195
1196 #[async_trait]
1197 impl Output for MockOutput {
1198 async fn send(&mut self, message: DispatcherMessageBatch) -> StreamResult<()> {
1199 self.data.lock().unwrap().push(message);
1200 Ok(())
1201 }
1202
1203 fn actor_id(&self) -> ActorId {
1204 self.actor_id
1205 }
1206 }
1207
1208 #[tokio::test]
1211 async fn test_hash_dispatcher_complex() {
1212 test_hash_dispatcher_complex_inner().await
1213 }
1214
    /// Dispatches one chunk through a two-output hash dispatcher and checks the
    /// chunk received by each output: same rows and ops, different visibility,
    /// and an update pair split across the outputs rewritten to `-` / `+`.
    async fn test_hash_dispatcher_complex_inner() {
        // The expected routing below relies on this vnode count.
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let num_outputs = 2;
        // Columns 0 and 2 form the distribution key.
        let key_indices = &[0, 2];
        // One shared buffer per mock output, for inspecting what was sent.
        let output_data_vecs = (0..num_outputs)
            .map(|_| Arc::new(Mutex::new(Vec::new())))
            .collect::<Vec<_>>();
        // Mock outputs get the actor ids 1 and 2.
        let outputs = output_data_vecs
            .iter()
            .enumerate()
            .map(|(actor_id, data)| {
                Box::new(MockOutput::new(1 + actor_id as u32, data.clone())) as BoxedOutput
            })
            .collect::<Vec<_>>();
        // First half of the vnodes maps to actor 1, the rest to actor 2.
        let mut hash_mapping = (1..num_outputs + 1)
            .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT_FOR_TEST / num_outputs])
            .collect_vec();
        hash_mapping.resize(VirtualNode::COUNT_FOR_TEST, num_outputs as u32);
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            key_indices.to_vec(),
            vec![0, 1, 2],
            hash_mapping,
            0,
        );

        let chunk = StreamChunk::from_pretty(
            " I I I
            + 4 6 8
            + 5 7 9
            + 0 0 0
            - 1 1 1 D
            U- 2 0 2
            U+ 2 0 2
            U- 3 3 2
            U+ 3 3 4",
        );
        hash_dispatcher.dispatch_data(chunk).await.unwrap();

        // Chunk received by the first output (actor 1).
        assert_eq!(
            *output_data_vecs[0].lock().unwrap()[0].as_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I I I
                + 4 6 8
                + 5 7 9
                + 0 0 0
                - 1 1 1 D
                U- 2 0 2
                U+ 2 0 2
                - 3 3 2 D // Should rewrite UpdateDelete to Delete
                + 3 3 4 // Should rewrite UpdateInsert to Insert",
            )
        );
        // Chunk received by the second output (actor 2).
        assert_eq!(
            *output_data_vecs[1].lock().unwrap()[0].as_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I I I
                + 4 6 8 D
                + 5 7 9 D
                + 0 0 0 D
                - 1 1 1 D // Should keep original invisible mark
                U- 2 0 2 D // Should keep UpdateDelete
                U+ 2 0 2 D // Should keep UpdateInsert
                - 3 3 2 // Should rewrite UpdateDelete to Delete
                + 3 3 4 D // Should rewrite UpdateInsert to Insert",
            )
        );
    }
1285
1286 #[tokio::test]
1287 async fn test_configuration_change() {
1288 let _schema = Schema { fields: vec![] };
1289 let (tx, rx) = channel_for_test();
1290 let actor_id = 233;
1291 let fragment_id = 666;
1292 let barrier_test_env = LocalBarrierTestEnv::for_test().await;
1293 let ctx = Arc::new(SharedContext::for_test());
1294 let metrics = Arc::new(StreamingMetrics::unused());
1295
1296 let (untouched, old, new) = (234, 235, 238); let (old_simple, new_simple) = (114, 514); ctx.add_actors(
1301 [actor_id, untouched, old, new, old_simple, new_simple]
1302 .into_iter()
1303 .map(helper_make_local_actor),
1304 );
1305 let broadcast_dispatcher_id = 666;
1308 let broadcast_dispatcher = DispatcherImpl::new(
1309 &ctx,
1310 actor_id,
1311 &PbDispatcher {
1312 r#type: DispatcherType::Broadcast as _,
1313 dispatcher_id: broadcast_dispatcher_id,
1314 downstream_actor_id: vec![untouched, old],
1315 ..Default::default()
1316 },
1317 )
1318 .unwrap();
1319
1320 let simple_dispatcher_id = 888;
1321 let simple_dispatcher = DispatcherImpl::new(
1322 &ctx,
1323 actor_id,
1324 &PbDispatcher {
1325 r#type: DispatcherType::Simple as _,
1326 dispatcher_id: simple_dispatcher_id,
1327 downstream_actor_id: vec![old_simple],
1328 ..Default::default()
1329 },
1330 )
1331 .unwrap();
1332
1333 let dispatcher_updates = maplit::hashmap! {
1334 actor_id => vec![PbDispatcherUpdate {
1335 actor_id,
1336 dispatcher_id: broadcast_dispatcher_id,
1337 added_downstream_actor_id: vec![new],
1338 removed_downstream_actor_id: vec![old],
1339 hash_mapping: Default::default(),
1340 }]
1341 };
1342 let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
1343 UpdateMutation {
1344 dispatchers: dispatcher_updates,
1345 merges: Default::default(),
1346 vnode_bitmaps: Default::default(),
1347 dropped_actors: Default::default(),
1348 actor_splits: Default::default(),
1349 actor_new_dispatchers: Default::default(),
1350 },
1351 ));
1352 barrier_test_env.inject_barrier(&b1, [actor_id]);
1353 barrier_test_env.flush_all_events().await;
1354
1355 let input = Executor::new(
1356 Default::default(),
1357 ReceiverExecutor::for_test(
1358 actor_id,
1359 rx,
1360 barrier_test_env.shared_context.clone(),
1361 barrier_test_env.local_barrier_manager.clone(),
1362 )
1363 .boxed(),
1364 );
1365 let executor = Box::new(DispatchExecutor::new(
1366 input,
1367 vec![broadcast_dispatcher, simple_dispatcher],
1368 actor_id,
1369 fragment_id,
1370 ctx.clone(),
1371 metrics,
1372 config::default::developer::stream_chunk_size(),
1373 ))
1374 .execute();
1375 pin_mut!(executor);
1376
1377 let mut rxs = [untouched, old, new, old_simple, new_simple]
1379 .into_iter()
1380 .map(|id| (id, ctx.take_receiver((actor_id, id)).unwrap()))
1381 .collect::<HashMap<_, _>>();
1382 macro_rules! try_recv {
1383 ($down_id:expr) => {
1384 rxs.get_mut(&$down_id).unwrap().try_recv()
1385 };
1386 }
1387
1388 tx.send(Message::Chunk(StreamChunk::default()).into())
1390 .await
1391 .unwrap();
1392
1393 tx.send(Message::Barrier(b1.clone().into_dispatcher()).into())
1394 .await
1395 .unwrap();
1396 executor.next().await.unwrap().unwrap();
1397
1398 try_recv!(untouched).unwrap().as_chunk().unwrap();
1400 try_recv!(untouched).unwrap().as_barrier_batch().unwrap();
1401
1402 try_recv!(old).unwrap().as_chunk().unwrap();
1403 try_recv!(old).unwrap().as_barrier_batch().unwrap(); try_recv!(new).unwrap().as_barrier_batch().unwrap(); try_recv!(old_simple).unwrap().as_chunk().unwrap();
1408 try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); let b2 = Barrier::new_test_barrier(test_epoch(2));
1412 barrier_test_env.inject_barrier(&b2, [actor_id]);
1413 tx.send(Message::Barrier(b2.into_dispatcher()).into())
1414 .await
1415 .unwrap();
1416 executor.next().await.unwrap().unwrap();
1417
1418 try_recv!(untouched).unwrap().as_barrier_batch().unwrap();
1420 try_recv!(old).unwrap_err(); try_recv!(new).unwrap().as_barrier_batch().unwrap();
1422
1423 try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); try_recv!(new_simple).unwrap_err(); tx.send(Message::Chunk(StreamChunk::default()).into())
1428 .await
1429 .unwrap();
1430
1431 let dispatcher_updates = maplit::hashmap! {
1433 actor_id => vec![PbDispatcherUpdate {
1434 actor_id,
1435 dispatcher_id: simple_dispatcher_id,
1436 added_downstream_actor_id: vec![new_simple],
1437 removed_downstream_actor_id: vec![old_simple],
1438 hash_mapping: Default::default(),
1439 }]
1440 };
1441 let b3 = Barrier::new_test_barrier(test_epoch(3)).with_mutation(Mutation::Update(
1442 UpdateMutation {
1443 dispatchers: dispatcher_updates,
1444 merges: Default::default(),
1445 vnode_bitmaps: Default::default(),
1446 dropped_actors: Default::default(),
1447 actor_splits: Default::default(),
1448 actor_new_dispatchers: Default::default(),
1449 },
1450 ));
1451 barrier_test_env.inject_barrier(&b3, [actor_id]);
1452 tx.send(Message::Barrier(b3.into_dispatcher()).into())
1453 .await
1454 .unwrap();
1455 executor.next().await.unwrap().unwrap();
1456
1457 try_recv!(old_simple).unwrap().as_chunk().unwrap();
1459 try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); try_recv!(new_simple).unwrap().as_barrier_batch().unwrap(); let b4 = Barrier::new_test_barrier(test_epoch(4));
1465 barrier_test_env.inject_barrier(&b4, [actor_id]);
1466 tx.send(Message::Barrier(b4.into_dispatcher()).into())
1467 .await
1468 .unwrap();
1469 executor.next().await.unwrap().unwrap();
1470
1471 try_recv!(old_simple).unwrap_err(); try_recv!(new_simple).unwrap().as_barrier_batch().unwrap();
1474 }
1475
1476 #[tokio::test]
1477 async fn test_hash_dispatcher() {
1478 assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);
1480
1481 let num_outputs = 5; let cardinality = 10;
1483 let dimension = 4;
1484 let key_indices = &[0, 2];
1485 let output_data_vecs = (0..num_outputs)
1486 .map(|_| Arc::new(Mutex::new(Vec::new())))
1487 .collect::<Vec<_>>();
1488 let outputs = output_data_vecs
1489 .iter()
1490 .enumerate()
1491 .map(|(actor_id, data)| {
1492 Box::new(MockOutput::new(1 + actor_id as u32, data.clone())) as BoxedOutput
1493 })
1494 .collect::<Vec<_>>();
1495 let mut hash_mapping = (1..num_outputs + 1)
1496 .flat_map(|id| vec![id as ActorId; VirtualNode::COUNT_FOR_TEST / num_outputs])
1497 .collect_vec();
1498 hash_mapping.resize(VirtualNode::COUNT_FOR_TEST, num_outputs as u32);
1499 let mut hash_dispatcher = HashDataDispatcher::new(
1500 outputs,
1501 key_indices.to_vec(),
1502 (0..dimension).collect(),
1503 hash_mapping.clone(),
1504 0,
1505 );
1506
1507 let mut ops = Vec::new();
1508 for idx in 0..cardinality {
1509 if idx % 2 == 0 {
1510 ops.push(Op::Insert);
1511 } else {
1512 ops.push(Op::Delete);
1513 }
1514 }
1515
1516 let mut start = 19260817i32..;
1517 let mut builders = (0..dimension)
1518 .map(|_| I32ArrayBuilder::new(cardinality))
1519 .collect_vec();
1520 let mut output_cols = vec![vec![vec![]; dimension]; num_outputs];
1521 let mut output_ops = vec![vec![]; num_outputs];
1522 for op in &ops {
1523 let hash_builder = Crc32FastBuilder;
1524 let mut hasher = hash_builder.build_hasher();
1525 let one_row = (0..dimension).map(|_| start.next().unwrap()).collect_vec();
1526 for key_idx in key_indices {
1527 let val = one_row[*key_idx];
1528 let bytes = val.to_le_bytes();
1529 hasher.update(&bytes);
1530 }
1531 let output_idx =
1532 hash_mapping[hasher.finish() as usize % VirtualNode::COUNT_FOR_TEST] as usize - 1;
1533 for (builder, val) in builders.iter_mut().zip_eq_fast(one_row.iter()) {
1534 builder.append(Some(*val));
1535 }
1536 output_cols[output_idx]
1537 .iter_mut()
1538 .zip_eq_fast(one_row.iter())
1539 .for_each(|(each_column, val)| each_column.push(*val));
1540 output_ops[output_idx].push(op);
1541 }
1542
1543 let columns = builders
1544 .into_iter()
1545 .map(|builder| {
1546 let array = builder.finish();
1547 array.into_ref()
1548 })
1549 .collect();
1550
1551 let chunk = StreamChunk::new(ops, columns);
1552 hash_dispatcher.dispatch_data(chunk).await.unwrap();
1553
1554 for (output_idx, output) in output_data_vecs.into_iter().enumerate() {
1555 let guard = output.lock().unwrap();
1556 assert!(guard.len() <= 1);
1558 if guard.is_empty() {
1559 assert!(output_cols[output_idx].iter().all(|x| { x.is_empty() }));
1560 } else {
1561 let message = guard.first().unwrap();
1562 let real_chunk = match message {
1563 DispatcherMessageBatch::Chunk(chunk) => chunk,
1564 _ => panic!(),
1565 };
1566 real_chunk
1567 .columns()
1568 .iter()
1569 .zip_eq_fast(output_cols[output_idx].iter())
1570 .for_each(|(real_col, expect_col)| {
1571 let real_vals = real_chunk
1572 .visibility()
1573 .iter_ones()
1574 .map(|row_idx| real_col.as_int32().value_at(row_idx).unwrap())
1575 .collect::<Vec<_>>();
1576 assert_eq!(real_vals.len(), expect_col.len());
1577 assert_eq!(real_vals, *expect_col);
1578 });
1579 }
1580 }
1581 }
1582}