mod prelude;

use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::Debug;
use std::hash::Hash;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::vec;

use await_tree::InstrumentAwait;
use enum_as_inner::EnumAsInner;
use futures::stream::{BoxStream, FusedStream, FuturesUnordered, StreamFuture};
use futures::{Stream, StreamExt};
use itertools::Itertools;
use prometheus::Histogram;
use prometheus::core::{AtomicU64, GenericCounter};
use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{Schema, TableId};
use risingwave_common::metrics::LabelGuardedMetric;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
use risingwave_common::util::epoch::{Epoch, EpochPair};
use risingwave_common::util::tracing::TracingContext;
use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
use risingwave_connector::source::SplitImpl;
use risingwave_expr::expr::{Expression, NonStrictExpression};
use risingwave_pb::data::PbEpoch;
use risingwave_pb::expr::PbInputRef;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
use risingwave_pb::stream_plan::{
    BarrierMutation, CombinedMutation, ConnectorPropsChangeMutation, Dispatchers,
    DropSubscriptionsMutation, PauseMutation, PbAddMutation, PbBarrier, PbBarrierMutation,
    PbDispatcher, PbStreamMessageBatch, PbUpdateMutation, PbWatermark, ResumeMutation,
    SourceChangeSplitMutation, StartFragmentBackfillMutation, StopMutation,
    SubscriptionUpstreamInfo, ThrottleMutation,
};
use smallvec::SmallVec;
use tokio::time::Instant;

use crate::error::StreamResult;
use crate::executor::exchange::input::BoxedInput;
use crate::executor::watermark::BufferedWatermarks;
use crate::task::{ActorId, FragmentId};

mod actor;
mod barrier_align;
pub mod exchange;
pub mod monitor;

pub mod aggregate;
pub mod asof_join;
mod backfill;
mod barrier_recv;
mod batch_query;
mod chain;
mod changelog;
mod dedup;
mod dispatch;
pub mod dml;
mod dynamic_filter;
pub mod eowc;
pub mod error;
mod expand;
mod filter;
pub mod hash_join;
mod hop_window;
mod join;
mod lookup;
mod lookup_union;
mod merge;
mod mview;
mod nested_loop_temporal_join;
mod no_op;
mod now;
mod over_window;
pub mod project;
mod rearranged_chain;
mod receiver;
pub mod row_id_gen;
mod sink;
pub mod source;
mod stream_reader;
pub mod subtask;
mod temporal_join;
mod top_n;
mod troublemaker;
mod union;
mod upstream_sink_union;
mod values;
mod watermark;
mod watermark_filter;
mod wrapper;

mod approx_percentile;

mod row_merge;

#[cfg(test)]
mod integration_tests;
mod sync_kv_log_store;
pub mod test_utils;
mod utils;
mod vector_index;

pub use actor::{Actor, ActorContext, ActorContextRef};
use anyhow::Context;
pub use approx_percentile::global::GlobalApproxPercentileExecutor;
pub use approx_percentile::local::LocalApproxPercentileExecutor;
pub use backfill::arrangement_backfill::*;
pub use backfill::cdc::{
    CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor,
};
pub use backfill::no_shuffle_backfill::*;
pub use backfill::snapshot_backfill::*;
pub use barrier_recv::BarrierRecvExecutor;
pub use batch_query::BatchQueryExecutor;
pub use chain::ChainExecutor;
pub use changelog::ChangeLogExecutor;
pub use dedup::AppendOnlyDedupExecutor;
pub use dispatch::{DispatchExecutor, DispatcherImpl};
pub use dynamic_filter::DynamicFilterExecutor;
pub use error::{StreamExecutorError, StreamExecutorResult};
pub use expand::ExpandExecutor;
pub use filter::FilterExecutor;
pub use hash_join::*;
pub use hop_window::HopWindowExecutor;
pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
pub use join::{AsOfDesc, AsOfJoinType, JoinType};
pub use lookup::*;
pub use lookup_union::LookupUnionExecutor;
pub use merge::MergeExecutor;
pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
pub use mview::*;
pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
pub use no_op::NoOpExecutor;
pub use now::*;
pub use over_window::*;
pub use rearranged_chain::RearrangedChainExecutor;
pub use receiver::ReceiverExecutor;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
pub use row_merge::RowMergeExecutor;
pub use sink::SinkExecutor;
pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
pub use temporal_join::TemporalJoinExecutor;
pub use top_n::{
    AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
};
pub use troublemaker::TroublemakerExecutor;
pub use union::UnionExecutor;
pub use upstream_sink_union::UpstreamSinkUnionExecutor;
pub use utils::DummyExecutor;
pub use values::ValuesExecutor;
pub use vector_index::VectorIndexWriteExecutor;
pub use watermark_filter::WatermarkFilterExecutor;
pub use wrapper::WrapperExecutor;

use self::barrier_align::AlignedMessageStream;

pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;

pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
use risingwave_connector::source::cdc::{
    CdcTableSnapshotSplitAssignment, build_actor_cdc_table_snapshot_splits,
    build_pb_actor_cdc_table_snapshot_splits,
};
use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
use risingwave_pb::stream_plan::throttle_mutation::RateLimit;

pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;

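/// Static descriptive information of an [`Executor`]: its output schema and primary key,
/// a human-readable identity, and a numeric id.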
#[derive(Debug, Default, Clone)]
pub struct ExecutorInfo {
    /// The schema of the executor's output.
    pub schema: Schema,

    /// The primary key indices of the executor's output.
    pub pk_indices: PkIndices,

    /// A human-readable identity of the executor, used in logging and debugging.
    pub identity: String,

    /// The unique id of the executor.
    pub id: u64,
}

impl ExecutorInfo {
    pub fn new(schema: Schema, pk_indices: PkIndices, identity: String, id: u64) -> Self {
        Self {
            schema,
            pk_indices,
            identity,
            id,
        }
    }
}

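/// The core trait implemented by all executors: consume `self` and return a boxed stream of
/// [`Message`]s (chunks, barriers and watermarks).
///
/// An illustrative sketch (not part of this crate) of a trivial implementor, using a
/// hypothetical `Tick` type:
///
/// ```ignore
/// struct Tick;
///
/// impl Execute for Tick {
///     fn execute(self: Box<Self>) -> BoxedMessageStream {
///         // Emit nothing and end the stream immediately.
///         futures::stream::empty().boxed()
///     }
/// }
/// ```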
pub trait Execute: Send + 'static {
    fn execute(self: Box<Self>) -> BoxedMessageStream;

    fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
        self.execute()
    }

    fn boxed(self) -> Box<dyn Execute>
    where
        Self: Sized + Send + 'static,
    {
        Box::new(self)
    }
}

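/// An [`Execute`] implementation paired with its [`ExecutorInfo`]. Calling [`Executor::execute`]
/// consumes the executor and yields its message stream.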
pub struct Executor {
    info: ExecutorInfo,
    execute: Box<dyn Execute>,
}

impl Executor {
    pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
        Self { info, execute }
    }

    pub fn info(&self) -> &ExecutorInfo {
        &self.info
    }

    pub fn schema(&self) -> &Schema {
        &self.info.schema
    }

    pub fn pk_indices(&self) -> PkIndicesRef<'_> {
        &self.info.pk_indices
    }

    pub fn identity(&self) -> &str {
        &self.info.identity
    }

    pub fn execute(self) -> BoxedMessageStream {
        self.execute.execute()
    }

    pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
        self.execute.execute_with_epoch(epoch)
    }
}

impl std::fmt::Debug for Executor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.identity())
    }
}

impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
    fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
        Self::new(info, execute)
    }
}

impl<E> From<(ExecutorInfo, E)> for Executor
where
    E: Execute,
{
    fn from((info, execute): (ExecutorInfo, E)) -> Self {
        Self::new(info, execute.boxed())
    }
}

pub const INVALID_EPOCH: u64 = 0;

type UpstreamFragmentId = FragmentId;
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;

#[derive(Debug, Clone, PartialEq)]
pub struct UpdateMutation {
    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
    pub dropped_actors: HashSet<ActorId>,
    pub actor_splits: SplitAssignments,
    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignment,
}

#[derive(Debug, Clone, PartialEq)]
pub struct AddMutation {
    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
    pub added_actors: HashSet<ActorId>,
    pub splits: SplitAssignments,
    /// Whether the newly added actors should start in a paused state.
    pub pause: bool,
    /// `(upstream_mv_table_id, subscriber_id)` pairs of subscriptions to register.
    pub subscriptions_to_add: Vec<(TableId, u32)>,
    /// Backfill fragments that should stay paused until explicitly started.
    pub backfill_nodes_to_pause: HashSet<FragmentId>,
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignment,
}

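/// The mutation attached to a barrier to reconfigure the streaming graph, e.g. adding or
/// stopping actors, updating dispatchers and merges, reassigning source splits, pausing,
/// resuming, or throttling.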
#[derive(Debug, Clone, PartialEq)]
pub enum Mutation {
    Stop(HashSet<ActorId>),
    Update(UpdateMutation),
    Add(AddMutation),
    SourceChangeSplit(SplitAssignments),
    Pause,
    Resume,
    Throttle(HashMap<ActorId, Option<u32>>),
    AddAndUpdate(AddMutation, UpdateMutation),
    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
    DropSubscriptions {
        /// `(subscriber_id, upstream_mv_table_id)` pairs of subscriptions to drop.
        subscriptions_to_drop: Vec<(u32, TableId)>,
    },
    StartFragmentBackfill {
        fragment_ids: HashSet<FragmentId>,
    },
    RefreshStart {
        table_id: TableId,
        associated_source_id: TableId,
    },
    LoadFinish {
        associated_source_id: TableId,
    },
}

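/// A barrier flowing through the streaming graph, generic over its mutation payload `M`:
/// [`Barrier`] carries an optional [`Mutation`], while [`DispatcherBarrier`] (sent between
/// actors) carries none.
///
/// A hedged sketch of how a test might construct and inspect one, assuming `epoch` is a valid
/// test epoch value:
///
/// ```ignore
/// let barrier = Barrier::new_test_barrier(epoch).with_stop();
/// assert!(barrier.is_checkpoint());
/// assert!(barrier.is_with_stop_mutation());
/// ```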
#[derive(Debug, Clone)]
pub struct BarrierInner<M> {
    pub epoch: EpochPair,
    pub mutation: M,
    pub kind: BarrierKind,

    /// Tracing context of this barrier, for correlating distributed traces.
    pub tracing_context: TracingContext,

    /// The actors that this barrier has passed locally.
    pub passed_actors: Vec<ActorId>,
}

pub type BarrierMutationType = Option<Arc<Mutation>>;
pub type Barrier = BarrierInner<BarrierMutationType>;
pub type DispatcherBarrier = BarrierInner<()>;

impl<M: Default> BarrierInner<M> {
    pub fn new_test_barrier(epoch: u64) -> Self {
        Self {
            epoch: EpochPair::new_test_epoch(epoch),
            kind: BarrierKind::Checkpoint,
            tracing_context: TracingContext::none(),
            mutation: Default::default(),
            passed_actors: Default::default(),
        }
    }

    pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
        Self {
            epoch: EpochPair::new(epoch, prev_epoch),
            kind: BarrierKind::Checkpoint,
            tracing_context: TracingContext::none(),
            mutation: Default::default(),
            passed_actors: Default::default(),
        }
    }
}

impl Barrier {
    pub fn into_dispatcher(self) -> DispatcherBarrier {
        DispatcherBarrier {
            epoch: self.epoch,
            mutation: (),
            kind: self.kind,
            tracing_context: self.tracing_context,
            passed_actors: self.passed_actors,
        }
    }

    #[must_use]
    pub fn with_mutation(self, mutation: Mutation) -> Self {
        Self {
            mutation: Some(Arc::new(mutation)),
            ..self
        }
    }

    #[must_use]
    pub fn with_stop(self) -> Self {
        self.with_mutation(Mutation::Stop(HashSet::default()))
    }

    /// Whether this barrier carries a `Stop` mutation.
    pub fn is_with_stop_mutation(&self) -> bool {
        matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
    }

    /// Whether this barrier is to stop the actor with `actor_id`.
    pub fn is_stop(&self, actor_id: ActorId) -> bool {
        self.all_stop_actors()
            .is_some_and(|actors| actors.contains(&actor_id))
    }

    pub fn is_checkpoint(&self) -> bool {
        self.kind == BarrierKind::Checkpoint
    }

    /// Get the initial split assignment of the actor with `actor_id`.
    ///
    /// This should only be called on the initial barrier received by an executor, whose mutation
    /// must be an `Add`, an `Update`, or a combination of both.
    pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
        match self.mutation.as_deref()? {
            Mutation::Update(UpdateMutation { actor_splits, .. })
            | Mutation::Add(AddMutation {
                splits: actor_splits,
                ..
            }) => actor_splits.get(&actor_id),

            Mutation::AddAndUpdate(
                AddMutation {
                    splits: add_actor_splits,
                    ..
                },
                UpdateMutation {
                    actor_splits: update_actor_splits,
                    ..
                },
            ) => add_actor_splits
                .get(&actor_id)
                .or_else(|| update_actor_splits.get(&actor_id)),

            _ => {
                if cfg!(debug_assertions) {
                    panic!(
                        "the initial mutation of the barrier should not be {:?}",
                        self.mutation
                    );
                }
                None
            }
        }
        .map(|s| s.as_slice())
    }

    /// Get all actors that are to be stopped (dropped) by this barrier.
    pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
        match self.mutation.as_deref() {
            Some(Mutation::Stop(actors)) => Some(actors),
            Some(Mutation::Update(UpdateMutation { dropped_actors, .. }))
            | Some(Mutation::AddAndUpdate(_, UpdateMutation { dropped_actors, .. })) => {
                Some(dropped_actors)
            }
            _ => None,
        }
    }

    /// Whether this barrier is to newly add the actor with `actor_id`.
    pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation { added_actors, .. }))
            | Some(Mutation::AddAndUpdate(AddMutation { added_actors, .. }, _)) => {
                added_actors.contains(&actor_id)
            }
            _ => false,
        }
    }

    pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
        if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
            fragment_ids.contains(&fragment_id)
        } else {
            false
        }
    }

    /// Whether this barrier adds new downstream fragment(s) for the actor with
    /// `upstream_actor_id`.
    pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
        let Some(mutation) = self.mutation.as_deref() else {
            return false;
        };
        match mutation {
            Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
            Mutation::AddAndUpdate(
                AddMutation { adds, .. },
                UpdateMutation {
                    dispatchers,
                    actor_new_dispatchers,
                    ..
                },
            ) => {
                adds.get(&upstream_actor_id).is_some()
                    || actor_new_dispatchers.get(&upstream_actor_id).is_some()
                    || dispatchers.get(&upstream_actor_id).is_some()
            }
            Mutation::Update(_)
            | Mutation::Stop(_)
            | Mutation::Pause
            | Mutation::Resume
            | Mutation::SourceChangeSplit(_)
            | Mutation::Throttle(_)
            | Mutation::DropSubscriptions { .. }
            | Mutation::ConnectorPropsChange(_)
            | Mutation::StartFragmentBackfill { .. }
            | Mutation::RefreshStart { .. }
            | Mutation::LoadFinish { .. } => false,
        }
    }

    /// Whether this barrier requires the executor to pause its data stream on startup.
    pub fn is_pause_on_startup(&self) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation { pause, .. }))
            | Some(Mutation::AddAndUpdate(AddMutation { pause, .. }, _)) => *pause,
            _ => false,
        }
    }

    pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation {
                backfill_nodes_to_pause,
                ..
            }))
            | Some(Mutation::AddAndUpdate(
                AddMutation {
                    backfill_nodes_to_pause,
                    ..
                },
                _,
            )) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
            _ => {
                tracing::warn!("expected an AddMutation on Startup, instead got {:?}", self);
                true
            }
        }
    }

    /// Whether this barrier is for resume.
    pub fn is_resume(&self) -> bool {
        matches!(self.mutation.as_deref(), Some(Mutation::Resume))
    }

    /// Returns the [`MergeUpdate`] if this barrier is to update the merge executor of the actor
    /// with `actor_id` on the given upstream fragment.
    pub fn as_update_merge(
        &self,
        actor_id: ActorId,
        upstream_fragment_id: UpstreamFragmentId,
    ) -> Option<&MergeUpdate> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Update(UpdateMutation { merges, .. })
                | Mutation::AddAndUpdate(_, UpdateMutation { merges, .. }) => {
                    merges.get(&(actor_id, upstream_fragment_id))
                }

                _ => None,
            })
    }

    /// Returns the new vnode bitmap if this barrier is to update the vnode bitmap of the actor
    /// with `actor_id`.
    pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Update(UpdateMutation { vnode_bitmaps, .. })
                | Mutation::AddAndUpdate(_, UpdateMutation { vnode_bitmaps, .. }) => {
                    vnode_bitmaps.get(&actor_id).cloned()
                }
                _ => None,
            })
    }

    pub fn get_curr_epoch(&self) -> Epoch {
        Epoch(self.epoch.curr)
    }

    /// Retrieve the tracing context of this barrier.
    pub fn tracing_context(&self) -> &TracingContext {
        &self.tracing_context
    }

    /// Get the ids of subscribers newly added on the given materialized view table by this
    /// barrier.
    pub fn added_subscriber_on_mv_table(
        &self,
        mv_table_id: TableId,
    ) -> impl Iterator<Item = u32> + '_ {
        if let Some(Mutation::Add(add)) | Some(Mutation::AddAndUpdate(add, _)) =
            self.mutation.as_deref()
        {
            Some(add)
        } else {
            None
        }
        .into_iter()
        .flat_map(move |add| {
            add.subscriptions_to_add.iter().filter_map(
                move |(upstream_mv_table_id, subscriber_id)| {
                    if *upstream_mv_table_id == mv_table_id {
                        Some(*subscriber_id)
                    } else {
                        None
                    }
                },
            )
        })
    }
}

impl<M: PartialEq> PartialEq for BarrierInner<M> {
    fn eq(&self, other: &Self) -> bool {
        self.epoch == other.epoch && self.mutation == other.mutation
    }
}

impl Mutation {
    #[cfg(test)]
    pub fn is_stop(&self) -> bool {
        matches!(self, Mutation::Stop(_))
    }

    fn to_protobuf(&self) -> PbMutation {
        let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
            actor_splits
                .iter()
                .map(|(&actor_id, splits)| {
                    (
                        actor_id,
                        ConnectorSplits {
                            splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
                        },
                    )
                })
                .collect::<HashMap<_, _>>()
        };

        match self {
            Mutation::Stop(actors) => PbMutation::Stop(StopMutation {
                actors: actors.iter().copied().collect::<Vec<_>>(),
            }),
            Mutation::Update(UpdateMutation {
                dispatchers,
                merges,
                vnode_bitmaps,
                dropped_actors,
                actor_splits,
                actor_new_dispatchers,
                actor_cdc_table_snapshot_splits,
            }) => PbMutation::Update(PbUpdateMutation {
                dispatcher_update: dispatchers.values().flatten().cloned().collect(),
                merge_update: merges.values().cloned().collect(),
                actor_vnode_bitmap_update: vnode_bitmaps
                    .iter()
                    .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
                    .collect(),
                dropped_actors: dropped_actors.iter().cloned().collect(),
                actor_splits: actor_splits_to_protobuf(actor_splits),
                actor_new_dispatchers: actor_new_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| {
                        (
                            actor_id,
                            Dispatchers {
                                dispatchers: dispatchers.clone(),
                            },
                        )
                    })
                    .collect(),
                actor_cdc_table_snapshot_splits: build_pb_actor_cdc_table_snapshot_splits(
                    actor_cdc_table_snapshot_splits.clone(),
                ),
            }),
            Mutation::Add(AddMutation {
                adds,
                added_actors,
                splits,
                pause,
                subscriptions_to_add,
                backfill_nodes_to_pause,
                actor_cdc_table_snapshot_splits,
            }) => PbMutation::Add(PbAddMutation {
                actor_dispatchers: adds
                    .iter()
                    .map(|(&actor_id, dispatchers)| {
                        (
                            actor_id,
                            Dispatchers {
                                dispatchers: dispatchers.clone(),
                            },
                        )
                    })
                    .collect(),
                added_actors: added_actors.iter().copied().collect(),
                actor_splits: actor_splits_to_protobuf(splits),
                pause: *pause,
                subscriptions_to_add: subscriptions_to_add
                    .iter()
                    .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
                        subscriber_id: *subscriber_id,
                        upstream_mv_table_id: table_id.table_id,
                    })
                    .collect(),
                backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
                actor_cdc_table_snapshot_splits: build_pb_actor_cdc_table_snapshot_splits(
                    actor_cdc_table_snapshot_splits.clone(),
                ),
            }),
            Mutation::SourceChangeSplit(changes) => PbMutation::Splits(SourceChangeSplitMutation {
                actor_splits: changes
                    .iter()
                    .map(|(&actor_id, splits)| {
                        (
                            actor_id,
                            ConnectorSplits {
                                splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
                            },
                        )
                    })
                    .collect(),
            }),
            Mutation::Pause => PbMutation::Pause(PauseMutation {}),
            Mutation::Resume => PbMutation::Resume(ResumeMutation {}),
            Mutation::Throttle(changes) => PbMutation::Throttle(ThrottleMutation {
                actor_throttle: changes
                    .iter()
                    .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit }))
                    .collect(),
            }),

            Mutation::AddAndUpdate(add, update) => PbMutation::Combined(CombinedMutation {
                mutations: vec![
                    BarrierMutation {
                        mutation: Some(Mutation::Add(add.clone()).to_protobuf()),
                    },
                    BarrierMutation {
                        mutation: Some(Mutation::Update(update.clone()).to_protobuf()),
                    },
                ],
            }),
            Mutation::DropSubscriptions {
                subscriptions_to_drop,
            } => PbMutation::DropSubscriptions(DropSubscriptionsMutation {
                info: subscriptions_to_drop
                    .iter()
                    .map(
                        |(subscriber_id, upstream_mv_table_id)| SubscriptionUpstreamInfo {
                            subscriber_id: *subscriber_id,
                            upstream_mv_table_id: upstream_mv_table_id.table_id,
                        },
                    )
                    .collect(),
            }),
            Mutation::ConnectorPropsChange(map) => {
                PbMutation::ConnectorPropsChange(ConnectorPropsChangeMutation {
                    connector_props_infos: map
                        .iter()
                        .map(|(actor_id, options)| {
                            (
                                *actor_id,
                                ConnectorPropsInfo {
                                    connector_props_info: options
                                        .iter()
                                        .map(|(k, v)| (k.clone(), v.clone()))
                                        .collect(),
                                },
                            )
                        })
                        .collect(),
                })
            }
            Mutation::StartFragmentBackfill { fragment_ids } => {
                PbMutation::StartFragmentBackfill(StartFragmentBackfillMutation {
                    fragment_ids: fragment_ids.iter().copied().collect(),
                })
            }
            Mutation::RefreshStart {
                table_id,
                associated_source_id,
            } => PbMutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
                table_id: table_id.table_id,
                associated_source_id: associated_source_id.table_id,
            }),
            Mutation::LoadFinish {
                associated_source_id,
            } => PbMutation::LoadFinish(risingwave_pb::stream_plan::LoadFinishMutation {
                associated_source_id: associated_source_id.table_id,
            }),
        }
    }

    fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
        let mutation = match prost {
            PbMutation::Stop(stop) => {
                Mutation::Stop(HashSet::from_iter(stop.actors.iter().cloned()))
            }

            PbMutation::Update(update) => Mutation::Update(UpdateMutation {
                dispatchers: update
                    .dispatcher_update
                    .iter()
                    .map(|u| (u.actor_id, u.clone()))
                    .into_group_map(),
                merges: update
                    .merge_update
                    .iter()
                    .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
                    .collect(),
                vnode_bitmaps: update
                    .actor_vnode_bitmap_update
                    .iter()
                    .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
                    .collect(),
                dropped_actors: update.dropped_actors.iter().cloned().collect(),
                actor_splits: update
                    .actor_splits
                    .iter()
                    .map(|(&actor_id, splits)| {
                        (
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(|split| split.try_into().unwrap())
                                .collect(),
                        )
                    })
                    .collect(),
                actor_new_dispatchers: update
                    .actor_new_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
                    .collect(),
                actor_cdc_table_snapshot_splits: build_actor_cdc_table_snapshot_splits(
                    update.actor_cdc_table_snapshot_splits.clone(),
                ),
            }),

            PbMutation::Add(add) => Mutation::Add(AddMutation {
                adds: add
                    .actor_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
                    .collect(),
                added_actors: add.added_actors.iter().copied().collect(),
                splits: add
                    .actor_splits
                    .iter()
                    .map(|(&actor_id, splits)| {
                        (
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(|split| split.try_into().unwrap())
                                .collect(),
                        )
                    })
                    .collect(),
                pause: add.pause,
                subscriptions_to_add: add
                    .subscriptions_to_add
                    .iter()
                    .map(
                        |SubscriptionUpstreamInfo {
                             subscriber_id,
                             upstream_mv_table_id,
                         }| {
                            (TableId::new(*upstream_mv_table_id), *subscriber_id)
                        },
                    )
                    .collect(),
                backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
                actor_cdc_table_snapshot_splits: build_actor_cdc_table_snapshot_splits(
                    add.actor_cdc_table_snapshot_splits.clone(),
                ),
            }),

            PbMutation::Splits(s) => {
                let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
                    Vec::with_capacity(s.actor_splits.len());
                for (&actor_id, splits) in &s.actor_splits {
                    if !splits.splits.is_empty() {
                        change_splits.push((
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(SplitImpl::try_from)
                                .try_collect()?,
                        ));
                    }
                }
                Mutation::SourceChangeSplit(change_splits.into_iter().collect())
            }
            PbMutation::Pause(_) => Mutation::Pause,
            PbMutation::Resume(_) => Mutation::Resume,
            PbMutation::Throttle(changes) => Mutation::Throttle(
                changes
                    .actor_throttle
                    .iter()
                    .map(|(actor_id, limit)| (*actor_id, limit.rate_limit))
                    .collect(),
            ),
            PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
                subscriptions_to_drop: drop
                    .info
                    .iter()
                    .map(|info| (info.subscriber_id, TableId::new(info.upstream_mv_table_id)))
                    .collect(),
            },
            PbMutation::ConnectorPropsChange(alter_connector_props) => {
                Mutation::ConnectorPropsChange(
                    alter_connector_props
                        .connector_props_infos
                        .iter()
                        .map(|(actor_id, options)| {
                            (
                                *actor_id,
                                options
                                    .connector_props_info
                                    .iter()
                                    .map(|(k, v)| (k.clone(), v.clone()))
                                    .collect(),
                            )
                        })
                        .collect(),
                )
            }
            PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
                Mutation::StartFragmentBackfill {
                    fragment_ids: start_fragment_backfill
                        .fragment_ids
                        .iter()
                        .copied()
                        .collect(),
                }
            }
            PbMutation::RefreshStart(refresh_start) => Mutation::RefreshStart {
                table_id: TableId::new(refresh_start.table_id),
                associated_source_id: TableId::new(refresh_start.associated_source_id),
            },
            PbMutation::LoadFinish(load_finish) => Mutation::LoadFinish {
                associated_source_id: TableId::new(load_finish.associated_source_id),
            },
            PbMutation::Combined(CombinedMutation { mutations }) => match &mutations[..] {
                [
                    BarrierMutation {
                        mutation: Some(add),
                    },
                    BarrierMutation {
                        mutation: Some(update),
                    },
                ] => {
                    let Mutation::Add(add_mutation) = Mutation::from_protobuf(add)? else {
                        unreachable!();
                    };

                    let Mutation::Update(update_mutation) = Mutation::from_protobuf(update)? else {
                        unreachable!();
                    };

                    Mutation::AddAndUpdate(add_mutation, update_mutation)
                }

                _ => unreachable!(),
            },
        };
        Ok(mutation)
    }
}

impl<M> BarrierInner<M> {
    fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
        let Self {
            epoch,
            mutation,
            kind,
            passed_actors,
            tracing_context,
            ..
        } = self;

        PbBarrier {
            epoch: Some(PbEpoch {
                curr: epoch.curr,
                prev: epoch.prev,
            }),
            mutation: Some(PbBarrierMutation {
                mutation: barrier_fn(mutation),
            }),
            tracing_context: tracing_context.to_protobuf(),
            kind: *kind as _,
            passed_actors: passed_actors.clone(),
        }
    }

    fn from_protobuf_inner(
        prost: &PbBarrier,
        mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
    ) -> StreamExecutorResult<Self> {
        let epoch = prost.get_epoch()?;

        Ok(Self {
            kind: prost.kind(),
            epoch: EpochPair::new(epoch.curr, epoch.prev),
            mutation: mutation_from_pb(
                prost
                    .mutation
                    .as_ref()
                    .and_then(|mutation| mutation.mutation.as_ref()),
            )?,
            passed_actors: prost.get_passed_actors().clone(),
            tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
        })
    }

    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
        BarrierInner {
            epoch: self.epoch,
            mutation: f(self.mutation),
            kind: self.kind,
            tracing_context: self.tracing_context,
            passed_actors: self.passed_actors,
        }
    }
}

impl DispatcherBarrier {
    pub fn to_protobuf(&self) -> PbBarrier {
        self.to_protobuf_inner(|_| None)
    }
}

impl Barrier {
    pub fn to_protobuf(&self) -> PbBarrier {
        self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
    }

    pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
        Self::from_protobuf_inner(prost, |mutation| {
            mutation
                .map(|m| Mutation::from_protobuf(m).map(Arc::new))
                .transpose()
        })
    }
}

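/// A watermark on the column at `col_idx`: downstream operators may assume that no further
/// records with a smaller value in that column will arrive. Ordering compares only `val`.
///
/// An illustrative sketch of remapping a watermark through output indices (types chosen here
/// only for demonstration):
///
/// ```ignore
/// // Column 2 is output at position 0, so the watermark follows it.
/// let wm = Watermark::new(2, DataType::Int64, ScalarImpl::Int64(100));
/// assert_eq!(wm.transform_with_indices(&[2, 3]).unwrap().col_idx, 0);
/// ```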
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
    pub col_idx: usize,
    pub data_type: DataType,
    pub val: ScalarImpl,
}

impl PartialOrd for Watermark {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Watermark {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.val.default_cmp(&other.val)
    }
}

impl Watermark {
    pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
        Self {
            col_idx,
            data_type,
            val,
        }
    }

    pub async fn transform_with_expr(
        self,
        expr: &NonStrictExpression<impl Expression>,
        new_col_idx: usize,
    ) -> Option<Self> {
        let Self { col_idx, val, .. } = self;
        let row = {
            let mut row = vec![None; col_idx + 1];
            row[col_idx] = Some(val);
            OwnedRow::new(row)
        };
        let val = expr.eval_row_infallible(&row).await?;
        Some(Self::new(new_col_idx, expr.inner().return_type(), val))
    }

    /// Transform the watermark with the given output indices. If this watermark's column is not
    /// in the output, returns `None`.
    pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
        output_indices
            .iter()
            .position(|p| *p == self.col_idx)
            .map(|new_col_idx| self.with_idx(new_col_idx))
    }

    pub fn to_protobuf(&self) -> PbWatermark {
        PbWatermark {
            column: Some(PbInputRef {
                index: self.col_idx as _,
                r#type: Some(self.data_type.to_protobuf()),
            }),
            val: Some(&self.val).to_protobuf().into(),
        }
    }

    pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
        let col_ref = prost.get_column()?;
        let data_type = DataType::from(col_ref.get_type()?);
        let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
            .expect("watermark value cannot be null");
        Ok(Self::new(col_ref.get_index() as _, data_type, val))
    }

    pub fn with_idx(self, idx: usize) -> Self {
        Self::new(idx, self.data_type, self.val)
    }
}

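/// A message flowing between executors: a data chunk, a barrier, or a watermark. `M` is the
/// barrier mutation payload, as in [`BarrierInner`]; see [`Message`] and [`DispatcherMessage`].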
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageInner<M> {
    Chunk(StreamChunk),
    Barrier(BarrierInner<M>),
    Watermark(Watermark),
}

impl<M> MessageInner<M> {
    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
        match self {
            MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
            MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
            MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
        }
    }
}

pub type Message = MessageInner<BarrierMutationType>;
pub type DispatcherMessage = MessageInner<()>;

/// Like [`MessageInner`], except that multiple barriers may be batched into a single message.
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageBatchInner<M> {
    Chunk(StreamChunk),
    BarrierBatch(Vec<BarrierInner<M>>),
    Watermark(Watermark),
}
pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
pub type DispatcherBarriers = Vec<DispatcherBarrier>;
pub type DispatcherMessageBatch = MessageBatchInner<()>;

impl From<DispatcherMessage> for DispatcherMessageBatch {
    fn from(m: DispatcherMessage) -> Self {
        match m {
            DispatcherMessage::Chunk(c) => Self::Chunk(c),
            DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
            DispatcherMessage::Watermark(w) => Self::Watermark(w),
        }
    }
}

impl From<StreamChunk> for Message {
    fn from(chunk: StreamChunk) -> Self {
        Message::Chunk(chunk)
    }
}

impl<'a> TryFrom<&'a Message> for &'a Barrier {
    type Error = ();

    fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
        match m {
            Message::Chunk(_) => Err(()),
            Message::Barrier(b) => Ok(b),
            Message::Watermark(_) => Err(()),
        }
    }
}

impl Message {
    /// Whether this message is a barrier carrying a [`Mutation::Stop`].
    #[cfg(test)]
    pub fn is_stop(&self) -> bool {
        matches!(
            self,
            Message::Barrier(Barrier {
                mutation,
                ..
            }) if mutation.as_ref().unwrap().is_stop()
        )
    }
}

impl DispatcherMessageBatch {
    pub fn to_protobuf(&self) -> PbStreamMessageBatch {
        let prost = match self {
            Self::Chunk(stream_chunk) => {
                let prost_stream_chunk = stream_chunk.to_protobuf();
                StreamMessageBatch::StreamChunk(prost_stream_chunk)
            }
            Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
                barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
            }),
            Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
        };
        PbStreamMessageBatch {
            stream_message_batch: Some(prost),
        }
    }

    pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
        let res = match prost.get_stream_message_batch()? {
            StreamMessageBatch::StreamChunk(chunk) => {
                Self::Chunk(StreamChunk::from_protobuf(chunk)?)
            }
            StreamMessageBatch::BarrierBatch(barrier_batch) => {
                let barriers = barrier_batch
                    .barriers
                    .iter()
                    .map(|barrier| {
                        DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
                            if mutation.is_some() {
                                if cfg!(debug_assertions) {
                                    panic!("should not receive message of barrier with mutation");
                                } else {
                                    warn!(?barrier, "receive message of barrier with mutation");
                                }
                            }
                            Ok(())
                        })
                    })
                    .try_collect()?;
                Self::BarrierBatch(barriers)
            }
            StreamMessageBatch::Watermark(watermark) => {
                Self::Watermark(Watermark::from_protobuf(watermark)?)
            }
        };
        Ok(res)
    }

    pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
        ::prost::Message::encoded_len(msg)
    }
}

pub type PkIndices = Vec<usize>;
pub type PkIndicesRef<'a> = &'a [usize];
pub type PkDataTypes = SmallVec<[DataType; 1]>;

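/// Await the first message of `stream` and expect it to be a barrier, from which an executor
/// learns its initial epoch before processing any data.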
pub async fn expect_first_barrier<M: Debug>(
    stream: &mut (impl MessageStreamInner<M> + Unpin),
) -> StreamExecutorResult<BarrierInner<M>> {
    let message = stream
        .next()
        .instrument_await("expect_first_barrier")
        .await
        .context("failed to extract the first message: stream closed unexpectedly")??;
    let barrier = message
        .into_barrier()
        .expect("the first message must be a barrier");
    // The first barrier should always be a checkpoint barrier or the initial barrier.
    assert!(matches!(
        barrier.kind,
        BarrierKind::Checkpoint | BarrierKind::Initial
    ));
    Ok(barrier)
}

/// Expect the first message of the given aligned `stream` to be a barrier.
pub async fn expect_first_barrier_from_aligned_stream(
    stream: &mut (impl AlignedMessageStream + Unpin),
) -> StreamExecutorResult<Barrier> {
    let message = stream
        .next()
        .instrument_await("expect_first_barrier")
        .await
        .context("failed to extract the first message: stream closed unexpectedly")??;
    let barrier = message
        .into_barrier()
        .expect("the first message must be a barrier");
    Ok(barrier)
}

/// `StreamConsumer` is the terminal part of an actor: it consumes the message stream and yields
/// only barriers to the actor loop.
pub trait StreamConsumer: Send + 'static {
    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;

    fn execute(self: Box<Self>) -> Self::BarrierStream;
}

type BoxedMessageInput<InputId, M> = BoxedInput<InputId, MessageStreamItemInner<M>>;

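/// Merges messages from a dynamic set of upstream inputs and aligns their barriers: chunks and
/// watermarks are forwarded as they arrive, while an input that yields a barrier is blocked
/// until every other input yields a barrier of the same epoch, at which point a single barrier
/// is emitted downstream. Barriers with mismatched epochs at the same alignment point are
/// reported as an error.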
pub struct DynamicReceivers<InputId, M> {
    /// The barrier we are currently aligning on. `None` if no barrier has been received yet.
    barrier: Option<BarrierInner<M>>,
    /// Inputs that have already yielded the current barrier, blocked until alignment completes.
    blocked: Vec<BoxedMessageInput<InputId, M>>,
    /// Inputs that are still actively polled.
    active: FuturesUnordered<StreamFuture<BoxedMessageInput<InputId, M>>>,
    /// Watermark buffers, keyed by the watermark column index.
    buffered_watermarks: BTreeMap<usize, BufferedWatermarks<InputId>>,
    /// Counter of total time spent aligning barriers (in nanoseconds), if enabled.
    barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
    /// Histogram of barrier alignment duration (in seconds), if enabled.
    merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
}

impl<InputId: Clone + Ord + Hash + std::fmt::Debug + Unpin, M: Clone + Unpin> Stream
    for DynamicReceivers<InputId, M>
{
    type Item = MessageStreamItemInner<M>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if self.active.is_terminated() {
            assert!(self.blocked.is_empty());
            return Poll::Ready(None);
        }

        let mut start = None;
        loop {
            match futures::ready!(self.active.poll_next_unpin(cx)) {
                Some((Some(Err(e)), _)) => {
                    return Poll::Ready(Some(Err(e)));
                }
                Some((Some(Ok(message)), remaining)) => {
                    let input_id = remaining.id();
                    match message {
                        MessageInner::Chunk(chunk) => {
                            // Continue polling this upstream by pushing it back to `active`.
                            self.active.push(remaining.into_future());
                            return Poll::Ready(Some(Ok(MessageInner::Chunk(chunk))));
                        }
                        MessageInner::Watermark(watermark) => {
                            // Continue polling this upstream by pushing it back to `active`.
                            self.active.push(remaining.into_future());
                            if let Some(watermark) = self.handle_watermark(input_id, watermark) {
                                return Poll::Ready(Some(Ok(MessageInner::Watermark(watermark))));
                            }
                        }
                        MessageInner::Barrier(barrier) => {
                            // Block this upstream by pushing it to `blocked` until all inputs
                            // have yielded a barrier of the same epoch.
                            if self.blocked.is_empty() {
                                start = Some(Instant::now());
                            }
                            self.blocked.push(remaining);
                            if let Some(current_barrier) = self.barrier.as_ref() {
                                if current_barrier.epoch != barrier.epoch {
                                    return Poll::Ready(Some(Err(
                                        StreamExecutorError::align_barrier(
                                            current_barrier.clone().map_mutation(|_| None),
                                            barrier.map_mutation(|_| None),
                                        ),
                                    )));
                                }
                            } else {
                                self.barrier = Some(barrier);
                            }
                        }
                    }
                }
                Some((None, remaining)) => {
                    // The upstream input closed unexpectedly.
                    return Poll::Ready(Some(Err(StreamExecutorError::channel_closed(format!(
                        "upstream input {:?} unexpectedly closed",
                        remaining.id()
                    )))));
                }
                None => {
                    // All active inputs have yielded the barrier: alignment is complete.
                    if let Some(start) = start {
                        if let Some(barrier_align_duration) = &self.barrier_align_duration {
                            barrier_align_duration.inc_by(start.elapsed().as_nanos() as u64);
                        }
                        if let Some(merge_barrier_align_duration) =
                            &self.merge_barrier_align_duration
                        {
                            merge_barrier_align_duration.observe(start.elapsed().as_secs_f64())
                        }
                    }

                    break;
                }
            }
        }

        assert!(self.active.is_terminated());
        let barrier = self.barrier.take().unwrap();

        let upstreams = std::mem::take(&mut self.blocked);
        self.extend_active(upstreams);
        assert!(!self.active.is_terminated());

        Poll::Ready(Some(Ok(MessageInner::Barrier(barrier))))
    }
}

impl<InputId: Clone + Ord + Hash + std::fmt::Debug, M> DynamicReceivers<InputId, M> {
    pub fn new(
        upstreams: Vec<BoxedMessageInput<InputId, M>>,
        barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
        merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
    ) -> Self {
        assert!(!upstreams.is_empty());
        let mut this = Self {
            barrier: None,
            blocked: Vec::with_capacity(upstreams.len()),
            active: Default::default(),
            buffered_watermarks: Default::default(),
            merge_barrier_align_duration,
            barrier_align_duration,
        };
        this.extend_active(upstreams);
        this
    }

    /// Extend the active upstreams with the given inputs. The stream must be in the clean state
    /// right after a barrier (no blocked inputs, no pending barrier).
    pub fn extend_active(
        &mut self,
        upstreams: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
    ) {
        assert!(self.blocked.is_empty() && self.barrier.is_none());

        self.active
            .extend(upstreams.into_iter().map(|s| s.into_future()));
    }

    /// Handle a watermark from one input. Returns the watermark to emit downstream, if any.
    pub fn handle_watermark(
        &mut self,
        input_id: InputId,
        watermark: Watermark,
    ) -> Option<Watermark> {
        let col_idx = watermark.col_idx;
        // Lazily create the watermark buffer for this column when it is first seen.
        let upstream_ids: Vec<_> = self.upstream_input_ids().collect();
        let watermarks = self
            .buffered_watermarks
            .entry(col_idx)
            .or_insert_with(|| BufferedWatermarks::with_ids(upstream_ids));
        watermarks.handle_watermark(input_id, watermark)
    }

    /// Consume `other` and add its upstreams to `self`. Both streams must be in the clean state
    /// right after a barrier.
    pub fn add_upstreams_from(&mut self, other: Self) {
        assert!(self.blocked.is_empty() && self.barrier.is_none());
        assert!(other.blocked.is_empty() && other.barrier.is_none());

        self.buffered_watermarks.values_mut().for_each(|buffers| {
            buffers.add_buffers(other.upstream_input_ids());
        });

        self.active.extend(other.active);
    }

    /// Remove the upstreams in `upstream_input_ids` from `self`. The stream must be in the clean
    /// state right after a barrier.
    pub fn remove_upstreams(&mut self, upstream_input_ids: &HashSet<InputId>) {
        assert!(self.blocked.is_empty() && self.barrier.is_none());

        let new_upstreams = std::mem::take(&mut self.active)
            .into_iter()
            .map(|s| s.into_inner().unwrap())
            .filter(|u| !upstream_input_ids.contains(&u.id()));
        self.extend_active(new_upstreams);
        self.buffered_watermarks.values_mut().for_each(|buffers| {
            // Drop the watermark buffers of the removed inputs as well.
            buffers.remove_buffer(upstream_input_ids.clone());
        });
    }

    pub fn merge_barrier_align_duration(&self) -> Option<LabelGuardedMetric<Histogram>> {
        self.merge_barrier_align_duration.clone()
    }

    pub fn flush_buffered_watermarks(&mut self) {
        self.buffered_watermarks
            .values_mut()
            .for_each(|buffers| buffers.clear());
    }

    pub fn upstream_input_ids(&self) -> impl Iterator<Item = InputId> + '_ {
        self.blocked
            .iter()
            .map(|s| s.id())
            .chain(self.active.iter().map(|s| s.get_ref().unwrap().id()))
    }
}