risingwave_stream/executor/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prelude;
16
17use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
18use std::fmt::Debug;
19use std::future::pending;
20use std::hash::Hash;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::Poll;
24use std::vec;
25
26use await_tree::InstrumentAwait;
27use enum_as_inner::EnumAsInner;
28use futures::future::try_join_all;
29use futures::stream::{BoxStream, FusedStream, FuturesUnordered, StreamFuture};
30use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
31use itertools::Itertools;
32use prometheus::Histogram;
33use prometheus::core::{AtomicU64, GenericCounter};
34use risingwave_common::array::StreamChunk;
35use risingwave_common::bitmap::Bitmap;
36use risingwave_common::catalog::{Field, Schema, TableId};
37use risingwave_common::metrics::LabelGuardedMetric;
38use risingwave_common::row::OwnedRow;
39use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
40use risingwave_common::util::epoch::{Epoch, EpochPair};
41use risingwave_common::util::tracing::TracingContext;
42use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
43use risingwave_connector::source::SplitImpl;
44use risingwave_expr::expr::{Expression, NonStrictExpression};
45use risingwave_pb::data::PbEpoch;
46use risingwave_pb::expr::PbInputRef;
47use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
48use risingwave_pb::stream_plan::barrier::BarrierKind;
49use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
50use risingwave_pb::stream_plan::stream_node::PbStreamKind;
51use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
52use risingwave_pb::stream_plan::{
53    PbBarrier, PbBarrierMutation, PbDispatcher, PbStreamMessageBatch, PbWatermark,
54    SubscriptionUpstreamInfo,
55};
56use smallvec::SmallVec;
57use tokio::sync::mpsc;
58use tokio::time::{Duration, Instant};
59
60use crate::error::StreamResult;
61use crate::executor::exchange::input::{
62    BoxedActorInput, BoxedInput, apply_dispatcher_barrier, assert_equal_dispatcher_barrier,
63    new_input,
64};
65use crate::executor::monitor::ActorInputMetrics;
66use crate::executor::prelude::StreamingMetrics;
67use crate::executor::watermark::BufferedWatermarks;
68use crate::task::{ActorId, FragmentId, LocalBarrierManager};
69
70mod actor;
71mod barrier_align;
72pub mod exchange;
73pub mod monitor;
74
75pub mod aggregate;
76pub mod asof_join;
77mod backfill;
78mod barrier_recv;
79mod batch_query;
80mod chain;
81mod changelog;
82mod dedup;
83mod dispatch;
84pub mod dml;
85mod dynamic_filter;
86pub mod eowc;
87pub mod error;
88mod expand;
89mod filter;
90mod gap_fill;
91pub mod hash_join;
92mod hop_window;
93mod join;
94pub mod locality_provider;
95mod lookup;
96mod lookup_union;
97mod merge;
98mod mview;
99mod nested_loop_temporal_join;
100mod no_op;
101mod now;
102mod over_window;
103pub mod project;
104mod rearranged_chain;
105mod receiver;
106pub mod row_id_gen;
107mod sink;
108pub mod source;
109mod stream_reader;
110pub mod subtask;
111mod temporal_join;
112mod top_n;
113mod troublemaker;
114mod union;
115mod upstream_sink_union;
116mod values;
117mod watermark;
118mod watermark_filter;
119mod wrapper;
120
121mod approx_percentile;
122
123mod row_merge;
124
125#[cfg(test)]
126mod integration_tests;
127mod sync_kv_log_store;
128#[cfg(any(test, feature = "test"))]
129pub mod test_utils;
130mod utils;
131mod vector;
132
133pub use actor::{Actor, ActorContext, ActorContextRef};
134use anyhow::Context;
135pub use approx_percentile::global::GlobalApproxPercentileExecutor;
136pub use approx_percentile::local::LocalApproxPercentileExecutor;
137pub use backfill::arrangement_backfill::*;
138pub use backfill::cdc::{
139    CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor,
140};
141pub use backfill::no_shuffle_backfill::*;
142pub use backfill::snapshot_backfill::*;
143pub use barrier_recv::BarrierRecvExecutor;
144pub use batch_query::BatchQueryExecutor;
145pub use chain::ChainExecutor;
146pub use changelog::ChangeLogExecutor;
147pub use dedup::AppendOnlyDedupExecutor;
148pub use dispatch::{DispatchExecutor, DispatcherImpl};
149pub use dynamic_filter::DynamicFilterExecutor;
150pub use error::{StreamExecutorError, StreamExecutorResult};
151pub use expand::ExpandExecutor;
152pub use filter::{FilterExecutor, UpsertFilterExecutor};
153pub use gap_fill::{GapFillExecutor, GapFillExecutorArgs};
154pub use hash_join::*;
155pub use hop_window::HopWindowExecutor;
156pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
157pub use join::{AsOfDesc, AsOfJoinType, JoinType};
158pub use lookup::*;
159pub use lookup_union::LookupUnionExecutor;
160pub use merge::MergeExecutor;
161pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
162pub use mview::*;
163pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
164pub use no_op::NoOpExecutor;
165pub use now::*;
166pub use over_window::*;
167pub use rearranged_chain::RearrangedChainExecutor;
168pub use receiver::ReceiverExecutor;
169use risingwave_common::id::SourceId;
170pub use row_merge::RowMergeExecutor;
171pub use sink::SinkExecutor;
172pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
173pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
174pub use temporal_join::TemporalJoinExecutor;
175pub use top_n::{
176    AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
177};
178pub use troublemaker::TroublemakerExecutor;
179pub use union::UnionExecutor;
180pub use upstream_sink_union::{UpstreamFragmentInfo, UpstreamSinkUnionExecutor};
181pub use utils::DummyExecutor;
182pub use values::ValuesExecutor;
183pub use vector::*;
184pub use watermark_filter::WatermarkFilterExecutor;
185pub use wrapper::WrapperExecutor;
186
187use self::barrier_align::AlignedMessageStream;
188
189pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
190pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
191pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
192pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;
193
194pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
195use risingwave_connector::sink::catalog::SinkId;
196use risingwave_connector::source::cdc::{
197    CdcTableSnapshotSplitAssignmentWithGeneration,
198    build_actor_cdc_table_snapshot_splits_with_generation,
199};
200use risingwave_pb::id::SubscriberId;
201use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
202
203pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
204pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
205pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;
206
207/// Static information of an executor.
208#[derive(Debug, Default, Clone)]
209pub struct ExecutorInfo {
210    /// The schema of the OUTPUT of the executor.
211    pub schema: Schema,
212
213    /// The stream key indices of the OUTPUT of the executor.
214    pub stream_key: StreamKey,
215
216    /// The stream kind of the OUTPUT of the executor.
217    pub stream_kind: PbStreamKind,
218
219    /// Identity of the executor.
220    pub identity: String,
221
222    /// The executor id of the executor.
223    pub id: u64,
224}
225
226impl ExecutorInfo {
227    pub fn for_test(schema: Schema, stream_key: StreamKey, identity: String, id: u64) -> Self {
228        Self {
229            schema,
230            stream_key,
231            stream_kind: PbStreamKind::Retract, // dummy value for test
232            identity,
233            id,
234        }
235    }
236}
237
238/// [`Execute`] describes the methods an executor should implement to handle control messages.
239pub trait Execute: Send + 'static {
240    fn execute(self: Box<Self>) -> BoxedMessageStream;
241
242    fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
243        self.execute()
244    }
245
246    fn boxed(self) -> Box<dyn Execute>
247    where
248        Self: Sized + Send + 'static,
249    {
250        Box::new(self)
251    }
252}
253
254/// [`Executor`] combines the static information ([`ExecutorInfo`]) and the executable object to
255/// handle messages ([`Execute`]).
256pub struct Executor {
257    info: ExecutorInfo,
258    execute: Box<dyn Execute>,
259}
260
261impl Executor {
262    pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
263        Self { info, execute }
264    }
265
266    pub fn info(&self) -> &ExecutorInfo {
267        &self.info
268    }
269
270    pub fn schema(&self) -> &Schema {
271        &self.info.schema
272    }
273
274    pub fn stream_key(&self) -> StreamKeyRef<'_> {
275        &self.info.stream_key
276    }
277
278    pub fn stream_kind(&self) -> PbStreamKind {
279        self.info.stream_kind
280    }
281
282    pub fn identity(&self) -> &str {
283        &self.info.identity
284    }
285
286    pub fn execute(self) -> BoxedMessageStream {
287        self.execute.execute()
288    }
289
290    pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
291        self.execute.execute_with_epoch(epoch)
292    }
293}
294
295impl std::fmt::Debug for Executor {
296    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
297        f.write_str(self.identity())
298    }
299}
300
301impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
302    fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
303        Self::new(info, execute)
304    }
305}
306
307impl<E> From<(ExecutorInfo, E)> for Executor
308where
309    E: Execute,
310{
311    fn from((info, execute): (ExecutorInfo, E)) -> Self {
312        Self::new(info, execute.boxed())
313    }
314}
315
316pub const INVALID_EPOCH: u64 = 0;
317
318type UpstreamFragmentId = FragmentId;
319type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;
320
321#[derive(Debug, Clone, PartialEq)]
322#[cfg_attr(any(test, feature = "test"), derive(Default))]
323pub struct UpdateMutation {
324    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
325    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
326    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
327    pub dropped_actors: HashSet<ActorId>,
328    pub actor_splits: SplitAssignments,
329    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
330    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
331    pub sink_add_columns: HashMap<SinkId, Vec<Field>>,
332}
333
334#[derive(Debug, Clone, PartialEq)]
335#[cfg_attr(any(test, feature = "test"), derive(Default))]
336pub struct AddMutation {
337    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
338    pub added_actors: HashSet<ActorId>,
339    // TODO: remove this and use `SourceChangesSplit` after we support multiple mutations.
340    pub splits: SplitAssignments,
341    pub pause: bool,
342    /// (`upstream_mv_table_id`,  `subscriber_id`)
343    pub subscriptions_to_add: Vec<(TableId, SubscriberId)>,
344    /// nodes which should start backfill
345    pub backfill_nodes_to_pause: HashSet<FragmentId>,
346    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
347    pub new_upstream_sinks: HashMap<FragmentId, PbNewUpstreamSink>,
348}
349
350#[derive(Debug, Clone, PartialEq)]
351#[cfg_attr(any(test, feature = "test"), derive(Default))]
352pub struct StopMutation {
353    pub dropped_actors: HashSet<ActorId>,
354    pub dropped_sink_fragments: HashSet<FragmentId>,
355}
356
357/// See [`PbMutation`] for the semantics of each mutation.
358#[derive(Debug, Clone, PartialEq)]
359pub enum Mutation {
360    Stop(StopMutation),
361    Update(UpdateMutation),
362    Add(AddMutation),
363    SourceChangeSplit(SplitAssignments),
364    Pause,
365    Resume,
366    Throttle(HashMap<ActorId, Option<u32>>),
367    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
368    DropSubscriptions {
369        /// `subscriber` -> `upstream_mv_table_id`
370        subscriptions_to_drop: Vec<(SubscriberId, TableId)>,
371    },
372    StartFragmentBackfill {
373        fragment_ids: HashSet<FragmentId>,
374    },
375    RefreshStart {
376        table_id: TableId,
377        associated_source_id: SourceId,
378    },
379    ListFinish {
380        associated_source_id: SourceId,
381    },
382    LoadFinish {
383        associated_source_id: SourceId,
384    },
385}
386
387/// The generic type `M` is the mutation type of the barrier.
388///
389/// For barrier of in the dispatcher, `M` is `()`, which means the mutation is erased.
390/// For barrier flowing within the streaming actor, `M` is the normal `BarrierMutationType`.
391#[derive(Debug, Clone)]
392pub struct BarrierInner<M> {
393    pub epoch: EpochPair,
394    pub mutation: M,
395    pub kind: BarrierKind,
396
397    /// Tracing context for the **current** epoch of this barrier.
398    pub tracing_context: TracingContext,
399
400    /// The actors that this barrier has passed locally. Used for debugging only.
401    pub passed_actors: Vec<ActorId>,
402}
403
404pub type BarrierMutationType = Option<Arc<Mutation>>;
405pub type Barrier = BarrierInner<BarrierMutationType>;
406pub type DispatcherBarrier = BarrierInner<()>;
407
408impl<M: Default> BarrierInner<M> {
409    /// Create a plain barrier.
410    pub fn new_test_barrier(epoch: u64) -> Self {
411        Self {
412            epoch: EpochPair::new_test_epoch(epoch),
413            kind: BarrierKind::Checkpoint,
414            tracing_context: TracingContext::none(),
415            mutation: Default::default(),
416            passed_actors: Default::default(),
417        }
418    }
419
420    pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
421        Self {
422            epoch: EpochPair::new(epoch, prev_epoch),
423            kind: BarrierKind::Checkpoint,
424            tracing_context: TracingContext::none(),
425            mutation: Default::default(),
426            passed_actors: Default::default(),
427        }
428    }
429}
430
431impl Barrier {
432    pub fn into_dispatcher(self) -> DispatcherBarrier {
433        DispatcherBarrier {
434            epoch: self.epoch,
435            mutation: (),
436            kind: self.kind,
437            tracing_context: self.tracing_context,
438            passed_actors: self.passed_actors,
439        }
440    }
441
442    #[must_use]
443    pub fn with_mutation(self, mutation: Mutation) -> Self {
444        Self {
445            mutation: Some(Arc::new(mutation)),
446            ..self
447        }
448    }
449
450    #[must_use]
451    pub fn with_stop(self) -> Self {
452        self.with_mutation(Mutation::Stop(StopMutation {
453            dropped_actors: Default::default(),
454            dropped_sink_fragments: Default::default(),
455        }))
456    }
457
458    /// Whether this barrier carries stop mutation.
459    pub fn is_with_stop_mutation(&self) -> bool {
460        matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
461    }
462
463    /// Whether this barrier is to stop the actor with `actor_id`.
464    pub fn is_stop(&self, actor_id: ActorId) -> bool {
465        self.all_stop_actors()
466            .is_some_and(|actors| actors.contains(&actor_id))
467    }
468
469    pub fn is_checkpoint(&self) -> bool {
470        self.kind == BarrierKind::Checkpoint
471    }
472
473    /// Get the initial split assignments for the actor with `actor_id`.
474    ///
475    /// This should only be called on the initial barrier received by the executor. It must be
476    ///
477    /// - `Add` mutation when it's a new streaming job, or recovery.
478    /// - `Update` mutation when it's created for scaling.
479    ///
480    /// Note that `SourceChangeSplit` is **not** included, because it's only used for changing splits
481    /// of existing executors.
482    pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
483        match self.mutation.as_deref()? {
484            Mutation::Update(UpdateMutation { actor_splits, .. })
485            | Mutation::Add(AddMutation {
486                splits: actor_splits,
487                ..
488            }) => actor_splits.get(&actor_id),
489
490            _ => {
491                if cfg!(debug_assertions) {
492                    panic!(
493                        "the initial mutation of the barrier should not be {:?}",
494                        self.mutation
495                    );
496                }
497                None
498            }
499        }
500        .map(|s| s.as_slice())
501    }
502
503    /// Get all actors that to be stopped (dropped) by this barrier.
504    pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
505        match self.mutation.as_deref() {
506            Some(Mutation::Stop(StopMutation { dropped_actors, .. })) => Some(dropped_actors),
507            Some(Mutation::Update(UpdateMutation { dropped_actors, .. })) => Some(dropped_actors),
508            _ => None,
509        }
510    }
511
512    /// Whether this barrier is to newly add the actor with `actor_id`. This is used for `Chain` and
513    /// `Values` to decide whether to output the existing (historical) data.
514    ///
515    /// By "newly", we mean the actor belongs to a subgraph of a new streaming job. That is, actors
516    /// added for scaling are not included.
517    pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
518        match self.mutation.as_deref() {
519            Some(Mutation::Add(AddMutation { added_actors, .. })) => {
520                added_actors.contains(&actor_id)
521            }
522            _ => false,
523        }
524    }
525
526    pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
527        if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
528            fragment_ids.contains(&fragment_id)
529        } else {
530            false
531        }
532    }
533
534    /// Whether this barrier adds new downstream fragment for the actor with `upstream_actor_id`.
535    ///
536    /// # Use case
537    /// Some optimizations are applied when an actor doesn't have any downstreams ("standalone" actors).
538    /// * Pause a standalone shared `SourceExecutor`.
539    /// * Disable a standalone `MaterializeExecutor`'s conflict check.
540    ///
541    /// This is implemented by checking `actor_context.initial_dispatch_num` on startup, and
542    /// check `has_more_downstream_fragments` on barrier to see whether the optimization
543    /// needs to be turned off.
544    ///
545    /// ## Some special cases not included
546    ///
547    /// Note that this is not `has_new_downstream_actor/fragment`. For our use case, we only
548    /// care about **number of downstream fragments** (more precisely, existence).
549    /// - When scaling, the number of downstream actors is changed, and they are "new", but downstream fragments is not changed.
550    /// - When `ALTER TABLE sink_into_table`, the fragment is replaced with a "new" one, but the number is not changed.
551    pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
552        let Some(mutation) = self.mutation.as_deref() else {
553            return false;
554        };
555        match mutation {
556            // Add is for mv, index and sink creation.
557            Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
558            Mutation::Update(_)
559            | Mutation::Stop(_)
560            | Mutation::Pause
561            | Mutation::Resume
562            | Mutation::SourceChangeSplit(_)
563            | Mutation::Throttle(_)
564            | Mutation::DropSubscriptions { .. }
565            | Mutation::ConnectorPropsChange(_)
566            | Mutation::StartFragmentBackfill { .. }
567            | Mutation::RefreshStart { .. }
568            | Mutation::ListFinish { .. }
569            | Mutation::LoadFinish { .. } => false,
570        }
571    }
572
573    /// Whether this barrier requires the executor to pause its data stream on startup.
574    pub fn is_pause_on_startup(&self) -> bool {
575        match self.mutation.as_deref() {
576            Some(Mutation::Add(AddMutation { pause, .. })) => *pause,
577            _ => false,
578        }
579    }
580
581    pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
582        match self.mutation.as_deref() {
583            Some(Mutation::Add(AddMutation {
584                backfill_nodes_to_pause,
585                ..
586            })) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
587            Some(Mutation::Update(_)) => false,
588            _ => {
589                tracing::warn!(
590                    "expected an AddMutation or UpdateMutation on Startup, instead got {:?}",
591                    self
592                );
593                false
594            }
595        }
596    }
597
598    /// Whether this barrier is for resume.
599    pub fn is_resume(&self) -> bool {
600        matches!(self.mutation.as_deref(), Some(Mutation::Resume))
601    }
602
603    /// Returns the [`MergeUpdate`] if this barrier is to update the merge executors for the actor
604    /// with `actor_id`.
605    pub fn as_update_merge(
606        &self,
607        actor_id: ActorId,
608        upstream_fragment_id: UpstreamFragmentId,
609    ) -> Option<&MergeUpdate> {
610        self.mutation
611            .as_deref()
612            .and_then(|mutation| match mutation {
613                Mutation::Update(UpdateMutation { merges, .. }) => {
614                    merges.get(&(actor_id, upstream_fragment_id))
615                }
616                _ => None,
617            })
618    }
619
620    /// Returns the new upstream sink information if this barrier is to add a new upstream sink for
621    /// the specified downstream fragment.
622    pub fn as_new_upstream_sink(&self, fragment_id: FragmentId) -> Option<&PbNewUpstreamSink> {
623        self.mutation
624            .as_deref()
625            .and_then(|mutation| match mutation {
626                Mutation::Add(AddMutation {
627                    new_upstream_sinks, ..
628                }) => new_upstream_sinks.get(&fragment_id),
629                _ => None,
630            })
631    }
632
633    /// Returns the dropped upstream sink-fragment if this barrier is to drop any sink.
634    pub fn as_dropped_upstream_sinks(&self) -> Option<&HashSet<FragmentId>> {
635        self.mutation
636            .as_deref()
637            .and_then(|mutation| match mutation {
638                Mutation::Stop(StopMutation {
639                    dropped_sink_fragments,
640                    ..
641                }) => Some(dropped_sink_fragments),
642                _ => None,
643            })
644    }
645
646    /// Returns the new vnode bitmap if this barrier is to update the vnode bitmap for the actor
647    /// with `actor_id`.
648    ///
649    /// Actually, this vnode bitmap update is only useful for the record accessing validation for
650    /// distributed executors, since the read/write pattern will never be across multiple vnodes.
651    pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
652        self.mutation
653            .as_deref()
654            .and_then(|mutation| match mutation {
655                Mutation::Update(UpdateMutation { vnode_bitmaps, .. }) => {
656                    vnode_bitmaps.get(&actor_id).cloned()
657                }
658                _ => None,
659            })
660    }
661
662    pub fn as_sink_add_columns(&self, sink_id: SinkId) -> Option<Vec<Field>> {
663        self.mutation
664            .as_deref()
665            .and_then(|mutation| match mutation {
666                Mutation::Update(UpdateMutation {
667                    sink_add_columns, ..
668                }) => sink_add_columns.get(&sink_id).cloned(),
669                _ => None,
670            })
671    }
672
673    pub fn get_curr_epoch(&self) -> Epoch {
674        Epoch(self.epoch.curr)
675    }
676
677    /// Retrieve the tracing context for the **current** epoch of this barrier.
678    pub fn tracing_context(&self) -> &TracingContext {
679        &self.tracing_context
680    }
681
682    pub fn added_subscriber_on_mv_table(
683        &self,
684        mv_table_id: TableId,
685    ) -> impl Iterator<Item = SubscriberId> + '_ {
686        if let Some(Mutation::Add(add)) = self.mutation.as_deref() {
687            Some(add)
688        } else {
689            None
690        }
691        .into_iter()
692        .flat_map(move |add| {
693            add.subscriptions_to_add.iter().filter_map(
694                move |(upstream_mv_table_id, subscriber_id)| {
695                    if *upstream_mv_table_id == mv_table_id {
696                        Some(*subscriber_id)
697                    } else {
698                        None
699                    }
700                },
701            )
702        })
703    }
704}
705
706impl<M: PartialEq> PartialEq for BarrierInner<M> {
707    fn eq(&self, other: &Self) -> bool {
708        self.epoch == other.epoch && self.mutation == other.mutation
709    }
710}
711
712impl Mutation {
713    /// Return true if the mutation is stop.
714    ///
715    /// Note that this does not mean we will stop the current actor.
716    #[cfg(test)]
717    pub fn is_stop(&self) -> bool {
718        matches!(self, Mutation::Stop(_))
719    }
720
721    #[cfg(test)]
722    fn to_protobuf(&self) -> PbMutation {
723        use risingwave_connector::source::cdc::build_pb_actor_cdc_table_snapshot_splits_with_generation;
724        use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
725        use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
726        use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
727        use risingwave_pb::stream_plan::{
728            PbAddMutation, PbConnectorPropsChangeMutation, PbDispatchers,
729            PbDropSubscriptionsMutation, PbPauseMutation, PbResumeMutation, PbSinkAddColumns,
730            PbSourceChangeSplitMutation, PbStartFragmentBackfillMutation, PbStopMutation,
731            PbThrottleMutation, PbUpdateMutation,
732        };
733        let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
734            actor_splits
735                .iter()
736                .map(|(&actor_id, splits)| {
737                    (
738                        actor_id,
739                        ConnectorSplits {
740                            splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
741                        },
742                    )
743                })
744                .collect::<HashMap<_, _>>()
745        };
746
747        match self {
748            Mutation::Stop(StopMutation {
749                dropped_actors,
750                dropped_sink_fragments,
751            }) => PbMutation::Stop(PbStopMutation {
752                actors: dropped_actors.iter().copied().collect(),
753                dropped_sink_fragments: dropped_sink_fragments.iter().copied().collect(),
754            }),
755            Mutation::Update(UpdateMutation {
756                dispatchers,
757                merges,
758                vnode_bitmaps,
759                dropped_actors,
760                actor_splits,
761                actor_new_dispatchers,
762                actor_cdc_table_snapshot_splits,
763                sink_add_columns,
764            }) => PbMutation::Update(PbUpdateMutation {
765                dispatcher_update: dispatchers.values().flatten().cloned().collect(),
766                merge_update: merges.values().cloned().collect(),
767                actor_vnode_bitmap_update: vnode_bitmaps
768                    .iter()
769                    .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
770                    .collect(),
771                dropped_actors: dropped_actors.iter().copied().collect(),
772                actor_splits: actor_splits_to_protobuf(actor_splits),
773                actor_new_dispatchers: actor_new_dispatchers
774                    .iter()
775                    .map(|(&actor_id, dispatchers)| {
776                        (
777                            actor_id,
778                            PbDispatchers {
779                                dispatchers: dispatchers.clone(),
780                            },
781                        )
782                    })
783                    .collect(),
784                actor_cdc_table_snapshot_splits:
785                    build_pb_actor_cdc_table_snapshot_splits_with_generation(
786                        actor_cdc_table_snapshot_splits.clone(),
787                    )
788                    .into(),
789                sink_add_columns: sink_add_columns
790                    .iter()
791                    .map(|(sink_id, add_columns)| {
792                        (
793                            *sink_id,
794                            PbSinkAddColumns {
795                                fields: add_columns.iter().map(|field| field.to_prost()).collect(),
796                            },
797                        )
798                    })
799                    .collect(),
800            }),
801            Mutation::Add(AddMutation {
802                adds,
803                added_actors,
804                splits,
805                pause,
806                subscriptions_to_add,
807                backfill_nodes_to_pause,
808                actor_cdc_table_snapshot_splits,
809                new_upstream_sinks,
810            }) => PbMutation::Add(PbAddMutation {
811                actor_dispatchers: adds
812                    .iter()
813                    .map(|(&actor_id, dispatchers)| {
814                        (
815                            actor_id,
816                            PbDispatchers {
817                                dispatchers: dispatchers.clone(),
818                            },
819                        )
820                    })
821                    .collect(),
822                added_actors: added_actors.iter().copied().collect(),
823                actor_splits: actor_splits_to_protobuf(splits),
824                pause: *pause,
825                subscriptions_to_add: subscriptions_to_add
826                    .iter()
827                    .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
828                        subscriber_id: *subscriber_id,
829                        upstream_mv_table_id: *table_id,
830                    })
831                    .collect(),
832                backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
833                actor_cdc_table_snapshot_splits:
834                    build_pb_actor_cdc_table_snapshot_splits_with_generation(
835                        actor_cdc_table_snapshot_splits.clone(),
836                    )
837                    .into(),
838                new_upstream_sinks: new_upstream_sinks
839                    .iter()
840                    .map(|(k, v)| (*k, v.clone()))
841                    .collect(),
842            }),
843            Mutation::SourceChangeSplit(changes) => {
844                PbMutation::Splits(PbSourceChangeSplitMutation {
845                    actor_splits: changes
846                        .iter()
847                        .map(|(&actor_id, splits)| {
848                            (
849                                actor_id,
850                                ConnectorSplits {
851                                    splits: splits
852                                        .clone()
853                                        .iter()
854                                        .map(ConnectorSplit::from)
855                                        .collect(),
856                                },
857                            )
858                        })
859                        .collect(),
860                })
861            }
862            Mutation::Pause => PbMutation::Pause(PbPauseMutation {}),
863            Mutation::Resume => PbMutation::Resume(PbResumeMutation {}),
864            Mutation::Throttle(changes) => PbMutation::Throttle(PbThrottleMutation {
865                actor_throttle: changes
866                    .iter()
867                    .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit }))
868                    .collect(),
869            }),
870            Mutation::DropSubscriptions {
871                subscriptions_to_drop,
872            } => PbMutation::DropSubscriptions(PbDropSubscriptionsMutation {
873                info: subscriptions_to_drop
874                    .iter()
875                    .map(
876                        |(subscriber_id, upstream_mv_table_id)| SubscriptionUpstreamInfo {
877                            subscriber_id: *subscriber_id,
878                            upstream_mv_table_id: *upstream_mv_table_id,
879                        },
880                    )
881                    .collect(),
882            }),
883            Mutation::ConnectorPropsChange(map) => {
884                PbMutation::ConnectorPropsChange(PbConnectorPropsChangeMutation {
885                    connector_props_infos: map
886                        .iter()
887                        .map(|(actor_id, options)| {
888                            (
889                                *actor_id,
890                                ConnectorPropsInfo {
891                                    connector_props_info: options
892                                        .iter()
893                                        .map(|(k, v)| (k.clone(), v.clone()))
894                                        .collect(),
895                                },
896                            )
897                        })
898                        .collect(),
899                })
900            }
901            Mutation::StartFragmentBackfill { fragment_ids } => {
902                PbMutation::StartFragmentBackfill(PbStartFragmentBackfillMutation {
903                    fragment_ids: fragment_ids.iter().copied().collect(),
904                })
905            }
906            Mutation::RefreshStart {
907                table_id,
908                associated_source_id,
909            } => PbMutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
910                table_id: *table_id,
911                associated_source_id: *associated_source_id,
912            }),
913            Mutation::ListFinish {
914                associated_source_id,
915            } => PbMutation::ListFinish(risingwave_pb::stream_plan::ListFinishMutation {
916                associated_source_id: *associated_source_id,
917            }),
918            Mutation::LoadFinish {
919                associated_source_id,
920            } => PbMutation::LoadFinish(risingwave_pb::stream_plan::LoadFinishMutation {
921                associated_source_id: *associated_source_id,
922            }),
923        }
924    }
925
926    fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
927        let mutation = match prost {
928            PbMutation::Stop(stop) => Mutation::Stop(StopMutation {
929                dropped_actors: stop.actors.iter().copied().collect(),
930                dropped_sink_fragments: stop.dropped_sink_fragments.iter().copied().collect(),
931            }),
932
933            PbMutation::Update(update) => Mutation::Update(UpdateMutation {
934                dispatchers: update
935                    .dispatcher_update
936                    .iter()
937                    .map(|u| (u.actor_id, u.clone()))
938                    .into_group_map(),
939                merges: update
940                    .merge_update
941                    .iter()
942                    .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
943                    .collect(),
944                vnode_bitmaps: update
945                    .actor_vnode_bitmap_update
946                    .iter()
947                    .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
948                    .collect(),
949                dropped_actors: update.dropped_actors.iter().copied().collect(),
950                actor_splits: update
951                    .actor_splits
952                    .iter()
953                    .map(|(&actor_id, splits)| {
954                        (
955                            actor_id,
956                            splits
957                                .splits
958                                .iter()
959                                .map(|split| split.try_into().unwrap())
960                                .collect(),
961                        )
962                    })
963                    .collect(),
964                actor_new_dispatchers: update
965                    .actor_new_dispatchers
966                    .iter()
967                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
968                    .collect(),
969                actor_cdc_table_snapshot_splits:
970                    build_actor_cdc_table_snapshot_splits_with_generation(
971                        update
972                            .actor_cdc_table_snapshot_splits
973                            .clone()
974                            .unwrap_or_default(),
975                    ),
976                sink_add_columns: update
977                    .sink_add_columns
978                    .iter()
979                    .map(|(sink_id, add_columns)| {
980                        (
981                            *sink_id,
982                            add_columns.fields.iter().map(Field::from_prost).collect(),
983                        )
984                    })
985                    .collect(),
986            }),
987
988            PbMutation::Add(add) => Mutation::Add(AddMutation {
989                adds: add
990                    .actor_dispatchers
991                    .iter()
992                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
993                    .collect(),
994                added_actors: add.added_actors.iter().copied().collect(),
995                // TODO: remove this and use `SourceChangesSplit` after we support multiple
996                // mutations.
997                splits: add
998                    .actor_splits
999                    .iter()
1000                    .map(|(&actor_id, splits)| {
1001                        (
1002                            actor_id,
1003                            splits
1004                                .splits
1005                                .iter()
1006                                .map(|split| split.try_into().unwrap())
1007                                .collect(),
1008                        )
1009                    })
1010                    .collect(),
1011                pause: add.pause,
1012                subscriptions_to_add: add
1013                    .subscriptions_to_add
1014                    .iter()
1015                    .map(
1016                        |SubscriptionUpstreamInfo {
1017                             subscriber_id,
1018                             upstream_mv_table_id,
1019                         }| { (*upstream_mv_table_id, *subscriber_id) },
1020                    )
1021                    .collect(),
1022                backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
1023                actor_cdc_table_snapshot_splits:
1024                    build_actor_cdc_table_snapshot_splits_with_generation(
1025                        add.actor_cdc_table_snapshot_splits
1026                            .clone()
1027                            .unwrap_or_default(),
1028                    ),
1029                new_upstream_sinks: add
1030                    .new_upstream_sinks
1031                    .iter()
1032                    .map(|(k, v)| (*k, v.clone()))
1033                    .collect(),
1034            }),
1035
1036            PbMutation::Splits(s) => {
1037                let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
1038                    Vec::with_capacity(s.actor_splits.len());
1039                for (&actor_id, splits) in &s.actor_splits {
1040                    if !splits.splits.is_empty() {
1041                        change_splits.push((
1042                            actor_id,
1043                            splits
1044                                .splits
1045                                .iter()
1046                                .map(SplitImpl::try_from)
1047                                .try_collect()?,
1048                        ));
1049                    }
1050                }
1051                Mutation::SourceChangeSplit(change_splits.into_iter().collect())
1052            }
1053            PbMutation::Pause(_) => Mutation::Pause,
1054            PbMutation::Resume(_) => Mutation::Resume,
1055            PbMutation::Throttle(changes) => Mutation::Throttle(
1056                changes
1057                    .actor_throttle
1058                    .iter()
1059                    .map(|(actor_id, limit)| (*actor_id, limit.rate_limit))
1060                    .collect(),
1061            ),
1062            PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
1063                subscriptions_to_drop: drop
1064                    .info
1065                    .iter()
1066                    .map(|info| (info.subscriber_id, info.upstream_mv_table_id))
1067                    .collect(),
1068            },
1069            PbMutation::ConnectorPropsChange(alter_connector_props) => {
1070                Mutation::ConnectorPropsChange(
1071                    alter_connector_props
1072                        .connector_props_infos
1073                        .iter()
1074                        .map(|(connector_id, options)| {
1075                            (
1076                                *connector_id,
1077                                options
1078                                    .connector_props_info
1079                                    .iter()
1080                                    .map(|(k, v)| (k.clone(), v.clone()))
1081                                    .collect(),
1082                            )
1083                        })
1084                        .collect(),
1085                )
1086            }
1087            PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
1088                Mutation::StartFragmentBackfill {
1089                    fragment_ids: start_fragment_backfill
1090                        .fragment_ids
1091                        .iter()
1092                        .copied()
1093                        .collect(),
1094                }
1095            }
1096            PbMutation::RefreshStart(refresh_start) => Mutation::RefreshStart {
1097                table_id: refresh_start.table_id,
1098                associated_source_id: refresh_start.associated_source_id,
1099            },
1100            PbMutation::ListFinish(list_finish) => Mutation::ListFinish {
1101                associated_source_id: list_finish.associated_source_id,
1102            },
1103            PbMutation::LoadFinish(load_finish) => Mutation::LoadFinish {
1104                associated_source_id: load_finish.associated_source_id,
1105            },
1106        };
1107        Ok(mutation)
1108    }
1109}
1110
1111impl<M> BarrierInner<M> {
1112    fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
1113        let Self {
1114            epoch,
1115            mutation,
1116            kind,
1117            passed_actors,
1118            tracing_context,
1119            ..
1120        } = self;
1121
1122        PbBarrier {
1123            epoch: Some(PbEpoch {
1124                curr: epoch.curr,
1125                prev: epoch.prev,
1126            }),
1127            mutation: Some(PbBarrierMutation {
1128                mutation: barrier_fn(mutation),
1129            }),
1130            tracing_context: tracing_context.to_protobuf(),
1131            kind: *kind as _,
1132            passed_actors: passed_actors.clone(),
1133        }
1134    }
1135
1136    fn from_protobuf_inner(
1137        prost: &PbBarrier,
1138        mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
1139    ) -> StreamExecutorResult<Self> {
1140        let epoch = prost.get_epoch()?;
1141
1142        Ok(Self {
1143            kind: prost.kind(),
1144            epoch: EpochPair::new(epoch.curr, epoch.prev),
1145            mutation: mutation_from_pb(
1146                prost
1147                    .mutation
1148                    .as_ref()
1149                    .and_then(|mutation| mutation.mutation.as_ref()),
1150            )?,
1151            passed_actors: prost.get_passed_actors().clone(),
1152            tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
1153        })
1154    }
1155
1156    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
1157        BarrierInner {
1158            epoch: self.epoch,
1159            mutation: f(self.mutation),
1160            kind: self.kind,
1161            tracing_context: self.tracing_context,
1162            passed_actors: self.passed_actors,
1163        }
1164    }
1165}
1166
1167impl DispatcherBarrier {
1168    pub fn to_protobuf(&self) -> PbBarrier {
1169        self.to_protobuf_inner(|_| None)
1170    }
1171}
1172
1173impl Barrier {
1174    #[cfg(test)]
1175    pub fn to_protobuf(&self) -> PbBarrier {
1176        self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
1177    }
1178
1179    pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
1180        Self::from_protobuf_inner(prost, |mutation| {
1181            mutation
1182                .map(|m| Mutation::from_protobuf(m).map(Arc::new))
1183                .transpose()
1184        })
1185    }
1186}
1187
/// A watermark on a single column, carried as a stream message.
///
/// Note that the derived `PartialEq`/`Eq` compare all fields, whereas the `Ord` implementation
/// compares `val` only.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
    /// Index of the column the watermark refers to.
    pub col_idx: usize,
    /// Data type used to encode and decode `val`.
    pub data_type: DataType,
    /// The watermark value.
    pub val: ScalarImpl,
}
1194
1195impl PartialOrd for Watermark {
1196    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1197        Some(self.cmp(other))
1198    }
1199}
1200
1201impl Ord for Watermark {
1202    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1203        self.val.default_cmp(&other.val)
1204    }
1205}
1206
1207impl Watermark {
1208    pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
1209        Self {
1210            col_idx,
1211            data_type,
1212            val,
1213        }
1214    }
1215
1216    pub async fn transform_with_expr(
1217        self,
1218        expr: &NonStrictExpression<impl Expression>,
1219        new_col_idx: usize,
1220    ) -> Option<Self> {
1221        let Self { col_idx, val, .. } = self;
1222        let row = {
1223            let mut row = vec![None; col_idx + 1];
1224            row[col_idx] = Some(val);
1225            OwnedRow::new(row)
1226        };
1227        let val = expr.eval_row_infallible(&row).await?;
1228        Some(Self::new(new_col_idx, expr.inner().return_type(), val))
1229    }
1230
1231    /// Transform the watermark with the given output indices. If this watermark is not in the
1232    /// output, return `None`.
1233    pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
1234        output_indices
1235            .iter()
1236            .position(|p| *p == self.col_idx)
1237            .map(|new_col_idx| self.with_idx(new_col_idx))
1238    }
1239
1240    pub fn to_protobuf(&self) -> PbWatermark {
1241        PbWatermark {
1242            column: Some(PbInputRef {
1243                index: self.col_idx as _,
1244                r#type: Some(self.data_type.to_protobuf()),
1245            }),
1246            val: Some(&self.val).to_protobuf().into(),
1247        }
1248    }
1249
1250    pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
1251        let col_ref = prost.get_column()?;
1252        let data_type = DataType::from(col_ref.get_type()?);
1253        let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
1254            .expect("watermark value cannot be null");
1255        Ok(Self::new(col_ref.get_index() as _, data_type, val))
1256    }
1257
1258    pub fn with_idx(self, idx: usize) -> Self {
1259        Self::new(idx, self.data_type, self.val)
1260    }
1261}
1262
/// A single stream message, generic over the mutation type `M` carried by its barriers.
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageInner<M> {
    /// A chunk of stream data.
    Chunk(StreamChunk),
    /// A barrier whose mutation is of type `M`.
    Barrier(BarrierInner<M>),
    /// A watermark on one column.
    Watermark(Watermark),
}
1269
1270impl<M> MessageInner<M> {
1271    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
1272        match self {
1273            MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
1274            MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
1275            MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
1276        }
1277    }
1278}
1279
/// A [`MessageInner`] whose barriers carry a `BarrierMutationType` mutation.
pub type Message = MessageInner<BarrierMutationType>;
/// A [`MessageInner`] whose barriers carry `()` in place of a mutation.
pub type DispatcherMessage = MessageInner<()>;
1282
/// `MessageBatchInner` is used exclusively by the `Dispatcher` and the `Merger`/`Receiver` for
/// exchanging messages between them. It has the same variants as the fundamental `MessageInner`,
/// except that barriers are batched: several barriers travel in a single message.
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageBatchInner<M> {
    /// A chunk of stream data.
    Chunk(StreamChunk),
    /// One or more barriers, each with a mutation of type `M`.
    BarrierBatch(Vec<BarrierInner<M>>),
    /// A watermark on one column.
    Watermark(Watermark),
}
/// A [`MessageBatchInner`] whose barriers carry a `BarrierMutationType` mutation.
pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
/// A list of `DispatcherBarrier`s.
pub type DispatcherBarriers = Vec<DispatcherBarrier>;
/// A [`MessageBatchInner`] whose barriers carry `()` in place of a mutation.
pub type DispatcherMessageBatch = MessageBatchInner<()>;
1294
1295impl From<DispatcherMessage> for DispatcherMessageBatch {
1296    fn from(m: DispatcherMessage) -> Self {
1297        match m {
1298            DispatcherMessage::Chunk(c) => Self::Chunk(c),
1299            DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
1300            DispatcherMessage::Watermark(w) => Self::Watermark(w),
1301        }
1302    }
1303}
1304
1305impl From<StreamChunk> for Message {
1306    fn from(chunk: StreamChunk) -> Self {
1307        Message::Chunk(chunk)
1308    }
1309}
1310
1311impl<'a> TryFrom<&'a Message> for &'a Barrier {
1312    type Error = ();
1313
1314    fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
1315        match m {
1316            Message::Chunk(_) => Err(()),
1317            Message::Barrier(b) => Ok(b),
1318            Message::Watermark(_) => Err(()),
1319        }
1320    }
1321}
1322
1323impl Message {
1324    /// Return true if the message is a stop barrier, meaning the stream
1325    /// will not continue, false otherwise.
1326    ///
1327    /// Note that this does not mean we will stop the current actor.
1328    #[cfg(test)]
1329    pub fn is_stop(&self) -> bool {
1330        matches!(
1331            self,
1332            Message::Barrier(Barrier {
1333                mutation,
1334                ..
1335            }) if mutation.as_ref().unwrap().is_stop()
1336        )
1337    }
1338}
1339
1340impl DispatcherMessageBatch {
1341    pub fn to_protobuf(&self) -> PbStreamMessageBatch {
1342        let prost = match self {
1343            Self::Chunk(stream_chunk) => {
1344                let prost_stream_chunk = stream_chunk.to_protobuf();
1345                StreamMessageBatch::StreamChunk(prost_stream_chunk)
1346            }
1347            Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
1348                barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
1349            }),
1350            Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
1351        };
1352        PbStreamMessageBatch {
1353            stream_message_batch: Some(prost),
1354        }
1355    }
1356
1357    pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
1358        let res = match prost.get_stream_message_batch()? {
1359            StreamMessageBatch::StreamChunk(chunk) => {
1360                Self::Chunk(StreamChunk::from_protobuf(chunk)?)
1361            }
1362            StreamMessageBatch::BarrierBatch(barrier_batch) => {
1363                let barriers = barrier_batch
1364                    .barriers
1365                    .iter()
1366                    .map(|barrier| {
1367                        DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
1368                            if mutation.is_some() {
1369                                if cfg!(debug_assertions) {
1370                                    panic!("should not receive message of barrier with mutation");
1371                                } else {
1372                                    warn!(?barrier, "receive message of barrier with mutation");
1373                                }
1374                            }
1375                            Ok(())
1376                        })
1377                    })
1378                    .try_collect()?;
1379                Self::BarrierBatch(barriers)
1380            }
1381            StreamMessageBatch::Watermark(watermark) => {
1382                Self::Watermark(Watermark::from_protobuf(watermark)?)
1383            }
1384        };
1385        Ok(res)
1386    }
1387
1388    pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
1389        ::prost::Message::encoded_len(msg)
1390    }
1391}
1392
/// Owned stream key: a list of `usize` indices (presumably column indices; confirm against
/// callers).
pub type StreamKey = Vec<usize>;
/// Borrowed form of [`StreamKey`].
pub type StreamKeyRef<'a> = &'a [usize];
/// Data types for a stream key, stored in a `SmallVec` with inline room for one element.
pub type StreamKeyDataTypes = SmallVec<[DataType; 1]>;
1396
1397/// Expect the first message of the given `stream` as a barrier.
1398pub async fn expect_first_barrier<M: Debug>(
1399    stream: &mut (impl MessageStreamInner<M> + Unpin),
1400) -> StreamExecutorResult<BarrierInner<M>> {
1401    let message = stream
1402        .next()
1403        .instrument_await("expect_first_barrier")
1404        .await
1405        .context("failed to extract the first message: stream closed unexpectedly")??;
1406    let barrier = message
1407        .into_barrier()
1408        .expect("the first message must be a barrier");
1409    // TODO: Is this check correct?
1410    assert!(matches!(
1411        barrier.kind,
1412        BarrierKind::Checkpoint | BarrierKind::Initial
1413    ));
1414    Ok(barrier)
1415}
1416
1417/// Expect the first message of the given `stream` as a barrier.
1418pub async fn expect_first_barrier_from_aligned_stream(
1419    stream: &mut (impl AlignedMessageStream + Unpin),
1420) -> StreamExecutorResult<Barrier> {
1421    let message = stream
1422        .next()
1423        .instrument_await("expect_first_barrier")
1424        .await
1425        .context("failed to extract the first message: stream closed unexpectedly")??;
1426    let barrier = message
1427        .into_barrier()
1428        .expect("the first message must be a barrier");
1429    Ok(barrier)
1430}
1431
/// `StreamConsumer` is the last step in an actor.
pub trait StreamConsumer: Send + 'static {
    /// The stream returned by [`StreamConsumer::execute`]; each item is a barrier or an error.
    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;

    /// Consumes the boxed consumer and returns its barrier stream.
    fn execute(self: Box<Self>) -> Self::BarrierStream;
}
1438
/// A `BoxedInput` identified by `InputId` that yields `MessageStreamItemInner<M>` items.
type BoxedMessageInput<InputId, M> = BoxedInput<InputId, MessageStreamItemInner<M>>;
1440
/// A stream for merging messages from multiple upstreams.
/// Can dynamically add and delete upstream streams.
/// For the meaning of the generic parameter `M` used, refer to `BarrierInner<M>`.
pub struct DynamicReceivers<InputId, M> {
    /// The barrier we're aligning to. If this is `None`, then `blocked` is empty.
    barrier: Option<BarrierInner<M>>,
    /// The start timestamp of the current barrier, set when the first upstream gets blocked.
    /// Used for measuring the alignment duration.
    start_ts: Option<Instant>,
    /// The upstreams that're blocked by the `barrier`.
    blocked: Vec<BoxedMessageInput<InputId, M>>,
    /// The upstreams that're not blocked and can be polled.
    active: FuturesUnordered<StreamFuture<BoxedMessageInput<InputId, M>>>,
    /// watermark column index -> `BufferedWatermarks`
    buffered_watermarks: BTreeMap<usize, BufferedWatermarks<InputId>>,
    /// Currently only used for union. Incremented by the alignment duration in nanoseconds.
    barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
    /// Only for merge. If `None`, `poll_next` skips the histogram `observe` call. Otherwise the
    /// alignment duration is observed in seconds.
    merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
}
1460
1461impl<InputId: Clone + Ord + Hash + std::fmt::Debug + Unpin, M: Clone + Unpin> Stream
1462    for DynamicReceivers<InputId, M>
1463{
1464    type Item = MessageStreamItemInner<M>;
1465
1466    fn poll_next(
1467        mut self: Pin<&mut Self>,
1468        cx: &mut std::task::Context<'_>,
1469    ) -> Poll<Option<Self::Item>> {
1470        if self.is_empty() {
1471            return Poll::Ready(None);
1472        }
1473
1474        loop {
1475            match futures::ready!(self.active.poll_next_unpin(cx)) {
1476                // Directly forward the error.
1477                Some((Some(Err(e)), _)) => {
1478                    return Poll::Ready(Some(Err(e)));
1479                }
1480                // Handle the message from some upstream.
1481                Some((Some(Ok(message)), remaining)) => {
1482                    let input_id = remaining.id();
1483                    match message {
1484                        MessageInner::Chunk(chunk) => {
1485                            // Continue polling this upstream by pushing it back to `active`.
1486                            self.active.push(remaining.into_future());
1487                            return Poll::Ready(Some(Ok(MessageInner::Chunk(chunk))));
1488                        }
1489                        MessageInner::Watermark(watermark) => {
1490                            // Continue polling this upstream by pushing it back to `active`.
1491                            self.active.push(remaining.into_future());
1492                            if let Some(watermark) = self.handle_watermark(input_id, watermark) {
1493                                return Poll::Ready(Some(Ok(MessageInner::Watermark(watermark))));
1494                            }
1495                        }
1496                        MessageInner::Barrier(barrier) => {
1497                            // Block this upstream by pushing it to `blocked`.
1498                            if self.blocked.is_empty() {
1499                                self.start_ts = Some(Instant::now());
1500                            }
1501                            self.blocked.push(remaining);
1502                            if let Some(current_barrier) = self.barrier.as_ref() {
1503                                if current_barrier.epoch != barrier.epoch {
1504                                    return Poll::Ready(Some(Err(
1505                                        StreamExecutorError::align_barrier(
1506                                            current_barrier.clone().map_mutation(|_| None),
1507                                            barrier.map_mutation(|_| None),
1508                                        ),
1509                                    )));
1510                                }
1511                            } else {
1512                                self.barrier = Some(barrier);
1513                            }
1514                        }
1515                    }
1516                }
1517                // We use barrier as the control message of the stream. That is, we always stop the
1518                // actors actively when we receive a `Stop` mutation, instead of relying on the stream
1519                // termination.
1520                //
1521                // Besides, in abnormal cases when the other side of the `Input` closes unexpectedly,
1522                // we also yield an `Err(ExchangeChannelClosed)`, which will hit the `Err` arm above.
1523                // So this branch will never be reached in all cases.
1524                Some((None, remaining)) => {
1525                    return Poll::Ready(Some(Err(StreamExecutorError::channel_closed(format!(
1526                        "upstream input {:?} unexpectedly closed",
1527                        remaining.id()
1528                    )))));
1529                }
1530                // There's no active upstreams. Process the barrier and resume the blocked ones.
1531                None => {
1532                    assert!(!self.blocked.is_empty());
1533
1534                    let start_ts = self
1535                        .start_ts
1536                        .take()
1537                        .expect("should have received at least one barrier");
1538                    if let Some(barrier_align_duration) = &self.barrier_align_duration {
1539                        barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
1540                    }
1541                    if let Some(merge_barrier_align_duration) = &self.merge_barrier_align_duration {
1542                        // Observe did a few atomic operation inside, we want to avoid the overhead.
1543                        merge_barrier_align_duration.observe(start_ts.elapsed().as_secs_f64())
1544                    }
1545
1546                    break;
1547                }
1548            }
1549        }
1550
1551        assert!(self.active.is_terminated());
1552
1553        let barrier = self.barrier.take().unwrap();
1554
1555        let upstreams = std::mem::take(&mut self.blocked);
1556        self.extend_active(upstreams);
1557        assert!(!self.active.is_terminated());
1558
1559        Poll::Ready(Some(Ok(MessageInner::Barrier(barrier))))
1560    }
1561}
1562
1563impl<InputId: Clone + Ord + Hash + std::fmt::Debug, M> DynamicReceivers<InputId, M> {
1564    pub fn new(
1565        upstreams: Vec<BoxedMessageInput<InputId, M>>,
1566        barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1567        merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
1568    ) -> Self {
1569        let mut this = Self {
1570            barrier: None,
1571            start_ts: None,
1572            blocked: Vec::with_capacity(upstreams.len()),
1573            active: Default::default(),
1574            buffered_watermarks: Default::default(),
1575            merge_barrier_align_duration,
1576            barrier_align_duration,
1577        };
1578        this.extend_active(upstreams);
1579        this
1580    }
1581
1582    /// Extend the active upstreams with the given upstreams. The current stream must be at the
1583    /// clean state right after a barrier.
1584    pub fn extend_active(
1585        &mut self,
1586        upstreams: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1587    ) {
1588        assert!(self.blocked.is_empty() && self.barrier.is_none());
1589
1590        self.active
1591            .extend(upstreams.into_iter().map(|s| s.into_future()));
1592    }
1593
1594    /// Handle a new watermark message. Optionally returns the watermark message to emit.
1595    pub fn handle_watermark(
1596        &mut self,
1597        input_id: InputId,
1598        watermark: Watermark,
1599    ) -> Option<Watermark> {
1600        let col_idx = watermark.col_idx;
1601        // Insert a buffer watermarks when first received from a column.
1602        let upstream_ids: Vec<_> = self.upstream_input_ids().collect();
1603        let watermarks = self
1604            .buffered_watermarks
1605            .entry(col_idx)
1606            .or_insert_with(|| BufferedWatermarks::with_ids(upstream_ids));
1607        watermarks.handle_watermark(input_id, watermark)
1608    }
1609
1610    /// Consume `other` and add its upstreams to `self`. The two streams must be at the clean state
1611    /// right after a barrier.
1612    pub fn add_upstreams_from(
1613        &mut self,
1614        new_inputs: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1615    ) {
1616        assert!(self.blocked.is_empty() && self.barrier.is_none());
1617
1618        let new_inputs: Vec<_> = new_inputs.into_iter().collect();
1619        let input_ids = new_inputs.iter().map(|input| input.id());
1620        self.buffered_watermarks.values_mut().for_each(|buffers| {
1621            // Add buffers to the buffered watermarks for all cols
1622            buffers.add_buffers(input_ids.clone());
1623        });
1624        self.active
1625            .extend(new_inputs.into_iter().map(|s| s.into_future()));
1626    }
1627
1628    /// Remove upstreams from `self` in `upstream_input_ids`. The current stream must be at the
1629    /// clean state right after a barrier.
1630    /// The current container does not necessarily contain all the input ids passed in.
1631    pub fn remove_upstreams(&mut self, upstream_input_ids: &HashSet<InputId>) {
1632        assert!(self.blocked.is_empty() && self.barrier.is_none());
1633
1634        let new_upstreams = std::mem::take(&mut self.active)
1635            .into_iter()
1636            .map(|s| s.into_inner().unwrap())
1637            .filter(|u| !upstream_input_ids.contains(&u.id()));
1638        self.extend_active(new_upstreams);
1639        self.buffered_watermarks.values_mut().for_each(|buffers| {
1640            // Call `check_heap` in case the only upstream(s) that does not have
1641            // watermark in heap is removed
1642            buffers.remove_buffer(upstream_input_ids.clone());
1643        });
1644    }
1645
1646    pub fn merge_barrier_align_duration(&self) -> Option<LabelGuardedMetric<Histogram>> {
1647        self.merge_barrier_align_duration.clone()
1648    }
1649
1650    pub fn flush_buffered_watermarks(&mut self) {
1651        self.buffered_watermarks
1652            .values_mut()
1653            .for_each(|buffers| buffers.clear());
1654    }
1655
1656    pub fn upstream_input_ids(&self) -> impl Iterator<Item = InputId> + '_ {
1657        self.blocked
1658            .iter()
1659            .map(|s| s.id())
1660            .chain(self.active.iter().map(|s| s.get_ref().unwrap().id()))
1661    }
1662
1663    pub fn is_empty(&self) -> bool {
1664        self.blocked.is_empty() && self.active.is_empty()
1665    }
1666}
1667
1668// Explanation of why we need `DispatchBarrierBuffer`:
1669//
1670// When we need to create or replace an upstream fragment for the current fragment, the `Merge` operator must
1671// add some new upstream actor inputs. However, the `Merge` operator may still have old upstreams. We must wait
1672// for these old upstreams to completely process their barriers and align before we can safely update the
1673// `upstream-input-set`.
1674//
1675// Meanwhile, the creation of a new upstream actor can only succeed after the channel to the downstream `Merge`
1676// operator has been established. This creates a potential dependency chain: [new_actor_creation ->
1677// downstream_merge_update -> old_actor_processing]
1678//
1679// To address this, we split the application of a barrier's `Mutation` into two steps:
1680// 1. Parse the `Mutation`. If there is an addition on the upstream-set, establish a channel with the upstream
1681//    and cache it.
1682// 2. When the upstream barrier actually arrives, apply the cached upstream changes to the upstream-set
1683//
1684// Additionally, since receiving a barrier from current upstream input and from the `barrier_rx` are
1685// asynchronous, we cannot determine which will arrive first. Therefore, when a barrier is received from an
1686// upstream: if a cached mutation is present, we apply it. Otherwise, we must fetch a new barrier from
1687// `barrier_rx`.
/// Buffers barriers received from `barrier_rx` together with the new upstream inputs that were
/// built for them ahead of time, until the matching barrier arrives from the upstream.
pub(crate) struct DispatchBarrierBuffer {
    /// Barriers already received from `barrier_rx`, in arrival order. The second element holds
    /// the pre-built new inputs when the barrier adds upstream actors, `None` otherwise.
    buffer: VecDeque<(Barrier, Option<Vec<BoxedActorInput>>)>,
    /// Channel from which barriers are received.
    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
    /// Whether we are waiting for the next barrier or still building inputs for the last one.
    /// Kept in the struct so that the progress survives a cancelled fetch.
    recv_state: BarrierReceiverState,
    /// Upstream fragment id used to look up the merge update of a barrier. Replaced when an
    /// update carries a new upstream fragment id.
    curr_upstream_fragment_id: FragmentId,
    /// Id of the current actor, used to look up the merge update of a barrier.
    actor_id: ActorId,
    // read-only context for building new inputs
    build_input_ctx: Arc<BuildInputContext>,
}
1697
/// Arguments shared by every `new_input` call issued when building inputs for newly added
/// upstream actors.
struct BuildInputContext {
    /// Id of the actor the new inputs are built for.
    pub actor_id: ActorId,
    /// Barrier manager handed to `new_input`.
    pub local_barrier_manager: LocalBarrierManager,
    /// Streaming metrics handed to `new_input`.
    pub metrics: Arc<StreamingMetrics>,
    /// Id of the fragment the new inputs are built for.
    pub fragment_id: FragmentId,
}
1704
/// Boxed, sendable future that resolves to the inputs built for the upstream actors added by a
/// barrier, or to the error that made building them fail.
type BoxedNewInputsFuture =
    Pin<Box<dyn Future<Output = StreamExecutorResult<Vec<BoxedActorInput>>> + Send>>;
1707
/// Progress of fetching the next entry for the barrier buffer.
enum BarrierReceiverState {
    /// Waiting for the next barrier from the barrier channel.
    ReceivingBarrier,
    /// A barrier that adds upstream actors has been received; the future builds the new inputs
    /// for it. The barrier is buffered together with the inputs once the future resolves.
    CreatingNewInput(Barrier, BoxedNewInputsFuture),
}
1712
1713impl DispatchBarrierBuffer {
1714    pub fn new(
1715        barrier_rx: mpsc::UnboundedReceiver<Barrier>,
1716        actor_id: ActorId,
1717        curr_upstream_fragment_id: FragmentId,
1718        local_barrier_manager: LocalBarrierManager,
1719        metrics: Arc<StreamingMetrics>,
1720        fragment_id: FragmentId,
1721    ) -> Self {
1722        Self {
1723            buffer: VecDeque::new(),
1724            barrier_rx,
1725            recv_state: BarrierReceiverState::ReceivingBarrier,
1726            curr_upstream_fragment_id,
1727            actor_id,
1728            build_input_ctx: Arc::new(BuildInputContext {
1729                actor_id,
1730                local_barrier_manager,
1731                metrics,
1732                fragment_id,
1733            }),
1734        }
1735    }
1736
1737    pub async fn await_next_message(
1738        &mut self,
1739        stream: &mut (impl Stream<Item = StreamExecutorResult<DispatcherMessage>> + Unpin),
1740        metrics: &ActorInputMetrics,
1741    ) -> StreamExecutorResult<DispatcherMessage> {
1742        let mut start_time = Instant::now();
1743        let interval_duration = Duration::from_secs(15);
1744        let mut interval =
1745            tokio::time::interval_at(start_time + interval_duration, interval_duration);
1746
1747        loop {
1748            tokio::select! {
1749                biased;
1750                msg = stream.try_next() => {
1751                    metrics
1752                        .actor_input_buffer_blocking_duration_ns
1753                        .inc_by(start_time.elapsed().as_nanos() as u64);
1754                    return msg?.ok_or_else(
1755                        || StreamExecutorError::channel_closed("upstream executor closed unexpectedly")
1756                    );
1757                }
1758
1759                e = self.continuously_fetch_barrier_rx() => {
1760                    return Err(e);
1761                }
1762
1763                _ = interval.tick() => {
1764                    start_time = Instant::now();
1765                    metrics.actor_input_buffer_blocking_duration_ns.inc_by(interval_duration.as_nanos() as u64);
1766                    continue;
1767                }
1768            }
1769        }
1770    }
1771
1772    pub async fn pop_barrier_with_inputs(
1773        &mut self,
1774        barrier: DispatcherBarrier,
1775    ) -> StreamExecutorResult<(Barrier, Option<Vec<BoxedActorInput>>)> {
1776        while self.buffer.is_empty() {
1777            self.try_fetch_barrier_rx(false).await?;
1778        }
1779        let (mut recv_barrier, inputs) = self.buffer.pop_front().unwrap();
1780        apply_dispatcher_barrier(&mut recv_barrier, barrier);
1781
1782        Ok((recv_barrier, inputs))
1783    }
1784
1785    async fn continuously_fetch_barrier_rx(&mut self) -> StreamExecutorError {
1786        loop {
1787            if let Err(e) = self.try_fetch_barrier_rx(true).await {
1788                return e;
1789            }
1790        }
1791    }
1792
1793    async fn try_fetch_barrier_rx(&mut self, pending_on_end: bool) -> StreamExecutorResult<()> {
1794        match &mut self.recv_state {
1795            BarrierReceiverState::ReceivingBarrier => {
1796                let Some(barrier) = self.barrier_rx.recv().await else {
1797                    if pending_on_end {
1798                        return pending().await;
1799                    } else {
1800                        return Err(StreamExecutorError::channel_closed(
1801                            "barrier channel closed unexpectedly",
1802                        ));
1803                    }
1804                };
1805                if let Some(fut) = self.pre_apply_barrier(&barrier) {
1806                    self.recv_state = BarrierReceiverState::CreatingNewInput(barrier, fut);
1807                } else {
1808                    self.buffer.push_back((barrier, None));
1809                }
1810            }
1811            BarrierReceiverState::CreatingNewInput(barrier, fut) => {
1812                let new_inputs = fut.await?;
1813                self.buffer.push_back((barrier.clone(), Some(new_inputs)));
1814                self.recv_state = BarrierReceiverState::ReceivingBarrier;
1815            }
1816        }
1817        Ok(())
1818    }
1819
1820    fn pre_apply_barrier(&mut self, barrier: &Barrier) -> Option<BoxedNewInputsFuture> {
1821        if let Some(update) = barrier.as_update_merge(self.actor_id, self.curr_upstream_fragment_id)
1822            && !update.added_upstream_actors.is_empty()
1823        {
1824            // When update upstream fragment, added_actors will not be empty.
1825            let upstream_fragment_id =
1826                if let Some(new_upstream_fragment_id) = update.new_upstream_fragment_id {
1827                    self.curr_upstream_fragment_id = new_upstream_fragment_id;
1828                    new_upstream_fragment_id
1829                } else {
1830                    self.curr_upstream_fragment_id
1831                };
1832            let ctx = self.build_input_ctx.clone();
1833            let added_upstream_actors = update.added_upstream_actors.clone();
1834            let barrier = barrier.clone();
1835            let fut = async move {
1836                try_join_all(added_upstream_actors.iter().map(|upstream_actor| async {
1837                    let mut new_input = new_input(
1838                        &ctx.local_barrier_manager,
1839                        ctx.metrics.clone(),
1840                        ctx.actor_id,
1841                        ctx.fragment_id,
1842                        upstream_actor,
1843                        upstream_fragment_id,
1844                    )
1845                    .await?;
1846
1847                    // Poll the first barrier from the new upstreams. It must be the same as the one we polled from
1848                    // original upstreams.
1849                    let first_barrier = expect_first_barrier(&mut new_input).await?;
1850                    assert_equal_dispatcher_barrier(&barrier, &first_barrier);
1851
1852                    StreamExecutorResult::Ok(new_input)
1853                }))
1854                .await
1855            }
1856            .boxed();
1857
1858            Some(fut)
1859        } else {
1860            None
1861        }
1862    }
1863}