risingwave_stream/executor/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prelude;
16
17use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
18use std::fmt::Debug;
19use std::future::pending;
20use std::hash::Hash;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::Poll;
24use std::vec;
25
26use await_tree::InstrumentAwait;
27use enum_as_inner::EnumAsInner;
28use futures::future::try_join_all;
29use futures::stream::{BoxStream, FusedStream, FuturesUnordered, StreamFuture};
30use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
31use itertools::Itertools;
32use prometheus::Histogram;
33use prometheus::core::{AtomicU64, GenericCounter};
34use risingwave_common::array::StreamChunk;
35use risingwave_common::bitmap::Bitmap;
36use risingwave_common::catalog::{Field, Schema, TableId};
37use risingwave_common::metrics::LabelGuardedMetric;
38use risingwave_common::row::OwnedRow;
39use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
40use risingwave_common::util::epoch::{Epoch, EpochPair};
41use risingwave_common::util::tracing::TracingContext;
42use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
43use risingwave_connector::source::SplitImpl;
44use risingwave_expr::expr::{Expression, NonStrictExpression};
45use risingwave_pb::data::PbEpoch;
46use risingwave_pb::expr::PbInputRef;
47use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
48use risingwave_pb::stream_plan::barrier::BarrierKind;
49use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
50use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
51use risingwave_pb::stream_plan::stream_node::PbStreamKind;
52use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
53use risingwave_pb::stream_plan::{
54    BarrierMutation, CombinedMutation, ConnectorPropsChangeMutation, Dispatchers,
55    DropSubscriptionsMutation, PauseMutation, PbAddMutation, PbBarrier, PbBarrierMutation,
56    PbDispatcher, PbSinkAddColumns, PbStopMutation, PbStreamMessageBatch, PbUpdateMutation,
57    PbWatermark, ResumeMutation, SourceChangeSplitMutation, StartFragmentBackfillMutation,
58    SubscriptionUpstreamInfo, ThrottleMutation,
59};
60use smallvec::SmallVec;
61use tokio::sync::mpsc;
62use tokio::time::Instant;
63
64use crate::error::StreamResult;
65use crate::executor::exchange::input::{
66    BoxedActorInput, BoxedInput, apply_dispatcher_barrier, assert_equal_dispatcher_barrier,
67    new_input,
68};
69use crate::executor::prelude::StreamingMetrics;
70use crate::executor::watermark::BufferedWatermarks;
71use crate::task::{ActorId, FragmentId, LocalBarrierManager};
72
73mod actor;
74mod barrier_align;
75pub mod exchange;
76pub mod monitor;
77
78pub mod aggregate;
79pub mod asof_join;
80mod backfill;
81mod barrier_recv;
82mod batch_query;
83mod chain;
84mod changelog;
85mod dedup;
86mod dispatch;
87pub mod dml;
88mod dynamic_filter;
89pub mod eowc;
90pub mod error;
91mod expand;
92mod filter;
93pub mod hash_join;
94mod hop_window;
95mod join;
96mod lookup;
97mod lookup_union;
98mod merge;
99mod mview;
100mod nested_loop_temporal_join;
101mod no_op;
102mod now;
103mod over_window;
104pub mod project;
105mod rearranged_chain;
106mod receiver;
107pub mod row_id_gen;
108mod sink;
109pub mod source;
110mod stream_reader;
111pub mod subtask;
112mod temporal_join;
113mod top_n;
114mod troublemaker;
115mod union;
116mod upstream_sink_union;
117mod values;
118mod watermark;
119mod watermark_filter;
120mod wrapper;
121
122mod approx_percentile;
123
124mod row_merge;
125
126#[cfg(test)]
127mod integration_tests;
128mod sync_kv_log_store;
129#[cfg(any(test, feature = "test"))]
130pub mod test_utils;
131mod utils;
132mod vector_index;
133
134pub use actor::{Actor, ActorContext, ActorContextRef};
135use anyhow::Context;
136pub use approx_percentile::global::GlobalApproxPercentileExecutor;
137pub use approx_percentile::local::LocalApproxPercentileExecutor;
138pub use backfill::arrangement_backfill::*;
139pub use backfill::cdc::{
140    CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor,
141};
142pub use backfill::no_shuffle_backfill::*;
143pub use backfill::snapshot_backfill::*;
144pub use barrier_recv::BarrierRecvExecutor;
145pub use batch_query::BatchQueryExecutor;
146pub use chain::ChainExecutor;
147pub use changelog::ChangeLogExecutor;
148pub use dedup::AppendOnlyDedupExecutor;
149pub use dispatch::{DispatchExecutor, DispatcherImpl};
150pub use dynamic_filter::DynamicFilterExecutor;
151pub use error::{StreamExecutorError, StreamExecutorResult};
152pub use expand::ExpandExecutor;
153pub use filter::{FilterExecutor, UpsertFilterExecutor};
154pub use hash_join::*;
155pub use hop_window::HopWindowExecutor;
156pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
157pub use join::{AsOfDesc, AsOfJoinType, JoinType};
158pub use lookup::*;
159pub use lookup_union::LookupUnionExecutor;
160pub use merge::MergeExecutor;
161pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
162pub use mview::*;
163pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
164pub use no_op::NoOpExecutor;
165pub use now::*;
166pub use over_window::*;
167pub use rearranged_chain::RearrangedChainExecutor;
168pub use receiver::ReceiverExecutor;
169use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
170pub use row_merge::RowMergeExecutor;
171pub use sink::SinkExecutor;
172pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
173pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
174pub use temporal_join::TemporalJoinExecutor;
175pub use top_n::{
176    AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
177};
178pub use troublemaker::TroublemakerExecutor;
179pub use union::UnionExecutor;
180pub use upstream_sink_union::{UpstreamFragmentInfo, UpstreamSinkUnionExecutor};
181pub use utils::DummyExecutor;
182pub use values::ValuesExecutor;
183pub use vector_index::VectorIndexWriteExecutor;
184pub use watermark_filter::WatermarkFilterExecutor;
185pub use wrapper::WrapperExecutor;
186
187use self::barrier_align::AlignedMessageStream;
188
/// Item type of a message stream whose barriers carry mutation type `M`: a
/// `StreamExecutorResult` wrapping a `MessageInner<M>`.
pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
/// [`MessageStreamItemInner`] specialized to [`BarrierMutationType`].
pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
/// Fallible item carrying a `DispatcherMessage`.
pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
/// A `'static` [`BoxStream`] yielding [`MessageStreamItem`]s.
pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;
193
194pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
195use risingwave_connector::sink::catalog::SinkId;
196use risingwave_connector::source::cdc::{
197    CdcTableSnapshotSplitAssignmentWithGeneration,
198    build_actor_cdc_table_snapshot_splits_with_generation,
199    build_pb_actor_cdc_table_snapshot_splits_with_generation,
200};
201use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
202use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
203
/// Trait alias: any `Send` stream whose items are [`MessageStreamItemInner<M>`].
pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
/// Trait alias: any `Send` stream whose items are [`MessageStreamItem`].
pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
/// Trait alias: any `Send` stream whose items are [`DispatcherMessageStreamItem`].
pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;
207
/// Static information of an executor.
///
/// All fields describe the executor itself or its OUTPUT; an [`Executor`] pairs this
/// information with the executable object.
#[derive(Debug, Default, Clone)]
pub struct ExecutorInfo {
    /// The schema of the OUTPUT of the executor.
    pub schema: Schema,

    /// The primary key indices of the OUTPUT of the executor.
    ///
    /// The schema is used by both OLAP and streaming, therefore
    /// pk indices are maintained independently of it.
    pub pk_indices: PkIndices,

    /// The stream kind of the OUTPUT of the executor.
    pub stream_kind: PbStreamKind,

    /// Identity of the executor. This is the text printed by `Executor`'s `Debug` impl.
    pub identity: String,

    /// The executor id of the executor.
    pub id: u64,
}
228
229impl ExecutorInfo {
230    pub fn for_test(schema: Schema, pk_indices: PkIndices, identity: String, id: u64) -> Self {
231        Self {
232            schema,
233            pk_indices,
234            stream_kind: PbStreamKind::Retract, // dummy value for test
235            identity,
236            id,
237        }
238    }
239}
240
/// [`Execute`] describes the methods an executor should implement to handle control messages.
///
/// Implementors must be `Send + 'static` so that they can be stored as `Box<dyn Execute>`.
pub trait Execute: Send + 'static {
    /// Consumes the boxed executor and returns the stream of messages it produces.
    fn execute(self: Box<Self>) -> BoxedMessageStream;

    /// Like [`Execute::execute`], but additionally receives an epoch.
    ///
    /// The default implementation ignores `_epoch` and simply delegates to
    /// [`Execute::execute`]; implementors may override it.
    fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
        self.execute()
    }

    /// Moves `self` into a `Box<dyn Execute>` trait object.
    fn boxed(self) -> Box<dyn Execute>
    where
        Self: Sized + Send + 'static,
    {
        Box::new(self)
    }
}
256
/// [`Executor`] combines the static information ([`ExecutorInfo`]) and the executable object to
/// handle messages ([`Execute`]).
pub struct Executor {
    /// Static information (schema, pk indices, stream kind, identity, id) of the executor.
    info: ExecutorInfo,
    /// The executable object; consumed when the executor is turned into a message stream.
    execute: Box<dyn Execute>,
}
263
264impl Executor {
265    pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
266        Self { info, execute }
267    }
268
269    pub fn info(&self) -> &ExecutorInfo {
270        &self.info
271    }
272
273    pub fn schema(&self) -> &Schema {
274        &self.info.schema
275    }
276
277    pub fn pk_indices(&self) -> PkIndicesRef<'_> {
278        &self.info.pk_indices
279    }
280
281    pub fn stream_kind(&self) -> PbStreamKind {
282        self.info.stream_kind
283    }
284
285    pub fn identity(&self) -> &str {
286        &self.info.identity
287    }
288
289    pub fn execute(self) -> BoxedMessageStream {
290        self.execute.execute()
291    }
292
293    pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
294        self.execute.execute_with_epoch(epoch)
295    }
296}
297
298impl std::fmt::Debug for Executor {
299    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
300        f.write_str(self.identity())
301    }
302}
303
304impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
305    fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
306        Self::new(info, execute)
307    }
308}
309
310impl<E> From<(ExecutorInfo, E)> for Executor
311where
312    E: Execute,
313{
314    fn from((info, execute): (ExecutorInfo, E)) -> Self {
315        Self::new(info, execute.boxed())
316    }
317}
318
/// Epoch value `0`, named as the invalid epoch.
pub const INVALID_EPOCH: u64 = 0;

/// Id of an upstream fragment; an alias of [`FragmentId`] used to make map keys self-describing.
type UpstreamFragmentId = FragmentId;
/// Source splits assigned to each actor, keyed by actor id.
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;
323
/// Payload of [`Mutation::Update`] (and the second half of [`Mutation::AddAndUpdate`]).
///
/// `Default` is only derived for tests.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct UpdateMutation {
    /// Dispatcher updates, keyed by actor id.
    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
    /// Merge updates, keyed by `(actor_id, upstream_fragment_id)`.
    /// Looked up by `Barrier::as_update_merge`.
    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
    /// New vnode bitmap per actor. Looked up by `Barrier::as_update_vnode_bitmap`.
    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
    /// Actors to be stopped (dropped) by this mutation. Exposed by `Barrier::all_stop_actors`.
    pub dropped_actors: HashSet<ActorId>,
    /// Split assignments per actor. Read by `Barrier::initial_split_assignment`.
    pub actor_splits: SplitAssignments,
    /// New dispatchers, keyed by actor id.
    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
    /// CDC table snapshot split assignment, together with its generation.
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// Fields to add, keyed by sink id. Looked up by `Barrier::as_sink_add_columns`.
    pub sink_add_columns: HashMap<SinkId, Vec<Field>>,
}
336
/// Payload of [`Mutation::Add`] (and the first half of [`Mutation::AddAndUpdate`]).
///
/// `Default` is only derived for tests.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct AddMutation {
    /// Dispatchers to add, keyed by actor id. `Barrier::has_more_downstream_fragments` checks
    /// whether an upstream actor id is present as a key here.
    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
    /// Actors newly added by this mutation. Checked by `Barrier::is_newly_added`.
    pub added_actors: HashSet<ActorId>,
    // TODO: remove this and use `SourceChangesSplit` after we support multiple mutations.
    /// Split assignments per actor. Read by `Barrier::initial_split_assignment`.
    pub splits: SplitAssignments,
    /// Whether executors should pause their data stream on startup
    /// (see `Barrier::is_pause_on_startup`).
    pub pause: bool,
    /// (`upstream_mv_table_id`,  `subscriber_id`)
    pub subscriptions_to_add: Vec<(TableId, u32)>,
    /// Fragments whose backfill should be paused on startup
    /// (see `Barrier::is_backfill_pause_on_startup`).
    pub backfill_nodes_to_pause: HashSet<FragmentId>,
    /// CDC table snapshot split assignment, together with its generation.
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// New upstream sinks, keyed by fragment id. Looked up by `Barrier::as_new_upstream_sink`
    /// with the downstream fragment id.
    pub new_upstream_sinks: HashMap<FragmentId, PbNewUpstreamSink>,
}
352
/// Payload of [`Mutation::Stop`].
///
/// `Default` is only derived for tests.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct StopMutation {
    /// Actors to be stopped (dropped). Exposed by `Barrier::all_stop_actors`.
    pub dropped_actors: HashSet<ActorId>,
    /// Sink fragments that are dropped. Exposed by `Barrier::as_dropped_upstream_sinks`.
    pub dropped_sink_fragments: HashSet<FragmentId>,
}
359
/// See [`PbMutation`] for the semantics of each mutation.
#[derive(Debug, Clone, PartialEq)]
pub enum Mutation {
    /// Stop (drop) a set of actors and sink fragments.
    Stop(StopMutation),
    Update(UpdateMutation),
    Add(AddMutation),
    /// New split assignments per actor.
    SourceChangeSplit(SplitAssignments),
    Pause,
    Resume,
    /// Keyed by actor id. NOTE(review): the `Option<u32>` is presumably the new rate limit,
    /// with `None` meaning "no limit" — confirm against `PbMutation`.
    Throttle(HashMap<ActorId, Option<u32>>),
    /// An [`AddMutation`] and an [`UpdateMutation`] carried by the same barrier.
    AddAndUpdate(AddMutation, UpdateMutation),
    /// NOTE(review): presumably connector/object id -> changed properties — confirm against
    /// `PbMutation`.
    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
    DropSubscriptions {
        /// `subscriber` -> `upstream_mv_table_id`
        subscriptions_to_drop: Vec<(u32, TableId)>,
    },
    StartFragmentBackfill {
        /// Fragments that should start backfill
        /// (see `Barrier::should_start_fragment_backfill`).
        fragment_ids: HashSet<FragmentId>,
    },
    RefreshStart {
        table_id: TableId,
        associated_source_id: TableId,
    },
    /// This mutation is generated by the source executor
    LoadFinish {
        associated_source_id: TableId,
    },
}
388
/// A barrier. The generic type `M` is the mutation type of the barrier.
///
/// For barriers in the dispatcher, `M` is `()`, which means the mutation is erased.
/// For barriers flowing within the streaming actor, `M` is the normal `BarrierMutationType`.
#[derive(Debug, Clone)]
pub struct BarrierInner<M> {
    /// The epoch pair of this barrier.
    pub epoch: EpochPair,
    /// The mutation carried by this barrier, of type `M`.
    pub mutation: M,
    /// The kind of this barrier, e.g. [`BarrierKind::Checkpoint`].
    pub kind: BarrierKind,

    /// Tracing context for the **current** epoch of this barrier.
    pub tracing_context: TracingContext,

    /// The actors that this barrier has passed locally. Used for debugging only.
    pub passed_actors: Vec<ActorId>,
}
405
/// Mutation type of barriers flowing within the streaming actor: an optional, shared
/// ([`Arc`]) [`Mutation`].
pub type BarrierMutationType = Option<Arc<Mutation>>;
/// A barrier carrying a [`BarrierMutationType`].
pub type Barrier = BarrierInner<BarrierMutationType>;
/// A barrier whose mutation has been erased (`()`); see [`Barrier::into_dispatcher`].
pub type DispatcherBarrier = BarrierInner<()>;
409
410impl<M: Default> BarrierInner<M> {
411    /// Create a plain barrier.
412    pub fn new_test_barrier(epoch: u64) -> Self {
413        Self {
414            epoch: EpochPair::new_test_epoch(epoch),
415            kind: BarrierKind::Checkpoint,
416            tracing_context: TracingContext::none(),
417            mutation: Default::default(),
418            passed_actors: Default::default(),
419        }
420    }
421
422    pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
423        Self {
424            epoch: EpochPair::new(epoch, prev_epoch),
425            kind: BarrierKind::Checkpoint,
426            tracing_context: TracingContext::none(),
427            mutation: Default::default(),
428            passed_actors: Default::default(),
429        }
430    }
431}
432
433impl Barrier {
434    pub fn into_dispatcher(self) -> DispatcherBarrier {
435        DispatcherBarrier {
436            epoch: self.epoch,
437            mutation: (),
438            kind: self.kind,
439            tracing_context: self.tracing_context,
440            passed_actors: self.passed_actors,
441        }
442    }
443
444    #[must_use]
445    pub fn with_mutation(self, mutation: Mutation) -> Self {
446        Self {
447            mutation: Some(Arc::new(mutation)),
448            ..self
449        }
450    }
451
452    #[must_use]
453    pub fn with_stop(self) -> Self {
454        self.with_mutation(Mutation::Stop(StopMutation {
455            dropped_actors: Default::default(),
456            dropped_sink_fragments: Default::default(),
457        }))
458    }
459
460    /// Whether this barrier carries stop mutation.
461    pub fn is_with_stop_mutation(&self) -> bool {
462        matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
463    }
464
465    /// Whether this barrier is to stop the actor with `actor_id`.
466    pub fn is_stop(&self, actor_id: ActorId) -> bool {
467        self.all_stop_actors()
468            .is_some_and(|actors| actors.contains(&actor_id))
469    }
470
471    pub fn is_checkpoint(&self) -> bool {
472        self.kind == BarrierKind::Checkpoint
473    }
474
475    /// Get the initial split assignments for the actor with `actor_id`.
476    ///
477    /// This should only be called on the initial barrier received by the executor. It must be
478    ///
479    /// - `Add` mutation when it's a new streaming job, or recovery.
480    /// - `Update` mutation when it's created for scaling.
481    /// - `AddAndUpdate` mutation when it's created for sink-into-table.
482    ///
483    /// Note that `SourceChangeSplit` is **not** included, because it's only used for changing splits
484    /// of existing executors.
485    pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
486        match self.mutation.as_deref()? {
487            Mutation::Update(UpdateMutation { actor_splits, .. })
488            | Mutation::Add(AddMutation {
489                splits: actor_splits,
490                ..
491            }) => actor_splits.get(&actor_id),
492
493            Mutation::AddAndUpdate(
494                AddMutation {
495                    splits: add_actor_splits,
496                    ..
497                },
498                UpdateMutation {
499                    actor_splits: update_actor_splits,
500                    ..
501                },
502            ) => add_actor_splits
503                .get(&actor_id)
504                // `Add` and `Update` should apply to different fragments, so we don't need to merge them.
505                .or_else(|| update_actor_splits.get(&actor_id)),
506
507            _ => {
508                if cfg!(debug_assertions) {
509                    panic!(
510                        "the initial mutation of the barrier should not be {:?}",
511                        self.mutation
512                    );
513                }
514                None
515            }
516        }
517        .map(|s| s.as_slice())
518    }
519
520    /// Get all actors that to be stopped (dropped) by this barrier.
521    pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
522        match self.mutation.as_deref() {
523            Some(Mutation::Stop(StopMutation { dropped_actors, .. })) => Some(dropped_actors),
524            Some(Mutation::Update(UpdateMutation { dropped_actors, .. }))
525            | Some(Mutation::AddAndUpdate(_, UpdateMutation { dropped_actors, .. })) => {
526                Some(dropped_actors)
527            }
528            _ => None,
529        }
530    }
531
532    /// Whether this barrier is to newly add the actor with `actor_id`. This is used for `Chain` and
533    /// `Values` to decide whether to output the existing (historical) data.
534    ///
535    /// By "newly", we mean the actor belongs to a subgraph of a new streaming job. That is, actors
536    /// added for scaling are not included.
537    pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
538        match self.mutation.as_deref() {
539            Some(Mutation::Add(AddMutation { added_actors, .. }))
540            | Some(Mutation::AddAndUpdate(AddMutation { added_actors, .. }, _)) => {
541                added_actors.contains(&actor_id)
542            }
543            _ => false,
544        }
545    }
546
547    pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
548        if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
549            fragment_ids.contains(&fragment_id)
550        } else {
551            false
552        }
553    }
554
555    /// Whether this barrier adds new downstream fragment for the actor with `upstream_actor_id`.
556    ///
557    /// # Use case
558    /// Some optimizations are applied when an actor doesn't have any downstreams ("standalone" actors).
559    /// * Pause a standalone shared `SourceExecutor`.
560    /// * Disable a standalone `MaterializeExecutor`'s conflict check.
561    ///
562    /// This is implemented by checking `actor_context.initial_dispatch_num` on startup, and
563    /// check `has_more_downstream_fragments` on barrier to see whether the optimization
564    /// needs to be turned off.
565    ///
566    /// ## Some special cases not included
567    ///
568    /// Note that this is not `has_new_downstream_actor/fragment`. For our use case, we only
569    /// care about **number of downstream fragments** (more precisely, existence).
570    /// - When scaling, the number of downstream actors is changed, and they are "new", but downstream fragments is not changed.
571    /// - When `ALTER TABLE sink_into_table`, the fragment is replaced with a "new" one, but the number is not changed.
572    pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
573        let Some(mutation) = self.mutation.as_deref() else {
574            return false;
575        };
576        match mutation {
577            // Add is for mv, index and sink creation.
578            Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
579            // AddAndUpdate is for sink-into-table.
580            Mutation::AddAndUpdate(
581                AddMutation { adds, .. },
582                UpdateMutation {
583                    dispatchers,
584                    actor_new_dispatchers,
585                    ..
586                },
587            ) => {
588                adds.get(&upstream_actor_id).is_some()
589                    || actor_new_dispatchers.get(&upstream_actor_id).is_some()
590                    || dispatchers.get(&upstream_actor_id).is_some()
591            }
592            Mutation::Update(_)
593            | Mutation::Stop(_)
594            | Mutation::Pause
595            | Mutation::Resume
596            | Mutation::SourceChangeSplit(_)
597            | Mutation::Throttle(_)
598            | Mutation::DropSubscriptions { .. }
599            | Mutation::ConnectorPropsChange(_)
600            | Mutation::StartFragmentBackfill { .. }
601            | Mutation::RefreshStart { .. }
602            | Mutation::LoadFinish { .. } => false,
603        }
604    }
605
606    /// Whether this barrier requires the executor to pause its data stream on startup.
607    pub fn is_pause_on_startup(&self) -> bool {
608        match self.mutation.as_deref() {
609            Some(Mutation::Add(AddMutation { pause, .. }))
610            | Some(Mutation::AddAndUpdate(AddMutation { pause, .. }, _)) => *pause,
611            _ => false,
612        }
613    }
614
615    pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
616        match self.mutation.as_deref() {
617            Some(Mutation::Add(AddMutation {
618                backfill_nodes_to_pause,
619                ..
620            }))
621            | Some(Mutation::AddAndUpdate(
622                AddMutation {
623                    backfill_nodes_to_pause,
624                    ..
625                },
626                _,
627            )) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
628            _ => {
629                tracing::warn!("expected an AddMutation on Startup, instead got {:?}", self);
630                true
631            }
632        }
633    }
634
635    /// Whether this barrier is for resume.
636    pub fn is_resume(&self) -> bool {
637        matches!(self.mutation.as_deref(), Some(Mutation::Resume))
638    }
639
640    /// Returns the [`MergeUpdate`] if this barrier is to update the merge executors for the actor
641    /// with `actor_id`.
642    pub fn as_update_merge(
643        &self,
644        actor_id: ActorId,
645        upstream_fragment_id: UpstreamFragmentId,
646    ) -> Option<&MergeUpdate> {
647        self.mutation
648            .as_deref()
649            .and_then(|mutation| match mutation {
650                Mutation::Update(UpdateMutation { merges, .. })
651                | Mutation::AddAndUpdate(_, UpdateMutation { merges, .. }) => {
652                    merges.get(&(actor_id, upstream_fragment_id))
653                }
654                _ => None,
655            })
656    }
657
658    /// Returns the new upstream sink information if this barrier is to add a new upstream sink for
659    /// the specified downstream fragment.
660    pub fn as_new_upstream_sink(&self, fragment_id: FragmentId) -> Option<&PbNewUpstreamSink> {
661        self.mutation
662            .as_deref()
663            .and_then(|mutation| match mutation {
664                Mutation::Add(AddMutation {
665                    new_upstream_sinks, ..
666                }) => new_upstream_sinks.get(&fragment_id),
667                _ => None,
668            })
669    }
670
671    /// Returns the dropped upstream sink-fragment if this barrier is to drop any sink.
672    pub fn as_dropped_upstream_sinks(&self) -> Option<&HashSet<FragmentId>> {
673        self.mutation
674            .as_deref()
675            .and_then(|mutation| match mutation {
676                Mutation::Stop(StopMutation {
677                    dropped_sink_fragments,
678                    ..
679                }) => Some(dropped_sink_fragments),
680                _ => None,
681            })
682    }
683
684    /// Returns the new vnode bitmap if this barrier is to update the vnode bitmap for the actor
685    /// with `actor_id`.
686    ///
687    /// Actually, this vnode bitmap update is only useful for the record accessing validation for
688    /// distributed executors, since the read/write pattern will never be across multiple vnodes.
689    pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
690        self.mutation
691            .as_deref()
692            .and_then(|mutation| match mutation {
693                Mutation::Update(UpdateMutation { vnode_bitmaps, .. })
694                | Mutation::AddAndUpdate(_, UpdateMutation { vnode_bitmaps, .. }) => {
695                    vnode_bitmaps.get(&actor_id).cloned()
696                }
697                _ => None,
698            })
699    }
700
701    pub fn as_sink_add_columns(&self, sink_id: SinkId) -> Option<Vec<Field>> {
702        self.mutation
703            .as_deref()
704            .and_then(|mutation| match mutation {
705                Mutation::Update(UpdateMutation {
706                    sink_add_columns, ..
707                })
708                | Mutation::AddAndUpdate(
709                    _,
710                    UpdateMutation {
711                        sink_add_columns, ..
712                    },
713                ) => sink_add_columns.get(&sink_id).cloned(),
714                _ => None,
715            })
716    }
717
718    pub fn get_curr_epoch(&self) -> Epoch {
719        Epoch(self.epoch.curr)
720    }
721
722    /// Retrieve the tracing context for the **current** epoch of this barrier.
723    pub fn tracing_context(&self) -> &TracingContext {
724        &self.tracing_context
725    }
726
727    pub fn added_subscriber_on_mv_table(
728        &self,
729        mv_table_id: TableId,
730    ) -> impl Iterator<Item = u32> + '_ {
731        if let Some(Mutation::Add(add)) | Some(Mutation::AddAndUpdate(add, _)) =
732            self.mutation.as_deref()
733        {
734            Some(add)
735        } else {
736            None
737        }
738        .into_iter()
739        .flat_map(move |add| {
740            add.subscriptions_to_add.iter().filter_map(
741                move |(upstream_mv_table_id, subscriber_id)| {
742                    if *upstream_mv_table_id == mv_table_id {
743                        Some(*subscriber_id)
744                    } else {
745                        None
746                    }
747                },
748            )
749        })
750    }
751}
752
753impl<M: PartialEq> PartialEq for BarrierInner<M> {
754    fn eq(&self, other: &Self) -> bool {
755        self.epoch == other.epoch && self.mutation == other.mutation
756    }
757}
758
759impl Mutation {
760    /// Return true if the mutation is stop.
761    ///
762    /// Note that this does not mean we will stop the current actor.
763    #[cfg(test)]
764    pub fn is_stop(&self) -> bool {
765        matches!(self, Mutation::Stop(_))
766    }
767
768    fn to_protobuf(&self) -> PbMutation {
769        let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
770            actor_splits
771                .iter()
772                .map(|(&actor_id, splits)| {
773                    (
774                        actor_id,
775                        ConnectorSplits {
776                            splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
777                        },
778                    )
779                })
780                .collect::<HashMap<_, _>>()
781        };
782
783        match self {
784            Mutation::Stop(StopMutation {
785                dropped_actors,
786                dropped_sink_fragments,
787            }) => PbMutation::Stop(PbStopMutation {
788                actors: dropped_actors.iter().copied().collect(),
789                dropped_sink_fragments: dropped_sink_fragments.iter().copied().collect(),
790            }),
791            Mutation::Update(UpdateMutation {
792                dispatchers,
793                merges,
794                vnode_bitmaps,
795                dropped_actors,
796                actor_splits,
797                actor_new_dispatchers,
798                actor_cdc_table_snapshot_splits,
799                sink_add_columns,
800            }) => PbMutation::Update(PbUpdateMutation {
801                dispatcher_update: dispatchers.values().flatten().cloned().collect(),
802                merge_update: merges.values().cloned().collect(),
803                actor_vnode_bitmap_update: vnode_bitmaps
804                    .iter()
805                    .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
806                    .collect(),
807                dropped_actors: dropped_actors.iter().cloned().collect(),
808                actor_splits: actor_splits_to_protobuf(actor_splits),
809                actor_new_dispatchers: actor_new_dispatchers
810                    .iter()
811                    .map(|(&actor_id, dispatchers)| {
812                        (
813                            actor_id,
814                            Dispatchers {
815                                dispatchers: dispatchers.clone(),
816                            },
817                        )
818                    })
819                    .collect(),
820                actor_cdc_table_snapshot_splits:
821                    build_pb_actor_cdc_table_snapshot_splits_with_generation(
822                        actor_cdc_table_snapshot_splits.clone(),
823                    )
824                    .into(),
825                sink_add_columns: sink_add_columns
826                    .iter()
827                    .map(|(sink_id, add_columns)| {
828                        (
829                            sink_id.sink_id,
830                            PbSinkAddColumns {
831                                fields: add_columns.iter().map(|field| field.to_prost()).collect(),
832                            },
833                        )
834                    })
835                    .collect(),
836            }),
837            Mutation::Add(AddMutation {
838                adds,
839                added_actors,
840                splits,
841                pause,
842                subscriptions_to_add,
843                backfill_nodes_to_pause,
844                actor_cdc_table_snapshot_splits,
845                new_upstream_sinks,
846            }) => PbMutation::Add(PbAddMutation {
847                actor_dispatchers: adds
848                    .iter()
849                    .map(|(&actor_id, dispatchers)| {
850                        (
851                            actor_id,
852                            Dispatchers {
853                                dispatchers: dispatchers.clone(),
854                            },
855                        )
856                    })
857                    .collect(),
858                added_actors: added_actors.iter().copied().collect(),
859                actor_splits: actor_splits_to_protobuf(splits),
860                pause: *pause,
861                subscriptions_to_add: subscriptions_to_add
862                    .iter()
863                    .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
864                        subscriber_id: *subscriber_id,
865                        upstream_mv_table_id: table_id.table_id,
866                    })
867                    .collect(),
868                backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
869                actor_cdc_table_snapshot_splits:
870                    build_pb_actor_cdc_table_snapshot_splits_with_generation(
871                        actor_cdc_table_snapshot_splits.clone(),
872                    )
873                    .into(),
874                new_upstream_sinks: new_upstream_sinks
875                    .iter()
876                    .map(|(k, v)| (*k, v.clone()))
877                    .collect(),
878            }),
879            Mutation::SourceChangeSplit(changes) => PbMutation::Splits(SourceChangeSplitMutation {
880                actor_splits: changes
881                    .iter()
882                    .map(|(&actor_id, splits)| {
883                        (
884                            actor_id,
885                            ConnectorSplits {
886                                splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
887                            },
888                        )
889                    })
890                    .collect(),
891            }),
892            Mutation::Pause => PbMutation::Pause(PauseMutation {}),
893            Mutation::Resume => PbMutation::Resume(ResumeMutation {}),
894            Mutation::Throttle(changes) => PbMutation::Throttle(ThrottleMutation {
895                actor_throttle: changes
896                    .iter()
897                    .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit }))
898                    .collect(),
899            }),
900
901            Mutation::AddAndUpdate(add, update) => PbMutation::Combined(CombinedMutation {
902                mutations: vec![
903                    BarrierMutation {
904                        mutation: Some(Mutation::Add(add.clone()).to_protobuf()),
905                    },
906                    BarrierMutation {
907                        mutation: Some(Mutation::Update(update.clone()).to_protobuf()),
908                    },
909                ],
910            }),
911            Mutation::DropSubscriptions {
912                subscriptions_to_drop,
913            } => PbMutation::DropSubscriptions(DropSubscriptionsMutation {
914                info: subscriptions_to_drop
915                    .iter()
916                    .map(
917                        |(subscriber_id, upstream_mv_table_id)| SubscriptionUpstreamInfo {
918                            subscriber_id: *subscriber_id,
919                            upstream_mv_table_id: upstream_mv_table_id.table_id,
920                        },
921                    )
922                    .collect(),
923            }),
924            Mutation::ConnectorPropsChange(map) => {
925                PbMutation::ConnectorPropsChange(ConnectorPropsChangeMutation {
926                    connector_props_infos: map
927                        .iter()
928                        .map(|(actor_id, options)| {
929                            (
930                                *actor_id,
931                                ConnectorPropsInfo {
932                                    connector_props_info: options
933                                        .iter()
934                                        .map(|(k, v)| (k.clone(), v.clone()))
935                                        .collect(),
936                                },
937                            )
938                        })
939                        .collect(),
940                })
941            }
942            Mutation::StartFragmentBackfill { fragment_ids } => {
943                PbMutation::StartFragmentBackfill(StartFragmentBackfillMutation {
944                    fragment_ids: fragment_ids.iter().copied().collect(),
945                })
946            }
947            Mutation::RefreshStart {
948                table_id,
949                associated_source_id,
950            } => PbMutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
951                table_id: table_id.table_id,
952                associated_source_id: associated_source_id.table_id,
953            }),
954            Mutation::LoadFinish {
955                associated_source_id,
956            } => PbMutation::LoadFinish(risingwave_pb::stream_plan::LoadFinishMutation {
957                associated_source_id: associated_source_id.table_id,
958            }),
959        }
960    }
961
962    fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
963        let mutation = match prost {
964            PbMutation::Stop(stop) => Mutation::Stop(StopMutation {
965                dropped_actors: stop.actors.iter().copied().collect(),
966                dropped_sink_fragments: stop.dropped_sink_fragments.iter().copied().collect(),
967            }),
968
969            PbMutation::Update(update) => Mutation::Update(UpdateMutation {
970                dispatchers: update
971                    .dispatcher_update
972                    .iter()
973                    .map(|u| (u.actor_id, u.clone()))
974                    .into_group_map(),
975                merges: update
976                    .merge_update
977                    .iter()
978                    .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
979                    .collect(),
980                vnode_bitmaps: update
981                    .actor_vnode_bitmap_update
982                    .iter()
983                    .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
984                    .collect(),
985                dropped_actors: update.dropped_actors.iter().cloned().collect(),
986                actor_splits: update
987                    .actor_splits
988                    .iter()
989                    .map(|(&actor_id, splits)| {
990                        (
991                            actor_id,
992                            splits
993                                .splits
994                                .iter()
995                                .map(|split| split.try_into().unwrap())
996                                .collect(),
997                        )
998                    })
999                    .collect(),
1000                actor_new_dispatchers: update
1001                    .actor_new_dispatchers
1002                    .iter()
1003                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
1004                    .collect(),
1005                actor_cdc_table_snapshot_splits:
1006                    build_actor_cdc_table_snapshot_splits_with_generation(
1007                        update
1008                            .actor_cdc_table_snapshot_splits
1009                            .clone()
1010                            .unwrap_or_default(),
1011                    ),
1012                sink_add_columns: update
1013                    .sink_add_columns
1014                    .iter()
1015                    .map(|(sink_id, add_columns)| {
1016                        (
1017                            SinkId::new(*sink_id),
1018                            add_columns.fields.iter().map(Field::from_prost).collect(),
1019                        )
1020                    })
1021                    .collect(),
1022            }),
1023
1024            PbMutation::Add(add) => Mutation::Add(AddMutation {
1025                adds: add
1026                    .actor_dispatchers
1027                    .iter()
1028                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
1029                    .collect(),
1030                added_actors: add.added_actors.iter().copied().collect(),
1031                // TODO: remove this and use `SourceChangesSplit` after we support multiple
1032                // mutations.
1033                splits: add
1034                    .actor_splits
1035                    .iter()
1036                    .map(|(&actor_id, splits)| {
1037                        (
1038                            actor_id,
1039                            splits
1040                                .splits
1041                                .iter()
1042                                .map(|split| split.try_into().unwrap())
1043                                .collect(),
1044                        )
1045                    })
1046                    .collect(),
1047                pause: add.pause,
1048                subscriptions_to_add: add
1049                    .subscriptions_to_add
1050                    .iter()
1051                    .map(
1052                        |SubscriptionUpstreamInfo {
1053                             subscriber_id,
1054                             upstream_mv_table_id,
1055                         }| {
1056                            (TableId::new(*upstream_mv_table_id), *subscriber_id)
1057                        },
1058                    )
1059                    .collect(),
1060                backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
1061                actor_cdc_table_snapshot_splits:
1062                    build_actor_cdc_table_snapshot_splits_with_generation(
1063                        add.actor_cdc_table_snapshot_splits
1064                            .clone()
1065                            .unwrap_or_default(),
1066                    ),
1067                new_upstream_sinks: add
1068                    .new_upstream_sinks
1069                    .iter()
1070                    .map(|(k, v)| (*k, v.clone()))
1071                    .collect(),
1072            }),
1073
1074            PbMutation::Splits(s) => {
1075                let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
1076                    Vec::with_capacity(s.actor_splits.len());
1077                for (&actor_id, splits) in &s.actor_splits {
1078                    if !splits.splits.is_empty() {
1079                        change_splits.push((
1080                            actor_id,
1081                            splits
1082                                .splits
1083                                .iter()
1084                                .map(SplitImpl::try_from)
1085                                .try_collect()?,
1086                        ));
1087                    }
1088                }
1089                Mutation::SourceChangeSplit(change_splits.into_iter().collect())
1090            }
1091            PbMutation::Pause(_) => Mutation::Pause,
1092            PbMutation::Resume(_) => Mutation::Resume,
1093            PbMutation::Throttle(changes) => Mutation::Throttle(
1094                changes
1095                    .actor_throttle
1096                    .iter()
1097                    .map(|(actor_id, limit)| (*actor_id, limit.rate_limit))
1098                    .collect(),
1099            ),
1100            PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
1101                subscriptions_to_drop: drop
1102                    .info
1103                    .iter()
1104                    .map(|info| (info.subscriber_id, TableId::new(info.upstream_mv_table_id)))
1105                    .collect(),
1106            },
1107            PbMutation::ConnectorPropsChange(alter_connector_props) => {
1108                Mutation::ConnectorPropsChange(
1109                    alter_connector_props
1110                        .connector_props_infos
1111                        .iter()
1112                        .map(|(actor_id, options)| {
1113                            (
1114                                *actor_id,
1115                                options
1116                                    .connector_props_info
1117                                    .iter()
1118                                    .map(|(k, v)| (k.clone(), v.clone()))
1119                                    .collect(),
1120                            )
1121                        })
1122                        .collect(),
1123                )
1124            }
1125            PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
1126                Mutation::StartFragmentBackfill {
1127                    fragment_ids: start_fragment_backfill
1128                        .fragment_ids
1129                        .iter()
1130                        .copied()
1131                        .collect(),
1132                }
1133            }
1134            PbMutation::RefreshStart(refresh_start) => Mutation::RefreshStart {
1135                table_id: TableId::new(refresh_start.table_id),
1136                associated_source_id: TableId::new(refresh_start.associated_source_id),
1137            },
1138            PbMutation::LoadFinish(load_finish) => Mutation::LoadFinish {
1139                associated_source_id: TableId::new(load_finish.associated_source_id),
1140            },
1141            PbMutation::Combined(CombinedMutation { mutations }) => match &mutations[..] {
1142                [
1143                    BarrierMutation {
1144                        mutation: Some(add),
1145                    },
1146                    BarrierMutation {
1147                        mutation: Some(update),
1148                    },
1149                ] => {
1150                    let Mutation::Add(add_mutation) = Mutation::from_protobuf(add)? else {
1151                        unreachable!();
1152                    };
1153
1154                    let Mutation::Update(update_mutation) = Mutation::from_protobuf(update)? else {
1155                        unreachable!();
1156                    };
1157
1158                    Mutation::AddAndUpdate(add_mutation, update_mutation)
1159                }
1160
1161                _ => unreachable!(),
1162            },
1163        };
1164        Ok(mutation)
1165    }
1166}
1167
1168impl<M> BarrierInner<M> {
1169    fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
1170        let Self {
1171            epoch,
1172            mutation,
1173            kind,
1174            passed_actors,
1175            tracing_context,
1176            ..
1177        } = self;
1178
1179        PbBarrier {
1180            epoch: Some(PbEpoch {
1181                curr: epoch.curr,
1182                prev: epoch.prev,
1183            }),
1184            mutation: Some(PbBarrierMutation {
1185                mutation: barrier_fn(mutation),
1186            }),
1187            tracing_context: tracing_context.to_protobuf(),
1188            kind: *kind as _,
1189            passed_actors: passed_actors.clone(),
1190        }
1191    }
1192
1193    fn from_protobuf_inner(
1194        prost: &PbBarrier,
1195        mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
1196    ) -> StreamExecutorResult<Self> {
1197        let epoch = prost.get_epoch()?;
1198
1199        Ok(Self {
1200            kind: prost.kind(),
1201            epoch: EpochPair::new(epoch.curr, epoch.prev),
1202            mutation: mutation_from_pb(
1203                prost
1204                    .mutation
1205                    .as_ref()
1206                    .and_then(|mutation| mutation.mutation.as_ref()),
1207            )?,
1208            passed_actors: prost.get_passed_actors().clone(),
1209            tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
1210        })
1211    }
1212
1213    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
1214        BarrierInner {
1215            epoch: self.epoch,
1216            mutation: f(self.mutation),
1217            kind: self.kind,
1218            tracing_context: self.tracing_context,
1219            passed_actors: self.passed_actors,
1220        }
1221    }
1222}
1223
1224impl DispatcherBarrier {
1225    pub fn to_protobuf(&self) -> PbBarrier {
1226        self.to_protobuf_inner(|_| None)
1227    }
1228}
1229
1230impl Barrier {
1231    pub fn to_protobuf(&self) -> PbBarrier {
1232        self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
1233    }
1234
1235    pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
1236        Self::from_protobuf_inner(prost, |mutation| {
1237            mutation
1238                .map(|m| Mutation::from_protobuf(m).map(Arc::new))
1239                .transpose()
1240        })
1241    }
1242}
1243
1244#[derive(Debug, PartialEq, Eq, Clone)]
1245pub struct Watermark {
1246    pub col_idx: usize,
1247    pub data_type: DataType,
1248    pub val: ScalarImpl,
1249}
1250
1251impl PartialOrd for Watermark {
1252    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1253        Some(self.cmp(other))
1254    }
1255}
1256
1257impl Ord for Watermark {
1258    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1259        self.val.default_cmp(&other.val)
1260    }
1261}
1262
1263impl Watermark {
1264    pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
1265        Self {
1266            col_idx,
1267            data_type,
1268            val,
1269        }
1270    }
1271
1272    pub async fn transform_with_expr(
1273        self,
1274        expr: &NonStrictExpression<impl Expression>,
1275        new_col_idx: usize,
1276    ) -> Option<Self> {
1277        let Self { col_idx, val, .. } = self;
1278        let row = {
1279            let mut row = vec![None; col_idx + 1];
1280            row[col_idx] = Some(val);
1281            OwnedRow::new(row)
1282        };
1283        let val = expr.eval_row_infallible(&row).await?;
1284        Some(Self::new(new_col_idx, expr.inner().return_type(), val))
1285    }
1286
1287    /// Transform the watermark with the given output indices. If this watermark is not in the
1288    /// output, return `None`.
1289    pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
1290        output_indices
1291            .iter()
1292            .position(|p| *p == self.col_idx)
1293            .map(|new_col_idx| self.with_idx(new_col_idx))
1294    }
1295
1296    pub fn to_protobuf(&self) -> PbWatermark {
1297        PbWatermark {
1298            column: Some(PbInputRef {
1299                index: self.col_idx as _,
1300                r#type: Some(self.data_type.to_protobuf()),
1301            }),
1302            val: Some(&self.val).to_protobuf().into(),
1303        }
1304    }
1305
1306    pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
1307        let col_ref = prost.get_column()?;
1308        let data_type = DataType::from(col_ref.get_type()?);
1309        let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
1310            .expect("watermark value cannot be null");
1311        Ok(Self::new(col_ref.get_index() as _, data_type, val))
1312    }
1313
1314    pub fn with_idx(self, idx: usize) -> Self {
1315        Self::new(idx, self.data_type, self.val)
1316    }
1317}
1318
1319#[derive(Debug, EnumAsInner, PartialEq, Clone)]
1320pub enum MessageInner<M> {
1321    Chunk(StreamChunk),
1322    Barrier(BarrierInner<M>),
1323    Watermark(Watermark),
1324}
1325
1326impl<M> MessageInner<M> {
1327    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
1328        match self {
1329            MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
1330            MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
1331            MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
1332        }
1333    }
1334}
1335
/// A [`MessageInner`] whose barriers carry a `BarrierMutationType` mutation payload.
1336pub type Message = MessageInner<BarrierMutationType>;
/// A [`MessageInner`] whose barriers carry `()` in place of a mutation.
1337pub type DispatcherMessage = MessageInner<()>;
1338
1339/// `MessageBatchInner` is used exclusively by `Dispatcher` and the `Merger`/`Receiver` for exchanging messages between them.
1340/// It shares the same message type as the fundamental `MessageInner`, but batches multiple barriers into a single message.
1341#[derive(Debug, EnumAsInner, PartialEq, Clone)]
1342pub enum MessageBatchInner<M> {
1343    Chunk(StreamChunk),
1344    BarrierBatch(Vec<BarrierInner<M>>),
1345    Watermark(Watermark),
1346}
/// A [`MessageBatchInner`] whose barriers carry a `BarrierMutationType` mutation payload.
1347pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
/// A list of `DispatcherBarrier`s.
1348pub type DispatcherBarriers = Vec<DispatcherBarrier>;
/// A [`MessageBatchInner`] whose barriers carry `()` in place of a mutation.
1349pub type DispatcherMessageBatch = MessageBatchInner<()>;
1350
1351impl From<DispatcherMessage> for DispatcherMessageBatch {
1352    fn from(m: DispatcherMessage) -> Self {
1353        match m {
1354            DispatcherMessage::Chunk(c) => Self::Chunk(c),
1355            DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
1356            DispatcherMessage::Watermark(w) => Self::Watermark(w),
1357        }
1358    }
1359}
1360
1361impl From<StreamChunk> for Message {
1362    fn from(chunk: StreamChunk) -> Self {
1363        Message::Chunk(chunk)
1364    }
1365}
1366
1367impl<'a> TryFrom<&'a Message> for &'a Barrier {
1368    type Error = ();
1369
1370    fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
1371        match m {
1372            Message::Chunk(_) => Err(()),
1373            Message::Barrier(b) => Ok(b),
1374            Message::Watermark(_) => Err(()),
1375        }
1376    }
1377}
1378
1379impl Message {
1380    /// Return true if the message is a stop barrier, meaning the stream
1381    /// will not continue, false otherwise.
1382    ///
1383    /// Note that this does not mean we will stop the current actor.
1384    #[cfg(test)]
1385    pub fn is_stop(&self) -> bool {
1386        matches!(
1387            self,
1388            Message::Barrier(Barrier {
1389                mutation,
1390                ..
1391            }) if mutation.as_ref().unwrap().is_stop()
1392        )
1393    }
1394}
1395
1396impl DispatcherMessageBatch {
1397    pub fn to_protobuf(&self) -> PbStreamMessageBatch {
1398        let prost = match self {
1399            Self::Chunk(stream_chunk) => {
1400                let prost_stream_chunk = stream_chunk.to_protobuf();
1401                StreamMessageBatch::StreamChunk(prost_stream_chunk)
1402            }
1403            Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
1404                barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
1405            }),
1406            Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
1407        };
1408        PbStreamMessageBatch {
1409            stream_message_batch: Some(prost),
1410        }
1411    }
1412
1413    pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
1414        let res = match prost.get_stream_message_batch()? {
1415            StreamMessageBatch::StreamChunk(chunk) => {
1416                Self::Chunk(StreamChunk::from_protobuf(chunk)?)
1417            }
1418            StreamMessageBatch::BarrierBatch(barrier_batch) => {
1419                let barriers = barrier_batch
1420                    .barriers
1421                    .iter()
1422                    .map(|barrier| {
1423                        DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
1424                            if mutation.is_some() {
1425                                if cfg!(debug_assertions) {
1426                                    panic!("should not receive message of barrier with mutation");
1427                                } else {
1428                                    warn!(?barrier, "receive message of barrier with mutation");
1429                                }
1430                            }
1431                            Ok(())
1432                        })
1433                    })
1434                    .try_collect()?;
1435                Self::BarrierBatch(barriers)
1436            }
1437            StreamMessageBatch::Watermark(watermark) => {
1438                Self::Watermark(Watermark::from_protobuf(watermark)?)
1439            }
1440        };
1441        Ok(res)
1442    }
1443
1444    pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
1445        ::prost::Message::encoded_len(msg)
1446    }
1447}
1448
/// Owned list of column indices (presumably the primary-key columns, going by the name;
/// confirm against callers).
1449pub type PkIndices = Vec<usize>;
/// Borrowed counterpart of `PkIndices`.
1450pub type PkIndicesRef<'a> = &'a [usize];
/// List of data types stored in a `SmallVec` with inline room for one element (presumably the
/// types of the primary-key columns; confirm against callers).
1451pub type PkDataTypes = SmallVec<[DataType; 1]>;
1452
1453/// Expect the first message of the given `stream` as a barrier.
1454pub async fn expect_first_barrier<M: Debug>(
1455    stream: &mut (impl MessageStreamInner<M> + Unpin),
1456) -> StreamExecutorResult<BarrierInner<M>> {
1457    let message = stream
1458        .next()
1459        .instrument_await("expect_first_barrier")
1460        .await
1461        .context("failed to extract the first message: stream closed unexpectedly")??;
1462    let barrier = message
1463        .into_barrier()
1464        .expect("the first message must be a barrier");
1465    // TODO: Is this check correct?
1466    assert!(matches!(
1467        barrier.kind,
1468        BarrierKind::Checkpoint | BarrierKind::Initial
1469    ));
1470    Ok(barrier)
1471}
1472
1473/// Expect the first message of the given `stream` as a barrier.
1474pub async fn expect_first_barrier_from_aligned_stream(
1475    stream: &mut (impl AlignedMessageStream + Unpin),
1476) -> StreamExecutorResult<Barrier> {
1477    let message = stream
1478        .next()
1479        .instrument_await("expect_first_barrier")
1480        .await
1481        .context("failed to extract the first message: stream closed unexpectedly")??;
1482    let barrier = message
1483        .into_barrier()
1484        .expect("the first message must be a barrier");
1485    Ok(barrier)
1486}
1487
1488/// `StreamConsumer` is the last step in an actor.
///
/// A consumer is turned into a stream of barriers by calling `execute`.
1489pub trait StreamConsumer: Send + 'static {
    /// The stream returned by `execute`; each item is a `StreamResult<Barrier>`.
1490    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;
1491
    /// Consumes the boxed consumer and returns its barrier stream.
1492    fn execute(self: Box<Self>) -> Self::BarrierStream;
1493}
1494
/// A `BoxedInput` identified by `InputId` whose items are `MessageStreamItemInner<M>`.
1495type BoxedMessageInput<InputId, M> = BoxedInput<InputId, MessageStreamItemInner<M>>;
1496
1497/// A stream for merging messages from multiple upstreams.
1498/// Can dynamically add and delete upstream streams.
1499/// For the meaning of the generic parameter `M` used, refer to `BarrierInner<M>`.
1500pub struct DynamicReceivers<InputId, M> {
1501    /// The barrier we're aligning to. If this is `None`, then `blocked` is empty.
1502    barrier: Option<BarrierInner<M>>,
1503    /// The time at which the first upstream was blocked by the current barrier. Used for
    /// measuring the alignment duration.
1504    start_ts: Option<Instant>,
1505    /// The upstreams that are blocked by the `barrier`.
1506    blocked: Vec<BoxedMessageInput<InputId, M>>,
1507    /// The upstreams that are not blocked and can be polled.
1508    active: FuturesUnordered<StreamFuture<BoxedMessageInput<InputId, M>>>,
1509    /// watermark column index -> `BufferedWatermarks`
1510    buffered_watermarks: BTreeMap<usize, BufferedWatermarks<InputId>>,
1511    /// Currently only used for union. When set, `poll_next` adds the alignment time in
    /// nanoseconds to this counter once all upstreams are aligned.
1512    barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1513    /// Only for merge. If `None`, `poll_next` skips the `observe` call on this histogram.
1514    merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
1515}
1516
1517impl<InputId: Clone + Ord + Hash + std::fmt::Debug + Unpin, M: Clone + Unpin> Stream
1518    for DynamicReceivers<InputId, M>
1519{
1520    type Item = MessageStreamItemInner<M>;
1521
1522    fn poll_next(
1523        mut self: Pin<&mut Self>,
1524        cx: &mut std::task::Context<'_>,
1525    ) -> Poll<Option<Self::Item>> {
1526        if self.is_empty() {
1527            return Poll::Ready(None);
1528        }
1529
1530        loop {
1531            match futures::ready!(self.active.poll_next_unpin(cx)) {
1532                // Directly forward the error.
1533                Some((Some(Err(e)), _)) => {
1534                    return Poll::Ready(Some(Err(e)));
1535                }
1536                // Handle the message from some upstream.
1537                Some((Some(Ok(message)), remaining)) => {
1538                    let input_id = remaining.id();
1539                    match message {
1540                        MessageInner::Chunk(chunk) => {
1541                            // Continue polling this upstream by pushing it back to `active`.
1542                            self.active.push(remaining.into_future());
1543                            return Poll::Ready(Some(Ok(MessageInner::Chunk(chunk))));
1544                        }
1545                        MessageInner::Watermark(watermark) => {
1546                            // Continue polling this upstream by pushing it back to `active`.
1547                            self.active.push(remaining.into_future());
1548                            if let Some(watermark) = self.handle_watermark(input_id, watermark) {
1549                                return Poll::Ready(Some(Ok(MessageInner::Watermark(watermark))));
1550                            }
1551                        }
1552                        MessageInner::Barrier(barrier) => {
1553                            // Block this upstream by pushing it to `blocked`.
1554                            if self.blocked.is_empty() {
1555                                self.start_ts = Some(Instant::now());
1556                            }
1557                            self.blocked.push(remaining);
1558                            if let Some(current_barrier) = self.barrier.as_ref() {
1559                                if current_barrier.epoch != barrier.epoch {
1560                                    return Poll::Ready(Some(Err(
1561                                        StreamExecutorError::align_barrier(
1562                                            current_barrier.clone().map_mutation(|_| None),
1563                                            barrier.map_mutation(|_| None),
1564                                        ),
1565                                    )));
1566                                }
1567                            } else {
1568                                self.barrier = Some(barrier);
1569                            }
1570                        }
1571                    }
1572                }
1573                // We use barrier as the control message of the stream. That is, we always stop the
1574                // actors actively when we receive a `Stop` mutation, instead of relying on the stream
1575                // termination.
1576                //
1577                // Besides, in abnormal cases when the other side of the `Input` closes unexpectedly,
1578                // we also yield an `Err(ExchangeChannelClosed)`, which will hit the `Err` arm above.
1579                // So this branch will never be reached in all cases.
1580                Some((None, remaining)) => {
1581                    return Poll::Ready(Some(Err(StreamExecutorError::channel_closed(format!(
1582                        "upstream input {:?} unexpectedly closed",
1583                        remaining.id()
1584                    )))));
1585                }
1586                // There's no active upstreams. Process the barrier and resume the blocked ones.
1587                None => {
1588                    assert!(!self.blocked.is_empty());
1589
1590                    let start_ts = self
1591                        .start_ts
1592                        .take()
1593                        .expect("should have received at least one barrier");
1594                    if let Some(barrier_align_duration) = &self.barrier_align_duration {
1595                        barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
1596                    }
1597                    if let Some(merge_barrier_align_duration) = &self.merge_barrier_align_duration {
1598                        // Observe did a few atomic operation inside, we want to avoid the overhead.
1599                        merge_barrier_align_duration.observe(start_ts.elapsed().as_secs_f64())
1600                    }
1601
1602                    break;
1603                }
1604            }
1605        }
1606
1607        assert!(self.active.is_terminated());
1608
1609        let barrier = self.barrier.take().unwrap();
1610
1611        let upstreams = std::mem::take(&mut self.blocked);
1612        self.extend_active(upstreams);
1613        assert!(!self.active.is_terminated());
1614
1615        Poll::Ready(Some(Ok(MessageInner::Barrier(barrier))))
1616    }
1617}
1618
1619impl<InputId: Clone + Ord + Hash + std::fmt::Debug, M> DynamicReceivers<InputId, M> {
1620    pub fn new(
1621        upstreams: Vec<BoxedMessageInput<InputId, M>>,
1622        barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1623        merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
1624    ) -> Self {
1625        let mut this = Self {
1626            barrier: None,
1627            start_ts: None,
1628            blocked: Vec::with_capacity(upstreams.len()),
1629            active: Default::default(),
1630            buffered_watermarks: Default::default(),
1631            merge_barrier_align_duration,
1632            barrier_align_duration,
1633        };
1634        this.extend_active(upstreams);
1635        this
1636    }
1637
1638    /// Extend the active upstreams with the given upstreams. The current stream must be at the
1639    /// clean state right after a barrier.
1640    pub fn extend_active(
1641        &mut self,
1642        upstreams: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1643    ) {
1644        assert!(self.blocked.is_empty() && self.barrier.is_none());
1645
1646        self.active
1647            .extend(upstreams.into_iter().map(|s| s.into_future()));
1648    }
1649
1650    /// Handle a new watermark message. Optionally returns the watermark message to emit.
1651    pub fn handle_watermark(
1652        &mut self,
1653        input_id: InputId,
1654        watermark: Watermark,
1655    ) -> Option<Watermark> {
1656        let col_idx = watermark.col_idx;
1657        // Insert a buffer watermarks when first received from a column.
1658        let upstream_ids: Vec<_> = self.upstream_input_ids().collect();
1659        let watermarks = self
1660            .buffered_watermarks
1661            .entry(col_idx)
1662            .or_insert_with(|| BufferedWatermarks::with_ids(upstream_ids));
1663        watermarks.handle_watermark(input_id, watermark)
1664    }
1665
1666    /// Consume `other` and add its upstreams to `self`. The two streams must be at the clean state
1667    /// right after a barrier.
1668    pub fn add_upstreams_from(
1669        &mut self,
1670        new_inputs: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1671    ) {
1672        assert!(self.blocked.is_empty() && self.barrier.is_none());
1673
1674        let new_inputs: Vec<_> = new_inputs.into_iter().collect();
1675        let input_ids = new_inputs.iter().map(|input| input.id());
1676        self.buffered_watermarks.values_mut().for_each(|buffers| {
1677            // Add buffers to the buffered watermarks for all cols
1678            buffers.add_buffers(input_ids.clone());
1679        });
1680        self.active
1681            .extend(new_inputs.into_iter().map(|s| s.into_future()));
1682    }
1683
1684    /// Remove upstreams from `self` in `upstream_input_ids`. The current stream must be at the
1685    /// clean state right after a barrier.
1686    /// The current container does not necessarily contain all the input ids passed in.
1687    pub fn remove_upstreams(&mut self, upstream_input_ids: &HashSet<InputId>) {
1688        assert!(self.blocked.is_empty() && self.barrier.is_none());
1689
1690        let new_upstreams = std::mem::take(&mut self.active)
1691            .into_iter()
1692            .map(|s| s.into_inner().unwrap())
1693            .filter(|u| !upstream_input_ids.contains(&u.id()));
1694        self.extend_active(new_upstreams);
1695        self.buffered_watermarks.values_mut().for_each(|buffers| {
1696            // Call `check_heap` in case the only upstream(s) that does not have
1697            // watermark in heap is removed
1698            buffers.remove_buffer(upstream_input_ids.clone());
1699        });
1700    }
1701
1702    pub fn merge_barrier_align_duration(&self) -> Option<LabelGuardedMetric<Histogram>> {
1703        self.merge_barrier_align_duration.clone()
1704    }
1705
1706    pub fn flush_buffered_watermarks(&mut self) {
1707        self.buffered_watermarks
1708            .values_mut()
1709            .for_each(|buffers| buffers.clear());
1710    }
1711
1712    pub fn upstream_input_ids(&self) -> impl Iterator<Item = InputId> + '_ {
1713        self.blocked
1714            .iter()
1715            .map(|s| s.id())
1716            .chain(self.active.iter().map(|s| s.get_ref().unwrap().id()))
1717    }
1718
1719    pub fn is_empty(&self) -> bool {
1720        self.blocked.is_empty() && self.active.is_empty()
1721    }
1722}
1723
1724// Explanation of why we need `DispatchBarrierBuffer`:
1725//
1726// When we need to create or replace an upstream fragment for the current fragment, the `Merge` operator must
1727// add some new upstream actor inputs. However, the `Merge` operator may still have old upstreams. We must wait
1728// for these old upstreams to completely process their barriers and align before we can safely update the
1729// `upstream-input-set`.
1730//
1731// Meanwhile, the creation of a new upstream actor can only succeed after the channel to the downstream `Merge`
1732// operator has been established. This creates a potential dependency chain: [new_actor_creation ->
1733// downstream_merge_update -> old_actor_processing]
1734//
1735// To address this, we split the application of a barrier's `Mutation` into two steps:
1736// 1. Parse the `Mutation`. If there is an addition on the upstream-set, establish a channel with the upstream
1737//    and cache it.
1738// 2. When the upstream barrier actually arrives, apply the cached upstream changes to the upstream-set
1739//
// Additionally, since barriers from the current upstream inputs and from `barrier_rx` arrive
// asynchronously, we cannot determine which will arrive first. Therefore, when a barrier is received from an
// upstream: if a cached mutation is present, we apply it. Otherwise, we must first fetch a new barrier from
// `barrier_rx`.
/// Buffers barriers received from `barrier_rx`, each paired with the new upstream inputs (if
/// any) that were built ahead of time for it.
pub(crate) struct DispatchBarrierBuffer {
    /// Barriers already fetched from `barrier_rx`, in arrival order. The second element holds
    /// the pre-built new inputs when the barrier adds upstream actors, `None` otherwise.
    buffer: VecDeque<(Barrier, Option<Vec<BoxedActorInput>>)>,
    /// Channel from which barriers are fetched.
    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
    /// Whether we are waiting for the next barrier or still building inputs for the last one.
    recv_state: BarrierReceiverState,
    /// Upstream fragment id used when looking up merge updates in a barrier; replaced when an
    /// update carries a new upstream fragment id.
    curr_upstream_fragment_id: FragmentId,
    /// Actor id used when looking up merge updates in a barrier.
    actor_id: ActorId,
    // read-only context for building new inputs
    build_input_ctx: Arc<BuildInputContext>,
}
1753
/// Values handed to `new_input` whenever a new upstream input is built. Shared behind an `Arc`
/// by `DispatchBarrierBuffer` so that the input-building futures can own a handle to it.
struct BuildInputContext {
    /// Actor id passed to `new_input`.
    pub actor_id: ActorId,
    /// Barrier manager passed by reference to `new_input`.
    pub local_barrier_manager: LocalBarrierManager,
    /// Streaming metrics; a clone of the `Arc` is passed to `new_input` for each input.
    pub metrics: Arc<StreamingMetrics>,
    /// Fragment id passed to `new_input` alongside `actor_id`.
    pub fragment_id: FragmentId,
}
1760
/// Boxed, pinned, `Send` future that resolves to the list of newly built upstream inputs, or to
/// the error that prevented building them.
type BoxedNewInputsFuture =
    Pin<Box<dyn Future<Output = StreamExecutorResult<Vec<BoxedActorInput>>> + Send>>;
1763
/// State of the barrier-fetching side of `DispatchBarrierBuffer`.
enum BarrierReceiverState {
    /// Waiting for the next barrier from `barrier_rx`.
    ReceivingBarrier,
    /// A barrier that adds upstream actors has been received; the future is building the
    /// corresponding new inputs. The barrier is buffered once the future resolves.
    CreatingNewInput(Barrier, BoxedNewInputsFuture),
}
1768
1769impl DispatchBarrierBuffer {
1770    pub fn new(
1771        barrier_rx: mpsc::UnboundedReceiver<Barrier>,
1772        actor_id: ActorId,
1773        curr_upstream_fragment_id: FragmentId,
1774        local_barrier_manager: LocalBarrierManager,
1775        metrics: Arc<StreamingMetrics>,
1776        fragment_id: FragmentId,
1777    ) -> Self {
1778        Self {
1779            buffer: VecDeque::new(),
1780            barrier_rx,
1781            recv_state: BarrierReceiverState::ReceivingBarrier,
1782            curr_upstream_fragment_id,
1783            actor_id,
1784            build_input_ctx: Arc::new(BuildInputContext {
1785                actor_id,
1786                local_barrier_manager,
1787                metrics,
1788                fragment_id,
1789            }),
1790        }
1791    }
1792
1793    pub async fn await_next_message(
1794        &mut self,
1795        stream: &mut (impl Stream<Item = StreamExecutorResult<DispatcherMessage>> + Unpin),
1796    ) -> StreamExecutorResult<DispatcherMessage> {
1797        tokio::select! {
1798            biased;
1799
1800            msg = stream.try_next() => {
1801                msg?.ok_or_else(
1802                    || StreamExecutorError::channel_closed("upstream executor closed unexpectedly")
1803                )
1804            }
1805
1806            e = self.continuously_fetch_barrier_rx() => {
1807                Err(e)
1808            }
1809        }
1810    }
1811
1812    pub async fn pop_barrier_with_inputs(
1813        &mut self,
1814        barrier: DispatcherBarrier,
1815    ) -> StreamExecutorResult<(Barrier, Option<Vec<BoxedActorInput>>)> {
1816        while self.buffer.is_empty() {
1817            self.try_fetch_barrier_rx(false).await?;
1818        }
1819        let (mut recv_barrier, inputs) = self.buffer.pop_front().unwrap();
1820        apply_dispatcher_barrier(&mut recv_barrier, barrier);
1821
1822        Ok((recv_barrier, inputs))
1823    }
1824
1825    async fn continuously_fetch_barrier_rx(&mut self) -> StreamExecutorError {
1826        loop {
1827            if let Err(e) = self.try_fetch_barrier_rx(true).await {
1828                return e;
1829            }
1830        }
1831    }
1832
1833    async fn try_fetch_barrier_rx(&mut self, pending_on_end: bool) -> StreamExecutorResult<()> {
1834        match &mut self.recv_state {
1835            BarrierReceiverState::ReceivingBarrier => {
1836                let Some(barrier) = self.barrier_rx.recv().await else {
1837                    if pending_on_end {
1838                        return pending().await;
1839                    } else {
1840                        return Err(StreamExecutorError::channel_closed(
1841                            "barrier channel closed unexpectedly",
1842                        ));
1843                    }
1844                };
1845                if let Some(fut) = self.pre_apply_barrier(&barrier) {
1846                    self.recv_state = BarrierReceiverState::CreatingNewInput(barrier, fut);
1847                } else {
1848                    self.buffer.push_back((barrier, None));
1849                }
1850            }
1851            BarrierReceiverState::CreatingNewInput(barrier, fut) => {
1852                let new_inputs = fut.await?;
1853                self.buffer.push_back((barrier.clone(), Some(new_inputs)));
1854                self.recv_state = BarrierReceiverState::ReceivingBarrier;
1855            }
1856        }
1857        Ok(())
1858    }
1859
1860    fn pre_apply_barrier(&mut self, barrier: &Barrier) -> Option<BoxedNewInputsFuture> {
1861        if let Some(update) = barrier.as_update_merge(self.actor_id, self.curr_upstream_fragment_id)
1862            && !update.added_upstream_actors.is_empty()
1863        {
1864            // When update upstream fragment, added_actors will not be empty.
1865            let upstream_fragment_id =
1866                if let Some(new_upstream_fragment_id) = update.new_upstream_fragment_id {
1867                    self.curr_upstream_fragment_id = new_upstream_fragment_id;
1868                    new_upstream_fragment_id
1869                } else {
1870                    self.curr_upstream_fragment_id
1871                };
1872            let ctx = self.build_input_ctx.clone();
1873            let added_upstream_actors = update.added_upstream_actors.clone();
1874            let barrier = barrier.clone();
1875            let fut = async move {
1876                try_join_all(added_upstream_actors.iter().map(|upstream_actor| async {
1877                    let mut new_input = new_input(
1878                        &ctx.local_barrier_manager,
1879                        ctx.metrics.clone(),
1880                        ctx.actor_id,
1881                        ctx.fragment_id,
1882                        upstream_actor,
1883                        upstream_fragment_id,
1884                    )
1885                    .await?;
1886
1887                    // Poll the first barrier from the new upstreams. It must be the same as the one we polled from
1888                    // original upstreams.
1889                    let first_barrier = expect_first_barrier(&mut new_input).await?;
1890                    assert_equal_dispatcher_barrier(&barrier, &first_barrier);
1891
1892                    StreamExecutorResult::Ok(new_input)
1893                }))
1894                .await
1895            }
1896            .boxed();
1897
1898            Some(fut)
1899        } else {
1900            None
1901        }
1902    }
1903}