1mod prelude;
16
17use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
18use std::fmt::Debug;
19use std::future::pending;
20use std::hash::Hash;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::Poll;
24use std::vec;
25
26use await_tree::InstrumentAwait;
27use enum_as_inner::EnumAsInner;
28use futures::future::try_join_all;
29use futures::stream::{BoxStream, FusedStream, FuturesUnordered, StreamFuture};
30use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
31use itertools::Itertools;
32use prometheus::core::{AtomicU64, GenericCounter};
33use risingwave_common::array::StreamChunk;
34use risingwave_common::bitmap::Bitmap;
35use risingwave_common::catalog::{Field, Schema, TableId};
36use risingwave_common::config::StreamingConfig;
37use risingwave_common::metrics::LabelGuardedMetric;
38use risingwave_common::row::OwnedRow;
39use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
40use risingwave_common::util::epoch::{Epoch, EpochPair};
41use risingwave_common::util::tracing::TracingContext;
42use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
43use risingwave_connector::source::SplitImpl;
44use risingwave_expr::expr::{Expression, NonStrictExpression};
45use risingwave_pb::data::PbEpoch;
46use risingwave_pb::expr::PbInputRef;
47use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
48use risingwave_pb::stream_plan::barrier::BarrierKind;
49use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
50use risingwave_pb::stream_plan::stream_node::PbStreamKind;
51use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
52use risingwave_pb::stream_plan::{
53 PbBarrier, PbBarrierMutation, PbDispatcher, PbStreamMessageBatch, PbWatermark,
54 SubscriptionUpstreamInfo,
55};
56use smallvec::SmallVec;
57use tokio::sync::mpsc;
58use tokio::time::{Duration, Instant};
59
60use crate::error::StreamResult;
61use crate::executor::exchange::input::{
62 BoxedActorInput, BoxedInput, assert_equal_dispatcher_barrier, new_input,
63};
64use crate::executor::monitor::ActorInputMetrics;
65use crate::executor::prelude::StreamingMetrics;
66use crate::executor::watermark::BufferedWatermarks;
67use crate::task::{ActorId, FragmentId, LocalBarrierManager};
68
69mod actor;
70mod barrier_align;
71pub mod exchange;
72pub mod monitor;
73
74pub mod aggregate;
75pub mod asof_join;
76mod backfill;
77mod barrier_recv;
78mod batch_query;
79mod chain;
80mod changelog;
81mod dedup;
82mod dispatch;
83pub mod dml;
84mod dynamic_filter;
85pub mod eowc;
86pub mod error;
87mod expand;
88mod filter;
89mod gap_fill;
90pub mod hash_join;
91mod hop_window;
92mod join;
93pub mod locality_provider;
94mod lookup;
95mod lookup_union;
96mod merge;
97mod mview;
98mod nested_loop_temporal_join;
99mod no_op;
100mod now;
101mod over_window;
102pub mod project;
103mod rearranged_chain;
104mod receiver;
105pub mod row_id_gen;
106mod sink;
107pub mod source;
108mod stream_reader;
109pub mod subtask;
110mod temporal_join;
111mod top_n;
112mod troublemaker;
113mod union;
114mod upstream_sink_union;
115mod values;
116mod watermark;
117mod watermark_filter;
118mod wrapper;
119
120mod approx_percentile;
121
122mod row_merge;
123
124#[cfg(test)]
125mod integration_tests;
126mod sync_kv_log_store;
127#[cfg(any(test, feature = "test"))]
128pub mod test_utils;
129mod utils;
130mod vector;
131
132pub use actor::{Actor, ActorContext, ActorContextRef};
133use anyhow::Context;
134pub use approx_percentile::global::GlobalApproxPercentileExecutor;
135pub use approx_percentile::local::LocalApproxPercentileExecutor;
136pub use backfill::arrangement_backfill::*;
137pub use backfill::cdc::{
138 CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor,
139};
140pub use backfill::no_shuffle_backfill::*;
141pub use backfill::snapshot_backfill::*;
142pub use barrier_recv::BarrierRecvExecutor;
143pub use batch_query::BatchQueryExecutor;
144pub use chain::ChainExecutor;
145pub use changelog::ChangeLogExecutor;
146pub use dedup::AppendOnlyDedupExecutor;
147pub use dispatch::{DispatchExecutor, DispatcherImpl};
148pub use dynamic_filter::DynamicFilterExecutor;
149pub use error::{StreamExecutorError, StreamExecutorResult};
150pub use expand::ExpandExecutor;
151pub use filter::{FilterExecutor, UpsertFilterExecutor};
152pub use gap_fill::{GapFillExecutor, GapFillExecutorArgs};
153pub use hash_join::*;
154pub use hop_window::HopWindowExecutor;
155pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
156pub use join::{AsOfDesc, AsOfJoinType, JoinType};
157pub use lookup::*;
158pub use lookup_union::LookupUnionExecutor;
159pub use merge::MergeExecutor;
160pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
161pub use mview::{MaterializeExecutor, RefreshableMaterializeArgs};
162pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
163pub use no_op::NoOpExecutor;
164pub use now::*;
165pub use over_window::*;
166pub use rearranged_chain::RearrangedChainExecutor;
167pub use receiver::ReceiverExecutor;
168use risingwave_common::id::SourceId;
169pub use row_merge::RowMergeExecutor;
170pub use sink::SinkExecutor;
171pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
172pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
173pub use temporal_join::TemporalJoinExecutor;
174pub use top_n::{
175 AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
176};
177pub use troublemaker::TroublemakerExecutor;
178pub use union::UnionExecutor;
179pub use upstream_sink_union::{UpstreamFragmentInfo, UpstreamSinkUnionExecutor};
180pub use utils::DummyExecutor;
181pub use values::ValuesExecutor;
182pub use vector::*;
183pub use watermark_filter::WatermarkFilterExecutor;
184pub use wrapper::WrapperExecutor;
185
186use self::barrier_align::AlignedMessageStream;
187
/// Item of a message stream that is generic over the barrier mutation payload `M`:
/// a [`MessageInner<M>`] on success, or a stream executor error.
pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
/// [`MessageStreamItemInner`] specialized to [`BarrierMutationType`].
pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
/// Fallible item carrying a `DispatcherMessage`.
pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
/// Boxed `'static` stream of [`MessageStreamItem`]s; this is what [`Execute::execute`] returns.
pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;
192
193pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
194use risingwave_connector::sink::catalog::SinkId;
195use risingwave_connector::source::cdc::{
196 CdcTableSnapshotSplitAssignmentWithGeneration,
197 build_actor_cdc_table_snapshot_splits_with_generation,
198};
199use risingwave_pb::id::{ExecutorId, SubscriberId};
200use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
201
/// Trait alias: any `Send` stream yielding [`MessageStreamItemInner<M>`].
pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
/// Trait alias: any `Send` stream yielding [`MessageStreamItem`].
pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
/// Trait alias: any `Send` stream yielding [`DispatcherMessageStreamItem`].
pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;
205
/// Static metadata describing an executor. It is stored next to the boxed [`Execute`]
/// implementation inside [`Executor`] and exposed through the accessors there.
#[derive(Debug, Default, Clone)]
pub struct ExecutorInfo {
    /// Schema associated with the executor (presumably of its output — TODO confirm).
    pub schema: Schema,

    /// Stream key associated with the executor.
    pub stream_key: StreamKey,

    /// Stream kind of the executor; [`ExecutorInfo::for_test`] uses `PbStreamKind::Retract`.
    pub stream_kind: PbStreamKind,

    /// Human-readable identity; this string is what `Debug` for [`Executor`] prints.
    pub identity: String,

    /// Identifier of the executor.
    pub id: ExecutorId,
}
224
225impl ExecutorInfo {
226 pub fn for_test(schema: Schema, stream_key: StreamKey, identity: String, id: u64) -> Self {
227 Self {
228 schema,
229 stream_key,
230 stream_kind: PbStreamKind::Retract, identity,
232 id: id.into(),
233 }
234 }
235}
236
/// Interface implemented by stream executors: consuming the boxed executor yields its
/// output as a [`BoxedMessageStream`].
pub trait Execute: Send + 'static {
    /// Consumes the boxed executor and returns its message stream.
    fn execute(self: Box<Self>) -> BoxedMessageStream;

    /// Variant of [`Execute::execute`] that additionally receives an epoch.
    ///
    /// The default implementation ignores `_epoch` and delegates to [`Execute::execute`].
    fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
        self.execute()
    }

    /// Moves `self` onto the heap and erases its type into `Box<dyn Execute>`.
    fn boxed(self) -> Box<dyn Execute>
    where
        Self: Sized + Send + 'static,
    {
        Box::new(self)
    }
}
252
/// An executor: its metadata ([`ExecutorInfo`]) paired with the type-erased [`Execute`]
/// implementation that produces the message stream.
pub struct Executor {
    /// Metadata describing this executor.
    info: ExecutorInfo,
    /// The type-erased implementation; consumed by `execute` / `execute_with_epoch`.
    execute: Box<dyn Execute>,
}
259
260impl Executor {
261 pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
262 Self { info, execute }
263 }
264
265 pub fn info(&self) -> &ExecutorInfo {
266 &self.info
267 }
268
269 pub fn schema(&self) -> &Schema {
270 &self.info.schema
271 }
272
273 pub fn stream_key(&self) -> StreamKeyRef<'_> {
274 &self.info.stream_key
275 }
276
277 pub fn stream_kind(&self) -> PbStreamKind {
278 self.info.stream_kind
279 }
280
281 pub fn identity(&self) -> &str {
282 &self.info.identity
283 }
284
285 pub fn execute(self) -> BoxedMessageStream {
286 self.execute.execute()
287 }
288
289 pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
290 self.execute.execute_with_epoch(epoch)
291 }
292}
293
294impl std::fmt::Debug for Executor {
295 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
296 f.write_str(self.identity())
297 }
298}
299
300impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
301 fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
302 Self::new(info, execute)
303 }
304}
305
306impl<E> From<(ExecutorInfo, E)> for Executor
307where
308 E: Execute,
309{
310 fn from((info, execute): (ExecutorInfo, E)) -> Self {
311 Self::new(info, execute.boxed())
312 }
313}
314
/// Epoch value `0`, named as the "invalid" epoch (presumably never assigned to a real
/// barrier — TODO confirm against the epoch generator).
pub const INVALID_EPOCH: u64 = 0;

/// A [`FragmentId`] used in the upstream position of a merge key.
type UpstreamFragmentId = FragmentId;
/// Source splits assigned to each actor.
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;
319
/// Payload of [`Mutation::Update`].
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct UpdateMutation {
    /// Dispatcher updates, grouped by the `actor_id` of each update.
    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
    /// Merge updates keyed by `(actor_id, upstream_fragment_id)`.
    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
    /// New vnode bitmap per actor.
    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
    /// Actors dropped by this update; reported by `Barrier::all_stop_actors`.
    pub dropped_actors: HashSet<ActorId>,
    /// Source split assignment per actor.
    pub actor_splits: SplitAssignments,
    /// New dispatchers to attach, per actor.
    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
    /// CDC table snapshot split assignment, together with its generation.
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// Columns (as [`Field`]s) to add, per sink.
    pub sink_add_columns: HashMap<SinkId, Vec<Field>>,
}
332
/// Payload of [`Mutation::Add`].
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct AddMutation {
    /// Dispatchers to add, keyed by actor.
    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
    /// Actors added by this mutation; queried by `Barrier::is_newly_added`.
    pub added_actors: HashSet<ActorId>,
    /// Source split assignment per actor.
    pub splits: SplitAssignments,
    /// Pause flag; reported by `Barrier::is_pause_on_startup`.
    pub pause: bool,
    /// Subscriptions to add, as `(upstream mv table id, subscriber id)` pairs.
    pub subscriptions_to_add: Vec<(TableId, SubscriberId)>,
    /// Backfill fragments to pause; queried by `Barrier::is_backfill_pause_on_startup`.
    pub backfill_nodes_to_pause: HashSet<FragmentId>,
    /// CDC table snapshot split assignment, together with its generation.
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// New upstream sinks, keyed by fragment id.
    pub new_upstream_sinks: HashMap<FragmentId, PbNewUpstreamSink>,
}
348
/// Payload of [`Mutation::Stop`].
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct StopMutation {
    /// Actors dropped by this mutation; reported by `Barrier::all_stop_actors`.
    pub dropped_actors: HashSet<ActorId>,
    /// Sink fragments dropped by this mutation; reported by
    /// `Barrier::as_dropped_upstream_sinks`.
    pub dropped_sink_fragments: HashSet<FragmentId>,
}
355
/// In-memory form of the mutation attached to a barrier; converted from and (in tests) to
/// the protobuf `PbMutation`.
#[derive(Debug, Clone, PartialEq)]
pub enum Mutation {
    /// Drop actors and sink fragments.
    Stop(StopMutation),
    /// Update dispatchers, merges, vnode bitmaps, splits and more for existing actors.
    Update(UpdateMutation),
    /// Add actors, dispatchers, subscriptions and more.
    Add(AddMutation),
    /// Change the source split assignment of actors.
    SourceChangeSplit(SplitAssignments),
    /// Pause mutation (carries no payload).
    Pause,
    /// Resume mutation (carries no payload).
    Resume,
    /// Rate limit per actor; `None` is carried through as-is
    /// (presumably meaning "no limit" — TODO confirm).
    Throttle(HashMap<ActorId, Option<u32>>),
    /// Connector property changes keyed by a `u32` id
    /// (NOTE(review): key is named `actor_id` when encoding and `connector_id` when
    /// decoding — confirm which it is).
    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
    /// Drop subscriptions.
    DropSubscriptions {
        /// `(subscriber id, upstream mv table id)` pairs to drop.
        subscriptions_to_drop: Vec<(SubscriberId, TableId)>,
    },
    /// Start backfill for the given fragments.
    StartFragmentBackfill {
        /// Fragments whose backfill should start.
        fragment_ids: HashSet<FragmentId>,
    },
    /// Refresh-start mutation for a table and its associated source.
    RefreshStart {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// List-finish mutation for a source.
    ListFinish {
        associated_source_id: SourceId,
    },
    /// Load-finish mutation for a source.
    LoadFinish {
        associated_source_id: SourceId,
    },
}
385
/// A barrier, generic over the mutation payload type `M`.
///
/// See [`Barrier`] (payload `Option<Arc<Mutation>>`) and [`DispatcherBarrier`] (payload `()`).
#[derive(Debug, Clone)]
pub struct BarrierInner<M> {
    /// Current and previous epoch of this barrier.
    pub epoch: EpochPair,
    /// Mutation payload carried by the barrier.
    pub mutation: M,
    /// Kind of the barrier (e.g. `BarrierKind::Checkpoint`).
    pub kind: BarrierKind,

    /// Tracing context carried along with the barrier.
    pub tracing_context: TracingContext,
}
399
/// Mutation payload of a regular [`Barrier`]: an optional, shared [`Mutation`].
pub type BarrierMutationType = Option<Arc<Mutation>>;
/// Barrier carrying an optional [`Mutation`].
pub type Barrier = BarrierInner<BarrierMutationType>;
/// Barrier without any mutation payload; produced by `Barrier::into_dispatcher`.
pub type DispatcherBarrier = BarrierInner<()>;
403
404impl<M: Default> BarrierInner<M> {
405 pub fn new_test_barrier(epoch: u64) -> Self {
407 Self {
408 epoch: EpochPair::new_test_epoch(epoch),
409 kind: BarrierKind::Checkpoint,
410 tracing_context: TracingContext::none(),
411 mutation: Default::default(),
412 }
413 }
414
415 pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
416 Self {
417 epoch: EpochPair::new(epoch, prev_epoch),
418 kind: BarrierKind::Checkpoint,
419 tracing_context: TracingContext::none(),
420 mutation: Default::default(),
421 }
422 }
423}
424
425impl Barrier {
426 pub fn into_dispatcher(self) -> DispatcherBarrier {
427 DispatcherBarrier {
428 epoch: self.epoch,
429 mutation: (),
430 kind: self.kind,
431 tracing_context: self.tracing_context,
432 }
433 }
434
435 #[must_use]
436 pub fn with_mutation(self, mutation: Mutation) -> Self {
437 Self {
438 mutation: Some(Arc::new(mutation)),
439 ..self
440 }
441 }
442
443 #[must_use]
444 pub fn with_stop(self) -> Self {
445 self.with_mutation(Mutation::Stop(StopMutation {
446 dropped_actors: Default::default(),
447 dropped_sink_fragments: Default::default(),
448 }))
449 }
450
451 pub fn is_with_stop_mutation(&self) -> bool {
453 matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
454 }
455
456 pub fn is_stop(&self, actor_id: ActorId) -> bool {
458 self.all_stop_actors()
459 .is_some_and(|actors| actors.contains(&actor_id))
460 }
461
462 pub fn is_checkpoint(&self) -> bool {
463 self.kind == BarrierKind::Checkpoint
464 }
465
466 pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
476 match self.mutation.as_deref()? {
477 Mutation::Update(UpdateMutation { actor_splits, .. })
478 | Mutation::Add(AddMutation {
479 splits: actor_splits,
480 ..
481 }) => actor_splits.get(&actor_id),
482
483 _ => {
484 if cfg!(debug_assertions) {
485 panic!(
486 "the initial mutation of the barrier should not be {:?}",
487 self.mutation
488 );
489 }
490 None
491 }
492 }
493 .map(|s| s.as_slice())
494 }
495
496 pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
498 match self.mutation.as_deref() {
499 Some(Mutation::Stop(StopMutation { dropped_actors, .. })) => Some(dropped_actors),
500 Some(Mutation::Update(UpdateMutation { dropped_actors, .. })) => Some(dropped_actors),
501 _ => None,
502 }
503 }
504
505 pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
511 match self.mutation.as_deref() {
512 Some(Mutation::Add(AddMutation { added_actors, .. })) => {
513 added_actors.contains(&actor_id)
514 }
515 _ => false,
516 }
517 }
518
519 pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
520 if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
521 fragment_ids.contains(&fragment_id)
522 } else {
523 false
524 }
525 }
526
527 pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
545 let Some(mutation) = self.mutation.as_deref() else {
546 return false;
547 };
548 match mutation {
549 Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
551 Mutation::Update(_)
552 | Mutation::Stop(_)
553 | Mutation::Pause
554 | Mutation::Resume
555 | Mutation::SourceChangeSplit(_)
556 | Mutation::Throttle(_)
557 | Mutation::DropSubscriptions { .. }
558 | Mutation::ConnectorPropsChange(_)
559 | Mutation::StartFragmentBackfill { .. }
560 | Mutation::RefreshStart { .. }
561 | Mutation::ListFinish { .. }
562 | Mutation::LoadFinish { .. } => false,
563 }
564 }
565
566 pub fn is_pause_on_startup(&self) -> bool {
568 match self.mutation.as_deref() {
569 Some(Mutation::Add(AddMutation { pause, .. })) => *pause,
570 _ => false,
571 }
572 }
573
574 pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
575 match self.mutation.as_deref() {
576 Some(Mutation::Add(AddMutation {
577 backfill_nodes_to_pause,
578 ..
579 })) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
580 Some(Mutation::Update(_)) => false,
581 _ => {
582 tracing::warn!(
583 "expected an AddMutation or UpdateMutation on Startup, instead got {:?}",
584 self
585 );
586 false
587 }
588 }
589 }
590
591 pub fn is_resume(&self) -> bool {
593 matches!(self.mutation.as_deref(), Some(Mutation::Resume))
594 }
595
596 pub fn as_update_merge(
599 &self,
600 actor_id: ActorId,
601 upstream_fragment_id: UpstreamFragmentId,
602 ) -> Option<&MergeUpdate> {
603 self.mutation
604 .as_deref()
605 .and_then(|mutation| match mutation {
606 Mutation::Update(UpdateMutation { merges, .. }) => {
607 merges.get(&(actor_id, upstream_fragment_id))
608 }
609 _ => None,
610 })
611 }
612
613 pub fn as_new_upstream_sink(&self, fragment_id: FragmentId) -> Option<&PbNewUpstreamSink> {
616 self.mutation
617 .as_deref()
618 .and_then(|mutation| match mutation {
619 Mutation::Add(AddMutation {
620 new_upstream_sinks, ..
621 }) => new_upstream_sinks.get(&fragment_id),
622 _ => None,
623 })
624 }
625
626 pub fn as_dropped_upstream_sinks(&self) -> Option<&HashSet<FragmentId>> {
628 self.mutation
629 .as_deref()
630 .and_then(|mutation| match mutation {
631 Mutation::Stop(StopMutation {
632 dropped_sink_fragments,
633 ..
634 }) => Some(dropped_sink_fragments),
635 _ => None,
636 })
637 }
638
639 pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
645 self.mutation
646 .as_deref()
647 .and_then(|mutation| match mutation {
648 Mutation::Update(UpdateMutation { vnode_bitmaps, .. }) => {
649 vnode_bitmaps.get(&actor_id).cloned()
650 }
651 _ => None,
652 })
653 }
654
655 pub fn as_sink_add_columns(&self, sink_id: SinkId) -> Option<Vec<Field>> {
656 self.mutation
657 .as_deref()
658 .and_then(|mutation| match mutation {
659 Mutation::Update(UpdateMutation {
660 sink_add_columns, ..
661 }) => sink_add_columns.get(&sink_id).cloned(),
662 _ => None,
663 })
664 }
665
666 pub fn get_curr_epoch(&self) -> Epoch {
667 Epoch(self.epoch.curr)
668 }
669
670 pub fn tracing_context(&self) -> &TracingContext {
672 &self.tracing_context
673 }
674
675 pub fn added_subscriber_on_mv_table(
676 &self,
677 mv_table_id: TableId,
678 ) -> impl Iterator<Item = SubscriberId> + '_ {
679 if let Some(Mutation::Add(add)) = self.mutation.as_deref() {
680 Some(add)
681 } else {
682 None
683 }
684 .into_iter()
685 .flat_map(move |add| {
686 add.subscriptions_to_add.iter().filter_map(
687 move |(upstream_mv_table_id, subscriber_id)| {
688 if *upstream_mv_table_id == mv_table_id {
689 Some(*subscriber_id)
690 } else {
691 None
692 }
693 },
694 )
695 })
696 }
697}
698
699impl<M: PartialEq> PartialEq for BarrierInner<M> {
700 fn eq(&self, other: &Self) -> bool {
701 self.epoch == other.epoch && self.mutation == other.mutation
702 }
703}
704
705impl Mutation {
706 #[cfg(test)]
710 pub fn is_stop(&self) -> bool {
711 matches!(self, Mutation::Stop(_))
712 }
713
714 #[cfg(test)]
715 fn to_protobuf(&self) -> PbMutation {
716 use risingwave_pb::source::{
717 ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
718 };
719 use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
720 use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
721 use risingwave_pb::stream_plan::{
722 PbAddMutation, PbConnectorPropsChangeMutation, PbDispatchers,
723 PbDropSubscriptionsMutation, PbPauseMutation, PbResumeMutation, PbSinkAddColumns,
724 PbSourceChangeSplitMutation, PbStartFragmentBackfillMutation, PbStopMutation,
725 PbThrottleMutation, PbUpdateMutation,
726 };
727 let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
728 actor_splits
729 .iter()
730 .map(|(&actor_id, splits)| {
731 (
732 actor_id,
733 ConnectorSplits {
734 splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
735 },
736 )
737 })
738 .collect::<HashMap<_, _>>()
739 };
740
741 match self {
742 Mutation::Stop(StopMutation {
743 dropped_actors,
744 dropped_sink_fragments,
745 }) => PbMutation::Stop(PbStopMutation {
746 actors: dropped_actors.iter().copied().collect(),
747 dropped_sink_fragments: dropped_sink_fragments.iter().copied().collect(),
748 }),
749 Mutation::Update(UpdateMutation {
750 dispatchers,
751 merges,
752 vnode_bitmaps,
753 dropped_actors,
754 actor_splits,
755 actor_new_dispatchers,
756 actor_cdc_table_snapshot_splits,
757 sink_add_columns,
758 }) => PbMutation::Update(PbUpdateMutation {
759 dispatcher_update: dispatchers.values().flatten().cloned().collect(),
760 merge_update: merges.values().cloned().collect(),
761 actor_vnode_bitmap_update: vnode_bitmaps
762 .iter()
763 .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
764 .collect(),
765 dropped_actors: dropped_actors.iter().copied().collect(),
766 actor_splits: actor_splits_to_protobuf(actor_splits),
767 actor_new_dispatchers: actor_new_dispatchers
768 .iter()
769 .map(|(&actor_id, dispatchers)| {
770 (
771 actor_id,
772 PbDispatchers {
773 dispatchers: dispatchers.clone(),
774 },
775 )
776 })
777 .collect(),
778 actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
779 splits:actor_cdc_table_snapshot_splits.splits.iter().map(|(actor_id,(splits, generation))| {
780 (*actor_id, risingwave_pb::source::PbCdcTableSnapshotSplits {
781 splits: splits.iter().map(risingwave_connector::source::cdc::build_cdc_table_snapshot_split).collect(),
782 generation: *generation,
783 })
784 }).collect()
785 }),
786 sink_add_columns: sink_add_columns
787 .iter()
788 .map(|(sink_id, add_columns)| {
789 (
790 *sink_id,
791 PbSinkAddColumns {
792 fields: add_columns.iter().map(|field| field.to_prost()).collect(),
793 },
794 )
795 })
796 .collect(),
797 }),
798 Mutation::Add(AddMutation {
799 adds,
800 added_actors,
801 splits,
802 pause,
803 subscriptions_to_add,
804 backfill_nodes_to_pause,
805 actor_cdc_table_snapshot_splits,
806 new_upstream_sinks,
807 }) => PbMutation::Add(PbAddMutation {
808 actor_dispatchers: adds
809 .iter()
810 .map(|(&actor_id, dispatchers)| {
811 (
812 actor_id,
813 PbDispatchers {
814 dispatchers: dispatchers.clone(),
815 },
816 )
817 })
818 .collect(),
819 added_actors: added_actors.iter().copied().collect(),
820 actor_splits: actor_splits_to_protobuf(splits),
821 pause: *pause,
822 subscriptions_to_add: subscriptions_to_add
823 .iter()
824 .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
825 subscriber_id: *subscriber_id,
826 upstream_mv_table_id: *table_id,
827 })
828 .collect(),
829 backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
830 actor_cdc_table_snapshot_splits:
831 Some(PbCdcTableSnapshotSplitsWithGeneration {
832 splits:actor_cdc_table_snapshot_splits.splits.iter().map(|(actor_id,(splits, generation))| {
833 (*actor_id, risingwave_pb::source::PbCdcTableSnapshotSplits {
834 splits: splits.iter().map(risingwave_connector::source::cdc::build_cdc_table_snapshot_split).collect(),
835 generation: *generation,
836 })
837 }).collect()
838 }),
839 new_upstream_sinks: new_upstream_sinks
840 .iter()
841 .map(|(k, v)| (*k, v.clone()))
842 .collect(),
843 }),
844 Mutation::SourceChangeSplit(changes) => {
845 PbMutation::Splits(PbSourceChangeSplitMutation {
846 actor_splits: changes
847 .iter()
848 .map(|(&actor_id, splits)| {
849 (
850 actor_id,
851 ConnectorSplits {
852 splits: splits
853 .clone()
854 .iter()
855 .map(ConnectorSplit::from)
856 .collect(),
857 },
858 )
859 })
860 .collect(),
861 })
862 }
863 Mutation::Pause => PbMutation::Pause(PbPauseMutation {}),
864 Mutation::Resume => PbMutation::Resume(PbResumeMutation {}),
865 Mutation::Throttle(changes) => PbMutation::Throttle(PbThrottleMutation {
866 actor_throttle: changes
867 .iter()
868 .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit }))
869 .collect(),
870 }),
871 Mutation::DropSubscriptions {
872 subscriptions_to_drop,
873 } => PbMutation::DropSubscriptions(PbDropSubscriptionsMutation {
874 info: subscriptions_to_drop
875 .iter()
876 .map(
877 |(subscriber_id, upstream_mv_table_id)| SubscriptionUpstreamInfo {
878 subscriber_id: *subscriber_id,
879 upstream_mv_table_id: *upstream_mv_table_id,
880 },
881 )
882 .collect(),
883 }),
884 Mutation::ConnectorPropsChange(map) => {
885 PbMutation::ConnectorPropsChange(PbConnectorPropsChangeMutation {
886 connector_props_infos: map
887 .iter()
888 .map(|(actor_id, options)| {
889 (
890 *actor_id,
891 ConnectorPropsInfo {
892 connector_props_info: options
893 .iter()
894 .map(|(k, v)| (k.clone(), v.clone()))
895 .collect(),
896 },
897 )
898 })
899 .collect(),
900 })
901 }
902 Mutation::StartFragmentBackfill { fragment_ids } => {
903 PbMutation::StartFragmentBackfill(PbStartFragmentBackfillMutation {
904 fragment_ids: fragment_ids.iter().copied().collect(),
905 })
906 }
907 Mutation::RefreshStart {
908 table_id,
909 associated_source_id,
910 } => PbMutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
911 table_id: *table_id,
912 associated_source_id: *associated_source_id,
913 }),
914 Mutation::ListFinish {
915 associated_source_id,
916 } => PbMutation::ListFinish(risingwave_pb::stream_plan::ListFinishMutation {
917 associated_source_id: *associated_source_id,
918 }),
919 Mutation::LoadFinish {
920 associated_source_id,
921 } => PbMutation::LoadFinish(risingwave_pb::stream_plan::LoadFinishMutation {
922 associated_source_id: *associated_source_id,
923 }),
924 }
925 }
926
927 fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
928 let mutation = match prost {
929 PbMutation::Stop(stop) => Mutation::Stop(StopMutation {
930 dropped_actors: stop.actors.iter().copied().collect(),
931 dropped_sink_fragments: stop.dropped_sink_fragments.iter().copied().collect(),
932 }),
933
934 PbMutation::Update(update) => Mutation::Update(UpdateMutation {
935 dispatchers: update
936 .dispatcher_update
937 .iter()
938 .map(|u| (u.actor_id, u.clone()))
939 .into_group_map(),
940 merges: update
941 .merge_update
942 .iter()
943 .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
944 .collect(),
945 vnode_bitmaps: update
946 .actor_vnode_bitmap_update
947 .iter()
948 .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
949 .collect(),
950 dropped_actors: update.dropped_actors.iter().copied().collect(),
951 actor_splits: update
952 .actor_splits
953 .iter()
954 .map(|(&actor_id, splits)| {
955 (
956 actor_id,
957 splits
958 .splits
959 .iter()
960 .map(|split| split.try_into().unwrap())
961 .collect(),
962 )
963 })
964 .collect(),
965 actor_new_dispatchers: update
966 .actor_new_dispatchers
967 .iter()
968 .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
969 .collect(),
970 actor_cdc_table_snapshot_splits:
971 build_actor_cdc_table_snapshot_splits_with_generation(
972 update
973 .actor_cdc_table_snapshot_splits
974 .clone()
975 .unwrap_or_default(),
976 ),
977 sink_add_columns: update
978 .sink_add_columns
979 .iter()
980 .map(|(sink_id, add_columns)| {
981 (
982 *sink_id,
983 add_columns.fields.iter().map(Field::from_prost).collect(),
984 )
985 })
986 .collect(),
987 }),
988
989 PbMutation::Add(add) => Mutation::Add(AddMutation {
990 adds: add
991 .actor_dispatchers
992 .iter()
993 .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
994 .collect(),
995 added_actors: add.added_actors.iter().copied().collect(),
996 splits: add
999 .actor_splits
1000 .iter()
1001 .map(|(&actor_id, splits)| {
1002 (
1003 actor_id,
1004 splits
1005 .splits
1006 .iter()
1007 .map(|split| split.try_into().unwrap())
1008 .collect(),
1009 )
1010 })
1011 .collect(),
1012 pause: add.pause,
1013 subscriptions_to_add: add
1014 .subscriptions_to_add
1015 .iter()
1016 .map(
1017 |SubscriptionUpstreamInfo {
1018 subscriber_id,
1019 upstream_mv_table_id,
1020 }| { (*upstream_mv_table_id, *subscriber_id) },
1021 )
1022 .collect(),
1023 backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
1024 actor_cdc_table_snapshot_splits:
1025 build_actor_cdc_table_snapshot_splits_with_generation(
1026 add.actor_cdc_table_snapshot_splits
1027 .clone()
1028 .unwrap_or_default(),
1029 ),
1030 new_upstream_sinks: add
1031 .new_upstream_sinks
1032 .iter()
1033 .map(|(k, v)| (*k, v.clone()))
1034 .collect(),
1035 }),
1036
1037 PbMutation::Splits(s) => {
1038 let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
1039 Vec::with_capacity(s.actor_splits.len());
1040 for (&actor_id, splits) in &s.actor_splits {
1041 if !splits.splits.is_empty() {
1042 change_splits.push((
1043 actor_id,
1044 splits
1045 .splits
1046 .iter()
1047 .map(SplitImpl::try_from)
1048 .try_collect()?,
1049 ));
1050 }
1051 }
1052 Mutation::SourceChangeSplit(change_splits.into_iter().collect())
1053 }
1054 PbMutation::Pause(_) => Mutation::Pause,
1055 PbMutation::Resume(_) => Mutation::Resume,
1056 PbMutation::Throttle(changes) => Mutation::Throttle(
1057 changes
1058 .actor_throttle
1059 .iter()
1060 .map(|(actor_id, limit)| (*actor_id, limit.rate_limit))
1061 .collect(),
1062 ),
1063 PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
1064 subscriptions_to_drop: drop
1065 .info
1066 .iter()
1067 .map(|info| (info.subscriber_id, info.upstream_mv_table_id))
1068 .collect(),
1069 },
1070 PbMutation::ConnectorPropsChange(alter_connector_props) => {
1071 Mutation::ConnectorPropsChange(
1072 alter_connector_props
1073 .connector_props_infos
1074 .iter()
1075 .map(|(connector_id, options)| {
1076 (
1077 *connector_id,
1078 options
1079 .connector_props_info
1080 .iter()
1081 .map(|(k, v)| (k.clone(), v.clone()))
1082 .collect(),
1083 )
1084 })
1085 .collect(),
1086 )
1087 }
1088 PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
1089 Mutation::StartFragmentBackfill {
1090 fragment_ids: start_fragment_backfill
1091 .fragment_ids
1092 .iter()
1093 .copied()
1094 .collect(),
1095 }
1096 }
1097 PbMutation::RefreshStart(refresh_start) => Mutation::RefreshStart {
1098 table_id: refresh_start.table_id,
1099 associated_source_id: refresh_start.associated_source_id,
1100 },
1101 PbMutation::ListFinish(list_finish) => Mutation::ListFinish {
1102 associated_source_id: list_finish.associated_source_id,
1103 },
1104 PbMutation::LoadFinish(load_finish) => Mutation::LoadFinish {
1105 associated_source_id: load_finish.associated_source_id,
1106 },
1107 };
1108 Ok(mutation)
1109 }
1110}
1111
1112impl<M> BarrierInner<M> {
1113 fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
1114 let Self {
1115 epoch,
1116 mutation,
1117 kind,
1118 tracing_context,
1119 ..
1120 } = self;
1121
1122 PbBarrier {
1123 epoch: Some(PbEpoch {
1124 curr: epoch.curr,
1125 prev: epoch.prev,
1126 }),
1127 mutation: barrier_fn(mutation).map(|mutation| PbBarrierMutation {
1128 mutation: Some(mutation),
1129 }),
1130 tracing_context: tracing_context.to_protobuf(),
1131 kind: *kind as _,
1132 }
1133 }
1134
1135 fn from_protobuf_inner(
1136 prost: &PbBarrier,
1137 mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
1138 ) -> StreamExecutorResult<Self> {
1139 let epoch = prost.get_epoch()?;
1140
1141 Ok(Self {
1142 kind: prost.kind(),
1143 epoch: EpochPair::new(epoch.curr, epoch.prev),
1144 mutation: mutation_from_pb(
1145 (prost.mutation.as_ref()).and_then(|mutation| mutation.mutation.as_ref()),
1146 )?,
1147 tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
1148 })
1149 }
1150
1151 pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
1152 BarrierInner {
1153 epoch: self.epoch,
1154 mutation: f(self.mutation),
1155 kind: self.kind,
1156 tracing_context: self.tracing_context,
1157 }
1158 }
1159}
1160
1161impl DispatcherBarrier {
1162 pub fn to_protobuf(&self) -> PbBarrier {
1163 self.to_protobuf_inner(|_| None)
1164 }
1165}
1166
1167impl Barrier {
1168 #[cfg(test)]
1169 pub fn to_protobuf(&self) -> PbBarrier {
1170 self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
1171 }
1172
1173 pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
1174 Self::from_protobuf_inner(prost, |mutation| {
1175 mutation
1176 .map(|m| Mutation::from_protobuf(m).map(Arc::new))
1177 .transpose()
1178 })
1179 }
1180}
1181
/// A watermark on one column: the column index, the column's data type and the
/// watermark value.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
    /// Index of the column the watermark applies to.
    pub col_idx: usize,
    /// Data type of the watermark value.
    pub data_type: DataType,
    /// The watermark value itself.
    pub val: ScalarImpl,
}
1188
impl PartialOrd for Watermark {
    /// Delegates to the total order defined by the `Ord` implementation, so this never
    /// returns `None`.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
1194
1195impl Ord for Watermark {
1196 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1197 self.val.default_cmp(&other.val)
1198 }
1199}
1200
1201impl Watermark {
1202 pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
1203 Self {
1204 col_idx,
1205 data_type,
1206 val,
1207 }
1208 }
1209
1210 pub async fn transform_with_expr(
1211 self,
1212 expr: &NonStrictExpression<impl Expression>,
1213 new_col_idx: usize,
1214 ) -> Option<Self> {
1215 let Self { col_idx, val, .. } = self;
1216 let row = {
1217 let mut row = vec![None; col_idx + 1];
1218 row[col_idx] = Some(val);
1219 OwnedRow::new(row)
1220 };
1221 let val = expr.eval_row_infallible(&row).await?;
1222 Some(Self::new(new_col_idx, expr.inner().return_type(), val))
1223 }
1224
1225 pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
1228 output_indices
1229 .iter()
1230 .position(|p| *p == self.col_idx)
1231 .map(|new_col_idx| self.with_idx(new_col_idx))
1232 }
1233
1234 pub fn to_protobuf(&self) -> PbWatermark {
1235 PbWatermark {
1236 column: Some(PbInputRef {
1237 index: self.col_idx as _,
1238 r#type: Some(self.data_type.to_protobuf()),
1239 }),
1240 val: Some(&self.val).to_protobuf().into(),
1241 }
1242 }
1243
1244 pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
1245 let col_ref = prost.get_column()?;
1246 let data_type = DataType::from(col_ref.get_type()?);
1247 let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
1248 .expect("watermark value cannot be null");
1249 Ok(Self::new(col_ref.get_index() as _, data_type, val))
1250 }
1251
1252 pub fn with_idx(self, idx: usize) -> Self {
1253 Self::new(idx, self.data_type, self.val)
1254 }
1255}
1256
1257#[derive(Debug, EnumAsInner, PartialEq, Clone)]
1258pub enum MessageInner<M> {
1259 Chunk(StreamChunk),
1260 Barrier(BarrierInner<M>),
1261 Watermark(Watermark),
1262}
1263
1264impl<M> MessageInner<M> {
1265 pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
1266 match self {
1267 MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
1268 MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
1269 MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
1270 }
1271 }
1272}
1273
1274pub type Message = MessageInner<BarrierMutationType>;
1275pub type DispatcherMessage = MessageInner<()>;
1276
1277#[derive(Debug, EnumAsInner, PartialEq, Clone)]
1280pub enum MessageBatchInner<M> {
1281 Chunk(StreamChunk),
1282 BarrierBatch(Vec<BarrierInner<M>>),
1283 Watermark(Watermark),
1284}
1285pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
1286pub type DispatcherBarriers = Vec<DispatcherBarrier>;
1287pub type DispatcherMessageBatch = MessageBatchInner<()>;
1288
1289impl From<DispatcherMessage> for DispatcherMessageBatch {
1290 fn from(m: DispatcherMessage) -> Self {
1291 match m {
1292 DispatcherMessage::Chunk(c) => Self::Chunk(c),
1293 DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
1294 DispatcherMessage::Watermark(w) => Self::Watermark(w),
1295 }
1296 }
1297}
1298
1299impl From<StreamChunk> for Message {
1300 fn from(chunk: StreamChunk) -> Self {
1301 Message::Chunk(chunk)
1302 }
1303}
1304
1305impl<'a> TryFrom<&'a Message> for &'a Barrier {
1306 type Error = ();
1307
1308 fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
1309 match m {
1310 Message::Chunk(_) => Err(()),
1311 Message::Barrier(b) => Ok(b),
1312 Message::Watermark(_) => Err(()),
1313 }
1314 }
1315}
1316
1317impl Message {
1318 #[cfg(test)]
1323 pub fn is_stop(&self) -> bool {
1324 matches!(
1325 self,
1326 Message::Barrier(Barrier {
1327 mutation,
1328 ..
1329 }) if mutation.as_ref().unwrap().is_stop()
1330 )
1331 }
1332}
1333
1334impl DispatcherMessageBatch {
1335 pub fn to_protobuf(&self) -> PbStreamMessageBatch {
1336 let prost = match self {
1337 Self::Chunk(stream_chunk) => {
1338 let prost_stream_chunk = stream_chunk.to_protobuf();
1339 StreamMessageBatch::StreamChunk(prost_stream_chunk)
1340 }
1341 Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
1342 barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
1343 }),
1344 Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
1345 };
1346 PbStreamMessageBatch {
1347 stream_message_batch: Some(prost),
1348 }
1349 }
1350
1351 pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
1352 let res = match prost.get_stream_message_batch()? {
1353 StreamMessageBatch::StreamChunk(chunk) => {
1354 Self::Chunk(StreamChunk::from_protobuf(chunk)?)
1355 }
1356 StreamMessageBatch::BarrierBatch(barrier_batch) => {
1357 let barriers = barrier_batch
1358 .barriers
1359 .iter()
1360 .map(|barrier| {
1361 DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
1362 if mutation.is_some() {
1363 if cfg!(debug_assertions) {
1364 panic!("should not receive message of barrier with mutation");
1365 } else {
1366 warn!(?barrier, "receive message of barrier with mutation");
1367 }
1368 }
1369 Ok(())
1370 })
1371 })
1372 .try_collect()?;
1373 Self::BarrierBatch(barriers)
1374 }
1375 StreamMessageBatch::Watermark(watermark) => {
1376 Self::Watermark(Watermark::from_protobuf(watermark)?)
1377 }
1378 };
1379 Ok(res)
1380 }
1381
1382 pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
1383 ::prost::Message::encoded_len(msg)
1384 }
1385}
1386
1387pub type StreamKey = Vec<usize>;
1388pub type StreamKeyRef<'a> = &'a [usize];
1389pub type StreamKeyDataTypes = SmallVec<[DataType; 1]>;
1390
1391pub async fn expect_first_barrier<M: Debug>(
1393 stream: &mut (impl MessageStreamInner<M> + Unpin),
1394) -> StreamExecutorResult<BarrierInner<M>> {
1395 let message = stream
1396 .next()
1397 .instrument_await("expect_first_barrier")
1398 .await
1399 .context("failed to extract the first message: stream closed unexpectedly")??;
1400 let barrier = message
1401 .into_barrier()
1402 .expect("the first message must be a barrier");
1403 assert!(matches!(
1405 barrier.kind,
1406 BarrierKind::Checkpoint | BarrierKind::Initial
1407 ));
1408 Ok(barrier)
1409}
1410
1411pub async fn expect_first_barrier_from_aligned_stream(
1413 stream: &mut (impl AlignedMessageStream + Unpin),
1414) -> StreamExecutorResult<Barrier> {
1415 let message = stream
1416 .next()
1417 .instrument_await("expect_first_barrier")
1418 .await
1419 .context("failed to extract the first message: stream closed unexpectedly")??;
1420 let barrier = message
1421 .into_barrier()
1422 .expect("the first message must be a barrier");
1423 Ok(barrier)
1424}
1425
1426pub trait StreamConsumer: Send + 'static {
1428 type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;
1429
1430 fn execute(self: Box<Self>) -> Self::BarrierStream;
1431}
1432
1433type BoxedMessageInput<InputId, M> = BoxedInput<InputId, MessageStreamItemInner<M>>;
1434
1435pub struct DynamicReceivers<InputId, M> {
1439 barrier: Option<BarrierInner<M>>,
1441 start_ts: Option<Instant>,
1443 blocked: Vec<BoxedMessageInput<InputId, M>>,
1445 active: FuturesUnordered<StreamFuture<BoxedMessageInput<InputId, M>>>,
1447 buffered_watermarks: BTreeMap<usize, BufferedWatermarks<InputId>>,
1449 barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1451 merge_barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1453}
1454
1455impl<InputId: Clone + Ord + Hash + std::fmt::Debug + Unpin, M: Clone + Unpin> Stream
1456 for DynamicReceivers<InputId, M>
1457{
1458 type Item = MessageStreamItemInner<M>;
1459
1460 fn poll_next(
1461 mut self: Pin<&mut Self>,
1462 cx: &mut std::task::Context<'_>,
1463 ) -> Poll<Option<Self::Item>> {
1464 if self.is_empty() {
1465 return Poll::Ready(None);
1466 }
1467
1468 loop {
1469 match futures::ready!(self.active.poll_next_unpin(cx)) {
1470 Some((Some(Err(e)), _)) => {
1472 return Poll::Ready(Some(Err(e)));
1473 }
1474 Some((Some(Ok(message)), remaining)) => {
1476 let input_id = remaining.id();
1477 match message {
1478 MessageInner::Chunk(chunk) => {
1479 self.active.push(remaining.into_future());
1481 return Poll::Ready(Some(Ok(MessageInner::Chunk(chunk))));
1482 }
1483 MessageInner::Watermark(watermark) => {
1484 self.active.push(remaining.into_future());
1486 if let Some(watermark) = self.handle_watermark(input_id, watermark) {
1487 return Poll::Ready(Some(Ok(MessageInner::Watermark(watermark))));
1488 }
1489 }
1490 MessageInner::Barrier(barrier) => {
1491 if self.blocked.is_empty() {
1493 self.start_ts = Some(Instant::now());
1494 }
1495 self.blocked.push(remaining);
1496 if let Some(current_barrier) = self.barrier.as_ref() {
1497 if current_barrier.epoch != barrier.epoch {
1498 return Poll::Ready(Some(Err(
1499 StreamExecutorError::align_barrier(
1500 current_barrier.clone().map_mutation(|_| None),
1501 barrier.map_mutation(|_| None),
1502 ),
1503 )));
1504 }
1505 } else {
1506 self.barrier = Some(barrier);
1507 }
1508 }
1509 }
1510 }
1511 Some((None, remaining)) => {
1519 return Poll::Ready(Some(Err(StreamExecutorError::channel_closed(format!(
1520 "upstream input {:?} unexpectedly closed",
1521 remaining.id()
1522 )))));
1523 }
1524 None => {
1526 assert!(!self.blocked.is_empty());
1527
1528 let start_ts = self
1529 .start_ts
1530 .take()
1531 .expect("should have received at least one barrier");
1532 if let Some(barrier_align_duration) = &self.barrier_align_duration {
1533 barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
1534 }
1535 if let Some(merge_barrier_align_duration) = &self.merge_barrier_align_duration {
1536 merge_barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
1537 }
1538
1539 break;
1540 }
1541 }
1542 }
1543
1544 assert!(self.active.is_terminated());
1545
1546 let barrier = self.barrier.take().unwrap();
1547
1548 let upstreams = std::mem::take(&mut self.blocked);
1549 self.extend_active(upstreams);
1550 assert!(!self.active.is_terminated());
1551
1552 Poll::Ready(Some(Ok(MessageInner::Barrier(barrier))))
1553 }
1554}
1555
1556impl<InputId: Clone + Ord + Hash + std::fmt::Debug, M> DynamicReceivers<InputId, M> {
1557 pub fn new(
1558 upstreams: Vec<BoxedMessageInput<InputId, M>>,
1559 barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1560 merge_barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1561 ) -> Self {
1562 let mut this = Self {
1563 barrier: None,
1564 start_ts: None,
1565 blocked: Vec::with_capacity(upstreams.len()),
1566 active: Default::default(),
1567 buffered_watermarks: Default::default(),
1568 merge_barrier_align_duration,
1569 barrier_align_duration,
1570 };
1571 this.extend_active(upstreams);
1572 this
1573 }
1574
1575 pub fn extend_active(
1578 &mut self,
1579 upstreams: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1580 ) {
1581 assert!(self.blocked.is_empty() && self.barrier.is_none());
1582
1583 self.active
1584 .extend(upstreams.into_iter().map(|s| s.into_future()));
1585 }
1586
1587 pub fn handle_watermark(
1589 &mut self,
1590 input_id: InputId,
1591 watermark: Watermark,
1592 ) -> Option<Watermark> {
1593 let col_idx = watermark.col_idx;
1594 let upstream_ids: Vec<_> = self.upstream_input_ids().collect();
1596 let watermarks = self
1597 .buffered_watermarks
1598 .entry(col_idx)
1599 .or_insert_with(|| BufferedWatermarks::with_ids(upstream_ids));
1600 watermarks.handle_watermark(input_id, watermark)
1601 }
1602
1603 pub fn add_upstreams_from(
1606 &mut self,
1607 new_inputs: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1608 ) {
1609 assert!(self.blocked.is_empty() && self.barrier.is_none());
1610
1611 let new_inputs: Vec<_> = new_inputs.into_iter().collect();
1612 let input_ids = new_inputs.iter().map(|input| input.id());
1613 self.buffered_watermarks.values_mut().for_each(|buffers| {
1614 buffers.add_buffers(input_ids.clone());
1616 });
1617 self.active
1618 .extend(new_inputs.into_iter().map(|s| s.into_future()));
1619 }
1620
1621 pub fn remove_upstreams(&mut self, upstream_input_ids: &HashSet<InputId>) {
1625 assert!(self.blocked.is_empty() && self.barrier.is_none());
1626
1627 let new_upstreams = std::mem::take(&mut self.active)
1628 .into_iter()
1629 .map(|s| s.into_inner().unwrap())
1630 .filter(|u| !upstream_input_ids.contains(&u.id()));
1631 self.extend_active(new_upstreams);
1632 self.buffered_watermarks.values_mut().for_each(|buffers| {
1633 buffers.remove_buffer(upstream_input_ids.clone());
1636 });
1637 }
1638
1639 pub fn merge_barrier_align_duration(
1640 &self,
1641 ) -> Option<LabelGuardedMetric<GenericCounter<AtomicU64>>> {
1642 self.merge_barrier_align_duration.clone()
1643 }
1644
1645 pub fn flush_buffered_watermarks(&mut self) {
1646 self.buffered_watermarks
1647 .values_mut()
1648 .for_each(|buffers| buffers.clear());
1649 }
1650
1651 pub fn upstream_input_ids(&self) -> impl Iterator<Item = InputId> + '_ {
1652 self.blocked
1653 .iter()
1654 .map(|s| s.id())
1655 .chain(self.active.iter().map(|s| s.get_ref().unwrap().id()))
1656 }
1657
1658 pub fn is_empty(&self) -> bool {
1659 self.blocked.is_empty() && self.active.is_empty()
1660 }
1661}
1662
1663pub(crate) struct DispatchBarrierBuffer {
1684 buffer: VecDeque<(Barrier, Option<Vec<BoxedActorInput>>)>,
1685 barrier_rx: mpsc::UnboundedReceiver<Barrier>,
1686 recv_state: BarrierReceiverState,
1687 curr_upstream_fragment_id: FragmentId,
1688 actor_id: ActorId,
1689 build_input_ctx: Arc<BuildInputContext>,
1691}
1692
1693struct BuildInputContext {
1694 pub actor_id: ActorId,
1695 pub local_barrier_manager: LocalBarrierManager,
1696 pub metrics: Arc<StreamingMetrics>,
1697 pub fragment_id: FragmentId,
1698 pub actor_config: Arc<StreamingConfig>,
1699}
1700
1701type BoxedNewInputsFuture =
1702 Pin<Box<dyn Future<Output = StreamExecutorResult<Vec<BoxedActorInput>>> + Send>>;
1703
1704enum BarrierReceiverState {
1705 ReceivingBarrier,
1706 CreatingNewInput(Barrier, BoxedNewInputsFuture),
1707}
1708
1709impl DispatchBarrierBuffer {
1710 pub fn new(
1711 barrier_rx: mpsc::UnboundedReceiver<Barrier>,
1712 actor_id: ActorId,
1713 curr_upstream_fragment_id: FragmentId,
1714 local_barrier_manager: LocalBarrierManager,
1715 metrics: Arc<StreamingMetrics>,
1716 fragment_id: FragmentId,
1717 actor_config: Arc<StreamingConfig>,
1718 ) -> Self {
1719 Self {
1720 buffer: VecDeque::new(),
1721 barrier_rx,
1722 recv_state: BarrierReceiverState::ReceivingBarrier,
1723 curr_upstream_fragment_id,
1724 actor_id,
1725 build_input_ctx: Arc::new(BuildInputContext {
1726 actor_id,
1727 local_barrier_manager,
1728 metrics,
1729 fragment_id,
1730 actor_config,
1731 }),
1732 }
1733 }
1734
1735 pub async fn await_next_message(
1736 &mut self,
1737 stream: &mut (impl Stream<Item = StreamExecutorResult<DispatcherMessage>> + Unpin),
1738 metrics: &ActorInputMetrics,
1739 ) -> StreamExecutorResult<DispatcherMessage> {
1740 let mut start_time = Instant::now();
1741 let interval_duration = Duration::from_secs(15);
1742 let mut interval =
1743 tokio::time::interval_at(start_time + interval_duration, interval_duration);
1744
1745 loop {
1746 tokio::select! {
1747 biased;
1748 msg = stream.try_next() => {
1749 metrics
1750 .actor_input_buffer_blocking_duration_ns
1751 .inc_by(start_time.elapsed().as_nanos() as u64);
1752 return msg?.ok_or_else(
1753 || StreamExecutorError::channel_closed("upstream executor closed unexpectedly")
1754 );
1755 }
1756
1757 e = self.continuously_fetch_barrier_rx() => {
1758 return Err(e);
1759 }
1760
1761 _ = interval.tick() => {
1762 start_time = Instant::now();
1763 metrics.actor_input_buffer_blocking_duration_ns.inc_by(interval_duration.as_nanos() as u64);
1764 continue;
1765 }
1766 }
1767 }
1768 }
1769
1770 pub async fn pop_barrier_with_inputs(
1771 &mut self,
1772 barrier: DispatcherBarrier,
1773 ) -> StreamExecutorResult<(Barrier, Option<Vec<BoxedActorInput>>)> {
1774 while self.buffer.is_empty() {
1775 self.try_fetch_barrier_rx(false).await?;
1776 }
1777 let (recv_barrier, inputs) = self.buffer.pop_front().unwrap();
1778 assert_equal_dispatcher_barrier(&recv_barrier, &barrier);
1779
1780 Ok((recv_barrier, inputs))
1781 }
1782
1783 async fn continuously_fetch_barrier_rx(&mut self) -> StreamExecutorError {
1784 loop {
1785 if let Err(e) = self.try_fetch_barrier_rx(true).await {
1786 return e;
1787 }
1788 }
1789 }
1790
1791 async fn try_fetch_barrier_rx(&mut self, pending_on_end: bool) -> StreamExecutorResult<()> {
1792 match &mut self.recv_state {
1793 BarrierReceiverState::ReceivingBarrier => {
1794 let Some(barrier) = self.barrier_rx.recv().await else {
1795 if pending_on_end {
1796 return pending().await;
1797 } else {
1798 return Err(StreamExecutorError::channel_closed(
1799 "barrier channel closed unexpectedly",
1800 ));
1801 }
1802 };
1803 if let Some(fut) = self.pre_apply_barrier(&barrier) {
1804 self.recv_state = BarrierReceiverState::CreatingNewInput(barrier, fut);
1805 } else {
1806 self.buffer.push_back((barrier, None));
1807 }
1808 }
1809 BarrierReceiverState::CreatingNewInput(barrier, fut) => {
1810 let new_inputs = fut.await?;
1811 self.buffer.push_back((barrier.clone(), Some(new_inputs)));
1812 self.recv_state = BarrierReceiverState::ReceivingBarrier;
1813 }
1814 }
1815 Ok(())
1816 }
1817
1818 fn pre_apply_barrier(&mut self, barrier: &Barrier) -> Option<BoxedNewInputsFuture> {
1819 if let Some(update) = barrier.as_update_merge(self.actor_id, self.curr_upstream_fragment_id)
1820 && !update.added_upstream_actors.is_empty()
1821 {
1822 let upstream_fragment_id =
1824 if let Some(new_upstream_fragment_id) = update.new_upstream_fragment_id {
1825 self.curr_upstream_fragment_id = new_upstream_fragment_id;
1826 new_upstream_fragment_id
1827 } else {
1828 self.curr_upstream_fragment_id
1829 };
1830 let ctx = self.build_input_ctx.clone();
1831 let added_upstream_actors = update.added_upstream_actors.clone();
1832 let barrier = barrier.clone();
1833 let fut = async move {
1834 try_join_all(added_upstream_actors.iter().map(|upstream_actor| async {
1835 let mut new_input = new_input(
1836 &ctx.local_barrier_manager,
1837 ctx.metrics.clone(),
1838 ctx.actor_id,
1839 ctx.fragment_id,
1840 upstream_actor,
1841 upstream_fragment_id,
1842 ctx.actor_config.clone(),
1843 )
1844 .await?;
1845
1846 let first_barrier = expect_first_barrier(&mut new_input).await?;
1849 assert_equal_dispatcher_barrier(&barrier, &first_barrier);
1850
1851 StreamExecutorResult::Ok(new_input)
1852 }))
1853 .await
1854 }
1855 .boxed();
1856
1857 Some(fut)
1858 } else {
1859 None
1860 }
1861 }
1862}