risingwave_stream/executor/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prelude;
16
17use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
18use std::fmt::Debug;
19use std::future::pending;
20use std::hash::Hash;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::Poll;
24use std::vec;
25
26use await_tree::InstrumentAwait;
27use enum_as_inner::EnumAsInner;
28use futures::future::try_join_all;
29use futures::stream::{BoxStream, FusedStream, FuturesUnordered, StreamFuture};
30use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
31use itertools::Itertools;
32use prometheus::core::{AtomicU64, GenericCounter};
33use risingwave_common::array::StreamChunk;
34use risingwave_common::bitmap::Bitmap;
35use risingwave_common::catalog::{Schema, TableId};
36use risingwave_common::config::StreamingConfig;
37use risingwave_common::metrics::LabelGuardedMetric;
38use risingwave_common::row::OwnedRow;
39use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
40use risingwave_common::util::epoch::{Epoch, EpochPair};
41use risingwave_common::util::tracing::TracingContext;
42use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
43use risingwave_connector::source::SplitImpl;
44use risingwave_expr::expr::{Expression, NonStrictExpression};
45use risingwave_pb::data::PbEpoch;
46use risingwave_pb::expr::PbInputRef;
47use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
48use risingwave_pb::stream_plan::barrier::BarrierKind;
49use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
50use risingwave_pb::stream_plan::stream_node::PbStreamKind;
51use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
52use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
53use risingwave_pb::stream_plan::{
54    PbBarrier, PbBarrierMutation, PbDispatcher, PbSinkSchemaChange, PbStreamMessageBatch,
55    PbWatermark, SubscriptionUpstreamInfo,
56};
57use smallvec::SmallVec;
58use tokio::sync::mpsc;
59use tokio::time::{Duration, Instant};
60
61use crate::error::StreamResult;
62use crate::executor::exchange::input::{
63    BoxedActorInput, BoxedInput, assert_equal_dispatcher_barrier, new_input,
64};
65use crate::executor::monitor::ActorInputMetrics;
66use crate::executor::prelude::StreamingMetrics;
67use crate::executor::watermark::BufferedWatermarks;
68use crate::task::{ActorId, FragmentId, LocalBarrierManager};
69
70mod actor;
71mod barrier_align;
72pub mod exchange;
73pub mod monitor;
74
75pub mod aggregate;
76pub mod asof_join;
77mod backfill;
78mod barrier_recv;
79mod batch_query;
80mod chain;
81mod changelog;
82mod dedup;
83mod dispatch;
84pub mod dml;
85mod dynamic_filter;
86pub mod eowc;
87pub mod error;
88mod expand;
89mod filter;
90mod gap_fill;
91pub mod hash_join;
92mod hop_window;
93mod iceberg_with_pk_index;
94mod join;
95pub mod locality_provider;
96mod lookup;
97mod lookup_union;
98mod merge;
99mod mview;
100mod nested_loop_temporal_join;
101mod no_op;
102mod now;
103mod over_window;
104pub mod project;
105mod rearranged_chain;
106mod receiver;
107pub mod row_id_gen;
108mod sink;
109pub mod source;
110mod stream_reader;
111pub mod subtask;
112mod temporal_join;
113mod top_n;
114mod troublemaker;
115mod union;
116mod upstream_sink_union;
117mod values;
118mod watermark;
119mod watermark_filter;
120mod wrapper;
121
122mod approx_percentile;
123
124mod row_merge;
125
126#[cfg(test)]
127mod integration_tests;
128mod sync_kv_log_store;
129#[cfg(any(test, feature = "test"))]
130pub mod test_utils;
131mod utils;
132mod vector;
133
134pub use actor::{Actor, ActorContext, ActorContextRef};
135use anyhow::{Context, anyhow};
136pub use approx_percentile::global::GlobalApproxPercentileExecutor;
137pub use approx_percentile::local::LocalApproxPercentileExecutor;
138pub use backfill::arrangement_backfill::*;
139pub use backfill::cdc::{
140    CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor,
141};
142pub use backfill::no_shuffle_backfill::*;
143pub use backfill::snapshot_backfill::*;
144pub use barrier_recv::BarrierRecvExecutor;
145pub use batch_query::BatchQueryExecutor;
146pub use chain::ChainExecutor;
147pub use changelog::ChangeLogExecutor;
148pub use dedup::AppendOnlyDedupExecutor;
149pub use dispatch::{DispatchExecutor, SyncLogStoreDispatchExecutor};
150pub use dynamic_filter::DynamicFilterExecutor;
151pub use error::{StreamExecutorError, StreamExecutorResult};
152pub use expand::ExpandExecutor;
153pub use filter::{FilterExecutor, UpsertFilterExecutor};
154pub use gap_fill::{GapFillExecutor, GapFillExecutorArgs};
155pub use hash_join::*;
156pub use hop_window::HopWindowExecutor;
157pub use iceberg_with_pk_index::{
158    DvHandlerImpl, DvMergerExecutor, IcebergWriterImpl, WriterExecutor,
159};
160pub use join::asof_join::{AsOfCpuEncoding, AsOfMemoryEncoding};
161pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
162pub use join::{AsOfDesc, AsOfJoinType, JoinType};
163pub use lookup::*;
164pub use lookup_union::LookupUnionExecutor;
165pub use merge::MergeExecutor;
166pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
167pub use mview::{MaterializeExecutor, RefreshableMaterializeArgs};
168pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
169pub use no_op::NoOpExecutor;
170pub use now::*;
171pub use over_window::*;
172pub use rearranged_chain::RearrangedChainExecutor;
173pub use receiver::ReceiverExecutor;
174use risingwave_common::id::SourceId;
175pub use row_merge::RowMergeExecutor;
176pub use sink::SinkExecutor;
177pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
178pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
179pub use temporal_join::TemporalJoinExecutor;
180pub use top_n::{
181    AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
182};
183pub use troublemaker::TroublemakerExecutor;
184pub use union::UnionExecutor;
185pub use upstream_sink_union::{UpstreamFragmentInfo, UpstreamSinkUnionExecutor};
186pub use utils::DummyExecutor;
187pub use values::ValuesExecutor;
188pub use vector::*;
189pub use watermark_filter::{UpsertWatermarkFilterExecutor, WatermarkFilterExecutor};
190pub use wrapper::WrapperExecutor;
191
192use self::barrier_align::AlignedMessageStream;
193
194pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
195pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
196pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
197pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;
198
199pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
200use risingwave_connector::sink::catalog::SinkId;
201use risingwave_connector::source::cdc::{
202    CdcTableSnapshotSplitAssignmentWithGeneration,
203    build_actor_cdc_table_snapshot_splits_with_generation,
204};
205use risingwave_pb::id::{ExecutorId, SubscriberId};
206use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
207
208pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
209pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
210pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;
211
212/// Static information of an executor.
213#[derive(Debug, Default, Clone)]
214pub struct ExecutorInfo {
215    /// The schema of the OUTPUT of the executor.
216    pub schema: Schema,
217
218    /// The stream key indices of the OUTPUT of the executor.
219    pub stream_key: StreamKey,
220
221    /// The stream kind of the OUTPUT of the executor.
222    pub stream_kind: PbStreamKind,
223
224    /// Identity of the executor.
225    pub identity: String,
226
227    /// The executor id of the executor.
228    pub id: ExecutorId,
229}
230
231impl ExecutorInfo {
232    pub fn for_test(schema: Schema, stream_key: StreamKey, identity: String, id: u64) -> Self {
233        Self {
234            schema,
235            stream_key,
236            stream_kind: PbStreamKind::Retract, // dummy value for test
237            identity,
238            id: id.into(),
239        }
240    }
241}
242
243/// [`Execute`] describes the methods an executor should implement to handle control messages.
244pub trait Execute: Send + 'static {
245    fn execute(self: Box<Self>) -> BoxedMessageStream;
246
247    fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
248        self.execute()
249    }
250
251    fn boxed(self) -> Box<dyn Execute>
252    where
253        Self: Sized + Send + 'static,
254    {
255        Box::new(self)
256    }
257}
258
259/// [`Executor`] combines the static information ([`ExecutorInfo`]) and the executable object to
260/// handle messages ([`Execute`]).
261pub struct Executor {
262    info: ExecutorInfo,
263    execute: Box<dyn Execute>,
264}
265
266impl Executor {
267    pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
268        Self { info, execute }
269    }
270
271    pub fn info(&self) -> &ExecutorInfo {
272        &self.info
273    }
274
275    pub fn schema(&self) -> &Schema {
276        &self.info.schema
277    }
278
279    pub fn stream_key(&self) -> StreamKeyRef<'_> {
280        &self.info.stream_key
281    }
282
283    pub fn stream_kind(&self) -> PbStreamKind {
284        self.info.stream_kind
285    }
286
287    pub fn identity(&self) -> &str {
288        &self.info.identity
289    }
290
291    pub fn execute(self) -> BoxedMessageStream {
292        self.execute.execute()
293    }
294
295    pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
296        self.execute.execute_with_epoch(epoch)
297    }
298}
299
300impl std::fmt::Debug for Executor {
301    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
302        f.write_str(self.identity())
303    }
304}
305
306impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
307    fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
308        Self::new(info, execute)
309    }
310}
311
312impl<E> From<(ExecutorInfo, E)> for Executor
313where
314    E: Execute,
315{
316    fn from((info, execute): (ExecutorInfo, E)) -> Self {
317        Self::new(info, execute.boxed())
318    }
319}
320
/// Sentinel epoch value `0`.
pub const INVALID_EPOCH: u64 = 0_u64;
322
323type UpstreamFragmentId = FragmentId;
324type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;
325
326#[derive(Debug, Clone)]
327#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
328pub struct UpdateMutation {
329    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
330    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
331    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
332    pub dropped_actors: HashSet<ActorId>,
333    pub actor_splits: SplitAssignments,
334    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
335    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
336    pub sink_schema_change: HashMap<SinkId, PbSinkSchemaChange>,
337    pub subscriptions_to_drop: Vec<SubscriptionUpstreamInfo>,
338}
339
340#[derive(Debug, Clone)]
341#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
342pub struct AddMutation {
343    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
344    pub added_actors: HashSet<ActorId>,
345    // TODO: remove this and use `SourceChangesSplit` after we support multiple mutations.
346    pub splits: SplitAssignments,
347    pub pause: bool,
348    /// (`upstream_mv_table_id`,  `subscriber_id`)
349    pub subscriptions_to_add: Vec<(TableId, SubscriberId)>,
350    /// nodes which should start backfill
351    pub backfill_nodes_to_pause: HashSet<FragmentId>,
352    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
353    pub new_upstream_sinks: HashMap<FragmentId, PbNewUpstreamSink>,
354}
355
356#[derive(Debug, Clone)]
357#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
358pub struct StopMutation {
359    pub dropped_actors: HashSet<ActorId>,
360    pub dropped_sink_fragments: HashSet<FragmentId>,
361}
362
363/// See [`PbMutation`] for the semantics of each mutation.
364#[cfg_attr(any(test, feature = "test"), derive(PartialEq))]
365#[derive(Debug, Clone)]
366pub enum Mutation {
367    Stop(StopMutation),
368    Update(UpdateMutation),
369    Add(AddMutation),
370    SourceChangeSplit(SplitAssignments),
371    Pause,
372    Resume,
373    Throttle(HashMap<FragmentId, ThrottleConfig>),
374    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
375    DropSubscriptions {
376        /// `subscriber` -> `upstream_mv_table_id`
377        subscriptions_to_drop: Vec<SubscriptionUpstreamInfo>,
378    },
379    StartFragmentBackfill {
380        fragment_ids: HashSet<FragmentId>,
381    },
382    RefreshStart {
383        table_id: TableId,
384        associated_source_id: SourceId,
385    },
386    ListFinish {
387        associated_source_id: SourceId,
388    },
389    LoadFinish {
390        associated_source_id: SourceId,
391    },
392    ResetSource {
393        source_id: SourceId,
394    },
395    InjectSourceOffsets {
396        source_id: SourceId,
397        /// Split ID -> offset (JSON-encoded based on connector type)
398        split_offsets: HashMap<String, String>,
399    },
400}
401
402/// The generic type `M` is the mutation type of the barrier.
403///
404/// For barrier of in the dispatcher, `M` is `()`, which means the mutation is erased.
405/// For barrier flowing within the streaming actor, `M` is the normal `BarrierMutationType`.
406#[derive(Debug, Clone)]
407pub struct BarrierInner<M> {
408    pub epoch: EpochPair,
409    pub mutation: M,
410    pub kind: BarrierKind,
411
412    /// Tracing context for the **current** epoch of this barrier.
413    pub tracing_context: TracingContext,
414}
415
416pub type BarrierMutationType = Option<Arc<Mutation>>;
417pub type Barrier = BarrierInner<BarrierMutationType>;
418pub type DispatcherBarrier = BarrierInner<()>;
419
420impl<M: Default> BarrierInner<M> {
421    /// Create a plain barrier.
422    pub fn new_test_barrier(epoch: u64) -> Self {
423        Self {
424            epoch: EpochPair::new_test_epoch(epoch),
425            kind: BarrierKind::Checkpoint,
426            tracing_context: TracingContext::none(),
427            mutation: Default::default(),
428        }
429    }
430
431    pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
432        Self {
433            epoch: EpochPair::new(epoch, prev_epoch),
434            kind: BarrierKind::Checkpoint,
435            tracing_context: TracingContext::none(),
436            mutation: Default::default(),
437        }
438    }
439}
440
441impl Barrier {
442    pub fn into_dispatcher(self) -> DispatcherBarrier {
443        DispatcherBarrier {
444            epoch: self.epoch,
445            mutation: (),
446            kind: self.kind,
447            tracing_context: self.tracing_context,
448        }
449    }
450
451    #[must_use]
452    pub fn with_mutation(self, mutation: Mutation) -> Self {
453        Self {
454            mutation: Some(Arc::new(mutation)),
455            ..self
456        }
457    }
458
459    #[must_use]
460    pub fn with_stop(self) -> Self {
461        self.with_mutation(Mutation::Stop(StopMutation {
462            dropped_actors: Default::default(),
463            dropped_sink_fragments: Default::default(),
464        }))
465    }
466
467    /// Whether this barrier carries stop mutation.
468    pub fn is_with_stop_mutation(&self) -> bool {
469        matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
470    }
471
472    /// Whether this barrier is to stop the actor with `actor_id`.
473    pub fn is_stop(&self, actor_id: ActorId) -> bool {
474        self.all_stop_actors()
475            .is_some_and(|actors| actors.contains(&actor_id))
476    }
477
478    pub fn is_checkpoint(&self) -> bool {
479        self.kind == BarrierKind::Checkpoint
480    }
481
482    /// Get the initial split assignments for the actor with `actor_id`.
483    ///
484    /// This should only be called on the initial barrier received by the executor. It must be
485    ///
486    /// - `Add` mutation when it's a new streaming job, or recovery.
487    /// - `Update` mutation when it's created for scaling.
488    ///
489    /// Note that `SourceChangeSplit` is **not** included, because it's only used for changing splits
490    /// of existing executors.
491    pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
492        match self.mutation.as_deref()? {
493            Mutation::Update(UpdateMutation { actor_splits, .. })
494            | Mutation::Add(AddMutation {
495                splits: actor_splits,
496                ..
497            }) => actor_splits.get(&actor_id),
498
499            _ => {
500                if cfg!(debug_assertions) {
501                    panic!(
502                        "the initial mutation of the barrier should not be {:?}",
503                        self.mutation
504                    );
505                }
506                None
507            }
508        }
509        .map(|s| s.as_slice())
510    }
511
512    /// Get all actors that to be stopped (dropped) by this barrier.
513    pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
514        match self.mutation.as_deref() {
515            Some(Mutation::Stop(StopMutation { dropped_actors, .. })) => Some(dropped_actors),
516            Some(Mutation::Update(UpdateMutation { dropped_actors, .. })) => Some(dropped_actors),
517            _ => None,
518        }
519    }
520
521    /// Whether this barrier is to newly add the actor with `actor_id`. This is used for `Chain` and
522    /// `Values` to decide whether to output the existing (historical) data.
523    ///
524    /// By "newly", we mean the actor belongs to a subgraph of a new streaming job. That is, actors
525    /// added for scaling are not included.
526    pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
527        match self.mutation.as_deref() {
528            Some(Mutation::Add(AddMutation { added_actors, .. })) => {
529                added_actors.contains(&actor_id)
530            }
531            _ => false,
532        }
533    }
534
535    pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
536        if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
537            fragment_ids.contains(&fragment_id)
538        } else {
539            false
540        }
541    }
542
543    /// Whether this barrier adds new downstream fragment for the actor with `upstream_actor_id`.
544    ///
545    /// # Use case
546    /// Some optimizations are applied when an actor doesn't have any downstreams ("standalone" actors).
547    /// * Pause a standalone shared `SourceExecutor`.
548    /// * Disable a standalone `MaterializeExecutor`'s conflict check.
549    ///
550    /// This is implemented by checking `actor_context.initial_dispatch_num` on startup, and
551    /// check `has_more_downstream_fragments` on barrier to see whether the optimization
552    /// needs to be turned off.
553    ///
554    /// ## Some special cases not included
555    ///
556    /// Note that this is not `has_new_downstream_actor/fragment`. For our use case, we only
557    /// care about **number of downstream fragments** (more precisely, existence).
558    /// - When scaling, the number of downstream actors is changed, and they are "new", but downstream fragments is not changed.
559    /// - When `ALTER TABLE sink_into_table`, the fragment is replaced with a "new" one, but the number is not changed.
560    pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
561        let Some(mutation) = self.mutation.as_deref() else {
562            return false;
563        };
564        match mutation {
565            // Add is for mv, index and sink creation.
566            Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
567            Mutation::Update(_)
568            | Mutation::Stop(_)
569            | Mutation::Pause
570            | Mutation::Resume
571            | Mutation::SourceChangeSplit(_)
572            | Mutation::Throttle { .. }
573            | Mutation::DropSubscriptions { .. }
574            | Mutation::ConnectorPropsChange(_)
575            | Mutation::StartFragmentBackfill { .. }
576            | Mutation::RefreshStart { .. }
577            | Mutation::ListFinish { .. }
578            | Mutation::LoadFinish { .. }
579            | Mutation::ResetSource { .. }
580            | Mutation::InjectSourceOffsets { .. } => false,
581        }
582    }
583
584    /// Whether this barrier requires the executor to pause its data stream on startup.
585    pub fn is_pause_on_startup(&self) -> bool {
586        match self.mutation.as_deref() {
587            Some(Mutation::Add(AddMutation { pause, .. })) => *pause,
588            _ => false,
589        }
590    }
591
592    pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
593        match self.mutation.as_deref() {
594            Some(Mutation::Add(AddMutation {
595                backfill_nodes_to_pause,
596                ..
597            })) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
598            Some(Mutation::Update(_)) => false,
599            _ => {
600                tracing::warn!(
601                    "expected an AddMutation or UpdateMutation on Startup, instead got {:?}",
602                    self
603                );
604                false
605            }
606        }
607    }
608
609    /// Whether this barrier is for resume.
610    pub fn is_resume(&self) -> bool {
611        matches!(self.mutation.as_deref(), Some(Mutation::Resume))
612    }
613
614    /// Returns the [`MergeUpdate`] if this barrier is to update the merge executors for the actor
615    /// with `actor_id`.
616    pub fn as_update_merge(
617        &self,
618        actor_id: ActorId,
619        upstream_fragment_id: UpstreamFragmentId,
620    ) -> Option<&MergeUpdate> {
621        self.mutation
622            .as_deref()
623            .and_then(|mutation| match mutation {
624                Mutation::Update(UpdateMutation { merges, .. }) => {
625                    merges.get(&(actor_id, upstream_fragment_id))
626                }
627                _ => None,
628            })
629    }
630
631    /// Returns the new upstream sink information if this barrier is to add a new upstream sink for
632    /// the specified downstream fragment.
633    pub fn as_new_upstream_sink(&self, fragment_id: FragmentId) -> Option<&PbNewUpstreamSink> {
634        self.mutation
635            .as_deref()
636            .and_then(|mutation| match mutation {
637                Mutation::Add(AddMutation {
638                    new_upstream_sinks, ..
639                }) => new_upstream_sinks.get(&fragment_id),
640                _ => None,
641            })
642    }
643
644    /// Returns the dropped upstream sink-fragment if this barrier is to drop any sink.
645    pub fn as_dropped_upstream_sinks(&self) -> Option<&HashSet<FragmentId>> {
646        self.mutation
647            .as_deref()
648            .and_then(|mutation| match mutation {
649                Mutation::Stop(StopMutation {
650                    dropped_sink_fragments,
651                    ..
652                }) => Some(dropped_sink_fragments),
653                _ => None,
654            })
655    }
656
657    /// Returns the new vnode bitmap if this barrier is to update the vnode bitmap for the actor
658    /// with `actor_id`.
659    ///
660    /// Actually, this vnode bitmap update is only useful for the record accessing validation for
661    /// distributed executors, since the read/write pattern will never be across multiple vnodes.
662    pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
663        self.mutation
664            .as_deref()
665            .and_then(|mutation| match mutation {
666                Mutation::Update(UpdateMutation { vnode_bitmaps, .. }) => {
667                    vnode_bitmaps.get(&actor_id).cloned()
668                }
669                _ => None,
670            })
671    }
672
673    pub fn assume_no_update_vnode_bitmap(&self, actor_id: ActorId) -> StreamExecutorResult<()> {
674        if self.as_update_vnode_bitmap(actor_id).is_some() {
675            return Err(anyhow!("updating vnode bitmap in place is not supported").into());
676        }
677        Ok(())
678    }
679
680    pub fn as_sink_schema_change(&self, sink_id: SinkId) -> Option<PbSinkSchemaChange> {
681        self.mutation
682            .as_deref()
683            .and_then(|mutation| match mutation {
684                Mutation::Update(UpdateMutation {
685                    sink_schema_change, ..
686                }) => sink_schema_change.get(&sink_id).cloned(),
687                _ => None,
688            })
689    }
690
691    pub fn as_subscriptions_to_drop(&self) -> Option<&[SubscriptionUpstreamInfo]> {
692        match self.mutation.as_deref() {
693            Some(Mutation::DropSubscriptions {
694                subscriptions_to_drop,
695            })
696            | Some(Mutation::Update(UpdateMutation {
697                subscriptions_to_drop,
698                ..
699            })) => Some(subscriptions_to_drop.as_slice()),
700            _ => None,
701        }
702    }
703
704    pub fn get_curr_epoch(&self) -> Epoch {
705        Epoch(self.epoch.curr)
706    }
707
708    /// Retrieve the tracing context for the **current** epoch of this barrier.
709    pub fn tracing_context(&self) -> &TracingContext {
710        &self.tracing_context
711    }
712
713    pub fn added_subscriber_on_mv_table(
714        &self,
715        mv_table_id: TableId,
716    ) -> impl Iterator<Item = SubscriberId> + '_ {
717        if let Some(Mutation::Add(add)) = self.mutation.as_deref() {
718            Some(add)
719        } else {
720            None
721        }
722        .into_iter()
723        .flat_map(move |add| {
724            add.subscriptions_to_add.iter().filter_map(
725                move |(upstream_mv_table_id, subscriber_id)| {
726                    if *upstream_mv_table_id == mv_table_id {
727                        Some(*subscriber_id)
728                    } else {
729                        None
730                    }
731                },
732            )
733        })
734    }
735}
736
737impl<M: PartialEq> PartialEq for BarrierInner<M> {
738    fn eq(&self, other: &Self) -> bool {
739        self.epoch == other.epoch && self.mutation == other.mutation
740    }
741}
742
743impl Mutation {
744    /// Return true if the mutation is stop.
745    ///
746    /// Note that this does not mean we will stop the current actor.
747    #[cfg(test)]
748    pub fn is_stop(&self) -> bool {
749        matches!(self, Mutation::Stop(_))
750    }
751
752    #[cfg(test)]
753    fn to_protobuf(&self) -> PbMutation {
754        use risingwave_pb::source::{
755            ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
756        };
757        use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
758        use risingwave_pb::stream_plan::{
759            PbAddMutation, PbConnectorPropsChangeMutation, PbDispatchers,
760            PbDropSubscriptionsMutation, PbPauseMutation, PbResumeMutation,
761            PbSourceChangeSplitMutation, PbStartFragmentBackfillMutation, PbStopMutation,
762            PbThrottleMutation, PbUpdateMutation,
763        };
764        let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
765            actor_splits
766                .iter()
767                .map(|(&actor_id, splits)| {
768                    (
769                        actor_id,
770                        ConnectorSplits {
771                            splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
772                        },
773                    )
774                })
775                .collect::<HashMap<_, _>>()
776        };
777
778        match self {
779            Mutation::Stop(StopMutation {
780                dropped_actors,
781                dropped_sink_fragments,
782            }) => PbMutation::Stop(PbStopMutation {
783                actors: dropped_actors.iter().copied().collect(),
784                dropped_sink_fragments: dropped_sink_fragments.iter().copied().collect(),
785            }),
786            Mutation::Update(UpdateMutation {
787                dispatchers,
788                merges,
789                vnode_bitmaps,
790                dropped_actors,
791                actor_splits,
792                actor_new_dispatchers,
793                actor_cdc_table_snapshot_splits,
794                sink_schema_change,
795                subscriptions_to_drop,
796            }) => PbMutation::Update(PbUpdateMutation {
797                dispatcher_update: dispatchers.values().flatten().cloned().collect(),
798                merge_update: merges.values().cloned().collect(),
799                actor_vnode_bitmap_update: vnode_bitmaps
800                    .iter()
801                    .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
802                    .collect(),
803                dropped_actors: dropped_actors.iter().copied().collect(),
804                actor_splits: actor_splits_to_protobuf(actor_splits),
805                actor_new_dispatchers: actor_new_dispatchers
806                    .iter()
807                    .map(|(&actor_id, dispatchers)| {
808                        (
809                            actor_id,
810                            PbDispatchers {
811                                dispatchers: dispatchers.clone(),
812                            },
813                        )
814                    })
815                    .collect(),
816                actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
817                    splits:actor_cdc_table_snapshot_splits.splits.iter().map(|(actor_id,(splits, generation))| {
818                        (*actor_id, risingwave_pb::source::PbCdcTableSnapshotSplits {
819                            splits: splits.iter().map(risingwave_connector::source::cdc::build_cdc_table_snapshot_split).collect(),
820                            generation: *generation,
821                        })
822                    }).collect()
823                }),
824                sink_schema_change: sink_schema_change
825                    .iter()
826                    .map(|(sink_id, change)| ((*sink_id).as_raw_id(), change.clone()))
827                    .collect(),
828                subscriptions_to_drop: subscriptions_to_drop.clone(),
829            }),
830            Mutation::Add(AddMutation {
831                adds,
832                added_actors,
833                splits,
834                pause,
835                subscriptions_to_add,
836                backfill_nodes_to_pause,
837                actor_cdc_table_snapshot_splits,
838                new_upstream_sinks,
839            }) => PbMutation::Add(PbAddMutation {
840                actor_dispatchers: adds
841                    .iter()
842                    .map(|(&actor_id, dispatchers)| {
843                        (
844                            actor_id,
845                            PbDispatchers {
846                                dispatchers: dispatchers.clone(),
847                            },
848                        )
849                    })
850                    .collect(),
851                added_actors: added_actors.iter().copied().collect(),
852                actor_splits: actor_splits_to_protobuf(splits),
853                pause: *pause,
854                subscriptions_to_add: subscriptions_to_add
855                    .iter()
856                    .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
857                        subscriber_id: *subscriber_id,
858                        upstream_mv_table_id: *table_id,
859                    })
860                    .collect(),
861                backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
862                actor_cdc_table_snapshot_splits:
863                Some(PbCdcTableSnapshotSplitsWithGeneration {
864                    splits:actor_cdc_table_snapshot_splits.splits.iter().map(|(actor_id,(splits, generation))| {
865                        (*actor_id, risingwave_pb::source::PbCdcTableSnapshotSplits {
866                            splits: splits.iter().map(risingwave_connector::source::cdc::build_cdc_table_snapshot_split).collect(),
867                            generation: *generation,
868                        })
869                    }).collect()
870                }),
871                new_upstream_sinks: new_upstream_sinks
872                    .iter()
873                    .map(|(k, v)| (*k, v.clone()))
874                    .collect(),
875            }),
876            Mutation::SourceChangeSplit(changes) => {
877                PbMutation::Splits(PbSourceChangeSplitMutation {
878                    actor_splits: changes
879                        .iter()
880                        .map(|(&actor_id, splits)| {
881                            (
882                                actor_id,
883                                ConnectorSplits {
884                                    splits: splits
885                                        .clone()
886                                        .iter()
887                                        .map(ConnectorSplit::from)
888                                        .collect(),
889                                },
890                            )
891                        })
892                        .collect(),
893                })
894            }
895            Mutation::Pause => PbMutation::Pause(PbPauseMutation {}),
896            Mutation::Resume => PbMutation::Resume(PbResumeMutation {}),
897            Mutation::Throttle (changes) => PbMutation::Throttle(PbThrottleMutation {
898                fragment_throttle: changes.clone(),
899            }),
900            Mutation::DropSubscriptions {
901                subscriptions_to_drop,
902            } => PbMutation::DropSubscriptions(PbDropSubscriptionsMutation {
903                info: subscriptions_to_drop.clone(),
904            }),
905            Mutation::ConnectorPropsChange(map) => {
906                PbMutation::ConnectorPropsChange(PbConnectorPropsChangeMutation {
907                    connector_props_infos: map
908                        .iter()
909                        .map(|(actor_id, options)| {
910                            (
911                                *actor_id,
912                                ConnectorPropsInfo {
913                                    connector_props_info: options
914                                        .iter()
915                                        .map(|(k, v)| (k.clone(), v.clone()))
916                                        .collect(),
917                                },
918                            )
919                        })
920                        .collect(),
921                })
922            }
923            Mutation::StartFragmentBackfill { fragment_ids } => {
924                PbMutation::StartFragmentBackfill(PbStartFragmentBackfillMutation {
925                    fragment_ids: fragment_ids.iter().copied().collect(),
926                })
927            }
928            Mutation::RefreshStart {
929                table_id,
930                associated_source_id,
931            } => PbMutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
932                table_id: *table_id,
933                associated_source_id: *associated_source_id,
934            }),
935            Mutation::ListFinish {
936                associated_source_id,
937            } => PbMutation::ListFinish(risingwave_pb::stream_plan::ListFinishMutation {
938                associated_source_id: *associated_source_id,
939            }),
940            Mutation::LoadFinish {
941                associated_source_id,
942            } => PbMutation::LoadFinish(risingwave_pb::stream_plan::LoadFinishMutation {
943                associated_source_id: *associated_source_id,
944            }),
945            Mutation::ResetSource { source_id } => {
946                PbMutation::ResetSource(risingwave_pb::stream_plan::ResetSourceMutation {
947                    source_id: source_id.as_raw_id(),
948                })
949            }
950            Mutation::InjectSourceOffsets {
951                source_id,
952                split_offsets,
953            } => PbMutation::InjectSourceOffsets(
954                risingwave_pb::stream_plan::InjectSourceOffsetsMutation {
955                    source_id: source_id.as_raw_id(),
956                    split_offsets: split_offsets.clone(),
957                },
958            ),
959        }
960    }
961
962    fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
963        let mutation = match prost {
964            PbMutation::Stop(stop) => Mutation::Stop(StopMutation {
965                dropped_actors: stop.actors.iter().copied().collect(),
966                dropped_sink_fragments: stop.dropped_sink_fragments.iter().copied().collect(),
967            }),
968
969            PbMutation::Update(update) => Mutation::Update(UpdateMutation {
970                dispatchers: update
971                    .dispatcher_update
972                    .iter()
973                    .map(|u| (u.actor_id, u.clone()))
974                    .into_group_map(),
975                merges: update
976                    .merge_update
977                    .iter()
978                    .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
979                    .collect(),
980                vnode_bitmaps: update
981                    .actor_vnode_bitmap_update
982                    .iter()
983                    .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
984                    .collect(),
985                dropped_actors: update.dropped_actors.iter().copied().collect(),
986                actor_splits: update
987                    .actor_splits
988                    .iter()
989                    .map(|(&actor_id, splits)| {
990                        (
991                            actor_id,
992                            splits
993                                .splits
994                                .iter()
995                                .map(|split| split.try_into().unwrap())
996                                .collect(),
997                        )
998                    })
999                    .collect(),
1000                actor_new_dispatchers: update
1001                    .actor_new_dispatchers
1002                    .iter()
1003                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
1004                    .collect(),
1005                actor_cdc_table_snapshot_splits:
1006                    build_actor_cdc_table_snapshot_splits_with_generation(
1007                        update
1008                            .actor_cdc_table_snapshot_splits
1009                            .clone()
1010                            .unwrap_or_default(),
1011                    ),
1012                sink_schema_change: update
1013                    .sink_schema_change
1014                    .iter()
1015                    .map(|(sink_id, change)| (SinkId::from(*sink_id), change.clone()))
1016                    .collect(),
1017                subscriptions_to_drop: update.subscriptions_to_drop.clone(),
1018            }),
1019
1020            PbMutation::Add(add) => Mutation::Add(AddMutation {
1021                adds: add
1022                    .actor_dispatchers
1023                    .iter()
1024                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
1025                    .collect(),
1026                added_actors: add.added_actors.iter().copied().collect(),
1027                // TODO: remove this and use `SourceChangesSplit` after we support multiple
1028                // mutations.
1029                splits: add
1030                    .actor_splits
1031                    .iter()
1032                    .map(|(&actor_id, splits)| {
1033                        (
1034                            actor_id,
1035                            splits
1036                                .splits
1037                                .iter()
1038                                .map(|split| split.try_into().unwrap())
1039                                .collect(),
1040                        )
1041                    })
1042                    .collect(),
1043                pause: add.pause,
1044                subscriptions_to_add: add
1045                    .subscriptions_to_add
1046                    .iter()
1047                    .map(
1048                        |SubscriptionUpstreamInfo {
1049                             subscriber_id,
1050                             upstream_mv_table_id,
1051                         }| { (*upstream_mv_table_id, *subscriber_id) },
1052                    )
1053                    .collect(),
1054                backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
1055                actor_cdc_table_snapshot_splits:
1056                    build_actor_cdc_table_snapshot_splits_with_generation(
1057                        add.actor_cdc_table_snapshot_splits
1058                            .clone()
1059                            .unwrap_or_default(),
1060                    ),
1061                new_upstream_sinks: add
1062                    .new_upstream_sinks
1063                    .iter()
1064                    .map(|(k, v)| (*k, v.clone()))
1065                    .collect(),
1066            }),
1067
1068            PbMutation::Splits(s) => {
1069                let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
1070                    Vec::with_capacity(s.actor_splits.len());
1071                for (&actor_id, splits) in &s.actor_splits {
1072                    if !splits.splits.is_empty() {
1073                        change_splits.push((
1074                            actor_id,
1075                            splits
1076                                .splits
1077                                .iter()
1078                                .map(SplitImpl::try_from)
1079                                .try_collect()?,
1080                        ));
1081                    }
1082                }
1083                Mutation::SourceChangeSplit(change_splits.into_iter().collect())
1084            }
1085            PbMutation::Pause(_) => Mutation::Pause,
1086            PbMutation::Resume(_) => Mutation::Resume,
1087            PbMutation::Throttle(changes) => Mutation::Throttle(changes.fragment_throttle.clone()),
1088            PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
1089                subscriptions_to_drop: drop.info.clone(),
1090            },
1091            PbMutation::ConnectorPropsChange(alter_connector_props) => {
1092                Mutation::ConnectorPropsChange(
1093                    alter_connector_props
1094                        .connector_props_infos
1095                        .iter()
1096                        .map(|(connector_id, options)| {
1097                            (
1098                                *connector_id,
1099                                options
1100                                    .connector_props_info
1101                                    .iter()
1102                                    .map(|(k, v)| (k.clone(), v.clone()))
1103                                    .collect(),
1104                            )
1105                        })
1106                        .collect(),
1107                )
1108            }
1109            PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
1110                Mutation::StartFragmentBackfill {
1111                    fragment_ids: start_fragment_backfill
1112                        .fragment_ids
1113                        .iter()
1114                        .copied()
1115                        .collect(),
1116                }
1117            }
1118            PbMutation::RefreshStart(refresh_start) => Mutation::RefreshStart {
1119                table_id: refresh_start.table_id,
1120                associated_source_id: refresh_start.associated_source_id,
1121            },
1122            PbMutation::ListFinish(list_finish) => Mutation::ListFinish {
1123                associated_source_id: list_finish.associated_source_id,
1124            },
1125            PbMutation::LoadFinish(load_finish) => Mutation::LoadFinish {
1126                associated_source_id: load_finish.associated_source_id,
1127            },
1128            PbMutation::ResetSource(reset_source) => Mutation::ResetSource {
1129                source_id: SourceId::from(reset_source.source_id),
1130            },
1131            PbMutation::InjectSourceOffsets(inject) => Mutation::InjectSourceOffsets {
1132                source_id: SourceId::from(inject.source_id),
1133                split_offsets: inject.split_offsets.clone(),
1134            },
1135        };
1136        Ok(mutation)
1137    }
1138}
1139
1140impl<M> BarrierInner<M> {
1141    fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
1142        let Self {
1143            epoch,
1144            mutation,
1145            kind,
1146            tracing_context,
1147            ..
1148        } = self;
1149
1150        PbBarrier {
1151            epoch: Some(PbEpoch {
1152                curr: epoch.curr,
1153                prev: epoch.prev,
1154            }),
1155            mutation: barrier_fn(mutation).map(|mutation| PbBarrierMutation {
1156                mutation: Some(mutation),
1157            }),
1158            tracing_context: tracing_context.to_protobuf(),
1159            kind: *kind as _,
1160        }
1161    }
1162
1163    fn from_protobuf_inner(
1164        prost: &PbBarrier,
1165        mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
1166    ) -> StreamExecutorResult<Self> {
1167        let epoch = prost.get_epoch()?;
1168
1169        Ok(Self {
1170            kind: prost.kind(),
1171            epoch: EpochPair::new(epoch.curr, epoch.prev),
1172            mutation: mutation_from_pb(
1173                (prost.mutation.as_ref()).and_then(|mutation| mutation.mutation.as_ref()),
1174            )?,
1175            tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
1176        })
1177    }
1178
1179    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
1180        BarrierInner {
1181            epoch: self.epoch,
1182            mutation: f(self.mutation),
1183            kind: self.kind,
1184            tracing_context: self.tracing_context,
1185        }
1186    }
1187}
1188
1189impl DispatcherBarrier {
1190    pub fn to_protobuf(&self) -> PbBarrier {
1191        self.to_protobuf_inner(|_| None)
1192    }
1193}
1194
1195impl Barrier {
1196    #[cfg(test)]
1197    pub fn to_protobuf(&self) -> PbBarrier {
1198        self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
1199    }
1200
1201    pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
1202        Self::from_protobuf_inner(prost, |mutation| {
1203            mutation
1204                .map(|m| Mutation::from_protobuf(m).map(Arc::new))
1205                .transpose()
1206        })
1207    }
1208}
1209
1210#[derive(Debug, PartialEq, Eq, Clone)]
1211pub struct Watermark {
1212    pub col_idx: usize,
1213    pub data_type: DataType,
1214    pub val: ScalarImpl,
1215}
1216
1217impl PartialOrd for Watermark {
1218    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1219        Some(self.cmp(other))
1220    }
1221}
1222
1223impl Ord for Watermark {
1224    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1225        self.val.default_cmp(&other.val)
1226    }
1227}
1228
1229impl Watermark {
1230    pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
1231        Self {
1232            col_idx,
1233            data_type,
1234            val,
1235        }
1236    }
1237
1238    pub async fn transform_with_expr(
1239        self,
1240        expr: &NonStrictExpression<impl Expression>,
1241        new_col_idx: usize,
1242    ) -> Option<Self> {
1243        let Self { col_idx, val, .. } = self;
1244        let row = {
1245            let mut row = vec![None; col_idx + 1];
1246            row[col_idx] = Some(val);
1247            OwnedRow::new(row)
1248        };
1249        let val = expr.eval_row_infallible(&row).await?;
1250        Some(Self::new(new_col_idx, expr.inner().return_type(), val))
1251    }
1252
1253    /// Transform the watermark with the given output indices. If this watermark is not in the
1254    /// output, return `None`.
1255    pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
1256        output_indices
1257            .iter()
1258            .position(|p| *p == self.col_idx)
1259            .map(|new_col_idx| self.with_idx(new_col_idx))
1260    }
1261
1262    pub fn to_protobuf(&self) -> PbWatermark {
1263        PbWatermark {
1264            column: Some(PbInputRef {
1265                index: self.col_idx as _,
1266                r#type: Some(self.data_type.to_protobuf()),
1267            }),
1268            val: Some(&self.val).to_protobuf().into(),
1269        }
1270    }
1271
1272    pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
1273        let col_ref = prost.get_column()?;
1274        let data_type = DataType::from(col_ref.get_type()?);
1275        let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
1276            .expect("watermark value cannot be null");
1277        Ok(Self::new(col_ref.get_index() as _, data_type, val))
1278    }
1279
1280    pub fn with_idx(self, idx: usize) -> Self {
1281        Self::new(idx, self.data_type, self.val)
1282    }
1283}
1284
1285#[cfg_attr(any(test, feature = "test"), derive(PartialEq))]
1286#[derive(Debug, EnumAsInner, Clone)]
1287pub enum MessageInner<M> {
1288    Chunk(StreamChunk),
1289    Barrier(BarrierInner<M>),
1290    Watermark(Watermark),
1291}
1292
1293impl<M> MessageInner<M> {
1294    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
1295        match self {
1296            MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
1297            MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
1298            MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
1299        }
1300    }
1301}
1302
1303pub type Message = MessageInner<BarrierMutationType>;
1304pub type DispatcherMessage = MessageInner<()>;
1305
1306/// `MessageBatchInner` is used exclusively by `Dispatcher` and the `Merger`/`Receiver` for exchanging messages between them.
1307/// It shares the same message type as the fundamental `MessageInner`, but batches multiple barriers into a single message.
1308#[derive(Debug, EnumAsInner, Clone)]
1309pub enum MessageBatchInner<M> {
1310    Chunk(StreamChunk),
1311    BarrierBatch(Vec<BarrierInner<M>>),
1312    Watermark(Watermark),
1313}
1314pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
1315pub type DispatcherBarriers = Vec<DispatcherBarrier>;
1316pub type DispatcherMessageBatch = MessageBatchInner<()>;
1317
1318impl<M> From<MessageInner<M>> for MessageBatchInner<M> {
1319    fn from(m: MessageInner<M>) -> Self {
1320        match m {
1321            MessageInner::Chunk(c) => Self::Chunk(c),
1322            MessageInner::Barrier(b) => Self::BarrierBatch(vec![b]),
1323            MessageInner::Watermark(w) => Self::Watermark(w),
1324        }
1325    }
1326}
1327
1328impl From<StreamChunk> for Message {
1329    fn from(chunk: StreamChunk) -> Self {
1330        Message::Chunk(chunk)
1331    }
1332}
1333
1334impl<'a> TryFrom<&'a Message> for &'a Barrier {
1335    type Error = ();
1336
1337    fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
1338        match m {
1339            Message::Chunk(_) => Err(()),
1340            Message::Barrier(b) => Ok(b),
1341            Message::Watermark(_) => Err(()),
1342        }
1343    }
1344}
1345
1346impl Message {
1347    /// Return true if the message is a stop barrier, meaning the stream
1348    /// will not continue, false otherwise.
1349    ///
1350    /// Note that this does not mean we will stop the current actor.
1351    #[cfg(test)]
1352    pub fn is_stop(&self) -> bool {
1353        matches!(
1354            self,
1355            Message::Barrier(Barrier {
1356                mutation,
1357                ..
1358            }) if mutation.as_ref().unwrap().is_stop()
1359        )
1360    }
1361}
1362
1363impl DispatcherMessageBatch {
1364    pub fn to_protobuf(&self) -> PbStreamMessageBatch {
1365        let prost = match self {
1366            Self::Chunk(stream_chunk) => {
1367                let prost_stream_chunk = stream_chunk.to_protobuf();
1368                StreamMessageBatch::StreamChunk(prost_stream_chunk)
1369            }
1370            Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
1371                barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
1372            }),
1373            Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
1374        };
1375        PbStreamMessageBatch {
1376            stream_message_batch: Some(prost),
1377        }
1378    }
1379
1380    pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
1381        let res = match prost.get_stream_message_batch()? {
1382            StreamMessageBatch::StreamChunk(chunk) => {
1383                Self::Chunk(StreamChunk::from_protobuf(chunk)?)
1384            }
1385            StreamMessageBatch::BarrierBatch(barrier_batch) => {
1386                let barriers = barrier_batch
1387                    .barriers
1388                    .iter()
1389                    .map(|barrier| {
1390                        DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
1391                            if mutation.is_some() {
1392                                if cfg!(debug_assertions) {
1393                                    panic!("should not receive message of barrier with mutation");
1394                                } else {
1395                                    warn!(?barrier, "receive message of barrier with mutation");
1396                                }
1397                            }
1398                            Ok(())
1399                        })
1400                    })
1401                    .try_collect()?;
1402                Self::BarrierBatch(barriers)
1403            }
1404            StreamMessageBatch::Watermark(watermark) => {
1405                Self::Watermark(Watermark::from_protobuf(watermark)?)
1406            }
1407        };
1408        Ok(res)
1409    }
1410
1411    pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
1412        ::prost::Message::encoded_len(msg)
1413    }
1414}
1415
1416pub type StreamKey = Vec<usize>;
1417pub type StreamKeyRef<'a> = &'a [usize];
1418pub type StreamKeyDataTypes = SmallVec<[DataType; 1]>;
1419
1420/// Expect the first message of the given `stream` as a barrier.
1421pub async fn expect_first_barrier<M: Debug>(
1422    stream: &mut (impl MessageStreamInner<M> + Unpin),
1423) -> StreamExecutorResult<BarrierInner<M>> {
1424    let message = stream
1425        .next()
1426        .instrument_await("expect_first_barrier")
1427        .await
1428        .context("failed to extract the first message: stream closed unexpectedly")??;
1429    let barrier = message
1430        .into_barrier()
1431        .expect("the first message must be a barrier");
1432    // TODO: Is this check correct?
1433    assert!(matches!(
1434        barrier.kind,
1435        BarrierKind::Checkpoint | BarrierKind::Initial
1436    ));
1437    Ok(barrier)
1438}
1439
1440/// Expect the first message of the given `stream` as a barrier.
1441pub async fn expect_first_barrier_from_aligned_stream(
1442    stream: &mut (impl AlignedMessageStream + Unpin),
1443) -> StreamExecutorResult<Barrier> {
1444    let message = stream
1445        .next()
1446        .instrument_await("expect_first_barrier")
1447        .await
1448        .context("failed to extract the first message: stream closed unexpectedly")??;
1449    let barrier = message
1450        .into_barrier()
1451        .expect("the first message must be a barrier");
1452    Ok(barrier)
1453}
1454
1455/// `StreamConsumer` is the last step in an actor.
1456pub trait StreamConsumer: Send + 'static {
1457    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;
1458
1459    fn execute(self: Box<Self>) -> Self::BarrierStream;
1460}
1461
1462type BoxedMessageInput<InputId, M> = BoxedInput<InputId, MessageStreamItemInner<M>>;
1463
1464/// A stream for merging messages from multiple upstreams.
1465/// Can dynamically add and delete upstream streams.
1466/// For the meaning of the generic parameter `M` used, refer to `BarrierInner<M>`.
1467pub struct DynamicReceivers<InputId, M> {
1468    /// The barrier we're aligning to. If this is `None`, then `blocked_upstreams` is empty.
1469    barrier: Option<BarrierInner<M>>,
1470    /// The start timestamp of the current barrier. Used for measuring the alignment duration.
1471    start_ts: Option<Instant>,
1472    /// The upstreams that're blocked by the `barrier`.
1473    blocked: Vec<BoxedMessageInput<InputId, M>>,
1474    /// The upstreams that're not blocked and can be polled.
1475    active: FuturesUnordered<StreamFuture<BoxedMessageInput<InputId, M>>>,
1476    /// watermark column index -> `BufferedWatermarks`
1477    buffered_watermarks: BTreeMap<usize, BufferedWatermarks<InputId>>,
1478    /// Currently only used for union.
1479    barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1480    /// Only for merge. If None, then we don't take `Instant::now()` and `observe` during `poll_next`
1481    merge_barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1482}
1483
1484impl<InputId: Clone + Ord + Hash + std::fmt::Debug + Unpin, M: Clone + Unpin> Stream
1485    for DynamicReceivers<InputId, M>
1486{
1487    type Item = MessageStreamItemInner<M>;
1488
1489    fn poll_next(
1490        mut self: Pin<&mut Self>,
1491        cx: &mut std::task::Context<'_>,
1492    ) -> Poll<Option<Self::Item>> {
1493        if self.is_empty() {
1494            return Poll::Ready(None);
1495        }
1496
1497        loop {
1498            match futures::ready!(self.active.poll_next_unpin(cx)) {
1499                // Directly forward the error.
1500                Some((Some(Err(e)), _)) => {
1501                    return Poll::Ready(Some(Err(e)));
1502                }
1503                // Handle the message from some upstream.
1504                Some((Some(Ok(message)), remaining)) => {
1505                    let input_id = remaining.id();
1506                    match message {
1507                        MessageInner::Chunk(chunk) => {
1508                            // Continue polling this upstream by pushing it back to `active`.
1509                            self.active.push(remaining.into_future());
1510                            return Poll::Ready(Some(Ok(MessageInner::Chunk(chunk))));
1511                        }
1512                        MessageInner::Watermark(watermark) => {
1513                            // Continue polling this upstream by pushing it back to `active`.
1514                            self.active.push(remaining.into_future());
1515                            if let Some(watermark) = self.handle_watermark(input_id, watermark) {
1516                                return Poll::Ready(Some(Ok(MessageInner::Watermark(watermark))));
1517                            }
1518                        }
1519                        MessageInner::Barrier(barrier) => {
1520                            // Block this upstream by pushing it to `blocked`.
1521                            if self.blocked.is_empty() {
1522                                self.start_ts = Some(Instant::now());
1523                            }
1524                            self.blocked.push(remaining);
1525                            if let Some(current_barrier) = self.barrier.as_ref() {
1526                                if current_barrier.epoch != barrier.epoch {
1527                                    return Poll::Ready(Some(Err(
1528                                        StreamExecutorError::align_barrier(
1529                                            current_barrier.clone().map_mutation(|_| None),
1530                                            barrier.map_mutation(|_| None),
1531                                        ),
1532                                    )));
1533                                }
1534                            } else {
1535                                self.barrier = Some(barrier);
1536                            }
1537                        }
1538                    }
1539                }
1540                // We use barrier as the control message of the stream. That is, we always stop the
1541                // actors actively when we receive a `Stop` mutation, instead of relying on the stream
1542                // termination.
1543                //
1544                // Besides, in abnormal cases when the other side of the `Input` closes unexpectedly,
1545                // we also yield an `Err(ExchangeChannelClosed)`, which will hit the `Err` arm above.
1546                // So this branch will never be reached in all cases.
1547                Some((None, remaining)) => {
1548                    return Poll::Ready(Some(Err(StreamExecutorError::channel_closed(format!(
1549                        "upstream input {:?} unexpectedly closed",
1550                        remaining.id()
1551                    )))));
1552                }
1553                // There's no active upstreams. Process the barrier and resume the blocked ones.
1554                None => {
1555                    assert!(!self.blocked.is_empty());
1556
1557                    let start_ts = self
1558                        .start_ts
1559                        .take()
1560                        .expect("should have received at least one barrier");
1561                    if let Some(barrier_align_duration) = &self.barrier_align_duration {
1562                        barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
1563                    }
1564                    if let Some(merge_barrier_align_duration) = &self.merge_barrier_align_duration {
1565                        merge_barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
1566                    }
1567
1568                    break;
1569                }
1570            }
1571        }
1572
1573        assert!(self.active.is_terminated());
1574
1575        let barrier = self.barrier.take().unwrap();
1576
1577        let upstreams = std::mem::take(&mut self.blocked);
1578        self.extend_active(upstreams);
1579        assert!(!self.active.is_terminated());
1580
1581        Poll::Ready(Some(Ok(MessageInner::Barrier(barrier))))
1582    }
1583}
1584
1585impl<InputId: Clone + Ord + Hash + std::fmt::Debug, M> DynamicReceivers<InputId, M> {
1586    pub fn new(
1587        upstreams: Vec<BoxedMessageInput<InputId, M>>,
1588        barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1589        merge_barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1590    ) -> Self {
1591        let mut this = Self {
1592            barrier: None,
1593            start_ts: None,
1594            blocked: Vec::with_capacity(upstreams.len()),
1595            active: Default::default(),
1596            buffered_watermarks: Default::default(),
1597            merge_barrier_align_duration,
1598            barrier_align_duration,
1599        };
1600        this.extend_active(upstreams);
1601        this
1602    }
1603
1604    /// Extend the active upstreams with the given upstreams. The current stream must be at the
1605    /// clean state right after a barrier.
1606    pub fn extend_active(
1607        &mut self,
1608        upstreams: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1609    ) {
1610        assert!(self.blocked.is_empty() && self.barrier.is_none());
1611
1612        self.active
1613            .extend(upstreams.into_iter().map(|s| s.into_future()));
1614    }
1615
1616    /// Handle a new watermark message. Optionally returns the watermark message to emit.
1617    pub fn handle_watermark(
1618        &mut self,
1619        input_id: InputId,
1620        watermark: Watermark,
1621    ) -> Option<Watermark> {
1622        let col_idx = watermark.col_idx;
1623        // Insert a buffer watermarks when first received from a column.
1624        let upstream_ids: Vec<_> = self.upstream_input_ids().collect();
1625        let watermarks = self
1626            .buffered_watermarks
1627            .entry(col_idx)
1628            .or_insert_with(|| BufferedWatermarks::with_ids(upstream_ids));
1629        watermarks.handle_watermark(input_id, watermark)
1630    }
1631
1632    /// Consume `other` and add its upstreams to `self`. The two streams must be at the clean state
1633    /// right after a barrier.
1634    pub fn add_upstreams_from(
1635        &mut self,
1636        new_inputs: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1637    ) {
1638        assert!(self.blocked.is_empty() && self.barrier.is_none());
1639
1640        let new_inputs: Vec<_> = new_inputs.into_iter().collect();
1641        let input_ids = new_inputs.iter().map(|input| input.id());
1642        self.buffered_watermarks.values_mut().for_each(|buffers| {
1643            // Add buffers to the buffered watermarks for all cols
1644            buffers.add_buffers(input_ids.clone());
1645        });
1646        self.active
1647            .extend(new_inputs.into_iter().map(|s| s.into_future()));
1648    }
1649
1650    /// Remove upstreams from `self` in `upstream_input_ids`. The current stream must be at the
1651    /// clean state right after a barrier.
1652    /// The current container does not necessarily contain all the input ids passed in.
1653    pub fn remove_upstreams(&mut self, upstream_input_ids: &HashSet<InputId>) {
1654        assert!(self.blocked.is_empty() && self.barrier.is_none());
1655
1656        let new_upstreams = std::mem::take(&mut self.active)
1657            .into_iter()
1658            .map(|s| s.into_inner().unwrap())
1659            .filter(|u| !upstream_input_ids.contains(&u.id()));
1660        self.extend_active(new_upstreams);
1661        self.buffered_watermarks.values_mut().for_each(|buffers| {
1662            // Call `check_heap` in case the only upstream(s) that does not have
1663            // watermark in heap is removed
1664            buffers.remove_buffer(upstream_input_ids.clone());
1665        });
1666    }
1667
1668    pub fn merge_barrier_align_duration(
1669        &self,
1670    ) -> Option<LabelGuardedMetric<GenericCounter<AtomicU64>>> {
1671        self.merge_barrier_align_duration.clone()
1672    }
1673
1674    pub fn flush_buffered_watermarks(&mut self) {
1675        self.buffered_watermarks
1676            .values_mut()
1677            .for_each(|buffers| buffers.clear());
1678    }
1679
1680    pub fn upstream_input_ids(&self) -> impl Iterator<Item = InputId> + '_ {
1681        self.blocked
1682            .iter()
1683            .map(|s| s.id())
1684            .chain(self.active.iter().map(|s| s.get_ref().unwrap().id()))
1685    }
1686
1687    pub fn is_empty(&self) -> bool {
1688        self.blocked.is_empty() && self.active.is_empty()
1689    }
1690}
1691
1692// Explanation of why we need `DispatchBarrierBuffer`:
1693//
1694// When we need to create or replace an upstream fragment for the current fragment, the `Merge` operator must
1695// add some new upstream actor inputs. However, the `Merge` operator may still have old upstreams. We must wait
1696// for these old upstreams to completely process their barriers and align before we can safely update the
1697// `upstream-input-set`.
1698//
1699// Meanwhile, the creation of a new upstream actor can only succeed after the channel to the downstream `Merge`
1700// operator has been established. This creates a potential dependency chain: [new_actor_creation ->
1701// downstream_merge_update -> old_actor_processing]
1702//
1703// To address this, we split the application of a barrier's `Mutation` into two steps:
1704// 1. Parse the `Mutation`. If there is an addition on the upstream-set, establish a channel with the upstream
1705//    and cache it.
// 2. When the upstream barrier actually arrives, apply the cached upstream changes to the upstream-set.
//
// Additionally, the same barrier arrives both through the current upstream inputs and through `barrier_rx`,
// and the two are not ordered relative to each other, so we cannot tell which one will arrive first.
// Therefore, when a barrier is received from an upstream: if a cached mutation is present, we apply it;
// otherwise, we must first fetch the next barrier from `barrier_rx`.
pub(crate) struct DispatchBarrierBuffer {
    /// Barriers fetched from `barrier_rx` that have not yet been matched with the barrier coming
    /// from the upstream inputs. Each barrier is paired with the new upstream inputs that were
    /// created for it, or `None` if its mutation adds no upstreams for this actor.
    buffer: VecDeque<(Barrier, Option<Vec<BoxedActorInput>>)>,
    /// Channel that the barriers, together with their mutations, are received from.
    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
    /// Whether we are waiting for the next barrier on `barrier_rx`, or for the new inputs of the
    /// last received barrier to be created.
    recv_state: BarrierReceiverState,
    /// The fragment that the current upstream inputs belong to. Updated when an update mutation
    /// switches to a new upstream fragment.
    curr_upstream_fragment_id: FragmentId,
    /// Used together with `curr_upstream_fragment_id` to look up the merge update addressed to
    /// this actor in a barrier's mutation.
    actor_id: ActorId,
    // read-only context for building new inputs; shared with the input-creation futures
    build_input_ctx: Arc<BuildInputContext>,
}
1721
/// Read-only arguments for `new_input`, bundled so that they can be shared (through an `Arc`)
/// with the futures that create new upstream inputs.
struct BuildInputContext {
    /// The actor the new inputs are created for.
    pub actor_id: ActorId,
    pub local_barrier_manager: LocalBarrierManager,
    pub metrics: Arc<StreamingMetrics>,
    /// The fragment of `actor_id`.
    pub fragment_id: FragmentId,
    pub actor_config: Arc<StreamingConfig>,
}
1729
/// Boxed future that creates the new upstream inputs added by a barrier's mutation, and checks
/// the first barrier of each one. It resolves to the created inputs.
type BoxedNewInputsFuture =
    Pin<Box<dyn Future<Output = StreamExecutorResult<Vec<BoxedActorInput>>> + Send>>;
1732
/// Progress of fetching the next barrier from `barrier_rx`.
enum BarrierReceiverState {
    /// Waiting for the next barrier on `barrier_rx`.
    ReceivingBarrier,
    /// A barrier that adds upstream inputs has been received, and its new inputs are being
    /// created. The barrier is buffered once the future completes.
    CreatingNewInput(Barrier, BoxedNewInputsFuture),
}
1737
1738impl DispatchBarrierBuffer {
1739    pub fn new(
1740        barrier_rx: mpsc::UnboundedReceiver<Barrier>,
1741        actor_id: ActorId,
1742        curr_upstream_fragment_id: FragmentId,
1743        local_barrier_manager: LocalBarrierManager,
1744        metrics: Arc<StreamingMetrics>,
1745        fragment_id: FragmentId,
1746        actor_config: Arc<StreamingConfig>,
1747    ) -> Self {
1748        Self {
1749            buffer: VecDeque::new(),
1750            barrier_rx,
1751            recv_state: BarrierReceiverState::ReceivingBarrier,
1752            curr_upstream_fragment_id,
1753            actor_id,
1754            build_input_ctx: Arc::new(BuildInputContext {
1755                actor_id,
1756                local_barrier_manager,
1757                metrics,
1758                fragment_id,
1759                actor_config,
1760            }),
1761        }
1762    }
1763
1764    pub async fn await_next_message(
1765        &mut self,
1766        stream: &mut (impl Stream<Item = StreamExecutorResult<DispatcherMessage>> + Unpin),
1767        metrics: &ActorInputMetrics,
1768    ) -> StreamExecutorResult<DispatcherMessage> {
1769        let mut start_time = Instant::now();
1770        let interval_duration = Duration::from_secs(15);
1771        let mut interval =
1772            tokio::time::interval_at(start_time + interval_duration, interval_duration);
1773
1774        loop {
1775            tokio::select! {
1776                biased;
1777                msg = stream.try_next() => {
1778                    metrics
1779                        .actor_input_buffer_blocking_duration_ns
1780                        .inc_by(start_time.elapsed().as_nanos() as u64);
1781                    return msg?.ok_or_else(
1782                        || StreamExecutorError::channel_closed("upstream executor closed unexpectedly")
1783                    );
1784                }
1785
1786                e = self.continuously_fetch_barrier_rx() => {
1787                    return Err(e);
1788                }
1789
1790                _ = interval.tick() => {
1791                    start_time = Instant::now();
1792                    metrics.actor_input_buffer_blocking_duration_ns.inc_by(interval_duration.as_nanos() as u64);
1793                    continue;
1794                }
1795            }
1796        }
1797    }
1798
1799    pub async fn pop_barrier_with_inputs(
1800        &mut self,
1801        barrier: DispatcherBarrier,
1802    ) -> StreamExecutorResult<(Barrier, Option<Vec<BoxedActorInput>>)> {
1803        while self.buffer.is_empty() {
1804            self.try_fetch_barrier_rx(false).await?;
1805        }
1806        let (recv_barrier, inputs) = self.buffer.pop_front().unwrap();
1807        assert_equal_dispatcher_barrier(&recv_barrier, &barrier);
1808
1809        Ok((recv_barrier, inputs))
1810    }
1811
1812    async fn continuously_fetch_barrier_rx(&mut self) -> StreamExecutorError {
1813        loop {
1814            if let Err(e) = self.try_fetch_barrier_rx(true).await {
1815                return e;
1816            }
1817        }
1818    }
1819
1820    async fn try_fetch_barrier_rx(&mut self, pending_on_end: bool) -> StreamExecutorResult<()> {
1821        match &mut self.recv_state {
1822            BarrierReceiverState::ReceivingBarrier => {
1823                let Some(barrier) = self.barrier_rx.recv().await else {
1824                    if pending_on_end {
1825                        return pending().await;
1826                    } else {
1827                        return Err(StreamExecutorError::channel_closed(
1828                            "barrier channel closed unexpectedly",
1829                        ));
1830                    }
1831                };
1832                if let Some(fut) = self.pre_apply_barrier(&barrier) {
1833                    self.recv_state = BarrierReceiverState::CreatingNewInput(barrier, fut);
1834                } else {
1835                    self.buffer.push_back((barrier, None));
1836                }
1837            }
1838            BarrierReceiverState::CreatingNewInput(barrier, fut) => {
1839                let new_inputs = fut.await?;
1840                self.buffer.push_back((barrier.clone(), Some(new_inputs)));
1841                self.recv_state = BarrierReceiverState::ReceivingBarrier;
1842            }
1843        }
1844        Ok(())
1845    }
1846
1847    fn pre_apply_barrier(&mut self, barrier: &Barrier) -> Option<BoxedNewInputsFuture> {
1848        if let Some(update) = barrier.as_update_merge(self.actor_id, self.curr_upstream_fragment_id)
1849            && !update.added_upstream_actors.is_empty()
1850        {
1851            // When update upstream fragment, added_actors will not be empty.
1852            let upstream_fragment_id =
1853                if let Some(new_upstream_fragment_id) = update.new_upstream_fragment_id {
1854                    self.curr_upstream_fragment_id = new_upstream_fragment_id;
1855                    new_upstream_fragment_id
1856                } else {
1857                    self.curr_upstream_fragment_id
1858                };
1859            let ctx = self.build_input_ctx.clone();
1860            let added_upstream_actors = update.added_upstream_actors.clone();
1861            let barrier = barrier.clone();
1862            let fut = async move {
1863                try_join_all(added_upstream_actors.iter().map(|upstream_actor| async {
1864                    let mut new_input = new_input(
1865                        &ctx.local_barrier_manager,
1866                        ctx.metrics.clone(),
1867                        ctx.actor_id,
1868                        ctx.fragment_id,
1869                        upstream_actor,
1870                        upstream_fragment_id,
1871                        ctx.actor_config.clone(),
1872                    )
1873                    .await?;
1874
1875                    // Poll the first barrier from the new upstreams. It must be the same as the one we polled from
1876                    // original upstreams.
1877                    let first_barrier = expect_first_barrier(&mut new_input).await?;
1878                    assert_equal_dispatcher_barrier(&barrier, &first_barrier);
1879
1880                    StreamExecutorResult::Ok(new_input)
1881                }))
1882                .await
1883            }
1884            .boxed();
1885
1886            Some(fut)
1887        } else {
1888            None
1889        }
1890    }
1891}