1mod prelude;
16
17use std::collections::{HashMap, HashSet};
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use await_tree::InstrumentAwait;
22use enum_as_inner::EnumAsInner;
23use futures::stream::BoxStream;
24use futures::{Stream, StreamExt};
25use itertools::Itertools;
26use risingwave_common::array::StreamChunk;
27use risingwave_common::bitmap::Bitmap;
28use risingwave_common::catalog::{Schema, TableId};
29use risingwave_common::row::OwnedRow;
30use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
31use risingwave_common::util::epoch::{Epoch, EpochPair};
32use risingwave_common::util::tracing::TracingContext;
33use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
34use risingwave_connector::source::SplitImpl;
35use risingwave_expr::expr::{Expression, NonStrictExpression};
36use risingwave_pb::data::PbEpoch;
37use risingwave_pb::expr::PbInputRef;
38use risingwave_pb::stream_plan::barrier::BarrierKind;
39use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
40use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
41use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
42use risingwave_pb::stream_plan::{
43 BarrierMutation, CombinedMutation, ConnectorPropsChangeMutation, Dispatchers,
44 DropSubscriptionsMutation, PauseMutation, PbAddMutation, PbBarrier, PbBarrierMutation,
45 PbDispatcher, PbStreamMessageBatch, PbUpdateMutation, PbWatermark, ResumeMutation,
46 SourceChangeSplitMutation, StartFragmentBackfillMutation, StopMutation,
47 SubscriptionUpstreamInfo, ThrottleMutation,
48};
49use smallvec::SmallVec;
50
51use crate::error::StreamResult;
52use crate::task::{ActorId, FragmentId};
53
54mod actor;
55mod barrier_align;
56pub mod exchange;
57pub mod monitor;
58
59pub mod aggregate;
60pub mod asof_join;
61mod backfill;
62mod barrier_recv;
63mod batch_query;
64mod chain;
65mod changelog;
66mod dedup;
67mod dispatch;
68pub mod dml;
69mod dynamic_filter;
70pub mod eowc;
71pub mod error;
72mod expand;
73mod filter;
74pub mod hash_join;
75mod hop_window;
76mod join;
77mod lookup;
78mod lookup_union;
79mod merge;
80mod mview;
81mod nested_loop_temporal_join;
82mod no_op;
83mod now;
84mod over_window;
85pub mod project;
86mod rearranged_chain;
87mod receiver;
88pub mod row_id_gen;
89mod sink;
90pub mod source;
91mod stream_reader;
92pub mod subtask;
93mod temporal_join;
94mod top_n;
95mod troublemaker;
96mod union;
97mod values;
98mod watermark;
99mod watermark_filter;
100mod wrapper;
101
102mod approx_percentile;
103
104mod row_merge;
105
106#[cfg(test)]
107mod integration_tests;
108mod sync_kv_log_store;
109pub mod test_utils;
110mod utils;
111
112pub use actor::{Actor, ActorContext, ActorContextRef};
113use anyhow::Context;
114pub use approx_percentile::global::GlobalApproxPercentileExecutor;
115pub use approx_percentile::local::LocalApproxPercentileExecutor;
116pub use backfill::arrangement_backfill::*;
117pub use backfill::cdc::{CdcBackfillExecutor, CdcScanOptions, ExternalStorageTable};
118pub use backfill::no_shuffle_backfill::*;
119pub use backfill::snapshot_backfill::*;
120pub use barrier_recv::BarrierRecvExecutor;
121pub use batch_query::BatchQueryExecutor;
122pub use chain::ChainExecutor;
123pub use changelog::ChangeLogExecutor;
124pub use dedup::AppendOnlyDedupExecutor;
125pub use dispatch::{DispatchExecutor, DispatcherImpl};
126pub use dynamic_filter::DynamicFilterExecutor;
127pub use error::{StreamExecutorError, StreamExecutorResult};
128pub use expand::ExpandExecutor;
129pub use filter::FilterExecutor;
130pub use hash_join::*;
131pub use hop_window::HopWindowExecutor;
132pub use join::{AsOfDesc, AsOfJoinType, JoinType};
133pub use lookup::*;
134pub use lookup_union::LookupUnionExecutor;
135pub use merge::MergeExecutor;
136pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
137pub use mview::*;
138pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
139pub use no_op::NoOpExecutor;
140pub use now::*;
141pub use over_window::*;
142pub use rearranged_chain::RearrangedChainExecutor;
143pub use receiver::ReceiverExecutor;
144use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
145pub use row_merge::RowMergeExecutor;
146pub use sink::SinkExecutor;
147pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
148pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
149pub use temporal_join::TemporalJoinExecutor;
150pub use top_n::{
151 AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
152};
153pub use troublemaker::TroublemakerExecutor;
154pub use union::UnionExecutor;
155pub use utils::DummyExecutor;
156pub use values::ValuesExecutor;
157pub use watermark_filter::WatermarkFilterExecutor;
158pub use wrapper::WrapperExecutor;
159
160use self::barrier_align::AlignedMessageStream;
161
/// Item of a message stream whose barriers carry a mutation of type `M`:
/// either a [`MessageInner<M>`] or a stream executor error.
pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
/// [`MessageStreamItemInner`] specialised to [`BarrierMutationType`].
pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
/// Fallible [`DispatcherMessage`], i.e. a message whose barriers carry no mutation.
pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
/// Boxed `'static` stream of [`MessageStreamItem`]s, as returned by [`Execute::execute`].
pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;
166
167pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
168use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
169use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
170
/// Any `Send` stream yielding [`MessageStreamItemInner<M>`].
pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
/// Any `Send` stream yielding [`MessageStreamItem`].
pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
/// Any `Send` stream yielding [`DispatcherMessageStreamItem`].
pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;
174
/// Static description of an executor: schema, key column indices, identity
/// string and numeric id.
#[derive(Debug, Default, Clone)]
pub struct ExecutorInfo {
    /// Schema returned by [`Executor::schema`].
    pub schema: Schema,

    /// Column indices returned by [`Executor::pk_indices`]
    /// (presumably the primary key, per the `Pk` naming — confirm).
    pub pk_indices: PkIndices,

    /// Identity string returned by [`Executor::identity`]; it is also what
    /// the `Debug` impl of [`Executor`] prints.
    pub identity: String,

    /// Numeric id of the executor. Not interpreted anywhere in this file.
    pub id: u64,
}

impl ExecutorInfo {
    /// Builds an [`ExecutorInfo`] from its four fields, stored as given.
    pub fn new(schema: Schema, pk_indices: PkIndices, identity: String, id: u64) -> Self {
        Self {
            schema,
            pk_indices,
            identity,
            id,
        }
    }
}
203
/// The executable part of an executor: something that can be turned into a
/// [`BoxedMessageStream`].
pub trait Execute: Send + 'static {
    /// Consumes the boxed executor and returns its message stream.
    fn execute(self: Box<Self>) -> BoxedMessageStream;

    /// Like [`Execute::execute`], but additionally receives an epoch.
    ///
    /// The default implementation ignores `_epoch` and delegates to
    /// [`Execute::execute`].
    fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
        self.execute()
    }

    /// Moves `self` into a `Box<dyn Execute>` trait object.
    fn boxed(self) -> Box<dyn Execute>
    where
        Self: Sized + Send + 'static,
    {
        Box::new(self)
    }
}
219
220pub struct Executor {
223 info: ExecutorInfo,
224 execute: Box<dyn Execute>,
225}
226
227impl Executor {
228 pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
229 Self { info, execute }
230 }
231
232 pub fn info(&self) -> &ExecutorInfo {
233 &self.info
234 }
235
236 pub fn schema(&self) -> &Schema {
237 &self.info.schema
238 }
239
240 pub fn pk_indices(&self) -> PkIndicesRef<'_> {
241 &self.info.pk_indices
242 }
243
244 pub fn identity(&self) -> &str {
245 &self.info.identity
246 }
247
248 pub fn execute(self) -> BoxedMessageStream {
249 self.execute.execute()
250 }
251
252 pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
253 self.execute.execute_with_epoch(epoch)
254 }
255}
256
257impl std::fmt::Debug for Executor {
258 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
259 f.write_str(self.identity())
260 }
261}
262
263impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
264 fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
265 Self::new(info, execute)
266 }
267}
268
269impl<E> From<(ExecutorInfo, E)> for Executor
270where
271 E: Execute,
272{
273 fn from((info, execute): (ExecutorInfo, E)) -> Self {
274 Self::new(info, execute.boxed())
275 }
276}
277
/// Epoch value (`0`) designated as invalid.
pub const INVALID_EPOCH: u64 = 0;

/// A [`FragmentId`] used in the position of an upstream fragment.
type UpstreamFragmentId = FragmentId;
/// Source splits assigned to each actor, keyed by actor id.
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;
282
/// Payload of [`Mutation::Update`] (and the second half of
/// [`Mutation::AddAndUpdate`]).
#[derive(Debug, Clone, PartialEq)]
pub struct UpdateMutation {
    /// Dispatcher updates grouped by the `actor_id` of each update.
    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
    /// Merge updates keyed by `(actor id, upstream fragment id)`.
    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
    /// Vnode bitmap per actor, looked up by `Barrier::as_update_vnode_bitmap`.
    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
    /// Actors dropped by this update; reported by `Barrier::all_stop_actors`.
    pub dropped_actors: HashSet<ActorId>,
    /// Source split assignment per actor.
    pub actor_splits: SplitAssignments,
    /// Dispatchers per actor, serialized as `actor_new_dispatchers`.
    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
}
292
/// Payload of [`Mutation::Add`] (and the first half of
/// [`Mutation::AddAndUpdate`]).
#[derive(Debug, Clone, PartialEq)]
pub struct AddMutation {
    /// Dispatchers per actor, serialized as `actor_dispatchers`.
    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
    /// Actors added by this mutation; queried by `Barrier::is_newly_added`.
    pub added_actors: HashSet<ActorId>,
    /// Source split assignment per actor.
    pub splits: SplitAssignments,
    /// Pause flag returned by `Barrier::is_pause_on_startup`.
    pub pause: bool,
    /// Pairs of `(upstream mv table id, subscriber id)` to add.
    pub subscriptions_to_add: Vec<(TableId, u32)>,
    /// Fragments queried by `Barrier::is_backfill_pause_on_startup`.
    pub backfill_nodes_to_pause: HashSet<FragmentId>,
}
305
/// In-memory form of the mutation carried by a barrier; converted to and
/// from `PbMutation` by `to_protobuf` / `from_protobuf`.
#[derive(Debug, Clone, PartialEq)]
pub enum Mutation {
    /// Stop the given actors.
    Stop(HashSet<ActorId>),
    /// See [`UpdateMutation`].
    Update(UpdateMutation),
    /// See [`AddMutation`].
    Add(AddMutation),
    /// New source split assignment per actor.
    SourceChangeSplit(SplitAssignments),
    /// Pause; carries no payload.
    Pause,
    /// Resume; carries no payload.
    Resume,
    /// Optional rate limit per actor.
    Throttle(HashMap<ActorId, Option<u32>>),
    /// An add and an update applied together; serialized as a
    /// `CombinedMutation` holding exactly `[Add, Update]`.
    AddAndUpdate(AddMutation, UpdateMutation),
    /// Connector property maps keyed by a `u32` id.
    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
    /// Drop subscriptions.
    DropSubscriptions {
        /// Pairs of `(subscriber id, upstream mv table id)` to drop.
        subscriptions_to_drop: Vec<(u32, TableId)>,
    },
    /// Start backfill on the given fragments.
    StartFragmentBackfill {
        /// Fragments concerned by this mutation.
        fragment_ids: HashSet<FragmentId>,
    },
}
326
/// A barrier message, generic over the mutation payload type `M`.
///
/// See the aliases `Barrier` (optional shared [`Mutation`]) and
/// `DispatcherBarrier` (no mutation, `M = ()`).
#[derive(Debug, Clone)]
pub struct BarrierInner<M> {
    /// Current and previous epoch of this barrier.
    pub epoch: EpochPair,
    /// Mutation payload.
    pub mutation: M,
    /// Kind of the barrier (e.g. checkpoint).
    pub kind: BarrierKind,

    /// Tracing context carried along with the barrier.
    pub tracing_context: TracingContext,

    /// Actor ids recorded on the barrier
    /// (presumably the actors it has passed through — confirm).
    pub passed_actors: Vec<ActorId>,
}
343
/// Mutation payload of a regular barrier: an optional, shared [`Mutation`].
pub type BarrierMutationType = Option<Arc<Mutation>>;
/// Barrier carrying a [`BarrierMutationType`].
pub type Barrier = BarrierInner<BarrierMutationType>;
/// Barrier without a mutation payload.
pub type DispatcherBarrier = BarrierInner<()>;
347
348impl<M: Default> BarrierInner<M> {
349 pub fn new_test_barrier(epoch: u64) -> Self {
351 Self {
352 epoch: EpochPair::new_test_epoch(epoch),
353 kind: BarrierKind::Checkpoint,
354 tracing_context: TracingContext::none(),
355 mutation: Default::default(),
356 passed_actors: Default::default(),
357 }
358 }
359
360 pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
361 Self {
362 epoch: EpochPair::new(epoch, prev_epoch),
363 kind: BarrierKind::Checkpoint,
364 tracing_context: TracingContext::none(),
365 mutation: Default::default(),
366 passed_actors: Default::default(),
367 }
368 }
369}
370
371impl Barrier {
372 pub fn into_dispatcher(self) -> DispatcherBarrier {
373 DispatcherBarrier {
374 epoch: self.epoch,
375 mutation: (),
376 kind: self.kind,
377 tracing_context: self.tracing_context,
378 passed_actors: self.passed_actors,
379 }
380 }
381
382 #[must_use]
383 pub fn with_mutation(self, mutation: Mutation) -> Self {
384 Self {
385 mutation: Some(Arc::new(mutation)),
386 ..self
387 }
388 }
389
390 #[must_use]
391 pub fn with_stop(self) -> Self {
392 self.with_mutation(Mutation::Stop(HashSet::default()))
393 }
394
395 pub fn is_with_stop_mutation(&self) -> bool {
397 matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
398 }
399
400 pub fn is_stop(&self, actor_id: ActorId) -> bool {
402 self.all_stop_actors()
403 .is_some_and(|actors| actors.contains(&actor_id))
404 }
405
406 pub fn is_checkpoint(&self) -> bool {
407 self.kind == BarrierKind::Checkpoint
408 }
409
410 pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
421 match self.mutation.as_deref()? {
422 Mutation::Update(UpdateMutation { actor_splits, .. })
423 | Mutation::Add(AddMutation {
424 splits: actor_splits,
425 ..
426 }) => actor_splits.get(&actor_id),
427
428 Mutation::AddAndUpdate(
429 AddMutation {
430 splits: add_actor_splits,
431 ..
432 },
433 UpdateMutation {
434 actor_splits: update_actor_splits,
435 ..
436 },
437 ) => add_actor_splits
438 .get(&actor_id)
439 .or_else(|| update_actor_splits.get(&actor_id)),
441
442 _ => {
443 if cfg!(debug_assertions) {
444 panic!(
445 "the initial mutation of the barrier should not be {:?}",
446 self.mutation
447 );
448 }
449 None
450 }
451 }
452 .map(|s| s.as_slice())
453 }
454
455 pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
457 match self.mutation.as_deref() {
458 Some(Mutation::Stop(actors)) => Some(actors),
459 Some(Mutation::Update(UpdateMutation { dropped_actors, .. }))
460 | Some(Mutation::AddAndUpdate(_, UpdateMutation { dropped_actors, .. })) => {
461 Some(dropped_actors)
462 }
463 _ => None,
464 }
465 }
466
467 pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
473 match self.mutation.as_deref() {
474 Some(Mutation::Add(AddMutation { added_actors, .. }))
475 | Some(Mutation::AddAndUpdate(AddMutation { added_actors, .. }, _)) => {
476 added_actors.contains(&actor_id)
477 }
478 _ => false,
479 }
480 }
481
482 pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
500 let Some(mutation) = self.mutation.as_deref() else {
501 return false;
502 };
503 match mutation {
504 Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
506 Mutation::AddAndUpdate(
508 AddMutation { adds, .. },
509 UpdateMutation {
510 dispatchers,
511 actor_new_dispatchers,
512 ..
513 },
514 ) => {
515 adds.get(&upstream_actor_id).is_some()
516 || actor_new_dispatchers.get(&upstream_actor_id).is_some()
517 || dispatchers.get(&upstream_actor_id).is_some()
518 }
519 Mutation::Update(_)
520 | Mutation::Stop(_)
521 | Mutation::Pause
522 | Mutation::Resume
523 | Mutation::SourceChangeSplit(_)
524 | Mutation::Throttle(_)
525 | Mutation::DropSubscriptions { .. }
526 | Mutation::ConnectorPropsChange(_)
527 | Mutation::StartFragmentBackfill { .. } => false,
528 }
529 }
530
531 pub fn is_pause_on_startup(&self) -> bool {
533 match self.mutation.as_deref() {
534 Some(Mutation::Add(AddMutation { pause, .. }))
535 | Some(Mutation::AddAndUpdate(AddMutation { pause, .. }, _)) => *pause,
536 _ => false,
537 }
538 }
539
540 pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
541 match self.mutation.as_deref() {
542 Some(Mutation::Add(AddMutation {
543 backfill_nodes_to_pause,
544 ..
545 }))
546 | Some(Mutation::AddAndUpdate(
547 AddMutation {
548 backfill_nodes_to_pause,
549 ..
550 },
551 _,
552 )) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
553 _ => {
554 tracing::warn!("expected an AddMutation on Startup, instead got {:?}", self);
555 true
556 }
557 }
558 }
559
560 pub fn is_resume(&self) -> bool {
562 matches!(self.mutation.as_deref(), Some(Mutation::Resume))
563 }
564
565 pub fn as_update_merge(
568 &self,
569 actor_id: ActorId,
570 upstream_fragment_id: UpstreamFragmentId,
571 ) -> Option<&MergeUpdate> {
572 self.mutation
573 .as_deref()
574 .and_then(|mutation| match mutation {
575 Mutation::Update(UpdateMutation { merges, .. })
576 | Mutation::AddAndUpdate(_, UpdateMutation { merges, .. }) => {
577 merges.get(&(actor_id, upstream_fragment_id))
578 }
579
580 _ => None,
581 })
582 }
583
584 pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
590 self.mutation
591 .as_deref()
592 .and_then(|mutation| match mutation {
593 Mutation::Update(UpdateMutation { vnode_bitmaps, .. })
594 | Mutation::AddAndUpdate(_, UpdateMutation { vnode_bitmaps, .. }) => {
595 vnode_bitmaps.get(&actor_id).cloned()
596 }
597 _ => None,
598 })
599 }
600
601 pub fn get_curr_epoch(&self) -> Epoch {
602 Epoch(self.epoch.curr)
603 }
604
605 pub fn tracing_context(&self) -> &TracingContext {
607 &self.tracing_context
608 }
609
610 pub fn added_subscriber_on_mv_table(
611 &self,
612 mv_table_id: TableId,
613 ) -> impl Iterator<Item = u32> + '_ {
614 if let Some(Mutation::Add(add)) | Some(Mutation::AddAndUpdate(add, _)) =
615 self.mutation.as_deref()
616 {
617 Some(add)
618 } else {
619 None
620 }
621 .into_iter()
622 .flat_map(move |add| {
623 add.subscriptions_to_add.iter().filter_map(
624 move |(upstream_mv_table_id, subscriber_id)| {
625 if *upstream_mv_table_id == mv_table_id {
626 Some(*subscriber_id)
627 } else {
628 None
629 }
630 },
631 )
632 })
633 }
634}
635
636impl<M: PartialEq> PartialEq for BarrierInner<M> {
637 fn eq(&self, other: &Self) -> bool {
638 self.epoch == other.epoch && self.mutation == other.mutation
639 }
640}
641
642impl Mutation {
643 #[cfg(test)]
647 pub fn is_stop(&self) -> bool {
648 matches!(self, Mutation::Stop(_))
649 }
650
651 fn to_protobuf(&self) -> PbMutation {
652 let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
653 actor_splits
654 .iter()
655 .map(|(&actor_id, splits)| {
656 (
657 actor_id,
658 ConnectorSplits {
659 splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
660 },
661 )
662 })
663 .collect::<HashMap<_, _>>()
664 };
665
666 match self {
667 Mutation::Stop(actors) => PbMutation::Stop(StopMutation {
668 actors: actors.iter().copied().collect::<Vec<_>>(),
669 }),
670 Mutation::Update(UpdateMutation {
671 dispatchers,
672 merges,
673 vnode_bitmaps,
674 dropped_actors,
675 actor_splits,
676 actor_new_dispatchers,
677 }) => PbMutation::Update(PbUpdateMutation {
678 dispatcher_update: dispatchers.values().flatten().cloned().collect(),
679 merge_update: merges.values().cloned().collect(),
680 actor_vnode_bitmap_update: vnode_bitmaps
681 .iter()
682 .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
683 .collect(),
684 dropped_actors: dropped_actors.iter().cloned().collect(),
685 actor_splits: actor_splits_to_protobuf(actor_splits),
686 actor_new_dispatchers: actor_new_dispatchers
687 .iter()
688 .map(|(&actor_id, dispatchers)| {
689 (
690 actor_id,
691 Dispatchers {
692 dispatchers: dispatchers.clone(),
693 },
694 )
695 })
696 .collect(),
697 }),
698 Mutation::Add(AddMutation {
699 adds,
700 added_actors,
701 splits,
702 pause,
703 subscriptions_to_add,
704 backfill_nodes_to_pause,
705 }) => PbMutation::Add(PbAddMutation {
706 actor_dispatchers: adds
707 .iter()
708 .map(|(&actor_id, dispatchers)| {
709 (
710 actor_id,
711 Dispatchers {
712 dispatchers: dispatchers.clone(),
713 },
714 )
715 })
716 .collect(),
717 added_actors: added_actors.iter().copied().collect(),
718 actor_splits: actor_splits_to_protobuf(splits),
719 pause: *pause,
720 subscriptions_to_add: subscriptions_to_add
721 .iter()
722 .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
723 subscriber_id: *subscriber_id,
724 upstream_mv_table_id: table_id.table_id,
725 })
726 .collect(),
727 backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
728 }),
729 Mutation::SourceChangeSplit(changes) => PbMutation::Splits(SourceChangeSplitMutation {
730 actor_splits: changes
731 .iter()
732 .map(|(&actor_id, splits)| {
733 (
734 actor_id,
735 ConnectorSplits {
736 splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
737 },
738 )
739 })
740 .collect(),
741 }),
742 Mutation::Pause => PbMutation::Pause(PauseMutation {}),
743 Mutation::Resume => PbMutation::Resume(ResumeMutation {}),
744 Mutation::Throttle(changes) => PbMutation::Throttle(ThrottleMutation {
745 actor_throttle: changes
746 .iter()
747 .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit }))
748 .collect(),
749 }),
750
751 Mutation::AddAndUpdate(add, update) => PbMutation::Combined(CombinedMutation {
752 mutations: vec![
753 BarrierMutation {
754 mutation: Some(Mutation::Add(add.clone()).to_protobuf()),
755 },
756 BarrierMutation {
757 mutation: Some(Mutation::Update(update.clone()).to_protobuf()),
758 },
759 ],
760 }),
761 Mutation::DropSubscriptions {
762 subscriptions_to_drop,
763 } => PbMutation::DropSubscriptions(DropSubscriptionsMutation {
764 info: subscriptions_to_drop
765 .iter()
766 .map(
767 |(subscriber_id, upstream_mv_table_id)| SubscriptionUpstreamInfo {
768 subscriber_id: *subscriber_id,
769 upstream_mv_table_id: upstream_mv_table_id.table_id,
770 },
771 )
772 .collect(),
773 }),
774 Mutation::ConnectorPropsChange(map) => {
775 PbMutation::ConnectorPropsChange(ConnectorPropsChangeMutation {
776 connector_props_infos: map
777 .iter()
778 .map(|(actor_id, options)| {
779 (
780 *actor_id,
781 ConnectorPropsInfo {
782 connector_props_info: options
783 .iter()
784 .map(|(k, v)| (k.clone(), v.clone()))
785 .collect(),
786 },
787 )
788 })
789 .collect(),
790 })
791 }
792 Mutation::StartFragmentBackfill { fragment_ids } => {
793 PbMutation::StartFragmentBackfill(StartFragmentBackfillMutation {
794 fragment_ids: fragment_ids.iter().copied().collect(),
795 })
796 }
797 }
798 }
799
800 fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
801 let mutation = match prost {
802 PbMutation::Stop(stop) => {
803 Mutation::Stop(HashSet::from_iter(stop.actors.iter().cloned()))
804 }
805
806 PbMutation::Update(update) => Mutation::Update(UpdateMutation {
807 dispatchers: update
808 .dispatcher_update
809 .iter()
810 .map(|u| (u.actor_id, u.clone()))
811 .into_group_map(),
812 merges: update
813 .merge_update
814 .iter()
815 .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
816 .collect(),
817 vnode_bitmaps: update
818 .actor_vnode_bitmap_update
819 .iter()
820 .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
821 .collect(),
822 dropped_actors: update.dropped_actors.iter().cloned().collect(),
823 actor_splits: update
824 .actor_splits
825 .iter()
826 .map(|(&actor_id, splits)| {
827 (
828 actor_id,
829 splits
830 .splits
831 .iter()
832 .map(|split| split.try_into().unwrap())
833 .collect(),
834 )
835 })
836 .collect(),
837 actor_new_dispatchers: update
838 .actor_new_dispatchers
839 .iter()
840 .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
841 .collect(),
842 }),
843
844 PbMutation::Add(add) => Mutation::Add(AddMutation {
845 adds: add
846 .actor_dispatchers
847 .iter()
848 .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
849 .collect(),
850 added_actors: add.added_actors.iter().copied().collect(),
851 splits: add
854 .actor_splits
855 .iter()
856 .map(|(&actor_id, splits)| {
857 (
858 actor_id,
859 splits
860 .splits
861 .iter()
862 .map(|split| split.try_into().unwrap())
863 .collect(),
864 )
865 })
866 .collect(),
867 pause: add.pause,
868 subscriptions_to_add: add
869 .subscriptions_to_add
870 .iter()
871 .map(
872 |SubscriptionUpstreamInfo {
873 subscriber_id,
874 upstream_mv_table_id,
875 }| {
876 (TableId::new(*upstream_mv_table_id), *subscriber_id)
877 },
878 )
879 .collect(),
880 backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
881 }),
882
883 PbMutation::Splits(s) => {
884 let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
885 Vec::with_capacity(s.actor_splits.len());
886 for (&actor_id, splits) in &s.actor_splits {
887 if !splits.splits.is_empty() {
888 change_splits.push((
889 actor_id,
890 splits
891 .splits
892 .iter()
893 .map(SplitImpl::try_from)
894 .try_collect()?,
895 ));
896 }
897 }
898 Mutation::SourceChangeSplit(change_splits.into_iter().collect())
899 }
900 PbMutation::Pause(_) => Mutation::Pause,
901 PbMutation::Resume(_) => Mutation::Resume,
902 PbMutation::Throttle(changes) => Mutation::Throttle(
903 changes
904 .actor_throttle
905 .iter()
906 .map(|(actor_id, limit)| (*actor_id, limit.rate_limit))
907 .collect(),
908 ),
909 PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
910 subscriptions_to_drop: drop
911 .info
912 .iter()
913 .map(|info| (info.subscriber_id, TableId::new(info.upstream_mv_table_id)))
914 .collect(),
915 },
916 PbMutation::ConnectorPropsChange(alter_connector_props) => {
917 Mutation::ConnectorPropsChange(
918 alter_connector_props
919 .connector_props_infos
920 .iter()
921 .map(|(actor_id, options)| {
922 (
923 *actor_id,
924 options
925 .connector_props_info
926 .iter()
927 .map(|(k, v)| (k.clone(), v.clone()))
928 .collect(),
929 )
930 })
931 .collect(),
932 )
933 }
934 PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
935 Mutation::StartFragmentBackfill {
936 fragment_ids: start_fragment_backfill
937 .fragment_ids
938 .iter()
939 .copied()
940 .collect(),
941 }
942 }
943 PbMutation::Combined(CombinedMutation { mutations }) => match &mutations[..] {
944 [
945 BarrierMutation {
946 mutation: Some(add),
947 },
948 BarrierMutation {
949 mutation: Some(update),
950 },
951 ] => {
952 let Mutation::Add(add_mutation) = Mutation::from_protobuf(add)? else {
953 unreachable!();
954 };
955
956 let Mutation::Update(update_mutation) = Mutation::from_protobuf(update)? else {
957 unreachable!();
958 };
959
960 Mutation::AddAndUpdate(add_mutation, update_mutation)
961 }
962
963 _ => unreachable!(),
964 },
965 };
966 Ok(mutation)
967 }
968}
969
970impl<M> BarrierInner<M> {
971 fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
972 let Self {
973 epoch,
974 mutation,
975 kind,
976 passed_actors,
977 tracing_context,
978 ..
979 } = self;
980
981 PbBarrier {
982 epoch: Some(PbEpoch {
983 curr: epoch.curr,
984 prev: epoch.prev,
985 }),
986 mutation: Some(PbBarrierMutation {
987 mutation: barrier_fn(mutation),
988 }),
989 tracing_context: tracing_context.to_protobuf(),
990 kind: *kind as _,
991 passed_actors: passed_actors.clone(),
992 }
993 }
994
995 fn from_protobuf_inner(
996 prost: &PbBarrier,
997 mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
998 ) -> StreamExecutorResult<Self> {
999 let epoch = prost.get_epoch()?;
1000
1001 Ok(Self {
1002 kind: prost.kind(),
1003 epoch: EpochPair::new(epoch.curr, epoch.prev),
1004 mutation: mutation_from_pb(
1005 prost
1006 .mutation
1007 .as_ref()
1008 .and_then(|mutation| mutation.mutation.as_ref()),
1009 )?,
1010 passed_actors: prost.get_passed_actors().clone(),
1011 tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
1012 })
1013 }
1014
1015 pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
1016 BarrierInner {
1017 epoch: self.epoch,
1018 mutation: f(self.mutation),
1019 kind: self.kind,
1020 tracing_context: self.tracing_context,
1021 passed_actors: self.passed_actors,
1022 }
1023 }
1024}
1025
1026impl DispatcherBarrier {
1027 pub fn to_protobuf(&self) -> PbBarrier {
1028 self.to_protobuf_inner(|_| None)
1029 }
1030}
1031
1032impl Barrier {
1033 pub fn to_protobuf(&self) -> PbBarrier {
1034 self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
1035 }
1036
1037 pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
1038 Self::from_protobuf_inner(prost, |mutation| {
1039 mutation
1040 .map(|m| Mutation::from_protobuf(m).map(Arc::new))
1041 .transpose()
1042 })
1043 }
1044}
1045
/// A watermark message: a value `val` of type `data_type` on column `col_idx`.
///
/// Note that ordering (`Ord`) compares `val` only, whereas the derived
/// equality compares all three fields.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
    /// Index of the column the watermark refers to.
    pub col_idx: usize,
    /// Data type of `val`.
    pub data_type: DataType,
    /// Watermark value.
    pub val: ScalarImpl,
}
1052
1053impl PartialOrd for Watermark {
1054 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1055 Some(self.cmp(other))
1056 }
1057}
1058
1059impl Ord for Watermark {
1060 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1061 self.val.default_cmp(&other.val)
1062 }
1063}
1064
1065impl Watermark {
1066 pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
1067 Self {
1068 col_idx,
1069 data_type,
1070 val,
1071 }
1072 }
1073
1074 pub async fn transform_with_expr(
1075 self,
1076 expr: &NonStrictExpression<impl Expression>,
1077 new_col_idx: usize,
1078 ) -> Option<Self> {
1079 let Self { col_idx, val, .. } = self;
1080 let row = {
1081 let mut row = vec![None; col_idx + 1];
1082 row[col_idx] = Some(val);
1083 OwnedRow::new(row)
1084 };
1085 let val = expr.eval_row_infallible(&row).await?;
1086 Some(Self::new(new_col_idx, expr.inner().return_type(), val))
1087 }
1088
1089 pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
1092 output_indices
1093 .iter()
1094 .position(|p| *p == self.col_idx)
1095 .map(|new_col_idx| self.with_idx(new_col_idx))
1096 }
1097
1098 pub fn to_protobuf(&self) -> PbWatermark {
1099 PbWatermark {
1100 column: Some(PbInputRef {
1101 index: self.col_idx as _,
1102 r#type: Some(self.data_type.to_protobuf()),
1103 }),
1104 val: Some(&self.val).to_protobuf().into(),
1105 }
1106 }
1107
1108 pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
1109 let col_ref = prost.get_column()?;
1110 let data_type = DataType::from(col_ref.get_type()?);
1111 let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
1112 .expect("watermark value cannot be null");
1113 Ok(Self::new(col_ref.get_index() as _, data_type, val))
1114 }
1115
1116 pub fn with_idx(self, idx: usize) -> Self {
1117 Self::new(idx, self.data_type, self.val)
1118 }
1119}
1120
/// A single message flowing through a stream, generic over the mutation
/// payload `M` carried by barriers.
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageInner<M> {
    /// A chunk of stream data.
    Chunk(StreamChunk),
    /// A barrier.
    Barrier(BarrierInner<M>),
    /// A watermark.
    Watermark(Watermark),
}
1127
1128impl<M> MessageInner<M> {
1129 pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
1130 match self {
1131 MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
1132 MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
1133 MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
1134 }
1135 }
1136}
1137
/// Message whose barriers carry a [`BarrierMutationType`].
pub type Message = MessageInner<BarrierMutationType>;
/// Message whose barriers carry no mutation.
pub type DispatcherMessage = MessageInner<()>;
1140
/// Like [`MessageInner`], except that barriers come as a batch
/// (a `Vec`) rather than one at a time.
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageBatchInner<M> {
    /// A chunk of stream data.
    Chunk(StreamChunk),
    /// One or more barriers.
    BarrierBatch(Vec<BarrierInner<M>>),
    /// A watermark.
    Watermark(Watermark),
}
/// Message batch whose barriers carry a [`BarrierMutationType`].
pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
/// A list of barriers without mutation.
pub type DispatcherBarriers = Vec<DispatcherBarrier>;
/// Message batch whose barriers carry no mutation.
pub type DispatcherMessageBatch = MessageBatchInner<()>;
1152
1153impl From<DispatcherMessage> for DispatcherMessageBatch {
1154 fn from(m: DispatcherMessage) -> Self {
1155 match m {
1156 DispatcherMessage::Chunk(c) => Self::Chunk(c),
1157 DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
1158 DispatcherMessage::Watermark(w) => Self::Watermark(w),
1159 }
1160 }
1161}
1162
1163impl From<StreamChunk> for Message {
1164 fn from(chunk: StreamChunk) -> Self {
1165 Message::Chunk(chunk)
1166 }
1167}
1168
1169impl<'a> TryFrom<&'a Message> for &'a Barrier {
1170 type Error = ();
1171
1172 fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
1173 match m {
1174 Message::Chunk(_) => Err(()),
1175 Message::Barrier(b) => Ok(b),
1176 Message::Watermark(_) => Err(()),
1177 }
1178 }
1179}
1180
1181impl Message {
1182 #[cfg(test)]
1187 pub fn is_stop(&self) -> bool {
1188 matches!(
1189 self,
1190 Message::Barrier(Barrier {
1191 mutation,
1192 ..
1193 }) if mutation.as_ref().unwrap().is_stop()
1194 )
1195 }
1196}
1197
1198impl DispatcherMessageBatch {
1199 pub fn to_protobuf(&self) -> PbStreamMessageBatch {
1200 let prost = match self {
1201 Self::Chunk(stream_chunk) => {
1202 let prost_stream_chunk = stream_chunk.to_protobuf();
1203 StreamMessageBatch::StreamChunk(prost_stream_chunk)
1204 }
1205 Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
1206 barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
1207 }),
1208 Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
1209 };
1210 PbStreamMessageBatch {
1211 stream_message_batch: Some(prost),
1212 }
1213 }
1214
1215 pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
1216 let res = match prost.get_stream_message_batch()? {
1217 StreamMessageBatch::StreamChunk(chunk) => {
1218 Self::Chunk(StreamChunk::from_protobuf(chunk)?)
1219 }
1220 StreamMessageBatch::BarrierBatch(barrier_batch) => {
1221 let barriers = barrier_batch
1222 .barriers
1223 .iter()
1224 .map(|barrier| {
1225 DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
1226 if mutation.is_some() {
1227 if cfg!(debug_assertions) {
1228 panic!("should not receive message of barrier with mutation");
1229 } else {
1230 warn!(?barrier, "receive message of barrier with mutation");
1231 }
1232 }
1233 Ok(())
1234 })
1235 })
1236 .try_collect()?;
1237 Self::BarrierBatch(barriers)
1238 }
1239 StreamMessageBatch::Watermark(watermark) => {
1240 Self::Watermark(Watermark::from_protobuf(watermark)?)
1241 }
1242 };
1243 Ok(res)
1244 }
1245
1246 pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
1247 ::prost::Message::encoded_len(msg)
1248 }
1249}
1250
/// Owned list of key column indices.
pub type PkIndices = Vec<usize>;
/// Borrowed list of key column indices.
pub type PkIndicesRef<'a> = &'a [usize];
/// Data types of the key columns, stored inline for a single column.
pub type PkDataTypes = SmallVec<[DataType; 1]>;
1254
1255pub async fn expect_first_barrier<M: Debug>(
1257 stream: &mut (impl MessageStreamInner<M> + Unpin),
1258) -> StreamExecutorResult<BarrierInner<M>> {
1259 let message = stream
1260 .next()
1261 .instrument_await("expect_first_barrier")
1262 .await
1263 .context("failed to extract the first message: stream closed unexpectedly")??;
1264 let barrier = message
1265 .into_barrier()
1266 .expect("the first message must be a barrier");
1267 assert!(matches!(
1269 barrier.kind,
1270 BarrierKind::Checkpoint | BarrierKind::Initial
1271 ));
1272 Ok(barrier)
1273}
1274
1275pub async fn expect_first_barrier_from_aligned_stream(
1277 stream: &mut (impl AlignedMessageStream + Unpin),
1278) -> StreamExecutorResult<Barrier> {
1279 let message = stream
1280 .next()
1281 .instrument_await("expect_first_barrier")
1282 .await
1283 .context("failed to extract the first message: stream closed unexpectedly")??;
1284 let barrier = message
1285 .into_barrier()
1286 .expect("the first message must be a barrier");
1287 Ok(barrier)
1288}
1289
/// A consumer at the end of an executor chain: running it yields a stream of
/// barriers (or errors) rather than a full message stream.
pub trait StreamConsumer: Send + 'static {
    /// The stream of barrier results produced by [`StreamConsumer::execute`].
    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;

    /// Consumes the boxed consumer and returns its barrier stream.
    fn execute(self: Box<Self>) -> Self::BarrierStream;
}