1mod prelude;
16
17use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
18use std::fmt::Debug;
19use std::future::pending;
20use std::hash::Hash;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::Poll;
24use std::vec;
25
26use await_tree::InstrumentAwait;
27use enum_as_inner::EnumAsInner;
28use futures::future::try_join_all;
29use futures::stream::{BoxStream, FusedStream, FuturesUnordered, StreamFuture};
30use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
31use itertools::Itertools;
32use prometheus::core::{AtomicU64, GenericCounter};
33use risingwave_common::array::StreamChunk;
34use risingwave_common::bitmap::Bitmap;
35use risingwave_common::catalog::{Schema, TableId};
36use risingwave_common::config::StreamingConfig;
37use risingwave_common::metrics::LabelGuardedMetric;
38use risingwave_common::row::OwnedRow;
39use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
40use risingwave_common::util::epoch::{Epoch, EpochPair};
41use risingwave_common::util::tracing::TracingContext;
42use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
43use risingwave_connector::source::SplitImpl;
44use risingwave_expr::expr::{Expression, NonStrictExpression};
45use risingwave_pb::data::PbEpoch;
46use risingwave_pb::expr::PbInputRef;
47use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
48use risingwave_pb::stream_plan::barrier::BarrierKind;
49use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
50use risingwave_pb::stream_plan::stream_node::PbStreamKind;
51use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
52use risingwave_pb::stream_plan::{
53 PbBarrier, PbBarrierMutation, PbDispatcher, PbSinkSchemaChange, PbStreamMessageBatch,
54 PbWatermark, SubscriptionUpstreamInfo,
55};
56use smallvec::SmallVec;
57use tokio::sync::mpsc;
58use tokio::time::{Duration, Instant};
59
60use crate::error::StreamResult;
61use crate::executor::exchange::input::{
62 BoxedActorInput, BoxedInput, assert_equal_dispatcher_barrier, new_input,
63};
64use crate::executor::monitor::ActorInputMetrics;
65use crate::executor::prelude::StreamingMetrics;
66use crate::executor::watermark::BufferedWatermarks;
67use crate::task::{ActorId, FragmentId, LocalBarrierManager};
68
69mod actor;
70mod barrier_align;
71pub mod exchange;
72pub mod monitor;
73
74pub mod aggregate;
75pub mod asof_join;
76mod backfill;
77mod barrier_recv;
78mod batch_query;
79mod chain;
80mod changelog;
81mod dedup;
82mod dispatch;
83pub mod dml;
84mod dynamic_filter;
85pub mod eowc;
86pub mod error;
87mod expand;
88mod filter;
89mod gap_fill;
90pub mod hash_join;
91mod hop_window;
92mod join;
93pub mod locality_provider;
94mod lookup;
95mod lookup_union;
96mod merge;
97mod mview;
98mod nested_loop_temporal_join;
99mod no_op;
100mod now;
101mod over_window;
102pub mod project;
103mod rearranged_chain;
104mod receiver;
105pub mod row_id_gen;
106mod sink;
107pub mod source;
108mod stream_reader;
109pub mod subtask;
110mod temporal_join;
111mod top_n;
112mod troublemaker;
113mod union;
114mod upstream_sink_union;
115mod values;
116mod watermark;
117mod watermark_filter;
118mod wrapper;
119
120mod approx_percentile;
121
122mod row_merge;
123
124#[cfg(test)]
125mod integration_tests;
126mod sync_kv_log_store;
127#[cfg(any(test, feature = "test"))]
128pub mod test_utils;
129mod utils;
130mod vector;
131
132pub use actor::{Actor, ActorContext, ActorContextRef};
133use anyhow::Context;
134pub use approx_percentile::global::GlobalApproxPercentileExecutor;
135pub use approx_percentile::local::LocalApproxPercentileExecutor;
136pub use backfill::arrangement_backfill::*;
137pub use backfill::cdc::{
138 CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor,
139};
140pub use backfill::no_shuffle_backfill::*;
141pub use backfill::snapshot_backfill::*;
142pub use barrier_recv::BarrierRecvExecutor;
143pub use batch_query::BatchQueryExecutor;
144pub use chain::ChainExecutor;
145pub use changelog::ChangeLogExecutor;
146pub use dedup::AppendOnlyDedupExecutor;
147pub use dispatch::DispatchExecutor;
148pub use dynamic_filter::DynamicFilterExecutor;
149pub use error::{StreamExecutorError, StreamExecutorResult};
150pub use expand::ExpandExecutor;
151pub use filter::{FilterExecutor, UpsertFilterExecutor};
152pub use gap_fill::{GapFillExecutor, GapFillExecutorArgs};
153pub use hash_join::*;
154pub use hop_window::HopWindowExecutor;
155pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
156pub use join::{AsOfDesc, AsOfJoinType, JoinType};
157pub use lookup::*;
158pub use lookup_union::LookupUnionExecutor;
159pub use merge::MergeExecutor;
160pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
161pub use mview::{MaterializeExecutor, RefreshableMaterializeArgs};
162pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
163pub use no_op::NoOpExecutor;
164pub use now::*;
165pub use over_window::*;
166pub use rearranged_chain::RearrangedChainExecutor;
167pub use receiver::ReceiverExecutor;
168use risingwave_common::id::SourceId;
169pub use row_merge::RowMergeExecutor;
170pub use sink::SinkExecutor;
171pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
172pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
173pub use temporal_join::TemporalJoinExecutor;
174pub use top_n::{
175 AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
176};
177pub use troublemaker::TroublemakerExecutor;
178pub use union::UnionExecutor;
179pub use upstream_sink_union::{UpstreamFragmentInfo, UpstreamSinkUnionExecutor};
180pub use utils::DummyExecutor;
181pub use values::ValuesExecutor;
182pub use vector::*;
183pub use watermark_filter::WatermarkFilterExecutor;
184pub use wrapper::WrapperExecutor;
185
186use self::barrier_align::AlignedMessageStream;
187
/// A [`StreamExecutorResult`] wrapping a [`MessageInner`] parameterized by `M`.
pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
/// [`MessageStreamItemInner`] with `M` fixed to [`BarrierMutationType`].
pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
/// A [`StreamExecutorResult`] wrapping a [`DispatcherMessage`].
pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
/// A boxed `'static` stream yielding [`MessageStreamItem`]s; this is what
/// [`Execute::execute`] returns.
pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;
192
193pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
194use risingwave_connector::sink::catalog::SinkId;
195use risingwave_connector::source::cdc::{
196 CdcTableSnapshotSplitAssignmentWithGeneration,
197 build_actor_cdc_table_snapshot_splits_with_generation,
198};
199use risingwave_pb::id::{ExecutorId, SubscriberId};
200use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
201
/// Trait alias for any `Send` stream whose items are [`MessageStreamItemInner<M>`].
pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
/// Trait alias for any `Send` stream whose items are [`MessageStreamItem`].
pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
/// Trait alias for any `Send` stream whose items are [`DispatcherMessageStreamItem`].
pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;
205
/// Static information attached to an [`Executor`]; each field is exposed through the
/// corresponding accessor on [`Executor`].
#[derive(Debug, Default, Clone)]
pub struct ExecutorInfo {
    /// Schema returned by [`Executor::schema`].
    pub schema: Schema,

    /// Stream key returned by [`Executor::stream_key`].
    pub stream_key: StreamKey,

    /// Stream kind returned by [`Executor::stream_kind`].
    pub stream_kind: PbStreamKind,

    /// Identity string returned by [`Executor::identity`]; it is also what the `Debug`
    /// implementation of [`Executor`] prints.
    pub identity: String,

    /// Identifier of the executor.
    pub id: ExecutorId,
}
224
225impl ExecutorInfo {
226 pub fn for_test(schema: Schema, stream_key: StreamKey, identity: String, id: u64) -> Self {
227 Self {
228 schema,
229 stream_key,
230 stream_kind: PbStreamKind::Retract, identity,
232 id: id.into(),
233 }
234 }
235}
236
237pub trait Execute: Send + 'static {
239 fn execute(self: Box<Self>) -> BoxedMessageStream;
240
241 fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
242 self.execute()
243 }
244
245 fn boxed(self) -> Box<dyn Execute>
246 where
247 Self: Sized + Send + 'static,
248 {
249 Box::new(self)
250 }
251}
252
/// An [`ExecutorInfo`] paired with the boxed [`Execute`] implementation it describes.
pub struct Executor {
    /// Static information, readable through the accessors on [`Executor`].
    info: ExecutorInfo,
    /// The implementation consumed by [`Executor::execute`] /
    /// [`Executor::execute_with_epoch`].
    execute: Box<dyn Execute>,
}
259
260impl Executor {
261 pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
262 Self { info, execute }
263 }
264
265 pub fn info(&self) -> &ExecutorInfo {
266 &self.info
267 }
268
269 pub fn schema(&self) -> &Schema {
270 &self.info.schema
271 }
272
273 pub fn stream_key(&self) -> StreamKeyRef<'_> {
274 &self.info.stream_key
275 }
276
277 pub fn stream_kind(&self) -> PbStreamKind {
278 self.info.stream_kind
279 }
280
281 pub fn identity(&self) -> &str {
282 &self.info.identity
283 }
284
285 pub fn execute(self) -> BoxedMessageStream {
286 self.execute.execute()
287 }
288
289 pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
290 self.execute.execute_with_epoch(epoch)
291 }
292}
293
294impl std::fmt::Debug for Executor {
295 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
296 f.write_str(self.identity())
297 }
298}
299
300impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
301 fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
302 Self::new(info, execute)
303 }
304}
305
306impl<E> From<(ExecutorInfo, E)> for Executor
307where
308 E: Execute,
309{
310 fn from((info, execute): (ExecutorInfo, E)) -> Self {
311 Self::new(info, execute.boxed())
312 }
313}
314
/// The epoch value (`0`) designated as invalid.
pub const INVALID_EPOCH: u64 = 0;

/// A [`FragmentId`] used in upstream position, e.g. as the second component of the key of
/// [`UpdateMutation::merges`].
type UpstreamFragmentId = FragmentId;
/// Splits keyed by the actor they belong to.
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;
319
/// Payload of [`Mutation::Update`].
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct UpdateMutation {
    /// Dispatcher updates grouped by the `actor_id` of each update.
    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
    /// Merge updates keyed by `(actor_id, upstream_fragment_id)`; looked up by
    /// [`Barrier::as_update_merge`].
    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
    /// Per-actor vnode bitmaps; looked up by [`Barrier::as_update_vnode_bitmap`].
    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
    /// Actors dropped by this update; reported by [`Barrier::all_stop_actors`].
    pub dropped_actors: HashSet<ActorId>,
    /// Per-actor splits; read by [`Barrier::initial_split_assignment`].
    pub actor_splits: SplitAssignments,
    /// New dispatchers keyed by actor.
    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
    /// CDC table snapshot split assignment, including its generation.
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// Schema changes keyed by sink; looked up by [`Barrier::as_sink_schema_change`].
    pub sink_schema_change: HashMap<SinkId, PbSinkSchemaChange>,
    /// Subscriptions to drop; exposed by [`Barrier::as_subscriptions_to_drop`].
    pub subscriptions_to_drop: Vec<SubscriptionUpstreamInfo>,
}
333
/// Payload of [`Mutation::Add`].
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct AddMutation {
    /// Dispatchers to add, keyed by actor; [`Barrier::has_more_downstream_fragments`] checks
    /// whether an actor has an entry here.
    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
    /// Actors added by this mutation; checked by [`Barrier::is_newly_added`].
    pub added_actors: HashSet<ActorId>,
    /// Per-actor splits; read by [`Barrier::initial_split_assignment`].
    pub splits: SplitAssignments,
    /// Value reported by [`Barrier::is_pause_on_startup`].
    pub pause: bool,
    /// `(upstream mv table id, subscriber id)` pairs; filtered by
    /// [`Barrier::added_subscriber_on_mv_table`].
    pub subscriptions_to_add: Vec<(TableId, SubscriberId)>,
    /// Fragments checked by [`Barrier::is_backfill_pause_on_startup`].
    pub backfill_nodes_to_pause: HashSet<FragmentId>,
    /// CDC table snapshot split assignment, including its generation.
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// New upstream sinks keyed by fragment; looked up by [`Barrier::as_new_upstream_sink`].
    pub new_upstream_sinks: HashMap<FragmentId, PbNewUpstreamSink>,
}
349
/// Payload of [`Mutation::Stop`].
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct StopMutation {
    /// Actors dropped by this mutation; reported by [`Barrier::all_stop_actors`].
    pub dropped_actors: HashSet<ActorId>,
    /// Sink fragments dropped by this mutation; reported by
    /// [`Barrier::as_dropped_upstream_sinks`].
    pub dropped_sink_fragments: HashSet<FragmentId>,
}
356
/// In-memory form of the mutation a [`Barrier`] may carry; converted from the protobuf
/// mutation by `Mutation::from_protobuf`.
#[cfg_attr(any(test, feature = "test"), derive(PartialEq))]
#[derive(Debug, Clone)]
pub enum Mutation {
    /// See [`StopMutation`].
    Stop(StopMutation),
    /// See [`UpdateMutation`].
    Update(UpdateMutation),
    /// See [`AddMutation`].
    Add(AddMutation),
    /// New split assignment per actor.
    SourceChangeSplit(SplitAssignments),
    /// Pause (carries no payload).
    Pause,
    /// Resume (carries no payload); detected by [`Barrier::is_resume`].
    Resume,
    /// Optional rate limit per fragment.
    Throttle(HashMap<FragmentId, Option<u32>>),
    /// Connector property key/value pairs, keyed by a `u32` id.
    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
    /// Subscriptions to drop; exposed by [`Barrier::as_subscriptions_to_drop`].
    DropSubscriptions {
        subscriptions_to_drop: Vec<SubscriptionUpstreamInfo>,
    },
    /// Fragments checked by [`Barrier::should_start_fragment_backfill`].
    StartFragmentBackfill {
        fragment_ids: HashSet<FragmentId>,
    },
    /// Refresh start for `table_id` and its associated source.
    RefreshStart {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// List finish for the associated source.
    ListFinish {
        associated_source_id: SourceId,
    },
    /// Load finish for the associated source.
    LoadFinish {
        associated_source_id: SourceId,
    },
    /// Reset of the given source.
    ResetSource {
        source_id: SourceId,
    },
}
390
/// A barrier, generic over the mutation payload `M`.
///
/// Note that the `PartialEq` implementation for this type compares only `epoch` and
/// `mutation`.
#[derive(Debug, Clone)]
pub struct BarrierInner<M> {
    /// Current and previous epoch of the barrier.
    pub epoch: EpochPair,
    /// Mutation payload; its concrete type depends on the alias in use
    /// ([`Barrier`] or [`DispatcherBarrier`]).
    pub mutation: M,
    /// Kind of the barrier; [`Barrier::is_checkpoint`] tests for `BarrierKind::Checkpoint`.
    pub kind: BarrierKind,

    /// Tracing context carried with the barrier.
    pub tracing_context: TracingContext,
}
404
/// Mutation payload of a [`Barrier`]: an optional, shared [`Mutation`].
pub type BarrierMutationType = Option<Arc<Mutation>>;
/// A barrier carrying [`BarrierMutationType`].
pub type Barrier = BarrierInner<BarrierMutationType>;
/// A barrier without mutation payload; produced by [`Barrier::into_dispatcher`].
pub type DispatcherBarrier = BarrierInner<()>;
408
409impl<M: Default> BarrierInner<M> {
410 pub fn new_test_barrier(epoch: u64) -> Self {
412 Self {
413 epoch: EpochPair::new_test_epoch(epoch),
414 kind: BarrierKind::Checkpoint,
415 tracing_context: TracingContext::none(),
416 mutation: Default::default(),
417 }
418 }
419
420 pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
421 Self {
422 epoch: EpochPair::new(epoch, prev_epoch),
423 kind: BarrierKind::Checkpoint,
424 tracing_context: TracingContext::none(),
425 mutation: Default::default(),
426 }
427 }
428}
429
430impl Barrier {
431 pub fn into_dispatcher(self) -> DispatcherBarrier {
432 DispatcherBarrier {
433 epoch: self.epoch,
434 mutation: (),
435 kind: self.kind,
436 tracing_context: self.tracing_context,
437 }
438 }
439
440 #[must_use]
441 pub fn with_mutation(self, mutation: Mutation) -> Self {
442 Self {
443 mutation: Some(Arc::new(mutation)),
444 ..self
445 }
446 }
447
448 #[must_use]
449 pub fn with_stop(self) -> Self {
450 self.with_mutation(Mutation::Stop(StopMutation {
451 dropped_actors: Default::default(),
452 dropped_sink_fragments: Default::default(),
453 }))
454 }
455
456 pub fn is_with_stop_mutation(&self) -> bool {
458 matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
459 }
460
461 pub fn is_stop(&self, actor_id: ActorId) -> bool {
463 self.all_stop_actors()
464 .is_some_and(|actors| actors.contains(&actor_id))
465 }
466
467 pub fn is_checkpoint(&self) -> bool {
468 self.kind == BarrierKind::Checkpoint
469 }
470
471 pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
481 match self.mutation.as_deref()? {
482 Mutation::Update(UpdateMutation { actor_splits, .. })
483 | Mutation::Add(AddMutation {
484 splits: actor_splits,
485 ..
486 }) => actor_splits.get(&actor_id),
487
488 _ => {
489 if cfg!(debug_assertions) {
490 panic!(
491 "the initial mutation of the barrier should not be {:?}",
492 self.mutation
493 );
494 }
495 None
496 }
497 }
498 .map(|s| s.as_slice())
499 }
500
501 pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
503 match self.mutation.as_deref() {
504 Some(Mutation::Stop(StopMutation { dropped_actors, .. })) => Some(dropped_actors),
505 Some(Mutation::Update(UpdateMutation { dropped_actors, .. })) => Some(dropped_actors),
506 _ => None,
507 }
508 }
509
510 pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
516 match self.mutation.as_deref() {
517 Some(Mutation::Add(AddMutation { added_actors, .. })) => {
518 added_actors.contains(&actor_id)
519 }
520 _ => false,
521 }
522 }
523
524 pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
525 if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
526 fragment_ids.contains(&fragment_id)
527 } else {
528 false
529 }
530 }
531
532 pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
550 let Some(mutation) = self.mutation.as_deref() else {
551 return false;
552 };
553 match mutation {
554 Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
556 Mutation::Update(_)
557 | Mutation::Stop(_)
558 | Mutation::Pause
559 | Mutation::Resume
560 | Mutation::SourceChangeSplit(_)
561 | Mutation::Throttle(_)
562 | Mutation::DropSubscriptions { .. }
563 | Mutation::ConnectorPropsChange(_)
564 | Mutation::StartFragmentBackfill { .. }
565 | Mutation::RefreshStart { .. }
566 | Mutation::ListFinish { .. }
567 | Mutation::LoadFinish { .. }
568 | Mutation::ResetSource { .. } => false,
569 }
570 }
571
572 pub fn is_pause_on_startup(&self) -> bool {
574 match self.mutation.as_deref() {
575 Some(Mutation::Add(AddMutation { pause, .. })) => *pause,
576 _ => false,
577 }
578 }
579
580 pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
581 match self.mutation.as_deref() {
582 Some(Mutation::Add(AddMutation {
583 backfill_nodes_to_pause,
584 ..
585 })) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
586 Some(Mutation::Update(_)) => false,
587 _ => {
588 tracing::warn!(
589 "expected an AddMutation or UpdateMutation on Startup, instead got {:?}",
590 self
591 );
592 false
593 }
594 }
595 }
596
597 pub fn is_resume(&self) -> bool {
599 matches!(self.mutation.as_deref(), Some(Mutation::Resume))
600 }
601
602 pub fn as_update_merge(
605 &self,
606 actor_id: ActorId,
607 upstream_fragment_id: UpstreamFragmentId,
608 ) -> Option<&MergeUpdate> {
609 self.mutation
610 .as_deref()
611 .and_then(|mutation| match mutation {
612 Mutation::Update(UpdateMutation { merges, .. }) => {
613 merges.get(&(actor_id, upstream_fragment_id))
614 }
615 _ => None,
616 })
617 }
618
619 pub fn as_new_upstream_sink(&self, fragment_id: FragmentId) -> Option<&PbNewUpstreamSink> {
622 self.mutation
623 .as_deref()
624 .and_then(|mutation| match mutation {
625 Mutation::Add(AddMutation {
626 new_upstream_sinks, ..
627 }) => new_upstream_sinks.get(&fragment_id),
628 _ => None,
629 })
630 }
631
632 pub fn as_dropped_upstream_sinks(&self) -> Option<&HashSet<FragmentId>> {
634 self.mutation
635 .as_deref()
636 .and_then(|mutation| match mutation {
637 Mutation::Stop(StopMutation {
638 dropped_sink_fragments,
639 ..
640 }) => Some(dropped_sink_fragments),
641 _ => None,
642 })
643 }
644
645 pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
651 self.mutation
652 .as_deref()
653 .and_then(|mutation| match mutation {
654 Mutation::Update(UpdateMutation { vnode_bitmaps, .. }) => {
655 vnode_bitmaps.get(&actor_id).cloned()
656 }
657 _ => None,
658 })
659 }
660
661 pub fn as_sink_schema_change(&self, sink_id: SinkId) -> Option<PbSinkSchemaChange> {
662 self.mutation
663 .as_deref()
664 .and_then(|mutation| match mutation {
665 Mutation::Update(UpdateMutation {
666 sink_schema_change, ..
667 }) => sink_schema_change.get(&sink_id).cloned(),
668 _ => None,
669 })
670 }
671
672 pub fn as_subscriptions_to_drop(&self) -> Option<&[SubscriptionUpstreamInfo]> {
673 match self.mutation.as_deref() {
674 Some(Mutation::DropSubscriptions {
675 subscriptions_to_drop,
676 })
677 | Some(Mutation::Update(UpdateMutation {
678 subscriptions_to_drop,
679 ..
680 })) => Some(subscriptions_to_drop.as_slice()),
681 _ => None,
682 }
683 }
684
685 pub fn get_curr_epoch(&self) -> Epoch {
686 Epoch(self.epoch.curr)
687 }
688
689 pub fn tracing_context(&self) -> &TracingContext {
691 &self.tracing_context
692 }
693
694 pub fn added_subscriber_on_mv_table(
695 &self,
696 mv_table_id: TableId,
697 ) -> impl Iterator<Item = SubscriberId> + '_ {
698 if let Some(Mutation::Add(add)) = self.mutation.as_deref() {
699 Some(add)
700 } else {
701 None
702 }
703 .into_iter()
704 .flat_map(move |add| {
705 add.subscriptions_to_add.iter().filter_map(
706 move |(upstream_mv_table_id, subscriber_id)| {
707 if *upstream_mv_table_id == mv_table_id {
708 Some(*subscriber_id)
709 } else {
710 None
711 }
712 },
713 )
714 })
715 }
716}
717
718impl<M: PartialEq> PartialEq for BarrierInner<M> {
719 fn eq(&self, other: &Self) -> bool {
720 self.epoch == other.epoch && self.mutation == other.mutation
721 }
722}
723
724impl Mutation {
725 #[cfg(test)]
729 pub fn is_stop(&self) -> bool {
730 matches!(self, Mutation::Stop(_))
731 }
732
733 #[cfg(test)]
734 fn to_protobuf(&self) -> PbMutation {
735 use risingwave_pb::source::{
736 ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
737 };
738 use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
739 use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
740 use risingwave_pb::stream_plan::{
741 PbAddMutation, PbConnectorPropsChangeMutation, PbDispatchers,
742 PbDropSubscriptionsMutation, PbPauseMutation, PbResumeMutation,
743 PbSourceChangeSplitMutation, PbStartFragmentBackfillMutation, PbStopMutation,
744 PbThrottleMutation, PbUpdateMutation,
745 };
746 let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
747 actor_splits
748 .iter()
749 .map(|(&actor_id, splits)| {
750 (
751 actor_id,
752 ConnectorSplits {
753 splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
754 },
755 )
756 })
757 .collect::<HashMap<_, _>>()
758 };
759
760 match self {
761 Mutation::Stop(StopMutation {
762 dropped_actors,
763 dropped_sink_fragments,
764 }) => PbMutation::Stop(PbStopMutation {
765 actors: dropped_actors.iter().copied().collect(),
766 dropped_sink_fragments: dropped_sink_fragments.iter().copied().collect(),
767 }),
768 Mutation::Update(UpdateMutation {
769 dispatchers,
770 merges,
771 vnode_bitmaps,
772 dropped_actors,
773 actor_splits,
774 actor_new_dispatchers,
775 actor_cdc_table_snapshot_splits,
776 sink_schema_change,
777 subscriptions_to_drop,
778 }) => PbMutation::Update(PbUpdateMutation {
779 dispatcher_update: dispatchers.values().flatten().cloned().collect(),
780 merge_update: merges.values().cloned().collect(),
781 actor_vnode_bitmap_update: vnode_bitmaps
782 .iter()
783 .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
784 .collect(),
785 dropped_actors: dropped_actors.iter().copied().collect(),
786 actor_splits: actor_splits_to_protobuf(actor_splits),
787 actor_new_dispatchers: actor_new_dispatchers
788 .iter()
789 .map(|(&actor_id, dispatchers)| {
790 (
791 actor_id,
792 PbDispatchers {
793 dispatchers: dispatchers.clone(),
794 },
795 )
796 })
797 .collect(),
798 actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
799 splits:actor_cdc_table_snapshot_splits.splits.iter().map(|(actor_id,(splits, generation))| {
800 (*actor_id, risingwave_pb::source::PbCdcTableSnapshotSplits {
801 splits: splits.iter().map(risingwave_connector::source::cdc::build_cdc_table_snapshot_split).collect(),
802 generation: *generation,
803 })
804 }).collect()
805 }),
806 sink_schema_change: sink_schema_change
807 .iter()
808 .map(|(sink_id, change)| ((*sink_id).as_raw_id(), change.clone()))
809 .collect(),
810 subscriptions_to_drop: subscriptions_to_drop.clone(),
811 }),
812 Mutation::Add(AddMutation {
813 adds,
814 added_actors,
815 splits,
816 pause,
817 subscriptions_to_add,
818 backfill_nodes_to_pause,
819 actor_cdc_table_snapshot_splits,
820 new_upstream_sinks,
821 }) => PbMutation::Add(PbAddMutation {
822 actor_dispatchers: adds
823 .iter()
824 .map(|(&actor_id, dispatchers)| {
825 (
826 actor_id,
827 PbDispatchers {
828 dispatchers: dispatchers.clone(),
829 },
830 )
831 })
832 .collect(),
833 added_actors: added_actors.iter().copied().collect(),
834 actor_splits: actor_splits_to_protobuf(splits),
835 pause: *pause,
836 subscriptions_to_add: subscriptions_to_add
837 .iter()
838 .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
839 subscriber_id: *subscriber_id,
840 upstream_mv_table_id: *table_id,
841 })
842 .collect(),
843 backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
844 actor_cdc_table_snapshot_splits:
845 Some(PbCdcTableSnapshotSplitsWithGeneration {
846 splits:actor_cdc_table_snapshot_splits.splits.iter().map(|(actor_id,(splits, generation))| {
847 (*actor_id, risingwave_pb::source::PbCdcTableSnapshotSplits {
848 splits: splits.iter().map(risingwave_connector::source::cdc::build_cdc_table_snapshot_split).collect(),
849 generation: *generation,
850 })
851 }).collect()
852 }),
853 new_upstream_sinks: new_upstream_sinks
854 .iter()
855 .map(|(k, v)| (*k, v.clone()))
856 .collect(),
857 }),
858 Mutation::SourceChangeSplit(changes) => {
859 PbMutation::Splits(PbSourceChangeSplitMutation {
860 actor_splits: changes
861 .iter()
862 .map(|(&actor_id, splits)| {
863 (
864 actor_id,
865 ConnectorSplits {
866 splits: splits
867 .clone()
868 .iter()
869 .map(ConnectorSplit::from)
870 .collect(),
871 },
872 )
873 })
874 .collect(),
875 })
876 }
877 Mutation::Pause => PbMutation::Pause(PbPauseMutation {}),
878 Mutation::Resume => PbMutation::Resume(PbResumeMutation {}),
879 Mutation::Throttle(changes) => PbMutation::Throttle(PbThrottleMutation {
880 fragment_throttle: changes
881 .iter()
882 .map(|(fragment_id, limit)| (*fragment_id, RateLimit { rate_limit: *limit }))
883 .collect(),
884 }),
885 Mutation::DropSubscriptions {
886 subscriptions_to_drop,
887 } => PbMutation::DropSubscriptions(PbDropSubscriptionsMutation {
888 info: subscriptions_to_drop.clone(),
889 }),
890 Mutation::ConnectorPropsChange(map) => {
891 PbMutation::ConnectorPropsChange(PbConnectorPropsChangeMutation {
892 connector_props_infos: map
893 .iter()
894 .map(|(actor_id, options)| {
895 (
896 *actor_id,
897 ConnectorPropsInfo {
898 connector_props_info: options
899 .iter()
900 .map(|(k, v)| (k.clone(), v.clone()))
901 .collect(),
902 },
903 )
904 })
905 .collect(),
906 })
907 }
908 Mutation::StartFragmentBackfill { fragment_ids } => {
909 PbMutation::StartFragmentBackfill(PbStartFragmentBackfillMutation {
910 fragment_ids: fragment_ids.iter().copied().collect(),
911 })
912 }
913 Mutation::RefreshStart {
914 table_id,
915 associated_source_id,
916 } => PbMutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
917 table_id: *table_id,
918 associated_source_id: *associated_source_id,
919 }),
920 Mutation::ListFinish {
921 associated_source_id,
922 } => PbMutation::ListFinish(risingwave_pb::stream_plan::ListFinishMutation {
923 associated_source_id: *associated_source_id,
924 }),
925 Mutation::LoadFinish {
926 associated_source_id,
927 } => PbMutation::LoadFinish(risingwave_pb::stream_plan::LoadFinishMutation {
928 associated_source_id: *associated_source_id,
929 }),
930 Mutation::ResetSource { source_id } => {
931 PbMutation::ResetSource(risingwave_pb::stream_plan::ResetSourceMutation {
932 source_id: source_id.as_raw_id(),
933 })
934 }
935 }
936 }
937
938 fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
939 let mutation = match prost {
940 PbMutation::Stop(stop) => Mutation::Stop(StopMutation {
941 dropped_actors: stop.actors.iter().copied().collect(),
942 dropped_sink_fragments: stop.dropped_sink_fragments.iter().copied().collect(),
943 }),
944
945 PbMutation::Update(update) => Mutation::Update(UpdateMutation {
946 dispatchers: update
947 .dispatcher_update
948 .iter()
949 .map(|u| (u.actor_id, u.clone()))
950 .into_group_map(),
951 merges: update
952 .merge_update
953 .iter()
954 .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
955 .collect(),
956 vnode_bitmaps: update
957 .actor_vnode_bitmap_update
958 .iter()
959 .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
960 .collect(),
961 dropped_actors: update.dropped_actors.iter().copied().collect(),
962 actor_splits: update
963 .actor_splits
964 .iter()
965 .map(|(&actor_id, splits)| {
966 (
967 actor_id,
968 splits
969 .splits
970 .iter()
971 .map(|split| split.try_into().unwrap())
972 .collect(),
973 )
974 })
975 .collect(),
976 actor_new_dispatchers: update
977 .actor_new_dispatchers
978 .iter()
979 .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
980 .collect(),
981 actor_cdc_table_snapshot_splits:
982 build_actor_cdc_table_snapshot_splits_with_generation(
983 update
984 .actor_cdc_table_snapshot_splits
985 .clone()
986 .unwrap_or_default(),
987 ),
988 sink_schema_change: update
989 .sink_schema_change
990 .iter()
991 .map(|(sink_id, change)| (SinkId::from(*sink_id), change.clone()))
992 .collect(),
993 subscriptions_to_drop: update.subscriptions_to_drop.clone(),
994 }),
995
996 PbMutation::Add(add) => Mutation::Add(AddMutation {
997 adds: add
998 .actor_dispatchers
999 .iter()
1000 .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
1001 .collect(),
1002 added_actors: add.added_actors.iter().copied().collect(),
1003 splits: add
1006 .actor_splits
1007 .iter()
1008 .map(|(&actor_id, splits)| {
1009 (
1010 actor_id,
1011 splits
1012 .splits
1013 .iter()
1014 .map(|split| split.try_into().unwrap())
1015 .collect(),
1016 )
1017 })
1018 .collect(),
1019 pause: add.pause,
1020 subscriptions_to_add: add
1021 .subscriptions_to_add
1022 .iter()
1023 .map(
1024 |SubscriptionUpstreamInfo {
1025 subscriber_id,
1026 upstream_mv_table_id,
1027 }| { (*upstream_mv_table_id, *subscriber_id) },
1028 )
1029 .collect(),
1030 backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
1031 actor_cdc_table_snapshot_splits:
1032 build_actor_cdc_table_snapshot_splits_with_generation(
1033 add.actor_cdc_table_snapshot_splits
1034 .clone()
1035 .unwrap_or_default(),
1036 ),
1037 new_upstream_sinks: add
1038 .new_upstream_sinks
1039 .iter()
1040 .map(|(k, v)| (*k, v.clone()))
1041 .collect(),
1042 }),
1043
1044 PbMutation::Splits(s) => {
1045 let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
1046 Vec::with_capacity(s.actor_splits.len());
1047 for (&actor_id, splits) in &s.actor_splits {
1048 if !splits.splits.is_empty() {
1049 change_splits.push((
1050 actor_id,
1051 splits
1052 .splits
1053 .iter()
1054 .map(SplitImpl::try_from)
1055 .try_collect()?,
1056 ));
1057 }
1058 }
1059 Mutation::SourceChangeSplit(change_splits.into_iter().collect())
1060 }
1061 PbMutation::Pause(_) => Mutation::Pause,
1062 PbMutation::Resume(_) => Mutation::Resume,
1063 PbMutation::Throttle(changes) => Mutation::Throttle(
1064 changes
1065 .fragment_throttle
1066 .iter()
1067 .map(|(fragment_id, limit)| (*fragment_id, limit.rate_limit))
1068 .collect(),
1069 ),
1070 PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
1071 subscriptions_to_drop: drop.info.clone(),
1072 },
1073 PbMutation::ConnectorPropsChange(alter_connector_props) => {
1074 Mutation::ConnectorPropsChange(
1075 alter_connector_props
1076 .connector_props_infos
1077 .iter()
1078 .map(|(connector_id, options)| {
1079 (
1080 *connector_id,
1081 options
1082 .connector_props_info
1083 .iter()
1084 .map(|(k, v)| (k.clone(), v.clone()))
1085 .collect(),
1086 )
1087 })
1088 .collect(),
1089 )
1090 }
1091 PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
1092 Mutation::StartFragmentBackfill {
1093 fragment_ids: start_fragment_backfill
1094 .fragment_ids
1095 .iter()
1096 .copied()
1097 .collect(),
1098 }
1099 }
1100 PbMutation::RefreshStart(refresh_start) => Mutation::RefreshStart {
1101 table_id: refresh_start.table_id,
1102 associated_source_id: refresh_start.associated_source_id,
1103 },
1104 PbMutation::ListFinish(list_finish) => Mutation::ListFinish {
1105 associated_source_id: list_finish.associated_source_id,
1106 },
1107 PbMutation::LoadFinish(load_finish) => Mutation::LoadFinish {
1108 associated_source_id: load_finish.associated_source_id,
1109 },
1110 PbMutation::ResetSource(reset_source) => Mutation::ResetSource {
1111 source_id: SourceId::from(reset_source.source_id),
1112 },
1113 };
1114 Ok(mutation)
1115 }
1116}
1117
1118impl<M> BarrierInner<M> {
1119 fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
1120 let Self {
1121 epoch,
1122 mutation,
1123 kind,
1124 tracing_context,
1125 ..
1126 } = self;
1127
1128 PbBarrier {
1129 epoch: Some(PbEpoch {
1130 curr: epoch.curr,
1131 prev: epoch.prev,
1132 }),
1133 mutation: barrier_fn(mutation).map(|mutation| PbBarrierMutation {
1134 mutation: Some(mutation),
1135 }),
1136 tracing_context: tracing_context.to_protobuf(),
1137 kind: *kind as _,
1138 }
1139 }
1140
1141 fn from_protobuf_inner(
1142 prost: &PbBarrier,
1143 mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
1144 ) -> StreamExecutorResult<Self> {
1145 let epoch = prost.get_epoch()?;
1146
1147 Ok(Self {
1148 kind: prost.kind(),
1149 epoch: EpochPair::new(epoch.curr, epoch.prev),
1150 mutation: mutation_from_pb(
1151 (prost.mutation.as_ref()).and_then(|mutation| mutation.mutation.as_ref()),
1152 )?,
1153 tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
1154 })
1155 }
1156
1157 pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
1158 BarrierInner {
1159 epoch: self.epoch,
1160 mutation: f(self.mutation),
1161 kind: self.kind,
1162 tracing_context: self.tracing_context,
1163 }
1164 }
1165}
1166
1167impl DispatcherBarrier {
1168 pub fn to_protobuf(&self) -> PbBarrier {
1169 self.to_protobuf_inner(|_| None)
1170 }
1171}
1172
1173impl Barrier {
1174 #[cfg(test)]
1175 pub fn to_protobuf(&self) -> PbBarrier {
1176 self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
1177 }
1178
1179 pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
1180 Self::from_protobuf_inner(prost, |mutation| {
1181 mutation
1182 .map(|m| Mutation::from_protobuf(m).map(Arc::new))
1183 .transpose()
1184 })
1185 }
1186}
1187
/// A watermark on one column of a stream.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
    /// Index of the column this watermark applies to.
    pub col_idx: usize,
    /// Data type of the watermark column; used to decode `val` from protobuf.
    pub data_type: DataType,
    /// The watermark value. It is never null: decoding a null value panics.
    pub val: ScalarImpl,
}
1194
1195impl PartialOrd for Watermark {
1196 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1197 Some(self.cmp(other))
1198 }
1199}
1200
1201impl Ord for Watermark {
1202 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1203 self.val.default_cmp(&other.val)
1204 }
1205}
1206
1207impl Watermark {
1208 pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
1209 Self {
1210 col_idx,
1211 data_type,
1212 val,
1213 }
1214 }
1215
1216 pub async fn transform_with_expr(
1217 self,
1218 expr: &NonStrictExpression<impl Expression>,
1219 new_col_idx: usize,
1220 ) -> Option<Self> {
1221 let Self { col_idx, val, .. } = self;
1222 let row = {
1223 let mut row = vec![None; col_idx + 1];
1224 row[col_idx] = Some(val);
1225 OwnedRow::new(row)
1226 };
1227 let val = expr.eval_row_infallible(&row).await?;
1228 Some(Self::new(new_col_idx, expr.inner().return_type(), val))
1229 }
1230
1231 pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
1234 output_indices
1235 .iter()
1236 .position(|p| *p == self.col_idx)
1237 .map(|new_col_idx| self.with_idx(new_col_idx))
1238 }
1239
1240 pub fn to_protobuf(&self) -> PbWatermark {
1241 PbWatermark {
1242 column: Some(PbInputRef {
1243 index: self.col_idx as _,
1244 r#type: Some(self.data_type.to_protobuf()),
1245 }),
1246 val: Some(&self.val).to_protobuf().into(),
1247 }
1248 }
1249
1250 pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
1251 let col_ref = prost.get_column()?;
1252 let data_type = DataType::from(col_ref.get_type()?);
1253 let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
1254 .expect("watermark value cannot be null");
1255 Ok(Self::new(col_ref.get_index() as _, data_type, val))
1256 }
1257
1258 pub fn with_idx(self, idx: usize) -> Self {
1259 Self::new(idx, self.data_type, self.val)
1260 }
1261}
1262
/// A single message flowing through a stream, generic over the mutation payload `M` carried by
/// its barriers.
#[cfg_attr(any(test, feature = "test"), derive(PartialEq))]
#[derive(Debug, EnumAsInner, Clone)]
pub enum MessageInner<M> {
    /// A chunk of stream data.
    Chunk(StreamChunk),
    /// A barrier, carrying a mutation payload of type `M`.
    Barrier(BarrierInner<M>),
    /// A watermark on one column.
    Watermark(Watermark),
}
1270
1271impl<M> MessageInner<M> {
1272 pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
1273 match self {
1274 MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
1275 MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
1276 MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
1277 }
1278 }
1279}
1280
/// A stream message whose barriers carry a [`BarrierMutationType`] payload.
pub type Message = MessageInner<BarrierMutationType>;
/// A stream message whose barriers carry no mutation payload (`()`).
pub type DispatcherMessage = MessageInner<()>;
1283
/// Same message kinds as [`MessageInner`], except that several barriers can be carried in a
/// single message.
#[derive(Debug, EnumAsInner, Clone)]
pub enum MessageBatchInner<M> {
    /// A chunk of stream data.
    Chunk(StreamChunk),
    /// One or more barriers batched together.
    BarrierBatch(Vec<BarrierInner<M>>),
    /// A watermark on one column.
    Watermark(Watermark),
}
/// Batched counterpart of [`Message`].
pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
/// A list of [`DispatcherBarrier`]s.
pub type DispatcherBarriers = Vec<DispatcherBarrier>;
/// Batched counterpart of [`DispatcherMessage`]; this is the form that is converted to and from
/// [`PbStreamMessageBatch`].
pub type DispatcherMessageBatch = MessageBatchInner<()>;
1295
1296impl From<DispatcherMessage> for DispatcherMessageBatch {
1297 fn from(m: DispatcherMessage) -> Self {
1298 match m {
1299 DispatcherMessage::Chunk(c) => Self::Chunk(c),
1300 DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
1301 DispatcherMessage::Watermark(w) => Self::Watermark(w),
1302 }
1303 }
1304}
1305
1306impl From<StreamChunk> for Message {
1307 fn from(chunk: StreamChunk) -> Self {
1308 Message::Chunk(chunk)
1309 }
1310}
1311
1312impl<'a> TryFrom<&'a Message> for &'a Barrier {
1313 type Error = ();
1314
1315 fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
1316 match m {
1317 Message::Chunk(_) => Err(()),
1318 Message::Barrier(b) => Ok(b),
1319 Message::Watermark(_) => Err(()),
1320 }
1321 }
1322}
1323
1324impl Message {
1325 #[cfg(test)]
1330 pub fn is_stop(&self) -> bool {
1331 matches!(
1332 self,
1333 Message::Barrier(Barrier {
1334 mutation,
1335 ..
1336 }) if mutation.as_ref().unwrap().is_stop()
1337 )
1338 }
1339}
1340
1341impl DispatcherMessageBatch {
1342 pub fn to_protobuf(&self) -> PbStreamMessageBatch {
1343 let prost = match self {
1344 Self::Chunk(stream_chunk) => {
1345 let prost_stream_chunk = stream_chunk.to_protobuf();
1346 StreamMessageBatch::StreamChunk(prost_stream_chunk)
1347 }
1348 Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
1349 barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
1350 }),
1351 Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
1352 };
1353 PbStreamMessageBatch {
1354 stream_message_batch: Some(prost),
1355 }
1356 }
1357
1358 pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
1359 let res = match prost.get_stream_message_batch()? {
1360 StreamMessageBatch::StreamChunk(chunk) => {
1361 Self::Chunk(StreamChunk::from_protobuf(chunk)?)
1362 }
1363 StreamMessageBatch::BarrierBatch(barrier_batch) => {
1364 let barriers = barrier_batch
1365 .barriers
1366 .iter()
1367 .map(|barrier| {
1368 DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
1369 if mutation.is_some() {
1370 if cfg!(debug_assertions) {
1371 panic!("should not receive message of barrier with mutation");
1372 } else {
1373 warn!(?barrier, "receive message of barrier with mutation");
1374 }
1375 }
1376 Ok(())
1377 })
1378 })
1379 .try_collect()?;
1380 Self::BarrierBatch(barriers)
1381 }
1382 StreamMessageBatch::Watermark(watermark) => {
1383 Self::Watermark(Watermark::from_protobuf(watermark)?)
1384 }
1385 };
1386 Ok(res)
1387 }
1388
1389 pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
1390 ::prost::Message::encoded_len(msg)
1391 }
1392}
1393
/// Owned stream key: a list of `usize` values (presumably column indices — confirm against
/// callers).
pub type StreamKey = Vec<usize>;
/// Borrowed form of [`StreamKey`].
pub type StreamKeyRef<'a> = &'a [usize];
/// Data types of the stream key columns, stored inline when there is at most one.
pub type StreamKeyDataTypes = SmallVec<[DataType; 1]>;
1397
1398pub async fn expect_first_barrier<M: Debug>(
1400 stream: &mut (impl MessageStreamInner<M> + Unpin),
1401) -> StreamExecutorResult<BarrierInner<M>> {
1402 let message = stream
1403 .next()
1404 .instrument_await("expect_first_barrier")
1405 .await
1406 .context("failed to extract the first message: stream closed unexpectedly")??;
1407 let barrier = message
1408 .into_barrier()
1409 .expect("the first message must be a barrier");
1410 assert!(matches!(
1412 barrier.kind,
1413 BarrierKind::Checkpoint | BarrierKind::Initial
1414 ));
1415 Ok(barrier)
1416}
1417
1418pub async fn expect_first_barrier_from_aligned_stream(
1420 stream: &mut (impl AlignedMessageStream + Unpin),
1421) -> StreamExecutorResult<Barrier> {
1422 let message = stream
1423 .next()
1424 .instrument_await("expect_first_barrier")
1425 .await
1426 .context("failed to extract the first message: stream closed unexpectedly")??;
1427 let barrier = message
1428 .into_barrier()
1429 .expect("the first message must be a barrier");
1430 Ok(barrier)
1431}
1432
/// A consumer that, once executed, produces a stream of barriers.
pub trait StreamConsumer: Send + 'static {
    /// The stream returned by [`StreamConsumer::execute`]; each item is a barrier or an error.
    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;

    /// Consumes the boxed consumer and returns its barrier stream.
    fn execute(self: Box<Self>) -> Self::BarrierStream;
}
1439
/// A boxed input identified by `InputId` that yields message stream items with mutation
/// payload `M`.
type BoxedMessageInput<InputId, M> = BoxedInput<InputId, MessageStreamItemInner<M>>;
1441
/// A set of upstream inputs merged into one stream, with barrier alignment across the inputs
/// and per-column watermark buffering.
pub struct DynamicReceivers<InputId, M> {
    /// The barrier currently being aligned: set by the first upstream that yields a barrier and
    /// taken when the barrier is emitted.
    barrier: Option<BarrierInner<M>>,
    /// When the first upstream got blocked in the current alignment; used to measure how long
    /// the alignment took.
    start_ts: Option<Instant>,
    /// Upstreams that already yielded the current barrier and are not polled until it is emitted.
    blocked: Vec<BoxedMessageInput<InputId, M>>,
    /// Upstreams that are currently being polled.
    active: FuturesUnordered<StreamFuture<BoxedMessageInput<InputId, M>>>,
    /// Watermark buffers, keyed by watermark column index.
    buffered_watermarks: BTreeMap<usize, BufferedWatermarks<InputId>>,
    /// Optional counter increased by the duration of each barrier alignment, in nanoseconds.
    barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
    /// Second optional counter increased by the same alignment duration, in nanoseconds.
    merge_barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
}
1461
/// Merges all upstream inputs into a single message stream while aligning barriers.
///
/// Chunks are forwarded as soon as they arrive. Watermarks are emitted only when
/// `handle_watermark` returns one. An upstream that yields a barrier is parked in `blocked`;
/// once no active upstream remains, the barrier is emitted once and all parked upstreams are
/// resumed.
impl<InputId: Clone + Ord + Hash + std::fmt::Debug + Unpin, M: Clone + Unpin> Stream
    for DynamicReceivers<InputId, M>
{
    type Item = MessageStreamItemInner<M>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // Without any upstream (neither blocked nor active) the stream is finished.
        if self.is_empty() {
            return Poll::Ready(None);
        }

        loop {
            match futures::ready!(self.active.poll_next_unpin(cx)) {
                // An upstream yielded an error: forward it. That upstream is dropped.
                Some((Some(Err(e)), _)) => {
                    return Poll::Ready(Some(Err(e)));
                }
                // An upstream yielded a message.
                Some((Some(Ok(message)), remaining)) => {
                    let input_id = remaining.id();
                    match message {
                        MessageInner::Chunk(chunk) => {
                            // Keep polling this upstream and forward the chunk right away.
                            self.active.push(remaining.into_future());
                            return Poll::Ready(Some(Ok(MessageInner::Chunk(chunk))));
                        }
                        MessageInner::Watermark(watermark) => {
                            // Keep polling this upstream; emit only if the buffers produce a
                            // watermark, otherwise continue with the next ready upstream.
                            self.active.push(remaining.into_future());
                            if let Some(watermark) = self.handle_watermark(input_id, watermark) {
                                return Poll::Ready(Some(Ok(MessageInner::Watermark(watermark))));
                            }
                        }
                        MessageInner::Barrier(barrier) => {
                            // The first blocked upstream starts the alignment timer.
                            if self.blocked.is_empty() {
                                self.start_ts = Some(Instant::now());
                            }
                            // Park this upstream until the barrier is emitted.
                            self.blocked.push(remaining);
                            if let Some(current_barrier) = self.barrier.as_ref() {
                                // Every upstream must yield a barrier of the same epoch.
                                if current_barrier.epoch != barrier.epoch {
                                    return Poll::Ready(Some(Err(
                                        StreamExecutorError::align_barrier(
                                            current_barrier.clone().map_mutation(|_| None),
                                            barrier.map_mutation(|_| None),
                                        ),
                                    )));
                                }
                            } else {
                                self.barrier = Some(barrier);
                            }
                        }
                    }
                }
                // An upstream ended without an error: reported as a closed channel.
                Some((None, remaining)) => {
                    return Poll::Ready(Some(Err(StreamExecutorError::channel_closed(format!(
                        "upstream input {:?} unexpectedly closed",
                        remaining.id()
                    )))));
                }
                // No active upstream is left: the barrier is aligned.
                None => {
                    assert!(!self.blocked.is_empty());

                    let start_ts = self
                        .start_ts
                        .take()
                        .expect("should have received at least one barrier");
                    // Record how long the alignment took, in nanoseconds.
                    if let Some(barrier_align_duration) = &self.barrier_align_duration {
                        barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
                    }
                    if let Some(merge_barrier_align_duration) = &self.merge_barrier_align_duration {
                        merge_barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
                    }

                    break;
                }
            }
        }

        assert!(self.active.is_terminated());

        let barrier = self.barrier.take().unwrap();

        // Resume all parked upstreams. `blocked` and `barrier` are cleared first because
        // `extend_active` asserts that no alignment is in progress.
        let upstreams = std::mem::take(&mut self.blocked);
        self.extend_active(upstreams);
        assert!(!self.active.is_terminated());

        Poll::Ready(Some(Ok(MessageInner::Barrier(barrier))))
    }
}
1562
1563impl<InputId: Clone + Ord + Hash + std::fmt::Debug, M> DynamicReceivers<InputId, M> {
1564 pub fn new(
1565 upstreams: Vec<BoxedMessageInput<InputId, M>>,
1566 barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1567 merge_barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1568 ) -> Self {
1569 let mut this = Self {
1570 barrier: None,
1571 start_ts: None,
1572 blocked: Vec::with_capacity(upstreams.len()),
1573 active: Default::default(),
1574 buffered_watermarks: Default::default(),
1575 merge_barrier_align_duration,
1576 barrier_align_duration,
1577 };
1578 this.extend_active(upstreams);
1579 this
1580 }
1581
1582 pub fn extend_active(
1585 &mut self,
1586 upstreams: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1587 ) {
1588 assert!(self.blocked.is_empty() && self.barrier.is_none());
1589
1590 self.active
1591 .extend(upstreams.into_iter().map(|s| s.into_future()));
1592 }
1593
1594 pub fn handle_watermark(
1596 &mut self,
1597 input_id: InputId,
1598 watermark: Watermark,
1599 ) -> Option<Watermark> {
1600 let col_idx = watermark.col_idx;
1601 let upstream_ids: Vec<_> = self.upstream_input_ids().collect();
1603 let watermarks = self
1604 .buffered_watermarks
1605 .entry(col_idx)
1606 .or_insert_with(|| BufferedWatermarks::with_ids(upstream_ids));
1607 watermarks.handle_watermark(input_id, watermark)
1608 }
1609
1610 pub fn add_upstreams_from(
1613 &mut self,
1614 new_inputs: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1615 ) {
1616 assert!(self.blocked.is_empty() && self.barrier.is_none());
1617
1618 let new_inputs: Vec<_> = new_inputs.into_iter().collect();
1619 let input_ids = new_inputs.iter().map(|input| input.id());
1620 self.buffered_watermarks.values_mut().for_each(|buffers| {
1621 buffers.add_buffers(input_ids.clone());
1623 });
1624 self.active
1625 .extend(new_inputs.into_iter().map(|s| s.into_future()));
1626 }
1627
1628 pub fn remove_upstreams(&mut self, upstream_input_ids: &HashSet<InputId>) {
1632 assert!(self.blocked.is_empty() && self.barrier.is_none());
1633
1634 let new_upstreams = std::mem::take(&mut self.active)
1635 .into_iter()
1636 .map(|s| s.into_inner().unwrap())
1637 .filter(|u| !upstream_input_ids.contains(&u.id()));
1638 self.extend_active(new_upstreams);
1639 self.buffered_watermarks.values_mut().for_each(|buffers| {
1640 buffers.remove_buffer(upstream_input_ids.clone());
1643 });
1644 }
1645
1646 pub fn merge_barrier_align_duration(
1647 &self,
1648 ) -> Option<LabelGuardedMetric<GenericCounter<AtomicU64>>> {
1649 self.merge_barrier_align_duration.clone()
1650 }
1651
1652 pub fn flush_buffered_watermarks(&mut self) {
1653 self.buffered_watermarks
1654 .values_mut()
1655 .for_each(|buffers| buffers.clear());
1656 }
1657
1658 pub fn upstream_input_ids(&self) -> impl Iterator<Item = InputId> + '_ {
1659 self.blocked
1660 .iter()
1661 .map(|s| s.id())
1662 .chain(self.active.iter().map(|s| s.get_ref().unwrap().id()))
1663 }
1664
1665 pub fn is_empty(&self) -> bool {
1666 self.blocked.is_empty() && self.active.is_empty()
1667 }
1668}
1669
/// Buffers barriers received from `barrier_rx`, each paired with the new upstream inputs that
/// were built for it when its merge update adds upstream actors.
pub(crate) struct DispatchBarrierBuffer {
    /// Received barriers in arrival order, each with the inputs built for it (if any). Entries
    /// are popped by `pop_barrier_with_inputs`.
    buffer: VecDeque<(Barrier, Option<Vec<BoxedActorInput>>)>,
    /// Channel the barriers are received from.
    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
    /// Whether the next step is receiving a barrier or finishing the inputs of a received one.
    recv_state: BarrierReceiverState,
    /// Upstream fragment id used to look up the merge update of a barrier; replaced when an
    /// update carries a new upstream fragment id.
    curr_upstream_fragment_id: FragmentId,
    /// Actor id used to look up the merge update of a barrier.
    actor_id: ActorId,
    /// Shared arguments for building new inputs.
    build_input_ctx: Arc<BuildInputContext>,
}
1699
/// Arguments that `DispatchBarrierBuffer` forwards to `new_input` when it builds inputs for
/// newly added upstream actors.
struct BuildInputContext {
    /// Actor id passed to `new_input`.
    pub actor_id: ActorId,
    /// Barrier manager passed to `new_input` by reference.
    pub local_barrier_manager: LocalBarrierManager,
    /// Streaming metrics handle, cloned for every new input.
    pub metrics: Arc<StreamingMetrics>,
    /// Fragment id passed to `new_input`.
    pub fragment_id: FragmentId,
    /// Streaming configuration, cloned for every new input.
    pub actor_config: Arc<StreamingConfig>,
}
1707
/// A boxed, sendable future that resolves to the newly built upstream inputs, or to an error.
type BoxedNewInputsFuture =
    Pin<Box<dyn Future<Output = StreamExecutorResult<Vec<BoxedActorInput>>> + Send>>;
1710
/// Progress of `DispatchBarrierBuffer` in fetching the next barrier.
enum BarrierReceiverState {
    /// Waiting for the next barrier from the barrier channel.
    ReceivingBarrier,
    /// A barrier that adds upstream actors was received; the future builds the new inputs. The
    /// barrier is pushed to the buffer together with the inputs once the future completes.
    CreatingNewInput(Barrier, BoxedNewInputsFuture),
}
1715
1716impl DispatchBarrierBuffer {
1717 pub fn new(
1718 barrier_rx: mpsc::UnboundedReceiver<Barrier>,
1719 actor_id: ActorId,
1720 curr_upstream_fragment_id: FragmentId,
1721 local_barrier_manager: LocalBarrierManager,
1722 metrics: Arc<StreamingMetrics>,
1723 fragment_id: FragmentId,
1724 actor_config: Arc<StreamingConfig>,
1725 ) -> Self {
1726 Self {
1727 buffer: VecDeque::new(),
1728 barrier_rx,
1729 recv_state: BarrierReceiverState::ReceivingBarrier,
1730 curr_upstream_fragment_id,
1731 actor_id,
1732 build_input_ctx: Arc::new(BuildInputContext {
1733 actor_id,
1734 local_barrier_manager,
1735 metrics,
1736 fragment_id,
1737 actor_config,
1738 }),
1739 }
1740 }
1741
1742 pub async fn await_next_message(
1743 &mut self,
1744 stream: &mut (impl Stream<Item = StreamExecutorResult<DispatcherMessage>> + Unpin),
1745 metrics: &ActorInputMetrics,
1746 ) -> StreamExecutorResult<DispatcherMessage> {
1747 let mut start_time = Instant::now();
1748 let interval_duration = Duration::from_secs(15);
1749 let mut interval =
1750 tokio::time::interval_at(start_time + interval_duration, interval_duration);
1751
1752 loop {
1753 tokio::select! {
1754 biased;
1755 msg = stream.try_next() => {
1756 metrics
1757 .actor_input_buffer_blocking_duration_ns
1758 .inc_by(start_time.elapsed().as_nanos() as u64);
1759 return msg?.ok_or_else(
1760 || StreamExecutorError::channel_closed("upstream executor closed unexpectedly")
1761 );
1762 }
1763
1764 e = self.continuously_fetch_barrier_rx() => {
1765 return Err(e);
1766 }
1767
1768 _ = interval.tick() => {
1769 start_time = Instant::now();
1770 metrics.actor_input_buffer_blocking_duration_ns.inc_by(interval_duration.as_nanos() as u64);
1771 continue;
1772 }
1773 }
1774 }
1775 }
1776
1777 pub async fn pop_barrier_with_inputs(
1778 &mut self,
1779 barrier: DispatcherBarrier,
1780 ) -> StreamExecutorResult<(Barrier, Option<Vec<BoxedActorInput>>)> {
1781 while self.buffer.is_empty() {
1782 self.try_fetch_barrier_rx(false).await?;
1783 }
1784 let (recv_barrier, inputs) = self.buffer.pop_front().unwrap();
1785 assert_equal_dispatcher_barrier(&recv_barrier, &barrier);
1786
1787 Ok((recv_barrier, inputs))
1788 }
1789
1790 async fn continuously_fetch_barrier_rx(&mut self) -> StreamExecutorError {
1791 loop {
1792 if let Err(e) = self.try_fetch_barrier_rx(true).await {
1793 return e;
1794 }
1795 }
1796 }
1797
1798 async fn try_fetch_barrier_rx(&mut self, pending_on_end: bool) -> StreamExecutorResult<()> {
1799 match &mut self.recv_state {
1800 BarrierReceiverState::ReceivingBarrier => {
1801 let Some(barrier) = self.barrier_rx.recv().await else {
1802 if pending_on_end {
1803 return pending().await;
1804 } else {
1805 return Err(StreamExecutorError::channel_closed(
1806 "barrier channel closed unexpectedly",
1807 ));
1808 }
1809 };
1810 if let Some(fut) = self.pre_apply_barrier(&barrier) {
1811 self.recv_state = BarrierReceiverState::CreatingNewInput(barrier, fut);
1812 } else {
1813 self.buffer.push_back((barrier, None));
1814 }
1815 }
1816 BarrierReceiverState::CreatingNewInput(barrier, fut) => {
1817 let new_inputs = fut.await?;
1818 self.buffer.push_back((barrier.clone(), Some(new_inputs)));
1819 self.recv_state = BarrierReceiverState::ReceivingBarrier;
1820 }
1821 }
1822 Ok(())
1823 }
1824
1825 fn pre_apply_barrier(&mut self, barrier: &Barrier) -> Option<BoxedNewInputsFuture> {
1826 if let Some(update) = barrier.as_update_merge(self.actor_id, self.curr_upstream_fragment_id)
1827 && !update.added_upstream_actors.is_empty()
1828 {
1829 let upstream_fragment_id =
1831 if let Some(new_upstream_fragment_id) = update.new_upstream_fragment_id {
1832 self.curr_upstream_fragment_id = new_upstream_fragment_id;
1833 new_upstream_fragment_id
1834 } else {
1835 self.curr_upstream_fragment_id
1836 };
1837 let ctx = self.build_input_ctx.clone();
1838 let added_upstream_actors = update.added_upstream_actors.clone();
1839 let barrier = barrier.clone();
1840 let fut = async move {
1841 try_join_all(added_upstream_actors.iter().map(|upstream_actor| async {
1842 let mut new_input = new_input(
1843 &ctx.local_barrier_manager,
1844 ctx.metrics.clone(),
1845 ctx.actor_id,
1846 ctx.fragment_id,
1847 upstream_actor,
1848 upstream_fragment_id,
1849 ctx.actor_config.clone(),
1850 )
1851 .await?;
1852
1853 let first_barrier = expect_first_barrier(&mut new_input).await?;
1856 assert_equal_dispatcher_barrier(&barrier, &first_barrier);
1857
1858 StreamExecutorResult::Ok(new_input)
1859 }))
1860 .await
1861 }
1862 .boxed();
1863
1864 Some(fut)
1865 } else {
1866 None
1867 }
1868 }
1869}