1mod prelude;
16
17use std::collections::{HashMap, HashSet};
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use await_tree::InstrumentAwait;
22use enum_as_inner::EnumAsInner;
23use futures::stream::BoxStream;
24use futures::{Stream, StreamExt};
25use itertools::Itertools;
26use risingwave_common::array::StreamChunk;
27use risingwave_common::bitmap::Bitmap;
28use risingwave_common::catalog::{Schema, TableId};
29use risingwave_common::row::OwnedRow;
30use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
31use risingwave_common::util::epoch::{Epoch, EpochPair};
32use risingwave_common::util::tracing::TracingContext;
33use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
34use risingwave_connector::source::SplitImpl;
35use risingwave_expr::expr::{Expression, NonStrictExpression};
36use risingwave_pb::data::PbEpoch;
37use risingwave_pb::expr::PbInputRef;
38use risingwave_pb::stream_plan::barrier::BarrierKind;
39use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
40use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
41use risingwave_pb::stream_plan::{
42 BarrierMutation, CombinedMutation, Dispatchers, DropSubscriptionsMutation, PauseMutation,
43 PbAddMutation, PbBarrier, PbBarrierMutation, PbDispatcher, PbStreamMessageBatch,
44 PbUpdateMutation, PbWatermark, ResumeMutation, SourceChangeSplitMutation, StopMutation,
45 SubscriptionUpstreamInfo, ThrottleMutation,
46};
47use smallvec::SmallVec;
48
49use crate::error::StreamResult;
50use crate::task::{ActorId, FragmentId};
51
52mod actor;
53mod barrier_align;
54pub mod exchange;
55pub mod monitor;
56
57pub mod aggregate;
58pub mod asof_join;
59mod backfill;
60mod barrier_recv;
61mod batch_query;
62mod chain;
63mod changelog;
64mod dedup;
65mod dispatch;
66pub mod dml;
67mod dynamic_filter;
68pub mod eowc;
69pub mod error;
70mod expand;
71mod filter;
72pub mod hash_join;
73mod hop_window;
74mod join;
75mod lookup;
76mod lookup_union;
77mod merge;
78mod mview;
79mod nested_loop_temporal_join;
80mod no_op;
81mod now;
82mod over_window;
83pub mod project;
84mod rearranged_chain;
85mod receiver;
86pub mod row_id_gen;
87mod sink;
88pub mod source;
89mod stream_reader;
90pub mod subtask;
91mod temporal_join;
92mod top_n;
93mod troublemaker;
94mod union;
95mod values;
96mod watermark;
97mod watermark_filter;
98mod wrapper;
99
100mod approx_percentile;
101
102mod row_merge;
103
104#[cfg(test)]
105mod integration_tests;
106mod sync_kv_log_store;
107pub mod test_utils;
108mod utils;
109
110pub use actor::{Actor, ActorContext, ActorContextRef};
111use anyhow::Context;
112pub use approx_percentile::global::GlobalApproxPercentileExecutor;
113pub use approx_percentile::local::LocalApproxPercentileExecutor;
114pub use backfill::arrangement_backfill::*;
115pub use backfill::cdc::{CdcBackfillExecutor, CdcScanOptions, ExternalStorageTable};
116pub use backfill::no_shuffle_backfill::*;
117pub use backfill::snapshot_backfill::*;
118pub use barrier_recv::BarrierRecvExecutor;
119pub use batch_query::BatchQueryExecutor;
120pub use chain::ChainExecutor;
121pub use changelog::ChangeLogExecutor;
122pub use dedup::AppendOnlyDedupExecutor;
123pub use dispatch::{DispatchExecutor, DispatcherImpl};
124pub use dynamic_filter::DynamicFilterExecutor;
125pub use error::{StreamExecutorError, StreamExecutorResult};
126pub use expand::ExpandExecutor;
127pub use filter::FilterExecutor;
128pub use hash_join::*;
129pub use hop_window::HopWindowExecutor;
130pub use join::{AsOfDesc, AsOfJoinType, JoinType};
131pub use lookup::*;
132pub use lookup_union::LookupUnionExecutor;
133pub use merge::MergeExecutor;
134pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
135pub use mview::*;
136pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
137pub use no_op::NoOpExecutor;
138pub use now::*;
139pub use over_window::*;
140pub use rearranged_chain::RearrangedChainExecutor;
141pub use receiver::ReceiverExecutor;
142use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
143pub use row_merge::RowMergeExecutor;
144pub use sink::SinkExecutor;
145pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
146pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
147pub use temporal_join::TemporalJoinExecutor;
148pub use top_n::{
149 AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
150};
151pub use troublemaker::TroublemakerExecutor;
152pub use union::UnionExecutor;
153pub use utils::DummyExecutor;
154pub use values::ValuesExecutor;
155pub use watermark_filter::WatermarkFilterExecutor;
156pub use wrapper::WrapperExecutor;
157
158use self::barrier_align::AlignedMessageStream;
159
/// Item of a message stream whose barriers carry a mutation payload of type `M`.
pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
/// Item of a message stream whose barriers carry a [`BarrierMutationType`] payload.
pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
/// Item of a stream of [`DispatcherMessage`]s (barriers without mutation payload).
pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
/// Boxed, `'static` stream of [`MessageStreamItem`]s, as returned by [`Execute::execute`].
pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;
164
165pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
166use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
167use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
168
/// Any `Send` stream yielding [`MessageStreamItemInner`] items for mutation payload `M`.
pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
/// Any `Send` stream yielding [`MessageStreamItem`]s.
pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
/// Any `Send` stream yielding [`DispatcherMessageStreamItem`]s.
pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;
172
173#[derive(Debug, Default, Clone)]
175pub struct ExecutorInfo {
176 pub schema: Schema,
178
179 pub pk_indices: PkIndices,
183
184 pub identity: String,
186
187 pub id: u64,
189}
190
191impl ExecutorInfo {
192 pub fn new(schema: Schema, pk_indices: PkIndices, identity: String, id: u64) -> Self {
193 Self {
194 schema,
195 pk_indices,
196 identity,
197 id,
198 }
199 }
200}
201
202pub trait Execute: Send + 'static {
204 fn execute(self: Box<Self>) -> BoxedMessageStream;
205
206 fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
207 self.execute()
208 }
209
210 fn boxed(self) -> Box<dyn Execute>
211 where
212 Self: Sized + Send + 'static,
213 {
214 Box::new(self)
215 }
216}
217
218pub struct Executor {
221 info: ExecutorInfo,
222 execute: Box<dyn Execute>,
223}
224
225impl Executor {
226 pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
227 Self { info, execute }
228 }
229
230 pub fn info(&self) -> &ExecutorInfo {
231 &self.info
232 }
233
234 pub fn schema(&self) -> &Schema {
235 &self.info.schema
236 }
237
238 pub fn pk_indices(&self) -> PkIndicesRef<'_> {
239 &self.info.pk_indices
240 }
241
242 pub fn identity(&self) -> &str {
243 &self.info.identity
244 }
245
246 pub fn execute(self) -> BoxedMessageStream {
247 self.execute.execute()
248 }
249
250 pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
251 self.execute.execute_with_epoch(epoch)
252 }
253}
254
255impl std::fmt::Debug for Executor {
256 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
257 f.write_str(self.identity())
258 }
259}
260
261impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
262 fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
263 Self::new(info, execute)
264 }
265}
266
267impl<E> From<(ExecutorInfo, E)> for Executor
268where
269 E: Execute,
270{
271 fn from((info, execute): (ExecutorInfo, E)) -> Self {
272 Self::new(info, execute.boxed())
273 }
274}
275
/// Epoch value `0`.
/// NOTE(review): presumably used as a sentinel for "no valid epoch" — confirm with its users.
pub const INVALID_EPOCH: u64 = 0;

/// Fragment id of the upstream side in the key of [`UpdateMutation::merges`].
type UpstreamFragmentId = FragmentId;
/// Source split assignment, keyed by actor id.
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;
280
/// Payload of [`Mutation::Update`] (also the second half of [`Mutation::AddAndUpdate`]).
#[derive(Debug, Clone, PartialEq)]
pub struct UpdateMutation {
    /// Dispatcher updates, grouped by the `actor_id` of each `DispatcherUpdate`.
    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
    /// Merge updates, keyed by `(actor_id, upstream_fragment_id)`.
    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
    /// New vnode bitmap for each listed actor.
    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
    /// Actors dropped by this update; `Barrier::all_stop_actors` reports them as stopped.
    pub dropped_actors: HashSet<ActorId>,
    /// Source split assignment for each listed actor.
    pub actor_splits: SplitAssignments,
    /// Dispatchers newly created for each listed actor.
    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
}

/// Payload of [`Mutation::Add`] (also the first half of [`Mutation::AddAndUpdate`]).
#[derive(Debug, Clone, PartialEq)]
pub struct AddMutation {
    /// New dispatchers, keyed by the upstream actor that gets them.
    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
    /// Actors added by this barrier (see `Barrier::is_newly_added`).
    pub added_actors: HashSet<ActorId>,
    /// Initial source split assignment (see `Barrier::initial_split_assignment`).
    pub splits: SplitAssignments,
    /// Whether the added actors start paused (see `Barrier::is_pause_on_startup`).
    pub pause: bool,
    /// Subscriptions to register, as `(upstream_mv_table_id, subscriber_id)` pairs.
    /// Note the order is the reverse of `Mutation::DropSubscriptions`.
    pub subscriptions_to_add: Vec<(TableId, u32)>,
}

/// A mutation carried by a [`Barrier`].
#[derive(Debug, Clone, PartialEq)]
pub enum Mutation {
    /// Stop the listed actors.
    Stop(HashSet<ActorId>),
    /// Update dispatchers, merges, vnode bitmaps, splits and dropped actors.
    Update(UpdateMutation),
    /// Add new actors and dispatchers.
    Add(AddMutation),
    /// Change the source split assignment of the listed actors.
    SourceChangeSplit(SplitAssignments),
    /// Pause mutation.
    Pause,
    /// Resume mutation.
    Resume,
    /// Per-actor rate limit; `None` is encoded as an unset `rate_limit`.
    Throttle(HashMap<ActorId, Option<u32>>),
    /// An add and an update delivered together; encoded as a `CombinedMutation`
    /// holding the add part followed by the update part.
    AddAndUpdate(AddMutation, UpdateMutation),
    /// Drop subscriptions.
    DropSubscriptions {
        /// `(subscriber_id, upstream_mv_table_id)` pairs.
        subscriptions_to_drop: Vec<(u32, TableId)>,
    },
}
318
/// A barrier, generic over its mutation payload `M`.
///
/// The `PartialEq` impl for this type compares only `epoch` and `mutation`.
#[derive(Debug, Clone)]
pub struct BarrierInner<M> {
    /// Current and previous epoch.
    pub epoch: EpochPair,
    /// Mutation payload: `Option<Arc<Mutation>>` for [`Barrier`], `()` for [`DispatcherBarrier`].
    pub mutation: M,
    /// Barrier kind (checkpoint, initial, ...).
    pub kind: BarrierKind,

    /// Tracing context carried along with the barrier and encoded into its protobuf form.
    pub tracing_context: TracingContext,

    /// Actors this barrier has passed through.
    /// NOTE(review): presumably kept for debugging only — confirm.
    pub passed_actors: Vec<ActorId>,
}

/// Mutation payload of a [`Barrier`]; `None` means the barrier carries no mutation.
pub type BarrierMutationType = Option<Arc<Mutation>>;
/// Barrier as seen by executors, with an optional shared [`Mutation`].
pub type Barrier = BarrierInner<BarrierMutationType>;
/// Barrier without a mutation payload, as carried by [`DispatcherMessage`]s.
pub type DispatcherBarrier = BarrierInner<()>;
339
340impl<M: Default> BarrierInner<M> {
341 pub fn new_test_barrier(epoch: u64) -> Self {
343 Self {
344 epoch: EpochPair::new_test_epoch(epoch),
345 kind: BarrierKind::Checkpoint,
346 tracing_context: TracingContext::none(),
347 mutation: Default::default(),
348 passed_actors: Default::default(),
349 }
350 }
351
352 pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
353 Self {
354 epoch: EpochPair::new(epoch, prev_epoch),
355 kind: BarrierKind::Checkpoint,
356 tracing_context: TracingContext::none(),
357 mutation: Default::default(),
358 passed_actors: Default::default(),
359 }
360 }
361}
362
363impl Barrier {
364 pub fn into_dispatcher(self) -> DispatcherBarrier {
365 DispatcherBarrier {
366 epoch: self.epoch,
367 mutation: (),
368 kind: self.kind,
369 tracing_context: self.tracing_context,
370 passed_actors: self.passed_actors,
371 }
372 }
373
374 #[must_use]
375 pub fn with_mutation(self, mutation: Mutation) -> Self {
376 Self {
377 mutation: Some(Arc::new(mutation)),
378 ..self
379 }
380 }
381
382 #[must_use]
383 pub fn with_stop(self) -> Self {
384 self.with_mutation(Mutation::Stop(HashSet::default()))
385 }
386
387 pub fn is_with_stop_mutation(&self) -> bool {
389 matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
390 }
391
392 pub fn is_stop(&self, actor_id: ActorId) -> bool {
394 self.all_stop_actors()
395 .is_some_and(|actors| actors.contains(&actor_id))
396 }
397
398 pub fn is_checkpoint(&self) -> bool {
399 self.kind == BarrierKind::Checkpoint
400 }
401
402 pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
413 match self.mutation.as_deref()? {
414 Mutation::Update(UpdateMutation { actor_splits, .. })
415 | Mutation::Add(AddMutation {
416 splits: actor_splits,
417 ..
418 }) => actor_splits.get(&actor_id),
419
420 Mutation::AddAndUpdate(
421 AddMutation {
422 splits: add_actor_splits,
423 ..
424 },
425 UpdateMutation {
426 actor_splits: update_actor_splits,
427 ..
428 },
429 ) => add_actor_splits
430 .get(&actor_id)
431 .or_else(|| update_actor_splits.get(&actor_id)),
433
434 _ => {
435 if cfg!(debug_assertions) {
436 panic!(
437 "the initial mutation of the barrier should not be {:?}",
438 self.mutation
439 );
440 }
441 None
442 }
443 }
444 .map(|s| s.as_slice())
445 }
446
447 pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
449 match self.mutation.as_deref() {
450 Some(Mutation::Stop(actors)) => Some(actors),
451 Some(Mutation::Update(UpdateMutation { dropped_actors, .. }))
452 | Some(Mutation::AddAndUpdate(_, UpdateMutation { dropped_actors, .. })) => {
453 Some(dropped_actors)
454 }
455 _ => None,
456 }
457 }
458
459 pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
465 match self.mutation.as_deref() {
466 Some(Mutation::Add(AddMutation { added_actors, .. }))
467 | Some(Mutation::AddAndUpdate(AddMutation { added_actors, .. }, _)) => {
468 added_actors.contains(&actor_id)
469 }
470 _ => false,
471 }
472 }
473
474 pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
492 let Some(mutation) = self.mutation.as_deref() else {
493 return false;
494 };
495 match mutation {
496 Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
498 Mutation::AddAndUpdate(
500 AddMutation { adds, .. },
501 UpdateMutation {
502 dispatchers,
503 actor_new_dispatchers,
504 ..
505 },
506 ) => {
507 adds.get(&upstream_actor_id).is_some()
508 || actor_new_dispatchers.get(&upstream_actor_id).is_some()
509 || dispatchers.get(&upstream_actor_id).is_some()
510 }
511 Mutation::Update(_)
512 | Mutation::Stop(_)
513 | Mutation::Pause
514 | Mutation::Resume
515 | Mutation::SourceChangeSplit(_)
516 | Mutation::Throttle(_)
517 | Mutation::DropSubscriptions { .. } => false,
518 }
519 }
520
521 pub fn is_pause_on_startup(&self) -> bool {
523 match self.mutation.as_deref() {
524 Some(Mutation::Add(AddMutation { pause, .. }))
525 | Some(Mutation::AddAndUpdate(AddMutation { pause, .. }, _)) => *pause,
526 _ => false,
527 }
528 }
529
530 pub fn is_resume(&self) -> bool {
532 matches!(self.mutation.as_deref(), Some(Mutation::Resume))
533 }
534
535 pub fn as_update_merge(
538 &self,
539 actor_id: ActorId,
540 upstream_fragment_id: UpstreamFragmentId,
541 ) -> Option<&MergeUpdate> {
542 self.mutation
543 .as_deref()
544 .and_then(|mutation| match mutation {
545 Mutation::Update(UpdateMutation { merges, .. })
546 | Mutation::AddAndUpdate(_, UpdateMutation { merges, .. }) => {
547 merges.get(&(actor_id, upstream_fragment_id))
548 }
549
550 _ => None,
551 })
552 }
553
554 pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
560 self.mutation
561 .as_deref()
562 .and_then(|mutation| match mutation {
563 Mutation::Update(UpdateMutation { vnode_bitmaps, .. })
564 | Mutation::AddAndUpdate(_, UpdateMutation { vnode_bitmaps, .. }) => {
565 vnode_bitmaps.get(&actor_id).cloned()
566 }
567 _ => None,
568 })
569 }
570
571 pub fn get_curr_epoch(&self) -> Epoch {
572 Epoch(self.epoch.curr)
573 }
574
575 pub fn tracing_context(&self) -> &TracingContext {
577 &self.tracing_context
578 }
579
580 pub fn added_subscriber_on_mv_table(
581 &self,
582 mv_table_id: TableId,
583 ) -> impl Iterator<Item = u32> + '_ {
584 if let Some(Mutation::Add(add)) | Some(Mutation::AddAndUpdate(add, _)) =
585 self.mutation.as_deref()
586 {
587 Some(add)
588 } else {
589 None
590 }
591 .into_iter()
592 .flat_map(move |add| {
593 add.subscriptions_to_add.iter().filter_map(
594 move |(upstream_mv_table_id, subscriber_id)| {
595 if *upstream_mv_table_id == mv_table_id {
596 Some(*subscriber_id)
597 } else {
598 None
599 }
600 },
601 )
602 })
603 }
604}
605
606impl<M: PartialEq> PartialEq for BarrierInner<M> {
607 fn eq(&self, other: &Self) -> bool {
608 self.epoch == other.epoch && self.mutation == other.mutation
609 }
610}
611
612impl Mutation {
613 #[cfg(test)]
617 pub fn is_stop(&self) -> bool {
618 matches!(self, Mutation::Stop(_))
619 }
620
621 fn to_protobuf(&self) -> PbMutation {
622 let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
623 actor_splits
624 .iter()
625 .map(|(&actor_id, splits)| {
626 (
627 actor_id,
628 ConnectorSplits {
629 splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
630 },
631 )
632 })
633 .collect::<HashMap<_, _>>()
634 };
635
636 match self {
637 Mutation::Stop(actors) => PbMutation::Stop(StopMutation {
638 actors: actors.iter().copied().collect::<Vec<_>>(),
639 }),
640 Mutation::Update(UpdateMutation {
641 dispatchers,
642 merges,
643 vnode_bitmaps,
644 dropped_actors,
645 actor_splits,
646 actor_new_dispatchers,
647 }) => PbMutation::Update(PbUpdateMutation {
648 dispatcher_update: dispatchers.values().flatten().cloned().collect(),
649 merge_update: merges.values().cloned().collect(),
650 actor_vnode_bitmap_update: vnode_bitmaps
651 .iter()
652 .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
653 .collect(),
654 dropped_actors: dropped_actors.iter().cloned().collect(),
655 actor_splits: actor_splits_to_protobuf(actor_splits),
656 actor_new_dispatchers: actor_new_dispatchers
657 .iter()
658 .map(|(&actor_id, dispatchers)| {
659 (
660 actor_id,
661 Dispatchers {
662 dispatchers: dispatchers.clone(),
663 },
664 )
665 })
666 .collect(),
667 }),
668 Mutation::Add(AddMutation {
669 adds,
670 added_actors,
671 splits,
672 pause,
673 subscriptions_to_add,
674 }) => PbMutation::Add(PbAddMutation {
675 actor_dispatchers: adds
676 .iter()
677 .map(|(&actor_id, dispatchers)| {
678 (
679 actor_id,
680 Dispatchers {
681 dispatchers: dispatchers.clone(),
682 },
683 )
684 })
685 .collect(),
686 added_actors: added_actors.iter().copied().collect(),
687 actor_splits: actor_splits_to_protobuf(splits),
688 pause: *pause,
689 subscriptions_to_add: subscriptions_to_add
690 .iter()
691 .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
692 subscriber_id: *subscriber_id,
693 upstream_mv_table_id: table_id.table_id,
694 })
695 .collect(),
696 }),
697 Mutation::SourceChangeSplit(changes) => PbMutation::Splits(SourceChangeSplitMutation {
698 actor_splits: changes
699 .iter()
700 .map(|(&actor_id, splits)| {
701 (
702 actor_id,
703 ConnectorSplits {
704 splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
705 },
706 )
707 })
708 .collect(),
709 }),
710 Mutation::Pause => PbMutation::Pause(PauseMutation {}),
711 Mutation::Resume => PbMutation::Resume(ResumeMutation {}),
712 Mutation::Throttle(changes) => PbMutation::Throttle(ThrottleMutation {
713 actor_throttle: changes
714 .iter()
715 .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit }))
716 .collect(),
717 }),
718
719 Mutation::AddAndUpdate(add, update) => PbMutation::Combined(CombinedMutation {
720 mutations: vec![
721 BarrierMutation {
722 mutation: Some(Mutation::Add(add.clone()).to_protobuf()),
723 },
724 BarrierMutation {
725 mutation: Some(Mutation::Update(update.clone()).to_protobuf()),
726 },
727 ],
728 }),
729 Mutation::DropSubscriptions {
730 subscriptions_to_drop,
731 } => PbMutation::DropSubscriptions(DropSubscriptionsMutation {
732 info: subscriptions_to_drop
733 .iter()
734 .map(
735 |(subscriber_id, upstream_mv_table_id)| SubscriptionUpstreamInfo {
736 subscriber_id: *subscriber_id,
737 upstream_mv_table_id: upstream_mv_table_id.table_id,
738 },
739 )
740 .collect(),
741 }),
742 }
743 }
744
745 fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
746 let mutation = match prost {
747 PbMutation::Stop(stop) => {
748 Mutation::Stop(HashSet::from_iter(stop.actors.iter().cloned()))
749 }
750
751 PbMutation::Update(update) => Mutation::Update(UpdateMutation {
752 dispatchers: update
753 .dispatcher_update
754 .iter()
755 .map(|u| (u.actor_id, u.clone()))
756 .into_group_map(),
757 merges: update
758 .merge_update
759 .iter()
760 .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
761 .collect(),
762 vnode_bitmaps: update
763 .actor_vnode_bitmap_update
764 .iter()
765 .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
766 .collect(),
767 dropped_actors: update.dropped_actors.iter().cloned().collect(),
768 actor_splits: update
769 .actor_splits
770 .iter()
771 .map(|(&actor_id, splits)| {
772 (
773 actor_id,
774 splits
775 .splits
776 .iter()
777 .map(|split| split.try_into().unwrap())
778 .collect(),
779 )
780 })
781 .collect(),
782 actor_new_dispatchers: update
783 .actor_new_dispatchers
784 .iter()
785 .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
786 .collect(),
787 }),
788
789 PbMutation::Add(add) => Mutation::Add(AddMutation {
790 adds: add
791 .actor_dispatchers
792 .iter()
793 .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
794 .collect(),
795 added_actors: add.added_actors.iter().copied().collect(),
796 splits: add
799 .actor_splits
800 .iter()
801 .map(|(&actor_id, splits)| {
802 (
803 actor_id,
804 splits
805 .splits
806 .iter()
807 .map(|split| split.try_into().unwrap())
808 .collect(),
809 )
810 })
811 .collect(),
812 pause: add.pause,
813 subscriptions_to_add: add
814 .subscriptions_to_add
815 .iter()
816 .map(
817 |SubscriptionUpstreamInfo {
818 subscriber_id,
819 upstream_mv_table_id,
820 }| {
821 (TableId::new(*upstream_mv_table_id), *subscriber_id)
822 },
823 )
824 .collect(),
825 }),
826
827 PbMutation::Splits(s) => {
828 let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
829 Vec::with_capacity(s.actor_splits.len());
830 for (&actor_id, splits) in &s.actor_splits {
831 if !splits.splits.is_empty() {
832 change_splits.push((
833 actor_id,
834 splits
835 .splits
836 .iter()
837 .map(SplitImpl::try_from)
838 .try_collect()?,
839 ));
840 }
841 }
842 Mutation::SourceChangeSplit(change_splits.into_iter().collect())
843 }
844 PbMutation::Pause(_) => Mutation::Pause,
845 PbMutation::Resume(_) => Mutation::Resume,
846 PbMutation::Throttle(changes) => Mutation::Throttle(
847 changes
848 .actor_throttle
849 .iter()
850 .map(|(actor_id, limit)| (*actor_id, limit.rate_limit))
851 .collect(),
852 ),
853 PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
854 subscriptions_to_drop: drop
855 .info
856 .iter()
857 .map(|info| (info.subscriber_id, TableId::new(info.upstream_mv_table_id)))
858 .collect(),
859 },
860 PbMutation::Combined(CombinedMutation { mutations }) => match &mutations[..] {
861 [
862 BarrierMutation {
863 mutation: Some(add),
864 },
865 BarrierMutation {
866 mutation: Some(update),
867 },
868 ] => {
869 let Mutation::Add(add_mutation) = Mutation::from_protobuf(add)? else {
870 unreachable!();
871 };
872
873 let Mutation::Update(update_mutation) = Mutation::from_protobuf(update)? else {
874 unreachable!();
875 };
876
877 Mutation::AddAndUpdate(add_mutation, update_mutation)
878 }
879
880 _ => unreachable!(),
881 },
882 };
883 Ok(mutation)
884 }
885}
886
887impl<M> BarrierInner<M> {
888 fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
889 let Self {
890 epoch,
891 mutation,
892 kind,
893 passed_actors,
894 tracing_context,
895 ..
896 } = self;
897
898 PbBarrier {
899 epoch: Some(PbEpoch {
900 curr: epoch.curr,
901 prev: epoch.prev,
902 }),
903 mutation: Some(PbBarrierMutation {
904 mutation: barrier_fn(mutation),
905 }),
906 tracing_context: tracing_context.to_protobuf(),
907 kind: *kind as _,
908 passed_actors: passed_actors.clone(),
909 }
910 }
911
912 fn from_protobuf_inner(
913 prost: &PbBarrier,
914 mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
915 ) -> StreamExecutorResult<Self> {
916 let epoch = prost.get_epoch()?;
917
918 Ok(Self {
919 kind: prost.kind(),
920 epoch: EpochPair::new(epoch.curr, epoch.prev),
921 mutation: mutation_from_pb(
922 prost
923 .mutation
924 .as_ref()
925 .and_then(|mutation| mutation.mutation.as_ref()),
926 )?,
927 passed_actors: prost.get_passed_actors().clone(),
928 tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
929 })
930 }
931
932 pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
933 BarrierInner {
934 epoch: self.epoch,
935 mutation: f(self.mutation),
936 kind: self.kind,
937 tracing_context: self.tracing_context,
938 passed_actors: self.passed_actors,
939 }
940 }
941}
942
943impl DispatcherBarrier {
944 pub fn to_protobuf(&self) -> PbBarrier {
945 self.to_protobuf_inner(|_| None)
946 }
947}
948
949impl Barrier {
950 pub fn to_protobuf(&self) -> PbBarrier {
951 self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
952 }
953
954 pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
955 Self::from_protobuf_inner(prost, |mutation| {
956 mutation
957 .map(|m| Mutation::from_protobuf(m).map(Arc::new))
958 .transpose()
959 })
960 }
961}
962
963#[derive(Debug, PartialEq, Eq, Clone)]
964pub struct Watermark {
965 pub col_idx: usize,
966 pub data_type: DataType,
967 pub val: ScalarImpl,
968}
969
970impl PartialOrd for Watermark {
971 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
972 Some(self.cmp(other))
973 }
974}
975
976impl Ord for Watermark {
977 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
978 self.val.default_cmp(&other.val)
979 }
980}
981
982impl Watermark {
983 pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
984 Self {
985 col_idx,
986 data_type,
987 val,
988 }
989 }
990
991 pub async fn transform_with_expr(
992 self,
993 expr: &NonStrictExpression<impl Expression>,
994 new_col_idx: usize,
995 ) -> Option<Self> {
996 let Self { col_idx, val, .. } = self;
997 let row = {
998 let mut row = vec![None; col_idx + 1];
999 row[col_idx] = Some(val);
1000 OwnedRow::new(row)
1001 };
1002 let val = expr.eval_row_infallible(&row).await?;
1003 Some(Self::new(new_col_idx, expr.inner().return_type(), val))
1004 }
1005
1006 pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
1009 output_indices
1010 .iter()
1011 .position(|p| *p == self.col_idx)
1012 .map(|new_col_idx| self.with_idx(new_col_idx))
1013 }
1014
1015 pub fn to_protobuf(&self) -> PbWatermark {
1016 PbWatermark {
1017 column: Some(PbInputRef {
1018 index: self.col_idx as _,
1019 r#type: Some(self.data_type.to_protobuf()),
1020 }),
1021 val: Some(&self.val).to_protobuf().into(),
1022 }
1023 }
1024
1025 pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
1026 let col_ref = prost.get_column()?;
1027 let data_type = DataType::from(col_ref.get_type()?);
1028 let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
1029 .expect("watermark value cannot be null");
1030 Ok(Self::new(col_ref.get_index() as _, data_type, val))
1031 }
1032
1033 pub fn with_idx(self, idx: usize) -> Self {
1034 Self::new(idx, self.data_type, self.val)
1035 }
1036}
1037
1038#[derive(Debug, EnumAsInner, PartialEq, Clone)]
1039pub enum MessageInner<M> {
1040 Chunk(StreamChunk),
1041 Barrier(BarrierInner<M>),
1042 Watermark(Watermark),
1043}
1044
1045impl<M> MessageInner<M> {
1046 pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
1047 match self {
1048 MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
1049 MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
1050 MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
1051 }
1052 }
1053}
1054
1055pub type Message = MessageInner<BarrierMutationType>;
1056pub type DispatcherMessage = MessageInner<()>;
1057
1058#[derive(Debug, EnumAsInner, PartialEq, Clone)]
1061pub enum MessageBatchInner<M> {
1062 Chunk(StreamChunk),
1063 BarrierBatch(Vec<BarrierInner<M>>),
1064 Watermark(Watermark),
1065}
1066pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
1067pub type DispatcherBarriers = Vec<DispatcherBarrier>;
1068pub type DispatcherMessageBatch = MessageBatchInner<()>;
1069
1070impl From<DispatcherMessage> for DispatcherMessageBatch {
1071 fn from(m: DispatcherMessage) -> Self {
1072 match m {
1073 DispatcherMessage::Chunk(c) => Self::Chunk(c),
1074 DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
1075 DispatcherMessage::Watermark(w) => Self::Watermark(w),
1076 }
1077 }
1078}
1079
1080impl From<StreamChunk> for Message {
1081 fn from(chunk: StreamChunk) -> Self {
1082 Message::Chunk(chunk)
1083 }
1084}
1085
1086impl<'a> TryFrom<&'a Message> for &'a Barrier {
1087 type Error = ();
1088
1089 fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
1090 match m {
1091 Message::Chunk(_) => Err(()),
1092 Message::Barrier(b) => Ok(b),
1093 Message::Watermark(_) => Err(()),
1094 }
1095 }
1096}
1097
1098impl Message {
1099 #[cfg(test)]
1104 pub fn is_stop(&self) -> bool {
1105 matches!(
1106 self,
1107 Message::Barrier(Barrier {
1108 mutation,
1109 ..
1110 }) if mutation.as_ref().unwrap().is_stop()
1111 )
1112 }
1113}
1114
1115impl DispatcherMessageBatch {
1116 pub fn to_protobuf(&self) -> PbStreamMessageBatch {
1117 let prost = match self {
1118 Self::Chunk(stream_chunk) => {
1119 let prost_stream_chunk = stream_chunk.to_protobuf();
1120 StreamMessageBatch::StreamChunk(prost_stream_chunk)
1121 }
1122 Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
1123 barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
1124 }),
1125 Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
1126 };
1127 PbStreamMessageBatch {
1128 stream_message_batch: Some(prost),
1129 }
1130 }
1131
1132 pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
1133 let res = match prost.get_stream_message_batch()? {
1134 StreamMessageBatch::StreamChunk(chunk) => {
1135 Self::Chunk(StreamChunk::from_protobuf(chunk)?)
1136 }
1137 StreamMessageBatch::BarrierBatch(barrier_batch) => {
1138 let barriers = barrier_batch
1139 .barriers
1140 .iter()
1141 .map(|barrier| {
1142 DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
1143 if mutation.is_some() {
1144 if cfg!(debug_assertions) {
1145 panic!("should not receive message of barrier with mutation");
1146 } else {
1147 warn!(?barrier, "receive message of barrier with mutation");
1148 }
1149 }
1150 Ok(())
1151 })
1152 })
1153 .try_collect()?;
1154 Self::BarrierBatch(barriers)
1155 }
1156 StreamMessageBatch::Watermark(watermark) => {
1157 Self::Watermark(Watermark::from_protobuf(watermark)?)
1158 }
1159 };
1160 Ok(res)
1161 }
1162
1163 pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
1164 ::prost::Message::encoded_len(msg)
1165 }
1166}
1167
/// Primary-key column indices.
pub type PkIndices = Vec<usize>;
/// Borrowed form of [`PkIndices`].
pub type PkIndicesRef<'a> = &'a [usize];
/// Data types of primary-key columns, with inline room for one type before spilling to the heap.
pub type PkDataTypes = SmallVec<[DataType; 1]>;
1171
1172pub async fn expect_first_barrier<M: Debug>(
1174 stream: &mut (impl MessageStreamInner<M> + Unpin),
1175) -> StreamExecutorResult<BarrierInner<M>> {
1176 let message = stream
1177 .next()
1178 .instrument_await("expect_first_barrier")
1179 .await
1180 .context("failed to extract the first message: stream closed unexpectedly")??;
1181 let barrier = message
1182 .into_barrier()
1183 .expect("the first message must be a barrier");
1184 assert!(matches!(
1186 barrier.kind,
1187 BarrierKind::Checkpoint | BarrierKind::Initial
1188 ));
1189 Ok(barrier)
1190}
1191
1192pub async fn expect_first_barrier_from_aligned_stream(
1194 stream: &mut (impl AlignedMessageStream + Unpin),
1195) -> StreamExecutorResult<Barrier> {
1196 let message = stream
1197 .next()
1198 .instrument_await("expect_first_barrier")
1199 .await
1200 .context("failed to extract the first message: stream closed unexpectedly")??;
1201 let barrier = message
1202 .into_barrier()
1203 .expect("the first message must be a barrier");
1204 Ok(barrier)
1205}
1206
/// A consumer that, once executed, yields a stream of barriers (or errors).
pub trait StreamConsumer: Send + 'static {
    /// Stream of barriers produced by [`StreamConsumer::execute`].
    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;

    /// Consumes `self` and returns its barrier stream.
    fn execute(self: Box<Self>) -> Self::BarrierStream;
}