risingwave_stream/executor/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prelude;
16
17use std::collections::{HashMap, HashSet};
18use std::fmt::Debug;
19use std::sync::Arc;
20
21use await_tree::InstrumentAwait;
22use enum_as_inner::EnumAsInner;
23use futures::stream::BoxStream;
24use futures::{Stream, StreamExt};
25use itertools::Itertools;
26use risingwave_common::array::StreamChunk;
27use risingwave_common::bitmap::Bitmap;
28use risingwave_common::catalog::{Schema, TableId};
29use risingwave_common::row::OwnedRow;
30use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
31use risingwave_common::util::epoch::{Epoch, EpochPair};
32use risingwave_common::util::tracing::TracingContext;
33use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
34use risingwave_connector::source::SplitImpl;
35use risingwave_expr::expr::{Expression, NonStrictExpression};
36use risingwave_pb::data::PbEpoch;
37use risingwave_pb::expr::PbInputRef;
38use risingwave_pb::stream_plan::barrier::BarrierKind;
39use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
40use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
41use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
42use risingwave_pb::stream_plan::{
43    BarrierMutation, CombinedMutation, ConnectorPropsChangeMutation, Dispatchers,
44    DropSubscriptionsMutation, PauseMutation, PbAddMutation, PbBarrier, PbBarrierMutation,
45    PbDispatcher, PbStreamMessageBatch, PbUpdateMutation, PbWatermark, ResumeMutation,
46    SourceChangeSplitMutation, StartFragmentBackfillMutation, StopMutation,
47    SubscriptionUpstreamInfo, ThrottleMutation,
48};
49use smallvec::SmallVec;
50
51use crate::error::StreamResult;
52use crate::task::{ActorId, FragmentId};
53
54mod actor;
55mod barrier_align;
56pub mod exchange;
57pub mod monitor;
58
59pub mod aggregate;
60pub mod asof_join;
61mod backfill;
62mod barrier_recv;
63mod batch_query;
64mod chain;
65mod changelog;
66mod dedup;
67mod dispatch;
68pub mod dml;
69mod dynamic_filter;
70pub mod eowc;
71pub mod error;
72mod expand;
73mod filter;
74pub mod hash_join;
75mod hop_window;
76mod join;
77mod lookup;
78mod lookup_union;
79mod merge;
80mod mview;
81mod nested_loop_temporal_join;
82mod no_op;
83mod now;
84mod over_window;
85pub mod project;
86mod rearranged_chain;
87mod receiver;
88pub mod row_id_gen;
89mod sink;
90pub mod source;
91mod stream_reader;
92pub mod subtask;
93mod temporal_join;
94mod top_n;
95mod troublemaker;
96mod union;
97mod values;
98mod watermark;
99mod watermark_filter;
100mod wrapper;
101
102mod approx_percentile;
103
104mod row_merge;
105
106#[cfg(test)]
107mod integration_tests;
108mod sync_kv_log_store;
109pub mod test_utils;
110mod utils;
111
112pub use actor::{Actor, ActorContext, ActorContextRef};
113use anyhow::Context;
114pub use approx_percentile::global::GlobalApproxPercentileExecutor;
115pub use approx_percentile::local::LocalApproxPercentileExecutor;
116pub use backfill::arrangement_backfill::*;
117pub use backfill::cdc::{CdcBackfillExecutor, CdcScanOptions, ExternalStorageTable};
118pub use backfill::no_shuffle_backfill::*;
119pub use backfill::snapshot_backfill::*;
120pub use barrier_recv::BarrierRecvExecutor;
121pub use batch_query::BatchQueryExecutor;
122pub use chain::ChainExecutor;
123pub use changelog::ChangeLogExecutor;
124pub use dedup::AppendOnlyDedupExecutor;
125pub use dispatch::{DispatchExecutor, DispatcherImpl};
126pub use dynamic_filter::DynamicFilterExecutor;
127pub use error::{StreamExecutorError, StreamExecutorResult};
128pub use expand::ExpandExecutor;
129pub use filter::FilterExecutor;
130pub use hash_join::*;
131pub use hop_window::HopWindowExecutor;
132pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
133pub use join::{AsOfDesc, AsOfJoinType, JoinType};
134pub use lookup::*;
135pub use lookup_union::LookupUnionExecutor;
136pub use merge::MergeExecutor;
137pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
138pub use mview::*;
139pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
140pub use no_op::NoOpExecutor;
141pub use now::*;
142pub use over_window::*;
143pub use rearranged_chain::RearrangedChainExecutor;
144pub use receiver::ReceiverExecutor;
145use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
146pub use row_merge::RowMergeExecutor;
147pub use sink::SinkExecutor;
148pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
149pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
150pub use temporal_join::TemporalJoinExecutor;
151pub use top_n::{
152    AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
153};
154pub use troublemaker::TroublemakerExecutor;
155pub use union::UnionExecutor;
156pub use utils::DummyExecutor;
157pub use values::ValuesExecutor;
158pub use watermark_filter::WatermarkFilterExecutor;
159pub use wrapper::WrapperExecutor;
160
161use self::barrier_align::AlignedMessageStream;
162
163pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
164pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
165pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
166pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;
167
168pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
169use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
170use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
171
172pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
173pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
174pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;
175
176/// Static information of an executor.
177#[derive(Debug, Default, Clone)]
178pub struct ExecutorInfo {
179    /// The schema of the OUTPUT of the executor.
180    pub schema: Schema,
181
182    /// The primary key indices of the OUTPUT of the executor.
183    /// `Schema` is shared by both OLAP and streaming, so the
184    /// pk indices are maintained independently here.
185    pub pk_indices: PkIndices,
186
187    /// Identity of the executor.
188    pub identity: String,
189
190    /// The executor id of the executor.
191    pub id: u64,
192}
193
194impl ExecutorInfo {
195    pub fn new(schema: Schema, pk_indices: PkIndices, identity: String, id: u64) -> Self {
196        Self {
197            schema,
198            pk_indices,
199            identity,
200            id,
201        }
202    }
203}
204
205/// [`Execute`] describes the methods an executor should implement to handle control messages.
206pub trait Execute: Send + 'static {
207    fn execute(self: Box<Self>) -> BoxedMessageStream;
208
209    fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
210        self.execute()
211    }
212
213    fn boxed(self) -> Box<dyn Execute>
214    where
215        Self: Sized + Send + 'static,
216    {
217        Box::new(self)
218    }
219}
220
221/// [`Executor`] combines the static information ([`ExecutorInfo`]) and the executable object to
222/// handle messages ([`Execute`]).
223pub struct Executor {
224    info: ExecutorInfo,
225    execute: Box<dyn Execute>,
226}
227
228impl Executor {
229    pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
230        Self { info, execute }
231    }
232
233    pub fn info(&self) -> &ExecutorInfo {
234        &self.info
235    }
236
237    pub fn schema(&self) -> &Schema {
238        &self.info.schema
239    }
240
241    pub fn pk_indices(&self) -> PkIndicesRef<'_> {
242        &self.info.pk_indices
243    }
244
245    pub fn identity(&self) -> &str {
246        &self.info.identity
247    }
248
249    pub fn execute(self) -> BoxedMessageStream {
250        self.execute.execute()
251    }
252
253    pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
254        self.execute.execute_with_epoch(epoch)
255    }
256}
257
258impl std::fmt::Debug for Executor {
259    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
260        f.write_str(self.identity())
261    }
262}
263
264impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
265    fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
266        Self::new(info, execute)
267    }
268}
269
270impl<E> From<(ExecutorInfo, E)> for Executor
271where
272    E: Execute,
273{
274    fn from((info, execute): (ExecutorInfo, E)) -> Self {
275        Self::new(info, execute.boxed())
276    }
277}
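
// A hedged sketch of how `Execute` and `ExecutorInfo` compose into an `Executor`. The
// `NoopExecute` type below is hypothetical and exists only for illustration: it yields an
// empty message stream.
#[cfg(test)]
mod executor_compose_example {
    use futures::StreamExt;

    use super::*;

    struct NoopExecute;

    impl Execute for NoopExecute {
        fn execute(self: Box<Self>) -> BoxedMessageStream {
            // An executor that never produces any message.
            futures::stream::empty().boxed()
        }
    }

    #[test]
    fn compose_info_and_execute_into_executor() {
        let info = ExecutorInfo::new(Schema::default(), vec![], "NoopExample".to_owned(), 0);
        // `From<(ExecutorInfo, E)>` boxes the `Execute` impl for us.
        let executor: Executor = (info, NoopExecute).into();
        assert_eq!(executor.identity(), "NoopExample");
        assert!(executor.pk_indices().is_empty());
    }
}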
278
279pub const INVALID_EPOCH: u64 = 0;
280
281type UpstreamFragmentId = FragmentId;
282type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;
283
284#[derive(Debug, Clone, PartialEq)]
285pub struct UpdateMutation {
286    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
287    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
288    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
289    pub dropped_actors: HashSet<ActorId>,
290    pub actor_splits: SplitAssignments,
291    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
292}
293
294#[derive(Debug, Clone, PartialEq)]
295pub struct AddMutation {
296    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
297    pub added_actors: HashSet<ActorId>,
298    // TODO: remove this and use `SourceChangesSplit` after we support multiple mutations.
299    pub splits: SplitAssignments,
300    pub pause: bool,
301    /// (`upstream_mv_table_id`, `subscriber_id`)
302    pub subscriptions_to_add: Vec<(TableId, u32)>,
303    /// Backfill fragments that should stay paused on startup, until started by [`Mutation::StartFragmentBackfill`].
304    pub backfill_nodes_to_pause: HashSet<FragmentId>,
305}
306
307/// See [`PbMutation`] for the semantics of each mutation.
308#[derive(Debug, Clone, PartialEq)]
309pub enum Mutation {
310    Stop(HashSet<ActorId>),
311    Update(UpdateMutation),
312    Add(AddMutation),
313    SourceChangeSplit(SplitAssignments),
314    Pause,
315    Resume,
316    Throttle(HashMap<ActorId, Option<u32>>),
317    AddAndUpdate(AddMutation, UpdateMutation),
318    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
319    DropSubscriptions {
320        /// `subscriber` -> `upstream_mv_table_id`
321        subscriptions_to_drop: Vec<(u32, TableId)>,
322    },
323    StartFragmentBackfill {
324        fragment_ids: HashSet<FragmentId>,
325    },
326}
327
328/// The generic type `M` is the mutation type of the barrier.
329///
330/// For barriers in the dispatcher, `M` is `()`, which means the mutation is erased.
331/// For barriers flowing within the streaming actor, `M` is the normal `BarrierMutationType`.
332#[derive(Debug, Clone)]
333pub struct BarrierInner<M> {
334    pub epoch: EpochPair,
335    pub mutation: M,
336    pub kind: BarrierKind,
337
338    /// Tracing context for the **current** epoch of this barrier.
339    pub tracing_context: TracingContext,
340
341    /// The actors that this barrier has passed locally. Used for debugging only.
342    pub passed_actors: Vec<ActorId>,
343}
344
345pub type BarrierMutationType = Option<Arc<Mutation>>;
346pub type Barrier = BarrierInner<BarrierMutationType>;
347pub type DispatcherBarrier = BarrierInner<()>;
348
349impl<M: Default> BarrierInner<M> {
350    /// Create a plain barrier.
351    pub fn new_test_barrier(epoch: u64) -> Self {
352        Self {
353            epoch: EpochPair::new_test_epoch(epoch),
354            kind: BarrierKind::Checkpoint,
355            tracing_context: TracingContext::none(),
356            mutation: Default::default(),
357            passed_actors: Default::default(),
358        }
359    }
360
361    pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
362        Self {
363            epoch: EpochPair::new(epoch, prev_epoch),
364            kind: BarrierKind::Checkpoint,
365            tracing_context: TracingContext::none(),
366            mutation: Default::default(),
367            passed_actors: Default::default(),
368        }
369    }
370}
371
372impl Barrier {
373    pub fn into_dispatcher(self) -> DispatcherBarrier {
374        DispatcherBarrier {
375            epoch: self.epoch,
376            mutation: (),
377            kind: self.kind,
378            tracing_context: self.tracing_context,
379            passed_actors: self.passed_actors,
380        }
381    }
382
383    #[must_use]
384    pub fn with_mutation(self, mutation: Mutation) -> Self {
385        Self {
386            mutation: Some(Arc::new(mutation)),
387            ..self
388        }
389    }
390
391    #[must_use]
392    pub fn with_stop(self) -> Self {
393        self.with_mutation(Mutation::Stop(HashSet::default()))
394    }
395
396    /// Whether this barrier carries a stop mutation.
397    pub fn is_with_stop_mutation(&self) -> bool {
398        matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
399    }
400
401    /// Whether this barrier is to stop the actor with `actor_id`.
402    pub fn is_stop(&self, actor_id: ActorId) -> bool {
403        self.all_stop_actors()
404            .is_some_and(|actors| actors.contains(&actor_id))
405    }
406
407    pub fn is_checkpoint(&self) -> bool {
408        self.kind == BarrierKind::Checkpoint
409    }
410
411    /// Get the initial split assignments for the actor with `actor_id`.
412    ///
413    /// This should only be called on the initial barrier received by the executor. It must be
414    ///
415    /// - `Add` mutation when it's a new streaming job, or recovery.
416    /// - `Update` mutation when it's created for scaling.
417    /// - `AddAndUpdate` mutation when it's created for sink-into-table.
418    ///
419    /// Note that `SourceChangeSplit` is **not** included, because it's only used for changing splits
420    /// of existing executors.
421    pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
422        match self.mutation.as_deref()? {
423            Mutation::Update(UpdateMutation { actor_splits, .. })
424            | Mutation::Add(AddMutation {
425                splits: actor_splits,
426                ..
427            }) => actor_splits.get(&actor_id),
428
429            Mutation::AddAndUpdate(
430                AddMutation {
431                    splits: add_actor_splits,
432                    ..
433                },
434                UpdateMutation {
435                    actor_splits: update_actor_splits,
436                    ..
437                },
438            ) => add_actor_splits
439                .get(&actor_id)
440                // `Add` and `Update` should apply to different fragments, so we don't need to merge them.
441                .or_else(|| update_actor_splits.get(&actor_id)),
442
443            _ => {
444                if cfg!(debug_assertions) {
445                    panic!(
446                        "the initial mutation of the barrier should not be {:?}",
447                        self.mutation
448                    );
449                }
450                None
451            }
452        }
453        .map(|s| s.as_slice())
454    }
455
456    /// Get all actors that are to be stopped (dropped) by this barrier.
457    pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
458        match self.mutation.as_deref() {
459            Some(Mutation::Stop(actors)) => Some(actors),
460            Some(Mutation::Update(UpdateMutation { dropped_actors, .. }))
461            | Some(Mutation::AddAndUpdate(_, UpdateMutation { dropped_actors, .. })) => {
462                Some(dropped_actors)
463            }
464            _ => None,
465        }
466    }
467
468    /// Whether this barrier is to newly add the actor with `actor_id`. This is used for `Chain` and
469    /// `Values` to decide whether to output the existing (historical) data.
470    ///
471    /// By "newly", we mean the actor belongs to a subgraph of a new streaming job. That is, actors
472    /// added for scaling are not included.
473    pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
474        match self.mutation.as_deref() {
475            Some(Mutation::Add(AddMutation { added_actors, .. }))
476            | Some(Mutation::AddAndUpdate(AddMutation { added_actors, .. }, _)) => {
477                added_actors.contains(&actor_id)
478            }
479            _ => false,
480        }
481    }
482
483    pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
484        if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
485            fragment_ids.contains(&fragment_id)
486        } else {
487            false
488        }
489    }
490
491    /// Whether this barrier adds new downstream fragment for the actor with `upstream_actor_id`.
492    ///
493    /// # Use case
494    /// Some optimizations are applied when an actor doesn't have any downstreams ("standalone" actors).
495    /// * Pause a standalone shared `SourceExecutor`.
496    /// * Disable a standalone `MaterializeExecutor`'s conflict check.
497    ///
498    /// This is implemented by checking `actor_context.initial_dispatch_num` on startup, and
499    /// checking `has_more_downstream_fragments` on each barrier to see whether the optimization
500    /// needs to be turned off.
501    ///
502    /// ## Some special cases not included
503    ///
504    /// Note that this is not `has_new_downstream_actor/fragment`. For our use case, we only
505    /// care about **number of downstream fragments** (more precisely, existence).
506    /// - When scaling, the number of downstream actors changes, and they are "new", but the set of downstream fragments does not change.
507    /// - When `ALTER TABLE sink_into_table`, the fragment is replaced with a "new" one, but the number of downstream fragments does not change.
508    pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
509        let Some(mutation) = self.mutation.as_deref() else {
510            return false;
511        };
512        match mutation {
513            // Add is for mv, index and sink creation.
514            Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
515            // AddAndUpdate is for sink-into-table.
516            Mutation::AddAndUpdate(
517                AddMutation { adds, .. },
518                UpdateMutation {
519                    dispatchers,
520                    actor_new_dispatchers,
521                    ..
522                },
523            ) => {
524                adds.get(&upstream_actor_id).is_some()
525                    || actor_new_dispatchers.get(&upstream_actor_id).is_some()
526                    || dispatchers.get(&upstream_actor_id).is_some()
527            }
528            Mutation::Update(_)
529            | Mutation::Stop(_)
530            | Mutation::Pause
531            | Mutation::Resume
532            | Mutation::SourceChangeSplit(_)
533            | Mutation::Throttle(_)
534            | Mutation::DropSubscriptions { .. }
535            | Mutation::ConnectorPropsChange(_)
536            | Mutation::StartFragmentBackfill { .. } => false,
537        }
538    }
539
540    /// Whether this barrier requires the executor to pause its data stream on startup.
541    pub fn is_pause_on_startup(&self) -> bool {
542        match self.mutation.as_deref() {
543            Some(Mutation::Add(AddMutation { pause, .. }))
544            | Some(Mutation::AddAndUpdate(AddMutation { pause, .. }, _)) => *pause,
545            _ => false,
546        }
547    }
548
549    pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
550        match self.mutation.as_deref() {
551            Some(Mutation::Add(AddMutation {
552                backfill_nodes_to_pause,
553                ..
554            }))
555            | Some(Mutation::AddAndUpdate(
556                AddMutation {
557                    backfill_nodes_to_pause,
558                    ..
559                },
560                _,
561            )) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
562            _ => {
563                tracing::warn!("expected an AddMutation on Startup, instead got {:?}", self);
564                true
565            }
566        }
567    }
568
569    /// Whether this barrier is for resume.
570    pub fn is_resume(&self) -> bool {
571        matches!(self.mutation.as_deref(), Some(Mutation::Resume))
572    }
573
574    /// Returns the [`MergeUpdate`] if this barrier is to update the merge executors for the actor
575    /// with `actor_id`.
576    pub fn as_update_merge(
577        &self,
578        actor_id: ActorId,
579        upstream_fragment_id: UpstreamFragmentId,
580    ) -> Option<&MergeUpdate> {
581        self.mutation
582            .as_deref()
583            .and_then(|mutation| match mutation {
584                Mutation::Update(UpdateMutation { merges, .. })
585                | Mutation::AddAndUpdate(_, UpdateMutation { merges, .. }) => {
586                    merges.get(&(actor_id, upstream_fragment_id))
587                }
588
589                _ => None,
590            })
591    }
592
593    /// Returns the new vnode bitmap if this barrier is to update the vnode bitmap for the actor
594    /// with `actor_id`.
595    ///
596    /// Actually, this vnode bitmap update is only useful for validating record access in
597    /// distributed executors, since the read/write pattern never crosses multiple vnodes.
598    pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
599        self.mutation
600            .as_deref()
601            .and_then(|mutation| match mutation {
602                Mutation::Update(UpdateMutation { vnode_bitmaps, .. })
603                | Mutation::AddAndUpdate(_, UpdateMutation { vnode_bitmaps, .. }) => {
604                    vnode_bitmaps.get(&actor_id).cloned()
605                }
606                _ => None,
607            })
608    }
609
610    pub fn get_curr_epoch(&self) -> Epoch {
611        Epoch(self.epoch.curr)
612    }
613
614    /// Retrieve the tracing context for the **current** epoch of this barrier.
615    pub fn tracing_context(&self) -> &TracingContext {
616        &self.tracing_context
617    }
618
619    pub fn added_subscriber_on_mv_table(
620        &self,
621        mv_table_id: TableId,
622    ) -> impl Iterator<Item = u32> + '_ {
623        if let Some(Mutation::Add(add)) | Some(Mutation::AddAndUpdate(add, _)) =
624            self.mutation.as_deref()
625        {
626            Some(add)
627        } else {
628            None
629        }
630        .into_iter()
631        .flat_map(move |add| {
632            add.subscriptions_to_add.iter().filter_map(
633                move |(upstream_mv_table_id, subscriber_id)| {
634                    if *upstream_mv_table_id == mv_table_id {
635                        Some(*subscriber_id)
636                    } else {
637                        None
638                    }
639                },
640            )
641        })
642    }
643}
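
// A hedged sketch of how executors inspect the mutation carried by their first barrier.
// `test_epoch` (from `risingwave_common::util::epoch`) is assumed to turn a small logical
// number into a valid test epoch; the dispatcher below is just a default placeholder.
#[cfg(test)]
mod barrier_mutation_example {
    use risingwave_common::util::epoch::test_epoch;

    use super::*;

    #[test]
    fn inspect_add_and_stop_mutations() {
        let actor_id: ActorId = 1;

        // A barrier that adds `actor_id` with one (placeholder) dispatcher and pauses the
        // stream on startup.
        let add_barrier =
            Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
                adds: HashMap::from([(actor_id, vec![PbDispatcher::default()])]),
                added_actors: HashSet::from([actor_id]),
                splits: HashMap::new(),
                pause: true,
                subscriptions_to_add: vec![],
                backfill_nodes_to_pause: HashSet::new(),
            }));
        assert!(add_barrier.is_newly_added(actor_id));
        assert!(!add_barrier.is_newly_added(actor_id + 1));
        assert!(add_barrier.is_pause_on_startup());
        assert!(add_barrier.has_more_downstream_fragments(actor_id));
        assert!(add_barrier.initial_split_assignment(actor_id).is_none());

        // A barrier that stops (drops) `actor_id`.
        let stop_barrier = Barrier::new_test_barrier(test_epoch(2))
            .with_mutation(Mutation::Stop(HashSet::from([actor_id])));
        assert!(stop_barrier.is_with_stop_mutation());
        assert!(stop_barrier.is_stop(actor_id));
    }
}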
644
645impl<M: PartialEq> PartialEq for BarrierInner<M> {
646    fn eq(&self, other: &Self) -> bool {
647        self.epoch == other.epoch && self.mutation == other.mutation
648    }
649}
650
651impl Mutation {
652    /// Return true if this is a [`Mutation::Stop`].
653    ///
654    /// Note that this does not mean we will stop the current actor.
655    #[cfg(test)]
656    pub fn is_stop(&self) -> bool {
657        matches!(self, Mutation::Stop(_))
658    }
659
660    fn to_protobuf(&self) -> PbMutation {
661        let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
662            actor_splits
663                .iter()
664                .map(|(&actor_id, splits)| {
665                    (
666                        actor_id,
667                        ConnectorSplits {
668                            splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
669                        },
670                    )
671                })
672                .collect::<HashMap<_, _>>()
673        };
674
675        match self {
676            Mutation::Stop(actors) => PbMutation::Stop(StopMutation {
677                actors: actors.iter().copied().collect::<Vec<_>>(),
678            }),
679            Mutation::Update(UpdateMutation {
680                dispatchers,
681                merges,
682                vnode_bitmaps,
683                dropped_actors,
684                actor_splits,
685                actor_new_dispatchers,
686            }) => PbMutation::Update(PbUpdateMutation {
687                dispatcher_update: dispatchers.values().flatten().cloned().collect(),
688                merge_update: merges.values().cloned().collect(),
689                actor_vnode_bitmap_update: vnode_bitmaps
690                    .iter()
691                    .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
692                    .collect(),
693                dropped_actors: dropped_actors.iter().cloned().collect(),
694                actor_splits: actor_splits_to_protobuf(actor_splits),
695                actor_new_dispatchers: actor_new_dispatchers
696                    .iter()
697                    .map(|(&actor_id, dispatchers)| {
698                        (
699                            actor_id,
700                            Dispatchers {
701                                dispatchers: dispatchers.clone(),
702                            },
703                        )
704                    })
705                    .collect(),
706            }),
707            Mutation::Add(AddMutation {
708                adds,
709                added_actors,
710                splits,
711                pause,
712                subscriptions_to_add,
713                backfill_nodes_to_pause,
714            }) => PbMutation::Add(PbAddMutation {
715                actor_dispatchers: adds
716                    .iter()
717                    .map(|(&actor_id, dispatchers)| {
718                        (
719                            actor_id,
720                            Dispatchers {
721                                dispatchers: dispatchers.clone(),
722                            },
723                        )
724                    })
725                    .collect(),
726                added_actors: added_actors.iter().copied().collect(),
727                actor_splits: actor_splits_to_protobuf(splits),
728                pause: *pause,
729                subscriptions_to_add: subscriptions_to_add
730                    .iter()
731                    .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
732                        subscriber_id: *subscriber_id,
733                        upstream_mv_table_id: table_id.table_id,
734                    })
735                    .collect(),
736                backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
737            }),
738            Mutation::SourceChangeSplit(changes) => PbMutation::Splits(SourceChangeSplitMutation {
739                actor_splits: changes
740                    .iter()
741                    .map(|(&actor_id, splits)| {
742                        (
743                            actor_id,
744                            ConnectorSplits {
745                                splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
746                            },
747                        )
748                    })
749                    .collect(),
750            }),
751            Mutation::Pause => PbMutation::Pause(PauseMutation {}),
752            Mutation::Resume => PbMutation::Resume(ResumeMutation {}),
753            Mutation::Throttle(changes) => PbMutation::Throttle(ThrottleMutation {
754                actor_throttle: changes
755                    .iter()
756                    .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit }))
757                    .collect(),
758            }),
759
760            Mutation::AddAndUpdate(add, update) => PbMutation::Combined(CombinedMutation {
761                mutations: vec![
762                    BarrierMutation {
763                        mutation: Some(Mutation::Add(add.clone()).to_protobuf()),
764                    },
765                    BarrierMutation {
766                        mutation: Some(Mutation::Update(update.clone()).to_protobuf()),
767                    },
768                ],
769            }),
770            Mutation::DropSubscriptions {
771                subscriptions_to_drop,
772            } => PbMutation::DropSubscriptions(DropSubscriptionsMutation {
773                info: subscriptions_to_drop
774                    .iter()
775                    .map(
776                        |(subscriber_id, upstream_mv_table_id)| SubscriptionUpstreamInfo {
777                            subscriber_id: *subscriber_id,
778                            upstream_mv_table_id: upstream_mv_table_id.table_id,
779                        },
780                    )
781                    .collect(),
782            }),
783            Mutation::ConnectorPropsChange(map) => {
784                PbMutation::ConnectorPropsChange(ConnectorPropsChangeMutation {
785                    connector_props_infos: map
786                        .iter()
787                        .map(|(actor_id, options)| {
788                            (
789                                *actor_id,
790                                ConnectorPropsInfo {
791                                    connector_props_info: options
792                                        .iter()
793                                        .map(|(k, v)| (k.clone(), v.clone()))
794                                        .collect(),
795                                },
796                            )
797                        })
798                        .collect(),
799                })
800            }
801            Mutation::StartFragmentBackfill { fragment_ids } => {
802                PbMutation::StartFragmentBackfill(StartFragmentBackfillMutation {
803                    fragment_ids: fragment_ids.iter().copied().collect(),
804                })
805            }
806        }
807    }
808
809    fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
810        let mutation = match prost {
811            PbMutation::Stop(stop) => {
812                Mutation::Stop(HashSet::from_iter(stop.actors.iter().cloned()))
813            }
814
815            PbMutation::Update(update) => Mutation::Update(UpdateMutation {
816                dispatchers: update
817                    .dispatcher_update
818                    .iter()
819                    .map(|u| (u.actor_id, u.clone()))
820                    .into_group_map(),
821                merges: update
822                    .merge_update
823                    .iter()
824                    .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
825                    .collect(),
826                vnode_bitmaps: update
827                    .actor_vnode_bitmap_update
828                    .iter()
829                    .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
830                    .collect(),
831                dropped_actors: update.dropped_actors.iter().cloned().collect(),
832                actor_splits: update
833                    .actor_splits
834                    .iter()
835                    .map(|(&actor_id, splits)| {
836                        (
837                            actor_id,
838                            splits
839                                .splits
840                                .iter()
841                                .map(|split| split.try_into().unwrap())
842                                .collect(),
843                        )
844                    })
845                    .collect(),
846                actor_new_dispatchers: update
847                    .actor_new_dispatchers
848                    .iter()
849                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
850                    .collect(),
851            }),
852
853            PbMutation::Add(add) => Mutation::Add(AddMutation {
854                adds: add
855                    .actor_dispatchers
856                    .iter()
857                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
858                    .collect(),
859                added_actors: add.added_actors.iter().copied().collect(),
860                // TODO: remove this and use `SourceChangesSplit` after we support multiple
861                // mutations.
862                splits: add
863                    .actor_splits
864                    .iter()
865                    .map(|(&actor_id, splits)| {
866                        (
867                            actor_id,
868                            splits
869                                .splits
870                                .iter()
871                                .map(|split| split.try_into().unwrap())
872                                .collect(),
873                        )
874                    })
875                    .collect(),
876                pause: add.pause,
877                subscriptions_to_add: add
878                    .subscriptions_to_add
879                    .iter()
880                    .map(
881                        |SubscriptionUpstreamInfo {
882                             subscriber_id,
883                             upstream_mv_table_id,
884                         }| {
885                            (TableId::new(*upstream_mv_table_id), *subscriber_id)
886                        },
887                    )
888                    .collect(),
889                backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
890            }),
891
892            PbMutation::Splits(s) => {
893                let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
894                    Vec::with_capacity(s.actor_splits.len());
895                for (&actor_id, splits) in &s.actor_splits {
896                    if !splits.splits.is_empty() {
897                        change_splits.push((
898                            actor_id,
899                            splits
900                                .splits
901                                .iter()
902                                .map(SplitImpl::try_from)
903                                .try_collect()?,
904                        ));
905                    }
906                }
907                Mutation::SourceChangeSplit(change_splits.into_iter().collect())
908            }
909            PbMutation::Pause(_) => Mutation::Pause,
910            PbMutation::Resume(_) => Mutation::Resume,
911            PbMutation::Throttle(changes) => Mutation::Throttle(
912                changes
913                    .actor_throttle
914                    .iter()
915                    .map(|(actor_id, limit)| (*actor_id, limit.rate_limit))
916                    .collect(),
917            ),
918            PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
919                subscriptions_to_drop: drop
920                    .info
921                    .iter()
922                    .map(|info| (info.subscriber_id, TableId::new(info.upstream_mv_table_id)))
923                    .collect(),
924            },
925            PbMutation::ConnectorPropsChange(alter_connector_props) => {
926                Mutation::ConnectorPropsChange(
927                    alter_connector_props
928                        .connector_props_infos
929                        .iter()
930                        .map(|(actor_id, options)| {
931                            (
932                                *actor_id,
933                                options
934                                    .connector_props_info
935                                    .iter()
936                                    .map(|(k, v)| (k.clone(), v.clone()))
937                                    .collect(),
938                            )
939                        })
940                        .collect(),
941                )
942            }
943            PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
944                Mutation::StartFragmentBackfill {
945                    fragment_ids: start_fragment_backfill
946                        .fragment_ids
947                        .iter()
948                        .copied()
949                        .collect(),
950                }
951            }
952            PbMutation::Combined(CombinedMutation { mutations }) => match &mutations[..] {
953                [
954                    BarrierMutation {
955                        mutation: Some(add),
956                    },
957                    BarrierMutation {
958                        mutation: Some(update),
959                    },
960                ] => {
961                    let Mutation::Add(add_mutation) = Mutation::from_protobuf(add)? else {
962                        unreachable!();
963                    };
964
965                    let Mutation::Update(update_mutation) = Mutation::from_protobuf(update)? else {
966                        unreachable!();
967                    };
968
969                    Mutation::AddAndUpdate(add_mutation, update_mutation)
970                }
971
972                _ => unreachable!(),
973            },
974        };
975        Ok(mutation)
976    }
977}
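
// A hedged sketch: `to_protobuf` / `from_protobuf` are expected to round-trip a mutation.
// Only the simplest variants are exercised here for illustration.
#[cfg(test)]
mod mutation_protobuf_example {
    use super::*;

    #[test]
    fn stop_and_throttle_round_trip() {
        let stop = Mutation::Stop(HashSet::from([1, 2, 3]));
        assert_eq!(Mutation::from_protobuf(&stop.to_protobuf()).unwrap(), stop);

        let throttle = Mutation::Throttle(HashMap::from([(1, Some(100)), (2, None)]));
        assert_eq!(
            Mutation::from_protobuf(&throttle.to_protobuf()).unwrap(),
            throttle
        );
    }
}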
978
979impl<M> BarrierInner<M> {
980    fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
981        let Self {
982            epoch,
983            mutation,
984            kind,
985            passed_actors,
986            tracing_context,
987            ..
988        } = self;
989
990        PbBarrier {
991            epoch: Some(PbEpoch {
992                curr: epoch.curr,
993                prev: epoch.prev,
994            }),
995            mutation: Some(PbBarrierMutation {
996                mutation: barrier_fn(mutation),
997            }),
998            tracing_context: tracing_context.to_protobuf(),
999            kind: *kind as _,
1000            passed_actors: passed_actors.clone(),
1001        }
1002    }
1003
1004    fn from_protobuf_inner(
1005        prost: &PbBarrier,
1006        mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
1007    ) -> StreamExecutorResult<Self> {
1008        let epoch = prost.get_epoch()?;
1009
1010        Ok(Self {
1011            kind: prost.kind(),
1012            epoch: EpochPair::new(epoch.curr, epoch.prev),
1013            mutation: mutation_from_pb(
1014                prost
1015                    .mutation
1016                    .as_ref()
1017                    .and_then(|mutation| mutation.mutation.as_ref()),
1018            )?,
1019            passed_actors: prost.get_passed_actors().clone(),
1020            tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
1021        })
1022    }
1023
1024    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
1025        BarrierInner {
1026            epoch: self.epoch,
1027            mutation: f(self.mutation),
1028            kind: self.kind,
1029            tracing_context: self.tracing_context,
1030            passed_actors: self.passed_actors,
1031        }
1032    }
1033}
1034
1035impl DispatcherBarrier {
1036    pub fn to_protobuf(&self) -> PbBarrier {
1037        self.to_protobuf_inner(|_| None)
1038    }
1039}
1040
1041impl Barrier {
1042    pub fn to_protobuf(&self) -> PbBarrier {
1043        self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
1044    }
1045
1046    pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
1047        Self::from_protobuf_inner(prost, |mutation| {
1048            mutation
1049                .map(|m| Mutation::from_protobuf(m).map(Arc::new))
1050                .transpose()
1051        })
1052    }
1053}
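
// A hedged sketch: a barrier round-trips through its protobuf form; note that barrier equality
// (the `PartialEq` impl above) only covers the epoch and the mutation. `test_epoch` from
// `risingwave_common::util::epoch` is assumed to build a valid test epoch.
#[cfg(test)]
mod barrier_protobuf_example {
    use risingwave_common::util::epoch::test_epoch;

    use super::*;

    #[test]
    fn barrier_round_trip() {
        let barrier = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Pause);
        let decoded = Barrier::from_protobuf(&barrier.to_protobuf()).unwrap();
        assert_eq!(decoded, barrier);
    }
}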
1054
1055#[derive(Debug, PartialEq, Eq, Clone)]
1056pub struct Watermark {
1057    pub col_idx: usize,
1058    pub data_type: DataType,
1059    pub val: ScalarImpl,
1060}
1061
1062impl PartialOrd for Watermark {
1063    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1064        Some(self.cmp(other))
1065    }
1066}
1067
1068impl Ord for Watermark {
1069    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1070        self.val.default_cmp(&other.val)
1071    }
1072}
1073
1074impl Watermark {
1075    pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
1076        Self {
1077            col_idx,
1078            data_type,
1079            val,
1080        }
1081    }
1082
1083    pub async fn transform_with_expr(
1084        self,
1085        expr: &NonStrictExpression<impl Expression>,
1086        new_col_idx: usize,
1087    ) -> Option<Self> {
1088        let Self { col_idx, val, .. } = self;
1089        let row = {
1090            let mut row = vec![None; col_idx + 1];
1091            row[col_idx] = Some(val);
1092            OwnedRow::new(row)
1093        };
1094        let val = expr.eval_row_infallible(&row).await?;
1095        Some(Self::new(new_col_idx, expr.inner().return_type(), val))
1096    }
1097
1098    /// Transform the watermark with the given output indices. If this watermark is not in the
1099    /// output, return `None`.
1100    pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
1101        output_indices
1102            .iter()
1103            .position(|p| *p == self.col_idx)
1104            .map(|new_col_idx| self.with_idx(new_col_idx))
1105    }
1106
1107    pub fn to_protobuf(&self) -> PbWatermark {
1108        PbWatermark {
1109            column: Some(PbInputRef {
1110                index: self.col_idx as _,
1111                r#type: Some(self.data_type.to_protobuf()),
1112            }),
1113            val: Some(&self.val).to_protobuf().into(),
1114        }
1115    }
1116
1117    pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
1118        let col_ref = prost.get_column()?;
1119        let data_type = DataType::from(col_ref.get_type()?);
1120        let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
1121            .expect("watermark value cannot be null");
1122        Ok(Self::new(col_ref.get_index() as _, data_type, val))
1123    }
1124
1125    pub fn with_idx(self, idx: usize) -> Self {
1126        Self::new(idx, self.data_type, self.val)
1127    }
1128}
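
// A hedged sketch of how a watermark is remapped when an executor reorders or prunes its
// output columns.
#[cfg(test)]
mod watermark_example {
    use super::*;

    #[test]
    fn transform_with_output_indices() {
        let watermark = Watermark::new(2, DataType::Int64, ScalarImpl::Int64(100));

        // Input column 2 becomes the second output column, so the watermark moves to index 1.
        assert_eq!(
            watermark.clone().transform_with_indices(&[0, 2]),
            Some(watermark.clone().with_idx(1))
        );
        // Input column 2 is not part of the output, so the watermark is dropped.
        assert_eq!(watermark.transform_with_indices(&[0, 1]), None);
    }
}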
1129
1130#[derive(Debug, EnumAsInner, PartialEq, Clone)]
1131pub enum MessageInner<M> {
1132    Chunk(StreamChunk),
1133    Barrier(BarrierInner<M>),
1134    Watermark(Watermark),
1135}
1136
1137impl<M> MessageInner<M> {
1138    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
1139        match self {
1140            MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
1141            MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
1142            MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
1143        }
1144    }
1145}
1146
1147pub type Message = MessageInner<BarrierMutationType>;
1148pub type DispatcherMessage = MessageInner<()>;
1149
1150/// `MessageBatchInner` is used exclusively by `Dispatcher` and the `Merger`/`Receiver` for exchanging messages between them.
1151/// It shares the same message type as the fundamental `MessageInner`, but batches multiple barriers into a single message.
1152#[derive(Debug, EnumAsInner, PartialEq, Clone)]
1153pub enum MessageBatchInner<M> {
1154    Chunk(StreamChunk),
1155    BarrierBatch(Vec<BarrierInner<M>>),
1156    Watermark(Watermark),
1157}
1158pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
1159pub type DispatcherBarriers = Vec<DispatcherBarrier>;
1160pub type DispatcherMessageBatch = MessageBatchInner<()>;
1161
1162impl From<DispatcherMessage> for DispatcherMessageBatch {
1163    fn from(m: DispatcherMessage) -> Self {
1164        match m {
1165            DispatcherMessage::Chunk(c) => Self::Chunk(c),
1166            DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
1167            DispatcherMessage::Watermark(w) => Self::Watermark(w),
1168        }
1169    }
1170}
1171
1172impl From<StreamChunk> for Message {
1173    fn from(chunk: StreamChunk) -> Self {
1174        Message::Chunk(chunk)
1175    }
1176}
1177
1178impl<'a> TryFrom<&'a Message> for &'a Barrier {
1179    type Error = ();
1180
1181    fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
1182        match m {
1183            Message::Chunk(_) => Err(()),
1184            Message::Barrier(b) => Ok(b),
1185            Message::Watermark(_) => Err(()),
1186        }
1187    }
1188}
1189
1190impl Message {
1191    /// Return true if the message is a stop barrier, meaning the stream
1192    /// will not continue, false otherwise.
1193    ///
1194    /// Note that this does not mean we will stop the current actor.
1195    #[cfg(test)]
1196    pub fn is_stop(&self) -> bool {
1197        matches!(
1198            self,
1199            Message::Barrier(Barrier {
1200                mutation,
1201                ..
1202            }) if mutation.as_ref().unwrap().is_stop()
1203        )
1204    }
1205}
1206
1207impl DispatcherMessageBatch {
1208    pub fn to_protobuf(&self) -> PbStreamMessageBatch {
1209        let prost = match self {
1210            Self::Chunk(stream_chunk) => {
1211                let prost_stream_chunk = stream_chunk.to_protobuf();
1212                StreamMessageBatch::StreamChunk(prost_stream_chunk)
1213            }
1214            Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
1215                barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
1216            }),
1217            Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
1218        };
1219        PbStreamMessageBatch {
1220            stream_message_batch: Some(prost),
1221        }
1222    }
1223
1224    pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
1225        let res = match prost.get_stream_message_batch()? {
1226            StreamMessageBatch::StreamChunk(chunk) => {
1227                Self::Chunk(StreamChunk::from_protobuf(chunk)?)
1228            }
1229            StreamMessageBatch::BarrierBatch(barrier_batch) => {
1230                let barriers = barrier_batch
1231                    .barriers
1232                    .iter()
1233                    .map(|barrier| {
1234                        DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
1235                            if mutation.is_some() {
1236                                if cfg!(debug_assertions) {
1237                                    panic!("should not receive message of barrier with mutation");
1238                                } else {
1239                                    warn!(?barrier, "receive message of barrier with mutation");
1240                                }
1241                            }
1242                            Ok(())
1243                        })
1244                    })
1245                    .try_collect()?;
1246                Self::BarrierBatch(barriers)
1247            }
1248            StreamMessageBatch::Watermark(watermark) => {
1249                Self::Watermark(Watermark::from_protobuf(watermark)?)
1250            }
1251        };
1252        Ok(res)
1253    }
1254
1255    pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
1256        ::prost::Message::encoded_len(msg)
1257    }
1258}
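
// A hedged sketch: a single dispatcher barrier becomes a one-element `BarrierBatch`, and the
// batch survives a protobuf round trip. `test_epoch` from `risingwave_common::util::epoch` is
// assumed to build a valid test epoch.
#[cfg(test)]
mod message_batch_example {
    use risingwave_common::util::epoch::test_epoch;

    use super::*;

    #[test]
    fn barrier_batch_round_trip() {
        let barrier = DispatcherBarrier::new_test_barrier(test_epoch(1));
        let batch: DispatcherMessageBatch = DispatcherMessage::Barrier(barrier).into();
        assert!(matches!(
            &batch,
            DispatcherMessageBatch::BarrierBatch(barriers) if barriers.len() == 1
        ));

        let decoded = DispatcherMessageBatch::from_protobuf(&batch.to_protobuf()).unwrap();
        assert_eq!(decoded, batch);
    }
}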
1259
1260pub type PkIndices = Vec<usize>;
1261pub type PkIndicesRef<'a> = &'a [usize];
1262pub type PkDataTypes = SmallVec<[DataType; 1]>;
1263
1264/// Expect the first message of the given `stream` as a barrier.
1265pub async fn expect_first_barrier<M: Debug>(
1266    stream: &mut (impl MessageStreamInner<M> + Unpin),
1267) -> StreamExecutorResult<BarrierInner<M>> {
1268    let message = stream
1269        .next()
1270        .instrument_await("expect_first_barrier")
1271        .await
1272        .context("failed to extract the first message: stream closed unexpectedly")??;
1273    let barrier = message
1274        .into_barrier()
1275        .expect("the first message must be a barrier");
1276    // TODO: Is this check correct?
1277    assert!(matches!(
1278        barrier.kind,
1279        BarrierKind::Checkpoint | BarrierKind::Initial
1280    ));
1281    Ok(barrier)
1282}
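
// A hedged sketch of how an executor consumes its first barrier. It assumes `test_epoch` from
// `risingwave_common::util::epoch` and the `tokio` test runtime used elsewhere in this crate.
#[cfg(test)]
mod expect_first_barrier_example {
    use futures::StreamExt;
    use risingwave_common::util::epoch::test_epoch;

    use super::*;

    #[tokio::test]
    async fn first_message_must_be_a_barrier() {
        let barrier = Barrier::new_test_barrier(test_epoch(1));
        let mut stream = futures::stream::iter([Ok::<_, StreamExecutorError>(Message::Barrier(
            barrier.clone(),
        ))])
        .boxed();

        let first = expect_first_barrier(&mut stream).await.unwrap();
        assert_eq!(first.epoch, barrier.epoch);
    }
}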
1283
1284/// Expect the first message of the given `stream` as a barrier.
1285pub async fn expect_first_barrier_from_aligned_stream(
1286    stream: &mut (impl AlignedMessageStream + Unpin),
1287) -> StreamExecutorResult<Barrier> {
1288    let message = stream
1289        .next()
1290        .instrument_await("expect_first_barrier")
1291        .await
1292        .context("failed to extract the first message: stream closed unexpectedly")??;
1293    let barrier = message
1294        .into_barrier()
1295        .expect("the first message must be a barrier");
1296    Ok(barrier)
1297}
1298
1299/// `StreamConsumer` is the last step in an actor.
1300pub trait StreamConsumer: Send + 'static {
1301    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;
1302
1303    fn execute(self: Box<Self>) -> Self::BarrierStream;
1304}