1mod prelude;
16
17use std::collections::{HashMap, HashSet};
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use await_tree::InstrumentAwait;
22use enum_as_inner::EnumAsInner;
23use futures::stream::BoxStream;
24use futures::{Stream, StreamExt};
25use itertools::Itertools;
26use risingwave_common::array::StreamChunk;
27use risingwave_common::bitmap::Bitmap;
28use risingwave_common::catalog::{Schema, TableId};
29use risingwave_common::row::OwnedRow;
30use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
31use risingwave_common::util::epoch::{Epoch, EpochPair};
32use risingwave_common::util::tracing::TracingContext;
33use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
34use risingwave_connector::source::SplitImpl;
35use risingwave_expr::expr::{Expression, NonStrictExpression};
36use risingwave_pb::data::PbEpoch;
37use risingwave_pb::expr::PbInputRef;
38use risingwave_pb::stream_plan::barrier::BarrierKind;
39use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
40use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
41use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
42use risingwave_pb::stream_plan::{
43 BarrierMutation, CombinedMutation, ConnectorPropsChangeMutation, Dispatchers,
44 DropSubscriptionsMutation, PauseMutation, PbAddMutation, PbBarrier, PbBarrierMutation,
45 PbDispatcher, PbStreamMessageBatch, PbUpdateMutation, PbWatermark, ResumeMutation,
46 SourceChangeSplitMutation, StartFragmentBackfillMutation, StopMutation,
47 SubscriptionUpstreamInfo, ThrottleMutation,
48};
49use smallvec::SmallVec;
50
51use crate::error::StreamResult;
52use crate::task::{ActorId, FragmentId};
53
54mod actor;
55mod barrier_align;
56pub mod exchange;
57pub mod monitor;
58
59pub mod aggregate;
60pub mod asof_join;
61mod backfill;
62mod barrier_recv;
63mod batch_query;
64mod chain;
65mod changelog;
66mod dedup;
67mod dispatch;
68pub mod dml;
69mod dynamic_filter;
70pub mod eowc;
71pub mod error;
72mod expand;
73mod filter;
74pub mod hash_join;
75mod hop_window;
76mod join;
77mod lookup;
78mod lookup_union;
79mod merge;
80mod mview;
81mod nested_loop_temporal_join;
82mod no_op;
83mod now;
84mod over_window;
85pub mod project;
86mod rearranged_chain;
87mod receiver;
88pub mod row_id_gen;
89mod sink;
90pub mod source;
91mod stream_reader;
92pub mod subtask;
93mod temporal_join;
94mod top_n;
95mod troublemaker;
96mod union;
97mod values;
98mod watermark;
99mod watermark_filter;
100mod wrapper;
101
102mod approx_percentile;
103
104mod row_merge;
105
106#[cfg(test)]
107mod integration_tests;
108mod sync_kv_log_store;
109pub mod test_utils;
110mod utils;
111
112pub use actor::{Actor, ActorContext, ActorContextRef};
113use anyhow::Context;
114pub use approx_percentile::global::GlobalApproxPercentileExecutor;
115pub use approx_percentile::local::LocalApproxPercentileExecutor;
116pub use backfill::arrangement_backfill::*;
117pub use backfill::cdc::{CdcBackfillExecutor, CdcScanOptions, ExternalStorageTable};
118pub use backfill::no_shuffle_backfill::*;
119pub use backfill::snapshot_backfill::*;
120pub use barrier_recv::BarrierRecvExecutor;
121pub use batch_query::BatchQueryExecutor;
122pub use chain::ChainExecutor;
123pub use changelog::ChangeLogExecutor;
124pub use dedup::AppendOnlyDedupExecutor;
125pub use dispatch::{DispatchExecutor, DispatcherImpl};
126pub use dynamic_filter::DynamicFilterExecutor;
127pub use error::{StreamExecutorError, StreamExecutorResult};
128pub use expand::ExpandExecutor;
129pub use filter::FilterExecutor;
130pub use hash_join::*;
131pub use hop_window::HopWindowExecutor;
132pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
133pub use join::{AsOfDesc, AsOfJoinType, JoinType};
134pub use lookup::*;
135pub use lookup_union::LookupUnionExecutor;
136pub use merge::MergeExecutor;
137pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
138pub use mview::*;
139pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
140pub use no_op::NoOpExecutor;
141pub use now::*;
142pub use over_window::*;
143pub use rearranged_chain::RearrangedChainExecutor;
144pub use receiver::ReceiverExecutor;
145use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
146pub use row_merge::RowMergeExecutor;
147pub use sink::SinkExecutor;
148pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
149pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
150pub use temporal_join::TemporalJoinExecutor;
151pub use top_n::{
152 AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
153};
154pub use troublemaker::TroublemakerExecutor;
155pub use union::UnionExecutor;
156pub use utils::DummyExecutor;
157pub use values::ValuesExecutor;
158pub use watermark_filter::WatermarkFilterExecutor;
159pub use wrapper::WrapperExecutor;
160
161use self::barrier_align::AlignedMessageStream;
162
/// A single item of a message stream whose barriers carry mutation payload `M`.
pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
/// A message stream item whose barriers carry an optional shared [`Mutation`].
pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
/// A stream item whose barriers carry no mutation payload (`()`).
pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
/// A boxed `'static` stream of [`MessageStreamItem`]s, as returned by [`Execute::execute`].
pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;
167
168pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
169use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
170use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
171
/// Any `Send` stream yielding [`MessageStreamItemInner<M>`] items.
pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
/// Any `Send` stream yielding [`MessageStreamItem`] items.
pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
/// Any `Send` stream yielding [`DispatcherMessageStreamItem`] items.
pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;
175
/// Static metadata describing an executor; stored alongside its implementation in [`Executor`].
#[derive(Debug, Default, Clone)]
pub struct ExecutorInfo {
    /// Schema of the executor's output.
    pub schema: Schema,

    /// Indices of the primary-key columns within `schema`.
    pub pk_indices: PkIndices,

    /// Human-readable identity; this is what `Debug for Executor` prints.
    pub identity: String,

    /// Numeric id of the executor (NOTE(review): assignment scheme not visible here).
    pub id: u64,
}
193
194impl ExecutorInfo {
195 pub fn new(schema: Schema, pk_indices: PkIndices, identity: String, id: u64) -> Self {
196 Self {
197 schema,
198 pk_indices,
199 identity,
200 id,
201 }
202 }
203}
204
/// An executor implementation that can be consumed to produce a [`BoxedMessageStream`].
pub trait Execute: Send + 'static {
    /// Consumes the boxed executor and returns its output message stream.
    fn execute(self: Box<Self>) -> BoxedMessageStream;

    /// Like [`Execute::execute`], but also given an epoch.
    ///
    /// The default implementation ignores `_epoch` and calls [`Execute::execute`].
    fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
        self.execute()
    }

    /// Boxes `self` into a `Box<dyn Execute>` trait object.
    fn boxed(self) -> Box<dyn Execute>
    where
        Self: Sized + Send + 'static,
    {
        Box::new(self)
    }
}
220
/// An executor: its static [`ExecutorInfo`] paired with a boxed [`Execute`] implementation.
pub struct Executor {
    /// Schema, primary-key indices, identity and id of this executor.
    info: ExecutorInfo,
    /// The implementation that is consumed to produce the output stream.
    execute: Box<dyn Execute>,
}
227
228impl Executor {
229 pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
230 Self { info, execute }
231 }
232
233 pub fn info(&self) -> &ExecutorInfo {
234 &self.info
235 }
236
237 pub fn schema(&self) -> &Schema {
238 &self.info.schema
239 }
240
241 pub fn pk_indices(&self) -> PkIndicesRef<'_> {
242 &self.info.pk_indices
243 }
244
245 pub fn identity(&self) -> &str {
246 &self.info.identity
247 }
248
249 pub fn execute(self) -> BoxedMessageStream {
250 self.execute.execute()
251 }
252
253 pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
254 self.execute.execute_with_epoch(epoch)
255 }
256}
257
258impl std::fmt::Debug for Executor {
259 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
260 f.write_str(self.identity())
261 }
262}
263
264impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
265 fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
266 Self::new(info, execute)
267 }
268}
269
270impl<E> From<(ExecutorInfo, E)> for Executor
271where
272 E: Execute,
273{
274 fn from((info, execute): (ExecutorInfo, E)) -> Self {
275 Self::new(info, execute.boxed())
276 }
277}
278
/// The epoch value `0`, named as the invalid epoch (presumably a placeholder where no real
/// epoch is available — confirm against callers).
pub const INVALID_EPOCH: u64 = 0;
280
/// Id of the upstream fragment in a merge-update key; an alias of [`FragmentId`].
type UpstreamFragmentId = FragmentId;
/// Connector source splits assigned to each actor.
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;
283
/// Payload of [`Mutation::Update`]: per-actor dispatcher, merge, vnode-bitmap and split changes,
/// plus the set of actors dropped by the update.
#[derive(Debug, Clone, PartialEq)]
pub struct UpdateMutation {
    /// Dispatcher updates, grouped by the `actor_id` of each [`DispatcherUpdate`].
    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
    /// Merge updates keyed by `(actor_id, upstream_fragment_id)`.
    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
    /// New vnode bitmaps per actor (returned by `Barrier::as_update_vnode_bitmap`).
    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
    /// Actors dropped by this update; `Barrier::all_stop_actors` reports them as stopped.
    pub dropped_actors: HashSet<ActorId>,
    /// Split assignments per actor.
    pub actor_splits: SplitAssignments,
    /// Newly added dispatchers per actor.
    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
}
293
/// Payload of [`Mutation::Add`].
#[derive(Debug, Clone, PartialEq)]
pub struct AddMutation {
    /// Dispatchers to add, keyed by the upstream actor that owns them.
    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
    /// Actors newly added by this barrier (checked by `Barrier::is_newly_added`).
    pub added_actors: HashSet<ActorId>,
    /// Initial split assignments per actor.
    pub splits: SplitAssignments,
    /// Startup pause flag, reported by `Barrier::is_pause_on_startup`.
    pub pause: bool,
    /// `(upstream_mv_table_id, subscriber_id)` pairs of subscriptions to add.
    pub subscriptions_to_add: Vec<(TableId, u32)>,
    /// Fragments whose backfill is paused on startup
    /// (checked by `Barrier::is_backfill_pause_on_startup`).
    pub backfill_nodes_to_pause: HashSet<FragmentId>,
}
306
/// A mutation carried by a barrier. Converted to and from [`PbMutation`] for transport.
#[derive(Debug, Clone, PartialEq)]
pub enum Mutation {
    /// Stop the given set of actors.
    Stop(HashSet<ActorId>),
    /// Update existing actors; see [`UpdateMutation`].
    Update(UpdateMutation),
    /// Add actors/dispatchers; see [`AddMutation`].
    Add(AddMutation),
    /// New split assignments per actor.
    SourceChangeSplit(SplitAssignments),
    /// Pause mutation (no payload).
    Pause,
    /// Resume mutation (no payload).
    Resume,
    /// Per-actor rate limits; the `Option` maps directly to the protobuf `rate_limit` field.
    Throttle(HashMap<ActorId, Option<u32>>),
    /// An `Add` and an `Update` carried by the same barrier. Encoded as a protobuf `Combined`
    /// mutation containing exactly the `Add` part followed by the `Update` part.
    AddAndUpdate(AddMutation, UpdateMutation),
    /// New connector properties keyed by a `u32` id (named `actor_id` in the conversion code —
    /// NOTE(review): confirm whether this is an actor id or a source/sink id).
    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
    /// Subscriptions to drop.
    DropSubscriptions {
        /// `(subscriber_id, upstream_mv_table_id)` pairs. Note: the tuple order is the reverse
        /// of `AddMutation::subscriptions_to_add`.
        subscriptions_to_drop: Vec<(u32, TableId)>,
    },
    /// Start backfill for the given fragments (see `Barrier::should_start_fragment_backfill`).
    StartFragmentBackfill {
        fragment_ids: HashSet<FragmentId>,
    },
}
327
/// A barrier, generic over its mutation payload `M`.
///
/// See the [`Barrier`] alias (optional shared [`Mutation`]) and the [`DispatcherBarrier`] alias
/// (no mutation). The hand-written `PartialEq` impl compares only `epoch` and `mutation`.
#[derive(Debug, Clone)]
pub struct BarrierInner<M> {
    /// Current and previous epoch of this barrier.
    pub epoch: EpochPair,
    /// Mutation payload.
    pub mutation: M,
    /// Barrier kind (e.g. `Checkpoint`, `Initial`).
    pub kind: BarrierKind,

    /// Tracing context carried with the barrier.
    pub tracing_context: TracingContext,

    /// NOTE(review): presumably the actors this barrier has passed through; in this file it is
    /// only copied to/from protobuf — confirm.
    pub passed_actors: Vec<ActorId>,
}
344
/// Mutation payload of a regular barrier: an optional, shared (`Arc`) [`Mutation`].
pub type BarrierMutationType = Option<Arc<Mutation>>;
/// A barrier that may carry a [`Mutation`].
pub type Barrier = BarrierInner<BarrierMutationType>;
/// A barrier without mutation payload, used for dispatcher-side messages.
pub type DispatcherBarrier = BarrierInner<()>;
348
349impl<M: Default> BarrierInner<M> {
350 pub fn new_test_barrier(epoch: u64) -> Self {
352 Self {
353 epoch: EpochPair::new_test_epoch(epoch),
354 kind: BarrierKind::Checkpoint,
355 tracing_context: TracingContext::none(),
356 mutation: Default::default(),
357 passed_actors: Default::default(),
358 }
359 }
360
361 pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
362 Self {
363 epoch: EpochPair::new(epoch, prev_epoch),
364 kind: BarrierKind::Checkpoint,
365 tracing_context: TracingContext::none(),
366 mutation: Default::default(),
367 passed_actors: Default::default(),
368 }
369 }
370}
371
372impl Barrier {
373 pub fn into_dispatcher(self) -> DispatcherBarrier {
374 DispatcherBarrier {
375 epoch: self.epoch,
376 mutation: (),
377 kind: self.kind,
378 tracing_context: self.tracing_context,
379 passed_actors: self.passed_actors,
380 }
381 }
382
383 #[must_use]
384 pub fn with_mutation(self, mutation: Mutation) -> Self {
385 Self {
386 mutation: Some(Arc::new(mutation)),
387 ..self
388 }
389 }
390
391 #[must_use]
392 pub fn with_stop(self) -> Self {
393 self.with_mutation(Mutation::Stop(HashSet::default()))
394 }
395
396 pub fn is_with_stop_mutation(&self) -> bool {
398 matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
399 }
400
401 pub fn is_stop(&self, actor_id: ActorId) -> bool {
403 self.all_stop_actors()
404 .is_some_and(|actors| actors.contains(&actor_id))
405 }
406
407 pub fn is_checkpoint(&self) -> bool {
408 self.kind == BarrierKind::Checkpoint
409 }
410
411 pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
422 match self.mutation.as_deref()? {
423 Mutation::Update(UpdateMutation { actor_splits, .. })
424 | Mutation::Add(AddMutation {
425 splits: actor_splits,
426 ..
427 }) => actor_splits.get(&actor_id),
428
429 Mutation::AddAndUpdate(
430 AddMutation {
431 splits: add_actor_splits,
432 ..
433 },
434 UpdateMutation {
435 actor_splits: update_actor_splits,
436 ..
437 },
438 ) => add_actor_splits
439 .get(&actor_id)
440 .or_else(|| update_actor_splits.get(&actor_id)),
442
443 _ => {
444 if cfg!(debug_assertions) {
445 panic!(
446 "the initial mutation of the barrier should not be {:?}",
447 self.mutation
448 );
449 }
450 None
451 }
452 }
453 .map(|s| s.as_slice())
454 }
455
456 pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
458 match self.mutation.as_deref() {
459 Some(Mutation::Stop(actors)) => Some(actors),
460 Some(Mutation::Update(UpdateMutation { dropped_actors, .. }))
461 | Some(Mutation::AddAndUpdate(_, UpdateMutation { dropped_actors, .. })) => {
462 Some(dropped_actors)
463 }
464 _ => None,
465 }
466 }
467
468 pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
474 match self.mutation.as_deref() {
475 Some(Mutation::Add(AddMutation { added_actors, .. }))
476 | Some(Mutation::AddAndUpdate(AddMutation { added_actors, .. }, _)) => {
477 added_actors.contains(&actor_id)
478 }
479 _ => false,
480 }
481 }
482
483 pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
484 if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
485 fragment_ids.contains(&fragment_id)
486 } else {
487 false
488 }
489 }
490
491 pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
509 let Some(mutation) = self.mutation.as_deref() else {
510 return false;
511 };
512 match mutation {
513 Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
515 Mutation::AddAndUpdate(
517 AddMutation { adds, .. },
518 UpdateMutation {
519 dispatchers,
520 actor_new_dispatchers,
521 ..
522 },
523 ) => {
524 adds.get(&upstream_actor_id).is_some()
525 || actor_new_dispatchers.get(&upstream_actor_id).is_some()
526 || dispatchers.get(&upstream_actor_id).is_some()
527 }
528 Mutation::Update(_)
529 | Mutation::Stop(_)
530 | Mutation::Pause
531 | Mutation::Resume
532 | Mutation::SourceChangeSplit(_)
533 | Mutation::Throttle(_)
534 | Mutation::DropSubscriptions { .. }
535 | Mutation::ConnectorPropsChange(_)
536 | Mutation::StartFragmentBackfill { .. } => false,
537 }
538 }
539
540 pub fn is_pause_on_startup(&self) -> bool {
542 match self.mutation.as_deref() {
543 Some(Mutation::Add(AddMutation { pause, .. }))
544 | Some(Mutation::AddAndUpdate(AddMutation { pause, .. }, _)) => *pause,
545 _ => false,
546 }
547 }
548
549 pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
550 match self.mutation.as_deref() {
551 Some(Mutation::Add(AddMutation {
552 backfill_nodes_to_pause,
553 ..
554 }))
555 | Some(Mutation::AddAndUpdate(
556 AddMutation {
557 backfill_nodes_to_pause,
558 ..
559 },
560 _,
561 )) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
562 _ => {
563 tracing::warn!("expected an AddMutation on Startup, instead got {:?}", self);
564 true
565 }
566 }
567 }
568
569 pub fn is_resume(&self) -> bool {
571 matches!(self.mutation.as_deref(), Some(Mutation::Resume))
572 }
573
574 pub fn as_update_merge(
577 &self,
578 actor_id: ActorId,
579 upstream_fragment_id: UpstreamFragmentId,
580 ) -> Option<&MergeUpdate> {
581 self.mutation
582 .as_deref()
583 .and_then(|mutation| match mutation {
584 Mutation::Update(UpdateMutation { merges, .. })
585 | Mutation::AddAndUpdate(_, UpdateMutation { merges, .. }) => {
586 merges.get(&(actor_id, upstream_fragment_id))
587 }
588
589 _ => None,
590 })
591 }
592
593 pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
599 self.mutation
600 .as_deref()
601 .and_then(|mutation| match mutation {
602 Mutation::Update(UpdateMutation { vnode_bitmaps, .. })
603 | Mutation::AddAndUpdate(_, UpdateMutation { vnode_bitmaps, .. }) => {
604 vnode_bitmaps.get(&actor_id).cloned()
605 }
606 _ => None,
607 })
608 }
609
610 pub fn get_curr_epoch(&self) -> Epoch {
611 Epoch(self.epoch.curr)
612 }
613
614 pub fn tracing_context(&self) -> &TracingContext {
616 &self.tracing_context
617 }
618
619 pub fn added_subscriber_on_mv_table(
620 &self,
621 mv_table_id: TableId,
622 ) -> impl Iterator<Item = u32> + '_ {
623 if let Some(Mutation::Add(add)) | Some(Mutation::AddAndUpdate(add, _)) =
624 self.mutation.as_deref()
625 {
626 Some(add)
627 } else {
628 None
629 }
630 .into_iter()
631 .flat_map(move |add| {
632 add.subscriptions_to_add.iter().filter_map(
633 move |(upstream_mv_table_id, subscriber_id)| {
634 if *upstream_mv_table_id == mv_table_id {
635 Some(*subscriber_id)
636 } else {
637 None
638 }
639 },
640 )
641 })
642 }
643}
644
645impl<M: PartialEq> PartialEq for BarrierInner<M> {
646 fn eq(&self, other: &Self) -> bool {
647 self.epoch == other.epoch && self.mutation == other.mutation
648 }
649}
650
651impl Mutation {
652 #[cfg(test)]
656 pub fn is_stop(&self) -> bool {
657 matches!(self, Mutation::Stop(_))
658 }
659
660 fn to_protobuf(&self) -> PbMutation {
661 let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
662 actor_splits
663 .iter()
664 .map(|(&actor_id, splits)| {
665 (
666 actor_id,
667 ConnectorSplits {
668 splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
669 },
670 )
671 })
672 .collect::<HashMap<_, _>>()
673 };
674
675 match self {
676 Mutation::Stop(actors) => PbMutation::Stop(StopMutation {
677 actors: actors.iter().copied().collect::<Vec<_>>(),
678 }),
679 Mutation::Update(UpdateMutation {
680 dispatchers,
681 merges,
682 vnode_bitmaps,
683 dropped_actors,
684 actor_splits,
685 actor_new_dispatchers,
686 }) => PbMutation::Update(PbUpdateMutation {
687 dispatcher_update: dispatchers.values().flatten().cloned().collect(),
688 merge_update: merges.values().cloned().collect(),
689 actor_vnode_bitmap_update: vnode_bitmaps
690 .iter()
691 .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
692 .collect(),
693 dropped_actors: dropped_actors.iter().cloned().collect(),
694 actor_splits: actor_splits_to_protobuf(actor_splits),
695 actor_new_dispatchers: actor_new_dispatchers
696 .iter()
697 .map(|(&actor_id, dispatchers)| {
698 (
699 actor_id,
700 Dispatchers {
701 dispatchers: dispatchers.clone(),
702 },
703 )
704 })
705 .collect(),
706 }),
707 Mutation::Add(AddMutation {
708 adds,
709 added_actors,
710 splits,
711 pause,
712 subscriptions_to_add,
713 backfill_nodes_to_pause,
714 }) => PbMutation::Add(PbAddMutation {
715 actor_dispatchers: adds
716 .iter()
717 .map(|(&actor_id, dispatchers)| {
718 (
719 actor_id,
720 Dispatchers {
721 dispatchers: dispatchers.clone(),
722 },
723 )
724 })
725 .collect(),
726 added_actors: added_actors.iter().copied().collect(),
727 actor_splits: actor_splits_to_protobuf(splits),
728 pause: *pause,
729 subscriptions_to_add: subscriptions_to_add
730 .iter()
731 .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
732 subscriber_id: *subscriber_id,
733 upstream_mv_table_id: table_id.table_id,
734 })
735 .collect(),
736 backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
737 }),
738 Mutation::SourceChangeSplit(changes) => PbMutation::Splits(SourceChangeSplitMutation {
739 actor_splits: changes
740 .iter()
741 .map(|(&actor_id, splits)| {
742 (
743 actor_id,
744 ConnectorSplits {
745 splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
746 },
747 )
748 })
749 .collect(),
750 }),
751 Mutation::Pause => PbMutation::Pause(PauseMutation {}),
752 Mutation::Resume => PbMutation::Resume(ResumeMutation {}),
753 Mutation::Throttle(changes) => PbMutation::Throttle(ThrottleMutation {
754 actor_throttle: changes
755 .iter()
756 .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit }))
757 .collect(),
758 }),
759
760 Mutation::AddAndUpdate(add, update) => PbMutation::Combined(CombinedMutation {
761 mutations: vec![
762 BarrierMutation {
763 mutation: Some(Mutation::Add(add.clone()).to_protobuf()),
764 },
765 BarrierMutation {
766 mutation: Some(Mutation::Update(update.clone()).to_protobuf()),
767 },
768 ],
769 }),
770 Mutation::DropSubscriptions {
771 subscriptions_to_drop,
772 } => PbMutation::DropSubscriptions(DropSubscriptionsMutation {
773 info: subscriptions_to_drop
774 .iter()
775 .map(
776 |(subscriber_id, upstream_mv_table_id)| SubscriptionUpstreamInfo {
777 subscriber_id: *subscriber_id,
778 upstream_mv_table_id: upstream_mv_table_id.table_id,
779 },
780 )
781 .collect(),
782 }),
783 Mutation::ConnectorPropsChange(map) => {
784 PbMutation::ConnectorPropsChange(ConnectorPropsChangeMutation {
785 connector_props_infos: map
786 .iter()
787 .map(|(actor_id, options)| {
788 (
789 *actor_id,
790 ConnectorPropsInfo {
791 connector_props_info: options
792 .iter()
793 .map(|(k, v)| (k.clone(), v.clone()))
794 .collect(),
795 },
796 )
797 })
798 .collect(),
799 })
800 }
801 Mutation::StartFragmentBackfill { fragment_ids } => {
802 PbMutation::StartFragmentBackfill(StartFragmentBackfillMutation {
803 fragment_ids: fragment_ids.iter().copied().collect(),
804 })
805 }
806 }
807 }
808
809 fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
810 let mutation = match prost {
811 PbMutation::Stop(stop) => {
812 Mutation::Stop(HashSet::from_iter(stop.actors.iter().cloned()))
813 }
814
815 PbMutation::Update(update) => Mutation::Update(UpdateMutation {
816 dispatchers: update
817 .dispatcher_update
818 .iter()
819 .map(|u| (u.actor_id, u.clone()))
820 .into_group_map(),
821 merges: update
822 .merge_update
823 .iter()
824 .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
825 .collect(),
826 vnode_bitmaps: update
827 .actor_vnode_bitmap_update
828 .iter()
829 .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
830 .collect(),
831 dropped_actors: update.dropped_actors.iter().cloned().collect(),
832 actor_splits: update
833 .actor_splits
834 .iter()
835 .map(|(&actor_id, splits)| {
836 (
837 actor_id,
838 splits
839 .splits
840 .iter()
841 .map(|split| split.try_into().unwrap())
842 .collect(),
843 )
844 })
845 .collect(),
846 actor_new_dispatchers: update
847 .actor_new_dispatchers
848 .iter()
849 .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
850 .collect(),
851 }),
852
853 PbMutation::Add(add) => Mutation::Add(AddMutation {
854 adds: add
855 .actor_dispatchers
856 .iter()
857 .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
858 .collect(),
859 added_actors: add.added_actors.iter().copied().collect(),
860 splits: add
863 .actor_splits
864 .iter()
865 .map(|(&actor_id, splits)| {
866 (
867 actor_id,
868 splits
869 .splits
870 .iter()
871 .map(|split| split.try_into().unwrap())
872 .collect(),
873 )
874 })
875 .collect(),
876 pause: add.pause,
877 subscriptions_to_add: add
878 .subscriptions_to_add
879 .iter()
880 .map(
881 |SubscriptionUpstreamInfo {
882 subscriber_id,
883 upstream_mv_table_id,
884 }| {
885 (TableId::new(*upstream_mv_table_id), *subscriber_id)
886 },
887 )
888 .collect(),
889 backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
890 }),
891
892 PbMutation::Splits(s) => {
893 let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
894 Vec::with_capacity(s.actor_splits.len());
895 for (&actor_id, splits) in &s.actor_splits {
896 if !splits.splits.is_empty() {
897 change_splits.push((
898 actor_id,
899 splits
900 .splits
901 .iter()
902 .map(SplitImpl::try_from)
903 .try_collect()?,
904 ));
905 }
906 }
907 Mutation::SourceChangeSplit(change_splits.into_iter().collect())
908 }
909 PbMutation::Pause(_) => Mutation::Pause,
910 PbMutation::Resume(_) => Mutation::Resume,
911 PbMutation::Throttle(changes) => Mutation::Throttle(
912 changes
913 .actor_throttle
914 .iter()
915 .map(|(actor_id, limit)| (*actor_id, limit.rate_limit))
916 .collect(),
917 ),
918 PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
919 subscriptions_to_drop: drop
920 .info
921 .iter()
922 .map(|info| (info.subscriber_id, TableId::new(info.upstream_mv_table_id)))
923 .collect(),
924 },
925 PbMutation::ConnectorPropsChange(alter_connector_props) => {
926 Mutation::ConnectorPropsChange(
927 alter_connector_props
928 .connector_props_infos
929 .iter()
930 .map(|(actor_id, options)| {
931 (
932 *actor_id,
933 options
934 .connector_props_info
935 .iter()
936 .map(|(k, v)| (k.clone(), v.clone()))
937 .collect(),
938 )
939 })
940 .collect(),
941 )
942 }
943 PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
944 Mutation::StartFragmentBackfill {
945 fragment_ids: start_fragment_backfill
946 .fragment_ids
947 .iter()
948 .copied()
949 .collect(),
950 }
951 }
952 PbMutation::Combined(CombinedMutation { mutations }) => match &mutations[..] {
953 [
954 BarrierMutation {
955 mutation: Some(add),
956 },
957 BarrierMutation {
958 mutation: Some(update),
959 },
960 ] => {
961 let Mutation::Add(add_mutation) = Mutation::from_protobuf(add)? else {
962 unreachable!();
963 };
964
965 let Mutation::Update(update_mutation) = Mutation::from_protobuf(update)? else {
966 unreachable!();
967 };
968
969 Mutation::AddAndUpdate(add_mutation, update_mutation)
970 }
971
972 _ => unreachable!(),
973 },
974 };
975 Ok(mutation)
976 }
977}
978
979impl<M> BarrierInner<M> {
980 fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
981 let Self {
982 epoch,
983 mutation,
984 kind,
985 passed_actors,
986 tracing_context,
987 ..
988 } = self;
989
990 PbBarrier {
991 epoch: Some(PbEpoch {
992 curr: epoch.curr,
993 prev: epoch.prev,
994 }),
995 mutation: Some(PbBarrierMutation {
996 mutation: barrier_fn(mutation),
997 }),
998 tracing_context: tracing_context.to_protobuf(),
999 kind: *kind as _,
1000 passed_actors: passed_actors.clone(),
1001 }
1002 }
1003
1004 fn from_protobuf_inner(
1005 prost: &PbBarrier,
1006 mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
1007 ) -> StreamExecutorResult<Self> {
1008 let epoch = prost.get_epoch()?;
1009
1010 Ok(Self {
1011 kind: prost.kind(),
1012 epoch: EpochPair::new(epoch.curr, epoch.prev),
1013 mutation: mutation_from_pb(
1014 prost
1015 .mutation
1016 .as_ref()
1017 .and_then(|mutation| mutation.mutation.as_ref()),
1018 )?,
1019 passed_actors: prost.get_passed_actors().clone(),
1020 tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
1021 })
1022 }
1023
1024 pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
1025 BarrierInner {
1026 epoch: self.epoch,
1027 mutation: f(self.mutation),
1028 kind: self.kind,
1029 tracing_context: self.tracing_context,
1030 passed_actors: self.passed_actors,
1031 }
1032 }
1033}
1034
1035impl DispatcherBarrier {
1036 pub fn to_protobuf(&self) -> PbBarrier {
1037 self.to_protobuf_inner(|_| None)
1038 }
1039}
1040
1041impl Barrier {
1042 pub fn to_protobuf(&self) -> PbBarrier {
1043 self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
1044 }
1045
1046 pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
1047 Self::from_protobuf_inner(prost, |mutation| {
1048 mutation
1049 .map(|m| Mutation::from_protobuf(m).map(Arc::new))
1050 .transpose()
1051 })
1052 }
1053}
1054
/// A watermark: a non-null value `val` of type `data_type` on column `col_idx`.
///
/// NOTE(review): the derived `PartialEq`/`Eq` compare all fields, while the manual `Ord`
/// compares only `val`. Two watermarks on different columns can therefore be
/// `Ordering::Equal` and still be `!=`.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
    /// Index of the column the watermark applies to.
    pub col_idx: usize,
    /// Data type of the watermark value.
    pub data_type: DataType,
    /// The watermark value (never NULL).
    pub val: ScalarImpl,
}
1061
1062impl PartialOrd for Watermark {
1063 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1064 Some(self.cmp(other))
1065 }
1066}
1067
1068impl Ord for Watermark {
1069 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1070 self.val.default_cmp(&other.val)
1071 }
1072}
1073
1074impl Watermark {
1075 pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
1076 Self {
1077 col_idx,
1078 data_type,
1079 val,
1080 }
1081 }
1082
1083 pub async fn transform_with_expr(
1084 self,
1085 expr: &NonStrictExpression<impl Expression>,
1086 new_col_idx: usize,
1087 ) -> Option<Self> {
1088 let Self { col_idx, val, .. } = self;
1089 let row = {
1090 let mut row = vec![None; col_idx + 1];
1091 row[col_idx] = Some(val);
1092 OwnedRow::new(row)
1093 };
1094 let val = expr.eval_row_infallible(&row).await?;
1095 Some(Self::new(new_col_idx, expr.inner().return_type(), val))
1096 }
1097
1098 pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
1101 output_indices
1102 .iter()
1103 .position(|p| *p == self.col_idx)
1104 .map(|new_col_idx| self.with_idx(new_col_idx))
1105 }
1106
1107 pub fn to_protobuf(&self) -> PbWatermark {
1108 PbWatermark {
1109 column: Some(PbInputRef {
1110 index: self.col_idx as _,
1111 r#type: Some(self.data_type.to_protobuf()),
1112 }),
1113 val: Some(&self.val).to_protobuf().into(),
1114 }
1115 }
1116
1117 pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
1118 let col_ref = prost.get_column()?;
1119 let data_type = DataType::from(col_ref.get_type()?);
1120 let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
1121 .expect("watermark value cannot be null");
1122 Ok(Self::new(col_ref.get_index() as _, data_type, val))
1123 }
1124
1125 pub fn with_idx(self, idx: usize) -> Self {
1126 Self::new(idx, self.data_type, self.val)
1127 }
1128}
1129
/// A message in a stream, generic over the barrier mutation payload `M`.
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageInner<M> {
    /// A chunk of data.
    Chunk(StreamChunk),
    /// A barrier.
    Barrier(BarrierInner<M>),
    /// A watermark.
    Watermark(Watermark),
}
1136
1137impl<M> MessageInner<M> {
1138 pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
1139 match self {
1140 MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
1141 MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
1142 MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
1143 }
1144 }
1145}
1146
/// A message whose barriers may carry a [`Mutation`].
pub type Message = MessageInner<BarrierMutationType>;
/// A message whose barriers carry no mutation payload.
pub type DispatcherMessage = MessageInner<()>;
1149
/// Like [`MessageInner`], but barriers are carried as a batch (a `Vec`).
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageBatchInner<M> {
    /// A chunk of data.
    Chunk(StreamChunk),
    /// A batch of barriers; converting a single `DispatcherMessage` yields a batch of one.
    BarrierBatch(Vec<BarrierInner<M>>),
    /// A watermark.
    Watermark(Watermark),
}
/// A message batch whose barriers may carry a [`Mutation`].
pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
/// A list of barriers without mutation payload.
pub type DispatcherBarriers = Vec<DispatcherBarrier>;
/// A message batch without mutation payload; this is the form encoded to protobuf.
pub type DispatcherMessageBatch = MessageBatchInner<()>;
1161
1162impl From<DispatcherMessage> for DispatcherMessageBatch {
1163 fn from(m: DispatcherMessage) -> Self {
1164 match m {
1165 DispatcherMessage::Chunk(c) => Self::Chunk(c),
1166 DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
1167 DispatcherMessage::Watermark(w) => Self::Watermark(w),
1168 }
1169 }
1170}
1171
1172impl From<StreamChunk> for Message {
1173 fn from(chunk: StreamChunk) -> Self {
1174 Message::Chunk(chunk)
1175 }
1176}
1177
1178impl<'a> TryFrom<&'a Message> for &'a Barrier {
1179 type Error = ();
1180
1181 fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
1182 match m {
1183 Message::Chunk(_) => Err(()),
1184 Message::Barrier(b) => Ok(b),
1185 Message::Watermark(_) => Err(()),
1186 }
1187 }
1188}
1189
1190impl Message {
1191 #[cfg(test)]
1196 pub fn is_stop(&self) -> bool {
1197 matches!(
1198 self,
1199 Message::Barrier(Barrier {
1200 mutation,
1201 ..
1202 }) if mutation.as_ref().unwrap().is_stop()
1203 )
1204 }
1205}
1206
1207impl DispatcherMessageBatch {
1208 pub fn to_protobuf(&self) -> PbStreamMessageBatch {
1209 let prost = match self {
1210 Self::Chunk(stream_chunk) => {
1211 let prost_stream_chunk = stream_chunk.to_protobuf();
1212 StreamMessageBatch::StreamChunk(prost_stream_chunk)
1213 }
1214 Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
1215 barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
1216 }),
1217 Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
1218 };
1219 PbStreamMessageBatch {
1220 stream_message_batch: Some(prost),
1221 }
1222 }
1223
1224 pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
1225 let res = match prost.get_stream_message_batch()? {
1226 StreamMessageBatch::StreamChunk(chunk) => {
1227 Self::Chunk(StreamChunk::from_protobuf(chunk)?)
1228 }
1229 StreamMessageBatch::BarrierBatch(barrier_batch) => {
1230 let barriers = barrier_batch
1231 .barriers
1232 .iter()
1233 .map(|barrier| {
1234 DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
1235 if mutation.is_some() {
1236 if cfg!(debug_assertions) {
1237 panic!("should not receive message of barrier with mutation");
1238 } else {
1239 warn!(?barrier, "receive message of barrier with mutation");
1240 }
1241 }
1242 Ok(())
1243 })
1244 })
1245 .try_collect()?;
1246 Self::BarrierBatch(barriers)
1247 }
1248 StreamMessageBatch::Watermark(watermark) => {
1249 Self::Watermark(Watermark::from_protobuf(watermark)?)
1250 }
1251 };
1252 Ok(res)
1253 }
1254
1255 pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
1256 ::prost::Message::encoded_len(msg)
1257 }
1258}
1259
/// Indices of the primary-key columns in a schema.
pub type PkIndices = Vec<usize>;
/// Borrowed form of [`PkIndices`].
pub type PkIndicesRef<'a> = &'a [usize];
/// Data types of primary-key columns; the `SmallVec` stores a single type inline.
pub type PkDataTypes = SmallVec<[DataType; 1]>;
1263
1264pub async fn expect_first_barrier<M: Debug>(
1266 stream: &mut (impl MessageStreamInner<M> + Unpin),
1267) -> StreamExecutorResult<BarrierInner<M>> {
1268 let message = stream
1269 .next()
1270 .instrument_await("expect_first_barrier")
1271 .await
1272 .context("failed to extract the first message: stream closed unexpectedly")??;
1273 let barrier = message
1274 .into_barrier()
1275 .expect("the first message must be a barrier");
1276 assert!(matches!(
1278 barrier.kind,
1279 BarrierKind::Checkpoint | BarrierKind::Initial
1280 ));
1281 Ok(barrier)
1282}
1283
1284pub async fn expect_first_barrier_from_aligned_stream(
1286 stream: &mut (impl AlignedMessageStream + Unpin),
1287) -> StreamExecutorResult<Barrier> {
1288 let message = stream
1289 .next()
1290 .instrument_await("expect_first_barrier")
1291 .await
1292 .context("failed to extract the first message: stream closed unexpectedly")??;
1293 let barrier = message
1294 .into_barrier()
1295 .expect("the first message must be a barrier");
1296 Ok(barrier)
1297}
1298
/// A consumer that, once executed, yields a stream of barriers (or errors).
pub trait StreamConsumer: Send + 'static {
    /// The barrier stream produced by [`StreamConsumer::execute`].
    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;

    /// Consumes the boxed consumer and returns its barrier stream.
    fn execute(self: Box<Self>) -> Self::BarrierStream;
}