1mod prelude;
16
17use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
18use std::fmt::Debug;
19use std::future::pending;
20use std::hash::Hash;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::Poll;
24use std::vec;
25
26use await_tree::InstrumentAwait;
27use enum_as_inner::EnumAsInner;
28use futures::future::try_join_all;
29use futures::stream::{BoxStream, FusedStream, FuturesUnordered, StreamFuture};
30use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
31use itertools::Itertools;
32use prometheus::Histogram;
33use prometheus::core::{AtomicU64, GenericCounter};
34use risingwave_common::array::StreamChunk;
35use risingwave_common::bitmap::Bitmap;
36use risingwave_common::catalog::{Field, Schema, TableId};
37use risingwave_common::metrics::LabelGuardedMetric;
38use risingwave_common::row::OwnedRow;
39use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
40use risingwave_common::util::epoch::{Epoch, EpochPair};
41use risingwave_common::util::tracing::TracingContext;
42use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
43use risingwave_connector::source::SplitImpl;
44use risingwave_expr::expr::{Expression, NonStrictExpression};
45use risingwave_pb::data::PbEpoch;
46use risingwave_pb::expr::PbInputRef;
47use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
48use risingwave_pb::stream_plan::barrier::BarrierKind;
49use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
50use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
51use risingwave_pb::stream_plan::stream_node::PbStreamKind;
52use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
53use risingwave_pb::stream_plan::{
54 BarrierMutation, CombinedMutation, ConnectorPropsChangeMutation, Dispatchers,
55 DropSubscriptionsMutation, PauseMutation, PbAddMutation, PbBarrier, PbBarrierMutation,
56 PbDispatcher, PbSinkAddColumns, PbStopMutation, PbStreamMessageBatch, PbUpdateMutation,
57 PbWatermark, ResumeMutation, SourceChangeSplitMutation, StartFragmentBackfillMutation,
58 SubscriptionUpstreamInfo, ThrottleMutation,
59};
60use smallvec::SmallVec;
61use tokio::sync::mpsc;
62use tokio::time::Instant;
63
64use crate::error::StreamResult;
65use crate::executor::exchange::input::{
66 BoxedActorInput, BoxedInput, apply_dispatcher_barrier, assert_equal_dispatcher_barrier,
67 new_input,
68};
69use crate::executor::prelude::StreamingMetrics;
70use crate::executor::watermark::BufferedWatermarks;
71use crate::task::{ActorId, FragmentId, LocalBarrierManager};
72
73mod actor;
74mod barrier_align;
75pub mod exchange;
76pub mod monitor;
77
78pub mod aggregate;
79pub mod asof_join;
80mod backfill;
81mod barrier_recv;
82mod batch_query;
83mod chain;
84mod changelog;
85mod dedup;
86mod dispatch;
87pub mod dml;
88mod dynamic_filter;
89pub mod eowc;
90pub mod error;
91mod expand;
92mod filter;
93pub mod hash_join;
94mod hop_window;
95mod join;
96mod lookup;
97mod lookup_union;
98mod merge;
99mod mview;
100mod nested_loop_temporal_join;
101mod no_op;
102mod now;
103mod over_window;
104pub mod project;
105mod rearranged_chain;
106mod receiver;
107pub mod row_id_gen;
108mod sink;
109pub mod source;
110mod stream_reader;
111pub mod subtask;
112mod temporal_join;
113mod top_n;
114mod troublemaker;
115mod union;
116mod upstream_sink_union;
117mod values;
118mod watermark;
119mod watermark_filter;
120mod wrapper;
121
122mod approx_percentile;
123
124mod row_merge;
125
126#[cfg(test)]
127mod integration_tests;
128mod sync_kv_log_store;
129#[cfg(any(test, feature = "test"))]
130pub mod test_utils;
131mod utils;
132mod vector_index;
133
134pub use actor::{Actor, ActorContext, ActorContextRef};
135use anyhow::Context;
136pub use approx_percentile::global::GlobalApproxPercentileExecutor;
137pub use approx_percentile::local::LocalApproxPercentileExecutor;
138pub use backfill::arrangement_backfill::*;
139pub use backfill::cdc::{
140 CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor,
141};
142pub use backfill::no_shuffle_backfill::*;
143pub use backfill::snapshot_backfill::*;
144pub use barrier_recv::BarrierRecvExecutor;
145pub use batch_query::BatchQueryExecutor;
146pub use chain::ChainExecutor;
147pub use changelog::ChangeLogExecutor;
148pub use dedup::AppendOnlyDedupExecutor;
149pub use dispatch::{DispatchExecutor, DispatcherImpl};
150pub use dynamic_filter::DynamicFilterExecutor;
151pub use error::{StreamExecutorError, StreamExecutorResult};
152pub use expand::ExpandExecutor;
153pub use filter::{FilterExecutor, UpsertFilterExecutor};
154pub use hash_join::*;
155pub use hop_window::HopWindowExecutor;
156pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
157pub use join::{AsOfDesc, AsOfJoinType, JoinType};
158pub use lookup::*;
159pub use lookup_union::LookupUnionExecutor;
160pub use merge::MergeExecutor;
161pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
162pub use mview::*;
163pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
164pub use no_op::NoOpExecutor;
165pub use now::*;
166pub use over_window::*;
167pub use rearranged_chain::RearrangedChainExecutor;
168pub use receiver::ReceiverExecutor;
169use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
170pub use row_merge::RowMergeExecutor;
171pub use sink::SinkExecutor;
172pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
173pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
174pub use temporal_join::TemporalJoinExecutor;
175pub use top_n::{
176 AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
177};
178pub use troublemaker::TroublemakerExecutor;
179pub use union::UnionExecutor;
180pub use upstream_sink_union::{UpstreamFragmentInfo, UpstreamSinkUnionExecutor};
181pub use utils::DummyExecutor;
182pub use values::ValuesExecutor;
183pub use vector_index::VectorIndexWriteExecutor;
184pub use watermark_filter::WatermarkFilterExecutor;
185pub use wrapper::WrapperExecutor;
186
187use self::barrier_align::AlignedMessageStream;
188
189pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
190pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
191pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
192pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;
193
194pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
195use risingwave_connector::sink::catalog::SinkId;
196use risingwave_connector::source::cdc::{
197 CdcTableSnapshotSplitAssignmentWithGeneration,
198 build_actor_cdc_table_snapshot_splits_with_generation,
199 build_pb_actor_cdc_table_snapshot_splits_with_generation,
200};
201use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
202use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
203
204pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
205pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
206pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;
207
208#[derive(Debug, Default, Clone)]
210pub struct ExecutorInfo {
211 pub schema: Schema,
213
214 pub pk_indices: PkIndices,
218
219 pub stream_kind: PbStreamKind,
221
222 pub identity: String,
224
225 pub id: u64,
227}
228
229impl ExecutorInfo {
230 pub fn for_test(schema: Schema, pk_indices: PkIndices, identity: String, id: u64) -> Self {
231 Self {
232 schema,
233 pk_indices,
234 stream_kind: PbStreamKind::Retract, identity,
236 id,
237 }
238 }
239}
240
241pub trait Execute: Send + 'static {
243 fn execute(self: Box<Self>) -> BoxedMessageStream;
244
245 fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
246 self.execute()
247 }
248
249 fn boxed(self) -> Box<dyn Execute>
250 where
251 Self: Sized + Send + 'static,
252 {
253 Box::new(self)
254 }
255}
256
257pub struct Executor {
260 info: ExecutorInfo,
261 execute: Box<dyn Execute>,
262}
263
264impl Executor {
265 pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
266 Self { info, execute }
267 }
268
269 pub fn info(&self) -> &ExecutorInfo {
270 &self.info
271 }
272
273 pub fn schema(&self) -> &Schema {
274 &self.info.schema
275 }
276
277 pub fn pk_indices(&self) -> PkIndicesRef<'_> {
278 &self.info.pk_indices
279 }
280
281 pub fn stream_kind(&self) -> PbStreamKind {
282 self.info.stream_kind
283 }
284
285 pub fn identity(&self) -> &str {
286 &self.info.identity
287 }
288
289 pub fn execute(self) -> BoxedMessageStream {
290 self.execute.execute()
291 }
292
293 pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
294 self.execute.execute_with_epoch(epoch)
295 }
296}
297
298impl std::fmt::Debug for Executor {
299 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
300 f.write_str(self.identity())
301 }
302}
303
304impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
305 fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
306 Self::new(info, execute)
307 }
308}
309
310impl<E> From<(ExecutorInfo, E)> for Executor
311where
312 E: Execute,
313{
314 fn from((info, execute): (ExecutorInfo, E)) -> Self {
315 Self::new(info, execute.boxed())
316 }
317}
318
319pub const INVALID_EPOCH: u64 = 0;
320
321type UpstreamFragmentId = FragmentId;
322type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;
323
324#[derive(Debug, Clone, PartialEq)]
325#[cfg_attr(any(test, feature = "test"), derive(Default))]
326pub struct UpdateMutation {
327 pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
328 pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
329 pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
330 pub dropped_actors: HashSet<ActorId>,
331 pub actor_splits: SplitAssignments,
332 pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
333 pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
334 pub sink_add_columns: HashMap<SinkId, Vec<Field>>,
335}
336
337#[derive(Debug, Clone, PartialEq)]
338#[cfg_attr(any(test, feature = "test"), derive(Default))]
339pub struct AddMutation {
340 pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
341 pub added_actors: HashSet<ActorId>,
342 pub splits: SplitAssignments,
344 pub pause: bool,
345 pub subscriptions_to_add: Vec<(TableId, u32)>,
347 pub backfill_nodes_to_pause: HashSet<FragmentId>,
349 pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
350 pub new_upstream_sinks: HashMap<FragmentId, PbNewUpstreamSink>,
351}
352
353#[derive(Debug, Clone, PartialEq)]
354#[cfg_attr(any(test, feature = "test"), derive(Default))]
355pub struct StopMutation {
356 pub dropped_actors: HashSet<ActorId>,
357 pub dropped_sink_fragments: HashSet<FragmentId>,
358}
359
360#[derive(Debug, Clone, PartialEq)]
362pub enum Mutation {
363 Stop(StopMutation),
364 Update(UpdateMutation),
365 Add(AddMutation),
366 SourceChangeSplit(SplitAssignments),
367 Pause,
368 Resume,
369 Throttle(HashMap<ActorId, Option<u32>>),
370 AddAndUpdate(AddMutation, UpdateMutation),
371 ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
372 DropSubscriptions {
373 subscriptions_to_drop: Vec<(u32, TableId)>,
375 },
376 StartFragmentBackfill {
377 fragment_ids: HashSet<FragmentId>,
378 },
379 RefreshStart {
380 table_id: TableId,
381 associated_source_id: TableId,
382 },
383 LoadFinish {
385 associated_source_id: TableId,
386 },
387}
388
389#[derive(Debug, Clone)]
394pub struct BarrierInner<M> {
395 pub epoch: EpochPair,
396 pub mutation: M,
397 pub kind: BarrierKind,
398
399 pub tracing_context: TracingContext,
401
402 pub passed_actors: Vec<ActorId>,
404}
405
406pub type BarrierMutationType = Option<Arc<Mutation>>;
407pub type Barrier = BarrierInner<BarrierMutationType>;
408pub type DispatcherBarrier = BarrierInner<()>;
409
410impl<M: Default> BarrierInner<M> {
411 pub fn new_test_barrier(epoch: u64) -> Self {
413 Self {
414 epoch: EpochPair::new_test_epoch(epoch),
415 kind: BarrierKind::Checkpoint,
416 tracing_context: TracingContext::none(),
417 mutation: Default::default(),
418 passed_actors: Default::default(),
419 }
420 }
421
422 pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
423 Self {
424 epoch: EpochPair::new(epoch, prev_epoch),
425 kind: BarrierKind::Checkpoint,
426 tracing_context: TracingContext::none(),
427 mutation: Default::default(),
428 passed_actors: Default::default(),
429 }
430 }
431}
432
433impl Barrier {
434 pub fn into_dispatcher(self) -> DispatcherBarrier {
435 DispatcherBarrier {
436 epoch: self.epoch,
437 mutation: (),
438 kind: self.kind,
439 tracing_context: self.tracing_context,
440 passed_actors: self.passed_actors,
441 }
442 }
443
444 #[must_use]
445 pub fn with_mutation(self, mutation: Mutation) -> Self {
446 Self {
447 mutation: Some(Arc::new(mutation)),
448 ..self
449 }
450 }
451
452 #[must_use]
453 pub fn with_stop(self) -> Self {
454 self.with_mutation(Mutation::Stop(StopMutation {
455 dropped_actors: Default::default(),
456 dropped_sink_fragments: Default::default(),
457 }))
458 }
459
460 pub fn is_with_stop_mutation(&self) -> bool {
462 matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
463 }
464
465 pub fn is_stop(&self, actor_id: ActorId) -> bool {
467 self.all_stop_actors()
468 .is_some_and(|actors| actors.contains(&actor_id))
469 }
470
471 pub fn is_checkpoint(&self) -> bool {
472 self.kind == BarrierKind::Checkpoint
473 }
474
475 pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
486 match self.mutation.as_deref()? {
487 Mutation::Update(UpdateMutation { actor_splits, .. })
488 | Mutation::Add(AddMutation {
489 splits: actor_splits,
490 ..
491 }) => actor_splits.get(&actor_id),
492
493 Mutation::AddAndUpdate(
494 AddMutation {
495 splits: add_actor_splits,
496 ..
497 },
498 UpdateMutation {
499 actor_splits: update_actor_splits,
500 ..
501 },
502 ) => add_actor_splits
503 .get(&actor_id)
504 .or_else(|| update_actor_splits.get(&actor_id)),
506
507 _ => {
508 if cfg!(debug_assertions) {
509 panic!(
510 "the initial mutation of the barrier should not be {:?}",
511 self.mutation
512 );
513 }
514 None
515 }
516 }
517 .map(|s| s.as_slice())
518 }
519
520 pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
522 match self.mutation.as_deref() {
523 Some(Mutation::Stop(StopMutation { dropped_actors, .. })) => Some(dropped_actors),
524 Some(Mutation::Update(UpdateMutation { dropped_actors, .. }))
525 | Some(Mutation::AddAndUpdate(_, UpdateMutation { dropped_actors, .. })) => {
526 Some(dropped_actors)
527 }
528 _ => None,
529 }
530 }
531
532 pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
538 match self.mutation.as_deref() {
539 Some(Mutation::Add(AddMutation { added_actors, .. }))
540 | Some(Mutation::AddAndUpdate(AddMutation { added_actors, .. }, _)) => {
541 added_actors.contains(&actor_id)
542 }
543 _ => false,
544 }
545 }
546
547 pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
548 if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
549 fragment_ids.contains(&fragment_id)
550 } else {
551 false
552 }
553 }
554
555 pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
573 let Some(mutation) = self.mutation.as_deref() else {
574 return false;
575 };
576 match mutation {
577 Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
579 Mutation::AddAndUpdate(
581 AddMutation { adds, .. },
582 UpdateMutation {
583 dispatchers,
584 actor_new_dispatchers,
585 ..
586 },
587 ) => {
588 adds.get(&upstream_actor_id).is_some()
589 || actor_new_dispatchers.get(&upstream_actor_id).is_some()
590 || dispatchers.get(&upstream_actor_id).is_some()
591 }
592 Mutation::Update(_)
593 | Mutation::Stop(_)
594 | Mutation::Pause
595 | Mutation::Resume
596 | Mutation::SourceChangeSplit(_)
597 | Mutation::Throttle(_)
598 | Mutation::DropSubscriptions { .. }
599 | Mutation::ConnectorPropsChange(_)
600 | Mutation::StartFragmentBackfill { .. }
601 | Mutation::RefreshStart { .. }
602 | Mutation::LoadFinish { .. } => false,
603 }
604 }
605
606 pub fn is_pause_on_startup(&self) -> bool {
608 match self.mutation.as_deref() {
609 Some(Mutation::Add(AddMutation { pause, .. }))
610 | Some(Mutation::AddAndUpdate(AddMutation { pause, .. }, _)) => *pause,
611 _ => false,
612 }
613 }
614
615 pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
616 match self.mutation.as_deref() {
617 Some(Mutation::Add(AddMutation {
618 backfill_nodes_to_pause,
619 ..
620 }))
621 | Some(Mutation::AddAndUpdate(
622 AddMutation {
623 backfill_nodes_to_pause,
624 ..
625 },
626 _,
627 )) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
628 _ => {
629 tracing::warn!("expected an AddMutation on Startup, instead got {:?}", self);
630 true
631 }
632 }
633 }
634
635 pub fn is_resume(&self) -> bool {
637 matches!(self.mutation.as_deref(), Some(Mutation::Resume))
638 }
639
640 pub fn as_update_merge(
643 &self,
644 actor_id: ActorId,
645 upstream_fragment_id: UpstreamFragmentId,
646 ) -> Option<&MergeUpdate> {
647 self.mutation
648 .as_deref()
649 .and_then(|mutation| match mutation {
650 Mutation::Update(UpdateMutation { merges, .. })
651 | Mutation::AddAndUpdate(_, UpdateMutation { merges, .. }) => {
652 merges.get(&(actor_id, upstream_fragment_id))
653 }
654 _ => None,
655 })
656 }
657
658 pub fn as_new_upstream_sink(&self, fragment_id: FragmentId) -> Option<&PbNewUpstreamSink> {
661 self.mutation
662 .as_deref()
663 .and_then(|mutation| match mutation {
664 Mutation::Add(AddMutation {
665 new_upstream_sinks, ..
666 }) => new_upstream_sinks.get(&fragment_id),
667 _ => None,
668 })
669 }
670
671 pub fn as_dropped_upstream_sinks(&self) -> Option<&HashSet<FragmentId>> {
673 self.mutation
674 .as_deref()
675 .and_then(|mutation| match mutation {
676 Mutation::Stop(StopMutation {
677 dropped_sink_fragments,
678 ..
679 }) => Some(dropped_sink_fragments),
680 _ => None,
681 })
682 }
683
684 pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
690 self.mutation
691 .as_deref()
692 .and_then(|mutation| match mutation {
693 Mutation::Update(UpdateMutation { vnode_bitmaps, .. })
694 | Mutation::AddAndUpdate(_, UpdateMutation { vnode_bitmaps, .. }) => {
695 vnode_bitmaps.get(&actor_id).cloned()
696 }
697 _ => None,
698 })
699 }
700
701 pub fn as_sink_add_columns(&self, sink_id: SinkId) -> Option<Vec<Field>> {
702 self.mutation
703 .as_deref()
704 .and_then(|mutation| match mutation {
705 Mutation::Update(UpdateMutation {
706 sink_add_columns, ..
707 })
708 | Mutation::AddAndUpdate(
709 _,
710 UpdateMutation {
711 sink_add_columns, ..
712 },
713 ) => sink_add_columns.get(&sink_id).cloned(),
714 _ => None,
715 })
716 }
717
718 pub fn get_curr_epoch(&self) -> Epoch {
719 Epoch(self.epoch.curr)
720 }
721
722 pub fn tracing_context(&self) -> &TracingContext {
724 &self.tracing_context
725 }
726
727 pub fn added_subscriber_on_mv_table(
728 &self,
729 mv_table_id: TableId,
730 ) -> impl Iterator<Item = u32> + '_ {
731 if let Some(Mutation::Add(add)) | Some(Mutation::AddAndUpdate(add, _)) =
732 self.mutation.as_deref()
733 {
734 Some(add)
735 } else {
736 None
737 }
738 .into_iter()
739 .flat_map(move |add| {
740 add.subscriptions_to_add.iter().filter_map(
741 move |(upstream_mv_table_id, subscriber_id)| {
742 if *upstream_mv_table_id == mv_table_id {
743 Some(*subscriber_id)
744 } else {
745 None
746 }
747 },
748 )
749 })
750 }
751}
752
753impl<M: PartialEq> PartialEq for BarrierInner<M> {
754 fn eq(&self, other: &Self) -> bool {
755 self.epoch == other.epoch && self.mutation == other.mutation
756 }
757}
758
759impl Mutation {
760 #[cfg(test)]
764 pub fn is_stop(&self) -> bool {
765 matches!(self, Mutation::Stop(_))
766 }
767
768 fn to_protobuf(&self) -> PbMutation {
769 let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
770 actor_splits
771 .iter()
772 .map(|(&actor_id, splits)| {
773 (
774 actor_id,
775 ConnectorSplits {
776 splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
777 },
778 )
779 })
780 .collect::<HashMap<_, _>>()
781 };
782
783 match self {
784 Mutation::Stop(StopMutation {
785 dropped_actors,
786 dropped_sink_fragments,
787 }) => PbMutation::Stop(PbStopMutation {
788 actors: dropped_actors.iter().copied().collect(),
789 dropped_sink_fragments: dropped_sink_fragments.iter().copied().collect(),
790 }),
791 Mutation::Update(UpdateMutation {
792 dispatchers,
793 merges,
794 vnode_bitmaps,
795 dropped_actors,
796 actor_splits,
797 actor_new_dispatchers,
798 actor_cdc_table_snapshot_splits,
799 sink_add_columns,
800 }) => PbMutation::Update(PbUpdateMutation {
801 dispatcher_update: dispatchers.values().flatten().cloned().collect(),
802 merge_update: merges.values().cloned().collect(),
803 actor_vnode_bitmap_update: vnode_bitmaps
804 .iter()
805 .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
806 .collect(),
807 dropped_actors: dropped_actors.iter().cloned().collect(),
808 actor_splits: actor_splits_to_protobuf(actor_splits),
809 actor_new_dispatchers: actor_new_dispatchers
810 .iter()
811 .map(|(&actor_id, dispatchers)| {
812 (
813 actor_id,
814 Dispatchers {
815 dispatchers: dispatchers.clone(),
816 },
817 )
818 })
819 .collect(),
820 actor_cdc_table_snapshot_splits:
821 build_pb_actor_cdc_table_snapshot_splits_with_generation(
822 actor_cdc_table_snapshot_splits.clone(),
823 )
824 .into(),
825 sink_add_columns: sink_add_columns
826 .iter()
827 .map(|(sink_id, add_columns)| {
828 (
829 sink_id.sink_id,
830 PbSinkAddColumns {
831 fields: add_columns.iter().map(|field| field.to_prost()).collect(),
832 },
833 )
834 })
835 .collect(),
836 }),
837 Mutation::Add(AddMutation {
838 adds,
839 added_actors,
840 splits,
841 pause,
842 subscriptions_to_add,
843 backfill_nodes_to_pause,
844 actor_cdc_table_snapshot_splits,
845 new_upstream_sinks,
846 }) => PbMutation::Add(PbAddMutation {
847 actor_dispatchers: adds
848 .iter()
849 .map(|(&actor_id, dispatchers)| {
850 (
851 actor_id,
852 Dispatchers {
853 dispatchers: dispatchers.clone(),
854 },
855 )
856 })
857 .collect(),
858 added_actors: added_actors.iter().copied().collect(),
859 actor_splits: actor_splits_to_protobuf(splits),
860 pause: *pause,
861 subscriptions_to_add: subscriptions_to_add
862 .iter()
863 .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
864 subscriber_id: *subscriber_id,
865 upstream_mv_table_id: table_id.table_id,
866 })
867 .collect(),
868 backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
869 actor_cdc_table_snapshot_splits:
870 build_pb_actor_cdc_table_snapshot_splits_with_generation(
871 actor_cdc_table_snapshot_splits.clone(),
872 )
873 .into(),
874 new_upstream_sinks: new_upstream_sinks
875 .iter()
876 .map(|(k, v)| (*k, v.clone()))
877 .collect(),
878 }),
879 Mutation::SourceChangeSplit(changes) => PbMutation::Splits(SourceChangeSplitMutation {
880 actor_splits: changes
881 .iter()
882 .map(|(&actor_id, splits)| {
883 (
884 actor_id,
885 ConnectorSplits {
886 splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
887 },
888 )
889 })
890 .collect(),
891 }),
892 Mutation::Pause => PbMutation::Pause(PauseMutation {}),
893 Mutation::Resume => PbMutation::Resume(ResumeMutation {}),
894 Mutation::Throttle(changes) => PbMutation::Throttle(ThrottleMutation {
895 actor_throttle: changes
896 .iter()
897 .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit }))
898 .collect(),
899 }),
900
901 Mutation::AddAndUpdate(add, update) => PbMutation::Combined(CombinedMutation {
902 mutations: vec![
903 BarrierMutation {
904 mutation: Some(Mutation::Add(add.clone()).to_protobuf()),
905 },
906 BarrierMutation {
907 mutation: Some(Mutation::Update(update.clone()).to_protobuf()),
908 },
909 ],
910 }),
911 Mutation::DropSubscriptions {
912 subscriptions_to_drop,
913 } => PbMutation::DropSubscriptions(DropSubscriptionsMutation {
914 info: subscriptions_to_drop
915 .iter()
916 .map(
917 |(subscriber_id, upstream_mv_table_id)| SubscriptionUpstreamInfo {
918 subscriber_id: *subscriber_id,
919 upstream_mv_table_id: upstream_mv_table_id.table_id,
920 },
921 )
922 .collect(),
923 }),
924 Mutation::ConnectorPropsChange(map) => {
925 PbMutation::ConnectorPropsChange(ConnectorPropsChangeMutation {
926 connector_props_infos: map
927 .iter()
928 .map(|(actor_id, options)| {
929 (
930 *actor_id,
931 ConnectorPropsInfo {
932 connector_props_info: options
933 .iter()
934 .map(|(k, v)| (k.clone(), v.clone()))
935 .collect(),
936 },
937 )
938 })
939 .collect(),
940 })
941 }
942 Mutation::StartFragmentBackfill { fragment_ids } => {
943 PbMutation::StartFragmentBackfill(StartFragmentBackfillMutation {
944 fragment_ids: fragment_ids.iter().copied().collect(),
945 })
946 }
947 Mutation::RefreshStart {
948 table_id,
949 associated_source_id,
950 } => PbMutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
951 table_id: table_id.table_id,
952 associated_source_id: associated_source_id.table_id,
953 }),
954 Mutation::LoadFinish {
955 associated_source_id,
956 } => PbMutation::LoadFinish(risingwave_pb::stream_plan::LoadFinishMutation {
957 associated_source_id: associated_source_id.table_id,
958 }),
959 }
960 }
961
962 fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
963 let mutation = match prost {
964 PbMutation::Stop(stop) => Mutation::Stop(StopMutation {
965 dropped_actors: stop.actors.iter().copied().collect(),
966 dropped_sink_fragments: stop.dropped_sink_fragments.iter().copied().collect(),
967 }),
968
969 PbMutation::Update(update) => Mutation::Update(UpdateMutation {
970 dispatchers: update
971 .dispatcher_update
972 .iter()
973 .map(|u| (u.actor_id, u.clone()))
974 .into_group_map(),
975 merges: update
976 .merge_update
977 .iter()
978 .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
979 .collect(),
980 vnode_bitmaps: update
981 .actor_vnode_bitmap_update
982 .iter()
983 .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
984 .collect(),
985 dropped_actors: update.dropped_actors.iter().cloned().collect(),
986 actor_splits: update
987 .actor_splits
988 .iter()
989 .map(|(&actor_id, splits)| {
990 (
991 actor_id,
992 splits
993 .splits
994 .iter()
995 .map(|split| split.try_into().unwrap())
996 .collect(),
997 )
998 })
999 .collect(),
1000 actor_new_dispatchers: update
1001 .actor_new_dispatchers
1002 .iter()
1003 .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
1004 .collect(),
1005 actor_cdc_table_snapshot_splits:
1006 build_actor_cdc_table_snapshot_splits_with_generation(
1007 update
1008 .actor_cdc_table_snapshot_splits
1009 .clone()
1010 .unwrap_or_default(),
1011 ),
1012 sink_add_columns: update
1013 .sink_add_columns
1014 .iter()
1015 .map(|(sink_id, add_columns)| {
1016 (
1017 SinkId::new(*sink_id),
1018 add_columns.fields.iter().map(Field::from_prost).collect(),
1019 )
1020 })
1021 .collect(),
1022 }),
1023
1024 PbMutation::Add(add) => Mutation::Add(AddMutation {
1025 adds: add
1026 .actor_dispatchers
1027 .iter()
1028 .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
1029 .collect(),
1030 added_actors: add.added_actors.iter().copied().collect(),
1031 splits: add
1034 .actor_splits
1035 .iter()
1036 .map(|(&actor_id, splits)| {
1037 (
1038 actor_id,
1039 splits
1040 .splits
1041 .iter()
1042 .map(|split| split.try_into().unwrap())
1043 .collect(),
1044 )
1045 })
1046 .collect(),
1047 pause: add.pause,
1048 subscriptions_to_add: add
1049 .subscriptions_to_add
1050 .iter()
1051 .map(
1052 |SubscriptionUpstreamInfo {
1053 subscriber_id,
1054 upstream_mv_table_id,
1055 }| {
1056 (TableId::new(*upstream_mv_table_id), *subscriber_id)
1057 },
1058 )
1059 .collect(),
1060 backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
1061 actor_cdc_table_snapshot_splits:
1062 build_actor_cdc_table_snapshot_splits_with_generation(
1063 add.actor_cdc_table_snapshot_splits
1064 .clone()
1065 .unwrap_or_default(),
1066 ),
1067 new_upstream_sinks: add
1068 .new_upstream_sinks
1069 .iter()
1070 .map(|(k, v)| (*k, v.clone()))
1071 .collect(),
1072 }),
1073
1074 PbMutation::Splits(s) => {
1075 let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
1076 Vec::with_capacity(s.actor_splits.len());
1077 for (&actor_id, splits) in &s.actor_splits {
1078 if !splits.splits.is_empty() {
1079 change_splits.push((
1080 actor_id,
1081 splits
1082 .splits
1083 .iter()
1084 .map(SplitImpl::try_from)
1085 .try_collect()?,
1086 ));
1087 }
1088 }
1089 Mutation::SourceChangeSplit(change_splits.into_iter().collect())
1090 }
1091 PbMutation::Pause(_) => Mutation::Pause,
1092 PbMutation::Resume(_) => Mutation::Resume,
1093 PbMutation::Throttle(changes) => Mutation::Throttle(
1094 changes
1095 .actor_throttle
1096 .iter()
1097 .map(|(actor_id, limit)| (*actor_id, limit.rate_limit))
1098 .collect(),
1099 ),
1100 PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
1101 subscriptions_to_drop: drop
1102 .info
1103 .iter()
1104 .map(|info| (info.subscriber_id, TableId::new(info.upstream_mv_table_id)))
1105 .collect(),
1106 },
1107 PbMutation::ConnectorPropsChange(alter_connector_props) => {
1108 Mutation::ConnectorPropsChange(
1109 alter_connector_props
1110 .connector_props_infos
1111 .iter()
1112 .map(|(actor_id, options)| {
1113 (
1114 *actor_id,
1115 options
1116 .connector_props_info
1117 .iter()
1118 .map(|(k, v)| (k.clone(), v.clone()))
1119 .collect(),
1120 )
1121 })
1122 .collect(),
1123 )
1124 }
1125 PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
1126 Mutation::StartFragmentBackfill {
1127 fragment_ids: start_fragment_backfill
1128 .fragment_ids
1129 .iter()
1130 .copied()
1131 .collect(),
1132 }
1133 }
1134 PbMutation::RefreshStart(refresh_start) => Mutation::RefreshStart {
1135 table_id: TableId::new(refresh_start.table_id),
1136 associated_source_id: TableId::new(refresh_start.associated_source_id),
1137 },
1138 PbMutation::LoadFinish(load_finish) => Mutation::LoadFinish {
1139 associated_source_id: TableId::new(load_finish.associated_source_id),
1140 },
1141 PbMutation::Combined(CombinedMutation { mutations }) => match &mutations[..] {
1142 [
1143 BarrierMutation {
1144 mutation: Some(add),
1145 },
1146 BarrierMutation {
1147 mutation: Some(update),
1148 },
1149 ] => {
1150 let Mutation::Add(add_mutation) = Mutation::from_protobuf(add)? else {
1151 unreachable!();
1152 };
1153
1154 let Mutation::Update(update_mutation) = Mutation::from_protobuf(update)? else {
1155 unreachable!();
1156 };
1157
1158 Mutation::AddAndUpdate(add_mutation, update_mutation)
1159 }
1160
1161 _ => unreachable!(),
1162 },
1163 };
1164 Ok(mutation)
1165 }
1166}
1167
1168impl<M> BarrierInner<M> {
1169 fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
1170 let Self {
1171 epoch,
1172 mutation,
1173 kind,
1174 passed_actors,
1175 tracing_context,
1176 ..
1177 } = self;
1178
1179 PbBarrier {
1180 epoch: Some(PbEpoch {
1181 curr: epoch.curr,
1182 prev: epoch.prev,
1183 }),
1184 mutation: Some(PbBarrierMutation {
1185 mutation: barrier_fn(mutation),
1186 }),
1187 tracing_context: tracing_context.to_protobuf(),
1188 kind: *kind as _,
1189 passed_actors: passed_actors.clone(),
1190 }
1191 }
1192
1193 fn from_protobuf_inner(
1194 prost: &PbBarrier,
1195 mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
1196 ) -> StreamExecutorResult<Self> {
1197 let epoch = prost.get_epoch()?;
1198
1199 Ok(Self {
1200 kind: prost.kind(),
1201 epoch: EpochPair::new(epoch.curr, epoch.prev),
1202 mutation: mutation_from_pb(
1203 prost
1204 .mutation
1205 .as_ref()
1206 .and_then(|mutation| mutation.mutation.as_ref()),
1207 )?,
1208 passed_actors: prost.get_passed_actors().clone(),
1209 tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
1210 })
1211 }
1212
1213 pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
1214 BarrierInner {
1215 epoch: self.epoch,
1216 mutation: f(self.mutation),
1217 kind: self.kind,
1218 tracing_context: self.tracing_context,
1219 passed_actors: self.passed_actors,
1220 }
1221 }
1222}
1223
1224impl DispatcherBarrier {
1225 pub fn to_protobuf(&self) -> PbBarrier {
1226 self.to_protobuf_inner(|_| None)
1227 }
1228}
1229
1230impl Barrier {
1231 pub fn to_protobuf(&self) -> PbBarrier {
1232 self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
1233 }
1234
1235 pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
1236 Self::from_protobuf_inner(prost, |mutation| {
1237 mutation
1238 .map(|m| Mutation::from_protobuf(m).map(Arc::new))
1239 .transpose()
1240 })
1241 }
1242}
1243
/// A watermark attached to a single column of a stream.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
    /// Index of the column the watermark refers to.
    pub col_idx: usize,
    /// Data type used to encode and decode `val`.
    pub data_type: DataType,
    /// The watermark value.
    pub val: ScalarImpl,
}
1250
1251impl PartialOrd for Watermark {
1252 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1253 Some(self.cmp(other))
1254 }
1255}
1256
1257impl Ord for Watermark {
1258 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1259 self.val.default_cmp(&other.val)
1260 }
1261}
1262
1263impl Watermark {
1264 pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
1265 Self {
1266 col_idx,
1267 data_type,
1268 val,
1269 }
1270 }
1271
1272 pub async fn transform_with_expr(
1273 self,
1274 expr: &NonStrictExpression<impl Expression>,
1275 new_col_idx: usize,
1276 ) -> Option<Self> {
1277 let Self { col_idx, val, .. } = self;
1278 let row = {
1279 let mut row = vec![None; col_idx + 1];
1280 row[col_idx] = Some(val);
1281 OwnedRow::new(row)
1282 };
1283 let val = expr.eval_row_infallible(&row).await?;
1284 Some(Self::new(new_col_idx, expr.inner().return_type(), val))
1285 }
1286
1287 pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
1290 output_indices
1291 .iter()
1292 .position(|p| *p == self.col_idx)
1293 .map(|new_col_idx| self.with_idx(new_col_idx))
1294 }
1295
1296 pub fn to_protobuf(&self) -> PbWatermark {
1297 PbWatermark {
1298 column: Some(PbInputRef {
1299 index: self.col_idx as _,
1300 r#type: Some(self.data_type.to_protobuf()),
1301 }),
1302 val: Some(&self.val).to_protobuf().into(),
1303 }
1304 }
1305
1306 pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
1307 let col_ref = prost.get_column()?;
1308 let data_type = DataType::from(col_ref.get_type()?);
1309 let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
1310 .expect("watermark value cannot be null");
1311 Ok(Self::new(col_ref.get_index() as _, data_type, val))
1312 }
1313
1314 pub fn with_idx(self, idx: usize) -> Self {
1315 Self::new(idx, self.data_type, self.val)
1316 }
1317}
1318
/// A single message passed along a stream, generic over the barrier mutation payload `M`.
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageInner<M> {
    /// A chunk of stream data.
    Chunk(StreamChunk),
    /// A barrier carrying a mutation payload of type `M`.
    Barrier(BarrierInner<M>),
    /// A watermark on one column.
    Watermark(Watermark),
}
1325
1326impl<M> MessageInner<M> {
1327 pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
1328 match self {
1329 MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
1330 MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
1331 MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
1332 }
1333 }
1334}
1335
/// A stream message whose barriers carry a [`BarrierMutationType`] payload.
pub type Message = MessageInner<BarrierMutationType>;
/// A stream message whose barriers carry no mutation payload (`()`).
pub type DispatcherMessage = MessageInner<()>;
1338
/// Batched counterpart of [`MessageInner`]: barriers are grouped into a vector, while chunks and
/// watermarks are carried one at a time.
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageBatchInner<M> {
    /// A chunk of stream data.
    Chunk(StreamChunk),
    /// A batch of barriers, each carrying a mutation payload of type `M`.
    BarrierBatch(Vec<BarrierInner<M>>),
    /// A watermark on one column.
    Watermark(Watermark),
}
/// A message batch whose barriers carry a [`BarrierMutationType`] payload.
pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
/// A list of barriers without mutation payload.
pub type DispatcherBarriers = Vec<DispatcherBarrier>;
/// A message batch whose barriers carry no mutation payload (`()`).
pub type DispatcherMessageBatch = MessageBatchInner<()>;
1350
1351impl From<DispatcherMessage> for DispatcherMessageBatch {
1352 fn from(m: DispatcherMessage) -> Self {
1353 match m {
1354 DispatcherMessage::Chunk(c) => Self::Chunk(c),
1355 DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
1356 DispatcherMessage::Watermark(w) => Self::Watermark(w),
1357 }
1358 }
1359}
1360
1361impl From<StreamChunk> for Message {
1362 fn from(chunk: StreamChunk) -> Self {
1363 Message::Chunk(chunk)
1364 }
1365}
1366
1367impl<'a> TryFrom<&'a Message> for &'a Barrier {
1368 type Error = ();
1369
1370 fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
1371 match m {
1372 Message::Chunk(_) => Err(()),
1373 Message::Barrier(b) => Ok(b),
1374 Message::Watermark(_) => Err(()),
1375 }
1376 }
1377}
1378
1379impl Message {
1380 #[cfg(test)]
1385 pub fn is_stop(&self) -> bool {
1386 matches!(
1387 self,
1388 Message::Barrier(Barrier {
1389 mutation,
1390 ..
1391 }) if mutation.as_ref().unwrap().is_stop()
1392 )
1393 }
1394}
1395
1396impl DispatcherMessageBatch {
1397 pub fn to_protobuf(&self) -> PbStreamMessageBatch {
1398 let prost = match self {
1399 Self::Chunk(stream_chunk) => {
1400 let prost_stream_chunk = stream_chunk.to_protobuf();
1401 StreamMessageBatch::StreamChunk(prost_stream_chunk)
1402 }
1403 Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
1404 barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
1405 }),
1406 Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
1407 };
1408 PbStreamMessageBatch {
1409 stream_message_batch: Some(prost),
1410 }
1411 }
1412
1413 pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
1414 let res = match prost.get_stream_message_batch()? {
1415 StreamMessageBatch::StreamChunk(chunk) => {
1416 Self::Chunk(StreamChunk::from_protobuf(chunk)?)
1417 }
1418 StreamMessageBatch::BarrierBatch(barrier_batch) => {
1419 let barriers = barrier_batch
1420 .barriers
1421 .iter()
1422 .map(|barrier| {
1423 DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
1424 if mutation.is_some() {
1425 if cfg!(debug_assertions) {
1426 panic!("should not receive message of barrier with mutation");
1427 } else {
1428 warn!(?barrier, "receive message of barrier with mutation");
1429 }
1430 }
1431 Ok(())
1432 })
1433 })
1434 .try_collect()?;
1435 Self::BarrierBatch(barriers)
1436 }
1437 StreamMessageBatch::Watermark(watermark) => {
1438 Self::Watermark(Watermark::from_protobuf(watermark)?)
1439 }
1440 };
1441 Ok(res)
1442 }
1443
1444 pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
1445 ::prost::Message::encoded_len(msg)
1446 }
1447}
1448
/// Owned list of `usize` indices.
// NOTE(review): presumably the column indices forming a primary key — confirm against callers.
pub type PkIndices = Vec<usize>;
/// Borrowed counterpart of [`PkIndices`].
pub type PkIndicesRef<'a> = &'a [usize];
/// List of data types, stored in a `SmallVec` with an inline capacity of one element.
pub type PkDataTypes = SmallVec<[DataType; 1]>;
1452
1453pub async fn expect_first_barrier<M: Debug>(
1455 stream: &mut (impl MessageStreamInner<M> + Unpin),
1456) -> StreamExecutorResult<BarrierInner<M>> {
1457 let message = stream
1458 .next()
1459 .instrument_await("expect_first_barrier")
1460 .await
1461 .context("failed to extract the first message: stream closed unexpectedly")??;
1462 let barrier = message
1463 .into_barrier()
1464 .expect("the first message must be a barrier");
1465 assert!(matches!(
1467 barrier.kind,
1468 BarrierKind::Checkpoint | BarrierKind::Initial
1469 ));
1470 Ok(barrier)
1471}
1472
1473pub async fn expect_first_barrier_from_aligned_stream(
1475 stream: &mut (impl AlignedMessageStream + Unpin),
1476) -> StreamExecutorResult<Barrier> {
1477 let message = stream
1478 .next()
1479 .instrument_await("expect_first_barrier")
1480 .await
1481 .context("failed to extract the first message: stream closed unexpectedly")??;
1482 let barrier = message
1483 .into_barrier()
1484 .expect("the first message must be a barrier");
1485 Ok(barrier)
1486}
1487
/// A consumer that, once executed, exposes its progress as a stream of barriers.
pub trait StreamConsumer: Send + 'static {
    /// The stream of barrier results returned by [`StreamConsumer::execute`].
    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;

    /// Consumes the boxed consumer and returns its barrier stream.
    fn execute(self: Box<Self>) -> Self::BarrierStream;
}
1494
/// A boxed input identified by `InputId` that yields message stream items with mutation type `M`.
type BoxedMessageInput<InputId, M> = BoxedInput<InputId, MessageStreamItemInner<M>>;
1496
/// Merges messages from a changeable set of upstream inputs into one stream, aligning barriers
/// across all inputs before emitting them.
pub struct DynamicReceivers<InputId, M> {
    /// The barrier currently being aligned, i.e. the first one received since the last
    /// barrier was emitted.
    barrier: Option<BarrierInner<M>>,
    /// Time at which the first input became blocked for the current alignment; used to report
    /// the alignment duration.
    start_ts: Option<Instant>,
    /// Inputs that have already yielded a barrier and are not polled until alignment finishes.
    blocked: Vec<BoxedMessageInput<InputId, M>>,
    /// Inputs that are still being polled.
    active: FuturesUnordered<StreamFuture<BoxedMessageInput<InputId, M>>>,
    /// Buffered watermarks, keyed by watermark column index.
    buffered_watermarks: BTreeMap<usize, BufferedWatermarks<InputId>>,
    /// Optional counter increased by the alignment duration in nanoseconds.
    barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
    /// Optional histogram observing the alignment duration in seconds.
    merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
}
1516
impl<InputId: Clone + Ord + Hash + std::fmt::Debug + Unpin, M: Clone + Unpin> Stream
    for DynamicReceivers<InputId, M>
{
    type Item = MessageStreamItemInner<M>;

    /// Polls the active inputs and yields the next merged message.
    ///
    /// Chunks are forwarded immediately, watermarks are forwarded only when
    /// `handle_watermark` produces one, and a barrier is emitted once every input has
    /// delivered it. The stream ends when there are no inputs at all.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // Without any upstream (neither blocked nor active) the stream is finished.
        if self.is_empty() {
            return Poll::Ready(None);
        }

        loop {
            match futures::ready!(self.active.poll_next_unpin(cx)) {
                // An input yielded an error: propagate it. The input itself is dropped.
                Some((Some(Err(e)), _)) => {
                    return Poll::Ready(Some(Err(e)));
                }
                // An input yielded a message.
                Some((Some(Ok(message)), remaining)) => {
                    let input_id = remaining.id();
                    match message {
                        MessageInner::Chunk(chunk) => {
                            // Keep polling this input, then forward the chunk.
                            self.active.push(remaining.into_future());
                            return Poll::Ready(Some(Ok(MessageInner::Chunk(chunk))));
                        }
                        MessageInner::Watermark(watermark) => {
                            // Keep polling this input; emit only if the buffered watermarks
                            // produce an output, otherwise continue the loop.
                            self.active.push(remaining.into_future());
                            if let Some(watermark) = self.handle_watermark(input_id, watermark) {
                                return Poll::Ready(Some(Ok(MessageInner::Watermark(watermark))));
                            }
                        }
                        MessageInner::Barrier(barrier) => {
                            // The first blocked input marks the start of the alignment.
                            if self.blocked.is_empty() {
                                self.start_ts = Some(Instant::now());
                            }
                            // Park the input until all other inputs reach the barrier.
                            self.blocked.push(remaining);
                            if let Some(current_barrier) = self.barrier.as_ref() {
                                // Every input must deliver a barrier of the same epoch.
                                if current_barrier.epoch != barrier.epoch {
                                    return Poll::Ready(Some(Err(
                                        StreamExecutorError::align_barrier(
                                            current_barrier.clone().map_mutation(|_| None),
                                            barrier.map_mutation(|_| None),
                                        ),
                                    )));
                                }
                            } else {
                                self.barrier = Some(barrier);
                            }
                        }
                    }
                }
                // An input ended without an error: reported as a closed channel.
                Some((None, remaining)) => {
                    return Poll::Ready(Some(Err(StreamExecutorError::channel_closed(format!(
                        "upstream input {:?} unexpectedly closed",
                        remaining.id()
                    )))));
                }
                // No active input is left, so all of them are blocked on the barrier.
                None => {
                    assert!(!self.blocked.is_empty());

                    let start_ts = self
                        .start_ts
                        .take()
                        .expect("should have received at least one barrier");
                    // Report how long the alignment took.
                    if let Some(barrier_align_duration) = &self.barrier_align_duration {
                        barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
                    }
                    if let Some(merge_barrier_align_duration) = &self.merge_barrier_align_duration {
                        merge_barrier_align_duration.observe(start_ts.elapsed().as_secs_f64())
                    }

                    break;
                }
            }
        }

        assert!(self.active.is_terminated());

        let barrier = self.barrier.take().unwrap();

        // Move all blocked inputs back to the active set before emitting the barrier.
        let upstreams = std::mem::take(&mut self.blocked);
        self.extend_active(upstreams);
        assert!(!self.active.is_terminated());

        Poll::Ready(Some(Ok(MessageInner::Barrier(barrier))))
    }
}
1618
1619impl<InputId: Clone + Ord + Hash + std::fmt::Debug, M> DynamicReceivers<InputId, M> {
1620 pub fn new(
1621 upstreams: Vec<BoxedMessageInput<InputId, M>>,
1622 barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1623 merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
1624 ) -> Self {
1625 let mut this = Self {
1626 barrier: None,
1627 start_ts: None,
1628 blocked: Vec::with_capacity(upstreams.len()),
1629 active: Default::default(),
1630 buffered_watermarks: Default::default(),
1631 merge_barrier_align_duration,
1632 barrier_align_duration,
1633 };
1634 this.extend_active(upstreams);
1635 this
1636 }
1637
1638 pub fn extend_active(
1641 &mut self,
1642 upstreams: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1643 ) {
1644 assert!(self.blocked.is_empty() && self.barrier.is_none());
1645
1646 self.active
1647 .extend(upstreams.into_iter().map(|s| s.into_future()));
1648 }
1649
1650 pub fn handle_watermark(
1652 &mut self,
1653 input_id: InputId,
1654 watermark: Watermark,
1655 ) -> Option<Watermark> {
1656 let col_idx = watermark.col_idx;
1657 let upstream_ids: Vec<_> = self.upstream_input_ids().collect();
1659 let watermarks = self
1660 .buffered_watermarks
1661 .entry(col_idx)
1662 .or_insert_with(|| BufferedWatermarks::with_ids(upstream_ids));
1663 watermarks.handle_watermark(input_id, watermark)
1664 }
1665
1666 pub fn add_upstreams_from(
1669 &mut self,
1670 new_inputs: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1671 ) {
1672 assert!(self.blocked.is_empty() && self.barrier.is_none());
1673
1674 let new_inputs: Vec<_> = new_inputs.into_iter().collect();
1675 let input_ids = new_inputs.iter().map(|input| input.id());
1676 self.buffered_watermarks.values_mut().for_each(|buffers| {
1677 buffers.add_buffers(input_ids.clone());
1679 });
1680 self.active
1681 .extend(new_inputs.into_iter().map(|s| s.into_future()));
1682 }
1683
1684 pub fn remove_upstreams(&mut self, upstream_input_ids: &HashSet<InputId>) {
1688 assert!(self.blocked.is_empty() && self.barrier.is_none());
1689
1690 let new_upstreams = std::mem::take(&mut self.active)
1691 .into_iter()
1692 .map(|s| s.into_inner().unwrap())
1693 .filter(|u| !upstream_input_ids.contains(&u.id()));
1694 self.extend_active(new_upstreams);
1695 self.buffered_watermarks.values_mut().for_each(|buffers| {
1696 buffers.remove_buffer(upstream_input_ids.clone());
1699 });
1700 }
1701
1702 pub fn merge_barrier_align_duration(&self) -> Option<LabelGuardedMetric<Histogram>> {
1703 self.merge_barrier_align_duration.clone()
1704 }
1705
1706 pub fn flush_buffered_watermarks(&mut self) {
1707 self.buffered_watermarks
1708 .values_mut()
1709 .for_each(|buffers| buffers.clear());
1710 }
1711
1712 pub fn upstream_input_ids(&self) -> impl Iterator<Item = InputId> + '_ {
1713 self.blocked
1714 .iter()
1715 .map(|s| s.id())
1716 .chain(self.active.iter().map(|s| s.get_ref().unwrap().id()))
1717 }
1718
1719 pub fn is_empty(&self) -> bool {
1720 self.blocked.is_empty() && self.active.is_empty()
1721 }
1722}
1723
/// Buffers barriers received from a barrier channel, together with the new upstream inputs
/// that have to be built for them, until they are popped.
pub(crate) struct DispatchBarrierBuffer {
    /// Received barriers in arrival order, each paired with the new inputs built for it
    /// (`None` if the barrier required no new inputs).
    buffer: VecDeque<(Barrier, Option<Vec<BoxedActorInput>>)>,
    /// Channel the barriers are received from.
    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
    /// Whether the buffer is waiting for a barrier or building inputs for a received one.
    recv_state: BarrierReceiverState,
    /// Upstream fragment id used to look up merge updates in a barrier; replaced when an
    /// update carries a new upstream fragment id.
    curr_upstream_fragment_id: FragmentId,
    /// Actor id used to look up merge updates in a barrier.
    actor_id: ActorId,
    /// Shared context handed to the futures that build new inputs.
    build_input_ctx: Arc<BuildInputContext>,
}
1753
/// Values passed to `new_input` when a new upstream input is built.
struct BuildInputContext {
    /// Passed to `new_input` as the actor id.
    pub actor_id: ActorId,
    /// Passed to `new_input` by reference.
    pub local_barrier_manager: LocalBarrierManager,
    /// Streaming metrics; cloned for each new input.
    pub metrics: Arc<StreamingMetrics>,
    /// Passed to `new_input` as the fragment id.
    pub fragment_id: FragmentId,
}
1760
/// A pinned, boxed, `Send` future resolving to the list of newly built inputs or an error.
type BoxedNewInputsFuture =
    Pin<Box<dyn Future<Output = StreamExecutorResult<Vec<BoxedActorInput>>> + Send>>;
1763
/// State of the barrier receiving loop of a `DispatchBarrierBuffer`.
enum BarrierReceiverState {
    /// Waiting for the next barrier from the barrier channel.
    ReceivingBarrier,
    /// A barrier has been received and the future building its new inputs is still running.
    CreatingNewInput(Barrier, BoxedNewInputsFuture),
}
1768
impl DispatchBarrierBuffer {
    /// Creates an empty buffer in the `ReceivingBarrier` state.
    ///
    /// `actor_id`, `local_barrier_manager`, `metrics` and `fragment_id` are stored in the
    /// shared context used later to build new inputs.
    pub fn new(
        barrier_rx: mpsc::UnboundedReceiver<Barrier>,
        actor_id: ActorId,
        curr_upstream_fragment_id: FragmentId,
        local_barrier_manager: LocalBarrierManager,
        metrics: Arc<StreamingMetrics>,
        fragment_id: FragmentId,
    ) -> Self {
        Self {
            buffer: VecDeque::new(),
            barrier_rx,
            recv_state: BarrierReceiverState::ReceivingBarrier,
            curr_upstream_fragment_id,
            actor_id,
            build_input_ctx: Arc::new(BuildInputContext {
                actor_id,
                local_barrier_manager,
                metrics,
                fragment_id,
            }),
        }
    }

    /// Waits for the next message of `stream` while fetching barriers from the barrier channel
    /// in the background.
    ///
    /// # Errors
    ///
    /// Fails if `stream` yields an error or ends, or if fetching barriers fails.
    pub async fn await_next_message(
        &mut self,
        stream: &mut (impl Stream<Item = StreamExecutorResult<DispatcherMessage>> + Unpin),
    ) -> StreamExecutorResult<DispatcherMessage> {
        tokio::select! {
            // Branches are polled in declaration order, so `stream` takes precedence.
            biased;

            msg = stream.try_next() => {
                msg?.ok_or_else(
                    || StreamExecutorError::channel_closed("upstream executor closed unexpectedly")
                )
            }

            // This branch only ever completes with an error.
            e = self.continuously_fetch_barrier_rx() => {
                Err(e)
            }
        }
    }

    /// Pops the oldest buffered barrier together with its new inputs, fetching from the
    /// barrier channel until one is available, and applies the given dispatcher `barrier` to
    /// it via `apply_dispatcher_barrier`.
    ///
    /// # Errors
    ///
    /// Fails if the barrier channel is closed or building new inputs fails.
    pub async fn pop_barrier_with_inputs(
        &mut self,
        barrier: DispatcherBarrier,
    ) -> StreamExecutorResult<(Barrier, Option<Vec<BoxedActorInput>>)> {
        while self.buffer.is_empty() {
            self.try_fetch_barrier_rx(false).await?;
        }
        let (mut recv_barrier, inputs) = self.buffer.pop_front().unwrap();
        apply_dispatcher_barrier(&mut recv_barrier, barrier);

        Ok((recv_barrier, inputs))
    }

    /// Fetches barriers in a loop and returns only when a fetch fails.
    async fn continuously_fetch_barrier_rx(&mut self) -> StreamExecutorError {
        loop {
            if let Err(e) = self.try_fetch_barrier_rx(true).await {
                return e;
            }
        }
    }

    /// Advances the receiving state machine by one step.
    ///
    /// In `ReceivingBarrier`, receives one barrier and either buffers it directly or switches
    /// to `CreatingNewInput`. In `CreatingNewInput`, awaits the stored future, buffers the
    /// barrier with the new inputs and switches back.
    ///
    /// When the barrier channel is closed, this never resolves if `pending_on_end` is set and
    /// returns a channel-closed error otherwise.
    async fn try_fetch_barrier_rx(&mut self, pending_on_end: bool) -> StreamExecutorResult<()> {
        match &mut self.recv_state {
            BarrierReceiverState::ReceivingBarrier => {
                let Some(barrier) = self.barrier_rx.recv().await else {
                    if pending_on_end {
                        return pending().await;
                    } else {
                        return Err(StreamExecutorError::channel_closed(
                            "barrier channel closed unexpectedly",
                        ));
                    }
                };
                if let Some(fut) = self.pre_apply_barrier(&barrier) {
                    // The future is stored in the state and awaited on the next step.
                    self.recv_state = BarrierReceiverState::CreatingNewInput(barrier, fut);
                } else {
                    self.buffer.push_back((barrier, None));
                }
            }
            BarrierReceiverState::CreatingNewInput(barrier, fut) => {
                let new_inputs = fut.await?;
                self.buffer.push_back((barrier.clone(), Some(new_inputs)));
                self.recv_state = BarrierReceiverState::ReceivingBarrier;
            }
        }
        Ok(())
    }

    /// Returns a future building the new upstream inputs required by `barrier`, or `None` if
    /// the barrier has no merge update for this actor or the update adds no upstream actors.
    ///
    /// If the update carries a new upstream fragment id, it replaces the current one.
    fn pre_apply_barrier(&mut self, barrier: &Barrier) -> Option<BoxedNewInputsFuture> {
        if let Some(update) = barrier.as_update_merge(self.actor_id, self.curr_upstream_fragment_id)
            && !update.added_upstream_actors.is_empty()
        {
            let upstream_fragment_id =
                if let Some(new_upstream_fragment_id) = update.new_upstream_fragment_id {
                    self.curr_upstream_fragment_id = new_upstream_fragment_id;
                    new_upstream_fragment_id
                } else {
                    self.curr_upstream_fragment_id
                };
            // Everything the future needs is cloned so that it owns its data.
            let ctx = self.build_input_ctx.clone();
            let added_upstream_actors = update.added_upstream_actors.clone();
            let barrier = barrier.clone();
            let fut = async move {
                // Build the inputs for all added upstream actors concurrently.
                try_join_all(added_upstream_actors.iter().map(|upstream_actor| async {
                    let mut new_input = new_input(
                        &ctx.local_barrier_manager,
                        ctx.metrics.clone(),
                        ctx.actor_id,
                        ctx.fragment_id,
                        upstream_actor,
                        upstream_fragment_id,
                    )
                    .await?;

                    // Consume the first barrier of the new input and check it against the
                    // barrier that triggered its creation.
                    let first_barrier = expect_first_barrier(&mut new_input).await?;
                    assert_equal_dispatcher_barrier(&barrier, &first_barrier);

                    StreamExecutorResult::Ok(new_input)
                }))
                .await
            }
            .boxed();

            Some(fut)
        } else {
            None
        }
    }
}