risingwave_stream/executor/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod prelude;

use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::Arc;

use await_tree::InstrumentAwait;
use enum_as_inner::EnumAsInner;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{Schema, TableId};
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
use risingwave_common::util::epoch::{Epoch, EpochPair};
use risingwave_common::util::tracing::TracingContext;
use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
use risingwave_connector::source::SplitImpl;
use risingwave_expr::expr::{Expression, NonStrictExpression};
use risingwave_pb::data::PbEpoch;
use risingwave_pb::expr::PbInputRef;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
use risingwave_pb::stream_plan::{
    BarrierMutation, CombinedMutation, ConnectorPropsChangeMutation, Dispatchers,
    DropSubscriptionsMutation, PauseMutation, PbAddMutation, PbBarrier, PbBarrierMutation,
    PbDispatcher, PbStreamMessageBatch, PbUpdateMutation, PbWatermark, ResumeMutation,
    SourceChangeSplitMutation, StartFragmentBackfillMutation, StopMutation,
    SubscriptionUpstreamInfo, ThrottleMutation,
};
use smallvec::SmallVec;

use crate::error::StreamResult;
use crate::task::{ActorId, FragmentId};

mod actor;
mod barrier_align;
pub mod exchange;
pub mod monitor;

pub mod aggregate;
pub mod asof_join;
mod backfill;
mod barrier_recv;
mod batch_query;
mod chain;
mod changelog;
mod dedup;
mod dispatch;
pub mod dml;
mod dynamic_filter;
pub mod eowc;
pub mod error;
mod expand;
mod filter;
pub mod hash_join;
mod hop_window;
mod join;
mod lookup;
mod lookup_union;
mod merge;
mod mview;
mod nested_loop_temporal_join;
mod no_op;
mod now;
mod over_window;
pub mod project;
mod rearranged_chain;
mod receiver;
pub mod row_id_gen;
mod sink;
pub mod source;
mod stream_reader;
pub mod subtask;
mod temporal_join;
mod top_n;
mod troublemaker;
mod union;
mod values;
mod watermark;
mod watermark_filter;
mod wrapper;

mod approx_percentile;

mod row_merge;

#[cfg(test)]
mod integration_tests;
mod sync_kv_log_store;
pub mod test_utils;
mod utils;

pub use actor::{Actor, ActorContext, ActorContextRef};
use anyhow::Context;
pub use approx_percentile::global::GlobalApproxPercentileExecutor;
pub use approx_percentile::local::LocalApproxPercentileExecutor;
pub use backfill::arrangement_backfill::*;
pub use backfill::cdc::{CdcBackfillExecutor, CdcScanOptions, ExternalStorageTable};
pub use backfill::no_shuffle_backfill::*;
pub use backfill::snapshot_backfill::*;
pub use barrier_recv::BarrierRecvExecutor;
pub use batch_query::BatchQueryExecutor;
pub use chain::ChainExecutor;
pub use changelog::ChangeLogExecutor;
pub use dedup::AppendOnlyDedupExecutor;
pub use dispatch::{DispatchExecutor, DispatcherImpl};
pub use dynamic_filter::DynamicFilterExecutor;
pub use error::{StreamExecutorError, StreamExecutorResult};
pub use expand::ExpandExecutor;
pub use filter::FilterExecutor;
pub use hash_join::*;
pub use hop_window::HopWindowExecutor;
pub use join::{AsOfDesc, AsOfJoinType, JoinType};
pub use lookup::*;
pub use lookup_union::LookupUnionExecutor;
pub use merge::MergeExecutor;
pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
pub use mview::*;
pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
pub use no_op::NoOpExecutor;
pub use now::*;
pub use over_window::*;
pub use rearranged_chain::RearrangedChainExecutor;
pub use receiver::ReceiverExecutor;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
pub use row_merge::RowMergeExecutor;
pub use sink::SinkExecutor;
pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
pub use temporal_join::TemporalJoinExecutor;
pub use top_n::{
    AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
};
pub use troublemaker::TroublemakerExecutor;
pub use union::UnionExecutor;
pub use utils::DummyExecutor;
pub use values::ValuesExecutor;
pub use watermark_filter::WatermarkFilterExecutor;
pub use wrapper::WrapperExecutor;

use self::barrier_align::AlignedMessageStream;

pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;

pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
use risingwave_pb::stream_plan::throttle_mutation::RateLimit;

pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;

/// Static information of an executor.
#[derive(Debug, Default, Clone)]
pub struct ExecutorInfo {
    /// The schema of the OUTPUT of the executor.
    pub schema: Schema,

    /// The primary key indices of the OUTPUT of the executor.
    /// Schema is used by both OLAP and streaming, therefore
    /// pk indices are maintained independently.
    pub pk_indices: PkIndices,

    /// Identity of the executor.
    pub identity: String,

    /// The executor id of the executor.
    pub id: u64,
}

impl ExecutorInfo {
    pub fn new(schema: Schema, pk_indices: PkIndices, identity: String, id: u64) -> Self {
        Self {
            schema,
            pk_indices,
            identity,
            id,
        }
    }
}

/// [`Execute`] describes the methods an executor should implement to handle control messages.
pub trait Execute: Send + 'static {
    fn execute(self: Box<Self>) -> BoxedMessageStream;

    fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
        self.execute()
    }

    fn boxed(self) -> Box<dyn Execute>
    where
        Self: Sized + Send + 'static,
    {
        Box::new(self)
    }
}

/// [`Executor`] combines the static information ([`ExecutorInfo`]) and the executable object to
/// handle messages ([`Execute`]).
pub struct Executor {
    info: ExecutorInfo,
    execute: Box<dyn Execute>,
}

impl Executor {
    pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
        Self { info, execute }
    }

    pub fn info(&self) -> &ExecutorInfo {
        &self.info
    }

    pub fn schema(&self) -> &Schema {
        &self.info.schema
    }

    pub fn pk_indices(&self) -> PkIndicesRef<'_> {
        &self.info.pk_indices
    }

    pub fn identity(&self) -> &str {
        &self.info.identity
    }

    pub fn execute(self) -> BoxedMessageStream {
        self.execute.execute()
    }

    pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
        self.execute.execute_with_epoch(epoch)
    }
}

impl std::fmt::Debug for Executor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.identity())
    }
}

impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
    fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
        Self::new(info, execute)
    }
}

impl<E> From<(ExecutorInfo, E)> for Executor
where
    E: Execute,
{
    fn from((info, execute): (ExecutorInfo, E)) -> Self {
        Self::new(info, execute.boxed())
    }
}
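
// Illustrative sketch (not part of the original source): a minimal `Execute`
// implementation and how it is turned into an `Executor`. The `Passthrough`
// type below is hypothetical; real executors transform the chunks, barriers
// and watermarks they receive from upstream.
//
//     struct Passthrough {
//         input: Executor,
//     }
//
//     impl Execute for Passthrough {
//         fn execute(self: Box<Self>) -> BoxedMessageStream {
//             // Simply forward every message from the input executor.
//             self.input.execute()
//         }
//     }
//
//     // `(ExecutorInfo, impl Execute)` converts into an `Executor` via `From`.
//     fn build(info: ExecutorInfo, input: Executor) -> Executor {
//         (info, Passthrough { input }).into()
//     }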

pub const INVALID_EPOCH: u64 = 0;

type UpstreamFragmentId = FragmentId;
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;

#[derive(Debug, Clone, PartialEq)]
pub struct UpdateMutation {
    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
    pub dropped_actors: HashSet<ActorId>,
    pub actor_splits: SplitAssignments,
    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct AddMutation {
    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
    pub added_actors: HashSet<ActorId>,
    // TODO: remove this and use `SourceChangesSplit` after we support multiple mutations.
    pub splits: SplitAssignments,
    pub pause: bool,
    /// (`upstream_mv_table_id`, `subscriber_id`)
    pub subscriptions_to_add: Vec<(TableId, u32)>,
    /// Fragments whose backfill should be paused on startup.
    pub backfill_nodes_to_pause: HashSet<FragmentId>,
}

/// See [`PbMutation`] for the semantics of each mutation.
#[derive(Debug, Clone, PartialEq)]
pub enum Mutation {
    Stop(HashSet<ActorId>),
    Update(UpdateMutation),
    Add(AddMutation),
    SourceChangeSplit(SplitAssignments),
    Pause,
    Resume,
    Throttle(HashMap<ActorId, Option<u32>>),
    AddAndUpdate(AddMutation, UpdateMutation),
    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
    DropSubscriptions {
        /// `subscriber` -> `upstream_mv_table_id`
        subscriptions_to_drop: Vec<(u32, TableId)>,
    },
    StartFragmentBackfill {
        fragment_ids: HashSet<FragmentId>,
    },
}

/// The generic type `M` is the mutation type of the barrier.
///
/// For barriers in the dispatcher, `M` is `()`, which means the mutation is erased.
/// For barriers flowing within the streaming actor, `M` is the normal `BarrierMutationType`.
#[derive(Debug, Clone)]
pub struct BarrierInner<M> {
    pub epoch: EpochPair,
    pub mutation: M,
    pub kind: BarrierKind,

    /// Tracing context for the **current** epoch of this barrier.
    pub tracing_context: TracingContext,

    /// The actors that this barrier has passed locally. Used for debugging only.
    pub passed_actors: Vec<ActorId>,
}

pub type BarrierMutationType = Option<Arc<Mutation>>;
pub type Barrier = BarrierInner<BarrierMutationType>;
pub type DispatcherBarrier = BarrierInner<()>;

impl<M: Default> BarrierInner<M> {
    /// Create a plain barrier.
    pub fn new_test_barrier(epoch: u64) -> Self {
        Self {
            epoch: EpochPair::new_test_epoch(epoch),
            kind: BarrierKind::Checkpoint,
            tracing_context: TracingContext::none(),
            mutation: Default::default(),
            passed_actors: Default::default(),
        }
    }

    pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
        Self {
            epoch: EpochPair::new(epoch, prev_epoch),
            kind: BarrierKind::Checkpoint,
            tracing_context: TracingContext::none(),
            mutation: Default::default(),
            passed_actors: Default::default(),
        }
    }
}

impl Barrier {
    pub fn into_dispatcher(self) -> DispatcherBarrier {
        DispatcherBarrier {
            epoch: self.epoch,
            mutation: (),
            kind: self.kind,
            tracing_context: self.tracing_context,
            passed_actors: self.passed_actors,
        }
    }

    #[must_use]
    pub fn with_mutation(self, mutation: Mutation) -> Self {
        Self {
            mutation: Some(Arc::new(mutation)),
            ..self
        }
    }

    #[must_use]
    pub fn with_stop(self) -> Self {
        self.with_mutation(Mutation::Stop(HashSet::default()))
    }

    /// Whether this barrier carries a stop mutation.
    pub fn is_with_stop_mutation(&self) -> bool {
        matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
    }

    /// Whether this barrier is to stop the actor with `actor_id`.
    pub fn is_stop(&self, actor_id: ActorId) -> bool {
        self.all_stop_actors()
            .is_some_and(|actors| actors.contains(&actor_id))
    }

    pub fn is_checkpoint(&self) -> bool {
        self.kind == BarrierKind::Checkpoint
    }

    /// Get the initial split assignments for the actor with `actor_id`.
    ///
    /// This should only be called on the initial barrier received by the executor. It must be
    ///
    /// - `Add` mutation when it's a new streaming job, or recovery.
    /// - `Update` mutation when it's created for scaling.
    /// - `AddAndUpdate` mutation when it's created for sink-into-table.
    ///
    /// Note that `SourceChangeSplit` is **not** included, because it's only used for changing splits
    /// of existing executors.
    pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
        match self.mutation.as_deref()? {
            Mutation::Update(UpdateMutation { actor_splits, .. })
            | Mutation::Add(AddMutation {
                splits: actor_splits,
                ..
            }) => actor_splits.get(&actor_id),

            Mutation::AddAndUpdate(
                AddMutation {
                    splits: add_actor_splits,
                    ..
                },
                UpdateMutation {
                    actor_splits: update_actor_splits,
                    ..
                },
            ) => add_actor_splits
                .get(&actor_id)
                // `Add` and `Update` should apply to different fragments, so we don't need to merge them.
                .or_else(|| update_actor_splits.get(&actor_id)),

            _ => {
                if cfg!(debug_assertions) {
                    panic!(
                        "the initial mutation of the barrier should not be {:?}",
                        self.mutation
                    );
                }
                None
            }
        }
        .map(|s| s.as_slice())
    }

    /// Get all actors that are to be stopped (dropped) by this barrier.
    pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
        match self.mutation.as_deref() {
            Some(Mutation::Stop(actors)) => Some(actors),
            Some(Mutation::Update(UpdateMutation { dropped_actors, .. }))
            | Some(Mutation::AddAndUpdate(_, UpdateMutation { dropped_actors, .. })) => {
                Some(dropped_actors)
            }
            _ => None,
        }
    }

    /// Whether this barrier is to newly add the actor with `actor_id`. This is used for `Chain` and
    /// `Values` to decide whether to output the existing (historical) data.
    ///
    /// By "newly", we mean the actor belongs to a subgraph of a new streaming job. That is, actors
    /// added for scaling are not included.
    pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation { added_actors, .. }))
            | Some(Mutation::AddAndUpdate(AddMutation { added_actors, .. }, _)) => {
                added_actors.contains(&actor_id)
            }
            _ => false,
        }
    }

    /// Whether this barrier adds a new downstream fragment for the actor with `upstream_actor_id`.
    ///
    /// # Use case
    /// Some optimizations are applied when an actor doesn't have any downstreams ("standalone" actors).
    /// * Pause a standalone shared `SourceExecutor`.
    /// * Disable a standalone `MaterializeExecutor`'s conflict check.
    ///
    /// This is implemented by checking `actor_context.initial_dispatch_num` on startup, and
    /// checking `has_more_downstream_fragments` on each barrier to see whether the optimization
    /// needs to be turned off (see the sketch after this method).
    ///
    /// ## Some special cases not included
    ///
    /// Note that this is not `has_new_downstream_actor/fragment`. For our use case, we only
    /// care about **number of downstream fragments** (more precisely, existence).
    /// - When scaling, the number of downstream actors is changed, and they are "new", but the set of downstream fragments is not changed.
    /// - When `ALTER TABLE sink_into_table`, the fragment is replaced with a "new" one, but the number is not changed.
    pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
        let Some(mutation) = self.mutation.as_deref() else {
            return false;
        };
        match mutation {
            // Add is for mv, index and sink creation.
            Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
            // AddAndUpdate is for sink-into-table.
            Mutation::AddAndUpdate(
                AddMutation { adds, .. },
                UpdateMutation {
                    dispatchers,
                    actor_new_dispatchers,
                    ..
                },
            ) => {
                adds.get(&upstream_actor_id).is_some()
                    || actor_new_dispatchers.get(&upstream_actor_id).is_some()
                    || dispatchers.get(&upstream_actor_id).is_some()
            }
            Mutation::Update(_)
            | Mutation::Stop(_)
            | Mutation::Pause
            | Mutation::Resume
            | Mutation::SourceChangeSplit(_)
            | Mutation::Throttle(_)
            | Mutation::DropSubscriptions { .. }
            | Mutation::ConnectorPropsChange(_)
            | Mutation::StartFragmentBackfill { .. } => false,
        }
    }
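
    // Illustrative sketch (not part of the original source) of the "standalone
    // actor" check described in the doc above: an executor assumes it has no
    // downstream fragments at startup and re-checks on every barrier. The
    // `no_downstream` and `turn_off_optimization` names are hypothetical.
    //
    //     let mut no_downstream = actor_context.initial_dispatch_num == 0;
    //     // ... then, for every barrier received:
    //     if no_downstream && barrier.has_more_downstream_fragments(actor_id) {
    //         no_downstream = false;
    //         turn_off_optimization();
    //     }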

    /// Whether this barrier requires the executor to pause its data stream on startup.
    pub fn is_pause_on_startup(&self) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation { pause, .. }))
            | Some(Mutation::AddAndUpdate(AddMutation { pause, .. }, _)) => *pause,
            _ => false,
        }
    }

    pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation {
                backfill_nodes_to_pause,
                ..
            }))
            | Some(Mutation::AddAndUpdate(
                AddMutation {
                    backfill_nodes_to_pause,
                    ..
                },
                _,
            )) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
            _ => {
                tracing::warn!("expected an AddMutation on Startup, instead got {:?}", self);
                true
            }
        }
    }

    /// Whether this barrier is for resume.
    pub fn is_resume(&self) -> bool {
        matches!(self.mutation.as_deref(), Some(Mutation::Resume))
    }

    /// Returns the [`MergeUpdate`] if this barrier is to update the merge executors for the actor
    /// with `actor_id`.
    pub fn as_update_merge(
        &self,
        actor_id: ActorId,
        upstream_fragment_id: UpstreamFragmentId,
    ) -> Option<&MergeUpdate> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Update(UpdateMutation { merges, .. })
                | Mutation::AddAndUpdate(_, UpdateMutation { merges, .. }) => {
                    merges.get(&(actor_id, upstream_fragment_id))
                }

                _ => None,
            })
    }

    /// Returns the new vnode bitmap if this barrier is to update the vnode bitmap for the actor
    /// with `actor_id`.
    ///
    /// Actually, this vnode bitmap update is only useful for validating record access in
    /// distributed executors, since the read/write pattern will never span multiple vnodes.
    pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Update(UpdateMutation { vnode_bitmaps, .. })
                | Mutation::AddAndUpdate(_, UpdateMutation { vnode_bitmaps, .. }) => {
                    vnode_bitmaps.get(&actor_id).cloned()
                }
                _ => None,
            })
    }

    pub fn get_curr_epoch(&self) -> Epoch {
        Epoch(self.epoch.curr)
    }

    /// Retrieve the tracing context for the **current** epoch of this barrier.
    pub fn tracing_context(&self) -> &TracingContext {
        &self.tracing_context
    }

    pub fn added_subscriber_on_mv_table(
        &self,
        mv_table_id: TableId,
    ) -> impl Iterator<Item = u32> + '_ {
        if let Some(Mutation::Add(add)) | Some(Mutation::AddAndUpdate(add, _)) =
            self.mutation.as_deref()
        {
            Some(add)
        } else {
            None
        }
        .into_iter()
        .flat_map(move |add| {
            add.subscriptions_to_add.iter().filter_map(
                move |(upstream_mv_table_id, subscriber_id)| {
                    if *upstream_mv_table_id == mv_table_id {
                        Some(*subscriber_id)
                    } else {
                        None
                    }
                },
            )
        })
    }
}
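
// Illustrative sketch (not part of the original source): constructing a test
// barrier that carries a mutation and querying it with the accessors above.
// `test_epoch` (from `risingwave_common::util::epoch`) is assumed to produce a
// valid test epoch value.
//
//     let barrier = Barrier::new_test_barrier(test_epoch(1))
//         .with_mutation(Mutation::Stop(HashSet::from([actor_id])));
//     assert!(barrier.is_with_stop_mutation());
//     assert!(barrier.is_stop(actor_id));
//     assert!(!barrier.is_newly_added(actor_id));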

impl<M: PartialEq> PartialEq for BarrierInner<M> {
    fn eq(&self, other: &Self) -> bool {
        self.epoch == other.epoch && self.mutation == other.mutation
    }
}

impl Mutation {
    /// Return true if this is a `Stop` mutation.
    ///
    /// Note that this does not mean we will stop the current actor.
    #[cfg(test)]
    pub fn is_stop(&self) -> bool {
        matches!(self, Mutation::Stop(_))
    }

    fn to_protobuf(&self) -> PbMutation {
        let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
            actor_splits
                .iter()
                .map(|(&actor_id, splits)| {
                    (
                        actor_id,
                        ConnectorSplits {
                            splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
                        },
                    )
                })
                .collect::<HashMap<_, _>>()
        };

        match self {
            Mutation::Stop(actors) => PbMutation::Stop(StopMutation {
                actors: actors.iter().copied().collect::<Vec<_>>(),
            }),
            Mutation::Update(UpdateMutation {
                dispatchers,
                merges,
                vnode_bitmaps,
                dropped_actors,
                actor_splits,
                actor_new_dispatchers,
            }) => PbMutation::Update(PbUpdateMutation {
                dispatcher_update: dispatchers.values().flatten().cloned().collect(),
                merge_update: merges.values().cloned().collect(),
                actor_vnode_bitmap_update: vnode_bitmaps
                    .iter()
                    .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
                    .collect(),
                dropped_actors: dropped_actors.iter().cloned().collect(),
                actor_splits: actor_splits_to_protobuf(actor_splits),
                actor_new_dispatchers: actor_new_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| {
                        (
                            actor_id,
                            Dispatchers {
                                dispatchers: dispatchers.clone(),
                            },
                        )
                    })
                    .collect(),
            }),
            Mutation::Add(AddMutation {
                adds,
                added_actors,
                splits,
                pause,
                subscriptions_to_add,
                backfill_nodes_to_pause,
            }) => PbMutation::Add(PbAddMutation {
                actor_dispatchers: adds
                    .iter()
                    .map(|(&actor_id, dispatchers)| {
                        (
                            actor_id,
                            Dispatchers {
                                dispatchers: dispatchers.clone(),
                            },
                        )
                    })
                    .collect(),
                added_actors: added_actors.iter().copied().collect(),
                actor_splits: actor_splits_to_protobuf(splits),
                pause: *pause,
                subscriptions_to_add: subscriptions_to_add
                    .iter()
                    .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
                        subscriber_id: *subscriber_id,
                        upstream_mv_table_id: table_id.table_id,
                    })
                    .collect(),
                backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
            }),
            Mutation::SourceChangeSplit(changes) => PbMutation::Splits(SourceChangeSplitMutation {
                actor_splits: changes
                    .iter()
                    .map(|(&actor_id, splits)| {
                        (
                            actor_id,
                            ConnectorSplits {
                                splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
                            },
                        )
                    })
                    .collect(),
            }),
            Mutation::Pause => PbMutation::Pause(PauseMutation {}),
            Mutation::Resume => PbMutation::Resume(ResumeMutation {}),
            Mutation::Throttle(changes) => PbMutation::Throttle(ThrottleMutation {
                actor_throttle: changes
                    .iter()
                    .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit }))
                    .collect(),
            }),

            Mutation::AddAndUpdate(add, update) => PbMutation::Combined(CombinedMutation {
                mutations: vec![
                    BarrierMutation {
                        mutation: Some(Mutation::Add(add.clone()).to_protobuf()),
                    },
                    BarrierMutation {
                        mutation: Some(Mutation::Update(update.clone()).to_protobuf()),
                    },
                ],
            }),
            Mutation::DropSubscriptions {
                subscriptions_to_drop,
            } => PbMutation::DropSubscriptions(DropSubscriptionsMutation {
                info: subscriptions_to_drop
                    .iter()
                    .map(
                        |(subscriber_id, upstream_mv_table_id)| SubscriptionUpstreamInfo {
                            subscriber_id: *subscriber_id,
                            upstream_mv_table_id: upstream_mv_table_id.table_id,
                        },
                    )
                    .collect(),
            }),
            Mutation::ConnectorPropsChange(map) => {
                PbMutation::ConnectorPropsChange(ConnectorPropsChangeMutation {
                    connector_props_infos: map
                        .iter()
                        .map(|(actor_id, options)| {
                            (
                                *actor_id,
                                ConnectorPropsInfo {
                                    connector_props_info: options
                                        .iter()
                                        .map(|(k, v)| (k.clone(), v.clone()))
                                        .collect(),
                                },
                            )
                        })
                        .collect(),
                })
            }
            Mutation::StartFragmentBackfill { fragment_ids } => {
                PbMutation::StartFragmentBackfill(StartFragmentBackfillMutation {
                    fragment_ids: fragment_ids.iter().copied().collect(),
                })
            }
        }
    }

    fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
        let mutation = match prost {
            PbMutation::Stop(stop) => {
                Mutation::Stop(HashSet::from_iter(stop.actors.iter().cloned()))
            }

            PbMutation::Update(update) => Mutation::Update(UpdateMutation {
                dispatchers: update
                    .dispatcher_update
                    .iter()
                    .map(|u| (u.actor_id, u.clone()))
                    .into_group_map(),
                merges: update
                    .merge_update
                    .iter()
                    .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
                    .collect(),
                vnode_bitmaps: update
                    .actor_vnode_bitmap_update
                    .iter()
                    .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
                    .collect(),
                dropped_actors: update.dropped_actors.iter().cloned().collect(),
                actor_splits: update
                    .actor_splits
                    .iter()
                    .map(|(&actor_id, splits)| {
                        (
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(|split| split.try_into().unwrap())
                                .collect(),
                        )
                    })
                    .collect(),
                actor_new_dispatchers: update
                    .actor_new_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
                    .collect(),
            }),

            PbMutation::Add(add) => Mutation::Add(AddMutation {
                adds: add
                    .actor_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
                    .collect(),
                added_actors: add.added_actors.iter().copied().collect(),
                // TODO: remove this and use `SourceChangesSplit` after we support multiple
                // mutations.
                splits: add
                    .actor_splits
                    .iter()
                    .map(|(&actor_id, splits)| {
                        (
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(|split| split.try_into().unwrap())
                                .collect(),
                        )
                    })
                    .collect(),
                pause: add.pause,
                subscriptions_to_add: add
                    .subscriptions_to_add
                    .iter()
                    .map(
                        |SubscriptionUpstreamInfo {
                             subscriber_id,
                             upstream_mv_table_id,
                         }| {
                            (TableId::new(*upstream_mv_table_id), *subscriber_id)
                        },
                    )
                    .collect(),
                backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
            }),

            PbMutation::Splits(s) => {
                let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
                    Vec::with_capacity(s.actor_splits.len());
                for (&actor_id, splits) in &s.actor_splits {
                    if !splits.splits.is_empty() {
                        change_splits.push((
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(SplitImpl::try_from)
                                .try_collect()?,
                        ));
                    }
                }
                Mutation::SourceChangeSplit(change_splits.into_iter().collect())
            }
            PbMutation::Pause(_) => Mutation::Pause,
            PbMutation::Resume(_) => Mutation::Resume,
            PbMutation::Throttle(changes) => Mutation::Throttle(
                changes
                    .actor_throttle
                    .iter()
                    .map(|(actor_id, limit)| (*actor_id, limit.rate_limit))
                    .collect(),
            ),
            PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
                subscriptions_to_drop: drop
                    .info
                    .iter()
                    .map(|info| (info.subscriber_id, TableId::new(info.upstream_mv_table_id)))
                    .collect(),
            },
            PbMutation::ConnectorPropsChange(alter_connector_props) => {
                Mutation::ConnectorPropsChange(
                    alter_connector_props
                        .connector_props_infos
                        .iter()
                        .map(|(actor_id, options)| {
                            (
                                *actor_id,
                                options
                                    .connector_props_info
                                    .iter()
                                    .map(|(k, v)| (k.clone(), v.clone()))
                                    .collect(),
                            )
                        })
                        .collect(),
                )
            }
            PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
                Mutation::StartFragmentBackfill {
                    fragment_ids: start_fragment_backfill
                        .fragment_ids
                        .iter()
                        .copied()
                        .collect(),
                }
            }
            PbMutation::Combined(CombinedMutation { mutations }) => match &mutations[..] {
                [
                    BarrierMutation {
                        mutation: Some(add),
                    },
                    BarrierMutation {
                        mutation: Some(update),
                    },
                ] => {
                    let Mutation::Add(add_mutation) = Mutation::from_protobuf(add)? else {
                        unreachable!();
                    };

                    let Mutation::Update(update_mutation) = Mutation::from_protobuf(update)? else {
                        unreachable!();
                    };

                    Mutation::AddAndUpdate(add_mutation, update_mutation)
                }

                _ => unreachable!(),
            },
        };
        Ok(mutation)
    }
}
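
// Illustrative sketch (not part of the original source): `to_protobuf` and
// `from_protobuf` are intended to round-trip a mutation through its protobuf
// representation, e.g.
//
//     let mutation = Mutation::Throttle(HashMap::from([(actor_id, Some(1000))]));
//     let decoded = Mutation::from_protobuf(&mutation.to_protobuf())?;
//     assert_eq!(mutation, decoded);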

impl<M> BarrierInner<M> {
    fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
        let Self {
            epoch,
            mutation,
            kind,
            passed_actors,
            tracing_context,
            ..
        } = self;

        PbBarrier {
            epoch: Some(PbEpoch {
                curr: epoch.curr,
                prev: epoch.prev,
            }),
            mutation: Some(PbBarrierMutation {
                mutation: barrier_fn(mutation),
            }),
            tracing_context: tracing_context.to_protobuf(),
            kind: *kind as _,
            passed_actors: passed_actors.clone(),
        }
    }

    fn from_protobuf_inner(
        prost: &PbBarrier,
        mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
    ) -> StreamExecutorResult<Self> {
        let epoch = prost.get_epoch()?;

        Ok(Self {
            kind: prost.kind(),
            epoch: EpochPair::new(epoch.curr, epoch.prev),
            mutation: mutation_from_pb(
                prost
                    .mutation
                    .as_ref()
                    .and_then(|mutation| mutation.mutation.as_ref()),
            )?,
            passed_actors: prost.get_passed_actors().clone(),
            tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
        })
    }

    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
        BarrierInner {
            epoch: self.epoch,
            mutation: f(self.mutation),
            kind: self.kind,
            tracing_context: self.tracing_context,
            passed_actors: self.passed_actors,
        }
    }
}

impl DispatcherBarrier {
    pub fn to_protobuf(&self) -> PbBarrier {
        self.to_protobuf_inner(|_| None)
    }
}

impl Barrier {
    pub fn to_protobuf(&self) -> PbBarrier {
        self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
    }

    pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
        Self::from_protobuf_inner(prost, |mutation| {
            mutation
                .map(|m| Mutation::from_protobuf(m).map(Arc::new))
                .transpose()
        })
    }
}
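
// Illustrative sketch (not part of the original source): a full `Barrier`
// serializes and restores its mutation, while a `DispatcherBarrier` always
// serializes an empty mutation (see `to_protobuf_inner(|_| None)` above),
// since the mutation is erased before the barrier crosses the exchange.
//
//     let restored = Barrier::from_protobuf(&barrier.to_protobuf())?;
//     assert_eq!(restored, barrier);
//     let pb = barrier.into_dispatcher().to_protobuf();
//     assert!(pb.mutation.unwrap().mutation.is_none());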

#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
    pub col_idx: usize,
    pub data_type: DataType,
    pub val: ScalarImpl,
}

impl PartialOrd for Watermark {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Watermark {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.val.default_cmp(&other.val)
    }
}

impl Watermark {
    pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
        Self {
            col_idx,
            data_type,
            val,
        }
    }

    pub async fn transform_with_expr(
        self,
        expr: &NonStrictExpression<impl Expression>,
        new_col_idx: usize,
    ) -> Option<Self> {
        let Self { col_idx, val, .. } = self;
        let row = {
            let mut row = vec![None; col_idx + 1];
            row[col_idx] = Some(val);
            OwnedRow::new(row)
        };
        let val = expr.eval_row_infallible(&row).await?;
        Some(Self::new(new_col_idx, expr.inner().return_type(), val))
    }

    /// Transform the watermark with the given output indices. If this watermark is not in the
    /// output, return `None`.
    pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
        output_indices
            .iter()
            .position(|p| *p == self.col_idx)
            .map(|new_col_idx| self.with_idx(new_col_idx))
    }

    pub fn to_protobuf(&self) -> PbWatermark {
        PbWatermark {
            column: Some(PbInputRef {
                index: self.col_idx as _,
                r#type: Some(self.data_type.to_protobuf()),
            }),
            val: Some(&self.val).to_protobuf().into(),
        }
    }

    pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
        let col_ref = prost.get_column()?;
        let data_type = DataType::from(col_ref.get_type()?);
        let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
            .expect("watermark value cannot be null");
        Ok(Self::new(col_ref.get_index() as _, data_type, val))
    }

    pub fn with_idx(self, idx: usize) -> Self {
        Self::new(idx, self.data_type, self.val)
    }
}
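
// Illustrative sketch (not part of the original source): remapping a watermark
// through a projection's output indices. `some_scalar` is a placeholder for a
// watermark value of the matching type. Input column 2 becomes output column 0
// under the indices `[2, 3]`; under `[0, 1]` the column is projected away and
// the watermark is dropped.
//
//     let wm = Watermark::new(2, DataType::Timestamp, some_scalar);
//     assert_eq!(wm.clone().transform_with_indices(&[2, 3]).map(|w| w.col_idx), Some(0));
//     assert_eq!(wm.transform_with_indices(&[0, 1]), None);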

#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageInner<M> {
    Chunk(StreamChunk),
    Barrier(BarrierInner<M>),
    Watermark(Watermark),
}

impl<M> MessageInner<M> {
    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
        match self {
            MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
            MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
            MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
        }
    }
}
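
// Illustrative sketch (not part of the original source): the dispatcher erases
// the mutation before a message crosses the exchange, turning a `Message` into
// a `DispatcherMessage`.
//
//     let dispatcher_message: DispatcherMessage = message.map_mutation(|_| ());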

pub type Message = MessageInner<BarrierMutationType>;
pub type DispatcherMessage = MessageInner<()>;

/// `MessageBatchInner` is used exclusively by `Dispatcher` and the `Merger`/`Receiver` for exchanging messages between them.
/// It shares the same message type as the fundamental `MessageInner`, but batches multiple barriers into a single message.
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageBatchInner<M> {
    Chunk(StreamChunk),
    BarrierBatch(Vec<BarrierInner<M>>),
    Watermark(Watermark),
}
pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
pub type DispatcherBarriers = Vec<DispatcherBarrier>;
pub type DispatcherMessageBatch = MessageBatchInner<()>;

impl From<DispatcherMessage> for DispatcherMessageBatch {
    fn from(m: DispatcherMessage) -> Self {
        match m {
            DispatcherMessage::Chunk(c) => Self::Chunk(c),
            DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
            DispatcherMessage::Watermark(w) => Self::Watermark(w),
        }
    }
}

impl From<StreamChunk> for Message {
    fn from(chunk: StreamChunk) -> Self {
        Message::Chunk(chunk)
    }
}

impl<'a> TryFrom<&'a Message> for &'a Barrier {
    type Error = ();

    fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
        match m {
            Message::Chunk(_) => Err(()),
            Message::Barrier(b) => Ok(b),
            Message::Watermark(_) => Err(()),
        }
    }
}

impl Message {
    /// Return true if the message is a stop barrier, meaning the stream
    /// will not continue; false otherwise.
    ///
    /// Note that this does not mean we will stop the current actor.
    #[cfg(test)]
    pub fn is_stop(&self) -> bool {
        matches!(
            self,
            Message::Barrier(Barrier {
                mutation,
                ..
            }) if mutation.as_ref().unwrap().is_stop()
        )
    }
}

impl DispatcherMessageBatch {
    pub fn to_protobuf(&self) -> PbStreamMessageBatch {
        let prost = match self {
            Self::Chunk(stream_chunk) => {
                let prost_stream_chunk = stream_chunk.to_protobuf();
                StreamMessageBatch::StreamChunk(prost_stream_chunk)
            }
            Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
                barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
            }),
            Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
        };
        PbStreamMessageBatch {
            stream_message_batch: Some(prost),
        }
    }

    pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
        let res = match prost.get_stream_message_batch()? {
            StreamMessageBatch::StreamChunk(chunk) => {
                Self::Chunk(StreamChunk::from_protobuf(chunk)?)
            }
            StreamMessageBatch::BarrierBatch(barrier_batch) => {
                let barriers = barrier_batch
                    .barriers
                    .iter()
                    .map(|barrier| {
                        DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
                            if mutation.is_some() {
                                if cfg!(debug_assertions) {
                                    panic!("should not receive message of barrier with mutation");
                                } else {
                                    warn!(?barrier, "receive message of barrier with mutation");
                                }
                            }
                            Ok(())
                        })
                    })
                    .try_collect()?;
                Self::BarrierBatch(barriers)
            }
            StreamMessageBatch::Watermark(watermark) => {
                Self::Watermark(Watermark::from_protobuf(watermark)?)
            }
        };
        Ok(res)
    }

    pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
        ::prost::Message::encoded_len(msg)
    }
}
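
// Illustrative sketch (not part of the original source): how a batch might be
// encoded for the exchange service and decoded on the other side, assuming the
// standard `prost::Message` methods.
//
//     let pb = batch.to_protobuf();
//     assert_eq!(
//         ::prost::Message::encode_to_vec(&pb).len(),
//         DispatcherMessageBatch::get_encoded_len(&pb),
//     );
//     let decoded = DispatcherMessageBatch::from_protobuf(&pb)?;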

pub type PkIndices = Vec<usize>;
pub type PkIndicesRef<'a> = &'a [usize];
pub type PkDataTypes = SmallVec<[DataType; 1]>;

/// Expect the first message of the given `stream` as a barrier.
pub async fn expect_first_barrier<M: Debug>(
    stream: &mut (impl MessageStreamInner<M> + Unpin),
) -> StreamExecutorResult<BarrierInner<M>> {
    let message = stream
        .next()
        .instrument_await("expect_first_barrier")
        .await
        .context("failed to extract the first message: stream closed unexpectedly")??;
    let barrier = message
        .into_barrier()
        .expect("the first message must be a barrier");
    // TODO: Is this check correct?
    assert!(matches!(
        barrier.kind,
        BarrierKind::Checkpoint | BarrierKind::Initial
    ));
    Ok(barrier)
}
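
// Illustrative sketch (not part of the original source) of the common executor
// startup pattern: take the first barrier, initialize any state with its epoch
// (`state_table` here is a placeholder for such state), then yield the barrier
// downstream before processing further messages.
//
//     let first_barrier = expect_first_barrier(&mut input).await?;
//     let paused_on_startup = first_barrier.is_pause_on_startup();
//     state_table.init_epoch(first_barrier.epoch).await?;
//     yield Message::Barrier(first_barrier);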

/// Expect the first message of the given `stream` as a barrier.
pub async fn expect_first_barrier_from_aligned_stream(
    stream: &mut (impl AlignedMessageStream + Unpin),
) -> StreamExecutorResult<Barrier> {
    let message = stream
        .next()
        .instrument_await("expect_first_barrier")
        .await
        .context("failed to extract the first message: stream closed unexpectedly")??;
    let barrier = message
        .into_barrier()
        .expect("the first message must be a barrier");
    Ok(barrier)
}

/// `StreamConsumer` is the last step in an actor.
pub trait StreamConsumer: Send + 'static {
    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;

    fn execute(self: Box<Self>) -> Self::BarrierStream;
}