risingwave_stream/executor/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prelude;
16
17use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
18use std::fmt::Debug;
19use std::future::pending;
20use std::hash::Hash;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::Poll;
24use std::vec;
25
26use await_tree::InstrumentAwait;
27use enum_as_inner::EnumAsInner;
28use futures::future::try_join_all;
29use futures::stream::{BoxStream, FusedStream, FuturesUnordered, StreamFuture};
30use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
31use itertools::Itertools;
32use prometheus::core::{AtomicU64, GenericCounter};
33use risingwave_common::array::StreamChunk;
34use risingwave_common::bitmap::Bitmap;
35use risingwave_common::catalog::{Schema, TableId};
36use risingwave_common::config::StreamingConfig;
37use risingwave_common::metrics::LabelGuardedMetric;
38use risingwave_common::row::OwnedRow;
39use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
40use risingwave_common::util::epoch::{Epoch, EpochPair};
41use risingwave_common::util::tracing::TracingContext;
42use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
43use risingwave_connector::source::SplitImpl;
44use risingwave_expr::expr::{Expression, NonStrictExpression};
45use risingwave_pb::data::PbEpoch;
46use risingwave_pb::expr::PbInputRef;
47use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
48use risingwave_pb::stream_plan::barrier::BarrierKind;
49use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
50use risingwave_pb::stream_plan::stream_node::PbStreamKind;
51use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
52use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
53use risingwave_pb::stream_plan::{
54    PbBarrier, PbBarrierMutation, PbDispatcher, PbSinkSchemaChange, PbStreamMessageBatch,
55    PbWatermark, SubscriptionUpstreamInfo,
56};
57use smallvec::SmallVec;
58use tokio::sync::mpsc;
59use tokio::time::{Duration, Instant};
60
61use crate::error::StreamResult;
62use crate::executor::exchange::input::{
63    BoxedActorInput, BoxedInput, assert_equal_dispatcher_barrier, new_input,
64};
65use crate::executor::monitor::ActorInputMetrics;
66use crate::executor::prelude::StreamingMetrics;
67use crate::executor::watermark::BufferedWatermarks;
68use crate::task::{ActorId, FragmentId, LocalBarrierManager};
69
70mod actor;
71mod barrier_align;
72pub mod exchange;
73pub mod monitor;
74
75pub mod aggregate;
76pub mod asof_join;
77mod backfill;
78mod barrier_recv;
79mod batch_query;
80mod chain;
81mod changelog;
82mod dedup;
83mod dispatch;
84pub mod dml;
85mod dynamic_filter;
86pub mod eowc;
87pub mod error;
88mod expand;
89mod filter;
90mod gap_fill;
91pub mod hash_join;
92mod hop_window;
93mod join;
94pub mod locality_provider;
95mod lookup;
96mod lookup_union;
97mod merge;
98mod mview;
99mod nested_loop_temporal_join;
100mod no_op;
101mod now;
102mod over_window;
103pub mod project;
104mod rearranged_chain;
105mod receiver;
106pub mod row_id_gen;
107mod sink;
108pub mod source;
109mod stream_reader;
110pub mod subtask;
111mod temporal_join;
112mod top_n;
113mod troublemaker;
114mod union;
115mod upstream_sink_union;
116mod values;
117mod watermark;
118mod watermark_filter;
119mod wrapper;
120
121mod approx_percentile;
122
123mod row_merge;
124
125#[cfg(test)]
126mod integration_tests;
127mod sync_kv_log_store;
128#[cfg(any(test, feature = "test"))]
129pub mod test_utils;
130mod utils;
131mod vector;
132
133pub use actor::{Actor, ActorContext, ActorContextRef};
134use anyhow::Context;
135pub use approx_percentile::global::GlobalApproxPercentileExecutor;
136pub use approx_percentile::local::LocalApproxPercentileExecutor;
137pub use backfill::arrangement_backfill::*;
138pub use backfill::cdc::{
139    CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor,
140};
141pub use backfill::no_shuffle_backfill::*;
142pub use backfill::snapshot_backfill::*;
143pub use barrier_recv::BarrierRecvExecutor;
144pub use batch_query::BatchQueryExecutor;
145pub use chain::ChainExecutor;
146pub use changelog::ChangeLogExecutor;
147pub use dedup::AppendOnlyDedupExecutor;
148pub use dispatch::DispatchExecutor;
149pub use dynamic_filter::DynamicFilterExecutor;
150pub use error::{StreamExecutorError, StreamExecutorResult};
151pub use expand::ExpandExecutor;
152pub use filter::{FilterExecutor, UpsertFilterExecutor};
153pub use gap_fill::{GapFillExecutor, GapFillExecutorArgs};
154pub use hash_join::*;
155pub use hop_window::HopWindowExecutor;
156pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
157pub use join::{AsOfDesc, AsOfJoinType, JoinType};
158pub use lookup::*;
159pub use lookup_union::LookupUnionExecutor;
160pub use merge::MergeExecutor;
161pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
162pub use mview::{MaterializeExecutor, RefreshableMaterializeArgs};
163pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
164pub use no_op::NoOpExecutor;
165pub use now::*;
166pub use over_window::*;
167pub use rearranged_chain::RearrangedChainExecutor;
168pub use receiver::ReceiverExecutor;
169use risingwave_common::id::SourceId;
170pub use row_merge::RowMergeExecutor;
171pub use sink::SinkExecutor;
172pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
173pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
174pub use temporal_join::TemporalJoinExecutor;
175pub use top_n::{
176    AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
177};
178pub use troublemaker::TroublemakerExecutor;
179pub use union::UnionExecutor;
180pub use upstream_sink_union::{UpstreamFragmentInfo, UpstreamSinkUnionExecutor};
181pub use utils::DummyExecutor;
182pub use values::ValuesExecutor;
183pub use vector::*;
184pub use watermark_filter::{UpsertWatermarkFilterExecutor, WatermarkFilterExecutor};
185pub use wrapper::WrapperExecutor;
186
187use self::barrier_align::AlignedMessageStream;
188
/// Result item of a message stream, generic over the barrier mutation type `M`.
pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
/// Result item of a message stream flowing within a streaming actor (full barrier mutations).
pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
/// Result item of a message stream in the dispatcher (barrier mutations erased).
pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
/// Boxed, `'static` stream yielding [`MessageStreamItem`]s.
pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;
193
194pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
195use risingwave_connector::sink::catalog::SinkId;
196use risingwave_connector::source::cdc::{
197    CdcTableSnapshotSplitAssignmentWithGeneration,
198    build_actor_cdc_table_snapshot_splits_with_generation,
199};
200use risingwave_pb::id::{ExecutorId, SubscriberId};
201use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
202
/// Sendable stream of [`MessageStreamItemInner`]s, generic over the mutation type `M`.
pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
/// Sendable stream of [`MessageStreamItem`]s.
pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
/// Sendable stream of [`DispatcherMessageStreamItem`]s.
pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;
206
/// Static information of an executor, combined with an [`Execute`] object
/// to form an [`Executor`].
#[derive(Debug, Default, Clone)]
pub struct ExecutorInfo {
    /// The schema of the OUTPUT of the executor.
    pub schema: Schema,

    /// The stream key indices of the OUTPUT of the executor.
    pub stream_key: StreamKey,

    /// The stream kind of the OUTPUT of the executor.
    pub stream_kind: PbStreamKind,

    /// Identity of the executor, used for debugging and display.
    pub identity: String,

    /// The executor id of the executor.
    pub id: ExecutorId,
}
225
226impl ExecutorInfo {
227    pub fn for_test(schema: Schema, stream_key: StreamKey, identity: String, id: u64) -> Self {
228        Self {
229            schema,
230            stream_key,
231            stream_kind: PbStreamKind::Retract, // dummy value for test
232            identity,
233            id: id.into(),
234        }
235    }
236}
237
/// [`Execute`] describes the methods an executor should implement to handle control messages.
pub trait Execute: Send + 'static {
    /// Consume the executor and return its output message stream.
    fn execute(self: Box<Self>) -> BoxedMessageStream;

    /// Like [`Execute::execute`], but with an explicit epoch to start from.
    /// The default implementation ignores the epoch and delegates to `execute`.
    fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
        self.execute()
    }

    /// Box this executor into a trait object.
    fn boxed(self) -> Box<dyn Execute>
    where
        Self: Sized + Send + 'static,
    {
        Box::new(self)
    }
}
253
/// [`Executor`] combines the static information ([`ExecutorInfo`]) and the executable object to
/// handle messages ([`Execute`]).
pub struct Executor {
    // Static information (schema, stream key, identity, ...) of the executor.
    info: ExecutorInfo,
    // The executable object producing the message stream.
    execute: Box<dyn Execute>,
}
260
261impl Executor {
262    pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
263        Self { info, execute }
264    }
265
266    pub fn info(&self) -> &ExecutorInfo {
267        &self.info
268    }
269
270    pub fn schema(&self) -> &Schema {
271        &self.info.schema
272    }
273
274    pub fn stream_key(&self) -> StreamKeyRef<'_> {
275        &self.info.stream_key
276    }
277
278    pub fn stream_kind(&self) -> PbStreamKind {
279        self.info.stream_kind
280    }
281
282    pub fn identity(&self) -> &str {
283        &self.info.identity
284    }
285
286    pub fn execute(self) -> BoxedMessageStream {
287        self.execute.execute()
288    }
289
290    pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
291        self.execute.execute_with_epoch(epoch)
292    }
293}
294
295impl std::fmt::Debug for Executor {
296    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
297        f.write_str(self.identity())
298    }
299}
300
301impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
302    fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
303        Self::new(info, execute)
304    }
305}
306
307impl<E> From<(ExecutorInfo, E)> for Executor
308where
309    E: Execute,
310{
311    fn from((info, execute): (ExecutorInfo, E)) -> Self {
312        Self::new(info, execute.boxed())
313    }
314}
315
/// Sentinel epoch value that is never a valid epoch.
pub const INVALID_EPOCH: u64 = 0;

// Aliases for readability in mutation maps below.
type UpstreamFragmentId = FragmentId;
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;
320
/// Mutation for configuration updates, e.g., when created for scaling.
/// See [`PbMutation`] for the exact semantics.
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct UpdateMutation {
    /// Updates to the dispatchers of each actor.
    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
    /// Updates to the merge executor identified by `(actor, upstream fragment)`.
    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
    /// New vnode bitmap for each actor.
    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
    /// Actors to be dropped.
    pub dropped_actors: HashSet<ActorId>,
    /// New source split assignments per actor.
    pub actor_splits: SplitAssignments,
    /// Dispatchers to be newly added to each actor.
    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
    /// CDC table snapshot split assignments (with generation) per actor.
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// Schema change to apply to each sink.
    pub sink_schema_change: HashMap<SinkId, PbSinkSchemaChange>,
    /// Subscriptions to be dropped along with this update.
    pub subscriptions_to_drop: Vec<SubscriptionUpstreamInfo>,
}
334
/// Mutation for adding new actors, e.g., for a new streaming job or recovery.
/// See [`PbMutation`] for the exact semantics.
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct AddMutation {
    /// New dispatchers to be added to each upstream actor.
    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
    /// Actors newly added by this barrier.
    pub added_actors: HashSet<ActorId>,
    // TODO: remove this and use `SourceChangesSplit` after we support multiple mutations.
    pub splits: SplitAssignments,
    /// Whether executors should pause their data streams on startup
    /// (see [`Barrier::is_pause_on_startup`]).
    pub pause: bool,
    /// (`upstream_mv_table_id`,  `subscriber_id`)
    pub subscriptions_to_add: Vec<(TableId, SubscriberId)>,
    /// nodes which should start backfill
    pub backfill_nodes_to_pause: HashSet<FragmentId>,
    /// CDC table snapshot split assignments (with generation) per actor.
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// New upstream sink for each downstream fragment.
    pub new_upstream_sinks: HashMap<FragmentId, PbNewUpstreamSink>,
}
350
/// Mutation for stopping (dropping) actors and sink fragments.
/// See [`PbMutation`] for the exact semantics.
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct StopMutation {
    /// Actors to be stopped and dropped.
    pub dropped_actors: HashSet<ActorId>,
    /// Upstream sink fragments to be dropped.
    pub dropped_sink_fragments: HashSet<FragmentId>,
}
357
/// See [`PbMutation`] for the semantics of each mutation.
#[cfg_attr(any(test, feature = "test"), derive(PartialEq))]
#[derive(Debug, Clone)]
pub enum Mutation {
    /// Stop (drop) actors and sink fragments.
    Stop(StopMutation),
    /// Configuration update, e.g., for scaling.
    Update(UpdateMutation),
    /// Add new actors, e.g., for a new streaming job or recovery.
    Add(AddMutation),
    /// Change the source split assignments of existing source executors.
    SourceChangeSplit(SplitAssignments),
    /// Pause the data streams.
    Pause,
    /// Resume the data streams after a pause.
    Resume,
    /// Throttle configuration, per fragment.
    Throttle(HashMap<FragmentId, ThrottleConfig>),
    /// Connector property changes, keyed by a `u32` id
    /// (NOTE(review): presumably an object/source id — confirm against usage).
    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
    DropSubscriptions {
        /// `subscriber` -> `upstream_mv_table_id`
        subscriptions_to_drop: Vec<SubscriptionUpstreamInfo>,
    },
    /// Instruct the listed fragments to start backfilling.
    StartFragmentBackfill {
        fragment_ids: HashSet<FragmentId>,
    },
    // Refresh-related mutations below identify a table and/or its associated
    // source; see [`PbMutation`] for the exact semantics.
    RefreshStart {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    ListFinish {
        associated_source_id: SourceId,
    },
    LoadFinish {
        associated_source_id: SourceId,
    },
    ResetSource {
        source_id: SourceId,
    },
    InjectSourceOffsets {
        source_id: SourceId,
        /// Split ID -> offset (JSON-encoded based on connector type)
        split_offsets: HashMap<String, String>,
    },
}
396
/// The generic type `M` is the mutation type of the barrier.
///
/// For barrier of in the dispatcher, `M` is `()`, which means the mutation is erased.
/// For barrier flowing within the streaming actor, `M` is the normal `BarrierMutationType`.
#[derive(Debug, Clone)]
pub struct BarrierInner<M> {
    /// Current/previous epoch pair of this barrier.
    pub epoch: EpochPair,
    /// Mutation carried by this barrier; `()` when erased for the dispatcher.
    pub mutation: M,
    /// Kind of this barrier, e.g., whether it is a checkpoint.
    pub kind: BarrierKind,

    /// Tracing context for the **current** epoch of this barrier.
    pub tracing_context: TracingContext,
}
410
/// Mutation type of a barrier flowing within a streaming actor: an optional shared [`Mutation`].
pub type BarrierMutationType = Option<Arc<Mutation>>;
/// Barrier carrying a full (optional) mutation.
pub type Barrier = BarrierInner<BarrierMutationType>;
/// Barrier with the mutation erased, used in the dispatcher.
pub type DispatcherBarrier = BarrierInner<()>;
414
415impl<M: Default> BarrierInner<M> {
416    /// Create a plain barrier.
417    pub fn new_test_barrier(epoch: u64) -> Self {
418        Self {
419            epoch: EpochPair::new_test_epoch(epoch),
420            kind: BarrierKind::Checkpoint,
421            tracing_context: TracingContext::none(),
422            mutation: Default::default(),
423        }
424    }
425
426    pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
427        Self {
428            epoch: EpochPair::new(epoch, prev_epoch),
429            kind: BarrierKind::Checkpoint,
430            tracing_context: TracingContext::none(),
431            mutation: Default::default(),
432        }
433    }
434}
435
436impl Barrier {
437    pub fn into_dispatcher(self) -> DispatcherBarrier {
438        DispatcherBarrier {
439            epoch: self.epoch,
440            mutation: (),
441            kind: self.kind,
442            tracing_context: self.tracing_context,
443        }
444    }
445
446    #[must_use]
447    pub fn with_mutation(self, mutation: Mutation) -> Self {
448        Self {
449            mutation: Some(Arc::new(mutation)),
450            ..self
451        }
452    }
453
454    #[must_use]
455    pub fn with_stop(self) -> Self {
456        self.with_mutation(Mutation::Stop(StopMutation {
457            dropped_actors: Default::default(),
458            dropped_sink_fragments: Default::default(),
459        }))
460    }
461
462    /// Whether this barrier carries stop mutation.
463    pub fn is_with_stop_mutation(&self) -> bool {
464        matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
465    }
466
467    /// Whether this barrier is to stop the actor with `actor_id`.
468    pub fn is_stop(&self, actor_id: ActorId) -> bool {
469        self.all_stop_actors()
470            .is_some_and(|actors| actors.contains(&actor_id))
471    }
472
473    pub fn is_checkpoint(&self) -> bool {
474        self.kind == BarrierKind::Checkpoint
475    }
476
477    /// Get the initial split assignments for the actor with `actor_id`.
478    ///
479    /// This should only be called on the initial barrier received by the executor. It must be
480    ///
481    /// - `Add` mutation when it's a new streaming job, or recovery.
482    /// - `Update` mutation when it's created for scaling.
483    ///
484    /// Note that `SourceChangeSplit` is **not** included, because it's only used for changing splits
485    /// of existing executors.
486    pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
487        match self.mutation.as_deref()? {
488            Mutation::Update(UpdateMutation { actor_splits, .. })
489            | Mutation::Add(AddMutation {
490                splits: actor_splits,
491                ..
492            }) => actor_splits.get(&actor_id),
493
494            _ => {
495                if cfg!(debug_assertions) {
496                    panic!(
497                        "the initial mutation of the barrier should not be {:?}",
498                        self.mutation
499                    );
500                }
501                None
502            }
503        }
504        .map(|s| s.as_slice())
505    }
506
507    /// Get all actors that to be stopped (dropped) by this barrier.
508    pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
509        match self.mutation.as_deref() {
510            Some(Mutation::Stop(StopMutation { dropped_actors, .. })) => Some(dropped_actors),
511            Some(Mutation::Update(UpdateMutation { dropped_actors, .. })) => Some(dropped_actors),
512            _ => None,
513        }
514    }
515
516    /// Whether this barrier is to newly add the actor with `actor_id`. This is used for `Chain` and
517    /// `Values` to decide whether to output the existing (historical) data.
518    ///
519    /// By "newly", we mean the actor belongs to a subgraph of a new streaming job. That is, actors
520    /// added for scaling are not included.
521    pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
522        match self.mutation.as_deref() {
523            Some(Mutation::Add(AddMutation { added_actors, .. })) => {
524                added_actors.contains(&actor_id)
525            }
526            _ => false,
527        }
528    }
529
530    pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
531        if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
532            fragment_ids.contains(&fragment_id)
533        } else {
534            false
535        }
536    }
537
538    /// Whether this barrier adds new downstream fragment for the actor with `upstream_actor_id`.
539    ///
540    /// # Use case
541    /// Some optimizations are applied when an actor doesn't have any downstreams ("standalone" actors).
542    /// * Pause a standalone shared `SourceExecutor`.
543    /// * Disable a standalone `MaterializeExecutor`'s conflict check.
544    ///
545    /// This is implemented by checking `actor_context.initial_dispatch_num` on startup, and
546    /// check `has_more_downstream_fragments` on barrier to see whether the optimization
547    /// needs to be turned off.
548    ///
549    /// ## Some special cases not included
550    ///
551    /// Note that this is not `has_new_downstream_actor/fragment`. For our use case, we only
552    /// care about **number of downstream fragments** (more precisely, existence).
553    /// - When scaling, the number of downstream actors is changed, and they are "new", but downstream fragments is not changed.
554    /// - When `ALTER TABLE sink_into_table`, the fragment is replaced with a "new" one, but the number is not changed.
555    pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
556        let Some(mutation) = self.mutation.as_deref() else {
557            return false;
558        };
559        match mutation {
560            // Add is for mv, index and sink creation.
561            Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
562            Mutation::Update(_)
563            | Mutation::Stop(_)
564            | Mutation::Pause
565            | Mutation::Resume
566            | Mutation::SourceChangeSplit(_)
567            | Mutation::Throttle { .. }
568            | Mutation::DropSubscriptions { .. }
569            | Mutation::ConnectorPropsChange(_)
570            | Mutation::StartFragmentBackfill { .. }
571            | Mutation::RefreshStart { .. }
572            | Mutation::ListFinish { .. }
573            | Mutation::LoadFinish { .. }
574            | Mutation::ResetSource { .. }
575            | Mutation::InjectSourceOffsets { .. } => false,
576        }
577    }
578
579    /// Whether this barrier requires the executor to pause its data stream on startup.
580    pub fn is_pause_on_startup(&self) -> bool {
581        match self.mutation.as_deref() {
582            Some(Mutation::Add(AddMutation { pause, .. })) => *pause,
583            _ => false,
584        }
585    }
586
587    pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
588        match self.mutation.as_deref() {
589            Some(Mutation::Add(AddMutation {
590                backfill_nodes_to_pause,
591                ..
592            })) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
593            Some(Mutation::Update(_)) => false,
594            _ => {
595                tracing::warn!(
596                    "expected an AddMutation or UpdateMutation on Startup, instead got {:?}",
597                    self
598                );
599                false
600            }
601        }
602    }
603
604    /// Whether this barrier is for resume.
605    pub fn is_resume(&self) -> bool {
606        matches!(self.mutation.as_deref(), Some(Mutation::Resume))
607    }
608
609    /// Returns the [`MergeUpdate`] if this barrier is to update the merge executors for the actor
610    /// with `actor_id`.
611    pub fn as_update_merge(
612        &self,
613        actor_id: ActorId,
614        upstream_fragment_id: UpstreamFragmentId,
615    ) -> Option<&MergeUpdate> {
616        self.mutation
617            .as_deref()
618            .and_then(|mutation| match mutation {
619                Mutation::Update(UpdateMutation { merges, .. }) => {
620                    merges.get(&(actor_id, upstream_fragment_id))
621                }
622                _ => None,
623            })
624    }
625
626    /// Returns the new upstream sink information if this barrier is to add a new upstream sink for
627    /// the specified downstream fragment.
628    pub fn as_new_upstream_sink(&self, fragment_id: FragmentId) -> Option<&PbNewUpstreamSink> {
629        self.mutation
630            .as_deref()
631            .and_then(|mutation| match mutation {
632                Mutation::Add(AddMutation {
633                    new_upstream_sinks, ..
634                }) => new_upstream_sinks.get(&fragment_id),
635                _ => None,
636            })
637    }
638
639    /// Returns the dropped upstream sink-fragment if this barrier is to drop any sink.
640    pub fn as_dropped_upstream_sinks(&self) -> Option<&HashSet<FragmentId>> {
641        self.mutation
642            .as_deref()
643            .and_then(|mutation| match mutation {
644                Mutation::Stop(StopMutation {
645                    dropped_sink_fragments,
646                    ..
647                }) => Some(dropped_sink_fragments),
648                _ => None,
649            })
650    }
651
652    /// Returns the new vnode bitmap if this barrier is to update the vnode bitmap for the actor
653    /// with `actor_id`.
654    ///
655    /// Actually, this vnode bitmap update is only useful for the record accessing validation for
656    /// distributed executors, since the read/write pattern will never be across multiple vnodes.
657    pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
658        self.mutation
659            .as_deref()
660            .and_then(|mutation| match mutation {
661                Mutation::Update(UpdateMutation { vnode_bitmaps, .. }) => {
662                    vnode_bitmaps.get(&actor_id).cloned()
663                }
664                _ => None,
665            })
666    }
667
668    pub fn as_sink_schema_change(&self, sink_id: SinkId) -> Option<PbSinkSchemaChange> {
669        self.mutation
670            .as_deref()
671            .and_then(|mutation| match mutation {
672                Mutation::Update(UpdateMutation {
673                    sink_schema_change, ..
674                }) => sink_schema_change.get(&sink_id).cloned(),
675                _ => None,
676            })
677    }
678
679    pub fn as_subscriptions_to_drop(&self) -> Option<&[SubscriptionUpstreamInfo]> {
680        match self.mutation.as_deref() {
681            Some(Mutation::DropSubscriptions {
682                subscriptions_to_drop,
683            })
684            | Some(Mutation::Update(UpdateMutation {
685                subscriptions_to_drop,
686                ..
687            })) => Some(subscriptions_to_drop.as_slice()),
688            _ => None,
689        }
690    }
691
692    pub fn get_curr_epoch(&self) -> Epoch {
693        Epoch(self.epoch.curr)
694    }
695
696    /// Retrieve the tracing context for the **current** epoch of this barrier.
697    pub fn tracing_context(&self) -> &TracingContext {
698        &self.tracing_context
699    }
700
701    pub fn added_subscriber_on_mv_table(
702        &self,
703        mv_table_id: TableId,
704    ) -> impl Iterator<Item = SubscriberId> + '_ {
705        if let Some(Mutation::Add(add)) = self.mutation.as_deref() {
706            Some(add)
707        } else {
708            None
709        }
710        .into_iter()
711        .flat_map(move |add| {
712            add.subscriptions_to_add.iter().filter_map(
713                move |(upstream_mv_table_id, subscriber_id)| {
714                    if *upstream_mv_table_id == mv_table_id {
715                        Some(*subscriber_id)
716                    } else {
717                        None
718                    }
719                },
720            )
721        })
722    }
723}
724
impl<M: PartialEq> PartialEq for BarrierInner<M> {
    // Equality considers only `epoch` and `mutation`; `kind` and `tracing_context`
    // are not compared.
    fn eq(&self, other: &Self) -> bool {
        self.epoch == other.epoch && self.mutation == other.mutation
    }
}
730
731impl Mutation {
732    /// Return true if the mutation is stop.
733    ///
734    /// Note that this does not mean we will stop the current actor.
735    #[cfg(test)]
736    pub fn is_stop(&self) -> bool {
737        matches!(self, Mutation::Stop(_))
738    }
739
740    #[cfg(test)]
741    fn to_protobuf(&self) -> PbMutation {
742        use risingwave_pb::source::{
743            ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
744        };
745        use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
746        use risingwave_pb::stream_plan::{
747            PbAddMutation, PbConnectorPropsChangeMutation, PbDispatchers,
748            PbDropSubscriptionsMutation, PbPauseMutation, PbResumeMutation,
749            PbSourceChangeSplitMutation, PbStartFragmentBackfillMutation, PbStopMutation,
750            PbThrottleMutation, PbUpdateMutation,
751        };
752        let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
753            actor_splits
754                .iter()
755                .map(|(&actor_id, splits)| {
756                    (
757                        actor_id,
758                        ConnectorSplits {
759                            splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
760                        },
761                    )
762                })
763                .collect::<HashMap<_, _>>()
764        };
765
766        match self {
767            Mutation::Stop(StopMutation {
768                dropped_actors,
769                dropped_sink_fragments,
770            }) => PbMutation::Stop(PbStopMutation {
771                actors: dropped_actors.iter().copied().collect(),
772                dropped_sink_fragments: dropped_sink_fragments.iter().copied().collect(),
773            }),
774            Mutation::Update(UpdateMutation {
775                dispatchers,
776                merges,
777                vnode_bitmaps,
778                dropped_actors,
779                actor_splits,
780                actor_new_dispatchers,
781                actor_cdc_table_snapshot_splits,
782                sink_schema_change,
783                subscriptions_to_drop,
784            }) => PbMutation::Update(PbUpdateMutation {
785                dispatcher_update: dispatchers.values().flatten().cloned().collect(),
786                merge_update: merges.values().cloned().collect(),
787                actor_vnode_bitmap_update: vnode_bitmaps
788                    .iter()
789                    .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
790                    .collect(),
791                dropped_actors: dropped_actors.iter().copied().collect(),
792                actor_splits: actor_splits_to_protobuf(actor_splits),
793                actor_new_dispatchers: actor_new_dispatchers
794                    .iter()
795                    .map(|(&actor_id, dispatchers)| {
796                        (
797                            actor_id,
798                            PbDispatchers {
799                                dispatchers: dispatchers.clone(),
800                            },
801                        )
802                    })
803                    .collect(),
804                actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
805                    splits:actor_cdc_table_snapshot_splits.splits.iter().map(|(actor_id,(splits, generation))| {
806                        (*actor_id, risingwave_pb::source::PbCdcTableSnapshotSplits {
807                            splits: splits.iter().map(risingwave_connector::source::cdc::build_cdc_table_snapshot_split).collect(),
808                            generation: *generation,
809                        })
810                    }).collect()
811                }),
812                sink_schema_change: sink_schema_change
813                    .iter()
814                    .map(|(sink_id, change)| ((*sink_id).as_raw_id(), change.clone()))
815                    .collect(),
816                subscriptions_to_drop: subscriptions_to_drop.clone(),
817            }),
818            Mutation::Add(AddMutation {
819                adds,
820                added_actors,
821                splits,
822                pause,
823                subscriptions_to_add,
824                backfill_nodes_to_pause,
825                actor_cdc_table_snapshot_splits,
826                new_upstream_sinks,
827            }) => PbMutation::Add(PbAddMutation {
828                actor_dispatchers: adds
829                    .iter()
830                    .map(|(&actor_id, dispatchers)| {
831                        (
832                            actor_id,
833                            PbDispatchers {
834                                dispatchers: dispatchers.clone(),
835                            },
836                        )
837                    })
838                    .collect(),
839                added_actors: added_actors.iter().copied().collect(),
840                actor_splits: actor_splits_to_protobuf(splits),
841                pause: *pause,
842                subscriptions_to_add: subscriptions_to_add
843                    .iter()
844                    .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
845                        subscriber_id: *subscriber_id,
846                        upstream_mv_table_id: *table_id,
847                    })
848                    .collect(),
849                backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
850                actor_cdc_table_snapshot_splits:
851                Some(PbCdcTableSnapshotSplitsWithGeneration {
852                    splits:actor_cdc_table_snapshot_splits.splits.iter().map(|(actor_id,(splits, generation))| {
853                        (*actor_id, risingwave_pb::source::PbCdcTableSnapshotSplits {
854                            splits: splits.iter().map(risingwave_connector::source::cdc::build_cdc_table_snapshot_split).collect(),
855                            generation: *generation,
856                        })
857                    }).collect()
858                }),
859                new_upstream_sinks: new_upstream_sinks
860                    .iter()
861                    .map(|(k, v)| (*k, v.clone()))
862                    .collect(),
863            }),
864            Mutation::SourceChangeSplit(changes) => {
865                PbMutation::Splits(PbSourceChangeSplitMutation {
866                    actor_splits: changes
867                        .iter()
868                        .map(|(&actor_id, splits)| {
869                            (
870                                actor_id,
871                                ConnectorSplits {
872                                    splits: splits
873                                        .clone()
874                                        .iter()
875                                        .map(ConnectorSplit::from)
876                                        .collect(),
877                                },
878                            )
879                        })
880                        .collect(),
881                })
882            }
883            Mutation::Pause => PbMutation::Pause(PbPauseMutation {}),
884            Mutation::Resume => PbMutation::Resume(PbResumeMutation {}),
885            Mutation::Throttle (changes) => PbMutation::Throttle(PbThrottleMutation {
886                fragment_throttle: changes.clone(),
887            }),
888            Mutation::DropSubscriptions {
889                subscriptions_to_drop,
890            } => PbMutation::DropSubscriptions(PbDropSubscriptionsMutation {
891                info: subscriptions_to_drop.clone(),
892            }),
893            Mutation::ConnectorPropsChange(map) => {
894                PbMutation::ConnectorPropsChange(PbConnectorPropsChangeMutation {
895                    connector_props_infos: map
896                        .iter()
897                        .map(|(actor_id, options)| {
898                            (
899                                *actor_id,
900                                ConnectorPropsInfo {
901                                    connector_props_info: options
902                                        .iter()
903                                        .map(|(k, v)| (k.clone(), v.clone()))
904                                        .collect(),
905                                },
906                            )
907                        })
908                        .collect(),
909                })
910            }
911            Mutation::StartFragmentBackfill { fragment_ids } => {
912                PbMutation::StartFragmentBackfill(PbStartFragmentBackfillMutation {
913                    fragment_ids: fragment_ids.iter().copied().collect(),
914                })
915            }
916            Mutation::RefreshStart {
917                table_id,
918                associated_source_id,
919            } => PbMutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
920                table_id: *table_id,
921                associated_source_id: *associated_source_id,
922            }),
923            Mutation::ListFinish {
924                associated_source_id,
925            } => PbMutation::ListFinish(risingwave_pb::stream_plan::ListFinishMutation {
926                associated_source_id: *associated_source_id,
927            }),
928            Mutation::LoadFinish {
929                associated_source_id,
930            } => PbMutation::LoadFinish(risingwave_pb::stream_plan::LoadFinishMutation {
931                associated_source_id: *associated_source_id,
932            }),
933            Mutation::ResetSource { source_id } => {
934                PbMutation::ResetSource(risingwave_pb::stream_plan::ResetSourceMutation {
935                    source_id: source_id.as_raw_id(),
936                })
937            }
938            Mutation::InjectSourceOffsets {
939                source_id,
940                split_offsets,
941            } => PbMutation::InjectSourceOffsets(
942                risingwave_pb::stream_plan::InjectSourceOffsetsMutation {
943                    source_id: source_id.as_raw_id(),
944                    split_offsets: split_offsets.clone(),
945                },
946            ),
947        }
948    }
949
950    fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
951        let mutation = match prost {
952            PbMutation::Stop(stop) => Mutation::Stop(StopMutation {
953                dropped_actors: stop.actors.iter().copied().collect(),
954                dropped_sink_fragments: stop.dropped_sink_fragments.iter().copied().collect(),
955            }),
956
957            PbMutation::Update(update) => Mutation::Update(UpdateMutation {
958                dispatchers: update
959                    .dispatcher_update
960                    .iter()
961                    .map(|u| (u.actor_id, u.clone()))
962                    .into_group_map(),
963                merges: update
964                    .merge_update
965                    .iter()
966                    .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
967                    .collect(),
968                vnode_bitmaps: update
969                    .actor_vnode_bitmap_update
970                    .iter()
971                    .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
972                    .collect(),
973                dropped_actors: update.dropped_actors.iter().copied().collect(),
974                actor_splits: update
975                    .actor_splits
976                    .iter()
977                    .map(|(&actor_id, splits)| {
978                        (
979                            actor_id,
980                            splits
981                                .splits
982                                .iter()
983                                .map(|split| split.try_into().unwrap())
984                                .collect(),
985                        )
986                    })
987                    .collect(),
988                actor_new_dispatchers: update
989                    .actor_new_dispatchers
990                    .iter()
991                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
992                    .collect(),
993                actor_cdc_table_snapshot_splits:
994                    build_actor_cdc_table_snapshot_splits_with_generation(
995                        update
996                            .actor_cdc_table_snapshot_splits
997                            .clone()
998                            .unwrap_or_default(),
999                    ),
1000                sink_schema_change: update
1001                    .sink_schema_change
1002                    .iter()
1003                    .map(|(sink_id, change)| (SinkId::from(*sink_id), change.clone()))
1004                    .collect(),
1005                subscriptions_to_drop: update.subscriptions_to_drop.clone(),
1006            }),
1007
1008            PbMutation::Add(add) => Mutation::Add(AddMutation {
1009                adds: add
1010                    .actor_dispatchers
1011                    .iter()
1012                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
1013                    .collect(),
1014                added_actors: add.added_actors.iter().copied().collect(),
1015                // TODO: remove this and use `SourceChangesSplit` after we support multiple
1016                // mutations.
1017                splits: add
1018                    .actor_splits
1019                    .iter()
1020                    .map(|(&actor_id, splits)| {
1021                        (
1022                            actor_id,
1023                            splits
1024                                .splits
1025                                .iter()
1026                                .map(|split| split.try_into().unwrap())
1027                                .collect(),
1028                        )
1029                    })
1030                    .collect(),
1031                pause: add.pause,
1032                subscriptions_to_add: add
1033                    .subscriptions_to_add
1034                    .iter()
1035                    .map(
1036                        |SubscriptionUpstreamInfo {
1037                             subscriber_id,
1038                             upstream_mv_table_id,
1039                         }| { (*upstream_mv_table_id, *subscriber_id) },
1040                    )
1041                    .collect(),
1042                backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
1043                actor_cdc_table_snapshot_splits:
1044                    build_actor_cdc_table_snapshot_splits_with_generation(
1045                        add.actor_cdc_table_snapshot_splits
1046                            .clone()
1047                            .unwrap_or_default(),
1048                    ),
1049                new_upstream_sinks: add
1050                    .new_upstream_sinks
1051                    .iter()
1052                    .map(|(k, v)| (*k, v.clone()))
1053                    .collect(),
1054            }),
1055
1056            PbMutation::Splits(s) => {
1057                let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
1058                    Vec::with_capacity(s.actor_splits.len());
1059                for (&actor_id, splits) in &s.actor_splits {
1060                    if !splits.splits.is_empty() {
1061                        change_splits.push((
1062                            actor_id,
1063                            splits
1064                                .splits
1065                                .iter()
1066                                .map(SplitImpl::try_from)
1067                                .try_collect()?,
1068                        ));
1069                    }
1070                }
1071                Mutation::SourceChangeSplit(change_splits.into_iter().collect())
1072            }
1073            PbMutation::Pause(_) => Mutation::Pause,
1074            PbMutation::Resume(_) => Mutation::Resume,
1075            PbMutation::Throttle(changes) => Mutation::Throttle(changes.fragment_throttle.clone()),
1076            PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
1077                subscriptions_to_drop: drop.info.clone(),
1078            },
1079            PbMutation::ConnectorPropsChange(alter_connector_props) => {
1080                Mutation::ConnectorPropsChange(
1081                    alter_connector_props
1082                        .connector_props_infos
1083                        .iter()
1084                        .map(|(connector_id, options)| {
1085                            (
1086                                *connector_id,
1087                                options
1088                                    .connector_props_info
1089                                    .iter()
1090                                    .map(|(k, v)| (k.clone(), v.clone()))
1091                                    .collect(),
1092                            )
1093                        })
1094                        .collect(),
1095                )
1096            }
1097            PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
1098                Mutation::StartFragmentBackfill {
1099                    fragment_ids: start_fragment_backfill
1100                        .fragment_ids
1101                        .iter()
1102                        .copied()
1103                        .collect(),
1104                }
1105            }
1106            PbMutation::RefreshStart(refresh_start) => Mutation::RefreshStart {
1107                table_id: refresh_start.table_id,
1108                associated_source_id: refresh_start.associated_source_id,
1109            },
1110            PbMutation::ListFinish(list_finish) => Mutation::ListFinish {
1111                associated_source_id: list_finish.associated_source_id,
1112            },
1113            PbMutation::LoadFinish(load_finish) => Mutation::LoadFinish {
1114                associated_source_id: load_finish.associated_source_id,
1115            },
1116            PbMutation::ResetSource(reset_source) => Mutation::ResetSource {
1117                source_id: SourceId::from(reset_source.source_id),
1118            },
1119            PbMutation::InjectSourceOffsets(inject) => Mutation::InjectSourceOffsets {
1120                source_id: SourceId::from(inject.source_id),
1121                split_offsets: inject.split_offsets.clone(),
1122            },
1123        };
1124        Ok(mutation)
1125    }
1126}
1127
1128impl<M> BarrierInner<M> {
1129    fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
1130        let Self {
1131            epoch,
1132            mutation,
1133            kind,
1134            tracing_context,
1135            ..
1136        } = self;
1137
1138        PbBarrier {
1139            epoch: Some(PbEpoch {
1140                curr: epoch.curr,
1141                prev: epoch.prev,
1142            }),
1143            mutation: barrier_fn(mutation).map(|mutation| PbBarrierMutation {
1144                mutation: Some(mutation),
1145            }),
1146            tracing_context: tracing_context.to_protobuf(),
1147            kind: *kind as _,
1148        }
1149    }
1150
1151    fn from_protobuf_inner(
1152        prost: &PbBarrier,
1153        mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
1154    ) -> StreamExecutorResult<Self> {
1155        let epoch = prost.get_epoch()?;
1156
1157        Ok(Self {
1158            kind: prost.kind(),
1159            epoch: EpochPair::new(epoch.curr, epoch.prev),
1160            mutation: mutation_from_pb(
1161                (prost.mutation.as_ref()).and_then(|mutation| mutation.mutation.as_ref()),
1162            )?,
1163            tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
1164        })
1165    }
1166
1167    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
1168        BarrierInner {
1169            epoch: self.epoch,
1170            mutation: f(self.mutation),
1171            kind: self.kind,
1172            tracing_context: self.tracing_context,
1173        }
1174    }
1175}
1176
1177impl DispatcherBarrier {
1178    pub fn to_protobuf(&self) -> PbBarrier {
1179        self.to_protobuf_inner(|_| None)
1180    }
1181}
1182
1183impl Barrier {
1184    #[cfg(test)]
1185    pub fn to_protobuf(&self) -> PbBarrier {
1186        self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
1187    }
1188
1189    pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
1190        Self::from_protobuf_inner(prost, |mutation| {
1191            mutation
1192                .map(|m| Mutation::from_protobuf(m).map(Arc::new))
1193                .transpose()
1194        })
1195    }
1196}
1197
/// A watermark flowing in the stream on a specific column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
    /// Index of the column this watermark applies to.
    pub col_idx: usize,
    /// Data type of the watermark value (expected to match the column's type).
    pub data_type: DataType,
    /// The watermark value itself.
    pub val: ScalarImpl,
}
1204
1205impl PartialOrd for Watermark {
1206    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1207        Some(self.cmp(other))
1208    }
1209}
1210
impl Ord for Watermark {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Order only by the value under the default order; `col_idx` and
        // `data_type` do not participate.
        // NOTE(review): this makes `cmp` returning `Equal` weaker than the derived
        // `PartialEq` (which compares all fields) — confirm this is intentional.
        self.val.default_cmp(&other.val)
    }
}
1216
1217impl Watermark {
1218    pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
1219        Self {
1220            col_idx,
1221            data_type,
1222            val,
1223        }
1224    }
1225
1226    pub async fn transform_with_expr(
1227        self,
1228        expr: &NonStrictExpression<impl Expression>,
1229        new_col_idx: usize,
1230    ) -> Option<Self> {
1231        let Self { col_idx, val, .. } = self;
1232        let row = {
1233            let mut row = vec![None; col_idx + 1];
1234            row[col_idx] = Some(val);
1235            OwnedRow::new(row)
1236        };
1237        let val = expr.eval_row_infallible(&row).await?;
1238        Some(Self::new(new_col_idx, expr.inner().return_type(), val))
1239    }
1240
1241    /// Transform the watermark with the given output indices. If this watermark is not in the
1242    /// output, return `None`.
1243    pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
1244        output_indices
1245            .iter()
1246            .position(|p| *p == self.col_idx)
1247            .map(|new_col_idx| self.with_idx(new_col_idx))
1248    }
1249
1250    pub fn to_protobuf(&self) -> PbWatermark {
1251        PbWatermark {
1252            column: Some(PbInputRef {
1253                index: self.col_idx as _,
1254                r#type: Some(self.data_type.to_protobuf()),
1255            }),
1256            val: Some(&self.val).to_protobuf().into(),
1257        }
1258    }
1259
1260    pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
1261        let col_ref = prost.get_column()?;
1262        let data_type = DataType::from(col_ref.get_type()?);
1263        let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
1264            .expect("watermark value cannot be null");
1265        Ok(Self::new(col_ref.get_index() as _, data_type, val))
1266    }
1267
1268    pub fn with_idx(self, idx: usize) -> Self {
1269        Self::new(idx, self.data_type, self.val)
1270    }
1271}
1272
/// A message flowing through the executor graph, generic over the barrier
/// mutation type `M` (see [`BarrierInner`]).
#[cfg_attr(any(test, feature = "test"), derive(PartialEq))]
#[derive(Debug, EnumAsInner, Clone)]
pub enum MessageInner<M> {
    /// A chunk of stream data.
    Chunk(StreamChunk),
    /// A barrier, possibly carrying a mutation payload of type `M`.
    Barrier(BarrierInner<M>),
    /// A per-column watermark.
    Watermark(Watermark),
}
1280
1281impl<M> MessageInner<M> {
1282    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
1283        match self {
1284            MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
1285            MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
1286            MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
1287        }
1288    }
1289}
1290
/// A message whose barriers carry the mutation payload (`BarrierMutationType`).
pub type Message = MessageInner<BarrierMutationType>;
/// A message as exchanged through dispatchers: barriers carry no mutation payload.
pub type DispatcherMessage = MessageInner<()>;
1293
/// `MessageBatchInner` is used exclusively by `Dispatcher` and the `Merger`/`Receiver` for exchanging messages between them.
/// It shares the same message type as the fundamental `MessageInner`, but batches multiple barriers into a single message.
#[derive(Debug, EnumAsInner, Clone)]
pub enum MessageBatchInner<M> {
    /// A single chunk of stream data (chunks are not batched).
    Chunk(StreamChunk),
    /// One or more barriers delivered together.
    BarrierBatch(Vec<BarrierInner<M>>),
    /// A single per-column watermark (watermarks are not batched).
    Watermark(Watermark),
}
/// Batched message whose barriers carry the mutation payload.
pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
/// A batch of dispatcher barriers (no mutation payload).
pub type DispatcherBarriers = Vec<DispatcherBarrier>;
/// Batched message exchanged through dispatchers (no mutation payload).
pub type DispatcherMessageBatch = MessageBatchInner<()>;
1305
1306impl From<DispatcherMessage> for DispatcherMessageBatch {
1307    fn from(m: DispatcherMessage) -> Self {
1308        match m {
1309            DispatcherMessage::Chunk(c) => Self::Chunk(c),
1310            DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
1311            DispatcherMessage::Watermark(w) => Self::Watermark(w),
1312        }
1313    }
1314}
1315
1316impl From<StreamChunk> for Message {
1317    fn from(chunk: StreamChunk) -> Self {
1318        Message::Chunk(chunk)
1319    }
1320}
1321
1322impl<'a> TryFrom<&'a Message> for &'a Barrier {
1323    type Error = ();
1324
1325    fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
1326        match m {
1327            Message::Chunk(_) => Err(()),
1328            Message::Barrier(b) => Ok(b),
1329            Message::Watermark(_) => Err(()),
1330        }
1331    }
1332}
1333
1334impl Message {
1335    /// Return true if the message is a stop barrier, meaning the stream
1336    /// will not continue, false otherwise.
1337    ///
1338    /// Note that this does not mean we will stop the current actor.
1339    #[cfg(test)]
1340    pub fn is_stop(&self) -> bool {
1341        matches!(
1342            self,
1343            Message::Barrier(Barrier {
1344                mutation,
1345                ..
1346            }) if mutation.as_ref().unwrap().is_stop()
1347        )
1348    }
1349}
1350
1351impl DispatcherMessageBatch {
1352    pub fn to_protobuf(&self) -> PbStreamMessageBatch {
1353        let prost = match self {
1354            Self::Chunk(stream_chunk) => {
1355                let prost_stream_chunk = stream_chunk.to_protobuf();
1356                StreamMessageBatch::StreamChunk(prost_stream_chunk)
1357            }
1358            Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
1359                barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
1360            }),
1361            Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
1362        };
1363        PbStreamMessageBatch {
1364            stream_message_batch: Some(prost),
1365        }
1366    }
1367
1368    pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
1369        let res = match prost.get_stream_message_batch()? {
1370            StreamMessageBatch::StreamChunk(chunk) => {
1371                Self::Chunk(StreamChunk::from_protobuf(chunk)?)
1372            }
1373            StreamMessageBatch::BarrierBatch(barrier_batch) => {
1374                let barriers = barrier_batch
1375                    .barriers
1376                    .iter()
1377                    .map(|barrier| {
1378                        DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
1379                            if mutation.is_some() {
1380                                if cfg!(debug_assertions) {
1381                                    panic!("should not receive message of barrier with mutation");
1382                                } else {
1383                                    warn!(?barrier, "receive message of barrier with mutation");
1384                                }
1385                            }
1386                            Ok(())
1387                        })
1388                    })
1389                    .try_collect()?;
1390                Self::BarrierBatch(barriers)
1391            }
1392            StreamMessageBatch::Watermark(watermark) => {
1393                Self::Watermark(Watermark::from_protobuf(watermark)?)
1394            }
1395        };
1396        Ok(res)
1397    }
1398
1399    pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
1400        ::prost::Message::encoded_len(msg)
1401    }
1402}
1403
/// Column indices forming the stream key.
pub type StreamKey = Vec<usize>;
/// Borrowed view of a [`StreamKey`].
pub type StreamKeyRef<'a> = &'a [usize];
/// Data types of the stream key columns; inline storage optimized for the common
/// single-column case.
pub type StreamKeyDataTypes = SmallVec<[DataType; 1]>;
1407
1408/// Expect the first message of the given `stream` as a barrier.
1409pub async fn expect_first_barrier<M: Debug>(
1410    stream: &mut (impl MessageStreamInner<M> + Unpin),
1411) -> StreamExecutorResult<BarrierInner<M>> {
1412    let message = stream
1413        .next()
1414        .instrument_await("expect_first_barrier")
1415        .await
1416        .context("failed to extract the first message: stream closed unexpectedly")??;
1417    let barrier = message
1418        .into_barrier()
1419        .expect("the first message must be a barrier");
1420    // TODO: Is this check correct?
1421    assert!(matches!(
1422        barrier.kind,
1423        BarrierKind::Checkpoint | BarrierKind::Initial
1424    ));
1425    Ok(barrier)
1426}
1427
1428/// Expect the first message of the given `stream` as a barrier.
1429pub async fn expect_first_barrier_from_aligned_stream(
1430    stream: &mut (impl AlignedMessageStream + Unpin),
1431) -> StreamExecutorResult<Barrier> {
1432    let message = stream
1433        .next()
1434        .instrument_await("expect_first_barrier")
1435        .await
1436        .context("failed to extract the first message: stream closed unexpectedly")??;
1437    let barrier = message
1438        .into_barrier()
1439        .expect("the first message must be a barrier");
1440    Ok(barrier)
1441}
1442
/// `StreamConsumer` is the last step in an actor.
pub trait StreamConsumer: Send + 'static {
    /// The stream of barriers yielded back to the actor runtime.
    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;

    /// Consume the pipeline, returning a stream of the barriers it processes.
    fn execute(self: Box<Self>) -> Self::BarrierStream;
}
1449
/// A boxed upstream input identified by `InputId`, yielding message stream items.
type BoxedMessageInput<InputId, M> = BoxedInput<InputId, MessageStreamItemInner<M>>;
1451
/// A stream for merging messages from multiple upstreams.
/// Can dynamically add and delete upstream streams.
/// For the meaning of the generic parameter `M` used, refer to `BarrierInner<M>`.
pub struct DynamicReceivers<InputId, M> {
    /// The barrier we're aligning to. If this is `None`, then `blocked_upstreams` is empty.
    barrier: Option<BarrierInner<M>>,
    /// The start timestamp of the current barrier. Used for measuring the alignment duration.
    start_ts: Option<Instant>,
    /// The upstreams that're blocked by the `barrier`; they are resumed together
    /// once every upstream has yielded the aligning barrier.
    blocked: Vec<BoxedMessageInput<InputId, M>>,
    /// The upstreams that're not blocked and can be polled.
    active: FuturesUnordered<StreamFuture<BoxedMessageInput<InputId, M>>>,
    /// watermark column index -> `BufferedWatermarks`
    buffered_watermarks: BTreeMap<usize, BufferedWatermarks<InputId>>,
    /// Currently only used for union.
    barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
    /// Only for merge. If None, then we don't take `Instant::now()` and `observe` during `poll_next`
    merge_barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
}
1471
impl<InputId: Clone + Ord + Hash + std::fmt::Debug + Unpin, M: Clone + Unpin> Stream
    for DynamicReceivers<InputId, M>
{
    type Item = MessageStreamItemInner<M>;

    /// Polls active upstreams, forwarding chunks and merged watermarks as they
    /// arrive, and aligning barriers: an upstream that yields a barrier is moved
    /// to `blocked` until every upstream has yielded a barrier of the same epoch,
    /// at which point the barrier is emitted and all upstreams are unblocked.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // NOTE(review): `is_empty` (defined elsewhere) presumably means there are
        // no upstreams at all, so the merged stream is exhausted — confirm.
        if self.is_empty() {
            return Poll::Ready(None);
        }

        // Drain `active` until either a forwardable message arrives or all
        // upstreams are blocked on the current barrier.
        loop {
            match futures::ready!(self.active.poll_next_unpin(cx)) {
                // Directly forward the error.
                Some((Some(Err(e)), _)) => {
                    return Poll::Ready(Some(Err(e)));
                }
                // Handle the message from some upstream.
                Some((Some(Ok(message)), remaining)) => {
                    let input_id = remaining.id();
                    match message {
                        MessageInner::Chunk(chunk) => {
                            // Continue polling this upstream by pushing it back to `active`.
                            self.active.push(remaining.into_future());
                            return Poll::Ready(Some(Ok(MessageInner::Chunk(chunk))));
                        }
                        MessageInner::Watermark(watermark) => {
                            // Continue polling this upstream by pushing it back to `active`.
                            self.active.push(remaining.into_future());
                            if let Some(watermark) = self.handle_watermark(input_id, watermark) {
                                return Poll::Ready(Some(Ok(MessageInner::Watermark(watermark))));
                            }
                        }
                        MessageInner::Barrier(barrier) => {
                            // Block this upstream by pushing it to `blocked`.
                            if self.blocked.is_empty() {
                                self.start_ts = Some(Instant::now());
                            }
                            self.blocked.push(remaining);
                            // All upstreams must agree on the epoch of the
                            // barrier currently being aligned.
                            if let Some(current_barrier) = self.barrier.as_ref() {
                                if current_barrier.epoch != barrier.epoch {
                                    return Poll::Ready(Some(Err(
                                        StreamExecutorError::align_barrier(
                                            current_barrier.clone().map_mutation(|_| None),
                                            barrier.map_mutation(|_| None),
                                        ),
                                    )));
                                }
                            } else {
                                self.barrier = Some(barrier);
                            }
                        }
                    }
                }
                // We use barrier as the control message of the stream. That is, we always stop the
                // actors actively when we receive a `Stop` mutation, instead of relying on the stream
                // termination.
                //
                // Besides, in abnormal cases when the other side of the `Input` closes unexpectedly,
                // we also yield an `Err(ExchangeChannelClosed)`, which will hit the `Err` arm above.
                // So this branch will never be reached in all cases.
                Some((None, remaining)) => {
                    return Poll::Ready(Some(Err(StreamExecutorError::channel_closed(format!(
                        "upstream input {:?} unexpectedly closed",
                        remaining.id()
                    )))));
                }
                // There's no active upstreams. Process the barrier and resume the blocked ones.
                None => {
                    assert!(!self.blocked.is_empty());

                    let start_ts = self
                        .start_ts
                        .take()
                        .expect("should have received at least one barrier");
                    if let Some(barrier_align_duration) = &self.barrier_align_duration {
                        barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
                    }
                    if let Some(merge_barrier_align_duration) = &self.merge_barrier_align_duration {
                        merge_barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
                    }

                    break;
                }
            }
        }

        // Alignment complete: every upstream is blocked, so `active` is drained.
        assert!(self.active.is_terminated());

        let barrier = self.barrier.take().unwrap();

        // Unblock all upstreams for the next epoch, then emit the aligned barrier.
        let upstreams = std::mem::take(&mut self.blocked);
        self.extend_active(upstreams);
        assert!(!self.active.is_terminated());

        Poll::Ready(Some(Ok(MessageInner::Barrier(barrier))))
    }
}
1572
1573impl<InputId: Clone + Ord + Hash + std::fmt::Debug, M> DynamicReceivers<InputId, M> {
1574    pub fn new(
1575        upstreams: Vec<BoxedMessageInput<InputId, M>>,
1576        barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1577        merge_barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1578    ) -> Self {
1579        let mut this = Self {
1580            barrier: None,
1581            start_ts: None,
1582            blocked: Vec::with_capacity(upstreams.len()),
1583            active: Default::default(),
1584            buffered_watermarks: Default::default(),
1585            merge_barrier_align_duration,
1586            barrier_align_duration,
1587        };
1588        this.extend_active(upstreams);
1589        this
1590    }
1591
1592    /// Extend the active upstreams with the given upstreams. The current stream must be at the
1593    /// clean state right after a barrier.
1594    pub fn extend_active(
1595        &mut self,
1596        upstreams: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1597    ) {
1598        assert!(self.blocked.is_empty() && self.barrier.is_none());
1599
1600        self.active
1601            .extend(upstreams.into_iter().map(|s| s.into_future()));
1602    }
1603
1604    /// Handle a new watermark message. Optionally returns the watermark message to emit.
1605    pub fn handle_watermark(
1606        &mut self,
1607        input_id: InputId,
1608        watermark: Watermark,
1609    ) -> Option<Watermark> {
1610        let col_idx = watermark.col_idx;
1611        // Insert a buffer watermarks when first received from a column.
1612        let upstream_ids: Vec<_> = self.upstream_input_ids().collect();
1613        let watermarks = self
1614            .buffered_watermarks
1615            .entry(col_idx)
1616            .or_insert_with(|| BufferedWatermarks::with_ids(upstream_ids));
1617        watermarks.handle_watermark(input_id, watermark)
1618    }
1619
1620    /// Consume `other` and add its upstreams to `self`. The two streams must be at the clean state
1621    /// right after a barrier.
1622    pub fn add_upstreams_from(
1623        &mut self,
1624        new_inputs: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1625    ) {
1626        assert!(self.blocked.is_empty() && self.barrier.is_none());
1627
1628        let new_inputs: Vec<_> = new_inputs.into_iter().collect();
1629        let input_ids = new_inputs.iter().map(|input| input.id());
1630        self.buffered_watermarks.values_mut().for_each(|buffers| {
1631            // Add buffers to the buffered watermarks for all cols
1632            buffers.add_buffers(input_ids.clone());
1633        });
1634        self.active
1635            .extend(new_inputs.into_iter().map(|s| s.into_future()));
1636    }
1637
1638    /// Remove upstreams from `self` in `upstream_input_ids`. The current stream must be at the
1639    /// clean state right after a barrier.
1640    /// The current container does not necessarily contain all the input ids passed in.
1641    pub fn remove_upstreams(&mut self, upstream_input_ids: &HashSet<InputId>) {
1642        assert!(self.blocked.is_empty() && self.barrier.is_none());
1643
1644        let new_upstreams = std::mem::take(&mut self.active)
1645            .into_iter()
1646            .map(|s| s.into_inner().unwrap())
1647            .filter(|u| !upstream_input_ids.contains(&u.id()));
1648        self.extend_active(new_upstreams);
1649        self.buffered_watermarks.values_mut().for_each(|buffers| {
1650            // Call `check_heap` in case the only upstream(s) that does not have
1651            // watermark in heap is removed
1652            buffers.remove_buffer(upstream_input_ids.clone());
1653        });
1654    }
1655
1656    pub fn merge_barrier_align_duration(
1657        &self,
1658    ) -> Option<LabelGuardedMetric<GenericCounter<AtomicU64>>> {
1659        self.merge_barrier_align_duration.clone()
1660    }
1661
1662    pub fn flush_buffered_watermarks(&mut self) {
1663        self.buffered_watermarks
1664            .values_mut()
1665            .for_each(|buffers| buffers.clear());
1666    }
1667
1668    pub fn upstream_input_ids(&self) -> impl Iterator<Item = InputId> + '_ {
1669        self.blocked
1670            .iter()
1671            .map(|s| s.id())
1672            .chain(self.active.iter().map(|s| s.get_ref().unwrap().id()))
1673    }
1674
1675    pub fn is_empty(&self) -> bool {
1676        self.blocked.is_empty() && self.active.is_empty()
1677    }
1678}
1679
1680// Explanation of why we need `DispatchBarrierBuffer`:
1681//
1682// When we need to create or replace an upstream fragment for the current fragment, the `Merge` operator must
1683// add some new upstream actor inputs. However, the `Merge` operator may still have old upstreams. We must wait
1684// for these old upstreams to completely process their barriers and align before we can safely update the
1685// `upstream-input-set`.
1686//
1687// Meanwhile, the creation of a new upstream actor can only succeed after the channel to the downstream `Merge`
1688// operator has been established. This creates a potential dependency chain: [new_actor_creation ->
1689// downstream_merge_update -> old_actor_processing]
1690//
1691// To address this, we split the application of a barrier's `Mutation` into two steps:
1692// 1. Parse the `Mutation`. If there is an addition on the upstream-set, establish a channel with the upstream
1693//    and cache it.
1694// 2. When the upstream barrier actually arrives, apply the cached upstream changes to the upstream-set
1695//
1696// Additionally, since receiving a barrier from current upstream input and from the `barrier_rx` are
1697// asynchronous, we cannot determine which will arrive first. Therefore, when a barrier is received from an
1698// upstream: if a cached mutation is present, we apply it. Otherwise, we must fetch a new barrier from
1699// `barrier_rx`.
/// Buffers barriers received from `barrier_rx`, each paired with any new upstream inputs
/// its mutation requires, until the matching barrier arrives from the upstream data
/// stream. See the comment above for the motivation.
pub(crate) struct DispatchBarrierBuffer {
    /// Barriers already received from `barrier_rx`, together with the new upstream
    /// inputs pre-built for their mutations (`None` if the barrier adds no upstreams),
    /// waiting to be popped when the corresponding dispatcher barrier arrives.
    buffer: VecDeque<(Barrier, Option<Vec<BoxedActorInput>>)>,
    /// Channel delivering barriers to this actor.
    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
    /// State machine tracking whether we are waiting for the next barrier or still
    /// creating the new inputs required by an already-received one.
    recv_state: BarrierReceiverState,
    /// Id of the upstream fragment currently consumed; updated when a mutation
    /// replaces the upstream fragment.
    curr_upstream_fragment_id: FragmentId,
    actor_id: ActorId,
    // read-only context for building new inputs
    build_input_ctx: Arc<BuildInputContext>,
}
1709
/// Read-only context captured once at construction and shared (via `Arc`) with the
/// futures that build new upstream inputs in `DispatchBarrierBuffer::pre_apply_barrier`.
struct BuildInputContext {
    pub actor_id: ActorId,
    pub local_barrier_manager: LocalBarrierManager,
    pub metrics: Arc<StreamingMetrics>,
    pub fragment_id: FragmentId,
    pub actor_config: Arc<StreamingConfig>,
}
1717
/// Boxed future resolving to the newly-built upstream inputs for a barrier mutation.
type BoxedNewInputsFuture =
    Pin<Box<dyn Future<Output = StreamExecutorResult<Vec<BoxedActorInput>>> + Send>>;
1720
/// Progress of the barrier-receiving state machine in `DispatchBarrierBuffer`.
enum BarrierReceiverState {
    /// Waiting for the next barrier from `barrier_rx`.
    ReceivingBarrier,
    /// A barrier was received and the new upstream inputs required by its mutation are
    /// being created by the stored future. Keeping the future here lets the work resume
    /// after the surrounding `select!` cancels a poll.
    CreatingNewInput(Barrier, BoxedNewInputsFuture),
}
1725
impl DispatchBarrierBuffer {
    /// Create an empty buffer. `curr_upstream_fragment_id` is the upstream fragment this
    /// actor currently consumes; the remaining arguments are stored in a shared
    /// `BuildInputContext` used later to build new upstream inputs.
    pub fn new(
        barrier_rx: mpsc::UnboundedReceiver<Barrier>,
        actor_id: ActorId,
        curr_upstream_fragment_id: FragmentId,
        local_barrier_manager: LocalBarrierManager,
        metrics: Arc<StreamingMetrics>,
        fragment_id: FragmentId,
        actor_config: Arc<StreamingConfig>,
    ) -> Self {
        Self {
            buffer: VecDeque::new(),
            barrier_rx,
            recv_state: BarrierReceiverState::ReceivingBarrier,
            curr_upstream_fragment_id,
            actor_id,
            build_input_ctx: Arc::new(BuildInputContext {
                actor_id,
                local_barrier_manager,
                metrics,
                fragment_id,
                actor_config,
            }),
        }
    }

    /// Receive the next message from the upstream `stream`, while concurrently draining
    /// `barrier_rx` into the internal buffer (pre-building any new inputs required by
    /// barrier mutations).
    ///
    /// The time spent blocked on the upstream is accumulated into
    /// `actor_input_buffer_blocking_duration_ns`; a partial measurement is flushed every
    /// 15 seconds so that long stalls show up in the metric before they end.
    pub async fn await_next_message(
        &mut self,
        stream: &mut (impl Stream<Item = StreamExecutorResult<DispatcherMessage>> + Unpin),
        metrics: &ActorInputMetrics,
    ) -> StreamExecutorResult<DispatcherMessage> {
        let mut start_time = Instant::now();
        let interval_duration = Duration::from_secs(15);
        let mut interval =
            tokio::time::interval_at(start_time + interval_duration, interval_duration);

        loop {
            tokio::select! {
                // Poll branches in order: prefer an upstream message whenever ready.
                biased;
                msg = stream.try_next() => {
                    metrics
                        .actor_input_buffer_blocking_duration_ns
                        .inc_by(start_time.elapsed().as_nanos() as u64);
                    return msg?.ok_or_else(
                        || StreamExecutorError::channel_closed("upstream executor closed unexpectedly")
                    );
                }

                // This branch only resolves with an error; successful fetches loop
                // inside the future. Dropping the future when another branch wins is
                // safe because partial progress is parked in `self.recv_state`.
                e = self.continuously_fetch_barrier_rx() => {
                    return Err(e);
                }

                // Periodic flush of the blocked duration into the metric.
                _ = interval.tick() => {
                    start_time = Instant::now();
                    metrics.actor_input_buffer_blocking_duration_ns.inc_by(interval_duration.as_nanos() as u64);
                    continue;
                }
            }
        }
    }

    /// Pop the buffered barrier (and the new inputs pre-built for it, if any) matching
    /// the dispatcher `barrier` just received from upstream, fetching from `barrier_rx`
    /// first if the buffer is empty. The popped barrier is asserted to match `barrier`.
    pub async fn pop_barrier_with_inputs(
        &mut self,
        barrier: DispatcherBarrier,
    ) -> StreamExecutorResult<(Barrier, Option<Vec<BoxedActorInput>>)> {
        while self.buffer.is_empty() {
            self.try_fetch_barrier_rx(false).await?;
        }
        let (recv_barrier, inputs) = self.buffer.pop_front().unwrap();
        assert_equal_dispatcher_barrier(&recv_barrier, &barrier);

        Ok((recv_barrier, inputs))
    }

    /// Keep fetching barriers into the buffer until an error occurs, then return it.
    /// Never completes successfully; intended to be raced against the upstream stream
    /// in a `select!`.
    async fn continuously_fetch_barrier_rx(&mut self) -> StreamExecutorError {
        loop {
            if let Err(e) = self.try_fetch_barrier_rx(true).await {
                return e;
            }
        }
    }

    /// Drive the receiving state machine by one step: either receive the next barrier
    /// from `barrier_rx`, or finish creating the new inputs for a previously received
    /// one. Each completed step pushes one entry into `self.buffer`.
    ///
    /// If `pending_on_end` is true, a closed `barrier_rx` suspends forever instead of
    /// returning an error (used when racing against the upstream stream in
    /// `await_next_message`, where shutdown is reported via the stream branch).
    async fn try_fetch_barrier_rx(&mut self, pending_on_end: bool) -> StreamExecutorResult<()> {
        match &mut self.recv_state {
            BarrierReceiverState::ReceivingBarrier => {
                let Some(barrier) = self.barrier_rx.recv().await else {
                    if pending_on_end {
                        return pending().await;
                    } else {
                        return Err(StreamExecutorError::channel_closed(
                            "barrier channel closed unexpectedly",
                        ));
                    }
                };
                if let Some(fut) = self.pre_apply_barrier(&barrier) {
                    // New inputs must be created: remember the in-flight future so the
                    // work survives cancellation of this call.
                    self.recv_state = BarrierReceiverState::CreatingNewInput(barrier, fut);
                } else {
                    self.buffer.push_back((barrier, None));
                }
            }
            BarrierReceiverState::CreatingNewInput(barrier, fut) => {
                let new_inputs = fut.await?;
                self.buffer.push_back((barrier.clone(), Some(new_inputs)));
                self.recv_state = BarrierReceiverState::ReceivingBarrier;
            }
        }
        Ok(())
    }

    /// If `barrier` carries an update-merge mutation that adds upstream actors for this
    /// actor and fragment, return a future building the corresponding new inputs;
    /// otherwise return `None`. Also switches `curr_upstream_fragment_id` when the
    /// mutation replaces the upstream fragment.
    fn pre_apply_barrier(&mut self, barrier: &Barrier) -> Option<BoxedNewInputsFuture> {
        if let Some(update) = barrier.as_update_merge(self.actor_id, self.curr_upstream_fragment_id)
            && !update.added_upstream_actors.is_empty()
        {
            // When update upstream fragment, added_actors will not be empty.
            let upstream_fragment_id =
                if let Some(new_upstream_fragment_id) = update.new_upstream_fragment_id {
                    self.curr_upstream_fragment_id = new_upstream_fragment_id;
                    new_upstream_fragment_id
                } else {
                    self.curr_upstream_fragment_id
                };
            let ctx = self.build_input_ctx.clone();
            let added_upstream_actors = update.added_upstream_actors.clone();
            let barrier = barrier.clone();
            // Build all new inputs concurrently; fail fast if any of them fails.
            let fut = async move {
                try_join_all(added_upstream_actors.iter().map(|upstream_actor| async {
                    let mut new_input = new_input(
                        &ctx.local_barrier_manager,
                        ctx.metrics.clone(),
                        ctx.actor_id,
                        ctx.fragment_id,
                        upstream_actor,
                        upstream_fragment_id,
                        ctx.actor_config.clone(),
                    )
                    .await?;

                    // Poll the first barrier from the new upstreams. It must be the same as the one we polled from
                    // original upstreams.
                    let first_barrier = expect_first_barrier(&mut new_input).await?;
                    assert_equal_dispatcher_barrier(&barrier, &first_barrier);

                    StreamExecutorResult::Ok(new_input)
                }))
                .await
            }
            .boxed();

            Some(fut)
        } else {
            None
        }
    }
}