risingwave_stream/executor/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod prelude;

use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::Arc;

use await_tree::InstrumentAwait;
use enum_as_inner::EnumAsInner;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{Schema, TableId};
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
use risingwave_common::util::epoch::{Epoch, EpochPair};
use risingwave_common::util::tracing::TracingContext;
use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
use risingwave_connector::source::SplitImpl;
use risingwave_expr::expr::{Expression, NonStrictExpression};
use risingwave_pb::data::PbEpoch;
use risingwave_pb::expr::PbInputRef;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
use risingwave_pb::stream_plan::{
    BarrierMutation, CombinedMutation, Dispatchers, DropSubscriptionsMutation, PauseMutation,
    PbAddMutation, PbBarrier, PbBarrierMutation, PbDispatcher, PbStreamMessageBatch,
    PbUpdateMutation, PbWatermark, ResumeMutation, SourceChangeSplitMutation, StopMutation,
    SubscriptionUpstreamInfo, ThrottleMutation,
};
use smallvec::SmallVec;

use crate::error::StreamResult;
use crate::task::{ActorId, FragmentId};

mod actor;
mod barrier_align;
pub mod exchange;
pub mod monitor;

pub mod aggregate;
pub mod asof_join;
mod backfill;
mod barrier_recv;
mod batch_query;
mod chain;
mod changelog;
mod dedup;
mod dispatch;
pub mod dml;
mod dynamic_filter;
pub mod eowc;
pub mod error;
mod expand;
mod filter;
pub mod hash_join;
mod hop_window;
mod join;
mod lookup;
mod lookup_union;
mod merge;
mod mview;
mod nested_loop_temporal_join;
mod no_op;
mod now;
mod over_window;
pub mod project;
mod rearranged_chain;
mod receiver;
pub mod row_id_gen;
mod sink;
pub mod source;
mod stream_reader;
pub mod subtask;
mod temporal_join;
mod top_n;
mod troublemaker;
mod union;
mod values;
mod watermark;
mod watermark_filter;
mod wrapper;

mod approx_percentile;

mod row_merge;

#[cfg(test)]
mod integration_tests;
mod sync_kv_log_store;
pub mod test_utils;
mod utils;

pub use actor::{Actor, ActorContext, ActorContextRef};
use anyhow::Context;
pub use approx_percentile::global::GlobalApproxPercentileExecutor;
pub use approx_percentile::local::LocalApproxPercentileExecutor;
pub use backfill::arrangement_backfill::*;
pub use backfill::cdc::{CdcBackfillExecutor, CdcScanOptions, ExternalStorageTable};
pub use backfill::no_shuffle_backfill::*;
pub use backfill::snapshot_backfill::*;
pub use barrier_recv::BarrierRecvExecutor;
pub use batch_query::BatchQueryExecutor;
pub use chain::ChainExecutor;
pub use changelog::ChangeLogExecutor;
pub use dedup::AppendOnlyDedupExecutor;
pub use dispatch::{DispatchExecutor, DispatcherImpl};
pub use dynamic_filter::DynamicFilterExecutor;
pub use error::{StreamExecutorError, StreamExecutorResult};
pub use expand::ExpandExecutor;
pub use filter::FilterExecutor;
pub use hash_join::*;
pub use hop_window::HopWindowExecutor;
pub use join::{AsOfDesc, AsOfJoinType, JoinType};
pub use lookup::*;
pub use lookup_union::LookupUnionExecutor;
pub use merge::MergeExecutor;
pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
pub use mview::*;
pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
pub use no_op::NoOpExecutor;
pub use now::*;
pub use over_window::*;
pub use rearranged_chain::RearrangedChainExecutor;
pub use receiver::ReceiverExecutor;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
pub use row_merge::RowMergeExecutor;
pub use sink::SinkExecutor;
pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
pub use temporal_join::TemporalJoinExecutor;
pub use top_n::{
    AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
};
pub use troublemaker::TroublemakerExecutor;
pub use union::UnionExecutor;
pub use utils::DummyExecutor;
pub use values::ValuesExecutor;
pub use watermark_filter::WatermarkFilterExecutor;
pub use wrapper::WrapperExecutor;

use self::barrier_align::AlignedMessageStream;

pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;

pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
use risingwave_pb::stream_plan::throttle_mutation::RateLimit;

pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;

/// Static information of an executor.
#[derive(Debug, Default, Clone)]
pub struct ExecutorInfo {
    /// The schema of the OUTPUT of the executor.
    pub schema: Schema,

    /// The primary key indices of the OUTPUT of the executor.
    /// Schema is used by both OLAP and streaming, therefore
    /// pk indices are maintained independently.
    pub pk_indices: PkIndices,

    /// Identity of the executor.
    pub identity: String,

    /// The executor id of the executor.
    pub id: u64,
}

impl ExecutorInfo {
    pub fn new(schema: Schema, pk_indices: PkIndices, identity: String, id: u64) -> Self {
        Self {
            schema,
            pk_indices,
            identity,
            id,
        }
    }
}

/// [`Execute`] describes the methods an executor should implement to handle control messages.
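///
/// # Example
///
/// A minimal sketch of an [`Execute`] implementation that simply forwards its input.
/// The `Passthrough` type here is hypothetical and only for illustration; it is not
/// one of the executors exported by this module.
///
/// ```ignore
/// struct Passthrough {
///     input: Executor,
/// }
///
/// impl Execute for Passthrough {
///     fn execute(self: Box<Self>) -> BoxedMessageStream {
///         // Forward the upstream message stream unchanged.
///         self.input.execute()
///     }
/// }
/// ```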
pub trait Execute: Send + 'static {
    fn execute(self: Box<Self>) -> BoxedMessageStream;

    fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
        self.execute()
    }

    fn boxed(self) -> Box<dyn Execute>
    where
        Self: Sized + Send + 'static,
    {
        Box::new(self)
    }
}

/// [`Executor`] combines the static information ([`ExecutorInfo`]) and the executable object to
/// handle messages ([`Execute`]).
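///
/// ```ignore
/// // A hedged sketch: `info` is assumed to be an `ExecutorInfo` and `exec` a value
/// // of some type implementing `Execute`.
/// let executor: Executor = (info, exec).into();
/// let stream = executor.execute();
/// ```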
pub struct Executor {
    info: ExecutorInfo,
    execute: Box<dyn Execute>,
}

impl Executor {
    pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
        Self { info, execute }
    }

    pub fn info(&self) -> &ExecutorInfo {
        &self.info
    }

    pub fn schema(&self) -> &Schema {
        &self.info.schema
    }

    pub fn pk_indices(&self) -> PkIndicesRef<'_> {
        &self.info.pk_indices
    }

    pub fn identity(&self) -> &str {
        &self.info.identity
    }

    pub fn execute(self) -> BoxedMessageStream {
        self.execute.execute()
    }

    pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
        self.execute.execute_with_epoch(epoch)
    }
}

impl std::fmt::Debug for Executor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.identity())
    }
}

impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
    fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
        Self::new(info, execute)
    }
}

impl<E> From<(ExecutorInfo, E)> for Executor
where
    E: Execute,
{
    fn from((info, execute): (ExecutorInfo, E)) -> Self {
        Self::new(info, execute.boxed())
    }
}

pub const INVALID_EPOCH: u64 = 0;

type UpstreamFragmentId = FragmentId;
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;

#[derive(Debug, Clone, PartialEq)]
pub struct UpdateMutation {
    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
    pub dropped_actors: HashSet<ActorId>,
    pub actor_splits: SplitAssignments,
    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct AddMutation {
    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
    pub added_actors: HashSet<ActorId>,
    // TODO: remove this and use `SourceChangesSplit` after we support multiple mutations.
    pub splits: SplitAssignments,
    pub pause: bool,
    /// (`upstream_mv_table_id`, `subscriber_id`)
    pub subscriptions_to_add: Vec<(TableId, u32)>,
}

/// See [`PbMutation`] for the semantics of each mutation.
#[derive(Debug, Clone, PartialEq)]
pub enum Mutation {
    Stop(HashSet<ActorId>),
    Update(UpdateMutation),
    Add(AddMutation),
    SourceChangeSplit(SplitAssignments),
    Pause,
    Resume,
    Throttle(HashMap<ActorId, Option<u32>>),
    AddAndUpdate(AddMutation, UpdateMutation),
    DropSubscriptions {
        /// `subscriber` -> `upstream_mv_table_id`
        subscriptions_to_drop: Vec<(u32, TableId)>,
    },
}

/// The generic type `M` is the mutation type of the barrier.
///
/// For a barrier in the dispatcher, `M` is `()`, which means the mutation is erased.
/// For a barrier flowing within the streaming actor, `M` is the normal `BarrierMutationType`.
#[derive(Debug, Clone)]
pub struct BarrierInner<M> {
    pub epoch: EpochPair,
    pub mutation: M,
    pub kind: BarrierKind,

    /// Tracing context for the **current** epoch of this barrier.
    pub tracing_context: TracingContext,

    /// The actors that this barrier has passed locally. Used for debugging only.
    pub passed_actors: Vec<ActorId>,
}

pub type BarrierMutationType = Option<Arc<Mutation>>;
pub type Barrier = BarrierInner<BarrierMutationType>;
pub type DispatcherBarrier = BarrierInner<()>;

impl<M: Default> BarrierInner<M> {
    /// Create a plain barrier.
    pub fn new_test_barrier(epoch: u64) -> Self {
        Self {
            epoch: EpochPair::new_test_epoch(epoch),
            kind: BarrierKind::Checkpoint,
            tracing_context: TracingContext::none(),
            mutation: Default::default(),
            passed_actors: Default::default(),
        }
    }

    pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
        Self {
            epoch: EpochPair::new(epoch, prev_epoch),
            kind: BarrierKind::Checkpoint,
            tracing_context: TracingContext::none(),
            mutation: Default::default(),
            passed_actors: Default::default(),
        }
    }
}

impl Barrier {
    pub fn into_dispatcher(self) -> DispatcherBarrier {
        DispatcherBarrier {
            epoch: self.epoch,
            mutation: (),
            kind: self.kind,
            tracing_context: self.tracing_context,
            passed_actors: self.passed_actors,
        }
    }

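    /// Attach a mutation to this barrier, replacing any existing one.
    ///
    /// ```ignore
    /// // A hedged sketch: `epoch` is assumed to be a valid test epoch value and
    /// // `actor_id` an arbitrary `ActorId`.
    /// let barrier = Barrier::new_test_barrier(epoch)
    ///     .with_mutation(Mutation::Stop(HashSet::from([actor_id])));
    /// assert!(barrier.is_with_stop_mutation());
    /// assert!(barrier.is_stop(actor_id));
    /// ```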
    #[must_use]
    pub fn with_mutation(self, mutation: Mutation) -> Self {
        Self {
            mutation: Some(Arc::new(mutation)),
            ..self
        }
    }

    #[must_use]
    pub fn with_stop(self) -> Self {
        self.with_mutation(Mutation::Stop(HashSet::default()))
    }

    /// Whether this barrier carries a stop mutation.
    pub fn is_with_stop_mutation(&self) -> bool {
        matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
    }

    /// Whether this barrier is to stop the actor with `actor_id`.
    pub fn is_stop(&self, actor_id: ActorId) -> bool {
        self.all_stop_actors()
            .is_some_and(|actors| actors.contains(&actor_id))
    }

    pub fn is_checkpoint(&self) -> bool {
        self.kind == BarrierKind::Checkpoint
    }

    /// Get the initial split assignments for the actor with `actor_id`.
    ///
    /// This should only be called on the initial barrier received by the executor. It must be
    ///
    /// - `Add` mutation when it's a new streaming job, or recovery.
    /// - `Update` mutation when it's created for scaling.
    /// - `AddAndUpdate` mutation when it's created for sink-into-table.
    ///
    /// Note that `SourceChangeSplit` is **not** included, because it's only used for changing splits
    /// of existing executors.
    pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
        match self.mutation.as_deref()? {
            Mutation::Update(UpdateMutation { actor_splits, .. })
            | Mutation::Add(AddMutation {
                splits: actor_splits,
                ..
            }) => actor_splits.get(&actor_id),

            Mutation::AddAndUpdate(
                AddMutation {
                    splits: add_actor_splits,
                    ..
                },
                UpdateMutation {
                    actor_splits: update_actor_splits,
                    ..
                },
            ) => add_actor_splits
                .get(&actor_id)
                // `Add` and `Update` should apply to different fragments, so we don't need to merge them.
                .or_else(|| update_actor_splits.get(&actor_id)),

            _ => {
                if cfg!(debug_assertions) {
                    panic!(
                        "the initial mutation of the barrier should not be {:?}",
                        self.mutation
                    );
                }
                None
            }
        }
        .map(|s| s.as_slice())
    }

    /// Get all actors that are to be stopped (dropped) by this barrier.
    pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
        match self.mutation.as_deref() {
            Some(Mutation::Stop(actors)) => Some(actors),
            Some(Mutation::Update(UpdateMutation { dropped_actors, .. }))
            | Some(Mutation::AddAndUpdate(_, UpdateMutation { dropped_actors, .. })) => {
                Some(dropped_actors)
            }
            _ => None,
        }
    }

    /// Whether this barrier is to newly add the actor with `actor_id`. This is used for `Chain` and
    /// `Values` to decide whether to output the existing (historical) data.
    ///
    /// By "newly", we mean the actor belongs to a subgraph of a new streaming job. That is, actors
    /// added for scaling are not included.
    pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation { added_actors, .. }))
            | Some(Mutation::AddAndUpdate(AddMutation { added_actors, .. }, _)) => {
                added_actors.contains(&actor_id)
            }
            _ => false,
        }
    }

    /// Whether this barrier adds a new downstream fragment for the actor with `upstream_actor_id`.
    ///
    /// # Use case
    /// Some optimizations are applied when an actor doesn't have any downstreams ("standalone" actors).
    /// * Pause a standalone shared `SourceExecutor`.
    /// * Disable a standalone `MaterializeExecutor`'s conflict check.
    ///
    /// This is implemented by checking `actor_context.initial_dispatch_num` on startup, and
    /// checking `has_more_downstream_fragments` on each barrier to see whether the optimization
    /// needs to be turned off.
    ///
    /// ## Some special cases not included
    ///
    /// Note that this is not `has_new_downstream_actor/fragment`. For our use case, we only
    /// care about the **number of downstream fragments** (more precisely, their existence).
    /// - When scaling, the number of downstream actors changes, and they are "new", but the number of downstream fragments is not changed.
    /// - When `ALTER TABLE sink_into_table`, the fragment is replaced with a "new" one, but the number is not changed.
    pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
        let Some(mutation) = self.mutation.as_deref() else {
            return false;
        };
        match mutation {
            // Add is for mv, index and sink creation.
            Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
            // AddAndUpdate is for sink-into-table.
            Mutation::AddAndUpdate(
                AddMutation { adds, .. },
                UpdateMutation {
                    dispatchers,
                    actor_new_dispatchers,
                    ..
                },
            ) => {
                adds.get(&upstream_actor_id).is_some()
                    || actor_new_dispatchers.get(&upstream_actor_id).is_some()
                    || dispatchers.get(&upstream_actor_id).is_some()
            }
            Mutation::Update(_)
            | Mutation::Stop(_)
            | Mutation::Pause
            | Mutation::Resume
            | Mutation::SourceChangeSplit(_)
            | Mutation::Throttle(_)
            | Mutation::DropSubscriptions { .. } => false,
        }
    }

    /// Whether this barrier requires the executor to pause its data stream on startup.
    pub fn is_pause_on_startup(&self) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation { pause, .. }))
            | Some(Mutation::AddAndUpdate(AddMutation { pause, .. }, _)) => *pause,
            _ => false,
        }
    }

    /// Whether this barrier is for resume.
    pub fn is_resume(&self) -> bool {
        matches!(self.mutation.as_deref(), Some(Mutation::Resume))
    }

    /// Returns the [`MergeUpdate`] if this barrier is to update the merge executors for the actor
    /// with `actor_id`.
    pub fn as_update_merge(
        &self,
        actor_id: ActorId,
        upstream_fragment_id: UpstreamFragmentId,
    ) -> Option<&MergeUpdate> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Update(UpdateMutation { merges, .. })
                | Mutation::AddAndUpdate(_, UpdateMutation { merges, .. }) => {
                    merges.get(&(actor_id, upstream_fragment_id))
                }

                _ => None,
            })
    }

    /// Returns the new vnode bitmap if this barrier is to update the vnode bitmap for the actor
    /// with `actor_id`.
    ///
    /// Actually, this vnode bitmap update is only useful for the record accessing validation for
    /// distributed executors, since the read/write pattern will never be across multiple vnodes.
    pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Update(UpdateMutation { vnode_bitmaps, .. })
                | Mutation::AddAndUpdate(_, UpdateMutation { vnode_bitmaps, .. }) => {
                    vnode_bitmaps.get(&actor_id).cloned()
                }
                _ => None,
            })
    }

    pub fn get_curr_epoch(&self) -> Epoch {
        Epoch(self.epoch.curr)
    }

    /// Retrieve the tracing context for the **current** epoch of this barrier.
    pub fn tracing_context(&self) -> &TracingContext {
        &self.tracing_context
    }

    pub fn added_subscriber_on_mv_table(
        &self,
        mv_table_id: TableId,
    ) -> impl Iterator<Item = u32> + '_ {
        if let Some(Mutation::Add(add)) | Some(Mutation::AddAndUpdate(add, _)) =
            self.mutation.as_deref()
        {
            Some(add)
        } else {
            None
        }
        .into_iter()
        .flat_map(move |add| {
            add.subscriptions_to_add.iter().filter_map(
                move |(upstream_mv_table_id, subscriber_id)| {
                    if *upstream_mv_table_id == mv_table_id {
                        Some(*subscriber_id)
                    } else {
                        None
                    }
                },
            )
        })
    }
}

impl<M: PartialEq> PartialEq for BarrierInner<M> {
    fn eq(&self, other: &Self) -> bool {
        self.epoch == other.epoch && self.mutation == other.mutation
    }
}

impl Mutation {
    /// Return true if the mutation is stop.
    ///
    /// Note that this does not mean we will stop the current actor.
    #[cfg(test)]
    pub fn is_stop(&self) -> bool {
        matches!(self, Mutation::Stop(_))
    }

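    /// Convert this mutation to its protobuf representation ([`PbMutation`]).
    ///
    /// ```ignore
    /// // A round-trip sketch (illustrative only): `actor_id` is an arbitrary
    /// // `ActorId`, and the throttle limit is an arbitrary value.
    /// let mutation = Mutation::Throttle(HashMap::from([(actor_id, Some(1000))]));
    /// let pb = mutation.to_protobuf();
    /// assert_eq!(Mutation::from_protobuf(&pb).unwrap(), mutation);
    /// ```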
    fn to_protobuf(&self) -> PbMutation {
        let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
            actor_splits
                .iter()
                .map(|(&actor_id, splits)| {
                    (
                        actor_id,
                        ConnectorSplits {
                            splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
                        },
                    )
                })
                .collect::<HashMap<_, _>>()
        };

        match self {
            Mutation::Stop(actors) => PbMutation::Stop(StopMutation {
                actors: actors.iter().copied().collect::<Vec<_>>(),
            }),
            Mutation::Update(UpdateMutation {
                dispatchers,
                merges,
                vnode_bitmaps,
                dropped_actors,
                actor_splits,
                actor_new_dispatchers,
            }) => PbMutation::Update(PbUpdateMutation {
                dispatcher_update: dispatchers.values().flatten().cloned().collect(),
                merge_update: merges.values().cloned().collect(),
                actor_vnode_bitmap_update: vnode_bitmaps
                    .iter()
                    .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
                    .collect(),
                dropped_actors: dropped_actors.iter().cloned().collect(),
                actor_splits: actor_splits_to_protobuf(actor_splits),
                actor_new_dispatchers: actor_new_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| {
                        (
                            actor_id,
                            Dispatchers {
                                dispatchers: dispatchers.clone(),
                            },
                        )
                    })
                    .collect(),
            }),
            Mutation::Add(AddMutation {
                adds,
                added_actors,
                splits,
                pause,
                subscriptions_to_add,
            }) => PbMutation::Add(PbAddMutation {
                actor_dispatchers: adds
                    .iter()
                    .map(|(&actor_id, dispatchers)| {
                        (
                            actor_id,
                            Dispatchers {
                                dispatchers: dispatchers.clone(),
                            },
                        )
                    })
                    .collect(),
                added_actors: added_actors.iter().copied().collect(),
                actor_splits: actor_splits_to_protobuf(splits),
                pause: *pause,
                subscriptions_to_add: subscriptions_to_add
                    .iter()
                    .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
                        subscriber_id: *subscriber_id,
                        upstream_mv_table_id: table_id.table_id,
                    })
                    .collect(),
            }),
            Mutation::SourceChangeSplit(changes) => PbMutation::Splits(SourceChangeSplitMutation {
                actor_splits: changes
                    .iter()
                    .map(|(&actor_id, splits)| {
                        (
                            actor_id,
                            ConnectorSplits {
                                splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
                            },
                        )
                    })
                    .collect(),
            }),
            Mutation::Pause => PbMutation::Pause(PauseMutation {}),
            Mutation::Resume => PbMutation::Resume(ResumeMutation {}),
            Mutation::Throttle(changes) => PbMutation::Throttle(ThrottleMutation {
                actor_throttle: changes
                    .iter()
                    .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit }))
                    .collect(),
            }),

            Mutation::AddAndUpdate(add, update) => PbMutation::Combined(CombinedMutation {
                mutations: vec![
                    BarrierMutation {
                        mutation: Some(Mutation::Add(add.clone()).to_protobuf()),
                    },
                    BarrierMutation {
                        mutation: Some(Mutation::Update(update.clone()).to_protobuf()),
                    },
                ],
            }),
            Mutation::DropSubscriptions {
                subscriptions_to_drop,
            } => PbMutation::DropSubscriptions(DropSubscriptionsMutation {
                info: subscriptions_to_drop
                    .iter()
                    .map(
                        |(subscriber_id, upstream_mv_table_id)| SubscriptionUpstreamInfo {
                            subscriber_id: *subscriber_id,
                            upstream_mv_table_id: upstream_mv_table_id.table_id,
                        },
                    )
                    .collect(),
            }),
        }
    }

    fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
        let mutation = match prost {
            PbMutation::Stop(stop) => {
                Mutation::Stop(HashSet::from_iter(stop.actors.iter().cloned()))
            }

            PbMutation::Update(update) => Mutation::Update(UpdateMutation {
                dispatchers: update
                    .dispatcher_update
                    .iter()
                    .map(|u| (u.actor_id, u.clone()))
                    .into_group_map(),
                merges: update
                    .merge_update
                    .iter()
                    .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
                    .collect(),
                vnode_bitmaps: update
                    .actor_vnode_bitmap_update
                    .iter()
                    .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
                    .collect(),
                dropped_actors: update.dropped_actors.iter().cloned().collect(),
                actor_splits: update
                    .actor_splits
                    .iter()
                    .map(|(&actor_id, splits)| {
                        (
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(|split| split.try_into().unwrap())
                                .collect(),
                        )
                    })
                    .collect(),
                actor_new_dispatchers: update
                    .actor_new_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
                    .collect(),
            }),

            PbMutation::Add(add) => Mutation::Add(AddMutation {
                adds: add
                    .actor_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
                    .collect(),
                added_actors: add.added_actors.iter().copied().collect(),
                // TODO: remove this and use `SourceChangesSplit` after we support multiple
                // mutations.
                splits: add
                    .actor_splits
                    .iter()
                    .map(|(&actor_id, splits)| {
                        (
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(|split| split.try_into().unwrap())
                                .collect(),
                        )
                    })
                    .collect(),
                pause: add.pause,
                subscriptions_to_add: add
                    .subscriptions_to_add
                    .iter()
                    .map(
                        |SubscriptionUpstreamInfo {
                             subscriber_id,
                             upstream_mv_table_id,
                         }| {
                            (TableId::new(*upstream_mv_table_id), *subscriber_id)
                        },
                    )
                    .collect(),
            }),

            PbMutation::Splits(s) => {
                let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
                    Vec::with_capacity(s.actor_splits.len());
                for (&actor_id, splits) in &s.actor_splits {
                    if !splits.splits.is_empty() {
                        change_splits.push((
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(SplitImpl::try_from)
                                .try_collect()?,
                        ));
                    }
                }
                Mutation::SourceChangeSplit(change_splits.into_iter().collect())
            }
            PbMutation::Pause(_) => Mutation::Pause,
            PbMutation::Resume(_) => Mutation::Resume,
            PbMutation::Throttle(changes) => Mutation::Throttle(
                changes
                    .actor_throttle
                    .iter()
                    .map(|(actor_id, limit)| (*actor_id, limit.rate_limit))
                    .collect(),
            ),
            PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
                subscriptions_to_drop: drop
                    .info
                    .iter()
                    .map(|info| (info.subscriber_id, TableId::new(info.upstream_mv_table_id)))
                    .collect(),
            },
            PbMutation::Combined(CombinedMutation { mutations }) => match &mutations[..] {
                [
                    BarrierMutation {
                        mutation: Some(add),
                    },
                    BarrierMutation {
                        mutation: Some(update),
                    },
                ] => {
                    let Mutation::Add(add_mutation) = Mutation::from_protobuf(add)? else {
                        unreachable!();
                    };

                    let Mutation::Update(update_mutation) = Mutation::from_protobuf(update)? else {
                        unreachable!();
                    };

                    Mutation::AddAndUpdate(add_mutation, update_mutation)
                }

                _ => unreachable!(),
            },
        };
        Ok(mutation)
    }
}

impl<M> BarrierInner<M> {
    fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
        let Self {
            epoch,
            mutation,
            kind,
            passed_actors,
            tracing_context,
            ..
        } = self;

        PbBarrier {
            epoch: Some(PbEpoch {
                curr: epoch.curr,
                prev: epoch.prev,
            }),
            mutation: Some(PbBarrierMutation {
                mutation: barrier_fn(mutation),
            }),
            tracing_context: tracing_context.to_protobuf(),
            kind: *kind as _,
            passed_actors: passed_actors.clone(),
        }
    }

    fn from_protobuf_inner(
        prost: &PbBarrier,
        mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
    ) -> StreamExecutorResult<Self> {
        let epoch = prost.get_epoch()?;

        Ok(Self {
            kind: prost.kind(),
            epoch: EpochPair::new(epoch.curr, epoch.prev),
            mutation: mutation_from_pb(
                prost
                    .mutation
                    .as_ref()
                    .and_then(|mutation| mutation.mutation.as_ref()),
            )?,
            passed_actors: prost.get_passed_actors().clone(),
            tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
        })
    }

    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
        BarrierInner {
            epoch: self.epoch,
            mutation: f(self.mutation),
            kind: self.kind,
            tracing_context: self.tracing_context,
            passed_actors: self.passed_actors,
        }
    }
}

impl DispatcherBarrier {
    pub fn to_protobuf(&self) -> PbBarrier {
        self.to_protobuf_inner(|_| None)
    }
}

impl Barrier {
    pub fn to_protobuf(&self) -> PbBarrier {
        self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
    }

    pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
        Self::from_protobuf_inner(prost, |mutation| {
            mutation
                .map(|m| Mutation::from_protobuf(m).map(Arc::new))
                .transpose()
        })
    }
}

#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
    pub col_idx: usize,
    pub data_type: DataType,
    pub val: ScalarImpl,
}

impl PartialOrd for Watermark {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Watermark {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.val.default_cmp(&other.val)
    }
}

impl Watermark {
    pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
        Self {
            col_idx,
            data_type,
            val,
        }
    }

    pub async fn transform_with_expr(
        self,
        expr: &NonStrictExpression<impl Expression>,
        new_col_idx: usize,
    ) -> Option<Self> {
        let Self { col_idx, val, .. } = self;
        let row = {
            let mut row = vec![None; col_idx + 1];
            row[col_idx] = Some(val);
            OwnedRow::new(row)
        };
        let val = expr.eval_row_infallible(&row).await?;
        Some(Self::new(new_col_idx, expr.inner().return_type(), val))
    }

    /// Transform the watermark with the given output indices. If this watermark is not in the
    /// output, return `None`.
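    ///
    /// ```ignore
    /// // Sketch: a watermark on column 2 is remapped when the output keeps
    /// // columns [2, 0], so it ends up on output column 0.
    /// let watermark = Watermark::new(2, DataType::Int64, ScalarImpl::Int64(100));
    /// let mapped = watermark.transform_with_indices(&[2, 0]).unwrap();
    /// assert_eq!(mapped.col_idx, 0);
    /// ```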
    pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
        output_indices
            .iter()
            .position(|p| *p == self.col_idx)
            .map(|new_col_idx| self.with_idx(new_col_idx))
    }

    pub fn to_protobuf(&self) -> PbWatermark {
        PbWatermark {
            column: Some(PbInputRef {
                index: self.col_idx as _,
                r#type: Some(self.data_type.to_protobuf()),
            }),
            val: Some(&self.val).to_protobuf().into(),
        }
    }

    pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
        let col_ref = prost.get_column()?;
        let data_type = DataType::from(col_ref.get_type()?);
        let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
            .expect("watermark value cannot be null");
        Ok(Self::new(col_ref.get_index() as _, data_type, val))
    }

    pub fn with_idx(self, idx: usize) -> Self {
        Self::new(idx, self.data_type, self.val)
    }
}

#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageInner<M> {
    Chunk(StreamChunk),
    Barrier(BarrierInner<M>),
    Watermark(Watermark),
}

impl<M> MessageInner<M> {
    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
        match self {
            MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
            MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
            MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
        }
    }
}

pub type Message = MessageInner<BarrierMutationType>;
pub type DispatcherMessage = MessageInner<()>;

/// `MessageBatchInner` is used exclusively by `Dispatcher` and the `Merger`/`Receiver` for exchanging messages between them.
/// It shares the same message type as the fundamental `MessageInner`, but batches multiple barriers into a single message.
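///
/// ```ignore
/// // Sketch: a single dispatcher barrier becomes a one-element barrier batch
/// // (`barrier` is assumed to be a `DispatcherBarrier`).
/// let batch: DispatcherMessageBatch = DispatcherMessage::Barrier(barrier).into();
/// assert!(matches!(batch, MessageBatchInner::BarrierBatch(b) if b.len() == 1));
/// ```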
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageBatchInner<M> {
    Chunk(StreamChunk),
    BarrierBatch(Vec<BarrierInner<M>>),
    Watermark(Watermark),
}
pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
pub type DispatcherBarriers = Vec<DispatcherBarrier>;
pub type DispatcherMessageBatch = MessageBatchInner<()>;

impl From<DispatcherMessage> for DispatcherMessageBatch {
    fn from(m: DispatcherMessage) -> Self {
        match m {
            DispatcherMessage::Chunk(c) => Self::Chunk(c),
            DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
            DispatcherMessage::Watermark(w) => Self::Watermark(w),
        }
    }
}

impl From<StreamChunk> for Message {
    fn from(chunk: StreamChunk) -> Self {
        Message::Chunk(chunk)
    }
}

impl<'a> TryFrom<&'a Message> for &'a Barrier {
    type Error = ();

    fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
        match m {
            Message::Chunk(_) => Err(()),
            Message::Barrier(b) => Ok(b),
            Message::Watermark(_) => Err(()),
        }
    }
}

impl Message {
    /// Return true if the message is a stop barrier, meaning the stream
    /// will not continue, false otherwise.
    ///
    /// Note that this does not mean we will stop the current actor.
    #[cfg(test)]
    pub fn is_stop(&self) -> bool {
        matches!(
            self,
            Message::Barrier(Barrier {
                mutation,
                ..
            }) if mutation.as_ref().unwrap().is_stop()
        )
    }
}

impl DispatcherMessageBatch {
    pub fn to_protobuf(&self) -> PbStreamMessageBatch {
        let prost = match self {
            Self::Chunk(stream_chunk) => {
                let prost_stream_chunk = stream_chunk.to_protobuf();
                StreamMessageBatch::StreamChunk(prost_stream_chunk)
            }
            Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
                barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
            }),
            Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
        };
        PbStreamMessageBatch {
            stream_message_batch: Some(prost),
        }
    }

    pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
        let res = match prost.get_stream_message_batch()? {
            StreamMessageBatch::StreamChunk(chunk) => {
                Self::Chunk(StreamChunk::from_protobuf(chunk)?)
            }
            StreamMessageBatch::BarrierBatch(barrier_batch) => {
                let barriers = barrier_batch
                    .barriers
                    .iter()
                    .map(|barrier| {
                        DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
                            if mutation.is_some() {
                                if cfg!(debug_assertions) {
                                    panic!("should not receive message of barrier with mutation");
                                } else {
                                    warn!(?barrier, "receive message of barrier with mutation");
                                }
                            }
                            Ok(())
                        })
                    })
                    .try_collect()?;
                Self::BarrierBatch(barriers)
            }
            StreamMessageBatch::Watermark(watermark) => {
                Self::Watermark(Watermark::from_protobuf(watermark)?)
            }
        };
        Ok(res)
    }

    pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
        ::prost::Message::encoded_len(msg)
    }
}

pub type PkIndices = Vec<usize>;
pub type PkIndicesRef<'a> = &'a [usize];
pub type PkDataTypes = SmallVec<[DataType; 1]>;

/// Expect the first message of the given `stream` as a barrier.
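///
/// ```ignore
/// // Typical usage at the start of an executor's stream (sketch; `input` is
/// // assumed to be the upstream `Executor`).
/// let mut input = input.execute();
/// let first_barrier = expect_first_barrier(&mut input).await?;
/// let epoch = first_barrier.epoch;
/// ```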
pub async fn expect_first_barrier<M: Debug>(
    stream: &mut (impl MessageStreamInner<M> + Unpin),
) -> StreamExecutorResult<BarrierInner<M>> {
    let message = stream
        .next()
        .instrument_await("expect_first_barrier")
        .await
        .context("failed to extract the first message: stream closed unexpectedly")??;
    let barrier = message
        .into_barrier()
        .expect("the first message must be a barrier");
    // TODO: Is this check correct?
    assert!(matches!(
        barrier.kind,
        BarrierKind::Checkpoint | BarrierKind::Initial
    ));
    Ok(barrier)
}

/// Expect the first message of the given `stream` as a barrier.
pub async fn expect_first_barrier_from_aligned_stream(
    stream: &mut (impl AlignedMessageStream + Unpin),
) -> StreamExecutorResult<Barrier> {
    let message = stream
        .next()
        .instrument_await("expect_first_barrier")
        .await
        .context("failed to extract the first message: stream closed unexpectedly")??;
    let barrier = message
        .into_barrier()
        .expect("the first message must be a barrier");
    Ok(barrier)
}

/// `StreamConsumer` is the last step in an actor.
pub trait StreamConsumer: Send + 'static {
    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;

    fn execute(self: Box<Self>) -> Self::BarrierStream;
}