1mod prelude;
16
17use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
18use std::fmt::Debug;
19use std::future::pending;
20use std::hash::Hash;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::Poll;
24use std::vec;
25
26use await_tree::InstrumentAwait;
27use enum_as_inner::EnumAsInner;
28use futures::future::try_join_all;
29use futures::stream::{BoxStream, FusedStream, FuturesUnordered, StreamFuture};
30use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
31use itertools::Itertools;
32use prometheus::core::{AtomicU64, GenericCounter};
33use risingwave_common::array::StreamChunk;
34use risingwave_common::bitmap::Bitmap;
35use risingwave_common::catalog::{Schema, TableId};
36use risingwave_common::config::StreamingConfig;
37use risingwave_common::metrics::LabelGuardedMetric;
38use risingwave_common::row::OwnedRow;
39use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
40use risingwave_common::util::epoch::{Epoch, EpochPair};
41use risingwave_common::util::tracing::TracingContext;
42use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
43use risingwave_connector::source::SplitImpl;
44use risingwave_expr::expr::{Expression, NonStrictExpression};
45use risingwave_pb::data::PbEpoch;
46use risingwave_pb::expr::PbInputRef;
47use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
48use risingwave_pb::stream_plan::barrier::BarrierKind;
49use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
50use risingwave_pb::stream_plan::stream_node::PbStreamKind;
51use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
52use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
53use risingwave_pb::stream_plan::{
54 PbBarrier, PbBarrierMutation, PbDispatcher, PbSinkSchemaChange, PbStreamMessageBatch,
55 PbWatermark, SubscriptionUpstreamInfo,
56};
57use smallvec::SmallVec;
58use tokio::sync::mpsc;
59use tokio::time::{Duration, Instant};
60
61use crate::error::StreamResult;
62use crate::executor::exchange::input::{
63 BoxedActorInput, BoxedInput, assert_equal_dispatcher_barrier, new_input,
64};
65use crate::executor::monitor::ActorInputMetrics;
66use crate::executor::prelude::StreamingMetrics;
67use crate::executor::watermark::BufferedWatermarks;
68use crate::task::{ActorId, FragmentId, LocalBarrierManager};
69
70mod actor;
71mod barrier_align;
72pub mod exchange;
73pub mod monitor;
74
75pub mod aggregate;
76pub mod asof_join;
77mod backfill;
78mod barrier_recv;
79mod batch_query;
80mod chain;
81mod changelog;
82mod dedup;
83mod dispatch;
84pub mod dml;
85mod dynamic_filter;
86pub mod eowc;
87pub mod error;
88mod expand;
89mod filter;
90mod gap_fill;
91pub mod hash_join;
92mod hop_window;
93mod join;
94pub mod locality_provider;
95mod lookup;
96mod lookup_union;
97mod merge;
98mod mview;
99mod nested_loop_temporal_join;
100mod no_op;
101mod now;
102mod over_window;
103pub mod project;
104mod rearranged_chain;
105mod receiver;
106pub mod row_id_gen;
107mod sink;
108pub mod source;
109mod stream_reader;
110pub mod subtask;
111mod temporal_join;
112mod top_n;
113mod troublemaker;
114mod union;
115mod upstream_sink_union;
116mod values;
117mod watermark;
118mod watermark_filter;
119mod wrapper;
120
121mod approx_percentile;
122
123mod row_merge;
124
125#[cfg(test)]
126mod integration_tests;
127mod sync_kv_log_store;
128#[cfg(any(test, feature = "test"))]
129pub mod test_utils;
130mod utils;
131mod vector;
132
133pub use actor::{Actor, ActorContext, ActorContextRef};
134use anyhow::{Context, anyhow};
135pub use approx_percentile::global::GlobalApproxPercentileExecutor;
136pub use approx_percentile::local::LocalApproxPercentileExecutor;
137pub use backfill::arrangement_backfill::*;
138pub use backfill::cdc::{
139 CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor,
140};
141pub use backfill::no_shuffle_backfill::*;
142pub use backfill::snapshot_backfill::*;
143pub use barrier_recv::BarrierRecvExecutor;
144pub use batch_query::BatchQueryExecutor;
145pub use chain::ChainExecutor;
146pub use changelog::ChangeLogExecutor;
147pub use dedup::AppendOnlyDedupExecutor;
148pub use dispatch::{DispatchExecutor, SyncLogStoreDispatchExecutor};
149pub use dynamic_filter::DynamicFilterExecutor;
150pub use error::{StreamExecutorError, StreamExecutorResult};
151pub use expand::ExpandExecutor;
152pub use filter::{FilterExecutor, UpsertFilterExecutor};
153pub use gap_fill::{GapFillExecutor, GapFillExecutorArgs};
154pub use hash_join::*;
155pub use hop_window::HopWindowExecutor;
156pub use join::asof_join::{AsOfCpuEncoding, AsOfMemoryEncoding};
157pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
158pub use join::{AsOfDesc, AsOfJoinType, JoinType};
159pub use lookup::*;
160pub use lookup_union::LookupUnionExecutor;
161pub use merge::MergeExecutor;
162pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
163pub use mview::{MaterializeExecutor, RefreshableMaterializeArgs};
164pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
165pub use no_op::NoOpExecutor;
166pub use now::*;
167pub use over_window::*;
168pub use rearranged_chain::RearrangedChainExecutor;
169pub use receiver::ReceiverExecutor;
170use risingwave_common::id::SourceId;
171pub use row_merge::RowMergeExecutor;
172pub use sink::SinkExecutor;
173pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
174pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
175pub use temporal_join::TemporalJoinExecutor;
176pub use top_n::{
177 AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
178};
179pub use troublemaker::TroublemakerExecutor;
180pub use union::UnionExecutor;
181pub use upstream_sink_union::{UpstreamFragmentInfo, UpstreamSinkUnionExecutor};
182pub use utils::DummyExecutor;
183pub use values::ValuesExecutor;
184pub use vector::*;
185pub use watermark_filter::{UpsertWatermarkFilterExecutor, WatermarkFilterExecutor};
186pub use wrapper::WrapperExecutor;
187
188use self::barrier_align::AlignedMessageStream;
189
/// Item of a message stream, parameterized by `M` — presumably the barrier mutation
/// payload type (`MessageInner` is defined elsewhere; TODO confirm).
pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
/// Item of an executor message stream, instantiated with [`BarrierMutationType`].
pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
/// Item of a dispatcher message stream.
pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
/// Boxed `'static` stream of [`MessageStreamItem`]s, as returned by [`Execute::execute`].
pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;
194
195pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
196use risingwave_connector::sink::catalog::SinkId;
197use risingwave_connector::source::cdc::{
198 CdcTableSnapshotSplitAssignmentWithGeneration,
199 build_actor_cdc_table_snapshot_splits_with_generation,
200};
201use risingwave_pb::id::{ExecutorId, SubscriberId};
202use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
203
/// `Send` stream of [`MessageStreamItemInner<M>`] items (nightly trait alias).
pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
/// `Send` stream of [`MessageStreamItem`]s.
pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
/// `Send` stream of [`DispatcherMessageStreamItem`]s.
pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;
207
/// Static metadata of an executor: output schema, stream key, stream kind, a
/// human-readable identity and a numeric id.
#[derive(Debug, Default, Clone)]
pub struct ExecutorInfo {
    /// Schema of the executor's output.
    pub schema: Schema,

    /// Stream key of the executor's output (see `StreamKey`).
    pub stream_key: StreamKey,

    /// Stream kind of the executor's output.
    pub stream_kind: PbStreamKind,

    /// Human-readable identity; this is what `Executor`'s `Debug` impl prints.
    pub identity: String,

    /// Numeric id of the executor.
    pub id: ExecutorId,
}
226
227impl ExecutorInfo {
228 pub fn for_test(schema: Schema, stream_key: StreamKey, identity: String, id: u64) -> Self {
229 Self {
230 schema,
231 stream_key,
232 stream_kind: PbStreamKind::Retract, identity,
234 id: id.into(),
235 }
236 }
237}
238
239pub trait Execute: Send + 'static {
241 fn execute(self: Box<Self>) -> BoxedMessageStream;
242
243 fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
244 self.execute()
245 }
246
247 fn boxed(self) -> Box<dyn Execute>
248 where
249 Self: Sized + Send + 'static,
250 {
251 Box::new(self)
252 }
253}
254
/// A type-erased executor: its [`ExecutorInfo`] metadata paired with the boxed
/// [`Execute`] implementation that produces its message stream.
pub struct Executor {
    info: ExecutorInfo,
    execute: Box<dyn Execute>,
}
261
262impl Executor {
263 pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
264 Self { info, execute }
265 }
266
267 pub fn info(&self) -> &ExecutorInfo {
268 &self.info
269 }
270
271 pub fn schema(&self) -> &Schema {
272 &self.info.schema
273 }
274
275 pub fn stream_key(&self) -> StreamKeyRef<'_> {
276 &self.info.stream_key
277 }
278
279 pub fn stream_kind(&self) -> PbStreamKind {
280 self.info.stream_kind
281 }
282
283 pub fn identity(&self) -> &str {
284 &self.info.identity
285 }
286
287 pub fn execute(self) -> BoxedMessageStream {
288 self.execute.execute()
289 }
290
291 pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
292 self.execute.execute_with_epoch(epoch)
293 }
294}
295
296impl std::fmt::Debug for Executor {
297 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
298 f.write_str(self.identity())
299 }
300}
301
302impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
303 fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
304 Self::new(info, execute)
305 }
306}
307
308impl<E> From<(ExecutorInfo, E)> for Executor
309where
310 E: Execute,
311{
312 fn from((info, execute): (ExecutorInfo, E)) -> Self {
313 Self::new(info, execute.boxed())
314 }
315}
316
/// Epoch value `0`, named as the invalid/unset epoch sentinel.
pub const INVALID_EPOCH: u64 = 0;

/// A fragment id used in the role of an upstream fragment (e.g. merge update keys).
type UpstreamFragmentId = FragmentId;
/// Source split assignment, keyed by actor.
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;
321
/// Payload of [`Mutation::Update`].
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct UpdateMutation {
    /// Dispatcher updates grouped by the actor they apply to.
    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
    /// Merge updates keyed by `(actor id, upstream fragment id)`.
    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
    /// New vnode bitmaps per actor.
    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
    /// Actors dropped by this update (reported by `Barrier::all_stop_actors`).
    pub dropped_actors: HashSet<ActorId>,
    /// Source split assignment per actor.
    pub actor_splits: SplitAssignments,
    /// New dispatchers per actor.
    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
    /// CDC table snapshot split assignment (with generation) per actor.
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// Sink schema changes keyed by sink id.
    pub sink_schema_change: HashMap<SinkId, PbSinkSchemaChange>,
    /// Subscriptions to drop as part of this update.
    pub subscriptions_to_drop: Vec<SubscriptionUpstreamInfo>,
}
335
/// Payload of [`Mutation::Add`].
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct AddMutation {
    /// New dispatchers keyed by the upstream actor they are added to.
    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
    /// Actors newly added by this mutation (checked by `Barrier::is_newly_added`).
    pub added_actors: HashSet<ActorId>,
    /// Initial source split assignment per actor.
    pub splits: SplitAssignments,
    /// Whether to start paused (read by `Barrier::is_pause_on_startup`).
    pub pause: bool,
    /// `(upstream mv table id, subscriber id)` pairs to register.
    pub subscriptions_to_add: Vec<(TableId, SubscriberId)>,
    /// Backfill fragments that should start paused.
    pub backfill_nodes_to_pause: HashSet<FragmentId>,
    /// CDC table snapshot split assignment (with generation) per actor.
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// New upstream sinks keyed by fragment id.
    pub new_upstream_sinks: HashMap<FragmentId, PbNewUpstreamSink>,
}
351
/// Payload of [`Mutation::Stop`].
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct StopMutation {
    /// Actors to stop.
    pub dropped_actors: HashSet<ActorId>,
    /// Sink fragments being dropped (exposed via `Barrier::as_dropped_upstream_sinks`).
    pub dropped_sink_fragments: HashSet<FragmentId>,
}
358
/// A change carried by a barrier.
#[cfg_attr(any(test, feature = "test"), derive(PartialEq))]
#[derive(Debug, Clone)]
pub enum Mutation {
    /// Stop actors and/or drop sink fragments.
    Stop(StopMutation),
    /// Update dispatchers, merges, vnode bitmaps, splits, etc. of existing actors.
    Update(UpdateMutation),
    /// Add dispatchers/actors, optionally starting paused.
    Add(AddMutation),
    /// Reassign source splits per actor. When decoded from protobuf, actors whose split
    /// list is empty are omitted.
    SourceChangeSplit(SplitAssignments),
    /// Pause.
    Pause,
    /// Resume.
    Resume,
    /// Throttle configuration keyed by fragment id.
    Throttle(HashMap<FragmentId, ThrottleConfig>),
    /// New connector properties keyed by a raw `u32` id.
    /// NOTE(review): the protobuf conversions name this key both `actor_id` and
    /// `connector_id` — confirm which it actually is.
    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
    /// Drop the listed subscriptions.
    DropSubscriptions {
        subscriptions_to_drop: Vec<SubscriptionUpstreamInfo>,
    },
    /// Start backfill for the listed fragments
    /// (see `Barrier::should_start_fragment_backfill`).
    StartFragmentBackfill {
        fragment_ids: HashSet<FragmentId>,
    },
    /// Start a refresh of `table_id` associated with source `associated_source_id`.
    RefreshStart {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// Signals "list finished" for `associated_source_id` (presumably a phase of the
    /// refresh started by `RefreshStart` — TODO confirm).
    ListFinish {
        associated_source_id: SourceId,
    },
    /// Signals "load finished" for `associated_source_id` (presumably a phase of the
    /// refresh started by `RefreshStart` — TODO confirm).
    LoadFinish {
        associated_source_id: SourceId,
    },
    /// Reset source `source_id`.
    ResetSource {
        source_id: SourceId,
    },
    /// Inject offsets into source `source_id`; `split_offsets` is a string → string map
    /// (presumably split id → offset — TODO confirm).
    InjectSourceOffsets {
        source_id: SourceId,
        split_offsets: HashMap<String, String>,
    },
}
397
/// A barrier, generic over its mutation payload `M`: `Option<Arc<Mutation>>` for
/// [`Barrier`], `()` for [`DispatcherBarrier`].
///
/// Note: the manual `PartialEq` impl compares only `epoch` and `mutation`.
#[derive(Debug, Clone)]
pub struct BarrierInner<M> {
    /// Current and previous epoch of this barrier.
    pub epoch: EpochPair,
    /// Mutation payload.
    pub mutation: M,
    /// Barrier kind; `Barrier::is_checkpoint` compares it with `BarrierKind::Checkpoint`.
    pub kind: BarrierKind,

    /// Tracing context carried with the barrier and serialized in protobuf conversions.
    pub tracing_context: TracingContext,
}
411
/// Mutation payload of a [`Barrier`]: `None` when the barrier carries no mutation.
pub type BarrierMutationType = Option<Arc<Mutation>>;
/// Barrier carrying an optional shared (`Arc`) [`Mutation`].
pub type Barrier = BarrierInner<BarrierMutationType>;
/// Barrier without mutation payload (produced by `Barrier::into_dispatcher`).
pub type DispatcherBarrier = BarrierInner<()>;
415
416impl<M: Default> BarrierInner<M> {
417 pub fn new_test_barrier(epoch: u64) -> Self {
419 Self {
420 epoch: EpochPair::new_test_epoch(epoch),
421 kind: BarrierKind::Checkpoint,
422 tracing_context: TracingContext::none(),
423 mutation: Default::default(),
424 }
425 }
426
427 pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
428 Self {
429 epoch: EpochPair::new(epoch, prev_epoch),
430 kind: BarrierKind::Checkpoint,
431 tracing_context: TracingContext::none(),
432 mutation: Default::default(),
433 }
434 }
435}
436
437impl Barrier {
438 pub fn into_dispatcher(self) -> DispatcherBarrier {
439 DispatcherBarrier {
440 epoch: self.epoch,
441 mutation: (),
442 kind: self.kind,
443 tracing_context: self.tracing_context,
444 }
445 }
446
447 #[must_use]
448 pub fn with_mutation(self, mutation: Mutation) -> Self {
449 Self {
450 mutation: Some(Arc::new(mutation)),
451 ..self
452 }
453 }
454
455 #[must_use]
456 pub fn with_stop(self) -> Self {
457 self.with_mutation(Mutation::Stop(StopMutation {
458 dropped_actors: Default::default(),
459 dropped_sink_fragments: Default::default(),
460 }))
461 }
462
463 pub fn is_with_stop_mutation(&self) -> bool {
465 matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
466 }
467
468 pub fn is_stop(&self, actor_id: ActorId) -> bool {
470 self.all_stop_actors()
471 .is_some_and(|actors| actors.contains(&actor_id))
472 }
473
474 pub fn is_checkpoint(&self) -> bool {
475 self.kind == BarrierKind::Checkpoint
476 }
477
478 pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
488 match self.mutation.as_deref()? {
489 Mutation::Update(UpdateMutation { actor_splits, .. })
490 | Mutation::Add(AddMutation {
491 splits: actor_splits,
492 ..
493 }) => actor_splits.get(&actor_id),
494
495 _ => {
496 if cfg!(debug_assertions) {
497 panic!(
498 "the initial mutation of the barrier should not be {:?}",
499 self.mutation
500 );
501 }
502 None
503 }
504 }
505 .map(|s| s.as_slice())
506 }
507
508 pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
510 match self.mutation.as_deref() {
511 Some(Mutation::Stop(StopMutation { dropped_actors, .. })) => Some(dropped_actors),
512 Some(Mutation::Update(UpdateMutation { dropped_actors, .. })) => Some(dropped_actors),
513 _ => None,
514 }
515 }
516
517 pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
523 match self.mutation.as_deref() {
524 Some(Mutation::Add(AddMutation { added_actors, .. })) => {
525 added_actors.contains(&actor_id)
526 }
527 _ => false,
528 }
529 }
530
531 pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
532 if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
533 fragment_ids.contains(&fragment_id)
534 } else {
535 false
536 }
537 }
538
539 pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
557 let Some(mutation) = self.mutation.as_deref() else {
558 return false;
559 };
560 match mutation {
561 Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
563 Mutation::Update(_)
564 | Mutation::Stop(_)
565 | Mutation::Pause
566 | Mutation::Resume
567 | Mutation::SourceChangeSplit(_)
568 | Mutation::Throttle { .. }
569 | Mutation::DropSubscriptions { .. }
570 | Mutation::ConnectorPropsChange(_)
571 | Mutation::StartFragmentBackfill { .. }
572 | Mutation::RefreshStart { .. }
573 | Mutation::ListFinish { .. }
574 | Mutation::LoadFinish { .. }
575 | Mutation::ResetSource { .. }
576 | Mutation::InjectSourceOffsets { .. } => false,
577 }
578 }
579
580 pub fn is_pause_on_startup(&self) -> bool {
582 match self.mutation.as_deref() {
583 Some(Mutation::Add(AddMutation { pause, .. })) => *pause,
584 _ => false,
585 }
586 }
587
588 pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
589 match self.mutation.as_deref() {
590 Some(Mutation::Add(AddMutation {
591 backfill_nodes_to_pause,
592 ..
593 })) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
594 Some(Mutation::Update(_)) => false,
595 _ => {
596 tracing::warn!(
597 "expected an AddMutation or UpdateMutation on Startup, instead got {:?}",
598 self
599 );
600 false
601 }
602 }
603 }
604
605 pub fn is_resume(&self) -> bool {
607 matches!(self.mutation.as_deref(), Some(Mutation::Resume))
608 }
609
610 pub fn as_update_merge(
613 &self,
614 actor_id: ActorId,
615 upstream_fragment_id: UpstreamFragmentId,
616 ) -> Option<&MergeUpdate> {
617 self.mutation
618 .as_deref()
619 .and_then(|mutation| match mutation {
620 Mutation::Update(UpdateMutation { merges, .. }) => {
621 merges.get(&(actor_id, upstream_fragment_id))
622 }
623 _ => None,
624 })
625 }
626
627 pub fn as_new_upstream_sink(&self, fragment_id: FragmentId) -> Option<&PbNewUpstreamSink> {
630 self.mutation
631 .as_deref()
632 .and_then(|mutation| match mutation {
633 Mutation::Add(AddMutation {
634 new_upstream_sinks, ..
635 }) => new_upstream_sinks.get(&fragment_id),
636 _ => None,
637 })
638 }
639
640 pub fn as_dropped_upstream_sinks(&self) -> Option<&HashSet<FragmentId>> {
642 self.mutation
643 .as_deref()
644 .and_then(|mutation| match mutation {
645 Mutation::Stop(StopMutation {
646 dropped_sink_fragments,
647 ..
648 }) => Some(dropped_sink_fragments),
649 _ => None,
650 })
651 }
652
653 pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
659 self.mutation
660 .as_deref()
661 .and_then(|mutation| match mutation {
662 Mutation::Update(UpdateMutation { vnode_bitmaps, .. }) => {
663 vnode_bitmaps.get(&actor_id).cloned()
664 }
665 _ => None,
666 })
667 }
668
669 pub fn assume_no_update_vnode_bitmap(&self, actor_id: ActorId) -> StreamExecutorResult<()> {
670 if self.as_update_vnode_bitmap(actor_id).is_some() {
671 return Err(anyhow!("updating vnode bitmap in place is not supported").into());
672 }
673 Ok(())
674 }
675
676 pub fn as_sink_schema_change(&self, sink_id: SinkId) -> Option<PbSinkSchemaChange> {
677 self.mutation
678 .as_deref()
679 .and_then(|mutation| match mutation {
680 Mutation::Update(UpdateMutation {
681 sink_schema_change, ..
682 }) => sink_schema_change.get(&sink_id).cloned(),
683 _ => None,
684 })
685 }
686
687 pub fn as_subscriptions_to_drop(&self) -> Option<&[SubscriptionUpstreamInfo]> {
688 match self.mutation.as_deref() {
689 Some(Mutation::DropSubscriptions {
690 subscriptions_to_drop,
691 })
692 | Some(Mutation::Update(UpdateMutation {
693 subscriptions_to_drop,
694 ..
695 })) => Some(subscriptions_to_drop.as_slice()),
696 _ => None,
697 }
698 }
699
700 pub fn get_curr_epoch(&self) -> Epoch {
701 Epoch(self.epoch.curr)
702 }
703
704 pub fn tracing_context(&self) -> &TracingContext {
706 &self.tracing_context
707 }
708
709 pub fn added_subscriber_on_mv_table(
710 &self,
711 mv_table_id: TableId,
712 ) -> impl Iterator<Item = SubscriberId> + '_ {
713 if let Some(Mutation::Add(add)) = self.mutation.as_deref() {
714 Some(add)
715 } else {
716 None
717 }
718 .into_iter()
719 .flat_map(move |add| {
720 add.subscriptions_to_add.iter().filter_map(
721 move |(upstream_mv_table_id, subscriber_id)| {
722 if *upstream_mv_table_id == mv_table_id {
723 Some(*subscriber_id)
724 } else {
725 None
726 }
727 },
728 )
729 })
730 }
731}
732
733impl<M: PartialEq> PartialEq for BarrierInner<M> {
734 fn eq(&self, other: &Self) -> bool {
735 self.epoch == other.epoch && self.mutation == other.mutation
736 }
737}
738
739impl Mutation {
740 #[cfg(test)]
744 pub fn is_stop(&self) -> bool {
745 matches!(self, Mutation::Stop(_))
746 }
747
748 #[cfg(test)]
749 fn to_protobuf(&self) -> PbMutation {
750 use risingwave_pb::source::{
751 ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
752 };
753 use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
754 use risingwave_pb::stream_plan::{
755 PbAddMutation, PbConnectorPropsChangeMutation, PbDispatchers,
756 PbDropSubscriptionsMutation, PbPauseMutation, PbResumeMutation,
757 PbSourceChangeSplitMutation, PbStartFragmentBackfillMutation, PbStopMutation,
758 PbThrottleMutation, PbUpdateMutation,
759 };
760 let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
761 actor_splits
762 .iter()
763 .map(|(&actor_id, splits)| {
764 (
765 actor_id,
766 ConnectorSplits {
767 splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
768 },
769 )
770 })
771 .collect::<HashMap<_, _>>()
772 };
773
774 match self {
775 Mutation::Stop(StopMutation {
776 dropped_actors,
777 dropped_sink_fragments,
778 }) => PbMutation::Stop(PbStopMutation {
779 actors: dropped_actors.iter().copied().collect(),
780 dropped_sink_fragments: dropped_sink_fragments.iter().copied().collect(),
781 }),
782 Mutation::Update(UpdateMutation {
783 dispatchers,
784 merges,
785 vnode_bitmaps,
786 dropped_actors,
787 actor_splits,
788 actor_new_dispatchers,
789 actor_cdc_table_snapshot_splits,
790 sink_schema_change,
791 subscriptions_to_drop,
792 }) => PbMutation::Update(PbUpdateMutation {
793 dispatcher_update: dispatchers.values().flatten().cloned().collect(),
794 merge_update: merges.values().cloned().collect(),
795 actor_vnode_bitmap_update: vnode_bitmaps
796 .iter()
797 .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
798 .collect(),
799 dropped_actors: dropped_actors.iter().copied().collect(),
800 actor_splits: actor_splits_to_protobuf(actor_splits),
801 actor_new_dispatchers: actor_new_dispatchers
802 .iter()
803 .map(|(&actor_id, dispatchers)| {
804 (
805 actor_id,
806 PbDispatchers {
807 dispatchers: dispatchers.clone(),
808 },
809 )
810 })
811 .collect(),
812 actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
813 splits:actor_cdc_table_snapshot_splits.splits.iter().map(|(actor_id,(splits, generation))| {
814 (*actor_id, risingwave_pb::source::PbCdcTableSnapshotSplits {
815 splits: splits.iter().map(risingwave_connector::source::cdc::build_cdc_table_snapshot_split).collect(),
816 generation: *generation,
817 })
818 }).collect()
819 }),
820 sink_schema_change: sink_schema_change
821 .iter()
822 .map(|(sink_id, change)| ((*sink_id).as_raw_id(), change.clone()))
823 .collect(),
824 subscriptions_to_drop: subscriptions_to_drop.clone(),
825 }),
826 Mutation::Add(AddMutation {
827 adds,
828 added_actors,
829 splits,
830 pause,
831 subscriptions_to_add,
832 backfill_nodes_to_pause,
833 actor_cdc_table_snapshot_splits,
834 new_upstream_sinks,
835 }) => PbMutation::Add(PbAddMutation {
836 actor_dispatchers: adds
837 .iter()
838 .map(|(&actor_id, dispatchers)| {
839 (
840 actor_id,
841 PbDispatchers {
842 dispatchers: dispatchers.clone(),
843 },
844 )
845 })
846 .collect(),
847 added_actors: added_actors.iter().copied().collect(),
848 actor_splits: actor_splits_to_protobuf(splits),
849 pause: *pause,
850 subscriptions_to_add: subscriptions_to_add
851 .iter()
852 .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
853 subscriber_id: *subscriber_id,
854 upstream_mv_table_id: *table_id,
855 })
856 .collect(),
857 backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
858 actor_cdc_table_snapshot_splits:
859 Some(PbCdcTableSnapshotSplitsWithGeneration {
860 splits:actor_cdc_table_snapshot_splits.splits.iter().map(|(actor_id,(splits, generation))| {
861 (*actor_id, risingwave_pb::source::PbCdcTableSnapshotSplits {
862 splits: splits.iter().map(risingwave_connector::source::cdc::build_cdc_table_snapshot_split).collect(),
863 generation: *generation,
864 })
865 }).collect()
866 }),
867 new_upstream_sinks: new_upstream_sinks
868 .iter()
869 .map(|(k, v)| (*k, v.clone()))
870 .collect(),
871 }),
872 Mutation::SourceChangeSplit(changes) => {
873 PbMutation::Splits(PbSourceChangeSplitMutation {
874 actor_splits: changes
875 .iter()
876 .map(|(&actor_id, splits)| {
877 (
878 actor_id,
879 ConnectorSplits {
880 splits: splits
881 .clone()
882 .iter()
883 .map(ConnectorSplit::from)
884 .collect(),
885 },
886 )
887 })
888 .collect(),
889 })
890 }
891 Mutation::Pause => PbMutation::Pause(PbPauseMutation {}),
892 Mutation::Resume => PbMutation::Resume(PbResumeMutation {}),
893 Mutation::Throttle (changes) => PbMutation::Throttle(PbThrottleMutation {
894 fragment_throttle: changes.clone(),
895 }),
896 Mutation::DropSubscriptions {
897 subscriptions_to_drop,
898 } => PbMutation::DropSubscriptions(PbDropSubscriptionsMutation {
899 info: subscriptions_to_drop.clone(),
900 }),
901 Mutation::ConnectorPropsChange(map) => {
902 PbMutation::ConnectorPropsChange(PbConnectorPropsChangeMutation {
903 connector_props_infos: map
904 .iter()
905 .map(|(actor_id, options)| {
906 (
907 *actor_id,
908 ConnectorPropsInfo {
909 connector_props_info: options
910 .iter()
911 .map(|(k, v)| (k.clone(), v.clone()))
912 .collect(),
913 },
914 )
915 })
916 .collect(),
917 })
918 }
919 Mutation::StartFragmentBackfill { fragment_ids } => {
920 PbMutation::StartFragmentBackfill(PbStartFragmentBackfillMutation {
921 fragment_ids: fragment_ids.iter().copied().collect(),
922 })
923 }
924 Mutation::RefreshStart {
925 table_id,
926 associated_source_id,
927 } => PbMutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
928 table_id: *table_id,
929 associated_source_id: *associated_source_id,
930 }),
931 Mutation::ListFinish {
932 associated_source_id,
933 } => PbMutation::ListFinish(risingwave_pb::stream_plan::ListFinishMutation {
934 associated_source_id: *associated_source_id,
935 }),
936 Mutation::LoadFinish {
937 associated_source_id,
938 } => PbMutation::LoadFinish(risingwave_pb::stream_plan::LoadFinishMutation {
939 associated_source_id: *associated_source_id,
940 }),
941 Mutation::ResetSource { source_id } => {
942 PbMutation::ResetSource(risingwave_pb::stream_plan::ResetSourceMutation {
943 source_id: source_id.as_raw_id(),
944 })
945 }
946 Mutation::InjectSourceOffsets {
947 source_id,
948 split_offsets,
949 } => PbMutation::InjectSourceOffsets(
950 risingwave_pb::stream_plan::InjectSourceOffsetsMutation {
951 source_id: source_id.as_raw_id(),
952 split_offsets: split_offsets.clone(),
953 },
954 ),
955 }
956 }
957
958 fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
959 let mutation = match prost {
960 PbMutation::Stop(stop) => Mutation::Stop(StopMutation {
961 dropped_actors: stop.actors.iter().copied().collect(),
962 dropped_sink_fragments: stop.dropped_sink_fragments.iter().copied().collect(),
963 }),
964
965 PbMutation::Update(update) => Mutation::Update(UpdateMutation {
966 dispatchers: update
967 .dispatcher_update
968 .iter()
969 .map(|u| (u.actor_id, u.clone()))
970 .into_group_map(),
971 merges: update
972 .merge_update
973 .iter()
974 .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
975 .collect(),
976 vnode_bitmaps: update
977 .actor_vnode_bitmap_update
978 .iter()
979 .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
980 .collect(),
981 dropped_actors: update.dropped_actors.iter().copied().collect(),
982 actor_splits: update
983 .actor_splits
984 .iter()
985 .map(|(&actor_id, splits)| {
986 (
987 actor_id,
988 splits
989 .splits
990 .iter()
991 .map(|split| split.try_into().unwrap())
992 .collect(),
993 )
994 })
995 .collect(),
996 actor_new_dispatchers: update
997 .actor_new_dispatchers
998 .iter()
999 .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
1000 .collect(),
1001 actor_cdc_table_snapshot_splits:
1002 build_actor_cdc_table_snapshot_splits_with_generation(
1003 update
1004 .actor_cdc_table_snapshot_splits
1005 .clone()
1006 .unwrap_or_default(),
1007 ),
1008 sink_schema_change: update
1009 .sink_schema_change
1010 .iter()
1011 .map(|(sink_id, change)| (SinkId::from(*sink_id), change.clone()))
1012 .collect(),
1013 subscriptions_to_drop: update.subscriptions_to_drop.clone(),
1014 }),
1015
1016 PbMutation::Add(add) => Mutation::Add(AddMutation {
1017 adds: add
1018 .actor_dispatchers
1019 .iter()
1020 .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
1021 .collect(),
1022 added_actors: add.added_actors.iter().copied().collect(),
1023 splits: add
1026 .actor_splits
1027 .iter()
1028 .map(|(&actor_id, splits)| {
1029 (
1030 actor_id,
1031 splits
1032 .splits
1033 .iter()
1034 .map(|split| split.try_into().unwrap())
1035 .collect(),
1036 )
1037 })
1038 .collect(),
1039 pause: add.pause,
1040 subscriptions_to_add: add
1041 .subscriptions_to_add
1042 .iter()
1043 .map(
1044 |SubscriptionUpstreamInfo {
1045 subscriber_id,
1046 upstream_mv_table_id,
1047 }| { (*upstream_mv_table_id, *subscriber_id) },
1048 )
1049 .collect(),
1050 backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
1051 actor_cdc_table_snapshot_splits:
1052 build_actor_cdc_table_snapshot_splits_with_generation(
1053 add.actor_cdc_table_snapshot_splits
1054 .clone()
1055 .unwrap_or_default(),
1056 ),
1057 new_upstream_sinks: add
1058 .new_upstream_sinks
1059 .iter()
1060 .map(|(k, v)| (*k, v.clone()))
1061 .collect(),
1062 }),
1063
1064 PbMutation::Splits(s) => {
1065 let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
1066 Vec::with_capacity(s.actor_splits.len());
1067 for (&actor_id, splits) in &s.actor_splits {
1068 if !splits.splits.is_empty() {
1069 change_splits.push((
1070 actor_id,
1071 splits
1072 .splits
1073 .iter()
1074 .map(SplitImpl::try_from)
1075 .try_collect()?,
1076 ));
1077 }
1078 }
1079 Mutation::SourceChangeSplit(change_splits.into_iter().collect())
1080 }
1081 PbMutation::Pause(_) => Mutation::Pause,
1082 PbMutation::Resume(_) => Mutation::Resume,
1083 PbMutation::Throttle(changes) => Mutation::Throttle(changes.fragment_throttle.clone()),
1084 PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
1085 subscriptions_to_drop: drop.info.clone(),
1086 },
1087 PbMutation::ConnectorPropsChange(alter_connector_props) => {
1088 Mutation::ConnectorPropsChange(
1089 alter_connector_props
1090 .connector_props_infos
1091 .iter()
1092 .map(|(connector_id, options)| {
1093 (
1094 *connector_id,
1095 options
1096 .connector_props_info
1097 .iter()
1098 .map(|(k, v)| (k.clone(), v.clone()))
1099 .collect(),
1100 )
1101 })
1102 .collect(),
1103 )
1104 }
1105 PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
1106 Mutation::StartFragmentBackfill {
1107 fragment_ids: start_fragment_backfill
1108 .fragment_ids
1109 .iter()
1110 .copied()
1111 .collect(),
1112 }
1113 }
1114 PbMutation::RefreshStart(refresh_start) => Mutation::RefreshStart {
1115 table_id: refresh_start.table_id,
1116 associated_source_id: refresh_start.associated_source_id,
1117 },
1118 PbMutation::ListFinish(list_finish) => Mutation::ListFinish {
1119 associated_source_id: list_finish.associated_source_id,
1120 },
1121 PbMutation::LoadFinish(load_finish) => Mutation::LoadFinish {
1122 associated_source_id: load_finish.associated_source_id,
1123 },
1124 PbMutation::ResetSource(reset_source) => Mutation::ResetSource {
1125 source_id: SourceId::from(reset_source.source_id),
1126 },
1127 PbMutation::InjectSourceOffsets(inject) => Mutation::InjectSourceOffsets {
1128 source_id: SourceId::from(inject.source_id),
1129 split_offsets: inject.split_offsets.clone(),
1130 },
1131 };
1132 Ok(mutation)
1133 }
1134}
1135
1136impl<M> BarrierInner<M> {
1137 fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
1138 let Self {
1139 epoch,
1140 mutation,
1141 kind,
1142 tracing_context,
1143 ..
1144 } = self;
1145
1146 PbBarrier {
1147 epoch: Some(PbEpoch {
1148 curr: epoch.curr,
1149 prev: epoch.prev,
1150 }),
1151 mutation: barrier_fn(mutation).map(|mutation| PbBarrierMutation {
1152 mutation: Some(mutation),
1153 }),
1154 tracing_context: tracing_context.to_protobuf(),
1155 kind: *kind as _,
1156 }
1157 }
1158
1159 fn from_protobuf_inner(
1160 prost: &PbBarrier,
1161 mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
1162 ) -> StreamExecutorResult<Self> {
1163 let epoch = prost.get_epoch()?;
1164
1165 Ok(Self {
1166 kind: prost.kind(),
1167 epoch: EpochPair::new(epoch.curr, epoch.prev),
1168 mutation: mutation_from_pb(
1169 (prost.mutation.as_ref()).and_then(|mutation| mutation.mutation.as_ref()),
1170 )?,
1171 tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
1172 })
1173 }
1174
1175 pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
1176 BarrierInner {
1177 epoch: self.epoch,
1178 mutation: f(self.mutation),
1179 kind: self.kind,
1180 tracing_context: self.tracing_context,
1181 }
1182 }
1183}
1184
1185impl DispatcherBarrier {
1186 pub fn to_protobuf(&self) -> PbBarrier {
1187 self.to_protobuf_inner(|_| None)
1188 }
1189}
1190
1191impl Barrier {
1192 #[cfg(test)]
1193 pub fn to_protobuf(&self) -> PbBarrier {
1194 self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
1195 }
1196
1197 pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
1198 Self::from_protobuf_inner(prost, |mutation| {
1199 mutation
1200 .map(|m| Mutation::from_protobuf(m).map(Arc::new))
1201 .transpose()
1202 })
1203 }
1204}
1205
/// A watermark on a single column: the column index, the column's data type
/// and the (non-null) watermark value.
///
/// Note that the derived `PartialEq`/`Eq` compare all three fields, while the
/// manual `Ord` impl compares only `val`.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
    /// Index of the column this watermark applies to.
    pub col_idx: usize,
    /// Data type of the column; used to encode/decode `val` in protobuf.
    pub data_type: DataType,
    /// The watermark value itself.
    pub val: ScalarImpl,
}
1212
1213impl PartialOrd for Watermark {
1214 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1215 Some(self.cmp(other))
1216 }
1217}
1218
1219impl Ord for Watermark {
1220 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1221 self.val.default_cmp(&other.val)
1222 }
1223}
1224
1225impl Watermark {
1226 pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
1227 Self {
1228 col_idx,
1229 data_type,
1230 val,
1231 }
1232 }
1233
1234 pub async fn transform_with_expr(
1235 self,
1236 expr: &NonStrictExpression<impl Expression>,
1237 new_col_idx: usize,
1238 ) -> Option<Self> {
1239 let Self { col_idx, val, .. } = self;
1240 let row = {
1241 let mut row = vec![None; col_idx + 1];
1242 row[col_idx] = Some(val);
1243 OwnedRow::new(row)
1244 };
1245 let val = expr.eval_row_infallible(&row).await?;
1246 Some(Self::new(new_col_idx, expr.inner().return_type(), val))
1247 }
1248
1249 pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
1252 output_indices
1253 .iter()
1254 .position(|p| *p == self.col_idx)
1255 .map(|new_col_idx| self.with_idx(new_col_idx))
1256 }
1257
1258 pub fn to_protobuf(&self) -> PbWatermark {
1259 PbWatermark {
1260 column: Some(PbInputRef {
1261 index: self.col_idx as _,
1262 r#type: Some(self.data_type.to_protobuf()),
1263 }),
1264 val: Some(&self.val).to_protobuf().into(),
1265 }
1266 }
1267
1268 pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
1269 let col_ref = prost.get_column()?;
1270 let data_type = DataType::from(col_ref.get_type()?);
1271 let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
1272 .expect("watermark value cannot be null");
1273 Ok(Self::new(col_ref.get_index() as _, data_type, val))
1274 }
1275
1276 pub fn with_idx(self, idx: usize) -> Self {
1277 Self::new(idx, self.data_type, self.val)
1278 }
1279}
1280
/// A single streaming message: a data chunk, a barrier or a watermark.
///
/// `M` is the type of the mutation payload carried by barriers.
#[cfg_attr(any(test, feature = "test"), derive(PartialEq))]
#[derive(Debug, EnumAsInner, Clone)]
pub enum MessageInner<M> {
    /// A chunk of stream data.
    Chunk(StreamChunk),
    /// A barrier whose mutation payload has type `M`.
    Barrier(BarrierInner<M>),
    /// A column watermark.
    Watermark(Watermark),
}
1288
1289impl<M> MessageInner<M> {
1290 pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
1291 match self {
1292 MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
1293 MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
1294 MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
1295 }
1296 }
1297}
1298
/// A message whose barriers carry the full mutation payload ([`BarrierMutationType`]).
pub type Message = MessageInner<BarrierMutationType>;
/// A message whose barriers carry no mutation payload (`()`).
pub type DispatcherMessage = MessageInner<()>;
1301
/// Like [`MessageInner`], except that barriers travel as a `Vec`.
///
/// Several barriers can therefore be carried in a single `BarrierBatch` message.
#[derive(Debug, EnumAsInner, Clone)]
pub enum MessageBatchInner<M> {
    /// A chunk of stream data.
    Chunk(StreamChunk),
    /// One or more barriers whose mutation payloads have type `M`.
    BarrierBatch(Vec<BarrierInner<M>>),
    /// A column watermark.
    Watermark(Watermark),
}
/// Batched message whose barriers carry the full mutation payload.
pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
/// A list of barriers without mutation payloads.
pub type DispatcherBarriers = Vec<DispatcherBarrier>;
/// Batched message whose barriers carry no mutation payload (`()`).
pub type DispatcherMessageBatch = MessageBatchInner<()>;
1313
1314impl<M> From<MessageInner<M>> for MessageBatchInner<M> {
1315 fn from(m: MessageInner<M>) -> Self {
1316 match m {
1317 MessageInner::Chunk(c) => Self::Chunk(c),
1318 MessageInner::Barrier(b) => Self::BarrierBatch(vec![b]),
1319 MessageInner::Watermark(w) => Self::Watermark(w),
1320 }
1321 }
1322}
1323
1324impl From<StreamChunk> for Message {
1325 fn from(chunk: StreamChunk) -> Self {
1326 Message::Chunk(chunk)
1327 }
1328}
1329
1330impl<'a> TryFrom<&'a Message> for &'a Barrier {
1331 type Error = ();
1332
1333 fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
1334 match m {
1335 Message::Chunk(_) => Err(()),
1336 Message::Barrier(b) => Ok(b),
1337 Message::Watermark(_) => Err(()),
1338 }
1339 }
1340}
1341
1342impl Message {
1343 #[cfg(test)]
1348 pub fn is_stop(&self) -> bool {
1349 matches!(
1350 self,
1351 Message::Barrier(Barrier {
1352 mutation,
1353 ..
1354 }) if mutation.as_ref().unwrap().is_stop()
1355 )
1356 }
1357}
1358
1359impl DispatcherMessageBatch {
1360 pub fn to_protobuf(&self) -> PbStreamMessageBatch {
1361 let prost = match self {
1362 Self::Chunk(stream_chunk) => {
1363 let prost_stream_chunk = stream_chunk.to_protobuf();
1364 StreamMessageBatch::StreamChunk(prost_stream_chunk)
1365 }
1366 Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
1367 barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
1368 }),
1369 Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
1370 };
1371 PbStreamMessageBatch {
1372 stream_message_batch: Some(prost),
1373 }
1374 }
1375
1376 pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
1377 let res = match prost.get_stream_message_batch()? {
1378 StreamMessageBatch::StreamChunk(chunk) => {
1379 Self::Chunk(StreamChunk::from_protobuf(chunk)?)
1380 }
1381 StreamMessageBatch::BarrierBatch(barrier_batch) => {
1382 let barriers = barrier_batch
1383 .barriers
1384 .iter()
1385 .map(|barrier| {
1386 DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
1387 if mutation.is_some() {
1388 if cfg!(debug_assertions) {
1389 panic!("should not receive message of barrier with mutation");
1390 } else {
1391 warn!(?barrier, "receive message of barrier with mutation");
1392 }
1393 }
1394 Ok(())
1395 })
1396 })
1397 .try_collect()?;
1398 Self::BarrierBatch(barriers)
1399 }
1400 StreamMessageBatch::Watermark(watermark) => {
1401 Self::Watermark(Watermark::from_protobuf(watermark)?)
1402 }
1403 };
1404 Ok(res)
1405 }
1406
1407 pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
1408 ::prost::Message::encoded_len(msg)
1409 }
1410}
1411
/// A stream key, given as a list of column indices (presumably into the input schema; confirm).
pub type StreamKey = Vec<usize>;
/// Borrowed form of [`StreamKey`].
pub type StreamKeyRef<'a> = &'a [usize];
/// Data types of the stream key columns. A single type is stored inline without heap allocation.
pub type StreamKeyDataTypes = SmallVec<[DataType; 1]>;
1415
1416pub async fn expect_first_barrier<M: Debug>(
1418 stream: &mut (impl MessageStreamInner<M> + Unpin),
1419) -> StreamExecutorResult<BarrierInner<M>> {
1420 let message = stream
1421 .next()
1422 .instrument_await("expect_first_barrier")
1423 .await
1424 .context("failed to extract the first message: stream closed unexpectedly")??;
1425 let barrier = message
1426 .into_barrier()
1427 .expect("the first message must be a barrier");
1428 assert!(matches!(
1430 barrier.kind,
1431 BarrierKind::Checkpoint | BarrierKind::Initial
1432 ));
1433 Ok(barrier)
1434}
1435
1436pub async fn expect_first_barrier_from_aligned_stream(
1438 stream: &mut (impl AlignedMessageStream + Unpin),
1439) -> StreamExecutorResult<Barrier> {
1440 let message = stream
1441 .next()
1442 .instrument_await("expect_first_barrier")
1443 .await
1444 .context("failed to extract the first message: stream closed unexpectedly")??;
1445 let barrier = message
1446 .into_barrier()
1447 .expect("the first message must be a barrier");
1448 Ok(barrier)
1449}
1450
/// Something that, once executed, produces a stream of barriers or errors.
pub trait StreamConsumer: Send + 'static {
    /// The stream returned by [`StreamConsumer::execute`].
    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;

    /// Consumes the boxed consumer and returns its barrier stream.
    fn execute(self: Box<Self>) -> Self::BarrierStream;
}
1457
/// A boxed upstream input identified by `InputId` that yields `MessageStreamItemInner<M>` items.
type BoxedMessageInput<InputId, M> = BoxedInput<InputId, MessageStreamItemInner<M>>;
1459
/// Merges a dynamic set of upstream inputs into one stream.
///
/// Chunks and aligned watermarks are forwarded as soon as they arrive.
/// Barriers are emitted only after every upstream has delivered them.
pub struct DynamicReceivers<InputId, M> {
    /// The barrier currently being aligned. This is the first one received since
    /// the last barrier was emitted, or `None` if no upstream has delivered one yet.
    barrier: Option<BarrierInner<M>>,
    /// When the first upstream delivered the barrier currently being aligned.
    start_ts: Option<Instant>,
    /// Upstreams that have delivered the current barrier. They are not polled
    /// again until every other upstream has delivered it too.
    blocked: Vec<BoxedMessageInput<InputId, M>>,
    /// Upstreams currently being polled.
    active: FuturesUnordered<StreamFuture<BoxedMessageInput<InputId, M>>>,
    /// Per-column watermark buffers, keyed by column index.
    buffered_watermarks: BTreeMap<usize, BufferedWatermarks<InputId>>,
    /// Optional counter incremented by each barrier's alignment time, in nanoseconds.
    barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
    /// Optional second counter incremented by the same alignment time. It is
    /// exposed through `merge_barrier_align_duration()`.
    merge_barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
}
1479
1480impl<InputId: Clone + Ord + Hash + std::fmt::Debug + Unpin, M: Clone + Unpin> Stream
1481 for DynamicReceivers<InputId, M>
1482{
1483 type Item = MessageStreamItemInner<M>;
1484
1485 fn poll_next(
1486 mut self: Pin<&mut Self>,
1487 cx: &mut std::task::Context<'_>,
1488 ) -> Poll<Option<Self::Item>> {
1489 if self.is_empty() {
1490 return Poll::Ready(None);
1491 }
1492
1493 loop {
1494 match futures::ready!(self.active.poll_next_unpin(cx)) {
1495 Some((Some(Err(e)), _)) => {
1497 return Poll::Ready(Some(Err(e)));
1498 }
1499 Some((Some(Ok(message)), remaining)) => {
1501 let input_id = remaining.id();
1502 match message {
1503 MessageInner::Chunk(chunk) => {
1504 self.active.push(remaining.into_future());
1506 return Poll::Ready(Some(Ok(MessageInner::Chunk(chunk))));
1507 }
1508 MessageInner::Watermark(watermark) => {
1509 self.active.push(remaining.into_future());
1511 if let Some(watermark) = self.handle_watermark(input_id, watermark) {
1512 return Poll::Ready(Some(Ok(MessageInner::Watermark(watermark))));
1513 }
1514 }
1515 MessageInner::Barrier(barrier) => {
1516 if self.blocked.is_empty() {
1518 self.start_ts = Some(Instant::now());
1519 }
1520 self.blocked.push(remaining);
1521 if let Some(current_barrier) = self.barrier.as_ref() {
1522 if current_barrier.epoch != barrier.epoch {
1523 return Poll::Ready(Some(Err(
1524 StreamExecutorError::align_barrier(
1525 current_barrier.clone().map_mutation(|_| None),
1526 barrier.map_mutation(|_| None),
1527 ),
1528 )));
1529 }
1530 } else {
1531 self.barrier = Some(barrier);
1532 }
1533 }
1534 }
1535 }
1536 Some((None, remaining)) => {
1544 return Poll::Ready(Some(Err(StreamExecutorError::channel_closed(format!(
1545 "upstream input {:?} unexpectedly closed",
1546 remaining.id()
1547 )))));
1548 }
1549 None => {
1551 assert!(!self.blocked.is_empty());
1552
1553 let start_ts = self
1554 .start_ts
1555 .take()
1556 .expect("should have received at least one barrier");
1557 if let Some(barrier_align_duration) = &self.barrier_align_duration {
1558 barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
1559 }
1560 if let Some(merge_barrier_align_duration) = &self.merge_barrier_align_duration {
1561 merge_barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
1562 }
1563
1564 break;
1565 }
1566 }
1567 }
1568
1569 assert!(self.active.is_terminated());
1570
1571 let barrier = self.barrier.take().unwrap();
1572
1573 let upstreams = std::mem::take(&mut self.blocked);
1574 self.extend_active(upstreams);
1575 assert!(!self.active.is_terminated());
1576
1577 Poll::Ready(Some(Ok(MessageInner::Barrier(barrier))))
1578 }
1579}
1580
1581impl<InputId: Clone + Ord + Hash + std::fmt::Debug, M> DynamicReceivers<InputId, M> {
1582 pub fn new(
1583 upstreams: Vec<BoxedMessageInput<InputId, M>>,
1584 barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1585 merge_barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1586 ) -> Self {
1587 let mut this = Self {
1588 barrier: None,
1589 start_ts: None,
1590 blocked: Vec::with_capacity(upstreams.len()),
1591 active: Default::default(),
1592 buffered_watermarks: Default::default(),
1593 merge_barrier_align_duration,
1594 barrier_align_duration,
1595 };
1596 this.extend_active(upstreams);
1597 this
1598 }
1599
1600 pub fn extend_active(
1603 &mut self,
1604 upstreams: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1605 ) {
1606 assert!(self.blocked.is_empty() && self.barrier.is_none());
1607
1608 self.active
1609 .extend(upstreams.into_iter().map(|s| s.into_future()));
1610 }
1611
1612 pub fn handle_watermark(
1614 &mut self,
1615 input_id: InputId,
1616 watermark: Watermark,
1617 ) -> Option<Watermark> {
1618 let col_idx = watermark.col_idx;
1619 let upstream_ids: Vec<_> = self.upstream_input_ids().collect();
1621 let watermarks = self
1622 .buffered_watermarks
1623 .entry(col_idx)
1624 .or_insert_with(|| BufferedWatermarks::with_ids(upstream_ids));
1625 watermarks.handle_watermark(input_id, watermark)
1626 }
1627
1628 pub fn add_upstreams_from(
1631 &mut self,
1632 new_inputs: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1633 ) {
1634 assert!(self.blocked.is_empty() && self.barrier.is_none());
1635
1636 let new_inputs: Vec<_> = new_inputs.into_iter().collect();
1637 let input_ids = new_inputs.iter().map(|input| input.id());
1638 self.buffered_watermarks.values_mut().for_each(|buffers| {
1639 buffers.add_buffers(input_ids.clone());
1641 });
1642 self.active
1643 .extend(new_inputs.into_iter().map(|s| s.into_future()));
1644 }
1645
1646 pub fn remove_upstreams(&mut self, upstream_input_ids: &HashSet<InputId>) {
1650 assert!(self.blocked.is_empty() && self.barrier.is_none());
1651
1652 let new_upstreams = std::mem::take(&mut self.active)
1653 .into_iter()
1654 .map(|s| s.into_inner().unwrap())
1655 .filter(|u| !upstream_input_ids.contains(&u.id()));
1656 self.extend_active(new_upstreams);
1657 self.buffered_watermarks.values_mut().for_each(|buffers| {
1658 buffers.remove_buffer(upstream_input_ids.clone());
1661 });
1662 }
1663
1664 pub fn merge_barrier_align_duration(
1665 &self,
1666 ) -> Option<LabelGuardedMetric<GenericCounter<AtomicU64>>> {
1667 self.merge_barrier_align_duration.clone()
1668 }
1669
1670 pub fn flush_buffered_watermarks(&mut self) {
1671 self.buffered_watermarks
1672 .values_mut()
1673 .for_each(|buffers| buffers.clear());
1674 }
1675
1676 pub fn upstream_input_ids(&self) -> impl Iterator<Item = InputId> + '_ {
1677 self.blocked
1678 .iter()
1679 .map(|s| s.id())
1680 .chain(self.active.iter().map(|s| s.get_ref().unwrap().id()))
1681 }
1682
1683 pub fn is_empty(&self) -> bool {
1684 self.blocked.is_empty() && self.active.is_empty()
1685 }
1686}
1687
/// Receives barriers from a channel and buffers them, together with any new
/// upstream inputs that had to be built for them, until they are popped.
pub(crate) struct DispatchBarrierBuffer {
    /// Received barriers in arrival order. Each carries `Some(inputs)` when new
    /// upstream inputs were built for it, and `None` otherwise.
    buffer: VecDeque<(Barrier, Option<Vec<BoxedActorInput>>)>,
    /// Channel from which barriers are received.
    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
    /// Current state of the barrier-receiving state machine.
    recv_state: BarrierReceiverState,
    /// Upstream fragment used to look up merge updates. It may be switched by
    /// an update that adds upstream actors.
    curr_upstream_fragment_id: FragmentId,
    /// This actor's id.
    actor_id: ActorId,
    /// Shared context captured by the futures that build new upstream inputs.
    build_input_ctx: Arc<BuildInputContext>,
}
1717
/// Arguments passed to `new_input` when building inputs for newly added
/// upstream actors.
struct BuildInputContext {
    /// Id of the actor that owns the inputs.
    pub actor_id: ActorId,
    /// Barrier manager handed to `new_input`.
    pub local_barrier_manager: LocalBarrierManager,
    /// Metrics handed to `new_input`.
    pub metrics: Arc<StreamingMetrics>,
    /// Fragment of the owning actor.
    pub fragment_id: FragmentId,
    /// Streaming configuration handed to `new_input`.
    pub actor_config: Arc<StreamingConfig>,
}
1725
/// A boxed, `Send`, `'static` future that builds the inputs for newly added upstream actors.
type BoxedNewInputsFuture =
    Pin<Box<dyn Future<Output = StreamExecutorResult<Vec<BoxedActorInput>>> + Send>>;
1728
/// State of [`DispatchBarrierBuffer`]'s barrier-receiving state machine.
enum BarrierReceiverState {
    /// Waiting for the next barrier from the channel.
    ReceivingBarrier,
    /// A received barrier whose new upstream inputs are still being built by
    /// the future. The barrier is buffered once that future completes.
    CreatingNewInput(Barrier, BoxedNewInputsFuture),
}
1733
impl DispatchBarrierBuffer {
    /// Creates a buffer that receives barriers from `barrier_rx`.
    ///
    /// `curr_upstream_fragment_id` is the upstream fragment used to look up
    /// merge updates. The remaining arguments are stored in a shared
    /// [`BuildInputContext`] for building new upstream inputs later.
    pub fn new(
        barrier_rx: mpsc::UnboundedReceiver<Barrier>,
        actor_id: ActorId,
        curr_upstream_fragment_id: FragmentId,
        local_barrier_manager: LocalBarrierManager,
        metrics: Arc<StreamingMetrics>,
        fragment_id: FragmentId,
        actor_config: Arc<StreamingConfig>,
    ) -> Self {
        Self {
            buffer: VecDeque::new(),
            barrier_rx,
            recv_state: BarrierReceiverState::ReceivingBarrier,
            curr_upstream_fragment_id,
            actor_id,
            build_input_ctx: Arc::new(BuildInputContext {
                actor_id,
                local_barrier_manager,
                metrics,
                fragment_id,
                actor_config,
            }),
        }
    }

    /// Waits for the next message from `stream`.
    ///
    /// While waiting, barriers are concurrently received from `barrier_rx` and
    /// buffered, together with any new inputs they require.
    ///
    /// Waiting time is added to `metrics.actor_input_buffer_blocking_duration_ns`.
    /// Every 15 seconds the full interval is recorded and the clock restarts.
    /// When a message arrives, the time since the last restart is recorded.
    ///
    /// # Errors
    /// - `stream` yields an error or ends ("upstream executor closed unexpectedly");
    /// - receiving barriers, or building new inputs for them, fails.
    pub async fn await_next_message(
        &mut self,
        stream: &mut (impl Stream<Item = StreamExecutorResult<DispatcherMessage>> + Unpin),
        metrics: &ActorInputMetrics,
    ) -> StreamExecutorResult<DispatcherMessage> {
        let mut start_time = Instant::now();
        let interval_duration = Duration::from_secs(15);
        // The first tick fires one full interval after `start_time`, not immediately.
        let mut interval =
            tokio::time::interval_at(start_time + interval_duration, interval_duration);

        loop {
            tokio::select! {
                // Check branches in the order written: the upstream stream first.
                biased;
                msg = stream.try_next() => {
                    metrics
                        .actor_input_buffer_blocking_duration_ns
                        .inc_by(start_time.elapsed().as_nanos() as u64);
                    return msg?.ok_or_else(
                        || StreamExecutorError::channel_closed("upstream executor closed unexpectedly")
                    );
                }

                // Completes only on error; otherwise it keeps filling `self.buffer`.
                e = self.continuously_fetch_barrier_rx() => {
                    return Err(e);
                }

                // Record the blocking time periodically, so long waits are
                // reported before a message eventually arrives.
                _ = interval.tick() => {
                    start_time = Instant::now();
                    metrics.actor_input_buffer_blocking_duration_ns.inc_by(interval_duration.as_nanos() as u64);
                    continue;
                }
            }
        }
    }

    /// Pops the oldest buffered barrier, along with the new upstream inputs
    /// built for it (if any).
    ///
    /// If the buffer is empty, barriers are received first. The popped barrier
    /// is checked against `barrier` with `assert_equal_dispatcher_barrier`,
    /// which presumably panics on mismatch.
    ///
    /// # Errors
    /// Fails if the barrier channel is closed or building new inputs fails.
    pub async fn pop_barrier_with_inputs(
        &mut self,
        barrier: DispatcherBarrier,
    ) -> StreamExecutorResult<(Barrier, Option<Vec<BoxedActorInput>>)> {
        while self.buffer.is_empty() {
            self.try_fetch_barrier_rx(false).await?;
        }
        let (recv_barrier, inputs) = self.buffer.pop_front().unwrap();
        assert_equal_dispatcher_barrier(&recv_barrier, &barrier);

        Ok((recv_barrier, inputs))
    }

    /// Keeps receiving barriers into the buffer and returns only the first error.
    ///
    /// A closed barrier channel is not treated as an error here: the call
    /// simply stays pending forever.
    async fn continuously_fetch_barrier_rx(&mut self) -> StreamExecutorError {
        loop {
            if let Err(e) = self.try_fetch_barrier_rx(true).await {
                return e;
            }
        }
    }

    /// Advances the receiving state machine by one step.
    ///
    /// - `ReceivingBarrier`: receives one barrier. If it needs new upstream
    ///   inputs, switches to `CreatingNewInput`. Otherwise the barrier is
    ///   buffered with `None`.
    /// - `CreatingNewInput`: awaits the input-building future, buffers the
    ///   barrier with the new inputs, and switches back to `ReceivingBarrier`.
    ///
    /// When the channel is closed, this either stays pending forever
    /// (`pending_on_end == true`) or returns a `channel_closed` error.
    async fn try_fetch_barrier_rx(&mut self, pending_on_end: bool) -> StreamExecutorResult<()> {
        match &mut self.recv_state {
            BarrierReceiverState::ReceivingBarrier => {
                // NOTE(review): this relies on `recv` being cancel-safe, since
                // this future may be dropped by `select!`. Confirm.
                let Some(barrier) = self.barrier_rx.recv().await else {
                    if pending_on_end {
                        return pending().await;
                    } else {
                        return Err(StreamExecutorError::channel_closed(
                            "barrier channel closed unexpectedly",
                        ));
                    }
                };
                if let Some(fut) = self.pre_apply_barrier(&barrier) {
                    self.recv_state = BarrierReceiverState::CreatingNewInput(barrier, fut);
                } else {
                    self.buffer.push_back((barrier, None));
                }
            }
            BarrierReceiverState::CreatingNewInput(barrier, fut) => {
                // `fut` is awaited through a mutable borrow of `self.recv_state`.
                // If this call is dropped mid-await, the in-progress future stays
                // in the state and is resumed by the next call. On error the state
                // is left as `CreatingNewInput`.
                let new_inputs = fut.await?;
                self.buffer.push_back((barrier.clone(), Some(new_inputs)));
                self.recv_state = BarrierReceiverState::ReceivingBarrier;
            }
        }
        Ok(())
    }

    /// Checks `barrier` for a merge update that targets this actor and its
    /// current upstream fragment and adds upstream actors.
    ///
    /// If such an update exists, `curr_upstream_fragment_id` is switched to the
    /// update's new upstream fragment, when one is given. A future is then
    /// returned that builds an input for every added upstream actor,
    /// concurrently via `try_join_all`, and checks each input's first barrier
    /// against `barrier`. Otherwise returns `None`.
    ///
    /// An update that names a new upstream fragment but adds no actors leaves
    /// `curr_upstream_fragment_id` unchanged.
    fn pre_apply_barrier(&mut self, barrier: &Barrier) -> Option<BoxedNewInputsFuture> {
        if let Some(update) = barrier.as_update_merge(self.actor_id, self.curr_upstream_fragment_id)
            && !update.added_upstream_actors.is_empty()
        {
            let upstream_fragment_id =
                if let Some(new_upstream_fragment_id) = update.new_upstream_fragment_id {
                    self.curr_upstream_fragment_id = new_upstream_fragment_id;
                    new_upstream_fragment_id
                } else {
                    self.curr_upstream_fragment_id
                };
            // The boxed future is `'static`, so it must own everything it uses.
            let ctx = self.build_input_ctx.clone();
            let added_upstream_actors = update.added_upstream_actors.clone();
            let barrier = barrier.clone();
            let fut = async move {
                try_join_all(added_upstream_actors.iter().map(|upstream_actor| async {
                    let mut new_input = new_input(
                        &ctx.local_barrier_manager,
                        ctx.metrics.clone(),
                        ctx.actor_id,
                        ctx.fragment_id,
                        upstream_actor,
                        upstream_fragment_id,
                        ctx.actor_config.clone(),
                    )
                    .await?;

                    // The new input's first barrier must be the barrier that added it.
                    let first_barrier = expect_first_barrier(&mut new_input).await?;
                    assert_equal_dispatcher_barrier(&barrier, &first_barrier);

                    StreamExecutorResult::Ok(new_input)
                }))
                .await
            }
            .boxed();

            Some(fut)
        } else {
            None
        }
    }
}