risingwave_stream/executor/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prelude;
16
17use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
18use std::fmt::Debug;
19use std::future::pending;
20use std::hash::Hash;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::Poll;
24use std::vec;
25
26use await_tree::InstrumentAwait;
27use enum_as_inner::EnumAsInner;
28use futures::future::try_join_all;
29use futures::stream::{BoxStream, FusedStream, FuturesUnordered, StreamFuture};
30use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
31use itertools::Itertools;
32use prometheus::core::{AtomicU64, GenericCounter};
33use risingwave_common::array::StreamChunk;
34use risingwave_common::bitmap::Bitmap;
35use risingwave_common::catalog::{Field, Schema, TableId};
36use risingwave_common::config::StreamingConfig;
37use risingwave_common::metrics::LabelGuardedMetric;
38use risingwave_common::row::OwnedRow;
39use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
40use risingwave_common::util::epoch::{Epoch, EpochPair};
41use risingwave_common::util::tracing::TracingContext;
42use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
43use risingwave_connector::source::SplitImpl;
44use risingwave_expr::expr::{Expression, NonStrictExpression};
45use risingwave_pb::data::PbEpoch;
46use risingwave_pb::expr::PbInputRef;
47use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
48use risingwave_pb::stream_plan::barrier::BarrierKind;
49use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
50use risingwave_pb::stream_plan::stream_node::PbStreamKind;
51use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
52use risingwave_pb::stream_plan::{
53    PbBarrier, PbBarrierMutation, PbDispatcher, PbStreamMessageBatch, PbWatermark,
54    SubscriptionUpstreamInfo,
55};
56use smallvec::SmallVec;
57use tokio::sync::mpsc;
58use tokio::time::{Duration, Instant};
59
60use crate::error::StreamResult;
61use crate::executor::exchange::input::{
62    BoxedActorInput, BoxedInput, assert_equal_dispatcher_barrier, new_input,
63};
64use crate::executor::monitor::ActorInputMetrics;
65use crate::executor::prelude::StreamingMetrics;
66use crate::executor::watermark::BufferedWatermarks;
67use crate::task::{ActorId, FragmentId, LocalBarrierManager};
68
69mod actor;
70mod barrier_align;
71pub mod exchange;
72pub mod monitor;
73
74pub mod aggregate;
75pub mod asof_join;
76mod backfill;
77mod barrier_recv;
78mod batch_query;
79mod chain;
80mod changelog;
81mod dedup;
82mod dispatch;
83pub mod dml;
84mod dynamic_filter;
85pub mod eowc;
86pub mod error;
87mod expand;
88mod filter;
89mod gap_fill;
90pub mod hash_join;
91mod hop_window;
92mod join;
93pub mod locality_provider;
94mod lookup;
95mod lookup_union;
96mod merge;
97mod mview;
98mod nested_loop_temporal_join;
99mod no_op;
100mod now;
101mod over_window;
102pub mod project;
103mod rearranged_chain;
104mod receiver;
105pub mod row_id_gen;
106mod sink;
107pub mod source;
108mod stream_reader;
109pub mod subtask;
110mod temporal_join;
111mod top_n;
112mod troublemaker;
113mod union;
114mod upstream_sink_union;
115mod values;
116mod watermark;
117mod watermark_filter;
118mod wrapper;
119
120mod approx_percentile;
121
122mod row_merge;
123
124#[cfg(test)]
125mod integration_tests;
126mod sync_kv_log_store;
127#[cfg(any(test, feature = "test"))]
128pub mod test_utils;
129mod utils;
130mod vector;
131
132pub use actor::{Actor, ActorContext, ActorContextRef};
133use anyhow::Context;
134pub use approx_percentile::global::GlobalApproxPercentileExecutor;
135pub use approx_percentile::local::LocalApproxPercentileExecutor;
136pub use backfill::arrangement_backfill::*;
137pub use backfill::cdc::{
138    CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor,
139};
140pub use backfill::no_shuffle_backfill::*;
141pub use backfill::snapshot_backfill::*;
142pub use barrier_recv::BarrierRecvExecutor;
143pub use batch_query::BatchQueryExecutor;
144pub use chain::ChainExecutor;
145pub use changelog::ChangeLogExecutor;
146pub use dedup::AppendOnlyDedupExecutor;
147pub use dispatch::{DispatchExecutor, DispatcherImpl};
148pub use dynamic_filter::DynamicFilterExecutor;
149pub use error::{StreamExecutorError, StreamExecutorResult};
150pub use expand::ExpandExecutor;
151pub use filter::{FilterExecutor, UpsertFilterExecutor};
152pub use gap_fill::{GapFillExecutor, GapFillExecutorArgs};
153pub use hash_join::*;
154pub use hop_window::HopWindowExecutor;
155pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
156pub use join::{AsOfDesc, AsOfJoinType, JoinType};
157pub use lookup::*;
158pub use lookup_union::LookupUnionExecutor;
159pub use merge::MergeExecutor;
160pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
161pub use mview::{MaterializeExecutor, RefreshableMaterializeArgs};
162pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
163pub use no_op::NoOpExecutor;
164pub use now::*;
165pub use over_window::*;
166pub use rearranged_chain::RearrangedChainExecutor;
167pub use receiver::ReceiverExecutor;
168use risingwave_common::id::SourceId;
169pub use row_merge::RowMergeExecutor;
170pub use sink::SinkExecutor;
171pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
172pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
173pub use temporal_join::TemporalJoinExecutor;
174pub use top_n::{
175    AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
176};
177pub use troublemaker::TroublemakerExecutor;
178pub use union::UnionExecutor;
179pub use upstream_sink_union::{UpstreamFragmentInfo, UpstreamSinkUnionExecutor};
180pub use utils::DummyExecutor;
181pub use values::ValuesExecutor;
182pub use vector::*;
183pub use watermark_filter::WatermarkFilterExecutor;
184pub use wrapper::WrapperExecutor;
185
186use self::barrier_align::AlignedMessageStream;
187
188pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
189pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
190pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
191pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;
192
193pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
194use risingwave_connector::sink::catalog::SinkId;
195use risingwave_connector::source::cdc::{
196    CdcTableSnapshotSplitAssignmentWithGeneration,
197    build_actor_cdc_table_snapshot_splits_with_generation,
198};
199use risingwave_pb::id::{ExecutorId, SubscriberId};
200use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
201
202pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
203pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
204pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;
205
/// Static information of an executor.
///
/// This is the metadata half of an [`Executor`]; the executable half is a boxed [`Execute`].
#[derive(Debug, Default, Clone)]
pub struct ExecutorInfo {
    /// The schema of the OUTPUT of the executor.
    pub schema: Schema,

    /// The stream key indices of the OUTPUT of the executor.
    pub stream_key: StreamKey,

    /// The stream kind of the OUTPUT of the executor.
    pub stream_kind: PbStreamKind,

    /// Identity of the executor. Also used as the `Debug` representation of [`Executor`].
    pub identity: String,

    /// The executor id of the executor.
    pub id: ExecutorId,
}
224
225impl ExecutorInfo {
226    pub fn for_test(schema: Schema, stream_key: StreamKey, identity: String, id: u64) -> Self {
227        Self {
228            schema,
229            stream_key,
230            stream_kind: PbStreamKind::Retract, // dummy value for test
231            identity,
232            id: id.into(),
233        }
234    }
235}
236
237/// [`Execute`] describes the methods an executor should implement to handle control messages.
238pub trait Execute: Send + 'static {
239    fn execute(self: Box<Self>) -> BoxedMessageStream;
240
241    fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
242        self.execute()
243    }
244
245    fn boxed(self) -> Box<dyn Execute>
246    where
247        Self: Sized + Send + 'static,
248    {
249        Box::new(self)
250    }
251}
252
/// [`Executor`] combines the static information ([`ExecutorInfo`]) and the executable object to
/// handle messages ([`Execute`]).
pub struct Executor {
    /// Static metadata: schema, stream key, stream kind, identity and id.
    info: ExecutorInfo,
    /// The type-erased executable part; consumed by `execute` / `execute_with_epoch`.
    execute: Box<dyn Execute>,
}
259
260impl Executor {
261    pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
262        Self { info, execute }
263    }
264
265    pub fn info(&self) -> &ExecutorInfo {
266        &self.info
267    }
268
269    pub fn schema(&self) -> &Schema {
270        &self.info.schema
271    }
272
273    pub fn stream_key(&self) -> StreamKeyRef<'_> {
274        &self.info.stream_key
275    }
276
277    pub fn stream_kind(&self) -> PbStreamKind {
278        self.info.stream_kind
279    }
280
281    pub fn identity(&self) -> &str {
282        &self.info.identity
283    }
284
285    pub fn execute(self) -> BoxedMessageStream {
286        self.execute.execute()
287    }
288
289    pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
290        self.execute.execute_with_epoch(epoch)
291    }
292}
293
294impl std::fmt::Debug for Executor {
295    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
296        f.write_str(self.identity())
297    }
298}
299
300impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
301    fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
302        Self::new(info, execute)
303    }
304}
305
306impl<E> From<(ExecutorInfo, E)> for Executor
307where
308    E: Execute,
309{
310    fn from((info, execute): (ExecutorInfo, E)) -> Self {
311        Self::new(info, execute.boxed())
312    }
313}
314
/// The epoch value reserved as "invalid".
pub const INVALID_EPOCH: u64 = 0;

/// Id of an upstream fragment; used together with an actor id to key merge updates.
type UpstreamFragmentId = FragmentId;
/// Source splits assigned to each actor.
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;
319
/// Payload of [`Mutation::Update`].
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct UpdateMutation {
    /// Dispatcher updates, grouped by actor id.
    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
    /// Merge updates keyed by (actor id, upstream fragment id); see `Barrier::as_update_merge`.
    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
    /// New vnode bitmap per actor; see `Barrier::as_update_vnode_bitmap`.
    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
    /// Actors dropped by this update; see `Barrier::all_stop_actors`.
    pub dropped_actors: HashSet<ActorId>,
    /// Split assignment per actor; read by `Barrier::initial_split_assignment` for actors
    /// created for scaling.
    pub actor_splits: SplitAssignments,
    /// Newly added dispatchers per actor.
    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
    /// CDC table snapshot split assignment, together with its generation.
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// Columns added to each sink; see `Barrier::as_sink_add_columns`.
    pub sink_add_columns: HashMap<SinkId, Vec<Field>>,
}
332
/// Payload of [`Mutation::Add`].
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct AddMutation {
    /// New dispatchers keyed by upstream actor id; see `Barrier::has_more_downstream_fragments`.
    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
    /// Actors newly added by this mutation; see `Barrier::is_newly_added`.
    pub added_actors: HashSet<ActorId>,
    /// Initial split assignment per actor; see `Barrier::initial_split_assignment`.
    // TODO: remove this and use `SourceChangeSplit` after we support multiple mutations.
    pub splits: SplitAssignments,
    /// Whether executors should pause their data stream on startup; see
    /// `Barrier::is_pause_on_startup`.
    pub pause: bool,
    /// (`upstream_mv_table_id`, `subscriber_id`) pairs to add; see
    /// `Barrier::added_subscriber_on_mv_table`.
    pub subscriptions_to_add: Vec<(TableId, SubscriberId)>,
    /// Backfill fragments that should stay paused on startup; see
    /// `Barrier::is_backfill_pause_on_startup`.
    pub backfill_nodes_to_pause: HashSet<FragmentId>,
    /// CDC table snapshot split assignment, together with its generation.
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// New upstream sinks keyed by downstream fragment id; see `Barrier::as_new_upstream_sink`.
    pub new_upstream_sinks: HashMap<FragmentId, PbNewUpstreamSink>,
}
348
/// Payload of [`Mutation::Stop`].
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct StopMutation {
    /// Actors to stop (drop); see `Barrier::all_stop_actors` and `Barrier::is_stop`.
    pub dropped_actors: HashSet<ActorId>,
    /// Sink fragments being dropped; see `Barrier::as_dropped_upstream_sinks`.
    pub dropped_sink_fragments: HashSet<FragmentId>,
}
355
/// Mutation carried by a barrier.
///
/// See [`PbMutation`] for the semantics of each mutation.
#[derive(Debug, Clone, PartialEq)]
pub enum Mutation {
    /// Stop (drop) actors; see [`StopMutation`].
    Stop(StopMutation),
    /// Update existing actors (dispatchers, merges, vnode bitmaps, splits, ...); see
    /// [`UpdateMutation`].
    Update(UpdateMutation),
    /// Add dispatchers and actors; see [`AddMutation`].
    Add(AddMutation),
    /// Change the split assignment of existing source executors (never used as the initial
    /// assignment; see `Barrier::initial_split_assignment`).
    SourceChangeSplit(SplitAssignments),
    Pause,
    Resume,
    /// Per-actor rate limit, carried as `RateLimit { rate_limit }` in protobuf.
    Throttle(HashMap<ActorId, Option<u32>>),
    /// Connector property changes.
    ///
    /// NOTE(review): keyed by a raw `u32` rather than a typed id; presumably a source/sink id —
    /// confirm against the producer.
    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
    DropSubscriptions {
        /// `subscriber` -> `upstream_mv_table_id`
        subscriptions_to_drop: Vec<(SubscriberId, TableId)>,
    },
    /// Start the backfill of the listed fragments; see `Barrier::should_start_fragment_backfill`.
    StartFragmentBackfill {
        fragment_ids: HashSet<FragmentId>,
    },
    RefreshStart {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    ListFinish {
        associated_source_id: SourceId,
    },
    LoadFinish {
        associated_source_id: SourceId,
    },
}
385
/// A barrier, generic over its mutation type.
///
/// The generic type `M` is the mutation type of the barrier.
///
/// For barriers in the dispatcher, `M` is `()`, which means the mutation is erased.
/// For barriers flowing within the streaming actor, `M` is the normal `BarrierMutationType`.
#[derive(Debug, Clone)]
pub struct BarrierInner<M> {
    /// Current and previous epoch of this barrier.
    pub epoch: EpochPair,
    /// Mutation carried by this barrier (see the type-level docs for `M`).
    pub mutation: M,
    /// Barrier kind, e.g. `BarrierKind::Checkpoint`.
    pub kind: BarrierKind,

    /// Tracing context for the **current** epoch of this barrier.
    pub tracing_context: TracingContext,
}
399
/// Mutation of an in-actor barrier; `None` when the barrier carries no mutation.
pub type BarrierMutationType = Option<Arc<Mutation>>;
/// Barrier flowing within streaming actors, carrying an optional mutation.
pub type Barrier = BarrierInner<BarrierMutationType>;
/// Barrier seen by dispatchers, with the mutation erased.
pub type DispatcherBarrier = BarrierInner<()>;
403
404impl<M: Default> BarrierInner<M> {
405    /// Create a plain barrier.
406    pub fn new_test_barrier(epoch: u64) -> Self {
407        Self {
408            epoch: EpochPair::new_test_epoch(epoch),
409            kind: BarrierKind::Checkpoint,
410            tracing_context: TracingContext::none(),
411            mutation: Default::default(),
412        }
413    }
414
415    pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
416        Self {
417            epoch: EpochPair::new(epoch, prev_epoch),
418            kind: BarrierKind::Checkpoint,
419            tracing_context: TracingContext::none(),
420            mutation: Default::default(),
421        }
422    }
423}
424
425impl Barrier {
426    pub fn into_dispatcher(self) -> DispatcherBarrier {
427        DispatcherBarrier {
428            epoch: self.epoch,
429            mutation: (),
430            kind: self.kind,
431            tracing_context: self.tracing_context,
432        }
433    }
434
435    #[must_use]
436    pub fn with_mutation(self, mutation: Mutation) -> Self {
437        Self {
438            mutation: Some(Arc::new(mutation)),
439            ..self
440        }
441    }
442
443    #[must_use]
444    pub fn with_stop(self) -> Self {
445        self.with_mutation(Mutation::Stop(StopMutation {
446            dropped_actors: Default::default(),
447            dropped_sink_fragments: Default::default(),
448        }))
449    }
450
451    /// Whether this barrier carries stop mutation.
452    pub fn is_with_stop_mutation(&self) -> bool {
453        matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
454    }
455
456    /// Whether this barrier is to stop the actor with `actor_id`.
457    pub fn is_stop(&self, actor_id: ActorId) -> bool {
458        self.all_stop_actors()
459            .is_some_and(|actors| actors.contains(&actor_id))
460    }
461
462    pub fn is_checkpoint(&self) -> bool {
463        self.kind == BarrierKind::Checkpoint
464    }
465
466    /// Get the initial split assignments for the actor with `actor_id`.
467    ///
468    /// This should only be called on the initial barrier received by the executor. It must be
469    ///
470    /// - `Add` mutation when it's a new streaming job, or recovery.
471    /// - `Update` mutation when it's created for scaling.
472    ///
473    /// Note that `SourceChangeSplit` is **not** included, because it's only used for changing splits
474    /// of existing executors.
475    pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
476        match self.mutation.as_deref()? {
477            Mutation::Update(UpdateMutation { actor_splits, .. })
478            | Mutation::Add(AddMutation {
479                splits: actor_splits,
480                ..
481            }) => actor_splits.get(&actor_id),
482
483            _ => {
484                if cfg!(debug_assertions) {
485                    panic!(
486                        "the initial mutation of the barrier should not be {:?}",
487                        self.mutation
488                    );
489                }
490                None
491            }
492        }
493        .map(|s| s.as_slice())
494    }
495
496    /// Get all actors that to be stopped (dropped) by this barrier.
497    pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
498        match self.mutation.as_deref() {
499            Some(Mutation::Stop(StopMutation { dropped_actors, .. })) => Some(dropped_actors),
500            Some(Mutation::Update(UpdateMutation { dropped_actors, .. })) => Some(dropped_actors),
501            _ => None,
502        }
503    }
504
505    /// Whether this barrier is to newly add the actor with `actor_id`. This is used for `Chain` and
506    /// `Values` to decide whether to output the existing (historical) data.
507    ///
508    /// By "newly", we mean the actor belongs to a subgraph of a new streaming job. That is, actors
509    /// added for scaling are not included.
510    pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
511        match self.mutation.as_deref() {
512            Some(Mutation::Add(AddMutation { added_actors, .. })) => {
513                added_actors.contains(&actor_id)
514            }
515            _ => false,
516        }
517    }
518
519    pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
520        if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
521            fragment_ids.contains(&fragment_id)
522        } else {
523            false
524        }
525    }
526
527    /// Whether this barrier adds new downstream fragment for the actor with `upstream_actor_id`.
528    ///
529    /// # Use case
530    /// Some optimizations are applied when an actor doesn't have any downstreams ("standalone" actors).
531    /// * Pause a standalone shared `SourceExecutor`.
532    /// * Disable a standalone `MaterializeExecutor`'s conflict check.
533    ///
534    /// This is implemented by checking `actor_context.initial_dispatch_num` on startup, and
535    /// check `has_more_downstream_fragments` on barrier to see whether the optimization
536    /// needs to be turned off.
537    ///
538    /// ## Some special cases not included
539    ///
540    /// Note that this is not `has_new_downstream_actor/fragment`. For our use case, we only
541    /// care about **number of downstream fragments** (more precisely, existence).
542    /// - When scaling, the number of downstream actors is changed, and they are "new", but downstream fragments is not changed.
543    /// - When `ALTER TABLE sink_into_table`, the fragment is replaced with a "new" one, but the number is not changed.
544    pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
545        let Some(mutation) = self.mutation.as_deref() else {
546            return false;
547        };
548        match mutation {
549            // Add is for mv, index and sink creation.
550            Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
551            Mutation::Update(_)
552            | Mutation::Stop(_)
553            | Mutation::Pause
554            | Mutation::Resume
555            | Mutation::SourceChangeSplit(_)
556            | Mutation::Throttle(_)
557            | Mutation::DropSubscriptions { .. }
558            | Mutation::ConnectorPropsChange(_)
559            | Mutation::StartFragmentBackfill { .. }
560            | Mutation::RefreshStart { .. }
561            | Mutation::ListFinish { .. }
562            | Mutation::LoadFinish { .. } => false,
563        }
564    }
565
566    /// Whether this barrier requires the executor to pause its data stream on startup.
567    pub fn is_pause_on_startup(&self) -> bool {
568        match self.mutation.as_deref() {
569            Some(Mutation::Add(AddMutation { pause, .. })) => *pause,
570            _ => false,
571        }
572    }
573
574    pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
575        match self.mutation.as_deref() {
576            Some(Mutation::Add(AddMutation {
577                backfill_nodes_to_pause,
578                ..
579            })) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
580            Some(Mutation::Update(_)) => false,
581            _ => {
582                tracing::warn!(
583                    "expected an AddMutation or UpdateMutation on Startup, instead got {:?}",
584                    self
585                );
586                false
587            }
588        }
589    }
590
591    /// Whether this barrier is for resume.
592    pub fn is_resume(&self) -> bool {
593        matches!(self.mutation.as_deref(), Some(Mutation::Resume))
594    }
595
596    /// Returns the [`MergeUpdate`] if this barrier is to update the merge executors for the actor
597    /// with `actor_id`.
598    pub fn as_update_merge(
599        &self,
600        actor_id: ActorId,
601        upstream_fragment_id: UpstreamFragmentId,
602    ) -> Option<&MergeUpdate> {
603        self.mutation
604            .as_deref()
605            .and_then(|mutation| match mutation {
606                Mutation::Update(UpdateMutation { merges, .. }) => {
607                    merges.get(&(actor_id, upstream_fragment_id))
608                }
609                _ => None,
610            })
611    }
612
613    /// Returns the new upstream sink information if this barrier is to add a new upstream sink for
614    /// the specified downstream fragment.
615    pub fn as_new_upstream_sink(&self, fragment_id: FragmentId) -> Option<&PbNewUpstreamSink> {
616        self.mutation
617            .as_deref()
618            .and_then(|mutation| match mutation {
619                Mutation::Add(AddMutation {
620                    new_upstream_sinks, ..
621                }) => new_upstream_sinks.get(&fragment_id),
622                _ => None,
623            })
624    }
625
626    /// Returns the dropped upstream sink-fragment if this barrier is to drop any sink.
627    pub fn as_dropped_upstream_sinks(&self) -> Option<&HashSet<FragmentId>> {
628        self.mutation
629            .as_deref()
630            .and_then(|mutation| match mutation {
631                Mutation::Stop(StopMutation {
632                    dropped_sink_fragments,
633                    ..
634                }) => Some(dropped_sink_fragments),
635                _ => None,
636            })
637    }
638
639    /// Returns the new vnode bitmap if this barrier is to update the vnode bitmap for the actor
640    /// with `actor_id`.
641    ///
642    /// Actually, this vnode bitmap update is only useful for the record accessing validation for
643    /// distributed executors, since the read/write pattern will never be across multiple vnodes.
644    pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
645        self.mutation
646            .as_deref()
647            .and_then(|mutation| match mutation {
648                Mutation::Update(UpdateMutation { vnode_bitmaps, .. }) => {
649                    vnode_bitmaps.get(&actor_id).cloned()
650                }
651                _ => None,
652            })
653    }
654
655    pub fn as_sink_add_columns(&self, sink_id: SinkId) -> Option<Vec<Field>> {
656        self.mutation
657            .as_deref()
658            .and_then(|mutation| match mutation {
659                Mutation::Update(UpdateMutation {
660                    sink_add_columns, ..
661                }) => sink_add_columns.get(&sink_id).cloned(),
662                _ => None,
663            })
664    }
665
666    pub fn get_curr_epoch(&self) -> Epoch {
667        Epoch(self.epoch.curr)
668    }
669
670    /// Retrieve the tracing context for the **current** epoch of this barrier.
671    pub fn tracing_context(&self) -> &TracingContext {
672        &self.tracing_context
673    }
674
675    pub fn added_subscriber_on_mv_table(
676        &self,
677        mv_table_id: TableId,
678    ) -> impl Iterator<Item = SubscriberId> + '_ {
679        if let Some(Mutation::Add(add)) = self.mutation.as_deref() {
680            Some(add)
681        } else {
682            None
683        }
684        .into_iter()
685        .flat_map(move |add| {
686            add.subscriptions_to_add.iter().filter_map(
687                move |(upstream_mv_table_id, subscriber_id)| {
688                    if *upstream_mv_table_id == mv_table_id {
689                        Some(*subscriber_id)
690                    } else {
691                        None
692                    }
693                },
694            )
695        })
696    }
697}
698
699impl<M: PartialEq> PartialEq for BarrierInner<M> {
700    fn eq(&self, other: &Self) -> bool {
701        self.epoch == other.epoch && self.mutation == other.mutation
702    }
703}
704
705impl Mutation {
706    /// Return true if the mutation is stop.
707    ///
708    /// Note that this does not mean we will stop the current actor.
709    #[cfg(test)]
710    pub fn is_stop(&self) -> bool {
711        matches!(self, Mutation::Stop(_))
712    }
713
714    #[cfg(test)]
715    fn to_protobuf(&self) -> PbMutation {
716        use risingwave_pb::source::{
717            ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
718        };
719        use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
720        use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
721        use risingwave_pb::stream_plan::{
722            PbAddMutation, PbConnectorPropsChangeMutation, PbDispatchers,
723            PbDropSubscriptionsMutation, PbPauseMutation, PbResumeMutation, PbSinkAddColumns,
724            PbSourceChangeSplitMutation, PbStartFragmentBackfillMutation, PbStopMutation,
725            PbThrottleMutation, PbUpdateMutation,
726        };
727        let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
728            actor_splits
729                .iter()
730                .map(|(&actor_id, splits)| {
731                    (
732                        actor_id,
733                        ConnectorSplits {
734                            splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
735                        },
736                    )
737                })
738                .collect::<HashMap<_, _>>()
739        };
740
741        match self {
742            Mutation::Stop(StopMutation {
743                dropped_actors,
744                dropped_sink_fragments,
745            }) => PbMutation::Stop(PbStopMutation {
746                actors: dropped_actors.iter().copied().collect(),
747                dropped_sink_fragments: dropped_sink_fragments.iter().copied().collect(),
748            }),
749            Mutation::Update(UpdateMutation {
750                dispatchers,
751                merges,
752                vnode_bitmaps,
753                dropped_actors,
754                actor_splits,
755                actor_new_dispatchers,
756                actor_cdc_table_snapshot_splits,
757                sink_add_columns,
758            }) => PbMutation::Update(PbUpdateMutation {
759                dispatcher_update: dispatchers.values().flatten().cloned().collect(),
760                merge_update: merges.values().cloned().collect(),
761                actor_vnode_bitmap_update: vnode_bitmaps
762                    .iter()
763                    .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
764                    .collect(),
765                dropped_actors: dropped_actors.iter().copied().collect(),
766                actor_splits: actor_splits_to_protobuf(actor_splits),
767                actor_new_dispatchers: actor_new_dispatchers
768                    .iter()
769                    .map(|(&actor_id, dispatchers)| {
770                        (
771                            actor_id,
772                            PbDispatchers {
773                                dispatchers: dispatchers.clone(),
774                            },
775                        )
776                    })
777                    .collect(),
778                actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
779                    splits:actor_cdc_table_snapshot_splits.splits.iter().map(|(actor_id,(splits, generation))| {
780                        (*actor_id, risingwave_pb::source::PbCdcTableSnapshotSplits {
781                            splits: splits.iter().map(risingwave_connector::source::cdc::build_cdc_table_snapshot_split).collect(),
782                            generation: *generation,
783                        })
784                    }).collect()
785                }),
786                sink_add_columns: sink_add_columns
787                    .iter()
788                    .map(|(sink_id, add_columns)| {
789                        (
790                            *sink_id,
791                            PbSinkAddColumns {
792                                fields: add_columns.iter().map(|field| field.to_prost()).collect(),
793                            },
794                        )
795                    })
796                    .collect(),
797            }),
798            Mutation::Add(AddMutation {
799                adds,
800                added_actors,
801                splits,
802                pause,
803                subscriptions_to_add,
804                backfill_nodes_to_pause,
805                actor_cdc_table_snapshot_splits,
806                new_upstream_sinks,
807            }) => PbMutation::Add(PbAddMutation {
808                actor_dispatchers: adds
809                    .iter()
810                    .map(|(&actor_id, dispatchers)| {
811                        (
812                            actor_id,
813                            PbDispatchers {
814                                dispatchers: dispatchers.clone(),
815                            },
816                        )
817                    })
818                    .collect(),
819                added_actors: added_actors.iter().copied().collect(),
820                actor_splits: actor_splits_to_protobuf(splits),
821                pause: *pause,
822                subscriptions_to_add: subscriptions_to_add
823                    .iter()
824                    .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
825                        subscriber_id: *subscriber_id,
826                        upstream_mv_table_id: *table_id,
827                    })
828                    .collect(),
829                backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
830                actor_cdc_table_snapshot_splits:
831                Some(PbCdcTableSnapshotSplitsWithGeneration {
832                    splits:actor_cdc_table_snapshot_splits.splits.iter().map(|(actor_id,(splits, generation))| {
833                        (*actor_id, risingwave_pb::source::PbCdcTableSnapshotSplits {
834                            splits: splits.iter().map(risingwave_connector::source::cdc::build_cdc_table_snapshot_split).collect(),
835                            generation: *generation,
836                        })
837                    }).collect()
838                }),
839                new_upstream_sinks: new_upstream_sinks
840                    .iter()
841                    .map(|(k, v)| (*k, v.clone()))
842                    .collect(),
843            }),
844            Mutation::SourceChangeSplit(changes) => {
845                PbMutation::Splits(PbSourceChangeSplitMutation {
846                    actor_splits: changes
847                        .iter()
848                        .map(|(&actor_id, splits)| {
849                            (
850                                actor_id,
851                                ConnectorSplits {
852                                    splits: splits
853                                        .clone()
854                                        .iter()
855                                        .map(ConnectorSplit::from)
856                                        .collect(),
857                                },
858                            )
859                        })
860                        .collect(),
861                })
862            }
863            Mutation::Pause => PbMutation::Pause(PbPauseMutation {}),
864            Mutation::Resume => PbMutation::Resume(PbResumeMutation {}),
865            Mutation::Throttle(changes) => PbMutation::Throttle(PbThrottleMutation {
866                actor_throttle: changes
867                    .iter()
868                    .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit }))
869                    .collect(),
870            }),
871            Mutation::DropSubscriptions {
872                subscriptions_to_drop,
873            } => PbMutation::DropSubscriptions(PbDropSubscriptionsMutation {
874                info: subscriptions_to_drop
875                    .iter()
876                    .map(
877                        |(subscriber_id, upstream_mv_table_id)| SubscriptionUpstreamInfo {
878                            subscriber_id: *subscriber_id,
879                            upstream_mv_table_id: *upstream_mv_table_id,
880                        },
881                    )
882                    .collect(),
883            }),
884            Mutation::ConnectorPropsChange(map) => {
885                PbMutation::ConnectorPropsChange(PbConnectorPropsChangeMutation {
886                    connector_props_infos: map
887                        .iter()
888                        .map(|(actor_id, options)| {
889                            (
890                                *actor_id,
891                                ConnectorPropsInfo {
892                                    connector_props_info: options
893                                        .iter()
894                                        .map(|(k, v)| (k.clone(), v.clone()))
895                                        .collect(),
896                                },
897                            )
898                        })
899                        .collect(),
900                })
901            }
902            Mutation::StartFragmentBackfill { fragment_ids } => {
903                PbMutation::StartFragmentBackfill(PbStartFragmentBackfillMutation {
904                    fragment_ids: fragment_ids.iter().copied().collect(),
905                })
906            }
907            Mutation::RefreshStart {
908                table_id,
909                associated_source_id,
910            } => PbMutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
911                table_id: *table_id,
912                associated_source_id: *associated_source_id,
913            }),
914            Mutation::ListFinish {
915                associated_source_id,
916            } => PbMutation::ListFinish(risingwave_pb::stream_plan::ListFinishMutation {
917                associated_source_id: *associated_source_id,
918            }),
919            Mutation::LoadFinish {
920                associated_source_id,
921            } => PbMutation::LoadFinish(risingwave_pb::stream_plan::LoadFinishMutation {
922                associated_source_id: *associated_source_id,
923            }),
924        }
925    }
926
927    fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
928        let mutation = match prost {
929            PbMutation::Stop(stop) => Mutation::Stop(StopMutation {
930                dropped_actors: stop.actors.iter().copied().collect(),
931                dropped_sink_fragments: stop.dropped_sink_fragments.iter().copied().collect(),
932            }),
933
934            PbMutation::Update(update) => Mutation::Update(UpdateMutation {
935                dispatchers: update
936                    .dispatcher_update
937                    .iter()
938                    .map(|u| (u.actor_id, u.clone()))
939                    .into_group_map(),
940                merges: update
941                    .merge_update
942                    .iter()
943                    .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
944                    .collect(),
945                vnode_bitmaps: update
946                    .actor_vnode_bitmap_update
947                    .iter()
948                    .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
949                    .collect(),
950                dropped_actors: update.dropped_actors.iter().copied().collect(),
951                actor_splits: update
952                    .actor_splits
953                    .iter()
954                    .map(|(&actor_id, splits)| {
955                        (
956                            actor_id,
957                            splits
958                                .splits
959                                .iter()
960                                .map(|split| split.try_into().unwrap())
961                                .collect(),
962                        )
963                    })
964                    .collect(),
965                actor_new_dispatchers: update
966                    .actor_new_dispatchers
967                    .iter()
968                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
969                    .collect(),
970                actor_cdc_table_snapshot_splits:
971                    build_actor_cdc_table_snapshot_splits_with_generation(
972                        update
973                            .actor_cdc_table_snapshot_splits
974                            .clone()
975                            .unwrap_or_default(),
976                    ),
977                sink_add_columns: update
978                    .sink_add_columns
979                    .iter()
980                    .map(|(sink_id, add_columns)| {
981                        (
982                            *sink_id,
983                            add_columns.fields.iter().map(Field::from_prost).collect(),
984                        )
985                    })
986                    .collect(),
987            }),
988
989            PbMutation::Add(add) => Mutation::Add(AddMutation {
990                adds: add
991                    .actor_dispatchers
992                    .iter()
993                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
994                    .collect(),
995                added_actors: add.added_actors.iter().copied().collect(),
996                // TODO: remove this and use `SourceChangesSplit` after we support multiple
997                // mutations.
998                splits: add
999                    .actor_splits
1000                    .iter()
1001                    .map(|(&actor_id, splits)| {
1002                        (
1003                            actor_id,
1004                            splits
1005                                .splits
1006                                .iter()
1007                                .map(|split| split.try_into().unwrap())
1008                                .collect(),
1009                        )
1010                    })
1011                    .collect(),
1012                pause: add.pause,
1013                subscriptions_to_add: add
1014                    .subscriptions_to_add
1015                    .iter()
1016                    .map(
1017                        |SubscriptionUpstreamInfo {
1018                             subscriber_id,
1019                             upstream_mv_table_id,
1020                         }| { (*upstream_mv_table_id, *subscriber_id) },
1021                    )
1022                    .collect(),
1023                backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
1024                actor_cdc_table_snapshot_splits:
1025                    build_actor_cdc_table_snapshot_splits_with_generation(
1026                        add.actor_cdc_table_snapshot_splits
1027                            .clone()
1028                            .unwrap_or_default(),
1029                    ),
1030                new_upstream_sinks: add
1031                    .new_upstream_sinks
1032                    .iter()
1033                    .map(|(k, v)| (*k, v.clone()))
1034                    .collect(),
1035            }),
1036
1037            PbMutation::Splits(s) => {
1038                let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
1039                    Vec::with_capacity(s.actor_splits.len());
1040                for (&actor_id, splits) in &s.actor_splits {
1041                    if !splits.splits.is_empty() {
1042                        change_splits.push((
1043                            actor_id,
1044                            splits
1045                                .splits
1046                                .iter()
1047                                .map(SplitImpl::try_from)
1048                                .try_collect()?,
1049                        ));
1050                    }
1051                }
1052                Mutation::SourceChangeSplit(change_splits.into_iter().collect())
1053            }
1054            PbMutation::Pause(_) => Mutation::Pause,
1055            PbMutation::Resume(_) => Mutation::Resume,
1056            PbMutation::Throttle(changes) => Mutation::Throttle(
1057                changes
1058                    .actor_throttle
1059                    .iter()
1060                    .map(|(actor_id, limit)| (*actor_id, limit.rate_limit))
1061                    .collect(),
1062            ),
1063            PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
1064                subscriptions_to_drop: drop
1065                    .info
1066                    .iter()
1067                    .map(|info| (info.subscriber_id, info.upstream_mv_table_id))
1068                    .collect(),
1069            },
1070            PbMutation::ConnectorPropsChange(alter_connector_props) => {
1071                Mutation::ConnectorPropsChange(
1072                    alter_connector_props
1073                        .connector_props_infos
1074                        .iter()
1075                        .map(|(connector_id, options)| {
1076                            (
1077                                *connector_id,
1078                                options
1079                                    .connector_props_info
1080                                    .iter()
1081                                    .map(|(k, v)| (k.clone(), v.clone()))
1082                                    .collect(),
1083                            )
1084                        })
1085                        .collect(),
1086                )
1087            }
1088            PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
1089                Mutation::StartFragmentBackfill {
1090                    fragment_ids: start_fragment_backfill
1091                        .fragment_ids
1092                        .iter()
1093                        .copied()
1094                        .collect(),
1095                }
1096            }
1097            PbMutation::RefreshStart(refresh_start) => Mutation::RefreshStart {
1098                table_id: refresh_start.table_id,
1099                associated_source_id: refresh_start.associated_source_id,
1100            },
1101            PbMutation::ListFinish(list_finish) => Mutation::ListFinish {
1102                associated_source_id: list_finish.associated_source_id,
1103            },
1104            PbMutation::LoadFinish(load_finish) => Mutation::LoadFinish {
1105                associated_source_id: load_finish.associated_source_id,
1106            },
1107        };
1108        Ok(mutation)
1109    }
1110}
1111
1112impl<M> BarrierInner<M> {
1113    fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
1114        let Self {
1115            epoch,
1116            mutation,
1117            kind,
1118            tracing_context,
1119            ..
1120        } = self;
1121
1122        PbBarrier {
1123            epoch: Some(PbEpoch {
1124                curr: epoch.curr,
1125                prev: epoch.prev,
1126            }),
1127            mutation: barrier_fn(mutation).map(|mutation| PbBarrierMutation {
1128                mutation: Some(mutation),
1129            }),
1130            tracing_context: tracing_context.to_protobuf(),
1131            kind: *kind as _,
1132        }
1133    }
1134
1135    fn from_protobuf_inner(
1136        prost: &PbBarrier,
1137        mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
1138    ) -> StreamExecutorResult<Self> {
1139        let epoch = prost.get_epoch()?;
1140
1141        Ok(Self {
1142            kind: prost.kind(),
1143            epoch: EpochPair::new(epoch.curr, epoch.prev),
1144            mutation: mutation_from_pb(
1145                (prost.mutation.as_ref()).and_then(|mutation| mutation.mutation.as_ref()),
1146            )?,
1147            tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
1148        })
1149    }
1150
1151    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
1152        BarrierInner {
1153            epoch: self.epoch,
1154            mutation: f(self.mutation),
1155            kind: self.kind,
1156            tracing_context: self.tracing_context,
1157        }
1158    }
1159}
1160
1161impl DispatcherBarrier {
1162    pub fn to_protobuf(&self) -> PbBarrier {
1163        self.to_protobuf_inner(|_| None)
1164    }
1165}
1166
1167impl Barrier {
1168    #[cfg(test)]
1169    pub fn to_protobuf(&self) -> PbBarrier {
1170        self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
1171    }
1172
1173    pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
1174        Self::from_protobuf_inner(prost, |mutation| {
1175            mutation
1176                .map(|m| Mutation::from_protobuf(m).map(Arc::new))
1177                .transpose()
1178        })
1179    }
1180}
1181
/// A watermark for the column at `col_idx`, carrying the non-null value `val`.
///
/// NOTE(review): equality is derived and compares all fields, while the manual `Ord` impl
/// compares only `val`. Two watermarks on different columns with the same value are therefore
/// `Ordering::Equal` but not `==`.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
    /// Index of the column this watermark applies to.
    pub col_idx: usize,
    /// Data type of the watermark column; used to encode and decode `val`.
    pub data_type: DataType,
    /// The watermark value itself (a `ScalarImpl`, so never null).
    pub val: ScalarImpl,
}
1188
1189impl PartialOrd for Watermark {
1190    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1191        Some(self.cmp(other))
1192    }
1193}
1194
1195impl Ord for Watermark {
1196    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1197        self.val.default_cmp(&other.val)
1198    }
1199}
1200
1201impl Watermark {
1202    pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
1203        Self {
1204            col_idx,
1205            data_type,
1206            val,
1207        }
1208    }
1209
1210    pub async fn transform_with_expr(
1211        self,
1212        expr: &NonStrictExpression<impl Expression>,
1213        new_col_idx: usize,
1214    ) -> Option<Self> {
1215        let Self { col_idx, val, .. } = self;
1216        let row = {
1217            let mut row = vec![None; col_idx + 1];
1218            row[col_idx] = Some(val);
1219            OwnedRow::new(row)
1220        };
1221        let val = expr.eval_row_infallible(&row).await?;
1222        Some(Self::new(new_col_idx, expr.inner().return_type(), val))
1223    }
1224
1225    /// Transform the watermark with the given output indices. If this watermark is not in the
1226    /// output, return `None`.
1227    pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
1228        output_indices
1229            .iter()
1230            .position(|p| *p == self.col_idx)
1231            .map(|new_col_idx| self.with_idx(new_col_idx))
1232    }
1233
1234    pub fn to_protobuf(&self) -> PbWatermark {
1235        PbWatermark {
1236            column: Some(PbInputRef {
1237                index: self.col_idx as _,
1238                r#type: Some(self.data_type.to_protobuf()),
1239            }),
1240            val: Some(&self.val).to_protobuf().into(),
1241        }
1242    }
1243
1244    pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
1245        let col_ref = prost.get_column()?;
1246        let data_type = DataType::from(col_ref.get_type()?);
1247        let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
1248            .expect("watermark value cannot be null");
1249        Ok(Self::new(col_ref.get_index() as _, data_type, val))
1250    }
1251
1252    pub fn with_idx(self, idx: usize) -> Self {
1253        Self::new(idx, self.data_type, self.val)
1254    }
1255}
1256
/// A message flowing through a stream: a data chunk, a barrier, or a watermark.
///
/// `M` is the type of the barrier's mutation payload (see `BarrierInner<M>`).
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageInner<M> {
    /// A chunk of stream data.
    Chunk(StreamChunk),
    /// A barrier whose mutation payload has type `M`.
    Barrier(BarrierInner<M>),
    /// A watermark on a single column (identified by `Watermark::col_idx`).
    Watermark(Watermark),
}
1263
1264impl<M> MessageInner<M> {
1265    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
1266        match self {
1267            MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
1268            MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
1269            MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
1270        }
1271    }
1272}
1273
/// Message passed between executors; barriers carry their mutation (`BarrierMutationType`).
pub type Message = MessageInner<BarrierMutationType>;
/// Message as handled by dispatchers, whose barriers carry no mutation payload (`()`).
pub type DispatcherMessage = MessageInner<()>;
1276
/// `MessageBatchInner` is used exclusively by `Dispatcher` and the `Merger`/`Receiver` for exchanging messages between them.
/// It carries the same kinds of payload as the fundamental `MessageInner`, but multiple barriers can be batched into a single message.
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageBatchInner<M> {
    /// A single chunk of stream data.
    Chunk(StreamChunk),
    /// A batch of barriers sent together as one message.
    BarrierBatch(Vec<BarrierInner<M>>),
    /// A single watermark.
    Watermark(Watermark),
}
/// Message batch whose barriers carry their mutation (`BarrierMutationType`).
pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
/// A list of dispatcher barriers (barriers without a mutation payload).
pub type DispatcherBarriers = Vec<DispatcherBarrier>;
/// Message batch whose barriers carry no mutation (`()`); this is the form that is encoded to
/// and decoded from protobuf.
pub type DispatcherMessageBatch = MessageBatchInner<()>;
1288
1289impl From<DispatcherMessage> for DispatcherMessageBatch {
1290    fn from(m: DispatcherMessage) -> Self {
1291        match m {
1292            DispatcherMessage::Chunk(c) => Self::Chunk(c),
1293            DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
1294            DispatcherMessage::Watermark(w) => Self::Watermark(w),
1295        }
1296    }
1297}
1298
1299impl From<StreamChunk> for Message {
1300    fn from(chunk: StreamChunk) -> Self {
1301        Message::Chunk(chunk)
1302    }
1303}
1304
1305impl<'a> TryFrom<&'a Message> for &'a Barrier {
1306    type Error = ();
1307
1308    fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
1309        match m {
1310            Message::Chunk(_) => Err(()),
1311            Message::Barrier(b) => Ok(b),
1312            Message::Watermark(_) => Err(()),
1313        }
1314    }
1315}
1316
1317impl Message {
1318    /// Return true if the message is a stop barrier, meaning the stream
1319    /// will not continue, false otherwise.
1320    ///
1321    /// Note that this does not mean we will stop the current actor.
1322    #[cfg(test)]
1323    pub fn is_stop(&self) -> bool {
1324        matches!(
1325            self,
1326            Message::Barrier(Barrier {
1327                mutation,
1328                ..
1329            }) if mutation.as_ref().unwrap().is_stop()
1330        )
1331    }
1332}
1333
1334impl DispatcherMessageBatch {
1335    pub fn to_protobuf(&self) -> PbStreamMessageBatch {
1336        let prost = match self {
1337            Self::Chunk(stream_chunk) => {
1338                let prost_stream_chunk = stream_chunk.to_protobuf();
1339                StreamMessageBatch::StreamChunk(prost_stream_chunk)
1340            }
1341            Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
1342                barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
1343            }),
1344            Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
1345        };
1346        PbStreamMessageBatch {
1347            stream_message_batch: Some(prost),
1348        }
1349    }
1350
1351    pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
1352        let res = match prost.get_stream_message_batch()? {
1353            StreamMessageBatch::StreamChunk(chunk) => {
1354                Self::Chunk(StreamChunk::from_protobuf(chunk)?)
1355            }
1356            StreamMessageBatch::BarrierBatch(barrier_batch) => {
1357                let barriers = barrier_batch
1358                    .barriers
1359                    .iter()
1360                    .map(|barrier| {
1361                        DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
1362                            if mutation.is_some() {
1363                                if cfg!(debug_assertions) {
1364                                    panic!("should not receive message of barrier with mutation");
1365                                } else {
1366                                    warn!(?barrier, "receive message of barrier with mutation");
1367                                }
1368                            }
1369                            Ok(())
1370                        })
1371                    })
1372                    .try_collect()?;
1373                Self::BarrierBatch(barriers)
1374            }
1375            StreamMessageBatch::Watermark(watermark) => {
1376                Self::Watermark(Watermark::from_protobuf(watermark)?)
1377            }
1378        };
1379        Ok(res)
1380    }
1381
1382    pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
1383        ::prost::Message::encoded_len(msg)
1384    }
1385}
1386
/// The stream key as a list of indices (presumably column indices into the schema — confirm).
pub type StreamKey = Vec<usize>;
/// Borrowed form of [`StreamKey`].
pub type StreamKeyRef<'a> = &'a [usize];
/// Data types of the stream key; stores a single element inline before spilling to the heap.
pub type StreamKeyDataTypes = SmallVec<[DataType; 1]>;
1390
1391/// Expect the first message of the given `stream` as a barrier.
1392pub async fn expect_first_barrier<M: Debug>(
1393    stream: &mut (impl MessageStreamInner<M> + Unpin),
1394) -> StreamExecutorResult<BarrierInner<M>> {
1395    let message = stream
1396        .next()
1397        .instrument_await("expect_first_barrier")
1398        .await
1399        .context("failed to extract the first message: stream closed unexpectedly")??;
1400    let barrier = message
1401        .into_barrier()
1402        .expect("the first message must be a barrier");
1403    // TODO: Is this check correct?
1404    assert!(matches!(
1405        barrier.kind,
1406        BarrierKind::Checkpoint | BarrierKind::Initial
1407    ));
1408    Ok(barrier)
1409}
1410
1411/// Expect the first message of the given `stream` as a barrier.
1412pub async fn expect_first_barrier_from_aligned_stream(
1413    stream: &mut (impl AlignedMessageStream + Unpin),
1414) -> StreamExecutorResult<Barrier> {
1415    let message = stream
1416        .next()
1417        .instrument_await("expect_first_barrier")
1418        .await
1419        .context("failed to extract the first message: stream closed unexpectedly")??;
1420    let barrier = message
1421        .into_barrier()
1422        .expect("the first message must be a barrier");
1423    Ok(barrier)
1424}
1425
/// `StreamConsumer` is the last step in an actor.
pub trait StreamConsumer: Send + 'static {
    /// The consumer's output: a stream of barriers (or errors).
    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;

    /// Consume the boxed consumer and return its barrier stream.
    fn execute(self: Box<Self>) -> Self::BarrierStream;
}
1432
/// A boxed upstream input identified by `InputId` that yields `MessageStreamItemInner<M>` items.
type BoxedMessageInput<InputId, M> = BoxedInput<InputId, MessageStreamItemInner<M>>;
1434
/// A stream for merging messages from multiple upstreams.
/// Can dynamically add and delete upstream streams.
/// For the meaning of the generic parameter `M` used, refer to `BarrierInner<M>`.
///
/// Barriers are aligned. An upstream that yields a barrier is parked in `blocked` until every
/// other upstream has yielded a barrier with the same epoch (see `poll_next` in the `Stream`
/// impl).
pub struct DynamicReceivers<InputId, M> {
    /// The barrier we're aligning to. If this is `None`, then `blocked` is empty.
    barrier: Option<BarrierInner<M>>,
    /// The start timestamp of the current barrier. Used for measuring the alignment duration.
    start_ts: Option<Instant>,
    /// The upstreams that are blocked by the `barrier`.
    blocked: Vec<BoxedMessageInput<InputId, M>>,
    /// The upstreams that are not blocked and can be polled.
    active: FuturesUnordered<StreamFuture<BoxedMessageInput<InputId, M>>>,
    /// watermark column index -> `BufferedWatermarks`
    buffered_watermarks: BTreeMap<usize, BufferedWatermarks<InputId>>,
    /// Currently only used for union.
    barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
    /// Only for merge. If `None`, the barrier alignment duration is not added to this counter
    /// in `poll_next`. (`start_ts` is still recorded either way.)
    merge_barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
}
1454
1455impl<InputId: Clone + Ord + Hash + std::fmt::Debug + Unpin, M: Clone + Unpin> Stream
1456    for DynamicReceivers<InputId, M>
1457{
1458    type Item = MessageStreamItemInner<M>;
1459
1460    fn poll_next(
1461        mut self: Pin<&mut Self>,
1462        cx: &mut std::task::Context<'_>,
1463    ) -> Poll<Option<Self::Item>> {
1464        if self.is_empty() {
1465            return Poll::Ready(None);
1466        }
1467
1468        loop {
1469            match futures::ready!(self.active.poll_next_unpin(cx)) {
1470                // Directly forward the error.
1471                Some((Some(Err(e)), _)) => {
1472                    return Poll::Ready(Some(Err(e)));
1473                }
1474                // Handle the message from some upstream.
1475                Some((Some(Ok(message)), remaining)) => {
1476                    let input_id = remaining.id();
1477                    match message {
1478                        MessageInner::Chunk(chunk) => {
1479                            // Continue polling this upstream by pushing it back to `active`.
1480                            self.active.push(remaining.into_future());
1481                            return Poll::Ready(Some(Ok(MessageInner::Chunk(chunk))));
1482                        }
1483                        MessageInner::Watermark(watermark) => {
1484                            // Continue polling this upstream by pushing it back to `active`.
1485                            self.active.push(remaining.into_future());
1486                            if let Some(watermark) = self.handle_watermark(input_id, watermark) {
1487                                return Poll::Ready(Some(Ok(MessageInner::Watermark(watermark))));
1488                            }
1489                        }
1490                        MessageInner::Barrier(barrier) => {
1491                            // Block this upstream by pushing it to `blocked`.
1492                            if self.blocked.is_empty() {
1493                                self.start_ts = Some(Instant::now());
1494                            }
1495                            self.blocked.push(remaining);
1496                            if let Some(current_barrier) = self.barrier.as_ref() {
1497                                if current_barrier.epoch != barrier.epoch {
1498                                    return Poll::Ready(Some(Err(
1499                                        StreamExecutorError::align_barrier(
1500                                            current_barrier.clone().map_mutation(|_| None),
1501                                            barrier.map_mutation(|_| None),
1502                                        ),
1503                                    )));
1504                                }
1505                            } else {
1506                                self.barrier = Some(barrier);
1507                            }
1508                        }
1509                    }
1510                }
1511                // We use barrier as the control message of the stream. That is, we always stop the
1512                // actors actively when we receive a `Stop` mutation, instead of relying on the stream
1513                // termination.
1514                //
1515                // Besides, in abnormal cases when the other side of the `Input` closes unexpectedly,
1516                // we also yield an `Err(ExchangeChannelClosed)`, which will hit the `Err` arm above.
1517                // So this branch will never be reached in all cases.
1518                Some((None, remaining)) => {
1519                    return Poll::Ready(Some(Err(StreamExecutorError::channel_closed(format!(
1520                        "upstream input {:?} unexpectedly closed",
1521                        remaining.id()
1522                    )))));
1523                }
1524                // There's no active upstreams. Process the barrier and resume the blocked ones.
1525                None => {
1526                    assert!(!self.blocked.is_empty());
1527
1528                    let start_ts = self
1529                        .start_ts
1530                        .take()
1531                        .expect("should have received at least one barrier");
1532                    if let Some(barrier_align_duration) = &self.barrier_align_duration {
1533                        barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
1534                    }
1535                    if let Some(merge_barrier_align_duration) = &self.merge_barrier_align_duration {
1536                        merge_barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
1537                    }
1538
1539                    break;
1540                }
1541            }
1542        }
1543
1544        assert!(self.active.is_terminated());
1545
1546        let barrier = self.barrier.take().unwrap();
1547
1548        let upstreams = std::mem::take(&mut self.blocked);
1549        self.extend_active(upstreams);
1550        assert!(!self.active.is_terminated());
1551
1552        Poll::Ready(Some(Ok(MessageInner::Barrier(barrier))))
1553    }
1554}
1555
1556impl<InputId: Clone + Ord + Hash + std::fmt::Debug, M> DynamicReceivers<InputId, M> {
1557    pub fn new(
1558        upstreams: Vec<BoxedMessageInput<InputId, M>>,
1559        barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1560        merge_barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1561    ) -> Self {
1562        let mut this = Self {
1563            barrier: None,
1564            start_ts: None,
1565            blocked: Vec::with_capacity(upstreams.len()),
1566            active: Default::default(),
1567            buffered_watermarks: Default::default(),
1568            merge_barrier_align_duration,
1569            barrier_align_duration,
1570        };
1571        this.extend_active(upstreams);
1572        this
1573    }
1574
1575    /// Extend the active upstreams with the given upstreams. The current stream must be at the
1576    /// clean state right after a barrier.
1577    pub fn extend_active(
1578        &mut self,
1579        upstreams: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1580    ) {
1581        assert!(self.blocked.is_empty() && self.barrier.is_none());
1582
1583        self.active
1584            .extend(upstreams.into_iter().map(|s| s.into_future()));
1585    }
1586
1587    /// Handle a new watermark message. Optionally returns the watermark message to emit.
1588    pub fn handle_watermark(
1589        &mut self,
1590        input_id: InputId,
1591        watermark: Watermark,
1592    ) -> Option<Watermark> {
1593        let col_idx = watermark.col_idx;
1594        // Insert a buffer watermarks when first received from a column.
1595        let upstream_ids: Vec<_> = self.upstream_input_ids().collect();
1596        let watermarks = self
1597            .buffered_watermarks
1598            .entry(col_idx)
1599            .or_insert_with(|| BufferedWatermarks::with_ids(upstream_ids));
1600        watermarks.handle_watermark(input_id, watermark)
1601    }
1602
1603    /// Consume `other` and add its upstreams to `self`. The two streams must be at the clean state
1604    /// right after a barrier.
1605    pub fn add_upstreams_from(
1606        &mut self,
1607        new_inputs: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1608    ) {
1609        assert!(self.blocked.is_empty() && self.barrier.is_none());
1610
1611        let new_inputs: Vec<_> = new_inputs.into_iter().collect();
1612        let input_ids = new_inputs.iter().map(|input| input.id());
1613        self.buffered_watermarks.values_mut().for_each(|buffers| {
1614            // Add buffers to the buffered watermarks for all cols
1615            buffers.add_buffers(input_ids.clone());
1616        });
1617        self.active
1618            .extend(new_inputs.into_iter().map(|s| s.into_future()));
1619    }
1620
1621    /// Remove upstreams from `self` in `upstream_input_ids`. The current stream must be at the
1622    /// clean state right after a barrier.
1623    /// The current container does not necessarily contain all the input ids passed in.
1624    pub fn remove_upstreams(&mut self, upstream_input_ids: &HashSet<InputId>) {
1625        assert!(self.blocked.is_empty() && self.barrier.is_none());
1626
1627        let new_upstreams = std::mem::take(&mut self.active)
1628            .into_iter()
1629            .map(|s| s.into_inner().unwrap())
1630            .filter(|u| !upstream_input_ids.contains(&u.id()));
1631        self.extend_active(new_upstreams);
1632        self.buffered_watermarks.values_mut().for_each(|buffers| {
1633            // Call `check_heap` in case the only upstream(s) that does not have
1634            // watermark in heap is removed
1635            buffers.remove_buffer(upstream_input_ids.clone());
1636        });
1637    }
1638
1639    pub fn merge_barrier_align_duration(
1640        &self,
1641    ) -> Option<LabelGuardedMetric<GenericCounter<AtomicU64>>> {
1642        self.merge_barrier_align_duration.clone()
1643    }
1644
1645    pub fn flush_buffered_watermarks(&mut self) {
1646        self.buffered_watermarks
1647            .values_mut()
1648            .for_each(|buffers| buffers.clear());
1649    }
1650
1651    pub fn upstream_input_ids(&self) -> impl Iterator<Item = InputId> + '_ {
1652        self.blocked
1653            .iter()
1654            .map(|s| s.id())
1655            .chain(self.active.iter().map(|s| s.get_ref().unwrap().id()))
1656    }
1657
1658    pub fn is_empty(&self) -> bool {
1659        self.blocked.is_empty() && self.active.is_empty()
1660    }
1661}
1662
1663// Explanation of why we need `DispatchBarrierBuffer`:
1664//
1665// When we need to create or replace an upstream fragment for the current fragment, the `Merge` operator must
1666// add some new upstream actor inputs. However, the `Merge` operator may still have old upstreams. We must wait
1667// for these old upstreams to completely process their barriers and align before we can safely update the
1668// `upstream-input-set`.
1669//
1670// Meanwhile, the creation of a new upstream actor can only succeed after the channel to the downstream `Merge`
1671// operator has been established. This creates a potential dependency chain: [new_actor_creation ->
1672// downstream_merge_update -> old_actor_processing]
1673//
1674// To address this, we split the application of a barrier's `Mutation` into two steps:
1675// 1. Parse the `Mutation`. If there is an addition on the upstream-set, establish a channel with the upstream
1676//    and cache it.
// 2. When the upstream barrier actually arrives, apply the cached upstream changes to the upstream-set.
//
// Additionally, barriers from the current upstream inputs and from `barrier_rx` are received
// asynchronously, so either may arrive first. Therefore, when a barrier is received from an upstream,
// we apply the cached mutation if one is present; otherwise, we must first fetch the next barrier from
// `barrier_rx`.
/// Pre-fetches barriers from `barrier_rx` so that inputs for newly added upstream actors can be
/// built before the matching barrier arrives from the current upstreams.
pub(crate) struct DispatchBarrierBuffer {
    /// Barriers fetched from `barrier_rx`, in arrival order. Each barrier is paired with the new
    /// upstream inputs built for it when its merge update added upstream actors, or `None`
    /// otherwise.
    buffer: VecDeque<(Barrier, Option<Vec<BoxedActorInput>>)>,
    /// Channel the barriers are fetched from.
    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
    /// Whether we are waiting for the next barrier or for new inputs to be built.
    recv_state: BarrierReceiverState,
    /// The upstream fragment used to look up merge updates. It is replaced in
    /// `pre_apply_barrier` when an update that adds upstream actors carries a new upstream
    /// fragment id.
    curr_upstream_fragment_id: FragmentId,
    /// The actor this buffer belongs to; used to look up merge updates in barriers.
    actor_id: ActorId,
    // read-only context for building new inputs
    build_input_ctx: Arc<BuildInputContext>,
}
1692
/// Read-only values needed to build inputs for newly added upstream actors. It is shared via
/// `Arc` with the input-creation futures, which may outlive a borrow of the buffer.
struct BuildInputContext {
    /// The current (downstream) actor the new inputs are built for.
    pub actor_id: ActorId,
    /// Passed to `new_input` when building each new input.
    pub local_barrier_manager: LocalBarrierManager,
    /// Metrics handle passed to `new_input`.
    pub metrics: Arc<StreamingMetrics>,
    /// Fragment id of the current actor, as opposed to the upstream fragment id.
    pub fragment_id: FragmentId,
    /// Streaming configuration passed to `new_input`.
    pub actor_config: Arc<StreamingConfig>,
}
1700
/// Boxed future that builds the inputs for newly added upstream actors. It resolves once every
/// new input has delivered its first barrier.
type BoxedNewInputsFuture =
    Pin<Box<dyn Future<Output = StreamExecutorResult<Vec<BoxedActorInput>>> + Send>>;
1703
/// State of barrier fetching in `DispatchBarrierBuffer`.
enum BarrierReceiverState {
    /// Waiting for the next barrier from `barrier_rx`.
    ReceivingBarrier,
    /// A barrier whose merge update adds upstream actors has been received, and its new inputs
    /// are being built by the future. The barrier is pushed to the buffer only after the future
    /// completes.
    CreatingNewInput(Barrier, BoxedNewInputsFuture),
}
1708
1709impl DispatchBarrierBuffer {
1710    pub fn new(
1711        barrier_rx: mpsc::UnboundedReceiver<Barrier>,
1712        actor_id: ActorId,
1713        curr_upstream_fragment_id: FragmentId,
1714        local_barrier_manager: LocalBarrierManager,
1715        metrics: Arc<StreamingMetrics>,
1716        fragment_id: FragmentId,
1717        actor_config: Arc<StreamingConfig>,
1718    ) -> Self {
1719        Self {
1720            buffer: VecDeque::new(),
1721            barrier_rx,
1722            recv_state: BarrierReceiverState::ReceivingBarrier,
1723            curr_upstream_fragment_id,
1724            actor_id,
1725            build_input_ctx: Arc::new(BuildInputContext {
1726                actor_id,
1727                local_barrier_manager,
1728                metrics,
1729                fragment_id,
1730                actor_config,
1731            }),
1732        }
1733    }
1734
1735    pub async fn await_next_message(
1736        &mut self,
1737        stream: &mut (impl Stream<Item = StreamExecutorResult<DispatcherMessage>> + Unpin),
1738        metrics: &ActorInputMetrics,
1739    ) -> StreamExecutorResult<DispatcherMessage> {
1740        let mut start_time = Instant::now();
1741        let interval_duration = Duration::from_secs(15);
1742        let mut interval =
1743            tokio::time::interval_at(start_time + interval_duration, interval_duration);
1744
1745        loop {
1746            tokio::select! {
1747                biased;
1748                msg = stream.try_next() => {
1749                    metrics
1750                        .actor_input_buffer_blocking_duration_ns
1751                        .inc_by(start_time.elapsed().as_nanos() as u64);
1752                    return msg?.ok_or_else(
1753                        || StreamExecutorError::channel_closed("upstream executor closed unexpectedly")
1754                    );
1755                }
1756
1757                e = self.continuously_fetch_barrier_rx() => {
1758                    return Err(e);
1759                }
1760
1761                _ = interval.tick() => {
1762                    start_time = Instant::now();
1763                    metrics.actor_input_buffer_blocking_duration_ns.inc_by(interval_duration.as_nanos() as u64);
1764                    continue;
1765                }
1766            }
1767        }
1768    }
1769
1770    pub async fn pop_barrier_with_inputs(
1771        &mut self,
1772        barrier: DispatcherBarrier,
1773    ) -> StreamExecutorResult<(Barrier, Option<Vec<BoxedActorInput>>)> {
1774        while self.buffer.is_empty() {
1775            self.try_fetch_barrier_rx(false).await?;
1776        }
1777        let (recv_barrier, inputs) = self.buffer.pop_front().unwrap();
1778        assert_equal_dispatcher_barrier(&recv_barrier, &barrier);
1779
1780        Ok((recv_barrier, inputs))
1781    }
1782
1783    async fn continuously_fetch_barrier_rx(&mut self) -> StreamExecutorError {
1784        loop {
1785            if let Err(e) = self.try_fetch_barrier_rx(true).await {
1786                return e;
1787            }
1788        }
1789    }
1790
1791    async fn try_fetch_barrier_rx(&mut self, pending_on_end: bool) -> StreamExecutorResult<()> {
1792        match &mut self.recv_state {
1793            BarrierReceiverState::ReceivingBarrier => {
1794                let Some(barrier) = self.barrier_rx.recv().await else {
1795                    if pending_on_end {
1796                        return pending().await;
1797                    } else {
1798                        return Err(StreamExecutorError::channel_closed(
1799                            "barrier channel closed unexpectedly",
1800                        ));
1801                    }
1802                };
1803                if let Some(fut) = self.pre_apply_barrier(&barrier) {
1804                    self.recv_state = BarrierReceiverState::CreatingNewInput(barrier, fut);
1805                } else {
1806                    self.buffer.push_back((barrier, None));
1807                }
1808            }
1809            BarrierReceiverState::CreatingNewInput(barrier, fut) => {
1810                let new_inputs = fut.await?;
1811                self.buffer.push_back((barrier.clone(), Some(new_inputs)));
1812                self.recv_state = BarrierReceiverState::ReceivingBarrier;
1813            }
1814        }
1815        Ok(())
1816    }
1817
1818    fn pre_apply_barrier(&mut self, barrier: &Barrier) -> Option<BoxedNewInputsFuture> {
1819        if let Some(update) = barrier.as_update_merge(self.actor_id, self.curr_upstream_fragment_id)
1820            && !update.added_upstream_actors.is_empty()
1821        {
1822            // When update upstream fragment, added_actors will not be empty.
1823            let upstream_fragment_id =
1824                if let Some(new_upstream_fragment_id) = update.new_upstream_fragment_id {
1825                    self.curr_upstream_fragment_id = new_upstream_fragment_id;
1826                    new_upstream_fragment_id
1827                } else {
1828                    self.curr_upstream_fragment_id
1829                };
1830            let ctx = self.build_input_ctx.clone();
1831            let added_upstream_actors = update.added_upstream_actors.clone();
1832            let barrier = barrier.clone();
1833            let fut = async move {
1834                try_join_all(added_upstream_actors.iter().map(|upstream_actor| async {
1835                    let mut new_input = new_input(
1836                        &ctx.local_barrier_manager,
1837                        ctx.metrics.clone(),
1838                        ctx.actor_id,
1839                        ctx.fragment_id,
1840                        upstream_actor,
1841                        upstream_fragment_id,
1842                        ctx.actor_config.clone(),
1843                    )
1844                    .await?;
1845
1846                    // Poll the first barrier from the new upstreams. It must be the same as the one we polled from
1847                    // original upstreams.
1848                    let first_barrier = expect_first_barrier(&mut new_input).await?;
1849                    assert_equal_dispatcher_barrier(&barrier, &first_barrier);
1850
1851                    StreamExecutorResult::Ok(new_input)
1852                }))
1853                .await
1854            }
1855            .boxed();
1856
1857            Some(fut)
1858        } else {
1859            None
1860        }
1861    }
1862}