mod prelude;

use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::future::pending;
use std::hash::Hash;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::vec;

use await_tree::InstrumentAwait;
use enum_as_inner::EnumAsInner;
use futures::future::try_join_all;
use futures::stream::{BoxStream, FusedStream, FuturesUnordered, StreamFuture};
use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use prometheus::core::{AtomicU64, GenericCounter};
use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{Schema, TableId};
use risingwave_common::config::StreamingConfig;
use risingwave_common::metrics::LabelGuardedMetric;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
use risingwave_common::util::epoch::{Epoch, EpochPair};
use risingwave_common::util::tracing::TracingContext;
use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
use risingwave_connector::source::SplitImpl;
use risingwave_expr::expr::{Expression, NonStrictExpression};
use risingwave_pb::data::PbEpoch;
use risingwave_pb::expr::PbInputRef;
use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
use risingwave_pb::stream_plan::stream_node::PbStreamKind;
use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
use risingwave_pb::stream_plan::{
    PbBarrier, PbBarrierMutation, PbDispatcher, PbSinkSchemaChange, PbStreamMessageBatch,
    PbWatermark, SubscriptionUpstreamInfo,
};
use smallvec::SmallVec;
use tokio::sync::mpsc;
use tokio::time::{Duration, Instant};

use crate::error::StreamResult;
use crate::executor::exchange::input::{
    BoxedActorInput, BoxedInput, assert_equal_dispatcher_barrier, new_input,
};
use crate::executor::monitor::ActorInputMetrics;
use crate::executor::prelude::StreamingMetrics;
use crate::executor::watermark::BufferedWatermarks;
use crate::task::{ActorId, FragmentId, LocalBarrierManager};

mod actor;
mod barrier_align;
pub mod exchange;
pub mod monitor;

pub mod aggregate;
pub mod asof_join;
mod backfill;
mod barrier_recv;
mod batch_query;
mod chain;
mod changelog;
mod dedup;
mod dispatch;
pub mod dml;
mod dynamic_filter;
pub mod eowc;
pub mod error;
mod expand;
mod filter;
mod gap_fill;
pub mod hash_join;
mod hop_window;
mod join;
pub mod locality_provider;
mod lookup;
mod lookup_union;
mod merge;
mod mview;
mod nested_loop_temporal_join;
mod no_op;
mod now;
mod over_window;
pub mod project;
mod rearranged_chain;
mod receiver;
pub mod row_id_gen;
mod sink;
pub mod source;
mod stream_reader;
pub mod subtask;
mod temporal_join;
mod top_n;
mod troublemaker;
mod union;
mod upstream_sink_union;
mod values;
mod watermark;
mod watermark_filter;
mod wrapper;

mod approx_percentile;

mod row_merge;

#[cfg(test)]
mod integration_tests;
mod sync_kv_log_store;
#[cfg(any(test, feature = "test"))]
pub mod test_utils;
mod utils;
mod vector;

pub use actor::{Actor, ActorContext, ActorContextRef};
use anyhow::Context;
pub use approx_percentile::global::GlobalApproxPercentileExecutor;
pub use approx_percentile::local::LocalApproxPercentileExecutor;
pub use backfill::arrangement_backfill::*;
pub use backfill::cdc::{
    CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor,
};
pub use backfill::no_shuffle_backfill::*;
pub use backfill::snapshot_backfill::*;
pub use barrier_recv::BarrierRecvExecutor;
pub use batch_query::BatchQueryExecutor;
pub use chain::ChainExecutor;
pub use changelog::ChangeLogExecutor;
pub use dedup::AppendOnlyDedupExecutor;
pub use dispatch::{DispatchExecutor, DispatcherImpl};
pub use dynamic_filter::DynamicFilterExecutor;
pub use error::{StreamExecutorError, StreamExecutorResult};
pub use expand::ExpandExecutor;
pub use filter::{FilterExecutor, UpsertFilterExecutor};
pub use gap_fill::{GapFillExecutor, GapFillExecutorArgs};
pub use hash_join::*;
pub use hop_window::HopWindowExecutor;
pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
pub use join::{AsOfDesc, AsOfJoinType, JoinType};
pub use lookup::*;
pub use lookup_union::LookupUnionExecutor;
pub use merge::MergeExecutor;
pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
pub use mview::{MaterializeExecutor, RefreshableMaterializeArgs};
pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
pub use no_op::NoOpExecutor;
pub use now::*;
pub use over_window::*;
pub use rearranged_chain::RearrangedChainExecutor;
pub use receiver::ReceiverExecutor;
use risingwave_common::id::SourceId;
pub use row_merge::RowMergeExecutor;
pub use sink::SinkExecutor;
pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
pub use temporal_join::TemporalJoinExecutor;
pub use top_n::{
    AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
};
pub use troublemaker::TroublemakerExecutor;
pub use union::UnionExecutor;
pub use upstream_sink_union::{UpstreamFragmentInfo, UpstreamSinkUnionExecutor};
pub use utils::DummyExecutor;
pub use values::ValuesExecutor;
pub use vector::*;
pub use watermark_filter::WatermarkFilterExecutor;
pub use wrapper::WrapperExecutor;

use self::barrier_align::AlignedMessageStream;

pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;

pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
use risingwave_connector::sink::catalog::SinkId;
use risingwave_connector::source::cdc::{
    CdcTableSnapshotSplitAssignmentWithGeneration,
    build_actor_cdc_table_snapshot_splits_with_generation,
};
use risingwave_pb::id::{ExecutorId, SubscriberId};
use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};

pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;

/// Static information of an executor, e.g. its schema, stream key and identity.
#[derive(Debug, Default, Clone)]
pub struct ExecutorInfo {
    /// The schema of the executor's output.
    pub schema: Schema,

    /// The stream key (column indices) of the executor's output.
    pub stream_key: StreamKey,

    /// The stream kind of the executor's output.
    pub stream_kind: PbStreamKind,

    /// The identity of the executor, used for debugging and logging.
    pub identity: String,

    /// The id of the executor.
    pub id: ExecutorId,
}

impl ExecutorInfo {
    pub fn for_test(schema: Schema, stream_key: StreamKey, identity: String, id: u64) -> Self {
        Self {
            schema,
            stream_key,
            stream_kind: PbStreamKind::Retract,
            identity,
            id: id.into(),
        }
    }
}

/// The execution logic of an executor, producing a boxed stream of [`Message`]s.
pub trait Execute: Send + 'static {
    fn execute(self: Box<Self>) -> BoxedMessageStream;

    fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
        self.execute()
    }

    fn boxed(self) -> Box<dyn Execute>
    where
        Self: Sized + Send + 'static,
    {
        Box::new(self)
    }
}
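
// A minimal `Execute` implementation only needs to return a boxed message stream, e.g.
// (illustrative sketch, not used anywhere in this module):
//
//     struct Nothing;
//     impl Execute for Nothing {
//         fn execute(self: Box<Self>) -> BoxedMessageStream {
//             futures::stream::empty().boxed()
//         }
//     }
//
// Real executors are usually written with `#[try_stream]` and yield chunks, watermarks
// and barriers as `Message`s.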

/// [`Executor`] combines the static information ([`ExecutorInfo`]) and the execution logic
/// ([`Execute`]) of an executor.
pub struct Executor {
    info: ExecutorInfo,
    execute: Box<dyn Execute>,
}

impl Executor {
    pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
        Self { info, execute }
    }

    pub fn info(&self) -> &ExecutorInfo {
        &self.info
    }

    pub fn schema(&self) -> &Schema {
        &self.info.schema
    }

    pub fn stream_key(&self) -> StreamKeyRef<'_> {
        &self.info.stream_key
    }

    pub fn stream_kind(&self) -> PbStreamKind {
        self.info.stream_kind
    }

    pub fn identity(&self) -> &str {
        &self.info.identity
    }

    pub fn execute(self) -> BoxedMessageStream {
        self.execute.execute()
    }

    pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
        self.execute.execute_with_epoch(epoch)
    }
}

impl std::fmt::Debug for Executor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.identity())
    }
}

impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
    fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
        Self::new(info, execute)
    }
}

impl<E> From<(ExecutorInfo, E)> for Executor
where
    E: Execute,
{
    fn from((info, execute): (ExecutorInfo, E)) -> Self {
        Self::new(info, execute.boxed())
    }
}

pub const INVALID_EPOCH: u64 = 0;

type UpstreamFragmentId = FragmentId;
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;

#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct UpdateMutation {
    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
    pub dropped_actors: HashSet<ActorId>,
    pub actor_splits: SplitAssignments,
    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    pub sink_schema_change: HashMap<SinkId, PbSinkSchemaChange>,
    pub subscriptions_to_drop: Vec<SubscriptionUpstreamInfo>,
}

#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct AddMutation {
    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
    pub added_actors: HashSet<ActorId>,
    pub splits: SplitAssignments,
    pub pause: bool,
    pub subscriptions_to_add: Vec<(TableId, SubscriberId)>,
    pub backfill_nodes_to_pause: HashSet<FragmentId>,
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    pub new_upstream_sinks: HashMap<FragmentId, PbNewUpstreamSink>,
}

#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct StopMutation {
    pub dropped_actors: HashSet<ActorId>,
    pub dropped_sink_fragments: HashSet<FragmentId>,
}

#[cfg_attr(any(test, feature = "test"), derive(PartialEq))]
#[derive(Debug, Clone)]
pub enum Mutation {
    Stop(StopMutation),
    Update(UpdateMutation),
    Add(AddMutation),
    SourceChangeSplit(SplitAssignments),
    Pause,
    Resume,
    Throttle(HashMap<ActorId, Option<u32>>),
    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
    DropSubscriptions {
        subscriptions_to_drop: Vec<SubscriptionUpstreamInfo>,
    },
    StartFragmentBackfill {
        fragment_ids: HashSet<FragmentId>,
    },
    RefreshStart {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    ListFinish {
        associated_source_id: SourceId,
    },
    LoadFinish {
        associated_source_id: SourceId,
    },
}

/// A barrier flowing through the streaming graph, generic over the type of its mutation payload.
#[derive(Debug, Clone)]
pub struct BarrierInner<M> {
    pub epoch: EpochPair,
    pub mutation: M,
    pub kind: BarrierKind,

    /// Tracing context of this barrier, used to correlate spans across nodes.
    pub tracing_context: TracingContext,
}

pub type BarrierMutationType = Option<Arc<Mutation>>;
pub type Barrier = BarrierInner<BarrierMutationType>;
pub type DispatcherBarrier = BarrierInner<()>;

impl<M: Default> BarrierInner<M> {
    pub fn new_test_barrier(epoch: u64) -> Self {
        Self {
            epoch: EpochPair::new_test_epoch(epoch),
            kind: BarrierKind::Checkpoint,
            tracing_context: TracingContext::none(),
            mutation: Default::default(),
        }
    }

    pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
        Self {
            epoch: EpochPair::new(epoch, prev_epoch),
            kind: BarrierKind::Checkpoint,
            tracing_context: TracingContext::none(),
            mutation: Default::default(),
        }
    }
}
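
// For example, tests typically build a barrier carrying a mutation like this (illustrative
// sketch, assuming `epoch` is a valid test epoch value):
//
//     let barrier = Barrier::new_test_barrier(epoch).with_stop();
//     assert!(barrier.is_with_stop_mutation());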

impl Barrier {
    pub fn into_dispatcher(self) -> DispatcherBarrier {
        DispatcherBarrier {
            epoch: self.epoch,
            mutation: (),
            kind: self.kind,
            tracing_context: self.tracing_context,
        }
    }

    #[must_use]
    pub fn with_mutation(self, mutation: Mutation) -> Self {
        Self {
            mutation: Some(Arc::new(mutation)),
            ..self
        }
    }

    #[must_use]
    pub fn with_stop(self) -> Self {
        self.with_mutation(Mutation::Stop(StopMutation {
            dropped_actors: Default::default(),
            dropped_sink_fragments: Default::default(),
        }))
    }

    /// Whether this barrier carries a stop mutation.
    pub fn is_with_stop_mutation(&self) -> bool {
        matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
    }

    /// Whether this barrier is to stop the actor with `actor_id`.
    pub fn is_stop(&self, actor_id: ActorId) -> bool {
        self.all_stop_actors()
            .is_some_and(|actors| actors.contains(&actor_id))
    }

    pub fn is_checkpoint(&self) -> bool {
        self.kind == BarrierKind::Checkpoint
    }

    /// Get the initial split assignment for the actor with `actor_id`. This should only be
    /// called on the initial barrier received by an executor, whose mutation is expected to be
    /// `Add` or `Update`.
    pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
        match self.mutation.as_deref()? {
            Mutation::Update(UpdateMutation { actor_splits, .. })
            | Mutation::Add(AddMutation {
                splits: actor_splits,
                ..
            }) => actor_splits.get(&actor_id),

            _ => {
                if cfg!(debug_assertions) {
                    panic!(
                        "the initial mutation of the barrier should not be {:?}",
                        self.mutation
                    );
                }
                None
            }
        }
        .map(|s| s.as_slice())
    }

    /// Get all actors that are to be stopped (dropped) by this barrier.
    pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
        match self.mutation.as_deref() {
            Some(Mutation::Stop(StopMutation { dropped_actors, .. })) => Some(dropped_actors),
            Some(Mutation::Update(UpdateMutation { dropped_actors, .. })) => Some(dropped_actors),
            _ => None,
        }
    }

    /// Whether this barrier is to newly add the actor with `actor_id`.
    pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation { added_actors, .. })) => {
                added_actors.contains(&actor_id)
            }
            _ => false,
        }
    }

    pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
        if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
            fragment_ids.contains(&fragment_id)
        } else {
            false
        }
    }

    /// Whether this barrier adds new downstream fragment(s) for the actor with
    /// `upstream_actor_id`, i.e. it carries an `Add` mutation with new dispatchers for that
    /// actor.
    pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
        let Some(mutation) = self.mutation.as_deref() else {
            return false;
        };
        match mutation {
            Mutation::Add(AddMutation { adds, .. }) => adds.contains_key(&upstream_actor_id),
            Mutation::Update(_)
            | Mutation::Stop(_)
            | Mutation::Pause
            | Mutation::Resume
            | Mutation::SourceChangeSplit(_)
            | Mutation::Throttle(_)
            | Mutation::DropSubscriptions { .. }
            | Mutation::ConnectorPropsChange(_)
            | Mutation::StartFragmentBackfill { .. }
            | Mutation::RefreshStart { .. }
            | Mutation::ListFinish { .. }
            | Mutation::LoadFinish { .. } => false,
        }
    }

    /// Whether this barrier requests a paused state on startup, i.e. the `pause` flag of its
    /// `Add` mutation is set.
    pub fn is_pause_on_startup(&self) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation { pause, .. })) => *pause,
            _ => false,
        }
    }

    pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation {
                backfill_nodes_to_pause,
                ..
            })) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
            Some(Mutation::Update(_)) => false,
            _ => {
                tracing::warn!(
                    "expected an AddMutation or UpdateMutation on Startup, instead got {:?}",
                    self
                );
                false
            }
        }
    }

    /// Whether this barrier is a resume barrier.
    pub fn is_resume(&self) -> bool {
        matches!(self.mutation.as_deref(), Some(Mutation::Resume))
    }

    /// Returns the [`MergeUpdate`] if this barrier is to update the merge executor of `actor_id`
    /// for `upstream_fragment_id`.
    pub fn as_update_merge(
        &self,
        actor_id: ActorId,
        upstream_fragment_id: UpstreamFragmentId,
    ) -> Option<&MergeUpdate> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Update(UpdateMutation { merges, .. }) => {
                    merges.get(&(actor_id, upstream_fragment_id))
                }
                _ => None,
            })
    }

    pub fn as_new_upstream_sink(&self, fragment_id: FragmentId) -> Option<&PbNewUpstreamSink> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Add(AddMutation {
                    new_upstream_sinks, ..
                }) => new_upstream_sinks.get(&fragment_id),
                _ => None,
            })
    }

    pub fn as_dropped_upstream_sinks(&self) -> Option<&HashSet<FragmentId>> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Stop(StopMutation {
                    dropped_sink_fragments,
                    ..
                }) => Some(dropped_sink_fragments),
                _ => None,
            })
    }

    /// Returns the new vnode bitmap if this barrier is to update the vnode bitmap of the actor
    /// with `actor_id`.
    pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Update(UpdateMutation { vnode_bitmaps, .. }) => {
                    vnode_bitmaps.get(&actor_id).cloned()
                }
                _ => None,
            })
    }

    pub fn as_sink_schema_change(&self, sink_id: SinkId) -> Option<PbSinkSchemaChange> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Update(UpdateMutation {
                    sink_schema_change, ..
                }) => sink_schema_change.get(&sink_id).cloned(),
                _ => None,
            })
    }

    pub fn as_subscriptions_to_drop(&self) -> Option<&[SubscriptionUpstreamInfo]> {
        match self.mutation.as_deref() {
            Some(Mutation::DropSubscriptions {
                subscriptions_to_drop,
            })
            | Some(Mutation::Update(UpdateMutation {
                subscriptions_to_drop,
                ..
            })) => Some(subscriptions_to_drop.as_slice()),
            _ => None,
        }
    }

    pub fn get_curr_epoch(&self) -> Epoch {
        Epoch(self.epoch.curr)
    }

    /// Get the tracing context of this barrier.
    pub fn tracing_context(&self) -> &TracingContext {
        &self.tracing_context
    }

    /// Returns the ids of subscribers that are newly added on the materialized view
    /// `mv_table_id` by this barrier.
    pub fn added_subscriber_on_mv_table(
        &self,
        mv_table_id: TableId,
    ) -> impl Iterator<Item = SubscriberId> + '_ {
        if let Some(Mutation::Add(add)) = self.mutation.as_deref() {
            Some(add)
        } else {
            None
        }
        .into_iter()
        .flat_map(move |add| {
            add.subscriptions_to_add.iter().filter_map(
                move |(upstream_mv_table_id, subscriber_id)| {
                    if *upstream_mv_table_id == mv_table_id {
                        Some(*subscriber_id)
                    } else {
                        None
                    }
                },
            )
        })
    }
}
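
// Executors commonly inspect the mutation of a received barrier through the accessors above,
// e.g. (illustrative, where `actor_id` is the executor's own actor id):
//
//     if let Some(new_vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_id) {
//         // re-scale state according to the new vnode bitmap ...
//     }
//     if barrier.is_stop(actor_id) {
//         // this actor is being dropped; flush state and exit ...
//     }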

impl<M: PartialEq> PartialEq for BarrierInner<M> {
    fn eq(&self, other: &Self) -> bool {
        self.epoch == other.epoch && self.mutation == other.mutation
    }
}

impl Mutation {
    /// Whether this mutation is a stop mutation.
    #[cfg(test)]
    pub fn is_stop(&self) -> bool {
        matches!(self, Mutation::Stop(_))
    }

    #[cfg(test)]
    fn to_protobuf(&self) -> PbMutation {
        use risingwave_pb::source::{
            ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
        };
        use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
        use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
        use risingwave_pb::stream_plan::{
            PbAddMutation, PbConnectorPropsChangeMutation, PbDispatchers,
            PbDropSubscriptionsMutation, PbPauseMutation, PbResumeMutation,
            PbSourceChangeSplitMutation, PbStartFragmentBackfillMutation, PbStopMutation,
            PbThrottleMutation, PbUpdateMutation,
        };
        let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
            actor_splits
                .iter()
                .map(|(&actor_id, splits)| {
                    (
                        actor_id,
                        ConnectorSplits {
                            splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
                        },
                    )
                })
                .collect::<HashMap<_, _>>()
        };

        match self {
            Mutation::Stop(StopMutation {
                dropped_actors,
                dropped_sink_fragments,
            }) => PbMutation::Stop(PbStopMutation {
                actors: dropped_actors.iter().copied().collect(),
                dropped_sink_fragments: dropped_sink_fragments.iter().copied().collect(),
            }),
            Mutation::Update(UpdateMutation {
                dispatchers,
                merges,
                vnode_bitmaps,
                dropped_actors,
                actor_splits,
                actor_new_dispatchers,
                actor_cdc_table_snapshot_splits,
                sink_schema_change,
                subscriptions_to_drop,
            }) => PbMutation::Update(PbUpdateMutation {
                dispatcher_update: dispatchers.values().flatten().cloned().collect(),
                merge_update: merges.values().cloned().collect(),
                actor_vnode_bitmap_update: vnode_bitmaps
                    .iter()
                    .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
                    .collect(),
                dropped_actors: dropped_actors.iter().copied().collect(),
                actor_splits: actor_splits_to_protobuf(actor_splits),
                actor_new_dispatchers: actor_new_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| {
                        (
                            actor_id,
                            PbDispatchers {
                                dispatchers: dispatchers.clone(),
                            },
                        )
                    })
                    .collect(),
                actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                    splits: actor_cdc_table_snapshot_splits
                        .splits
                        .iter()
                        .map(|(actor_id, (splits, generation))| {
                            (
                                *actor_id,
                                risingwave_pb::source::PbCdcTableSnapshotSplits {
                                    splits: splits
                                        .iter()
                                        .map(
                                            risingwave_connector::source::cdc::build_cdc_table_snapshot_split,
                                        )
                                        .collect(),
                                    generation: *generation,
                                },
                            )
                        })
                        .collect(),
                }),
                sink_schema_change: sink_schema_change
                    .iter()
                    .map(|(sink_id, change)| ((*sink_id).as_raw_id(), change.clone()))
                    .collect(),
                subscriptions_to_drop: subscriptions_to_drop.clone(),
            }),
            Mutation::Add(AddMutation {
                adds,
                added_actors,
                splits,
                pause,
                subscriptions_to_add,
                backfill_nodes_to_pause,
                actor_cdc_table_snapshot_splits,
                new_upstream_sinks,
            }) => PbMutation::Add(PbAddMutation {
                actor_dispatchers: adds
                    .iter()
                    .map(|(&actor_id, dispatchers)| {
                        (
                            actor_id,
                            PbDispatchers {
                                dispatchers: dispatchers.clone(),
                            },
                        )
                    })
                    .collect(),
                added_actors: added_actors.iter().copied().collect(),
                actor_splits: actor_splits_to_protobuf(splits),
                pause: *pause,
                subscriptions_to_add: subscriptions_to_add
                    .iter()
                    .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
                        subscriber_id: *subscriber_id,
                        upstream_mv_table_id: *table_id,
                    })
                    .collect(),
                backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
                actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                    splits: actor_cdc_table_snapshot_splits
                        .splits
                        .iter()
                        .map(|(actor_id, (splits, generation))| {
                            (
                                *actor_id,
                                risingwave_pb::source::PbCdcTableSnapshotSplits {
                                    splits: splits
                                        .iter()
                                        .map(
                                            risingwave_connector::source::cdc::build_cdc_table_snapshot_split,
                                        )
                                        .collect(),
                                    generation: *generation,
                                },
                            )
                        })
                        .collect(),
                }),
                new_upstream_sinks: new_upstream_sinks
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
            }),
            Mutation::SourceChangeSplit(changes) => {
                PbMutation::Splits(PbSourceChangeSplitMutation {
                    actor_splits: changes
                        .iter()
                        .map(|(&actor_id, splits)| {
                            (
                                actor_id,
                                ConnectorSplits {
                                    splits: splits
                                        .clone()
                                        .iter()
                                        .map(ConnectorSplit::from)
                                        .collect(),
                                },
                            )
                        })
                        .collect(),
                })
            }
            Mutation::Pause => PbMutation::Pause(PbPauseMutation {}),
            Mutation::Resume => PbMutation::Resume(PbResumeMutation {}),
            Mutation::Throttle(changes) => PbMutation::Throttle(PbThrottleMutation {
                actor_throttle: changes
                    .iter()
                    .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit }))
                    .collect(),
            }),
            Mutation::DropSubscriptions {
                subscriptions_to_drop,
            } => PbMutation::DropSubscriptions(PbDropSubscriptionsMutation {
                info: subscriptions_to_drop.clone(),
            }),
            Mutation::ConnectorPropsChange(map) => {
                PbMutation::ConnectorPropsChange(PbConnectorPropsChangeMutation {
                    connector_props_infos: map
                        .iter()
                        .map(|(actor_id, options)| {
                            (
                                *actor_id,
                                ConnectorPropsInfo {
                                    connector_props_info: options
                                        .iter()
                                        .map(|(k, v)| (k.clone(), v.clone()))
                                        .collect(),
                                },
                            )
                        })
                        .collect(),
                })
            }
            Mutation::StartFragmentBackfill { fragment_ids } => {
                PbMutation::StartFragmentBackfill(PbStartFragmentBackfillMutation {
                    fragment_ids: fragment_ids.iter().copied().collect(),
                })
            }
            Mutation::RefreshStart {
                table_id,
                associated_source_id,
            } => PbMutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
                table_id: *table_id,
                associated_source_id: *associated_source_id,
            }),
            Mutation::ListFinish {
                associated_source_id,
            } => PbMutation::ListFinish(risingwave_pb::stream_plan::ListFinishMutation {
                associated_source_id: *associated_source_id,
            }),
            Mutation::LoadFinish {
                associated_source_id,
            } => PbMutation::LoadFinish(risingwave_pb::stream_plan::LoadFinishMutation {
                associated_source_id: *associated_source_id,
            }),
        }
    }

    fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
        let mutation = match prost {
            PbMutation::Stop(stop) => Mutation::Stop(StopMutation {
                dropped_actors: stop.actors.iter().copied().collect(),
                dropped_sink_fragments: stop.dropped_sink_fragments.iter().copied().collect(),
            }),

            PbMutation::Update(update) => Mutation::Update(UpdateMutation {
                dispatchers: update
                    .dispatcher_update
                    .iter()
                    .map(|u| (u.actor_id, u.clone()))
                    .into_group_map(),
                merges: update
                    .merge_update
                    .iter()
                    .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
                    .collect(),
                vnode_bitmaps: update
                    .actor_vnode_bitmap_update
                    .iter()
                    .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
                    .collect(),
                dropped_actors: update.dropped_actors.iter().copied().collect(),
                actor_splits: update
                    .actor_splits
                    .iter()
                    .map(|(&actor_id, splits)| {
                        (
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(|split| split.try_into().unwrap())
                                .collect(),
                        )
                    })
                    .collect(),
                actor_new_dispatchers: update
                    .actor_new_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
                    .collect(),
                actor_cdc_table_snapshot_splits:
                    build_actor_cdc_table_snapshot_splits_with_generation(
                        update
                            .actor_cdc_table_snapshot_splits
                            .clone()
                            .unwrap_or_default(),
                    ),
                sink_schema_change: update
                    .sink_schema_change
                    .iter()
                    .map(|(sink_id, change)| (SinkId::from(*sink_id), change.clone()))
                    .collect(),
                subscriptions_to_drop: update.subscriptions_to_drop.clone(),
            }),

            PbMutation::Add(add) => Mutation::Add(AddMutation {
                adds: add
                    .actor_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
                    .collect(),
                added_actors: add.added_actors.iter().copied().collect(),
                splits: add
                    .actor_splits
                    .iter()
                    .map(|(&actor_id, splits)| {
                        (
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(|split| split.try_into().unwrap())
                                .collect(),
                        )
                    })
                    .collect(),
                pause: add.pause,
                subscriptions_to_add: add
                    .subscriptions_to_add
                    .iter()
                    .map(
                        |SubscriptionUpstreamInfo {
                             subscriber_id,
                             upstream_mv_table_id,
                         }| { (*upstream_mv_table_id, *subscriber_id) },
                    )
                    .collect(),
                backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
                actor_cdc_table_snapshot_splits:
                    build_actor_cdc_table_snapshot_splits_with_generation(
                        add.actor_cdc_table_snapshot_splits
                            .clone()
                            .unwrap_or_default(),
                    ),
                new_upstream_sinks: add
                    .new_upstream_sinks
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
            }),

            PbMutation::Splits(s) => {
                let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
                    Vec::with_capacity(s.actor_splits.len());
                for (&actor_id, splits) in &s.actor_splits {
                    if !splits.splits.is_empty() {
                        change_splits.push((
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(SplitImpl::try_from)
                                .try_collect()?,
                        ));
                    }
                }
                Mutation::SourceChangeSplit(change_splits.into_iter().collect())
            }
            PbMutation::Pause(_) => Mutation::Pause,
            PbMutation::Resume(_) => Mutation::Resume,
            PbMutation::Throttle(changes) => Mutation::Throttle(
                changes
                    .actor_throttle
                    .iter()
                    .map(|(actor_id, limit)| (*actor_id, limit.rate_limit))
                    .collect(),
            ),
            PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
                subscriptions_to_drop: drop.info.clone(),
            },
            PbMutation::ConnectorPropsChange(alter_connector_props) => {
                Mutation::ConnectorPropsChange(
                    alter_connector_props
                        .connector_props_infos
                        .iter()
                        .map(|(connector_id, options)| {
                            (
                                *connector_id,
                                options
                                    .connector_props_info
                                    .iter()
                                    .map(|(k, v)| (k.clone(), v.clone()))
                                    .collect(),
                            )
                        })
                        .collect(),
                )
            }
            PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
                Mutation::StartFragmentBackfill {
                    fragment_ids: start_fragment_backfill
                        .fragment_ids
                        .iter()
                        .copied()
                        .collect(),
                }
            }
            PbMutation::RefreshStart(refresh_start) => Mutation::RefreshStart {
                table_id: refresh_start.table_id,
                associated_source_id: refresh_start.associated_source_id,
            },
            PbMutation::ListFinish(list_finish) => Mutation::ListFinish {
                associated_source_id: list_finish.associated_source_id,
            },
            PbMutation::LoadFinish(load_finish) => Mutation::LoadFinish {
                associated_source_id: load_finish.associated_source_id,
            },
        };
        Ok(mutation)
    }
}

impl<M> BarrierInner<M> {
    fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
        let Self {
            epoch,
            mutation,
            kind,
            tracing_context,
            ..
        } = self;

        PbBarrier {
            epoch: Some(PbEpoch {
                curr: epoch.curr,
                prev: epoch.prev,
            }),
            mutation: barrier_fn(mutation).map(|mutation| PbBarrierMutation {
                mutation: Some(mutation),
            }),
            tracing_context: tracing_context.to_protobuf(),
            kind: *kind as _,
        }
    }

    fn from_protobuf_inner(
        prost: &PbBarrier,
        mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
    ) -> StreamExecutorResult<Self> {
        let epoch = prost.get_epoch()?;

        Ok(Self {
            kind: prost.kind(),
            epoch: EpochPair::new(epoch.curr, epoch.prev),
            mutation: mutation_from_pb(
                (prost.mutation.as_ref()).and_then(|mutation| mutation.mutation.as_ref()),
            )?,
            tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
        })
    }

    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
        BarrierInner {
            epoch: self.epoch,
            mutation: f(self.mutation),
            kind: self.kind,
            tracing_context: self.tracing_context,
        }
    }
}

impl DispatcherBarrier {
    pub fn to_protobuf(&self) -> PbBarrier {
        self.to_protobuf_inner(|_| None)
    }
}

impl Barrier {
    #[cfg(test)]
    pub fn to_protobuf(&self) -> PbBarrier {
        self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
    }

    pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
        Self::from_protobuf_inner(prost, |mutation| {
            mutation
                .map(|m| Mutation::from_protobuf(m).map(Arc::new))
                .transpose()
        })
    }
}
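
// Note: `DispatcherBarrier::to_protobuf` always encodes the barrier without a mutation;
// mutations are not sent over the exchange but delivered through the local barrier channel
// (see `DispatchBarrierBuffer` below), which is why `DispatcherMessageBatch::from_protobuf`
// treats a barrier carrying a mutation as unexpected.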

/// A watermark on column `col_idx` guarantees that all future records will have a value no less
/// than `val` in that column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
    pub col_idx: usize,
    pub data_type: DataType,
    pub val: ScalarImpl,
}

impl PartialOrd for Watermark {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Watermark {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.val.default_cmp(&other.val)
    }
}

impl Watermark {
    pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
        Self {
            col_idx,
            data_type,
            val,
        }
    }

    /// Evaluate `expr` over the watermark value to derive the watermark of a computed column.
    /// Returns `None` if the evaluation result is `NULL`.
    pub async fn transform_with_expr(
        self,
        expr: &NonStrictExpression<impl Expression>,
        new_col_idx: usize,
    ) -> Option<Self> {
        let Self { col_idx, val, .. } = self;
        let row = {
            let mut row = vec![None; col_idx + 1];
            row[col_idx] = Some(val);
            OwnedRow::new(row)
        };
        let val = expr.eval_row_infallible(&row).await?;
        Some(Self::new(new_col_idx, expr.inner().return_type(), val))
    }

    /// Transform the watermark column index according to an output column mapping. Returns
    /// `None` if the watermark column is not included in `output_indices`.
    pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
        output_indices
            .iter()
            .position(|p| *p == self.col_idx)
            .map(|new_col_idx| self.with_idx(new_col_idx))
    }

    pub fn to_protobuf(&self) -> PbWatermark {
        PbWatermark {
            column: Some(PbInputRef {
                index: self.col_idx as _,
                r#type: Some(self.data_type.to_protobuf()),
            }),
            val: Some(&self.val).to_protobuf().into(),
        }
    }

    pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
        let col_ref = prost.get_column()?;
        let data_type = DataType::from(col_ref.get_type()?);
        let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
            .expect("watermark value cannot be null");
        Ok(Self::new(col_ref.get_index() as _, data_type, val))
    }

    pub fn with_idx(self, idx: usize) -> Self {
        Self::new(idx, self.data_type, self.val)
    }
}
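
// For example (illustrative): with `output_indices = [2, 0]`, a watermark on column 0 is
// re-indexed to column 1 by `transform_with_indices(&[2, 0])`, while a watermark on column 1
// is not forwarded and `None` is returned.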

#[cfg_attr(any(test, feature = "test"), derive(PartialEq))]
#[derive(Debug, EnumAsInner, Clone)]
pub enum MessageInner<M> {
    Chunk(StreamChunk),
    Barrier(BarrierInner<M>),
    Watermark(Watermark),
}

impl<M> MessageInner<M> {
    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
        match self {
            MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
            MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
            MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
        }
    }
}

pub type Message = MessageInner<BarrierMutationType>;
pub type DispatcherMessage = MessageInner<()>;

#[derive(Debug, EnumAsInner, Clone)]
pub enum MessageBatchInner<M> {
    Chunk(StreamChunk),
    BarrierBatch(Vec<BarrierInner<M>>),
    Watermark(Watermark),
}
pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
pub type DispatcherBarriers = Vec<DispatcherBarrier>;
pub type DispatcherMessageBatch = MessageBatchInner<()>;

impl From<DispatcherMessage> for DispatcherMessageBatch {
    fn from(m: DispatcherMessage) -> Self {
        match m {
            DispatcherMessage::Chunk(c) => Self::Chunk(c),
            DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
            DispatcherMessage::Watermark(w) => Self::Watermark(w),
        }
    }
}

impl From<StreamChunk> for Message {
    fn from(chunk: StreamChunk) -> Self {
        Message::Chunk(chunk)
    }
}

impl<'a> TryFrom<&'a Message> for &'a Barrier {
    type Error = ();

    fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
        match m {
            Message::Chunk(_) => Err(()),
            Message::Barrier(b) => Ok(b),
            Message::Watermark(_) => Err(()),
        }
    }
}

impl Message {
    /// Whether the message is a barrier carrying a stop mutation. Used for testing.
    #[cfg(test)]
    pub fn is_stop(&self) -> bool {
        matches!(
            self,
            Message::Barrier(Barrier {
                mutation,
                ..
            }) if mutation.as_ref().unwrap().is_stop()
        )
    }
}

impl DispatcherMessageBatch {
    pub fn to_protobuf(&self) -> PbStreamMessageBatch {
        let prost = match self {
            Self::Chunk(stream_chunk) => {
                let prost_stream_chunk = stream_chunk.to_protobuf();
                StreamMessageBatch::StreamChunk(prost_stream_chunk)
            }
            Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
                barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
            }),
            Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
        };
        PbStreamMessageBatch {
            stream_message_batch: Some(prost),
        }
    }

    pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
        let res = match prost.get_stream_message_batch()? {
            StreamMessageBatch::StreamChunk(chunk) => {
                Self::Chunk(StreamChunk::from_protobuf(chunk)?)
            }
            StreamMessageBatch::BarrierBatch(barrier_batch) => {
                let barriers = barrier_batch
                    .barriers
                    .iter()
                    .map(|barrier| {
                        DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
                            if mutation.is_some() {
                                if cfg!(debug_assertions) {
                                    panic!("should not receive message of barrier with mutation");
                                } else {
                                    warn!(?barrier, "receive message of barrier with mutation");
                                }
                            }
                            Ok(())
                        })
                    })
                    .try_collect()?;
                Self::BarrierBatch(barriers)
            }
            StreamMessageBatch::Watermark(watermark) => {
                Self::Watermark(Watermark::from_protobuf(watermark)?)
            }
        };
        Ok(res)
    }

    pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
        ::prost::Message::encoded_len(msg)
    }
}

pub type StreamKey = Vec<usize>;
pub type StreamKeyRef<'a> = &'a [usize];
pub type StreamKeyDataTypes = SmallVec<[DataType; 1]>;

/// Expect the first message of the given `stream` to be a barrier.
pub async fn expect_first_barrier<M: Debug>(
    stream: &mut (impl MessageStreamInner<M> + Unpin),
) -> StreamExecutorResult<BarrierInner<M>> {
    let message = stream
        .next()
        .instrument_await("expect_first_barrier")
        .await
        .context("failed to extract the first message: stream closed unexpectedly")??;
    let barrier = message
        .into_barrier()
        .expect("the first message must be a barrier");
    // The first barrier is expected to be either an `Initial` barrier (on recovery) or a
    // `Checkpoint` barrier (e.g. for newly created actors).
    assert!(matches!(
        barrier.kind,
        BarrierKind::Checkpoint | BarrierKind::Initial
    ));
    Ok(barrier)
}
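
// Executors typically call this right after taking over their input stream, e.g. (illustrative):
//
//     let first_barrier = expect_first_barrier(&mut input).await?;
//     let first_epoch = first_barrier.epoch;
//     // ... initialize or recover state for `first_epoch`, then pass the barrier downstream.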

/// Expect the first message of the given aligned `stream` to be a barrier.
pub async fn expect_first_barrier_from_aligned_stream(
    stream: &mut (impl AlignedMessageStream + Unpin),
) -> StreamExecutorResult<Barrier> {
    let message = stream
        .next()
        .instrument_await("expect_first_barrier")
        .await
        .context("failed to extract the first message: stream closed unexpectedly")??;
    let barrier = message
        .into_barrier()
        .expect("the first message must be a barrier");
    Ok(barrier)
}

/// `StreamConsumer` is the last step in an actor: it consumes the message stream and only
/// reports barriers.
pub trait StreamConsumer: Send + 'static {
    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;

    fn execute(self: Box<Self>) -> Self::BarrierStream;
}

type BoxedMessageInput<InputId, M> = BoxedInput<InputId, MessageStreamItemInner<M>>;

/// A dynamically managed set of upstream inputs merged into a single stream, with barriers
/// aligned across all inputs.
pub struct DynamicReceivers<InputId, M> {
    /// The barrier currently being aligned. Set when the first barrier of the epoch arrives and
    /// taken once all inputs have yielded it.
    barrier: Option<BarrierInner<M>>,
    /// The time when alignment of the current barrier started.
    start_ts: Option<Instant>,
    /// Inputs that have already yielded the current barrier and are blocked until alignment
    /// completes.
    blocked: Vec<BoxedMessageInput<InputId, M>>,
    /// Inputs that are still being polled for messages.
    active: FuturesUnordered<StreamFuture<BoxedMessageInput<InputId, M>>>,
    /// Watermark buffers, keyed by the column index of the watermark.
    buffered_watermarks: BTreeMap<usize, BufferedWatermarks<InputId>>,
    /// Metric for the total barrier alignment duration, if enabled.
    barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
    /// Metric for the merge executor's barrier alignment duration, if enabled.
    merge_barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
}

impl<InputId: Clone + Ord + Hash + std::fmt::Debug + Unpin, M: Clone + Unpin> Stream
    for DynamicReceivers<InputId, M>
{
    type Item = MessageStreamItemInner<M>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if self.is_empty() {
            return Poll::Ready(None);
        }

        loop {
            match futures::ready!(self.active.poll_next_unpin(cx)) {
                // Directly forward the error.
                Some((Some(Err(e)), _)) => {
                    return Poll::Ready(Some(Err(e)));
                }
                // A message from one of the upstream inputs.
                Some((Some(Ok(message)), remaining)) => {
                    let input_id = remaining.id();
                    match message {
                        MessageInner::Chunk(chunk) => {
                            // Keep polling this upstream.
                            self.active.push(remaining.into_future());
                            return Poll::Ready(Some(Ok(MessageInner::Chunk(chunk))));
                        }
                        MessageInner::Watermark(watermark) => {
                            // Keep polling this upstream.
                            self.active.push(remaining.into_future());
                            if let Some(watermark) = self.handle_watermark(input_id, watermark) {
                                return Poll::Ready(Some(Ok(MessageInner::Watermark(watermark))));
                            }
                        }
                        MessageInner::Barrier(barrier) => {
                            // Block this upstream until the barrier is aligned on all inputs.
                            if self.blocked.is_empty() {
                                self.start_ts = Some(Instant::now());
                            }
                            self.blocked.push(remaining);
                            if let Some(current_barrier) = self.barrier.as_ref() {
                                if current_barrier.epoch != barrier.epoch {
                                    return Poll::Ready(Some(Err(
                                        StreamExecutorError::align_barrier(
                                            current_barrier.clone().map_mutation(|_| None),
                                            barrier.map_mutation(|_| None),
                                        ),
                                    )));
                                }
                            } else {
                                self.barrier = Some(barrier);
                            }
                        }
                    }
                }
                // Some upstream input is closed unexpectedly.
                Some((None, remaining)) => {
                    return Poll::Ready(Some(Err(StreamExecutorError::channel_closed(format!(
                        "upstream input {:?} unexpectedly closed",
                        remaining.id()
                    )))));
                }
                // All upstream inputs have yielded a barrier: alignment for this epoch is done.
                None => {
                    assert!(!self.blocked.is_empty());

                    let start_ts = self
                        .start_ts
                        .take()
                        .expect("should have received at least one barrier");
                    if let Some(barrier_align_duration) = &self.barrier_align_duration {
                        barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
                    }
                    if let Some(merge_barrier_align_duration) = &self.merge_barrier_align_duration {
                        merge_barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
                    }

                    break;
                }
            }
        }

        assert!(self.active.is_terminated());

        let barrier = self.barrier.take().unwrap();

        let upstreams = std::mem::take(&mut self.blocked);
        self.extend_active(upstreams);
        assert!(!self.active.is_terminated());

        Poll::Ready(Some(Ok(MessageInner::Barrier(barrier))))
    }
}

impl<InputId: Clone + Ord + Hash + std::fmt::Debug, M> DynamicReceivers<InputId, M> {
    pub fn new(
        upstreams: Vec<BoxedMessageInput<InputId, M>>,
        barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
        merge_barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
    ) -> Self {
        let mut this = Self {
            barrier: None,
            start_ts: None,
            blocked: Vec::with_capacity(upstreams.len()),
            active: Default::default(),
            buffered_watermarks: Default::default(),
            merge_barrier_align_duration,
            barrier_align_duration,
        };
        this.extend_active(upstreams);
        this
    }

    /// Extend the active upstreams. Should only be called when there is no pending barrier,
    /// i.e. between two barriers.
    pub fn extend_active(
        &mut self,
        upstreams: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
    ) {
        assert!(self.blocked.is_empty() && self.barrier.is_none());

        self.active
            .extend(upstreams.into_iter().map(|s| s.into_future()));
    }

    /// Handle a watermark from the input `input_id`, returning the aggregated watermark to emit
    /// downstream, if any.
    pub fn handle_watermark(
        &mut self,
        input_id: InputId,
        watermark: Watermark,
    ) -> Option<Watermark> {
        let col_idx = watermark.col_idx;
        let upstream_ids: Vec<_> = self.upstream_input_ids().collect();
        let watermarks = self
            .buffered_watermarks
            .entry(col_idx)
            .or_insert_with(|| BufferedWatermarks::with_ids(upstream_ids));
        watermarks.handle_watermark(input_id, watermark)
    }

    /// Add new upstream inputs. Should only be called when there is no pending barrier,
    /// i.e. between two barriers.
    pub fn add_upstreams_from(
        &mut self,
        new_inputs: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
    ) {
        assert!(self.blocked.is_empty() && self.barrier.is_none());

        let new_inputs: Vec<_> = new_inputs.into_iter().collect();
        let input_ids = new_inputs.iter().map(|input| input.id());
        self.buffered_watermarks.values_mut().for_each(|buffers| {
            buffers.add_buffers(input_ids.clone());
        });
        self.active
            .extend(new_inputs.into_iter().map(|s| s.into_future()));
    }

    /// Remove upstream inputs by their ids. Should only be called when there is no pending
    /// barrier, i.e. between two barriers.
    pub fn remove_upstreams(&mut self, upstream_input_ids: &HashSet<InputId>) {
        assert!(self.blocked.is_empty() && self.barrier.is_none());

        let new_upstreams = std::mem::take(&mut self.active)
            .into_iter()
            .map(|s| s.into_inner().unwrap())
            .filter(|u| !upstream_input_ids.contains(&u.id()));
        self.extend_active(new_upstreams);
        self.buffered_watermarks.values_mut().for_each(|buffers| {
            buffers.remove_buffer(upstream_input_ids.clone());
        });
    }

    pub fn merge_barrier_align_duration(
        &self,
    ) -> Option<LabelGuardedMetric<GenericCounter<AtomicU64>>> {
        self.merge_barrier_align_duration.clone()
    }

    pub fn flush_buffered_watermarks(&mut self) {
        self.buffered_watermarks
            .values_mut()
            .for_each(|buffers| buffers.clear());
    }

    pub fn upstream_input_ids(&self) -> impl Iterator<Item = InputId> + '_ {
        self.blocked
            .iter()
            .map(|s| s.id())
            .chain(self.active.iter().map(|s| s.get_ref().unwrap().id()))
    }

    pub fn is_empty(&self) -> bool {
        self.blocked.is_empty() && self.active.is_empty()
    }
}
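
// Sketch of how a merge-like executor might drive `DynamicReceivers` (illustrative only;
// `inputs` is a `Vec` of boxed upstream inputs and the metrics are omitted):
//
//     let mut receivers = DynamicReceivers::new(inputs, None, None);
//     while let Some(msg) = receivers.next().await {
//         match msg? {
//             MessageInner::Barrier(barrier) => { /* apply the mutation; call
//                 `add_upstreams_from(..)` / `remove_upstreams(..)` between barriers */ }
//             MessageInner::Chunk(chunk) => { /* process the chunk */ }
//             MessageInner::Watermark(wm) => { /* forward the aggregated watermark */ }
//         }
//     }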

/// Buffers barriers received from `barrier_rx` so that they can later be matched with the
/// corresponding dispatcher barriers arriving on the data stream. When a buffered barrier
/// carries an update-merge mutation that adds upstream actors, the new inputs are created
/// eagerly and kept alongside the barrier.
pub(crate) struct DispatchBarrierBuffer {
    /// Barriers that have been received but not yet matched with a dispatcher barrier from the
    /// data stream, together with the newly created inputs (if any).
    buffer: VecDeque<(Barrier, Option<Vec<BoxedActorInput>>)>,
    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
    recv_state: BarrierReceiverState,
    curr_upstream_fragment_id: FragmentId,
    actor_id: ActorId,
    /// Context for building new upstream inputs.
    build_input_ctx: Arc<BuildInputContext>,
}

struct BuildInputContext {
    pub actor_id: ActorId,
    pub local_barrier_manager: LocalBarrierManager,
    pub metrics: Arc<StreamingMetrics>,
    pub fragment_id: FragmentId,
    pub actor_config: Arc<StreamingConfig>,
}

type BoxedNewInputsFuture =
    Pin<Box<dyn Future<Output = StreamExecutorResult<Vec<BoxedActorInput>>> + Send>>;

enum BarrierReceiverState {
    ReceivingBarrier,
    CreatingNewInput(Barrier, BoxedNewInputsFuture),
}

impl DispatchBarrierBuffer {
    pub fn new(
        barrier_rx: mpsc::UnboundedReceiver<Barrier>,
        actor_id: ActorId,
        curr_upstream_fragment_id: FragmentId,
        local_barrier_manager: LocalBarrierManager,
        metrics: Arc<StreamingMetrics>,
        fragment_id: FragmentId,
        actor_config: Arc<StreamingConfig>,
    ) -> Self {
        Self {
            buffer: VecDeque::new(),
            barrier_rx,
            recv_state: BarrierReceiverState::ReceivingBarrier,
            curr_upstream_fragment_id,
            actor_id,
            build_input_ctx: Arc::new(BuildInputContext {
                actor_id,
                local_barrier_manager,
                metrics,
                fragment_id,
                actor_config,
            }),
        }
    }

    /// Await the next message from the data stream, while concurrently fetching barriers from
    /// the barrier channel into the buffer.
    pub async fn await_next_message(
        &mut self,
        stream: &mut (impl Stream<Item = StreamExecutorResult<DispatcherMessage>> + Unpin),
        metrics: &ActorInputMetrics,
    ) -> StreamExecutorResult<DispatcherMessage> {
        let mut start_time = Instant::now();
        let interval_duration = Duration::from_secs(15);
        let mut interval =
            tokio::time::interval_at(start_time + interval_duration, interval_duration);

        loop {
            tokio::select! {
                biased;
                msg = stream.try_next() => {
                    metrics
                        .actor_input_buffer_blocking_duration_ns
                        .inc_by(start_time.elapsed().as_nanos() as u64);
                    return msg?.ok_or_else(
                        || StreamExecutorError::channel_closed("upstream executor closed unexpectedly")
                    );
                }

                e = self.continuously_fetch_barrier_rx() => {
                    return Err(e);
                }

                // Periodically flush the blocking duration metric while waiting.
                _ = interval.tick() => {
                    start_time = Instant::now();
                    metrics.actor_input_buffer_blocking_duration_ns.inc_by(interval_duration.as_nanos() as u64);
                    continue;
                }
            }
        }
    }

    /// Pop the buffered barrier matching the given dispatcher `barrier`, together with any new
    /// inputs created for it. Waits on the barrier channel if it is not buffered yet.
    pub async fn pop_barrier_with_inputs(
        &mut self,
        barrier: DispatcherBarrier,
    ) -> StreamExecutorResult<(Barrier, Option<Vec<BoxedActorInput>>)> {
        while self.buffer.is_empty() {
            self.try_fetch_barrier_rx(false).await?;
        }
        let (recv_barrier, inputs) = self.buffer.pop_front().unwrap();
        assert_equal_dispatcher_barrier(&recv_barrier, &barrier);

        Ok((recv_barrier, inputs))
    }

    async fn continuously_fetch_barrier_rx(&mut self) -> StreamExecutorError {
        loop {
            if let Err(e) = self.try_fetch_barrier_rx(true).await {
                return e;
            }
        }
    }

    async fn try_fetch_barrier_rx(&mut self, pending_on_end: bool) -> StreamExecutorResult<()> {
        match &mut self.recv_state {
            BarrierReceiverState::ReceivingBarrier => {
                let Some(barrier) = self.barrier_rx.recv().await else {
                    if pending_on_end {
                        return pending().await;
                    } else {
                        return Err(StreamExecutorError::channel_closed(
                            "barrier channel closed unexpectedly",
                        ));
                    }
                };
                if let Some(fut) = self.pre_apply_barrier(&barrier) {
                    self.recv_state = BarrierReceiverState::CreatingNewInput(barrier, fut);
                } else {
                    self.buffer.push_back((barrier, None));
                }
            }
            BarrierReceiverState::CreatingNewInput(barrier, fut) => {
                let new_inputs = fut.await?;
                self.buffer.push_back((barrier.clone(), Some(new_inputs)));
                self.recv_state = BarrierReceiverState::ReceivingBarrier;
            }
        }
        Ok(())
    }

    /// If the barrier carries an update-merge mutation with newly added upstream actors, return
    /// a future that builds the new inputs and waits for their first barrier.
    fn pre_apply_barrier(&mut self, barrier: &Barrier) -> Option<BoxedNewInputsFuture> {
        if let Some(update) = barrier.as_update_merge(self.actor_id, self.curr_upstream_fragment_id)
            && !update.added_upstream_actors.is_empty()
        {
            // If the upstream fragment is replaced, switch to the new fragment id.
            let upstream_fragment_id =
                if let Some(new_upstream_fragment_id) = update.new_upstream_fragment_id {
                    self.curr_upstream_fragment_id = new_upstream_fragment_id;
                    new_upstream_fragment_id
                } else {
                    self.curr_upstream_fragment_id
                };
            let ctx = self.build_input_ctx.clone();
            let added_upstream_actors = update.added_upstream_actors.clone();
            let barrier = barrier.clone();
            let fut = async move {
                try_join_all(added_upstream_actors.iter().map(|upstream_actor| async {
                    let mut new_input = new_input(
                        &ctx.local_barrier_manager,
                        ctx.metrics.clone(),
                        ctx.actor_id,
                        ctx.fragment_id,
                        upstream_actor,
                        upstream_fragment_id,
                        ctx.actor_config.clone(),
                    )
                    .await?;

                    // The first barrier of a newly created input must match the current barrier.
                    let first_barrier = expect_first_barrier(&mut new_input).await?;
                    assert_equal_dispatcher_barrier(&barrier, &first_barrier);

                    StreamExecutorResult::Ok(new_input)
                }))
                .await
            }
            .boxed();

            Some(fut)
        } else {
            None
        }
    }
}