mod prelude;

use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::future::pending;
use std::hash::Hash;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::vec;

use await_tree::InstrumentAwait;
use enum_as_inner::EnumAsInner;
use futures::future::try_join_all;
use futures::stream::{BoxStream, FusedStream, FuturesUnordered, StreamFuture};
use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use prometheus::core::{AtomicU64, GenericCounter};
use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{Schema, TableId};
use risingwave_common::config::StreamingConfig;
use risingwave_common::metrics::LabelGuardedMetric;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
use risingwave_common::util::epoch::{Epoch, EpochPair};
use risingwave_common::util::tracing::TracingContext;
use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
use risingwave_connector::source::SplitImpl;
use risingwave_expr::expr::{Expression, NonStrictExpression};
use risingwave_pb::data::PbEpoch;
use risingwave_pb::expr::PbInputRef;
use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
use risingwave_pb::stream_plan::stream_node::PbStreamKind;
use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
use risingwave_pb::stream_plan::{
    PbBarrier, PbBarrierMutation, PbDispatcher, PbSinkSchemaChange, PbStreamMessageBatch,
    PbWatermark, SubscriptionUpstreamInfo,
};
use smallvec::SmallVec;
use tokio::sync::mpsc;
use tokio::time::{Duration, Instant};

use crate::error::StreamResult;
use crate::executor::exchange::input::{
    BoxedActorInput, BoxedInput, assert_equal_dispatcher_barrier, new_input,
};
use crate::executor::monitor::ActorInputMetrics;
use crate::executor::prelude::StreamingMetrics;
use crate::executor::watermark::BufferedWatermarks;
use crate::task::{ActorId, FragmentId, LocalBarrierManager};

mod actor;
mod barrier_align;
pub mod exchange;
pub mod monitor;

pub mod aggregate;
pub mod asof_join;
mod backfill;
mod barrier_recv;
mod batch_query;
mod chain;
mod changelog;
mod dedup;
mod dispatch;
pub mod dml;
mod dynamic_filter;
pub mod eowc;
pub mod error;
mod expand;
mod filter;
mod gap_fill;
pub mod hash_join;
mod hop_window;
mod join;
pub mod locality_provider;
mod lookup;
mod lookup_union;
mod merge;
mod mview;
mod nested_loop_temporal_join;
mod no_op;
mod now;
mod over_window;
pub mod project;
mod rearranged_chain;
mod receiver;
pub mod row_id_gen;
mod sink;
pub mod source;
mod stream_reader;
pub mod subtask;
mod temporal_join;
mod top_n;
mod troublemaker;
mod union;
mod upstream_sink_union;
mod values;
mod watermark;
mod watermark_filter;
mod wrapper;

mod approx_percentile;

mod row_merge;

#[cfg(test)]
mod integration_tests;
mod sync_kv_log_store;
#[cfg(any(test, feature = "test"))]
pub mod test_utils;
mod utils;
mod vector;

pub use actor::{Actor, ActorContext, ActorContextRef};
use anyhow::Context;
pub use approx_percentile::global::GlobalApproxPercentileExecutor;
pub use approx_percentile::local::LocalApproxPercentileExecutor;
pub use backfill::arrangement_backfill::*;
pub use backfill::cdc::{
    CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor,
};
pub use backfill::no_shuffle_backfill::*;
pub use backfill::snapshot_backfill::*;
pub use barrier_recv::BarrierRecvExecutor;
pub use batch_query::BatchQueryExecutor;
pub use chain::ChainExecutor;
pub use changelog::ChangeLogExecutor;
pub use dedup::AppendOnlyDedupExecutor;
pub use dispatch::DispatchExecutor;
pub use dynamic_filter::DynamicFilterExecutor;
pub use error::{StreamExecutorError, StreamExecutorResult};
pub use expand::ExpandExecutor;
pub use filter::{FilterExecutor, UpsertFilterExecutor};
pub use gap_fill::{GapFillExecutor, GapFillExecutorArgs};
pub use hash_join::*;
pub use hop_window::HopWindowExecutor;
pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
pub use join::{AsOfDesc, AsOfJoinType, JoinType};
pub use lookup::*;
pub use lookup_union::LookupUnionExecutor;
pub use merge::MergeExecutor;
pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
pub use mview::{MaterializeExecutor, RefreshableMaterializeArgs};
pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
pub use no_op::NoOpExecutor;
pub use now::*;
pub use over_window::*;
pub use rearranged_chain::RearrangedChainExecutor;
pub use receiver::ReceiverExecutor;
use risingwave_common::id::SourceId;
pub use row_merge::RowMergeExecutor;
pub use sink::SinkExecutor;
pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
pub use temporal_join::TemporalJoinExecutor;
pub use top_n::{
    AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
};
pub use troublemaker::TroublemakerExecutor;
pub use union::UnionExecutor;
pub use upstream_sink_union::{UpstreamFragmentInfo, UpstreamSinkUnionExecutor};
pub use utils::DummyExecutor;
pub use values::ValuesExecutor;
pub use vector::*;
pub use watermark_filter::{UpsertWatermarkFilterExecutor, WatermarkFilterExecutor};
pub use wrapper::WrapperExecutor;

use self::barrier_align::AlignedMessageStream;

pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;

pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
use risingwave_connector::sink::catalog::SinkId;
use risingwave_connector::source::cdc::{
    CdcTableSnapshotSplitAssignmentWithGeneration,
    build_actor_cdc_table_snapshot_splits_with_generation,
};
use risingwave_pb::id::{ExecutorId, SubscriberId};
use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};

pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;

#[derive(Debug, Default, Clone)]
pub struct ExecutorInfo {
    /// The schema of the executor's output.
    pub schema: Schema,

    /// The stream key of the executor's output, as indices into `schema`.
    pub stream_key: StreamKey,

    /// The stream kind of the executor's output.
    pub stream_kind: PbStreamKind,

    /// The identity of the executor, mainly for logging and debugging.
    pub identity: String,

    /// The unique id of the executor.
    pub id: ExecutorId,
}

impl ExecutorInfo {
    pub fn for_test(schema: Schema, stream_key: StreamKey, identity: String, id: u64) -> Self {
        Self {
            schema,
            stream_key,
            stream_kind: PbStreamKind::Retract,
            identity,
            id: id.into(),
        }
    }
}

/// The behavior of an executor: [`Execute::execute`] consumes the executor and
/// returns its output as a boxed message stream.
pub trait Execute: Send + 'static {
    fn execute(self: Box<Self>) -> BoxedMessageStream;

    fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
        self.execute()
    }

    fn boxed(self) -> Box<dyn Execute>
    where
        Self: Sized + Send + 'static,
    {
        Box::new(self)
    }
}

/// [`Executor`] bundles the static [`ExecutorInfo`] with a boxed [`Execute`]
/// implementation.
pub struct Executor {
    info: ExecutorInfo,
    execute: Box<dyn Execute>,
}

impl Executor {
    pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
        Self { info, execute }
    }

    pub fn info(&self) -> &ExecutorInfo {
        &self.info
    }

    pub fn schema(&self) -> &Schema {
        &self.info.schema
    }

    pub fn stream_key(&self) -> StreamKeyRef<'_> {
        &self.info.stream_key
    }

    pub fn stream_kind(&self) -> PbStreamKind {
        self.info.stream_kind
    }

    pub fn identity(&self) -> &str {
        &self.info.identity
    }

    pub fn execute(self) -> BoxedMessageStream {
        self.execute.execute()
    }

    pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
        self.execute.execute_with_epoch(epoch)
    }
}

impl std::fmt::Debug for Executor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.identity())
    }
}

impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
    fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
        Self::new(info, execute)
    }
}

impl<E> From<(ExecutorInfo, E)> for Executor
where
    E: Execute,
{
    fn from((info, execute): (ExecutorInfo, E)) -> Self {
        Self::new(info, execute.boxed())
    }
}
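
// Example (illustrative sketch, not used by this module): wiring a custom
// `Execute` implementation into an `Executor`. `MyNoopExecutor` is a
// hypothetical type that simply produces an empty message stream.
//
//     struct MyNoopExecutor;
//
//     impl Execute for MyNoopExecutor {
//         fn execute(self: Box<Self>) -> BoxedMessageStream {
//             futures::stream::empty().boxed()
//         }
//     }
//
//     let info = ExecutorInfo::for_test(Schema::new(vec![]), vec![], "MyNoopExecutor".into(), 1);
//     let executor: Executor = (info, MyNoopExecutor).into();
//     let _stream = executor.execute();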

pub const INVALID_EPOCH: u64 = 0;

type UpstreamFragmentId = FragmentId;
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;

#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct UpdateMutation {
    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
    pub dropped_actors: HashSet<ActorId>,
    pub actor_splits: SplitAssignments,
    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    pub sink_schema_change: HashMap<SinkId, PbSinkSchemaChange>,
    pub subscriptions_to_drop: Vec<SubscriptionUpstreamInfo>,
}

#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct AddMutation {
    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
    pub added_actors: HashSet<ActorId>,
    pub splits: SplitAssignments,
    pub pause: bool,
    pub subscriptions_to_add: Vec<(TableId, SubscriberId)>,
    pub backfill_nodes_to_pause: HashSet<FragmentId>,
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    pub new_upstream_sinks: HashMap<FragmentId, PbNewUpstreamSink>,
}

#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct StopMutation {
    pub dropped_actors: HashSet<ActorId>,
    pub dropped_sink_fragments: HashSet<FragmentId>,
}

#[cfg_attr(any(test, feature = "test"), derive(PartialEq))]
#[derive(Debug, Clone)]
pub enum Mutation {
    Stop(StopMutation),
    Update(UpdateMutation),
    Add(AddMutation),
    SourceChangeSplit(SplitAssignments),
    Pause,
    Resume,
    Throttle(HashMap<FragmentId, ThrottleConfig>),
    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
    DropSubscriptions {
        subscriptions_to_drop: Vec<SubscriptionUpstreamInfo>,
    },
    StartFragmentBackfill {
        fragment_ids: HashSet<FragmentId>,
    },
    RefreshStart {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    ListFinish {
        associated_source_id: SourceId,
    },
    LoadFinish {
        associated_source_id: SourceId,
    },
    ResetSource {
        source_id: SourceId,
    },
}

/// A barrier flowing through the streaming graph, generic over its mutation
/// payload `M`.
#[derive(Debug, Clone)]
pub struct BarrierInner<M> {
    pub epoch: EpochPair,
    pub mutation: M,
    pub kind: BarrierKind,

    /// The tracing context of this barrier.
    pub tracing_context: TracingContext,
}

pub type BarrierMutationType = Option<Arc<Mutation>>;
pub type Barrier = BarrierInner<BarrierMutationType>;
pub type DispatcherBarrier = BarrierInner<()>;

impl<M: Default> BarrierInner<M> {
    pub fn new_test_barrier(epoch: u64) -> Self {
        Self {
            epoch: EpochPair::new_test_epoch(epoch),
            kind: BarrierKind::Checkpoint,
            tracing_context: TracingContext::none(),
            mutation: Default::default(),
        }
    }

    pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
        Self {
            epoch: EpochPair::new(epoch, prev_epoch),
            kind: BarrierKind::Checkpoint,
            tracing_context: TracingContext::none(),
            mutation: Default::default(),
        }
    }
}

impl Barrier {
    pub fn into_dispatcher(self) -> DispatcherBarrier {
        DispatcherBarrier {
            epoch: self.epoch,
            mutation: (),
            kind: self.kind,
            tracing_context: self.tracing_context,
        }
    }

    #[must_use]
    pub fn with_mutation(self, mutation: Mutation) -> Self {
        Self {
            mutation: Some(Arc::new(mutation)),
            ..self
        }
    }

    #[must_use]
    pub fn with_stop(self) -> Self {
        self.with_mutation(Mutation::Stop(StopMutation {
            dropped_actors: Default::default(),
            dropped_sink_fragments: Default::default(),
        }))
    }

    /// Whether this barrier carries a stop mutation.
    pub fn is_with_stop_mutation(&self) -> bool {
        matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
    }

    /// Whether this barrier stops the actor with `actor_id`.
    pub fn is_stop(&self, actor_id: ActorId) -> bool {
        self.all_stop_actors()
            .is_some_and(|actors| actors.contains(&actor_id))
    }

    pub fn is_checkpoint(&self) -> bool {
        self.kind == BarrierKind::Checkpoint
    }

    /// Returns the initial split assignment of the actor with `actor_id`, carried by the
    /// first barrier (an `Add` or `Update` mutation).
    pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
        match self.mutation.as_deref()? {
            Mutation::Update(UpdateMutation { actor_splits, .. })
            | Mutation::Add(AddMutation {
                splits: actor_splits,
                ..
            }) => actor_splits.get(&actor_id),

            _ => {
                if cfg!(debug_assertions) {
                    panic!(
                        "the initial mutation of the barrier should not be {:?}",
                        self.mutation
                    );
                }
                None
            }
        }
        .map(|s| s.as_slice())
    }

    /// Returns all actors that are dropped by this barrier, if any.
    pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
        match self.mutation.as_deref() {
            Some(Mutation::Stop(StopMutation { dropped_actors, .. })) => Some(dropped_actors),
            Some(Mutation::Update(UpdateMutation { dropped_actors, .. })) => Some(dropped_actors),
            _ => None,
        }
    }

    /// Whether the actor with `actor_id` is newly added by this barrier.
    pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation { added_actors, .. })) => {
                added_actors.contains(&actor_id)
            }
            _ => false,
        }
    }

    pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
        if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
            fragment_ids.contains(&fragment_id)
        } else {
            false
        }
    }

    /// Whether this barrier adds new dispatchers (i.e. new downstream fragments) for the
    /// actor with `upstream_actor_id`.
    pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
        let Some(mutation) = self.mutation.as_deref() else {
            return false;
        };
        match mutation {
            Mutation::Add(AddMutation { adds, .. }) => adds.contains_key(&upstream_actor_id),
            Mutation::Update(_)
            | Mutation::Stop(_)
            | Mutation::Pause
            | Mutation::Resume
            | Mutation::SourceChangeSplit(_)
            | Mutation::Throttle { .. }
            | Mutation::DropSubscriptions { .. }
            | Mutation::ConnectorPropsChange(_)
            | Mutation::StartFragmentBackfill { .. }
            | Mutation::RefreshStart { .. }
            | Mutation::ListFinish { .. }
            | Mutation::LoadFinish { .. }
            | Mutation::ResetSource { .. } => false,
        }
    }

    /// Whether this barrier requires the newly created job to start in the paused state.
    pub fn is_pause_on_startup(&self) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation { pause, .. })) => *pause,
            _ => false,
        }
    }

    pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation {
                backfill_nodes_to_pause,
                ..
            })) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
            Some(Mutation::Update(_)) => false,
            _ => {
                tracing::warn!(
                    "expected an AddMutation or UpdateMutation on Startup, instead got {:?}",
                    self
                );
                false
            }
        }
    }

    /// Whether this barrier resumes a previously paused stream.
    pub fn is_resume(&self) -> bool {
        matches!(self.mutation.as_deref(), Some(Mutation::Resume))
    }

    /// Returns the [`MergeUpdate`] for the given actor and upstream fragment, if any.
    pub fn as_update_merge(
        &self,
        actor_id: ActorId,
        upstream_fragment_id: UpstreamFragmentId,
    ) -> Option<&MergeUpdate> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Update(UpdateMutation { merges, .. }) => {
                    merges.get(&(actor_id, upstream_fragment_id))
                }
                _ => None,
            })
    }

    pub fn as_new_upstream_sink(&self, fragment_id: FragmentId) -> Option<&PbNewUpstreamSink> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Add(AddMutation {
                    new_upstream_sinks, ..
                }) => new_upstream_sinks.get(&fragment_id),
                _ => None,
            })
    }

    pub fn as_dropped_upstream_sinks(&self) -> Option<&HashSet<FragmentId>> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Stop(StopMutation {
                    dropped_sink_fragments,
                    ..
                }) => Some(dropped_sink_fragments),
                _ => None,
            })
    }

    /// Returns the new vnode bitmap for the actor with `actor_id`, if this barrier updates it.
    pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Update(UpdateMutation { vnode_bitmaps, .. }) => {
                    vnode_bitmaps.get(&actor_id).cloned()
                }
                _ => None,
            })
    }

    pub fn as_sink_schema_change(&self, sink_id: SinkId) -> Option<PbSinkSchemaChange> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Update(UpdateMutation {
                    sink_schema_change, ..
                }) => sink_schema_change.get(&sink_id).cloned(),
                _ => None,
            })
    }

    pub fn as_subscriptions_to_drop(&self) -> Option<&[SubscriptionUpstreamInfo]> {
        match self.mutation.as_deref() {
            Some(Mutation::DropSubscriptions {
                subscriptions_to_drop,
            })
            | Some(Mutation::Update(UpdateMutation {
                subscriptions_to_drop,
                ..
            })) => Some(subscriptions_to_drop.as_slice()),
            _ => None,
        }
    }

    pub fn get_curr_epoch(&self) -> Epoch {
        Epoch(self.epoch.curr)
    }

    /// Returns the tracing context of this barrier.
    pub fn tracing_context(&self) -> &TracingContext {
        &self.tracing_context
    }

    pub fn added_subscriber_on_mv_table(
        &self,
        mv_table_id: TableId,
    ) -> impl Iterator<Item = SubscriberId> + '_ {
        if let Some(Mutation::Add(add)) = self.mutation.as_deref() {
            Some(add)
        } else {
            None
        }
        .into_iter()
        .flat_map(move |add| {
            add.subscriptions_to_add.iter().filter_map(
                move |(upstream_mv_table_id, subscriber_id)| {
                    if *upstream_mv_table_id == mv_table_id {
                        Some(*subscriber_id)
                    } else {
                        None
                    }
                },
            )
        })
    }
}
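
// Example (illustrative sketch): constructing a barrier in a test and checking
// its mutation. `test_epoch` refers to `risingwave_common::util::epoch::test_epoch`,
// which is assumed to be available in the test context.
//
//     let barrier = Barrier::new_test_barrier(test_epoch(1)).with_stop();
//     assert!(barrier.is_checkpoint());
//     assert!(barrier.is_with_stop_mutation());
//     assert!(barrier.all_stop_actors().unwrap().is_empty());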

impl<M: PartialEq> PartialEq for BarrierInner<M> {
    fn eq(&self, other: &Self) -> bool {
        self.epoch == other.epoch && self.mutation == other.mutation
    }
}

impl Mutation {
    #[cfg(test)]
    pub fn is_stop(&self) -> bool {
        matches!(self, Mutation::Stop(_))
    }

    #[cfg(test)]
    fn to_protobuf(&self) -> PbMutation {
        use risingwave_pb::source::{
            ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
        };
        use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
        use risingwave_pb::stream_plan::{
            PbAddMutation, PbConnectorPropsChangeMutation, PbDispatchers,
            PbDropSubscriptionsMutation, PbPauseMutation, PbResumeMutation,
            PbSourceChangeSplitMutation, PbStartFragmentBackfillMutation, PbStopMutation,
            PbThrottleMutation, PbUpdateMutation,
        };
        let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
            actor_splits
                .iter()
                .map(|(&actor_id, splits)| {
                    (
                        actor_id,
                        ConnectorSplits {
                            splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
                        },
                    )
                })
                .collect::<HashMap<_, _>>()
        };

        match self {
            Mutation::Stop(StopMutation {
                dropped_actors,
                dropped_sink_fragments,
            }) => PbMutation::Stop(PbStopMutation {
                actors: dropped_actors.iter().copied().collect(),
                dropped_sink_fragments: dropped_sink_fragments.iter().copied().collect(),
            }),
            Mutation::Update(UpdateMutation {
                dispatchers,
                merges,
                vnode_bitmaps,
                dropped_actors,
                actor_splits,
                actor_new_dispatchers,
                actor_cdc_table_snapshot_splits,
                sink_schema_change,
                subscriptions_to_drop,
            }) => PbMutation::Update(PbUpdateMutation {
                dispatcher_update: dispatchers.values().flatten().cloned().collect(),
                merge_update: merges.values().cloned().collect(),
                actor_vnode_bitmap_update: vnode_bitmaps
                    .iter()
                    .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
                    .collect(),
                dropped_actors: dropped_actors.iter().copied().collect(),
                actor_splits: actor_splits_to_protobuf(actor_splits),
                actor_new_dispatchers: actor_new_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| {
                        (
                            actor_id,
                            PbDispatchers {
                                dispatchers: dispatchers.clone(),
                            },
                        )
                    })
                    .collect(),
                actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                    splits: actor_cdc_table_snapshot_splits
                        .splits
                        .iter()
                        .map(|(actor_id, (splits, generation))| {
                            (
                                *actor_id,
                                risingwave_pb::source::PbCdcTableSnapshotSplits {
                                    splits: splits
                                        .iter()
                                        .map(
                                            risingwave_connector::source::cdc::build_cdc_table_snapshot_split,
                                        )
                                        .collect(),
                                    generation: *generation,
                                },
                            )
                        })
                        .collect(),
                }),
                sink_schema_change: sink_schema_change
                    .iter()
                    .map(|(sink_id, change)| ((*sink_id).as_raw_id(), change.clone()))
                    .collect(),
                subscriptions_to_drop: subscriptions_to_drop.clone(),
            }),
            Mutation::Add(AddMutation {
                adds,
                added_actors,
                splits,
                pause,
                subscriptions_to_add,
                backfill_nodes_to_pause,
                actor_cdc_table_snapshot_splits,
                new_upstream_sinks,
            }) => PbMutation::Add(PbAddMutation {
                actor_dispatchers: adds
                    .iter()
                    .map(|(&actor_id, dispatchers)| {
                        (
                            actor_id,
                            PbDispatchers {
                                dispatchers: dispatchers.clone(),
                            },
                        )
                    })
                    .collect(),
                added_actors: added_actors.iter().copied().collect(),
                actor_splits: actor_splits_to_protobuf(splits),
                pause: *pause,
                subscriptions_to_add: subscriptions_to_add
                    .iter()
                    .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
                        subscriber_id: *subscriber_id,
                        upstream_mv_table_id: *table_id,
                    })
                    .collect(),
                backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
                actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                    splits: actor_cdc_table_snapshot_splits
                        .splits
                        .iter()
                        .map(|(actor_id, (splits, generation))| {
                            (
                                *actor_id,
                                risingwave_pb::source::PbCdcTableSnapshotSplits {
                                    splits: splits
                                        .iter()
                                        .map(
                                            risingwave_connector::source::cdc::build_cdc_table_snapshot_split,
                                        )
                                        .collect(),
                                    generation: *generation,
                                },
                            )
                        })
                        .collect(),
                }),
                new_upstream_sinks: new_upstream_sinks
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
            }),
            Mutation::SourceChangeSplit(changes) => {
                PbMutation::Splits(PbSourceChangeSplitMutation {
                    actor_splits: changes
                        .iter()
                        .map(|(&actor_id, splits)| {
                            (
                                actor_id,
                                ConnectorSplits {
                                    splits: splits
                                        .clone()
                                        .iter()
                                        .map(ConnectorSplit::from)
                                        .collect(),
                                },
                            )
                        })
                        .collect(),
                })
            }
            Mutation::Pause => PbMutation::Pause(PbPauseMutation {}),
            Mutation::Resume => PbMutation::Resume(PbResumeMutation {}),
            Mutation::Throttle(changes) => PbMutation::Throttle(PbThrottleMutation {
                fragment_throttle: changes.clone(),
            }),
            Mutation::DropSubscriptions {
                subscriptions_to_drop,
            } => PbMutation::DropSubscriptions(PbDropSubscriptionsMutation {
                info: subscriptions_to_drop.clone(),
            }),
            Mutation::ConnectorPropsChange(map) => {
                PbMutation::ConnectorPropsChange(PbConnectorPropsChangeMutation {
                    connector_props_infos: map
                        .iter()
                        .map(|(actor_id, options)| {
                            (
                                *actor_id,
                                ConnectorPropsInfo {
                                    connector_props_info: options
                                        .iter()
                                        .map(|(k, v)| (k.clone(), v.clone()))
                                        .collect(),
                                },
                            )
                        })
                        .collect(),
                })
            }
            Mutation::StartFragmentBackfill { fragment_ids } => {
                PbMutation::StartFragmentBackfill(PbStartFragmentBackfillMutation {
                    fragment_ids: fragment_ids.iter().copied().collect(),
                })
            }
            Mutation::RefreshStart {
                table_id,
                associated_source_id,
            } => PbMutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
                table_id: *table_id,
                associated_source_id: *associated_source_id,
            }),
            Mutation::ListFinish {
                associated_source_id,
            } => PbMutation::ListFinish(risingwave_pb::stream_plan::ListFinishMutation {
                associated_source_id: *associated_source_id,
            }),
            Mutation::LoadFinish {
                associated_source_id,
            } => PbMutation::LoadFinish(risingwave_pb::stream_plan::LoadFinishMutation {
                associated_source_id: *associated_source_id,
            }),
            Mutation::ResetSource { source_id } => {
                PbMutation::ResetSource(risingwave_pb::stream_plan::ResetSourceMutation {
                    source_id: source_id.as_raw_id(),
                })
            }
        }
    }

    fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
        let mutation = match prost {
            PbMutation::Stop(stop) => Mutation::Stop(StopMutation {
                dropped_actors: stop.actors.iter().copied().collect(),
                dropped_sink_fragments: stop.dropped_sink_fragments.iter().copied().collect(),
            }),

            PbMutation::Update(update) => Mutation::Update(UpdateMutation {
                dispatchers: update
                    .dispatcher_update
                    .iter()
                    .map(|u| (u.actor_id, u.clone()))
                    .into_group_map(),
                merges: update
                    .merge_update
                    .iter()
                    .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
                    .collect(),
                vnode_bitmaps: update
                    .actor_vnode_bitmap_update
                    .iter()
                    .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
                    .collect(),
                dropped_actors: update.dropped_actors.iter().copied().collect(),
                actor_splits: update
                    .actor_splits
                    .iter()
                    .map(|(&actor_id, splits)| {
                        (
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(|split| split.try_into().unwrap())
                                .collect(),
                        )
                    })
                    .collect(),
                actor_new_dispatchers: update
                    .actor_new_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
                    .collect(),
                actor_cdc_table_snapshot_splits:
                    build_actor_cdc_table_snapshot_splits_with_generation(
                        update
                            .actor_cdc_table_snapshot_splits
                            .clone()
                            .unwrap_or_default(),
                    ),
                sink_schema_change: update
                    .sink_schema_change
                    .iter()
                    .map(|(sink_id, change)| (SinkId::from(*sink_id), change.clone()))
                    .collect(),
                subscriptions_to_drop: update.subscriptions_to_drop.clone(),
            }),

            PbMutation::Add(add) => Mutation::Add(AddMutation {
                adds: add
                    .actor_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
                    .collect(),
                added_actors: add.added_actors.iter().copied().collect(),
                splits: add
                    .actor_splits
                    .iter()
                    .map(|(&actor_id, splits)| {
                        (
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(|split| split.try_into().unwrap())
                                .collect(),
                        )
                    })
                    .collect(),
                pause: add.pause,
                subscriptions_to_add: add
                    .subscriptions_to_add
                    .iter()
                    .map(
                        |SubscriptionUpstreamInfo {
                             subscriber_id,
                             upstream_mv_table_id,
                         }| { (*upstream_mv_table_id, *subscriber_id) },
                    )
                    .collect(),
                backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
                actor_cdc_table_snapshot_splits:
                    build_actor_cdc_table_snapshot_splits_with_generation(
                        add.actor_cdc_table_snapshot_splits
                            .clone()
                            .unwrap_or_default(),
                    ),
                new_upstream_sinks: add
                    .new_upstream_sinks
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
            }),

            PbMutation::Splits(s) => {
                let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
                    Vec::with_capacity(s.actor_splits.len());
                for (&actor_id, splits) in &s.actor_splits {
                    if !splits.splits.is_empty() {
                        change_splits.push((
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(SplitImpl::try_from)
                                .try_collect()?,
                        ));
                    }
                }
                Mutation::SourceChangeSplit(change_splits.into_iter().collect())
            }
            PbMutation::Pause(_) => Mutation::Pause,
            PbMutation::Resume(_) => Mutation::Resume,
            PbMutation::Throttle(changes) => Mutation::Throttle(changes.fragment_throttle.clone()),
            PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
                subscriptions_to_drop: drop.info.clone(),
            },
            PbMutation::ConnectorPropsChange(alter_connector_props) => {
                Mutation::ConnectorPropsChange(
                    alter_connector_props
                        .connector_props_infos
                        .iter()
                        .map(|(connector_id, options)| {
                            (
                                *connector_id,
                                options
                                    .connector_props_info
                                    .iter()
                                    .map(|(k, v)| (k.clone(), v.clone()))
                                    .collect(),
                            )
                        })
                        .collect(),
                )
            }
            PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
                Mutation::StartFragmentBackfill {
                    fragment_ids: start_fragment_backfill
                        .fragment_ids
                        .iter()
                        .copied()
                        .collect(),
                }
            }
            PbMutation::RefreshStart(refresh_start) => Mutation::RefreshStart {
                table_id: refresh_start.table_id,
                associated_source_id: refresh_start.associated_source_id,
            },
            PbMutation::ListFinish(list_finish) => Mutation::ListFinish {
                associated_source_id: list_finish.associated_source_id,
            },
            PbMutation::LoadFinish(load_finish) => Mutation::LoadFinish {
                associated_source_id: load_finish.associated_source_id,
            },
            PbMutation::ResetSource(reset_source) => Mutation::ResetSource {
                source_id: SourceId::from(reset_source.source_id),
            },
        };
        Ok(mutation)
    }
}

impl<M> BarrierInner<M> {
    fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
        let Self {
            epoch,
            mutation,
            kind,
            tracing_context,
            ..
        } = self;

        PbBarrier {
            epoch: Some(PbEpoch {
                curr: epoch.curr,
                prev: epoch.prev,
            }),
            mutation: barrier_fn(mutation).map(|mutation| PbBarrierMutation {
                mutation: Some(mutation),
            }),
            tracing_context: tracing_context.to_protobuf(),
            kind: *kind as _,
        }
    }

    fn from_protobuf_inner(
        prost: &PbBarrier,
        mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
    ) -> StreamExecutorResult<Self> {
        let epoch = prost.get_epoch()?;

        Ok(Self {
            kind: prost.kind(),
            epoch: EpochPair::new(epoch.curr, epoch.prev),
            mutation: mutation_from_pb(
                (prost.mutation.as_ref()).and_then(|mutation| mutation.mutation.as_ref()),
            )?,
            tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
        })
    }

    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
        BarrierInner {
            epoch: self.epoch,
            mutation: f(self.mutation),
            kind: self.kind,
            tracing_context: self.tracing_context,
        }
    }
}

impl DispatcherBarrier {
    pub fn to_protobuf(&self) -> PbBarrier {
        self.to_protobuf_inner(|_| None)
    }
}

impl Barrier {
    #[cfg(test)]
    pub fn to_protobuf(&self) -> PbBarrier {
        self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
    }

    pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
        Self::from_protobuf_inner(prost, |mutation| {
            mutation
                .map(|m| Mutation::from_protobuf(m).map(Arc::new))
                .transpose()
        })
    }
}

#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
    pub col_idx: usize,
    pub data_type: DataType,
    pub val: ScalarImpl,
}

impl PartialOrd for Watermark {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Watermark {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.val.default_cmp(&other.val)
    }
}

impl Watermark {
    pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
        Self {
            col_idx,
            data_type,
            val,
        }
    }

    pub async fn transform_with_expr(
        self,
        expr: &NonStrictExpression<impl Expression>,
        new_col_idx: usize,
    ) -> Option<Self> {
        let Self { col_idx, val, .. } = self;
        let row = {
            let mut row = vec![None; col_idx + 1];
            row[col_idx] = Some(val);
            OwnedRow::new(row)
        };
        let val = expr.eval_row_infallible(&row).await?;
        Some(Self::new(new_col_idx, expr.inner().return_type(), val))
    }

    /// Transform the watermark with the given output indices. Returns `None` if the
    /// watermark's column is not included in `output_indices`.
    pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
        output_indices
            .iter()
            .position(|p| *p == self.col_idx)
            .map(|new_col_idx| self.with_idx(new_col_idx))
    }

    pub fn to_protobuf(&self) -> PbWatermark {
        PbWatermark {
            column: Some(PbInputRef {
                index: self.col_idx as _,
                r#type: Some(self.data_type.to_protobuf()),
            }),
            val: Some(&self.val).to_protobuf().into(),
        }
    }

    pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
        let col_ref = prost.get_column()?;
        let data_type = DataType::from(col_ref.get_type()?);
        let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
            .expect("watermark value cannot be null");
        Ok(Self::new(col_ref.get_index() as _, data_type, val))
    }

    pub fn with_idx(self, idx: usize) -> Self {
        Self::new(idx, self.data_type, self.val)
    }
}
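
// Example (illustrative sketch): remapping a watermark to the output column
// order of a projection that only keeps the input columns `[2, 0]`.
//
//     let watermark = Watermark::new(0, DataType::Int64, ScalarImpl::Int64(100));
//     // Input column 0 is at position 1 of the output, so the watermark moves there.
//     assert_eq!(
//         watermark.clone().transform_with_indices(&[2, 0]),
//         Some(Watermark::new(1, DataType::Int64, ScalarImpl::Int64(100)))
//     );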

#[cfg_attr(any(test, feature = "test"), derive(PartialEq))]
#[derive(Debug, EnumAsInner, Clone)]
pub enum MessageInner<M> {
    Chunk(StreamChunk),
    Barrier(BarrierInner<M>),
    Watermark(Watermark),
}

impl<M> MessageInner<M> {
    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
        match self {
            MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
            MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
            MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
        }
    }
}
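
// Example (illustrative sketch): a `DispatcherMessage` carries no mutation
// payload; it can be turned into a full `Message` by attaching one (here, none)
// via `map_mutation`.
//
//     fn attach_no_mutation(msg: DispatcherMessage) -> Message {
//         msg.map_mutation(|()| None)
//     }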

pub type Message = MessageInner<BarrierMutationType>;
pub type DispatcherMessage = MessageInner<()>;

#[derive(Debug, EnumAsInner, Clone)]
pub enum MessageBatchInner<M> {
    Chunk(StreamChunk),
    BarrierBatch(Vec<BarrierInner<M>>),
    Watermark(Watermark),
}
pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
pub type DispatcherBarriers = Vec<DispatcherBarrier>;
pub type DispatcherMessageBatch = MessageBatchInner<()>;

impl From<DispatcherMessage> for DispatcherMessageBatch {
    fn from(m: DispatcherMessage) -> Self {
        match m {
            DispatcherMessage::Chunk(c) => Self::Chunk(c),
            DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
            DispatcherMessage::Watermark(w) => Self::Watermark(w),
        }
    }
}

impl From<StreamChunk> for Message {
    fn from(chunk: StreamChunk) -> Self {
        Message::Chunk(chunk)
    }
}

impl<'a> TryFrom<&'a Message> for &'a Barrier {
    type Error = ();

    fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
        match m {
            Message::Chunk(_) => Err(()),
            Message::Barrier(b) => Ok(b),
            Message::Watermark(_) => Err(()),
        }
    }
}

impl Message {
    #[cfg(test)]
    pub fn is_stop(&self) -> bool {
        matches!(
            self,
            Message::Barrier(Barrier { mutation, .. }) if mutation.as_ref().unwrap().is_stop()
        )
    }
}

impl DispatcherMessageBatch {
    pub fn to_protobuf(&self) -> PbStreamMessageBatch {
        let prost = match self {
            Self::Chunk(stream_chunk) => {
                let prost_stream_chunk = stream_chunk.to_protobuf();
                StreamMessageBatch::StreamChunk(prost_stream_chunk)
            }
            Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
                barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
            }),
            Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
        };
        PbStreamMessageBatch {
            stream_message_batch: Some(prost),
        }
    }

    pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
        let res = match prost.get_stream_message_batch()? {
            StreamMessageBatch::StreamChunk(chunk) => {
                Self::Chunk(StreamChunk::from_protobuf(chunk)?)
            }
            StreamMessageBatch::BarrierBatch(barrier_batch) => {
                let barriers = barrier_batch
                    .barriers
                    .iter()
                    .map(|barrier| {
                        DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
                            if mutation.is_some() {
                                if cfg!(debug_assertions) {
                                    panic!("should not receive message of barrier with mutation");
                                } else {
                                    warn!(?barrier, "receive message of barrier with mutation");
                                }
                            }
                            Ok(())
                        })
                    })
                    .try_collect()?;
                Self::BarrierBatch(barriers)
            }
            StreamMessageBatch::Watermark(watermark) => {
                Self::Watermark(Watermark::from_protobuf(watermark)?)
            }
        };
        Ok(res)
    }

    pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
        ::prost::Message::encoded_len(msg)
    }
}

pub type StreamKey = Vec<usize>;
pub type StreamKeyRef<'a> = &'a [usize];
pub type StreamKeyDataTypes = SmallVec<[DataType; 1]>;

/// Expects the first message of the given `stream` to be a barrier and returns it.
pub async fn expect_first_barrier<M: Debug>(
    stream: &mut (impl MessageStreamInner<M> + Unpin),
) -> StreamExecutorResult<BarrierInner<M>> {
    let message = stream
        .next()
        .instrument_await("expect_first_barrier")
        .await
        .context("failed to extract the first message: stream closed unexpectedly")??;
    let barrier = message
        .into_barrier()
        .expect("the first message must be a barrier");
    // The first barrier must be either a checkpoint barrier or the initial barrier.
    assert!(matches!(
        barrier.kind,
        BarrierKind::Checkpoint | BarrierKind::Initial
    ));
    Ok(barrier)
}
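
// Example (illustrative sketch): a typical executor implementation first awaits
// the initial barrier before processing any data. The `#[try_stream]` macro is
// from `futures_async_stream`; `input` is a hypothetical upstream executor.
//
//     #[try_stream(ok = Message, error = StreamExecutorError)]
//     async fn execute_inner(input: Executor) {
//         let mut input = input.execute();
//         let first_barrier = expect_first_barrier(&mut input).await?;
//         yield Message::Barrier(first_barrier);
//         // ... handle the remaining messages ...
//     }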

/// Expects the first message of the given aligned `stream` to be a barrier and returns it.
pub async fn expect_first_barrier_from_aligned_stream(
    stream: &mut (impl AlignedMessageStream + Unpin),
) -> StreamExecutorResult<Barrier> {
    let message = stream
        .next()
        .instrument_await("expect_first_barrier")
        .await
        .context("failed to extract the first message: stream closed unexpectedly")??;
    let barrier = message
        .into_barrier()
        .expect("the first message must be a barrier");
    Ok(barrier)
}

/// The terminal consumer of an actor's message stream: it consumes all messages and
/// only yields the barriers to the caller.
pub trait StreamConsumer: Send + 'static {
    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;

    fn execute(self: Box<Self>) -> Self::BarrierStream;
}

type BoxedMessageInput<InputId, M> = BoxedInput<InputId, MessageStreamItemInner<M>>;

/// A barrier-aligned stream over a dynamic set of upstream inputs. Inputs may be
/// added or removed between barriers.
pub struct DynamicReceivers<InputId, M> {
    /// The barrier currently being aligned, if any.
    barrier: Option<BarrierInner<M>>,
    /// When the current alignment round started, for the alignment duration metrics.
    start_ts: Option<Instant>,
    /// Inputs that have already yielded the current barrier and are blocked until alignment completes.
    blocked: Vec<BoxedMessageInput<InputId, M>>,
    /// Inputs that are still being polled in the current alignment round.
    active: FuturesUnordered<StreamFuture<BoxedMessageInput<InputId, M>>>,
    /// Buffered watermarks from each input, keyed by column index.
    buffered_watermarks: BTreeMap<usize, BufferedWatermarks<InputId>>,
    /// Optional counter for the time spent aligning barriers.
    barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
    /// Optional counter for the time spent aligning barriers in the merge executor.
    merge_barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
}

impl<InputId: Clone + Ord + Hash + std::fmt::Debug + Unpin, M: Clone + Unpin> Stream
    for DynamicReceivers<InputId, M>
{
    type Item = MessageStreamItemInner<M>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if self.is_empty() {
            return Poll::Ready(None);
        }

        loop {
            match futures::ready!(self.active.poll_next_unpin(cx)) {
                Some((Some(Err(e)), _)) => {
                    return Poll::Ready(Some(Err(e)));
                }
                Some((Some(Ok(message)), remaining)) => {
                    let input_id = remaining.id();
                    match message {
                        MessageInner::Chunk(chunk) => {
                            // Keep polling this upstream by pushing it back to `active`.
                            self.active.push(remaining.into_future());
                            return Poll::Ready(Some(Ok(MessageInner::Chunk(chunk))));
                        }
                        MessageInner::Watermark(watermark) => {
                            // Keep polling this upstream by pushing it back to `active`.
                            self.active.push(remaining.into_future());
                            if let Some(watermark) = self.handle_watermark(input_id, watermark) {
                                return Poll::Ready(Some(Ok(MessageInner::Watermark(watermark))));
                            }
                        }
                        MessageInner::Barrier(barrier) => {
                            // Block this upstream until barriers from all upstreams are aligned.
                            if self.blocked.is_empty() {
                                self.start_ts = Some(Instant::now());
                            }
                            self.blocked.push(remaining);
                            if let Some(current_barrier) = self.barrier.as_ref() {
                                if current_barrier.epoch != barrier.epoch {
                                    return Poll::Ready(Some(Err(
                                        StreamExecutorError::align_barrier(
                                            current_barrier.clone().map_mutation(|_| None),
                                            barrier.map_mutation(|_| None),
                                        ),
                                    )));
                                }
                            } else {
                                self.barrier = Some(barrier);
                            }
                        }
                    }
                }
                Some((None, remaining)) => {
                    // An upstream input ended before the whole stream finished.
                    return Poll::Ready(Some(Err(StreamExecutorError::channel_closed(format!(
                        "upstream input {:?} unexpectedly closed",
                        remaining.id()
                    )))));
                }
                None => {
                    // All upstreams are blocked on the barrier: the barrier is aligned.
                    assert!(!self.blocked.is_empty());

                    let start_ts = self
                        .start_ts
                        .take()
                        .expect("should have received at least one barrier");
                    if let Some(barrier_align_duration) = &self.barrier_align_duration {
                        barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
                    }
                    if let Some(merge_barrier_align_duration) = &self.merge_barrier_align_duration {
                        merge_barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
                    }

                    break;
                }
            }
        }

        assert!(self.active.is_terminated());

        let barrier = self.barrier.take().unwrap();

        // Unblock all upstreams and start the next round of polling.
        let upstreams = std::mem::take(&mut self.blocked);
        self.extend_active(upstreams);
        assert!(!self.active.is_terminated());

        Poll::Ready(Some(Ok(MessageInner::Barrier(barrier))))
    }
}

impl<InputId: Clone + Ord + Hash + std::fmt::Debug, M> DynamicReceivers<InputId, M> {
    pub fn new(
        upstreams: Vec<BoxedMessageInput<InputId, M>>,
        barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
        merge_barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
    ) -> Self {
        let mut this = Self {
            barrier: None,
            start_ts: None,
            blocked: Vec::with_capacity(upstreams.len()),
            active: Default::default(),
            buffered_watermarks: Default::default(),
            merge_barrier_align_duration,
            barrier_align_duration,
        };
        this.extend_active(upstreams);
        this
    }

    /// Extend the active upstreams. May only be called when no barrier is being aligned.
    pub fn extend_active(
        &mut self,
        upstreams: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
    ) {
        assert!(self.blocked.is_empty() && self.barrier.is_none());

        self.active
            .extend(upstreams.into_iter().map(|s| s.into_future()));
    }

    /// Buffer a watermark from the given input, returning a watermark to emit downstream if any.
    pub fn handle_watermark(
        &mut self,
        input_id: InputId,
        watermark: Watermark,
    ) -> Option<Watermark> {
        let col_idx = watermark.col_idx;
        let upstream_ids: Vec<_> = self.upstream_input_ids().collect();
        let watermarks = self
            .buffered_watermarks
            .entry(col_idx)
            .or_insert_with(|| BufferedWatermarks::with_ids(upstream_ids));
        watermarks.handle_watermark(input_id, watermark)
    }

    /// Add new upstream inputs. May only be called when no barrier is being aligned.
    pub fn add_upstreams_from(
        &mut self,
        new_inputs: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
    ) {
        assert!(self.blocked.is_empty() && self.barrier.is_none());

        let new_inputs: Vec<_> = new_inputs.into_iter().collect();
        let input_ids = new_inputs.iter().map(|input| input.id());
        self.buffered_watermarks.values_mut().for_each(|buffers| {
            buffers.add_buffers(input_ids.clone());
        });
        self.active
            .extend(new_inputs.into_iter().map(|s| s.into_future()));
    }

    /// Remove upstream inputs by id. May only be called when no barrier is being aligned.
    pub fn remove_upstreams(&mut self, upstream_input_ids: &HashSet<InputId>) {
        assert!(self.blocked.is_empty() && self.barrier.is_none());

        let new_upstreams = std::mem::take(&mut self.active)
            .into_iter()
            .map(|s| s.into_inner().unwrap())
            .filter(|u| !upstream_input_ids.contains(&u.id()));
        self.extend_active(new_upstreams);
        self.buffered_watermarks.values_mut().for_each(|buffers| {
            buffers.remove_buffer(upstream_input_ids.clone());
        });
    }

    pub fn merge_barrier_align_duration(
        &self,
    ) -> Option<LabelGuardedMetric<GenericCounter<AtomicU64>>> {
        self.merge_barrier_align_duration.clone()
    }

    pub fn flush_buffered_watermarks(&mut self) {
        self.buffered_watermarks
            .values_mut()
            .for_each(|buffers| buffers.clear());
    }

    pub fn upstream_input_ids(&self) -> impl Iterator<Item = InputId> + '_ {
        self.blocked
            .iter()
            .map(|s| s.id())
            .chain(self.active.iter().map(|s| s.get_ref().unwrap().id()))
    }

    pub fn is_empty(&self) -> bool {
        self.blocked.is_empty() && self.active.is_empty()
    }
}
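
// Example (illustrative sketch): `DynamicReceivers` aligns barriers across a
// changing set of upstream inputs. Upstreams may only be added or removed while
// no barrier is being aligned. `existing_inputs`, `new_inputs` and
// `removed_input_id` are hypothetical values.
//
//     let mut receivers = DynamicReceivers::new(existing_inputs, None, None);
//     // ... after a barrier has been fully aligned and yielded ...
//     receivers.add_upstreams_from(new_inputs);
//     receivers.remove_upstreams(&HashSet::from([removed_input_id]));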

/// Buffers the barriers received from the local barrier manager, together with any new
/// upstream inputs created for them, so that they can later be paired with the
/// corresponding dispatcher barriers from the upstream data stream.
pub(crate) struct DispatchBarrierBuffer {
    buffer: VecDeque<(Barrier, Option<Vec<BoxedActorInput>>)>,
    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
    recv_state: BarrierReceiverState,
    curr_upstream_fragment_id: FragmentId,
    actor_id: ActorId,
    build_input_ctx: Arc<BuildInputContext>,
}

struct BuildInputContext {
    pub actor_id: ActorId,
    pub local_barrier_manager: LocalBarrierManager,
    pub metrics: Arc<StreamingMetrics>,
    pub fragment_id: FragmentId,
    pub actor_config: Arc<StreamingConfig>,
}

type BoxedNewInputsFuture =
    Pin<Box<dyn Future<Output = StreamExecutorResult<Vec<BoxedActorInput>>> + Send>>;

enum BarrierReceiverState {
    ReceivingBarrier,
    CreatingNewInput(Barrier, BoxedNewInputsFuture),
}

impl DispatchBarrierBuffer {
    pub fn new(
        barrier_rx: mpsc::UnboundedReceiver<Barrier>,
        actor_id: ActorId,
        curr_upstream_fragment_id: FragmentId,
        local_barrier_manager: LocalBarrierManager,
        metrics: Arc<StreamingMetrics>,
        fragment_id: FragmentId,
        actor_config: Arc<StreamingConfig>,
    ) -> Self {
        Self {
            buffer: VecDeque::new(),
            barrier_rx,
            recv_state: BarrierReceiverState::ReceivingBarrier,
            curr_upstream_fragment_id,
            actor_id,
            build_input_ctx: Arc::new(BuildInputContext {
                actor_id,
                local_barrier_manager,
                metrics,
                fragment_id,
                actor_config,
            }),
        }
    }

    pub async fn await_next_message(
        &mut self,
        stream: &mut (impl Stream<Item = StreamExecutorResult<DispatcherMessage>> + Unpin),
        metrics: &ActorInputMetrics,
    ) -> StreamExecutorResult<DispatcherMessage> {
        let mut start_time = Instant::now();
        let interval_duration = Duration::from_secs(15);
        let mut interval =
            tokio::time::interval_at(start_time + interval_duration, interval_duration);

        loop {
            tokio::select! {
                biased;
                msg = stream.try_next() => {
                    metrics
                        .actor_input_buffer_blocking_duration_ns
                        .inc_by(start_time.elapsed().as_nanos() as u64);
                    return msg?.ok_or_else(
                        || StreamExecutorError::channel_closed("upstream executor closed unexpectedly")
                    );
                }

                e = self.continuously_fetch_barrier_rx() => {
                    return Err(e);
                }

                // Periodically flush the blocking duration metric while waiting.
                _ = interval.tick() => {
                    start_time = Instant::now();
                    metrics.actor_input_buffer_blocking_duration_ns.inc_by(interval_duration.as_nanos() as u64);
                    continue;
                }
            }
        }
    }

    pub async fn pop_barrier_with_inputs(
        &mut self,
        barrier: DispatcherBarrier,
    ) -> StreamExecutorResult<(Barrier, Option<Vec<BoxedActorInput>>)> {
        while self.buffer.is_empty() {
            self.try_fetch_barrier_rx(false).await?;
        }
        let (recv_barrier, inputs) = self.buffer.pop_front().unwrap();
        assert_equal_dispatcher_barrier(&recv_barrier, &barrier);

        Ok((recv_barrier, inputs))
    }

    async fn continuously_fetch_barrier_rx(&mut self) -> StreamExecutorError {
        loop {
            if let Err(e) = self.try_fetch_barrier_rx(true).await {
                return e;
            }
        }
    }

    async fn try_fetch_barrier_rx(&mut self, pending_on_end: bool) -> StreamExecutorResult<()> {
        match &mut self.recv_state {
            BarrierReceiverState::ReceivingBarrier => {
                let Some(barrier) = self.barrier_rx.recv().await else {
                    if pending_on_end {
                        return pending().await;
                    } else {
                        return Err(StreamExecutorError::channel_closed(
                            "barrier channel closed unexpectedly",
                        ));
                    }
                };
                if let Some(fut) = self.pre_apply_barrier(&barrier) {
                    self.recv_state = BarrierReceiverState::CreatingNewInput(barrier, fut);
                } else {
                    self.buffer.push_back((barrier, None));
                }
            }
            BarrierReceiverState::CreatingNewInput(barrier, fut) => {
                let new_inputs = fut.await?;
                self.buffer.push_back((barrier.clone(), Some(new_inputs)));
                self.recv_state = BarrierReceiverState::ReceivingBarrier;
            }
        }
        Ok(())
    }

    fn pre_apply_barrier(&mut self, barrier: &Barrier) -> Option<BoxedNewInputsFuture> {
        if let Some(update) = barrier.as_update_merge(self.actor_id, self.curr_upstream_fragment_id)
            && !update.added_upstream_actors.is_empty()
        {
            // If the merge update switches to a new upstream fragment, record it.
            let upstream_fragment_id =
                if let Some(new_upstream_fragment_id) = update.new_upstream_fragment_id {
                    self.curr_upstream_fragment_id = new_upstream_fragment_id;
                    new_upstream_fragment_id
                } else {
                    self.curr_upstream_fragment_id
                };
            let ctx = self.build_input_ctx.clone();
            let added_upstream_actors = update.added_upstream_actors.clone();
            let barrier = barrier.clone();
            let fut = async move {
                try_join_all(added_upstream_actors.iter().map(|upstream_actor| async {
                    let mut new_input = new_input(
                        &ctx.local_barrier_manager,
                        ctx.metrics.clone(),
                        ctx.actor_id,
                        ctx.fragment_id,
                        upstream_actor,
                        upstream_fragment_id,
                        ctx.actor_config.clone(),
                    )
                    .await?;

                    // The new input must start with the same barrier as the one being applied.
                    let first_barrier = expect_first_barrier(&mut new_input).await?;
                    assert_equal_dispatcher_barrier(&barrier, &first_barrier);

                    StreamExecutorResult::Ok(new_input)
                }))
                .await
            }
            .boxed();

            Some(fut)
        } else {
            None
        }
    }
}