risingwave_stream/executor/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prelude;
16
17use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
18use std::fmt::Debug;
19use std::future::pending;
20use std::hash::Hash;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::Poll;
24use std::vec;
25
26use await_tree::InstrumentAwait;
27use enum_as_inner::EnumAsInner;
28use futures::future::try_join_all;
29use futures::stream::{BoxStream, FusedStream, FuturesUnordered, StreamFuture};
30use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
31use itertools::Itertools;
32use prometheus::core::{AtomicU64, GenericCounter};
33use risingwave_common::array::StreamChunk;
34use risingwave_common::bitmap::Bitmap;
35use risingwave_common::catalog::{Schema, TableId};
36use risingwave_common::config::StreamingConfig;
37use risingwave_common::metrics::LabelGuardedMetric;
38use risingwave_common::row::OwnedRow;
39use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
40use risingwave_common::util::epoch::{Epoch, EpochPair};
41use risingwave_common::util::tracing::TracingContext;
42use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
43use risingwave_connector::source::SplitImpl;
44use risingwave_expr::expr::{Expression, NonStrictExpression};
45use risingwave_pb::data::PbEpoch;
46use risingwave_pb::expr::PbInputRef;
47use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
48use risingwave_pb::stream_plan::barrier::BarrierKind;
49use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
50use risingwave_pb::stream_plan::stream_node::PbStreamKind;
51use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
52use risingwave_pb::stream_plan::{
53    PbBarrier, PbBarrierMutation, PbDispatcher, PbSinkSchemaChange, PbStreamMessageBatch,
54    PbWatermark, SubscriptionUpstreamInfo,
55};
56use smallvec::SmallVec;
57use tokio::sync::mpsc;
58use tokio::time::{Duration, Instant};
59
60use crate::error::StreamResult;
61use crate::executor::exchange::input::{
62    BoxedActorInput, BoxedInput, assert_equal_dispatcher_barrier, new_input,
63};
64use crate::executor::monitor::ActorInputMetrics;
65use crate::executor::prelude::StreamingMetrics;
66use crate::executor::watermark::BufferedWatermarks;
67use crate::task::{ActorId, FragmentId, LocalBarrierManager};
68
69mod actor;
70mod barrier_align;
71pub mod exchange;
72pub mod monitor;
73
74pub mod aggregate;
75pub mod asof_join;
76mod backfill;
77mod barrier_recv;
78mod batch_query;
79mod chain;
80mod changelog;
81mod dedup;
82mod dispatch;
83pub mod dml;
84mod dynamic_filter;
85pub mod eowc;
86pub mod error;
87mod expand;
88mod filter;
89mod gap_fill;
90pub mod hash_join;
91mod hop_window;
92mod join;
93pub mod locality_provider;
94mod lookup;
95mod lookup_union;
96mod merge;
97mod mview;
98mod nested_loop_temporal_join;
99mod no_op;
100mod now;
101mod over_window;
102pub mod project;
103mod rearranged_chain;
104mod receiver;
105pub mod row_id_gen;
106mod sink;
107pub mod source;
108mod stream_reader;
109pub mod subtask;
110mod temporal_join;
111mod top_n;
112mod troublemaker;
113mod union;
114mod upstream_sink_union;
115mod values;
116mod watermark;
117mod watermark_filter;
118mod wrapper;
119
120mod approx_percentile;
121
122mod row_merge;
123
124#[cfg(test)]
125mod integration_tests;
126mod sync_kv_log_store;
127#[cfg(any(test, feature = "test"))]
128pub mod test_utils;
129mod utils;
130mod vector;
131
132pub use actor::{Actor, ActorContext, ActorContextRef};
133use anyhow::Context;
134pub use approx_percentile::global::GlobalApproxPercentileExecutor;
135pub use approx_percentile::local::LocalApproxPercentileExecutor;
136pub use backfill::arrangement_backfill::*;
137pub use backfill::cdc::{
138    CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor,
139};
140pub use backfill::no_shuffle_backfill::*;
141pub use backfill::snapshot_backfill::*;
142pub use barrier_recv::BarrierRecvExecutor;
143pub use batch_query::BatchQueryExecutor;
144pub use chain::ChainExecutor;
145pub use changelog::ChangeLogExecutor;
146pub use dedup::AppendOnlyDedupExecutor;
147pub use dispatch::DispatchExecutor;
148pub use dynamic_filter::DynamicFilterExecutor;
149pub use error::{StreamExecutorError, StreamExecutorResult};
150pub use expand::ExpandExecutor;
151pub use filter::{FilterExecutor, UpsertFilterExecutor};
152pub use gap_fill::{GapFillExecutor, GapFillExecutorArgs};
153pub use hash_join::*;
154pub use hop_window::HopWindowExecutor;
155pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
156pub use join::{AsOfDesc, AsOfJoinType, JoinType};
157pub use lookup::*;
158pub use lookup_union::LookupUnionExecutor;
159pub use merge::MergeExecutor;
160pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
161pub use mview::{MaterializeExecutor, RefreshableMaterializeArgs};
162pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
163pub use no_op::NoOpExecutor;
164pub use now::*;
165pub use over_window::*;
166pub use rearranged_chain::RearrangedChainExecutor;
167pub use receiver::ReceiverExecutor;
168use risingwave_common::id::SourceId;
169pub use row_merge::RowMergeExecutor;
170pub use sink::SinkExecutor;
171pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
172pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
173pub use temporal_join::TemporalJoinExecutor;
174pub use top_n::{
175    AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
176};
177pub use troublemaker::TroublemakerExecutor;
178pub use union::UnionExecutor;
179pub use upstream_sink_union::{UpstreamFragmentInfo, UpstreamSinkUnionExecutor};
180pub use utils::DummyExecutor;
181pub use values::ValuesExecutor;
182pub use vector::*;
183pub use watermark_filter::WatermarkFilterExecutor;
184pub use wrapper::WrapperExecutor;
185
186use self::barrier_align::AlignedMessageStream;
187
/// Item of a message stream: a [`MessageInner<M>`] wrapped in [`StreamExecutorResult`].
///
/// `M` is forwarded unchanged to [`MessageInner`].
pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
/// [`MessageStreamItemInner`] instantiated with [`BarrierMutationType`].
pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
/// Item of a dispatcher message stream: a [`DispatcherMessage`] wrapped in
/// [`StreamExecutorResult`].
pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
/// A boxed `'static` stream yielding [`MessageStreamItem`]s.
pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;
192
193pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
194use risingwave_connector::sink::catalog::SinkId;
195use risingwave_connector::source::cdc::{
196    CdcTableSnapshotSplitAssignmentWithGeneration,
197    build_actor_cdc_table_snapshot_splits_with_generation,
198};
199use risingwave_pb::id::{ExecutorId, SubscriberId};
200use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
201
/// Trait alias for any `Send` stream yielding [`MessageStreamItemInner<M>`].
pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
/// Trait alias for any `Send` stream yielding [`MessageStreamItem`].
pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
/// Trait alias for any `Send` stream yielding [`DispatcherMessageStreamItem`].
pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;
205
/// Static information of an executor.
///
/// All fields that describe data (`schema`, `stream_key`, `stream_kind`) refer to the executor's
/// OUTPUT, not its input.
#[derive(Debug, Default, Clone)]
pub struct ExecutorInfo {
    /// The schema of the OUTPUT of the executor.
    pub schema: Schema,

    /// The stream key indices of the OUTPUT of the executor.
    pub stream_key: StreamKey,

    /// The stream kind of the OUTPUT of the executor.
    pub stream_kind: PbStreamKind,

    /// Identity of the executor. This is the text printed by the `Debug` impl of [`Executor`].
    pub identity: String,

    /// The executor id of the executor.
    pub id: ExecutorId,
}
224
225impl ExecutorInfo {
226    pub fn for_test(schema: Schema, stream_key: StreamKey, identity: String, id: u64) -> Self {
227        Self {
228            schema,
229            stream_key,
230            stream_kind: PbStreamKind::Retract, // dummy value for test
231            identity,
232            id: id.into(),
233        }
234    }
235}
236
237/// [`Execute`] describes the methods an executor should implement to handle control messages.
238pub trait Execute: Send + 'static {
239    fn execute(self: Box<Self>) -> BoxedMessageStream;
240
241    fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
242        self.execute()
243    }
244
245    fn boxed(self) -> Box<dyn Execute>
246    where
247        Self: Sized + Send + 'static,
248    {
249        Box::new(self)
250    }
251}
252
/// [`Executor`] combines the static information ([`ExecutorInfo`]) and the executable object to
/// handle messages ([`Execute`]).
///
/// Both fields are private: the info is read through the accessor methods, and the executable
/// part is consumed by `execute` / `execute_with_epoch`.
pub struct Executor {
    /// Static description of this executor's output.
    info: ExecutorInfo,
    /// The type-erased executor implementation.
    execute: Box<dyn Execute>,
}
259
260impl Executor {
261    pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
262        Self { info, execute }
263    }
264
265    pub fn info(&self) -> &ExecutorInfo {
266        &self.info
267    }
268
269    pub fn schema(&self) -> &Schema {
270        &self.info.schema
271    }
272
273    pub fn stream_key(&self) -> StreamKeyRef<'_> {
274        &self.info.stream_key
275    }
276
277    pub fn stream_kind(&self) -> PbStreamKind {
278        self.info.stream_kind
279    }
280
281    pub fn identity(&self) -> &str {
282        &self.info.identity
283    }
284
285    pub fn execute(self) -> BoxedMessageStream {
286        self.execute.execute()
287    }
288
289    pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
290        self.execute.execute_with_epoch(epoch)
291    }
292}
293
294impl std::fmt::Debug for Executor {
295    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
296        f.write_str(self.identity())
297    }
298}
299
300impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
301    fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
302        Self::new(info, execute)
303    }
304}
305
306impl<E> From<(ExecutorInfo, E)> for Executor
307where
308    E: Execute,
309{
310    fn from((info, execute): (ExecutorInfo, E)) -> Self {
311        Self::new(info, execute.boxed())
312    }
313}
314
/// Epoch value `0`.
// NOTE(review): by its name this is a sentinel for "no valid epoch" — confirm against callers.
pub const INVALID_EPOCH: u64 = 0;

/// A [`FragmentId`] used in a position that refers to an upstream fragment.
type UpstreamFragmentId = FragmentId;
/// The list of splits assigned to each actor, keyed by actor id.
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;
319
/// Payload of [`Mutation::Update`].
///
/// `Default` and `PartialEq` are only derived in test builds (or with the `test` feature).
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct UpdateMutation {
    /// Dispatcher updates, keyed by actor id.
    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
    /// Merge updates, keyed by `(actor id, upstream fragment id)`; looked up by
    /// `Barrier::as_update_merge`.
    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
    /// New vnode bitmaps, keyed by actor id; looked up by `Barrier::as_update_vnode_bitmap`.
    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
    /// Actors dropped by this update; reported by `Barrier::all_stop_actors`.
    pub dropped_actors: HashSet<ActorId>,
    /// Split assignments, keyed by actor id; read by `Barrier::initial_split_assignment`.
    pub actor_splits: SplitAssignments,
    /// New dispatchers, keyed by actor id.
    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
    /// CDC table snapshot split assignment, together with its generation.
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// Sink schema changes, keyed by sink id; looked up by `Barrier::as_sink_schema_change`.
    pub sink_schema_change: HashMap<SinkId, PbSinkSchemaChange>,
    /// Subscriptions to drop; exposed by `Barrier::as_subscriptions_to_drop`.
    pub subscriptions_to_drop: Vec<SubscriptionUpstreamInfo>,
}
333
/// Payload of [`Mutation::Add`].
///
/// `Default` and `PartialEq` are only derived in test builds (or with the `test` feature).
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct AddMutation {
    /// Dispatchers to add, keyed by (upstream) actor id; checked by
    /// `Barrier::has_more_downstream_fragments`.
    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
    /// Actors that are newly added; checked by `Barrier::is_newly_added`.
    pub added_actors: HashSet<ActorId>,
    // TODO: remove this and use `SourceChangesSplit` after we support multiple mutations.
    /// Split assignments, keyed by actor id; read by `Barrier::initial_split_assignment`.
    pub splits: SplitAssignments,
    /// Value returned by `Barrier::is_pause_on_startup` for barriers carrying this mutation.
    pub pause: bool,
    /// (`upstream_mv_table_id`,  `subscriber_id`)
    pub subscriptions_to_add: Vec<(TableId, SubscriberId)>,
    /// Fragments for which `Barrier::is_backfill_pause_on_startup` returns `true`, i.e. backfill
    /// fragments that should be paused on startup.
    pub backfill_nodes_to_pause: HashSet<FragmentId>,
    /// CDC table snapshot split assignment, together with its generation.
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// New upstream sinks, keyed by fragment id; looked up by `Barrier::as_new_upstream_sink`.
    pub new_upstream_sinks: HashMap<FragmentId, PbNewUpstreamSink>,
}
349
/// Payload of [`Mutation::Stop`].
///
/// `Default` and `PartialEq` are only derived in test builds (or with the `test` feature).
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct StopMutation {
    /// Actors dropped by this mutation; reported by `Barrier::all_stop_actors`.
    pub dropped_actors: HashSet<ActorId>,
    /// Sink fragments dropped by this mutation; reported by `Barrier::as_dropped_upstream_sinks`.
    pub dropped_sink_fragments: HashSet<FragmentId>,
}
356
/// The mutation optionally carried by a barrier.
///
/// See [`PbMutation`] for the semantics of each mutation.
///
/// `PartialEq` is only derived in test builds (or with the `test` feature).
#[cfg_attr(any(test, feature = "test"), derive(PartialEq))]
#[derive(Debug, Clone)]
pub enum Mutation {
    Stop(StopMutation),
    Update(UpdateMutation),
    Add(AddMutation),
    /// New split assignments, keyed by actor id.
    SourceChangeSplit(SplitAssignments),
    Pause,
    Resume,
    /// Keyed by fragment id.
    // NOTE(review): the `Option<u32>` is presumably a rate limit (`None` = unlimited) — confirm.
    Throttle(HashMap<FragmentId, Option<u32>>),
    /// Property maps keyed by a raw `u32` id.
    // NOTE(review): presumably the key identifies a connector (source/sink) — confirm.
    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
    DropSubscriptions {
        /// `subscriber` -> `upstream_mv_table_id`
        subscriptions_to_drop: Vec<SubscriptionUpstreamInfo>,
    },
    StartFragmentBackfill {
        /// Fragments for which `Barrier::should_start_fragment_backfill` returns `true`.
        fragment_ids: HashSet<FragmentId>,
    },
    RefreshStart {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    ListFinish {
        associated_source_id: SourceId,
    },
    LoadFinish {
        associated_source_id: SourceId,
    },
    ResetSource {
        source_id: SourceId,
    },
}
390
/// A barrier, generic over the type `M` of the mutation it carries.
///
/// For barriers in the dispatcher, `M` is `()`, which means the mutation is erased
/// (see [`DispatcherBarrier`]).
/// For barriers flowing within the streaming actor, `M` is the normal [`BarrierMutationType`]
/// (see [`Barrier`]).
#[derive(Debug, Clone)]
pub struct BarrierInner<M> {
    /// The epoch pair of this barrier.
    pub epoch: EpochPair,
    /// The mutation carried by this barrier, in whatever representation `M` provides.
    pub mutation: M,
    /// The kind of this barrier, e.g. [`BarrierKind::Checkpoint`].
    pub kind: BarrierKind,

    /// Tracing context for the **current** epoch of this barrier.
    pub tracing_context: TracingContext,
}
404
/// Mutation representation used by [`Barrier`]: `None` when the barrier carries no mutation,
/// otherwise a shared [`Mutation`].
pub type BarrierMutationType = Option<Arc<Mutation>>;
/// A barrier carrying an optional shared [`Mutation`].
pub type Barrier = BarrierInner<BarrierMutationType>;
/// A barrier whose mutation has been erased to `()`; produced by `Barrier::into_dispatcher`.
pub type DispatcherBarrier = BarrierInner<()>;
408
409impl<M: Default> BarrierInner<M> {
410    /// Create a plain barrier.
411    pub fn new_test_barrier(epoch: u64) -> Self {
412        Self {
413            epoch: EpochPair::new_test_epoch(epoch),
414            kind: BarrierKind::Checkpoint,
415            tracing_context: TracingContext::none(),
416            mutation: Default::default(),
417        }
418    }
419
420    pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
421        Self {
422            epoch: EpochPair::new(epoch, prev_epoch),
423            kind: BarrierKind::Checkpoint,
424            tracing_context: TracingContext::none(),
425            mutation: Default::default(),
426        }
427    }
428}
429
430impl Barrier {
431    pub fn into_dispatcher(self) -> DispatcherBarrier {
432        DispatcherBarrier {
433            epoch: self.epoch,
434            mutation: (),
435            kind: self.kind,
436            tracing_context: self.tracing_context,
437        }
438    }
439
440    #[must_use]
441    pub fn with_mutation(self, mutation: Mutation) -> Self {
442        Self {
443            mutation: Some(Arc::new(mutation)),
444            ..self
445        }
446    }
447
448    #[must_use]
449    pub fn with_stop(self) -> Self {
450        self.with_mutation(Mutation::Stop(StopMutation {
451            dropped_actors: Default::default(),
452            dropped_sink_fragments: Default::default(),
453        }))
454    }
455
456    /// Whether this barrier carries stop mutation.
457    pub fn is_with_stop_mutation(&self) -> bool {
458        matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
459    }
460
461    /// Whether this barrier is to stop the actor with `actor_id`.
462    pub fn is_stop(&self, actor_id: ActorId) -> bool {
463        self.all_stop_actors()
464            .is_some_and(|actors| actors.contains(&actor_id))
465    }
466
467    pub fn is_checkpoint(&self) -> bool {
468        self.kind == BarrierKind::Checkpoint
469    }
470
471    /// Get the initial split assignments for the actor with `actor_id`.
472    ///
473    /// This should only be called on the initial barrier received by the executor. It must be
474    ///
475    /// - `Add` mutation when it's a new streaming job, or recovery.
476    /// - `Update` mutation when it's created for scaling.
477    ///
478    /// Note that `SourceChangeSplit` is **not** included, because it's only used for changing splits
479    /// of existing executors.
480    pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
481        match self.mutation.as_deref()? {
482            Mutation::Update(UpdateMutation { actor_splits, .. })
483            | Mutation::Add(AddMutation {
484                splits: actor_splits,
485                ..
486            }) => actor_splits.get(&actor_id),
487
488            _ => {
489                if cfg!(debug_assertions) {
490                    panic!(
491                        "the initial mutation of the barrier should not be {:?}",
492                        self.mutation
493                    );
494                }
495                None
496            }
497        }
498        .map(|s| s.as_slice())
499    }
500
501    /// Get all actors that to be stopped (dropped) by this barrier.
502    pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
503        match self.mutation.as_deref() {
504            Some(Mutation::Stop(StopMutation { dropped_actors, .. })) => Some(dropped_actors),
505            Some(Mutation::Update(UpdateMutation { dropped_actors, .. })) => Some(dropped_actors),
506            _ => None,
507        }
508    }
509
510    /// Whether this barrier is to newly add the actor with `actor_id`. This is used for `Chain` and
511    /// `Values` to decide whether to output the existing (historical) data.
512    ///
513    /// By "newly", we mean the actor belongs to a subgraph of a new streaming job. That is, actors
514    /// added for scaling are not included.
515    pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
516        match self.mutation.as_deref() {
517            Some(Mutation::Add(AddMutation { added_actors, .. })) => {
518                added_actors.contains(&actor_id)
519            }
520            _ => false,
521        }
522    }
523
524    pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
525        if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
526            fragment_ids.contains(&fragment_id)
527        } else {
528            false
529        }
530    }
531
532    /// Whether this barrier adds new downstream fragment for the actor with `upstream_actor_id`.
533    ///
534    /// # Use case
535    /// Some optimizations are applied when an actor doesn't have any downstreams ("standalone" actors).
536    /// * Pause a standalone shared `SourceExecutor`.
537    /// * Disable a standalone `MaterializeExecutor`'s conflict check.
538    ///
539    /// This is implemented by checking `actor_context.initial_dispatch_num` on startup, and
540    /// check `has_more_downstream_fragments` on barrier to see whether the optimization
541    /// needs to be turned off.
542    ///
543    /// ## Some special cases not included
544    ///
545    /// Note that this is not `has_new_downstream_actor/fragment`. For our use case, we only
546    /// care about **number of downstream fragments** (more precisely, existence).
547    /// - When scaling, the number of downstream actors is changed, and they are "new", but downstream fragments is not changed.
548    /// - When `ALTER TABLE sink_into_table`, the fragment is replaced with a "new" one, but the number is not changed.
549    pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
550        let Some(mutation) = self.mutation.as_deref() else {
551            return false;
552        };
553        match mutation {
554            // Add is for mv, index and sink creation.
555            Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
556            Mutation::Update(_)
557            | Mutation::Stop(_)
558            | Mutation::Pause
559            | Mutation::Resume
560            | Mutation::SourceChangeSplit(_)
561            | Mutation::Throttle(_)
562            | Mutation::DropSubscriptions { .. }
563            | Mutation::ConnectorPropsChange(_)
564            | Mutation::StartFragmentBackfill { .. }
565            | Mutation::RefreshStart { .. }
566            | Mutation::ListFinish { .. }
567            | Mutation::LoadFinish { .. }
568            | Mutation::ResetSource { .. } => false,
569        }
570    }
571
572    /// Whether this barrier requires the executor to pause its data stream on startup.
573    pub fn is_pause_on_startup(&self) -> bool {
574        match self.mutation.as_deref() {
575            Some(Mutation::Add(AddMutation { pause, .. })) => *pause,
576            _ => false,
577        }
578    }
579
580    pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
581        match self.mutation.as_deref() {
582            Some(Mutation::Add(AddMutation {
583                backfill_nodes_to_pause,
584                ..
585            })) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
586            Some(Mutation::Update(_)) => false,
587            _ => {
588                tracing::warn!(
589                    "expected an AddMutation or UpdateMutation on Startup, instead got {:?}",
590                    self
591                );
592                false
593            }
594        }
595    }
596
597    /// Whether this barrier is for resume.
598    pub fn is_resume(&self) -> bool {
599        matches!(self.mutation.as_deref(), Some(Mutation::Resume))
600    }
601
602    /// Returns the [`MergeUpdate`] if this barrier is to update the merge executors for the actor
603    /// with `actor_id`.
604    pub fn as_update_merge(
605        &self,
606        actor_id: ActorId,
607        upstream_fragment_id: UpstreamFragmentId,
608    ) -> Option<&MergeUpdate> {
609        self.mutation
610            .as_deref()
611            .and_then(|mutation| match mutation {
612                Mutation::Update(UpdateMutation { merges, .. }) => {
613                    merges.get(&(actor_id, upstream_fragment_id))
614                }
615                _ => None,
616            })
617    }
618
619    /// Returns the new upstream sink information if this barrier is to add a new upstream sink for
620    /// the specified downstream fragment.
621    pub fn as_new_upstream_sink(&self, fragment_id: FragmentId) -> Option<&PbNewUpstreamSink> {
622        self.mutation
623            .as_deref()
624            .and_then(|mutation| match mutation {
625                Mutation::Add(AddMutation {
626                    new_upstream_sinks, ..
627                }) => new_upstream_sinks.get(&fragment_id),
628                _ => None,
629            })
630    }
631
632    /// Returns the dropped upstream sink-fragment if this barrier is to drop any sink.
633    pub fn as_dropped_upstream_sinks(&self) -> Option<&HashSet<FragmentId>> {
634        self.mutation
635            .as_deref()
636            .and_then(|mutation| match mutation {
637                Mutation::Stop(StopMutation {
638                    dropped_sink_fragments,
639                    ..
640                }) => Some(dropped_sink_fragments),
641                _ => None,
642            })
643    }
644
645    /// Returns the new vnode bitmap if this barrier is to update the vnode bitmap for the actor
646    /// with `actor_id`.
647    ///
648    /// Actually, this vnode bitmap update is only useful for the record accessing validation for
649    /// distributed executors, since the read/write pattern will never be across multiple vnodes.
650    pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
651        self.mutation
652            .as_deref()
653            .and_then(|mutation| match mutation {
654                Mutation::Update(UpdateMutation { vnode_bitmaps, .. }) => {
655                    vnode_bitmaps.get(&actor_id).cloned()
656                }
657                _ => None,
658            })
659    }
660
661    pub fn as_sink_schema_change(&self, sink_id: SinkId) -> Option<PbSinkSchemaChange> {
662        self.mutation
663            .as_deref()
664            .and_then(|mutation| match mutation {
665                Mutation::Update(UpdateMutation {
666                    sink_schema_change, ..
667                }) => sink_schema_change.get(&sink_id).cloned(),
668                _ => None,
669            })
670    }
671
672    pub fn as_subscriptions_to_drop(&self) -> Option<&[SubscriptionUpstreamInfo]> {
673        match self.mutation.as_deref() {
674            Some(Mutation::DropSubscriptions {
675                subscriptions_to_drop,
676            })
677            | Some(Mutation::Update(UpdateMutation {
678                subscriptions_to_drop,
679                ..
680            })) => Some(subscriptions_to_drop.as_slice()),
681            _ => None,
682        }
683    }
684
685    pub fn get_curr_epoch(&self) -> Epoch {
686        Epoch(self.epoch.curr)
687    }
688
    /// Retrieve the tracing context for the **current** epoch of this barrier.
    ///
    /// This is a plain borrow of the `tracing_context` field.
    pub fn tracing_context(&self) -> &TracingContext {
        &self.tracing_context
    }
693
694    pub fn added_subscriber_on_mv_table(
695        &self,
696        mv_table_id: TableId,
697    ) -> impl Iterator<Item = SubscriberId> + '_ {
698        if let Some(Mutation::Add(add)) = self.mutation.as_deref() {
699            Some(add)
700        } else {
701            None
702        }
703        .into_iter()
704        .flat_map(move |add| {
705            add.subscriptions_to_add.iter().filter_map(
706                move |(upstream_mv_table_id, subscriber_id)| {
707                    if *upstream_mv_table_id == mv_table_id {
708                        Some(*subscriber_id)
709                    } else {
710                        None
711                    }
712                },
713            )
714        })
715    }
716}
717
718impl<M: PartialEq> PartialEq for BarrierInner<M> {
719    fn eq(&self, other: &Self) -> bool {
720        self.epoch == other.epoch && self.mutation == other.mutation
721    }
722}
723
724impl Mutation {
    /// Return true if the mutation is stop, i.e. it is the [`Mutation::Stop`] variant.
    ///
    /// Note that this does not mean we will stop the current actor.
    ///
    /// Only compiled in test builds.
    #[cfg(test)]
    pub fn is_stop(&self) -> bool {
        matches!(self, Mutation::Stop(_))
    }
732
733    #[cfg(test)]
734    fn to_protobuf(&self) -> PbMutation {
735        use risingwave_pb::source::{
736            ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
737        };
738        use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
739        use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
740        use risingwave_pb::stream_plan::{
741            PbAddMutation, PbConnectorPropsChangeMutation, PbDispatchers,
742            PbDropSubscriptionsMutation, PbPauseMutation, PbResumeMutation,
743            PbSourceChangeSplitMutation, PbStartFragmentBackfillMutation, PbStopMutation,
744            PbThrottleMutation, PbUpdateMutation,
745        };
746        let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
747            actor_splits
748                .iter()
749                .map(|(&actor_id, splits)| {
750                    (
751                        actor_id,
752                        ConnectorSplits {
753                            splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
754                        },
755                    )
756                })
757                .collect::<HashMap<_, _>>()
758        };
759
760        match self {
761            Mutation::Stop(StopMutation {
762                dropped_actors,
763                dropped_sink_fragments,
764            }) => PbMutation::Stop(PbStopMutation {
765                actors: dropped_actors.iter().copied().collect(),
766                dropped_sink_fragments: dropped_sink_fragments.iter().copied().collect(),
767            }),
768            Mutation::Update(UpdateMutation {
769                dispatchers,
770                merges,
771                vnode_bitmaps,
772                dropped_actors,
773                actor_splits,
774                actor_new_dispatchers,
775                actor_cdc_table_snapshot_splits,
776                sink_schema_change,
777                subscriptions_to_drop,
778            }) => PbMutation::Update(PbUpdateMutation {
779                dispatcher_update: dispatchers.values().flatten().cloned().collect(),
780                merge_update: merges.values().cloned().collect(),
781                actor_vnode_bitmap_update: vnode_bitmaps
782                    .iter()
783                    .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
784                    .collect(),
785                dropped_actors: dropped_actors.iter().copied().collect(),
786                actor_splits: actor_splits_to_protobuf(actor_splits),
787                actor_new_dispatchers: actor_new_dispatchers
788                    .iter()
789                    .map(|(&actor_id, dispatchers)| {
790                        (
791                            actor_id,
792                            PbDispatchers {
793                                dispatchers: dispatchers.clone(),
794                            },
795                        )
796                    })
797                    .collect(),
798                actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
799                    splits:actor_cdc_table_snapshot_splits.splits.iter().map(|(actor_id,(splits, generation))| {
800                        (*actor_id, risingwave_pb::source::PbCdcTableSnapshotSplits {
801                            splits: splits.iter().map(risingwave_connector::source::cdc::build_cdc_table_snapshot_split).collect(),
802                            generation: *generation,
803                        })
804                    }).collect()
805                }),
806                sink_schema_change: sink_schema_change
807                    .iter()
808                    .map(|(sink_id, change)| ((*sink_id).as_raw_id(), change.clone()))
809                    .collect(),
810                subscriptions_to_drop: subscriptions_to_drop.clone(),
811            }),
812            Mutation::Add(AddMutation {
813                adds,
814                added_actors,
815                splits,
816                pause,
817                subscriptions_to_add,
818                backfill_nodes_to_pause,
819                actor_cdc_table_snapshot_splits,
820                new_upstream_sinks,
821            }) => PbMutation::Add(PbAddMutation {
822                actor_dispatchers: adds
823                    .iter()
824                    .map(|(&actor_id, dispatchers)| {
825                        (
826                            actor_id,
827                            PbDispatchers {
828                                dispatchers: dispatchers.clone(),
829                            },
830                        )
831                    })
832                    .collect(),
833                added_actors: added_actors.iter().copied().collect(),
834                actor_splits: actor_splits_to_protobuf(splits),
835                pause: *pause,
836                subscriptions_to_add: subscriptions_to_add
837                    .iter()
838                    .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
839                        subscriber_id: *subscriber_id,
840                        upstream_mv_table_id: *table_id,
841                    })
842                    .collect(),
843                backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
844                actor_cdc_table_snapshot_splits:
845                Some(PbCdcTableSnapshotSplitsWithGeneration {
846                    splits:actor_cdc_table_snapshot_splits.splits.iter().map(|(actor_id,(splits, generation))| {
847                        (*actor_id, risingwave_pb::source::PbCdcTableSnapshotSplits {
848                            splits: splits.iter().map(risingwave_connector::source::cdc::build_cdc_table_snapshot_split).collect(),
849                            generation: *generation,
850                        })
851                    }).collect()
852                }),
853                new_upstream_sinks: new_upstream_sinks
854                    .iter()
855                    .map(|(k, v)| (*k, v.clone()))
856                    .collect(),
857            }),
858            Mutation::SourceChangeSplit(changes) => {
859                PbMutation::Splits(PbSourceChangeSplitMutation {
860                    actor_splits: changes
861                        .iter()
862                        .map(|(&actor_id, splits)| {
863                            (
864                                actor_id,
865                                ConnectorSplits {
866                                    splits: splits
867                                        .clone()
868                                        .iter()
869                                        .map(ConnectorSplit::from)
870                                        .collect(),
871                                },
872                            )
873                        })
874                        .collect(),
875                })
876            }
877            Mutation::Pause => PbMutation::Pause(PbPauseMutation {}),
878            Mutation::Resume => PbMutation::Resume(PbResumeMutation {}),
879            Mutation::Throttle(changes) => PbMutation::Throttle(PbThrottleMutation {
880                fragment_throttle: changes
881                    .iter()
882                    .map(|(fragment_id, limit)| (*fragment_id, RateLimit { rate_limit: *limit }))
883                    .collect(),
884            }),
885            Mutation::DropSubscriptions {
886                subscriptions_to_drop,
887            } => PbMutation::DropSubscriptions(PbDropSubscriptionsMutation {
888                info: subscriptions_to_drop.clone(),
889            }),
890            Mutation::ConnectorPropsChange(map) => {
891                PbMutation::ConnectorPropsChange(PbConnectorPropsChangeMutation {
892                    connector_props_infos: map
893                        .iter()
894                        .map(|(actor_id, options)| {
895                            (
896                                *actor_id,
897                                ConnectorPropsInfo {
898                                    connector_props_info: options
899                                        .iter()
900                                        .map(|(k, v)| (k.clone(), v.clone()))
901                                        .collect(),
902                                },
903                            )
904                        })
905                        .collect(),
906                })
907            }
908            Mutation::StartFragmentBackfill { fragment_ids } => {
909                PbMutation::StartFragmentBackfill(PbStartFragmentBackfillMutation {
910                    fragment_ids: fragment_ids.iter().copied().collect(),
911                })
912            }
913            Mutation::RefreshStart {
914                table_id,
915                associated_source_id,
916            } => PbMutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
917                table_id: *table_id,
918                associated_source_id: *associated_source_id,
919            }),
920            Mutation::ListFinish {
921                associated_source_id,
922            } => PbMutation::ListFinish(risingwave_pb::stream_plan::ListFinishMutation {
923                associated_source_id: *associated_source_id,
924            }),
925            Mutation::LoadFinish {
926                associated_source_id,
927            } => PbMutation::LoadFinish(risingwave_pb::stream_plan::LoadFinishMutation {
928                associated_source_id: *associated_source_id,
929            }),
930            Mutation::ResetSource { source_id } => {
931                PbMutation::ResetSource(risingwave_pb::stream_plan::ResetSourceMutation {
932                    source_id: source_id.as_raw_id(),
933                })
934            }
935        }
936    }
937
938    fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
939        let mutation = match prost {
940            PbMutation::Stop(stop) => Mutation::Stop(StopMutation {
941                dropped_actors: stop.actors.iter().copied().collect(),
942                dropped_sink_fragments: stop.dropped_sink_fragments.iter().copied().collect(),
943            }),
944
945            PbMutation::Update(update) => Mutation::Update(UpdateMutation {
946                dispatchers: update
947                    .dispatcher_update
948                    .iter()
949                    .map(|u| (u.actor_id, u.clone()))
950                    .into_group_map(),
951                merges: update
952                    .merge_update
953                    .iter()
954                    .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
955                    .collect(),
956                vnode_bitmaps: update
957                    .actor_vnode_bitmap_update
958                    .iter()
959                    .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
960                    .collect(),
961                dropped_actors: update.dropped_actors.iter().copied().collect(),
962                actor_splits: update
963                    .actor_splits
964                    .iter()
965                    .map(|(&actor_id, splits)| {
966                        (
967                            actor_id,
968                            splits
969                                .splits
970                                .iter()
971                                .map(|split| split.try_into().unwrap())
972                                .collect(),
973                        )
974                    })
975                    .collect(),
976                actor_new_dispatchers: update
977                    .actor_new_dispatchers
978                    .iter()
979                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
980                    .collect(),
981                actor_cdc_table_snapshot_splits:
982                    build_actor_cdc_table_snapshot_splits_with_generation(
983                        update
984                            .actor_cdc_table_snapshot_splits
985                            .clone()
986                            .unwrap_or_default(),
987                    ),
988                sink_schema_change: update
989                    .sink_schema_change
990                    .iter()
991                    .map(|(sink_id, change)| (SinkId::from(*sink_id), change.clone()))
992                    .collect(),
993                subscriptions_to_drop: update.subscriptions_to_drop.clone(),
994            }),
995
996            PbMutation::Add(add) => Mutation::Add(AddMutation {
997                adds: add
998                    .actor_dispatchers
999                    .iter()
1000                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
1001                    .collect(),
1002                added_actors: add.added_actors.iter().copied().collect(),
1003                // TODO: remove this and use `SourceChangesSplit` after we support multiple
1004                // mutations.
1005                splits: add
1006                    .actor_splits
1007                    .iter()
1008                    .map(|(&actor_id, splits)| {
1009                        (
1010                            actor_id,
1011                            splits
1012                                .splits
1013                                .iter()
1014                                .map(|split| split.try_into().unwrap())
1015                                .collect(),
1016                        )
1017                    })
1018                    .collect(),
1019                pause: add.pause,
1020                subscriptions_to_add: add
1021                    .subscriptions_to_add
1022                    .iter()
1023                    .map(
1024                        |SubscriptionUpstreamInfo {
1025                             subscriber_id,
1026                             upstream_mv_table_id,
1027                         }| { (*upstream_mv_table_id, *subscriber_id) },
1028                    )
1029                    .collect(),
1030                backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
1031                actor_cdc_table_snapshot_splits:
1032                    build_actor_cdc_table_snapshot_splits_with_generation(
1033                        add.actor_cdc_table_snapshot_splits
1034                            .clone()
1035                            .unwrap_or_default(),
1036                    ),
1037                new_upstream_sinks: add
1038                    .new_upstream_sinks
1039                    .iter()
1040                    .map(|(k, v)| (*k, v.clone()))
1041                    .collect(),
1042            }),
1043
1044            PbMutation::Splits(s) => {
1045                let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
1046                    Vec::with_capacity(s.actor_splits.len());
1047                for (&actor_id, splits) in &s.actor_splits {
1048                    if !splits.splits.is_empty() {
1049                        change_splits.push((
1050                            actor_id,
1051                            splits
1052                                .splits
1053                                .iter()
1054                                .map(SplitImpl::try_from)
1055                                .try_collect()?,
1056                        ));
1057                    }
1058                }
1059                Mutation::SourceChangeSplit(change_splits.into_iter().collect())
1060            }
1061            PbMutation::Pause(_) => Mutation::Pause,
1062            PbMutation::Resume(_) => Mutation::Resume,
1063            PbMutation::Throttle(changes) => Mutation::Throttle(
1064                changes
1065                    .fragment_throttle
1066                    .iter()
1067                    .map(|(fragment_id, limit)| (*fragment_id, limit.rate_limit))
1068                    .collect(),
1069            ),
1070            PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
1071                subscriptions_to_drop: drop.info.clone(),
1072            },
1073            PbMutation::ConnectorPropsChange(alter_connector_props) => {
1074                Mutation::ConnectorPropsChange(
1075                    alter_connector_props
1076                        .connector_props_infos
1077                        .iter()
1078                        .map(|(connector_id, options)| {
1079                            (
1080                                *connector_id,
1081                                options
1082                                    .connector_props_info
1083                                    .iter()
1084                                    .map(|(k, v)| (k.clone(), v.clone()))
1085                                    .collect(),
1086                            )
1087                        })
1088                        .collect(),
1089                )
1090            }
1091            PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
1092                Mutation::StartFragmentBackfill {
1093                    fragment_ids: start_fragment_backfill
1094                        .fragment_ids
1095                        .iter()
1096                        .copied()
1097                        .collect(),
1098                }
1099            }
1100            PbMutation::RefreshStart(refresh_start) => Mutation::RefreshStart {
1101                table_id: refresh_start.table_id,
1102                associated_source_id: refresh_start.associated_source_id,
1103            },
1104            PbMutation::ListFinish(list_finish) => Mutation::ListFinish {
1105                associated_source_id: list_finish.associated_source_id,
1106            },
1107            PbMutation::LoadFinish(load_finish) => Mutation::LoadFinish {
1108                associated_source_id: load_finish.associated_source_id,
1109            },
1110            PbMutation::ResetSource(reset_source) => Mutation::ResetSource {
1111                source_id: SourceId::from(reset_source.source_id),
1112            },
1113        };
1114        Ok(mutation)
1115    }
1116}
1117
1118impl<M> BarrierInner<M> {
1119    fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
1120        let Self {
1121            epoch,
1122            mutation,
1123            kind,
1124            tracing_context,
1125            ..
1126        } = self;
1127
1128        PbBarrier {
1129            epoch: Some(PbEpoch {
1130                curr: epoch.curr,
1131                prev: epoch.prev,
1132            }),
1133            mutation: barrier_fn(mutation).map(|mutation| PbBarrierMutation {
1134                mutation: Some(mutation),
1135            }),
1136            tracing_context: tracing_context.to_protobuf(),
1137            kind: *kind as _,
1138        }
1139    }
1140
1141    fn from_protobuf_inner(
1142        prost: &PbBarrier,
1143        mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
1144    ) -> StreamExecutorResult<Self> {
1145        let epoch = prost.get_epoch()?;
1146
1147        Ok(Self {
1148            kind: prost.kind(),
1149            epoch: EpochPair::new(epoch.curr, epoch.prev),
1150            mutation: mutation_from_pb(
1151                (prost.mutation.as_ref()).and_then(|mutation| mutation.mutation.as_ref()),
1152            )?,
1153            tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
1154        })
1155    }
1156
1157    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
1158        BarrierInner {
1159            epoch: self.epoch,
1160            mutation: f(self.mutation),
1161            kind: self.kind,
1162            tracing_context: self.tracing_context,
1163        }
1164    }
1165}
1166
1167impl DispatcherBarrier {
1168    pub fn to_protobuf(&self) -> PbBarrier {
1169        self.to_protobuf_inner(|_| None)
1170    }
1171}
1172
1173impl Barrier {
1174    #[cfg(test)]
1175    pub fn to_protobuf(&self) -> PbBarrier {
1176        self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
1177    }
1178
1179    pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
1180        Self::from_protobuf_inner(prost, |mutation| {
1181            mutation
1182                .map(|m| Mutation::from_protobuf(m).map(Arc::new))
1183                .transpose()
1184        })
1185    }
1186}
1187
1188#[derive(Debug, PartialEq, Eq, Clone)]
1189pub struct Watermark {
1190    pub col_idx: usize,
1191    pub data_type: DataType,
1192    pub val: ScalarImpl,
1193}
1194
1195impl PartialOrd for Watermark {
1196    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1197        Some(self.cmp(other))
1198    }
1199}
1200
1201impl Ord for Watermark {
1202    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1203        self.val.default_cmp(&other.val)
1204    }
1205}
1206
1207impl Watermark {
1208    pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
1209        Self {
1210            col_idx,
1211            data_type,
1212            val,
1213        }
1214    }
1215
1216    pub async fn transform_with_expr(
1217        self,
1218        expr: &NonStrictExpression<impl Expression>,
1219        new_col_idx: usize,
1220    ) -> Option<Self> {
1221        let Self { col_idx, val, .. } = self;
1222        let row = {
1223            let mut row = vec![None; col_idx + 1];
1224            row[col_idx] = Some(val);
1225            OwnedRow::new(row)
1226        };
1227        let val = expr.eval_row_infallible(&row).await?;
1228        Some(Self::new(new_col_idx, expr.inner().return_type(), val))
1229    }
1230
1231    /// Transform the watermark with the given output indices. If this watermark is not in the
1232    /// output, return `None`.
1233    pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
1234        output_indices
1235            .iter()
1236            .position(|p| *p == self.col_idx)
1237            .map(|new_col_idx| self.with_idx(new_col_idx))
1238    }
1239
1240    pub fn to_protobuf(&self) -> PbWatermark {
1241        PbWatermark {
1242            column: Some(PbInputRef {
1243                index: self.col_idx as _,
1244                r#type: Some(self.data_type.to_protobuf()),
1245            }),
1246            val: Some(&self.val).to_protobuf().into(),
1247        }
1248    }
1249
1250    pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
1251        let col_ref = prost.get_column()?;
1252        let data_type = DataType::from(col_ref.get_type()?);
1253        let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
1254            .expect("watermark value cannot be null");
1255        Ok(Self::new(col_ref.get_index() as _, data_type, val))
1256    }
1257
1258    pub fn with_idx(self, idx: usize) -> Self {
1259        Self::new(idx, self.data_type, self.val)
1260    }
1261}
1262
1263#[cfg_attr(any(test, feature = "test"), derive(PartialEq))]
1264#[derive(Debug, EnumAsInner, Clone)]
1265pub enum MessageInner<M> {
1266    Chunk(StreamChunk),
1267    Barrier(BarrierInner<M>),
1268    Watermark(Watermark),
1269}
1270
1271impl<M> MessageInner<M> {
1272    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
1273        match self {
1274            MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
1275            MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
1276            MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
1277        }
1278    }
1279}
1280
1281pub type Message = MessageInner<BarrierMutationType>;
1282pub type DispatcherMessage = MessageInner<()>;
1283
1284/// `MessageBatchInner` is used exclusively by `Dispatcher` and the `Merger`/`Receiver` for exchanging messages between them.
1285/// It shares the same message type as the fundamental `MessageInner`, but batches multiple barriers into a single message.
1286#[derive(Debug, EnumAsInner, Clone)]
1287pub enum MessageBatchInner<M> {
1288    Chunk(StreamChunk),
1289    BarrierBatch(Vec<BarrierInner<M>>),
1290    Watermark(Watermark),
1291}
1292pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
1293pub type DispatcherBarriers = Vec<DispatcherBarrier>;
1294pub type DispatcherMessageBatch = MessageBatchInner<()>;
1295
1296impl From<DispatcherMessage> for DispatcherMessageBatch {
1297    fn from(m: DispatcherMessage) -> Self {
1298        match m {
1299            DispatcherMessage::Chunk(c) => Self::Chunk(c),
1300            DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
1301            DispatcherMessage::Watermark(w) => Self::Watermark(w),
1302        }
1303    }
1304}
1305
1306impl From<StreamChunk> for Message {
1307    fn from(chunk: StreamChunk) -> Self {
1308        Message::Chunk(chunk)
1309    }
1310}
1311
1312impl<'a> TryFrom<&'a Message> for &'a Barrier {
1313    type Error = ();
1314
1315    fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
1316        match m {
1317            Message::Chunk(_) => Err(()),
1318            Message::Barrier(b) => Ok(b),
1319            Message::Watermark(_) => Err(()),
1320        }
1321    }
1322}
1323
1324impl Message {
1325    /// Return true if the message is a stop barrier, meaning the stream
1326    /// will not continue, false otherwise.
1327    ///
1328    /// Note that this does not mean we will stop the current actor.
1329    #[cfg(test)]
1330    pub fn is_stop(&self) -> bool {
1331        matches!(
1332            self,
1333            Message::Barrier(Barrier {
1334                mutation,
1335                ..
1336            }) if mutation.as_ref().unwrap().is_stop()
1337        )
1338    }
1339}
1340
1341impl DispatcherMessageBatch {
1342    pub fn to_protobuf(&self) -> PbStreamMessageBatch {
1343        let prost = match self {
1344            Self::Chunk(stream_chunk) => {
1345                let prost_stream_chunk = stream_chunk.to_protobuf();
1346                StreamMessageBatch::StreamChunk(prost_stream_chunk)
1347            }
1348            Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
1349                barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
1350            }),
1351            Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
1352        };
1353        PbStreamMessageBatch {
1354            stream_message_batch: Some(prost),
1355        }
1356    }
1357
1358    pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
1359        let res = match prost.get_stream_message_batch()? {
1360            StreamMessageBatch::StreamChunk(chunk) => {
1361                Self::Chunk(StreamChunk::from_protobuf(chunk)?)
1362            }
1363            StreamMessageBatch::BarrierBatch(barrier_batch) => {
1364                let barriers = barrier_batch
1365                    .barriers
1366                    .iter()
1367                    .map(|barrier| {
1368                        DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
1369                            if mutation.is_some() {
1370                                if cfg!(debug_assertions) {
1371                                    panic!("should not receive message of barrier with mutation");
1372                                } else {
1373                                    warn!(?barrier, "receive message of barrier with mutation");
1374                                }
1375                            }
1376                            Ok(())
1377                        })
1378                    })
1379                    .try_collect()?;
1380                Self::BarrierBatch(barriers)
1381            }
1382            StreamMessageBatch::Watermark(watermark) => {
1383                Self::Watermark(Watermark::from_protobuf(watermark)?)
1384            }
1385        };
1386        Ok(res)
1387    }
1388
1389    pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
1390        ::prost::Message::encoded_len(msg)
1391    }
1392}
1393
1394pub type StreamKey = Vec<usize>;
1395pub type StreamKeyRef<'a> = &'a [usize];
1396pub type StreamKeyDataTypes = SmallVec<[DataType; 1]>;
1397
1398/// Expect the first message of the given `stream` as a barrier.
1399pub async fn expect_first_barrier<M: Debug>(
1400    stream: &mut (impl MessageStreamInner<M> + Unpin),
1401) -> StreamExecutorResult<BarrierInner<M>> {
1402    let message = stream
1403        .next()
1404        .instrument_await("expect_first_barrier")
1405        .await
1406        .context("failed to extract the first message: stream closed unexpectedly")??;
1407    let barrier = message
1408        .into_barrier()
1409        .expect("the first message must be a barrier");
1410    // TODO: Is this check correct?
1411    assert!(matches!(
1412        barrier.kind,
1413        BarrierKind::Checkpoint | BarrierKind::Initial
1414    ));
1415    Ok(barrier)
1416}
1417
1418/// Expect the first message of the given `stream` as a barrier.
1419pub async fn expect_first_barrier_from_aligned_stream(
1420    stream: &mut (impl AlignedMessageStream + Unpin),
1421) -> StreamExecutorResult<Barrier> {
1422    let message = stream
1423        .next()
1424        .instrument_await("expect_first_barrier")
1425        .await
1426        .context("failed to extract the first message: stream closed unexpectedly")??;
1427    let barrier = message
1428        .into_barrier()
1429        .expect("the first message must be a barrier");
1430    Ok(barrier)
1431}
1432
1433/// `StreamConsumer` is the last step in an actor.
1434pub trait StreamConsumer: Send + 'static {
1435    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;
1436
1437    fn execute(self: Box<Self>) -> Self::BarrierStream;
1438}
1439
1440type BoxedMessageInput<InputId, M> = BoxedInput<InputId, MessageStreamItemInner<M>>;
1441
1442/// A stream for merging messages from multiple upstreams.
1443/// Can dynamically add and delete upstream streams.
1444/// For the meaning of the generic parameter `M` used, refer to `BarrierInner<M>`.
1445pub struct DynamicReceivers<InputId, M> {
1446    /// The barrier we're aligning to. If this is `None`, then `blocked_upstreams` is empty.
1447    barrier: Option<BarrierInner<M>>,
1448    /// The start timestamp of the current barrier. Used for measuring the alignment duration.
1449    start_ts: Option<Instant>,
1450    /// The upstreams that're blocked by the `barrier`.
1451    blocked: Vec<BoxedMessageInput<InputId, M>>,
1452    /// The upstreams that're not blocked and can be polled.
1453    active: FuturesUnordered<StreamFuture<BoxedMessageInput<InputId, M>>>,
1454    /// watermark column index -> `BufferedWatermarks`
1455    buffered_watermarks: BTreeMap<usize, BufferedWatermarks<InputId>>,
1456    /// Currently only used for union.
1457    barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1458    /// Only for merge. If None, then we don't take `Instant::now()` and `observe` during `poll_next`
1459    merge_barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1460}
1461
1462impl<InputId: Clone + Ord + Hash + std::fmt::Debug + Unpin, M: Clone + Unpin> Stream
1463    for DynamicReceivers<InputId, M>
1464{
1465    type Item = MessageStreamItemInner<M>;
1466
1467    fn poll_next(
1468        mut self: Pin<&mut Self>,
1469        cx: &mut std::task::Context<'_>,
1470    ) -> Poll<Option<Self::Item>> {
1471        if self.is_empty() {
1472            return Poll::Ready(None);
1473        }
1474
1475        loop {
1476            match futures::ready!(self.active.poll_next_unpin(cx)) {
1477                // Directly forward the error.
1478                Some((Some(Err(e)), _)) => {
1479                    return Poll::Ready(Some(Err(e)));
1480                }
1481                // Handle the message from some upstream.
1482                Some((Some(Ok(message)), remaining)) => {
1483                    let input_id = remaining.id();
1484                    match message {
1485                        MessageInner::Chunk(chunk) => {
1486                            // Continue polling this upstream by pushing it back to `active`.
1487                            self.active.push(remaining.into_future());
1488                            return Poll::Ready(Some(Ok(MessageInner::Chunk(chunk))));
1489                        }
1490                        MessageInner::Watermark(watermark) => {
1491                            // Continue polling this upstream by pushing it back to `active`.
1492                            self.active.push(remaining.into_future());
1493                            if let Some(watermark) = self.handle_watermark(input_id, watermark) {
1494                                return Poll::Ready(Some(Ok(MessageInner::Watermark(watermark))));
1495                            }
1496                        }
1497                        MessageInner::Barrier(barrier) => {
1498                            // Block this upstream by pushing it to `blocked`.
1499                            if self.blocked.is_empty() {
1500                                self.start_ts = Some(Instant::now());
1501                            }
1502                            self.blocked.push(remaining);
1503                            if let Some(current_barrier) = self.barrier.as_ref() {
1504                                if current_barrier.epoch != barrier.epoch {
1505                                    return Poll::Ready(Some(Err(
1506                                        StreamExecutorError::align_barrier(
1507                                            current_barrier.clone().map_mutation(|_| None),
1508                                            barrier.map_mutation(|_| None),
1509                                        ),
1510                                    )));
1511                                }
1512                            } else {
1513                                self.barrier = Some(barrier);
1514                            }
1515                        }
1516                    }
1517                }
1518                // We use barrier as the control message of the stream. That is, we always stop the
1519                // actors actively when we receive a `Stop` mutation, instead of relying on the stream
1520                // termination.
1521                //
1522                // Besides, in abnormal cases when the other side of the `Input` closes unexpectedly,
1523                // we also yield an `Err(ExchangeChannelClosed)`, which will hit the `Err` arm above.
1524                // So this branch will never be reached in all cases.
1525                Some((None, remaining)) => {
1526                    return Poll::Ready(Some(Err(StreamExecutorError::channel_closed(format!(
1527                        "upstream input {:?} unexpectedly closed",
1528                        remaining.id()
1529                    )))));
1530                }
1531                // There's no active upstreams. Process the barrier and resume the blocked ones.
1532                None => {
1533                    assert!(!self.blocked.is_empty());
1534
1535                    let start_ts = self
1536                        .start_ts
1537                        .take()
1538                        .expect("should have received at least one barrier");
1539                    if let Some(barrier_align_duration) = &self.barrier_align_duration {
1540                        barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
1541                    }
1542                    if let Some(merge_barrier_align_duration) = &self.merge_barrier_align_duration {
1543                        merge_barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
1544                    }
1545
1546                    break;
1547                }
1548            }
1549        }
1550
1551        assert!(self.active.is_terminated());
1552
1553        let barrier = self.barrier.take().unwrap();
1554
1555        let upstreams = std::mem::take(&mut self.blocked);
1556        self.extend_active(upstreams);
1557        assert!(!self.active.is_terminated());
1558
1559        Poll::Ready(Some(Ok(MessageInner::Barrier(barrier))))
1560    }
1561}
1562
1563impl<InputId: Clone + Ord + Hash + std::fmt::Debug, M> DynamicReceivers<InputId, M> {
1564    pub fn new(
1565        upstreams: Vec<BoxedMessageInput<InputId, M>>,
1566        barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1567        merge_barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1568    ) -> Self {
1569        let mut this = Self {
1570            barrier: None,
1571            start_ts: None,
1572            blocked: Vec::with_capacity(upstreams.len()),
1573            active: Default::default(),
1574            buffered_watermarks: Default::default(),
1575            merge_barrier_align_duration,
1576            barrier_align_duration,
1577        };
1578        this.extend_active(upstreams);
1579        this
1580    }
1581
1582    /// Extend the active upstreams with the given upstreams. The current stream must be at the
1583    /// clean state right after a barrier.
1584    pub fn extend_active(
1585        &mut self,
1586        upstreams: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1587    ) {
1588        assert!(self.blocked.is_empty() && self.barrier.is_none());
1589
1590        self.active
1591            .extend(upstreams.into_iter().map(|s| s.into_future()));
1592    }
1593
1594    /// Handle a new watermark message. Optionally returns the watermark message to emit.
1595    pub fn handle_watermark(
1596        &mut self,
1597        input_id: InputId,
1598        watermark: Watermark,
1599    ) -> Option<Watermark> {
1600        let col_idx = watermark.col_idx;
1601        // Insert a buffer watermarks when first received from a column.
1602        let upstream_ids: Vec<_> = self.upstream_input_ids().collect();
1603        let watermarks = self
1604            .buffered_watermarks
1605            .entry(col_idx)
1606            .or_insert_with(|| BufferedWatermarks::with_ids(upstream_ids));
1607        watermarks.handle_watermark(input_id, watermark)
1608    }
1609
1610    /// Consume `other` and add its upstreams to `self`. The two streams must be at the clean state
1611    /// right after a barrier.
1612    pub fn add_upstreams_from(
1613        &mut self,
1614        new_inputs: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1615    ) {
1616        assert!(self.blocked.is_empty() && self.barrier.is_none());
1617
1618        let new_inputs: Vec<_> = new_inputs.into_iter().collect();
1619        let input_ids = new_inputs.iter().map(|input| input.id());
1620        self.buffered_watermarks.values_mut().for_each(|buffers| {
1621            // Add buffers to the buffered watermarks for all cols
1622            buffers.add_buffers(input_ids.clone());
1623        });
1624        self.active
1625            .extend(new_inputs.into_iter().map(|s| s.into_future()));
1626    }
1627
1628    /// Remove upstreams from `self` in `upstream_input_ids`. The current stream must be at the
1629    /// clean state right after a barrier.
1630    /// The current container does not necessarily contain all the input ids passed in.
1631    pub fn remove_upstreams(&mut self, upstream_input_ids: &HashSet<InputId>) {
1632        assert!(self.blocked.is_empty() && self.barrier.is_none());
1633
1634        let new_upstreams = std::mem::take(&mut self.active)
1635            .into_iter()
1636            .map(|s| s.into_inner().unwrap())
1637            .filter(|u| !upstream_input_ids.contains(&u.id()));
1638        self.extend_active(new_upstreams);
1639        self.buffered_watermarks.values_mut().for_each(|buffers| {
1640            // Call `check_heap` in case the only upstream(s) that does not have
1641            // watermark in heap is removed
1642            buffers.remove_buffer(upstream_input_ids.clone());
1643        });
1644    }
1645
1646    pub fn merge_barrier_align_duration(
1647        &self,
1648    ) -> Option<LabelGuardedMetric<GenericCounter<AtomicU64>>> {
1649        self.merge_barrier_align_duration.clone()
1650    }
1651
1652    pub fn flush_buffered_watermarks(&mut self) {
1653        self.buffered_watermarks
1654            .values_mut()
1655            .for_each(|buffers| buffers.clear());
1656    }
1657
1658    pub fn upstream_input_ids(&self) -> impl Iterator<Item = InputId> + '_ {
1659        self.blocked
1660            .iter()
1661            .map(|s| s.id())
1662            .chain(self.active.iter().map(|s| s.get_ref().unwrap().id()))
1663    }
1664
1665    pub fn is_empty(&self) -> bool {
1666        self.blocked.is_empty() && self.active.is_empty()
1667    }
1668}
1669
1670// Explanation of why we need `DispatchBarrierBuffer`:
1671//
1672// When we need to create or replace an upstream fragment for the current fragment, the `Merge` operator must
1673// add some new upstream actor inputs. However, the `Merge` operator may still have old upstreams. We must wait
1674// for these old upstreams to completely process their barriers and align before we can safely update the
1675// `upstream-input-set`.
1676//
1677// Meanwhile, the creation of a new upstream actor can only succeed after the channel to the downstream `Merge`
1678// operator has been established. This creates a potential dependency chain: [new_actor_creation ->
1679// downstream_merge_update -> old_actor_processing]
1680//
1681// To address this, we split the application of a barrier's `Mutation` into two steps:
1682// 1. Parse the `Mutation`. If there is an addition on the upstream-set, establish a channel with the upstream
1683//    and cache it.
1684// 2. When the upstream barrier actually arrives, apply the cached upstream changes to the upstream-set
1685//
1686// Additionally, since receiving a barrier from current upstream input and from the `barrier_rx` are
1687// asynchronous, we cannot determine which will arrive first. Therefore, when a barrier is received from an
1688// upstream: if a cached mutation is present, we apply it. Otherwise, we must fetch a new barrier from
1689// `barrier_rx`.
/// Buffers barriers received from `barrier_rx`, each paired with the upstream inputs (if any)
/// that were built ahead of time for it.
pub(crate) struct DispatchBarrierBuffer {
    /// Received barriers in arrival order. The second element holds the new inputs built for
    /// the barrier, or `None` when the barrier did not require any.
    buffer: VecDeque<(Barrier, Option<Vec<BoxedActorInput>>)>,
    /// Channel from which barriers are received.
    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
    /// Whether we are waiting for the next barrier or for the new inputs of a received
    /// barrier to be built.
    recv_state: BarrierReceiverState,
    /// Upstream fragment id used to look up the merge update in a barrier. It is replaced
    /// when such an update carries a new upstream fragment id.
    curr_upstream_fragment_id: FragmentId,
    /// Actor id used to look up the merge update in a barrier.
    actor_id: ActorId,
    // read-only context for building new inputs
    build_input_ctx: Arc<BuildInputContext>,
}
1699
/// Values that are handed to `new_input` whenever a new upstream input is built.
struct BuildInputContext {
    /// Passed to `new_input` as the actor id.
    pub actor_id: ActorId,
    /// Passed to `new_input` by reference.
    pub local_barrier_manager: LocalBarrierManager,
    /// Streaming metrics handle; cloned for every input that is built.
    pub metrics: Arc<StreamingMetrics>,
    /// Passed to `new_input` right after the actor id.
    /// NOTE(review): presumably the fragment the actor itself belongs to — confirm.
    pub fragment_id: FragmentId,
    /// Streaming configuration; cloned for every input that is built.
    pub actor_config: Arc<StreamingConfig>,
}
1707
/// A pinned, boxed, `Send` future that resolves to the newly built upstream inputs, or to a
/// stream executor error if building any of them failed.
type BoxedNewInputsFuture =
    Pin<Box<dyn Future<Output = StreamExecutorResult<Vec<BoxedActorInput>>> + Send>>;
1710
/// State of the barrier receiving loop of `DispatchBarrierBuffer`.
enum BarrierReceiverState {
    /// Waiting for the next barrier from `barrier_rx`.
    ReceivingBarrier,
    /// The given barrier has been received and requires new upstream inputs; the future
    /// resolves once they have been built.
    CreatingNewInput(Barrier, BoxedNewInputsFuture),
}
1715
1716impl DispatchBarrierBuffer {
1717    pub fn new(
1718        barrier_rx: mpsc::UnboundedReceiver<Barrier>,
1719        actor_id: ActorId,
1720        curr_upstream_fragment_id: FragmentId,
1721        local_barrier_manager: LocalBarrierManager,
1722        metrics: Arc<StreamingMetrics>,
1723        fragment_id: FragmentId,
1724        actor_config: Arc<StreamingConfig>,
1725    ) -> Self {
1726        Self {
1727            buffer: VecDeque::new(),
1728            barrier_rx,
1729            recv_state: BarrierReceiverState::ReceivingBarrier,
1730            curr_upstream_fragment_id,
1731            actor_id,
1732            build_input_ctx: Arc::new(BuildInputContext {
1733                actor_id,
1734                local_barrier_manager,
1735                metrics,
1736                fragment_id,
1737                actor_config,
1738            }),
1739        }
1740    }
1741
1742    pub async fn await_next_message(
1743        &mut self,
1744        stream: &mut (impl Stream<Item = StreamExecutorResult<DispatcherMessage>> + Unpin),
1745        metrics: &ActorInputMetrics,
1746    ) -> StreamExecutorResult<DispatcherMessage> {
1747        let mut start_time = Instant::now();
1748        let interval_duration = Duration::from_secs(15);
1749        let mut interval =
1750            tokio::time::interval_at(start_time + interval_duration, interval_duration);
1751
1752        loop {
1753            tokio::select! {
1754                biased;
1755                msg = stream.try_next() => {
1756                    metrics
1757                        .actor_input_buffer_blocking_duration_ns
1758                        .inc_by(start_time.elapsed().as_nanos() as u64);
1759                    return msg?.ok_or_else(
1760                        || StreamExecutorError::channel_closed("upstream executor closed unexpectedly")
1761                    );
1762                }
1763
1764                e = self.continuously_fetch_barrier_rx() => {
1765                    return Err(e);
1766                }
1767
1768                _ = interval.tick() => {
1769                    start_time = Instant::now();
1770                    metrics.actor_input_buffer_blocking_duration_ns.inc_by(interval_duration.as_nanos() as u64);
1771                    continue;
1772                }
1773            }
1774        }
1775    }
1776
1777    pub async fn pop_barrier_with_inputs(
1778        &mut self,
1779        barrier: DispatcherBarrier,
1780    ) -> StreamExecutorResult<(Barrier, Option<Vec<BoxedActorInput>>)> {
1781        while self.buffer.is_empty() {
1782            self.try_fetch_barrier_rx(false).await?;
1783        }
1784        let (recv_barrier, inputs) = self.buffer.pop_front().unwrap();
1785        assert_equal_dispatcher_barrier(&recv_barrier, &barrier);
1786
1787        Ok((recv_barrier, inputs))
1788    }
1789
1790    async fn continuously_fetch_barrier_rx(&mut self) -> StreamExecutorError {
1791        loop {
1792            if let Err(e) = self.try_fetch_barrier_rx(true).await {
1793                return e;
1794            }
1795        }
1796    }
1797
1798    async fn try_fetch_barrier_rx(&mut self, pending_on_end: bool) -> StreamExecutorResult<()> {
1799        match &mut self.recv_state {
1800            BarrierReceiverState::ReceivingBarrier => {
1801                let Some(barrier) = self.barrier_rx.recv().await else {
1802                    if pending_on_end {
1803                        return pending().await;
1804                    } else {
1805                        return Err(StreamExecutorError::channel_closed(
1806                            "barrier channel closed unexpectedly",
1807                        ));
1808                    }
1809                };
1810                if let Some(fut) = self.pre_apply_barrier(&barrier) {
1811                    self.recv_state = BarrierReceiverState::CreatingNewInput(barrier, fut);
1812                } else {
1813                    self.buffer.push_back((barrier, None));
1814                }
1815            }
1816            BarrierReceiverState::CreatingNewInput(barrier, fut) => {
1817                let new_inputs = fut.await?;
1818                self.buffer.push_back((barrier.clone(), Some(new_inputs)));
1819                self.recv_state = BarrierReceiverState::ReceivingBarrier;
1820            }
1821        }
1822        Ok(())
1823    }
1824
1825    fn pre_apply_barrier(&mut self, barrier: &Barrier) -> Option<BoxedNewInputsFuture> {
1826        if let Some(update) = barrier.as_update_merge(self.actor_id, self.curr_upstream_fragment_id)
1827            && !update.added_upstream_actors.is_empty()
1828        {
1829            // When update upstream fragment, added_actors will not be empty.
1830            let upstream_fragment_id =
1831                if let Some(new_upstream_fragment_id) = update.new_upstream_fragment_id {
1832                    self.curr_upstream_fragment_id = new_upstream_fragment_id;
1833                    new_upstream_fragment_id
1834                } else {
1835                    self.curr_upstream_fragment_id
1836                };
1837            let ctx = self.build_input_ctx.clone();
1838            let added_upstream_actors = update.added_upstream_actors.clone();
1839            let barrier = barrier.clone();
1840            let fut = async move {
1841                try_join_all(added_upstream_actors.iter().map(|upstream_actor| async {
1842                    let mut new_input = new_input(
1843                        &ctx.local_barrier_manager,
1844                        ctx.metrics.clone(),
1845                        ctx.actor_id,
1846                        ctx.fragment_id,
1847                        upstream_actor,
1848                        upstream_fragment_id,
1849                        ctx.actor_config.clone(),
1850                    )
1851                    .await?;
1852
1853                    // Poll the first barrier from the new upstreams. It must be the same as the one we polled from
1854                    // original upstreams.
1855                    let first_barrier = expect_first_barrier(&mut new_input).await?;
1856                    assert_equal_dispatcher_barrier(&barrier, &first_barrier);
1857
1858                    StreamExecutorResult::Ok(new_input)
1859                }))
1860                .await
1861            }
1862            .boxed();
1863
1864            Some(fut)
1865        } else {
1866            None
1867        }
1868    }
1869}