risingwave_stream/executor/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prelude;
16
17use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
18use std::fmt::Debug;
19use std::future::pending;
20use std::hash::Hash;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::Poll;
24use std::vec;
25
26use await_tree::InstrumentAwait;
27use enum_as_inner::EnumAsInner;
28use futures::future::try_join_all;
29use futures::stream::{BoxStream, FusedStream, FuturesUnordered, StreamFuture};
30use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
31use itertools::Itertools;
32use prometheus::core::{AtomicU64, GenericCounter};
33use risingwave_common::array::StreamChunk;
34use risingwave_common::bitmap::Bitmap;
35use risingwave_common::catalog::{Schema, TableId};
36use risingwave_common::config::StreamingConfig;
37use risingwave_common::metrics::LabelGuardedMetric;
38use risingwave_common::row::OwnedRow;
39use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
40use risingwave_common::util::epoch::{Epoch, EpochPair};
41use risingwave_common::util::tracing::TracingContext;
42use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
43use risingwave_connector::source::SplitImpl;
44use risingwave_expr::expr::{Expression, NonStrictExpression};
45use risingwave_pb::data::PbEpoch;
46use risingwave_pb::expr::PbInputRef;
47use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
48use risingwave_pb::stream_plan::barrier::BarrierKind;
49use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
50use risingwave_pb::stream_plan::stream_node::PbStreamKind;
51use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
52use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
53use risingwave_pb::stream_plan::{
54    PbBarrier, PbBarrierMutation, PbDispatcher, PbSinkSchemaChange, PbStreamMessageBatch,
55    PbWatermark, SubscriptionUpstreamInfo,
56};
57use smallvec::SmallVec;
58use tokio::sync::mpsc;
59use tokio::time::{Duration, Instant};
60
61use crate::error::StreamResult;
62use crate::executor::exchange::input::{
63    BoxedActorInput, BoxedInput, assert_equal_dispatcher_barrier, new_input,
64};
65use crate::executor::monitor::ActorInputMetrics;
66use crate::executor::prelude::StreamingMetrics;
67use crate::executor::watermark::BufferedWatermarks;
68use crate::task::{ActorId, FragmentId, LocalBarrierManager};
69
70mod actor;
71mod barrier_align;
72pub mod exchange;
73pub mod monitor;
74
75pub mod aggregate;
76pub mod asof_join;
77mod backfill;
78mod barrier_recv;
79mod batch_query;
80mod chain;
81mod changelog;
82mod dedup;
83mod dispatch;
84pub mod dml;
85mod dynamic_filter;
86pub mod eowc;
87pub mod error;
88mod expand;
89mod filter;
90mod gap_fill;
91pub mod hash_join;
92mod hop_window;
93mod join;
94pub mod locality_provider;
95mod lookup;
96mod lookup_union;
97mod merge;
98mod mview;
99mod nested_loop_temporal_join;
100mod no_op;
101mod now;
102mod over_window;
103pub mod project;
104mod rearranged_chain;
105mod receiver;
106pub mod row_id_gen;
107mod sink;
108pub mod source;
109mod stream_reader;
110pub mod subtask;
111mod temporal_join;
112mod top_n;
113mod troublemaker;
114mod union;
115mod upstream_sink_union;
116mod values;
117mod watermark;
118mod watermark_filter;
119mod wrapper;
120
121mod approx_percentile;
122
123mod row_merge;
124
125#[cfg(test)]
126mod integration_tests;
127mod sync_kv_log_store;
128#[cfg(any(test, feature = "test"))]
129pub mod test_utils;
130mod utils;
131mod vector;
132
133pub use actor::{Actor, ActorContext, ActorContextRef};
134use anyhow::Context;
135pub use approx_percentile::global::GlobalApproxPercentileExecutor;
136pub use approx_percentile::local::LocalApproxPercentileExecutor;
137pub use backfill::arrangement_backfill::*;
138pub use backfill::cdc::{
139    CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor,
140};
141pub use backfill::no_shuffle_backfill::*;
142pub use backfill::snapshot_backfill::*;
143pub use barrier_recv::BarrierRecvExecutor;
144pub use batch_query::BatchQueryExecutor;
145pub use chain::ChainExecutor;
146pub use changelog::ChangeLogExecutor;
147pub use dedup::AppendOnlyDedupExecutor;
148pub use dispatch::DispatchExecutor;
149pub use dynamic_filter::DynamicFilterExecutor;
150pub use error::{StreamExecutorError, StreamExecutorResult};
151pub use expand::ExpandExecutor;
152pub use filter::{FilterExecutor, UpsertFilterExecutor};
153pub use gap_fill::{GapFillExecutor, GapFillExecutorArgs};
154pub use hash_join::*;
155pub use hop_window::HopWindowExecutor;
156pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
157pub use join::{AsOfDesc, AsOfJoinType, JoinType};
158pub use lookup::*;
159pub use lookup_union::LookupUnionExecutor;
160pub use merge::MergeExecutor;
161pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
162pub use mview::{MaterializeExecutor, RefreshableMaterializeArgs};
163pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
164pub use no_op::NoOpExecutor;
165pub use now::*;
166pub use over_window::*;
167pub use rearranged_chain::RearrangedChainExecutor;
168pub use receiver::ReceiverExecutor;
169use risingwave_common::id::SourceId;
170pub use row_merge::RowMergeExecutor;
171pub use sink::SinkExecutor;
172pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
173pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
174pub use temporal_join::TemporalJoinExecutor;
175pub use top_n::{
176    AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
177};
178pub use troublemaker::TroublemakerExecutor;
179pub use union::UnionExecutor;
180pub use upstream_sink_union::{UpstreamFragmentInfo, UpstreamSinkUnionExecutor};
181pub use utils::DummyExecutor;
182pub use values::ValuesExecutor;
183pub use vector::*;
184pub use watermark_filter::{UpsertWatermarkFilterExecutor, WatermarkFilterExecutor};
185pub use wrapper::WrapperExecutor;
186
187use self::barrier_align::AlignedMessageStream;
188
/// Item type of a message stream: a fallible [`MessageInner<M>`].
pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
/// [`MessageStreamItemInner`] instantiated with [`BarrierMutationType`].
pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
/// Item type of a dispatcher message stream: a fallible [`DispatcherMessage`].
pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
/// A boxed `'static` stream of [`MessageStreamItem`]s. This is what [`Execute::execute`] returns.
pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;
193
194pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
195use risingwave_connector::sink::catalog::SinkId;
196use risingwave_connector::source::cdc::{
197    CdcTableSnapshotSplitAssignmentWithGeneration,
198    build_actor_cdc_table_snapshot_splits_with_generation,
199};
200use risingwave_pb::id::{ExecutorId, SubscriberId};
201use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
202
/// Trait alias for any `Send` stream that yields [`MessageStreamItemInner<M>`].
pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
/// Trait alias for any `Send` stream that yields [`MessageStreamItem`].
pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
/// Trait alias for any `Send` stream that yields [`DispatcherMessageStreamItem`].
pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;
206
/// Static information of an executor.
///
/// It is stored next to the executable object inside [`Executor`], which exposes the
/// individual fields through accessor methods.
#[derive(Debug, Default, Clone)]
pub struct ExecutorInfo {
    /// The schema of the OUTPUT of the executor.
    pub schema: Schema,

    /// The stream key indices of the OUTPUT of the executor.
    pub stream_key: StreamKey,

    /// The stream kind of the OUTPUT of the executor.
    pub stream_kind: PbStreamKind,

    /// Identity of the executor. This string is also what the `Debug` output of
    /// [`Executor`] prints.
    pub identity: String,

    /// The executor id of the executor.
    pub id: ExecutorId,
}
225
226impl ExecutorInfo {
227    pub fn for_test(schema: Schema, stream_key: StreamKey, identity: String, id: u64) -> Self {
228        Self {
229            schema,
230            stream_key,
231            stream_kind: PbStreamKind::Retract, // dummy value for test
232            identity,
233            id: id.into(),
234        }
235    }
236}
237
238/// [`Execute`] describes the methods an executor should implement to handle control messages.
239pub trait Execute: Send + 'static {
240    fn execute(self: Box<Self>) -> BoxedMessageStream;
241
242    fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
243        self.execute()
244    }
245
246    fn boxed(self) -> Box<dyn Execute>
247    where
248        Self: Sized + Send + 'static,
249    {
250        Box::new(self)
251    }
252}
253
/// [`Executor`] combines the static information ([`ExecutorInfo`]) and the executable object to
/// handle messages ([`Execute`]).
///
/// Both fields are private; the information is read through the accessor methods and the
/// executable object is consumed by `execute` / `execute_with_epoch`.
pub struct Executor {
    /// Static information describing the executor's output.
    info: ExecutorInfo,
    /// The boxed executable object.
    execute: Box<dyn Execute>,
}
260
261impl Executor {
262    pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
263        Self { info, execute }
264    }
265
266    pub fn info(&self) -> &ExecutorInfo {
267        &self.info
268    }
269
270    pub fn schema(&self) -> &Schema {
271        &self.info.schema
272    }
273
274    pub fn stream_key(&self) -> StreamKeyRef<'_> {
275        &self.info.stream_key
276    }
277
278    pub fn stream_kind(&self) -> PbStreamKind {
279        self.info.stream_kind
280    }
281
282    pub fn identity(&self) -> &str {
283        &self.info.identity
284    }
285
286    pub fn execute(self) -> BoxedMessageStream {
287        self.execute.execute()
288    }
289
290    pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
291        self.execute.execute_with_epoch(epoch)
292    }
293}
294
295impl std::fmt::Debug for Executor {
296    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
297        f.write_str(self.identity())
298    }
299}
300
301impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
302    fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
303        Self::new(info, execute)
304    }
305}
306
307impl<E> From<(ExecutorInfo, E)> for Executor
308where
309    E: Execute,
310{
311    fn from((info, execute): (ExecutorInfo, E)) -> Self {
312        Self::new(info, execute.boxed())
313    }
314}
315
/// The epoch value `0`.
///
/// NOTE(review): judging by the name this is a sentinel meaning "no valid epoch"; its usages
/// are not visible in this part of the file, so confirm against callers.
pub const INVALID_EPOCH: u64 = 0;

/// A [`FragmentId`] used where the id refers to an upstream fragment.
type UpstreamFragmentId = FragmentId;
/// Lists of source splits keyed by actor id.
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;
320
/// Payload of [`Mutation::Update`].
///
/// `Default` and `PartialEq` are only derived in test builds or with the `test` feature.
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct UpdateMutation {
    /// Dispatcher updates, keyed by actor id.
    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
    /// Merge updates, keyed by `(actor id, upstream fragment id)`. Looked up by
    /// [`Barrier::as_update_merge`].
    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
    /// New vnode bitmaps, keyed by actor id. Looked up by [`Barrier::as_update_vnode_bitmap`].
    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
    /// Actors dropped by this mutation. Reported by [`Barrier::all_stop_actors`].
    pub dropped_actors: HashSet<ActorId>,
    /// Split assignments, keyed by actor id. Read by [`Barrier::initial_split_assignment`].
    pub actor_splits: SplitAssignments,
    /// New dispatchers, keyed by actor id.
    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
    /// CDC table snapshot split assignment, together with its generation.
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// Schema changes, keyed by sink id. Looked up by [`Barrier::as_sink_schema_change`].
    pub sink_schema_change: HashMap<SinkId, PbSinkSchemaChange>,
    /// Subscriptions to drop. Exposed by [`Barrier::as_subscriptions_to_drop`].
    pub subscriptions_to_drop: Vec<SubscriptionUpstreamInfo>,
}
334
/// Payload of [`Mutation::Add`].
///
/// `Default` and `PartialEq` are only derived in test builds or with the `test` feature.
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct AddMutation {
    /// Dispatchers to add, keyed by actor id. [`Barrier::has_more_downstream_fragments`]
    /// checks whether an upstream actor has an entry in this map.
    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
    /// Actors newly added by this mutation. Checked by [`Barrier::is_newly_added`].
    pub added_actors: HashSet<ActorId>,
    // TODO: remove this and use `SourceChangesSplit` after we support multiple mutations.
    /// Split assignments, keyed by actor id. Read by [`Barrier::initial_split_assignment`].
    pub splits: SplitAssignments,
    /// Whether the data stream should be paused on startup. Reported by
    /// [`Barrier::is_pause_on_startup`].
    pub pause: bool,
    /// (`upstream_mv_table_id`,  `subscriber_id`)
    pub subscriptions_to_add: Vec<(TableId, SubscriberId)>,
    /// Backfill fragments that should be paused on startup. Checked by
    /// [`Barrier::is_backfill_pause_on_startup`].
    pub backfill_nodes_to_pause: HashSet<FragmentId>,
    /// CDC table snapshot split assignment, together with its generation.
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// New upstream sinks, keyed by the downstream fragment id. Looked up by
    /// [`Barrier::as_new_upstream_sink`].
    pub new_upstream_sinks: HashMap<FragmentId, PbNewUpstreamSink>,
}
350
/// Payload of [`Mutation::Stop`].
///
/// `Default` and `PartialEq` are only derived in test builds or with the `test` feature.
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct StopMutation {
    /// Actors dropped by this mutation. Reported by [`Barrier::all_stop_actors`].
    pub dropped_actors: HashSet<ActorId>,
    /// Sink fragments dropped by this mutation. Exposed by
    /// [`Barrier::as_dropped_upstream_sinks`].
    pub dropped_sink_fragments: HashSet<FragmentId>,
}
357
/// The mutation optionally carried by a [`Barrier`].
///
/// See [`PbMutation`] for the semantics of each mutation.
///
/// `PartialEq` is only derived in test builds or with the `test` feature.
#[cfg_attr(any(test, feature = "test"), derive(PartialEq))]
#[derive(Debug, Clone)]
pub enum Mutation {
    /// Stops (drops) actors; see [`StopMutation`].
    Stop(StopMutation),
    /// See [`UpdateMutation`].
    Update(UpdateMutation),
    /// See [`AddMutation`].
    Add(AddMutation),
    /// Changes the split assignments of existing executors, keyed by actor id.
    SourceChangeSplit(SplitAssignments),
    Pause,
    Resume,
    /// Throttle configuration, keyed by fragment id.
    Throttle(HashMap<FragmentId, ThrottleConfig>),
    /// String properties keyed by a `u32` id.
    /// NOTE(review): the kind of object the `u32` identifies is not visible here — confirm.
    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
    DropSubscriptions {
        /// `subscriber` -> `upstream_mv_table_id`
        subscriptions_to_drop: Vec<SubscriptionUpstreamInfo>,
    },
    StartFragmentBackfill {
        /// Fragments whose backfill should start. Checked by
        /// [`Barrier::should_start_fragment_backfill`].
        fragment_ids: HashSet<FragmentId>,
    },
    RefreshStart {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    ListFinish {
        associated_source_id: SourceId,
    },
    LoadFinish {
        associated_source_id: SourceId,
    },
    ResetSource {
        source_id: SourceId,
    },
}
391
/// A barrier, generic over the type `M` of the mutation it carries.
///
/// For barriers in the dispatcher, `M` is `()`, which means the mutation is erased.
/// For barriers flowing within the streaming actor, `M` is the normal `BarrierMutationType`.
#[derive(Debug, Clone)]
pub struct BarrierInner<M> {
    /// The epoch pair of this barrier.
    pub epoch: EpochPair,
    /// The mutation carried by this barrier, of type `M`.
    pub mutation: M,
    /// The kind of this barrier, e.g. [`BarrierKind::Checkpoint`].
    pub kind: BarrierKind,

    /// Tracing context for the **current** epoch of this barrier.
    pub tracing_context: TracingContext,
}
405
/// Mutation type of a [`Barrier`]: an optional, reference-counted [`Mutation`].
pub type BarrierMutationType = Option<Arc<Mutation>>;
/// A barrier that may carry a [`Mutation`].
pub type Barrier = BarrierInner<BarrierMutationType>;
/// A barrier whose mutation is erased (`()`); produced by [`Barrier::into_dispatcher`].
pub type DispatcherBarrier = BarrierInner<()>;
409
410impl<M: Default> BarrierInner<M> {
411    /// Create a plain barrier.
412    pub fn new_test_barrier(epoch: u64) -> Self {
413        Self {
414            epoch: EpochPair::new_test_epoch(epoch),
415            kind: BarrierKind::Checkpoint,
416            tracing_context: TracingContext::none(),
417            mutation: Default::default(),
418        }
419    }
420
421    pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
422        Self {
423            epoch: EpochPair::new(epoch, prev_epoch),
424            kind: BarrierKind::Checkpoint,
425            tracing_context: TracingContext::none(),
426            mutation: Default::default(),
427        }
428    }
429}
430
431impl Barrier {
432    pub fn into_dispatcher(self) -> DispatcherBarrier {
433        DispatcherBarrier {
434            epoch: self.epoch,
435            mutation: (),
436            kind: self.kind,
437            tracing_context: self.tracing_context,
438        }
439    }
440
441    #[must_use]
442    pub fn with_mutation(self, mutation: Mutation) -> Self {
443        Self {
444            mutation: Some(Arc::new(mutation)),
445            ..self
446        }
447    }
448
449    #[must_use]
450    pub fn with_stop(self) -> Self {
451        self.with_mutation(Mutation::Stop(StopMutation {
452            dropped_actors: Default::default(),
453            dropped_sink_fragments: Default::default(),
454        }))
455    }
456
457    /// Whether this barrier carries stop mutation.
458    pub fn is_with_stop_mutation(&self) -> bool {
459        matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
460    }
461
462    /// Whether this barrier is to stop the actor with `actor_id`.
463    pub fn is_stop(&self, actor_id: ActorId) -> bool {
464        self.all_stop_actors()
465            .is_some_and(|actors| actors.contains(&actor_id))
466    }
467
468    pub fn is_checkpoint(&self) -> bool {
469        self.kind == BarrierKind::Checkpoint
470    }
471
472    /// Get the initial split assignments for the actor with `actor_id`.
473    ///
474    /// This should only be called on the initial barrier received by the executor. It must be
475    ///
476    /// - `Add` mutation when it's a new streaming job, or recovery.
477    /// - `Update` mutation when it's created for scaling.
478    ///
479    /// Note that `SourceChangeSplit` is **not** included, because it's only used for changing splits
480    /// of existing executors.
481    pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
482        match self.mutation.as_deref()? {
483            Mutation::Update(UpdateMutation { actor_splits, .. })
484            | Mutation::Add(AddMutation {
485                splits: actor_splits,
486                ..
487            }) => actor_splits.get(&actor_id),
488
489            _ => {
490                if cfg!(debug_assertions) {
491                    panic!(
492                        "the initial mutation of the barrier should not be {:?}",
493                        self.mutation
494                    );
495                }
496                None
497            }
498        }
499        .map(|s| s.as_slice())
500    }
501
502    /// Get all actors that to be stopped (dropped) by this barrier.
503    pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
504        match self.mutation.as_deref() {
505            Some(Mutation::Stop(StopMutation { dropped_actors, .. })) => Some(dropped_actors),
506            Some(Mutation::Update(UpdateMutation { dropped_actors, .. })) => Some(dropped_actors),
507            _ => None,
508        }
509    }
510
511    /// Whether this barrier is to newly add the actor with `actor_id`. This is used for `Chain` and
512    /// `Values` to decide whether to output the existing (historical) data.
513    ///
514    /// By "newly", we mean the actor belongs to a subgraph of a new streaming job. That is, actors
515    /// added for scaling are not included.
516    pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
517        match self.mutation.as_deref() {
518            Some(Mutation::Add(AddMutation { added_actors, .. })) => {
519                added_actors.contains(&actor_id)
520            }
521            _ => false,
522        }
523    }
524
525    pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
526        if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
527            fragment_ids.contains(&fragment_id)
528        } else {
529            false
530        }
531    }
532
533    /// Whether this barrier adds new downstream fragment for the actor with `upstream_actor_id`.
534    ///
535    /// # Use case
536    /// Some optimizations are applied when an actor doesn't have any downstreams ("standalone" actors).
537    /// * Pause a standalone shared `SourceExecutor`.
538    /// * Disable a standalone `MaterializeExecutor`'s conflict check.
539    ///
540    /// This is implemented by checking `actor_context.initial_dispatch_num` on startup, and
541    /// check `has_more_downstream_fragments` on barrier to see whether the optimization
542    /// needs to be turned off.
543    ///
544    /// ## Some special cases not included
545    ///
546    /// Note that this is not `has_new_downstream_actor/fragment`. For our use case, we only
547    /// care about **number of downstream fragments** (more precisely, existence).
548    /// - When scaling, the number of downstream actors is changed, and they are "new", but downstream fragments is not changed.
549    /// - When `ALTER TABLE sink_into_table`, the fragment is replaced with a "new" one, but the number is not changed.
550    pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
551        let Some(mutation) = self.mutation.as_deref() else {
552            return false;
553        };
554        match mutation {
555            // Add is for mv, index and sink creation.
556            Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
557            Mutation::Update(_)
558            | Mutation::Stop(_)
559            | Mutation::Pause
560            | Mutation::Resume
561            | Mutation::SourceChangeSplit(_)
562            | Mutation::Throttle { .. }
563            | Mutation::DropSubscriptions { .. }
564            | Mutation::ConnectorPropsChange(_)
565            | Mutation::StartFragmentBackfill { .. }
566            | Mutation::RefreshStart { .. }
567            | Mutation::ListFinish { .. }
568            | Mutation::LoadFinish { .. }
569            | Mutation::ResetSource { .. } => false,
570        }
571    }
572
573    /// Whether this barrier requires the executor to pause its data stream on startup.
574    pub fn is_pause_on_startup(&self) -> bool {
575        match self.mutation.as_deref() {
576            Some(Mutation::Add(AddMutation { pause, .. })) => *pause,
577            _ => false,
578        }
579    }
580
581    pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
582        match self.mutation.as_deref() {
583            Some(Mutation::Add(AddMutation {
584                backfill_nodes_to_pause,
585                ..
586            })) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
587            Some(Mutation::Update(_)) => false,
588            _ => {
589                tracing::warn!(
590                    "expected an AddMutation or UpdateMutation on Startup, instead got {:?}",
591                    self
592                );
593                false
594            }
595        }
596    }
597
598    /// Whether this barrier is for resume.
599    pub fn is_resume(&self) -> bool {
600        matches!(self.mutation.as_deref(), Some(Mutation::Resume))
601    }
602
603    /// Returns the [`MergeUpdate`] if this barrier is to update the merge executors for the actor
604    /// with `actor_id`.
605    pub fn as_update_merge(
606        &self,
607        actor_id: ActorId,
608        upstream_fragment_id: UpstreamFragmentId,
609    ) -> Option<&MergeUpdate> {
610        self.mutation
611            .as_deref()
612            .and_then(|mutation| match mutation {
613                Mutation::Update(UpdateMutation { merges, .. }) => {
614                    merges.get(&(actor_id, upstream_fragment_id))
615                }
616                _ => None,
617            })
618    }
619
620    /// Returns the new upstream sink information if this barrier is to add a new upstream sink for
621    /// the specified downstream fragment.
622    pub fn as_new_upstream_sink(&self, fragment_id: FragmentId) -> Option<&PbNewUpstreamSink> {
623        self.mutation
624            .as_deref()
625            .and_then(|mutation| match mutation {
626                Mutation::Add(AddMutation {
627                    new_upstream_sinks, ..
628                }) => new_upstream_sinks.get(&fragment_id),
629                _ => None,
630            })
631    }
632
633    /// Returns the dropped upstream sink-fragment if this barrier is to drop any sink.
634    pub fn as_dropped_upstream_sinks(&self) -> Option<&HashSet<FragmentId>> {
635        self.mutation
636            .as_deref()
637            .and_then(|mutation| match mutation {
638                Mutation::Stop(StopMutation {
639                    dropped_sink_fragments,
640                    ..
641                }) => Some(dropped_sink_fragments),
642                _ => None,
643            })
644    }
645
646    /// Returns the new vnode bitmap if this barrier is to update the vnode bitmap for the actor
647    /// with `actor_id`.
648    ///
649    /// Actually, this vnode bitmap update is only useful for the record accessing validation for
650    /// distributed executors, since the read/write pattern will never be across multiple vnodes.
651    pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
652        self.mutation
653            .as_deref()
654            .and_then(|mutation| match mutation {
655                Mutation::Update(UpdateMutation { vnode_bitmaps, .. }) => {
656                    vnode_bitmaps.get(&actor_id).cloned()
657                }
658                _ => None,
659            })
660    }
661
662    pub fn as_sink_schema_change(&self, sink_id: SinkId) -> Option<PbSinkSchemaChange> {
663        self.mutation
664            .as_deref()
665            .and_then(|mutation| match mutation {
666                Mutation::Update(UpdateMutation {
667                    sink_schema_change, ..
668                }) => sink_schema_change.get(&sink_id).cloned(),
669                _ => None,
670            })
671    }
672
673    pub fn as_subscriptions_to_drop(&self) -> Option<&[SubscriptionUpstreamInfo]> {
674        match self.mutation.as_deref() {
675            Some(Mutation::DropSubscriptions {
676                subscriptions_to_drop,
677            })
678            | Some(Mutation::Update(UpdateMutation {
679                subscriptions_to_drop,
680                ..
681            })) => Some(subscriptions_to_drop.as_slice()),
682            _ => None,
683        }
684    }
685
686    pub fn get_curr_epoch(&self) -> Epoch {
687        Epoch(self.epoch.curr)
688    }
689
690    /// Retrieve the tracing context for the **current** epoch of this barrier.
691    pub fn tracing_context(&self) -> &TracingContext {
692        &self.tracing_context
693    }
694
695    pub fn added_subscriber_on_mv_table(
696        &self,
697        mv_table_id: TableId,
698    ) -> impl Iterator<Item = SubscriberId> + '_ {
699        if let Some(Mutation::Add(add)) = self.mutation.as_deref() {
700            Some(add)
701        } else {
702            None
703        }
704        .into_iter()
705        .flat_map(move |add| {
706            add.subscriptions_to_add.iter().filter_map(
707                move |(upstream_mv_table_id, subscriber_id)| {
708                    if *upstream_mv_table_id == mv_table_id {
709                        Some(*subscriber_id)
710                    } else {
711                        None
712                    }
713                },
714            )
715        })
716    }
717}
718
719impl<M: PartialEq> PartialEq for BarrierInner<M> {
720    fn eq(&self, other: &Self) -> bool {
721        self.epoch == other.epoch && self.mutation == other.mutation
722    }
723}
724
725impl Mutation {
726    /// Return true if the mutation is stop.
727    ///
728    /// Note that this does not mean we will stop the current actor.
729    #[cfg(test)]
730    pub fn is_stop(&self) -> bool {
731        matches!(self, Mutation::Stop(_))
732    }
733
734    #[cfg(test)]
735    fn to_protobuf(&self) -> PbMutation {
736        use risingwave_pb::source::{
737            ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
738        };
739        use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
740        use risingwave_pb::stream_plan::{
741            PbAddMutation, PbConnectorPropsChangeMutation, PbDispatchers,
742            PbDropSubscriptionsMutation, PbPauseMutation, PbResumeMutation,
743            PbSourceChangeSplitMutation, PbStartFragmentBackfillMutation, PbStopMutation,
744            PbThrottleMutation, PbUpdateMutation,
745        };
746        let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
747            actor_splits
748                .iter()
749                .map(|(&actor_id, splits)| {
750                    (
751                        actor_id,
752                        ConnectorSplits {
753                            splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
754                        },
755                    )
756                })
757                .collect::<HashMap<_, _>>()
758        };
759
760        match self {
761            Mutation::Stop(StopMutation {
762                dropped_actors,
763                dropped_sink_fragments,
764            }) => PbMutation::Stop(PbStopMutation {
765                actors: dropped_actors.iter().copied().collect(),
766                dropped_sink_fragments: dropped_sink_fragments.iter().copied().collect(),
767            }),
768            Mutation::Update(UpdateMutation {
769                dispatchers,
770                merges,
771                vnode_bitmaps,
772                dropped_actors,
773                actor_splits,
774                actor_new_dispatchers,
775                actor_cdc_table_snapshot_splits,
776                sink_schema_change,
777                subscriptions_to_drop,
778            }) => PbMutation::Update(PbUpdateMutation {
779                dispatcher_update: dispatchers.values().flatten().cloned().collect(),
780                merge_update: merges.values().cloned().collect(),
781                actor_vnode_bitmap_update: vnode_bitmaps
782                    .iter()
783                    .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
784                    .collect(),
785                dropped_actors: dropped_actors.iter().copied().collect(),
786                actor_splits: actor_splits_to_protobuf(actor_splits),
787                actor_new_dispatchers: actor_new_dispatchers
788                    .iter()
789                    .map(|(&actor_id, dispatchers)| {
790                        (
791                            actor_id,
792                            PbDispatchers {
793                                dispatchers: dispatchers.clone(),
794                            },
795                        )
796                    })
797                    .collect(),
798                actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
799                    splits:actor_cdc_table_snapshot_splits.splits.iter().map(|(actor_id,(splits, generation))| {
800                        (*actor_id, risingwave_pb::source::PbCdcTableSnapshotSplits {
801                            splits: splits.iter().map(risingwave_connector::source::cdc::build_cdc_table_snapshot_split).collect(),
802                            generation: *generation,
803                        })
804                    }).collect()
805                }),
806                sink_schema_change: sink_schema_change
807                    .iter()
808                    .map(|(sink_id, change)| ((*sink_id).as_raw_id(), change.clone()))
809                    .collect(),
810                subscriptions_to_drop: subscriptions_to_drop.clone(),
811            }),
812            Mutation::Add(AddMutation {
813                adds,
814                added_actors,
815                splits,
816                pause,
817                subscriptions_to_add,
818                backfill_nodes_to_pause,
819                actor_cdc_table_snapshot_splits,
820                new_upstream_sinks,
821            }) => PbMutation::Add(PbAddMutation {
822                actor_dispatchers: adds
823                    .iter()
824                    .map(|(&actor_id, dispatchers)| {
825                        (
826                            actor_id,
827                            PbDispatchers {
828                                dispatchers: dispatchers.clone(),
829                            },
830                        )
831                    })
832                    .collect(),
833                added_actors: added_actors.iter().copied().collect(),
834                actor_splits: actor_splits_to_protobuf(splits),
835                pause: *pause,
836                subscriptions_to_add: subscriptions_to_add
837                    .iter()
838                    .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
839                        subscriber_id: *subscriber_id,
840                        upstream_mv_table_id: *table_id,
841                    })
842                    .collect(),
843                backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
844                actor_cdc_table_snapshot_splits:
845                Some(PbCdcTableSnapshotSplitsWithGeneration {
846                    splits:actor_cdc_table_snapshot_splits.splits.iter().map(|(actor_id,(splits, generation))| {
847                        (*actor_id, risingwave_pb::source::PbCdcTableSnapshotSplits {
848                            splits: splits.iter().map(risingwave_connector::source::cdc::build_cdc_table_snapshot_split).collect(),
849                            generation: *generation,
850                        })
851                    }).collect()
852                }),
853                new_upstream_sinks: new_upstream_sinks
854                    .iter()
855                    .map(|(k, v)| (*k, v.clone()))
856                    .collect(),
857            }),
858            Mutation::SourceChangeSplit(changes) => {
859                PbMutation::Splits(PbSourceChangeSplitMutation {
860                    actor_splits: changes
861                        .iter()
862                        .map(|(&actor_id, splits)| {
863                            (
864                                actor_id,
865                                ConnectorSplits {
866                                    splits: splits
867                                        .clone()
868                                        .iter()
869                                        .map(ConnectorSplit::from)
870                                        .collect(),
871                                },
872                            )
873                        })
874                        .collect(),
875                })
876            }
877            Mutation::Pause => PbMutation::Pause(PbPauseMutation {}),
878            Mutation::Resume => PbMutation::Resume(PbResumeMutation {}),
879            Mutation::Throttle (changes) => PbMutation::Throttle(PbThrottleMutation {
880                fragment_throttle: changes.clone(),
881            }),
882            Mutation::DropSubscriptions {
883                subscriptions_to_drop,
884            } => PbMutation::DropSubscriptions(PbDropSubscriptionsMutation {
885                info: subscriptions_to_drop.clone(),
886            }),
887            Mutation::ConnectorPropsChange(map) => {
888                PbMutation::ConnectorPropsChange(PbConnectorPropsChangeMutation {
889                    connector_props_infos: map
890                        .iter()
891                        .map(|(actor_id, options)| {
892                            (
893                                *actor_id,
894                                ConnectorPropsInfo {
895                                    connector_props_info: options
896                                        .iter()
897                                        .map(|(k, v)| (k.clone(), v.clone()))
898                                        .collect(),
899                                },
900                            )
901                        })
902                        .collect(),
903                })
904            }
905            Mutation::StartFragmentBackfill { fragment_ids } => {
906                PbMutation::StartFragmentBackfill(PbStartFragmentBackfillMutation {
907                    fragment_ids: fragment_ids.iter().copied().collect(),
908                })
909            }
910            Mutation::RefreshStart {
911                table_id,
912                associated_source_id,
913            } => PbMutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
914                table_id: *table_id,
915                associated_source_id: *associated_source_id,
916            }),
917            Mutation::ListFinish {
918                associated_source_id,
919            } => PbMutation::ListFinish(risingwave_pb::stream_plan::ListFinishMutation {
920                associated_source_id: *associated_source_id,
921            }),
922            Mutation::LoadFinish {
923                associated_source_id,
924            } => PbMutation::LoadFinish(risingwave_pb::stream_plan::LoadFinishMutation {
925                associated_source_id: *associated_source_id,
926            }),
927            Mutation::ResetSource { source_id } => {
928                PbMutation::ResetSource(risingwave_pb::stream_plan::ResetSourceMutation {
929                    source_id: source_id.as_raw_id(),
930                })
931            }
932        }
933    }
934
935    fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
936        let mutation = match prost {
937            PbMutation::Stop(stop) => Mutation::Stop(StopMutation {
938                dropped_actors: stop.actors.iter().copied().collect(),
939                dropped_sink_fragments: stop.dropped_sink_fragments.iter().copied().collect(),
940            }),
941
942            PbMutation::Update(update) => Mutation::Update(UpdateMutation {
943                dispatchers: update
944                    .dispatcher_update
945                    .iter()
946                    .map(|u| (u.actor_id, u.clone()))
947                    .into_group_map(),
948                merges: update
949                    .merge_update
950                    .iter()
951                    .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
952                    .collect(),
953                vnode_bitmaps: update
954                    .actor_vnode_bitmap_update
955                    .iter()
956                    .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
957                    .collect(),
958                dropped_actors: update.dropped_actors.iter().copied().collect(),
959                actor_splits: update
960                    .actor_splits
961                    .iter()
962                    .map(|(&actor_id, splits)| {
963                        (
964                            actor_id,
965                            splits
966                                .splits
967                                .iter()
968                                .map(|split| split.try_into().unwrap())
969                                .collect(),
970                        )
971                    })
972                    .collect(),
973                actor_new_dispatchers: update
974                    .actor_new_dispatchers
975                    .iter()
976                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
977                    .collect(),
978                actor_cdc_table_snapshot_splits:
979                    build_actor_cdc_table_snapshot_splits_with_generation(
980                        update
981                            .actor_cdc_table_snapshot_splits
982                            .clone()
983                            .unwrap_or_default(),
984                    ),
985                sink_schema_change: update
986                    .sink_schema_change
987                    .iter()
988                    .map(|(sink_id, change)| (SinkId::from(*sink_id), change.clone()))
989                    .collect(),
990                subscriptions_to_drop: update.subscriptions_to_drop.clone(),
991            }),
992
993            PbMutation::Add(add) => Mutation::Add(AddMutation {
994                adds: add
995                    .actor_dispatchers
996                    .iter()
997                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
998                    .collect(),
999                added_actors: add.added_actors.iter().copied().collect(),
1000                // TODO: remove this and use `SourceChangesSplit` after we support multiple
1001                // mutations.
1002                splits: add
1003                    .actor_splits
1004                    .iter()
1005                    .map(|(&actor_id, splits)| {
1006                        (
1007                            actor_id,
1008                            splits
1009                                .splits
1010                                .iter()
1011                                .map(|split| split.try_into().unwrap())
1012                                .collect(),
1013                        )
1014                    })
1015                    .collect(),
1016                pause: add.pause,
1017                subscriptions_to_add: add
1018                    .subscriptions_to_add
1019                    .iter()
1020                    .map(
1021                        |SubscriptionUpstreamInfo {
1022                             subscriber_id,
1023                             upstream_mv_table_id,
1024                         }| { (*upstream_mv_table_id, *subscriber_id) },
1025                    )
1026                    .collect(),
1027                backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
1028                actor_cdc_table_snapshot_splits:
1029                    build_actor_cdc_table_snapshot_splits_with_generation(
1030                        add.actor_cdc_table_snapshot_splits
1031                            .clone()
1032                            .unwrap_or_default(),
1033                    ),
1034                new_upstream_sinks: add
1035                    .new_upstream_sinks
1036                    .iter()
1037                    .map(|(k, v)| (*k, v.clone()))
1038                    .collect(),
1039            }),
1040
1041            PbMutation::Splits(s) => {
1042                let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
1043                    Vec::with_capacity(s.actor_splits.len());
1044                for (&actor_id, splits) in &s.actor_splits {
1045                    if !splits.splits.is_empty() {
1046                        change_splits.push((
1047                            actor_id,
1048                            splits
1049                                .splits
1050                                .iter()
1051                                .map(SplitImpl::try_from)
1052                                .try_collect()?,
1053                        ));
1054                    }
1055                }
1056                Mutation::SourceChangeSplit(change_splits.into_iter().collect())
1057            }
1058            PbMutation::Pause(_) => Mutation::Pause,
1059            PbMutation::Resume(_) => Mutation::Resume,
1060            PbMutation::Throttle(changes) => Mutation::Throttle(changes.fragment_throttle.clone()),
1061            PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
1062                subscriptions_to_drop: drop.info.clone(),
1063            },
1064            PbMutation::ConnectorPropsChange(alter_connector_props) => {
1065                Mutation::ConnectorPropsChange(
1066                    alter_connector_props
1067                        .connector_props_infos
1068                        .iter()
1069                        .map(|(connector_id, options)| {
1070                            (
1071                                *connector_id,
1072                                options
1073                                    .connector_props_info
1074                                    .iter()
1075                                    .map(|(k, v)| (k.clone(), v.clone()))
1076                                    .collect(),
1077                            )
1078                        })
1079                        .collect(),
1080                )
1081            }
1082            PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
1083                Mutation::StartFragmentBackfill {
1084                    fragment_ids: start_fragment_backfill
1085                        .fragment_ids
1086                        .iter()
1087                        .copied()
1088                        .collect(),
1089                }
1090            }
1091            PbMutation::RefreshStart(refresh_start) => Mutation::RefreshStart {
1092                table_id: refresh_start.table_id,
1093                associated_source_id: refresh_start.associated_source_id,
1094            },
1095            PbMutation::ListFinish(list_finish) => Mutation::ListFinish {
1096                associated_source_id: list_finish.associated_source_id,
1097            },
1098            PbMutation::LoadFinish(load_finish) => Mutation::LoadFinish {
1099                associated_source_id: load_finish.associated_source_id,
1100            },
1101            PbMutation::ResetSource(reset_source) => Mutation::ResetSource {
1102                source_id: SourceId::from(reset_source.source_id),
1103            },
1104        };
1105        Ok(mutation)
1106    }
1107}
1108
1109impl<M> BarrierInner<M> {
1110    fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
1111        let Self {
1112            epoch,
1113            mutation,
1114            kind,
1115            tracing_context,
1116            ..
1117        } = self;
1118
1119        PbBarrier {
1120            epoch: Some(PbEpoch {
1121                curr: epoch.curr,
1122                prev: epoch.prev,
1123            }),
1124            mutation: barrier_fn(mutation).map(|mutation| PbBarrierMutation {
1125                mutation: Some(mutation),
1126            }),
1127            tracing_context: tracing_context.to_protobuf(),
1128            kind: *kind as _,
1129        }
1130    }
1131
1132    fn from_protobuf_inner(
1133        prost: &PbBarrier,
1134        mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
1135    ) -> StreamExecutorResult<Self> {
1136        let epoch = prost.get_epoch()?;
1137
1138        Ok(Self {
1139            kind: prost.kind(),
1140            epoch: EpochPair::new(epoch.curr, epoch.prev),
1141            mutation: mutation_from_pb(
1142                (prost.mutation.as_ref()).and_then(|mutation| mutation.mutation.as_ref()),
1143            )?,
1144            tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
1145        })
1146    }
1147
1148    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
1149        BarrierInner {
1150            epoch: self.epoch,
1151            mutation: f(self.mutation),
1152            kind: self.kind,
1153            tracing_context: self.tracing_context,
1154        }
1155    }
1156}
1157
1158impl DispatcherBarrier {
1159    pub fn to_protobuf(&self) -> PbBarrier {
1160        self.to_protobuf_inner(|_| None)
1161    }
1162}
1163
1164impl Barrier {
1165    #[cfg(test)]
1166    pub fn to_protobuf(&self) -> PbBarrier {
1167        self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
1168    }
1169
1170    pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
1171        Self::from_protobuf_inner(prost, |mutation| {
1172            mutation
1173                .map(|m| Mutation::from_protobuf(m).map(Arc::new))
1174                .transpose()
1175        })
1176    }
1177}
1178
/// A watermark on a single column.
///
/// NOTE(review): the derived `PartialEq`/`Eq` compare all fields, while the `Ord` impl in this
/// file compares `val` only — confirm callers do not rely on the two agreeing.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
    /// Index of the column this watermark refers to.
    pub col_idx: usize,
    /// Data type of `val`.
    pub data_type: DataType,
    /// The watermark value.
    pub val: ScalarImpl,
}
1185
1186impl PartialOrd for Watermark {
1187    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1188        Some(self.cmp(other))
1189    }
1190}
1191
1192impl Ord for Watermark {
1193    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1194        self.val.default_cmp(&other.val)
1195    }
1196}
1197
1198impl Watermark {
1199    pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
1200        Self {
1201            col_idx,
1202            data_type,
1203            val,
1204        }
1205    }
1206
1207    pub async fn transform_with_expr(
1208        self,
1209        expr: &NonStrictExpression<impl Expression>,
1210        new_col_idx: usize,
1211    ) -> Option<Self> {
1212        let Self { col_idx, val, .. } = self;
1213        let row = {
1214            let mut row = vec![None; col_idx + 1];
1215            row[col_idx] = Some(val);
1216            OwnedRow::new(row)
1217        };
1218        let val = expr.eval_row_infallible(&row).await?;
1219        Some(Self::new(new_col_idx, expr.inner().return_type(), val))
1220    }
1221
1222    /// Transform the watermark with the given output indices. If this watermark is not in the
1223    /// output, return `None`.
1224    pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
1225        output_indices
1226            .iter()
1227            .position(|p| *p == self.col_idx)
1228            .map(|new_col_idx| self.with_idx(new_col_idx))
1229    }
1230
1231    pub fn to_protobuf(&self) -> PbWatermark {
1232        PbWatermark {
1233            column: Some(PbInputRef {
1234                index: self.col_idx as _,
1235                r#type: Some(self.data_type.to_protobuf()),
1236            }),
1237            val: Some(&self.val).to_protobuf().into(),
1238        }
1239    }
1240
1241    pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
1242        let col_ref = prost.get_column()?;
1243        let data_type = DataType::from(col_ref.get_type()?);
1244        let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
1245            .expect("watermark value cannot be null");
1246        Ok(Self::new(col_ref.get_index() as _, data_type, val))
1247    }
1248
1249    pub fn with_idx(self, idx: usize) -> Self {
1250        Self::new(idx, self.data_type, self.val)
1251    }
1252}
1253
/// A stream message, generic over the mutation payload `M` carried by its barriers.
#[cfg_attr(any(test, feature = "test"), derive(PartialEq))]
#[derive(Debug, EnumAsInner, Clone)]
pub enum MessageInner<M> {
    /// A chunk of stream data.
    Chunk(StreamChunk),
    /// A barrier whose mutation payload has type `M`.
    Barrier(BarrierInner<M>),
    /// A watermark on one column.
    Watermark(Watermark),
}
1261
1262impl<M> MessageInner<M> {
1263    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
1264        match self {
1265            MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
1266            MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
1267            MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
1268        }
1269    }
1270}
1271
/// A message whose barriers carry a `BarrierMutationType` payload.
pub type Message = MessageInner<BarrierMutationType>;
/// A message whose barriers carry no mutation payload (`()`).
pub type DispatcherMessage = MessageInner<()>;
1274
/// `MessageBatchInner` is used exclusively by `Dispatcher` and the `Merger`/`Receiver` for exchanging messages between them.
/// It shares the same message type as the fundamental `MessageInner`, but batches multiple barriers into a single message.
#[derive(Debug, EnumAsInner, Clone)]
pub enum MessageBatchInner<M> {
    /// A chunk of stream data.
    Chunk(StreamChunk),
    /// One or more barriers delivered together, each with a mutation payload of type `M`.
    BarrierBatch(Vec<BarrierInner<M>>),
    /// A watermark on one column.
    Watermark(Watermark),
}
/// A message batch whose barriers carry a `BarrierMutationType` payload.
pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
/// A list of `DispatcherBarrier`s.
pub type DispatcherBarriers = Vec<DispatcherBarrier>;
/// A message batch whose barriers carry no mutation payload (`()`).
pub type DispatcherMessageBatch = MessageBatchInner<()>;
1286
1287impl From<DispatcherMessage> for DispatcherMessageBatch {
1288    fn from(m: DispatcherMessage) -> Self {
1289        match m {
1290            DispatcherMessage::Chunk(c) => Self::Chunk(c),
1291            DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
1292            DispatcherMessage::Watermark(w) => Self::Watermark(w),
1293        }
1294    }
1295}
1296
1297impl From<StreamChunk> for Message {
1298    fn from(chunk: StreamChunk) -> Self {
1299        Message::Chunk(chunk)
1300    }
1301}
1302
1303impl<'a> TryFrom<&'a Message> for &'a Barrier {
1304    type Error = ();
1305
1306    fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
1307        match m {
1308            Message::Chunk(_) => Err(()),
1309            Message::Barrier(b) => Ok(b),
1310            Message::Watermark(_) => Err(()),
1311        }
1312    }
1313}
1314
1315impl Message {
1316    /// Return true if the message is a stop barrier, meaning the stream
1317    /// will not continue, false otherwise.
1318    ///
1319    /// Note that this does not mean we will stop the current actor.
1320    #[cfg(test)]
1321    pub fn is_stop(&self) -> bool {
1322        matches!(
1323            self,
1324            Message::Barrier(Barrier {
1325                mutation,
1326                ..
1327            }) if mutation.as_ref().unwrap().is_stop()
1328        )
1329    }
1330}
1331
1332impl DispatcherMessageBatch {
1333    pub fn to_protobuf(&self) -> PbStreamMessageBatch {
1334        let prost = match self {
1335            Self::Chunk(stream_chunk) => {
1336                let prost_stream_chunk = stream_chunk.to_protobuf();
1337                StreamMessageBatch::StreamChunk(prost_stream_chunk)
1338            }
1339            Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
1340                barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
1341            }),
1342            Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
1343        };
1344        PbStreamMessageBatch {
1345            stream_message_batch: Some(prost),
1346        }
1347    }
1348
1349    pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
1350        let res = match prost.get_stream_message_batch()? {
1351            StreamMessageBatch::StreamChunk(chunk) => {
1352                Self::Chunk(StreamChunk::from_protobuf(chunk)?)
1353            }
1354            StreamMessageBatch::BarrierBatch(barrier_batch) => {
1355                let barriers = barrier_batch
1356                    .barriers
1357                    .iter()
1358                    .map(|barrier| {
1359                        DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
1360                            if mutation.is_some() {
1361                                if cfg!(debug_assertions) {
1362                                    panic!("should not receive message of barrier with mutation");
1363                                } else {
1364                                    warn!(?barrier, "receive message of barrier with mutation");
1365                                }
1366                            }
1367                            Ok(())
1368                        })
1369                    })
1370                    .try_collect()?;
1371                Self::BarrierBatch(barriers)
1372            }
1373            StreamMessageBatch::Watermark(watermark) => {
1374                Self::Watermark(Watermark::from_protobuf(watermark)?)
1375            }
1376        };
1377        Ok(res)
1378    }
1379
1380    pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
1381        ::prost::Message::encoded_len(msg)
1382    }
1383}
1384
/// Owned stream key, stored as a list of `usize` — presumably column indices; TODO confirm.
pub type StreamKey = Vec<usize>;
/// Borrowed form of [`StreamKey`].
pub type StreamKeyRef<'a> = &'a [usize];
/// Data types stored in a `SmallVec` with inline room for one element — presumably the
/// types of the stream key columns; TODO confirm.
pub type StreamKeyDataTypes = SmallVec<[DataType; 1]>;
1388
1389/// Expect the first message of the given `stream` as a barrier.
1390pub async fn expect_first_barrier<M: Debug>(
1391    stream: &mut (impl MessageStreamInner<M> + Unpin),
1392) -> StreamExecutorResult<BarrierInner<M>> {
1393    let message = stream
1394        .next()
1395        .instrument_await("expect_first_barrier")
1396        .await
1397        .context("failed to extract the first message: stream closed unexpectedly")??;
1398    let barrier = message
1399        .into_barrier()
1400        .expect("the first message must be a barrier");
1401    // TODO: Is this check correct?
1402    assert!(matches!(
1403        barrier.kind,
1404        BarrierKind::Checkpoint | BarrierKind::Initial
1405    ));
1406    Ok(barrier)
1407}
1408
1409/// Expect the first message of the given `stream` as a barrier.
1410pub async fn expect_first_barrier_from_aligned_stream(
1411    stream: &mut (impl AlignedMessageStream + Unpin),
1412) -> StreamExecutorResult<Barrier> {
1413    let message = stream
1414        .next()
1415        .instrument_await("expect_first_barrier")
1416        .await
1417        .context("failed to extract the first message: stream closed unexpectedly")??;
1418    let barrier = message
1419        .into_barrier()
1420        .expect("the first message must be a barrier");
1421    Ok(barrier)
1422}
1423
/// `StreamConsumer` is the last step in an actor.
pub trait StreamConsumer: Send + 'static {
    /// The stream returned by [`StreamConsumer::execute`]; each item is a barrier or an error.
    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;

    /// Consumes the boxed consumer and returns its barrier stream.
    fn execute(self: Box<Self>) -> Self::BarrierStream;
}
1430
/// Shorthand for a `BoxedInput` identified by `InputId` whose items are
/// `MessageStreamItemInner<M>`.
type BoxedMessageInput<InputId, M> = BoxedInput<InputId, MessageStreamItemInner<M>>;
1432
/// A stream for merging messages from multiple upstreams.
/// Can dynamically add and delete upstream streams.
/// For the meaning of the generic parameter `M` used, refer to `BarrierInner<M>`.
pub struct DynamicReceivers<InputId, M> {
    /// The barrier we're aligning to. If this is `None`, then `blocked` is empty.
    barrier: Option<BarrierInner<M>>,
    /// The start timestamp of the current barrier. Used for measuring the alignment duration.
    /// Set when the first upstream gets blocked and taken when the barrier is emitted.
    start_ts: Option<Instant>,
    /// The upstreams that're blocked by the `barrier`.
    blocked: Vec<BoxedMessageInput<InputId, M>>,
    /// The upstreams that're not blocked and can be polled.
    active: FuturesUnordered<StreamFuture<BoxedMessageInput<InputId, M>>>,
    /// watermark column index -> `BufferedWatermarks`
    buffered_watermarks: BTreeMap<usize, BufferedWatermarks<InputId>>,
    /// Currently only used for union.
    barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
    /// Only for merge. If None, then we don't take `Instant::now()` and `observe` during `poll_next`
    ///
    /// NOTE(review): the `poll_next` in this file records `Instant::now()` regardless of
    /// whether this metric is set — confirm the sentence above is still accurate.
    merge_barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
}
1452
impl<InputId: Clone + Ord + Hash + std::fmt::Debug + Unpin, M: Clone + Unpin> Stream
    for DynamicReceivers<InputId, M>
{
    type Item = MessageStreamItemInner<M>;

    /// Polls the active upstreams and yields the next merged message.
    ///
    /// Chunks are forwarded immediately. Watermarks are passed to `handle_watermark` and only
    /// yielded when it returns one. An upstream that yields a barrier is moved to `blocked`;
    /// once `active` is drained, the barrier is emitted once and all blocked upstreams are
    /// made active again.
    ///
    /// Yields an error if an upstream yields an error, if an upstream ends, or if an upstream
    /// delivers a barrier whose epoch differs from the barrier currently being aligned.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // Without any upstream the merged stream is finished.
        if self.is_empty() {
            return Poll::Ready(None);
        }

        loop {
            match futures::ready!(self.active.poll_next_unpin(cx)) {
                // Directly forward the error.
                Some((Some(Err(e)), _)) => {
                    return Poll::Ready(Some(Err(e)));
                }
                // Handle the message from some upstream.
                Some((Some(Ok(message)), remaining)) => {
                    let input_id = remaining.id();
                    match message {
                        MessageInner::Chunk(chunk) => {
                            // Continue polling this upstream by pushing it back to `active`.
                            self.active.push(remaining.into_future());
                            return Poll::Ready(Some(Ok(MessageInner::Chunk(chunk))));
                        }
                        MessageInner::Watermark(watermark) => {
                            // Continue polling this upstream by pushing it back to `active`.
                            self.active.push(remaining.into_future());
                            // If no watermark is returned, keep looping and poll `active` again.
                            if let Some(watermark) = self.handle_watermark(input_id, watermark) {
                                return Poll::Ready(Some(Ok(MessageInner::Watermark(watermark))));
                            }
                        }
                        MessageInner::Barrier(barrier) => {
                            // Block this upstream by pushing it to `blocked`.
                            // The first blocked upstream starts the alignment timer.
                            if self.blocked.is_empty() {
                                self.start_ts = Some(Instant::now());
                            }
                            self.blocked.push(remaining);
                            if let Some(current_barrier) = self.barrier.as_ref() {
                                // Every upstream must deliver a barrier of the same epoch.
                                if current_barrier.epoch != barrier.epoch {
                                    return Poll::Ready(Some(Err(
                                        StreamExecutorError::align_barrier(
                                            current_barrier.clone().map_mutation(|_| None),
                                            barrier.map_mutation(|_| None),
                                        ),
                                    )));
                                }
                            } else {
                                // First barrier of this round: remember it as the alignment target.
                                self.barrier = Some(barrier);
                            }
                        }
                    }
                }
                // We use barrier as the control message of the stream. That is, we always stop the
                // actors actively when we receive a `Stop` mutation, instead of relying on the stream
                // termination.
                //
                // Besides, in abnormal cases when the other side of the `Input` closes unexpectedly,
                // we also yield an `Err(ExchangeChannelClosed)`, which will hit the `Err` arm above.
                // So this branch is not expected to be reached; if it is, report it as an error.
                Some((None, remaining)) => {
                    return Poll::Ready(Some(Err(StreamExecutorError::channel_closed(format!(
                        "upstream input {:?} unexpectedly closed",
                        remaining.id()
                    )))));
                }
                // There's no active upstreams. Process the barrier and resume the blocked ones.
                None => {
                    assert!(!self.blocked.is_empty());

                    let start_ts = self
                        .start_ts
                        .take()
                        .expect("should have received at least one barrier");
                    // Add the elapsed alignment time, in nanoseconds, to whichever metrics are set.
                    if let Some(barrier_align_duration) = &self.barrier_align_duration {
                        barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
                    }
                    if let Some(merge_barrier_align_duration) = &self.merge_barrier_align_duration {
                        merge_barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
                    }

                    break;
                }
            }
        }

        assert!(self.active.is_terminated());

        let barrier = self.barrier.take().unwrap();

        // `blocked` is emptied and `barrier` is already taken, which is the state
        // `extend_active` asserts on.
        let upstreams = std::mem::take(&mut self.blocked);
        self.extend_active(upstreams);
        assert!(!self.active.is_terminated());

        Poll::Ready(Some(Ok(MessageInner::Barrier(barrier))))
    }
}
1553
1554impl<InputId: Clone + Ord + Hash + std::fmt::Debug, M> DynamicReceivers<InputId, M> {
1555    pub fn new(
1556        upstreams: Vec<BoxedMessageInput<InputId, M>>,
1557        barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1558        merge_barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1559    ) -> Self {
1560        let mut this = Self {
1561            barrier: None,
1562            start_ts: None,
1563            blocked: Vec::with_capacity(upstreams.len()),
1564            active: Default::default(),
1565            buffered_watermarks: Default::default(),
1566            merge_barrier_align_duration,
1567            barrier_align_duration,
1568        };
1569        this.extend_active(upstreams);
1570        this
1571    }
1572
1573    /// Extend the active upstreams with the given upstreams. The current stream must be at the
1574    /// clean state right after a barrier.
1575    pub fn extend_active(
1576        &mut self,
1577        upstreams: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1578    ) {
1579        assert!(self.blocked.is_empty() && self.barrier.is_none());
1580
1581        self.active
1582            .extend(upstreams.into_iter().map(|s| s.into_future()));
1583    }
1584
1585    /// Handle a new watermark message. Optionally returns the watermark message to emit.
1586    pub fn handle_watermark(
1587        &mut self,
1588        input_id: InputId,
1589        watermark: Watermark,
1590    ) -> Option<Watermark> {
1591        let col_idx = watermark.col_idx;
1592        // Insert a buffer watermarks when first received from a column.
1593        let upstream_ids: Vec<_> = self.upstream_input_ids().collect();
1594        let watermarks = self
1595            .buffered_watermarks
1596            .entry(col_idx)
1597            .or_insert_with(|| BufferedWatermarks::with_ids(upstream_ids));
1598        watermarks.handle_watermark(input_id, watermark)
1599    }
1600
1601    /// Consume `other` and add its upstreams to `self`. The two streams must be at the clean state
1602    /// right after a barrier.
1603    pub fn add_upstreams_from(
1604        &mut self,
1605        new_inputs: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1606    ) {
1607        assert!(self.blocked.is_empty() && self.barrier.is_none());
1608
1609        let new_inputs: Vec<_> = new_inputs.into_iter().collect();
1610        let input_ids = new_inputs.iter().map(|input| input.id());
1611        self.buffered_watermarks.values_mut().for_each(|buffers| {
1612            // Add buffers to the buffered watermarks for all cols
1613            buffers.add_buffers(input_ids.clone());
1614        });
1615        self.active
1616            .extend(new_inputs.into_iter().map(|s| s.into_future()));
1617    }
1618
1619    /// Remove upstreams from `self` in `upstream_input_ids`. The current stream must be at the
1620    /// clean state right after a barrier.
1621    /// The current container does not necessarily contain all the input ids passed in.
1622    pub fn remove_upstreams(&mut self, upstream_input_ids: &HashSet<InputId>) {
1623        assert!(self.blocked.is_empty() && self.barrier.is_none());
1624
1625        let new_upstreams = std::mem::take(&mut self.active)
1626            .into_iter()
1627            .map(|s| s.into_inner().unwrap())
1628            .filter(|u| !upstream_input_ids.contains(&u.id()));
1629        self.extend_active(new_upstreams);
1630        self.buffered_watermarks.values_mut().for_each(|buffers| {
1631            // Call `check_heap` in case the only upstream(s) that does not have
1632            // watermark in heap is removed
1633            buffers.remove_buffer(upstream_input_ids.clone());
1634        });
1635    }
1636
1637    pub fn merge_barrier_align_duration(
1638        &self,
1639    ) -> Option<LabelGuardedMetric<GenericCounter<AtomicU64>>> {
1640        self.merge_barrier_align_duration.clone()
1641    }
1642
1643    pub fn flush_buffered_watermarks(&mut self) {
1644        self.buffered_watermarks
1645            .values_mut()
1646            .for_each(|buffers| buffers.clear());
1647    }
1648
1649    pub fn upstream_input_ids(&self) -> impl Iterator<Item = InputId> + '_ {
1650        self.blocked
1651            .iter()
1652            .map(|s| s.id())
1653            .chain(self.active.iter().map(|s| s.get_ref().unwrap().id()))
1654    }
1655
1656    pub fn is_empty(&self) -> bool {
1657        self.blocked.is_empty() && self.active.is_empty()
1658    }
1659}
1660
1661// Explanation of why we need `DispatchBarrierBuffer`:
1662//
1663// When we need to create or replace an upstream fragment for the current fragment, the `Merge` operator must
1664// add some new upstream actor inputs. However, the `Merge` operator may still have old upstreams. We must wait
1665// for these old upstreams to completely process their barriers and align before we can safely update the
1666// `upstream-input-set`.
1667//
1668// Meanwhile, the creation of a new upstream actor can only succeed after the channel to the downstream `Merge`
1669// operator has been established. This creates a potential dependency chain: [new_actor_creation ->
1670// downstream_merge_update -> old_actor_processing]
1671//
1672// To address this, we split the application of a barrier's `Mutation` into two steps:
1673// 1. Parse the `Mutation`. If there is an addition on the upstream-set, establish a channel with the upstream
1674//    and cache it.
1675// 2. When the upstream barrier actually arrives, apply the cached upstream changes to the upstream-set
1676//
1677// Additionally, since receiving a barrier from current upstream input and from the `barrier_rx` are
1678// asynchronous, we cannot determine which will arrive first. Therefore, when a barrier is received from an
1679// upstream: if a cached mutation is present, we apply it. Otherwise, we must fetch a new barrier from
1680// `barrier_rx`.
1681pub(crate) struct DispatchBarrierBuffer {
1682    buffer: VecDeque<(Barrier, Option<Vec<BoxedActorInput>>)>,
1683    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
1684    recv_state: BarrierReceiverState,
1685    curr_upstream_fragment_id: FragmentId,
1686    actor_id: ActorId,
1687    // read-only context for building new inputs
1688    build_input_ctx: Arc<BuildInputContext>,
1689}
1690
1691struct BuildInputContext {
1692    pub actor_id: ActorId,
1693    pub local_barrier_manager: LocalBarrierManager,
1694    pub metrics: Arc<StreamingMetrics>,
1695    pub fragment_id: FragmentId,
1696    pub actor_config: Arc<StreamingConfig>,
1697}
1698
1699type BoxedNewInputsFuture =
1700    Pin<Box<dyn Future<Output = StreamExecutorResult<Vec<BoxedActorInput>>> + Send>>;
1701
1702enum BarrierReceiverState {
1703    ReceivingBarrier,
1704    CreatingNewInput(Barrier, BoxedNewInputsFuture),
1705}
1706
1707impl DispatchBarrierBuffer {
1708    pub fn new(
1709        barrier_rx: mpsc::UnboundedReceiver<Barrier>,
1710        actor_id: ActorId,
1711        curr_upstream_fragment_id: FragmentId,
1712        local_barrier_manager: LocalBarrierManager,
1713        metrics: Arc<StreamingMetrics>,
1714        fragment_id: FragmentId,
1715        actor_config: Arc<StreamingConfig>,
1716    ) -> Self {
1717        Self {
1718            buffer: VecDeque::new(),
1719            barrier_rx,
1720            recv_state: BarrierReceiverState::ReceivingBarrier,
1721            curr_upstream_fragment_id,
1722            actor_id,
1723            build_input_ctx: Arc::new(BuildInputContext {
1724                actor_id,
1725                local_barrier_manager,
1726                metrics,
1727                fragment_id,
1728                actor_config,
1729            }),
1730        }
1731    }
1732
1733    pub async fn await_next_message(
1734        &mut self,
1735        stream: &mut (impl Stream<Item = StreamExecutorResult<DispatcherMessage>> + Unpin),
1736        metrics: &ActorInputMetrics,
1737    ) -> StreamExecutorResult<DispatcherMessage> {
1738        let mut start_time = Instant::now();
1739        let interval_duration = Duration::from_secs(15);
1740        let mut interval =
1741            tokio::time::interval_at(start_time + interval_duration, interval_duration);
1742
1743        loop {
1744            tokio::select! {
1745                biased;
1746                msg = stream.try_next() => {
1747                    metrics
1748                        .actor_input_buffer_blocking_duration_ns
1749                        .inc_by(start_time.elapsed().as_nanos() as u64);
1750                    return msg?.ok_or_else(
1751                        || StreamExecutorError::channel_closed("upstream executor closed unexpectedly")
1752                    );
1753                }
1754
1755                e = self.continuously_fetch_barrier_rx() => {
1756                    return Err(e);
1757                }
1758
1759                _ = interval.tick() => {
1760                    start_time = Instant::now();
1761                    metrics.actor_input_buffer_blocking_duration_ns.inc_by(interval_duration.as_nanos() as u64);
1762                    continue;
1763                }
1764            }
1765        }
1766    }
1767
1768    pub async fn pop_barrier_with_inputs(
1769        &mut self,
1770        barrier: DispatcherBarrier,
1771    ) -> StreamExecutorResult<(Barrier, Option<Vec<BoxedActorInput>>)> {
1772        while self.buffer.is_empty() {
1773            self.try_fetch_barrier_rx(false).await?;
1774        }
1775        let (recv_barrier, inputs) = self.buffer.pop_front().unwrap();
1776        assert_equal_dispatcher_barrier(&recv_barrier, &barrier);
1777
1778        Ok((recv_barrier, inputs))
1779    }
1780
1781    async fn continuously_fetch_barrier_rx(&mut self) -> StreamExecutorError {
1782        loop {
1783            if let Err(e) = self.try_fetch_barrier_rx(true).await {
1784                return e;
1785            }
1786        }
1787    }
1788
1789    async fn try_fetch_barrier_rx(&mut self, pending_on_end: bool) -> StreamExecutorResult<()> {
1790        match &mut self.recv_state {
1791            BarrierReceiverState::ReceivingBarrier => {
1792                let Some(barrier) = self.barrier_rx.recv().await else {
1793                    if pending_on_end {
1794                        return pending().await;
1795                    } else {
1796                        return Err(StreamExecutorError::channel_closed(
1797                            "barrier channel closed unexpectedly",
1798                        ));
1799                    }
1800                };
1801                if let Some(fut) = self.pre_apply_barrier(&barrier) {
1802                    self.recv_state = BarrierReceiverState::CreatingNewInput(barrier, fut);
1803                } else {
1804                    self.buffer.push_back((barrier, None));
1805                }
1806            }
1807            BarrierReceiverState::CreatingNewInput(barrier, fut) => {
1808                let new_inputs = fut.await?;
1809                self.buffer.push_back((barrier.clone(), Some(new_inputs)));
1810                self.recv_state = BarrierReceiverState::ReceivingBarrier;
1811            }
1812        }
1813        Ok(())
1814    }
1815
1816    fn pre_apply_barrier(&mut self, barrier: &Barrier) -> Option<BoxedNewInputsFuture> {
1817        if let Some(update) = barrier.as_update_merge(self.actor_id, self.curr_upstream_fragment_id)
1818            && !update.added_upstream_actors.is_empty()
1819        {
1820            // When update upstream fragment, added_actors will not be empty.
1821            let upstream_fragment_id =
1822                if let Some(new_upstream_fragment_id) = update.new_upstream_fragment_id {
1823                    self.curr_upstream_fragment_id = new_upstream_fragment_id;
1824                    new_upstream_fragment_id
1825                } else {
1826                    self.curr_upstream_fragment_id
1827                };
1828            let ctx = self.build_input_ctx.clone();
1829            let added_upstream_actors = update.added_upstream_actors.clone();
1830            let barrier = barrier.clone();
1831            let fut = async move {
1832                try_join_all(added_upstream_actors.iter().map(|upstream_actor| async {
1833                    let mut new_input = new_input(
1834                        &ctx.local_barrier_manager,
1835                        ctx.metrics.clone(),
1836                        ctx.actor_id,
1837                        ctx.fragment_id,
1838                        upstream_actor,
1839                        upstream_fragment_id,
1840                        ctx.actor_config.clone(),
1841                    )
1842                    .await?;
1843
1844                    // Poll the first barrier from the new upstreams. It must be the same as the one we polled from
1845                    // original upstreams.
1846                    let first_barrier = expect_first_barrier(&mut new_input).await?;
1847                    assert_equal_dispatcher_barrier(&barrier, &first_barrier);
1848
1849                    StreamExecutorResult::Ok(new_input)
1850                }))
1851                .await
1852            }
1853            .boxed();
1854
1855            Some(fut)
1856        } else {
1857            None
1858        }
1859    }
1860}