1mod prelude;
16
17use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
18use std::fmt::Debug;
19use std::future::pending;
20use std::hash::Hash;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::Poll;
24use std::vec;
25
26use await_tree::InstrumentAwait;
27use enum_as_inner::EnumAsInner;
28use futures::future::try_join_all;
29use futures::stream::{BoxStream, FusedStream, FuturesUnordered, StreamFuture};
30use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
31use itertools::Itertools;
32use prometheus::core::{AtomicU64, GenericCounter};
33use risingwave_common::array::StreamChunk;
34use risingwave_common::bitmap::Bitmap;
35use risingwave_common::catalog::{Schema, TableId};
36use risingwave_common::config::StreamingConfig;
37use risingwave_common::metrics::LabelGuardedMetric;
38use risingwave_common::row::OwnedRow;
39use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
40use risingwave_common::util::epoch::{Epoch, EpochPair};
41use risingwave_common::util::tracing::TracingContext;
42use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
43use risingwave_connector::source::SplitImpl;
44use risingwave_expr::expr::{Expression, NonStrictExpression};
45use risingwave_pb::data::PbEpoch;
46use risingwave_pb::expr::PbInputRef;
47use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
48use risingwave_pb::stream_plan::barrier::BarrierKind;
49use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
50use risingwave_pb::stream_plan::stream_node::PbStreamKind;
51use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
52use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
53use risingwave_pb::stream_plan::{
54 PbBarrier, PbBarrierMutation, PbDispatcher, PbSinkSchemaChange, PbStreamMessageBatch,
55 PbWatermark, SubscriptionUpstreamInfo,
56};
57use smallvec::SmallVec;
58use tokio::sync::mpsc;
59use tokio::time::{Duration, Instant};
60
61use crate::error::StreamResult;
62use crate::executor::exchange::input::{
63 BoxedActorInput, BoxedInput, assert_equal_dispatcher_barrier, new_input,
64};
65use crate::executor::monitor::ActorInputMetrics;
66use crate::executor::prelude::StreamingMetrics;
67use crate::executor::watermark::BufferedWatermarks;
68use crate::task::{ActorId, FragmentId, LocalBarrierManager};
69
70mod actor;
71mod barrier_align;
72pub mod exchange;
73pub mod monitor;
74
75pub mod aggregate;
76pub mod asof_join;
77mod backfill;
78mod barrier_recv;
79mod batch_query;
80mod chain;
81mod changelog;
82mod dedup;
83mod dispatch;
84pub mod dml;
85mod dynamic_filter;
86pub mod eowc;
87pub mod error;
88mod expand;
89mod filter;
90mod gap_fill;
91pub mod hash_join;
92mod hop_window;
93mod join;
94pub mod locality_provider;
95mod lookup;
96mod lookup_union;
97mod merge;
98mod mview;
99mod nested_loop_temporal_join;
100mod no_op;
101mod now;
102mod over_window;
103pub mod project;
104mod rearranged_chain;
105mod receiver;
106pub mod row_id_gen;
107mod sink;
108pub mod source;
109mod stream_reader;
110pub mod subtask;
111mod temporal_join;
112mod top_n;
113mod troublemaker;
114mod union;
115mod upstream_sink_union;
116mod values;
117mod watermark;
118mod watermark_filter;
119mod wrapper;
120
121mod approx_percentile;
122
123mod row_merge;
124
125#[cfg(test)]
126mod integration_tests;
127mod sync_kv_log_store;
128#[cfg(any(test, feature = "test"))]
129pub mod test_utils;
130mod utils;
131mod vector;
132
133pub use actor::{Actor, ActorContext, ActorContextRef};
134use anyhow::Context;
135pub use approx_percentile::global::GlobalApproxPercentileExecutor;
136pub use approx_percentile::local::LocalApproxPercentileExecutor;
137pub use backfill::arrangement_backfill::*;
138pub use backfill::cdc::{
139 CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor,
140};
141pub use backfill::no_shuffle_backfill::*;
142pub use backfill::snapshot_backfill::*;
143pub use barrier_recv::BarrierRecvExecutor;
144pub use batch_query::BatchQueryExecutor;
145pub use chain::ChainExecutor;
146pub use changelog::ChangeLogExecutor;
147pub use dedup::AppendOnlyDedupExecutor;
148pub use dispatch::DispatchExecutor;
149pub use dynamic_filter::DynamicFilterExecutor;
150pub use error::{StreamExecutorError, StreamExecutorResult};
151pub use expand::ExpandExecutor;
152pub use filter::{FilterExecutor, UpsertFilterExecutor};
153pub use gap_fill::{GapFillExecutor, GapFillExecutorArgs};
154pub use hash_join::*;
155pub use hop_window::HopWindowExecutor;
156pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
157pub use join::{AsOfDesc, AsOfJoinType, JoinType};
158pub use lookup::*;
159pub use lookup_union::LookupUnionExecutor;
160pub use merge::MergeExecutor;
161pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
162pub use mview::{MaterializeExecutor, RefreshableMaterializeArgs};
163pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
164pub use no_op::NoOpExecutor;
165pub use now::*;
166pub use over_window::*;
167pub use rearranged_chain::RearrangedChainExecutor;
168pub use receiver::ReceiverExecutor;
169use risingwave_common::id::SourceId;
170pub use row_merge::RowMergeExecutor;
171pub use sink::SinkExecutor;
172pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
173pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
174pub use temporal_join::TemporalJoinExecutor;
175pub use top_n::{
176 AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
177};
178pub use troublemaker::TroublemakerExecutor;
179pub use union::UnionExecutor;
180pub use upstream_sink_union::{UpstreamFragmentInfo, UpstreamSinkUnionExecutor};
181pub use utils::DummyExecutor;
182pub use values::ValuesExecutor;
183pub use vector::*;
184pub use watermark_filter::{UpsertWatermarkFilterExecutor, WatermarkFilterExecutor};
185pub use wrapper::WrapperExecutor;
186
187use self::barrier_align::AlignedMessageStream;
188
/// Result item yielded by an executor's message stream, generic over the
/// mutation payload type `M` carried by barriers.
pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
/// Stream item whose barriers carry a full (optional, shared) [`Mutation`].
pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
/// Stream item for messages exchanged with dispatchers (no mutation payload).
pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
/// Boxed, type-erased message stream — the form returned by [`Execute::execute`].
pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;
193
194pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
195use risingwave_connector::sink::catalog::SinkId;
196use risingwave_connector::source::cdc::{
197 CdcTableSnapshotSplitAssignmentWithGeneration,
198 build_actor_cdc_table_snapshot_splits_with_generation,
199};
200use risingwave_pb::id::{ExecutorId, SubscriberId};
201use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
202
// Trait aliases (unstable `trait_alias` feature) for the common stream bounds
// used across executors, so signatures stay short.
pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;
206
/// Static information describing an executor, attached to every [`Executor`].
#[derive(Debug, Default, Clone)]
pub struct ExecutorInfo {
    /// Schema of the executor's output chunks.
    pub schema: Schema,

    /// Stream key of the executor's output. (Exact index semantics are defined
    /// by `StreamKey` elsewhere in the crate.)
    pub stream_key: StreamKey,

    /// Whether the output stream is append-only or retractable, per
    /// [`PbStreamKind`].
    pub stream_kind: PbStreamKind,

    /// Human-readable identity, used as the executor's `Debug` representation.
    pub identity: String,

    /// Unique id of the executor.
    pub id: ExecutorId,
}
225
226impl ExecutorInfo {
227 pub fn for_test(schema: Schema, stream_key: StreamKey, identity: String, id: u64) -> Self {
228 Self {
229 schema,
230 stream_key,
231 stream_kind: PbStreamKind::Retract, identity,
233 id: id.into(),
234 }
235 }
236}
237
/// The core trait an executor implements: turn itself into a message stream.
pub trait Execute: Send + 'static {
    /// Consume the executor and produce its output message stream.
    fn execute(self: Box<Self>) -> BoxedMessageStream;

    /// Like [`Execute::execute`], but starting from a given epoch. The default
    /// implementation ignores the epoch; executors that can resume from an
    /// epoch (e.g. batch-query-backed ones) override this.
    fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
        self.execute()
    }

    /// Box the executor into a trait object.
    fn boxed(self) -> Box<dyn Execute>
    where
        Self: Sized + Send + 'static,
    {
        Box::new(self)
    }
}
253
/// An executor bundled with its static [`ExecutorInfo`].
pub struct Executor {
    info: ExecutorInfo,
    execute: Box<dyn Execute>,
}
260
impl Executor {
    /// Pair an [`ExecutorInfo`] with an [`Execute`] implementation.
    pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
        Self { info, execute }
    }

    /// The executor's static information.
    pub fn info(&self) -> &ExecutorInfo {
        &self.info
    }

    /// Output schema (shorthand for `info().schema`).
    pub fn schema(&self) -> &Schema {
        &self.info.schema
    }

    /// Output stream key (shorthand for `info().stream_key`).
    pub fn stream_key(&self) -> StreamKeyRef<'_> {
        &self.info.stream_key
    }

    /// Output stream kind (shorthand for `info().stream_kind`).
    pub fn stream_kind(&self) -> PbStreamKind {
        self.info.stream_kind
    }

    /// Human-readable identity (shorthand for `info().identity`).
    pub fn identity(&self) -> &str {
        &self.info.identity
    }

    /// Consume the executor and produce its message stream.
    pub fn execute(self) -> BoxedMessageStream {
        self.execute.execute()
    }

    /// Consume the executor and produce its message stream from `epoch`.
    pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
        self.execute.execute_with_epoch(epoch)
    }
}
294
295impl std::fmt::Debug for Executor {
296 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
297 f.write_str(self.identity())
298 }
299}
300
301impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
302 fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
303 Self::new(info, execute)
304 }
305}
306
307impl<E> From<(ExecutorInfo, E)> for Executor
308where
309 E: Execute,
310{
311 fn from((info, execute): (ExecutorInfo, E)) -> Self {
312 Self::new(info, execute.boxed())
313 }
314}
315
/// Sentinel epoch value meaning "no valid epoch".
pub const INVALID_EPOCH: u64 = 0;

// Local aliases for readability in the mutation types below.
type UpstreamFragmentId = FragmentId;
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;
320
/// Payload of a configuration-change barrier: updates to dispatchers, merges,
/// vnode bitmaps, split assignments, etc. of existing actors.
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct UpdateMutation {
    /// Dispatcher updates, keyed by the actor whose dispatchers change.
    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
    /// Merge-executor updates, keyed by `(actor, upstream fragment)`.
    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
    /// New vnode bitmaps for rescheduled actors.
    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
    /// Actors dropped by this update.
    pub dropped_actors: HashSet<ActorId>,
    /// New source split assignments per actor.
    pub actor_splits: SplitAssignments,
    /// Brand-new dispatchers added to existing actors.
    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
    /// CDC table snapshot split assignment (with generation) per actor.
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// Schema changes for sinks, keyed by sink id.
    pub sink_schema_change: HashMap<SinkId, PbSinkSchemaChange>,
    /// Subscriptions dropped together with this update.
    pub subscriptions_to_drop: Vec<SubscriptionUpstreamInfo>,
}
334
/// Payload of a barrier that adds new actors / fragments to the graph.
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct AddMutation {
    /// Dispatchers to add on existing upstream actors, keyed by that actor.
    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
    /// The set of newly created actors.
    pub added_actors: HashSet<ActorId>,
    /// Initial source split assignments for the new actors.
    pub splits: SplitAssignments,
    /// Whether the newly added streaming job should start paused.
    pub pause: bool,
    /// Subscriptions added along with this barrier, as
    /// `(upstream MV table, subscriber)` pairs.
    pub subscriptions_to_add: Vec<(TableId, SubscriberId)>,
    /// Backfill fragments that should start in the paused state.
    pub backfill_nodes_to_pause: HashSet<FragmentId>,
    /// Initial CDC table snapshot split assignment (with generation).
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// New upstream sinks, keyed by the downstream (union) fragment id.
    pub new_upstream_sinks: HashMap<FragmentId, PbNewUpstreamSink>,
}
350
/// Payload of a barrier that stops (drops) actors and/or sink fragments.
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct StopMutation {
    /// Actors to drop after this barrier.
    pub dropped_actors: HashSet<ActorId>,
    /// Upstream sink fragments dropped from sink-union executors.
    pub dropped_sink_fragments: HashSet<FragmentId>,
}
357
/// The graph change carried by a [`Barrier`], applied atomically at the
/// barrier's epoch.
#[cfg_attr(any(test, feature = "test"), derive(PartialEq))]
#[derive(Debug, Clone)]
pub enum Mutation {
    /// Drop actors / sink fragments.
    Stop(StopMutation),
    /// Update configuration of existing actors (scaling, migration, ...).
    Update(UpdateMutation),
    /// Add new actors / fragments.
    Add(AddMutation),
    /// Reassign source splits to actors.
    SourceChangeSplit(SplitAssignments),
    /// Pause all data flow.
    Pause,
    /// Resume data flow after a pause.
    Resume,
    /// Apply rate-limit (throttle) configs per fragment.
    Throttle(HashMap<FragmentId, ThrottleConfig>),
    /// Change connector properties; keyed by a raw `u32` id
    /// (`connector_id` in the protobuf mapping — TODO confirm exact semantics).
    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
    /// Drop the given subscriptions.
    DropSubscriptions {
        subscriptions_to_drop: Vec<SubscriptionUpstreamInfo>,
    },
    /// Un-pause backfill for the given fragments.
    StartFragmentBackfill {
        fragment_ids: HashSet<FragmentId>,
    },
    /// Start refreshing a refreshable table backed by `associated_source_id`.
    RefreshStart {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// The listing phase of a refresh has finished.
    ListFinish {
        associated_source_id: SourceId,
    },
    /// The loading phase of a refresh has finished.
    LoadFinish {
        associated_source_id: SourceId,
    },
    /// Reset a source's state.
    ResetSource {
        source_id: SourceId,
    },
    /// Inject split offsets into a source, keyed by split id.
    InjectSourceOffsets {
        source_id: SourceId,
        split_offsets: HashMap<String, String>,
    },
}
396
/// A barrier flowing through the stream, generic over the mutation payload:
/// `Option<Arc<Mutation>>` for full barriers, `()` for dispatcher barriers.
#[derive(Debug, Clone)]
pub struct BarrierInner<M> {
    /// The `(curr, prev)` epoch pair this barrier advances.
    pub epoch: EpochPair,
    /// The mutation payload (see type aliases below).
    pub mutation: M,
    /// Checkpoint / normal barrier kind.
    pub kind: BarrierKind,

    /// Tracing context of the barrier, for distributed tracing.
    pub tracing_context: TracingContext,
}
410
/// Mutation payload of a full barrier: optional and shared across actors.
pub type BarrierMutationType = Option<Arc<Mutation>>;
/// A full barrier carrying an optional mutation.
pub type Barrier = BarrierInner<BarrierMutationType>;
/// A barrier as seen by dispatchers — mutation payload stripped.
pub type DispatcherBarrier = BarrierInner<()>;
414
415impl<M: Default> BarrierInner<M> {
416 pub fn new_test_barrier(epoch: u64) -> Self {
418 Self {
419 epoch: EpochPair::new_test_epoch(epoch),
420 kind: BarrierKind::Checkpoint,
421 tracing_context: TracingContext::none(),
422 mutation: Default::default(),
423 }
424 }
425
426 pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
427 Self {
428 epoch: EpochPair::new(epoch, prev_epoch),
429 kind: BarrierKind::Checkpoint,
430 tracing_context: TracingContext::none(),
431 mutation: Default::default(),
432 }
433 }
434}
435
436impl Barrier {
437 pub fn into_dispatcher(self) -> DispatcherBarrier {
438 DispatcherBarrier {
439 epoch: self.epoch,
440 mutation: (),
441 kind: self.kind,
442 tracing_context: self.tracing_context,
443 }
444 }
445
446 #[must_use]
447 pub fn with_mutation(self, mutation: Mutation) -> Self {
448 Self {
449 mutation: Some(Arc::new(mutation)),
450 ..self
451 }
452 }
453
454 #[must_use]
455 pub fn with_stop(self) -> Self {
456 self.with_mutation(Mutation::Stop(StopMutation {
457 dropped_actors: Default::default(),
458 dropped_sink_fragments: Default::default(),
459 }))
460 }
461
462 pub fn is_with_stop_mutation(&self) -> bool {
464 matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
465 }
466
467 pub fn is_stop(&self, actor_id: ActorId) -> bool {
469 self.all_stop_actors()
470 .is_some_and(|actors| actors.contains(&actor_id))
471 }
472
473 pub fn is_checkpoint(&self) -> bool {
474 self.kind == BarrierKind::Checkpoint
475 }
476
477 pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
487 match self.mutation.as_deref()? {
488 Mutation::Update(UpdateMutation { actor_splits, .. })
489 | Mutation::Add(AddMutation {
490 splits: actor_splits,
491 ..
492 }) => actor_splits.get(&actor_id),
493
494 _ => {
495 if cfg!(debug_assertions) {
496 panic!(
497 "the initial mutation of the barrier should not be {:?}",
498 self.mutation
499 );
500 }
501 None
502 }
503 }
504 .map(|s| s.as_slice())
505 }
506
507 pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
509 match self.mutation.as_deref() {
510 Some(Mutation::Stop(StopMutation { dropped_actors, .. })) => Some(dropped_actors),
511 Some(Mutation::Update(UpdateMutation { dropped_actors, .. })) => Some(dropped_actors),
512 _ => None,
513 }
514 }
515
516 pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
522 match self.mutation.as_deref() {
523 Some(Mutation::Add(AddMutation { added_actors, .. })) => {
524 added_actors.contains(&actor_id)
525 }
526 _ => false,
527 }
528 }
529
530 pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
531 if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
532 fragment_ids.contains(&fragment_id)
533 } else {
534 false
535 }
536 }
537
538 pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
556 let Some(mutation) = self.mutation.as_deref() else {
557 return false;
558 };
559 match mutation {
560 Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
562 Mutation::Update(_)
563 | Mutation::Stop(_)
564 | Mutation::Pause
565 | Mutation::Resume
566 | Mutation::SourceChangeSplit(_)
567 | Mutation::Throttle { .. }
568 | Mutation::DropSubscriptions { .. }
569 | Mutation::ConnectorPropsChange(_)
570 | Mutation::StartFragmentBackfill { .. }
571 | Mutation::RefreshStart { .. }
572 | Mutation::ListFinish { .. }
573 | Mutation::LoadFinish { .. }
574 | Mutation::ResetSource { .. }
575 | Mutation::InjectSourceOffsets { .. } => false,
576 }
577 }
578
579 pub fn is_pause_on_startup(&self) -> bool {
581 match self.mutation.as_deref() {
582 Some(Mutation::Add(AddMutation { pause, .. })) => *pause,
583 _ => false,
584 }
585 }
586
587 pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
588 match self.mutation.as_deref() {
589 Some(Mutation::Add(AddMutation {
590 backfill_nodes_to_pause,
591 ..
592 })) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
593 Some(Mutation::Update(_)) => false,
594 _ => {
595 tracing::warn!(
596 "expected an AddMutation or UpdateMutation on Startup, instead got {:?}",
597 self
598 );
599 false
600 }
601 }
602 }
603
604 pub fn is_resume(&self) -> bool {
606 matches!(self.mutation.as_deref(), Some(Mutation::Resume))
607 }
608
609 pub fn as_update_merge(
612 &self,
613 actor_id: ActorId,
614 upstream_fragment_id: UpstreamFragmentId,
615 ) -> Option<&MergeUpdate> {
616 self.mutation
617 .as_deref()
618 .and_then(|mutation| match mutation {
619 Mutation::Update(UpdateMutation { merges, .. }) => {
620 merges.get(&(actor_id, upstream_fragment_id))
621 }
622 _ => None,
623 })
624 }
625
626 pub fn as_new_upstream_sink(&self, fragment_id: FragmentId) -> Option<&PbNewUpstreamSink> {
629 self.mutation
630 .as_deref()
631 .and_then(|mutation| match mutation {
632 Mutation::Add(AddMutation {
633 new_upstream_sinks, ..
634 }) => new_upstream_sinks.get(&fragment_id),
635 _ => None,
636 })
637 }
638
639 pub fn as_dropped_upstream_sinks(&self) -> Option<&HashSet<FragmentId>> {
641 self.mutation
642 .as_deref()
643 .and_then(|mutation| match mutation {
644 Mutation::Stop(StopMutation {
645 dropped_sink_fragments,
646 ..
647 }) => Some(dropped_sink_fragments),
648 _ => None,
649 })
650 }
651
652 pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
658 self.mutation
659 .as_deref()
660 .and_then(|mutation| match mutation {
661 Mutation::Update(UpdateMutation { vnode_bitmaps, .. }) => {
662 vnode_bitmaps.get(&actor_id).cloned()
663 }
664 _ => None,
665 })
666 }
667
668 pub fn as_sink_schema_change(&self, sink_id: SinkId) -> Option<PbSinkSchemaChange> {
669 self.mutation
670 .as_deref()
671 .and_then(|mutation| match mutation {
672 Mutation::Update(UpdateMutation {
673 sink_schema_change, ..
674 }) => sink_schema_change.get(&sink_id).cloned(),
675 _ => None,
676 })
677 }
678
679 pub fn as_subscriptions_to_drop(&self) -> Option<&[SubscriptionUpstreamInfo]> {
680 match self.mutation.as_deref() {
681 Some(Mutation::DropSubscriptions {
682 subscriptions_to_drop,
683 })
684 | Some(Mutation::Update(UpdateMutation {
685 subscriptions_to_drop,
686 ..
687 })) => Some(subscriptions_to_drop.as_slice()),
688 _ => None,
689 }
690 }
691
692 pub fn get_curr_epoch(&self) -> Epoch {
693 Epoch(self.epoch.curr)
694 }
695
696 pub fn tracing_context(&self) -> &TracingContext {
698 &self.tracing_context
699 }
700
701 pub fn added_subscriber_on_mv_table(
702 &self,
703 mv_table_id: TableId,
704 ) -> impl Iterator<Item = SubscriberId> + '_ {
705 if let Some(Mutation::Add(add)) = self.mutation.as_deref() {
706 Some(add)
707 } else {
708 None
709 }
710 .into_iter()
711 .flat_map(move |add| {
712 add.subscriptions_to_add.iter().filter_map(
713 move |(upstream_mv_table_id, subscriber_id)| {
714 if *upstream_mv_table_id == mv_table_id {
715 Some(*subscriber_id)
716 } else {
717 None
718 }
719 },
720 )
721 })
722 }
723}
724
impl<M: PartialEq> PartialEq for BarrierInner<M> {
    // NOTE: equality deliberately compares only `epoch` and `mutation`;
    // `kind` and `tracing_context` are ignored.
    fn eq(&self, other: &Self) -> bool {
        self.epoch == other.epoch && self.mutation == other.mutation
    }
}
730
731impl Mutation {
732 #[cfg(test)]
736 pub fn is_stop(&self) -> bool {
737 matches!(self, Mutation::Stop(_))
738 }
739
740 #[cfg(test)]
741 fn to_protobuf(&self) -> PbMutation {
742 use risingwave_pb::source::{
743 ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
744 };
745 use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
746 use risingwave_pb::stream_plan::{
747 PbAddMutation, PbConnectorPropsChangeMutation, PbDispatchers,
748 PbDropSubscriptionsMutation, PbPauseMutation, PbResumeMutation,
749 PbSourceChangeSplitMutation, PbStartFragmentBackfillMutation, PbStopMutation,
750 PbThrottleMutation, PbUpdateMutation,
751 };
752 let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
753 actor_splits
754 .iter()
755 .map(|(&actor_id, splits)| {
756 (
757 actor_id,
758 ConnectorSplits {
759 splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
760 },
761 )
762 })
763 .collect::<HashMap<_, _>>()
764 };
765
766 match self {
767 Mutation::Stop(StopMutation {
768 dropped_actors,
769 dropped_sink_fragments,
770 }) => PbMutation::Stop(PbStopMutation {
771 actors: dropped_actors.iter().copied().collect(),
772 dropped_sink_fragments: dropped_sink_fragments.iter().copied().collect(),
773 }),
774 Mutation::Update(UpdateMutation {
775 dispatchers,
776 merges,
777 vnode_bitmaps,
778 dropped_actors,
779 actor_splits,
780 actor_new_dispatchers,
781 actor_cdc_table_snapshot_splits,
782 sink_schema_change,
783 subscriptions_to_drop,
784 }) => PbMutation::Update(PbUpdateMutation {
785 dispatcher_update: dispatchers.values().flatten().cloned().collect(),
786 merge_update: merges.values().cloned().collect(),
787 actor_vnode_bitmap_update: vnode_bitmaps
788 .iter()
789 .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
790 .collect(),
791 dropped_actors: dropped_actors.iter().copied().collect(),
792 actor_splits: actor_splits_to_protobuf(actor_splits),
793 actor_new_dispatchers: actor_new_dispatchers
794 .iter()
795 .map(|(&actor_id, dispatchers)| {
796 (
797 actor_id,
798 PbDispatchers {
799 dispatchers: dispatchers.clone(),
800 },
801 )
802 })
803 .collect(),
804 actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
805 splits:actor_cdc_table_snapshot_splits.splits.iter().map(|(actor_id,(splits, generation))| {
806 (*actor_id, risingwave_pb::source::PbCdcTableSnapshotSplits {
807 splits: splits.iter().map(risingwave_connector::source::cdc::build_cdc_table_snapshot_split).collect(),
808 generation: *generation,
809 })
810 }).collect()
811 }),
812 sink_schema_change: sink_schema_change
813 .iter()
814 .map(|(sink_id, change)| ((*sink_id).as_raw_id(), change.clone()))
815 .collect(),
816 subscriptions_to_drop: subscriptions_to_drop.clone(),
817 }),
818 Mutation::Add(AddMutation {
819 adds,
820 added_actors,
821 splits,
822 pause,
823 subscriptions_to_add,
824 backfill_nodes_to_pause,
825 actor_cdc_table_snapshot_splits,
826 new_upstream_sinks,
827 }) => PbMutation::Add(PbAddMutation {
828 actor_dispatchers: adds
829 .iter()
830 .map(|(&actor_id, dispatchers)| {
831 (
832 actor_id,
833 PbDispatchers {
834 dispatchers: dispatchers.clone(),
835 },
836 )
837 })
838 .collect(),
839 added_actors: added_actors.iter().copied().collect(),
840 actor_splits: actor_splits_to_protobuf(splits),
841 pause: *pause,
842 subscriptions_to_add: subscriptions_to_add
843 .iter()
844 .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
845 subscriber_id: *subscriber_id,
846 upstream_mv_table_id: *table_id,
847 })
848 .collect(),
849 backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
850 actor_cdc_table_snapshot_splits:
851 Some(PbCdcTableSnapshotSplitsWithGeneration {
852 splits:actor_cdc_table_snapshot_splits.splits.iter().map(|(actor_id,(splits, generation))| {
853 (*actor_id, risingwave_pb::source::PbCdcTableSnapshotSplits {
854 splits: splits.iter().map(risingwave_connector::source::cdc::build_cdc_table_snapshot_split).collect(),
855 generation: *generation,
856 })
857 }).collect()
858 }),
859 new_upstream_sinks: new_upstream_sinks
860 .iter()
861 .map(|(k, v)| (*k, v.clone()))
862 .collect(),
863 }),
864 Mutation::SourceChangeSplit(changes) => {
865 PbMutation::Splits(PbSourceChangeSplitMutation {
866 actor_splits: changes
867 .iter()
868 .map(|(&actor_id, splits)| {
869 (
870 actor_id,
871 ConnectorSplits {
872 splits: splits
873 .clone()
874 .iter()
875 .map(ConnectorSplit::from)
876 .collect(),
877 },
878 )
879 })
880 .collect(),
881 })
882 }
883 Mutation::Pause => PbMutation::Pause(PbPauseMutation {}),
884 Mutation::Resume => PbMutation::Resume(PbResumeMutation {}),
885 Mutation::Throttle (changes) => PbMutation::Throttle(PbThrottleMutation {
886 fragment_throttle: changes.clone(),
887 }),
888 Mutation::DropSubscriptions {
889 subscriptions_to_drop,
890 } => PbMutation::DropSubscriptions(PbDropSubscriptionsMutation {
891 info: subscriptions_to_drop.clone(),
892 }),
893 Mutation::ConnectorPropsChange(map) => {
894 PbMutation::ConnectorPropsChange(PbConnectorPropsChangeMutation {
895 connector_props_infos: map
896 .iter()
897 .map(|(actor_id, options)| {
898 (
899 *actor_id,
900 ConnectorPropsInfo {
901 connector_props_info: options
902 .iter()
903 .map(|(k, v)| (k.clone(), v.clone()))
904 .collect(),
905 },
906 )
907 })
908 .collect(),
909 })
910 }
911 Mutation::StartFragmentBackfill { fragment_ids } => {
912 PbMutation::StartFragmentBackfill(PbStartFragmentBackfillMutation {
913 fragment_ids: fragment_ids.iter().copied().collect(),
914 })
915 }
916 Mutation::RefreshStart {
917 table_id,
918 associated_source_id,
919 } => PbMutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
920 table_id: *table_id,
921 associated_source_id: *associated_source_id,
922 }),
923 Mutation::ListFinish {
924 associated_source_id,
925 } => PbMutation::ListFinish(risingwave_pb::stream_plan::ListFinishMutation {
926 associated_source_id: *associated_source_id,
927 }),
928 Mutation::LoadFinish {
929 associated_source_id,
930 } => PbMutation::LoadFinish(risingwave_pb::stream_plan::LoadFinishMutation {
931 associated_source_id: *associated_source_id,
932 }),
933 Mutation::ResetSource { source_id } => {
934 PbMutation::ResetSource(risingwave_pb::stream_plan::ResetSourceMutation {
935 source_id: source_id.as_raw_id(),
936 })
937 }
938 Mutation::InjectSourceOffsets {
939 source_id,
940 split_offsets,
941 } => PbMutation::InjectSourceOffsets(
942 risingwave_pb::stream_plan::InjectSourceOffsetsMutation {
943 source_id: source_id.as_raw_id(),
944 split_offsets: split_offsets.clone(),
945 },
946 ),
947 }
948 }
949
950 fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
951 let mutation = match prost {
952 PbMutation::Stop(stop) => Mutation::Stop(StopMutation {
953 dropped_actors: stop.actors.iter().copied().collect(),
954 dropped_sink_fragments: stop.dropped_sink_fragments.iter().copied().collect(),
955 }),
956
957 PbMutation::Update(update) => Mutation::Update(UpdateMutation {
958 dispatchers: update
959 .dispatcher_update
960 .iter()
961 .map(|u| (u.actor_id, u.clone()))
962 .into_group_map(),
963 merges: update
964 .merge_update
965 .iter()
966 .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
967 .collect(),
968 vnode_bitmaps: update
969 .actor_vnode_bitmap_update
970 .iter()
971 .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
972 .collect(),
973 dropped_actors: update.dropped_actors.iter().copied().collect(),
974 actor_splits: update
975 .actor_splits
976 .iter()
977 .map(|(&actor_id, splits)| {
978 (
979 actor_id,
980 splits
981 .splits
982 .iter()
983 .map(|split| split.try_into().unwrap())
984 .collect(),
985 )
986 })
987 .collect(),
988 actor_new_dispatchers: update
989 .actor_new_dispatchers
990 .iter()
991 .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
992 .collect(),
993 actor_cdc_table_snapshot_splits:
994 build_actor_cdc_table_snapshot_splits_with_generation(
995 update
996 .actor_cdc_table_snapshot_splits
997 .clone()
998 .unwrap_or_default(),
999 ),
1000 sink_schema_change: update
1001 .sink_schema_change
1002 .iter()
1003 .map(|(sink_id, change)| (SinkId::from(*sink_id), change.clone()))
1004 .collect(),
1005 subscriptions_to_drop: update.subscriptions_to_drop.clone(),
1006 }),
1007
1008 PbMutation::Add(add) => Mutation::Add(AddMutation {
1009 adds: add
1010 .actor_dispatchers
1011 .iter()
1012 .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
1013 .collect(),
1014 added_actors: add.added_actors.iter().copied().collect(),
1015 splits: add
1018 .actor_splits
1019 .iter()
1020 .map(|(&actor_id, splits)| {
1021 (
1022 actor_id,
1023 splits
1024 .splits
1025 .iter()
1026 .map(|split| split.try_into().unwrap())
1027 .collect(),
1028 )
1029 })
1030 .collect(),
1031 pause: add.pause,
1032 subscriptions_to_add: add
1033 .subscriptions_to_add
1034 .iter()
1035 .map(
1036 |SubscriptionUpstreamInfo {
1037 subscriber_id,
1038 upstream_mv_table_id,
1039 }| { (*upstream_mv_table_id, *subscriber_id) },
1040 )
1041 .collect(),
1042 backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
1043 actor_cdc_table_snapshot_splits:
1044 build_actor_cdc_table_snapshot_splits_with_generation(
1045 add.actor_cdc_table_snapshot_splits
1046 .clone()
1047 .unwrap_or_default(),
1048 ),
1049 new_upstream_sinks: add
1050 .new_upstream_sinks
1051 .iter()
1052 .map(|(k, v)| (*k, v.clone()))
1053 .collect(),
1054 }),
1055
1056 PbMutation::Splits(s) => {
1057 let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
1058 Vec::with_capacity(s.actor_splits.len());
1059 for (&actor_id, splits) in &s.actor_splits {
1060 if !splits.splits.is_empty() {
1061 change_splits.push((
1062 actor_id,
1063 splits
1064 .splits
1065 .iter()
1066 .map(SplitImpl::try_from)
1067 .try_collect()?,
1068 ));
1069 }
1070 }
1071 Mutation::SourceChangeSplit(change_splits.into_iter().collect())
1072 }
1073 PbMutation::Pause(_) => Mutation::Pause,
1074 PbMutation::Resume(_) => Mutation::Resume,
1075 PbMutation::Throttle(changes) => Mutation::Throttle(changes.fragment_throttle.clone()),
1076 PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
1077 subscriptions_to_drop: drop.info.clone(),
1078 },
1079 PbMutation::ConnectorPropsChange(alter_connector_props) => {
1080 Mutation::ConnectorPropsChange(
1081 alter_connector_props
1082 .connector_props_infos
1083 .iter()
1084 .map(|(connector_id, options)| {
1085 (
1086 *connector_id,
1087 options
1088 .connector_props_info
1089 .iter()
1090 .map(|(k, v)| (k.clone(), v.clone()))
1091 .collect(),
1092 )
1093 })
1094 .collect(),
1095 )
1096 }
1097 PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
1098 Mutation::StartFragmentBackfill {
1099 fragment_ids: start_fragment_backfill
1100 .fragment_ids
1101 .iter()
1102 .copied()
1103 .collect(),
1104 }
1105 }
1106 PbMutation::RefreshStart(refresh_start) => Mutation::RefreshStart {
1107 table_id: refresh_start.table_id,
1108 associated_source_id: refresh_start.associated_source_id,
1109 },
1110 PbMutation::ListFinish(list_finish) => Mutation::ListFinish {
1111 associated_source_id: list_finish.associated_source_id,
1112 },
1113 PbMutation::LoadFinish(load_finish) => Mutation::LoadFinish {
1114 associated_source_id: load_finish.associated_source_id,
1115 },
1116 PbMutation::ResetSource(reset_source) => Mutation::ResetSource {
1117 source_id: SourceId::from(reset_source.source_id),
1118 },
1119 PbMutation::InjectSourceOffsets(inject) => Mutation::InjectSourceOffsets {
1120 source_id: SourceId::from(inject.source_id),
1121 split_offsets: inject.split_offsets.clone(),
1122 },
1123 };
1124 Ok(mutation)
1125 }
1126}
1127
1128impl<M> BarrierInner<M> {
1129 fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
1130 let Self {
1131 epoch,
1132 mutation,
1133 kind,
1134 tracing_context,
1135 ..
1136 } = self;
1137
1138 PbBarrier {
1139 epoch: Some(PbEpoch {
1140 curr: epoch.curr,
1141 prev: epoch.prev,
1142 }),
1143 mutation: barrier_fn(mutation).map(|mutation| PbBarrierMutation {
1144 mutation: Some(mutation),
1145 }),
1146 tracing_context: tracing_context.to_protobuf(),
1147 kind: *kind as _,
1148 }
1149 }
1150
1151 fn from_protobuf_inner(
1152 prost: &PbBarrier,
1153 mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
1154 ) -> StreamExecutorResult<Self> {
1155 let epoch = prost.get_epoch()?;
1156
1157 Ok(Self {
1158 kind: prost.kind(),
1159 epoch: EpochPair::new(epoch.curr, epoch.prev),
1160 mutation: mutation_from_pb(
1161 (prost.mutation.as_ref()).and_then(|mutation| mutation.mutation.as_ref()),
1162 )?,
1163 tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
1164 })
1165 }
1166
1167 pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
1168 BarrierInner {
1169 epoch: self.epoch,
1170 mutation: f(self.mutation),
1171 kind: self.kind,
1172 tracing_context: self.tracing_context,
1173 }
1174 }
1175}
1176
1177impl DispatcherBarrier {
1178 pub fn to_protobuf(&self) -> PbBarrier {
1179 self.to_protobuf_inner(|_| None)
1180 }
1181}
1182
1183impl Barrier {
1184 #[cfg(test)]
1185 pub fn to_protobuf(&self) -> PbBarrier {
1186 self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
1187 }
1188
1189 pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
1190 Self::from_protobuf_inner(prost, |mutation| {
1191 mutation
1192 .map(|m| Mutation::from_protobuf(m).map(Arc::new))
1193 .transpose()
1194 })
1195 }
1196}
1197
/// A watermark flowing through the stream, scoped to a single column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
    /// Index of the column this watermark applies to.
    pub col_idx: usize,
    /// Data type of the watermark value.
    pub data_type: DataType,
    /// The watermark value itself.
    pub val: ScalarImpl,
}
1204
impl PartialOrd for Watermark {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        // Delegate to the total order defined by `Ord`.
        Some(self.cmp(other))
    }
}
1210
impl Ord for Watermark {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Order watermarks by value only, using the default comparison.
        // NOTE(review): this ignores `col_idx`/`data_type`, while the derived
        // `PartialEq`/`Eq` compare all fields — so `cmp` may return `Equal`
        // for watermarks that are not `==`. Confirm callers only compare
        // watermarks of the same column.
        self.val.default_cmp(&other.val)
    }
}
1216
1217impl Watermark {
1218 pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
1219 Self {
1220 col_idx,
1221 data_type,
1222 val,
1223 }
1224 }
1225
1226 pub async fn transform_with_expr(
1227 self,
1228 expr: &NonStrictExpression<impl Expression>,
1229 new_col_idx: usize,
1230 ) -> Option<Self> {
1231 let Self { col_idx, val, .. } = self;
1232 let row = {
1233 let mut row = vec![None; col_idx + 1];
1234 row[col_idx] = Some(val);
1235 OwnedRow::new(row)
1236 };
1237 let val = expr.eval_row_infallible(&row).await?;
1238 Some(Self::new(new_col_idx, expr.inner().return_type(), val))
1239 }
1240
1241 pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
1244 output_indices
1245 .iter()
1246 .position(|p| *p == self.col_idx)
1247 .map(|new_col_idx| self.with_idx(new_col_idx))
1248 }
1249
1250 pub fn to_protobuf(&self) -> PbWatermark {
1251 PbWatermark {
1252 column: Some(PbInputRef {
1253 index: self.col_idx as _,
1254 r#type: Some(self.data_type.to_protobuf()),
1255 }),
1256 val: Some(&self.val).to_protobuf().into(),
1257 }
1258 }
1259
1260 pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
1261 let col_ref = prost.get_column()?;
1262 let data_type = DataType::from(col_ref.get_type()?);
1263 let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
1264 .expect("watermark value cannot be null");
1265 Ok(Self::new(col_ref.get_index() as _, data_type, val))
1266 }
1267
1268 pub fn with_idx(self, idx: usize) -> Self {
1269 Self::new(idx, self.data_type, self.val)
1270 }
1271}
1272
/// A single stream message, generic over the barrier mutation type `M`.
#[cfg_attr(any(test, feature = "test"), derive(PartialEq))]
#[derive(Debug, EnumAsInner, Clone)]
pub enum MessageInner<M> {
    /// A chunk of stream data.
    Chunk(StreamChunk),
    /// A barrier, carrying a mutation payload of type `M`.
    Barrier(BarrierInner<M>),
    /// A per-column watermark.
    Watermark(Watermark),
}
1280
1281impl<M> MessageInner<M> {
1282 pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
1283 match self {
1284 MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
1285 MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
1286 MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
1287 }
1288 }
1289}
1290
/// A message whose barriers carry the full mutation payload.
pub type Message = MessageInner<BarrierMutationType>;
/// A message whose barriers carry no mutation payload.
pub type DispatcherMessage = MessageInner<()>;
1293
/// Like [`MessageInner`], but consecutive barriers are grouped into a batch.
#[derive(Debug, EnumAsInner, Clone)]
pub enum MessageBatchInner<M> {
    /// A chunk of stream data.
    Chunk(StreamChunk),
    /// One or more barriers delivered together.
    BarrierBatch(Vec<BarrierInner<M>>),
    /// A per-column watermark.
    Watermark(Watermark),
}
/// A batched message whose barriers carry the full mutation payload.
pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
/// A batch of mutation-free dispatcher barriers.
pub type DispatcherBarriers = Vec<DispatcherBarrier>;
/// A batched message whose barriers carry no mutation payload.
pub type DispatcherMessageBatch = MessageBatchInner<()>;
1305
1306impl From<DispatcherMessage> for DispatcherMessageBatch {
1307 fn from(m: DispatcherMessage) -> Self {
1308 match m {
1309 DispatcherMessage::Chunk(c) => Self::Chunk(c),
1310 DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
1311 DispatcherMessage::Watermark(w) => Self::Watermark(w),
1312 }
1313 }
1314}
1315
1316impl From<StreamChunk> for Message {
1317 fn from(chunk: StreamChunk) -> Self {
1318 Message::Chunk(chunk)
1319 }
1320}
1321
1322impl<'a> TryFrom<&'a Message> for &'a Barrier {
1323 type Error = ();
1324
1325 fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
1326 match m {
1327 Message::Chunk(_) => Err(()),
1328 Message::Barrier(b) => Ok(b),
1329 Message::Watermark(_) => Err(()),
1330 }
1331 }
1332}
1333
1334impl Message {
1335 #[cfg(test)]
1340 pub fn is_stop(&self) -> bool {
1341 matches!(
1342 self,
1343 Message::Barrier(Barrier {
1344 mutation,
1345 ..
1346 }) if mutation.as_ref().unwrap().is_stop()
1347 )
1348 }
1349}
1350
1351impl DispatcherMessageBatch {
1352 pub fn to_protobuf(&self) -> PbStreamMessageBatch {
1353 let prost = match self {
1354 Self::Chunk(stream_chunk) => {
1355 let prost_stream_chunk = stream_chunk.to_protobuf();
1356 StreamMessageBatch::StreamChunk(prost_stream_chunk)
1357 }
1358 Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
1359 barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
1360 }),
1361 Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
1362 };
1363 PbStreamMessageBatch {
1364 stream_message_batch: Some(prost),
1365 }
1366 }
1367
1368 pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
1369 let res = match prost.get_stream_message_batch()? {
1370 StreamMessageBatch::StreamChunk(chunk) => {
1371 Self::Chunk(StreamChunk::from_protobuf(chunk)?)
1372 }
1373 StreamMessageBatch::BarrierBatch(barrier_batch) => {
1374 let barriers = barrier_batch
1375 .barriers
1376 .iter()
1377 .map(|barrier| {
1378 DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
1379 if mutation.is_some() {
1380 if cfg!(debug_assertions) {
1381 panic!("should not receive message of barrier with mutation");
1382 } else {
1383 warn!(?barrier, "receive message of barrier with mutation");
1384 }
1385 }
1386 Ok(())
1387 })
1388 })
1389 .try_collect()?;
1390 Self::BarrierBatch(barriers)
1391 }
1392 StreamMessageBatch::Watermark(watermark) => {
1393 Self::Watermark(Watermark::from_protobuf(watermark)?)
1394 }
1395 };
1396 Ok(res)
1397 }
1398
1399 pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
1400 ::prost::Message::encoded_len(msg)
1401 }
1402}
1403
/// Column indices forming the stream key.
pub type StreamKey = Vec<usize>;
/// Borrowed view of a [`StreamKey`].
pub type StreamKeyRef<'a> = &'a [usize];
/// Data types of the stream key columns (usually few, hence `SmallVec`).
pub type StreamKeyDataTypes = SmallVec<[DataType; 1]>;
1407
/// Await the first message of `stream` and return it as a barrier.
///
/// # Errors
/// Returns an error if the stream ends before yielding a message, or if the
/// first item is itself an error.
///
/// # Panics
/// Panics if the first message is not a barrier, or if its kind is neither
/// `Checkpoint` nor `Initial`.
pub async fn expect_first_barrier<M: Debug>(
    stream: &mut (impl MessageStreamInner<M> + Unpin),
) -> StreamExecutorResult<BarrierInner<M>> {
    // An exhausted stream here is an error, not a normal end of input.
    let message = stream
        .next()
        .instrument_await("expect_first_barrier")
        .await
        .context("failed to extract the first message: stream closed unexpectedly")??;
    let barrier = message
        .into_barrier()
        .expect("the first message must be a barrier");
    // The first barrier must be a checkpoint or initial barrier.
    assert!(matches!(
        barrier.kind,
        BarrierKind::Checkpoint | BarrierKind::Initial
    ));
    Ok(barrier)
}
1427
/// Await the first message of an aligned `stream` and return it as a barrier.
///
/// Unlike [`expect_first_barrier`], no constraint is placed on the barrier
/// kind.
///
/// # Errors
/// Returns an error if the stream ends before yielding a message, or if the
/// first item is itself an error.
///
/// # Panics
/// Panics if the first message is not a barrier.
pub async fn expect_first_barrier_from_aligned_stream(
    stream: &mut (impl AlignedMessageStream + Unpin),
) -> StreamExecutorResult<Barrier> {
    let message = stream
        .next()
        .instrument_await("expect_first_barrier")
        .await
        .context("failed to extract the first message: stream closed unexpectedly")??;
    let barrier = message
        .into_barrier()
        .expect("the first message must be a barrier");
    Ok(barrier)
}
1442
/// A consumer of stream messages that, when executed, yields a stream of the
/// barriers it has processed.
pub trait StreamConsumer: Send + 'static {
    /// The stream of processed barriers produced by [`Self::execute`].
    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;

    /// Consume `self` and start execution, returning the barrier stream.
    fn execute(self: Box<Self>) -> Self::BarrierStream;
}
1449
1450type BoxedMessageInput<InputId, M> = BoxedInput<InputId, MessageStreamItemInner<M>>;
1451
/// A dynamic set of upstream inputs merged into one message stream, with
/// barrier alignment and per-column watermark buffering. Inputs may be added
/// or removed between alignment rounds.
pub struct DynamicReceivers<InputId, M> {
    /// The barrier of the alignment round in progress, taken from the first
    /// input that yielded it.
    barrier: Option<BarrierInner<M>>,
    /// When the current alignment round started; feeds the metrics below.
    start_ts: Option<Instant>,
    /// Inputs that already yielded the current barrier and wait for the rest.
    blocked: Vec<BoxedMessageInput<InputId, M>>,
    /// Inputs currently being polled.
    active: FuturesUnordered<StreamFuture<BoxedMessageInput<InputId, M>>>,
    /// Watermark buffers, keyed by column index.
    buffered_watermarks: BTreeMap<usize, BufferedWatermarks<InputId>>,
    /// Accumulates nanoseconds spent waiting for barrier alignment, if set.
    barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
    /// Same accumulation, reported under the merge-specific metric, if set.
    merge_barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
}
1471
impl<InputId: Clone + Ord + Hash + std::fmt::Debug + Unpin, M: Clone + Unpin> Stream
    for DynamicReceivers<InputId, M>
{
    type Item = MessageStreamItemInner<M>;

    /// Polls all active inputs: chunks and (ready) watermarks are forwarded as
    /// they arrive, while an input that yields a barrier is parked in
    /// `blocked` until every input has yielded a barrier of the same epoch;
    /// only then is the single aligned barrier emitted.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // No upstream inputs at all: the merged stream is exhausted.
        if self.is_empty() {
            return Poll::Ready(None);
        }

        loop {
            match futures::ready!(self.active.poll_next_unpin(cx)) {
                // An upstream errored: propagate immediately.
                Some((Some(Err(e)), _)) => {
                    return Poll::Ready(Some(Err(e)));
                }
                Some((Some(Ok(message)), remaining)) => {
                    let input_id = remaining.id();
                    match message {
                        MessageInner::Chunk(chunk) => {
                            // Re-arm this input and forward the chunk.
                            self.active.push(remaining.into_future());
                            return Poll::Ready(Some(Ok(MessageInner::Chunk(chunk))));
                        }
                        MessageInner::Watermark(watermark) => {
                            self.active.push(remaining.into_future());
                            // Forward a watermark only when the per-column
                            // buffer reports one ready to emit.
                            if let Some(watermark) = self.handle_watermark(input_id, watermark) {
                                return Poll::Ready(Some(Ok(MessageInner::Watermark(watermark))));
                            }
                        }
                        MessageInner::Barrier(barrier) => {
                            if self.blocked.is_empty() {
                                // First barrier of this round: start timing
                                // the alignment duration.
                                self.start_ts = Some(Instant::now());
                            }
                            // Park this input until alignment completes.
                            self.blocked.push(remaining);
                            if let Some(current_barrier) = self.barrier.as_ref() {
                                // All inputs must deliver the same epoch;
                                // a mismatch is a fatal alignment error.
                                if current_barrier.epoch != barrier.epoch {
                                    return Poll::Ready(Some(Err(
                                        StreamExecutorError::align_barrier(
                                            current_barrier.clone().map_mutation(|_| None),
                                            barrier.map_mutation(|_| None),
                                        ),
                                    )));
                                }
                            } else {
                                self.barrier = Some(barrier);
                            }
                        }
                    }
                }
                // An upstream closed without an error: unexpected here.
                Some((None, remaining)) => {
                    return Poll::Ready(Some(Err(StreamExecutorError::channel_closed(format!(
                        "upstream input {:?} unexpectedly closed",
                        remaining.id()
                    )))));
                }
                // `active` is drained: every input is parked on the barrier,
                // i.e. alignment is complete.
                None => {
                    assert!(!self.blocked.is_empty());

                    let start_ts = self
                        .start_ts
                        .take()
                        .expect("should have received at least one barrier");
                    // Record how long the slowest input made us wait.
                    if let Some(barrier_align_duration) = &self.barrier_align_duration {
                        barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
                    }
                    if let Some(merge_barrier_align_duration) = &self.merge_barrier_align_duration {
                        merge_barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
                    }

                    break;
                }
            }
        }

        assert!(self.active.is_terminated());

        let barrier = self.barrier.take().unwrap();

        // Re-activate all parked inputs for the next epoch before emitting
        // the aligned barrier.
        let upstreams = std::mem::take(&mut self.blocked);
        self.extend_active(upstreams);
        assert!(!self.active.is_terminated());

        Poll::Ready(Some(Ok(MessageInner::Barrier(barrier))))
    }
}
1572
1573impl<InputId: Clone + Ord + Hash + std::fmt::Debug, M> DynamicReceivers<InputId, M> {
1574 pub fn new(
1575 upstreams: Vec<BoxedMessageInput<InputId, M>>,
1576 barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1577 merge_barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1578 ) -> Self {
1579 let mut this = Self {
1580 barrier: None,
1581 start_ts: None,
1582 blocked: Vec::with_capacity(upstreams.len()),
1583 active: Default::default(),
1584 buffered_watermarks: Default::default(),
1585 merge_barrier_align_duration,
1586 barrier_align_duration,
1587 };
1588 this.extend_active(upstreams);
1589 this
1590 }
1591
1592 pub fn extend_active(
1595 &mut self,
1596 upstreams: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1597 ) {
1598 assert!(self.blocked.is_empty() && self.barrier.is_none());
1599
1600 self.active
1601 .extend(upstreams.into_iter().map(|s| s.into_future()));
1602 }
1603
1604 pub fn handle_watermark(
1606 &mut self,
1607 input_id: InputId,
1608 watermark: Watermark,
1609 ) -> Option<Watermark> {
1610 let col_idx = watermark.col_idx;
1611 let upstream_ids: Vec<_> = self.upstream_input_ids().collect();
1613 let watermarks = self
1614 .buffered_watermarks
1615 .entry(col_idx)
1616 .or_insert_with(|| BufferedWatermarks::with_ids(upstream_ids));
1617 watermarks.handle_watermark(input_id, watermark)
1618 }
1619
1620 pub fn add_upstreams_from(
1623 &mut self,
1624 new_inputs: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1625 ) {
1626 assert!(self.blocked.is_empty() && self.barrier.is_none());
1627
1628 let new_inputs: Vec<_> = new_inputs.into_iter().collect();
1629 let input_ids = new_inputs.iter().map(|input| input.id());
1630 self.buffered_watermarks.values_mut().for_each(|buffers| {
1631 buffers.add_buffers(input_ids.clone());
1633 });
1634 self.active
1635 .extend(new_inputs.into_iter().map(|s| s.into_future()));
1636 }
1637
1638 pub fn remove_upstreams(&mut self, upstream_input_ids: &HashSet<InputId>) {
1642 assert!(self.blocked.is_empty() && self.barrier.is_none());
1643
1644 let new_upstreams = std::mem::take(&mut self.active)
1645 .into_iter()
1646 .map(|s| s.into_inner().unwrap())
1647 .filter(|u| !upstream_input_ids.contains(&u.id()));
1648 self.extend_active(new_upstreams);
1649 self.buffered_watermarks.values_mut().for_each(|buffers| {
1650 buffers.remove_buffer(upstream_input_ids.clone());
1653 });
1654 }
1655
1656 pub fn merge_barrier_align_duration(
1657 &self,
1658 ) -> Option<LabelGuardedMetric<GenericCounter<AtomicU64>>> {
1659 self.merge_barrier_align_duration.clone()
1660 }
1661
1662 pub fn flush_buffered_watermarks(&mut self) {
1663 self.buffered_watermarks
1664 .values_mut()
1665 .for_each(|buffers| buffers.clear());
1666 }
1667
1668 pub fn upstream_input_ids(&self) -> impl Iterator<Item = InputId> + '_ {
1669 self.blocked
1670 .iter()
1671 .map(|s| s.id())
1672 .chain(self.active.iter().map(|s| s.get_ref().unwrap().id()))
1673 }
1674
1675 pub fn is_empty(&self) -> bool {
1676 self.blocked.is_empty() && self.active.is_empty()
1677 }
1678}
1679
/// Buffers barriers received on `barrier_rx` until they are popped against
/// the matching dispatcher barrier, pre-building any new upstream inputs that
/// a merge update in a barrier requires.
pub(crate) struct DispatchBarrierBuffer {
    /// Barriers received so far, each paired with the newly built upstream
    /// inputs it triggered (if any).
    buffer: VecDeque<(Barrier, Option<Vec<BoxedActorInput>>)>,
    /// Channel delivering barriers for this actor.
    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
    /// State of the receive loop (see `try_fetch_barrier_rx`).
    recv_state: BarrierReceiverState,
    /// The upstream fragment currently merged from; updated on merge updates.
    curr_upstream_fragment_id: FragmentId,
    actor_id: ActorId,
    /// Shared context used to build new upstream inputs asynchronously.
    build_input_ctx: Arc<BuildInputContext>,
}
1709
/// Everything needed to build a new upstream input asynchronously
/// (see `DispatchBarrierBuffer::pre_apply_barrier`).
struct BuildInputContext {
    pub actor_id: ActorId,
    pub local_barrier_manager: LocalBarrierManager,
    pub metrics: Arc<StreamingMetrics>,
    pub fragment_id: FragmentId,
    pub actor_config: Arc<StreamingConfig>,
}
1717
/// Future resolving to the newly built upstream inputs.
type BoxedNewInputsFuture =
    Pin<Box<dyn Future<Output = StreamExecutorResult<Vec<BoxedActorInput>>> + Send>>;

/// State of the barrier-receiving loop in `DispatchBarrierBuffer`.
enum BarrierReceiverState {
    /// Waiting for the next barrier on the channel.
    ReceivingBarrier,
    /// A barrier triggered new-input creation; awaiting the build future.
    CreatingNewInput(Barrier, BoxedNewInputsFuture),
}
1725
impl DispatchBarrierBuffer {
    /// Create an empty buffer that receives barriers from `barrier_rx` for
    /// the given actor, tracking `curr_upstream_fragment_id` as the current
    /// upstream fragment for merge updates.
    pub fn new(
        barrier_rx: mpsc::UnboundedReceiver<Barrier>,
        actor_id: ActorId,
        curr_upstream_fragment_id: FragmentId,
        local_barrier_manager: LocalBarrierManager,
        metrics: Arc<StreamingMetrics>,
        fragment_id: FragmentId,
        actor_config: Arc<StreamingConfig>,
    ) -> Self {
        Self {
            buffer: VecDeque::new(),
            barrier_rx,
            recv_state: BarrierReceiverState::ReceivingBarrier,
            curr_upstream_fragment_id,
            actor_id,
            build_input_ctx: Arc::new(BuildInputContext {
                actor_id,
                local_barrier_manager,
                metrics,
                fragment_id,
                actor_config,
            }),
        }
    }

    /// Await the next upstream message from `stream`, while concurrently
    /// draining `barrier_rx` into the buffer in the background.
    ///
    /// The input-blocking metric is bumped by the elapsed wait on every
    /// delivered message, and additionally in 15s chunks while blocked, so
    /// long waits are reported incrementally rather than only at the end.
    pub async fn await_next_message(
        &mut self,
        stream: &mut (impl Stream<Item = StreamExecutorResult<DispatcherMessage>> + Unpin),
        metrics: &ActorInputMetrics,
    ) -> StreamExecutorResult<DispatcherMessage> {
        let mut start_time = Instant::now();
        let interval_duration = Duration::from_secs(15);
        let mut interval =
            tokio::time::interval_at(start_time + interval_duration, interval_duration);

        loop {
            tokio::select! {
                // `biased` so upstream messages are preferred over barrier
                // fetching and the metric tick.
                biased;
                msg = stream.try_next() => {
                    metrics
                        .actor_input_buffer_blocking_duration_ns
                        .inc_by(start_time.elapsed().as_nanos() as u64);
                    return msg?.ok_or_else(
                        || StreamExecutorError::channel_closed("upstream executor closed unexpectedly")
                    );
                }

                // Only completes with an error; otherwise it keeps buffering
                // barriers indefinitely.
                e = self.continuously_fetch_barrier_rx() => {
                    return Err(e);
                }

                _ = interval.tick() => {
                    // Flush a 15s chunk into the metric and restart the local
                    // timer so the same period is not counted twice.
                    start_time = Instant::now();
                    metrics.actor_input_buffer_blocking_duration_ns.inc_by(interval_duration.as_nanos() as u64);
                    continue;
                }
            }
        }
    }

    /// Pop the buffered barrier corresponding to the given dispatcher
    /// `barrier`, together with any newly built upstream inputs it triggered,
    /// fetching from `barrier_rx` as needed.
    ///
    /// # Panics
    /// Panics (via `assert_equal_dispatcher_barrier`) if the popped barrier
    /// does not match `barrier`.
    pub async fn pop_barrier_with_inputs(
        &mut self,
        barrier: DispatcherBarrier,
    ) -> StreamExecutorResult<(Barrier, Option<Vec<BoxedActorInput>>)> {
        while self.buffer.is_empty() {
            self.try_fetch_barrier_rx(false).await?;
        }
        let (recv_barrier, inputs) = self.buffer.pop_front().unwrap();
        assert_equal_dispatcher_barrier(&recv_barrier, &barrier);

        Ok((recv_barrier, inputs))
    }

    /// Keep fetching barriers into the buffer; resolves only with an error
    /// (and pends forever if the channel simply ends).
    async fn continuously_fetch_barrier_rx(&mut self) -> StreamExecutorError {
        loop {
            if let Err(e) = self.try_fetch_barrier_rx(true).await {
                return e;
            }
        }
    }

    /// Drive the two-phase receive state machine one step: either receive the
    /// next barrier (possibly kicking off new-input creation), or finish an
    /// in-flight input creation and buffer its result.
    ///
    /// State lives in `self.recv_state` so progress survives this future
    /// being dropped and re-created across `select!` iterations.
    /// NOTE(review): if `fut.await` returns an error, `recv_state` still
    /// holds the already-completed future, so a subsequent call would re-poll
    /// it — callers appear to abort on error, but confirm.
    async fn try_fetch_barrier_rx(&mut self, pending_on_end: bool) -> StreamExecutorResult<()> {
        match &mut self.recv_state {
            BarrierReceiverState::ReceivingBarrier => {
                let Some(barrier) = self.barrier_rx.recv().await else {
                    if pending_on_end {
                        // Channel ended: park forever instead of erroring,
                        // as required by `continuously_fetch_barrier_rx`.
                        return pending().await;
                    } else {
                        return Err(StreamExecutorError::channel_closed(
                            "barrier channel closed unexpectedly",
                        ));
                    }
                };
                if let Some(fut) = self.pre_apply_barrier(&barrier) {
                    self.recv_state = BarrierReceiverState::CreatingNewInput(barrier, fut);
                } else {
                    self.buffer.push_back((barrier, None));
                }
            }
            BarrierReceiverState::CreatingNewInput(barrier, fut) => {
                let new_inputs = fut.await?;
                self.buffer.push_back((barrier.clone(), Some(new_inputs)));
                self.recv_state = BarrierReceiverState::ReceivingBarrier;
            }
        }
        Ok(())
    }

    /// If `barrier` carries a merge update for this actor that adds upstream
    /// actors, return a future building the new inputs (and update
    /// `curr_upstream_fragment_id` when the upstream fragment changes);
    /// otherwise return `None`.
    fn pre_apply_barrier(&mut self, barrier: &Barrier) -> Option<BoxedNewInputsFuture> {
        if let Some(update) = barrier.as_update_merge(self.actor_id, self.curr_upstream_fragment_id)
            && !update.added_upstream_actors.is_empty()
        {
            // The update may switch us to a new upstream fragment.
            let upstream_fragment_id =
                if let Some(new_upstream_fragment_id) = update.new_upstream_fragment_id {
                    self.curr_upstream_fragment_id = new_upstream_fragment_id;
                    new_upstream_fragment_id
                } else {
                    self.curr_upstream_fragment_id
                };
            let ctx = self.build_input_ctx.clone();
            let added_upstream_actors = update.added_upstream_actors.clone();
            let barrier = barrier.clone();
            let fut = async move {
                try_join_all(added_upstream_actors.iter().map(|upstream_actor| async {
                    let mut new_input = new_input(
                        &ctx.local_barrier_manager,
                        ctx.metrics.clone(),
                        ctx.actor_id,
                        ctx.fragment_id,
                        upstream_actor,
                        upstream_fragment_id,
                        ctx.actor_config.clone(),
                    )
                    .await?;

                    // Each new input must start with a barrier matching the
                    // one that introduced it.
                    let first_barrier = expect_first_barrier(&mut new_input).await?;
                    assert_equal_dispatcher_barrier(&barrier, &first_barrier);

                    StreamExecutorResult::Ok(new_input)
                }))
                .await
            }
            .boxed();

            Some(fut)
        } else {
            None
        }
    }
}