risingwave_stream/executor/
mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod prelude;

use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::future::pending;
use std::hash::Hash;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::vec;

use await_tree::InstrumentAwait;
use enum_as_inner::EnumAsInner;
use futures::future::try_join_all;
use futures::stream::{BoxStream, FusedStream, FuturesUnordered, StreamFuture};
use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use prometheus::Histogram;
use prometheus::core::{AtomicU64, GenericCounter};
use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{Field, Schema, TableId};
use risingwave_common::metrics::LabelGuardedMetric;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
use risingwave_common::util::epoch::{Epoch, EpochPair};
use risingwave_common::util::tracing::TracingContext;
use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
use risingwave_connector::source::SplitImpl;
use risingwave_expr::expr::{Expression, NonStrictExpression};
use risingwave_pb::data::PbEpoch;
use risingwave_pb::expr::PbInputRef;
use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
use risingwave_pb::stream_plan::stream_node::PbStreamKind;
use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
use risingwave_pb::stream_plan::{
    BarrierMutation, CombinedMutation, ConnectorPropsChangeMutation, Dispatchers,
    DropSubscriptionsMutation, PauseMutation, PbAddMutation, PbBarrier, PbBarrierMutation,
    PbDispatcher, PbSinkAddColumns, PbStopMutation, PbStreamMessageBatch, PbUpdateMutation,
    PbWatermark, ResumeMutation, SourceChangeSplitMutation, StartFragmentBackfillMutation,
    SubscriptionUpstreamInfo, ThrottleMutation,
};
use smallvec::SmallVec;
use tokio::sync::mpsc;
use tokio::time::Instant;

use crate::error::StreamResult;
use crate::executor::exchange::input::{
    BoxedActorInput, BoxedInput, apply_dispatcher_barrier, assert_equal_dispatcher_barrier,
    new_input,
};
use crate::executor::prelude::StreamingMetrics;
use crate::executor::watermark::BufferedWatermarks;
use crate::task::{ActorId, FragmentId, LocalBarrierManager};

mod actor;
mod barrier_align;
pub mod exchange;
pub mod monitor;

pub mod aggregate;
pub mod asof_join;
mod backfill;
mod barrier_recv;
mod batch_query;
mod chain;
mod changelog;
mod dedup;
mod dispatch;
pub mod dml;
mod dynamic_filter;
pub mod eowc;
pub mod error;
mod expand;
mod filter;
pub mod hash_join;
mod hop_window;
mod join;
pub mod locality_provider;
mod lookup;
mod lookup_union;
mod merge;
mod mview;
mod nested_loop_temporal_join;
mod no_op;
mod now;
mod over_window;
pub mod project;
mod rearranged_chain;
mod receiver;
pub mod row_id_gen;
mod sink;
pub mod source;
mod stream_reader;
pub mod subtask;
mod temporal_join;
mod top_n;
mod troublemaker;
mod union;
mod upstream_sink_union;
mod values;
mod watermark;
mod watermark_filter;
mod wrapper;

mod approx_percentile;

mod row_merge;

#[cfg(test)]
mod integration_tests;
mod sync_kv_log_store;
#[cfg(any(test, feature = "test"))]
pub mod test_utils;
mod utils;
mod vector_index;

pub use actor::{Actor, ActorContext, ActorContextRef};
use anyhow::Context;
pub use approx_percentile::global::GlobalApproxPercentileExecutor;
pub use approx_percentile::local::LocalApproxPercentileExecutor;
pub use backfill::arrangement_backfill::*;
pub use backfill::cdc::{
    CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor,
};
pub use backfill::no_shuffle_backfill::*;
pub use backfill::snapshot_backfill::*;
pub use barrier_recv::BarrierRecvExecutor;
pub use batch_query::BatchQueryExecutor;
pub use chain::ChainExecutor;
pub use changelog::ChangeLogExecutor;
pub use dedup::AppendOnlyDedupExecutor;
pub use dispatch::{DispatchExecutor, DispatcherImpl};
pub use dynamic_filter::DynamicFilterExecutor;
pub use error::{StreamExecutorError, StreamExecutorResult};
pub use expand::ExpandExecutor;
pub use filter::{FilterExecutor, UpsertFilterExecutor};
pub use hash_join::*;
pub use hop_window::HopWindowExecutor;
pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
pub use join::{AsOfDesc, AsOfJoinType, JoinType};
pub use lookup::*;
pub use lookup_union::LookupUnionExecutor;
pub use merge::MergeExecutor;
pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
pub use mview::*;
pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
pub use no_op::NoOpExecutor;
pub use now::*;
pub use over_window::*;
pub use rearranged_chain::RearrangedChainExecutor;
pub use receiver::ReceiverExecutor;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
pub use row_merge::RowMergeExecutor;
pub use sink::SinkExecutor;
pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
pub use temporal_join::TemporalJoinExecutor;
pub use top_n::{
    AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
};
pub use troublemaker::TroublemakerExecutor;
pub use union::UnionExecutor;
pub use upstream_sink_union::{UpstreamFragmentInfo, UpstreamSinkUnionExecutor};
pub use utils::DummyExecutor;
pub use values::ValuesExecutor;
pub use vector_index::VectorIndexWriteExecutor;
pub use watermark_filter::WatermarkFilterExecutor;
pub use wrapper::WrapperExecutor;

use self::barrier_align::AlignedMessageStream;

pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;

pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
use risingwave_connector::sink::catalog::SinkId;
use risingwave_connector::source::cdc::{
    CdcTableSnapshotSplitAssignmentWithGeneration,
    build_actor_cdc_table_snapshot_splits_with_generation,
    build_pb_actor_cdc_table_snapshot_splits_with_generation,
};
use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
use risingwave_pb::stream_plan::throttle_mutation::RateLimit;

pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;

/// Static information of an executor.
#[derive(Debug, Default, Clone)]
pub struct ExecutorInfo {
    /// The schema of the OUTPUT of the executor.
    pub schema: Schema,

    /// The primary key indices of the OUTPUT of the executor.
    /// Schema is used by both OLAP and streaming, therefore
    /// pk indices are maintained independently.
    pub pk_indices: PkIndices,

    /// The stream kind of the OUTPUT of the executor.
    pub stream_kind: PbStreamKind,

    /// Identity of the executor.
    pub identity: String,

    /// The executor id of the executor.
    pub id: u64,
}

impl ExecutorInfo {
    pub fn for_test(schema: Schema, pk_indices: PkIndices, identity: String, id: u64) -> Self {
        Self {
            schema,
            pk_indices,
            stream_kind: PbStreamKind::Retract, // dummy value for test
            identity,
            id,
        }
    }
}

/// [`Execute`] describes the methods an executor should implement to handle control messages.
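///
/// # Example
///
/// A minimal sketch of an implementation (illustrative only; real executors
/// typically wrap an async generator over their upstream inputs):
///
/// ```ignore
/// struct NoopExecute;
///
/// impl Execute for NoopExecute {
///     fn execute(self: Box<Self>) -> BoxedMessageStream {
///         // Yield nothing and terminate immediately.
///         futures::stream::empty().boxed()
///     }
/// }
/// ```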
pub trait Execute: Send + 'static {
    fn execute(self: Box<Self>) -> BoxedMessageStream;

    fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
        self.execute()
    }

    fn boxed(self) -> Box<dyn Execute>
    where
        Self: Sized + Send + 'static,
    {
        Box::new(self)
    }
}

/// [`Executor`] combines the static information ([`ExecutorInfo`]) and the executable object to
/// handle messages ([`Execute`]).
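///
/// # Example
///
/// A sketch of constructing and running an executor, assuming some `NoopExecute`
/// type implementing [`Execute`] and a prepared `info: ExecutorInfo`:
///
/// ```ignore
/// // Any `(ExecutorInfo, impl Execute)` pair converts into an `Executor`.
/// let executor: Executor = (info, NoopExecute).into();
/// let mut stream = executor.execute();
/// // The stream then yields `Message`s (chunks, barriers, watermarks).
/// while let Some(msg) = stream.next().await {
///     // ...
/// }
/// ```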
pub struct Executor {
    info: ExecutorInfo,
    execute: Box<dyn Execute>,
}

impl Executor {
    pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
        Self { info, execute }
    }

    pub fn info(&self) -> &ExecutorInfo {
        &self.info
    }

    pub fn schema(&self) -> &Schema {
        &self.info.schema
    }

    pub fn pk_indices(&self) -> PkIndicesRef<'_> {
        &self.info.pk_indices
    }

    pub fn stream_kind(&self) -> PbStreamKind {
        self.info.stream_kind
    }

    pub fn identity(&self) -> &str {
        &self.info.identity
    }

    pub fn execute(self) -> BoxedMessageStream {
        self.execute.execute()
    }

    pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
        self.execute.execute_with_epoch(epoch)
    }
}

impl std::fmt::Debug for Executor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.identity())
    }
}

impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
    fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
        Self::new(info, execute)
    }
}

impl<E> From<(ExecutorInfo, E)> for Executor
where
    E: Execute,
{
    fn from((info, execute): (ExecutorInfo, E)) -> Self {
        Self::new(info, execute.boxed())
    }
}

pub const INVALID_EPOCH: u64 = 0;

type UpstreamFragmentId = FragmentId;
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;

#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct UpdateMutation {
    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
    pub dropped_actors: HashSet<ActorId>,
    pub actor_splits: SplitAssignments,
    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    pub sink_add_columns: HashMap<SinkId, Vec<Field>>,
}

#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct AddMutation {
    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
    pub added_actors: HashSet<ActorId>,
    // TODO: remove this and use `SourceChangesSplit` after we support multiple mutations.
    pub splits: SplitAssignments,
    pub pause: bool,
    /// (`upstream_mv_table_id`, `subscriber_id`)
    pub subscriptions_to_add: Vec<(TableId, u32)>,
    /// Backfill fragments that should be paused on startup.
    pub backfill_nodes_to_pause: HashSet<FragmentId>,
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    pub new_upstream_sinks: HashMap<FragmentId, PbNewUpstreamSink>,
}

#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct StopMutation {
    pub dropped_actors: HashSet<ActorId>,
    pub dropped_sink_fragments: HashSet<FragmentId>,
}

/// See [`PbMutation`] for the semantics of each mutation.
#[derive(Debug, Clone, PartialEq)]
pub enum Mutation {
    Stop(StopMutation),
    Update(UpdateMutation),
    Add(AddMutation),
    SourceChangeSplit(SplitAssignments),
    Pause,
    Resume,
    Throttle(HashMap<ActorId, Option<u32>>),
    AddAndUpdate(AddMutation, UpdateMutation),
    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
    DropSubscriptions {
        /// `subscriber` -> `upstream_mv_table_id`
        subscriptions_to_drop: Vec<(u32, TableId)>,
    },
    StartFragmentBackfill {
        fragment_ids: HashSet<FragmentId>,
    },
    RefreshStart {
        table_id: TableId,
        associated_source_id: TableId,
    },
    ListFinish {
        associated_source_id: TableId,
    },
    LoadFinish {
        associated_source_id: TableId,
    },
}

/// The generic type `M` is the mutation type of the barrier.
///
/// For barriers in the dispatcher, `M` is `()`, which means the mutation is erased.
/// For barriers flowing within the streaming actor, `M` is the normal `BarrierMutationType`.
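///
/// For example, a full [`Barrier`] is converted into a [`DispatcherBarrier`] by
/// erasing its mutation before being sent through the exchange (a sketch):
///
/// ```ignore
/// // `epoch` here is some valid test epoch value.
/// let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Pause);
/// // `M = Option<Arc<Mutation>>` above; `into_dispatcher` erases it to `M = ()`.
/// let dispatcher_barrier: DispatcherBarrier = barrier.into_dispatcher();
/// ```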
#[derive(Debug, Clone)]
pub struct BarrierInner<M> {
    pub epoch: EpochPair,
    pub mutation: M,
    pub kind: BarrierKind,

    /// Tracing context for the **current** epoch of this barrier.
    pub tracing_context: TracingContext,

    /// The actors that this barrier has passed locally. Used for debugging only.
    pub passed_actors: Vec<ActorId>,
}

pub type BarrierMutationType = Option<Arc<Mutation>>;
pub type Barrier = BarrierInner<BarrierMutationType>;
pub type DispatcherBarrier = BarrierInner<()>;

impl<M: Default> BarrierInner<M> {
    /// Create a plain barrier.
    pub fn new_test_barrier(epoch: u64) -> Self {
        Self {
            epoch: EpochPair::new_test_epoch(epoch),
            kind: BarrierKind::Checkpoint,
            tracing_context: TracingContext::none(),
            mutation: Default::default(),
            passed_actors: Default::default(),
        }
    }

    pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
        Self {
            epoch: EpochPair::new(epoch, prev_epoch),
            kind: BarrierKind::Checkpoint,
            tracing_context: TracingContext::none(),
            mutation: Default::default(),
            passed_actors: Default::default(),
        }
    }
}

impl Barrier {
    pub fn into_dispatcher(self) -> DispatcherBarrier {
        DispatcherBarrier {
            epoch: self.epoch,
            mutation: (),
            kind: self.kind,
            tracing_context: self.tracing_context,
            passed_actors: self.passed_actors,
        }
    }

    #[must_use]
    pub fn with_mutation(self, mutation: Mutation) -> Self {
        Self {
            mutation: Some(Arc::new(mutation)),
            ..self
        }
    }

    #[must_use]
    pub fn with_stop(self) -> Self {
        self.with_mutation(Mutation::Stop(StopMutation {
            dropped_actors: Default::default(),
            dropped_sink_fragments: Default::default(),
        }))
    }

    /// Whether this barrier carries a stop mutation.
    pub fn is_with_stop_mutation(&self) -> bool {
        matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
    }

    /// Whether this barrier is to stop the actor with `actor_id`.
    pub fn is_stop(&self, actor_id: ActorId) -> bool {
        self.all_stop_actors()
            .is_some_and(|actors| actors.contains(&actor_id))
    }

    pub fn is_checkpoint(&self) -> bool {
        self.kind == BarrierKind::Checkpoint
    }

    /// Get the initial split assignments for the actor with `actor_id`.
    ///
    /// This should only be called on the initial barrier received by the executor. It must be
    ///
    /// - `Add` mutation when it's a new streaming job, or recovery.
    /// - `Update` mutation when it's created for scaling.
    /// - `AddAndUpdate` mutation when it's created for sink-into-table.
    ///
    /// Note that `SourceChangeSplit` is **not** included, because it's only used for changing splits
    /// of existing executors.
    pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
        match self.mutation.as_deref()? {
            Mutation::Update(UpdateMutation { actor_splits, .. })
            | Mutation::Add(AddMutation {
                splits: actor_splits,
                ..
            }) => actor_splits.get(&actor_id),

            Mutation::AddAndUpdate(
                AddMutation {
                    splits: add_actor_splits,
                    ..
                },
                UpdateMutation {
                    actor_splits: update_actor_splits,
                    ..
                },
            ) => add_actor_splits
                .get(&actor_id)
                // `Add` and `Update` should apply to different fragments, so we don't need to merge them.
                .or_else(|| update_actor_splits.get(&actor_id)),

            _ => {
                if cfg!(debug_assertions) {
                    panic!(
                        "the initial mutation of the barrier should not be {:?}",
                        self.mutation
                    );
                }
                None
            }
        }
        .map(|s| s.as_slice())
    }

    /// Get all actors that are to be stopped (dropped) by this barrier.
    pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
        match self.mutation.as_deref() {
            Some(Mutation::Stop(StopMutation { dropped_actors, .. })) => Some(dropped_actors),
            Some(Mutation::Update(UpdateMutation { dropped_actors, .. }))
            | Some(Mutation::AddAndUpdate(_, UpdateMutation { dropped_actors, .. })) => {
                Some(dropped_actors)
            }
            _ => None,
        }
    }

    /// Whether this barrier is to newly add the actor with `actor_id`. This is used for `Chain` and
    /// `Values` to decide whether to output the existing (historical) data.
    ///
    /// By "newly", we mean the actor belongs to a subgraph of a new streaming job. That is, actors
    /// added for scaling are not included.
    pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation { added_actors, .. }))
            | Some(Mutation::AddAndUpdate(AddMutation { added_actors, .. }, _)) => {
                added_actors.contains(&actor_id)
            }
            _ => false,
        }
    }

    pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
        if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
            fragment_ids.contains(&fragment_id)
        } else {
            false
        }
    }

    /// Whether this barrier adds new downstream fragments for the actor with `upstream_actor_id`.
    ///
    /// # Use case
    /// Some optimizations are applied when an actor doesn't have any downstreams ("standalone" actors).
    /// * Pause a standalone shared `SourceExecutor`.
    /// * Disable a standalone `MaterializeExecutor`'s conflict check.
    ///
    /// This is implemented by checking `actor_context.initial_dispatch_num` on startup, and
    /// checking `has_more_downstream_fragments` on barriers to see whether the optimization
    /// needs to be turned off.
    ///
    /// ## Some special cases not included
    ///
    /// Note that this is not `has_new_downstream_actor/fragment`. For our use case, we only
    /// care about the **number of downstream fragments** (more precisely, their existence).
    /// - When scaling, the number of downstream actors changes, and they are "new", but the set of downstream fragments does not change.
    /// - When `ALTER TABLE sink_into_table`, the fragment is replaced with a "new" one, but the number does not change.
    pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
        let Some(mutation) = self.mutation.as_deref() else {
            return false;
        };
        match mutation {
            // Add is for mv, index and sink creation.
            Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
            // AddAndUpdate is for sink-into-table.
            Mutation::AddAndUpdate(
                AddMutation { adds, .. },
                UpdateMutation {
                    dispatchers,
                    actor_new_dispatchers,
                    ..
                },
            ) => {
                adds.get(&upstream_actor_id).is_some()
                    || actor_new_dispatchers.get(&upstream_actor_id).is_some()
                    || dispatchers.get(&upstream_actor_id).is_some()
            }
            Mutation::Update(_)
            | Mutation::Stop(_)
            | Mutation::Pause
            | Mutation::Resume
            | Mutation::SourceChangeSplit(_)
            | Mutation::Throttle(_)
            | Mutation::DropSubscriptions { .. }
            | Mutation::ConnectorPropsChange(_)
            | Mutation::StartFragmentBackfill { .. }
            | Mutation::RefreshStart { .. }
            | Mutation::ListFinish { .. }
            | Mutation::LoadFinish { .. } => false,
        }
    }

    /// Whether this barrier requires the executor to pause its data stream on startup.
    pub fn is_pause_on_startup(&self) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation { pause, .. }))
            | Some(Mutation::AddAndUpdate(AddMutation { pause, .. }, _)) => *pause,
            _ => false,
        }
    }

    pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation {
                backfill_nodes_to_pause,
                ..
            }))
            | Some(Mutation::AddAndUpdate(
                AddMutation {
                    backfill_nodes_to_pause,
                    ..
                },
                _,
            )) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
            _ => {
                tracing::warn!("expected an AddMutation on startup, instead got {:?}", self);
                true
            }
        }
    }

    /// Whether this barrier is for resume.
    pub fn is_resume(&self) -> bool {
        matches!(self.mutation.as_deref(), Some(Mutation::Resume))
    }

    /// Returns the [`MergeUpdate`] if this barrier is to update the merge executors for the actor
    /// with `actor_id`.
    pub fn as_update_merge(
        &self,
        actor_id: ActorId,
        upstream_fragment_id: UpstreamFragmentId,
    ) -> Option<&MergeUpdate> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Update(UpdateMutation { merges, .. })
                | Mutation::AddAndUpdate(_, UpdateMutation { merges, .. }) => {
                    merges.get(&(actor_id, upstream_fragment_id))
                }
                _ => None,
            })
    }

    /// Returns the new upstream sink information if this barrier is to add a new upstream sink for
    /// the specified downstream fragment.
    pub fn as_new_upstream_sink(&self, fragment_id: FragmentId) -> Option<&PbNewUpstreamSink> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Add(AddMutation {
                    new_upstream_sinks, ..
                }) => new_upstream_sinks.get(&fragment_id),
                _ => None,
            })
    }

    /// Returns the dropped upstream sink fragments if this barrier is to drop any sink.
    pub fn as_dropped_upstream_sinks(&self) -> Option<&HashSet<FragmentId>> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Stop(StopMutation {
                    dropped_sink_fragments,
                    ..
                }) => Some(dropped_sink_fragments),
                _ => None,
            })
    }

    /// Returns the new vnode bitmap if this barrier is to update the vnode bitmap for the actor
    /// with `actor_id`.
    ///
    /// Actually, this vnode bitmap update is only useful for validating record access in
    /// distributed executors, since the read/write pattern never crosses multiple vnodes.
    pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Update(UpdateMutation { vnode_bitmaps, .. })
                | Mutation::AddAndUpdate(_, UpdateMutation { vnode_bitmaps, .. }) => {
                    vnode_bitmaps.get(&actor_id).cloned()
                }
                _ => None,
            })
    }

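    /// Returns the columns to add to the sink with `sink_id` if this barrier carries a
    /// corresponding `Update` (or `AddAndUpdate`) mutation.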
    pub fn as_sink_add_columns(&self, sink_id: SinkId) -> Option<Vec<Field>> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Update(UpdateMutation {
                    sink_add_columns, ..
                })
                | Mutation::AddAndUpdate(
                    _,
                    UpdateMutation {
                        sink_add_columns, ..
                    },
                ) => sink_add_columns.get(&sink_id).cloned(),
                _ => None,
            })
    }

    pub fn get_curr_epoch(&self) -> Epoch {
        Epoch(self.epoch.curr)
    }

    /// Retrieve the tracing context for the **current** epoch of this barrier.
    pub fn tracing_context(&self) -> &TracingContext {
        &self.tracing_context
    }

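    /// Returns the IDs of the subscribers added on the materialized view table `mv_table_id` by
    /// this barrier, if it carries an `Add` or `AddAndUpdate` mutation.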
    pub fn added_subscriber_on_mv_table(
        &self,
        mv_table_id: TableId,
    ) -> impl Iterator<Item = u32> + '_ {
        if let Some(Mutation::Add(add)) | Some(Mutation::AddAndUpdate(add, _)) =
            self.mutation.as_deref()
        {
            Some(add)
        } else {
            None
        }
        .into_iter()
        .flat_map(move |add| {
            add.subscriptions_to_add.iter().filter_map(
                move |(upstream_mv_table_id, subscriber_id)| {
                    if *upstream_mv_table_id == mv_table_id {
                        Some(*subscriber_id)
                    } else {
                        None
                    }
                },
            )
        })
    }
}

impl<M: PartialEq> PartialEq for BarrierInner<M> {
    fn eq(&self, other: &Self) -> bool {
        self.epoch == other.epoch && self.mutation == other.mutation
    }
}

impl Mutation {
    /// Return true if the mutation is stop.
    ///
    /// Note that this does not mean we will stop the current actor.
    #[cfg(test)]
    pub fn is_stop(&self) -> bool {
        matches!(self, Mutation::Stop(_))
    }

    fn to_protobuf(&self) -> PbMutation {
        let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
            actor_splits
                .iter()
                .map(|(&actor_id, splits)| {
                    (
                        actor_id,
                        ConnectorSplits {
                            splits: splits.iter().map(ConnectorSplit::from).collect(),
                        },
                    )
                })
                .collect::<HashMap<_, _>>()
        };

        match self {
            Mutation::Stop(StopMutation {
                dropped_actors,
                dropped_sink_fragments,
            }) => PbMutation::Stop(PbStopMutation {
                actors: dropped_actors.iter().copied().collect(),
                dropped_sink_fragments: dropped_sink_fragments.iter().copied().collect(),
            }),
            Mutation::Update(UpdateMutation {
                dispatchers,
                merges,
                vnode_bitmaps,
                dropped_actors,
                actor_splits,
                actor_new_dispatchers,
                actor_cdc_table_snapshot_splits,
                sink_add_columns,
            }) => PbMutation::Update(PbUpdateMutation {
                dispatcher_update: dispatchers.values().flatten().cloned().collect(),
                merge_update: merges.values().cloned().collect(),
                actor_vnode_bitmap_update: vnode_bitmaps
                    .iter()
                    .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
                    .collect(),
                dropped_actors: dropped_actors.iter().cloned().collect(),
                actor_splits: actor_splits_to_protobuf(actor_splits),
                actor_new_dispatchers: actor_new_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| {
                        (
                            actor_id,
                            Dispatchers {
                                dispatchers: dispatchers.clone(),
                            },
                        )
                    })
                    .collect(),
                actor_cdc_table_snapshot_splits:
                    build_pb_actor_cdc_table_snapshot_splits_with_generation(
                        actor_cdc_table_snapshot_splits.clone(),
                    )
                    .into(),
                sink_add_columns: sink_add_columns
                    .iter()
                    .map(|(sink_id, add_columns)| {
                        (
                            sink_id.sink_id,
                            PbSinkAddColumns {
                                fields: add_columns.iter().map(|field| field.to_prost()).collect(),
                            },
                        )
                    })
                    .collect(),
            }),
            Mutation::Add(AddMutation {
                adds,
                added_actors,
                splits,
                pause,
                subscriptions_to_add,
                backfill_nodes_to_pause,
                actor_cdc_table_snapshot_splits,
                new_upstream_sinks,
            }) => PbMutation::Add(PbAddMutation {
                actor_dispatchers: adds
                    .iter()
                    .map(|(&actor_id, dispatchers)| {
                        (
                            actor_id,
                            Dispatchers {
                                dispatchers: dispatchers.clone(),
                            },
                        )
                    })
                    .collect(),
                added_actors: added_actors.iter().copied().collect(),
                actor_splits: actor_splits_to_protobuf(splits),
                pause: *pause,
                subscriptions_to_add: subscriptions_to_add
                    .iter()
                    .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
                        subscriber_id: *subscriber_id,
                        upstream_mv_table_id: table_id.table_id,
                    })
                    .collect(),
                backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
                actor_cdc_table_snapshot_splits:
                    build_pb_actor_cdc_table_snapshot_splits_with_generation(
                        actor_cdc_table_snapshot_splits.clone(),
                    )
                    .into(),
                new_upstream_sinks: new_upstream_sinks
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
            }),
            Mutation::SourceChangeSplit(changes) => PbMutation::Splits(SourceChangeSplitMutation {
                actor_splits: changes
                    .iter()
                    .map(|(&actor_id, splits)| {
                        (
                            actor_id,
                            ConnectorSplits {
                                splits: splits.iter().map(ConnectorSplit::from).collect(),
                            },
                        )
                    })
                    .collect(),
            }),
            Mutation::Pause => PbMutation::Pause(PauseMutation {}),
            Mutation::Resume => PbMutation::Resume(ResumeMutation {}),
            Mutation::Throttle(changes) => PbMutation::Throttle(ThrottleMutation {
                actor_throttle: changes
                    .iter()
                    .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit }))
                    .collect(),
            }),

            Mutation::AddAndUpdate(add, update) => PbMutation::Combined(CombinedMutation {
                mutations: vec![
                    BarrierMutation {
                        mutation: Some(Mutation::Add(add.clone()).to_protobuf()),
                    },
                    BarrierMutation {
                        mutation: Some(Mutation::Update(update.clone()).to_protobuf()),
                    },
                ],
            }),
            Mutation::DropSubscriptions {
                subscriptions_to_drop,
            } => PbMutation::DropSubscriptions(DropSubscriptionsMutation {
                info: subscriptions_to_drop
                    .iter()
                    .map(
                        |(subscriber_id, upstream_mv_table_id)| SubscriptionUpstreamInfo {
                            subscriber_id: *subscriber_id,
                            upstream_mv_table_id: upstream_mv_table_id.table_id,
                        },
                    )
                    .collect(),
            }),
            Mutation::ConnectorPropsChange(map) => {
                PbMutation::ConnectorPropsChange(ConnectorPropsChangeMutation {
                    connector_props_infos: map
                        .iter()
                        .map(|(actor_id, options)| {
                            (
                                *actor_id,
                                ConnectorPropsInfo {
                                    connector_props_info: options
                                        .iter()
                                        .map(|(k, v)| (k.clone(), v.clone()))
                                        .collect(),
                                },
                            )
                        })
                        .collect(),
                })
            }
            Mutation::StartFragmentBackfill { fragment_ids } => {
                PbMutation::StartFragmentBackfill(StartFragmentBackfillMutation {
                    fragment_ids: fragment_ids.iter().copied().collect(),
                })
            }
            Mutation::RefreshStart {
                table_id,
                associated_source_id,
            } => PbMutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
                table_id: table_id.table_id,
                associated_source_id: associated_source_id.table_id,
            }),
            Mutation::ListFinish {
                associated_source_id,
            } => PbMutation::ListFinish(risingwave_pb::stream_plan::ListFinishMutation {
                associated_source_id: associated_source_id.table_id,
            }),
            Mutation::LoadFinish {
                associated_source_id,
            } => PbMutation::LoadFinish(risingwave_pb::stream_plan::LoadFinishMutation {
                associated_source_id: associated_source_id.table_id,
            }),
        }
    }

    fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
        let mutation = match prost {
            PbMutation::Stop(stop) => Mutation::Stop(StopMutation {
                dropped_actors: stop.actors.iter().copied().collect(),
                dropped_sink_fragments: stop.dropped_sink_fragments.iter().copied().collect(),
            }),

            PbMutation::Update(update) => Mutation::Update(UpdateMutation {
                dispatchers: update
                    .dispatcher_update
                    .iter()
                    .map(|u| (u.actor_id, u.clone()))
                    .into_group_map(),
                merges: update
                    .merge_update
                    .iter()
                    .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
                    .collect(),
                vnode_bitmaps: update
                    .actor_vnode_bitmap_update
                    .iter()
                    .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
                    .collect(),
                dropped_actors: update.dropped_actors.iter().cloned().collect(),
                actor_splits: update
                    .actor_splits
                    .iter()
                    .map(|(&actor_id, splits)| {
                        (
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(|split| split.try_into().unwrap())
                                .collect(),
                        )
                    })
                    .collect(),
                actor_new_dispatchers: update
                    .actor_new_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
                    .collect(),
                actor_cdc_table_snapshot_splits:
                    build_actor_cdc_table_snapshot_splits_with_generation(
                        update
                            .actor_cdc_table_snapshot_splits
                            .clone()
                            .unwrap_or_default(),
                    ),
                sink_add_columns: update
                    .sink_add_columns
                    .iter()
                    .map(|(sink_id, add_columns)| {
                        (
                            SinkId::new(*sink_id),
                            add_columns.fields.iter().map(Field::from_prost).collect(),
                        )
                    })
                    .collect(),
            }),

            PbMutation::Add(add) => Mutation::Add(AddMutation {
                adds: add
                    .actor_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
                    .collect(),
                added_actors: add.added_actors.iter().copied().collect(),
                // TODO: remove this and use `SourceChangesSplit` after we support multiple
                // mutations.
                splits: add
                    .actor_splits
                    .iter()
                    .map(|(&actor_id, splits)| {
                        (
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(|split| split.try_into().unwrap())
                                .collect(),
                        )
                    })
                    .collect(),
                pause: add.pause,
                subscriptions_to_add: add
                    .subscriptions_to_add
                    .iter()
                    .map(
                        |SubscriptionUpstreamInfo {
                             subscriber_id,
                             upstream_mv_table_id,
                         }| {
                            (TableId::new(*upstream_mv_table_id), *subscriber_id)
                        },
                    )
                    .collect(),
                backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
                actor_cdc_table_snapshot_splits:
                    build_actor_cdc_table_snapshot_splits_with_generation(
                        add.actor_cdc_table_snapshot_splits
                            .clone()
                            .unwrap_or_default(),
                    ),
                new_upstream_sinks: add
                    .new_upstream_sinks
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
            }),

            PbMutation::Splits(s) => {
                let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
                    Vec::with_capacity(s.actor_splits.len());
                for (&actor_id, splits) in &s.actor_splits {
                    if !splits.splits.is_empty() {
                        change_splits.push((
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(SplitImpl::try_from)
                                .try_collect()?,
                        ));
                    }
                }
                Mutation::SourceChangeSplit(change_splits.into_iter().collect())
            }
            PbMutation::Pause(_) => Mutation::Pause,
            PbMutation::Resume(_) => Mutation::Resume,
            PbMutation::Throttle(changes) => Mutation::Throttle(
                changes
                    .actor_throttle
                    .iter()
                    .map(|(actor_id, limit)| (*actor_id, limit.rate_limit))
                    .collect(),
            ),
            PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
                subscriptions_to_drop: drop
                    .info
                    .iter()
                    .map(|info| (info.subscriber_id, TableId::new(info.upstream_mv_table_id)))
                    .collect(),
            },
            PbMutation::ConnectorPropsChange(alter_connector_props) => {
                Mutation::ConnectorPropsChange(
                    alter_connector_props
                        .connector_props_infos
                        .iter()
                        .map(|(actor_id, options)| {
                            (
                                *actor_id,
                                options
                                    .connector_props_info
                                    .iter()
                                    .map(|(k, v)| (k.clone(), v.clone()))
                                    .collect(),
                            )
                        })
                        .collect(),
                )
            }
            PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
                Mutation::StartFragmentBackfill {
                    fragment_ids: start_fragment_backfill
                        .fragment_ids
                        .iter()
                        .copied()
                        .collect(),
                }
            }
            PbMutation::RefreshStart(refresh_start) => Mutation::RefreshStart {
                table_id: TableId::new(refresh_start.table_id),
                associated_source_id: TableId::new(refresh_start.associated_source_id),
            },
            PbMutation::ListFinish(list_finish) => Mutation::ListFinish {
                associated_source_id: TableId::new(list_finish.associated_source_id),
            },
            PbMutation::LoadFinish(load_finish) => Mutation::LoadFinish {
                associated_source_id: TableId::new(load_finish.associated_source_id),
            },
            PbMutation::Combined(CombinedMutation { mutations }) => match &mutations[..] {
                [
                    BarrierMutation {
                        mutation: Some(add),
                    },
                    BarrierMutation {
                        mutation: Some(update),
                    },
                ] => {
                    let Mutation::Add(add_mutation) = Mutation::from_protobuf(add)? else {
                        unreachable!();
                    };

                    let Mutation::Update(update_mutation) = Mutation::from_protobuf(update)? else {
                        unreachable!();
                    };

                    Mutation::AddAndUpdate(add_mutation, update_mutation)
                }

                _ => unreachable!(),
            },
        };
        Ok(mutation)
    }
}

impl<M> BarrierInner<M> {
    fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
        let Self {
            epoch,
            mutation,
            kind,
            passed_actors,
            tracing_context,
            ..
        } = self;

        PbBarrier {
            epoch: Some(PbEpoch {
                curr: epoch.curr,
                prev: epoch.prev,
            }),
            mutation: Some(PbBarrierMutation {
                mutation: barrier_fn(mutation),
            }),
            tracing_context: tracing_context.to_protobuf(),
            kind: *kind as _,
            passed_actors: passed_actors.clone(),
        }
    }

    fn from_protobuf_inner(
        prost: &PbBarrier,
        mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
    ) -> StreamExecutorResult<Self> {
        let epoch = prost.get_epoch()?;

        Ok(Self {
            kind: prost.kind(),
            epoch: EpochPair::new(epoch.curr, epoch.prev),
            mutation: mutation_from_pb(
                prost
                    .mutation
                    .as_ref()
                    .and_then(|mutation| mutation.mutation.as_ref()),
            )?,
            passed_actors: prost.get_passed_actors().clone(),
            tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
        })
    }

    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
        BarrierInner {
            epoch: self.epoch,
            mutation: f(self.mutation),
            kind: self.kind,
            tracing_context: self.tracing_context,
            passed_actors: self.passed_actors,
        }
    }
}

impl DispatcherBarrier {
    pub fn to_protobuf(&self) -> PbBarrier {
        self.to_protobuf_inner(|_| None)
    }
}

impl Barrier {
    pub fn to_protobuf(&self) -> PbBarrier {
        self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
    }

    pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
        Self::from_protobuf_inner(prost, |mutation| {
            mutation
                .map(|m| Mutation::from_protobuf(m).map(Arc::new))
                .transpose()
        })
    }
}

#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
    pub col_idx: usize,
    pub data_type: DataType,
    pub val: ScalarImpl,
}

impl PartialOrd for Watermark {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Watermark {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.val.default_cmp(&other.val)
    }
}

impl Watermark {
    pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
        Self {
            col_idx,
            data_type,
            val,
        }
    }

    pub async fn transform_with_expr(
        self,
        expr: &NonStrictExpression<impl Expression>,
        new_col_idx: usize,
    ) -> Option<Self> {
        let Self { col_idx, val, .. } = self;
        let row = {
            let mut row = vec![None; col_idx + 1];
            row[col_idx] = Some(val);
            OwnedRow::new(row)
        };
        let val = expr.eval_row_infallible(&row).await?;
        Some(Self::new(new_col_idx, expr.inner().return_type(), val))
    }

    /// Transform the watermark with the given output indices. If this watermark is not in the
    /// output, return `None`.
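    ///
    /// # Example
    ///
    /// A sketch of both outcomes:
    ///
    /// ```ignore
    /// let watermark = Watermark::new(2, DataType::Int64, ScalarImpl::Int64(100));
    /// // Column 2 maps to position 0 of the output, so the watermark is re-indexed to 0.
    /// assert_eq!(watermark.clone().transform_with_indices(&[2, 3]).unwrap().col_idx, 0);
    /// // Column 2 is not in the output at all, so the watermark is dropped.
    /// assert!(watermark.transform_with_indices(&[0, 1]).is_none());
    /// ```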
1301    pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
1302        output_indices
1303            .iter()
1304            .position(|p| *p == self.col_idx)
1305            .map(|new_col_idx| self.with_idx(new_col_idx))
1306    }
1307
1308    pub fn to_protobuf(&self) -> PbWatermark {
1309        PbWatermark {
1310            column: Some(PbInputRef {
1311                index: self.col_idx as _,
1312                r#type: Some(self.data_type.to_protobuf()),
1313            }),
1314            val: Some(&self.val).to_protobuf().into(),
1315        }
1316    }
1317
1318    pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
1319        let col_ref = prost.get_column()?;
1320        let data_type = DataType::from(col_ref.get_type()?);
1321        let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
1322            .expect("watermark value cannot be null");
1323        Ok(Self::new(col_ref.get_index() as _, data_type, val))
1324    }
1325
1326    pub fn with_idx(self, idx: usize) -> Self {
1327        Self::new(idx, self.data_type, self.val)
1328    }
1329}
1330
1331#[derive(Debug, EnumAsInner, PartialEq, Clone)]
1332pub enum MessageInner<M> {
1333    Chunk(StreamChunk),
1334    Barrier(BarrierInner<M>),
1335    Watermark(Watermark),
1336}
1337
1338impl<M> MessageInner<M> {
1339    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
1340        match self {
1341            MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
1342            MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
1343            MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
1344        }
1345    }
1346}
1347
1348pub type Message = MessageInner<BarrierMutationType>;
1349pub type DispatcherMessage = MessageInner<()>;
1350
1351/// `MessageBatchInner` is used exclusively by `Dispatcher` and the `Merger`/`Receiver` for exchanging messages between them.
1352/// It shares the same message type as the fundamental `MessageInner`, but batches multiple barriers into a single message.
1353#[derive(Debug, EnumAsInner, PartialEq, Clone)]
1354pub enum MessageBatchInner<M> {
1355    Chunk(StreamChunk),
1356    BarrierBatch(Vec<BarrierInner<M>>),
1357    Watermark(Watermark),
1358}
1359pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
1360pub type DispatcherBarriers = Vec<DispatcherBarrier>;
1361pub type DispatcherMessageBatch = MessageBatchInner<()>;
1362
1363impl From<DispatcherMessage> for DispatcherMessageBatch {
1364    fn from(m: DispatcherMessage) -> Self {
1365        match m {
1366            DispatcherMessage::Chunk(c) => Self::Chunk(c),
1367            DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
1368            DispatcherMessage::Watermark(w) => Self::Watermark(w),
1369        }
1370    }
1371}
1372
1373impl From<StreamChunk> for Message {
1374    fn from(chunk: StreamChunk) -> Self {
1375        Message::Chunk(chunk)
1376    }
1377}
1378
1379impl<'a> TryFrom<&'a Message> for &'a Barrier {
1380    type Error = ();
1381
1382    fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
1383        match m {
1384            Message::Chunk(_) => Err(()),
1385            Message::Barrier(b) => Ok(b),
1386            Message::Watermark(_) => Err(()),
1387        }
1388    }
1389}
1390
1391impl Message {
1392    /// Return true if the message is a stop barrier, meaning the stream
1393    /// will not continue, false otherwise.
1394    ///
1395    /// Note that this does not mean we will stop the current actor.
1396    #[cfg(test)]
1397    pub fn is_stop(&self) -> bool {
1398        matches!(
1399            self,
1400            Message::Barrier(Barrier {
1401                mutation,
1402                ..
1403            }) if mutation.as_ref().unwrap().is_stop()
1404        )
1405    }
1406}
1407
1408impl DispatcherMessageBatch {
    pub fn to_protobuf(&self) -> PbStreamMessageBatch {
        let prost = match self {
            Self::Chunk(stream_chunk) => {
                let prost_stream_chunk = stream_chunk.to_protobuf();
                StreamMessageBatch::StreamChunk(prost_stream_chunk)
            }
            Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
                barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
            }),
            Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
        };
        PbStreamMessageBatch {
            stream_message_batch: Some(prost),
        }
    }

    pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
        let res = match prost.get_stream_message_batch()? {
            StreamMessageBatch::StreamChunk(chunk) => {
                Self::Chunk(StreamChunk::from_protobuf(chunk)?)
            }
            StreamMessageBatch::BarrierBatch(barrier_batch) => {
                let barriers = barrier_batch
                    .barriers
                    .iter()
                    .map(|barrier| {
                        DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
                            if mutation.is_some() {
                                if cfg!(debug_assertions) {
                                    panic!("should not receive a barrier message with mutation");
                                } else {
                                    warn!(?barrier, "received a barrier message with mutation");
                                }
                            }
                            Ok(())
                        })
                    })
                    .try_collect()?;
                Self::BarrierBatch(barriers)
            }
            StreamMessageBatch::Watermark(watermark) => {
                Self::Watermark(Watermark::from_protobuf(watermark)?)
            }
        };
        Ok(res)
    }

    pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
        ::prost::Message::encoded_len(msg)
    }
}

pub type PkIndices = Vec<usize>;
pub type PkIndicesRef<'a> = &'a [usize];
pub type PkDataTypes = SmallVec<[DataType; 1]>;

/// Expect the first message of the given `stream` to be a barrier.
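///
/// A typical pattern at executor startup (an illustrative sketch; `upstream` stands for any
/// pinned message stream implementing `MessageStreamInner<M> + Unpin`):
///
/// ```ignore
/// // The first barrier carries the epoch used to initialize executor state
/// // before entering the main message loop.
/// let first_barrier = expect_first_barrier(&mut upstream).await?;
/// let first_epoch = first_barrier.epoch;
/// ```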
pub async fn expect_first_barrier<M: Debug>(
    stream: &mut (impl MessageStreamInner<M> + Unpin),
) -> StreamExecutorResult<BarrierInner<M>> {
    let message = stream
        .next()
        .instrument_await("expect_first_barrier")
        .await
        .context("failed to extract the first message: stream closed unexpectedly")??;
    let barrier = message
        .into_barrier()
        .expect("the first message must be a barrier");
    // TODO: Is this check correct?
    assert!(matches!(
        barrier.kind,
        BarrierKind::Checkpoint | BarrierKind::Initial
    ));
    Ok(barrier)
}

/// Expect the first message of the given aligned `stream` to be a barrier.
pub async fn expect_first_barrier_from_aligned_stream(
    stream: &mut (impl AlignedMessageStream + Unpin),
) -> StreamExecutorResult<Barrier> {
    let message = stream
        .next()
        .instrument_await("expect_first_barrier")
        .await
        .context("failed to extract the first message: stream closed unexpectedly")??;
    let barrier = message
        .into_barrier()
        .expect("the first message must be a barrier");
    Ok(barrier)
}

/// `StreamConsumer` is the last step in an actor.
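///
/// Implementations consume the whole message stream of the actor internally and yield only
/// the barriers, so that the actor driving [`Self::execute`] can collect and report them.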
pub trait StreamConsumer: Send + 'static {
    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;

    fn execute(self: Box<Self>) -> Self::BarrierStream;
}

type BoxedMessageInput<InputId, M> = BoxedInput<InputId, MessageStreamItemInner<M>>;

/// A stream for merging messages from multiple upstreams, aligning them on barriers. Upstream
/// inputs can be added and removed dynamically. For the meaning of the generic parameter `M`,
/// refer to `BarrierInner<M>`.
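///
/// Alignment works as follows: once an upstream yields a barrier, that input is moved to
/// `blocked` and not polled again until every other upstream has yielded a barrier of the same
/// epoch; only then is the barrier itself emitted downstream. Construction is straightforward
/// (an illustrative sketch; both metrics are optional):
///
/// ```ignore
/// let merged = DynamicReceivers::new(upstream_inputs, None, None);
/// ```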
pub struct DynamicReceivers<InputId, M> {
    /// The barrier we're aligning to. If this is `None`, then `blocked` is empty.
    barrier: Option<BarrierInner<M>>,
    /// The start timestamp of the current barrier. Used for measuring the alignment duration.
    start_ts: Option<Instant>,
    /// The upstreams that are blocked by the `barrier`.
    blocked: Vec<BoxedMessageInput<InputId, M>>,
    /// The upstreams that are not blocked and can be polled.
    active: FuturesUnordered<StreamFuture<BoxedMessageInput<InputId, M>>>,
    /// Watermark column index -> `BufferedWatermarks`.
    buffered_watermarks: BTreeMap<usize, BufferedWatermarks<InputId>>,
    /// Currently only used for union.
    barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
    /// Only used for merge. If `None`, we skip taking `Instant::now()` and calling `observe`
    /// during `poll_next`.
    merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
}

impl<InputId: Clone + Ord + Hash + std::fmt::Debug + Unpin, M: Clone + Unpin> Stream
    for DynamicReceivers<InputId, M>
{
    type Item = MessageStreamItemInner<M>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if self.is_empty() {
            return Poll::Ready(None);
        }

        loop {
            match futures::ready!(self.active.poll_next_unpin(cx)) {
                // Directly forward the error.
                Some((Some(Err(e)), _)) => {
                    return Poll::Ready(Some(Err(e)));
                }
                // Handle the message from some upstream.
                Some((Some(Ok(message)), remaining)) => {
                    let input_id = remaining.id();
                    match message {
                        MessageInner::Chunk(chunk) => {
                            // Continue polling this upstream by pushing it back to `active`.
                            self.active.push(remaining.into_future());
                            return Poll::Ready(Some(Ok(MessageInner::Chunk(chunk))));
                        }
                        MessageInner::Watermark(watermark) => {
                            // Continue polling this upstream by pushing it back to `active`.
                            self.active.push(remaining.into_future());
                            if let Some(watermark) = self.handle_watermark(input_id, watermark) {
                                return Poll::Ready(Some(Ok(MessageInner::Watermark(watermark))));
                            }
                        }
                        MessageInner::Barrier(barrier) => {
                            // Block this upstream by pushing it to `blocked`.
                            if self.blocked.is_empty() {
                                self.start_ts = Some(Instant::now());
                            }
                            self.blocked.push(remaining);
                            if let Some(current_barrier) = self.barrier.as_ref() {
                                if current_barrier.epoch != barrier.epoch {
                                    return Poll::Ready(Some(Err(
                                        StreamExecutorError::align_barrier(
                                            current_barrier.clone().map_mutation(|_| None),
                                            barrier.map_mutation(|_| None),
                                        ),
                                    )));
                                }
                            } else {
                                self.barrier = Some(barrier);
                            }
                        }
                    }
                }
                // We use barriers as the control messages of the stream. That is, we always stop
                // the actors actively when we receive a `Stop` mutation, instead of relying on
                // the termination of the stream.
                //
                // Besides, in abnormal cases where the other side of the `Input` closes
                // unexpectedly, we also yield an `Err(ExchangeChannelClosed)`, which will hit the
                // `Err` arm above. So this branch should never be reached.
                Some((None, remaining)) => {
                    return Poll::Ready(Some(Err(StreamExecutorError::channel_closed(format!(
                        "upstream input {:?} unexpectedly closed",
                        remaining.id()
                    )))));
                }
                // There are no active upstreams. Process the barrier and resume the blocked ones.
                None => {
                    assert!(!self.blocked.is_empty());

                    let start_ts = self
                        .start_ts
                        .take()
                        .expect("should have received at least one barrier");
                    if let Some(barrier_align_duration) = &self.barrier_align_duration {
                        barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
                    }
                    if let Some(merge_barrier_align_duration) = &self.merge_barrier_align_duration {
                        // `observe` performs a few atomic operations internally, so we only call
                        // it when the metric is present to avoid the overhead.
                        merge_barrier_align_duration.observe(start_ts.elapsed().as_secs_f64());
                    }

                    break;
                }
            }
        }

        assert!(self.active.is_terminated());

        let barrier = self.barrier.take().unwrap();

        let upstreams = std::mem::take(&mut self.blocked);
        self.extend_active(upstreams);
        assert!(!self.active.is_terminated());

        Poll::Ready(Some(Ok(MessageInner::Barrier(barrier))))
    }
}

impl<InputId: Clone + Ord + Hash + std::fmt::Debug, M> DynamicReceivers<InputId, M> {
    pub fn new(
        upstreams: Vec<BoxedMessageInput<InputId, M>>,
        barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
        merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
    ) -> Self {
        let mut this = Self {
            barrier: None,
            start_ts: None,
            blocked: Vec::with_capacity(upstreams.len()),
            active: Default::default(),
            buffered_watermarks: Default::default(),
            merge_barrier_align_duration,
            barrier_align_duration,
        };
        this.extend_active(upstreams);
        this
    }

    /// Extend the active upstreams with the given upstreams. The current stream must be in the
    /// clean state right after a barrier.
    pub fn extend_active(
        &mut self,
        upstreams: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
    ) {
        assert!(self.blocked.is_empty() && self.barrier.is_none());

        self.active
            .extend(upstreams.into_iter().map(|s| s.into_future()));
    }

    /// Handle a new watermark message. Optionally returns the watermark message to emit.
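    /// (Roughly: the merged watermark for a column only advances once every upstream has
    /// reported a watermark for it, and it advances to the minimum across upstreams; see
    /// `BufferedWatermarks` for the exact semantics.)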
    pub fn handle_watermark(
        &mut self,
        input_id: InputId,
        watermark: Watermark,
    ) -> Option<Watermark> {
        let col_idx = watermark.col_idx;
        // Lazily create the `BufferedWatermarks` when the first watermark arrives for a column.
        let upstream_ids: Vec<_> = self.upstream_input_ids().collect();
        let watermarks = self
            .buffered_watermarks
            .entry(col_idx)
            .or_insert_with(|| BufferedWatermarks::with_ids(upstream_ids));
        watermarks.handle_watermark(input_id, watermark)
    }

    /// Add the given new upstream inputs to `self`. The current stream must be in the clean
    /// state right after a barrier.
    pub fn add_upstreams_from(
        &mut self,
        new_inputs: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
    ) {
        assert!(self.blocked.is_empty() && self.barrier.is_none());

        let new_inputs: Vec<_> = new_inputs.into_iter().collect();
        let input_ids = new_inputs.iter().map(|input| input.id());
        self.buffered_watermarks.values_mut().for_each(|buffers| {
            // Add buffers for the new inputs to the buffered watermarks of all columns.
            buffers.add_buffers(input_ids.clone());
        });
        self.active
            .extend(new_inputs.into_iter().map(|s| s.into_future()));
    }

    /// Remove the upstreams whose ids are contained in `upstream_input_ids`. The current stream
    /// must be in the clean state right after a barrier.
    /// `self` does not necessarily contain all the input ids passed in.
    pub fn remove_upstreams(&mut self, upstream_input_ids: &HashSet<InputId>) {
        assert!(self.blocked.is_empty() && self.barrier.is_none());

        let new_upstreams = std::mem::take(&mut self.active)
            .into_iter()
            .map(|s| s.into_inner().unwrap())
            .filter(|u| !upstream_input_ids.contains(&u.id()));
        self.extend_active(new_upstreams);
        self.buffered_watermarks.values_mut().for_each(|buffers| {
            // `remove_buffer` re-checks the heap internally, in case the only upstream(s)
            // without a watermark in the heap are the ones being removed.
            buffers.remove_buffer(upstream_input_ids.clone());
        });
    }

    pub fn merge_barrier_align_duration(&self) -> Option<LabelGuardedMetric<Histogram>> {
        self.merge_barrier_align_duration.clone()
    }

    pub fn flush_buffered_watermarks(&mut self) {
        self.buffered_watermarks
            .values_mut()
            .for_each(|buffers| buffers.clear());
    }

    pub fn upstream_input_ids(&self) -> impl Iterator<Item = InputId> + '_ {
        self.blocked
            .iter()
            .map(|s| s.id())
            .chain(self.active.iter().map(|s| s.get_ref().unwrap().id()))
    }

    pub fn is_empty(&self) -> bool {
        self.blocked.is_empty() && self.active.is_empty()
    }
}

// Explanation of why we need `DispatchBarrierBuffer`:
//
// When we need to create or replace an upstream fragment for the current fragment, the `Merge`
// operator must add some new upstream actor inputs. However, the `Merge` operator may still have
// old upstreams. We must wait for these old upstreams to completely process their barriers and
// align before we can safely update the upstream-input-set.
//
// Meanwhile, the creation of a new upstream actor can only succeed after the channel to the
// downstream `Merge` operator has been established. This creates a potential dependency chain:
// [new_actor_creation -> downstream_merge_update -> old_actor_processing]
//
// To address this, we split the application of a barrier's `Mutation` into two steps:
// 1. Parse the `Mutation`. If there is an addition to the upstream-set, establish a channel with
//    the new upstream and cache it.
// 2. When the upstream barrier actually arrives, apply the cached upstream changes to the
//    upstream-set.
//
// Additionally, since receiving a barrier from the current upstream inputs and from `barrier_rx`
// happens asynchronously, we cannot determine which will arrive first. Therefore, when a barrier
// is received from an upstream: if a cached mutation is present, we apply it; otherwise, we must
// fetch a new barrier from `barrier_rx`.
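//
// Putting it together, the receive loop of a user of this buffer looks roughly like this
// (illustrative pseudo-code, not the actual control flow):
//
//     loop {
//         select! {
//             // Poll the upstream message stream; on an upstream barrier, pop the matching
//             // mutation barrier (and any pre-built inputs) via `pop_barrier_with_inputs`.
//             msg = upstream.next() => { ... }
//             // Concurrently drain `barrier_rx`: step 1 happens here, pre-building new
//             // inputs and pushing them into `buffer` (see `try_fetch_barrier_rx`).
//             barrier = barrier_rx.recv() => { ... }
//         }
//     }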
pub(crate) struct DispatchBarrierBuffer {
    buffer: VecDeque<(Barrier, Option<Vec<BoxedActorInput>>)>,
    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
    recv_state: BarrierReceiverState,
    curr_upstream_fragment_id: FragmentId,
    actor_id: ActorId,
    // Read-only context for building new inputs.
    build_input_ctx: Arc<BuildInputContext>,
}

struct BuildInputContext {
    pub actor_id: ActorId,
    pub local_barrier_manager: LocalBarrierManager,
    pub metrics: Arc<StreamingMetrics>,
    pub fragment_id: FragmentId,
}

type BoxedNewInputsFuture =
    Pin<Box<dyn Future<Output = StreamExecutorResult<Vec<BoxedActorInput>>> + Send>>;

enum BarrierReceiverState {
    ReceivingBarrier,
    CreatingNewInput(Barrier, BoxedNewInputsFuture),
}

impl DispatchBarrierBuffer {
    pub fn new(
        barrier_rx: mpsc::UnboundedReceiver<Barrier>,
        actor_id: ActorId,
        curr_upstream_fragment_id: FragmentId,
        local_barrier_manager: LocalBarrierManager,
        metrics: Arc<StreamingMetrics>,
        fragment_id: FragmentId,
    ) -> Self {
        Self {
            buffer: VecDeque::new(),
            barrier_rx,
            recv_state: BarrierReceiverState::ReceivingBarrier,
            curr_upstream_fragment_id,
            actor_id,
            build_input_ctx: Arc::new(BuildInputContext {
                actor_id,
                local_barrier_manager,
                metrics,
                fragment_id,
            }),
        }
    }

    pub async fn await_next_message(
        &mut self,
        stream: &mut (impl Stream<Item = StreamExecutorResult<DispatcherMessage>> + Unpin),
    ) -> StreamExecutorResult<DispatcherMessage> {
        tokio::select! {
            biased;

            msg = stream.try_next() => {
                msg?.ok_or_else(
                    || StreamExecutorError::channel_closed("upstream executor closed unexpectedly")
                )
            }

            e = self.continuously_fetch_barrier_rx() => {
                Err(e)
            }
        }
    }

    pub async fn pop_barrier_with_inputs(
        &mut self,
        barrier: DispatcherBarrier,
    ) -> StreamExecutorResult<(Barrier, Option<Vec<BoxedActorInput>>)> {
        while self.buffer.is_empty() {
            self.try_fetch_barrier_rx(false).await?;
        }
        let (mut recv_barrier, inputs) = self.buffer.pop_front().unwrap();
        apply_dispatcher_barrier(&mut recv_barrier, barrier);

        Ok((recv_barrier, inputs))
    }

    async fn continuously_fetch_barrier_rx(&mut self) -> StreamExecutorError {
        loop {
            if let Err(e) = self.try_fetch_barrier_rx(true).await {
                return e;
            }
        }
    }

    async fn try_fetch_barrier_rx(&mut self, pending_on_end: bool) -> StreamExecutorResult<()> {
        match &mut self.recv_state {
            BarrierReceiverState::ReceivingBarrier => {
                let Some(barrier) = self.barrier_rx.recv().await else {
                    if pending_on_end {
                        return pending().await;
                    } else {
                        return Err(StreamExecutorError::channel_closed(
                            "barrier channel closed unexpectedly",
                        ));
                    }
                };
                if let Some(fut) = self.pre_apply_barrier(&barrier) {
                    self.recv_state = BarrierReceiverState::CreatingNewInput(barrier, fut);
                } else {
                    self.buffer.push_back((barrier, None));
                }
            }
            BarrierReceiverState::CreatingNewInput(barrier, fut) => {
                let new_inputs = fut.await?;
                self.buffer.push_back((barrier.clone(), Some(new_inputs)));
                self.recv_state = BarrierReceiverState::ReceivingBarrier;
            }
        }
        Ok(())
    }

    fn pre_apply_barrier(&mut self, barrier: &Barrier) -> Option<BoxedNewInputsFuture> {
        if let Some(update) = barrier.as_update_merge(self.actor_id, self.curr_upstream_fragment_id)
            && !update.added_upstream_actors.is_empty()
        {
            // When updating the upstream fragment, `added_upstream_actors` will not be empty.
            let upstream_fragment_id =
                if let Some(new_upstream_fragment_id) = update.new_upstream_fragment_id {
                    self.curr_upstream_fragment_id = new_upstream_fragment_id;
                    new_upstream_fragment_id
                } else {
                    self.curr_upstream_fragment_id
                };
            let ctx = self.build_input_ctx.clone();
            let added_upstream_actors = update.added_upstream_actors.clone();
            let barrier = barrier.clone();
            let fut = async move {
                try_join_all(added_upstream_actors.iter().map(|upstream_actor| async {
                    let mut new_input = new_input(
                        &ctx.local_barrier_manager,
                        ctx.metrics.clone(),
                        ctx.actor_id,
                        ctx.fragment_id,
                        upstream_actor,
                        upstream_fragment_id,
                    )
                    .await?;

                    // Poll the first barrier from the new upstream. It must be the same as the
                    // one we polled from the original upstreams.
                    let first_barrier = expect_first_barrier(&mut new_input).await?;
                    assert_equal_dispatcher_barrier(&barrier, &first_barrier);

                    StreamExecutorResult::Ok(new_input)
                }))
                .await
            }
            .boxed();

            Some(fut)
        } else {
            None
        }
    }
}