risingwave_stream/executor/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod prelude;
16
17use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
18use std::fmt::Debug;
19use std::future::pending;
20use std::hash::Hash;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::Poll;
24use std::vec;
25
26use await_tree::InstrumentAwait;
27use enum_as_inner::EnumAsInner;
28use futures::future::try_join_all;
29use futures::stream::{BoxStream, FusedStream, FuturesUnordered, StreamFuture};
30use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
31use itertools::Itertools;
32use prometheus::core::{AtomicU64, GenericCounter};
33use risingwave_common::array::StreamChunk;
34use risingwave_common::bitmap::Bitmap;
35use risingwave_common::catalog::{Schema, TableId};
36use risingwave_common::config::StreamingConfig;
37use risingwave_common::metrics::LabelGuardedMetric;
38use risingwave_common::row::OwnedRow;
39use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
40use risingwave_common::util::epoch::{Epoch, EpochPair};
41use risingwave_common::util::tracing::TracingContext;
42use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
43use risingwave_connector::source::SplitImpl;
44use risingwave_expr::expr::{Expression, NonStrictExpression};
45use risingwave_pb::data::PbEpoch;
46use risingwave_pb::expr::PbInputRef;
47use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
48use risingwave_pb::stream_plan::barrier::BarrierKind;
49use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
50use risingwave_pb::stream_plan::stream_node::PbStreamKind;
51use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
52use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
53use risingwave_pb::stream_plan::{
54    PbBarrier, PbBarrierMutation, PbDispatcher, PbSinkSchemaChange, PbStreamMessageBatch,
55    PbWatermark, SubscriptionUpstreamInfo,
56};
57use smallvec::SmallVec;
58use tokio::sync::mpsc;
59use tokio::time::{Duration, Instant};
60
61use crate::error::StreamResult;
62use crate::executor::exchange::input::{
63    BoxedActorInput, BoxedInput, assert_equal_dispatcher_barrier, new_input,
64};
65use crate::executor::monitor::ActorInputMetrics;
66use crate::executor::prelude::StreamingMetrics;
67use crate::executor::watermark::BufferedWatermarks;
68use crate::task::{ActorId, FragmentId, LocalBarrierManager};
69
70mod actor;
71mod barrier_align;
72pub mod exchange;
73pub mod monitor;
74
75pub mod aggregate;
76pub mod asof_join;
77mod backfill;
78mod barrier_recv;
79mod batch_query;
80mod chain;
81mod changelog;
82mod dedup;
83mod dispatch;
84pub mod dml;
85mod dynamic_filter;
86pub mod eowc;
87pub mod error;
88mod expand;
89mod filter;
90mod gap_fill;
91pub mod hash_join;
92mod hop_window;
93mod join;
94pub mod locality_provider;
95mod lookup;
96mod lookup_union;
97mod merge;
98mod mview;
99mod nested_loop_temporal_join;
100mod no_op;
101mod now;
102mod over_window;
103pub mod project;
104mod rearranged_chain;
105mod receiver;
106pub mod row_id_gen;
107mod sink;
108pub mod source;
109mod stream_reader;
110pub mod subtask;
111mod temporal_join;
112mod top_n;
113mod troublemaker;
114mod union;
115mod upstream_sink_union;
116mod values;
117mod watermark;
118mod watermark_filter;
119mod wrapper;
120
121mod approx_percentile;
122
123mod row_merge;
124
125#[cfg(test)]
126mod integration_tests;
127mod sync_kv_log_store;
128#[cfg(any(test, feature = "test"))]
129pub mod test_utils;
130mod utils;
131mod vector;
132
133pub use actor::{Actor, ActorContext, ActorContextRef};
134use anyhow::{Context, anyhow};
135pub use approx_percentile::global::GlobalApproxPercentileExecutor;
136pub use approx_percentile::local::LocalApproxPercentileExecutor;
137pub use backfill::arrangement_backfill::*;
138pub use backfill::cdc::{
139    CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor,
140};
141pub use backfill::no_shuffle_backfill::*;
142pub use backfill::snapshot_backfill::*;
143pub use barrier_recv::BarrierRecvExecutor;
144pub use batch_query::BatchQueryExecutor;
145pub use chain::ChainExecutor;
146pub use changelog::ChangeLogExecutor;
147pub use dedup::AppendOnlyDedupExecutor;
148pub use dispatch::{DispatchExecutor, SyncLogStoreDispatchExecutor};
149pub use dynamic_filter::DynamicFilterExecutor;
150pub use error::{StreamExecutorError, StreamExecutorResult};
151pub use expand::ExpandExecutor;
152pub use filter::{FilterExecutor, UpsertFilterExecutor};
153pub use gap_fill::{GapFillExecutor, GapFillExecutorArgs};
154pub use hash_join::*;
155pub use hop_window::HopWindowExecutor;
156pub use join::asof_join::{AsOfCpuEncoding, AsOfMemoryEncoding};
157pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
158pub use join::{AsOfDesc, AsOfJoinType, JoinType};
159pub use lookup::*;
160pub use lookup_union::LookupUnionExecutor;
161pub use merge::MergeExecutor;
162pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
163pub use mview::{MaterializeExecutor, RefreshableMaterializeArgs};
164pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
165pub use no_op::NoOpExecutor;
166pub use now::*;
167pub use over_window::*;
168pub use rearranged_chain::RearrangedChainExecutor;
169pub use receiver::ReceiverExecutor;
170use risingwave_common::id::SourceId;
171pub use row_merge::RowMergeExecutor;
172pub use sink::SinkExecutor;
173pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
174pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
175pub use temporal_join::TemporalJoinExecutor;
176pub use top_n::{
177    AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
178};
179pub use troublemaker::TroublemakerExecutor;
180pub use union::UnionExecutor;
181pub use upstream_sink_union::{UpstreamFragmentInfo, UpstreamSinkUnionExecutor};
182pub use utils::DummyExecutor;
183pub use values::ValuesExecutor;
184pub use vector::*;
185pub use watermark_filter::{UpsertWatermarkFilterExecutor, WatermarkFilterExecutor};
186pub use wrapper::WrapperExecutor;
187
188use self::barrier_align::AlignedMessageStream;
189
/// Item yielded by a message stream, generic over `M` (presumably the barrier mutation type, as
/// in [`BarrierInner`] — confirm against `MessageInner`).
pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
/// Item yielded by an executor's message stream, with barriers carrying [`BarrierMutationType`].
pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
/// Item yielded by a dispatcher-side message stream.
pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
/// Boxed `'static` stream of [`MessageStreamItem`]s, as returned by [`Execute::execute`].
pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;
194
195pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
196use risingwave_connector::sink::catalog::SinkId;
197use risingwave_connector::source::cdc::{
198    CdcTableSnapshotSplitAssignmentWithGeneration,
199    build_actor_cdc_table_snapshot_splits_with_generation,
200};
201use risingwave_pb::id::{ExecutorId, SubscriberId};
202use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
203
/// Any `Send` stream yielding [`MessageStreamItemInner<M>`] (requires the `trait_alias` feature).
pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
/// Any `Send` stream yielding [`MessageStreamItem`].
pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
/// Any `Send` stream yielding [`DispatcherMessageStreamItem`].
pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;
207
/// Static information of an executor.
///
/// This describes the executor's OUTPUT (schema, stream key, stream kind) plus how it is
/// identified; the executable part lives in [`Execute`], and both are combined by [`Executor`].
#[derive(Debug, Default, Clone)]
pub struct ExecutorInfo {
    /// The schema of the OUTPUT of the executor.
    pub schema: Schema,

    /// The stream key indices of the OUTPUT of the executor.
    pub stream_key: StreamKey,

    /// The stream kind of the OUTPUT of the executor.
    pub stream_kind: PbStreamKind,

    /// Identity of the executor. This string is also what `Debug` prints for an [`Executor`].
    pub identity: String,

    /// The executor id of the executor.
    pub id: ExecutorId,
}
226
227impl ExecutorInfo {
228    pub fn for_test(schema: Schema, stream_key: StreamKey, identity: String, id: u64) -> Self {
229        Self {
230            schema,
231            stream_key,
232            stream_kind: PbStreamKind::Retract, // dummy value for test
233            identity,
234            id: id.into(),
235        }
236    }
237}
238
/// [`Execute`] describes the methods an executor should implement to handle control messages.
pub trait Execute: Send + 'static {
    /// Consumes the boxed executor and returns its output message stream.
    fn execute(self: Box<Self>) -> BoxedMessageStream;

    /// Like [`Execute::execute`], but additionally given an epoch.
    ///
    /// The default implementation ignores `_epoch` and simply delegates to
    /// [`Execute::execute`]; implementors that need the epoch override this.
    fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
        self.execute()
    }

    /// Erases the concrete executor type into a `Box<dyn Execute>`.
    fn boxed(self) -> Box<dyn Execute>
    where
        // `Sized` is needed to move `self` into the box; `Send + 'static` are already implied
        // by the supertraits and are restated here only for explicitness.
        Self: Sized + Send + 'static,
    {
        Box::new(self)
    }
}
254
/// [`Executor`] combines the static information ([`ExecutorInfo`]) and the executable object to
/// handle messages ([`Execute`]).
pub struct Executor {
    /// Static information (schema, stream key, stream kind, identity, id) of the executor.
    info: ExecutorInfo,
    /// The executable body; consumed by [`Executor::execute`] / [`Executor::execute_with_epoch`].
    execute: Box<dyn Execute>,
}
261
262impl Executor {
263    pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
264        Self { info, execute }
265    }
266
267    pub fn info(&self) -> &ExecutorInfo {
268        &self.info
269    }
270
271    pub fn schema(&self) -> &Schema {
272        &self.info.schema
273    }
274
275    pub fn stream_key(&self) -> StreamKeyRef<'_> {
276        &self.info.stream_key
277    }
278
279    pub fn stream_kind(&self) -> PbStreamKind {
280        self.info.stream_kind
281    }
282
283    pub fn identity(&self) -> &str {
284        &self.info.identity
285    }
286
287    pub fn execute(self) -> BoxedMessageStream {
288        self.execute.execute()
289    }
290
291    pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
292        self.execute.execute_with_epoch(epoch)
293    }
294}
295
296impl std::fmt::Debug for Executor {
297    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
298        f.write_str(self.identity())
299    }
300}
301
302impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
303    fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
304        Self::new(info, execute)
305    }
306}
307
308impl<E> From<(ExecutorInfo, E)> for Executor
309where
310    E: Execute,
311{
312    fn from((info, execute): (ExecutorInfo, E)) -> Self {
313        Self::new(info, execute.boxed())
314    }
315}
316
/// Epoch value `0`, reserved as the invalid epoch (presumably an "unset" sentinel — confirm at
/// use sites).
pub const INVALID_EPOCH: u64 = 0;

/// Fragment id of an upstream fragment; used to make keys like `(ActorId, UpstreamFragmentId)`
/// self-describing.
type UpstreamFragmentId = FragmentId;
/// Source split assignments, keyed by actor.
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;
321
/// Payload of [`Mutation::Update`].
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct UpdateMutation {
    /// Dispatcher updates, grouped by actor.
    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
    /// Merge updates keyed by `(actor_id, upstream_fragment_id)`; see
    /// [`Barrier::as_update_merge`].
    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
    /// New vnode bitmaps per actor; see [`Barrier::as_update_vnode_bitmap`].
    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
    /// Actors dropped by this update; see [`Barrier::all_stop_actors`].
    pub dropped_actors: HashSet<ActorId>,
    /// Split assignments per actor; also the initial assignment of actors created for scaling
    /// (see [`Barrier::initial_split_assignment`]).
    pub actor_splits: SplitAssignments,
    /// Newly added dispatchers per actor.
    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
    /// CDC table snapshot split assignment, with generation.
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// Schema change per sink; see [`Barrier::as_sink_schema_change`].
    pub sink_schema_change: HashMap<SinkId, PbSinkSchemaChange>,
    /// Subscriptions dropped by this update; see [`Barrier::as_subscriptions_to_drop`].
    pub subscriptions_to_drop: Vec<SubscriptionUpstreamInfo>,
}
335
/// Payload of [`Mutation::Add`], used for new streaming jobs and recovery.
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct AddMutation {
    /// New dispatchers per upstream actor; see [`Barrier::has_more_downstream_fragments`].
    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
    /// Actors newly added by this mutation; see [`Barrier::is_newly_added`].
    pub added_actors: HashSet<ActorId>,
    // TODO: remove this and use `SourceChangesSplit` after we support multiple mutations.
    /// Initial split assignments per actor; see [`Barrier::initial_split_assignment`].
    pub splits: SplitAssignments,
    /// Whether executors should pause their data stream on startup; see
    /// [`Barrier::is_pause_on_startup`].
    pub pause: bool,
    /// (`upstream_mv_table_id`, `subscriber_id`) pairs; see
    /// [`Barrier::added_subscriber_on_mv_table`].
    pub subscriptions_to_add: Vec<(TableId, SubscriberId)>,
    /// Backfill fragments whose backfill should be paused on startup; see
    /// [`Barrier::is_backfill_pause_on_startup`]. NOTE(review): presumably started later via
    /// [`Mutation::StartFragmentBackfill`] — confirm.
    pub backfill_nodes_to_pause: HashSet<FragmentId>,
    /// CDC table snapshot split assignment, with generation.
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// New upstream sinks per downstream fragment; see [`Barrier::as_new_upstream_sink`].
    pub new_upstream_sinks: HashMap<FragmentId, PbNewUpstreamSink>,
}
351
/// Payload of [`Mutation::Stop`].
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct StopMutation {
    /// Actors to stop (drop); see [`Barrier::all_stop_actors`].
    pub dropped_actors: HashSet<ActorId>,
    /// Upstream sink fragments being dropped; see [`Barrier::as_dropped_upstream_sinks`].
    pub dropped_sink_fragments: HashSet<FragmentId>,
}
358
/// See [`PbMutation`] for the semantics of each mutation.
#[cfg_attr(any(test, feature = "test"), derive(PartialEq))]
#[derive(Debug, Clone)]
pub enum Mutation {
    /// Stop (drop) actors; see [`StopMutation`].
    Stop(StopMutation),
    /// Update dispatchers, merges, vnode bitmaps, splits, etc.; see [`UpdateMutation`].
    Update(UpdateMutation),
    /// Add actors/dispatchers for a new streaming job or recovery; see [`AddMutation`].
    Add(AddMutation),
    /// Change the split assignments of existing source executors. Not used for the initial
    /// assignment (see [`Barrier::initial_split_assignment`]).
    SourceChangeSplit(SplitAssignments),
    /// Pause; see [`PbMutation`].
    Pause,
    /// Resume; see [`Barrier::is_resume`].
    Resume,
    /// Throttle configuration per fragment.
    Throttle(HashMap<FragmentId, ThrottleConfig>),
    /// Connector property changes. NOTE(review): the meaning of the `u32` key is not visible
    /// here (presumably an object id) — confirm against [`PbMutation`].
    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
    /// Drop subscriptions; see [`Barrier::as_subscriptions_to_drop`].
    DropSubscriptions {
        /// Each entry pairs a `subscriber_id` with its `upstream_mv_table_id`.
        subscriptions_to_drop: Vec<SubscriptionUpstreamInfo>,
    },
    /// Start the backfill of the listed fragments; see
    /// [`Barrier::should_start_fragment_backfill`].
    StartFragmentBackfill {
        fragment_ids: HashSet<FragmentId>,
    },
    /// Start refreshing `table_id`, associated with `associated_source_id`.
    /// NOTE(review): semantics inferred from names only — confirm against [`PbMutation`].
    RefreshStart {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// Presumably signals that listing for `associated_source_id` has finished — confirm.
    ListFinish {
        associated_source_id: SourceId,
    },
    /// Presumably signals that loading for `associated_source_id` has finished — confirm.
    LoadFinish {
        associated_source_id: SourceId,
    },
    /// Reset the source `source_id`. NOTE(review): exact reset semantics not visible here.
    ResetSource {
        source_id: SourceId,
    },
    /// Inject split offsets into the source `source_id`.
    InjectSourceOffsets {
        source_id: SourceId,
        /// Split ID -> offset (JSON-encoded based on connector type)
        split_offsets: HashMap<String, String>,
    },
}
397
/// The generic type `M` is the mutation type of the barrier.
///
/// For barriers in the dispatcher, `M` is `()`, which means the mutation is erased.
/// For barriers flowing within the streaming actor, `M` is the normal `BarrierMutationType`.
#[derive(Debug, Clone)]
pub struct BarrierInner<M> {
    /// The current/previous epoch pair of this barrier.
    pub epoch: EpochPair,
    /// The mutation carried by this barrier (`()` once erased for the dispatcher).
    pub mutation: M,
    /// The barrier kind, e.g. [`BarrierKind::Checkpoint`].
    pub kind: BarrierKind,

    /// Tracing context for the **current** epoch of this barrier.
    pub tracing_context: TracingContext,
}
411
/// Mutation carried by a [`Barrier`]: `None` for a plain barrier, otherwise a shared [`Mutation`].
pub type BarrierMutationType = Option<Arc<Mutation>>;
/// Barrier as seen within a streaming actor, carrying its mutation (if any).
pub type Barrier = BarrierInner<BarrierMutationType>;
/// Barrier as seen in the dispatcher, with the mutation erased (see [`Barrier::into_dispatcher`]).
pub type DispatcherBarrier = BarrierInner<()>;
415
416impl<M: Default> BarrierInner<M> {
417    /// Create a plain barrier.
418    pub fn new_test_barrier(epoch: u64) -> Self {
419        Self {
420            epoch: EpochPair::new_test_epoch(epoch),
421            kind: BarrierKind::Checkpoint,
422            tracing_context: TracingContext::none(),
423            mutation: Default::default(),
424        }
425    }
426
427    pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
428        Self {
429            epoch: EpochPair::new(epoch, prev_epoch),
430            kind: BarrierKind::Checkpoint,
431            tracing_context: TracingContext::none(),
432            mutation: Default::default(),
433        }
434    }
435}
436
437impl Barrier {
438    pub fn into_dispatcher(self) -> DispatcherBarrier {
439        DispatcherBarrier {
440            epoch: self.epoch,
441            mutation: (),
442            kind: self.kind,
443            tracing_context: self.tracing_context,
444        }
445    }
446
447    #[must_use]
448    pub fn with_mutation(self, mutation: Mutation) -> Self {
449        Self {
450            mutation: Some(Arc::new(mutation)),
451            ..self
452        }
453    }
454
455    #[must_use]
456    pub fn with_stop(self) -> Self {
457        self.with_mutation(Mutation::Stop(StopMutation {
458            dropped_actors: Default::default(),
459            dropped_sink_fragments: Default::default(),
460        }))
461    }
462
463    /// Whether this barrier carries stop mutation.
464    pub fn is_with_stop_mutation(&self) -> bool {
465        matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
466    }
467
468    /// Whether this barrier is to stop the actor with `actor_id`.
469    pub fn is_stop(&self, actor_id: ActorId) -> bool {
470        self.all_stop_actors()
471            .is_some_and(|actors| actors.contains(&actor_id))
472    }
473
474    pub fn is_checkpoint(&self) -> bool {
475        self.kind == BarrierKind::Checkpoint
476    }
477
478    /// Get the initial split assignments for the actor with `actor_id`.
479    ///
480    /// This should only be called on the initial barrier received by the executor. It must be
481    ///
482    /// - `Add` mutation when it's a new streaming job, or recovery.
483    /// - `Update` mutation when it's created for scaling.
484    ///
485    /// Note that `SourceChangeSplit` is **not** included, because it's only used for changing splits
486    /// of existing executors.
487    pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
488        match self.mutation.as_deref()? {
489            Mutation::Update(UpdateMutation { actor_splits, .. })
490            | Mutation::Add(AddMutation {
491                splits: actor_splits,
492                ..
493            }) => actor_splits.get(&actor_id),
494
495            _ => {
496                if cfg!(debug_assertions) {
497                    panic!(
498                        "the initial mutation of the barrier should not be {:?}",
499                        self.mutation
500                    );
501                }
502                None
503            }
504        }
505        .map(|s| s.as_slice())
506    }
507
508    /// Get all actors that to be stopped (dropped) by this barrier.
509    pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
510        match self.mutation.as_deref() {
511            Some(Mutation::Stop(StopMutation { dropped_actors, .. })) => Some(dropped_actors),
512            Some(Mutation::Update(UpdateMutation { dropped_actors, .. })) => Some(dropped_actors),
513            _ => None,
514        }
515    }
516
517    /// Whether this barrier is to newly add the actor with `actor_id`. This is used for `Chain` and
518    /// `Values` to decide whether to output the existing (historical) data.
519    ///
520    /// By "newly", we mean the actor belongs to a subgraph of a new streaming job. That is, actors
521    /// added for scaling are not included.
522    pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
523        match self.mutation.as_deref() {
524            Some(Mutation::Add(AddMutation { added_actors, .. })) => {
525                added_actors.contains(&actor_id)
526            }
527            _ => false,
528        }
529    }
530
531    pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
532        if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
533            fragment_ids.contains(&fragment_id)
534        } else {
535            false
536        }
537    }
538
539    /// Whether this barrier adds new downstream fragment for the actor with `upstream_actor_id`.
540    ///
541    /// # Use case
542    /// Some optimizations are applied when an actor doesn't have any downstreams ("standalone" actors).
543    /// * Pause a standalone shared `SourceExecutor`.
544    /// * Disable a standalone `MaterializeExecutor`'s conflict check.
545    ///
546    /// This is implemented by checking `actor_context.initial_dispatch_num` on startup, and
547    /// check `has_more_downstream_fragments` on barrier to see whether the optimization
548    /// needs to be turned off.
549    ///
550    /// ## Some special cases not included
551    ///
552    /// Note that this is not `has_new_downstream_actor/fragment`. For our use case, we only
553    /// care about **number of downstream fragments** (more precisely, existence).
554    /// - When scaling, the number of downstream actors is changed, and they are "new", but downstream fragments is not changed.
555    /// - When `ALTER TABLE sink_into_table`, the fragment is replaced with a "new" one, but the number is not changed.
556    pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
557        let Some(mutation) = self.mutation.as_deref() else {
558            return false;
559        };
560        match mutation {
561            // Add is for mv, index and sink creation.
562            Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
563            Mutation::Update(_)
564            | Mutation::Stop(_)
565            | Mutation::Pause
566            | Mutation::Resume
567            | Mutation::SourceChangeSplit(_)
568            | Mutation::Throttle { .. }
569            | Mutation::DropSubscriptions { .. }
570            | Mutation::ConnectorPropsChange(_)
571            | Mutation::StartFragmentBackfill { .. }
572            | Mutation::RefreshStart { .. }
573            | Mutation::ListFinish { .. }
574            | Mutation::LoadFinish { .. }
575            | Mutation::ResetSource { .. }
576            | Mutation::InjectSourceOffsets { .. } => false,
577        }
578    }
579
580    /// Whether this barrier requires the executor to pause its data stream on startup.
581    pub fn is_pause_on_startup(&self) -> bool {
582        match self.mutation.as_deref() {
583            Some(Mutation::Add(AddMutation { pause, .. })) => *pause,
584            _ => false,
585        }
586    }
587
588    pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
589        match self.mutation.as_deref() {
590            Some(Mutation::Add(AddMutation {
591                backfill_nodes_to_pause,
592                ..
593            })) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
594            Some(Mutation::Update(_)) => false,
595            _ => {
596                tracing::warn!(
597                    "expected an AddMutation or UpdateMutation on Startup, instead got {:?}",
598                    self
599                );
600                false
601            }
602        }
603    }
604
605    /// Whether this barrier is for resume.
606    pub fn is_resume(&self) -> bool {
607        matches!(self.mutation.as_deref(), Some(Mutation::Resume))
608    }
609
610    /// Returns the [`MergeUpdate`] if this barrier is to update the merge executors for the actor
611    /// with `actor_id`.
612    pub fn as_update_merge(
613        &self,
614        actor_id: ActorId,
615        upstream_fragment_id: UpstreamFragmentId,
616    ) -> Option<&MergeUpdate> {
617        self.mutation
618            .as_deref()
619            .and_then(|mutation| match mutation {
620                Mutation::Update(UpdateMutation { merges, .. }) => {
621                    merges.get(&(actor_id, upstream_fragment_id))
622                }
623                _ => None,
624            })
625    }
626
627    /// Returns the new upstream sink information if this barrier is to add a new upstream sink for
628    /// the specified downstream fragment.
629    pub fn as_new_upstream_sink(&self, fragment_id: FragmentId) -> Option<&PbNewUpstreamSink> {
630        self.mutation
631            .as_deref()
632            .and_then(|mutation| match mutation {
633                Mutation::Add(AddMutation {
634                    new_upstream_sinks, ..
635                }) => new_upstream_sinks.get(&fragment_id),
636                _ => None,
637            })
638    }
639
640    /// Returns the dropped upstream sink-fragment if this barrier is to drop any sink.
641    pub fn as_dropped_upstream_sinks(&self) -> Option<&HashSet<FragmentId>> {
642        self.mutation
643            .as_deref()
644            .and_then(|mutation| match mutation {
645                Mutation::Stop(StopMutation {
646                    dropped_sink_fragments,
647                    ..
648                }) => Some(dropped_sink_fragments),
649                _ => None,
650            })
651    }
652
653    /// Returns the new vnode bitmap if this barrier is to update the vnode bitmap for the actor
654    /// with `actor_id`.
655    ///
656    /// Actually, this vnode bitmap update is only useful for the record accessing validation for
657    /// distributed executors, since the read/write pattern will never be across multiple vnodes.
658    pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
659        self.mutation
660            .as_deref()
661            .and_then(|mutation| match mutation {
662                Mutation::Update(UpdateMutation { vnode_bitmaps, .. }) => {
663                    vnode_bitmaps.get(&actor_id).cloned()
664                }
665                _ => None,
666            })
667    }
668
669    pub fn assume_no_update_vnode_bitmap(&self, actor_id: ActorId) -> StreamExecutorResult<()> {
670        if self.as_update_vnode_bitmap(actor_id).is_some() {
671            return Err(anyhow!("updating vnode bitmap in place is not supported").into());
672        }
673        Ok(())
674    }
675
676    pub fn as_sink_schema_change(&self, sink_id: SinkId) -> Option<PbSinkSchemaChange> {
677        self.mutation
678            .as_deref()
679            .and_then(|mutation| match mutation {
680                Mutation::Update(UpdateMutation {
681                    sink_schema_change, ..
682                }) => sink_schema_change.get(&sink_id).cloned(),
683                _ => None,
684            })
685    }
686
687    pub fn as_subscriptions_to_drop(&self) -> Option<&[SubscriptionUpstreamInfo]> {
688        match self.mutation.as_deref() {
689            Some(Mutation::DropSubscriptions {
690                subscriptions_to_drop,
691            })
692            | Some(Mutation::Update(UpdateMutation {
693                subscriptions_to_drop,
694                ..
695            })) => Some(subscriptions_to_drop.as_slice()),
696            _ => None,
697        }
698    }
699
700    pub fn get_curr_epoch(&self) -> Epoch {
701        Epoch(self.epoch.curr)
702    }
703
704    /// Retrieve the tracing context for the **current** epoch of this barrier.
705    pub fn tracing_context(&self) -> &TracingContext {
706        &self.tracing_context
707    }
708
709    pub fn added_subscriber_on_mv_table(
710        &self,
711        mv_table_id: TableId,
712    ) -> impl Iterator<Item = SubscriberId> + '_ {
713        if let Some(Mutation::Add(add)) = self.mutation.as_deref() {
714            Some(add)
715        } else {
716            None
717        }
718        .into_iter()
719        .flat_map(move |add| {
720            add.subscriptions_to_add.iter().filter_map(
721                move |(upstream_mv_table_id, subscriber_id)| {
722                    if *upstream_mv_table_id == mv_table_id {
723                        Some(*subscriber_id)
724                    } else {
725                        None
726                    }
727                },
728            )
729        })
730    }
731}
732
733impl<M: PartialEq> PartialEq for BarrierInner<M> {
734    fn eq(&self, other: &Self) -> bool {
735        self.epoch == other.epoch && self.mutation == other.mutation
736    }
737}
738
739impl Mutation {
740    /// Return true if the mutation is stop.
741    ///
742    /// Note that this does not mean we will stop the current actor.
743    #[cfg(test)]
744    pub fn is_stop(&self) -> bool {
745        matches!(self, Mutation::Stop(_))
746    }
747
748    #[cfg(test)]
749    fn to_protobuf(&self) -> PbMutation {
750        use risingwave_pb::source::{
751            ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
752        };
753        use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
754        use risingwave_pb::stream_plan::{
755            PbAddMutation, PbConnectorPropsChangeMutation, PbDispatchers,
756            PbDropSubscriptionsMutation, PbPauseMutation, PbResumeMutation,
757            PbSourceChangeSplitMutation, PbStartFragmentBackfillMutation, PbStopMutation,
758            PbThrottleMutation, PbUpdateMutation,
759        };
760        let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
761            actor_splits
762                .iter()
763                .map(|(&actor_id, splits)| {
764                    (
765                        actor_id,
766                        ConnectorSplits {
767                            splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
768                        },
769                    )
770                })
771                .collect::<HashMap<_, _>>()
772        };
773
774        match self {
775            Mutation::Stop(StopMutation {
776                dropped_actors,
777                dropped_sink_fragments,
778            }) => PbMutation::Stop(PbStopMutation {
779                actors: dropped_actors.iter().copied().collect(),
780                dropped_sink_fragments: dropped_sink_fragments.iter().copied().collect(),
781            }),
782            Mutation::Update(UpdateMutation {
783                dispatchers,
784                merges,
785                vnode_bitmaps,
786                dropped_actors,
787                actor_splits,
788                actor_new_dispatchers,
789                actor_cdc_table_snapshot_splits,
790                sink_schema_change,
791                subscriptions_to_drop,
792            }) => PbMutation::Update(PbUpdateMutation {
793                dispatcher_update: dispatchers.values().flatten().cloned().collect(),
794                merge_update: merges.values().cloned().collect(),
795                actor_vnode_bitmap_update: vnode_bitmaps
796                    .iter()
797                    .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
798                    .collect(),
799                dropped_actors: dropped_actors.iter().copied().collect(),
800                actor_splits: actor_splits_to_protobuf(actor_splits),
801                actor_new_dispatchers: actor_new_dispatchers
802                    .iter()
803                    .map(|(&actor_id, dispatchers)| {
804                        (
805                            actor_id,
806                            PbDispatchers {
807                                dispatchers: dispatchers.clone(),
808                            },
809                        )
810                    })
811                    .collect(),
812                actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
813                    splits:actor_cdc_table_snapshot_splits.splits.iter().map(|(actor_id,(splits, generation))| {
814                        (*actor_id, risingwave_pb::source::PbCdcTableSnapshotSplits {
815                            splits: splits.iter().map(risingwave_connector::source::cdc::build_cdc_table_snapshot_split).collect(),
816                            generation: *generation,
817                        })
818                    }).collect()
819                }),
820                sink_schema_change: sink_schema_change
821                    .iter()
822                    .map(|(sink_id, change)| ((*sink_id).as_raw_id(), change.clone()))
823                    .collect(),
824                subscriptions_to_drop: subscriptions_to_drop.clone(),
825            }),
826            Mutation::Add(AddMutation {
827                adds,
828                added_actors,
829                splits,
830                pause,
831                subscriptions_to_add,
832                backfill_nodes_to_pause,
833                actor_cdc_table_snapshot_splits,
834                new_upstream_sinks,
835            }) => PbMutation::Add(PbAddMutation {
836                actor_dispatchers: adds
837                    .iter()
838                    .map(|(&actor_id, dispatchers)| {
839                        (
840                            actor_id,
841                            PbDispatchers {
842                                dispatchers: dispatchers.clone(),
843                            },
844                        )
845                    })
846                    .collect(),
847                added_actors: added_actors.iter().copied().collect(),
848                actor_splits: actor_splits_to_protobuf(splits),
849                pause: *pause,
850                subscriptions_to_add: subscriptions_to_add
851                    .iter()
852                    .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
853                        subscriber_id: *subscriber_id,
854                        upstream_mv_table_id: *table_id,
855                    })
856                    .collect(),
857                backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
858                actor_cdc_table_snapshot_splits:
859                Some(PbCdcTableSnapshotSplitsWithGeneration {
860                    splits:actor_cdc_table_snapshot_splits.splits.iter().map(|(actor_id,(splits, generation))| {
861                        (*actor_id, risingwave_pb::source::PbCdcTableSnapshotSplits {
862                            splits: splits.iter().map(risingwave_connector::source::cdc::build_cdc_table_snapshot_split).collect(),
863                            generation: *generation,
864                        })
865                    }).collect()
866                }),
867                new_upstream_sinks: new_upstream_sinks
868                    .iter()
869                    .map(|(k, v)| (*k, v.clone()))
870                    .collect(),
871            }),
872            Mutation::SourceChangeSplit(changes) => {
873                PbMutation::Splits(PbSourceChangeSplitMutation {
874                    actor_splits: changes
875                        .iter()
876                        .map(|(&actor_id, splits)| {
877                            (
878                                actor_id,
879                                ConnectorSplits {
880                                    splits: splits
881                                        .clone()
882                                        .iter()
883                                        .map(ConnectorSplit::from)
884                                        .collect(),
885                                },
886                            )
887                        })
888                        .collect(),
889                })
890            }
891            Mutation::Pause => PbMutation::Pause(PbPauseMutation {}),
892            Mutation::Resume => PbMutation::Resume(PbResumeMutation {}),
893            Mutation::Throttle (changes) => PbMutation::Throttle(PbThrottleMutation {
894                fragment_throttle: changes.clone(),
895            }),
896            Mutation::DropSubscriptions {
897                subscriptions_to_drop,
898            } => PbMutation::DropSubscriptions(PbDropSubscriptionsMutation {
899                info: subscriptions_to_drop.clone(),
900            }),
901            Mutation::ConnectorPropsChange(map) => {
902                PbMutation::ConnectorPropsChange(PbConnectorPropsChangeMutation {
903                    connector_props_infos: map
904                        .iter()
905                        .map(|(actor_id, options)| {
906                            (
907                                *actor_id,
908                                ConnectorPropsInfo {
909                                    connector_props_info: options
910                                        .iter()
911                                        .map(|(k, v)| (k.clone(), v.clone()))
912                                        .collect(),
913                                },
914                            )
915                        })
916                        .collect(),
917                })
918            }
919            Mutation::StartFragmentBackfill { fragment_ids } => {
920                PbMutation::StartFragmentBackfill(PbStartFragmentBackfillMutation {
921                    fragment_ids: fragment_ids.iter().copied().collect(),
922                })
923            }
924            Mutation::RefreshStart {
925                table_id,
926                associated_source_id,
927            } => PbMutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
928                table_id: *table_id,
929                associated_source_id: *associated_source_id,
930            }),
931            Mutation::ListFinish {
932                associated_source_id,
933            } => PbMutation::ListFinish(risingwave_pb::stream_plan::ListFinishMutation {
934                associated_source_id: *associated_source_id,
935            }),
936            Mutation::LoadFinish {
937                associated_source_id,
938            } => PbMutation::LoadFinish(risingwave_pb::stream_plan::LoadFinishMutation {
939                associated_source_id: *associated_source_id,
940            }),
941            Mutation::ResetSource { source_id } => {
942                PbMutation::ResetSource(risingwave_pb::stream_plan::ResetSourceMutation {
943                    source_id: source_id.as_raw_id(),
944                })
945            }
946            Mutation::InjectSourceOffsets {
947                source_id,
948                split_offsets,
949            } => PbMutation::InjectSourceOffsets(
950                risingwave_pb::stream_plan::InjectSourceOffsetsMutation {
951                    source_id: source_id.as_raw_id(),
952                    split_offsets: split_offsets.clone(),
953                },
954            ),
955        }
956    }
957
958    fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
959        let mutation = match prost {
960            PbMutation::Stop(stop) => Mutation::Stop(StopMutation {
961                dropped_actors: stop.actors.iter().copied().collect(),
962                dropped_sink_fragments: stop.dropped_sink_fragments.iter().copied().collect(),
963            }),
964
965            PbMutation::Update(update) => Mutation::Update(UpdateMutation {
966                dispatchers: update
967                    .dispatcher_update
968                    .iter()
969                    .map(|u| (u.actor_id, u.clone()))
970                    .into_group_map(),
971                merges: update
972                    .merge_update
973                    .iter()
974                    .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
975                    .collect(),
976                vnode_bitmaps: update
977                    .actor_vnode_bitmap_update
978                    .iter()
979                    .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
980                    .collect(),
981                dropped_actors: update.dropped_actors.iter().copied().collect(),
982                actor_splits: update
983                    .actor_splits
984                    .iter()
985                    .map(|(&actor_id, splits)| {
986                        (
987                            actor_id,
988                            splits
989                                .splits
990                                .iter()
991                                .map(|split| split.try_into().unwrap())
992                                .collect(),
993                        )
994                    })
995                    .collect(),
996                actor_new_dispatchers: update
997                    .actor_new_dispatchers
998                    .iter()
999                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
1000                    .collect(),
1001                actor_cdc_table_snapshot_splits:
1002                    build_actor_cdc_table_snapshot_splits_with_generation(
1003                        update
1004                            .actor_cdc_table_snapshot_splits
1005                            .clone()
1006                            .unwrap_or_default(),
1007                    ),
1008                sink_schema_change: update
1009                    .sink_schema_change
1010                    .iter()
1011                    .map(|(sink_id, change)| (SinkId::from(*sink_id), change.clone()))
1012                    .collect(),
1013                subscriptions_to_drop: update.subscriptions_to_drop.clone(),
1014            }),
1015
1016            PbMutation::Add(add) => Mutation::Add(AddMutation {
1017                adds: add
1018                    .actor_dispatchers
1019                    .iter()
1020                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
1021                    .collect(),
1022                added_actors: add.added_actors.iter().copied().collect(),
1023                // TODO: remove this and use `SourceChangesSplit` after we support multiple
1024                // mutations.
1025                splits: add
1026                    .actor_splits
1027                    .iter()
1028                    .map(|(&actor_id, splits)| {
1029                        (
1030                            actor_id,
1031                            splits
1032                                .splits
1033                                .iter()
1034                                .map(|split| split.try_into().unwrap())
1035                                .collect(),
1036                        )
1037                    })
1038                    .collect(),
1039                pause: add.pause,
1040                subscriptions_to_add: add
1041                    .subscriptions_to_add
1042                    .iter()
1043                    .map(
1044                        |SubscriptionUpstreamInfo {
1045                             subscriber_id,
1046                             upstream_mv_table_id,
1047                         }| { (*upstream_mv_table_id, *subscriber_id) },
1048                    )
1049                    .collect(),
1050                backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
1051                actor_cdc_table_snapshot_splits:
1052                    build_actor_cdc_table_snapshot_splits_with_generation(
1053                        add.actor_cdc_table_snapshot_splits
1054                            .clone()
1055                            .unwrap_or_default(),
1056                    ),
1057                new_upstream_sinks: add
1058                    .new_upstream_sinks
1059                    .iter()
1060                    .map(|(k, v)| (*k, v.clone()))
1061                    .collect(),
1062            }),
1063
1064            PbMutation::Splits(s) => {
1065                let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
1066                    Vec::with_capacity(s.actor_splits.len());
1067                for (&actor_id, splits) in &s.actor_splits {
1068                    if !splits.splits.is_empty() {
1069                        change_splits.push((
1070                            actor_id,
1071                            splits
1072                                .splits
1073                                .iter()
1074                                .map(SplitImpl::try_from)
1075                                .try_collect()?,
1076                        ));
1077                    }
1078                }
1079                Mutation::SourceChangeSplit(change_splits.into_iter().collect())
1080            }
1081            PbMutation::Pause(_) => Mutation::Pause,
1082            PbMutation::Resume(_) => Mutation::Resume,
1083            PbMutation::Throttle(changes) => Mutation::Throttle(changes.fragment_throttle.clone()),
1084            PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
1085                subscriptions_to_drop: drop.info.clone(),
1086            },
1087            PbMutation::ConnectorPropsChange(alter_connector_props) => {
1088                Mutation::ConnectorPropsChange(
1089                    alter_connector_props
1090                        .connector_props_infos
1091                        .iter()
1092                        .map(|(connector_id, options)| {
1093                            (
1094                                *connector_id,
1095                                options
1096                                    .connector_props_info
1097                                    .iter()
1098                                    .map(|(k, v)| (k.clone(), v.clone()))
1099                                    .collect(),
1100                            )
1101                        })
1102                        .collect(),
1103                )
1104            }
1105            PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
1106                Mutation::StartFragmentBackfill {
1107                    fragment_ids: start_fragment_backfill
1108                        .fragment_ids
1109                        .iter()
1110                        .copied()
1111                        .collect(),
1112                }
1113            }
1114            PbMutation::RefreshStart(refresh_start) => Mutation::RefreshStart {
1115                table_id: refresh_start.table_id,
1116                associated_source_id: refresh_start.associated_source_id,
1117            },
1118            PbMutation::ListFinish(list_finish) => Mutation::ListFinish {
1119                associated_source_id: list_finish.associated_source_id,
1120            },
1121            PbMutation::LoadFinish(load_finish) => Mutation::LoadFinish {
1122                associated_source_id: load_finish.associated_source_id,
1123            },
1124            PbMutation::ResetSource(reset_source) => Mutation::ResetSource {
1125                source_id: SourceId::from(reset_source.source_id),
1126            },
1127            PbMutation::InjectSourceOffsets(inject) => Mutation::InjectSourceOffsets {
1128                source_id: SourceId::from(inject.source_id),
1129                split_offsets: inject.split_offsets.clone(),
1130            },
1131        };
1132        Ok(mutation)
1133    }
1134}
1135
1136impl<M> BarrierInner<M> {
1137    fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
1138        let Self {
1139            epoch,
1140            mutation,
1141            kind,
1142            tracing_context,
1143            ..
1144        } = self;
1145
1146        PbBarrier {
1147            epoch: Some(PbEpoch {
1148                curr: epoch.curr,
1149                prev: epoch.prev,
1150            }),
1151            mutation: barrier_fn(mutation).map(|mutation| PbBarrierMutation {
1152                mutation: Some(mutation),
1153            }),
1154            tracing_context: tracing_context.to_protobuf(),
1155            kind: *kind as _,
1156        }
1157    }
1158
1159    fn from_protobuf_inner(
1160        prost: &PbBarrier,
1161        mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
1162    ) -> StreamExecutorResult<Self> {
1163        let epoch = prost.get_epoch()?;
1164
1165        Ok(Self {
1166            kind: prost.kind(),
1167            epoch: EpochPair::new(epoch.curr, epoch.prev),
1168            mutation: mutation_from_pb(
1169                (prost.mutation.as_ref()).and_then(|mutation| mutation.mutation.as_ref()),
1170            )?,
1171            tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
1172        })
1173    }
1174
1175    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
1176        BarrierInner {
1177            epoch: self.epoch,
1178            mutation: f(self.mutation),
1179            kind: self.kind,
1180            tracing_context: self.tracing_context,
1181        }
1182    }
1183}
1184
1185impl DispatcherBarrier {
1186    pub fn to_protobuf(&self) -> PbBarrier {
1187        self.to_protobuf_inner(|_| None)
1188    }
1189}
1190
1191impl Barrier {
1192    #[cfg(test)]
1193    pub fn to_protobuf(&self) -> PbBarrier {
1194        self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
1195    }
1196
1197    pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
1198        Self::from_protobuf_inner(prost, |mutation| {
1199            mutation
1200                .map(|m| Mutation::from_protobuf(m).map(Arc::new))
1201                .transpose()
1202        })
1203    }
1204}
1205
/// A watermark on a single column of a stream.
///
/// NOTE(review): `PartialEq`/`Eq` are derived and compare all three fields, while the
/// hand-written `PartialOrd`/`Ord` impls for this type compare `val` only. Two watermarks on
/// different columns can therefore compare `Ordering::Equal` without being `==`.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
    /// Index of the column this watermark applies to.
    pub col_idx: usize,
    /// Data type of the watermarked column; `val` is decoded/produced with this type.
    pub data_type: DataType,
    /// The watermark value. Non-null by construction (a `ScalarImpl`, not a `Datum`).
    pub val: ScalarImpl,
}
1212
1213impl PartialOrd for Watermark {
1214    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1215        Some(self.cmp(other))
1216    }
1217}
1218
1219impl Ord for Watermark {
1220    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1221        self.val.default_cmp(&other.val)
1222    }
1223}
1224
1225impl Watermark {
1226    pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
1227        Self {
1228            col_idx,
1229            data_type,
1230            val,
1231        }
1232    }
1233
1234    pub async fn transform_with_expr(
1235        self,
1236        expr: &NonStrictExpression<impl Expression>,
1237        new_col_idx: usize,
1238    ) -> Option<Self> {
1239        let Self { col_idx, val, .. } = self;
1240        let row = {
1241            let mut row = vec![None; col_idx + 1];
1242            row[col_idx] = Some(val);
1243            OwnedRow::new(row)
1244        };
1245        let val = expr.eval_row_infallible(&row).await?;
1246        Some(Self::new(new_col_idx, expr.inner().return_type(), val))
1247    }
1248
1249    /// Transform the watermark with the given output indices. If this watermark is not in the
1250    /// output, return `None`.
1251    pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
1252        output_indices
1253            .iter()
1254            .position(|p| *p == self.col_idx)
1255            .map(|new_col_idx| self.with_idx(new_col_idx))
1256    }
1257
1258    pub fn to_protobuf(&self) -> PbWatermark {
1259        PbWatermark {
1260            column: Some(PbInputRef {
1261                index: self.col_idx as _,
1262                r#type: Some(self.data_type.to_protobuf()),
1263            }),
1264            val: Some(&self.val).to_protobuf().into(),
1265        }
1266    }
1267
1268    pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
1269        let col_ref = prost.get_column()?;
1270        let data_type = DataType::from(col_ref.get_type()?);
1271        let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
1272            .expect("watermark value cannot be null");
1273        Ok(Self::new(col_ref.get_index() as _, data_type, val))
1274    }
1275
1276    pub fn with_idx(self, idx: usize) -> Self {
1277        Self::new(idx, self.data_type, self.val)
1278    }
1279}
1280
/// A single stream message: a data chunk, a barrier, or a watermark.
///
/// `M` is the type of the mutation attached to barriers (see `BarrierInner<M>`). `Message`
/// uses the full mutation type, while `DispatcherMessage` uses `()`.
#[cfg_attr(any(test, feature = "test"), derive(PartialEq))]
#[derive(Debug, EnumAsInner, Clone)]
pub enum MessageInner<M> {
    /// A chunk of data.
    Chunk(StreamChunk),
    /// A barrier carrying a mutation of type `M`.
    Barrier(BarrierInner<M>),
    /// A watermark on a single column.
    Watermark(Watermark),
}
1288
1289impl<M> MessageInner<M> {
1290    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
1291        match self {
1292            MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
1293            MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
1294            MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
1295        }
1296    }
1297}
1298
/// A message whose barriers carry their full mutation (`BarrierMutationType`).
pub type Message = MessageInner<BarrierMutationType>;
/// A message whose barriers carry no mutation (the mutation type is `()`).
pub type DispatcherMessage = MessageInner<()>;
1301
/// `MessageBatchInner` is used exclusively by `Dispatcher` and the `Merger`/`Receiver` for exchanging messages between them.
/// It mirrors the fundamental `MessageInner`, except that several barriers can be carried together in a single `BarrierBatch` message.
#[derive(Debug, EnumAsInner, Clone)]
pub enum MessageBatchInner<M> {
    /// A chunk of data.
    Chunk(StreamChunk),
    /// One or more barriers delivered as a single message.
    BarrierBatch(Vec<BarrierInner<M>>),
    /// A watermark on a single column.
    Watermark(Watermark),
}
/// A message batch whose barriers carry their full mutation.
pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
/// A list of mutation-less barriers.
pub type DispatcherBarriers = Vec<DispatcherBarrier>;
/// A message batch whose barriers carry no mutation (the mutation type is `()`).
pub type DispatcherMessageBatch = MessageBatchInner<()>;
1313
1314impl<M> From<MessageInner<M>> for MessageBatchInner<M> {
1315    fn from(m: MessageInner<M>) -> Self {
1316        match m {
1317            MessageInner::Chunk(c) => Self::Chunk(c),
1318            MessageInner::Barrier(b) => Self::BarrierBatch(vec![b]),
1319            MessageInner::Watermark(w) => Self::Watermark(w),
1320        }
1321    }
1322}
1323
1324impl From<StreamChunk> for Message {
1325    fn from(chunk: StreamChunk) -> Self {
1326        Message::Chunk(chunk)
1327    }
1328}
1329
1330impl<'a> TryFrom<&'a Message> for &'a Barrier {
1331    type Error = ();
1332
1333    fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
1334        match m {
1335            Message::Chunk(_) => Err(()),
1336            Message::Barrier(b) => Ok(b),
1337            Message::Watermark(_) => Err(()),
1338        }
1339    }
1340}
1341
1342impl Message {
1343    /// Return true if the message is a stop barrier, meaning the stream
1344    /// will not continue, false otherwise.
1345    ///
1346    /// Note that this does not mean we will stop the current actor.
1347    #[cfg(test)]
1348    pub fn is_stop(&self) -> bool {
1349        matches!(
1350            self,
1351            Message::Barrier(Barrier {
1352                mutation,
1353                ..
1354            }) if mutation.as_ref().unwrap().is_stop()
1355        )
1356    }
1357}
1358
1359impl DispatcherMessageBatch {
1360    pub fn to_protobuf(&self) -> PbStreamMessageBatch {
1361        let prost = match self {
1362            Self::Chunk(stream_chunk) => {
1363                let prost_stream_chunk = stream_chunk.to_protobuf();
1364                StreamMessageBatch::StreamChunk(prost_stream_chunk)
1365            }
1366            Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
1367                barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
1368            }),
1369            Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
1370        };
1371        PbStreamMessageBatch {
1372            stream_message_batch: Some(prost),
1373        }
1374    }
1375
1376    pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
1377        let res = match prost.get_stream_message_batch()? {
1378            StreamMessageBatch::StreamChunk(chunk) => {
1379                Self::Chunk(StreamChunk::from_protobuf(chunk)?)
1380            }
1381            StreamMessageBatch::BarrierBatch(barrier_batch) => {
1382                let barriers = barrier_batch
1383                    .barriers
1384                    .iter()
1385                    .map(|barrier| {
1386                        DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
1387                            if mutation.is_some() {
1388                                if cfg!(debug_assertions) {
1389                                    panic!("should not receive message of barrier with mutation");
1390                                } else {
1391                                    warn!(?barrier, "receive message of barrier with mutation");
1392                                }
1393                            }
1394                            Ok(())
1395                        })
1396                    })
1397                    .try_collect()?;
1398                Self::BarrierBatch(barriers)
1399            }
1400            StreamMessageBatch::Watermark(watermark) => {
1401                Self::Watermark(Watermark::from_protobuf(watermark)?)
1402            }
1403        };
1404        Ok(res)
1405    }
1406
1407    pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
1408        ::prost::Message::encoded_len(msg)
1409    }
1410}
1411
/// Owned list of column indices forming a stream key. NOTE(review): presumably positions in
/// the executor's schema — confirm against users.
pub type StreamKey = Vec<usize>;
/// Borrowed form of [`StreamKey`].
pub type StreamKeyRef<'a> = &'a [usize];
/// Data types of the stream key columns, stored inline for up to one column.
pub type StreamKeyDataTypes = SmallVec<[DataType; 1]>;
1415
1416/// Expect the first message of the given `stream` as a barrier.
1417pub async fn expect_first_barrier<M: Debug>(
1418    stream: &mut (impl MessageStreamInner<M> + Unpin),
1419) -> StreamExecutorResult<BarrierInner<M>> {
1420    let message = stream
1421        .next()
1422        .instrument_await("expect_first_barrier")
1423        .await
1424        .context("failed to extract the first message: stream closed unexpectedly")??;
1425    let barrier = message
1426        .into_barrier()
1427        .expect("the first message must be a barrier");
1428    // TODO: Is this check correct?
1429    assert!(matches!(
1430        barrier.kind,
1431        BarrierKind::Checkpoint | BarrierKind::Initial
1432    ));
1433    Ok(barrier)
1434}
1435
1436/// Expect the first message of the given `stream` as a barrier.
1437pub async fn expect_first_barrier_from_aligned_stream(
1438    stream: &mut (impl AlignedMessageStream + Unpin),
1439) -> StreamExecutorResult<Barrier> {
1440    let message = stream
1441        .next()
1442        .instrument_await("expect_first_barrier")
1443        .await
1444        .context("failed to extract the first message: stream closed unexpectedly")??;
1445    let barrier = message
1446        .into_barrier()
1447        .expect("the first message must be a barrier");
1448    Ok(barrier)
1449}
1450
/// `StreamConsumer` is the last step in an actor.
pub trait StreamConsumer: Send + 'static {
    /// The stream returned by [`StreamConsumer::execute`], yielding barriers or errors.
    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;

    /// Consumes the boxed consumer and returns its stream of barriers.
    fn execute(self: Box<Self>) -> Self::BarrierStream;
}
1457
/// A boxed upstream input identified by `InputId` whose items are `MessageStreamItemInner<M>`.
type BoxedMessageInput<InputId, M> = BoxedInput<InputId, MessageStreamItemInner<M>>;
1459
/// A stream for merging messages from multiple upstreams.
/// Can dynamically add and delete upstream streams.
/// For the meaning of the generic parameter `M` used, refer to `BarrierInner<M>`.
pub struct DynamicReceivers<InputId, M> {
    /// The barrier we're aligning to. If this is `None`, then `blocked` is empty.
    barrier: Option<BarrierInner<M>>,
    /// The start timestamp of the current barrier. Used for measuring the alignment duration.
    /// Set when the first upstream of an epoch delivers its barrier.
    start_ts: Option<Instant>,
    /// The upstreams that're blocked by the `barrier`.
    blocked: Vec<BoxedMessageInput<InputId, M>>,
    /// The upstreams that're not blocked and can be polled.
    active: FuturesUnordered<StreamFuture<BoxedMessageInput<InputId, M>>>,
    /// watermark column index -> `BufferedWatermarks`
    buffered_watermarks: BTreeMap<usize, BufferedWatermarks<InputId>>,
    /// Currently only used for union.
    barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
    /// Only for merge. If `None`, no alignment duration is recorded for merge. `poll_next`
    /// still takes `Instant::now()` for `start_ts` whenever a new barrier starts aligning.
    merge_barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
}
1479
/// Barrier-aligning merge of all upstreams.
///
/// Chunks are forwarded as soon as any active upstream yields them. Watermarks are forwarded
/// only when `handle_watermark` returns one to emit. A barrier is yielded only after every
/// upstream has delivered a barrier, i.e. once `active` is drained and all upstreams sit in
/// `blocked`.
impl<InputId: Clone + Ord + Hash + std::fmt::Debug + Unpin, M: Clone + Unpin> Stream
    for DynamicReceivers<InputId, M>
{
    type Item = MessageStreamItemInner<M>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // NOTE(review): `is_empty` is defined outside this view; presumably true when there are
        // neither active nor blocked upstreams — confirm.
        if self.is_empty() {
            return Poll::Ready(None);
        }

        loop {
            // Each `StreamFuture` resolves to `(next item, the stream itself)`. `FuturesUnordered`
            // yields `None` once no active upstream is left.
            match futures::ready!(self.active.poll_next_unpin(cx)) {
                // Directly forward the error.
                // The failing upstream is dropped here (not pushed back to `active`).
                Some((Some(Err(e)), _)) => {
                    return Poll::Ready(Some(Err(e)));
                }
                // Handle the message from some upstream.
                Some((Some(Ok(message)), remaining)) => {
                    let input_id = remaining.id();
                    match message {
                        MessageInner::Chunk(chunk) => {
                            // Continue polling this upstream by pushing it back to `active`.
                            self.active.push(remaining.into_future());
                            return Poll::Ready(Some(Ok(MessageInner::Chunk(chunk))));
                        }
                        MessageInner::Watermark(watermark) => {
                            // Continue polling this upstream by pushing it back to `active`.
                            self.active.push(remaining.into_future());
                            // Only emit when the buffered watermarks allow it; otherwise keep
                            // polling the other upstreams.
                            if let Some(watermark) = self.handle_watermark(input_id, watermark) {
                                return Poll::Ready(Some(Ok(MessageInner::Watermark(watermark))));
                            }
                        }
                        MessageInner::Barrier(barrier) => {
                            // Block this upstream by pushing it to `blocked`.
                            // The first blocked upstream of an epoch starts the alignment timer.
                            if self.blocked.is_empty() {
                                self.start_ts = Some(Instant::now());
                            }
                            self.blocked.push(remaining);
                            if let Some(current_barrier) = self.barrier.as_ref() {
                                // All upstreams must deliver the same epoch. On mismatch the
                                // upstream stays in `blocked` and an alignment error is yielded
                                // with both barriers' mutations stripped.
                                if current_barrier.epoch != barrier.epoch {
                                    return Poll::Ready(Some(Err(
                                        StreamExecutorError::align_barrier(
                                            current_barrier.clone().map_mutation(|_| None),
                                            barrier.map_mutation(|_| None),
                                        ),
                                    )));
                                }
                            } else {
                                // The first barrier received is the one eventually yielded.
                                self.barrier = Some(barrier);
                            }
                        }
                    }
                }
                // We use barrier as the control message of the stream. That is, we always stop the
                // actors actively when we receive a `Stop` mutation, instead of relying on the stream
                // termination.
                //
                // Besides, in abnormal cases when the other side of the `Input` closes unexpectedly,
                // we also yield an `Err(ExchangeChannelClosed)`, which will hit the `Err` arm above.
                // So this branch will never be reached in all cases.
                Some((None, remaining)) => {
                    return Poll::Ready(Some(Err(StreamExecutorError::channel_closed(format!(
                        "upstream input {:?} unexpectedly closed",
                        remaining.id()
                    )))));
                }
                // There's no active upstreams. Process the barrier and resume the blocked ones.
                None => {
                    assert!(!self.blocked.is_empty());

                    let start_ts = self
                        .start_ts
                        .take()
                        .expect("should have received at least one barrier");
                    // Record the time from the first to the last barrier arrival in whichever
                    // metrics are configured.
                    if let Some(barrier_align_duration) = &self.barrier_align_duration {
                        barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
                    }
                    if let Some(merge_barrier_align_duration) = &self.merge_barrier_align_duration {
                        merge_barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
                    }

                    break;
                }
            }
        }

        assert!(self.active.is_terminated());

        let barrier = self.barrier.take().unwrap();

        // Alignment is complete: move every blocked upstream back into `active` (this requires
        // `blocked` empty and `barrier` taken, both established just above).
        let upstreams = std::mem::take(&mut self.blocked);
        self.extend_active(upstreams);
        assert!(!self.active.is_terminated());

        Poll::Ready(Some(Ok(MessageInner::Barrier(barrier))))
    }
}
1580
1581impl<InputId: Clone + Ord + Hash + std::fmt::Debug, M> DynamicReceivers<InputId, M> {
1582    pub fn new(
1583        upstreams: Vec<BoxedMessageInput<InputId, M>>,
1584        barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1585        merge_barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1586    ) -> Self {
1587        let mut this = Self {
1588            barrier: None,
1589            start_ts: None,
1590            blocked: Vec::with_capacity(upstreams.len()),
1591            active: Default::default(),
1592            buffered_watermarks: Default::default(),
1593            merge_barrier_align_duration,
1594            barrier_align_duration,
1595        };
1596        this.extend_active(upstreams);
1597        this
1598    }
1599
1600    /// Extend the active upstreams with the given upstreams. The current stream must be at the
1601    /// clean state right after a barrier.
1602    pub fn extend_active(
1603        &mut self,
1604        upstreams: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1605    ) {
1606        assert!(self.blocked.is_empty() && self.barrier.is_none());
1607
1608        self.active
1609            .extend(upstreams.into_iter().map(|s| s.into_future()));
1610    }
1611
1612    /// Handle a new watermark message. Optionally returns the watermark message to emit.
1613    pub fn handle_watermark(
1614        &mut self,
1615        input_id: InputId,
1616        watermark: Watermark,
1617    ) -> Option<Watermark> {
1618        let col_idx = watermark.col_idx;
1619        // Insert a buffer watermarks when first received from a column.
1620        let upstream_ids: Vec<_> = self.upstream_input_ids().collect();
1621        let watermarks = self
1622            .buffered_watermarks
1623            .entry(col_idx)
1624            .or_insert_with(|| BufferedWatermarks::with_ids(upstream_ids));
1625        watermarks.handle_watermark(input_id, watermark)
1626    }
1627
1628    /// Consume `other` and add its upstreams to `self`. The two streams must be at the clean state
1629    /// right after a barrier.
1630    pub fn add_upstreams_from(
1631        &mut self,
1632        new_inputs: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1633    ) {
1634        assert!(self.blocked.is_empty() && self.barrier.is_none());
1635
1636        let new_inputs: Vec<_> = new_inputs.into_iter().collect();
1637        let input_ids = new_inputs.iter().map(|input| input.id());
1638        self.buffered_watermarks.values_mut().for_each(|buffers| {
1639            // Add buffers to the buffered watermarks for all cols
1640            buffers.add_buffers(input_ids.clone());
1641        });
1642        self.active
1643            .extend(new_inputs.into_iter().map(|s| s.into_future()));
1644    }
1645
1646    /// Remove upstreams from `self` in `upstream_input_ids`. The current stream must be at the
1647    /// clean state right after a barrier.
1648    /// The current container does not necessarily contain all the input ids passed in.
1649    pub fn remove_upstreams(&mut self, upstream_input_ids: &HashSet<InputId>) {
1650        assert!(self.blocked.is_empty() && self.barrier.is_none());
1651
1652        let new_upstreams = std::mem::take(&mut self.active)
1653            .into_iter()
1654            .map(|s| s.into_inner().unwrap())
1655            .filter(|u| !upstream_input_ids.contains(&u.id()));
1656        self.extend_active(new_upstreams);
1657        self.buffered_watermarks.values_mut().for_each(|buffers| {
1658            // Call `check_heap` in case the only upstream(s) that does not have
1659            // watermark in heap is removed
1660            buffers.remove_buffer(upstream_input_ids.clone());
1661        });
1662    }
1663
1664    pub fn merge_barrier_align_duration(
1665        &self,
1666    ) -> Option<LabelGuardedMetric<GenericCounter<AtomicU64>>> {
1667        self.merge_barrier_align_duration.clone()
1668    }
1669
1670    pub fn flush_buffered_watermarks(&mut self) {
1671        self.buffered_watermarks
1672            .values_mut()
1673            .for_each(|buffers| buffers.clear());
1674    }
1675
1676    pub fn upstream_input_ids(&self) -> impl Iterator<Item = InputId> + '_ {
1677        self.blocked
1678            .iter()
1679            .map(|s| s.id())
1680            .chain(self.active.iter().map(|s| s.get_ref().unwrap().id()))
1681    }
1682
1683    pub fn is_empty(&self) -> bool {
1684        self.blocked.is_empty() && self.active.is_empty()
1685    }
1686}
1687
1688// Explanation of why we need `DispatchBarrierBuffer`:
1689//
1690// When we need to create or replace an upstream fragment for the current fragment, the `Merge` operator must
1691// add some new upstream actor inputs. However, the `Merge` operator may still have old upstreams. We must wait
1692// for these old upstreams to completely process their barriers and align before we can safely update the
1693// `upstream-input-set`.
1694//
1695// Meanwhile, the creation of a new upstream actor can only succeed after the channel to the downstream `Merge`
1696// operator has been established. This creates a potential dependency chain: [new_actor_creation ->
1697// downstream_merge_update -> old_actor_processing]
1698//
1699// To address this, we split the application of a barrier's `Mutation` into two steps:
1700// 1. Parse the `Mutation`. If there is an addition on the upstream-set, establish a channel with the upstream
1701//    and cache it.
1702// 2. When the upstream barrier actually arrives, apply the cached upstream changes to the upstream-set
1703//
// Additionally, barriers arrive from the current upstream inputs and from `barrier_rx` asynchronously, so
// we cannot determine which will arrive first. Therefore, when a barrier is received from an upstream: if
// the matching barrier (together with any cached upstream changes) is already buffered, we apply it.
// Otherwise, we must first fetch the next barrier from `barrier_rx`.
/// Buffers barriers taken from `barrier_rx` until the same barrier is received from the upstream
/// inputs. For a barrier whose update-merge mutation adds upstream actors, the new inputs are
/// built ahead of time and stored next to the barrier.
pub(crate) struct DispatchBarrierBuffer {
    /// Barriers fetched from `barrier_rx` in arrival order, each paired with the new upstream
    /// inputs built for it (`None` when its mutation adds no upstream actors).
    buffer: VecDeque<(Barrier, Option<Vec<BoxedActorInput>>)>,
    /// Channel from which barriers are received.
    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
    /// Whether we are waiting on `barrier_rx` or on new inputs being built for a barrier.
    recv_state: BarrierReceiverState,
    /// Upstream fragment currently read from; switched when an update-merge mutation carries a
    /// new upstream fragment id.
    curr_upstream_fragment_id: FragmentId,
    actor_id: ActorId,
    // read-only context for building new inputs
    build_input_ctx: Arc<BuildInputContext>,
}
1717
/// Everything needed to create new upstream inputs, shared (via `Arc`) with the boxed futures
/// that build them so those futures do not borrow the buffer.
struct BuildInputContext {
    pub actor_id: ActorId,
    pub local_barrier_manager: LocalBarrierManager,
    pub metrics: Arc<StreamingMetrics>,
    pub fragment_id: FragmentId,
    pub actor_config: Arc<StreamingConfig>,
}
1725
1726type BoxedNewInputsFuture =
1727    Pin<Box<dyn Future<Output = StreamExecutorResult<Vec<BoxedActorInput>>> + Send>>;
1728
/// Progress of pulling barriers from `barrier_rx`.
enum BarrierReceiverState {
    /// Waiting for the next barrier on `barrier_rx`.
    ReceivingBarrier,
    /// A barrier was received whose mutation adds upstream actors; the future builds their
    /// inputs before the barrier is pushed into the buffer.
    CreatingNewInput(Barrier, BoxedNewInputsFuture),
}
1733
1734impl DispatchBarrierBuffer {
1735    pub fn new(
1736        barrier_rx: mpsc::UnboundedReceiver<Barrier>,
1737        actor_id: ActorId,
1738        curr_upstream_fragment_id: FragmentId,
1739        local_barrier_manager: LocalBarrierManager,
1740        metrics: Arc<StreamingMetrics>,
1741        fragment_id: FragmentId,
1742        actor_config: Arc<StreamingConfig>,
1743    ) -> Self {
1744        Self {
1745            buffer: VecDeque::new(),
1746            barrier_rx,
1747            recv_state: BarrierReceiverState::ReceivingBarrier,
1748            curr_upstream_fragment_id,
1749            actor_id,
1750            build_input_ctx: Arc::new(BuildInputContext {
1751                actor_id,
1752                local_barrier_manager,
1753                metrics,
1754                fragment_id,
1755                actor_config,
1756            }),
1757        }
1758    }
1759
1760    pub async fn await_next_message(
1761        &mut self,
1762        stream: &mut (impl Stream<Item = StreamExecutorResult<DispatcherMessage>> + Unpin),
1763        metrics: &ActorInputMetrics,
1764    ) -> StreamExecutorResult<DispatcherMessage> {
1765        let mut start_time = Instant::now();
1766        let interval_duration = Duration::from_secs(15);
1767        let mut interval =
1768            tokio::time::interval_at(start_time + interval_duration, interval_duration);
1769
1770        loop {
1771            tokio::select! {
1772                biased;
1773                msg = stream.try_next() => {
1774                    metrics
1775                        .actor_input_buffer_blocking_duration_ns
1776                        .inc_by(start_time.elapsed().as_nanos() as u64);
1777                    return msg?.ok_or_else(
1778                        || StreamExecutorError::channel_closed("upstream executor closed unexpectedly")
1779                    );
1780                }
1781
1782                e = self.continuously_fetch_barrier_rx() => {
1783                    return Err(e);
1784                }
1785
1786                _ = interval.tick() => {
1787                    start_time = Instant::now();
1788                    metrics.actor_input_buffer_blocking_duration_ns.inc_by(interval_duration.as_nanos() as u64);
1789                    continue;
1790                }
1791            }
1792        }
1793    }
1794
1795    pub async fn pop_barrier_with_inputs(
1796        &mut self,
1797        barrier: DispatcherBarrier,
1798    ) -> StreamExecutorResult<(Barrier, Option<Vec<BoxedActorInput>>)> {
1799        while self.buffer.is_empty() {
1800            self.try_fetch_barrier_rx(false).await?;
1801        }
1802        let (recv_barrier, inputs) = self.buffer.pop_front().unwrap();
1803        assert_equal_dispatcher_barrier(&recv_barrier, &barrier);
1804
1805        Ok((recv_barrier, inputs))
1806    }
1807
1808    async fn continuously_fetch_barrier_rx(&mut self) -> StreamExecutorError {
1809        loop {
1810            if let Err(e) = self.try_fetch_barrier_rx(true).await {
1811                return e;
1812            }
1813        }
1814    }
1815
1816    async fn try_fetch_barrier_rx(&mut self, pending_on_end: bool) -> StreamExecutorResult<()> {
1817        match &mut self.recv_state {
1818            BarrierReceiverState::ReceivingBarrier => {
1819                let Some(barrier) = self.barrier_rx.recv().await else {
1820                    if pending_on_end {
1821                        return pending().await;
1822                    } else {
1823                        return Err(StreamExecutorError::channel_closed(
1824                            "barrier channel closed unexpectedly",
1825                        ));
1826                    }
1827                };
1828                if let Some(fut) = self.pre_apply_barrier(&barrier) {
1829                    self.recv_state = BarrierReceiverState::CreatingNewInput(barrier, fut);
1830                } else {
1831                    self.buffer.push_back((barrier, None));
1832                }
1833            }
1834            BarrierReceiverState::CreatingNewInput(barrier, fut) => {
1835                let new_inputs = fut.await?;
1836                self.buffer.push_back((barrier.clone(), Some(new_inputs)));
1837                self.recv_state = BarrierReceiverState::ReceivingBarrier;
1838            }
1839        }
1840        Ok(())
1841    }
1842
1843    fn pre_apply_barrier(&mut self, barrier: &Barrier) -> Option<BoxedNewInputsFuture> {
1844        if let Some(update) = barrier.as_update_merge(self.actor_id, self.curr_upstream_fragment_id)
1845            && !update.added_upstream_actors.is_empty()
1846        {
1847            // When update upstream fragment, added_actors will not be empty.
1848            let upstream_fragment_id =
1849                if let Some(new_upstream_fragment_id) = update.new_upstream_fragment_id {
1850                    self.curr_upstream_fragment_id = new_upstream_fragment_id;
1851                    new_upstream_fragment_id
1852                } else {
1853                    self.curr_upstream_fragment_id
1854                };
1855            let ctx = self.build_input_ctx.clone();
1856            let added_upstream_actors = update.added_upstream_actors.clone();
1857            let barrier = barrier.clone();
1858            let fut = async move {
1859                try_join_all(added_upstream_actors.iter().map(|upstream_actor| async {
1860                    let mut new_input = new_input(
1861                        &ctx.local_barrier_manager,
1862                        ctx.metrics.clone(),
1863                        ctx.actor_id,
1864                        ctx.fragment_id,
1865                        upstream_actor,
1866                        upstream_fragment_id,
1867                        ctx.actor_config.clone(),
1868                    )
1869                    .await?;
1870
1871                    // Poll the first barrier from the new upstreams. It must be the same as the one we polled from
1872                    // original upstreams.
1873                    let first_barrier = expect_first_barrier(&mut new_input).await?;
1874                    assert_equal_dispatcher_barrier(&barrier, &first_barrier);
1875
1876                    StreamExecutorResult::Ok(new_input)
1877                }))
1878                .await
1879            }
1880            .boxed();
1881
1882            Some(fut)
1883        } else {
1884            None
1885        }
1886    }
1887}