risingwave_stream/executor/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prelude;
16
17use std::collections::{BTreeMap, HashMap, HashSet};
18use std::fmt::Debug;
19use std::hash::Hash;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::Poll;
23use std::vec;
24
25use await_tree::InstrumentAwait;
26use enum_as_inner::EnumAsInner;
27use futures::stream::{BoxStream, FusedStream, FuturesUnordered, StreamFuture};
28use futures::{Stream, StreamExt};
29use itertools::Itertools;
30use prometheus::Histogram;
31use prometheus::core::{AtomicU64, GenericCounter};
32use risingwave_common::array::StreamChunk;
33use risingwave_common::bitmap::Bitmap;
34use risingwave_common::catalog::{Field, Schema, TableId};
35use risingwave_common::metrics::LabelGuardedMetric;
36use risingwave_common::row::OwnedRow;
37use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
38use risingwave_common::util::epoch::{Epoch, EpochPair};
39use risingwave_common::util::tracing::TracingContext;
40use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
41use risingwave_connector::source::SplitImpl;
42use risingwave_expr::expr::{Expression, NonStrictExpression};
43use risingwave_pb::data::PbEpoch;
44use risingwave_pb::expr::PbInputRef;
45use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
46use risingwave_pb::stream_plan::barrier::BarrierKind;
47use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
48use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
49use risingwave_pb::stream_plan::stream_node::PbStreamKind;
50use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
51use risingwave_pb::stream_plan::{
52    BarrierMutation, CombinedMutation, ConnectorPropsChangeMutation, Dispatchers,
53    DropSubscriptionsMutation, PauseMutation, PbAddMutation, PbBarrier, PbBarrierMutation,
54    PbDispatcher, PbSinkAddColumns, PbStopMutation, PbStreamMessageBatch, PbUpdateMutation,
55    PbWatermark, ResumeMutation, SourceChangeSplitMutation, StartFragmentBackfillMutation,
56    SubscriptionUpstreamInfo, ThrottleMutation,
57};
58use smallvec::SmallVec;
59use tokio::time::Instant;
60
61use crate::error::StreamResult;
62use crate::executor::exchange::input::BoxedInput;
63use crate::executor::watermark::BufferedWatermarks;
64use crate::task::{ActorId, FragmentId};
65
66mod actor;
67mod barrier_align;
68pub mod exchange;
69pub mod monitor;
70
71pub mod aggregate;
72pub mod asof_join;
73mod backfill;
74mod barrier_recv;
75mod batch_query;
76mod chain;
77mod changelog;
78mod dedup;
79mod dispatch;
80pub mod dml;
81mod dynamic_filter;
82pub mod eowc;
83pub mod error;
84mod expand;
85mod filter;
86pub mod hash_join;
87mod hop_window;
88mod join;
89mod lookup;
90mod lookup_union;
91mod merge;
92mod mview;
93mod nested_loop_temporal_join;
94mod no_op;
95mod now;
96mod over_window;
97pub mod project;
98mod rearranged_chain;
99mod receiver;
100pub mod row_id_gen;
101mod sink;
102pub mod source;
103mod stream_reader;
104pub mod subtask;
105mod temporal_join;
106mod top_n;
107mod troublemaker;
108mod union;
109mod upstream_sink_union;
110mod values;
111mod watermark;
112mod watermark_filter;
113mod wrapper;
114
115mod approx_percentile;
116
117mod row_merge;
118
119#[cfg(test)]
120mod integration_tests;
121mod sync_kv_log_store;
122#[cfg(any(test, feature = "test"))]
123pub mod test_utils;
124mod utils;
125mod vector_index;
126
127pub use actor::{Actor, ActorContext, ActorContextRef};
128use anyhow::Context;
129pub use approx_percentile::global::GlobalApproxPercentileExecutor;
130pub use approx_percentile::local::LocalApproxPercentileExecutor;
131pub use backfill::arrangement_backfill::*;
132pub use backfill::cdc::{
133    CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor,
134};
135pub use backfill::no_shuffle_backfill::*;
136pub use backfill::snapshot_backfill::*;
137pub use barrier_recv::BarrierRecvExecutor;
138pub use batch_query::BatchQueryExecutor;
139pub use chain::ChainExecutor;
140pub use changelog::ChangeLogExecutor;
141pub use dedup::AppendOnlyDedupExecutor;
142pub use dispatch::{DispatchExecutor, DispatcherImpl};
143pub use dynamic_filter::DynamicFilterExecutor;
144pub use error::{StreamExecutorError, StreamExecutorResult};
145pub use expand::ExpandExecutor;
146pub use filter::FilterExecutor;
147pub use hash_join::*;
148pub use hop_window::HopWindowExecutor;
149pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
150pub use join::{AsOfDesc, AsOfJoinType, JoinType};
151pub use lookup::*;
152pub use lookup_union::LookupUnionExecutor;
153pub use merge::MergeExecutor;
154pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
155pub use mview::*;
156pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
157pub use no_op::NoOpExecutor;
158pub use now::*;
159pub use over_window::*;
160pub use rearranged_chain::RearrangedChainExecutor;
161pub use receiver::ReceiverExecutor;
162use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
163pub use row_merge::RowMergeExecutor;
164pub use sink::SinkExecutor;
165pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
166pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
167pub use temporal_join::TemporalJoinExecutor;
168pub use top_n::{
169    AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
170};
171pub use troublemaker::TroublemakerExecutor;
172pub use union::UnionExecutor;
173pub use upstream_sink_union::{UpstreamFragmentInfo, UpstreamSinkUnionExecutor};
174pub use utils::DummyExecutor;
175pub use values::ValuesExecutor;
176pub use vector_index::VectorIndexWriteExecutor;
177pub use watermark_filter::WatermarkFilterExecutor;
178pub use wrapper::WrapperExecutor;
179
180use self::barrier_align::AlignedMessageStream;
181
/// An item of a message stream, generic over `M`.
// NOTE(review): `M` is presumably the barrier mutation type, as for `BarrierInner<M>` — confirm
// against the definition of `MessageInner`.
pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
/// An item of a message stream whose barriers carry [`BarrierMutationType`].
pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
/// An item of a stream of [`DispatcherMessage`]s.
pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
/// A boxed, `'static` stream of [`MessageStreamItem`]s; this is what [`Execute::execute`] returns.
pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;
186
187pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
188use risingwave_connector::sink::catalog::SinkId;
189use risingwave_connector::source::cdc::{
190    CdcTableSnapshotSplitAssignmentWithGeneration,
191    build_actor_cdc_table_snapshot_splits_with_generation,
192    build_pb_actor_cdc_table_snapshot_splits_with_generation,
193};
194use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
195use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
196
/// Any `Send` stream yielding [`MessageStreamItemInner<M>`] items.
pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
/// Any `Send` stream yielding [`MessageStreamItem`] items.
pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
/// Any `Send` stream yielding [`DispatcherMessageStreamItem`] items.
pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;
200
/// Static information of an executor.
///
/// It is stored next to the executable part in [`Executor`] and exposed through `&self` getters
/// such as [`Executor::schema`], so it stays readable without consuming the executor.
#[derive(Debug, Default, Clone)]
pub struct ExecutorInfo {
    /// The schema of the OUTPUT of the executor.
    pub schema: Schema,

    /// The primary key indices of the OUTPUT of the executor.
    /// Schema is used by both OLAP and streaming, therefore
    /// pk indices are maintained independently.
    pub pk_indices: PkIndices,

    /// The stream kind of the OUTPUT of the executor.
    pub stream_kind: PbStreamKind,

    /// Identity of the executor. This is also what `Debug` prints for an [`Executor`].
    pub identity: String,

    /// The executor id of the executor.
    pub id: u64,
}
221
222impl ExecutorInfo {
223    pub fn for_test(schema: Schema, pk_indices: PkIndices, identity: String, id: u64) -> Self {
224        Self {
225            schema,
226            pk_indices,
227            stream_kind: PbStreamKind::Retract, // dummy value for test
228            identity,
229            id,
230        }
231    }
232}
233
/// [`Execute`] describes the methods an executor should implement to handle control messages.
pub trait Execute: Send + 'static {
    /// Consumes the boxed executor and returns the stream of messages it produces.
    fn execute(self: Box<Self>) -> BoxedMessageStream;

    /// Variant of [`Execute::execute`] that is additionally given an epoch.
    ///
    /// The default implementation ignores `_epoch` and delegates to [`Execute::execute`].
    fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
        self.execute()
    }

    /// Boxes `self` into a `Box<dyn Execute>` trait object.
    fn boxed(self) -> Box<dyn Execute>
    where
        Self: Sized + Send + 'static,
    {
        Box::new(self)
    }
}
249
/// [`Executor`] combines the static information ([`ExecutorInfo`]) and the executable object to
/// handle messages ([`Execute`]).
pub struct Executor {
    /// Static information: schema, pk indices, stream kind, identity and id.
    info: ExecutorInfo,
    /// The executable part; consumed by [`Executor::execute`] / [`Executor::execute_with_epoch`].
    execute: Box<dyn Execute>,
}
256
257impl Executor {
258    pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
259        Self { info, execute }
260    }
261
262    pub fn info(&self) -> &ExecutorInfo {
263        &self.info
264    }
265
266    pub fn schema(&self) -> &Schema {
267        &self.info.schema
268    }
269
270    pub fn pk_indices(&self) -> PkIndicesRef<'_> {
271        &self.info.pk_indices
272    }
273
274    pub fn stream_kind(&self) -> PbStreamKind {
275        self.info.stream_kind
276    }
277
278    pub fn identity(&self) -> &str {
279        &self.info.identity
280    }
281
282    pub fn execute(self) -> BoxedMessageStream {
283        self.execute.execute()
284    }
285
286    pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
287        self.execute.execute_with_epoch(epoch)
288    }
289}
290
291impl std::fmt::Debug for Executor {
292    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
293        f.write_str(self.identity())
294    }
295}
296
297impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
298    fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
299        Self::new(info, execute)
300    }
301}
302
303impl<E> From<(ExecutorInfo, E)> for Executor
304where
305    E: Execute,
306{
307    fn from((info, execute): (ExecutorInfo, E)) -> Self {
308        Self::new(info, execute.boxed())
309    }
310}
311
/// Sentinel epoch value `0`.
// NOTE(review): presumably never used as the epoch of a real barrier — confirm.
pub const INVALID_EPOCH: u64 = 0;

/// Id of an upstream fragment; the second component of the key of [`UpdateMutation::merges`].
type UpstreamFragmentId = FragmentId;
/// Source split assignment, keyed by actor id.
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;
316
/// Payload of [`Mutation::Update`], and the second half of [`Mutation::AddAndUpdate`].
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct UpdateMutation {
    /// Dispatcher updates, keyed by actor id.
    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
    /// Merge updates, keyed by `(actor id, upstream fragment id)`; see
    /// [`Barrier::as_update_merge`].
    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
    /// New vnode bitmaps, keyed by actor id; see [`Barrier::as_update_vnode_bitmap`].
    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
    /// Actors to be stopped (dropped); see [`Barrier::all_stop_actors`].
    pub dropped_actors: HashSet<ActorId>,
    /// Split assignment per actor; see [`Barrier::initial_split_assignment`].
    pub actor_splits: SplitAssignments,
    /// Newly added dispatchers, keyed by actor id.
    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
    /// CDC table snapshot split assignment.
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// Columns to add, keyed by sink id; see [`Barrier::as_sink_add_columns`].
    pub sink_add_columns: HashMap<SinkId, Vec<Field>>,
}
329
/// Payload of [`Mutation::Add`], and the first half of [`Mutation::AddAndUpdate`].
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct AddMutation {
    /// New dispatchers, keyed by the (upstream) actor id that gains them.
    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
    /// Actors that are newly added; see [`Barrier::is_newly_added`].
    pub added_actors: HashSet<ActorId>,
    // TODO: remove this and use `SourceChangesSplit` after we support multiple mutations.
    /// Split assignment per actor; see [`Barrier::initial_split_assignment`].
    pub splits: SplitAssignments,
    /// Whether executors should pause their data stream on startup; see
    /// [`Barrier::is_pause_on_startup`].
    pub pause: bool,
    /// (`upstream_mv_table_id`, `subscriber_id`)
    pub subscriptions_to_add: Vec<(TableId, u32)>,
    /// Backfill fragments that should be paused on startup; see
    /// [`Barrier::is_backfill_pause_on_startup`].
    pub backfill_nodes_to_pause: HashSet<FragmentId>,
    /// CDC table snapshot split assignment.
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// New upstream sinks, keyed by the downstream fragment id; see
    /// [`Barrier::as_new_upstream_sink`].
    pub new_upstream_sinks: HashMap<FragmentId, PbNewUpstreamSink>,
}
345
/// Payload of [`Mutation::Stop`].
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct StopMutation {
    /// Actors to be stopped (dropped); see [`Barrier::all_stop_actors`].
    pub dropped_actors: HashSet<ActorId>,
    /// Sink fragments being dropped; see [`Barrier::as_dropped_upstream_sinks`].
    pub dropped_sink_fragments: HashSet<FragmentId>,
}
352
/// A mutation carried by a barrier.
///
/// See [`PbMutation`] for the semantics of each mutation.
#[derive(Debug, Clone, PartialEq)]
pub enum Mutation {
    /// Stop (drop) actors; see [`StopMutation`].
    Stop(StopMutation),
    /// Update existing actors (dispatchers, merges, vnode bitmaps, splits, ...); see
    /// [`UpdateMutation`].
    Update(UpdateMutation),
    /// Add new actors and dispatchers; see [`AddMutation`].
    Add(AddMutation),
    /// Change the split assignment of existing source executors (per actor).
    SourceChangeSplit(SplitAssignments),
    Pause,
    /// See [`Barrier::is_resume`].
    Resume,
    /// New rate limits, keyed by actor id.
    // NOTE(review): `None` presumably means "no rate limit" — confirm against `PbMutation`.
    Throttle(HashMap<ActorId, Option<u32>>),
    /// An [`AddMutation`] and an [`UpdateMutation`] carried by the same barrier
    /// (e.g. for sink-into-table).
    AddAndUpdate(AddMutation, UpdateMutation),
    /// New connector properties, keyed by a `u32` id.
    // NOTE(review): the key is presumably a source/sink id — confirm.
    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
    DropSubscriptions {
        /// `subscriber` -> `upstream_mv_table_id`
        subscriptions_to_drop: Vec<(u32, TableId)>,
    },
    /// See [`Barrier::should_start_fragment_backfill`].
    StartFragmentBackfill {
        fragment_ids: HashSet<FragmentId>,
    },
    RefreshStart {
        table_id: TableId,
        associated_source_id: TableId,
    },
    /// This mutation is generated by the source executor
    LoadFinish {
        associated_source_id: TableId,
    },
}
381
/// A barrier, generic over the type of its mutation.
///
/// The generic type `M` is the mutation type of the barrier.
///
/// For barriers in the dispatcher, `M` is `()`, which means the mutation is erased.
/// For barriers flowing within the streaming actor, `M` is the normal `BarrierMutationType`.
#[derive(Debug, Clone)]
pub struct BarrierInner<M> {
    /// The current and previous epoch of this barrier.
    pub epoch: EpochPair,
    /// The mutation carried by this barrier.
    pub mutation: M,
    /// The barrier kind; see [`Barrier::is_checkpoint`].
    pub kind: BarrierKind,

    /// Tracing context for the **current** epoch of this barrier.
    pub tracing_context: TracingContext,

    /// The actors that this barrier has passed locally. Used for debugging only.
    pub passed_actors: Vec<ActorId>,
}
398
/// Mutation type of barriers flowing within a streaming actor; `None` means no mutation.
pub type BarrierMutationType = Option<Arc<Mutation>>;
/// A barrier flowing within a streaming actor, carrying its optional mutation.
pub type Barrier = BarrierInner<BarrierMutationType>;
/// A barrier in the dispatcher, whose mutation is erased (`()`).
pub type DispatcherBarrier = BarrierInner<()>;
402
403impl<M: Default> BarrierInner<M> {
404    /// Create a plain barrier.
405    pub fn new_test_barrier(epoch: u64) -> Self {
406        Self {
407            epoch: EpochPair::new_test_epoch(epoch),
408            kind: BarrierKind::Checkpoint,
409            tracing_context: TracingContext::none(),
410            mutation: Default::default(),
411            passed_actors: Default::default(),
412        }
413    }
414
415    pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
416        Self {
417            epoch: EpochPair::new(epoch, prev_epoch),
418            kind: BarrierKind::Checkpoint,
419            tracing_context: TracingContext::none(),
420            mutation: Default::default(),
421            passed_actors: Default::default(),
422        }
423    }
424}
425
426impl Barrier {
427    pub fn into_dispatcher(self) -> DispatcherBarrier {
428        DispatcherBarrier {
429            epoch: self.epoch,
430            mutation: (),
431            kind: self.kind,
432            tracing_context: self.tracing_context,
433            passed_actors: self.passed_actors,
434        }
435    }
436
437    #[must_use]
438    pub fn with_mutation(self, mutation: Mutation) -> Self {
439        Self {
440            mutation: Some(Arc::new(mutation)),
441            ..self
442        }
443    }
444
445    #[must_use]
446    pub fn with_stop(self) -> Self {
447        self.with_mutation(Mutation::Stop(StopMutation {
448            dropped_actors: Default::default(),
449            dropped_sink_fragments: Default::default(),
450        }))
451    }
452
453    /// Whether this barrier carries stop mutation.
454    pub fn is_with_stop_mutation(&self) -> bool {
455        matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
456    }
457
458    /// Whether this barrier is to stop the actor with `actor_id`.
459    pub fn is_stop(&self, actor_id: ActorId) -> bool {
460        self.all_stop_actors()
461            .is_some_and(|actors| actors.contains(&actor_id))
462    }
463
464    pub fn is_checkpoint(&self) -> bool {
465        self.kind == BarrierKind::Checkpoint
466    }
467
468    /// Get the initial split assignments for the actor with `actor_id`.
469    ///
470    /// This should only be called on the initial barrier received by the executor. It must be
471    ///
472    /// - `Add` mutation when it's a new streaming job, or recovery.
473    /// - `Update` mutation when it's created for scaling.
474    /// - `AddAndUpdate` mutation when it's created for sink-into-table.
475    ///
476    /// Note that `SourceChangeSplit` is **not** included, because it's only used for changing splits
477    /// of existing executors.
478    pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
479        match self.mutation.as_deref()? {
480            Mutation::Update(UpdateMutation { actor_splits, .. })
481            | Mutation::Add(AddMutation {
482                splits: actor_splits,
483                ..
484            }) => actor_splits.get(&actor_id),
485
486            Mutation::AddAndUpdate(
487                AddMutation {
488                    splits: add_actor_splits,
489                    ..
490                },
491                UpdateMutation {
492                    actor_splits: update_actor_splits,
493                    ..
494                },
495            ) => add_actor_splits
496                .get(&actor_id)
497                // `Add` and `Update` should apply to different fragments, so we don't need to merge them.
498                .or_else(|| update_actor_splits.get(&actor_id)),
499
500            _ => {
501                if cfg!(debug_assertions) {
502                    panic!(
503                        "the initial mutation of the barrier should not be {:?}",
504                        self.mutation
505                    );
506                }
507                None
508            }
509        }
510        .map(|s| s.as_slice())
511    }
512
513    /// Get all actors that to be stopped (dropped) by this barrier.
514    pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
515        match self.mutation.as_deref() {
516            Some(Mutation::Stop(StopMutation { dropped_actors, .. })) => Some(dropped_actors),
517            Some(Mutation::Update(UpdateMutation { dropped_actors, .. }))
518            | Some(Mutation::AddAndUpdate(_, UpdateMutation { dropped_actors, .. })) => {
519                Some(dropped_actors)
520            }
521            _ => None,
522        }
523    }
524
525    /// Whether this barrier is to newly add the actor with `actor_id`. This is used for `Chain` and
526    /// `Values` to decide whether to output the existing (historical) data.
527    ///
528    /// By "newly", we mean the actor belongs to a subgraph of a new streaming job. That is, actors
529    /// added for scaling are not included.
530    pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
531        match self.mutation.as_deref() {
532            Some(Mutation::Add(AddMutation { added_actors, .. }))
533            | Some(Mutation::AddAndUpdate(AddMutation { added_actors, .. }, _)) => {
534                added_actors.contains(&actor_id)
535            }
536            _ => false,
537        }
538    }
539
540    pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
541        if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
542            fragment_ids.contains(&fragment_id)
543        } else {
544            false
545        }
546    }
547
548    /// Whether this barrier adds new downstream fragment for the actor with `upstream_actor_id`.
549    ///
550    /// # Use case
551    /// Some optimizations are applied when an actor doesn't have any downstreams ("standalone" actors).
552    /// * Pause a standalone shared `SourceExecutor`.
553    /// * Disable a standalone `MaterializeExecutor`'s conflict check.
554    ///
555    /// This is implemented by checking `actor_context.initial_dispatch_num` on startup, and
556    /// check `has_more_downstream_fragments` on barrier to see whether the optimization
557    /// needs to be turned off.
558    ///
559    /// ## Some special cases not included
560    ///
561    /// Note that this is not `has_new_downstream_actor/fragment`. For our use case, we only
562    /// care about **number of downstream fragments** (more precisely, existence).
563    /// - When scaling, the number of downstream actors is changed, and they are "new", but downstream fragments is not changed.
564    /// - When `ALTER TABLE sink_into_table`, the fragment is replaced with a "new" one, but the number is not changed.
565    pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
566        let Some(mutation) = self.mutation.as_deref() else {
567            return false;
568        };
569        match mutation {
570            // Add is for mv, index and sink creation.
571            Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
572            // AddAndUpdate is for sink-into-table.
573            Mutation::AddAndUpdate(
574                AddMutation { adds, .. },
575                UpdateMutation {
576                    dispatchers,
577                    actor_new_dispatchers,
578                    ..
579                },
580            ) => {
581                adds.get(&upstream_actor_id).is_some()
582                    || actor_new_dispatchers.get(&upstream_actor_id).is_some()
583                    || dispatchers.get(&upstream_actor_id).is_some()
584            }
585            Mutation::Update(_)
586            | Mutation::Stop(_)
587            | Mutation::Pause
588            | Mutation::Resume
589            | Mutation::SourceChangeSplit(_)
590            | Mutation::Throttle(_)
591            | Mutation::DropSubscriptions { .. }
592            | Mutation::ConnectorPropsChange(_)
593            | Mutation::StartFragmentBackfill { .. }
594            | Mutation::RefreshStart { .. }
595            | Mutation::LoadFinish { .. } => false,
596        }
597    }
598
599    /// Whether this barrier requires the executor to pause its data stream on startup.
600    pub fn is_pause_on_startup(&self) -> bool {
601        match self.mutation.as_deref() {
602            Some(Mutation::Add(AddMutation { pause, .. }))
603            | Some(Mutation::AddAndUpdate(AddMutation { pause, .. }, _)) => *pause,
604            _ => false,
605        }
606    }
607
608    pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
609        match self.mutation.as_deref() {
610            Some(Mutation::Add(AddMutation {
611                backfill_nodes_to_pause,
612                ..
613            }))
614            | Some(Mutation::AddAndUpdate(
615                AddMutation {
616                    backfill_nodes_to_pause,
617                    ..
618                },
619                _,
620            )) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
621            _ => {
622                tracing::warn!("expected an AddMutation on Startup, instead got {:?}", self);
623                true
624            }
625        }
626    }
627
628    /// Whether this barrier is for resume.
629    pub fn is_resume(&self) -> bool {
630        matches!(self.mutation.as_deref(), Some(Mutation::Resume))
631    }
632
633    /// Returns the [`MergeUpdate`] if this barrier is to update the merge executors for the actor
634    /// with `actor_id`.
635    pub fn as_update_merge(
636        &self,
637        actor_id: ActorId,
638        upstream_fragment_id: UpstreamFragmentId,
639    ) -> Option<&MergeUpdate> {
640        self.mutation
641            .as_deref()
642            .and_then(|mutation| match mutation {
643                Mutation::Update(UpdateMutation { merges, .. })
644                | Mutation::AddAndUpdate(_, UpdateMutation { merges, .. }) => {
645                    merges.get(&(actor_id, upstream_fragment_id))
646                }
647                _ => None,
648            })
649    }
650
651    /// Returns the new upstream sink information if this barrier is to add a new upstream sink for
652    /// the specified downstream fragment.
653    pub fn as_new_upstream_sink(&self, fragment_id: FragmentId) -> Option<&PbNewUpstreamSink> {
654        self.mutation
655            .as_deref()
656            .and_then(|mutation| match mutation {
657                Mutation::Add(AddMutation {
658                    new_upstream_sinks, ..
659                }) => new_upstream_sinks.get(&fragment_id),
660                _ => None,
661            })
662    }
663
664    /// Returns the dropped upstream sink-fragment if this barrier is to drop any sink.
665    pub fn as_dropped_upstream_sinks(&self) -> Option<&HashSet<FragmentId>> {
666        self.mutation
667            .as_deref()
668            .and_then(|mutation| match mutation {
669                Mutation::Stop(StopMutation {
670                    dropped_sink_fragments,
671                    ..
672                }) => Some(dropped_sink_fragments),
673                _ => None,
674            })
675    }
676
677    /// Returns the new vnode bitmap if this barrier is to update the vnode bitmap for the actor
678    /// with `actor_id`.
679    ///
680    /// Actually, this vnode bitmap update is only useful for the record accessing validation for
681    /// distributed executors, since the read/write pattern will never be across multiple vnodes.
682    pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
683        self.mutation
684            .as_deref()
685            .and_then(|mutation| match mutation {
686                Mutation::Update(UpdateMutation { vnode_bitmaps, .. })
687                | Mutation::AddAndUpdate(_, UpdateMutation { vnode_bitmaps, .. }) => {
688                    vnode_bitmaps.get(&actor_id).cloned()
689                }
690                _ => None,
691            })
692    }
693
694    pub fn as_sink_add_columns(&self, sink_id: SinkId) -> Option<Vec<Field>> {
695        self.mutation
696            .as_deref()
697            .and_then(|mutation| match mutation {
698                Mutation::Update(UpdateMutation {
699                    sink_add_columns, ..
700                })
701                | Mutation::AddAndUpdate(
702                    _,
703                    UpdateMutation {
704                        sink_add_columns, ..
705                    },
706                ) => sink_add_columns.get(&sink_id).cloned(),
707                _ => None,
708            })
709    }
710
711    pub fn get_curr_epoch(&self) -> Epoch {
712        Epoch(self.epoch.curr)
713    }
714
715    /// Retrieve the tracing context for the **current** epoch of this barrier.
716    pub fn tracing_context(&self) -> &TracingContext {
717        &self.tracing_context
718    }
719
720    pub fn added_subscriber_on_mv_table(
721        &self,
722        mv_table_id: TableId,
723    ) -> impl Iterator<Item = u32> + '_ {
724        if let Some(Mutation::Add(add)) | Some(Mutation::AddAndUpdate(add, _)) =
725            self.mutation.as_deref()
726        {
727            Some(add)
728        } else {
729            None
730        }
731        .into_iter()
732        .flat_map(move |add| {
733            add.subscriptions_to_add.iter().filter_map(
734                move |(upstream_mv_table_id, subscriber_id)| {
735                    if *upstream_mv_table_id == mv_table_id {
736                        Some(*subscriber_id)
737                    } else {
738                        None
739                    }
740                },
741            )
742        })
743    }
744}
745
746impl<M: PartialEq> PartialEq for BarrierInner<M> {
747    fn eq(&self, other: &Self) -> bool {
748        self.epoch == other.epoch && self.mutation == other.mutation
749    }
750}
751
752impl Mutation {
753    /// Return true if the mutation is stop.
754    ///
755    /// Note that this does not mean we will stop the current actor.
756    #[cfg(test)]
757    pub fn is_stop(&self) -> bool {
758        matches!(self, Mutation::Stop(_))
759    }
760
761    fn to_protobuf(&self) -> PbMutation {
762        let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
763            actor_splits
764                .iter()
765                .map(|(&actor_id, splits)| {
766                    (
767                        actor_id,
768                        ConnectorSplits {
769                            splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
770                        },
771                    )
772                })
773                .collect::<HashMap<_, _>>()
774        };
775
776        match self {
777            Mutation::Stop(StopMutation {
778                dropped_actors,
779                dropped_sink_fragments,
780            }) => PbMutation::Stop(PbStopMutation {
781                actors: dropped_actors.iter().copied().collect(),
782                dropped_sink_fragments: dropped_sink_fragments.iter().copied().collect(),
783            }),
784            Mutation::Update(UpdateMutation {
785                dispatchers,
786                merges,
787                vnode_bitmaps,
788                dropped_actors,
789                actor_splits,
790                actor_new_dispatchers,
791                actor_cdc_table_snapshot_splits,
792                sink_add_columns,
793            }) => PbMutation::Update(PbUpdateMutation {
794                dispatcher_update: dispatchers.values().flatten().cloned().collect(),
795                merge_update: merges.values().cloned().collect(),
796                actor_vnode_bitmap_update: vnode_bitmaps
797                    .iter()
798                    .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
799                    .collect(),
800                dropped_actors: dropped_actors.iter().cloned().collect(),
801                actor_splits: actor_splits_to_protobuf(actor_splits),
802                actor_new_dispatchers: actor_new_dispatchers
803                    .iter()
804                    .map(|(&actor_id, dispatchers)| {
805                        (
806                            actor_id,
807                            Dispatchers {
808                                dispatchers: dispatchers.clone(),
809                            },
810                        )
811                    })
812                    .collect(),
813                actor_cdc_table_snapshot_splits:
814                    build_pb_actor_cdc_table_snapshot_splits_with_generation(
815                        actor_cdc_table_snapshot_splits.clone(),
816                    )
817                    .into(),
818                sink_add_columns: sink_add_columns
819                    .iter()
820                    .map(|(sink_id, add_columns)| {
821                        (
822                            sink_id.sink_id,
823                            PbSinkAddColumns {
824                                fields: add_columns.iter().map(|field| field.to_prost()).collect(),
825                            },
826                        )
827                    })
828                    .collect(),
829            }),
830            Mutation::Add(AddMutation {
831                adds,
832                added_actors,
833                splits,
834                pause,
835                subscriptions_to_add,
836                backfill_nodes_to_pause,
837                actor_cdc_table_snapshot_splits,
838                new_upstream_sinks,
839            }) => PbMutation::Add(PbAddMutation {
840                actor_dispatchers: adds
841                    .iter()
842                    .map(|(&actor_id, dispatchers)| {
843                        (
844                            actor_id,
845                            Dispatchers {
846                                dispatchers: dispatchers.clone(),
847                            },
848                        )
849                    })
850                    .collect(),
851                added_actors: added_actors.iter().copied().collect(),
852                actor_splits: actor_splits_to_protobuf(splits),
853                pause: *pause,
854                subscriptions_to_add: subscriptions_to_add
855                    .iter()
856                    .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
857                        subscriber_id: *subscriber_id,
858                        upstream_mv_table_id: table_id.table_id,
859                    })
860                    .collect(),
861                backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
862                actor_cdc_table_snapshot_splits:
863                    build_pb_actor_cdc_table_snapshot_splits_with_generation(
864                        actor_cdc_table_snapshot_splits.clone(),
865                    )
866                    .into(),
867                new_upstream_sinks: new_upstream_sinks
868                    .iter()
869                    .map(|(k, v)| (*k, v.clone()))
870                    .collect(),
871            }),
872            Mutation::SourceChangeSplit(changes) => PbMutation::Splits(SourceChangeSplitMutation {
873                actor_splits: changes
874                    .iter()
875                    .map(|(&actor_id, splits)| {
876                        (
877                            actor_id,
878                            ConnectorSplits {
879                                splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
880                            },
881                        )
882                    })
883                    .collect(),
884            }),
885            Mutation::Pause => PbMutation::Pause(PauseMutation {}),
886            Mutation::Resume => PbMutation::Resume(ResumeMutation {}),
887            Mutation::Throttle(changes) => PbMutation::Throttle(ThrottleMutation {
888                actor_throttle: changes
889                    .iter()
890                    .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit }))
891                    .collect(),
892            }),
893
894            Mutation::AddAndUpdate(add, update) => PbMutation::Combined(CombinedMutation {
895                mutations: vec![
896                    BarrierMutation {
897                        mutation: Some(Mutation::Add(add.clone()).to_protobuf()),
898                    },
899                    BarrierMutation {
900                        mutation: Some(Mutation::Update(update.clone()).to_protobuf()),
901                    },
902                ],
903            }),
904            Mutation::DropSubscriptions {
905                subscriptions_to_drop,
906            } => PbMutation::DropSubscriptions(DropSubscriptionsMutation {
907                info: subscriptions_to_drop
908                    .iter()
909                    .map(
910                        |(subscriber_id, upstream_mv_table_id)| SubscriptionUpstreamInfo {
911                            subscriber_id: *subscriber_id,
912                            upstream_mv_table_id: upstream_mv_table_id.table_id,
913                        },
914                    )
915                    .collect(),
916            }),
917            Mutation::ConnectorPropsChange(map) => {
918                PbMutation::ConnectorPropsChange(ConnectorPropsChangeMutation {
919                    connector_props_infos: map
920                        .iter()
921                        .map(|(actor_id, options)| {
922                            (
923                                *actor_id,
924                                ConnectorPropsInfo {
925                                    connector_props_info: options
926                                        .iter()
927                                        .map(|(k, v)| (k.clone(), v.clone()))
928                                        .collect(),
929                                },
930                            )
931                        })
932                        .collect(),
933                })
934            }
935            Mutation::StartFragmentBackfill { fragment_ids } => {
936                PbMutation::StartFragmentBackfill(StartFragmentBackfillMutation {
937                    fragment_ids: fragment_ids.iter().copied().collect(),
938                })
939            }
940            Mutation::RefreshStart {
941                table_id,
942                associated_source_id,
943            } => PbMutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
944                table_id: table_id.table_id,
945                associated_source_id: associated_source_id.table_id,
946            }),
947            Mutation::LoadFinish {
948                associated_source_id,
949            } => PbMutation::LoadFinish(risingwave_pb::stream_plan::LoadFinishMutation {
950                associated_source_id: associated_source_id.table_id,
951            }),
952        }
953    }
954
955    fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
956        let mutation = match prost {
957            PbMutation::Stop(stop) => Mutation::Stop(StopMutation {
958                dropped_actors: stop.actors.iter().copied().collect(),
959                dropped_sink_fragments: stop.dropped_sink_fragments.iter().copied().collect(),
960            }),
961
962            PbMutation::Update(update) => Mutation::Update(UpdateMutation {
963                dispatchers: update
964                    .dispatcher_update
965                    .iter()
966                    .map(|u| (u.actor_id, u.clone()))
967                    .into_group_map(),
968                merges: update
969                    .merge_update
970                    .iter()
971                    .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
972                    .collect(),
973                vnode_bitmaps: update
974                    .actor_vnode_bitmap_update
975                    .iter()
976                    .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
977                    .collect(),
978                dropped_actors: update.dropped_actors.iter().cloned().collect(),
979                actor_splits: update
980                    .actor_splits
981                    .iter()
982                    .map(|(&actor_id, splits)| {
983                        (
984                            actor_id,
985                            splits
986                                .splits
987                                .iter()
988                                .map(|split| split.try_into().unwrap())
989                                .collect(),
990                        )
991                    })
992                    .collect(),
993                actor_new_dispatchers: update
994                    .actor_new_dispatchers
995                    .iter()
996                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
997                    .collect(),
998                actor_cdc_table_snapshot_splits:
999                    build_actor_cdc_table_snapshot_splits_with_generation(
1000                        update
1001                            .actor_cdc_table_snapshot_splits
1002                            .clone()
1003                            .unwrap_or_default(),
1004                    ),
1005                sink_add_columns: update
1006                    .sink_add_columns
1007                    .iter()
1008                    .map(|(sink_id, add_columns)| {
1009                        (
1010                            SinkId::new(*sink_id),
1011                            add_columns.fields.iter().map(Field::from_prost).collect(),
1012                        )
1013                    })
1014                    .collect(),
1015            }),
1016
1017            PbMutation::Add(add) => Mutation::Add(AddMutation {
1018                adds: add
1019                    .actor_dispatchers
1020                    .iter()
1021                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
1022                    .collect(),
1023                added_actors: add.added_actors.iter().copied().collect(),
1024                // TODO: remove this and use `SourceChangesSplit` after we support multiple
1025                // mutations.
1026                splits: add
1027                    .actor_splits
1028                    .iter()
1029                    .map(|(&actor_id, splits)| {
1030                        (
1031                            actor_id,
1032                            splits
1033                                .splits
1034                                .iter()
1035                                .map(|split| split.try_into().unwrap())
1036                                .collect(),
1037                        )
1038                    })
1039                    .collect(),
1040                pause: add.pause,
1041                subscriptions_to_add: add
1042                    .subscriptions_to_add
1043                    .iter()
1044                    .map(
1045                        |SubscriptionUpstreamInfo {
1046                             subscriber_id,
1047                             upstream_mv_table_id,
1048                         }| {
1049                            (TableId::new(*upstream_mv_table_id), *subscriber_id)
1050                        },
1051                    )
1052                    .collect(),
1053                backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
1054                actor_cdc_table_snapshot_splits:
1055                    build_actor_cdc_table_snapshot_splits_with_generation(
1056                        add.actor_cdc_table_snapshot_splits
1057                            .clone()
1058                            .unwrap_or_default(),
1059                    ),
1060                new_upstream_sinks: add
1061                    .new_upstream_sinks
1062                    .iter()
1063                    .map(|(k, v)| (*k, v.clone()))
1064                    .collect(),
1065            }),
1066
1067            PbMutation::Splits(s) => {
1068                let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
1069                    Vec::with_capacity(s.actor_splits.len());
1070                for (&actor_id, splits) in &s.actor_splits {
1071                    if !splits.splits.is_empty() {
1072                        change_splits.push((
1073                            actor_id,
1074                            splits
1075                                .splits
1076                                .iter()
1077                                .map(SplitImpl::try_from)
1078                                .try_collect()?,
1079                        ));
1080                    }
1081                }
1082                Mutation::SourceChangeSplit(change_splits.into_iter().collect())
1083            }
1084            PbMutation::Pause(_) => Mutation::Pause,
1085            PbMutation::Resume(_) => Mutation::Resume,
1086            PbMutation::Throttle(changes) => Mutation::Throttle(
1087                changes
1088                    .actor_throttle
1089                    .iter()
1090                    .map(|(actor_id, limit)| (*actor_id, limit.rate_limit))
1091                    .collect(),
1092            ),
1093            PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
1094                subscriptions_to_drop: drop
1095                    .info
1096                    .iter()
1097                    .map(|info| (info.subscriber_id, TableId::new(info.upstream_mv_table_id)))
1098                    .collect(),
1099            },
1100            PbMutation::ConnectorPropsChange(alter_connector_props) => {
1101                Mutation::ConnectorPropsChange(
1102                    alter_connector_props
1103                        .connector_props_infos
1104                        .iter()
1105                        .map(|(actor_id, options)| {
1106                            (
1107                                *actor_id,
1108                                options
1109                                    .connector_props_info
1110                                    .iter()
1111                                    .map(|(k, v)| (k.clone(), v.clone()))
1112                                    .collect(),
1113                            )
1114                        })
1115                        .collect(),
1116                )
1117            }
1118            PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
1119                Mutation::StartFragmentBackfill {
1120                    fragment_ids: start_fragment_backfill
1121                        .fragment_ids
1122                        .iter()
1123                        .copied()
1124                        .collect(),
1125                }
1126            }
1127            PbMutation::RefreshStart(refresh_start) => Mutation::RefreshStart {
1128                table_id: TableId::new(refresh_start.table_id),
1129                associated_source_id: TableId::new(refresh_start.associated_source_id),
1130            },
1131            PbMutation::LoadFinish(load_finish) => Mutation::LoadFinish {
1132                associated_source_id: TableId::new(load_finish.associated_source_id),
1133            },
1134            PbMutation::Combined(CombinedMutation { mutations }) => match &mutations[..] {
1135                [
1136                    BarrierMutation {
1137                        mutation: Some(add),
1138                    },
1139                    BarrierMutation {
1140                        mutation: Some(update),
1141                    },
1142                ] => {
1143                    let Mutation::Add(add_mutation) = Mutation::from_protobuf(add)? else {
1144                        unreachable!();
1145                    };
1146
1147                    let Mutation::Update(update_mutation) = Mutation::from_protobuf(update)? else {
1148                        unreachable!();
1149                    };
1150
1151                    Mutation::AddAndUpdate(add_mutation, update_mutation)
1152                }
1153
1154                _ => unreachable!(),
1155            },
1156        };
1157        Ok(mutation)
1158    }
1159}
1160
1161impl<M> BarrierInner<M> {
1162    fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
1163        let Self {
1164            epoch,
1165            mutation,
1166            kind,
1167            passed_actors,
1168            tracing_context,
1169            ..
1170        } = self;
1171
1172        PbBarrier {
1173            epoch: Some(PbEpoch {
1174                curr: epoch.curr,
1175                prev: epoch.prev,
1176            }),
1177            mutation: Some(PbBarrierMutation {
1178                mutation: barrier_fn(mutation),
1179            }),
1180            tracing_context: tracing_context.to_protobuf(),
1181            kind: *kind as _,
1182            passed_actors: passed_actors.clone(),
1183        }
1184    }
1185
1186    fn from_protobuf_inner(
1187        prost: &PbBarrier,
1188        mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
1189    ) -> StreamExecutorResult<Self> {
1190        let epoch = prost.get_epoch()?;
1191
1192        Ok(Self {
1193            kind: prost.kind(),
1194            epoch: EpochPair::new(epoch.curr, epoch.prev),
1195            mutation: mutation_from_pb(
1196                prost
1197                    .mutation
1198                    .as_ref()
1199                    .and_then(|mutation| mutation.mutation.as_ref()),
1200            )?,
1201            passed_actors: prost.get_passed_actors().clone(),
1202            tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
1203        })
1204    }
1205
1206    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
1207        BarrierInner {
1208            epoch: self.epoch,
1209            mutation: f(self.mutation),
1210            kind: self.kind,
1211            tracing_context: self.tracing_context,
1212            passed_actors: self.passed_actors,
1213        }
1214    }
1215}
1216
1217impl DispatcherBarrier {
1218    pub fn to_protobuf(&self) -> PbBarrier {
1219        self.to_protobuf_inner(|_| None)
1220    }
1221}
1222
1223impl Barrier {
1224    pub fn to_protobuf(&self) -> PbBarrier {
1225        self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
1226    }
1227
1228    pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
1229        Self::from_protobuf_inner(prost, |mutation| {
1230            mutation
1231                .map(|m| Mutation::from_protobuf(m).map(Arc::new))
1232                .transpose()
1233        })
1234    }
1235}
1236
/// A watermark on a single column of a stream.
///
/// NOTE(review): `PartialEq`/`Eq` are derived over all three fields, while the hand-written
/// `Ord` compares only `val`. Two watermarks on different columns can therefore compare
/// `Equal` without being `==`. Presumably only watermarks of the same column are ever
/// ordered against each other — confirm with callers.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
    /// Index of the column this watermark applies to.
    pub col_idx: usize,
    /// Data type of the watermark column.
    pub data_type: DataType,
    /// The watermark value. It is a `ScalarImpl` rather than a `Datum`, so it is never null.
    pub val: ScalarImpl,
}
1243
/// Watermarks are totally ordered: `partial_cmp` always returns `Some` of the `Ord` result.
impl PartialOrd for Watermark {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        // Canonical form, so `PartialOrd` can never disagree with `Ord`.
        Some(self.cmp(other))
    }
}
1249
1250impl Ord for Watermark {
1251    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1252        self.val.default_cmp(&other.val)
1253    }
1254}
1255
1256impl Watermark {
1257    pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
1258        Self {
1259            col_idx,
1260            data_type,
1261            val,
1262        }
1263    }
1264
1265    pub async fn transform_with_expr(
1266        self,
1267        expr: &NonStrictExpression<impl Expression>,
1268        new_col_idx: usize,
1269    ) -> Option<Self> {
1270        let Self { col_idx, val, .. } = self;
1271        let row = {
1272            let mut row = vec![None; col_idx + 1];
1273            row[col_idx] = Some(val);
1274            OwnedRow::new(row)
1275        };
1276        let val = expr.eval_row_infallible(&row).await?;
1277        Some(Self::new(new_col_idx, expr.inner().return_type(), val))
1278    }
1279
1280    /// Transform the watermark with the given output indices. If this watermark is not in the
1281    /// output, return `None`.
1282    pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
1283        output_indices
1284            .iter()
1285            .position(|p| *p == self.col_idx)
1286            .map(|new_col_idx| self.with_idx(new_col_idx))
1287    }
1288
1289    pub fn to_protobuf(&self) -> PbWatermark {
1290        PbWatermark {
1291            column: Some(PbInputRef {
1292                index: self.col_idx as _,
1293                r#type: Some(self.data_type.to_protobuf()),
1294            }),
1295            val: Some(&self.val).to_protobuf().into(),
1296        }
1297    }
1298
1299    pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
1300        let col_ref = prost.get_column()?;
1301        let data_type = DataType::from(col_ref.get_type()?);
1302        let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
1303            .expect("watermark value cannot be null");
1304        Ok(Self::new(col_ref.get_index() as _, data_type, val))
1305    }
1306
1307    pub fn with_idx(self, idx: usize) -> Self {
1308        Self::new(idx, self.data_type, self.val)
1309    }
1310}
1311
/// A message flowing through a stream: a data chunk, a barrier, or a watermark.
///
/// `M` is the type of the mutation carried by barriers (see `BarrierInner<M>`).
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageInner<M> {
    /// A chunk of stream data.
    Chunk(StreamChunk),
    /// A barrier whose mutation has type `M`.
    Barrier(BarrierInner<M>),
    /// A watermark on a single column.
    Watermark(Watermark),
}
1318
1319impl<M> MessageInner<M> {
1320    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
1321        match self {
1322            MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
1323            MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
1324            MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
1325        }
1326    }
1327}
1328
/// A message whose barriers carry a full `BarrierMutationType` mutation.
pub type Message = MessageInner<BarrierMutationType>;
/// A message as exchanged by dispatchers: barriers carry no mutation (`()`).
pub type DispatcherMessage = MessageInner<()>;
1331
/// `MessageBatchInner` is used exclusively by `Dispatcher` and the `Merger`/`Receiver` for
/// exchanging messages between them.
///
/// It mirrors the fundamental `MessageInner`, except that several barriers can be carried in a
/// single `BarrierBatch` message.
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageBatchInner<M> {
    /// A chunk of stream data.
    Chunk(StreamChunk),
    /// One or more barriers sent together.
    BarrierBatch(Vec<BarrierInner<M>>),
    /// A watermark on a single column.
    Watermark(Watermark),
}
/// A message batch whose barriers carry a full `BarrierMutationType` mutation.
pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
/// A list of mutation-less dispatcher barriers.
pub type DispatcherBarriers = Vec<DispatcherBarrier>;
/// A message batch as exchanged by dispatchers: barriers carry no mutation (`()`).
pub type DispatcherMessageBatch = MessageBatchInner<()>;
1343
1344impl From<DispatcherMessage> for DispatcherMessageBatch {
1345    fn from(m: DispatcherMessage) -> Self {
1346        match m {
1347            DispatcherMessage::Chunk(c) => Self::Chunk(c),
1348            DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
1349            DispatcherMessage::Watermark(w) => Self::Watermark(w),
1350        }
1351    }
1352}
1353
1354impl From<StreamChunk> for Message {
1355    fn from(chunk: StreamChunk) -> Self {
1356        Message::Chunk(chunk)
1357    }
1358}
1359
1360impl<'a> TryFrom<&'a Message> for &'a Barrier {
1361    type Error = ();
1362
1363    fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
1364        match m {
1365            Message::Chunk(_) => Err(()),
1366            Message::Barrier(b) => Ok(b),
1367            Message::Watermark(_) => Err(()),
1368        }
1369    }
1370}
1371
1372impl Message {
1373    /// Return true if the message is a stop barrier, meaning the stream
1374    /// will not continue, false otherwise.
1375    ///
1376    /// Note that this does not mean we will stop the current actor.
1377    #[cfg(test)]
1378    pub fn is_stop(&self) -> bool {
1379        matches!(
1380            self,
1381            Message::Barrier(Barrier {
1382                mutation,
1383                ..
1384            }) if mutation.as_ref().unwrap().is_stop()
1385        )
1386    }
1387}
1388
1389impl DispatcherMessageBatch {
1390    pub fn to_protobuf(&self) -> PbStreamMessageBatch {
1391        let prost = match self {
1392            Self::Chunk(stream_chunk) => {
1393                let prost_stream_chunk = stream_chunk.to_protobuf();
1394                StreamMessageBatch::StreamChunk(prost_stream_chunk)
1395            }
1396            Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
1397                barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
1398            }),
1399            Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
1400        };
1401        PbStreamMessageBatch {
1402            stream_message_batch: Some(prost),
1403        }
1404    }
1405
1406    pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
1407        let res = match prost.get_stream_message_batch()? {
1408            StreamMessageBatch::StreamChunk(chunk) => {
1409                Self::Chunk(StreamChunk::from_protobuf(chunk)?)
1410            }
1411            StreamMessageBatch::BarrierBatch(barrier_batch) => {
1412                let barriers = barrier_batch
1413                    .barriers
1414                    .iter()
1415                    .map(|barrier| {
1416                        DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
1417                            if mutation.is_some() {
1418                                if cfg!(debug_assertions) {
1419                                    panic!("should not receive message of barrier with mutation");
1420                                } else {
1421                                    warn!(?barrier, "receive message of barrier with mutation");
1422                                }
1423                            }
1424                            Ok(())
1425                        })
1426                    })
1427                    .try_collect()?;
1428                Self::BarrierBatch(barriers)
1429            }
1430            StreamMessageBatch::Watermark(watermark) => {
1431                Self::Watermark(Watermark::from_protobuf(watermark)?)
1432            }
1433        };
1434        Ok(res)
1435    }
1436
1437    pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
1438        ::prost::Message::encoded_len(msg)
1439    }
1440}
1441
/// A list of column indices; by its name, the columns of a primary key.
pub type PkIndices = Vec<usize>;
/// Borrowed form of `PkIndices`.
pub type PkIndicesRef<'a> = &'a [usize];
/// Data types of key columns, stored inline (no heap allocation) when there is only one.
pub type PkDataTypes = SmallVec<[DataType; 1]>;
1445
1446/// Expect the first message of the given `stream` as a barrier.
1447pub async fn expect_first_barrier<M: Debug>(
1448    stream: &mut (impl MessageStreamInner<M> + Unpin),
1449) -> StreamExecutorResult<BarrierInner<M>> {
1450    let message = stream
1451        .next()
1452        .instrument_await("expect_first_barrier")
1453        .await
1454        .context("failed to extract the first message: stream closed unexpectedly")??;
1455    let barrier = message
1456        .into_barrier()
1457        .expect("the first message must be a barrier");
1458    // TODO: Is this check correct?
1459    assert!(matches!(
1460        barrier.kind,
1461        BarrierKind::Checkpoint | BarrierKind::Initial
1462    ));
1463    Ok(barrier)
1464}
1465
1466/// Expect the first message of the given `stream` as a barrier.
1467pub async fn expect_first_barrier_from_aligned_stream(
1468    stream: &mut (impl AlignedMessageStream + Unpin),
1469) -> StreamExecutorResult<Barrier> {
1470    let message = stream
1471        .next()
1472        .instrument_await("expect_first_barrier")
1473        .await
1474        .context("failed to extract the first message: stream closed unexpectedly")??;
1475    let barrier = message
1476        .into_barrier()
1477        .expect("the first message must be a barrier");
1478    Ok(barrier)
1479}
1480
/// `StreamConsumer` is the last step in an actor.
///
/// It is driven by calling [`StreamConsumer::execute`], which consumes the boxed consumer and
/// returns a stream of `StreamResult<Barrier>` items.
pub trait StreamConsumer: Send + 'static {
    /// The stream returned by `execute`.
    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;

    /// Consumes the boxed consumer and turns it into its barrier stream.
    fn execute(self: Box<Self>) -> Self::BarrierStream;
}
1487
/// A boxed upstream input identified by an `InputId` and yielding `MessageStreamItemInner<M>`.
type BoxedMessageInput<InputId, M> = BoxedInput<InputId, MessageStreamItemInner<M>>;

/// A stream for merging messages from multiple upstreams.
/// Can dynamically add and delete upstream streams.
/// For the meaning of the generic parameter `M` used, refer to `BarrierInner<M>`.
///
/// Chunks are forwarded as soon as any upstream yields them. Barriers are aligned: a barrier is
/// only yielded after every upstream has delivered a barrier with the same epoch.
pub struct DynamicReceivers<InputId, M> {
    /// The barrier we're aligning to. If this is `None`, then `blocked` is empty.
    barrier: Option<BarrierInner<M>>,
    /// The time at which the first upstream was blocked by the current barrier. Used for
    /// measuring the alignment duration.
    start_ts: Option<Instant>,
    /// The upstreams that're blocked by the `barrier`.
    blocked: Vec<BoxedMessageInput<InputId, M>>,
    /// The upstreams that're not blocked and can be polled.
    active: FuturesUnordered<StreamFuture<BoxedMessageInput<InputId, M>>>,
    /// watermark column index -> `BufferedWatermarks`
    buffered_watermarks: BTreeMap<usize, BufferedWatermarks<InputId>>,
    /// Currently only used for union.
    barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
    /// Only for merge. If `None`, the alignment duration is not observed into a histogram
    /// (note that `start_ts` is still recorded by `poll_next` regardless).
    merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
}
1509
1510impl<InputId: Clone + Ord + Hash + std::fmt::Debug + Unpin, M: Clone + Unpin> Stream
1511    for DynamicReceivers<InputId, M>
1512{
1513    type Item = MessageStreamItemInner<M>;
1514
1515    fn poll_next(
1516        mut self: Pin<&mut Self>,
1517        cx: &mut std::task::Context<'_>,
1518    ) -> Poll<Option<Self::Item>> {
1519        if self.is_empty() {
1520            return Poll::Ready(None);
1521        }
1522
1523        loop {
1524            match futures::ready!(self.active.poll_next_unpin(cx)) {
1525                // Directly forward the error.
1526                Some((Some(Err(e)), _)) => {
1527                    return Poll::Ready(Some(Err(e)));
1528                }
1529                // Handle the message from some upstream.
1530                Some((Some(Ok(message)), remaining)) => {
1531                    let input_id = remaining.id();
1532                    match message {
1533                        MessageInner::Chunk(chunk) => {
1534                            // Continue polling this upstream by pushing it back to `active`.
1535                            self.active.push(remaining.into_future());
1536                            return Poll::Ready(Some(Ok(MessageInner::Chunk(chunk))));
1537                        }
1538                        MessageInner::Watermark(watermark) => {
1539                            // Continue polling this upstream by pushing it back to `active`.
1540                            self.active.push(remaining.into_future());
1541                            if let Some(watermark) = self.handle_watermark(input_id, watermark) {
1542                                return Poll::Ready(Some(Ok(MessageInner::Watermark(watermark))));
1543                            }
1544                        }
1545                        MessageInner::Barrier(barrier) => {
1546                            // Block this upstream by pushing it to `blocked`.
1547                            if self.blocked.is_empty() {
1548                                self.start_ts = Some(Instant::now());
1549                            }
1550                            self.blocked.push(remaining);
1551                            if let Some(current_barrier) = self.barrier.as_ref() {
1552                                if current_barrier.epoch != barrier.epoch {
1553                                    return Poll::Ready(Some(Err(
1554                                        StreamExecutorError::align_barrier(
1555                                            current_barrier.clone().map_mutation(|_| None),
1556                                            barrier.map_mutation(|_| None),
1557                                        ),
1558                                    )));
1559                                }
1560                            } else {
1561                                self.barrier = Some(barrier);
1562                            }
1563                        }
1564                    }
1565                }
1566                // We use barrier as the control message of the stream. That is, we always stop the
1567                // actors actively when we receive a `Stop` mutation, instead of relying on the stream
1568                // termination.
1569                //
1570                // Besides, in abnormal cases when the other side of the `Input` closes unexpectedly,
1571                // we also yield an `Err(ExchangeChannelClosed)`, which will hit the `Err` arm above.
1572                // So this branch will never be reached in all cases.
1573                Some((None, remaining)) => {
1574                    return Poll::Ready(Some(Err(StreamExecutorError::channel_closed(format!(
1575                        "upstream input {:?} unexpectedly closed",
1576                        remaining.id()
1577                    )))));
1578                }
1579                // There's no active upstreams. Process the barrier and resume the blocked ones.
1580                None => {
1581                    assert!(!self.blocked.is_empty());
1582
1583                    let start_ts = self
1584                        .start_ts
1585                        .take()
1586                        .expect("should have received at least one barrier");
1587                    if let Some(barrier_align_duration) = &self.barrier_align_duration {
1588                        barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
1589                    }
1590                    if let Some(merge_barrier_align_duration) = &self.merge_barrier_align_duration {
1591                        // Observe did a few atomic operation inside, we want to avoid the overhead.
1592                        merge_barrier_align_duration.observe(start_ts.elapsed().as_secs_f64())
1593                    }
1594
1595                    break;
1596                }
1597            }
1598        }
1599
1600        assert!(self.active.is_terminated());
1601
1602        let barrier = self.barrier.take().unwrap();
1603
1604        let upstreams = std::mem::take(&mut self.blocked);
1605        self.extend_active(upstreams);
1606        assert!(!self.active.is_terminated());
1607
1608        Poll::Ready(Some(Ok(MessageInner::Barrier(barrier))))
1609    }
1610}
1611
1612impl<InputId: Clone + Ord + Hash + std::fmt::Debug, M> DynamicReceivers<InputId, M> {
1613    pub fn new(
1614        upstreams: Vec<BoxedMessageInput<InputId, M>>,
1615        barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1616        merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
1617    ) -> Self {
1618        let mut this = Self {
1619            barrier: None,
1620            start_ts: None,
1621            blocked: Vec::with_capacity(upstreams.len()),
1622            active: Default::default(),
1623            buffered_watermarks: Default::default(),
1624            merge_barrier_align_duration,
1625            barrier_align_duration,
1626        };
1627        this.extend_active(upstreams);
1628        this
1629    }
1630
1631    /// Extend the active upstreams with the given upstreams. The current stream must be at the
1632    /// clean state right after a barrier.
1633    pub fn extend_active(
1634        &mut self,
1635        upstreams: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1636    ) {
1637        assert!(self.blocked.is_empty() && self.barrier.is_none());
1638
1639        self.active
1640            .extend(upstreams.into_iter().map(|s| s.into_future()));
1641    }
1642
1643    /// Handle a new watermark message. Optionally returns the watermark message to emit.
1644    pub fn handle_watermark(
1645        &mut self,
1646        input_id: InputId,
1647        watermark: Watermark,
1648    ) -> Option<Watermark> {
1649        let col_idx = watermark.col_idx;
1650        // Insert a buffer watermarks when first received from a column.
1651        let upstream_ids: Vec<_> = self.upstream_input_ids().collect();
1652        let watermarks = self
1653            .buffered_watermarks
1654            .entry(col_idx)
1655            .or_insert_with(|| BufferedWatermarks::with_ids(upstream_ids));
1656        watermarks.handle_watermark(input_id, watermark)
1657    }
1658
1659    /// Consume `other` and add its upstreams to `self`. The two streams must be at the clean state
1660    /// right after a barrier.
1661    pub fn add_upstreams_from(
1662        &mut self,
1663        new_inputs: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1664    ) {
1665        assert!(self.blocked.is_empty() && self.barrier.is_none());
1666
1667        let new_inputs: Vec<_> = new_inputs.into_iter().collect();
1668        let input_ids = new_inputs.iter().map(|input| input.id());
1669        self.buffered_watermarks.values_mut().for_each(|buffers| {
1670            // Add buffers to the buffered watermarks for all cols
1671            buffers.add_buffers(input_ids.clone());
1672        });
1673        self.active
1674            .extend(new_inputs.into_iter().map(|s| s.into_future()));
1675    }
1676
1677    /// Remove upstreams from `self` in `upstream_input_ids`. The current stream must be at the
1678    /// clean state right after a barrier.
1679    /// The current container does not necessarily contain all the input ids passed in.
1680    pub fn remove_upstreams(&mut self, upstream_input_ids: &HashSet<InputId>) {
1681        assert!(self.blocked.is_empty() && self.barrier.is_none());
1682
1683        let new_upstreams = std::mem::take(&mut self.active)
1684            .into_iter()
1685            .map(|s| s.into_inner().unwrap())
1686            .filter(|u| !upstream_input_ids.contains(&u.id()));
1687        self.extend_active(new_upstreams);
1688        self.buffered_watermarks.values_mut().for_each(|buffers| {
1689            // Call `check_heap` in case the only upstream(s) that does not have
1690            // watermark in heap is removed
1691            buffers.remove_buffer(upstream_input_ids.clone());
1692        });
1693    }
1694
1695    pub fn merge_barrier_align_duration(&self) -> Option<LabelGuardedMetric<Histogram>> {
1696        self.merge_barrier_align_duration.clone()
1697    }
1698
1699    pub fn flush_buffered_watermarks(&mut self) {
1700        self.buffered_watermarks
1701            .values_mut()
1702            .for_each(|buffers| buffers.clear());
1703    }
1704
1705    pub fn upstream_input_ids(&self) -> impl Iterator<Item = InputId> + '_ {
1706        self.blocked
1707            .iter()
1708            .map(|s| s.id())
1709            .chain(self.active.iter().map(|s| s.get_ref().unwrap().id()))
1710    }
1711
1712    pub fn is_empty(&self) -> bool {
1713        self.blocked.is_empty() && self.active.is_empty()
1714    }
1715}