1use std::collections::{HashMap, HashSet};
16use std::iter::repeat_with;
17use std::ops::{Deref, DerefMut};
18use std::time::Duration;
19
20use anyhow::anyhow;
21use futures::FutureExt;
22use itertools::Itertools;
23use risingwave_common::array::Op;
24use risingwave_common::bitmap::BitmapBuilder;
25use risingwave_common::config::StreamingConfig;
26use risingwave_common::hash::{ActorMapping, ExpandedActorMapping, VirtualNode};
27use risingwave_common::metrics::LabelGuardedIntCounter;
28use risingwave_common::row::RowExt;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
31use risingwave_pb::stream_plan::{self, PbDispatcher};
32use smallvec::{SmallVec, smallvec};
33use thiserror_ext::AsReport;
34use tokio::sync::mpsc::UnboundedReceiver;
35use tokio::task::consume_budget;
36use tokio::time::Instant;
37use tokio_stream::StreamExt;
38use tokio_stream::adapters::Peekable;
39use tracing::{Instrument, event};
40
41use super::exchange::output::Output;
42use super::{
43 AddMutation, DispatcherBarriers, DispatcherMessageBatch, MessageBatch, TroublemakerExecutor,
44 UpdateMutation,
45};
46use crate::executor::prelude::*;
47use crate::executor::{StopMutation, StreamConsumer};
48use crate::task::{DispatcherId, NewOutputRequest};
49
50mod dispatch_sync_log_store;
51mod output_mapping;
52
53pub use dispatch_sync_log_store::SyncLogStoreDispatchExecutor;
54pub use output_mapping::DispatchOutputMapping;
55use risingwave_common::id::FragmentId;
56
57use crate::error::StreamError;
58
/// Executor that consumes the messages of `input` and fans every chunk, watermark and barrier
/// out to the downstream actors through the dispatchers held in `inner`.
pub struct DispatchExecutor {
    /// Upstream executor whose output is dispatched.
    input: Executor,
    /// Dispatchers plus the bookkeeping needed to resolve and update their outputs.
    inner: DispatchExecutorInner,
}
66
67struct DispatcherWithMetrics {
68 dispatcher: DispatcherImpl,
69 pub actor_output_buffer_blocking_duration_ns: LabelGuardedIntCounter,
70}
71
72impl Debug for DispatcherWithMetrics {
73 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
74 self.dispatcher.fmt(f)
75 }
76}
77
78impl Deref for DispatcherWithMetrics {
79 type Target = DispatcherImpl;
80
81 fn deref(&self) -> &Self::Target {
82 &self.dispatcher
83 }
84}
85
86impl DerefMut for DispatcherWithMetrics {
87 fn deref_mut(&mut self) -> &mut Self::Target {
88 &mut self.dispatcher
89 }
90}
91
92struct DispatchExecutorMetrics {
93 actor_id_str: String,
94 fragment_id_str: String,
95 metrics: Arc<StreamingMetrics>,
96 actor_out_record_cnt: LabelGuardedIntCounter,
97}
98
99impl DispatchExecutorMetrics {
100 fn monitor_dispatcher(&self, dispatcher: DispatcherImpl) -> DispatcherWithMetrics {
101 DispatcherWithMetrics {
102 actor_output_buffer_blocking_duration_ns: self
103 .metrics
104 .actor_output_buffer_blocking_duration_ns
105 .with_guarded_label_values(&[
106 self.actor_id_str.as_str(),
107 self.fragment_id_str.as_str(),
108 dispatcher.dispatcher_id().to_string().as_str(),
109 ]),
110 dispatcher,
111 }
112 }
113}
114
/// Everything [`DispatchExecutor`] needs besides its input: the live dispatchers and the state
/// used to resolve outputs for newly added downstream actors.
pub struct DispatchExecutorInner {
    /// Dispatchers of this actor; dispatcher ids are asserted unique when dispatchers are added.
    dispatchers: Vec<DispatcherWithMetrics>,
    /// Id of the actor this executor runs in.
    actor_id: ActorId,
    /// Streaming config; read for the dispatch concurrency limit and batching settings.
    actor_config: Arc<StreamingConfig>,
    metrics: DispatchExecutorMetrics,
    /// `(downstream actor id, request)` pairs; each request carries the sender an `Output` is
    /// built from.
    new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    /// Requests received while waiting for a different actor's request, kept here until a
    /// later `collect_outputs` call claims them.
    pending_new_output_requests: HashMap<ActorId, NewOutputRequest>,
}
123
124impl DispatchExecutorInner {
125 async fn collect_outputs(
126 &mut self,
127 downstream_actors: &[ActorId],
128 ) -> StreamResult<Vec<Output>> {
129 fn resolve_output(downstream_actor: ActorId, request: NewOutputRequest) -> Output {
130 let tx = match request {
131 NewOutputRequest::Local(tx) | NewOutputRequest::Remote(tx) => tx,
132 };
133 Output::new(downstream_actor, tx)
134 }
135 let mut outputs = Vec::with_capacity(downstream_actors.len());
136 for &downstream_actor in downstream_actors {
137 let output =
138 if let Some(request) = self.pending_new_output_requests.remove(&downstream_actor) {
139 resolve_output(downstream_actor, request)
140 } else {
141 loop {
142 let (requested_actor, request) = self
143 .new_output_request_rx
144 .recv()
145 .await
146 .ok_or_else(|| anyhow!("end of new output request"))?;
147 if requested_actor == downstream_actor {
148 break resolve_output(requested_actor, request);
149 } else {
150 assert!(
151 self.pending_new_output_requests
152 .insert(requested_actor, request)
153 .is_none(),
154 "duplicated inflight new output requests from actor {}",
155 requested_actor
156 );
157 }
158 }
159 };
160 outputs.push(output);
161 }
162 Ok(outputs)
163 }
164
165 async fn dispatch(&mut self, msg: MessageBatch) -> StreamResult<()> {
166 if self.dispatchers.is_empty() {
167 consume_budget().await;
172 }
173 async fn await_with_metrics(
174 future: impl Future<Output = ()>,
175 metrics: &LabelGuardedIntCounter,
176 ) {
177 let mut start_time = Instant::now();
178 let interval_duration = Duration::from_secs(15);
179 let mut interval =
180 tokio::time::interval_at(start_time + interval_duration, interval_duration);
181
182 let mut fut = pin!(future);
183
184 loop {
185 tokio::select! {
186 biased;
187 _res = &mut fut => {
188 let ns = start_time.elapsed().as_nanos() as u64;
189 metrics.inc_by(ns);
190 break;
191 }
192 _ = interval.tick() => {
193 start_time = Instant::now();
194 metrics.inc_by(interval_duration.as_nanos() as u64);
195 }
196 };
197 }
198 }
199
200 use futures::StreamExt;
201
202 let limit = self.actor_config.developer.exchange_concurrent_dispatchers;
203 match msg {
205 MessageBatch::BarrierBatch(barrier_batch) => {
206 if barrier_batch.is_empty() {
207 return Ok(());
208 }
209 let mutation = barrier_batch[0].mutation.clone();
211 self.pre_mutate_dispatchers(&mutation).await?;
212 futures::stream::iter(self.dispatchers.iter_mut())
213 .for_each_concurrent(limit, |dispatcher| async {
214 let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
215 let dispatcher_output = &mut dispatcher.dispatcher;
216 let fut = dispatcher_output.dispatch_barriers(
217 barrier_batch
218 .iter()
219 .cloned()
220 .map(|b| b.into_dispatcher())
221 .collect(),
222 );
223 await_with_metrics(fut, metrics).await;
224 })
225 .await;
226 self.post_mutate_dispatchers(&mutation)?;
227 }
228 MessageBatch::Watermark(watermark) => {
229 futures::stream::iter(self.dispatchers.iter_mut())
230 .for_each_concurrent(limit, |dispatcher| async {
231 let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
232 let dispatcher_output = &mut dispatcher.dispatcher;
233 let fut = dispatcher_output.dispatch_watermark(watermark.clone());
234 await_with_metrics(fut, metrics).await;
235 })
236 .await;
237 }
238 MessageBatch::Chunk(chunk) => {
239 futures::stream::iter(self.dispatchers.iter_mut())
240 .for_each_concurrent(limit, |dispatcher| async {
241 let metrics = &dispatcher.actor_output_buffer_blocking_duration_ns;
242 let dispatcher_output = &mut dispatcher.dispatcher;
243 let fut = dispatcher_output.dispatch_data(chunk.clone());
244 await_with_metrics(fut, metrics).await;
245 })
246 .await;
247
248 self.metrics
249 .actor_out_record_cnt
250 .inc_by(chunk.cardinality() as _);
251 }
252 }
253 self.dispatchers.retain(|dispatcher| match &**dispatcher {
254 DispatcherImpl::Failed(dispatcher_id, e) => {
255 warn!(%dispatcher_id, err = %e.as_report(), actor_id = %self.actor_id, "dispatch fail");
256 false
257 }
258 _ => true,
259 });
260 Ok(())
261 }
262
263 async fn add_dispatchers<'a>(
265 &mut self,
266 new_dispatchers: impl IntoIterator<Item = &'a PbDispatcher>,
267 ) -> StreamResult<()> {
268 for dispatcher in new_dispatchers {
269 let outputs = self
270 .collect_outputs(&dispatcher.downstream_actor_id)
271 .await?;
272 let dispatcher = DispatcherImpl::new(outputs, dispatcher)?;
273 let dispatcher = self.metrics.monitor_dispatcher(dispatcher);
274 self.dispatchers.push(dispatcher);
275 }
276
277 assert!(
278 self.dispatchers
279 .iter()
280 .map(|d| d.dispatcher_id())
281 .all_unique(),
282 "dispatcher ids must be unique: {:?}",
283 self.dispatchers
284 );
285
286 Ok(())
287 }
288
289 fn find_dispatcher(&mut self, dispatcher_id: DispatcherId) -> Option<&mut DispatcherImpl> {
290 self.dispatchers
291 .iter_mut()
292 .find(|d| d.dispatcher_id() == dispatcher_id)
293 .map(DerefMut::deref_mut)
294 }
295
296 async fn pre_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
299 let outputs = self
300 .collect_outputs(&update.added_downstream_actor_id)
301 .await?;
302
303 let Some(dispatcher) = self.find_dispatcher(update.dispatcher_id) else {
304 warn!(dispatcher_id = %update.dispatcher_id, added = ?update.added_downstream_actor_id, removed = ?update.removed_downstream_actor_id, actor_id = %self.actor_id, "ignore dispatcher update");
305 return Ok(());
306 };
307 dispatcher.add_outputs(outputs);
308
309 Ok(())
310 }
311
312 fn post_update_dispatcher(&mut self, update: &PbDispatcherUpdate) -> StreamResult<()> {
315 let ids = update.removed_downstream_actor_id.iter().copied().collect();
316
317 let Some(dispatcher) = self.find_dispatcher(update.dispatcher_id) else {
318 warn!(dispatcher_id = %update.dispatcher_id, added = ?update.added_downstream_actor_id, removed = ?update.removed_downstream_actor_id, actor_id = %self.actor_id, "ignore dispatcher update");
319 return Ok(());
320 };
321 dispatcher.remove_outputs(&ids);
322
323 if let DispatcherImpl::Hash(dispatcher) = dispatcher {
330 dispatcher.hash_mapping =
331 ActorMapping::from_protobuf(update.get_hash_mapping()?).to_expanded();
332 }
333
334 Ok(())
335 }
336
337 async fn pre_mutate_dispatchers(
339 &mut self,
340 mutation: &Option<Arc<Mutation>>,
341 ) -> StreamResult<()> {
342 let Some(mutation) = mutation.as_deref() else {
343 return Ok(());
344 };
345
346 match mutation {
347 Mutation::Add(AddMutation { adds, .. }) => {
348 if let Some(new_dispatchers) = adds.get(&self.actor_id) {
349 self.add_dispatchers(new_dispatchers).await?;
350 }
351 }
352 Mutation::Update(UpdateMutation {
353 dispatchers,
354 actor_new_dispatchers: actor_dispatchers,
355 ..
356 }) => {
357 if let Some(new_dispatchers) = actor_dispatchers.get(&self.actor_id) {
358 self.add_dispatchers(new_dispatchers).await?;
359 }
360
361 if let Some(updates) = dispatchers.get(&self.actor_id) {
362 for update in updates {
363 self.pre_update_dispatcher(update).await?;
364 }
365 }
366 }
367 _ => {}
368 }
369
370 Ok(())
371 }
372
373 fn post_mutate_dispatchers(&mut self, mutation: &Option<Arc<Mutation>>) -> StreamResult<()> {
375 let Some(mutation) = mutation.as_deref() else {
376 return Ok(());
377 };
378
379 match mutation {
380 Mutation::Stop(StopMutation { dropped_actors, .. }) => {
381 if !dropped_actors.contains(&self.actor_id) {
383 for dispatcher in &mut self.dispatchers {
384 dispatcher.remove_outputs(dropped_actors);
385 }
386 }
387 }
388 Mutation::Update(UpdateMutation {
389 dispatchers,
390 dropped_actors,
391 ..
392 }) => {
393 if let Some(updates) = dispatchers.get(&self.actor_id) {
394 for update in updates {
395 self.post_update_dispatcher(update)?;
396 }
397 }
398
399 if !dropped_actors.contains(&self.actor_id) {
400 for dispatcher in &mut self.dispatchers {
401 dispatcher.remove_outputs(dropped_actors);
402 }
403 }
404 }
405 _ => {}
406 };
407
408 self.dispatchers.retain(|d| !d.is_empty());
411
412 Ok(())
413 }
414}
415
416impl DispatchExecutor {
417 pub(crate) async fn new(
418 input: Executor,
419 new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
420 dispatchers: Vec<stream_plan::Dispatcher>,
421 actor_context: &ActorContextRef,
422 ) -> StreamResult<Self> {
423 let mut executor = Self::new_inner(
424 input,
425 new_output_request_rx,
426 vec![],
427 actor_context.id,
428 actor_context.fragment_id,
429 actor_context.config.clone(),
430 actor_context.streaming_metrics.clone(),
431 );
432 let inner = &mut executor.inner;
433 for dispatcher in dispatchers {
434 let outputs = inner
435 .collect_outputs(&dispatcher.downstream_actor_id)
436 .await?;
437 let dispatcher = DispatcherImpl::new(outputs, &dispatcher)?;
438 let dispatcher = inner.metrics.monitor_dispatcher(dispatcher);
439 inner.dispatchers.push(dispatcher);
440 }
441 Ok(executor)
442 }
443
444 #[cfg(test)]
445 pub(crate) fn for_test(
446 input: Executor,
447 dispatchers: Vec<DispatcherImpl>,
448 actor_id: ActorId,
449 fragment_id: FragmentId,
450 actor_config: Arc<StreamingConfig>,
451 metrics: Arc<StreamingMetrics>,
452 ) -> (
453 Self,
454 tokio::sync::mpsc::UnboundedSender<(ActorId, NewOutputRequest)>,
455 ) {
456 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
457
458 (
459 Self::new_inner(
460 input,
461 rx,
462 dispatchers,
463 actor_id,
464 fragment_id,
465 actor_config,
466 metrics,
467 ),
468 tx,
469 )
470 }
471
472 fn new_inner(
473 mut input: Executor,
474 new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
475 dispatchers: Vec<DispatcherImpl>,
476 actor_id: ActorId,
477 fragment_id: FragmentId,
478 actor_config: Arc<StreamingConfig>,
479 metrics: Arc<StreamingMetrics>,
480 ) -> Self {
481 if crate::consistency::insane() {
482 let mut info = input.info().clone();
484 info.identity = format!("{} (embedded trouble)", info.identity);
485 let troublemaker = TroublemakerExecutor::new(input, actor_config.developer.chunk_size);
486 input = (info, troublemaker).into();
487 }
488
489 let actor_id_str = actor_id.to_string();
490 let fragment_id_str = fragment_id.to_string();
491 let actor_out_record_cnt = metrics
492 .actor_out_record_cnt
493 .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
494 let metrics = DispatchExecutorMetrics {
495 actor_id_str,
496 fragment_id_str,
497 metrics,
498 actor_out_record_cnt,
499 };
500 let dispatchers = dispatchers
501 .into_iter()
502 .map(|dispatcher| metrics.monitor_dispatcher(dispatcher))
503 .collect();
504
505 Self {
506 input,
507 inner: DispatchExecutorInner {
508 dispatchers,
509 actor_id,
510 actor_config,
511 metrics,
512 new_output_request_rx,
513 pending_new_output_requests: Default::default(),
514 },
515 }
516 }
517}
518
519async fn dispatch_message_batch(
522 inner: &mut DispatchExecutorInner,
523 batch: MessageBatch,
524) -> StreamResult<Option<Vec<Barrier>>> {
525 match batch {
526 MessageBatch::Chunk(chunk) => {
527 inner
528 .dispatch(MessageBatch::Chunk(chunk))
529 .instrument(tracing::info_span!("dispatch_chunk"))
530 .instrument_await("dispatch_chunk")
531 .await?;
532 Ok(None)
533 }
534 MessageBatch::BarrierBatch(barrier_batch) => {
535 assert!(!barrier_batch.is_empty());
536 let yielded_barriers = barrier_batch.clone();
537 inner
538 .dispatch(MessageBatch::BarrierBatch(barrier_batch))
539 .instrument(tracing::info_span!("dispatch_barrier_batch"))
540 .instrument_await("dispatch_barrier_batch")
541 .await?;
542 inner
543 .metrics
544 .metrics
545 .barrier_batch_size
546 .observe(yielded_barriers.len() as f64);
547 Ok(Some(yielded_barriers))
548 }
549 MessageBatch::Watermark(watermark) => {
550 inner
551 .dispatch(MessageBatch::Watermark(watermark))
552 .instrument(tracing::info_span!("dispatch_watermark"))
553 .instrument_await("dispatch_watermark")
554 .await?;
555 Ok(None)
556 }
557 }
558}
559
560impl StreamConsumer for DispatchExecutor {
561 type BarrierStream = impl Stream<Item = StreamResult<Barrier>> + Send;
562
563 fn execute(mut self: Box<Self>) -> Self::BarrierStream {
564 let max_barrier_count_per_batch = self.inner.actor_config.developer.max_barrier_batch_size;
565 #[try_stream]
566 async move {
567 let mut input = self.input.execute().peekable();
568 loop {
569 let Some(message) =
570 try_batch_barriers(max_barrier_count_per_batch, &mut input).await?
571 else {
572 break;
574 };
575 if let Some(barrier_batch) =
576 dispatch_message_batch(&mut self.inner, message).await?
577 {
578 for barrier in barrier_batch {
579 yield barrier;
580 }
581 }
582 }
583 }
584 }
585}
586
587async fn try_batch_barriers(
593 max_barrier_count_per_batch: u32,
594 input: &mut Peekable<BoxedMessageStream>,
595) -> StreamResult<Option<MessageBatch>> {
596 let Some(msg) = input.next().await else {
597 return Ok(None);
599 };
600 let mut barrier_batch = vec![];
601 let msg: Message = msg?;
602 let max_peek_attempts = match msg {
603 Message::Chunk(c) => {
604 return Ok(Some(MessageBatch::Chunk(c)));
605 }
606 Message::Watermark(w) => {
607 return Ok(Some(MessageBatch::Watermark(w)));
608 }
609 Message::Barrier(b) => {
610 let peek_more_barrier = b.mutation.is_none();
611 barrier_batch.push(b);
612 if peek_more_barrier {
613 max_barrier_count_per_batch.saturating_sub(1)
614 } else {
615 0
616 }
617 }
618 };
619 for _ in 0..max_peek_attempts {
621 let peek = input.peek().now_or_never();
622 let Some(peek) = peek else {
623 break;
624 };
625 let Some(msg) = peek else {
626 break;
628 };
629 let Ok(Message::Barrier(barrier)) = msg else {
630 break;
631 };
632 if barrier.mutation.is_some() {
633 break;
634 }
635 let msg: Message = input.next().now_or_never().unwrap().unwrap()?;
636 let Message::Barrier(ref barrier) = msg else {
637 unreachable!("must be a barrier");
638 };
639 barrier_batch.push(barrier.clone());
640 }
641 Ok(Some(MessageBatch::BarrierBatch(barrier_batch)))
642}
643
/// The dispatcher strategies used by the dispatch executor.
///
/// The methods that forward to the wrapped dispatcher are generated by `impl_dispatcher!`.
#[derive(Debug)]
pub(crate) enum DispatcherImpl {
    Hash(HashDataDispatcher),
    Broadcast(BroadcastDispatcher),
    Simple(SimpleDispatcher),
    // Only constructed in tests, hence the `expect(dead_code)` outside `cfg(test)`.
    #[cfg_attr(not(test), expect(dead_code))]
    RoundRobin(RoundRobinDataDispatcher),
    /// A dispatcher whose last dispatch returned an error; keeps its id and that error.
    /// The executor logs and removes such dispatchers after each dispatch.
    Failed(DispatcherId, StreamError),
}
655
656impl DispatcherImpl {
657 pub fn new(outputs: Vec<Output>, dispatcher: &PbDispatcher) -> StreamResult<Self> {
658 let output_mapping =
659 DispatchOutputMapping::from_protobuf(dispatcher.output_mapping.clone().unwrap());
660
661 use risingwave_pb::stream_plan::DispatcherType::*;
662 let dispatcher_impl = match dispatcher.get_type()? {
663 Hash => {
664 assert!(!outputs.is_empty());
665 let dist_key_indices = dispatcher
666 .dist_key_indices
667 .iter()
668 .map(|i| *i as usize)
669 .collect();
670
671 let hash_mapping =
672 ActorMapping::from_protobuf(dispatcher.get_hash_mapping()?).to_expanded();
673
674 DispatcherImpl::Hash(HashDataDispatcher::new(
675 outputs,
676 dist_key_indices,
677 output_mapping,
678 hash_mapping,
679 dispatcher.dispatcher_id,
680 ))
681 }
682 Broadcast => DispatcherImpl::Broadcast(BroadcastDispatcher::new(
683 outputs,
684 output_mapping,
685 dispatcher.dispatcher_id,
686 )),
687 Simple | NoShuffle => {
688 let [output]: [_; 1] = outputs.try_into().unwrap();
689 DispatcherImpl::Simple(SimpleDispatcher::new(
690 output,
691 output_mapping,
692 dispatcher.dispatcher_id,
693 ))
694 }
695 Unspecified => unreachable!(),
696 };
697
698 Ok(dispatcher_impl)
699 }
700}
701
// Generates the `DispatcherImpl` methods that forward to the wrapped dispatcher of every
// variant passed in (see `for_all_dispatcher_variants`).
//
// If a dispatch method returns an error, the dispatcher replaces itself with
// `DispatcherImpl::Failed`, keeping its id and the error. Dispatching to a `Failed` dispatcher
// is treated as unreachable: the executor removes failed dispatchers right after each
// dispatch. Output changes on a `Failed` dispatcher are ignored, and it reports itself as empty.
macro_rules! impl_dispatcher {
    ($( { $variant_name:ident } ),*) => {
        impl DispatcherImpl {
            // Sends a data chunk; on error, becomes `Failed`.
            pub async fn dispatch_data(&mut self, chunk: StreamChunk) {
                if let Err(e) = match self {
                    $( Self::$variant_name(inner) => inner.dispatch_data(chunk).await, )*
                    Self::Failed(..) => unreachable!(),
                } {
                    *self = Self::Failed(self.dispatcher_id(), e);
                }
            }

            // Sends a batch of barriers; on error, becomes `Failed`.
            pub async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) {
                if let Err(e) = match self {
                    $( Self::$variant_name(inner) => inner.dispatch_barriers(barriers).await, )*
                    Self::Failed(..) => unreachable!(),
                } {
                    *self = Self::Failed(self.dispatcher_id(), e);
                }
            }

            // Sends a watermark; on error, becomes `Failed`.
            pub async fn dispatch_watermark(&mut self, watermark: Watermark) {
                if let Err(e) = match self {
                    $( Self::$variant_name(inner) => inner.dispatch_watermark(watermark).await, )*
                    Self::Failed(..) => unreachable!(),
                } {
                    *self = Self::Failed(self.dispatcher_id(), e);
                }
            }

            // Adds outputs; a no-op for a failed dispatcher.
            pub fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
                match self {
                    $(Self::$variant_name(inner) => inner.add_outputs(outputs), )*
                    Self::Failed(..) => {},
                }
            }

            // Removes outputs to the given actors; a no-op for a failed dispatcher.
            pub fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
                match self {
                    $(Self::$variant_name(inner) => inner.remove_outputs(actor_ids), )*
                    Self::Failed(..) => {},
                }
            }

            // Id of the dispatcher; a failed dispatcher keeps the id it had.
            pub fn dispatcher_id(&self) -> DispatcherId {
                match self {
                    $(Self::$variant_name(inner) => inner.dispatcher_id(), )*
                    Self::Failed(dispatcher_id, ..) => *dispatcher_id,
                }
            }

            // Whether the dispatcher has no outputs; a failed dispatcher always counts as empty.
            pub fn is_empty(&self) -> bool {
                match self {
                    $(Self::$variant_name(inner) => inner.is_empty(), )*
                    Self::Failed(..) => true,
                }
            }
        }
    }
}
762
// Invokes `$macro` with every forwarding variant of `DispatcherImpl` (all except `Failed`,
// which the generated code handles explicitly).
macro_rules! for_all_dispatcher_variants {
    ($macro:ident) => {
        $macro! {
            { Hash },
            { Broadcast },
            { Simple },
            { RoundRobin }
        }
    };
}

// Generate the forwarding methods of `DispatcherImpl`.
for_all_dispatcher_variants! { impl_dispatcher }
775
/// Future returned by the async methods of [`Dispatcher`]: a `Send` future resolving to
/// `StreamResult<()>`.
pub trait DispatchFuture<'a> = Future<Output = StreamResult<()>> + Send;

/// A strategy for delivering chunks, barriers and watermarks to a set of downstream outputs.
trait Dispatcher: Debug + 'static {
    /// Sends a data chunk to the outputs selected by this dispatcher's strategy.
    fn dispatch_data(&mut self, chunk: StreamChunk) -> impl DispatchFuture<'_>;
    /// Sends a batch of barriers to the outputs.
    fn dispatch_barriers(&mut self, barrier: DispatcherBarriers) -> impl DispatchFuture<'_>;
    /// Sends a watermark to the outputs.
    fn dispatch_watermark(&mut self, watermark: Watermark) -> impl DispatchFuture<'_>;

    /// Adds new downstream outputs.
    fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>);
    /// Removes the outputs whose downstream actor id is in `actor_ids`.
    fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>);

    /// Returns the id of this dispatcher.
    fn dispatcher_id(&self) -> DispatcherId;

    /// Returns whether no outputs are left.
    fn is_empty(&self) -> bool;
}
802
803async fn broadcast_concurrent(
808 outputs: impl IntoIterator<Item = &'_ mut Output>,
809 message: DispatcherMessageBatch,
810) -> StreamResult<()> {
811 futures::future::try_join_all(
812 outputs
813 .into_iter()
814 .map(|output| output.send(message.clone())),
815 )
816 .await?;
817 Ok(())
818}
819
820#[derive(Debug)]
821pub struct RoundRobinDataDispatcher {
822 outputs: Vec<Output>,
823 output_mapping: DispatchOutputMapping,
824 cur: usize,
825 dispatcher_id: DispatcherId,
826}
827
828impl RoundRobinDataDispatcher {
829 #[cfg_attr(not(test), expect(dead_code))]
830 pub fn new(
831 outputs: Vec<Output>,
832 output_mapping: DispatchOutputMapping,
833 dispatcher_id: DispatcherId,
834 ) -> Self {
835 Self {
836 outputs,
837 output_mapping,
838 cur: 0,
839 dispatcher_id,
840 }
841 }
842}
843
844impl Dispatcher for RoundRobinDataDispatcher {
845 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
846 let chunk = self.output_mapping.apply(chunk);
847
848 self.outputs[self.cur]
849 .send(DispatcherMessageBatch::Chunk(chunk))
850 .await?;
851 self.cur += 1;
852 self.cur %= self.outputs.len();
853 Ok(())
854 }
855
856 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
857 broadcast_concurrent(
859 &mut self.outputs,
860 DispatcherMessageBatch::BarrierBatch(barriers),
861 )
862 .await
863 }
864
865 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
866 if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
867 broadcast_concurrent(
869 &mut self.outputs,
870 DispatcherMessageBatch::Watermark(watermark),
871 )
872 .await?;
873 }
874 Ok(())
875 }
876
877 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
878 self.outputs.extend(outputs);
879 }
880
881 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
882 self.outputs
883 .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
884 .count();
885 self.cur = self.cur.min(self.outputs.len() - 1);
886 }
887
888 fn dispatcher_id(&self) -> DispatcherId {
889 self.dispatcher_id
890 }
891
892 fn is_empty(&self) -> bool {
893 self.outputs.is_empty()
894 }
895}
896
897pub struct HashDataDispatcher {
898 outputs: Vec<Output>,
899 keys: Vec<usize>,
900 output_mapping: DispatchOutputMapping,
901 hash_mapping: ExpandedActorMapping,
904 dispatcher_id: DispatcherId,
905}
906
907impl Debug for HashDataDispatcher {
908 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
909 f.debug_struct("HashDataDispatcher")
910 .field("outputs", &self.outputs)
911 .field("keys", &self.keys)
912 .field("dispatcher_id", &self.dispatcher_id)
913 .finish_non_exhaustive()
914 }
915}
916
917impl HashDataDispatcher {
918 pub fn new(
919 outputs: Vec<Output>,
920 keys: Vec<usize>,
921 output_mapping: DispatchOutputMapping,
922 hash_mapping: ExpandedActorMapping,
923 dispatcher_id: DispatcherId,
924 ) -> Self {
925 Self {
926 outputs,
927 keys,
928 output_mapping,
929 hash_mapping,
930 dispatcher_id,
931 }
932 }
933}
934
935impl Dispatcher for HashDataDispatcher {
936 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
937 self.outputs.extend(outputs);
938 }
939
940 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
941 broadcast_concurrent(
943 &mut self.outputs,
944 DispatcherMessageBatch::BarrierBatch(barriers),
945 )
946 .await
947 }
948
949 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
950 if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
951 broadcast_concurrent(
953 &mut self.outputs,
954 DispatcherMessageBatch::Watermark(watermark),
955 )
956 .await?;
957 }
958 Ok(())
959 }
960
961 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
962 let num_outputs = self.outputs.len();
966
967 let vnode_count = self.hash_mapping.len();
969 let vnodes = VirtualNode::compute_chunk(chunk.data_chunk(), &self.keys, vnode_count);
970
971 tracing::debug!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty(), self.keys, vnodes);
972
973 let mut vis_maps = repeat_with(|| BitmapBuilder::with_capacity(chunk.capacity()))
974 .take(num_outputs)
975 .collect_vec();
976 let mut last_update_delete_row_idx = None;
977 let mut new_ops: Vec<Op> = Vec::with_capacity(chunk.capacity());
978
979 for (row_idx, ((vnode, &op), visible)) in vnodes
980 .iter()
981 .copied()
982 .zip_eq_fast(chunk.ops())
983 .zip_eq_fast(chunk.visibility().iter())
984 .enumerate()
985 {
986 for (output, vis_map) in self.outputs.iter().zip_eq_fast(vis_maps.iter_mut()) {
988 vis_map.append(visible && self.hash_mapping[vnode.to_index()] == output.actor_id());
989 }
990
991 if !visible {
992 new_ops.push(op);
993 continue;
994 }
995
996 if op == Op::UpdateDelete {
1002 last_update_delete_row_idx = Some(row_idx);
1003 } else if op == Op::UpdateInsert {
1004 let delete_row_idx = last_update_delete_row_idx
1005 .take()
1006 .expect("missing U- before U+");
1007
1008 let dist_key_changed = chunk.row_at(delete_row_idx).1.project(&self.keys)
1010 != chunk.row_at(row_idx).1.project(&self.keys);
1011
1012 if dist_key_changed {
1013 new_ops.push(Op::Delete);
1014 new_ops.push(Op::Insert);
1015 } else {
1016 new_ops.push(Op::UpdateDelete);
1017 new_ops.push(Op::UpdateInsert);
1018 }
1019 } else {
1020 new_ops.push(op);
1021 }
1022 }
1023 assert!(last_update_delete_row_idx.is_none(), "missing U+ after U-");
1024
1025 let chunk = self.output_mapping.apply(chunk);
1028 let chunk_vis = chunk.visibility();
1030
1031 let chunk_ops = chunk.ops();
1038 let ops: Vec<Op> = new_ops
1039 .iter()
1040 .zip_eq_fast(chunk_ops.iter())
1041 .map(|(&new_op, &elim_op)| {
1042 match (new_op, elim_op) {
1045 (Op::Delete, Op::UpdateDelete) => Op::Delete,
1047 (Op::Insert, Op::UpdateInsert) => Op::Insert,
1049 _ => elim_op,
1051 }
1052 })
1053 .collect();
1054
1055 futures::future::try_join_all(
1057 vis_maps
1058 .into_iter()
1059 .zip_eq_fast(self.outputs.iter_mut())
1060 .map(|(vis_map, output)| async {
1061 let vis_map = vis_map.finish();
1062 let combined_vis = &vis_map & chunk_vis;
1067 let new_stream_chunk = StreamChunk::with_visibility(
1068 ops.clone(),
1069 chunk.columns().into(),
1070 combined_vis,
1071 );
1072 if new_stream_chunk.cardinality() > 0 {
1073 event!(
1074 tracing::Level::TRACE,
1075 msg = "chunk",
1076 downstream = %output.actor_id(),
1077 "send = \n{:#?}",
1078 new_stream_chunk
1079 );
1080 output
1081 .send(DispatcherMessageBatch::Chunk(new_stream_chunk))
1082 .await?;
1083 }
1084 StreamResult::Ok(())
1085 }),
1086 )
1087 .await?;
1088
1089 Ok(())
1090 }
1091
1092 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
1093 self.outputs
1094 .extract_if(.., |output| actor_ids.contains(&output.actor_id()))
1095 .count();
1096 }
1097
1098 fn dispatcher_id(&self) -> DispatcherId {
1099 self.dispatcher_id
1100 }
1101
1102 fn is_empty(&self) -> bool {
1103 self.outputs.is_empty()
1104 }
1105}
1106
1107#[derive(Debug)]
1109pub struct BroadcastDispatcher {
1110 outputs: HashMap<ActorId, Output>,
1111 output_mapping: DispatchOutputMapping,
1112 dispatcher_id: DispatcherId,
1113}
1114
1115impl BroadcastDispatcher {
1116 pub fn new(
1117 outputs: impl IntoIterator<Item = Output>,
1118 output_mapping: DispatchOutputMapping,
1119 dispatcher_id: DispatcherId,
1120 ) -> Self {
1121 Self {
1122 outputs: Self::into_pairs(outputs).collect(),
1123 output_mapping,
1124 dispatcher_id,
1125 }
1126 }
1127
1128 fn into_pairs(
1129 outputs: impl IntoIterator<Item = Output>,
1130 ) -> impl Iterator<Item = (ActorId, Output)> {
1131 outputs
1132 .into_iter()
1133 .map(|output| (output.actor_id(), output))
1134 }
1135}
1136
1137impl Dispatcher for BroadcastDispatcher {
1138 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
1139 let chunk = self.output_mapping.apply(chunk);
1140 broadcast_concurrent(
1141 self.outputs.values_mut(),
1142 DispatcherMessageBatch::Chunk(chunk),
1143 )
1144 .await
1145 }
1146
1147 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
1148 broadcast_concurrent(
1150 self.outputs.values_mut(),
1151 DispatcherMessageBatch::BarrierBatch(barriers),
1152 )
1153 .await
1154 }
1155
1156 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
1157 if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
1158 broadcast_concurrent(
1160 self.outputs.values_mut(),
1161 DispatcherMessageBatch::Watermark(watermark),
1162 )
1163 .await?;
1164 }
1165 Ok(())
1166 }
1167
1168 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
1169 self.outputs.extend(Self::into_pairs(outputs));
1170 }
1171
1172 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
1173 self.outputs
1174 .extract_if(|actor_id, _| actor_ids.contains(actor_id))
1175 .count();
1176 }
1177
1178 fn dispatcher_id(&self) -> DispatcherId {
1179 self.dispatcher_id
1180 }
1181
1182 fn is_empty(&self) -> bool {
1183 self.outputs.is_empty()
1184 }
1185}
1186
1187#[derive(Debug)]
1189pub struct SimpleDispatcher {
1190 output: SmallVec<[Output; 2]>,
1204 output_mapping: DispatchOutputMapping,
1205 dispatcher_id: DispatcherId,
1206}
1207
1208impl SimpleDispatcher {
1209 pub fn new(
1210 output: Output,
1211 output_mapping: DispatchOutputMapping,
1212 dispatcher_id: DispatcherId,
1213 ) -> Self {
1214 Self {
1215 output: smallvec![output],
1216 output_mapping,
1217 dispatcher_id,
1218 }
1219 }
1220}
1221
1222impl Dispatcher for SimpleDispatcher {
1223 fn add_outputs(&mut self, outputs: impl IntoIterator<Item = Output>) {
1224 self.output.extend(outputs);
1225 assert!(self.output.len() <= 2);
1226 }
1227
1228 async fn dispatch_barriers(&mut self, barriers: DispatcherBarriers) -> StreamResult<()> {
1229 for output in &mut self.output {
1231 output
1232 .send(DispatcherMessageBatch::BarrierBatch(barriers.clone()))
1233 .await?;
1234 }
1235 Ok(())
1236 }
1237
1238 async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> {
1239 let output = self
1240 .output
1241 .iter_mut()
1242 .exactly_one()
1243 .expect("expect exactly one output");
1244
1245 let chunk = self.output_mapping.apply(chunk);
1246 output.send(DispatcherMessageBatch::Chunk(chunk)).await
1247 }
1248
1249 async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
1250 let output = self
1251 .output
1252 .iter_mut()
1253 .exactly_one()
1254 .expect("expect exactly one output");
1255
1256 if let Some(watermark) = self.output_mapping.apply_watermark(watermark) {
1257 output
1258 .send(DispatcherMessageBatch::Watermark(watermark))
1259 .await?;
1260 }
1261 Ok(())
1262 }
1263
1264 fn remove_outputs(&mut self, actor_ids: &HashSet<ActorId>) {
1265 self.output
1266 .retain(|output| !actor_ids.contains(&output.actor_id()));
1267 }
1268
1269 fn dispatcher_id(&self) -> DispatcherId {
1270 self.dispatcher_id
1271 }
1272
1273 fn is_empty(&self) -> bool {
1274 self.output.is_empty()
1275 }
1276}
1277
#[cfg(test)]
mod tests {
    use std::hash::{BuildHasher, Hasher};

    use futures::pin_mut;
    use multimap::MultiMap;
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::array::{Array, ArrayBuilder, I32ArrayBuilder};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::hash_util::Crc32FastBuilder;
    use risingwave_expr::expr::{InputRefExpression, NonStrictExpression};
    use risingwave_pb::stream_plan::{DispatcherType, PbDispatchOutputMapping};
    use tokio::sync::mpsc::unbounded_channel;

    use super::*;
    use crate::executor::exchange::output::Output;
    use crate::executor::exchange::permit::channel_for_test;
    use crate::executor::project::ProjectExecutor;
    use crate::executor::receiver::ReceiverExecutor;
    use crate::executor::test_utils::{MockSource, StreamExecutorTestExt};
    use crate::executor::{ActorContext, BarrierInner as Barrier, MessageInner as Message};
    use crate::task::barrier_test_utils::LocalBarrierTestEnv;

    /// Rows of one chunk are split between two outputs via visibility; update pairs split
    /// across outputs are rewritten to plain delete/insert.
    #[tokio::test]
    async fn test_hash_dispatcher_complex() {
        // The expected per-output chunks below depend on the test vnode count.
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let num_outputs = 2;
        let key_indices = &[0, 2];
        let (output_tx_vecs, mut output_rx_vecs): (Vec<_>, Vec<_>) =
            (0..num_outputs).map(|_| channel_for_test()).collect();
        // Downstream actor ids are 1..=num_outputs.
        let outputs = output_tx_vecs
            .into_iter()
            .enumerate()
            .map(|(actor_id, tx)| Output::new(ActorId::new(actor_id as u32 + 1), tx))
            .collect::<Vec<_>>();
        let mut hash_mapping = (1..num_outputs + 1)
            .flat_map(|id| vec![ActorId::new(id as u32); VirtualNode::COUNT_FOR_TEST / num_outputs])
            .collect_vec();
        // Pad any remainder of the vnode space with the last actor.
        hash_mapping.resize(
            VirtualNode::COUNT_FOR_TEST,
            ActorId::new(num_outputs as u32),
        );
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            key_indices.to_vec(),
            DispatchOutputMapping::Simple(vec![0, 1, 2]),
            hash_mapping,
            0.into(),
        );

        let chunk = StreamChunk::from_pretty(
            " I I I
            + 4 6 8
            + 5 7 9
            + 0 0 0
            - 1 1 1 D
            U- 2 0 2
            U+ 2 0 2
            U- 3 3 2
            U+ 3 3 4",
        );
        hash_dispatcher.dispatch_data(chunk).await.unwrap();

        assert_eq!(
            *output_rx_vecs[0].recv().await.unwrap().as_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I I I
                + 4 6 8
                + 5 7 9
                + 0 0 0
                - 1 1 1 D
                U- 2 0 2
                U+ 2 0 2
                - 3 3 2 D // Should rewrite UpdateDelete to Delete
                + 3 3 4 // Should rewrite UpdateInsert to Insert",
            )
        );
        assert_eq!(
            *output_rx_vecs[1].recv().await.unwrap().as_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I I I
                + 4 6 8 D
                + 5 7 9 D
                + 0 0 0 D
                - 1 1 1 D // Should keep original invisible mark
                U- 2 0 2 D // Should keep UpdateDelete
                U+ 2 0 2 D // Should keep UpdateInsert
                - 3 3 2 // Should rewrite UpdateDelete to Delete
                + 3 3 4 D // Should rewrite UpdateInsert to Insert",
            )
        );
    }

    /// Dispatcher updates carried by barriers add and remove downstream outputs of both a
    /// broadcast and a simple dispatcher.
    #[tokio::test]
    async fn test_configuration_change() {
        let (tx, rx) = channel_for_test();
        let actor_id = 233.into();
        let barrier_test_env = LocalBarrierTestEnv::for_test().await;

        // Downstream actors of the broadcast dispatcher.
        let (untouched, old, new) = (234.into(), 235.into(), 238.into());
        // Downstream actors of the simple dispatcher.
        let (old_simple, new_simple) = (114.into(), 514.into());

        // 1. Build the dispatchers and the first update: `old` is replaced by `new` in b1.
        let broadcast_dispatcher_id = 666.into();
        let broadcast_dispatcher = PbDispatcher {
            r#type: DispatcherType::Broadcast as _,
            dispatcher_id: broadcast_dispatcher_id,
            downstream_actor_id: vec![untouched, old],
            output_mapping: PbDispatchOutputMapping::identical(0).into(),
            ..Default::default()
        };

        let simple_dispatcher_id = 888.into();
        let simple_dispatcher = PbDispatcher {
            r#type: DispatcherType::Simple as _,
            dispatcher_id: simple_dispatcher_id,
            downstream_actor_id: vec![old_simple],
            output_mapping: PbDispatchOutputMapping::identical(0).into(),
            ..Default::default()
        };

        let dispatcher_updates = maplit::hashmap! {
            actor_id => vec![PbDispatcherUpdate {
                actor_id,
                dispatcher_id: broadcast_dispatcher_id,
                added_downstream_actor_id: vec![new],
                removed_downstream_actor_id: vec![old],
                hash_mapping: Default::default(),
            }]
        };
        let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: dispatcher_updates,
                ..Default::default()
            },
        ));
        barrier_test_env.inject_barrier(&b1, [actor_id]);
        barrier_test_env.flush_all_events().await;

        let input = Executor::new(
            Default::default(),
            ReceiverExecutor::for_test(
                actor_id,
                rx,
                barrier_test_env.local_barrier_manager.clone(),
            )
            .boxed(),
        );

        // 2. Register a receiver for every downstream actor that may ever be used.
        let (new_output_request_tx, new_output_request_rx) = unbounded_channel();
        let mut rxs = [untouched, old, new, old_simple, new_simple]
            .into_iter()
            .map(|id| {
                (id, {
                    let (tx, rx) = channel_for_test();
                    new_output_request_tx
                        .send((id, NewOutputRequest::Local(tx)))
                        .unwrap();
                    rx
                })
            })
            .collect::<HashMap<_, _>>();
        let executor = Box::new(
            DispatchExecutor::new(
                input,
                new_output_request_rx,
                vec![broadcast_dispatcher, simple_dispatcher],
                &ActorContext::for_test(actor_id),
            )
            .await
            .unwrap(),
        )
        .execute();

        pin_mut!(executor);

        macro_rules! try_recv {
            ($down_id:expr) => {
                rxs.get_mut(&$down_id).unwrap().try_recv()
            };
        }

        // 3. A chunk sent before b1 only reaches the original downstreams.
        tx.send(Message::Chunk(StreamChunk::default()).into())
            .await
            .unwrap();

        tx.send(Message::Barrier(b1.clone().into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 4. Check downstreams after b1.
        try_recv!(untouched).unwrap().as_chunk().unwrap();
        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();

        // `old` still receives b1 even though b1 removes it.
        try_recv!(old).unwrap().as_chunk().unwrap();
        try_recv!(old).unwrap().as_barrier_batch().unwrap();

        // `new` is added by b1, so its first message is the barrier, not the earlier chunk.
        try_recv!(new).unwrap().as_barrier_batch().unwrap();

        // The simple dispatcher is not touched by b1.
        try_recv!(old_simple).unwrap().as_chunk().unwrap();
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap();

        // 5. Send a plain barrier.
        let b2 = Barrier::new_test_barrier(test_epoch(2));
        barrier_test_env.inject_barrier(&b2, [actor_id]);
        tx.send(Message::Barrier(b2.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 6. Check downstreams after b2: `old` gets nothing further.
        try_recv!(untouched).unwrap().as_barrier_batch().unwrap();
        try_recv!(old).unwrap_err();
        try_recv!(new).unwrap().as_barrier_batch().unwrap();

        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap();
        try_recv!(new_simple).unwrap_err();

        // 7. Replace `old_simple` with `new_simple` in b3, with a chunk before it.
        tx.send(Message::Chunk(StreamChunk::default()).into())
            .await
            .unwrap();

        let dispatcher_updates = maplit::hashmap! {
            actor_id => vec![PbDispatcherUpdate {
                actor_id,
                dispatcher_id: simple_dispatcher_id,
                added_downstream_actor_id: vec![new_simple],
                removed_downstream_actor_id: vec![old_simple],
                hash_mapping: Default::default(),
            }]
        };
        let b3 = Barrier::new_test_barrier(test_epoch(3)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: dispatcher_updates,
                ..Default::default()
            },
        ));
        barrier_test_env.inject_barrier(&b3, [actor_id]);
        tx.send(Message::Barrier(b3.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 8. Check downstreams after b3.
        // `old_simple` still receives the chunk and b3 even though b3 removes it.
        try_recv!(old_simple).unwrap().as_chunk().unwrap();
        try_recv!(old_simple).unwrap().as_barrier_batch().unwrap();

        // `new_simple` is added by b3, so its first message is the barrier.
        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap();

        // 9. Send another plain barrier.
        let b4 = Barrier::new_test_barrier(test_epoch(4));
        barrier_test_env.inject_barrier(&b4, [actor_id]);
        tx.send(Message::Barrier(b4.into_dispatcher()).into())
            .await
            .unwrap();
        executor.next().await.unwrap().unwrap();

        // 10. Only `new_simple` receives b4.
        try_recv!(old_simple).unwrap_err();
        try_recv!(new_simple).unwrap().as_barrier_batch().unwrap();
    }

    /// Every row of a randomized-ish chunk must become visible in exactly the output its
    /// key hashes to, with its op unchanged.
    #[tokio::test]
    async fn test_hash_dispatcher() {
        // The vnode computation below depends on the test vnode count.
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let num_outputs = 5;
        let cardinality = 10;
        let dimension = 4;
        let key_indices = &[0, 2];
        let (output_tx_vecs, output_rx_vecs): (Vec<_>, Vec<_>) =
            (0..num_outputs).map(|_| channel_for_test()).collect();
        // Downstream actor ids are 1..=num_outputs.
        let outputs = output_tx_vecs
            .into_iter()
            .enumerate()
            .map(|(actor_id, tx)| Output::new(ActorId::new(1 + actor_id as u32), tx))
            .collect::<Vec<_>>();
        let mut hash_mapping = (1..num_outputs + 1)
            .flat_map(|id| vec![ActorId::new(id as _); VirtualNode::COUNT_FOR_TEST / num_outputs])
            .collect_vec();
        // COUNT_FOR_TEST is not a multiple of num_outputs; pad the tail with the last actor.
        hash_mapping.resize(
            VirtualNode::COUNT_FOR_TEST,
            ActorId::new(num_outputs as u32),
        );
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            key_indices.to_vec(),
            DispatchOutputMapping::Simple((0..dimension).collect()),
            hash_mapping.clone(),
            0.into(),
        );

        let mut ops = Vec::new();
        for idx in 0..cardinality {
            if idx % 2 == 0 {
                ops.push(Op::Insert);
            } else {
                ops.push(Op::Delete);
            }
        }

        let mut start = 19260817i32..;
        let mut builders = (0..dimension)
            .map(|_| I32ArrayBuilder::new(cardinality))
            .collect_vec();
        // Expected columns and ops for each output, computed independently of the dispatcher.
        let mut output_cols = vec![vec![vec![]; dimension]; num_outputs];
        let mut output_ops = vec![vec![]; num_outputs];
        for op in &ops {
            let hash_builder = Crc32FastBuilder;
            let mut hasher = hash_builder.build_hasher();
            let one_row = (0..dimension).map(|_| start.next().unwrap()).collect_vec();
            for key_idx in key_indices {
                let val = one_row[*key_idx];
                let bytes = val.to_le_bytes();
                hasher.update(&bytes);
            }
            let output_idx = hash_mapping[hasher.finish() as usize % VirtualNode::COUNT_FOR_TEST]
                .as_raw_id() as usize
                - 1;
            for (builder, val) in builders.iter_mut().zip_eq_fast(one_row.iter()) {
                builder.append(Some(*val));
            }
            output_cols[output_idx]
                .iter_mut()
                .zip_eq_fast(one_row.iter())
                .for_each(|(each_column, val)| each_column.push(*val));
            // Store by value: `ops` is moved into the chunk below.
            output_ops[output_idx].push(*op);
        }

        let columns = builders
            .into_iter()
            .map(|builder| {
                let array = builder.finish();
                array.into_ref()
            })
            .collect();

        let chunk = StreamChunk::new(ops, columns);
        hash_dispatcher.dispatch_data(chunk).await.unwrap();

        for (output_idx, mut rx) in output_rx_vecs.into_iter().enumerate() {
            let mut output = vec![];
            while let Some(Some(msg)) = rx.recv().now_or_never() {
                output.push(msg);
            }
            // A single chunk was dispatched, so each output got at most one message.
            assert!(output.len() <= 1);
            if output.is_empty() {
                assert!(output_cols[output_idx].iter().all(|x| x.is_empty()));
                assert!(output_ops[output_idx].is_empty());
            } else {
                let message = output.first().unwrap();
                let real_chunk = match message {
                    DispatcherMessageBatch::Chunk(chunk) => chunk,
                    _ => panic!(),
                };
                // Only the rows visible in this output's chunk belong to it.
                real_chunk
                    .columns()
                    .iter()
                    .zip_eq_fast(output_cols[output_idx].iter())
                    .for_each(|(real_col, expect_col)| {
                        let real_vals = real_chunk
                            .visibility()
                            .iter_ones()
                            .map(|row_idx| real_col.as_int32().value_at(row_idx).unwrap())
                            .collect::<Vec<_>>();
                        assert_eq!(real_vals.len(), expect_col.len());
                        assert_eq!(real_vals, *expect_col);
                    });
                // Plain inserts/deletes must keep their op.
                let real_ops = real_chunk
                    .visibility()
                    .iter_ones()
                    .map(|row_idx| real_chunk.ops()[row_idx])
                    .collect::<Vec<_>>();
                assert_eq!(real_ops, output_ops[output_idx]);
            }
        }
    }

    /// A projected chunk whose visible rows are `-`/`+` with invisible `U+`/`U-` rows in
    /// between must be dispatched without error.
    #[tokio::test]
    async fn test_hash_dispatcher_non_adjacent_update_after_project() {
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
            ],
        };
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema, vec![0]);

        let proj = ProjectExecutor::new(
            ActorContext::for_test(123),
            source,
            vec![
                NonStrictExpression::for_test(InputRefExpression::new(DataType::Int64, 0)),
                NonStrictExpression::for_test(InputRefExpression::new(DataType::Int64, 1)),
            ],
            MultiMap::new(),
            vec![],
            true,
        );
        let mut proj = proj.boxed().execute();

        tx.push_barrier(test_epoch(1), false);
        proj.expect_barrier().await;

        tx.push_chunk(StreamChunk::from_pretty(
            " I I
            U- 1 1
            U+ 1 2
            U- 1 2
            U+ 1 3",
        ));

        let projected = proj.expect_chunk().await;
        assert_eq!(
            projected,
            StreamChunk::from_pretty(
                " I I
                - 1 1
                U+ 1 2 D
                U- 1 2 D
                + 1 3",
            )
        );

        let (output_tx, _output_rx) = channel_for_test();
        let outputs = vec![Output::new(ActorId::new(1), output_tx)];
        let hash_mapping = vec![ActorId::new(1); VirtualNode::COUNT_FOR_TEST];
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            vec![0],
            DispatchOutputMapping::Simple(vec![0]),
            hash_mapping,
            0.into(),
        );

        hash_dispatcher.dispatch_data(projected).await.unwrap();
    }

    /// A projected chunk containing an invisible `U-` and a visible `+` (no visible
    /// update pair) must be dispatched without error.
    #[tokio::test]
    async fn test_hash_dispatcher_missing_update_delete_after_project() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
            ],
        };
        let (mut tx, source) = MockSource::channel();
        let source = source.into_executor(schema, vec![0]);

        let proj = ProjectExecutor::new(
            ActorContext::for_test(123),
            source,
            vec![NonStrictExpression::for_test(InputRefExpression::new(
                DataType::Int64,
                0,
            ))],
            MultiMap::new(),
            vec![],
            true,
        );
        let mut proj = proj.boxed().execute();

        tx.push_barrier(test_epoch(1), false);
        proj.expect_barrier().await;

        tx.push_chunk(StreamChunk::from_pretty(
            " I I
            + 1 10
            U- 1 10
            U+ 1 30",
        ));

        let projected = proj.expect_chunk().await;
        assert_eq!(
            projected,
            StreamChunk::from_pretty(
                " I
                + 1 D
                U- 1 D
                + 1",
            )
        );

        let (output_tx, _output_rx) = channel_for_test();
        let outputs = vec![Output::new(ActorId::new(1), output_tx)];
        let hash_mapping = vec![ActorId::new(1); VirtualNode::COUNT_FOR_TEST];
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            vec![0],
            DispatchOutputMapping::Simple(vec![0]),
            hash_mapping,
            0.into(),
        );

        hash_dispatcher.dispatch_data(projected).await.unwrap();
    }

    /// Projecting away the updated column inside the dispatcher leaves a lone `U+`, which
    /// must be emitted as a plain `+`.
    #[tokio::test]
    async fn test_hash_dispatcher_internal_projection_normalize_single_u_plus() {
        let (output_tx, mut output_rx) = channel_for_test();
        let outputs = vec![Output::new(ActorId::new(1), output_tx)];
        let hash_mapping = vec![ActorId::new(1); VirtualNode::COUNT_FOR_TEST];
        let mut hash_dispatcher = HashDataDispatcher::new(
            outputs,
            vec![0],
            DispatchOutputMapping::Simple(vec![0]),
            hash_mapping,
            0.into(),
        );

        let input = StreamChunk::from_pretty(
            " I I
            + 1 10
            U- 1 10
            U+ 1 30",
        );

        hash_dispatcher.dispatch_data(input).await.unwrap();

        let output = output_rx.recv().await.unwrap();
        assert_eq!(
            *output.as_chunk().unwrap(),
            StreamChunk::from_pretty(
                " I
                + 1 D
                U- 1 D
                + 1",
            )
        );
    }
}
1817}