risingwave_stream/executor/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prelude;
16
17use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
18use std::fmt::Debug;
19use std::future::pending;
20use std::hash::Hash;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::Poll;
24use std::vec;
25
26use await_tree::InstrumentAwait;
27use enum_as_inner::EnumAsInner;
28use futures::future::try_join_all;
29use futures::stream::{BoxStream, FusedStream, FuturesUnordered, StreamFuture};
30use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
31use itertools::Itertools;
32use prometheus::Histogram;
33use prometheus::core::{AtomicU64, GenericCounter};
34use risingwave_common::array::StreamChunk;
35use risingwave_common::bitmap::Bitmap;
36use risingwave_common::catalog::{Field, Schema, TableId};
37use risingwave_common::config::StreamingConfig;
38use risingwave_common::metrics::LabelGuardedMetric;
39use risingwave_common::row::OwnedRow;
40use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
41use risingwave_common::util::epoch::{Epoch, EpochPair};
42use risingwave_common::util::tracing::TracingContext;
43use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
44use risingwave_connector::source::SplitImpl;
45use risingwave_expr::expr::{Expression, NonStrictExpression};
46use risingwave_pb::data::PbEpoch;
47use risingwave_pb::expr::PbInputRef;
48use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
49use risingwave_pb::stream_plan::barrier::BarrierKind;
50use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
51use risingwave_pb::stream_plan::stream_node::PbStreamKind;
52use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
53use risingwave_pb::stream_plan::{
54    PbBarrier, PbBarrierMutation, PbDispatcher, PbStreamMessageBatch, PbWatermark,
55    SubscriptionUpstreamInfo,
56};
57use smallvec::SmallVec;
58use tokio::sync::mpsc;
59use tokio::time::{Duration, Instant};
60
61use crate::error::StreamResult;
62use crate::executor::exchange::input::{
63    BoxedActorInput, BoxedInput, assert_equal_dispatcher_barrier, new_input,
64};
65use crate::executor::monitor::ActorInputMetrics;
66use crate::executor::prelude::StreamingMetrics;
67use crate::executor::watermark::BufferedWatermarks;
68use crate::task::{ActorId, FragmentId, LocalBarrierManager};
69
70mod actor;
71mod barrier_align;
72pub mod exchange;
73pub mod monitor;
74
75pub mod aggregate;
76pub mod asof_join;
77mod backfill;
78mod barrier_recv;
79mod batch_query;
80mod chain;
81mod changelog;
82mod dedup;
83mod dispatch;
84pub mod dml;
85mod dynamic_filter;
86pub mod eowc;
87pub mod error;
88mod expand;
89mod filter;
90mod gap_fill;
91pub mod hash_join;
92mod hop_window;
93mod join;
94pub mod locality_provider;
95mod lookup;
96mod lookup_union;
97mod merge;
98mod mview;
99mod nested_loop_temporal_join;
100mod no_op;
101mod now;
102mod over_window;
103pub mod project;
104mod rearranged_chain;
105mod receiver;
106pub mod row_id_gen;
107mod sink;
108pub mod source;
109mod stream_reader;
110pub mod subtask;
111mod temporal_join;
112mod top_n;
113mod troublemaker;
114mod union;
115mod upstream_sink_union;
116mod values;
117mod watermark;
118mod watermark_filter;
119mod wrapper;
120
121mod approx_percentile;
122
123mod row_merge;
124
125#[cfg(test)]
126mod integration_tests;
127mod sync_kv_log_store;
128#[cfg(any(test, feature = "test"))]
129pub mod test_utils;
130mod utils;
131mod vector;
132
133pub use actor::{Actor, ActorContext, ActorContextRef};
134use anyhow::Context;
135pub use approx_percentile::global::GlobalApproxPercentileExecutor;
136pub use approx_percentile::local::LocalApproxPercentileExecutor;
137pub use backfill::arrangement_backfill::*;
138pub use backfill::cdc::{
139    CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor,
140};
141pub use backfill::no_shuffle_backfill::*;
142pub use backfill::snapshot_backfill::*;
143pub use barrier_recv::BarrierRecvExecutor;
144pub use batch_query::BatchQueryExecutor;
145pub use chain::ChainExecutor;
146pub use changelog::ChangeLogExecutor;
147pub use dedup::AppendOnlyDedupExecutor;
148pub use dispatch::{DispatchExecutor, DispatcherImpl};
149pub use dynamic_filter::DynamicFilterExecutor;
150pub use error::{StreamExecutorError, StreamExecutorResult};
151pub use expand::ExpandExecutor;
152pub use filter::{FilterExecutor, UpsertFilterExecutor};
153pub use gap_fill::{GapFillExecutor, GapFillExecutorArgs};
154pub use hash_join::*;
155pub use hop_window::HopWindowExecutor;
156pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
157pub use join::{AsOfDesc, AsOfJoinType, JoinType};
158pub use lookup::*;
159pub use lookup_union::LookupUnionExecutor;
160pub use merge::MergeExecutor;
161pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
162pub use mview::{MaterializeExecutor, RefreshableMaterializeArgs};
163pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
164pub use no_op::NoOpExecutor;
165pub use now::*;
166pub use over_window::*;
167pub use rearranged_chain::RearrangedChainExecutor;
168pub use receiver::ReceiverExecutor;
169use risingwave_common::id::SourceId;
170pub use row_merge::RowMergeExecutor;
171pub use sink::SinkExecutor;
172pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
173pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
174pub use temporal_join::TemporalJoinExecutor;
175pub use top_n::{
176    AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
177};
178pub use troublemaker::TroublemakerExecutor;
179pub use union::UnionExecutor;
180pub use upstream_sink_union::{UpstreamFragmentInfo, UpstreamSinkUnionExecutor};
181pub use utils::DummyExecutor;
182pub use values::ValuesExecutor;
183pub use vector::*;
184pub use watermark_filter::WatermarkFilterExecutor;
185pub use wrapper::WrapperExecutor;
186
187use self::barrier_align::AlignedMessageStream;
188
189pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
190pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
191pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
192pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;
193
194pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
195use risingwave_connector::sink::catalog::SinkId;
196use risingwave_connector::source::cdc::{
197    CdcTableSnapshotSplitAssignmentWithGeneration,
198    build_actor_cdc_table_snapshot_splits_with_generation,
199};
200use risingwave_pb::id::SubscriberId;
201use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
202
203pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
204pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
205pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;
206
207/// Static information of an executor.
208#[derive(Debug, Default, Clone)]
209pub struct ExecutorInfo {
210    /// The schema of the OUTPUT of the executor.
211    pub schema: Schema,
212
213    /// The stream key indices of the OUTPUT of the executor.
214    pub stream_key: StreamKey,
215
216    /// The stream kind of the OUTPUT of the executor.
217    pub stream_kind: PbStreamKind,
218
219    /// Identity of the executor.
220    pub identity: String,
221
222    /// The executor id of the executor.
223    pub id: u64,
224}
225
226impl ExecutorInfo {
227    pub fn for_test(schema: Schema, stream_key: StreamKey, identity: String, id: u64) -> Self {
228        Self {
229            schema,
230            stream_key,
231            stream_kind: PbStreamKind::Retract, // dummy value for test
232            identity,
233            id,
234        }
235    }
236}
237
238/// [`Execute`] describes the methods an executor should implement to handle control messages.
239pub trait Execute: Send + 'static {
240    fn execute(self: Box<Self>) -> BoxedMessageStream;
241
242    fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
243        self.execute()
244    }
245
246    fn boxed(self) -> Box<dyn Execute>
247    where
248        Self: Sized + Send + 'static,
249    {
250        Box::new(self)
251    }
252}
253
254/// [`Executor`] combines the static information ([`ExecutorInfo`]) and the executable object to
255/// handle messages ([`Execute`]).
256pub struct Executor {
257    info: ExecutorInfo,
258    execute: Box<dyn Execute>,
259}
260
261impl Executor {
262    pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
263        Self { info, execute }
264    }
265
266    pub fn info(&self) -> &ExecutorInfo {
267        &self.info
268    }
269
270    pub fn schema(&self) -> &Schema {
271        &self.info.schema
272    }
273
274    pub fn stream_key(&self) -> StreamKeyRef<'_> {
275        &self.info.stream_key
276    }
277
278    pub fn stream_kind(&self) -> PbStreamKind {
279        self.info.stream_kind
280    }
281
282    pub fn identity(&self) -> &str {
283        &self.info.identity
284    }
285
286    pub fn execute(self) -> BoxedMessageStream {
287        self.execute.execute()
288    }
289
290    pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
291        self.execute.execute_with_epoch(epoch)
292    }
293}
294
295impl std::fmt::Debug for Executor {
296    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
297        f.write_str(self.identity())
298    }
299}
300
301impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
302    fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
303        Self::new(info, execute)
304    }
305}
306
307impl<E> From<(ExecutorInfo, E)> for Executor
308where
309    E: Execute,
310{
311    fn from((info, execute): (ExecutorInfo, E)) -> Self {
312        Self::new(info, execute.boxed())
313    }
314}
315
316pub const INVALID_EPOCH: u64 = 0;
317
318type UpstreamFragmentId = FragmentId;
319type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;
320
321#[derive(Debug, Clone, PartialEq)]
322#[cfg_attr(any(test, feature = "test"), derive(Default))]
323pub struct UpdateMutation {
324    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
325    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
326    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
327    pub dropped_actors: HashSet<ActorId>,
328    pub actor_splits: SplitAssignments,
329    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
330    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
331    pub sink_add_columns: HashMap<SinkId, Vec<Field>>,
332}
333
334#[derive(Debug, Clone, PartialEq)]
335#[cfg_attr(any(test, feature = "test"), derive(Default))]
336pub struct AddMutation {
337    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
338    pub added_actors: HashSet<ActorId>,
339    // TODO: remove this and use `SourceChangesSplit` after we support multiple mutations.
340    pub splits: SplitAssignments,
341    pub pause: bool,
342    /// (`upstream_mv_table_id`,  `subscriber_id`)
343    pub subscriptions_to_add: Vec<(TableId, SubscriberId)>,
344    /// nodes which should start backfill
345    pub backfill_nodes_to_pause: HashSet<FragmentId>,
346    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
347    pub new_upstream_sinks: HashMap<FragmentId, PbNewUpstreamSink>,
348}
349
350#[derive(Debug, Clone, PartialEq)]
351#[cfg_attr(any(test, feature = "test"), derive(Default))]
352pub struct StopMutation {
353    pub dropped_actors: HashSet<ActorId>,
354    pub dropped_sink_fragments: HashSet<FragmentId>,
355}
356
357/// See [`PbMutation`] for the semantics of each mutation.
358#[derive(Debug, Clone, PartialEq)]
359pub enum Mutation {
360    Stop(StopMutation),
361    Update(UpdateMutation),
362    Add(AddMutation),
363    SourceChangeSplit(SplitAssignments),
364    Pause,
365    Resume,
366    Throttle(HashMap<ActorId, Option<u32>>),
367    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
368    DropSubscriptions {
369        /// `subscriber` -> `upstream_mv_table_id`
370        subscriptions_to_drop: Vec<(SubscriberId, TableId)>,
371    },
372    StartFragmentBackfill {
373        fragment_ids: HashSet<FragmentId>,
374    },
375    RefreshStart {
376        table_id: TableId,
377        associated_source_id: SourceId,
378    },
379    ListFinish {
380        associated_source_id: SourceId,
381    },
382    LoadFinish {
383        associated_source_id: SourceId,
384    },
385}
386
387/// The generic type `M` is the mutation type of the barrier.
388///
389/// For barrier of in the dispatcher, `M` is `()`, which means the mutation is erased.
390/// For barrier flowing within the streaming actor, `M` is the normal `BarrierMutationType`.
391#[derive(Debug, Clone)]
392pub struct BarrierInner<M> {
393    pub epoch: EpochPair,
394    pub mutation: M,
395    pub kind: BarrierKind,
396
397    /// Tracing context for the **current** epoch of this barrier.
398    pub tracing_context: TracingContext,
399}
400
401pub type BarrierMutationType = Option<Arc<Mutation>>;
402pub type Barrier = BarrierInner<BarrierMutationType>;
403pub type DispatcherBarrier = BarrierInner<()>;
404
405impl<M: Default> BarrierInner<M> {
406    /// Create a plain barrier.
407    pub fn new_test_barrier(epoch: u64) -> Self {
408        Self {
409            epoch: EpochPair::new_test_epoch(epoch),
410            kind: BarrierKind::Checkpoint,
411            tracing_context: TracingContext::none(),
412            mutation: Default::default(),
413        }
414    }
415
416    pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
417        Self {
418            epoch: EpochPair::new(epoch, prev_epoch),
419            kind: BarrierKind::Checkpoint,
420            tracing_context: TracingContext::none(),
421            mutation: Default::default(),
422        }
423    }
424}
425
426impl Barrier {
427    pub fn into_dispatcher(self) -> DispatcherBarrier {
428        DispatcherBarrier {
429            epoch: self.epoch,
430            mutation: (),
431            kind: self.kind,
432            tracing_context: self.tracing_context,
433        }
434    }
435
436    #[must_use]
437    pub fn with_mutation(self, mutation: Mutation) -> Self {
438        Self {
439            mutation: Some(Arc::new(mutation)),
440            ..self
441        }
442    }
443
444    #[must_use]
445    pub fn with_stop(self) -> Self {
446        self.with_mutation(Mutation::Stop(StopMutation {
447            dropped_actors: Default::default(),
448            dropped_sink_fragments: Default::default(),
449        }))
450    }
451
452    /// Whether this barrier carries stop mutation.
453    pub fn is_with_stop_mutation(&self) -> bool {
454        matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
455    }
456
457    /// Whether this barrier is to stop the actor with `actor_id`.
458    pub fn is_stop(&self, actor_id: ActorId) -> bool {
459        self.all_stop_actors()
460            .is_some_and(|actors| actors.contains(&actor_id))
461    }
462
463    pub fn is_checkpoint(&self) -> bool {
464        self.kind == BarrierKind::Checkpoint
465    }
466
467    /// Get the initial split assignments for the actor with `actor_id`.
468    ///
469    /// This should only be called on the initial barrier received by the executor. It must be
470    ///
471    /// - `Add` mutation when it's a new streaming job, or recovery.
472    /// - `Update` mutation when it's created for scaling.
473    ///
474    /// Note that `SourceChangeSplit` is **not** included, because it's only used for changing splits
475    /// of existing executors.
476    pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
477        match self.mutation.as_deref()? {
478            Mutation::Update(UpdateMutation { actor_splits, .. })
479            | Mutation::Add(AddMutation {
480                splits: actor_splits,
481                ..
482            }) => actor_splits.get(&actor_id),
483
484            _ => {
485                if cfg!(debug_assertions) {
486                    panic!(
487                        "the initial mutation of the barrier should not be {:?}",
488                        self.mutation
489                    );
490                }
491                None
492            }
493        }
494        .map(|s| s.as_slice())
495    }
496
497    /// Get all actors that to be stopped (dropped) by this barrier.
498    pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
499        match self.mutation.as_deref() {
500            Some(Mutation::Stop(StopMutation { dropped_actors, .. })) => Some(dropped_actors),
501            Some(Mutation::Update(UpdateMutation { dropped_actors, .. })) => Some(dropped_actors),
502            _ => None,
503        }
504    }
505
506    /// Whether this barrier is to newly add the actor with `actor_id`. This is used for `Chain` and
507    /// `Values` to decide whether to output the existing (historical) data.
508    ///
509    /// By "newly", we mean the actor belongs to a subgraph of a new streaming job. That is, actors
510    /// added for scaling are not included.
511    pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
512        match self.mutation.as_deref() {
513            Some(Mutation::Add(AddMutation { added_actors, .. })) => {
514                added_actors.contains(&actor_id)
515            }
516            _ => false,
517        }
518    }
519
520    pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
521        if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
522            fragment_ids.contains(&fragment_id)
523        } else {
524            false
525        }
526    }
527
528    /// Whether this barrier adds new downstream fragment for the actor with `upstream_actor_id`.
529    ///
530    /// # Use case
531    /// Some optimizations are applied when an actor doesn't have any downstreams ("standalone" actors).
532    /// * Pause a standalone shared `SourceExecutor`.
533    /// * Disable a standalone `MaterializeExecutor`'s conflict check.
534    ///
535    /// This is implemented by checking `actor_context.initial_dispatch_num` on startup, and
536    /// check `has_more_downstream_fragments` on barrier to see whether the optimization
537    /// needs to be turned off.
538    ///
539    /// ## Some special cases not included
540    ///
541    /// Note that this is not `has_new_downstream_actor/fragment`. For our use case, we only
542    /// care about **number of downstream fragments** (more precisely, existence).
543    /// - When scaling, the number of downstream actors is changed, and they are "new", but downstream fragments is not changed.
544    /// - When `ALTER TABLE sink_into_table`, the fragment is replaced with a "new" one, but the number is not changed.
545    pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
546        let Some(mutation) = self.mutation.as_deref() else {
547            return false;
548        };
549        match mutation {
550            // Add is for mv, index and sink creation.
551            Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
552            Mutation::Update(_)
553            | Mutation::Stop(_)
554            | Mutation::Pause
555            | Mutation::Resume
556            | Mutation::SourceChangeSplit(_)
557            | Mutation::Throttle(_)
558            | Mutation::DropSubscriptions { .. }
559            | Mutation::ConnectorPropsChange(_)
560            | Mutation::StartFragmentBackfill { .. }
561            | Mutation::RefreshStart { .. }
562            | Mutation::ListFinish { .. }
563            | Mutation::LoadFinish { .. } => false,
564        }
565    }
566
567    /// Whether this barrier requires the executor to pause its data stream on startup.
568    pub fn is_pause_on_startup(&self) -> bool {
569        match self.mutation.as_deref() {
570            Some(Mutation::Add(AddMutation { pause, .. })) => *pause,
571            _ => false,
572        }
573    }
574
575    pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
576        match self.mutation.as_deref() {
577            Some(Mutation::Add(AddMutation {
578                backfill_nodes_to_pause,
579                ..
580            })) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
581            Some(Mutation::Update(_)) => false,
582            _ => {
583                tracing::warn!(
584                    "expected an AddMutation or UpdateMutation on Startup, instead got {:?}",
585                    self
586                );
587                false
588            }
589        }
590    }
591
592    /// Whether this barrier is for resume.
593    pub fn is_resume(&self) -> bool {
594        matches!(self.mutation.as_deref(), Some(Mutation::Resume))
595    }
596
597    /// Returns the [`MergeUpdate`] if this barrier is to update the merge executors for the actor
598    /// with `actor_id`.
599    pub fn as_update_merge(
600        &self,
601        actor_id: ActorId,
602        upstream_fragment_id: UpstreamFragmentId,
603    ) -> Option<&MergeUpdate> {
604        self.mutation
605            .as_deref()
606            .and_then(|mutation| match mutation {
607                Mutation::Update(UpdateMutation { merges, .. }) => {
608                    merges.get(&(actor_id, upstream_fragment_id))
609                }
610                _ => None,
611            })
612    }
613
614    /// Returns the new upstream sink information if this barrier is to add a new upstream sink for
615    /// the specified downstream fragment.
616    pub fn as_new_upstream_sink(&self, fragment_id: FragmentId) -> Option<&PbNewUpstreamSink> {
617        self.mutation
618            .as_deref()
619            .and_then(|mutation| match mutation {
620                Mutation::Add(AddMutation {
621                    new_upstream_sinks, ..
622                }) => new_upstream_sinks.get(&fragment_id),
623                _ => None,
624            })
625    }
626
627    /// Returns the dropped upstream sink-fragment if this barrier is to drop any sink.
628    pub fn as_dropped_upstream_sinks(&self) -> Option<&HashSet<FragmentId>> {
629        self.mutation
630            .as_deref()
631            .and_then(|mutation| match mutation {
632                Mutation::Stop(StopMutation {
633                    dropped_sink_fragments,
634                    ..
635                }) => Some(dropped_sink_fragments),
636                _ => None,
637            })
638    }
639
640    /// Returns the new vnode bitmap if this barrier is to update the vnode bitmap for the actor
641    /// with `actor_id`.
642    ///
643    /// Actually, this vnode bitmap update is only useful for the record accessing validation for
644    /// distributed executors, since the read/write pattern will never be across multiple vnodes.
645    pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
646        self.mutation
647            .as_deref()
648            .and_then(|mutation| match mutation {
649                Mutation::Update(UpdateMutation { vnode_bitmaps, .. }) => {
650                    vnode_bitmaps.get(&actor_id).cloned()
651                }
652                _ => None,
653            })
654    }
655
656    pub fn as_sink_add_columns(&self, sink_id: SinkId) -> Option<Vec<Field>> {
657        self.mutation
658            .as_deref()
659            .and_then(|mutation| match mutation {
660                Mutation::Update(UpdateMutation {
661                    sink_add_columns, ..
662                }) => sink_add_columns.get(&sink_id).cloned(),
663                _ => None,
664            })
665    }
666
667    pub fn get_curr_epoch(&self) -> Epoch {
668        Epoch(self.epoch.curr)
669    }
670
671    /// Retrieve the tracing context for the **current** epoch of this barrier.
672    pub fn tracing_context(&self) -> &TracingContext {
673        &self.tracing_context
674    }
675
676    pub fn added_subscriber_on_mv_table(
677        &self,
678        mv_table_id: TableId,
679    ) -> impl Iterator<Item = SubscriberId> + '_ {
680        if let Some(Mutation::Add(add)) = self.mutation.as_deref() {
681            Some(add)
682        } else {
683            None
684        }
685        .into_iter()
686        .flat_map(move |add| {
687            add.subscriptions_to_add.iter().filter_map(
688                move |(upstream_mv_table_id, subscriber_id)| {
689                    if *upstream_mv_table_id == mv_table_id {
690                        Some(*subscriber_id)
691                    } else {
692                        None
693                    }
694                },
695            )
696        })
697    }
698}
699
700impl<M: PartialEq> PartialEq for BarrierInner<M> {
701    fn eq(&self, other: &Self) -> bool {
702        self.epoch == other.epoch && self.mutation == other.mutation
703    }
704}
705
706impl Mutation {
707    /// Return true if the mutation is stop.
708    ///
709    /// Note that this does not mean we will stop the current actor.
710    #[cfg(test)]
711    pub fn is_stop(&self) -> bool {
712        matches!(self, Mutation::Stop(_))
713    }
714
715    #[cfg(test)]
716    fn to_protobuf(&self) -> PbMutation {
717        use risingwave_connector::source::cdc::build_pb_actor_cdc_table_snapshot_splits_with_generation;
718        use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
719        use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
720        use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
721        use risingwave_pb::stream_plan::{
722            PbAddMutation, PbConnectorPropsChangeMutation, PbDispatchers,
723            PbDropSubscriptionsMutation, PbPauseMutation, PbResumeMutation, PbSinkAddColumns,
724            PbSourceChangeSplitMutation, PbStartFragmentBackfillMutation, PbStopMutation,
725            PbThrottleMutation, PbUpdateMutation,
726        };
727        let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
728            actor_splits
729                .iter()
730                .map(|(&actor_id, splits)| {
731                    (
732                        actor_id,
733                        ConnectorSplits {
734                            splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
735                        },
736                    )
737                })
738                .collect::<HashMap<_, _>>()
739        };
740
741        match self {
742            Mutation::Stop(StopMutation {
743                dropped_actors,
744                dropped_sink_fragments,
745            }) => PbMutation::Stop(PbStopMutation {
746                actors: dropped_actors.iter().copied().collect(),
747                dropped_sink_fragments: dropped_sink_fragments.iter().copied().collect(),
748            }),
749            Mutation::Update(UpdateMutation {
750                dispatchers,
751                merges,
752                vnode_bitmaps,
753                dropped_actors,
754                actor_splits,
755                actor_new_dispatchers,
756                actor_cdc_table_snapshot_splits,
757                sink_add_columns,
758            }) => PbMutation::Update(PbUpdateMutation {
759                dispatcher_update: dispatchers.values().flatten().cloned().collect(),
760                merge_update: merges.values().cloned().collect(),
761                actor_vnode_bitmap_update: vnode_bitmaps
762                    .iter()
763                    .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
764                    .collect(),
765                dropped_actors: dropped_actors.iter().copied().collect(),
766                actor_splits: actor_splits_to_protobuf(actor_splits),
767                actor_new_dispatchers: actor_new_dispatchers
768                    .iter()
769                    .map(|(&actor_id, dispatchers)| {
770                        (
771                            actor_id,
772                            PbDispatchers {
773                                dispatchers: dispatchers.clone(),
774                            },
775                        )
776                    })
777                    .collect(),
778                actor_cdc_table_snapshot_splits:
779                    build_pb_actor_cdc_table_snapshot_splits_with_generation(
780                        actor_cdc_table_snapshot_splits.clone(),
781                    )
782                    .into(),
783                sink_add_columns: sink_add_columns
784                    .iter()
785                    .map(|(sink_id, add_columns)| {
786                        (
787                            *sink_id,
788                            PbSinkAddColumns {
789                                fields: add_columns.iter().map(|field| field.to_prost()).collect(),
790                            },
791                        )
792                    })
793                    .collect(),
794            }),
795            Mutation::Add(AddMutation {
796                adds,
797                added_actors,
798                splits,
799                pause,
800                subscriptions_to_add,
801                backfill_nodes_to_pause,
802                actor_cdc_table_snapshot_splits,
803                new_upstream_sinks,
804            }) => PbMutation::Add(PbAddMutation {
805                actor_dispatchers: adds
806                    .iter()
807                    .map(|(&actor_id, dispatchers)| {
808                        (
809                            actor_id,
810                            PbDispatchers {
811                                dispatchers: dispatchers.clone(),
812                            },
813                        )
814                    })
815                    .collect(),
816                added_actors: added_actors.iter().copied().collect(),
817                actor_splits: actor_splits_to_protobuf(splits),
818                pause: *pause,
819                subscriptions_to_add: subscriptions_to_add
820                    .iter()
821                    .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
822                        subscriber_id: *subscriber_id,
823                        upstream_mv_table_id: *table_id,
824                    })
825                    .collect(),
826                backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
827                actor_cdc_table_snapshot_splits:
828                    build_pb_actor_cdc_table_snapshot_splits_with_generation(
829                        actor_cdc_table_snapshot_splits.clone(),
830                    )
831                    .into(),
832                new_upstream_sinks: new_upstream_sinks
833                    .iter()
834                    .map(|(k, v)| (*k, v.clone()))
835                    .collect(),
836            }),
837            Mutation::SourceChangeSplit(changes) => {
838                PbMutation::Splits(PbSourceChangeSplitMutation {
839                    actor_splits: changes
840                        .iter()
841                        .map(|(&actor_id, splits)| {
842                            (
843                                actor_id,
844                                ConnectorSplits {
845                                    splits: splits
846                                        .clone()
847                                        .iter()
848                                        .map(ConnectorSplit::from)
849                                        .collect(),
850                                },
851                            )
852                        })
853                        .collect(),
854                })
855            }
856            Mutation::Pause => PbMutation::Pause(PbPauseMutation {}),
857            Mutation::Resume => PbMutation::Resume(PbResumeMutation {}),
858            Mutation::Throttle(changes) => PbMutation::Throttle(PbThrottleMutation {
859                actor_throttle: changes
860                    .iter()
861                    .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit }))
862                    .collect(),
863            }),
864            Mutation::DropSubscriptions {
865                subscriptions_to_drop,
866            } => PbMutation::DropSubscriptions(PbDropSubscriptionsMutation {
867                info: subscriptions_to_drop
868                    .iter()
869                    .map(
870                        |(subscriber_id, upstream_mv_table_id)| SubscriptionUpstreamInfo {
871                            subscriber_id: *subscriber_id,
872                            upstream_mv_table_id: *upstream_mv_table_id,
873                        },
874                    )
875                    .collect(),
876            }),
877            Mutation::ConnectorPropsChange(map) => {
878                PbMutation::ConnectorPropsChange(PbConnectorPropsChangeMutation {
879                    connector_props_infos: map
880                        .iter()
881                        .map(|(actor_id, options)| {
882                            (
883                                *actor_id,
884                                ConnectorPropsInfo {
885                                    connector_props_info: options
886                                        .iter()
887                                        .map(|(k, v)| (k.clone(), v.clone()))
888                                        .collect(),
889                                },
890                            )
891                        })
892                        .collect(),
893                })
894            }
895            Mutation::StartFragmentBackfill { fragment_ids } => {
896                PbMutation::StartFragmentBackfill(PbStartFragmentBackfillMutation {
897                    fragment_ids: fragment_ids.iter().copied().collect(),
898                })
899            }
900            Mutation::RefreshStart {
901                table_id,
902                associated_source_id,
903            } => PbMutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
904                table_id: *table_id,
905                associated_source_id: *associated_source_id,
906            }),
907            Mutation::ListFinish {
908                associated_source_id,
909            } => PbMutation::ListFinish(risingwave_pb::stream_plan::ListFinishMutation {
910                associated_source_id: *associated_source_id,
911            }),
912            Mutation::LoadFinish {
913                associated_source_id,
914            } => PbMutation::LoadFinish(risingwave_pb::stream_plan::LoadFinishMutation {
915                associated_source_id: *associated_source_id,
916            }),
917        }
918    }
919
920    fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
921        let mutation = match prost {
922            PbMutation::Stop(stop) => Mutation::Stop(StopMutation {
923                dropped_actors: stop.actors.iter().copied().collect(),
924                dropped_sink_fragments: stop.dropped_sink_fragments.iter().copied().collect(),
925            }),
926
927            PbMutation::Update(update) => Mutation::Update(UpdateMutation {
928                dispatchers: update
929                    .dispatcher_update
930                    .iter()
931                    .map(|u| (u.actor_id, u.clone()))
932                    .into_group_map(),
933                merges: update
934                    .merge_update
935                    .iter()
936                    .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
937                    .collect(),
938                vnode_bitmaps: update
939                    .actor_vnode_bitmap_update
940                    .iter()
941                    .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
942                    .collect(),
943                dropped_actors: update.dropped_actors.iter().copied().collect(),
944                actor_splits: update
945                    .actor_splits
946                    .iter()
947                    .map(|(&actor_id, splits)| {
948                        (
949                            actor_id,
950                            splits
951                                .splits
952                                .iter()
953                                .map(|split| split.try_into().unwrap())
954                                .collect(),
955                        )
956                    })
957                    .collect(),
958                actor_new_dispatchers: update
959                    .actor_new_dispatchers
960                    .iter()
961                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
962                    .collect(),
963                actor_cdc_table_snapshot_splits:
964                    build_actor_cdc_table_snapshot_splits_with_generation(
965                        update
966                            .actor_cdc_table_snapshot_splits
967                            .clone()
968                            .unwrap_or_default(),
969                    ),
970                sink_add_columns: update
971                    .sink_add_columns
972                    .iter()
973                    .map(|(sink_id, add_columns)| {
974                        (
975                            *sink_id,
976                            add_columns.fields.iter().map(Field::from_prost).collect(),
977                        )
978                    })
979                    .collect(),
980            }),
981
982            PbMutation::Add(add) => Mutation::Add(AddMutation {
983                adds: add
984                    .actor_dispatchers
985                    .iter()
986                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
987                    .collect(),
988                added_actors: add.added_actors.iter().copied().collect(),
989                // TODO: remove this and use `SourceChangesSplit` after we support multiple
990                // mutations.
991                splits: add
992                    .actor_splits
993                    .iter()
994                    .map(|(&actor_id, splits)| {
995                        (
996                            actor_id,
997                            splits
998                                .splits
999                                .iter()
1000                                .map(|split| split.try_into().unwrap())
1001                                .collect(),
1002                        )
1003                    })
1004                    .collect(),
1005                pause: add.pause,
1006                subscriptions_to_add: add
1007                    .subscriptions_to_add
1008                    .iter()
1009                    .map(
1010                        |SubscriptionUpstreamInfo {
1011                             subscriber_id,
1012                             upstream_mv_table_id,
1013                         }| { (*upstream_mv_table_id, *subscriber_id) },
1014                    )
1015                    .collect(),
1016                backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
1017                actor_cdc_table_snapshot_splits:
1018                    build_actor_cdc_table_snapshot_splits_with_generation(
1019                        add.actor_cdc_table_snapshot_splits
1020                            .clone()
1021                            .unwrap_or_default(),
1022                    ),
1023                new_upstream_sinks: add
1024                    .new_upstream_sinks
1025                    .iter()
1026                    .map(|(k, v)| (*k, v.clone()))
1027                    .collect(),
1028            }),
1029
1030            PbMutation::Splits(s) => {
1031                let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
1032                    Vec::with_capacity(s.actor_splits.len());
1033                for (&actor_id, splits) in &s.actor_splits {
1034                    if !splits.splits.is_empty() {
1035                        change_splits.push((
1036                            actor_id,
1037                            splits
1038                                .splits
1039                                .iter()
1040                                .map(SplitImpl::try_from)
1041                                .try_collect()?,
1042                        ));
1043                    }
1044                }
1045                Mutation::SourceChangeSplit(change_splits.into_iter().collect())
1046            }
1047            PbMutation::Pause(_) => Mutation::Pause,
1048            PbMutation::Resume(_) => Mutation::Resume,
1049            PbMutation::Throttle(changes) => Mutation::Throttle(
1050                changes
1051                    .actor_throttle
1052                    .iter()
1053                    .map(|(actor_id, limit)| (*actor_id, limit.rate_limit))
1054                    .collect(),
1055            ),
1056            PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
1057                subscriptions_to_drop: drop
1058                    .info
1059                    .iter()
1060                    .map(|info| (info.subscriber_id, info.upstream_mv_table_id))
1061                    .collect(),
1062            },
1063            PbMutation::ConnectorPropsChange(alter_connector_props) => {
1064                Mutation::ConnectorPropsChange(
1065                    alter_connector_props
1066                        .connector_props_infos
1067                        .iter()
1068                        .map(|(connector_id, options)| {
1069                            (
1070                                *connector_id,
1071                                options
1072                                    .connector_props_info
1073                                    .iter()
1074                                    .map(|(k, v)| (k.clone(), v.clone()))
1075                                    .collect(),
1076                            )
1077                        })
1078                        .collect(),
1079                )
1080            }
1081            PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
1082                Mutation::StartFragmentBackfill {
1083                    fragment_ids: start_fragment_backfill
1084                        .fragment_ids
1085                        .iter()
1086                        .copied()
1087                        .collect(),
1088                }
1089            }
1090            PbMutation::RefreshStart(refresh_start) => Mutation::RefreshStart {
1091                table_id: refresh_start.table_id,
1092                associated_source_id: refresh_start.associated_source_id,
1093            },
1094            PbMutation::ListFinish(list_finish) => Mutation::ListFinish {
1095                associated_source_id: list_finish.associated_source_id,
1096            },
1097            PbMutation::LoadFinish(load_finish) => Mutation::LoadFinish {
1098                associated_source_id: load_finish.associated_source_id,
1099            },
1100        };
1101        Ok(mutation)
1102    }
1103}
1104
1105impl<M> BarrierInner<M> {
1106    fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
1107        let Self {
1108            epoch,
1109            mutation,
1110            kind,
1111            tracing_context,
1112            ..
1113        } = self;
1114
1115        PbBarrier {
1116            epoch: Some(PbEpoch {
1117                curr: epoch.curr,
1118                prev: epoch.prev,
1119            }),
1120            mutation: barrier_fn(mutation).map(|mutation| PbBarrierMutation {
1121                mutation: Some(mutation),
1122            }),
1123            tracing_context: tracing_context.to_protobuf(),
1124            kind: *kind as _,
1125        }
1126    }
1127
1128    fn from_protobuf_inner(
1129        prost: &PbBarrier,
1130        mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
1131    ) -> StreamExecutorResult<Self> {
1132        let epoch = prost.get_epoch()?;
1133
1134        Ok(Self {
1135            kind: prost.kind(),
1136            epoch: EpochPair::new(epoch.curr, epoch.prev),
1137            mutation: mutation_from_pb(
1138                (prost.mutation.as_ref()).and_then(|mutation| mutation.mutation.as_ref()),
1139            )?,
1140            tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
1141        })
1142    }
1143
1144    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
1145        BarrierInner {
1146            epoch: self.epoch,
1147            mutation: f(self.mutation),
1148            kind: self.kind,
1149            tracing_context: self.tracing_context,
1150        }
1151    }
1152}
1153
1154impl DispatcherBarrier {
1155    pub fn to_protobuf(&self) -> PbBarrier {
1156        self.to_protobuf_inner(|_| None)
1157    }
1158}
1159
1160impl Barrier {
1161    #[cfg(test)]
1162    pub fn to_protobuf(&self) -> PbBarrier {
1163        self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
1164    }
1165
1166    pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
1167        Self::from_protobuf_inner(prost, |mutation| {
1168            mutation
1169                .map(|m| Mutation::from_protobuf(m).map(Arc::new))
1170                .transpose()
1171        })
1172    }
1173}
1174
/// A watermark on one column of a stream.
///
/// Equality is derived over all fields, while ordering (see the `Ord` impl) compares `val` only.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
    /// Index of the column this watermark applies to.
    pub col_idx: usize,
    /// Data type of the column; encoded alongside the index in `to_protobuf`.
    pub data_type: DataType,
    /// The watermark value. It is never null (`from_protobuf` rejects a null value).
    pub val: ScalarImpl,
}
1181
1182impl PartialOrd for Watermark {
1183    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1184        Some(self.cmp(other))
1185    }
1186}
1187
1188impl Ord for Watermark {
1189    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1190        self.val.default_cmp(&other.val)
1191    }
1192}
1193
1194impl Watermark {
1195    pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
1196        Self {
1197            col_idx,
1198            data_type,
1199            val,
1200        }
1201    }
1202
1203    pub async fn transform_with_expr(
1204        self,
1205        expr: &NonStrictExpression<impl Expression>,
1206        new_col_idx: usize,
1207    ) -> Option<Self> {
1208        let Self { col_idx, val, .. } = self;
1209        let row = {
1210            let mut row = vec![None; col_idx + 1];
1211            row[col_idx] = Some(val);
1212            OwnedRow::new(row)
1213        };
1214        let val = expr.eval_row_infallible(&row).await?;
1215        Some(Self::new(new_col_idx, expr.inner().return_type(), val))
1216    }
1217
1218    /// Transform the watermark with the given output indices. If this watermark is not in the
1219    /// output, return `None`.
1220    pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
1221        output_indices
1222            .iter()
1223            .position(|p| *p == self.col_idx)
1224            .map(|new_col_idx| self.with_idx(new_col_idx))
1225    }
1226
1227    pub fn to_protobuf(&self) -> PbWatermark {
1228        PbWatermark {
1229            column: Some(PbInputRef {
1230                index: self.col_idx as _,
1231                r#type: Some(self.data_type.to_protobuf()),
1232            }),
1233            val: Some(&self.val).to_protobuf().into(),
1234        }
1235    }
1236
1237    pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
1238        let col_ref = prost.get_column()?;
1239        let data_type = DataType::from(col_ref.get_type()?);
1240        let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
1241            .expect("watermark value cannot be null");
1242        Ok(Self::new(col_ref.get_index() as _, data_type, val))
1243    }
1244
1245    pub fn with_idx(self, idx: usize) -> Self {
1246        Self::new(idx, self.data_type, self.val)
1247    }
1248}
1249
/// A single item flowing through a stream: a data chunk, a barrier, or a watermark.
///
/// `M` is the barrier's mutation payload type; `Message` and `DispatcherMessage` below are the
/// two instantiations used in this file.
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageInner<M> {
    /// A chunk of data rows.
    Chunk(StreamChunk),
    /// A barrier carrying a mutation payload of type `M`.
    Barrier(BarrierInner<M>),
    /// A watermark on one column.
    Watermark(Watermark),
}
1256
1257impl<M> MessageInner<M> {
1258    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
1259        match self {
1260            MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
1261            MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
1262            MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
1263        }
1264    }
1265}
1266
/// A message whose barriers carry their full mutation (`BarrierMutationType`).
pub type Message = MessageInner<BarrierMutationType>;
/// A message whose barriers carry no mutation (the payload is `()`).
pub type DispatcherMessage = MessageInner<()>;
1269
/// `MessageBatchInner` is used exclusively by `Dispatcher` and the `Merger`/`Receiver` for exchanging messages between them.
/// It shares the same message type as the fundamental `MessageInner`, but batches multiple barriers into a single message.
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageBatchInner<M> {
    /// A chunk of data rows.
    Chunk(StreamChunk),
    /// One or more barriers delivered together in a single message.
    BarrierBatch(Vec<BarrierInner<M>>),
    /// A watermark on one column.
    Watermark(Watermark),
}
/// A batch whose barriers carry their full mutation.
pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
/// A list of mutation-free barriers.
pub type DispatcherBarriers = Vec<DispatcherBarrier>;
/// A batch whose barriers carry no mutation (the payload is `()`).
pub type DispatcherMessageBatch = MessageBatchInner<()>;
1281
1282impl From<DispatcherMessage> for DispatcherMessageBatch {
1283    fn from(m: DispatcherMessage) -> Self {
1284        match m {
1285            DispatcherMessage::Chunk(c) => Self::Chunk(c),
1286            DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
1287            DispatcherMessage::Watermark(w) => Self::Watermark(w),
1288        }
1289    }
1290}
1291
1292impl From<StreamChunk> for Message {
1293    fn from(chunk: StreamChunk) -> Self {
1294        Message::Chunk(chunk)
1295    }
1296}
1297
1298impl<'a> TryFrom<&'a Message> for &'a Barrier {
1299    type Error = ();
1300
1301    fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
1302        match m {
1303            Message::Chunk(_) => Err(()),
1304            Message::Barrier(b) => Ok(b),
1305            Message::Watermark(_) => Err(()),
1306        }
1307    }
1308}
1309
1310impl Message {
1311    /// Return true if the message is a stop barrier, meaning the stream
1312    /// will not continue, false otherwise.
1313    ///
1314    /// Note that this does not mean we will stop the current actor.
1315    #[cfg(test)]
1316    pub fn is_stop(&self) -> bool {
1317        matches!(
1318            self,
1319            Message::Barrier(Barrier {
1320                mutation,
1321                ..
1322            }) if mutation.as_ref().unwrap().is_stop()
1323        )
1324    }
1325}
1326
1327impl DispatcherMessageBatch {
1328    pub fn to_protobuf(&self) -> PbStreamMessageBatch {
1329        let prost = match self {
1330            Self::Chunk(stream_chunk) => {
1331                let prost_stream_chunk = stream_chunk.to_protobuf();
1332                StreamMessageBatch::StreamChunk(prost_stream_chunk)
1333            }
1334            Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
1335                barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
1336            }),
1337            Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
1338        };
1339        PbStreamMessageBatch {
1340            stream_message_batch: Some(prost),
1341        }
1342    }
1343
1344    pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
1345        let res = match prost.get_stream_message_batch()? {
1346            StreamMessageBatch::StreamChunk(chunk) => {
1347                Self::Chunk(StreamChunk::from_protobuf(chunk)?)
1348            }
1349            StreamMessageBatch::BarrierBatch(barrier_batch) => {
1350                let barriers = barrier_batch
1351                    .barriers
1352                    .iter()
1353                    .map(|barrier| {
1354                        DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
1355                            if mutation.is_some() {
1356                                if cfg!(debug_assertions) {
1357                                    panic!("should not receive message of barrier with mutation");
1358                                } else {
1359                                    warn!(?barrier, "receive message of barrier with mutation");
1360                                }
1361                            }
1362                            Ok(())
1363                        })
1364                    })
1365                    .try_collect()?;
1366                Self::BarrierBatch(barriers)
1367            }
1368            StreamMessageBatch::Watermark(watermark) => {
1369                Self::Watermark(Watermark::from_protobuf(watermark)?)
1370            }
1371        };
1372        Ok(res)
1373    }
1374
1375    pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
1376        ::prost::Message::encoded_len(msg)
1377    }
1378}
1379
/// An owned stream key: a list of `usize` indices (presumably column indices — confirm).
pub type StreamKey = Vec<usize>;
/// Borrowed form of `StreamKey`.
pub type StreamKeyRef<'a> = &'a [usize];
/// Data types of stream key columns; stores up to one element inline before spilling.
pub type StreamKeyDataTypes = SmallVec<[DataType; 1]>;
1383
1384/// Expect the first message of the given `stream` as a barrier.
1385pub async fn expect_first_barrier<M: Debug>(
1386    stream: &mut (impl MessageStreamInner<M> + Unpin),
1387) -> StreamExecutorResult<BarrierInner<M>> {
1388    let message = stream
1389        .next()
1390        .instrument_await("expect_first_barrier")
1391        .await
1392        .context("failed to extract the first message: stream closed unexpectedly")??;
1393    let barrier = message
1394        .into_barrier()
1395        .expect("the first message must be a barrier");
1396    // TODO: Is this check correct?
1397    assert!(matches!(
1398        barrier.kind,
1399        BarrierKind::Checkpoint | BarrierKind::Initial
1400    ));
1401    Ok(barrier)
1402}
1403
1404/// Expect the first message of the given `stream` as a barrier.
1405pub async fn expect_first_barrier_from_aligned_stream(
1406    stream: &mut (impl AlignedMessageStream + Unpin),
1407) -> StreamExecutorResult<Barrier> {
1408    let message = stream
1409        .next()
1410        .instrument_await("expect_first_barrier")
1411        .await
1412        .context("failed to extract the first message: stream closed unexpectedly")??;
1413    let barrier = message
1414        .into_barrier()
1415        .expect("the first message must be a barrier");
1416    Ok(barrier)
1417}
1418
/// `StreamConsumer` is the last step in an actor.
///
/// Implementors are consumed by `execute`, which turns them into a stream of
/// `StreamResult<Barrier>` items.
pub trait StreamConsumer: Send + 'static {
    /// The stream returned by `execute`; it must be `Send`.
    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;

    /// Consumes the boxed consumer and returns its barrier stream.
    fn execute(self: Box<Self>) -> Self::BarrierStream;
}
1425
/// A boxed upstream input (see `BoxedInput`) identified by `InputId` whose items are
/// `MessageStreamItemInner<M>`.
type BoxedMessageInput<InputId, M> = BoxedInput<InputId, MessageStreamItemInner<M>>;
1427
/// A stream for merging messages from multiple upstreams.
/// Can dynamically add and delete upstream streams.
/// For the meaning of the generic parameter `M` used, refer to `BarrierInner<M>`.
pub struct DynamicReceivers<InputId, M> {
    /// The barrier we're aligning to. If this is `None`, then `blocked` is empty.
    barrier: Option<BarrierInner<M>>,
    /// The start timestamp of the current barrier. Used for measuring the alignment duration.
    /// Set when the first upstream of an alignment round gets blocked, and taken once all
    /// upstreams have delivered their barrier.
    start_ts: Option<Instant>,
    /// The upstreams that're blocked by the `barrier`.
    blocked: Vec<BoxedMessageInput<InputId, M>>,
    /// The upstreams that're not blocked and can be polled.
    active: FuturesUnordered<StreamFuture<BoxedMessageInput<InputId, M>>>,
    /// watermark column index -> `BufferedWatermarks`
    buffered_watermarks: BTreeMap<usize, BufferedWatermarks<InputId>>,
    /// Currently only used for union. When set, the alignment time in nanoseconds is added to
    /// this counter each time a barrier is aligned.
    barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
    /// Only for merge. If `None`, the alignment duration is not observed in `poll_next`.
    /// Note that `Instant::now()` is still taken when alignment starts, regardless of this.
    merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
}
1447
/// Merges all upstreams into one stream.
///
/// Chunks are forwarded as soon as they arrive. Watermarks are forwarded only when
/// `handle_watermark` decides to emit one. An upstream that delivers a barrier is parked in
/// `blocked`; the barrier is yielded once no active upstream is left, after which all blocked
/// upstreams become active again.
impl<InputId: Clone + Ord + Hash + std::fmt::Debug + Unpin, M: Clone + Unpin> Stream
    for DynamicReceivers<InputId, M>
{
    type Item = MessageStreamItemInner<M>;

    /// Polls the active upstreams.
    ///
    /// Yields an error if an upstream fails, closes, or delivers a barrier whose epoch differs
    /// from the one currently being aligned.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // `is_empty` is defined elsewhere; presumably true when no upstream remains at all.
        if self.is_empty() {
            return Poll::Ready(None);
        }

        loop {
            match futures::ready!(self.active.poll_next_unpin(cx)) {
                // Directly forward the error. The failed upstream is not pushed back into
                // `active`, so it will not be polled again.
                Some((Some(Err(e)), _)) => {
                    return Poll::Ready(Some(Err(e)));
                }
                // Handle the message from some upstream.
                Some((Some(Ok(message)), remaining)) => {
                    let input_id = remaining.id();
                    match message {
                        MessageInner::Chunk(chunk) => {
                            // Continue polling this upstream by pushing it back to `active`.
                            self.active.push(remaining.into_future());
                            return Poll::Ready(Some(Ok(MessageInner::Chunk(chunk))));
                        }
                        MessageInner::Watermark(watermark) => {
                            // Continue polling this upstream by pushing it back to `active`.
                            self.active.push(remaining.into_future());
                            if let Some(watermark) = self.handle_watermark(input_id, watermark) {
                                return Poll::Ready(Some(Ok(MessageInner::Watermark(watermark))));
                            }
                        }
                        MessageInner::Barrier(barrier) => {
                            // Block this upstream by pushing it to `blocked`.
                            // The first blocked upstream marks the start of alignment.
                            if self.blocked.is_empty() {
                                self.start_ts = Some(Instant::now());
                            }
                            self.blocked.push(remaining);
                            if let Some(current_barrier) = self.barrier.as_ref() {
                                // Barriers with different epochs cannot be aligned; report both
                                // (with their mutations stripped).
                                if current_barrier.epoch != barrier.epoch {
                                    return Poll::Ready(Some(Err(
                                        StreamExecutorError::align_barrier(
                                            current_barrier.clone().map_mutation(|_| None),
                                            barrier.map_mutation(|_| None),
                                        ),
                                    )));
                                }
                            } else {
                                self.barrier = Some(barrier);
                            }
                        }
                    }
                }
                // We use barrier as the control message of the stream. That is, we always stop the
                // actors actively when we receive a `Stop` mutation, instead of relying on the stream
                // termination.
                //
                // Besides, in abnormal cases when the other side of the `Input` closes unexpectedly,
                // we also yield an `Err(ExchangeChannelClosed)`, which will hit the `Err` arm above.
                // So this branch should be unreachable in all cases.
                Some((None, remaining)) => {
                    return Poll::Ready(Some(Err(StreamExecutorError::channel_closed(format!(
                        "upstream input {:?} unexpectedly closed",
                        remaining.id()
                    )))));
                }
                // There's no active upstreams. Process the barrier and resume the blocked ones.
                None => {
                    assert!(!self.blocked.is_empty());

                    let start_ts = self
                        .start_ts
                        .take()
                        .expect("should have received at least one barrier");
                    if let Some(barrier_align_duration) = &self.barrier_align_duration {
                        barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
                    }
                    if let Some(merge_barrier_align_duration) = &self.merge_barrier_align_duration {
                        // `observe` does a few atomic operations inside; it is skipped entirely
                        // when no histogram is configured to avoid that overhead.
                        merge_barrier_align_duration.observe(start_ts.elapsed().as_secs_f64())
                    }

                    break;
                }
            }
        }

        assert!(self.active.is_terminated());

        let barrier = self.barrier.take().unwrap();

        // Every upstream has delivered the aligned barrier: make them all active again.
        let upstreams = std::mem::take(&mut self.blocked);
        self.extend_active(upstreams);
        assert!(!self.active.is_terminated());

        Poll::Ready(Some(Ok(MessageInner::Barrier(barrier))))
    }
}
1549
1550impl<InputId: Clone + Ord + Hash + std::fmt::Debug, M> DynamicReceivers<InputId, M> {
1551    pub fn new(
1552        upstreams: Vec<BoxedMessageInput<InputId, M>>,
1553        barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1554        merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
1555    ) -> Self {
1556        let mut this = Self {
1557            barrier: None,
1558            start_ts: None,
1559            blocked: Vec::with_capacity(upstreams.len()),
1560            active: Default::default(),
1561            buffered_watermarks: Default::default(),
1562            merge_barrier_align_duration,
1563            barrier_align_duration,
1564        };
1565        this.extend_active(upstreams);
1566        this
1567    }
1568
1569    /// Extend the active upstreams with the given upstreams. The current stream must be at the
1570    /// clean state right after a barrier.
1571    pub fn extend_active(
1572        &mut self,
1573        upstreams: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1574    ) {
1575        assert!(self.blocked.is_empty() && self.barrier.is_none());
1576
1577        self.active
1578            .extend(upstreams.into_iter().map(|s| s.into_future()));
1579    }
1580
1581    /// Handle a new watermark message. Optionally returns the watermark message to emit.
1582    pub fn handle_watermark(
1583        &mut self,
1584        input_id: InputId,
1585        watermark: Watermark,
1586    ) -> Option<Watermark> {
1587        let col_idx = watermark.col_idx;
1588        // Insert a buffer watermarks when first received from a column.
1589        let upstream_ids: Vec<_> = self.upstream_input_ids().collect();
1590        let watermarks = self
1591            .buffered_watermarks
1592            .entry(col_idx)
1593            .or_insert_with(|| BufferedWatermarks::with_ids(upstream_ids));
1594        watermarks.handle_watermark(input_id, watermark)
1595    }
1596
1597    /// Consume `other` and add its upstreams to `self`. The two streams must be at the clean state
1598    /// right after a barrier.
1599    pub fn add_upstreams_from(
1600        &mut self,
1601        new_inputs: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1602    ) {
1603        assert!(self.blocked.is_empty() && self.barrier.is_none());
1604
1605        let new_inputs: Vec<_> = new_inputs.into_iter().collect();
1606        let input_ids = new_inputs.iter().map(|input| input.id());
1607        self.buffered_watermarks.values_mut().for_each(|buffers| {
1608            // Add buffers to the buffered watermarks for all cols
1609            buffers.add_buffers(input_ids.clone());
1610        });
1611        self.active
1612            .extend(new_inputs.into_iter().map(|s| s.into_future()));
1613    }
1614
1615    /// Remove upstreams from `self` in `upstream_input_ids`. The current stream must be at the
1616    /// clean state right after a barrier.
1617    /// The current container does not necessarily contain all the input ids passed in.
1618    pub fn remove_upstreams(&mut self, upstream_input_ids: &HashSet<InputId>) {
1619        assert!(self.blocked.is_empty() && self.barrier.is_none());
1620
1621        let new_upstreams = std::mem::take(&mut self.active)
1622            .into_iter()
1623            .map(|s| s.into_inner().unwrap())
1624            .filter(|u| !upstream_input_ids.contains(&u.id()));
1625        self.extend_active(new_upstreams);
1626        self.buffered_watermarks.values_mut().for_each(|buffers| {
1627            // Call `check_heap` in case the only upstream(s) that does not have
1628            // watermark in heap is removed
1629            buffers.remove_buffer(upstream_input_ids.clone());
1630        });
1631    }
1632
1633    pub fn merge_barrier_align_duration(&self) -> Option<LabelGuardedMetric<Histogram>> {
1634        self.merge_barrier_align_duration.clone()
1635    }
1636
1637    pub fn flush_buffered_watermarks(&mut self) {
1638        self.buffered_watermarks
1639            .values_mut()
1640            .for_each(|buffers| buffers.clear());
1641    }
1642
1643    pub fn upstream_input_ids(&self) -> impl Iterator<Item = InputId> + '_ {
1644        self.blocked
1645            .iter()
1646            .map(|s| s.id())
1647            .chain(self.active.iter().map(|s| s.get_ref().unwrap().id()))
1648    }
1649
1650    pub fn is_empty(&self) -> bool {
1651        self.blocked.is_empty() && self.active.is_empty()
1652    }
1653}
1654
// Explanation of why we need `DispatchBarrierBuffer`:
//
// When we need to create or replace an upstream fragment for the current fragment, the `Merge` operator must
// add some new upstream actor inputs. However, the `Merge` operator may still have old upstreams. We must wait
// for these old upstreams to completely process their barriers and align before we can safely update the
// upstream input set.
//
// Meanwhile, the creation of a new upstream actor can only succeed after the channel to the downstream `Merge`
// operator has been established. This creates a potential dependency chain, where each step waits on the next:
// [new_actor_creation -> downstream_merge_update -> old_actor_processing].
//
// To address this, we split the application of a barrier's `Mutation` into two steps:
// 1. Parse the `Mutation`. If it adds inputs to the upstream input set, establish the channels with the new
//    upstreams and cache them.
// 2. When the barrier actually arrives from the upstreams, apply the cached changes to the upstream input set.
//
// Additionally, barriers from the current upstream inputs and from `barrier_rx` arrive asynchronously, so we
// cannot tell which will arrive first. Therefore, when a barrier is received from an upstream: if the matching
// barrier (with its cached inputs) is already buffered, we apply it; otherwise, we must first fetch the next
// barrier from `barrier_rx`.
/// Read-ahead buffer for barriers from `barrier_rx`, pairing each with any new upstream inputs
/// its mutation requires, so they can be applied when the same barrier arrives from upstream.
pub(crate) struct DispatchBarrierBuffer {
    // Barriers already received from `barrier_rx`, in arrival order. Each is paired with the new
    // upstream inputs built for it (`Some`) when its mutation added upstream actors to this
    // actor's merge, or `None` otherwise.
    buffer: VecDeque<(Barrier, Option<Vec<BoxedActorInput>>)>,
    // Source of barriers carrying mutations for this actor.
    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
    // Whether we are waiting for the next barrier, or still building new inputs for the last one.
    recv_state: BarrierReceiverState,
    // Upstream fragment used to look up merge updates for this actor; may be updated by
    // `pre_apply_barrier` when a merge update carries a new upstream fragment id.
    curr_upstream_fragment_id: FragmentId,
    // The actor owning this buffer, used to look up its merge updates.
    actor_id: ActorId,
    // read-only context for building new inputs
    build_input_ctx: Arc<BuildInputContext>,
}
1684
/// Everything needed to build new upstream inputs, shared via `Arc` so that the boxed
/// input-building future can own it instead of borrowing the `DispatchBarrierBuffer`.
struct BuildInputContext {
    // The actor the new inputs are built for (same as `DispatchBarrierBuffer::actor_id`).
    pub actor_id: ActorId,
    pub local_barrier_manager: LocalBarrierManager,
    pub metrics: Arc<StreamingMetrics>,
    // Fragment of the actor above (not the upstream fragment).
    pub fragment_id: FragmentId,
    pub actor_config: Arc<StreamingConfig>,
}
1692
/// Future building the new upstream inputs for one barrier. Each input has already yielded its
/// first barrier, which was checked against that barrier.
type BoxedNewInputsFuture =
    Pin<Box<dyn Future<Output = StreamExecutorResult<Vec<BoxedActorInput>>> + Send>>;
1695
/// State of the `barrier_rx` side of `DispatchBarrierBuffer`.
enum BarrierReceiverState {
    /// Waiting for the next barrier from `barrier_rx`.
    ReceivingBarrier,
    /// A received barrier whose mutation adds upstream actors, plus the future building them.
    /// No further barrier is received until the future completes, which keeps `buffer` ordered.
    CreatingNewInput(Barrier, BoxedNewInputsFuture),
}
1700
1701impl DispatchBarrierBuffer {
1702    pub fn new(
1703        barrier_rx: mpsc::UnboundedReceiver<Barrier>,
1704        actor_id: ActorId,
1705        curr_upstream_fragment_id: FragmentId,
1706        local_barrier_manager: LocalBarrierManager,
1707        metrics: Arc<StreamingMetrics>,
1708        fragment_id: FragmentId,
1709        actor_config: Arc<StreamingConfig>,
1710    ) -> Self {
1711        Self {
1712            buffer: VecDeque::new(),
1713            barrier_rx,
1714            recv_state: BarrierReceiverState::ReceivingBarrier,
1715            curr_upstream_fragment_id,
1716            actor_id,
1717            build_input_ctx: Arc::new(BuildInputContext {
1718                actor_id,
1719                local_barrier_manager,
1720                metrics,
1721                fragment_id,
1722                actor_config,
1723            }),
1724        }
1725    }
1726
1727    pub async fn await_next_message(
1728        &mut self,
1729        stream: &mut (impl Stream<Item = StreamExecutorResult<DispatcherMessage>> + Unpin),
1730        metrics: &ActorInputMetrics,
1731    ) -> StreamExecutorResult<DispatcherMessage> {
1732        let mut start_time = Instant::now();
1733        let interval_duration = Duration::from_secs(15);
1734        let mut interval =
1735            tokio::time::interval_at(start_time + interval_duration, interval_duration);
1736
1737        loop {
1738            tokio::select! {
1739                biased;
1740                msg = stream.try_next() => {
1741                    metrics
1742                        .actor_input_buffer_blocking_duration_ns
1743                        .inc_by(start_time.elapsed().as_nanos() as u64);
1744                    return msg?.ok_or_else(
1745                        || StreamExecutorError::channel_closed("upstream executor closed unexpectedly")
1746                    );
1747                }
1748
1749                e = self.continuously_fetch_barrier_rx() => {
1750                    return Err(e);
1751                }
1752
1753                _ = interval.tick() => {
1754                    start_time = Instant::now();
1755                    metrics.actor_input_buffer_blocking_duration_ns.inc_by(interval_duration.as_nanos() as u64);
1756                    continue;
1757                }
1758            }
1759        }
1760    }
1761
1762    pub async fn pop_barrier_with_inputs(
1763        &mut self,
1764        barrier: DispatcherBarrier,
1765    ) -> StreamExecutorResult<(Barrier, Option<Vec<BoxedActorInput>>)> {
1766        while self.buffer.is_empty() {
1767            self.try_fetch_barrier_rx(false).await?;
1768        }
1769        let (recv_barrier, inputs) = self.buffer.pop_front().unwrap();
1770        assert_equal_dispatcher_barrier(&recv_barrier, &barrier);
1771
1772        Ok((recv_barrier, inputs))
1773    }
1774
1775    async fn continuously_fetch_barrier_rx(&mut self) -> StreamExecutorError {
1776        loop {
1777            if let Err(e) = self.try_fetch_barrier_rx(true).await {
1778                return e;
1779            }
1780        }
1781    }
1782
1783    async fn try_fetch_barrier_rx(&mut self, pending_on_end: bool) -> StreamExecutorResult<()> {
1784        match &mut self.recv_state {
1785            BarrierReceiverState::ReceivingBarrier => {
1786                let Some(barrier) = self.barrier_rx.recv().await else {
1787                    if pending_on_end {
1788                        return pending().await;
1789                    } else {
1790                        return Err(StreamExecutorError::channel_closed(
1791                            "barrier channel closed unexpectedly",
1792                        ));
1793                    }
1794                };
1795                if let Some(fut) = self.pre_apply_barrier(&barrier) {
1796                    self.recv_state = BarrierReceiverState::CreatingNewInput(barrier, fut);
1797                } else {
1798                    self.buffer.push_back((barrier, None));
1799                }
1800            }
1801            BarrierReceiverState::CreatingNewInput(barrier, fut) => {
1802                let new_inputs = fut.await?;
1803                self.buffer.push_back((barrier.clone(), Some(new_inputs)));
1804                self.recv_state = BarrierReceiverState::ReceivingBarrier;
1805            }
1806        }
1807        Ok(())
1808    }
1809
1810    fn pre_apply_barrier(&mut self, barrier: &Barrier) -> Option<BoxedNewInputsFuture> {
1811        if let Some(update) = barrier.as_update_merge(self.actor_id, self.curr_upstream_fragment_id)
1812            && !update.added_upstream_actors.is_empty()
1813        {
1814            // When update upstream fragment, added_actors will not be empty.
1815            let upstream_fragment_id =
1816                if let Some(new_upstream_fragment_id) = update.new_upstream_fragment_id {
1817                    self.curr_upstream_fragment_id = new_upstream_fragment_id;
1818                    new_upstream_fragment_id
1819                } else {
1820                    self.curr_upstream_fragment_id
1821                };
1822            let ctx = self.build_input_ctx.clone();
1823            let added_upstream_actors = update.added_upstream_actors.clone();
1824            let barrier = barrier.clone();
1825            let fut = async move {
1826                try_join_all(added_upstream_actors.iter().map(|upstream_actor| async {
1827                    let mut new_input = new_input(
1828                        &ctx.local_barrier_manager,
1829                        ctx.metrics.clone(),
1830                        ctx.actor_id,
1831                        ctx.fragment_id,
1832                        upstream_actor,
1833                        upstream_fragment_id,
1834                        ctx.actor_config.clone(),
1835                    )
1836                    .await?;
1837
1838                    // Poll the first barrier from the new upstreams. It must be the same as the one we polled from
1839                    // original upstreams.
1840                    let first_barrier = expect_first_barrier(&mut new_input).await?;
1841                    assert_equal_dispatcher_barrier(&barrier, &first_barrier);
1842
1843                    StreamExecutorResult::Ok(new_input)
1844                }))
1845                .await
1846            }
1847            .boxed();
1848
1849            Some(fut)
1850        } else {
1851            None
1852        }
1853    }
1854}