risingwave_stream/executor/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prelude;
16
17use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
18use std::fmt::Debug;
19use std::future::pending;
20use std::hash::Hash;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::Poll;
24use std::vec;
25
26use await_tree::InstrumentAwait;
27use enum_as_inner::EnumAsInner;
28use futures::future::try_join_all;
29use futures::stream::{BoxStream, FusedStream, FuturesUnordered, StreamFuture};
30use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
31use itertools::Itertools;
32use prometheus::Histogram;
33use prometheus::core::{AtomicU64, GenericCounter};
34use risingwave_common::array::StreamChunk;
35use risingwave_common::bitmap::Bitmap;
36use risingwave_common::catalog::{Field, Schema, TableId};
37use risingwave_common::metrics::LabelGuardedMetric;
38use risingwave_common::row::OwnedRow;
39use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
40use risingwave_common::util::epoch::{Epoch, EpochPair};
41use risingwave_common::util::tracing::TracingContext;
42use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
43use risingwave_connector::source::SplitImpl;
44use risingwave_expr::expr::{Expression, NonStrictExpression};
45use risingwave_pb::data::PbEpoch;
46use risingwave_pb::expr::PbInputRef;
47use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
48use risingwave_pb::stream_plan::barrier::BarrierKind;
49use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
50use risingwave_pb::stream_plan::stream_node::PbStreamKind;
51use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
52use risingwave_pb::stream_plan::{
53    PbBarrier, PbBarrierMutation, PbDispatcher, PbStreamMessageBatch, PbWatermark,
54    SubscriptionUpstreamInfo,
55};
56use smallvec::SmallVec;
57use tokio::sync::mpsc;
58use tokio::time::{Duration, Instant};
59
60use crate::error::StreamResult;
61use crate::executor::exchange::input::{
62    BoxedActorInput, BoxedInput, apply_dispatcher_barrier, assert_equal_dispatcher_barrier,
63    new_input,
64};
65use crate::executor::monitor::ActorInputMetrics;
66use crate::executor::prelude::StreamingMetrics;
67use crate::executor::watermark::BufferedWatermarks;
68use crate::task::{ActorId, FragmentId, LocalBarrierManager};
69
70mod actor;
71mod barrier_align;
72pub mod exchange;
73pub mod monitor;
74
75pub mod aggregate;
76pub mod asof_join;
77mod backfill;
78mod barrier_recv;
79mod batch_query;
80mod chain;
81mod changelog;
82mod dedup;
83mod dispatch;
84pub mod dml;
85mod dynamic_filter;
86pub mod eowc;
87pub mod error;
88mod expand;
89mod filter;
90mod gap_fill;
91pub mod hash_join;
92mod hop_window;
93mod join;
94pub mod locality_provider;
95mod lookup;
96mod lookup_union;
97mod merge;
98mod mview;
99mod nested_loop_temporal_join;
100mod no_op;
101mod now;
102mod over_window;
103pub mod project;
104mod rearranged_chain;
105mod receiver;
106pub mod row_id_gen;
107mod sink;
108pub mod source;
109mod stream_reader;
110pub mod subtask;
111mod temporal_join;
112mod top_n;
113mod troublemaker;
114mod union;
115mod upstream_sink_union;
116mod values;
117mod watermark;
118mod watermark_filter;
119mod wrapper;
120
121mod approx_percentile;
122
123mod row_merge;
124
125#[cfg(test)]
126mod integration_tests;
127mod sync_kv_log_store;
128#[cfg(any(test, feature = "test"))]
129pub mod test_utils;
130mod utils;
131mod vector_index;
132
133pub use actor::{Actor, ActorContext, ActorContextRef};
134use anyhow::Context;
135pub use approx_percentile::global::GlobalApproxPercentileExecutor;
136pub use approx_percentile::local::LocalApproxPercentileExecutor;
137pub use backfill::arrangement_backfill::*;
138pub use backfill::cdc::{
139    CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor,
140};
141pub use backfill::no_shuffle_backfill::*;
142pub use backfill::snapshot_backfill::*;
143pub use barrier_recv::BarrierRecvExecutor;
144pub use batch_query::BatchQueryExecutor;
145pub use chain::ChainExecutor;
146pub use changelog::ChangeLogExecutor;
147pub use dedup::AppendOnlyDedupExecutor;
148pub use dispatch::{DispatchExecutor, DispatcherImpl};
149pub use dynamic_filter::DynamicFilterExecutor;
150pub use error::{StreamExecutorError, StreamExecutorResult};
151pub use expand::ExpandExecutor;
152pub use filter::{FilterExecutor, UpsertFilterExecutor};
153pub use gap_fill::{GapFillExecutor, GapFillExecutorArgs};
154pub use hash_join::*;
155pub use hop_window::HopWindowExecutor;
156pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
157pub use join::{AsOfDesc, AsOfJoinType, JoinType};
158pub use lookup::*;
159pub use lookup_union::LookupUnionExecutor;
160pub use merge::MergeExecutor;
161pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
162pub use mview::*;
163pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
164pub use no_op::NoOpExecutor;
165pub use now::*;
166pub use over_window::*;
167pub use rearranged_chain::RearrangedChainExecutor;
168pub use receiver::ReceiverExecutor;
169use risingwave_common::id::SourceId;
170pub use row_merge::RowMergeExecutor;
171pub use sink::SinkExecutor;
172pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
173pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
174pub use temporal_join::TemporalJoinExecutor;
175pub use top_n::{
176    AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
177};
178pub use troublemaker::TroublemakerExecutor;
179pub use union::UnionExecutor;
180pub use upstream_sink_union::{UpstreamFragmentInfo, UpstreamSinkUnionExecutor};
181pub use utils::DummyExecutor;
182pub use values::ValuesExecutor;
183pub use vector_index::VectorIndexWriteExecutor;
184pub use watermark_filter::WatermarkFilterExecutor;
185pub use wrapper::WrapperExecutor;
186
187use self::barrier_align::AlignedMessageStream;
188
/// Item of a message stream, generic over the barrier mutation type `M`
/// (for [`MessageStreamItem`], `M` is [`BarrierMutationType`]).
pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
/// Item of a message stream flowing within a streaming actor.
pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
/// Item of a dispatcher message stream.
pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
/// Boxed `'static` stream of [`MessageStreamItem`]s, as returned by [`Execute::execute`].
pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;
193
194pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
195use risingwave_connector::sink::catalog::SinkId;
196use risingwave_connector::source::cdc::{
197    CdcTableSnapshotSplitAssignmentWithGeneration,
198    build_actor_cdc_table_snapshot_splits_with_generation,
199};
200use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
201
/// Any `Send` stream yielding [`MessageStreamItemInner<M>`] items.
pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
/// Any `Send` stream yielding [`MessageStreamItem`] items.
pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
/// Any `Send` stream yielding [`DispatcherMessageStreamItem`] items.
pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;
205
/// Static information of an executor.
///
/// This describes the executor's OUTPUT (schema, stream key, stream kind) plus its identity;
/// it does not contain anything needed to actually run the executor (see [`Execute`]).
#[derive(Debug, Default, Clone)]
pub struct ExecutorInfo {
    /// The schema of the OUTPUT of the executor.
    pub schema: Schema,

    /// The stream key indices of the OUTPUT of the executor.
    pub stream_key: StreamKey,

    /// The stream kind of the OUTPUT of the executor.
    pub stream_kind: PbStreamKind,

    /// Identity of the executor. This is also what `Executor`'s `Debug` impl prints.
    pub identity: String,

    /// The executor id of the executor.
    pub id: u64,
}
224
225impl ExecutorInfo {
226    pub fn for_test(schema: Schema, stream_key: StreamKey, identity: String, id: u64) -> Self {
227        Self {
228            schema,
229            stream_key,
230            stream_kind: PbStreamKind::Retract, // dummy value for test
231            identity,
232            id,
233        }
234    }
235}
236
237/// [`Execute`] describes the methods an executor should implement to handle control messages.
238pub trait Execute: Send + 'static {
239    fn execute(self: Box<Self>) -> BoxedMessageStream;
240
241    fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
242        self.execute()
243    }
244
245    fn boxed(self) -> Box<dyn Execute>
246    where
247        Self: Sized + Send + 'static,
248    {
249        Box::new(self)
250    }
251}
252
/// [`Executor`] combines the static information ([`ExecutorInfo`]) and the executable object to
/// handle messages ([`Execute`]).
pub struct Executor {
    /// Static information (schema, stream key, stream kind, identity, id) of this executor.
    info: ExecutorInfo,
    /// The type-erased executable part; consumed when the executor is executed.
    execute: Box<dyn Execute>,
}
259
260impl Executor {
261    pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
262        Self { info, execute }
263    }
264
265    pub fn info(&self) -> &ExecutorInfo {
266        &self.info
267    }
268
269    pub fn schema(&self) -> &Schema {
270        &self.info.schema
271    }
272
273    pub fn stream_key(&self) -> StreamKeyRef<'_> {
274        &self.info.stream_key
275    }
276
277    pub fn stream_kind(&self) -> PbStreamKind {
278        self.info.stream_kind
279    }
280
281    pub fn identity(&self) -> &str {
282        &self.info.identity
283    }
284
285    pub fn execute(self) -> BoxedMessageStream {
286        self.execute.execute()
287    }
288
289    pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
290        self.execute.execute_with_epoch(epoch)
291    }
292}
293
294impl std::fmt::Debug for Executor {
295    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
296        f.write_str(self.identity())
297    }
298}
299
300impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
301    fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
302        Self::new(info, execute)
303    }
304}
305
306impl<E> From<(ExecutorInfo, E)> for Executor
307where
308    E: Execute,
309{
310    fn from((info, execute): (ExecutorInfo, E)) -> Self {
311        Self::new(info, execute.boxed())
312    }
313}
314
/// Epoch value `0`. NOTE(review): presumably a sentinel for an unset/invalid epoch, as the name
/// suggests; confirm against callers.
pub const INVALID_EPOCH: u64 = 0;
316
/// Id of an upstream fragment; part of the key of `UpdateMutation::merges`.
type UpstreamFragmentId = FragmentId;
/// Source split assignments, keyed by the actor they are assigned to.
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;
319
/// Payload of [`Mutation::Update`], e.g. sent when actors are created for scaling.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct UpdateMutation {
    /// Dispatcher updates, keyed by the actor owning the dispatchers.
    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
    /// Merge updates, keyed by `(actor id, upstream fragment id)`; read by
    /// `Barrier::as_update_merge`.
    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
    /// New vnode bitmaps per actor; read by `Barrier::as_update_vnode_bitmap`.
    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
    /// Actors dropped by this update; reported by `Barrier::all_stop_actors`.
    pub dropped_actors: HashSet<ActorId>,
    /// Split assignments per actor; read by `Barrier::initial_split_assignment`.
    pub actor_splits: SplitAssignments,
    /// Dispatchers newly added to each actor.
    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
    /// CDC table snapshot split assignments (with generation).
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// Columns to add per sink; read by `Barrier::as_sink_add_columns`.
    pub sink_add_columns: HashMap<SinkId, Vec<Field>>,
}
332
/// Payload of [`Mutation::Add`], e.g. sent for a new streaming job or on recovery.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct AddMutation {
    /// Dispatchers to add, keyed by the upstream actor; `Barrier::has_more_downstream_fragments`
    /// checks whether an actor has an entry here.
    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
    /// Actors newly added by this mutation; checked by `Barrier::is_newly_added`.
    pub added_actors: HashSet<ActorId>,
    // TODO: remove this and use `SourceChangesSplit` after we support multiple mutations.
    /// Initial split assignments per actor; read by `Barrier::initial_split_assignment`.
    pub splits: SplitAssignments,
    /// Whether executors should pause their data stream on startup
    /// (see `Barrier::is_pause_on_startup`).
    pub pause: bool,
    /// (`upstream_mv_table_id`, `subscriber_id`) pairs of subscriptions to add.
    pub subscriptions_to_add: Vec<(TableId, u32)>,
    /// Backfill fragments whose backfill should be paused on startup
    /// (see `Barrier::is_backfill_pause_on_startup`).
    pub backfill_nodes_to_pause: HashSet<FragmentId>,
    /// CDC table snapshot split assignments (with generation).
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// New upstream sinks, keyed by the downstream fragment they feed into
    /// (see `Barrier::as_new_upstream_sink`).
    pub new_upstream_sinks: HashMap<FragmentId, PbNewUpstreamSink>,
}
348
/// Payload of [`Mutation::Stop`].
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct StopMutation {
    /// Actors to stop (drop); reported by `Barrier::all_stop_actors`.
    pub dropped_actors: HashSet<ActorId>,
    /// Upstream sink fragments being dropped; returned by `Barrier::as_dropped_upstream_sinks`.
    pub dropped_sink_fragments: HashSet<FragmentId>,
}
355
/// See [`PbMutation`] for the semantics of each mutation.
#[derive(Debug, Clone, PartialEq)]
pub enum Mutation {
    /// Stop (drop) actors; see [`StopMutation`].
    Stop(StopMutation),
    /// Update existing actors (e.g. for scaling); see [`UpdateMutation`].
    Update(UpdateMutation),
    /// Add actors/dispatchers (e.g. new streaming job or recovery); see [`AddMutation`].
    Add(AddMutation),
    /// Change the split assignments of existing source executors. Unlike the splits carried by
    /// `Add`/`Update`, these are not treated as initial assignments.
    SourceChangeSplit(SplitAssignments),
    /// Pause mutation; see [`PbMutation`] for semantics.
    Pause,
    /// Resume mutation; see [`PbMutation`] for semantics.
    Resume,
    /// Per-actor rate limits (carried as `RateLimit { rate_limit }` in protobuf form).
    /// NOTE(review): `None` presumably means "no limit"; confirm.
    Throttle(HashMap<ActorId, Option<u32>>),
    /// Connector property changes. NOTE(review): the `u32` key is presumably a source/sink id;
    /// confirm against the producer of this mutation.
    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
    /// Drop subscriptions.
    DropSubscriptions {
        /// (`subscriber_id`, `upstream_mv_table_id`) pairs.
        subscriptions_to_drop: Vec<(u32, TableId)>,
    },
    /// Start backfill for the given fragments; see `Barrier::should_start_fragment_backfill`.
    StartFragmentBackfill {
        fragment_ids: HashSet<FragmentId>,
    },
    /// Refresh start for `table_id` with its associated source.
    /// NOTE(review): exact semantics not visible in this file.
    RefreshStart {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// List-finish signal for the associated source.
    /// NOTE(review): exact semantics not visible in this file.
    ListFinish {
        associated_source_id: SourceId,
    },
    /// Load-finish signal for the associated source.
    /// NOTE(review): exact semantics not visible in this file.
    LoadFinish {
        associated_source_id: SourceId,
    },
}
385
/// A barrier, generic over its mutation type `M`.
///
/// For barriers in the dispatcher, `M` is `()`, which means the mutation is erased.
/// For barriers flowing within the streaming actor, `M` is the normal `BarrierMutationType`.
#[derive(Debug, Clone)]
pub struct BarrierInner<M> {
    /// Current and previous epoch of this barrier.
    pub epoch: EpochPair,
    /// The mutation carried by this barrier.
    pub mutation: M,
    /// Kind of the barrier, e.g. `BarrierKind::Checkpoint`.
    pub kind: BarrierKind,

    /// Tracing context for the **current** epoch of this barrier.
    pub tracing_context: TracingContext,

    /// The actors that this barrier has passed locally. Used for debugging only.
    pub passed_actors: Vec<ActorId>,
}
402
/// Mutation carried by an in-actor [`Barrier`]; `None` means the barrier carries no mutation.
pub type BarrierMutationType = Option<Arc<Mutation>>;
/// Barrier flowing within a streaming actor.
pub type Barrier = BarrierInner<BarrierMutationType>;
/// Barrier as seen by the dispatcher, with the mutation erased to `()`.
pub type DispatcherBarrier = BarrierInner<()>;
406
407impl<M: Default> BarrierInner<M> {
408    /// Create a plain barrier.
409    pub fn new_test_barrier(epoch: u64) -> Self {
410        Self {
411            epoch: EpochPair::new_test_epoch(epoch),
412            kind: BarrierKind::Checkpoint,
413            tracing_context: TracingContext::none(),
414            mutation: Default::default(),
415            passed_actors: Default::default(),
416        }
417    }
418
419    pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
420        Self {
421            epoch: EpochPair::new(epoch, prev_epoch),
422            kind: BarrierKind::Checkpoint,
423            tracing_context: TracingContext::none(),
424            mutation: Default::default(),
425            passed_actors: Default::default(),
426        }
427    }
428}
429
430impl Barrier {
431    pub fn into_dispatcher(self) -> DispatcherBarrier {
432        DispatcherBarrier {
433            epoch: self.epoch,
434            mutation: (),
435            kind: self.kind,
436            tracing_context: self.tracing_context,
437            passed_actors: self.passed_actors,
438        }
439    }
440
441    #[must_use]
442    pub fn with_mutation(self, mutation: Mutation) -> Self {
443        Self {
444            mutation: Some(Arc::new(mutation)),
445            ..self
446        }
447    }
448
449    #[must_use]
450    pub fn with_stop(self) -> Self {
451        self.with_mutation(Mutation::Stop(StopMutation {
452            dropped_actors: Default::default(),
453            dropped_sink_fragments: Default::default(),
454        }))
455    }
456
457    /// Whether this barrier carries stop mutation.
458    pub fn is_with_stop_mutation(&self) -> bool {
459        matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
460    }
461
462    /// Whether this barrier is to stop the actor with `actor_id`.
463    pub fn is_stop(&self, actor_id: ActorId) -> bool {
464        self.all_stop_actors()
465            .is_some_and(|actors| actors.contains(&actor_id))
466    }
467
468    pub fn is_checkpoint(&self) -> bool {
469        self.kind == BarrierKind::Checkpoint
470    }
471
472    /// Get the initial split assignments for the actor with `actor_id`.
473    ///
474    /// This should only be called on the initial barrier received by the executor. It must be
475    ///
476    /// - `Add` mutation when it's a new streaming job, or recovery.
477    /// - `Update` mutation when it's created for scaling.
478    ///
479    /// Note that `SourceChangeSplit` is **not** included, because it's only used for changing splits
480    /// of existing executors.
481    pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
482        match self.mutation.as_deref()? {
483            Mutation::Update(UpdateMutation { actor_splits, .. })
484            | Mutation::Add(AddMutation {
485                splits: actor_splits,
486                ..
487            }) => actor_splits.get(&actor_id),
488
489            _ => {
490                if cfg!(debug_assertions) {
491                    panic!(
492                        "the initial mutation of the barrier should not be {:?}",
493                        self.mutation
494                    );
495                }
496                None
497            }
498        }
499        .map(|s| s.as_slice())
500    }
501
502    /// Get all actors that to be stopped (dropped) by this barrier.
503    pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
504        match self.mutation.as_deref() {
505            Some(Mutation::Stop(StopMutation { dropped_actors, .. })) => Some(dropped_actors),
506            Some(Mutation::Update(UpdateMutation { dropped_actors, .. })) => Some(dropped_actors),
507            _ => None,
508        }
509    }
510
511    /// Whether this barrier is to newly add the actor with `actor_id`. This is used for `Chain` and
512    /// `Values` to decide whether to output the existing (historical) data.
513    ///
514    /// By "newly", we mean the actor belongs to a subgraph of a new streaming job. That is, actors
515    /// added for scaling are not included.
516    pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
517        match self.mutation.as_deref() {
518            Some(Mutation::Add(AddMutation { added_actors, .. })) => {
519                added_actors.contains(&actor_id)
520            }
521            _ => false,
522        }
523    }
524
525    pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
526        if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
527            fragment_ids.contains(&fragment_id)
528        } else {
529            false
530        }
531    }
532
533    /// Whether this barrier adds new downstream fragment for the actor with `upstream_actor_id`.
534    ///
535    /// # Use case
536    /// Some optimizations are applied when an actor doesn't have any downstreams ("standalone" actors).
537    /// * Pause a standalone shared `SourceExecutor`.
538    /// * Disable a standalone `MaterializeExecutor`'s conflict check.
539    ///
540    /// This is implemented by checking `actor_context.initial_dispatch_num` on startup, and
541    /// check `has_more_downstream_fragments` on barrier to see whether the optimization
542    /// needs to be turned off.
543    ///
544    /// ## Some special cases not included
545    ///
546    /// Note that this is not `has_new_downstream_actor/fragment`. For our use case, we only
547    /// care about **number of downstream fragments** (more precisely, existence).
548    /// - When scaling, the number of downstream actors is changed, and they are "new", but downstream fragments is not changed.
549    /// - When `ALTER TABLE sink_into_table`, the fragment is replaced with a "new" one, but the number is not changed.
550    pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
551        let Some(mutation) = self.mutation.as_deref() else {
552            return false;
553        };
554        match mutation {
555            // Add is for mv, index and sink creation.
556            Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
557            Mutation::Update(_)
558            | Mutation::Stop(_)
559            | Mutation::Pause
560            | Mutation::Resume
561            | Mutation::SourceChangeSplit(_)
562            | Mutation::Throttle(_)
563            | Mutation::DropSubscriptions { .. }
564            | Mutation::ConnectorPropsChange(_)
565            | Mutation::StartFragmentBackfill { .. }
566            | Mutation::RefreshStart { .. }
567            | Mutation::ListFinish { .. }
568            | Mutation::LoadFinish { .. } => false,
569        }
570    }
571
572    /// Whether this barrier requires the executor to pause its data stream on startup.
573    pub fn is_pause_on_startup(&self) -> bool {
574        match self.mutation.as_deref() {
575            Some(Mutation::Add(AddMutation { pause, .. })) => *pause,
576            _ => false,
577        }
578    }
579
580    pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
581        match self.mutation.as_deref() {
582            Some(Mutation::Add(AddMutation {
583                backfill_nodes_to_pause,
584                ..
585            })) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
586            Some(Mutation::Update(_)) => false,
587            _ => {
588                tracing::warn!(
589                    "expected an AddMutation or UpdateMutation on Startup, instead got {:?}",
590                    self
591                );
592                false
593            }
594        }
595    }
596
597    /// Whether this barrier is for resume.
598    pub fn is_resume(&self) -> bool {
599        matches!(self.mutation.as_deref(), Some(Mutation::Resume))
600    }
601
602    /// Returns the [`MergeUpdate`] if this barrier is to update the merge executors for the actor
603    /// with `actor_id`.
604    pub fn as_update_merge(
605        &self,
606        actor_id: ActorId,
607        upstream_fragment_id: UpstreamFragmentId,
608    ) -> Option<&MergeUpdate> {
609        self.mutation
610            .as_deref()
611            .and_then(|mutation| match mutation {
612                Mutation::Update(UpdateMutation { merges, .. }) => {
613                    merges.get(&(actor_id, upstream_fragment_id))
614                }
615                _ => None,
616            })
617    }
618
619    /// Returns the new upstream sink information if this barrier is to add a new upstream sink for
620    /// the specified downstream fragment.
621    pub fn as_new_upstream_sink(&self, fragment_id: FragmentId) -> Option<&PbNewUpstreamSink> {
622        self.mutation
623            .as_deref()
624            .and_then(|mutation| match mutation {
625                Mutation::Add(AddMutation {
626                    new_upstream_sinks, ..
627                }) => new_upstream_sinks.get(&fragment_id),
628                _ => None,
629            })
630    }
631
632    /// Returns the dropped upstream sink-fragment if this barrier is to drop any sink.
633    pub fn as_dropped_upstream_sinks(&self) -> Option<&HashSet<FragmentId>> {
634        self.mutation
635            .as_deref()
636            .and_then(|mutation| match mutation {
637                Mutation::Stop(StopMutation {
638                    dropped_sink_fragments,
639                    ..
640                }) => Some(dropped_sink_fragments),
641                _ => None,
642            })
643    }
644
645    /// Returns the new vnode bitmap if this barrier is to update the vnode bitmap for the actor
646    /// with `actor_id`.
647    ///
648    /// Actually, this vnode bitmap update is only useful for the record accessing validation for
649    /// distributed executors, since the read/write pattern will never be across multiple vnodes.
650    pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
651        self.mutation
652            .as_deref()
653            .and_then(|mutation| match mutation {
654                Mutation::Update(UpdateMutation { vnode_bitmaps, .. }) => {
655                    vnode_bitmaps.get(&actor_id).cloned()
656                }
657                _ => None,
658            })
659    }
660
661    pub fn as_sink_add_columns(&self, sink_id: SinkId) -> Option<Vec<Field>> {
662        self.mutation
663            .as_deref()
664            .and_then(|mutation| match mutation {
665                Mutation::Update(UpdateMutation {
666                    sink_add_columns, ..
667                }) => sink_add_columns.get(&sink_id).cloned(),
668                _ => None,
669            })
670    }
671
672    pub fn get_curr_epoch(&self) -> Epoch {
673        Epoch(self.epoch.curr)
674    }
675
676    /// Retrieve the tracing context for the **current** epoch of this barrier.
677    pub fn tracing_context(&self) -> &TracingContext {
678        &self.tracing_context
679    }
680
681    pub fn added_subscriber_on_mv_table(
682        &self,
683        mv_table_id: TableId,
684    ) -> impl Iterator<Item = u32> + '_ {
685        if let Some(Mutation::Add(add)) = self.mutation.as_deref() {
686            Some(add)
687        } else {
688            None
689        }
690        .into_iter()
691        .flat_map(move |add| {
692            add.subscriptions_to_add.iter().filter_map(
693                move |(upstream_mv_table_id, subscriber_id)| {
694                    if *upstream_mv_table_id == mv_table_id {
695                        Some(*subscriber_id)
696                    } else {
697                        None
698                    }
699                },
700            )
701        })
702    }
703}
704
705impl<M: PartialEq> PartialEq for BarrierInner<M> {
706    fn eq(&self, other: &Self) -> bool {
707        self.epoch == other.epoch && self.mutation == other.mutation
708    }
709}
710
711impl Mutation {
712    /// Return true if the mutation is stop.
713    ///
714    /// Note that this does not mean we will stop the current actor.
715    #[cfg(test)]
716    pub fn is_stop(&self) -> bool {
717        matches!(self, Mutation::Stop(_))
718    }
719
720    #[cfg(test)]
721    fn to_protobuf(&self) -> PbMutation {
722        use risingwave_connector::source::cdc::build_pb_actor_cdc_table_snapshot_splits_with_generation;
723        use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
724        use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
725        use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
726        use risingwave_pb::stream_plan::{
727            PbAddMutation, PbConnectorPropsChangeMutation, PbDispatchers,
728            PbDropSubscriptionsMutation, PbPauseMutation, PbResumeMutation, PbSinkAddColumns,
729            PbSourceChangeSplitMutation, PbStartFragmentBackfillMutation, PbStopMutation,
730            PbThrottleMutation, PbUpdateMutation,
731        };
732        let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
733            actor_splits
734                .iter()
735                .map(|(&actor_id, splits)| {
736                    (
737                        actor_id,
738                        ConnectorSplits {
739                            splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
740                        },
741                    )
742                })
743                .collect::<HashMap<_, _>>()
744        };
745
746        match self {
747            Mutation::Stop(StopMutation {
748                dropped_actors,
749                dropped_sink_fragments,
750            }) => PbMutation::Stop(PbStopMutation {
751                actors: dropped_actors.iter().copied().collect(),
752                dropped_sink_fragments: dropped_sink_fragments.iter().copied().collect(),
753            }),
754            Mutation::Update(UpdateMutation {
755                dispatchers,
756                merges,
757                vnode_bitmaps,
758                dropped_actors,
759                actor_splits,
760                actor_new_dispatchers,
761                actor_cdc_table_snapshot_splits,
762                sink_add_columns,
763            }) => PbMutation::Update(PbUpdateMutation {
764                dispatcher_update: dispatchers.values().flatten().cloned().collect(),
765                merge_update: merges.values().cloned().collect(),
766                actor_vnode_bitmap_update: vnode_bitmaps
767                    .iter()
768                    .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
769                    .collect(),
770                dropped_actors: dropped_actors.iter().copied().collect(),
771                actor_splits: actor_splits_to_protobuf(actor_splits),
772                actor_new_dispatchers: actor_new_dispatchers
773                    .iter()
774                    .map(|(&actor_id, dispatchers)| {
775                        (
776                            actor_id,
777                            PbDispatchers {
778                                dispatchers: dispatchers.clone(),
779                            },
780                        )
781                    })
782                    .collect(),
783                actor_cdc_table_snapshot_splits:
784                    build_pb_actor_cdc_table_snapshot_splits_with_generation(
785                        actor_cdc_table_snapshot_splits.clone(),
786                    )
787                    .into(),
788                sink_add_columns: sink_add_columns
789                    .iter()
790                    .map(|(sink_id, add_columns)| {
791                        (
792                            *sink_id,
793                            PbSinkAddColumns {
794                                fields: add_columns.iter().map(|field| field.to_prost()).collect(),
795                            },
796                        )
797                    })
798                    .collect(),
799            }),
800            Mutation::Add(AddMutation {
801                adds,
802                added_actors,
803                splits,
804                pause,
805                subscriptions_to_add,
806                backfill_nodes_to_pause,
807                actor_cdc_table_snapshot_splits,
808                new_upstream_sinks,
809            }) => PbMutation::Add(PbAddMutation {
810                actor_dispatchers: adds
811                    .iter()
812                    .map(|(&actor_id, dispatchers)| {
813                        (
814                            actor_id,
815                            PbDispatchers {
816                                dispatchers: dispatchers.clone(),
817                            },
818                        )
819                    })
820                    .collect(),
821                added_actors: added_actors.iter().copied().collect(),
822                actor_splits: actor_splits_to_protobuf(splits),
823                pause: *pause,
824                subscriptions_to_add: subscriptions_to_add
825                    .iter()
826                    .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
827                        subscriber_id: *subscriber_id,
828                        upstream_mv_table_id: *table_id,
829                    })
830                    .collect(),
831                backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
832                actor_cdc_table_snapshot_splits:
833                    build_pb_actor_cdc_table_snapshot_splits_with_generation(
834                        actor_cdc_table_snapshot_splits.clone(),
835                    )
836                    .into(),
837                new_upstream_sinks: new_upstream_sinks
838                    .iter()
839                    .map(|(k, v)| (*k, v.clone()))
840                    .collect(),
841            }),
842            Mutation::SourceChangeSplit(changes) => {
843                PbMutation::Splits(PbSourceChangeSplitMutation {
844                    actor_splits: changes
845                        .iter()
846                        .map(|(&actor_id, splits)| {
847                            (
848                                actor_id,
849                                ConnectorSplits {
850                                    splits: splits
851                                        .clone()
852                                        .iter()
853                                        .map(ConnectorSplit::from)
854                                        .collect(),
855                                },
856                            )
857                        })
858                        .collect(),
859                })
860            }
861            Mutation::Pause => PbMutation::Pause(PbPauseMutation {}),
862            Mutation::Resume => PbMutation::Resume(PbResumeMutation {}),
863            Mutation::Throttle(changes) => PbMutation::Throttle(PbThrottleMutation {
864                actor_throttle: changes
865                    .iter()
866                    .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit }))
867                    .collect(),
868            }),
869            Mutation::DropSubscriptions {
870                subscriptions_to_drop,
871            } => PbMutation::DropSubscriptions(PbDropSubscriptionsMutation {
872                info: subscriptions_to_drop
873                    .iter()
874                    .map(
875                        |(subscriber_id, upstream_mv_table_id)| SubscriptionUpstreamInfo {
876                            subscriber_id: *subscriber_id,
877                            upstream_mv_table_id: *upstream_mv_table_id,
878                        },
879                    )
880                    .collect(),
881            }),
882            Mutation::ConnectorPropsChange(map) => {
883                PbMutation::ConnectorPropsChange(PbConnectorPropsChangeMutation {
884                    connector_props_infos: map
885                        .iter()
886                        .map(|(actor_id, options)| {
887                            (
888                                *actor_id,
889                                ConnectorPropsInfo {
890                                    connector_props_info: options
891                                        .iter()
892                                        .map(|(k, v)| (k.clone(), v.clone()))
893                                        .collect(),
894                                },
895                            )
896                        })
897                        .collect(),
898                })
899            }
900            Mutation::StartFragmentBackfill { fragment_ids } => {
901                PbMutation::StartFragmentBackfill(PbStartFragmentBackfillMutation {
902                    fragment_ids: fragment_ids.iter().copied().collect(),
903                })
904            }
905            Mutation::RefreshStart {
906                table_id,
907                associated_source_id,
908            } => PbMutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
909                table_id: *table_id,
910                associated_source_id: *associated_source_id,
911            }),
912            Mutation::ListFinish {
913                associated_source_id,
914            } => PbMutation::ListFinish(risingwave_pb::stream_plan::ListFinishMutation {
915                associated_source_id: *associated_source_id,
916            }),
917            Mutation::LoadFinish {
918                associated_source_id,
919            } => PbMutation::LoadFinish(risingwave_pb::stream_plan::LoadFinishMutation {
920                associated_source_id: *associated_source_id,
921            }),
922        }
923    }
924
925    fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
926        let mutation = match prost {
927            PbMutation::Stop(stop) => Mutation::Stop(StopMutation {
928                dropped_actors: stop.actors.iter().copied().collect(),
929                dropped_sink_fragments: stop.dropped_sink_fragments.iter().copied().collect(),
930            }),
931
932            PbMutation::Update(update) => Mutation::Update(UpdateMutation {
933                dispatchers: update
934                    .dispatcher_update
935                    .iter()
936                    .map(|u| (u.actor_id, u.clone()))
937                    .into_group_map(),
938                merges: update
939                    .merge_update
940                    .iter()
941                    .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
942                    .collect(),
943                vnode_bitmaps: update
944                    .actor_vnode_bitmap_update
945                    .iter()
946                    .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
947                    .collect(),
948                dropped_actors: update.dropped_actors.iter().copied().collect(),
949                actor_splits: update
950                    .actor_splits
951                    .iter()
952                    .map(|(&actor_id, splits)| {
953                        (
954                            actor_id,
955                            splits
956                                .splits
957                                .iter()
958                                .map(|split| split.try_into().unwrap())
959                                .collect(),
960                        )
961                    })
962                    .collect(),
963                actor_new_dispatchers: update
964                    .actor_new_dispatchers
965                    .iter()
966                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
967                    .collect(),
968                actor_cdc_table_snapshot_splits:
969                    build_actor_cdc_table_snapshot_splits_with_generation(
970                        update
971                            .actor_cdc_table_snapshot_splits
972                            .clone()
973                            .unwrap_or_default(),
974                    ),
975                sink_add_columns: update
976                    .sink_add_columns
977                    .iter()
978                    .map(|(sink_id, add_columns)| {
979                        (
980                            *sink_id,
981                            add_columns.fields.iter().map(Field::from_prost).collect(),
982                        )
983                    })
984                    .collect(),
985            }),
986
987            PbMutation::Add(add) => Mutation::Add(AddMutation {
988                adds: add
989                    .actor_dispatchers
990                    .iter()
991                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
992                    .collect(),
993                added_actors: add.added_actors.iter().copied().collect(),
994                // TODO: remove this and use `SourceChangesSplit` after we support multiple
995                // mutations.
996                splits: add
997                    .actor_splits
998                    .iter()
999                    .map(|(&actor_id, splits)| {
1000                        (
1001                            actor_id,
1002                            splits
1003                                .splits
1004                                .iter()
1005                                .map(|split| split.try_into().unwrap())
1006                                .collect(),
1007                        )
1008                    })
1009                    .collect(),
1010                pause: add.pause,
1011                subscriptions_to_add: add
1012                    .subscriptions_to_add
1013                    .iter()
1014                    .map(
1015                        |SubscriptionUpstreamInfo {
1016                             subscriber_id,
1017                             upstream_mv_table_id,
1018                         }| { (*upstream_mv_table_id, *subscriber_id) },
1019                    )
1020                    .collect(),
1021                backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
1022                actor_cdc_table_snapshot_splits:
1023                    build_actor_cdc_table_snapshot_splits_with_generation(
1024                        add.actor_cdc_table_snapshot_splits
1025                            .clone()
1026                            .unwrap_or_default(),
1027                    ),
1028                new_upstream_sinks: add
1029                    .new_upstream_sinks
1030                    .iter()
1031                    .map(|(k, v)| (*k, v.clone()))
1032                    .collect(),
1033            }),
1034
1035            PbMutation::Splits(s) => {
1036                let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
1037                    Vec::with_capacity(s.actor_splits.len());
1038                for (&actor_id, splits) in &s.actor_splits {
1039                    if !splits.splits.is_empty() {
1040                        change_splits.push((
1041                            actor_id,
1042                            splits
1043                                .splits
1044                                .iter()
1045                                .map(SplitImpl::try_from)
1046                                .try_collect()?,
1047                        ));
1048                    }
1049                }
1050                Mutation::SourceChangeSplit(change_splits.into_iter().collect())
1051            }
1052            PbMutation::Pause(_) => Mutation::Pause,
1053            PbMutation::Resume(_) => Mutation::Resume,
1054            PbMutation::Throttle(changes) => Mutation::Throttle(
1055                changes
1056                    .actor_throttle
1057                    .iter()
1058                    .map(|(actor_id, limit)| (*actor_id, limit.rate_limit))
1059                    .collect(),
1060            ),
1061            PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
1062                subscriptions_to_drop: drop
1063                    .info
1064                    .iter()
1065                    .map(|info| (info.subscriber_id, info.upstream_mv_table_id))
1066                    .collect(),
1067            },
1068            PbMutation::ConnectorPropsChange(alter_connector_props) => {
1069                Mutation::ConnectorPropsChange(
1070                    alter_connector_props
1071                        .connector_props_infos
1072                        .iter()
1073                        .map(|(connector_id, options)| {
1074                            (
1075                                *connector_id,
1076                                options
1077                                    .connector_props_info
1078                                    .iter()
1079                                    .map(|(k, v)| (k.clone(), v.clone()))
1080                                    .collect(),
1081                            )
1082                        })
1083                        .collect(),
1084                )
1085            }
1086            PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
1087                Mutation::StartFragmentBackfill {
1088                    fragment_ids: start_fragment_backfill
1089                        .fragment_ids
1090                        .iter()
1091                        .copied()
1092                        .collect(),
1093                }
1094            }
1095            PbMutation::RefreshStart(refresh_start) => Mutation::RefreshStart {
1096                table_id: refresh_start.table_id,
1097                associated_source_id: refresh_start.associated_source_id,
1098            },
1099            PbMutation::ListFinish(list_finish) => Mutation::ListFinish {
1100                associated_source_id: list_finish.associated_source_id,
1101            },
1102            PbMutation::LoadFinish(load_finish) => Mutation::LoadFinish {
1103                associated_source_id: load_finish.associated_source_id,
1104            },
1105        };
1106        Ok(mutation)
1107    }
1108}
1109
1110impl<M> BarrierInner<M> {
1111    fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
1112        let Self {
1113            epoch,
1114            mutation,
1115            kind,
1116            passed_actors,
1117            tracing_context,
1118            ..
1119        } = self;
1120
1121        PbBarrier {
1122            epoch: Some(PbEpoch {
1123                curr: epoch.curr,
1124                prev: epoch.prev,
1125            }),
1126            mutation: Some(PbBarrierMutation {
1127                mutation: barrier_fn(mutation),
1128            }),
1129            tracing_context: tracing_context.to_protobuf(),
1130            kind: *kind as _,
1131            passed_actors: passed_actors.clone(),
1132        }
1133    }
1134
1135    fn from_protobuf_inner(
1136        prost: &PbBarrier,
1137        mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
1138    ) -> StreamExecutorResult<Self> {
1139        let epoch = prost.get_epoch()?;
1140
1141        Ok(Self {
1142            kind: prost.kind(),
1143            epoch: EpochPair::new(epoch.curr, epoch.prev),
1144            mutation: mutation_from_pb(
1145                prost
1146                    .mutation
1147                    .as_ref()
1148                    .and_then(|mutation| mutation.mutation.as_ref()),
1149            )?,
1150            passed_actors: prost.get_passed_actors().clone(),
1151            tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
1152        })
1153    }
1154
1155    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
1156        BarrierInner {
1157            epoch: self.epoch,
1158            mutation: f(self.mutation),
1159            kind: self.kind,
1160            tracing_context: self.tracing_context,
1161            passed_actors: self.passed_actors,
1162        }
1163    }
1164}
1165
1166impl DispatcherBarrier {
1167    pub fn to_protobuf(&self) -> PbBarrier {
1168        self.to_protobuf_inner(|_| None)
1169    }
1170}
1171
1172impl Barrier {
1173    #[cfg(test)]
1174    pub fn to_protobuf(&self) -> PbBarrier {
1175        self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
1176    }
1177
1178    pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
1179        Self::from_protobuf_inner(prost, |mutation| {
1180            mutation
1181                .map(|m| Mutation::from_protobuf(m).map(Arc::new))
1182                .transpose()
1183        })
1184    }
1185}
1186
/// A watermark on a single column of a stream.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
    /// Index of the column this watermark applies to.
    pub col_idx: usize,
    /// Data type of the column; `val` is encoded/decoded with this type.
    pub data_type: DataType,
    /// The watermark value. It is a non-null `ScalarImpl` (not a `Datum`).
    pub val: ScalarImpl,
}
1193
1194impl PartialOrd for Watermark {
1195    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1196        Some(self.cmp(other))
1197    }
1198}
1199
1200impl Ord for Watermark {
1201    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1202        self.val.default_cmp(&other.val)
1203    }
1204}
1205
1206impl Watermark {
1207    pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
1208        Self {
1209            col_idx,
1210            data_type,
1211            val,
1212        }
1213    }
1214
1215    pub async fn transform_with_expr(
1216        self,
1217        expr: &NonStrictExpression<impl Expression>,
1218        new_col_idx: usize,
1219    ) -> Option<Self> {
1220        let Self { col_idx, val, .. } = self;
1221        let row = {
1222            let mut row = vec![None; col_idx + 1];
1223            row[col_idx] = Some(val);
1224            OwnedRow::new(row)
1225        };
1226        let val = expr.eval_row_infallible(&row).await?;
1227        Some(Self::new(new_col_idx, expr.inner().return_type(), val))
1228    }
1229
1230    /// Transform the watermark with the given output indices. If this watermark is not in the
1231    /// output, return `None`.
1232    pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
1233        output_indices
1234            .iter()
1235            .position(|p| *p == self.col_idx)
1236            .map(|new_col_idx| self.with_idx(new_col_idx))
1237    }
1238
1239    pub fn to_protobuf(&self) -> PbWatermark {
1240        PbWatermark {
1241            column: Some(PbInputRef {
1242                index: self.col_idx as _,
1243                r#type: Some(self.data_type.to_protobuf()),
1244            }),
1245            val: Some(&self.val).to_protobuf().into(),
1246        }
1247    }
1248
1249    pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
1250        let col_ref = prost.get_column()?;
1251        let data_type = DataType::from(col_ref.get_type()?);
1252        let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
1253            .expect("watermark value cannot be null");
1254        Ok(Self::new(col_ref.get_index() as _, data_type, val))
1255    }
1256
1257    pub fn with_idx(self, idx: usize) -> Self {
1258        Self::new(idx, self.data_type, self.val)
1259    }
1260}
1261
/// A message flowing through a stream: a data chunk, a barrier, or a watermark.
///
/// `M` is the type of the barrier's mutation payload (see `BarrierInner<M>`).
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageInner<M> {
    /// A chunk of stream data.
    Chunk(StreamChunk),
    /// A barrier whose mutation payload has type `M`.
    Barrier(BarrierInner<M>),
    /// A watermark on one column.
    Watermark(Watermark),
}
1268
1269impl<M> MessageInner<M> {
1270    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
1271        match self {
1272            MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
1273            MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
1274            MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
1275        }
1276    }
1277}
1278
/// A message whose barriers carry a mutation payload of type `BarrierMutationType`.
pub type Message = MessageInner<BarrierMutationType>;
/// A message whose barriers carry no mutation (`()`).
pub type DispatcherMessage = MessageInner<()>;
1281
/// `MessageBatchInner` is used exclusively by `Dispatcher` and the `Merger`/`Receiver` for exchanging messages between them.
/// It shares the same message types as the fundamental `MessageInner`, but batches multiple barriers into a single message.
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageBatchInner<M> {
    /// A chunk of stream data.
    Chunk(StreamChunk),
    /// One or more consecutive barriers sent as a single message.
    BarrierBatch(Vec<BarrierInner<M>>),
    /// A watermark on one column.
    Watermark(Watermark),
}
/// A message batch whose barriers carry a mutation payload of type `BarrierMutationType`.
pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
/// A batch of mutation-free dispatcher barriers.
pub type DispatcherBarriers = Vec<DispatcherBarrier>;
/// A message batch whose barriers carry no mutation (`()`).
pub type DispatcherMessageBatch = MessageBatchInner<()>;
1293
1294impl From<DispatcherMessage> for DispatcherMessageBatch {
1295    fn from(m: DispatcherMessage) -> Self {
1296        match m {
1297            DispatcherMessage::Chunk(c) => Self::Chunk(c),
1298            DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
1299            DispatcherMessage::Watermark(w) => Self::Watermark(w),
1300        }
1301    }
1302}
1303
1304impl From<StreamChunk> for Message {
1305    fn from(chunk: StreamChunk) -> Self {
1306        Message::Chunk(chunk)
1307    }
1308}
1309
1310impl<'a> TryFrom<&'a Message> for &'a Barrier {
1311    type Error = ();
1312
1313    fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
1314        match m {
1315            Message::Chunk(_) => Err(()),
1316            Message::Barrier(b) => Ok(b),
1317            Message::Watermark(_) => Err(()),
1318        }
1319    }
1320}
1321
1322impl Message {
1323    /// Return true if the message is a stop barrier, meaning the stream
1324    /// will not continue, false otherwise.
1325    ///
1326    /// Note that this does not mean we will stop the current actor.
1327    #[cfg(test)]
1328    pub fn is_stop(&self) -> bool {
1329        matches!(
1330            self,
1331            Message::Barrier(Barrier {
1332                mutation,
1333                ..
1334            }) if mutation.as_ref().unwrap().is_stop()
1335        )
1336    }
1337}
1338
1339impl DispatcherMessageBatch {
1340    pub fn to_protobuf(&self) -> PbStreamMessageBatch {
1341        let prost = match self {
1342            Self::Chunk(stream_chunk) => {
1343                let prost_stream_chunk = stream_chunk.to_protobuf();
1344                StreamMessageBatch::StreamChunk(prost_stream_chunk)
1345            }
1346            Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
1347                barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
1348            }),
1349            Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
1350        };
1351        PbStreamMessageBatch {
1352            stream_message_batch: Some(prost),
1353        }
1354    }
1355
1356    pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
1357        let res = match prost.get_stream_message_batch()? {
1358            StreamMessageBatch::StreamChunk(chunk) => {
1359                Self::Chunk(StreamChunk::from_protobuf(chunk)?)
1360            }
1361            StreamMessageBatch::BarrierBatch(barrier_batch) => {
1362                let barriers = barrier_batch
1363                    .barriers
1364                    .iter()
1365                    .map(|barrier| {
1366                        DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
1367                            if mutation.is_some() {
1368                                if cfg!(debug_assertions) {
1369                                    panic!("should not receive message of barrier with mutation");
1370                                } else {
1371                                    warn!(?barrier, "receive message of barrier with mutation");
1372                                }
1373                            }
1374                            Ok(())
1375                        })
1376                    })
1377                    .try_collect()?;
1378                Self::BarrierBatch(barriers)
1379            }
1380            StreamMessageBatch::Watermark(watermark) => {
1381                Self::Watermark(Watermark::from_protobuf(watermark)?)
1382            }
1383        };
1384        Ok(res)
1385    }
1386
1387    pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
1388        ::prost::Message::encoded_len(msg)
1389    }
1390}
1391
/// A stream key as an owned list of `usize` positions (presumably column indices — confirm at
/// use sites).
pub type StreamKey = Vec<usize>;
/// Borrowed form of [`StreamKey`].
pub type StreamKeyRef<'a> = &'a [usize];
/// Data types of the stream key; stores up to one element inline before spilling to the heap.
pub type StreamKeyDataTypes = SmallVec<[DataType; 1]>;
1395
1396/// Expect the first message of the given `stream` as a barrier.
1397pub async fn expect_first_barrier<M: Debug>(
1398    stream: &mut (impl MessageStreamInner<M> + Unpin),
1399) -> StreamExecutorResult<BarrierInner<M>> {
1400    let message = stream
1401        .next()
1402        .instrument_await("expect_first_barrier")
1403        .await
1404        .context("failed to extract the first message: stream closed unexpectedly")??;
1405    let barrier = message
1406        .into_barrier()
1407        .expect("the first message must be a barrier");
1408    // TODO: Is this check correct?
1409    assert!(matches!(
1410        barrier.kind,
1411        BarrierKind::Checkpoint | BarrierKind::Initial
1412    ));
1413    Ok(barrier)
1414}
1415
1416/// Expect the first message of the given `stream` as a barrier.
1417pub async fn expect_first_barrier_from_aligned_stream(
1418    stream: &mut (impl AlignedMessageStream + Unpin),
1419) -> StreamExecutorResult<Barrier> {
1420    let message = stream
1421        .next()
1422        .instrument_await("expect_first_barrier")
1423        .await
1424        .context("failed to extract the first message: stream closed unexpectedly")??;
1425    let barrier = message
1426        .into_barrier()
1427        .expect("the first message must be a barrier");
1428    Ok(barrier)
1429}
1430
/// `StreamConsumer` is the last step in an actor.
///
/// Executing a boxed consumer turns it into a stream of `StreamResult<Barrier>` items.
pub trait StreamConsumer: Send + 'static {
    /// The stream returned by [`StreamConsumer::execute`].
    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;

    /// Consumes the boxed consumer and returns its barrier stream.
    fn execute(self: Box<Self>) -> Self::BarrierStream;
}
1437
/// A boxed upstream input, identified by an `InputId`, that yields `MessageStreamItemInner<M>` items.
type BoxedMessageInput<InputId, M> = BoxedInput<InputId, MessageStreamItemInner<M>>;
1439
/// A stream for merging messages from multiple upstreams.
/// Can dynamically add and delete upstream streams.
/// For the meaning of the generic parameter `M` used, refer to `BarrierInner<M>`.
///
/// Chunks are forwarded as soon as they arrive. Watermarks are aligned per column through
/// `buffered_watermarks`. When an upstream delivers a barrier, that upstream is blocked. The
/// barrier is yielded only after every active upstream has delivered it, and then all
/// upstreams become active again.
pub struct DynamicReceivers<InputId, M> {
    /// The barrier we're aligning to. If this is `None`, then `blocked` is empty.
    barrier: Option<BarrierInner<M>>,
    /// The start timestamp of the current barrier. Used for measuring the alignment duration.
    start_ts: Option<Instant>,
    /// The upstreams that're blocked by the `barrier`.
    blocked: Vec<BoxedMessageInput<InputId, M>>,
    /// The upstreams that're not blocked and can be polled.
    active: FuturesUnordered<StreamFuture<BoxedMessageInput<InputId, M>>>,
    /// watermark column index -> `BufferedWatermarks`
    buffered_watermarks: BTreeMap<usize, BufferedWatermarks<InputId>>,
    /// Alignment duration counter, incremented in nanoseconds. Currently only used for union.
    barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
    /// Alignment duration histogram, observed in seconds. Only for merge. If `None`, nothing is
    /// observed here during `poll_next`; note that `start_ts` is still recorded for every
    /// barrier regardless.
    merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
}
1459
/// Merges all upstreams into one stream, aligning barriers across them.
impl<InputId: Clone + Ord + Hash + std::fmt::Debug + Unpin, M: Clone + Unpin> Stream
    for DynamicReceivers<InputId, M>
{
    type Item = MessageStreamItemInner<M>;

    /// Polls the active upstreams until there is something to emit.
    ///
    /// - Chunks are emitted immediately.
    /// - Watermarks are emitted only when `handle_watermark` returns one.
    /// - Barriers block their upstream. Once no active upstream remains, the aligned barrier is
    ///   emitted and every blocked upstream is resumed.
    /// - Upstream errors, epoch mismatches between barriers, and unexpectedly closed upstreams
    ///   are returned as errors.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if self.is_empty() {
            return Poll::Ready(None);
        }

        loop {
            match futures::ready!(self.active.poll_next_unpin(cx)) {
                // Directly forward the error.
                Some((Some(Err(e)), _)) => {
                    return Poll::Ready(Some(Err(e)));
                }
                // Handle the message from some upstream.
                Some((Some(Ok(message)), remaining)) => {
                    let input_id = remaining.id();
                    match message {
                        MessageInner::Chunk(chunk) => {
                            // Continue polling this upstream by pushing it back to `active`.
                            self.active.push(remaining.into_future());
                            return Poll::Ready(Some(Ok(MessageInner::Chunk(chunk))));
                        }
                        MessageInner::Watermark(watermark) => {
                            // Continue polling this upstream by pushing it back to `active`.
                            self.active.push(remaining.into_future());
                            // If no watermark is ready to emit, keep polling in this loop.
                            if let Some(watermark) = self.handle_watermark(input_id, watermark) {
                                return Poll::Ready(Some(Ok(MessageInner::Watermark(watermark))));
                            }
                        }
                        MessageInner::Barrier(barrier) => {
                            // The first barrier of a round starts the alignment timer.
                            if self.blocked.is_empty() {
                                self.start_ts = Some(Instant::now());
                            }
                            // Block this upstream by pushing it to `blocked`.
                            self.blocked.push(remaining);
                            if let Some(current_barrier) = self.barrier.as_ref() {
                                // Every upstream must deliver a barrier of the same epoch; the
                                // mutations are stripped from the error payload.
                                if current_barrier.epoch != barrier.epoch {
                                    return Poll::Ready(Some(Err(
                                        StreamExecutorError::align_barrier(
                                            current_barrier.clone().map_mutation(|_| None),
                                            barrier.map_mutation(|_| None),
                                        ),
                                    )));
                                }
                                // Same epoch: this copy of the barrier is dropped; only the
                                // first one received is kept in `self.barrier`.
                            } else {
                                self.barrier = Some(barrier);
                            }
                        }
                    }
                }
                // We use barrier as the control message of the stream. That is, we always stop the
                // actors actively when we receive a `Stop` mutation, instead of relying on the stream
                // termination.
                //
                // Besides, in abnormal cases when the other side of the `Input` closes unexpectedly,
                // we also yield an `Err(ExchangeChannelClosed)`, which will hit the `Err` arm above.
                // So this branch should never be reached.
                Some((None, remaining)) => {
                    return Poll::Ready(Some(Err(StreamExecutorError::channel_closed(format!(
                        "upstream input {:?} unexpectedly closed",
                        remaining.id()
                    )))));
                }
                // There's no active upstreams. Process the barrier and resume the blocked ones.
                None => {
                    assert!(!self.blocked.is_empty());

                    let start_ts = self
                        .start_ts
                        .take()
                        .expect("should have received at least one barrier");
                    if let Some(barrier_align_duration) = &self.barrier_align_duration {
                        barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
                    }
                    if let Some(merge_barrier_align_duration) = &self.merge_barrier_align_duration {
                        // `observe` performs a few atomic operations internally; the `Option`
                        // lets us skip that overhead when the metric is not configured.
                        merge_barrier_align_duration.observe(start_ts.elapsed().as_secs_f64())
                    }

                    break;
                }
            }
        }

        assert!(self.active.is_terminated());

        let barrier = self.barrier.take().unwrap();

        // Every upstream has delivered the barrier: make them all active again before emitting it.
        let upstreams = std::mem::take(&mut self.blocked);
        self.extend_active(upstreams);
        assert!(!self.active.is_terminated());

        Poll::Ready(Some(Ok(MessageInner::Barrier(barrier))))
    }
}
1561
1562impl<InputId: Clone + Ord + Hash + std::fmt::Debug, M> DynamicReceivers<InputId, M> {
1563    pub fn new(
1564        upstreams: Vec<BoxedMessageInput<InputId, M>>,
1565        barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1566        merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
1567    ) -> Self {
1568        let mut this = Self {
1569            barrier: None,
1570            start_ts: None,
1571            blocked: Vec::with_capacity(upstreams.len()),
1572            active: Default::default(),
1573            buffered_watermarks: Default::default(),
1574            merge_barrier_align_duration,
1575            barrier_align_duration,
1576        };
1577        this.extend_active(upstreams);
1578        this
1579    }
1580
1581    /// Extend the active upstreams with the given upstreams. The current stream must be at the
1582    /// clean state right after a barrier.
1583    pub fn extend_active(
1584        &mut self,
1585        upstreams: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1586    ) {
1587        assert!(self.blocked.is_empty() && self.barrier.is_none());
1588
1589        self.active
1590            .extend(upstreams.into_iter().map(|s| s.into_future()));
1591    }
1592
1593    /// Handle a new watermark message. Optionally returns the watermark message to emit.
1594    pub fn handle_watermark(
1595        &mut self,
1596        input_id: InputId,
1597        watermark: Watermark,
1598    ) -> Option<Watermark> {
1599        let col_idx = watermark.col_idx;
1600        // Insert a buffer watermarks when first received from a column.
1601        let upstream_ids: Vec<_> = self.upstream_input_ids().collect();
1602        let watermarks = self
1603            .buffered_watermarks
1604            .entry(col_idx)
1605            .or_insert_with(|| BufferedWatermarks::with_ids(upstream_ids));
1606        watermarks.handle_watermark(input_id, watermark)
1607    }
1608
1609    /// Consume `other` and add its upstreams to `self`. The two streams must be at the clean state
1610    /// right after a barrier.
1611    pub fn add_upstreams_from(
1612        &mut self,
1613        new_inputs: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1614    ) {
1615        assert!(self.blocked.is_empty() && self.barrier.is_none());
1616
1617        let new_inputs: Vec<_> = new_inputs.into_iter().collect();
1618        let input_ids = new_inputs.iter().map(|input| input.id());
1619        self.buffered_watermarks.values_mut().for_each(|buffers| {
1620            // Add buffers to the buffered watermarks for all cols
1621            buffers.add_buffers(input_ids.clone());
1622        });
1623        self.active
1624            .extend(new_inputs.into_iter().map(|s| s.into_future()));
1625    }
1626
1627    /// Remove upstreams from `self` in `upstream_input_ids`. The current stream must be at the
1628    /// clean state right after a barrier.
1629    /// The current container does not necessarily contain all the input ids passed in.
1630    pub fn remove_upstreams(&mut self, upstream_input_ids: &HashSet<InputId>) {
1631        assert!(self.blocked.is_empty() && self.barrier.is_none());
1632
1633        let new_upstreams = std::mem::take(&mut self.active)
1634            .into_iter()
1635            .map(|s| s.into_inner().unwrap())
1636            .filter(|u| !upstream_input_ids.contains(&u.id()));
1637        self.extend_active(new_upstreams);
1638        self.buffered_watermarks.values_mut().for_each(|buffers| {
1639            // Call `check_heap` in case the only upstream(s) that does not have
1640            // watermark in heap is removed
1641            buffers.remove_buffer(upstream_input_ids.clone());
1642        });
1643    }
1644
1645    pub fn merge_barrier_align_duration(&self) -> Option<LabelGuardedMetric<Histogram>> {
1646        self.merge_barrier_align_duration.clone()
1647    }
1648
1649    pub fn flush_buffered_watermarks(&mut self) {
1650        self.buffered_watermarks
1651            .values_mut()
1652            .for_each(|buffers| buffers.clear());
1653    }
1654
1655    pub fn upstream_input_ids(&self) -> impl Iterator<Item = InputId> + '_ {
1656        self.blocked
1657            .iter()
1658            .map(|s| s.id())
1659            .chain(self.active.iter().map(|s| s.get_ref().unwrap().id()))
1660    }
1661
1662    pub fn is_empty(&self) -> bool {
1663        self.blocked.is_empty() && self.active.is_empty()
1664    }
1665}
1666
1667// Explanation of why we need `DispatchBarrierBuffer`:
1668//
1669// When we need to create or replace an upstream fragment for the current fragment, the `Merge` operator must
1670// add some new upstream actor inputs. However, the `Merge` operator may still have old upstreams. We must wait
1671// for these old upstreams to completely process their barriers and align before we can safely update the
1672// `upstream-input-set`.
1673//
1674// Meanwhile, the creation of a new upstream actor can only succeed after the channel to the downstream `Merge`
1675// operator has been established. This creates a potential dependency chain: [new_actor_creation ->
1676// downstream_merge_update -> old_actor_processing]
1677//
1678// To address this, we split the application of a barrier's `Mutation` into two steps:
1679// 1. Parse the `Mutation`. If there is an addition on the upstream-set, establish a channel with the upstream
1680//    and cache it.
1681// 2. When the upstream barrier actually arrives, apply the cached upstream changes to the upstream-set
1682//
1683// Additionally, since receiving a barrier from current upstream input and from the `barrier_rx` are
1684// asynchronous, we cannot determine which will arrive first. Therefore, when a barrier is received from an
1685// upstream: if a cached mutation is present, we apply it. Otherwise, we must fetch a new barrier from
1686// `barrier_rx`.
/// Buffers barriers received from `barrier_rx`, each paired with the new upstream inputs its
/// mutation requires (if any). The buffered barriers are matched, in order, against barriers
/// arriving from the upstream inputs.
pub(crate) struct DispatchBarrierBuffer {
    /// Barriers already received from `barrier_rx`, oldest first. The second element holds the
    /// new upstream inputs created for that barrier when its mutation added upstream actors.
    buffer: VecDeque<(Barrier, Option<Vec<BoxedActorInput>>)>,
    /// Channel from which barriers are fetched.
    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
    /// Whether we are waiting for the next barrier, or still creating new inputs for one.
    recv_state: BarrierReceiverState,
    /// The upstream fragment this actor currently merges from; switched when a barrier's
    /// update-merge mutation carries a new upstream fragment id.
    curr_upstream_fragment_id: FragmentId,
    actor_id: ActorId,
    // read-only context for building new inputs
    build_input_ctx: Arc<BuildInputContext>,
}
1696
/// Read-only data needed to build new upstream inputs. It is shared through an `Arc` so that the
/// input-creation future can own it and does not borrow the `DispatchBarrierBuffer`.
struct BuildInputContext {
    /// The actor that owns the new inputs.
    pub actor_id: ActorId,
    pub local_barrier_manager: LocalBarrierManager,
    pub metrics: Arc<StreamingMetrics>,
    /// The `fragment_id` passed to `DispatchBarrierBuffer::new` — presumably the owning actor's
    /// own fragment (NOTE(review): confirm against callers).
    pub fragment_id: FragmentId,
}
1703
/// Boxed future that creates inputs for newly added upstream actors. It resolves to them once
/// each new input has yielded its first barrier (built in `pre_apply_barrier`).
type BoxedNewInputsFuture =
    Pin<Box<dyn Future<Output = StreamExecutorResult<Vec<BoxedActorInput>>> + Send>>;
1706
/// State of the `barrier_rx` consumer inside `DispatchBarrierBuffer`.
enum BarrierReceiverState {
    /// Waiting for the next barrier from `barrier_rx`.
    ReceivingBarrier,
    /// A barrier that adds upstream actors was received, and its new inputs are still being
    /// created. The barrier is pushed to the buffer once the future completes.
    CreatingNewInput(Barrier, BoxedNewInputsFuture),
}
1711
1712impl DispatchBarrierBuffer {
1713    pub fn new(
1714        barrier_rx: mpsc::UnboundedReceiver<Barrier>,
1715        actor_id: ActorId,
1716        curr_upstream_fragment_id: FragmentId,
1717        local_barrier_manager: LocalBarrierManager,
1718        metrics: Arc<StreamingMetrics>,
1719        fragment_id: FragmentId,
1720    ) -> Self {
1721        Self {
1722            buffer: VecDeque::new(),
1723            barrier_rx,
1724            recv_state: BarrierReceiverState::ReceivingBarrier,
1725            curr_upstream_fragment_id,
1726            actor_id,
1727            build_input_ctx: Arc::new(BuildInputContext {
1728                actor_id,
1729                local_barrier_manager,
1730                metrics,
1731                fragment_id,
1732            }),
1733        }
1734    }
1735
1736    pub async fn await_next_message(
1737        &mut self,
1738        stream: &mut (impl Stream<Item = StreamExecutorResult<DispatcherMessage>> + Unpin),
1739        metrics: &ActorInputMetrics,
1740    ) -> StreamExecutorResult<DispatcherMessage> {
1741        let mut start_time = Instant::now();
1742        let interval_duration = Duration::from_secs(15);
1743        let mut interval =
1744            tokio::time::interval_at(start_time + interval_duration, interval_duration);
1745
1746        loop {
1747            tokio::select! {
1748                biased;
1749                msg = stream.try_next() => {
1750                    metrics
1751                        .actor_input_buffer_blocking_duration_ns
1752                        .inc_by(start_time.elapsed().as_nanos() as u64);
1753                    return msg?.ok_or_else(
1754                        || StreamExecutorError::channel_closed("upstream executor closed unexpectedly")
1755                    );
1756                }
1757
1758                e = self.continuously_fetch_barrier_rx() => {
1759                    return Err(e);
1760                }
1761
1762                _ = interval.tick() => {
1763                    start_time = Instant::now();
1764                    metrics.actor_input_buffer_blocking_duration_ns.inc_by(interval_duration.as_nanos() as u64);
1765                    continue;
1766                }
1767            }
1768        }
1769    }
1770
1771    pub async fn pop_barrier_with_inputs(
1772        &mut self,
1773        barrier: DispatcherBarrier,
1774    ) -> StreamExecutorResult<(Barrier, Option<Vec<BoxedActorInput>>)> {
1775        while self.buffer.is_empty() {
1776            self.try_fetch_barrier_rx(false).await?;
1777        }
1778        let (mut recv_barrier, inputs) = self.buffer.pop_front().unwrap();
1779        apply_dispatcher_barrier(&mut recv_barrier, barrier);
1780
1781        Ok((recv_barrier, inputs))
1782    }
1783
1784    async fn continuously_fetch_barrier_rx(&mut self) -> StreamExecutorError {
1785        loop {
1786            if let Err(e) = self.try_fetch_barrier_rx(true).await {
1787                return e;
1788            }
1789        }
1790    }
1791
1792    async fn try_fetch_barrier_rx(&mut self, pending_on_end: bool) -> StreamExecutorResult<()> {
1793        match &mut self.recv_state {
1794            BarrierReceiverState::ReceivingBarrier => {
1795                let Some(barrier) = self.barrier_rx.recv().await else {
1796                    if pending_on_end {
1797                        return pending().await;
1798                    } else {
1799                        return Err(StreamExecutorError::channel_closed(
1800                            "barrier channel closed unexpectedly",
1801                        ));
1802                    }
1803                };
1804                if let Some(fut) = self.pre_apply_barrier(&barrier) {
1805                    self.recv_state = BarrierReceiverState::CreatingNewInput(barrier, fut);
1806                } else {
1807                    self.buffer.push_back((barrier, None));
1808                }
1809            }
1810            BarrierReceiverState::CreatingNewInput(barrier, fut) => {
1811                let new_inputs = fut.await?;
1812                self.buffer.push_back((barrier.clone(), Some(new_inputs)));
1813                self.recv_state = BarrierReceiverState::ReceivingBarrier;
1814            }
1815        }
1816        Ok(())
1817    }
1818
1819    fn pre_apply_barrier(&mut self, barrier: &Barrier) -> Option<BoxedNewInputsFuture> {
1820        if let Some(update) = barrier.as_update_merge(self.actor_id, self.curr_upstream_fragment_id)
1821            && !update.added_upstream_actors.is_empty()
1822        {
1823            // When update upstream fragment, added_actors will not be empty.
1824            let upstream_fragment_id =
1825                if let Some(new_upstream_fragment_id) = update.new_upstream_fragment_id {
1826                    self.curr_upstream_fragment_id = new_upstream_fragment_id;
1827                    new_upstream_fragment_id
1828                } else {
1829                    self.curr_upstream_fragment_id
1830                };
1831            let ctx = self.build_input_ctx.clone();
1832            let added_upstream_actors = update.added_upstream_actors.clone();
1833            let barrier = barrier.clone();
1834            let fut = async move {
1835                try_join_all(added_upstream_actors.iter().map(|upstream_actor| async {
1836                    let mut new_input = new_input(
1837                        &ctx.local_barrier_manager,
1838                        ctx.metrics.clone(),
1839                        ctx.actor_id,
1840                        ctx.fragment_id,
1841                        upstream_actor,
1842                        upstream_fragment_id,
1843                    )
1844                    .await?;
1845
1846                    // Poll the first barrier from the new upstreams. It must be the same as the one we polled from
1847                    // original upstreams.
1848                    let first_barrier = expect_first_barrier(&mut new_input).await?;
1849                    assert_equal_dispatcher_barrier(&barrier, &first_barrier);
1850
1851                    StreamExecutorResult::Ok(new_input)
1852                }))
1853                .await
1854            }
1855            .boxed();
1856
1857            Some(fut)
1858        } else {
1859            None
1860        }
1861    }
1862}