1use std::collections::{HashMap, HashSet};
16use std::future::Future;
17use std::iter::repeat_with;
18use std::ops::{Deref, DerefMut};
19use std::time::Duration;
20
21use anyhow::anyhow;
22use futures::FutureExt;
23use itertools::Itertools;
24use risingwave_common::array::Op;
25use risingwave_common::bitmap::BitmapBuilder;
26use risingwave_common::config::StreamingConfig;
27use risingwave_common::hash::{ActorMapping, ExpandedActorMapping, VirtualNode};
28use risingwave_common::metrics::LabelGuardedIntCounter;
29use risingwave_common::row::RowExt;
30use risingwave_common::util::iter_util::ZipEqFast;
31use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
32use risingwave_pb::stream_plan::{self, PbDispatcher};
33use smallvec::{SmallVec, smallvec};
34use thiserror_ext::AsReport;
35use tokio::sync::mpsc::UnboundedReceiver;
36use tokio::task::consume_budget;
37use tokio::time::Instant;
38use tokio_stream::StreamExt;
39use tokio_stream::adapters::Peekable;
40use tracing::{Instrument, event};
41
42use super::exchange::output::Output;
43use super::{
44 AddMutation, DispatcherBarriers, DispatcherMessageBatch, MessageBatch, TroublemakerExecutor,
45 UpdateMutation,
46};
47use crate::executor::prelude::*;
48use crate::executor::{StopMutation, StreamConsumer};
49use crate::task::{DispatcherId, NewOutputRequest};
50
51mod output_mapping;
52pub use output_mapping::DispatchOutputMapping;
53use risingwave_common::id::FragmentId;
54
55use crate::error::StreamError;
56
/// Executor at the tail of an actor: it pulls messages from `input` and sends them to the
/// downstream actors through the dispatchers held in `inner`.
pub struct DispatchExecutor {
    /// Upstream executor whose messages are dispatched.
    input: Executor,
    /// Dispatchers and the bookkeeping needed to maintain them.
    inner: DispatchExecutorInner,
}
64
65struct DispatcherWithMetrics {
66 dispatcher: DispatcherImpl,
67 pub actor_output_buffer_blocking_duration_ns: LabelGuardedIntCounter,
68}
69
70impl Debug for DispatcherWithMetrics {
71 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
72 self.dispatcher.fmt(f)
73 }
74}
75
76impl Deref for DispatcherWithMetrics {
77 type Target = DispatcherImpl;
78
79 fn deref(&self) -> &Self::Target {
80 &self.dispatcher
81 }
82}
83
84impl DerefMut for DispatcherWithMetrics {
85 fn deref_mut(&mut self) -> &mut Self::Target {
86 &mut self.dispatcher
87 }
88}
89
90struct DispatchExecutorMetrics {
91 actor_id_str: String,
92 fragment_id_str: String,
93 metrics: Arc<StreamingMetrics>,
94 actor_out_record_cnt: LabelGuardedIntCounter,
95}
96
97impl DispatchExecutorMetrics {
98 fn monitor_dispatcher(&self, dispatcher: DispatcherImpl) -> DispatcherWithMetrics {
99 DispatcherWithMetrics {
100 actor_output_buffer_blocking_duration_ns: self
101 .metrics
102 .actor_output_buffer_blocking_duration_ns
103 .with_guarded_label_values(&[
104 self.actor_id_str.as_str(),
105 self.fragment_id_str.as_str(),
106 dispatcher.dispatcher_id().to_string().as_str(),
107 ]),
108 dispatcher,
109 }
110 }
111}
112
/// Mutable state of a [`DispatchExecutor`]: its dispatchers plus the channel through which
/// downstream actors request new outputs.
struct DispatchExecutorInner {
    /// Active dispatchers, each wrapped with its blocking-duration metric.
    dispatchers: Vec<DispatcherWithMetrics>,
    /// Id of the actor owning this executor; used to look up this actor's entries in mutations.
    actor_id: ActorId,
    actor_config: Arc<StreamingConfig>,
    metrics: DispatchExecutorMetrics,
    /// Receives `(downstream actor, request)` pairs used to build new outputs.
    new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    /// Requests received before any dispatcher asked for them, keyed by downstream actor.
    pending_new_output_requests: HashMap<ActorId, NewOutputRequest>,
}
121
122impl DispatchExecutorInner {
123 async fn collect_outputs(
124 &mut self,
125 downstream_actors: &[ActorId],
126 ) -> StreamResult<Vec<Output>> {
127 fn resolve_output(downstream_actor: ActorId, request: NewOutputRequest) -> Output {
128 let tx = match request {
129 NewOutputRequest::Local(tx) | NewOutputRequest::Remote(tx) => tx,
130 };
131 Output::new(downstream_actor, tx)
132 }
133 let mut outputs = Vec::with_capacity(downstream_actors.len());
134 for &downstream_actor in downstream_actors {
135 let output =
136 if let Some(request) = self.pending_new_output_requests.remove(&downstream_actor) {
137 resolve_output(downstream_actor, request)
138 } else {
139 loop {
140 let (requested_actor, request) = self
141 .new_output_request_rx
142 .recv()
143 .await
144 .ok_or_else(|| anyhow!("end of new output request"))?;
145 if requested_actor == downstream_actor {
146 break resolve_output(requested_actor, request);
147 } else {
148 assert!(
149 self.pending_new_output_requests
150 .insert(requested_actor, request)
151 .is_none(),
152 "duplicated inflight new output requests from actor {}",
153 requested_actor
154 );
155 }
156 }
157 };
158 outputs.push(output);
159 }
160 Ok(outputs)
161 }
162
163 async fn dispatch(&mut self, msg: MessageBatch) -> StreamResult<()> {
164 if self.dispatchers.is_empty() {
165 consume_budget().await;
170 }
171 async fn await_with_metrics(
172 future: impl Future<Output = ()>,
173 metrics: &LabelGuardedIntCounter,
174 ) {
175 let mut start_time = Instant::now();
176 let interval_duration = Duration::from_secs(15);
177 let mut interval =
178 tokio::time::interval_at(start_time + interval_duration, interval_duration);
179
180 let mut fut = pin!(future);
181
182 loop {
183 tokio::select! {
184 biased;
185 _res = &mut fut => {
186 let ns = start_time.elapsed().as_nanos() as u64;
187 metrics.inc_by(ns);
188 break;
189 }
190 _ = interval.tick() => {
191 start_time = Instant::now();
192 metrics.inc_by(interval_duration.as_nanos() as u64);
193 }
194 };
195 }
196 }
197
198 use futures::StreamExt;
199
200 let limit = self.actor_config.developer.exchange_concurrent_dispatchers;
201 match msg {
203 MessageBatch::BarrierBatch(barrier_batch) => {
204 if barrier_batch.is_empty() {
205 return Ok(());
206 }
207 let mutation = barrier_batch[0].mutation.clone();
209 self.pre_mutate_dispatchers(&mutation).await?;
210 futures::stream::iter(self.dispatchers.iter_mut())
211 .for_each_concurrent(limit, |dispatcher| async {
212 let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
213 let dispatcher_output = &mut dispatcher.dispatcher;
214 let fut = dispatcher_output.dispatch_barriers(
215 barrier_batch
216 .iter()
217 .cloned()
218 .map(|b| b.into_dispatcher())
219 .collect(),
220 );
221 await_with_metrics(fut, metrics).await;
222 })
223 .await;
224 self.post_mutate_dispatchers(&mutation)?;
225 }
226 MessageBatch::Watermark(watermark) => {
227 futures::stream::iter(self.dispatchers.iter_mut())
228 .for_each_concurrent(limit, |dispatcher| async {
229 let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
230 let dispatcher_output = &mut dispatcher.dispatcher;
231 let fut = dispatcher_output.dispatch_watermark(watermark.clone());
232 await_with_metrics(fut, metrics).await;
233 })
234 .await;
235 }
236 MessageBatch::Chunk(chunk) => {
237 futures::stream::iter(self.dispatchers.iter_mut())
238 .for_each_concurrent(limit, |dispatcher| async {
239 let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
240 let dispatcher_output = &mut dispatcher.dispatcher;
241 let fut = dispatcher_output.dispatch_data(chunk.clone());
242 await_with_metrics(fut, metrics).await;
243 })
244 .await;
245
246 self.metrics
247 .actor_out_record_cnt
248 .inc_by(chunk.cardinality() as _);
249 }
250 }
251 self.dispatchers.retain(|dispatcher| match &**dispatcher {
252 DispatcherImpl::Failed(dispatcher_id, e) => {
253 warn!(%dispatcher_id, err = %e.as_report(), actor_id = %self.actor_id, "dispatch fail");
254 false
255 }
256 _ => true,
257 });
258 Ok(())
259 }
260
261 async fn add_dispatchers<'a>(
263 &mut self,
264 new_dispatchers: impl IntoIterator<Item = &'a PbDispatcher>,
265 ) -> StreamResult<()> {
266 for dispatcher in new_dispatchers {
267 let outputs = self
268 .collect_outputs(&dispatcher.downstream_actor_id)
269 .await?;
270 let dispatcher = DispatcherImpl::new(outputs, dispatcher)?;
271 let dispatcher = self.metrics.monitor_dispatcher(dispatcher);
272 self.dispatchers.push(dispatcher);
273 }
274
275 assert!(
276 self.dispatchers
277 .iter()
278 .map(|d| d.dispatcher_id())
279 .all_unique(),
280 "dispatcher ids must be unique: {:?}",
281 self.dispatchers
282 );
283
284 Ok(())
285 }
286
287 fn find_dispatcher(&mut self, dispatcher_id: DispatcherId) -> Option<&mut DispatcherImpl> {
288 self.dispatchers
289 .iter_mut()
290 .find(|d| d.dispatcher_id() == dispatcher_id)
291 .map(DerefMut::deref_mut)
292 }
293
294 async fn pre_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
297 let outputs = self
298 .collect_outputs(&update.added_downstream_actor_id)
299 .await?;
300
301 let Some(dispatcher) = self.find_dispatcher(update.dispatcher_id) else {
302 warn!(dispatcher_id = %update.dispatcher_id, added = ?update.added_downstream_actor_id, removed = ?update.removed_downstream_actor_id, actor_id = %self.actor_id, "ignore dispatcher update");
303 return Ok(());
304 };
305 dispatcher.add_outputs(outputs);
306
307 Ok(())
308 }
309
310 fn post_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
313 let ids = update.removed_downstream_actor_id.iter().copied().collect();
314
315 let Some(dispatcher) = self.find_dispatcher(update.dispatcher_id) else {
316 warn!(dispatcher_id = %update.dispatcher_id, added = ?update.added_downstream_actor_id, removed = ?update.removed_downstream_actor_id, actor_id = %self.actor_id, "ignore dispatcher update");
317 return Ok(());
318 };
319 dispatcher.remove_outputs(&ids);
320
321 if let DispatcherImpl::Hash(dispatcher) = dispatcher {
328 dispatcher.hash_mapping =
329 ActorMapping::from_protobuf(update.get_hash_mapping()?).to_expanded();
330 }
331
332 Ok(())
333 }
334
335 async fn pre_mutate_dispatchers(
337 &mut self,
338 mutation: &Option<Arc<Mutation>>,
339 ) -> StreamResult<()> {
340 let Some(mutation) = mutation.as_deref() else {
341 return Ok(());
342 };
343
344 match mutation {
345 Mutation::Add(AddMutation { adds, .. }) => {
346 if let Some(new_dispatchers) = adds.get(&self.actor_id) {
347 self.add_dispatchers(new_dispatchers).await?;
348 }
349 }
350 Mutation::Update(UpdateMutation {
351 dispatchers,
352 actor_new_dispatchers: actor_dispatchers,
353 ..
354 }) => {
355 if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
356 self.add_dispatchers(new_dispatchers).await?;
357 }
358
359 if let Some(updates) = dispatchers.get(&self.actor_id) {
360 for update in updates {
361 self.pre_update_dispatcher(update).await?;
362 }
363 }
364 }
365 _ => {}
366 }
367
368 Ok(())
369 }
370
371 fn post_mutate_dispatchers(&mut self, mutation: &Option<Arc<Mutation>>) -> StreamResult<()> {
373 let Some(mutation) = mutation.as_deref() else {
374 return Ok(());
375 };
376
377 match mutation {
378 Mutation::Stop(StopMutation { dropped_actors, .. }) => {
379 if !dropped_actors.contains(&self.actor_id) {
381 for dispatcher in &mut self.dispatchers {
382 dispatcher.remove_outputs(dropped_actors);
383 }
384 }
385 }
386 Mutation::Update(UpdateMutation {
387 dispatchers,
388 dropped_actors,
389 ..
390 }) => {
391 if let Some(updates) = dispatchers.get(&self.actor_id) {
392 for update in updates {
393 self.post_update_dispatcher(update)?;
394 }
395 }
396
397 if !dropped_actors.contains(&self.actor_id) {
398 for dispatcher in &mut self.dispatchers {
399 dispatcher.remove_outputs(dropped_actors);
400 }
401 }
402 }
403 _ => {}
404 };
405
406 self.dispatchers.retain(|d| !d.is_empty());
409
410 Ok(())
411 }
412}
413
414impl DispatchExecutor {
415 pub(crate) async fn new(
416 input: Executor,
417 new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
418 dispatchers: Vec<stream_plan::Dispatcher>,
419 actor_context: &ActorContextRef,
420 ) -> StreamResult<Self> {
421 let mut executor = Self::new_inner(
422 input,
423 new_output_request_rx,
424 vec![],
425 actor_context.id,
426 actor_context.fragment_id,
427 actor_context.config.clone(),
428 actor_context.streaming_metrics.clone(),
429 );
430 let inner = &mut executor.inner;
431 for dispatcher in dispatchers {
432 let outputs = inner
433 .collect_outputs(&dispatcher.downstream_actor_id)
434 .await?;
435 let dispatcher = DispatcherImpl::new(outputs, &dispatcher)?;
436 let dispatcher = inner.metrics.monitor_dispatcher(dispatcher);
437 inner.dispatchers.push(dispatcher);
438 }
439 Ok(executor)
440 }
441
442 #[cfg(test)]
443 pub(crate) fn for_test(
444 input: Executor,
445 dispatchers: Vec<DispatcherImpl>,
446 actor_id: ActorId,
447 fragment_id: FragmentId,
448 actor_config: Arc<StreamingConfig>,
449 metrics: Arc<StreamingMetrics>,
450 ) -> (
451 Self,
452 tokio::sync::mpsc::UnboundedSender<(ActorId, NewOutputRequest)>,
453 ) {
454 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
455
456 (
457 Self::new_inner(
458 input,
459 rx,
460 dispatchers,
461 actor_id,
462 fragment_id,
463 actor_config,
464 metrics,
465 ),
466 tx,
467 )
468 }
469
470 fn new_inner(
471 mut input: Executor,
472 new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
473 dispatchers: Vec<DispatcherImpl>,
474 actor_id: ActorId,
475 fragment_id: FragmentId,
476 actor_config: Arc<StreamingConfig>,
477 metrics: Arc<StreamingMetrics>,
478 ) -> Self {
479 if crate::consistency::insane() {
480 let mut info = input.info().clone();
482 info.identity = format!("{} (embedded trouble)", info.identity);
483 let troublemaker = TroublemakerExecutor::new(input, actor_config.developer.chunk_size);
484 input = (info, troublemaker).into();
485 }
486
487 let actor_id_str = actor_id.to_string();
488 let fragment_id_str = fragment_id.to_string();
489 let actor_out_record_cnt = metrics
490 .actor_out_record_cnt
491 .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
492 let metrics = DispatchExecutorMetrics {
493 actor_id_str,
494 fragment_id_str,
495 metrics,
496 actor_out_record_cnt,
497 };
498 let dispatchers = dispatchers
499 .into_iter()
500 .map(|dispatcher| metrics.monitor_dispatcher(dispatcher))
501 .collect();
502
503 Self {
504 input,
505 inner: DispatchExecutorInner {
506 dispatchers,
507 actor_id,
508 actor_config,
509 metrics,
510 new_output_request_rx,
511 pending_new_output_requests: Default::default(),
512 },
513 }
514 }
515}
516
517impl StreamConsumer for DispatchExecutor {
518 type BarrierStream = impl Stream<Item = StreamResult<Barrier>> + Send;
519
520 fn execute(mut self: Box<Self>) -> Self::BarrierStream {
521 let max_barrier_count_per_batch = self.inner.actor_config.developer.max_barrier_batch_size;
522 #[try_stream]
523 async move {
524 let mut input = self.input.execute().peekable();
525 loop {
526 let Some(message) =
527 try_batch_barriers(max_barrier_count_per_batch, &mut input).await?
528 else {
529 break;
531 };
532 match message {
533 chunk @ MessageBatch::Chunk(_) => {
534 self.inner
535 .dispatch(chunk)
536 .instrument(tracing::info_span!("dispatch_chunk"))
537 .instrument_await("dispatch_chunk")
538 .await?;
539 }
540 MessageBatch::BarrierBatch(barrier_batch) => {
541 assert!(!barrier_batch.is_empty());
542 self.inner
543 .dispatch(MessageBatch::BarrierBatch(barrier_batch.clone()))
544 .instrument(tracing::info_span!("dispatch_barrier_batch"))
545 .instrument_await("dispatch_barrier_batch")
546 .await?;
547 self.inner
548 .metrics
549 .metrics
550 .barrier_batch_size
551 .observe(barrier_batch.len() as f64);
552 for barrier in barrier_batch {
553 yield barrier;
554 }
555 }
556 watermark @ MessageBatch::Watermark(_) => {
557 self.inner
558 .dispatch(watermark)
559 .instrument(tracing::info_span!("dispatch_watermark"))
560 .instrument_await("dispatch_watermark")
561 .await?;
562 }
563 }
564 }
565 }
566 }
567}
568
569async fn try_batch_barriers(
575 max_barrier_count_per_batch: u32,
576 input: &mut Peekable<BoxedMessageStream>,
577) -> StreamResult<Option<MessageBatch>> {
578 let Some(msg) = input.next().await else {
579 return Ok(None);
581 };
582 let mut barrier_batch = vec![];
583 let msg: Message = msg?;
584 let max_peek_attempts = match msg {
585 Message::Chunk(c) => {
586 return Ok(Some(MessageBatch::Chunk(c)));
587 }
588 Message::Watermark(w) => {
589 return Ok(Some(MessageBatch::Watermark(w)));
590 }
591 Message::Barrier(b) => {
592 let peek_more_barrier = b.mutation.is_none();
593 barrier_batch.push(b);
594 if peek_more_barrier {
595 max_barrier_count_per_batch.saturating_sub(1)
596 } else {
597 0
598 }
599 }
600 };
601 for _ in 0..max_peek_attempts {
603 let peek = input.peek().now_or_never();
604 let Some(peek) = peek else {
605 break;
606 };
607 let Some(msg) = peek else {
608 break;
610 };
611 let Ok(Message::Barrier(barrier)) = msg else {
612 break;
613 };
614 if barrier.mutation.is_some() {
615 break;
616 }
617 let msg: Message = input.next().now_or_never().unwrap().unwrap()?;
618 let Message::Barrier(ref barrier) = msg else {
619 unreachable!("must be a barrier");
620 };
621 barrier_batch.push(barrier.clone());
622 }
623 Ok(Some(MessageBatch::BarrierBatch(barrier_batch)))
624}
625
626#[derive(Debug)]
627pub(crate) enum DispatcherImpl {
628 Hash(HashDataDispatcher),
629 Broadcast(BroadcastDispatcher),
630 Simple(SimpleDispatcher),
631 #[cfg_attr(not(test), expect(dead_code))]
632 RoundRobin(RoundRobinDataDispatcher),
633 Failed(DispatcherId, StreamError),
636}
637
638impl DispatcherImpl {
639 pub fn new(outputs: Vec<Output>, dispatcher: &PbDispatcher) -> StreamResult<Self> {
640 let output_mapping =
641 DispatchOutputMapping::from_protobuf(dispatcher.output_mapping.clone().unwrap());
642
643 use risingwave_pb::stream_plan::DispatcherType::*;
644 let dispatcher_impl = match dispatcher.get_type()? {
645 Hash => {
646 assert!(!outputs.is_empty());
647 let dist_key_indices = dispatcher
648 .dist_key_indices
649 .iter()
650 .map(|i| *i as usize)
651 .collect();
652
653 let hash_mapping =
654 ActorMapping::from_protobuf(dispatcher.get_hash_mapping()?).to_expanded();
655
656 DispatcherImpl::Hash(HashDataDispatcher::new(
657 outputs,
658 dist_key_indices,
659 output_mapping,
660 hash_mapping,
661 dispatcher.dispatcher_id,
662 ))
663 }
664 Broadcast => DispatcherImpl::Broadcast(BroadcastDispatcher::new(
665 outputs,
666 output_mapping,
667 dispatcher.dispatcher_id,
668 )),
669 Simple | NoShuffle => {
670 let [output]: [_; 1] = outputs.try_into().unwrap();
671 DispatcherImpl::Simple(SimpleDispatcher::new(
672 output,
673 output_mapping,
674 dispatcher.dispatcher_id,
675 ))
676 }
677 Unspecified => unreachable!(),
678 };
679
680 Ok(dispatcher_impl)
681 }
682}
683
/// Generates the `DispatcherImpl` methods that forward to the wrapped concrete dispatcher.
///
/// The `dispatch_*` methods replace `self` with `DispatcherImpl::Failed` when the inner
/// dispatcher returns an error, and hit `unreachable!()` if called on a `Failed` dispatcher.
/// Adding or removing outputs on a `Failed` dispatcher does nothing, and a `Failed`
/// dispatcher reports itself as empty.
macro_rules! impl_dispatcher {
    ($( { $variant_name:ident } ),*) => {
        impl DispatcherImpl {
            pub async fn dispatch_data(&mut self, chunk: StreamChunk) {
                if let Err(e) = match self {
                    $( Self::$variant_name(inner) => inner.dispatch_data(chunk).await, )*
                    Self::Failed(..) => unreachable!(),
                } {
                    // Keep the id so the failure can still be attributed to this dispatcher.
                    *self = Self::Failed(self.dispatcher_id(), e);
                }
            }

            pub async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) {
                if let Err(e) = match self {
                    $( Self::$variant_name(inner) => inner.dispatch_barriers(barriers).await, )*
                    Self::Failed(..) => unreachable!(),
                } {
                    *self = Self::Failed(self.dispatcher_id(), e);
                }
            }

            pub async fn dispatch_watermark(&mut self, watermark: Watermark) {
                if let Err(e) = match self {
                    $( Self::$variant_name(inner) => inner.dispatch_watermark(watermark).await, )*
                    Self::Failed(..) => unreachable!(),
                } {
                    *self = Self::Failed(self.dispatcher_id(), e);
                }
            }

            pub fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
                match self {
                    $(Self::$variant_name(inner) => inner.add_outputs(outputs), )*
                    Self::Failed(..) => {},
                }
            }

            pub fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
                match self {
                    $(Self::$variant_name(inner) => inner.remove_outputs(actor_ids), )*
                    Self::Failed(..) => {},
                }
            }

            pub fn dispatcher_id(&self) -> DispatcherId {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id(), )*
                    Self::Failed(dispatcher_id, ..) => *dispatcher_id,
                }
            }

            pub fn is_empty(&self) -> bool {
                match self {
                    $(Self::$variant_name(inner) => inner.is_empty(), )*
                    // A failed dispatcher has no usable outputs.
                    Self::Failed(..) => true,
                }
            }
        }
    }
}
744
/// Invokes `$macro` with the list of concrete (non-`Failed`) `DispatcherImpl` variants.
macro_rules! for_all_dispatcher_variants {
    ($macro:ident) => {
        $macro! {
            { Hash },
            { Broadcast },
            { Simple },
            { RoundRobin }
        }
    };
}

// Generates the forwarding methods on `DispatcherImpl`.
for_all_dispatcher_variants! { impl_dispatcher }
757
/// Future returned by the async methods of [`Dispatcher`].
pub trait DispatchFuture<'a> = Future<Output = StreamResult<()>> + Send;

/// A strategy for delivering an actor's messages to its downstream outputs.
trait Dispatcher: Debug + 'static {
    /// Sends a data chunk to the outputs this strategy selects.
    fn dispatch_data(&mut self, chunk: StreamChunk) -> impl DispatchFuture<'_>;
    /// Sends a batch of barriers.
    fn dispatch_barriers(&mut self, barrier: DispatcherBarriers) -> impl DispatchFuture<'_>;
    /// Sends a watermark.
    fn dispatch_watermark(&mut self, watermark: Watermark) -> impl DispatchFuture<'_>;

    /// Attaches new outputs.
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>);
    /// Detaches the outputs of the given downstream actors.
    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>);

    /// Returns this dispatcher's id.
    fn dispatcher_id(&self) -> DispatcherId;

    /// Returns `true` when no outputs remain.
    fn is_empty(&self) -> bool;
}
784
785async fn broadcast_concurrent(
790 outputs: impl IntoIterator<Item = &'_ mut Output>,
791 message: DispatcherMessageBatch,
792) -> StreamResult<()> {
793 futures::future::try_join_all(
794 outputs
795 .into_iter()
796 .map(|output| output.send(message.clone())),
797 )
798 .await?;
799 Ok(())
800}
801
802#[derive(Debug)]
803pub struct RoundRobinDataDispatcher {
804 outputs: Vec<Output>,
805 output_mapping: DispatchOutputMapping,
806 cur: usize,
807 dispatcher_id: DispatcherId,
808}
809
810impl RoundRobinDataDispatcher {
811 #[cfg_attr(not(test), expect(dead_code))]
812 pub fn new(
813 outputs: Vec<Output>,
814 output_mapping: DispatchOutputMapping,
815 dispatcher_id: DispatcherId,
816 ) -> Self {
817 Self {
818 outputs,
819 output_mapping,
820 cur: 0,
821 dispatcher_id,
822 }
823 }
824}
825
826impl Dispatcher for RoundRobinDataDispatcher {
827 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
828 let chunk = self.output_mapping.apply(chunk);
829
830 self.outputs[self.cur]
831 .send(DispatcherMessageBatch::Chunk(chunk))
832 .await?;
833 self.cur += 1;
834 self.cur %= self.outputs.len();
835 Ok(())
836 }
837
838 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
839 broadcast_concurrent(
841 &mut self.outputs,
842 DispatcherMessageBatch::BarrierBatch(barriers),
843 )
844 .await
845 }
846
847 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
848 if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
849 broadcast_concurrent(
851 &mut self.outputs,
852 DispatcherMessageBatch::Watermark(watermark),
853 )
854 .await?;
855 }
856 Ok(())
857 }
858
859 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
860 self.outputs.extend(outputs);
861 }
862
863 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
864 self.outputs
865 .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
866 .count();
867 self.cur = self.cur.min(self.outputs.len() - 1);
868 }
869
870 fn dispatcher_id(&self) -> DispatcherId {
871 self.dispatcher_id
872 }
873
874 fn is_empty(&self) -> bool {
875 self.outputs.is_empty()
876 }
877}
878
879pub struct HashDataDispatcher {
880 outputs: Vec<Output>,
881 keys: Vec<usize>,
882 output_mapping: DispatchOutputMapping,
883 hash_mapping: ExpandedActorMapping,
886 dispatcher_id: DispatcherId,
887}
888
889impl Debug for HashDataDispatcher {
890 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
891 f.debug_struct("HashDataDispatcher")
892 .field("outputs", &self.outputs)
893 .field("keys", &self.keys)
894 .field("dispatcher_id", &self.dispatcher_id)
895 .finish_non_exhaustive()
896 }
897}
898
899impl HashDataDispatcher {
900 pub fn new(
901 outputs: Vec<Output>,
902 keys: Vec<usize>,
903 output_mapping: DispatchOutputMapping,
904 hash_mapping: ExpandedActorMapping,
905 dispatcher_id: DispatcherId,
906 ) -> Self {
907 Self {
908 outputs,
909 keys,
910 output_mapping,
911 hash_mapping,
912 dispatcher_id,
913 }
914 }
915}
916
impl Dispatcher for HashDataDispatcher {
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
        self.outputs.extend(outputs);
    }

    /// Sends the barriers to every output.
    async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
        broadcast_concurrent(
            &mut self.outputs,
            DispatcherMessageBatch::BarrierBatch(barriers),
        )
        .await
    }

    /// Sends the output-mapped watermark to every output, if the mapping keeps it.
    async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
        if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
            broadcast_concurrent(
                &mut self.outputs,
                DispatcherMessageBatch::Watermark(watermark),
            )
            .await?;
        }
        Ok(())
    }

    /// Splits `chunk` among the outputs by the vnode of each row's distribution key.
    ///
    /// Every output receives the same (output-mapped) columns and ops with its own
    /// visibility: a row is visible to an output only if it was visible in `chunk` and
    /// `hash_mapping` assigns its vnode to that output's actor. Outputs left with no visible
    /// rows are skipped. All sends are awaited concurrently.
    ///
    /// # Panics
    ///
    /// Panics if a visible `U+` row is not preceded by a visible `U-` row, or a `U-` row is
    /// left without its `U+`.
    async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
        let num_outputs = self.outputs.len();

        // Vnode of every row, computed from the distribution-key columns.
        let vnode_count = self.hash_mapping.len();
        let vnodes = VirtualNode::compute_chunk(chunk.data_chunk(), &self.keys, vnode_count);

        tracing::debug!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty(), self.keys, vnodes);

        // One visibility bitmap per output, in the same order as `self.outputs`.
        let mut vis_maps = repeat_with(|| BitmapBuilder::with_capacity(chunk.capacity()))
            .take(num_outputs)
            .collect_vec();
        let mut last_update_delete_row_idx = None;
        let mut new_ops: Vec<Op> = Vec::with_capacity(chunk.capacity());

        for (row_idx, ((vnode, &op), visible)) in vnodes
            .iter()
            .copied()
            .zip_eq_fast(chunk.ops())
            .zip_eq_fast(chunk.visibility().iter())
            .enumerate()
        {
            for (output, vis_map) in self.outputs.iter().zip_eq_fast(vis_maps.iter_mut()) {
                vis_map.append(visible && self.hash_mapping[vnode.to_index()] == output.actor_id());
            }

            if !visible {
                new_ops.push(op);
                continue;
            }

            // The op for a `U-` row is pushed together with its `U+` partner, so `new_ops`
            // stays aligned with the rows.
            if op == Op::UpdateDelete {
                last_update_delete_row_idx = Some(row_idx);
            } else if op == Op::UpdateInsert {
                let delete_row_idx = last_update_delete_row_idx
                    .take()
                    .expect("missing U- before U+");

                let dist_key_changed = chunk.row_at(delete_row_idx).1.project(&self.keys)
                    != chunk.row_at(row_idx).1.project(&self.keys);

                // When the distribution key changes, the two halves of the update may land on
                // different outputs, so the pair is downgraded to a plain Delete + Insert.
                if dist_key_changed {
                    new_ops.push(Op::Delete);
                    new_ops.push(Op::Insert);
                } else {
                    new_ops.push(Op::UpdateDelete);
                    new_ops.push(Op::UpdateInsert);
                }
            } else {
                new_ops.push(op);
            }
        }
        assert!(last_update_delete_row_idx.is_none(), "missing U+ after U-");

        let chunk = self.output_mapping.apply(chunk);
        let chunk_vis = chunk.visibility();

        // Combine the ops produced by the output mapping (`elim_op`) with the downgrades
        // decided above: keep the mapping's op unless this dispatcher turned an update half
        // into a plain Delete/Insert.
        let chunk_ops = chunk.ops();
        let ops: Vec<Op> = new_ops
            .iter()
            .zip_eq_fast(chunk_ops.iter())
            .map(|(&new_op, &elim_op)| {
                match (new_op, elim_op) {
                    (Op::Delete, Op::UpdateDelete) => Op::Delete,
                    (Op::Insert, Op::UpdateInsert) => Op::Insert,
                    _ => elim_op,
                }
            })
            .collect();

        // Build one chunk per output with its own visibility and send them concurrently.
        futures::future::try_join_all(
            vis_maps
                .into_iter()
                .zip_eq_fast(self.outputs.iter_mut())
                .map(|(vis_map, output)| async {
                    let vis_map = vis_map.finish();
                    let combined_vis = &vis_map & chunk_vis;
                    let new_stream_chunk = StreamChunk::with_visibility(
                        ops.clone(),
                        chunk.columns().into(),
                        combined_vis,
                    );
                    if new_stream_chunk.cardinality() > 0 {
                        event!(
                            tracing::Level::TRACE,
                            msg = "chunk",
                            downstream = %output.actor_id(),
                            "send = \n{:#?}",
                            new_stream_chunk
                        );
                        output
                            .send(DispatcherMessageBatch::Chunk(new_stream_chunk))
                            .await?;
                    }
                    StreamResult::Ok(())
                }),
        )
        .await?;

        Ok(())
    }

    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
        self.outputs
            .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
            .count();
    }

    fn dispatcher_id(&self) -> DispatcherId {
        self.dispatcher_id
    }

    fn is_empty(&self) -> bool {
        self.outputs.is_empty()
    }
}
1088
1089#[derive(Debug)]
1091pub struct BroadcastDispatcher {
1092 outputs: HashMap<ActorId, Output>,
1093 output_mapping: DispatchOutputMapping,
1094 dispatcher_id: DispatcherId,
1095}
1096
1097impl BroadcastDispatcher {
1098 pub fn new(
1099 outputs: impl IntoIterator<Item = Output>,
1100 output_mapping: DispatchOutputMapping,
1101 dispatcher_id: DispatcherId,
1102 ) -> Self {
1103 Self {
1104 outputs: Self::into_pairs(outputs).collect(),
1105 output_mapping,
1106 dispatcher_id,
1107 }
1108 }
1109
1110 fn into_pairs(
1111 outputs: impl IntoIterator<Item = Output>,
1112 ) -> impl Iterator<Item = (ActorId, Output)> {
1113 outputs
1114 .into_iter()
1115 .map(|output| (output.actor_id(), output))
1116 }
1117}
1118
1119impl Dispatcher for BroadcastDispatcher {
1120 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
1121 let chunk = self.output_mapping.apply(chunk);
1122 broadcast_concurrent(
1123 self.outputs.values_mut(),
1124 DispatcherMessageBatch::Chunk(chunk),
1125 )
1126 .await
1127 }
1128
1129 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
1130 broadcast_concurrent(
1132 self.outputs.values_mut(),
1133 DispatcherMessageBatch::BarrierBatch(barriers),
1134 )
1135 .await
1136 }
1137
1138 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
1139 if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
1140 broadcast_concurrent(
1142 self.outputs.values_mut(),
1143 DispatcherMessageBatch::Watermark(watermark),
1144 )
1145 .await?;
1146 }
1147 Ok(())
1148 }
1149
1150 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
1151 self.outputs.extend(Self::into_pairs(outputs));
1152 }
1153
1154 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
1155 self.outputs
1156 .extract_if(|actor_id, _| actor_ids.contains(actor_id))
1157 .count();
1158 }
1159
1160 fn dispatcher_id(&self) -> DispatcherId {
1161 self.dispatcher_id
1162 }
1163
1164 fn is_empty(&self) -> bool {
1165 self.outputs.is_empty()
1166 }
1167}
1168
1169#[derive(Debug)]
1171pub struct SimpleDispatcher {
1172 output: SmallVec<[Output; 2]>,
1186 output_mapping: DispatchOutputMapping,
1187 dispatcher_id: DispatcherId,
1188}
1189
1190impl SimpleDispatcher {
1191 pub fn new(
1192 output: Output,
1193 output_mapping: DispatchOutputMapping,
1194 dispatcher_id: DispatcherId,
1195 ) -> Self {
1196 Self {
1197 output: smallvec![output],
1198 output_mapping,
1199 dispatcher_id,
1200 }
1201 }
1202}
1203
1204impl Dispatcher for SimpleDispatcher {
1205 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
1206 self.output.extend(outputs);
1207 assert!(self.output.len() <= 2);
1208 }
1209
1210 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
1211 for output in &mut self.output {
1213 output
1214 .send(DispatcherMessageBatch::BarrierBatch(barriers.clone()))
1215 .await?;
1216 }
1217 Ok(())
1218 }
1219
1220 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
1221 let output = self
1222 .output
1223 .iter_mut()
1224 .exactly_one()
1225 .expect("expect exactly one output");
1226
1227 let chunk = self.output_mapping.apply(chunk);
1228 output.send(DispatcherMessageBatch::Chunk(chunk)).await
1229 }
1230
1231 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
1232 let output = self
1233 .output
1234 .iter_mut()
1235 .exactly_one()
1236 .expect("expect exactly one output");
1237
1238 if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
1239 output
1240 .send(DispatcherMessageBatch::Watermark(watermark))
1241 .await?;
1242 }
1243 Ok(())
1244 }
1245
1246 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
1247 self.output
1248 .retain(|output| !actor_ids.contains(&output.actor_id()));
1249 }
1250
1251 fn dispatcher_id(&self) -> DispatcherId {
1252 self.dispatcher_id
1253 }
1254
1255 fn is_empty(&self) -> bool {
1256 self.output.is_empty()
1257 }
1258}
1259
1260#[cfg(test)]
1261mod tests {
1262 use std::hash::{BuildHasher, Hasher};
1263
1264 use futures::pin_mut;
1265 use multimap::MultiMap;
1266 use risingwave_common::array::stream_chunk::StreamChunkTestExt;
1267 use risingwave_common::array::{Array, ArrayBuilder, I32ArrayBuilder};
1268 use risingwave_common::catalog::{Field, Schema};
1269 use risingwave_common::types::DataType;
1270 use risingwave_common::util::epoch::test_epoch;
1271 use risingwave_common::util::hash_util::Crc32FastBuilder;
1272 use risingwave_expr::expr::{InputRefExpression, NonStrictExpression};
1273 use risingwave_pb::stream_plan::{DispatcherType, PbDispatchOutputMapping};
1274 use tokio::sync::mpsc::unbounded_channel;
1275
1276 use super::*;
1277 use crate::executor::exchange::output::Output;
1278 use crate::executor::exchange::permit::channel_for_test;
1279 use crate::executor::project::ProjectExecutor;
1280 use crate::executor::receiver::ReceiverExecutor;
1281 use crate::executor::test_utils::{MockSource, StreamExecutorTestExt};
1282 use crate::executor::{ActorContext, BarrierInner as Barrier, MessageInner as Message};
1283 use crate::task::barrier_test_utils::LocalBarrierTestEnv;
1284
1285 #[tokio::test]
1286 async fn test_hash_dispatcher_complex() {
1287 assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);
1289
1290 let num_outputs = 2; let key_indices = &[0, 2];
1292 let (output_tx_vecs, mut output_rx_vecs): (Vec<_>, Vec<_>) =
1293 (0..num_outputs).map(|_| channel_for_test()).collect();
1294 let outputs = output_tx_vecs
1295 .into_iter()
1296 .enumerate()
1297 .map(|(actor_id, tx)| Output::new(ActorId::new(actor_id as u32 + 1), tx))
1298 .collect::<Vec<_>>();
1299 let mut hash_mapping = (1..num_outputs + 1)
1300 .flat_map(|id| vec![ActorId::new(id as u32); VirtualNode::COUNT_FOR_TEST / num_outputs])
1301 .collect_vec();
1302 hash_mapping.resize(
1303 VirtualNode::COUNT_FOR_TEST,
1304 ActorId::new(num_outputs as u32),
1305 );
1306 let mut hash_dispatcher = HashDataDispatcher::new(
1307 outputs,
1308 key_indices.to_vec(),
1309 DispatchOutputMapping::Simple(vec![0, 1, 2]),
1310 hash_mapping,
1311 0.into(),
1312 );
1313
1314 let chunk = StreamChunk::from_pretty(
1315 " I I I
1316 + 4 6 8
1317 + 5 7 9
1318 + 0 0 0
1319 - 1 1 1 D
1320 U- 2 0 2
1321 U+ 2 0 2
1322 U- 3 3 2
1323 U+ 3 3 4",
1324 );
1325 hash_dispatcher.dispatch_data(chunk).await.unwrap();
1326
1327 assert_eq!(
1328 *output_rx_vecs[0].recv().await.unwrap().as_chunk().unwrap(),
1329 StreamChunk::from_pretty(
1330 " I I I
1331 + 4 6 8
1332 + 5 7 9
1333 + 0 0 0
1334 - 1 1 1 D
1335 U- 2 0 2
1336 U+ 2 0 2
1337 - 3 3 2 D // Should rewrite UpdateDelete to Delete
1338 + 3 3 4 // Should rewrite UpdateInsert to Insert",
1339 )
1340 );
1341 assert_eq!(
1342 *output_rx_vecs[1].recv().await.unwrap().as_chunk().unwrap(),
1343 StreamChunk::from_pretty(
1344 " I I I
1345 + 4 6 8 D
1346 + 5 7 9 D
1347 + 0 0 0 D
1348 - 1 1 1 D // Should keep original invisible mark
1349 U- 2 0 2 D // Should keep UpdateDelete
1350 U+ 2 0 2 D // Should keep UpdateInsert
1351 - 3 3 2 // Should rewrite UpdateDelete to Delete
1352 + 3 3 4 D // Should rewrite UpdateInsert to Insert",
1353 )
1354 );
1355 }
1356
1357 #[tokio::test]
1358 async fn test_configuration_change() {
1359 let _schema = Schema { fields: vec![] };
1360 let (tx, rx) = channel_for_test();
1361 let actor_id = 233.into();
1362 let barrier_test_env = LocalBarrierTestEnv::for_test().await;
1363
1364 let (untouched, old, new) = (234.into(), 235.into(), 238.into()); let (old_simple, new_simple) = (114.into(), 514.into()); let broadcast_dispatcher_id = 666.into();
1370 let broadcast_dispatcher = PbDispatcher {
1371 r#type: DispatcherType::Broadcast as _,
1372 dispatcher_id: broadcast_dispatcher_id,
1373 downstream_actor_id: vec![untouched, old],
1374 output_mapping: PbDispatchOutputMapping::identical(0).into(), ..Default::default()
1376 };
1377
1378 let simple_dispatcher_id = 888.into();
1379 let simple_dispatcher = PbDispatcher {
1380 r#type: DispatcherType::Simple as _,
1381 dispatcher_id: simple_dispatcher_id,
1382 downstream_actor_id: vec![old_simple],
1383 output_mapping: PbDispatchOutputMapping::identical(0).into(), ..Default::default()
1385 };
1386
1387 let dispatcher_updates = maplit::hashmap! {
1388 actor_id => vec![PbDispatcherUpdate {
1389 actor_id,
1390 dispatcher_id: broadcast_dispatcher_id,
1391 added_downstream_actor_id: vec![new],
1392 removed_downstream_actor_id: vec![old],
1393 hash_mapping: Default::default(),
1394 }]
1395 };
1396 let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
1397 UpdateMutation {
1398 dispatchers: dispatcher_updates,
1399 ..Default::default()
1400 },
1401 ));
1402 barrier_test_env.inject_barrier(&b1, [actor_id]);
1403 barrier_test_env.flush_all_events().await;
1404
1405 let input = Executor::new(
1406 Default::default(),
1407 ReceiverExecutor::for_test(
1408 actor_id,
1409 rx,
1410 barrier_test_env.local_barrier_manager.clone(),
1411 )
1412 .boxed(),
1413 );
1414
1415 let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
1416 let mut rxs = [untouched, old, new, old_simple, new_simple]
1417 .into_iter()
1418 .map(|id| {
1419 (id, {
1420 let (tx, rx) = channel_for_test();
1421 new_output_request_tx
1422 .send((id, NewOutputRequest::Local(tx)))
1423 .unwrap();
1424 rx
1425 })
1426 })
1427 .collect::<HashMap<_, _>>();
1428 let executor = Box::new(
1429 DispatchExecutor::new(
1430 input,
1431 new_output_request_rx,
1432 vec![broadcast_dispatcher, simple_dispatcher],
1433 &ActorContext::for_test(actor_id),
1434 )
1435 .await
1436 .unwrap(),
1437 )
1438 .execute();
1439
1440 pin_mut!(executor);
1441
1442 macro_rules! try_recv {
1443 ($down_id:expr) => {
1444 rxs.get_mut(&$down_id).unwrap().try_recv()
1445 };
1446 }
1447
1448 tx.send(Message::Chunk(StreamChunk::default()).into())
1450 .await
1451 .unwrap();
1452
1453 tx.send(Message::Barrier(b1.clone().into_dispatcher()).into())
1454 .await
1455 .unwrap();
1456 executor.next().await.unwrap().unwrap();
1457
1458 try_recv!(untouched).unwrap().as_chunk().unwrap();
1460 try_recv!(untouched).unwrap().as_barrier_batch().unwrap();
1461
1462 try_recv!(old).unwrap().as_chunk().unwrap();
1463 try_recv!(old).unwrap().as_barrier_batch().unwrap(); try_recv!(new).unwrap().as_barrier_batch().unwrap(); try_recv!(old_simple).unwrap().as_chunk().unwrap();
1468 try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); let b2 = Barrier::new_test_barrier(test_epoch(2));
1472 barrier_test_env.inject_barrier(&b2, [actor_id]);
1473 tx.send(Message::Barrier(b2.into_dispatcher()).into())
1474 .await
1475 .unwrap();
1476 executor.next().await.unwrap().unwrap();
1477
1478 try_recv!(untouched).unwrap().as_barrier_batch().unwrap();
1480 try_recv!(old).unwrap_err(); try_recv!(new).unwrap().as_barrier_batch().unwrap();
1482
1483 try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); try_recv!(new_simple).unwrap_err(); tx.send(Message::Chunk(StreamChunk::default()).into())
1488 .await
1489 .unwrap();
1490
1491 let dispatcher_updates = maplit::hashmap! {
1493 actor_id => vec![PbDispatcherUpdate {
1494 actor_id,
1495 dispatcher_id: simple_dispatcher_id,
1496 added_downstream_actor_id: vec![new_simple],
1497 removed_downstream_actor_id: vec![old_simple],
1498 hash_mapping: Default::default(),
1499 }]
1500 };
1501 let b3 = Barrier::new_test_barrier(test_epoch(3)).with_mutation(Mutation::Update(
1502 UpdateMutation {
1503 dispatchers: dispatcher_updates,
1504 ..Default::default()
1505 },
1506 ));
1507 barrier_test_env.inject_barrier(&b3, [actor_id]);
1508 tx.send(Message::Barrier(b3.into_dispatcher()).into())
1509 .await
1510 .unwrap();
1511 executor.next().await.unwrap().unwrap();
1512
1513 try_recv!(old_simple).unwrap().as_chunk().unwrap();
1515 try_recv!(old_simple).unwrap().as_barrier_batch().unwrap(); try_recv!(new_simple).unwrap().as_barrier_batch().unwrap(); let b4 = Barrier::new_test_barrier(test_epoch(4));
1521 barrier_test_env.inject_barrier(&b4, [actor_id]);
1522 tx.send(Message::Barrier(b4.into_dispatcher()).into())
1523 .await
1524 .unwrap();
1525 executor.next().await.unwrap().unwrap();
1526
1527 try_recv!(old_simple).unwrap_err(); try_recv!(new_simple).unwrap().as_barrier_batch().unwrap();
1530 }
1531
1532 #[tokio::test]
1533 async fn test_hash_dispatcher() {
1534 assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);
1536
1537 let num_outputs = 5; let cardinality = 10;
1539 let dimension = 4;
1540 let key_indices = &[0, 2];
1541 let (output_tx_vecs, output_rx_vecs): (Vec<_>, Vec<_>) =
1542 (0..num_outputs).map(|_| channel_for_test()).collect();
1543 let outputs = output_tx_vecs
1544 .into_iter()
1545 .enumerate()
1546 .map(|(actor_id, tx)| Output::new(ActorId::new(1 + actor_id as u32), tx))
1547 .collect::<Vec<_>>();
1548 let mut hash_mapping = (1..num_outputs + 1)
1549 .flat_map(|id| vec![ActorId::new(id as _); VirtualNode::COUNT_FOR_TEST / num_outputs])
1550 .collect_vec();
1551 hash_mapping.resize(
1552 VirtualNode::COUNT_FOR_TEST,
1553 ActorId::new(num_outputs as u32),
1554 );
1555 let mut hash_dispatcher = HashDataDispatcher::new(
1556 outputs,
1557 key_indices.to_vec(),
1558 DispatchOutputMapping::Simple((0..dimension).collect()),
1559 hash_mapping.clone(),
1560 0.into(),
1561 );
1562
1563 let mut ops = Vec::new();
1564 for idx in 0..cardinality {
1565 if idx % 2 == 0 {
1566 ops.push(Op::Insert);
1567 } else {
1568 ops.push(Op::Delete);
1569 }
1570 }
1571
1572 let mut start = 19260817i32..;
1573 let mut builders = (0..dimension)
1574 .map(|_| I32ArrayBuilder::new(cardinality))
1575 .collect_vec();
1576 let mut output_cols = vec![vec![vec![]; dimension]; num_outputs];
1577 let mut output_ops = vec![vec![]; num_outputs];
1578 for op in &ops {
1579 let hash_builder = Crc32FastBuilder;
1580 let mut hasher = hash_builder.build_hasher();
1581 let one_row = (0..dimension).map(|_| start.next().unwrap()).collect_vec();
1582 for key_idx in key_indices {
1583 let val = one_row[*key_idx];
1584 let bytes = val.to_le_bytes();
1585 hasher.update(&bytes);
1586 }
1587 let output_idx = hash_mapping[hasher.finish() as usize % VirtualNode::COUNT_FOR_TEST]
1588 .as_raw_id() as usize
1589 - 1;
1590 for (builder, val) in builders.iter_mut().zip_eq_fast(one_row.iter()) {
1591 builder.append(Some(*val));
1592 }
1593 output_cols[output_idx]
1594 .iter_mut()
1595 .zip_eq_fast(one_row.iter())
1596 .for_each(|(each_column, val)| each_column.push(*val));
1597 output_ops[output_idx].push(op);
1598 }
1599
1600 let columns = builders
1601 .into_iter()
1602 .map(|builder| {
1603 let array = builder.finish();
1604 array.into_ref()
1605 })
1606 .collect();
1607
1608 let chunk = StreamChunk::new(ops, columns);
1609 hash_dispatcher.dispatch_data(chunk).await.unwrap();
1610
1611 for (output_idx, mut rx) in output_rx_vecs.into_iter().enumerate() {
1612 let mut output = vec![];
1613 while let Some(Some(msg)) = rx.recv().now_or_never() {
1614 output.push(msg);
1615 }
1616 assert!(output.len() <= 1);
1618 if output.is_empty() {
1619 assert!(output_cols[output_idx].iter().all(|x| { x.is_empty() }));
1620 } else {
1621 let message = output.first().unwrap();
1622 let real_chunk = match message {
1623 DispatcherMessageBatch::Chunk(chunk) => chunk,
1624 _ => panic!(),
1625 };
1626 real_chunk
1627 .columns()
1628 .iter()
1629 .zip_eq_fast(output_cols[output_idx].iter())
1630 .for_each(|(real_col, expect_col)| {
1631 let real_vals = real_chunk
1632 .visibility()
1633 .iter_ones()
1634 .map(|row_idx| real_col.as_int32().value_at(row_idx).unwrap())
1635 .collect::<Vec<_>>();
1636 assert_eq!(real_vals.len(), expect_col.len());
1637 assert_eq!(real_vals, *expect_col);
1638 });
1639 }
1640 }
1641 }
1642
1643 #[tokio::test]
1644 async fn test_hash_dispatcher_non_adjacent_update_after_project() {
1645 assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);
1647
1648 let schema = Schema {
1649 fields: vec![
1650 Field::unnamed(DataType::Int64),
1651 Field::unnamed(DataType::Int64),
1652 ],
1653 };
1654 let (mut tx, source) = MockSource::channel();
1655 let source = source.into_executor(schema, vec![0]);
1656
1657 let proj = ProjectExecutor::new(
1658 ActorContext::for_test(123),
1659 source,
1660 vec![
1661 NonStrictExpression::for_test(InputRefExpression::new(DataType::Int64, 0)),
1662 NonStrictExpression::for_test(InputRefExpression::new(DataType::Int64, 1)),
1663 ],
1664 MultiMap::new(),
1665 vec![],
1666 true,
1667 );
1668 let mut proj = proj.boxed().execute();
1669
1670 tx.push_barrier(test_epoch(1), false);
1671 proj.expect_barrier().await;
1672
1673 tx.push_chunk(StreamChunk::from_pretty(
1674 " I I
1675 U- 1 1
1676 U+ 1 2
1677 U- 1 2
1678 U+ 1 3",
1679 ));
1680
1681 let projected = proj.expect_chunk().await;
1682 assert_eq!(
1683 projected,
1684 StreamChunk::from_pretty(
1685 " I I
1686 - 1 1
1687 U+ 1 2 D
1688 U- 1 2 D
1689 + 1 3",
1690 )
1691 );
1692
1693 let (output_tx, _output_rx) = channel_for_test();
1694 let outputs = vec![Output::new(ActorId::new(1), output_tx)];
1695 let hash_mapping = vec![ActorId::new(1); VirtualNode::COUNT_FOR_TEST];
1696 let mut hash_dispatcher = HashDataDispatcher::new(
1697 outputs,
1698 vec![0],
1699 DispatchOutputMapping::Simple(vec![0]),
1700 hash_mapping,
1701 0.into(),
1702 );
1703
1704 hash_dispatcher.dispatch_data(projected).await.unwrap();
1705 }
1706
1707 #[tokio::test]
1708 async fn test_hash_dispatcher_missing_update_delete_after_project() {
1709 let schema = Schema {
1710 fields: vec![
1711 Field::unnamed(DataType::Int64),
1712 Field::unnamed(DataType::Int64),
1713 ],
1714 };
1715 let (mut tx, source) = MockSource::channel();
1716 let source = source.into_executor(schema, vec![0]);
1717
1718 let proj = ProjectExecutor::new(
1719 ActorContext::for_test(123),
1720 source,
1721 vec![NonStrictExpression::for_test(InputRefExpression::new(
1722 DataType::Int64,
1723 0,
1724 ))],
1725 MultiMap::new(),
1726 vec![],
1727 true,
1728 );
1729 let mut proj = proj.boxed().execute();
1730
1731 tx.push_barrier(test_epoch(1), false);
1732 proj.expect_barrier().await;
1733
1734 tx.push_chunk(StreamChunk::from_pretty(
1735 " I I
1736 + 1 10
1737 U- 1 10
1738 U+ 1 30",
1739 ));
1740
1741 let projected = proj.expect_chunk().await;
1742 assert_eq!(
1743 projected,
1744 StreamChunk::from_pretty(
1745 " I
1746 + 1 D
1747 U- 1 D
1748 + 1",
1749 )
1750 );
1751
1752 let (output_tx, _output_rx) = channel_for_test();
1753 let outputs = vec![Output::new(ActorId::new(1), output_tx)];
1754 let hash_mapping = vec![ActorId::new(1); VirtualNode::COUNT_FOR_TEST];
1755 let mut hash_dispatcher = HashDataDispatcher::new(
1756 outputs,
1757 vec![0],
1758 DispatchOutputMapping::Simple(vec![0]),
1759 hash_mapping,
1760 0.into(),
1761 );
1762
1763 hash_dispatcher.dispatch_data(projected).await.unwrap();
1764 }
1765
1766 #[tokio::test]
1767 async fn test_hash_dispatcher_internal_projection_normalize_single_u_plus() {
1768 let (output_tx, mut output_rx) = channel_for_test();
1769 let outputs = vec![Output::new(ActorId::new(1), output_tx)];
1770 let hash_mapping = vec![ActorId::new(1); VirtualNode::COUNT_FOR_TEST];
1771 let mut hash_dispatcher = HashDataDispatcher::new(
1772 outputs,
1773 vec![0],
1774 DispatchOutputMapping::Simple(vec![0]),
1775 hash_mapping,
1776 0.into(),
1777 );
1778
1779 let input = StreamChunk::from_pretty(
1780 " I I
1781 + 1 10
1782 U- 1 10
1783 U+ 1 30",
1784 );
1785
1786 hash_dispatcher.dispatch_data(input).await.unwrap();
1787
1788 let output = output_rx.recv().await.unwrap();
1789 assert_eq!(
1790 *output.as_chunk().unwrap(),
1791 StreamChunk::from_pretty(
1792 " I
1793 + 1 D
1794 U- 1 D
1795 + 1",
1796 )
1797 );
1798 }
1799}