risingwave_stream/executor/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prelude;
16
17use std::collections::{BTreeMap, HashMap, HashSet};
18use std::fmt::Debug;
19use std::hash::Hash;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::Poll;
23use std::vec;
24
25use await_tree::InstrumentAwait;
26use enum_as_inner::EnumAsInner;
27use futures::stream::{BoxStream, FusedStream, FuturesUnordered, StreamFuture};
28use futures::{Stream, StreamExt};
29use itertools::Itertools;
30use prometheus::Histogram;
31use prometheus::core::{AtomicU64, GenericCounter};
32use risingwave_common::array::StreamChunk;
33use risingwave_common::bitmap::Bitmap;
34use risingwave_common::catalog::{Schema, TableId};
35use risingwave_common::metrics::LabelGuardedMetric;
36use risingwave_common::row::OwnedRow;
37use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
38use risingwave_common::util::epoch::{Epoch, EpochPair};
39use risingwave_common::util::tracing::TracingContext;
40use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
41use risingwave_connector::source::SplitImpl;
42use risingwave_expr::expr::{Expression, NonStrictExpression};
43use risingwave_pb::data::PbEpoch;
44use risingwave_pb::expr::PbInputRef;
45use risingwave_pb::stream_plan::barrier::BarrierKind;
46use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
47use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
48use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
49use risingwave_pb::stream_plan::{
50    BarrierMutation, CombinedMutation, ConnectorPropsChangeMutation, Dispatchers,
51    DropSubscriptionsMutation, PauseMutation, PbAddMutation, PbBarrier, PbBarrierMutation,
52    PbDispatcher, PbStreamMessageBatch, PbUpdateMutation, PbWatermark, ResumeMutation,
53    SourceChangeSplitMutation, StartFragmentBackfillMutation, StopMutation,
54    SubscriptionUpstreamInfo, ThrottleMutation,
55};
56use smallvec::SmallVec;
57use tokio::time::Instant;
58
59use crate::error::StreamResult;
60use crate::executor::exchange::input::BoxedInput;
61use crate::executor::watermark::BufferedWatermarks;
62use crate::task::{ActorId, FragmentId};
63
64mod actor;
65mod barrier_align;
66pub mod exchange;
67pub mod monitor;
68
69pub mod aggregate;
70pub mod asof_join;
71mod backfill;
72mod barrier_recv;
73mod batch_query;
74mod chain;
75mod changelog;
76mod dedup;
77mod dispatch;
78pub mod dml;
79mod dynamic_filter;
80pub mod eowc;
81pub mod error;
82mod expand;
83mod filter;
84pub mod hash_join;
85mod hop_window;
86mod join;
87mod lookup;
88mod lookup_union;
89mod merge;
90mod mview;
91mod nested_loop_temporal_join;
92mod no_op;
93mod now;
94mod over_window;
95pub mod project;
96mod rearranged_chain;
97mod receiver;
98pub mod row_id_gen;
99mod sink;
100pub mod source;
101mod stream_reader;
102pub mod subtask;
103mod temporal_join;
104mod top_n;
105mod troublemaker;
106mod union;
107mod upstream_sink_union;
108mod values;
109mod watermark;
110mod watermark_filter;
111mod wrapper;
112
113mod approx_percentile;
114
115mod row_merge;
116
117#[cfg(test)]
118mod integration_tests;
119mod sync_kv_log_store;
120pub mod test_utils;
121mod utils;
122mod vector_index;
123
124pub use actor::{Actor, ActorContext, ActorContextRef};
125use anyhow::Context;
126pub use approx_percentile::global::GlobalApproxPercentileExecutor;
127pub use approx_percentile::local::LocalApproxPercentileExecutor;
128pub use backfill::arrangement_backfill::*;
129pub use backfill::cdc::{
130    CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor,
131};
132pub use backfill::no_shuffle_backfill::*;
133pub use backfill::snapshot_backfill::*;
134pub use barrier_recv::BarrierRecvExecutor;
135pub use batch_query::BatchQueryExecutor;
136pub use chain::ChainExecutor;
137pub use changelog::ChangeLogExecutor;
138pub use dedup::AppendOnlyDedupExecutor;
139pub use dispatch::{DispatchExecutor, DispatcherImpl};
140pub use dynamic_filter::DynamicFilterExecutor;
141pub use error::{StreamExecutorError, StreamExecutorResult};
142pub use expand::ExpandExecutor;
143pub use filter::FilterExecutor;
144pub use hash_join::*;
145pub use hop_window::HopWindowExecutor;
146pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
147pub use join::{AsOfDesc, AsOfJoinType, JoinType};
148pub use lookup::*;
149pub use lookup_union::LookupUnionExecutor;
150pub use merge::MergeExecutor;
151pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
152pub use mview::*;
153pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
154pub use no_op::NoOpExecutor;
155pub use now::*;
156pub use over_window::*;
157pub use rearranged_chain::RearrangedChainExecutor;
158pub use receiver::ReceiverExecutor;
159use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
160pub use row_merge::RowMergeExecutor;
161pub use sink::SinkExecutor;
162pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
163pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
164pub use temporal_join::TemporalJoinExecutor;
165pub use top_n::{
166    AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
167};
168pub use troublemaker::TroublemakerExecutor;
169pub use union::UnionExecutor;
170pub use upstream_sink_union::UpstreamSinkUnionExecutor;
171pub use utils::DummyExecutor;
172pub use values::ValuesExecutor;
173pub use vector_index::VectorIndexWriteExecutor;
174pub use watermark_filter::WatermarkFilterExecutor;
175pub use wrapper::WrapperExecutor;
176
177use self::barrier_align::AlignedMessageStream;
178
179pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
180pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
181pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
182pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;
183
184pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
185use risingwave_connector::source::cdc::{
186    CdcTableSnapshotSplitAssignment, build_actor_cdc_table_snapshot_splits,
187    build_pb_actor_cdc_table_snapshot_splits,
188};
189use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
190use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
191
192pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
193pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
194pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;
195
/// Static information of an executor.
///
/// An [`Executor`] stores one of these next to its boxed [`Execute`] object.
#[derive(Debug, Default, Clone)]
pub struct ExecutorInfo {
    /// The schema of the OUTPUT of the executor.
    pub schema: Schema,

    /// The primary key indices of the OUTPUT of the executor.
    /// Schema is used by both OLAP and streaming, therefore
    /// pk indices are maintained independently.
    pub pk_indices: PkIndices,

    /// Identity of the executor. This is the text written by the `Debug` impl of [`Executor`].
    pub identity: String,

    /// The executor id of the executor.
    pub id: u64,
}
213
214impl ExecutorInfo {
215    pub fn new(schema: Schema, pk_indices: PkIndices, identity: String, id: u64) -> Self {
216        Self {
217            schema,
218            pk_indices,
219            identity,
220            id,
221        }
222    }
223}
224
225/// [`Execute`] describes the methods an executor should implement to handle control messages.
226pub trait Execute: Send + 'static {
227    fn execute(self: Box<Self>) -> BoxedMessageStream;
228
229    fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
230        self.execute()
231    }
232
233    fn boxed(self) -> Box<dyn Execute>
234    where
235        Self: Sized + Send + 'static,
236    {
237        Box::new(self)
238    }
239}
240
/// [`Executor`] combines the static information ([`ExecutorInfo`]) and the executable object to
/// handle messages ([`Execute`]).
pub struct Executor {
    /// Static description (schema, pk indices, identity, id) of this executor.
    info: ExecutorInfo,
    /// The boxed object that is consumed to produce the message stream.
    execute: Box<dyn Execute>,
}
247
248impl Executor {
249    pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
250        Self { info, execute }
251    }
252
253    pub fn info(&self) -> &ExecutorInfo {
254        &self.info
255    }
256
257    pub fn schema(&self) -> &Schema {
258        &self.info.schema
259    }
260
261    pub fn pk_indices(&self) -> PkIndicesRef<'_> {
262        &self.info.pk_indices
263    }
264
265    pub fn identity(&self) -> &str {
266        &self.info.identity
267    }
268
269    pub fn execute(self) -> BoxedMessageStream {
270        self.execute.execute()
271    }
272
273    pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
274        self.execute.execute_with_epoch(epoch)
275    }
276}
277
278impl std::fmt::Debug for Executor {
279    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280        f.write_str(self.identity())
281    }
282}
283
284impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
285    fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
286        Self::new(info, execute)
287    }
288}
289
290impl<E> From<(ExecutorInfo, E)> for Executor
291where
292    E: Execute,
293{
294    fn from((info, execute): (ExecutorInfo, E)) -> Self {
295        Self::new(info, execute.boxed())
296    }
297}
298
/// The epoch value `0`, designated as the invalid epoch.
pub const INVALID_EPOCH: u64 = 0;

/// A [`FragmentId`] used in positions where it denotes an upstream fragment.
type UpstreamFragmentId = FragmentId;
/// Source splits assigned to each actor, keyed by actor id.
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;
303
/// In-memory form of the `Update` barrier mutation (serialized as [`PbUpdateMutation`]).
#[derive(Debug, Clone, PartialEq)]
pub struct UpdateMutation {
    /// Dispatcher updates grouped by actor id.
    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
    /// Merge updates keyed by `(actor_id, upstream_fragment_id)`; looked up by
    /// [`Barrier::as_update_merge`].
    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
    /// New vnode bitmap per actor; looked up by [`Barrier::as_update_vnode_bitmap`].
    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
    /// Actors dropped by this update; reported by [`Barrier::all_stop_actors`].
    pub dropped_actors: HashSet<ActorId>,
    /// Split assignments per actor; consulted by [`Barrier::initial_split_assignment`].
    pub actor_splits: SplitAssignments,
    /// Dispatchers newly attached to an actor, keyed by actor id.
    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
    /// CDC table snapshot split assignment carried by this mutation.
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignment,
}
314
/// In-memory form of the `Add` barrier mutation (serialized as [`PbAddMutation`]).
#[derive(Debug, Clone, PartialEq)]
pub struct AddMutation {
    /// Dispatchers to add, keyed by the actor id they are attached to.
    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
    /// Actors newly added by this mutation; checked by [`Barrier::is_newly_added`].
    pub added_actors: HashSet<ActorId>,
    // TODO: remove this and use `SourceChangesSplit` after we support multiple mutations.
    /// Split assignments per actor; consulted by [`Barrier::initial_split_assignment`].
    pub splits: SplitAssignments,
    /// Value returned by [`Barrier::is_pause_on_startup`].
    pub pause: bool,
    /// (`upstream_mv_table_id`,  `subscriber_id`)
    pub subscriptions_to_add: Vec<(TableId, u32)>,
    /// Fragment ids of backfill nodes for which [`Barrier::is_backfill_pause_on_startup`]
    /// returns `true`.
    pub backfill_nodes_to_pause: HashSet<FragmentId>,
    /// CDC table snapshot split assignment carried by this mutation.
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignment,
}
328
/// See [`PbMutation`] for the semantics of each mutation.
#[derive(Debug, Clone, PartialEq)]
pub enum Mutation {
    /// Carries the set of actors reported by [`Barrier::all_stop_actors`].
    Stop(HashSet<ActorId>),
    Update(UpdateMutation),
    Add(AddMutation),
    /// New split assignments per actor.
    SourceChangeSplit(SplitAssignments),
    Pause,
    Resume,
    /// Rate limit per actor id; serialized as `RateLimit { rate_limit }`.
    Throttle(HashMap<ActorId, Option<u32>>),
    /// An `Add` and an `Update` applied together; serialized as a `Combined` mutation holding
    /// the `Add` part followed by the `Update` part.
    AddAndUpdate(AddMutation, UpdateMutation),
    /// Connector properties (key -> value) per `u32` id.
    // NOTE(review): `to_protobuf` names the key `actor_id` — confirm what the id refers to.
    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
    DropSubscriptions {
        /// `subscriber` -> `upstream_mv_table_id`
        subscriptions_to_drop: Vec<(u32, TableId)>,
    },
    StartFragmentBackfill {
        /// Fragments checked by [`Barrier::should_start_fragment_backfill`].
        fragment_ids: HashSet<FragmentId>,
    },
    RefreshStart {
        table_id: TableId,
        associated_source_id: TableId,
    },
    /// This mutation is generated by the source executor
    LoadFinish {
        associated_source_id: TableId,
    },
}
357
/// The generic type `M` is the mutation type of the barrier.
///
/// For barriers in the dispatcher, `M` is `()`, which means the mutation is erased.
/// For barriers flowing within the streaming actor, `M` is the normal `BarrierMutationType`.
#[derive(Debug, Clone)]
pub struct BarrierInner<M> {
    /// The current/previous epoch pair of this barrier.
    pub epoch: EpochPair,
    /// The mutation payload; its type depends on `M` as described above.
    pub mutation: M,
    /// The kind of this barrier, e.g. `BarrierKind::Checkpoint`.
    pub kind: BarrierKind,

    /// Tracing context for the **current** epoch of this barrier.
    pub tracing_context: TracingContext,

    /// The actors that this barrier has passed locally. Used for debugging only.
    pub passed_actors: Vec<ActorId>,
}
374
/// Mutation payload of a [`Barrier`]: an optional, shared [`Mutation`].
pub type BarrierMutationType = Option<Arc<Mutation>>;
/// A barrier that carries an optional [`Mutation`].
pub type Barrier = BarrierInner<BarrierMutationType>;
/// A barrier whose mutation has been erased to `()`; see [`Barrier::into_dispatcher`].
pub type DispatcherBarrier = BarrierInner<()>;
378
379impl<M: Default> BarrierInner<M> {
380    /// Create a plain barrier.
381    pub fn new_test_barrier(epoch: u64) -> Self {
382        Self {
383            epoch: EpochPair::new_test_epoch(epoch),
384            kind: BarrierKind::Checkpoint,
385            tracing_context: TracingContext::none(),
386            mutation: Default::default(),
387            passed_actors: Default::default(),
388        }
389    }
390
391    pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
392        Self {
393            epoch: EpochPair::new(epoch, prev_epoch),
394            kind: BarrierKind::Checkpoint,
395            tracing_context: TracingContext::none(),
396            mutation: Default::default(),
397            passed_actors: Default::default(),
398        }
399    }
400}
401
402impl Barrier {
403    pub fn into_dispatcher(self) -> DispatcherBarrier {
404        DispatcherBarrier {
405            epoch: self.epoch,
406            mutation: (),
407            kind: self.kind,
408            tracing_context: self.tracing_context,
409            passed_actors: self.passed_actors,
410        }
411    }
412
413    #[must_use]
414    pub fn with_mutation(self, mutation: Mutation) -> Self {
415        Self {
416            mutation: Some(Arc::new(mutation)),
417            ..self
418        }
419    }
420
421    #[must_use]
422    pub fn with_stop(self) -> Self {
423        self.with_mutation(Mutation::Stop(HashSet::default()))
424    }
425
426    /// Whether this barrier carries stop mutation.
427    pub fn is_with_stop_mutation(&self) -> bool {
428        matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
429    }
430
431    /// Whether this barrier is to stop the actor with `actor_id`.
432    pub fn is_stop(&self, actor_id: ActorId) -> bool {
433        self.all_stop_actors()
434            .is_some_and(|actors| actors.contains(&actor_id))
435    }
436
437    pub fn is_checkpoint(&self) -> bool {
438        self.kind == BarrierKind::Checkpoint
439    }
440
441    /// Get the initial split assignments for the actor with `actor_id`.
442    ///
443    /// This should only be called on the initial barrier received by the executor. It must be
444    ///
445    /// - `Add` mutation when it's a new streaming job, or recovery.
446    /// - `Update` mutation when it's created for scaling.
447    /// - `AddAndUpdate` mutation when it's created for sink-into-table.
448    ///
449    /// Note that `SourceChangeSplit` is **not** included, because it's only used for changing splits
450    /// of existing executors.
451    pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
452        match self.mutation.as_deref()? {
453            Mutation::Update(UpdateMutation { actor_splits, .. })
454            | Mutation::Add(AddMutation {
455                splits: actor_splits,
456                ..
457            }) => actor_splits.get(&actor_id),
458
459            Mutation::AddAndUpdate(
460                AddMutation {
461                    splits: add_actor_splits,
462                    ..
463                },
464                UpdateMutation {
465                    actor_splits: update_actor_splits,
466                    ..
467                },
468            ) => add_actor_splits
469                .get(&actor_id)
470                // `Add` and `Update` should apply to different fragments, so we don't need to merge them.
471                .or_else(|| update_actor_splits.get(&actor_id)),
472
473            _ => {
474                if cfg!(debug_assertions) {
475                    panic!(
476                        "the initial mutation of the barrier should not be {:?}",
477                        self.mutation
478                    );
479                }
480                None
481            }
482        }
483        .map(|s| s.as_slice())
484    }
485
486    /// Get all actors that to be stopped (dropped) by this barrier.
487    pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
488        match self.mutation.as_deref() {
489            Some(Mutation::Stop(actors)) => Some(actors),
490            Some(Mutation::Update(UpdateMutation { dropped_actors, .. }))
491            | Some(Mutation::AddAndUpdate(_, UpdateMutation { dropped_actors, .. })) => {
492                Some(dropped_actors)
493            }
494            _ => None,
495        }
496    }
497
498    /// Whether this barrier is to newly add the actor with `actor_id`. This is used for `Chain` and
499    /// `Values` to decide whether to output the existing (historical) data.
500    ///
501    /// By "newly", we mean the actor belongs to a subgraph of a new streaming job. That is, actors
502    /// added for scaling are not included.
503    pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
504        match self.mutation.as_deref() {
505            Some(Mutation::Add(AddMutation { added_actors, .. }))
506            | Some(Mutation::AddAndUpdate(AddMutation { added_actors, .. }, _)) => {
507                added_actors.contains(&actor_id)
508            }
509            _ => false,
510        }
511    }
512
513    pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
514        if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
515            fragment_ids.contains(&fragment_id)
516        } else {
517            false
518        }
519    }
520
521    /// Whether this barrier adds new downstream fragment for the actor with `upstream_actor_id`.
522    ///
523    /// # Use case
524    /// Some optimizations are applied when an actor doesn't have any downstreams ("standalone" actors).
525    /// * Pause a standalone shared `SourceExecutor`.
526    /// * Disable a standalone `MaterializeExecutor`'s conflict check.
527    ///
528    /// This is implemented by checking `actor_context.initial_dispatch_num` on startup, and
529    /// check `has_more_downstream_fragments` on barrier to see whether the optimization
530    /// needs to be turned off.
531    ///
532    /// ## Some special cases not included
533    ///
534    /// Note that this is not `has_new_downstream_actor/fragment`. For our use case, we only
535    /// care about **number of downstream fragments** (more precisely, existence).
536    /// - When scaling, the number of downstream actors is changed, and they are "new", but downstream fragments is not changed.
537    /// - When `ALTER TABLE sink_into_table`, the fragment is replaced with a "new" one, but the number is not changed.
538    pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
539        let Some(mutation) = self.mutation.as_deref() else {
540            return false;
541        };
542        match mutation {
543            // Add is for mv, index and sink creation.
544            Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
545            // AddAndUpdate is for sink-into-table.
546            Mutation::AddAndUpdate(
547                AddMutation { adds, .. },
548                UpdateMutation {
549                    dispatchers,
550                    actor_new_dispatchers,
551                    ..
552                },
553            ) => {
554                adds.get(&upstream_actor_id).is_some()
555                    || actor_new_dispatchers.get(&upstream_actor_id).is_some()
556                    || dispatchers.get(&upstream_actor_id).is_some()
557            }
558            Mutation::Update(_)
559            | Mutation::Stop(_)
560            | Mutation::Pause
561            | Mutation::Resume
562            | Mutation::SourceChangeSplit(_)
563            | Mutation::Throttle(_)
564            | Mutation::DropSubscriptions { .. }
565            | Mutation::ConnectorPropsChange(_)
566            | Mutation::StartFragmentBackfill { .. }
567            | Mutation::RefreshStart { .. }
568            | Mutation::LoadFinish { .. } => false,
569        }
570    }
571
572    /// Whether this barrier requires the executor to pause its data stream on startup.
573    pub fn is_pause_on_startup(&self) -> bool {
574        match self.mutation.as_deref() {
575            Some(Mutation::Add(AddMutation { pause, .. }))
576            | Some(Mutation::AddAndUpdate(AddMutation { pause, .. }, _)) => *pause,
577            _ => false,
578        }
579    }
580
581    pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
582        match self.mutation.as_deref() {
583            Some(Mutation::Add(AddMutation {
584                backfill_nodes_to_pause,
585                ..
586            }))
587            | Some(Mutation::AddAndUpdate(
588                AddMutation {
589                    backfill_nodes_to_pause,
590                    ..
591                },
592                _,
593            )) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
594            _ => {
595                tracing::warn!("expected an AddMutation on Startup, instead got {:?}", self);
596                true
597            }
598        }
599    }
600
601    /// Whether this barrier is for resume.
602    pub fn is_resume(&self) -> bool {
603        matches!(self.mutation.as_deref(), Some(Mutation::Resume))
604    }
605
606    /// Returns the [`MergeUpdate`] if this barrier is to update the merge executors for the actor
607    /// with `actor_id`.
608    pub fn as_update_merge(
609        &self,
610        actor_id: ActorId,
611        upstream_fragment_id: UpstreamFragmentId,
612    ) -> Option<&MergeUpdate> {
613        self.mutation
614            .as_deref()
615            .and_then(|mutation| match mutation {
616                Mutation::Update(UpdateMutation { merges, .. })
617                | Mutation::AddAndUpdate(_, UpdateMutation { merges, .. }) => {
618                    merges.get(&(actor_id, upstream_fragment_id))
619                }
620
621                _ => None,
622            })
623    }
624
625    /// Returns the new vnode bitmap if this barrier is to update the vnode bitmap for the actor
626    /// with `actor_id`.
627    ///
628    /// Actually, this vnode bitmap update is only useful for the record accessing validation for
629    /// distributed executors, since the read/write pattern will never be across multiple vnodes.
630    pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
631        self.mutation
632            .as_deref()
633            .and_then(|mutation| match mutation {
634                Mutation::Update(UpdateMutation { vnode_bitmaps, .. })
635                | Mutation::AddAndUpdate(_, UpdateMutation { vnode_bitmaps, .. }) => {
636                    vnode_bitmaps.get(&actor_id).cloned()
637                }
638                _ => None,
639            })
640    }
641
642    pub fn get_curr_epoch(&self) -> Epoch {
643        Epoch(self.epoch.curr)
644    }
645
646    /// Retrieve the tracing context for the **current** epoch of this barrier.
647    pub fn tracing_context(&self) -> &TracingContext {
648        &self.tracing_context
649    }
650
651    pub fn added_subscriber_on_mv_table(
652        &self,
653        mv_table_id: TableId,
654    ) -> impl Iterator<Item = u32> + '_ {
655        if let Some(Mutation::Add(add)) | Some(Mutation::AddAndUpdate(add, _)) =
656            self.mutation.as_deref()
657        {
658            Some(add)
659        } else {
660            None
661        }
662        .into_iter()
663        .flat_map(move |add| {
664            add.subscriptions_to_add.iter().filter_map(
665                move |(upstream_mv_table_id, subscriber_id)| {
666                    if *upstream_mv_table_id == mv_table_id {
667                        Some(*subscriber_id)
668                    } else {
669                        None
670                    }
671                },
672            )
673        })
674    }
675}
676
677impl<M: PartialEq> PartialEq for BarrierInner<M> {
678    fn eq(&self, other: &Self) -> bool {
679        self.epoch == other.epoch && self.mutation == other.mutation
680    }
681}
682
683impl Mutation {
684    /// Return true if the mutation is stop.
685    ///
686    /// Note that this does not mean we will stop the current actor.
687    #[cfg(test)]
688    pub fn is_stop(&self) -> bool {
689        matches!(self, Mutation::Stop(_))
690    }
691
692    fn to_protobuf(&self) -> PbMutation {
693        let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
694            actor_splits
695                .iter()
696                .map(|(&actor_id, splits)| {
697                    (
698                        actor_id,
699                        ConnectorSplits {
700                            splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
701                        },
702                    )
703                })
704                .collect::<HashMap<_, _>>()
705        };
706
707        match self {
708            Mutation::Stop(actors) => PbMutation::Stop(StopMutation {
709                actors: actors.iter().copied().collect::<Vec<_>>(),
710            }),
711            Mutation::Update(UpdateMutation {
712                dispatchers,
713                merges,
714                vnode_bitmaps,
715                dropped_actors,
716                actor_splits,
717                actor_new_dispatchers,
718                actor_cdc_table_snapshot_splits,
719            }) => PbMutation::Update(PbUpdateMutation {
720                dispatcher_update: dispatchers.values().flatten().cloned().collect(),
721                merge_update: merges.values().cloned().collect(),
722                actor_vnode_bitmap_update: vnode_bitmaps
723                    .iter()
724                    .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
725                    .collect(),
726                dropped_actors: dropped_actors.iter().cloned().collect(),
727                actor_splits: actor_splits_to_protobuf(actor_splits),
728                actor_new_dispatchers: actor_new_dispatchers
729                    .iter()
730                    .map(|(&actor_id, dispatchers)| {
731                        (
732                            actor_id,
733                            Dispatchers {
734                                dispatchers: dispatchers.clone(),
735                            },
736                        )
737                    })
738                    .collect(),
739                actor_cdc_table_snapshot_splits: build_pb_actor_cdc_table_snapshot_splits(
740                    actor_cdc_table_snapshot_splits.clone(),
741                ),
742            }),
743            Mutation::Add(AddMutation {
744                adds,
745                added_actors,
746                splits,
747                pause,
748                subscriptions_to_add,
749                backfill_nodes_to_pause,
750                actor_cdc_table_snapshot_splits,
751            }) => PbMutation::Add(PbAddMutation {
752                actor_dispatchers: adds
753                    .iter()
754                    .map(|(&actor_id, dispatchers)| {
755                        (
756                            actor_id,
757                            Dispatchers {
758                                dispatchers: dispatchers.clone(),
759                            },
760                        )
761                    })
762                    .collect(),
763                added_actors: added_actors.iter().copied().collect(),
764                actor_splits: actor_splits_to_protobuf(splits),
765                pause: *pause,
766                subscriptions_to_add: subscriptions_to_add
767                    .iter()
768                    .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
769                        subscriber_id: *subscriber_id,
770                        upstream_mv_table_id: table_id.table_id,
771                    })
772                    .collect(),
773                backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
774                actor_cdc_table_snapshot_splits: build_pb_actor_cdc_table_snapshot_splits(
775                    actor_cdc_table_snapshot_splits.clone(),
776                ),
777            }),
778            Mutation::SourceChangeSplit(changes) => PbMutation::Splits(SourceChangeSplitMutation {
779                actor_splits: changes
780                    .iter()
781                    .map(|(&actor_id, splits)| {
782                        (
783                            actor_id,
784                            ConnectorSplits {
785                                splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
786                            },
787                        )
788                    })
789                    .collect(),
790            }),
791            Mutation::Pause => PbMutation::Pause(PauseMutation {}),
792            Mutation::Resume => PbMutation::Resume(ResumeMutation {}),
793            Mutation::Throttle(changes) => PbMutation::Throttle(ThrottleMutation {
794                actor_throttle: changes
795                    .iter()
796                    .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit }))
797                    .collect(),
798            }),
799
800            Mutation::AddAndUpdate(add, update) => PbMutation::Combined(CombinedMutation {
801                mutations: vec![
802                    BarrierMutation {
803                        mutation: Some(Mutation::Add(add.clone()).to_protobuf()),
804                    },
805                    BarrierMutation {
806                        mutation: Some(Mutation::Update(update.clone()).to_protobuf()),
807                    },
808                ],
809            }),
810            Mutation::DropSubscriptions {
811                subscriptions_to_drop,
812            } => PbMutation::DropSubscriptions(DropSubscriptionsMutation {
813                info: subscriptions_to_drop
814                    .iter()
815                    .map(
816                        |(subscriber_id, upstream_mv_table_id)| SubscriptionUpstreamInfo {
817                            subscriber_id: *subscriber_id,
818                            upstream_mv_table_id: upstream_mv_table_id.table_id,
819                        },
820                    )
821                    .collect(),
822            }),
823            Mutation::ConnectorPropsChange(map) => {
824                PbMutation::ConnectorPropsChange(ConnectorPropsChangeMutation {
825                    connector_props_infos: map
826                        .iter()
827                        .map(|(actor_id, options)| {
828                            (
829                                *actor_id,
830                                ConnectorPropsInfo {
831                                    connector_props_info: options
832                                        .iter()
833                                        .map(|(k, v)| (k.clone(), v.clone()))
834                                        .collect(),
835                                },
836                            )
837                        })
838                        .collect(),
839                })
840            }
841            Mutation::StartFragmentBackfill { fragment_ids } => {
842                PbMutation::StartFragmentBackfill(StartFragmentBackfillMutation {
843                    fragment_ids: fragment_ids.iter().copied().collect(),
844                })
845            }
846            Mutation::RefreshStart {
847                table_id,
848                associated_source_id,
849            } => PbMutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
850                table_id: table_id.table_id,
851                associated_source_id: associated_source_id.table_id,
852            }),
853            Mutation::LoadFinish {
854                associated_source_id,
855            } => PbMutation::LoadFinish(risingwave_pb::stream_plan::LoadFinishMutation {
856                associated_source_id: associated_source_id.table_id,
857            }),
858        }
859    }
860
861    fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
862        let mutation = match prost {
863            PbMutation::Stop(stop) => {
864                Mutation::Stop(HashSet::from_iter(stop.actors.iter().cloned()))
865            }
866
867            PbMutation::Update(update) => Mutation::Update(UpdateMutation {
868                dispatchers: update
869                    .dispatcher_update
870                    .iter()
871                    .map(|u| (u.actor_id, u.clone()))
872                    .into_group_map(),
873                merges: update
874                    .merge_update
875                    .iter()
876                    .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
877                    .collect(),
878                vnode_bitmaps: update
879                    .actor_vnode_bitmap_update
880                    .iter()
881                    .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
882                    .collect(),
883                dropped_actors: update.dropped_actors.iter().cloned().collect(),
884                actor_splits: update
885                    .actor_splits
886                    .iter()
887                    .map(|(&actor_id, splits)| {
888                        (
889                            actor_id,
890                            splits
891                                .splits
892                                .iter()
893                                .map(|split| split.try_into().unwrap())
894                                .collect(),
895                        )
896                    })
897                    .collect(),
898                actor_new_dispatchers: update
899                    .actor_new_dispatchers
900                    .iter()
901                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
902                    .collect(),
903                actor_cdc_table_snapshot_splits: build_actor_cdc_table_snapshot_splits(
904                    update.actor_cdc_table_snapshot_splits.clone(),
905                ),
906            }),
907
908            PbMutation::Add(add) => Mutation::Add(AddMutation {
909                adds: add
910                    .actor_dispatchers
911                    .iter()
912                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
913                    .collect(),
914                added_actors: add.added_actors.iter().copied().collect(),
915                // TODO: remove this and use `SourceChangesSplit` after we support multiple
916                // mutations.
917                splits: add
918                    .actor_splits
919                    .iter()
920                    .map(|(&actor_id, splits)| {
921                        (
922                            actor_id,
923                            splits
924                                .splits
925                                .iter()
926                                .map(|split| split.try_into().unwrap())
927                                .collect(),
928                        )
929                    })
930                    .collect(),
931                pause: add.pause,
932                subscriptions_to_add: add
933                    .subscriptions_to_add
934                    .iter()
935                    .map(
936                        |SubscriptionUpstreamInfo {
937                             subscriber_id,
938                             upstream_mv_table_id,
939                         }| {
940                            (TableId::new(*upstream_mv_table_id), *subscriber_id)
941                        },
942                    )
943                    .collect(),
944                backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
945                actor_cdc_table_snapshot_splits: build_actor_cdc_table_snapshot_splits(
946                    add.actor_cdc_table_snapshot_splits.clone(),
947                ),
948            }),
949
950            PbMutation::Splits(s) => {
951                let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
952                    Vec::with_capacity(s.actor_splits.len());
953                for (&actor_id, splits) in &s.actor_splits {
954                    if !splits.splits.is_empty() {
955                        change_splits.push((
956                            actor_id,
957                            splits
958                                .splits
959                                .iter()
960                                .map(SplitImpl::try_from)
961                                .try_collect()?,
962                        ));
963                    }
964                }
965                Mutation::SourceChangeSplit(change_splits.into_iter().collect())
966            }
967            PbMutation::Pause(_) => Mutation::Pause,
968            PbMutation::Resume(_) => Mutation::Resume,
969            PbMutation::Throttle(changes) => Mutation::Throttle(
970                changes
971                    .actor_throttle
972                    .iter()
973                    .map(|(actor_id, limit)| (*actor_id, limit.rate_limit))
974                    .collect(),
975            ),
976            PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
977                subscriptions_to_drop: drop
978                    .info
979                    .iter()
980                    .map(|info| (info.subscriber_id, TableId::new(info.upstream_mv_table_id)))
981                    .collect(),
982            },
983            PbMutation::ConnectorPropsChange(alter_connector_props) => {
984                Mutation::ConnectorPropsChange(
985                    alter_connector_props
986                        .connector_props_infos
987                        .iter()
988                        .map(|(actor_id, options)| {
989                            (
990                                *actor_id,
991                                options
992                                    .connector_props_info
993                                    .iter()
994                                    .map(|(k, v)| (k.clone(), v.clone()))
995                                    .collect(),
996                            )
997                        })
998                        .collect(),
999                )
1000            }
1001            PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
1002                Mutation::StartFragmentBackfill {
1003                    fragment_ids: start_fragment_backfill
1004                        .fragment_ids
1005                        .iter()
1006                        .copied()
1007                        .collect(),
1008                }
1009            }
1010            PbMutation::RefreshStart(refresh_start) => Mutation::RefreshStart {
1011                table_id: TableId::new(refresh_start.table_id),
1012                associated_source_id: TableId::new(refresh_start.associated_source_id),
1013            },
1014            PbMutation::LoadFinish(load_finish) => Mutation::LoadFinish {
1015                associated_source_id: TableId::new(load_finish.associated_source_id),
1016            },
1017            PbMutation::Combined(CombinedMutation { mutations }) => match &mutations[..] {
1018                [
1019                    BarrierMutation {
1020                        mutation: Some(add),
1021                    },
1022                    BarrierMutation {
1023                        mutation: Some(update),
1024                    },
1025                ] => {
1026                    let Mutation::Add(add_mutation) = Mutation::from_protobuf(add)? else {
1027                        unreachable!();
1028                    };
1029
1030                    let Mutation::Update(update_mutation) = Mutation::from_protobuf(update)? else {
1031                        unreachable!();
1032                    };
1033
1034                    Mutation::AddAndUpdate(add_mutation, update_mutation)
1035                }
1036
1037                _ => unreachable!(),
1038            },
1039        };
1040        Ok(mutation)
1041    }
1042}
1043
1044impl<M> BarrierInner<M> {
1045    fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
1046        let Self {
1047            epoch,
1048            mutation,
1049            kind,
1050            passed_actors,
1051            tracing_context,
1052            ..
1053        } = self;
1054
1055        PbBarrier {
1056            epoch: Some(PbEpoch {
1057                curr: epoch.curr,
1058                prev: epoch.prev,
1059            }),
1060            mutation: Some(PbBarrierMutation {
1061                mutation: barrier_fn(mutation),
1062            }),
1063            tracing_context: tracing_context.to_protobuf(),
1064            kind: *kind as _,
1065            passed_actors: passed_actors.clone(),
1066        }
1067    }
1068
1069    fn from_protobuf_inner(
1070        prost: &PbBarrier,
1071        mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
1072    ) -> StreamExecutorResult<Self> {
1073        let epoch = prost.get_epoch()?;
1074
1075        Ok(Self {
1076            kind: prost.kind(),
1077            epoch: EpochPair::new(epoch.curr, epoch.prev),
1078            mutation: mutation_from_pb(
1079                prost
1080                    .mutation
1081                    .as_ref()
1082                    .and_then(|mutation| mutation.mutation.as_ref()),
1083            )?,
1084            passed_actors: prost.get_passed_actors().clone(),
1085            tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
1086        })
1087    }
1088
1089    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
1090        BarrierInner {
1091            epoch: self.epoch,
1092            mutation: f(self.mutation),
1093            kind: self.kind,
1094            tracing_context: self.tracing_context,
1095            passed_actors: self.passed_actors,
1096        }
1097    }
1098}
1099
1100impl DispatcherBarrier {
1101    pub fn to_protobuf(&self) -> PbBarrier {
1102        self.to_protobuf_inner(|_| None)
1103    }
1104}
1105
1106impl Barrier {
1107    pub fn to_protobuf(&self) -> PbBarrier {
1108        self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
1109    }
1110
1111    pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
1112        Self::from_protobuf_inner(prost, |mutation| {
1113            mutation
1114                .map(|m| Mutation::from_protobuf(m).map(Arc::new))
1115                .transpose()
1116        })
1117    }
1118}
1119
/// A watermark message attached to a single column.
///
/// Note that the derived equality compares all three fields, whereas the `Ord`/`PartialOrd`
/// implementations order watermarks by `val` alone.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
    /// Index of the column this watermark refers to.
    pub col_idx: usize,
    /// Data type used to encode and decode `val` in the protobuf form.
    pub data_type: DataType,
    /// The watermark value itself.
    pub val: ScalarImpl,
}
1126
1127impl PartialOrd for Watermark {
1128    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1129        Some(self.cmp(other))
1130    }
1131}
1132
1133impl Ord for Watermark {
1134    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1135        self.val.default_cmp(&other.val)
1136    }
1137}
1138
1139impl Watermark {
1140    pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
1141        Self {
1142            col_idx,
1143            data_type,
1144            val,
1145        }
1146    }
1147
1148    pub async fn transform_with_expr(
1149        self,
1150        expr: &NonStrictExpression<impl Expression>,
1151        new_col_idx: usize,
1152    ) -> Option<Self> {
1153        let Self { col_idx, val, .. } = self;
1154        let row = {
1155            let mut row = vec![None; col_idx + 1];
1156            row[col_idx] = Some(val);
1157            OwnedRow::new(row)
1158        };
1159        let val = expr.eval_row_infallible(&row).await?;
1160        Some(Self::new(new_col_idx, expr.inner().return_type(), val))
1161    }
1162
1163    /// Transform the watermark with the given output indices. If this watermark is not in the
1164    /// output, return `None`.
1165    pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
1166        output_indices
1167            .iter()
1168            .position(|p| *p == self.col_idx)
1169            .map(|new_col_idx| self.with_idx(new_col_idx))
1170    }
1171
1172    pub fn to_protobuf(&self) -> PbWatermark {
1173        PbWatermark {
1174            column: Some(PbInputRef {
1175                index: self.col_idx as _,
1176                r#type: Some(self.data_type.to_protobuf()),
1177            }),
1178            val: Some(&self.val).to_protobuf().into(),
1179        }
1180    }
1181
1182    pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
1183        let col_ref = prost.get_column()?;
1184        let data_type = DataType::from(col_ref.get_type()?);
1185        let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
1186            .expect("watermark value cannot be null");
1187        Ok(Self::new(col_ref.get_index() as _, data_type, val))
1188    }
1189
1190    pub fn with_idx(self, idx: usize) -> Self {
1191        Self::new(idx, self.data_type, self.val)
1192    }
1193}
1194
/// A single message exchanged on a stream, generic over the mutation payload `M` carried by
/// its barriers.
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageInner<M> {
    /// A chunk of stream data.
    Chunk(StreamChunk),
    /// A barrier whose mutation payload has type `M`.
    Barrier(BarrierInner<M>),
    /// A watermark on one column.
    Watermark(Watermark),
}
1201
1202impl<M> MessageInner<M> {
1203    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
1204        match self {
1205            MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
1206            MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
1207            MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
1208        }
1209    }
1210}
1211
/// A message whose barriers carry a `BarrierMutationType` payload.
pub type Message = MessageInner<BarrierMutationType>;
/// A message whose barriers carry no mutation payload (`()`).
pub type DispatcherMessage = MessageInner<()>;
1214
/// `MessageBatchInner` is used exclusively by `Dispatcher` and the `Merger`/`Receiver` for exchanging messages between them.
/// It shares the same message type as the fundamental `MessageInner`, but batches multiple barriers into a single message.
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageBatchInner<M> {
    /// A chunk of stream data.
    Chunk(StreamChunk),
    /// Several barriers delivered together as one message.
    BarrierBatch(Vec<BarrierInner<M>>),
    /// A watermark on one column.
    Watermark(Watermark),
}
/// A message batch whose barriers carry a `BarrierMutationType` payload.
pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
/// A list of barriers without mutation payload.
pub type DispatcherBarriers = Vec<DispatcherBarrier>;
/// A message batch whose barriers carry no mutation payload (`()`).
pub type DispatcherMessageBatch = MessageBatchInner<()>;
1226
1227impl From<DispatcherMessage> for DispatcherMessageBatch {
1228    fn from(m: DispatcherMessage) -> Self {
1229        match m {
1230            DispatcherMessage::Chunk(c) => Self::Chunk(c),
1231            DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
1232            DispatcherMessage::Watermark(w) => Self::Watermark(w),
1233        }
1234    }
1235}
1236
1237impl From<StreamChunk> for Message {
1238    fn from(chunk: StreamChunk) -> Self {
1239        Message::Chunk(chunk)
1240    }
1241}
1242
1243impl<'a> TryFrom<&'a Message> for &'a Barrier {
1244    type Error = ();
1245
1246    fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
1247        match m {
1248            Message::Chunk(_) => Err(()),
1249            Message::Barrier(b) => Ok(b),
1250            Message::Watermark(_) => Err(()),
1251        }
1252    }
1253}
1254
1255impl Message {
1256    /// Return true if the message is a stop barrier, meaning the stream
1257    /// will not continue, false otherwise.
1258    ///
1259    /// Note that this does not mean we will stop the current actor.
1260    #[cfg(test)]
1261    pub fn is_stop(&self) -> bool {
1262        matches!(
1263            self,
1264            Message::Barrier(Barrier {
1265                mutation,
1266                ..
1267            }) if mutation.as_ref().unwrap().is_stop()
1268        )
1269    }
1270}
1271
1272impl DispatcherMessageBatch {
1273    pub fn to_protobuf(&self) -> PbStreamMessageBatch {
1274        let prost = match self {
1275            Self::Chunk(stream_chunk) => {
1276                let prost_stream_chunk = stream_chunk.to_protobuf();
1277                StreamMessageBatch::StreamChunk(prost_stream_chunk)
1278            }
1279            Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
1280                barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
1281            }),
1282            Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
1283        };
1284        PbStreamMessageBatch {
1285            stream_message_batch: Some(prost),
1286        }
1287    }
1288
1289    pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
1290        let res = match prost.get_stream_message_batch()? {
1291            StreamMessageBatch::StreamChunk(chunk) => {
1292                Self::Chunk(StreamChunk::from_protobuf(chunk)?)
1293            }
1294            StreamMessageBatch::BarrierBatch(barrier_batch) => {
1295                let barriers = barrier_batch
1296                    .barriers
1297                    .iter()
1298                    .map(|barrier| {
1299                        DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
1300                            if mutation.is_some() {
1301                                if cfg!(debug_assertions) {
1302                                    panic!("should not receive message of barrier with mutation");
1303                                } else {
1304                                    warn!(?barrier, "receive message of barrier with mutation");
1305                                }
1306                            }
1307                            Ok(())
1308                        })
1309                    })
1310                    .try_collect()?;
1311                Self::BarrierBatch(barriers)
1312            }
1313            StreamMessageBatch::Watermark(watermark) => {
1314                Self::Watermark(Watermark::from_protobuf(watermark)?)
1315            }
1316        };
1317        Ok(res)
1318    }
1319
1320    pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
1321        ::prost::Message::encoded_len(msg)
1322    }
1323}
1324
/// Owned list of key column indices (presumably the primary key, going by the name).
pub type PkIndices = Vec<usize>;
/// Borrowed form of [`PkIndices`].
pub type PkIndicesRef<'a> = &'a [usize];
/// Data types of key columns, kept in a `SmallVec` with an inline capacity of one element.
pub type PkDataTypes = SmallVec<[DataType; 1]>;
1328
1329/// Expect the first message of the given `stream` as a barrier.
1330pub async fn expect_first_barrier<M: Debug>(
1331    stream: &mut (impl MessageStreamInner<M> + Unpin),
1332) -> StreamExecutorResult<BarrierInner<M>> {
1333    let message = stream
1334        .next()
1335        .instrument_await("expect_first_barrier")
1336        .await
1337        .context("failed to extract the first message: stream closed unexpectedly")??;
1338    let barrier = message
1339        .into_barrier()
1340        .expect("the first message must be a barrier");
1341    // TODO: Is this check correct?
1342    assert!(matches!(
1343        barrier.kind,
1344        BarrierKind::Checkpoint | BarrierKind::Initial
1345    ));
1346    Ok(barrier)
1347}
1348
1349/// Expect the first message of the given `stream` as a barrier.
1350pub async fn expect_first_barrier_from_aligned_stream(
1351    stream: &mut (impl AlignedMessageStream + Unpin),
1352) -> StreamExecutorResult<Barrier> {
1353    let message = stream
1354        .next()
1355        .instrument_await("expect_first_barrier")
1356        .await
1357        .context("failed to extract the first message: stream closed unexpectedly")??;
1358    let barrier = message
1359        .into_barrier()
1360        .expect("the first message must be a barrier");
1361    Ok(barrier)
1362}
1363
/// `StreamConsumer` is the last step in an actor.
pub trait StreamConsumer: Send + 'static {
    /// The stream of barriers (or errors) produced by running the consumer.
    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;

    /// Consumes the boxed consumer and returns its barrier stream.
    fn execute(self: Box<Self>) -> Self::BarrierStream;
}
1370
/// A boxed input identified by `InputId` that yields `MessageStreamItemInner<M>` items.
type BoxedMessageInput<InputId, M> = BoxedInput<InputId, MessageStreamItemInner<M>>;
1372
/// A stream for merging messages from multiple upstreams.
/// Can dynamically add and delete upstream streams.
/// For the meaning of the generic parameter `M` used, refer to `BarrierInner<M>`.
pub struct DynamicReceivers<InputId, M> {
    /// The barrier we're aligning to. If this is `None`, then `blocked` is empty.
    barrier: Option<BarrierInner<M>>,
    /// The upstreams that're blocked by the `barrier`.
    blocked: Vec<BoxedMessageInput<InputId, M>>,
    /// The upstreams that're not blocked and can be polled.
    active: FuturesUnordered<StreamFuture<BoxedMessageInput<InputId, M>>>,
    /// watermark column index -> `BufferedWatermarks`
    buffered_watermarks: BTreeMap<usize, BufferedWatermarks<InputId>>,
    /// Currently only used for union.
    barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
    /// Only for merge. If None, then we don't take `Instant::now()` and `observe` during `poll_next`
    merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
}
1390
1391impl<InputId: Clone + Ord + Hash + std::fmt::Debug + Unpin, M: Clone + Unpin> Stream
1392    for DynamicReceivers<InputId, M>
1393{
1394    type Item = MessageStreamItemInner<M>;
1395
1396    fn poll_next(
1397        mut self: Pin<&mut Self>,
1398        cx: &mut std::task::Context<'_>,
1399    ) -> Poll<Option<Self::Item>> {
1400        if self.active.is_terminated() {
1401            // This only happens if we've been asked to stop.
1402            assert!(self.blocked.is_empty());
1403            return Poll::Ready(None);
1404        }
1405
1406        let mut start = None;
1407        loop {
1408            match futures::ready!(self.active.poll_next_unpin(cx)) {
1409                // Directly forward the error.
1410                Some((Some(Err(e)), _)) => {
1411                    return Poll::Ready(Some(Err(e)));
1412                }
1413                // Handle the message from some upstream.
1414                Some((Some(Ok(message)), remaining)) => {
1415                    let input_id = remaining.id();
1416                    match message {
1417                        MessageInner::Chunk(chunk) => {
1418                            // Continue polling this upstream by pushing it back to `active`.
1419                            self.active.push(remaining.into_future());
1420                            return Poll::Ready(Some(Ok(MessageInner::Chunk(chunk))));
1421                        }
1422                        MessageInner::Watermark(watermark) => {
1423                            // Continue polling this upstream by pushing it back to `active`.
1424                            self.active.push(remaining.into_future());
1425                            if let Some(watermark) = self.handle_watermark(input_id, watermark) {
1426                                return Poll::Ready(Some(Ok(MessageInner::Watermark(watermark))));
1427                            }
1428                        }
1429                        MessageInner::Barrier(barrier) => {
1430                            // Block this upstream by pushing it to `blocked`.
1431                            if self.blocked.is_empty() {
1432                                start = Some(Instant::now());
1433                            }
1434                            self.blocked.push(remaining);
1435                            if let Some(current_barrier) = self.barrier.as_ref() {
1436                                if current_barrier.epoch != barrier.epoch {
1437                                    return Poll::Ready(Some(Err(
1438                                        StreamExecutorError::align_barrier(
1439                                            current_barrier.clone().map_mutation(|_| None),
1440                                            barrier.map_mutation(|_| None),
1441                                        ),
1442                                    )));
1443                                }
1444                            } else {
1445                                self.barrier = Some(barrier);
1446                            }
1447                        }
1448                    }
1449                }
1450                // We use barrier as the control message of the stream. That is, we always stop the
1451                // actors actively when we receive a `Stop` mutation, instead of relying on the stream
1452                // termination.
1453                //
1454                // Besides, in abnormal cases when the other side of the `Input` closes unexpectedly,
1455                // we also yield an `Err(ExchangeChannelClosed)`, which will hit the `Err` arm above.
1456                // So this branch will never be reached in all cases.
1457                Some((None, remaining)) => {
1458                    return Poll::Ready(Some(Err(StreamExecutorError::channel_closed(format!(
1459                        "upstream input {:?} unexpectedly closed",
1460                        remaining.id()
1461                    )))));
1462                }
1463                // There's no active upstreams. Process the barrier and resume the blocked ones.
1464                None => {
1465                    if let Some(start) = start {
1466                        if let Some(barrier_align_duration) = &self.barrier_align_duration {
1467                            barrier_align_duration.inc_by(start.elapsed().as_nanos() as u64);
1468                        }
1469                        if let Some(merge_barrier_align_duration) =
1470                            &self.merge_barrier_align_duration
1471                        {
1472                            // Observe did a few atomic operation inside, we want to avoid the overhead.
1473                            merge_barrier_align_duration.observe(start.elapsed().as_secs_f64())
1474                        }
1475                    }
1476
1477                    break;
1478                }
1479            }
1480        }
1481
1482        assert!(self.active.is_terminated());
1483        let barrier = self.barrier.take().unwrap();
1484
1485        let upstreams = std::mem::take(&mut self.blocked);
1486        self.extend_active(upstreams);
1487        assert!(!self.active.is_terminated());
1488
1489        Poll::Ready(Some(Ok(MessageInner::Barrier(barrier))))
1490    }
1491}
1492
1493impl<InputId: Clone + Ord + Hash + std::fmt::Debug, M> DynamicReceivers<InputId, M> {
1494    pub fn new(
1495        upstreams: Vec<BoxedMessageInput<InputId, M>>,
1496        barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1497        merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
1498    ) -> Self {
1499        assert!(!upstreams.is_empty());
1500        let mut this = Self {
1501            barrier: None,
1502            blocked: Vec::with_capacity(upstreams.len()),
1503            active: Default::default(),
1504            buffered_watermarks: Default::default(),
1505            merge_barrier_align_duration,
1506            barrier_align_duration,
1507        };
1508        this.extend_active(upstreams);
1509        this
1510    }
1511
1512    /// Extend the active upstreams with the given upstreams. The current stream must be at the
1513    /// clean state right after a barrier.
1514    pub fn extend_active(
1515        &mut self,
1516        upstreams: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1517    ) {
1518        assert!(self.blocked.is_empty() && self.barrier.is_none());
1519
1520        self.active
1521            .extend(upstreams.into_iter().map(|s| s.into_future()));
1522    }
1523
1524    /// Handle a new watermark message. Optionally returns the watermark message to emit.
1525    pub fn handle_watermark(
1526        &mut self,
1527        input_id: InputId,
1528        watermark: Watermark,
1529    ) -> Option<Watermark> {
1530        let col_idx = watermark.col_idx;
1531        // Insert a buffer watermarks when first received from a column.
1532        let upstream_ids: Vec<_> = self.upstream_input_ids().collect();
1533        let watermarks = self
1534            .buffered_watermarks
1535            .entry(col_idx)
1536            .or_insert_with(|| BufferedWatermarks::with_ids(upstream_ids));
1537        watermarks.handle_watermark(input_id, watermark)
1538    }
1539
1540    /// Consume `other` and add its upstreams to `self`. The two streams must be at the clean state
1541    /// right after a barrier.
1542    pub fn add_upstreams_from(&mut self, other: Self) {
1543        assert!(self.blocked.is_empty() && self.barrier.is_none());
1544        assert!(other.blocked.is_empty() && other.barrier.is_none());
1545
1546        // Add buffers to the buffered watermarks for all cols
1547        self.buffered_watermarks.values_mut().for_each(|buffers| {
1548            buffers.add_buffers(other.upstream_input_ids());
1549        });
1550
1551        self.active.extend(other.active);
1552    }
1553
1554    /// Remove upstreams from `self` in `upstream_input_ids`. The current stream must be at the
1555    /// clean state right after a barrier.
1556    pub fn remove_upstreams(&mut self, upstream_input_ids: &HashSet<InputId>) {
1557        assert!(self.blocked.is_empty() && self.barrier.is_none());
1558
1559        let new_upstreams = std::mem::take(&mut self.active)
1560            .into_iter()
1561            .map(|s| s.into_inner().unwrap())
1562            .filter(|u| !upstream_input_ids.contains(&u.id()));
1563        self.extend_active(new_upstreams);
1564        self.buffered_watermarks.values_mut().for_each(|buffers| {
1565            // Call `check_heap` in case the only upstream(s) that does not have
1566            // watermark in heap is removed
1567            buffers.remove_buffer(upstream_input_ids.clone());
1568        });
1569    }
1570
1571    pub fn merge_barrier_align_duration(&self) -> Option<LabelGuardedMetric<Histogram>> {
1572        self.merge_barrier_align_duration.clone()
1573    }
1574
1575    pub fn flush_buffered_watermarks(&mut self) {
1576        self.buffered_watermarks
1577            .values_mut()
1578            .for_each(|buffers| buffers.clear());
1579    }
1580
1581    pub fn upstream_input_ids(&self) -> impl Iterator<Item = InputId> + '_ {
1582        self.blocked
1583            .iter()
1584            .map(|s| s.id())
1585            .chain(self.active.iter().map(|s| s.get_ref().unwrap().id()))
1586    }
1587}