mod prelude;

use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::future::pending;
use std::hash::Hash;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::vec;

use await_tree::InstrumentAwait;
use enum_as_inner::EnumAsInner;
use futures::future::try_join_all;
use futures::stream::{BoxStream, FusedStream, FuturesUnordered, StreamFuture};
use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use prometheus::Histogram;
use prometheus::core::{AtomicU64, GenericCounter};
use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{Field, Schema, TableId};
use risingwave_common::metrics::LabelGuardedMetric;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
use risingwave_common::util::epoch::{Epoch, EpochPair};
use risingwave_common::util::tracing::TracingContext;
use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
use risingwave_connector::source::SplitImpl;
use risingwave_expr::expr::{Expression, NonStrictExpression};
use risingwave_pb::data::PbEpoch;
use risingwave_pb::expr::PbInputRef;
use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
use risingwave_pb::stream_plan::stream_node::PbStreamKind;
use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
use risingwave_pb::stream_plan::{
    BarrierMutation, CombinedMutation, ConnectorPropsChangeMutation, Dispatchers,
    DropSubscriptionsMutation, PauseMutation, PbAddMutation, PbBarrier, PbBarrierMutation,
    PbDispatcher, PbSinkAddColumns, PbStopMutation, PbStreamMessageBatch, PbUpdateMutation,
    PbWatermark, ResumeMutation, SourceChangeSplitMutation, StartFragmentBackfillMutation,
    SubscriptionUpstreamInfo, ThrottleMutation,
};
use smallvec::SmallVec;
use tokio::sync::mpsc;
use tokio::time::Instant;

use crate::error::StreamResult;
use crate::executor::exchange::input::{
    BoxedActorInput, BoxedInput, apply_dispatcher_barrier, assert_equal_dispatcher_barrier,
    new_input,
};
use crate::executor::prelude::StreamingMetrics;
use crate::executor::watermark::BufferedWatermarks;
use crate::task::{ActorId, FragmentId, LocalBarrierManager};

mod actor;
mod barrier_align;
pub mod exchange;
pub mod monitor;

pub mod aggregate;
pub mod asof_join;
mod backfill;
mod barrier_recv;
mod batch_query;
mod chain;
mod changelog;
mod dedup;
mod dispatch;
pub mod dml;
mod dynamic_filter;
pub mod eowc;
pub mod error;
mod expand;
mod filter;
pub mod hash_join;
mod hop_window;
mod join;
pub mod locality_provider;
mod lookup;
mod lookup_union;
mod merge;
mod mview;
mod nested_loop_temporal_join;
mod no_op;
mod now;
mod over_window;
pub mod project;
mod rearranged_chain;
mod receiver;
pub mod row_id_gen;
mod sink;
pub mod source;
mod stream_reader;
pub mod subtask;
mod temporal_join;
mod top_n;
mod troublemaker;
mod union;
mod upstream_sink_union;
mod values;
mod watermark;
mod watermark_filter;
mod wrapper;

mod approx_percentile;

mod row_merge;

#[cfg(test)]
mod integration_tests;
mod sync_kv_log_store;
#[cfg(any(test, feature = "test"))]
pub mod test_utils;
mod utils;
mod vector_index;

pub use actor::{Actor, ActorContext, ActorContextRef};
use anyhow::Context;
pub use approx_percentile::global::GlobalApproxPercentileExecutor;
pub use approx_percentile::local::LocalApproxPercentileExecutor;
pub use backfill::arrangement_backfill::*;
pub use backfill::cdc::{
    CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor,
};
pub use backfill::no_shuffle_backfill::*;
pub use backfill::snapshot_backfill::*;
pub use barrier_recv::BarrierRecvExecutor;
pub use batch_query::BatchQueryExecutor;
pub use chain::ChainExecutor;
pub use changelog::ChangeLogExecutor;
pub use dedup::AppendOnlyDedupExecutor;
pub use dispatch::{DispatchExecutor, DispatcherImpl};
pub use dynamic_filter::DynamicFilterExecutor;
pub use error::{StreamExecutorError, StreamExecutorResult};
pub use expand::ExpandExecutor;
pub use filter::{FilterExecutor, UpsertFilterExecutor};
pub use hash_join::*;
pub use hop_window::HopWindowExecutor;
pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
pub use join::{AsOfDesc, AsOfJoinType, JoinType};
pub use lookup::*;
pub use lookup_union::LookupUnionExecutor;
pub use merge::MergeExecutor;
pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
pub use mview::*;
pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
pub use no_op::NoOpExecutor;
pub use now::*;
pub use over_window::*;
pub use rearranged_chain::RearrangedChainExecutor;
pub use receiver::ReceiverExecutor;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
pub use row_merge::RowMergeExecutor;
pub use sink::SinkExecutor;
pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
pub use temporal_join::TemporalJoinExecutor;
pub use top_n::{
    AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
};
pub use troublemaker::TroublemakerExecutor;
pub use union::UnionExecutor;
pub use upstream_sink_union::{UpstreamFragmentInfo, UpstreamSinkUnionExecutor};
pub use utils::DummyExecutor;
pub use values::ValuesExecutor;
pub use vector_index::VectorIndexWriteExecutor;
pub use watermark_filter::WatermarkFilterExecutor;
pub use wrapper::WrapperExecutor;

use self::barrier_align::AlignedMessageStream;

pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;

pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
use risingwave_connector::sink::catalog::SinkId;
use risingwave_connector::source::cdc::{
    CdcTableSnapshotSplitAssignmentWithGeneration,
    build_actor_cdc_table_snapshot_splits_with_generation,
    build_pb_actor_cdc_table_snapshot_splits_with_generation,
};
use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
use risingwave_pb::stream_plan::throttle_mutation::RateLimit;

pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;

/// Static information of an executor.
#[derive(Debug, Default, Clone)]
pub struct ExecutorInfo {
    /// The schema of the executor's output.
    pub schema: Schema,

    /// The primary key indices of the executor's output.
    pub pk_indices: PkIndices,

    /// The stream kind of the executor's output.
    pub stream_kind: PbStreamKind,

    /// Identity of the executor, for debugging and logging.
    pub identity: String,

    /// The id of the executor.
    pub id: u64,
}

impl ExecutorInfo {
    pub fn for_test(schema: Schema, pk_indices: PkIndices, identity: String, id: u64) -> Self {
        Self {
            schema,
            pk_indices,
            stream_kind: PbStreamKind::Retract,
            identity,
            id,
        }
    }
}

/// The behavior of an executor: producing a stream of messages.
pub trait Execute: Send + 'static {
    fn execute(self: Box<Self>) -> BoxedMessageStream;

    fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
        self.execute()
    }

    fn boxed(self) -> Box<dyn Execute>
    where
        Self: Sized + Send + 'static,
    {
        Box::new(self)
    }
}

/// [`Executor`] combines the static information ([`ExecutorInfo`]) and the behavior
/// ([`Execute`]) of an executor.
pub struct Executor {
    info: ExecutorInfo,
    execute: Box<dyn Execute>,
}

impl Executor {
    pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
        Self { info, execute }
    }

    pub fn info(&self) -> &ExecutorInfo {
        &self.info
    }

    pub fn schema(&self) -> &Schema {
        &self.info.schema
    }

    pub fn pk_indices(&self) -> PkIndicesRef<'_> {
        &self.info.pk_indices
    }

    pub fn stream_kind(&self) -> PbStreamKind {
        self.info.stream_kind
    }

    pub fn identity(&self) -> &str {
        &self.info.identity
    }

    pub fn execute(self) -> BoxedMessageStream {
        self.execute.execute()
    }

    pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
        self.execute.execute_with_epoch(epoch)
    }
}

impl std::fmt::Debug for Executor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.identity())
    }
}

impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
    fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
        Self::new(info, execute)
    }
}

impl<E> From<(ExecutorInfo, E)> for Executor
where
    E: Execute,
{
    fn from((info, execute): (ExecutorInfo, E)) -> Self {
        Self::new(info, execute.boxed())
    }
}
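
// Illustrative sketch (not part of the original file): how a minimal `Execute`
// implementation is paired with `ExecutorInfo` to build an `Executor` via the tuple
// `From` impls above. The `Nop` executor below is hypothetical and exists only for
// this example.
#[cfg(test)]
mod executor_construction_example {
    use super::*;

    /// An executor that finishes immediately without yielding any message.
    struct Nop;

    impl Execute for Nop {
        fn execute(self: Box<Self>) -> BoxedMessageStream {
            futures::stream::empty().boxed()
        }
    }

    #[tokio::test]
    async fn build_executor_from_info_and_execute() {
        let info = ExecutorInfo {
            identity: "Nop".into(),
            id: 1,
            ..Default::default()
        };
        // `(ExecutorInfo, impl Execute)` converts into an `Executor`.
        let executor: Executor = (info, Nop).into();
        assert_eq!(executor.identity(), "Nop");
        assert!(executor.execute().next().await.is_none());
    }
}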

pub const INVALID_EPOCH: u64 = 0;

type UpstreamFragmentId = FragmentId;
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;

#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct UpdateMutation {
    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
    pub dropped_actors: HashSet<ActorId>,
    pub actor_splits: SplitAssignments,
    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    pub sink_add_columns: HashMap<SinkId, Vec<Field>>,
}

#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct AddMutation {
    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
    pub added_actors: HashSet<ActorId>,
    pub splits: SplitAssignments,
    /// Whether the added actors should start in a paused state.
    pub pause: bool,
    /// (`upstream_mv_table_id`, `subscriber_id`) pairs of subscriptions to add.
    pub subscriptions_to_add: Vec<(TableId, u32)>,
    /// Backfill fragments that should stay paused until explicitly started.
    pub backfill_nodes_to_pause: HashSet<FragmentId>,
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    pub new_upstream_sinks: HashMap<FragmentId, PbNewUpstreamSink>,
}

#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct StopMutation {
    pub dropped_actors: HashSet<ActorId>,
    pub dropped_sink_fragments: HashSet<FragmentId>,
}

/// The mutation attached to a barrier, describing configuration changes (scaling,
/// adding or dropping actors, source split changes, etc.) that take effect at that barrier.
#[derive(Debug, Clone, PartialEq)]
pub enum Mutation {
    Stop(StopMutation),
    Update(UpdateMutation),
    Add(AddMutation),
    SourceChangeSplit(SplitAssignments),
    Pause,
    Resume,
    Throttle(HashMap<ActorId, Option<u32>>),
    AddAndUpdate(AddMutation, UpdateMutation),
    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
    DropSubscriptions {
        /// (`subscriber_id`, `upstream_mv_table_id`) pairs of subscriptions to drop.
        subscriptions_to_drop: Vec<(u32, TableId)>,
    },
    StartFragmentBackfill {
        fragment_ids: HashSet<FragmentId>,
    },
    RefreshStart {
        table_id: TableId,
        associated_source_id: TableId,
    },
    ListFinish {
        associated_source_id: TableId,
    },
    LoadFinish {
        associated_source_id: TableId,
    },
}

/// The barrier flowing through the streaming graph, generic over the mutation payload `M`:
/// a full [`Barrier`] carries an optional [`Mutation`], while a [`DispatcherBarrier`] sent
/// across the exchange carries none.
#[derive(Debug, Clone)]
pub struct BarrierInner<M> {
    pub epoch: EpochPair,
    pub mutation: M,
    pub kind: BarrierKind,

    /// Tracing context of this barrier, for distributed tracing.
    pub tracing_context: TracingContext,

    /// The actors that this barrier has passed locally. Used for debugging only.
    pub passed_actors: Vec<ActorId>,
}

pub type BarrierMutationType = Option<Arc<Mutation>>;
pub type Barrier = BarrierInner<BarrierMutationType>;
pub type DispatcherBarrier = BarrierInner<()>;

impl<M: Default> BarrierInner<M> {
    pub fn new_test_barrier(epoch: u64) -> Self {
        Self {
            epoch: EpochPair::new_test_epoch(epoch),
            kind: BarrierKind::Checkpoint,
            tracing_context: TracingContext::none(),
            mutation: Default::default(),
            passed_actors: Default::default(),
        }
    }

    pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
        Self {
            epoch: EpochPair::new(epoch, prev_epoch),
            kind: BarrierKind::Checkpoint,
            tracing_context: TracingContext::none(),
            mutation: Default::default(),
            passed_actors: Default::default(),
        }
    }
}

impl Barrier {
    pub fn into_dispatcher(self) -> DispatcherBarrier {
        DispatcherBarrier {
            epoch: self.epoch,
            mutation: (),
            kind: self.kind,
            tracing_context: self.tracing_context,
            passed_actors: self.passed_actors,
        }
    }

    #[must_use]
    pub fn with_mutation(self, mutation: Mutation) -> Self {
        Self {
            mutation: Some(Arc::new(mutation)),
            ..self
        }
    }

    #[must_use]
    pub fn with_stop(self) -> Self {
        self.with_mutation(Mutation::Stop(StopMutation {
            dropped_actors: Default::default(),
            dropped_sink_fragments: Default::default(),
        }))
    }

    /// Whether this barrier carries a `Stop` mutation.
    pub fn is_with_stop_mutation(&self) -> bool {
        matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
    }

    /// Whether this barrier is to stop the actor with `actor_id`.
    pub fn is_stop(&self, actor_id: ActorId) -> bool {
        self.all_stop_actors()
            .is_some_and(|actors| actors.contains(&actor_id))
    }

    pub fn is_checkpoint(&self) -> bool {
        self.kind == BarrierKind::Checkpoint
    }

    /// Get the initial split assignment for the actor with `actor_id`. This should only be
    /// called on the initial barrier received by an executor; in debug builds it panics if the
    /// mutation is not one that can carry an initial assignment.
    pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
        match self.mutation.as_deref()? {
            Mutation::Update(UpdateMutation { actor_splits, .. })
            | Mutation::Add(AddMutation {
                splits: actor_splits,
                ..
            }) => actor_splits.get(&actor_id),

            Mutation::AddAndUpdate(
                AddMutation {
                    splits: add_actor_splits,
                    ..
                },
                UpdateMutation {
                    actor_splits: update_actor_splits,
                    ..
                },
            ) => add_actor_splits
                .get(&actor_id)
                .or_else(|| update_actor_splits.get(&actor_id)),

            _ => {
                if cfg!(debug_assertions) {
                    panic!(
                        "the initial mutation of the barrier should not be {:?}",
                        self.mutation
                    );
                }
                None
            }
        }
        .map(|s| s.as_slice())
    }

    /// Get all actors to be stopped (dropped) by this barrier.
    pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
        match self.mutation.as_deref() {
            Some(Mutation::Stop(StopMutation { dropped_actors, .. })) => Some(dropped_actors),
            Some(Mutation::Update(UpdateMutation { dropped_actors, .. }))
            | Some(Mutation::AddAndUpdate(_, UpdateMutation { dropped_actors, .. })) => {
                Some(dropped_actors)
            }
            _ => None,
        }
    }

    /// Whether this barrier is to newly add the actor with `actor_id`.
    pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation { added_actors, .. }))
            | Some(Mutation::AddAndUpdate(AddMutation { added_actors, .. }, _)) => {
                added_actors.contains(&actor_id)
            }
            _ => false,
        }
    }

    pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
        if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
            fragment_ids.contains(&fragment_id)
        } else {
            false
        }
    }

    /// Whether this barrier adds new downstream fragment(s) for the actor with
    /// `upstream_actor_id`, i.e., whether the `Add` or `AddAndUpdate` mutation creates or
    /// updates dispatchers on that actor.
    pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
        let Some(mutation) = self.mutation.as_deref() else {
            return false;
        };
        match mutation {
            // `Add` is for mv, index and sink creation.
            Mutation::Add(AddMutation { adds, .. }) => adds.contains_key(&upstream_actor_id),
            // `AddAndUpdate` is for sink-into-table.
            Mutation::AddAndUpdate(
                AddMutation { adds, .. },
                UpdateMutation {
                    dispatchers,
                    actor_new_dispatchers,
                    ..
                },
            ) => {
                adds.contains_key(&upstream_actor_id)
                    || actor_new_dispatchers.contains_key(&upstream_actor_id)
                    || dispatchers.contains_key(&upstream_actor_id)
            }
            Mutation::Update(_)
            | Mutation::Stop(_)
            | Mutation::Pause
            | Mutation::Resume
            | Mutation::SourceChangeSplit(_)
            | Mutation::Throttle(_)
            | Mutation::DropSubscriptions { .. }
            | Mutation::ConnectorPropsChange(_)
            | Mutation::StartFragmentBackfill { .. }
            | Mutation::RefreshStart { .. }
            | Mutation::ListFinish { .. }
            | Mutation::LoadFinish { .. } => false,
        }
    }

    /// Whether this barrier requires the executor to pause its data stream on startup.
    pub fn is_pause_on_startup(&self) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation { pause, .. }))
            | Some(Mutation::AddAndUpdate(AddMutation { pause, .. }, _)) => *pause,
            _ => false,
        }
    }

    pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation {
                backfill_nodes_to_pause,
                ..
            }))
            | Some(Mutation::AddAndUpdate(
                AddMutation {
                    backfill_nodes_to_pause,
                    ..
                },
                _,
            )) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
            _ => {
                tracing::warn!("expected an AddMutation on startup, instead got {:?}", self);
                true
            }
        }
    }

    /// Whether this barrier is for resume.
    pub fn is_resume(&self) -> bool {
        matches!(self.mutation.as_deref(), Some(Mutation::Resume))
    }

    /// Returns the [`MergeUpdate`] if this barrier is to update the merge executor for the actor
    /// with `actor_id` on `upstream_fragment_id`.
    pub fn as_update_merge(
        &self,
        actor_id: ActorId,
        upstream_fragment_id: UpstreamFragmentId,
    ) -> Option<&MergeUpdate> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Update(UpdateMutation { merges, .. })
                | Mutation::AddAndUpdate(_, UpdateMutation { merges, .. }) => {
                    merges.get(&(actor_id, upstream_fragment_id))
                }
                _ => None,
            })
    }

    pub fn as_new_upstream_sink(&self, fragment_id: FragmentId) -> Option<&PbNewUpstreamSink> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Add(AddMutation {
                    new_upstream_sinks, ..
                }) => new_upstream_sinks.get(&fragment_id),
                _ => None,
            })
    }

    pub fn as_dropped_upstream_sinks(&self) -> Option<&HashSet<FragmentId>> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Stop(StopMutation {
                    dropped_sink_fragments,
                    ..
                }) => Some(dropped_sink_fragments),
                _ => None,
            })
    }

    /// Returns the new vnode bitmap if this barrier is to update the vnode bitmap for the actor
    /// with `actor_id`.
    pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Update(UpdateMutation { vnode_bitmaps, .. })
                | Mutation::AddAndUpdate(_, UpdateMutation { vnode_bitmaps, .. }) => {
                    vnode_bitmaps.get(&actor_id).cloned()
                }
                _ => None,
            })
    }

    pub fn as_sink_add_columns(&self, sink_id: SinkId) -> Option<Vec<Field>> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Update(UpdateMutation {
                    sink_add_columns, ..
                })
                | Mutation::AddAndUpdate(
                    _,
                    UpdateMutation {
                        sink_add_columns, ..
                    },
                ) => sink_add_columns.get(&sink_id).cloned(),
                _ => None,
            })
    }

    pub fn get_curr_epoch(&self) -> Epoch {
        Epoch(self.epoch.curr)
    }

    /// Retrieve the tracing context of this barrier.
    pub fn tracing_context(&self) -> &TracingContext {
        &self.tracing_context
    }

    pub fn added_subscriber_on_mv_table(
        &self,
        mv_table_id: TableId,
    ) -> impl Iterator<Item = u32> + '_ {
        if let Some(Mutation::Add(add)) | Some(Mutation::AddAndUpdate(add, _)) =
            self.mutation.as_deref()
        {
            Some(add)
        } else {
            None
        }
        .into_iter()
        .flat_map(move |add| {
            add.subscriptions_to_add.iter().filter_map(
                move |(upstream_mv_table_id, subscriber_id)| {
                    if *upstream_mv_table_id == mv_table_id {
                        Some(*subscriber_id)
                    } else {
                        None
                    }
                },
            )
        })
    }
}
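
// Illustrative sketch (not part of the original file): attaching a mutation to a test barrier
// and querying it through the helper methods above. The `test_epoch` helper is assumed to be
// available at the path used below.
#[cfg(test)]
mod barrier_mutation_query_example {
    use risingwave_common::util::epoch::test_epoch;

    use super::*;

    #[test]
    fn query_add_mutation_through_barrier_helpers() {
        let add = AddMutation {
            added_actors: HashSet::from([1, 2]),
            pause: true,
            ..Default::default()
        };
        let barrier = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(add));

        // Actors listed in `added_actors` are reported as newly added.
        assert!(barrier.is_newly_added(1));
        assert!(!barrier.is_newly_added(3));
        // The `pause` flag is surfaced via `is_pause_on_startup`.
        assert!(barrier.is_pause_on_startup());
        // An `Add` mutation does not stop any actor.
        assert!(!barrier.is_stop(1));
    }
}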

impl<M: PartialEq> PartialEq for BarrierInner<M> {
    fn eq(&self, other: &Self) -> bool {
        self.epoch == other.epoch && self.mutation == other.mutation
    }
}

impl Mutation {
    /// Return true if the mutation is a stop mutation.
    #[cfg(test)]
    pub fn is_stop(&self) -> bool {
        matches!(self, Mutation::Stop(_))
    }

    fn to_protobuf(&self) -> PbMutation {
        let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
            actor_splits
                .iter()
                .map(|(&actor_id, splits)| {
                    (
                        actor_id,
                        ConnectorSplits {
                            splits: splits.iter().map(ConnectorSplit::from).collect(),
                        },
                    )
                })
                .collect::<HashMap<_, _>>()
        };

        match self {
            Mutation::Stop(StopMutation {
                dropped_actors,
                dropped_sink_fragments,
            }) => PbMutation::Stop(PbStopMutation {
                actors: dropped_actors.iter().copied().collect(),
                dropped_sink_fragments: dropped_sink_fragments.iter().copied().collect(),
            }),
            Mutation::Update(UpdateMutation {
                dispatchers,
                merges,
                vnode_bitmaps,
                dropped_actors,
                actor_splits,
                actor_new_dispatchers,
                actor_cdc_table_snapshot_splits,
                sink_add_columns,
            }) => PbMutation::Update(PbUpdateMutation {
                dispatcher_update: dispatchers.values().flatten().cloned().collect(),
                merge_update: merges.values().cloned().collect(),
                actor_vnode_bitmap_update: vnode_bitmaps
                    .iter()
                    .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
                    .collect(),
                dropped_actors: dropped_actors.iter().cloned().collect(),
                actor_splits: actor_splits_to_protobuf(actor_splits),
                actor_new_dispatchers: actor_new_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| {
                        (
                            actor_id,
                            Dispatchers {
                                dispatchers: dispatchers.clone(),
                            },
                        )
                    })
                    .collect(),
                actor_cdc_table_snapshot_splits:
                    build_pb_actor_cdc_table_snapshot_splits_with_generation(
                        actor_cdc_table_snapshot_splits.clone(),
                    )
                    .into(),
                sink_add_columns: sink_add_columns
                    .iter()
                    .map(|(sink_id, add_columns)| {
                        (
                            sink_id.sink_id,
                            PbSinkAddColumns {
                                fields: add_columns.iter().map(|field| field.to_prost()).collect(),
                            },
                        )
                    })
                    .collect(),
            }),
            Mutation::Add(AddMutation {
                adds,
                added_actors,
                splits,
                pause,
                subscriptions_to_add,
                backfill_nodes_to_pause,
                actor_cdc_table_snapshot_splits,
                new_upstream_sinks,
            }) => PbMutation::Add(PbAddMutation {
                actor_dispatchers: adds
                    .iter()
                    .map(|(&actor_id, dispatchers)| {
                        (
                            actor_id,
                            Dispatchers {
                                dispatchers: dispatchers.clone(),
                            },
                        )
                    })
                    .collect(),
                added_actors: added_actors.iter().copied().collect(),
                actor_splits: actor_splits_to_protobuf(splits),
                pause: *pause,
                subscriptions_to_add: subscriptions_to_add
                    .iter()
                    .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
                        subscriber_id: *subscriber_id,
                        upstream_mv_table_id: table_id.table_id,
                    })
                    .collect(),
                backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
                actor_cdc_table_snapshot_splits:
                    build_pb_actor_cdc_table_snapshot_splits_with_generation(
                        actor_cdc_table_snapshot_splits.clone(),
                    )
                    .into(),
                new_upstream_sinks: new_upstream_sinks
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
            }),
            Mutation::SourceChangeSplit(changes) => PbMutation::Splits(SourceChangeSplitMutation {
                actor_splits: changes
                    .iter()
                    .map(|(&actor_id, splits)| {
                        (
                            actor_id,
                            ConnectorSplits {
                                splits: splits.iter().map(ConnectorSplit::from).collect(),
                            },
                        )
                    })
                    .collect(),
            }),
            Mutation::Pause => PbMutation::Pause(PauseMutation {}),
            Mutation::Resume => PbMutation::Resume(ResumeMutation {}),
            Mutation::Throttle(changes) => PbMutation::Throttle(ThrottleMutation {
                actor_throttle: changes
                    .iter()
                    .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit }))
                    .collect(),
            }),

            Mutation::AddAndUpdate(add, update) => PbMutation::Combined(CombinedMutation {
                mutations: vec![
                    BarrierMutation {
                        mutation: Some(Mutation::Add(add.clone()).to_protobuf()),
                    },
                    BarrierMutation {
                        mutation: Some(Mutation::Update(update.clone()).to_protobuf()),
                    },
                ],
            }),
            Mutation::DropSubscriptions {
                subscriptions_to_drop,
            } => PbMutation::DropSubscriptions(DropSubscriptionsMutation {
                info: subscriptions_to_drop
                    .iter()
                    .map(
                        |(subscriber_id, upstream_mv_table_id)| SubscriptionUpstreamInfo {
                            subscriber_id: *subscriber_id,
                            upstream_mv_table_id: upstream_mv_table_id.table_id,
                        },
                    )
                    .collect(),
            }),
            Mutation::ConnectorPropsChange(map) => {
                PbMutation::ConnectorPropsChange(ConnectorPropsChangeMutation {
                    connector_props_infos: map
                        .iter()
                        .map(|(actor_id, options)| {
                            (
                                *actor_id,
                                ConnectorPropsInfo {
                                    connector_props_info: options
                                        .iter()
                                        .map(|(k, v)| (k.clone(), v.clone()))
                                        .collect(),
                                },
                            )
                        })
                        .collect(),
                })
            }
            Mutation::StartFragmentBackfill { fragment_ids } => {
                PbMutation::StartFragmentBackfill(StartFragmentBackfillMutation {
                    fragment_ids: fragment_ids.iter().copied().collect(),
                })
            }
            Mutation::RefreshStart {
                table_id,
                associated_source_id,
            } => PbMutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
                table_id: table_id.table_id,
                associated_source_id: associated_source_id.table_id,
            }),
            Mutation::ListFinish {
                associated_source_id,
            } => PbMutation::ListFinish(risingwave_pb::stream_plan::ListFinishMutation {
                associated_source_id: associated_source_id.table_id,
            }),
            Mutation::LoadFinish {
                associated_source_id,
            } => PbMutation::LoadFinish(risingwave_pb::stream_plan::LoadFinishMutation {
                associated_source_id: associated_source_id.table_id,
            }),
        }
    }

    fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
        let mutation = match prost {
            PbMutation::Stop(stop) => Mutation::Stop(StopMutation {
                dropped_actors: stop.actors.iter().copied().collect(),
                dropped_sink_fragments: stop.dropped_sink_fragments.iter().copied().collect(),
            }),

            PbMutation::Update(update) => Mutation::Update(UpdateMutation {
                dispatchers: update
                    .dispatcher_update
                    .iter()
                    .map(|u| (u.actor_id, u.clone()))
                    .into_group_map(),
                merges: update
                    .merge_update
                    .iter()
                    .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
                    .collect(),
                vnode_bitmaps: update
                    .actor_vnode_bitmap_update
                    .iter()
                    .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
                    .collect(),
                dropped_actors: update.dropped_actors.iter().cloned().collect(),
                actor_splits: update
                    .actor_splits
                    .iter()
                    .map(|(&actor_id, splits)| {
                        (
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(|split| split.try_into().unwrap())
                                .collect(),
                        )
                    })
                    .collect(),
                actor_new_dispatchers: update
                    .actor_new_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
                    .collect(),
                actor_cdc_table_snapshot_splits:
                    build_actor_cdc_table_snapshot_splits_with_generation(
                        update
                            .actor_cdc_table_snapshot_splits
                            .clone()
                            .unwrap_or_default(),
                    ),
                sink_add_columns: update
                    .sink_add_columns
                    .iter()
                    .map(|(sink_id, add_columns)| {
                        (
                            SinkId::new(*sink_id),
                            add_columns.fields.iter().map(Field::from_prost).collect(),
                        )
                    })
                    .collect(),
            }),

            PbMutation::Add(add) => Mutation::Add(AddMutation {
                adds: add
                    .actor_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
                    .collect(),
                added_actors: add.added_actors.iter().copied().collect(),
                splits: add
                    .actor_splits
                    .iter()
                    .map(|(&actor_id, splits)| {
                        (
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(|split| split.try_into().unwrap())
                                .collect(),
                        )
                    })
                    .collect(),
                pause: add.pause,
                subscriptions_to_add: add
                    .subscriptions_to_add
                    .iter()
                    .map(
                        |SubscriptionUpstreamInfo {
                             subscriber_id,
                             upstream_mv_table_id,
                         }| {
                            (TableId::new(*upstream_mv_table_id), *subscriber_id)
                        },
                    )
                    .collect(),
                backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
                actor_cdc_table_snapshot_splits:
                    build_actor_cdc_table_snapshot_splits_with_generation(
                        add.actor_cdc_table_snapshot_splits
                            .clone()
                            .unwrap_or_default(),
                    ),
                new_upstream_sinks: add
                    .new_upstream_sinks
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
            }),

            PbMutation::Splits(s) => {
                let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
                    Vec::with_capacity(s.actor_splits.len());
                for (&actor_id, splits) in &s.actor_splits {
                    if !splits.splits.is_empty() {
                        change_splits.push((
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(SplitImpl::try_from)
                                .try_collect()?,
                        ));
                    }
                }
                Mutation::SourceChangeSplit(change_splits.into_iter().collect())
            }
            PbMutation::Pause(_) => Mutation::Pause,
            PbMutation::Resume(_) => Mutation::Resume,
            PbMutation::Throttle(changes) => Mutation::Throttle(
                changes
                    .actor_throttle
                    .iter()
                    .map(|(actor_id, limit)| (*actor_id, limit.rate_limit))
                    .collect(),
            ),
            PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
                subscriptions_to_drop: drop
                    .info
                    .iter()
                    .map(|info| (info.subscriber_id, TableId::new(info.upstream_mv_table_id)))
                    .collect(),
            },
            PbMutation::ConnectorPropsChange(alter_connector_props) => {
                Mutation::ConnectorPropsChange(
                    alter_connector_props
                        .connector_props_infos
                        .iter()
                        .map(|(actor_id, options)| {
                            (
                                *actor_id,
                                options
                                    .connector_props_info
                                    .iter()
                                    .map(|(k, v)| (k.clone(), v.clone()))
                                    .collect(),
                            )
                        })
                        .collect(),
                )
            }
            PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
                Mutation::StartFragmentBackfill {
                    fragment_ids: start_fragment_backfill
                        .fragment_ids
                        .iter()
                        .copied()
                        .collect(),
                }
            }
            PbMutation::RefreshStart(refresh_start) => Mutation::RefreshStart {
                table_id: TableId::new(refresh_start.table_id),
                associated_source_id: TableId::new(refresh_start.associated_source_id),
            },
            PbMutation::ListFinish(list_finish) => Mutation::ListFinish {
                associated_source_id: TableId::new(list_finish.associated_source_id),
            },
            PbMutation::LoadFinish(load_finish) => Mutation::LoadFinish {
                associated_source_id: TableId::new(load_finish.associated_source_id),
            },
            PbMutation::Combined(CombinedMutation { mutations }) => match &mutations[..] {
                [
                    BarrierMutation {
                        mutation: Some(add),
                    },
                    BarrierMutation {
                        mutation: Some(update),
                    },
                ] => {
                    let Mutation::Add(add_mutation) = Mutation::from_protobuf(add)? else {
                        unreachable!();
                    };

                    let Mutation::Update(update_mutation) = Mutation::from_protobuf(update)? else {
                        unreachable!();
                    };

                    Mutation::AddAndUpdate(add_mutation, update_mutation)
                }

                _ => unreachable!(),
            },
        };
        Ok(mutation)
    }
}
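
// Illustrative sketch (not part of the original file): a `Mutation` round-trips through its
// protobuf representation, which is how it travels inside `PbBarrier` between nodes.
#[cfg(test)]
mod mutation_proto_roundtrip_example {
    use super::*;

    #[test]
    fn mutation_protobuf_round_trip() {
        let mutation = Mutation::StartFragmentBackfill {
            fragment_ids: HashSet::from([1, 2]),
        };
        // Encoding and decoding should preserve the mutation.
        let restored = Mutation::from_protobuf(&mutation.to_protobuf()).unwrap();
        assert_eq!(mutation, restored);
    }
}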

impl<M> BarrierInner<M> {
    fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
        let Self {
            epoch,
            mutation,
            kind,
            passed_actors,
            tracing_context,
            ..
        } = self;

        PbBarrier {
            epoch: Some(PbEpoch {
                curr: epoch.curr,
                prev: epoch.prev,
            }),
            mutation: Some(PbBarrierMutation {
                mutation: barrier_fn(mutation),
            }),
            tracing_context: tracing_context.to_protobuf(),
            kind: *kind as _,
            passed_actors: passed_actors.clone(),
        }
    }

    fn from_protobuf_inner(
        prost: &PbBarrier,
        mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
    ) -> StreamExecutorResult<Self> {
        let epoch = prost.get_epoch()?;

        Ok(Self {
            kind: prost.kind(),
            epoch: EpochPair::new(epoch.curr, epoch.prev),
            mutation: mutation_from_pb(
                prost
                    .mutation
                    .as_ref()
                    .and_then(|mutation| mutation.mutation.as_ref()),
            )?,
            passed_actors: prost.get_passed_actors().clone(),
            tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
        })
    }

    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
        BarrierInner {
            epoch: self.epoch,
            mutation: f(self.mutation),
            kind: self.kind,
            tracing_context: self.tracing_context,
            passed_actors: self.passed_actors,
        }
    }
}

impl DispatcherBarrier {
    pub fn to_protobuf(&self) -> PbBarrier {
        self.to_protobuf_inner(|_| None)
    }
}

impl Barrier {
    pub fn to_protobuf(&self) -> PbBarrier {
        self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
    }

    pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
        Self::from_protobuf_inner(prost, |mutation| {
            mutation
                .map(|m| Mutation::from_protobuf(m).map(Arc::new))
                .transpose()
        })
    }
}

#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
    pub col_idx: usize,
    pub data_type: DataType,
    pub val: ScalarImpl,
}

impl PartialOrd for Watermark {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Watermark {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.val.default_cmp(&other.val)
    }
}

impl Watermark {
    pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
        Self {
            col_idx,
            data_type,
            val,
        }
    }

    pub async fn transform_with_expr(
        self,
        expr: &NonStrictExpression<impl Expression>,
        new_col_idx: usize,
    ) -> Option<Self> {
        let Self { col_idx, val, .. } = self;
        let row = {
            let mut row = vec![None; col_idx + 1];
            row[col_idx] = Some(val);
            OwnedRow::new(row)
        };
        let val = expr.eval_row_infallible(&row).await?;
        Some(Self::new(new_col_idx, expr.inner().return_type(), val))
    }

    /// Transform the watermark column index according to the given `output_indices`. Returns
    /// `None` if the watermark column is not included in the output.
    pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
        output_indices
            .iter()
            .position(|p| *p == self.col_idx)
            .map(|new_col_idx| self.with_idx(new_col_idx))
    }

    pub fn to_protobuf(&self) -> PbWatermark {
        PbWatermark {
            column: Some(PbInputRef {
                index: self.col_idx as _,
                r#type: Some(self.data_type.to_protobuf()),
            }),
            val: Some(&self.val).to_protobuf().into(),
        }
    }

    pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
        let col_ref = prost.get_column()?;
        let data_type = DataType::from(col_ref.get_type()?);
        let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
            .expect("watermark value cannot be null");
        Ok(Self::new(col_ref.get_index() as _, data_type, val))
    }

    pub fn with_idx(self, idx: usize) -> Self {
        Self::new(idx, self.data_type, self.val)
    }
}
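
// Illustrative sketch (not part of the original file): remapping a watermark's column index
// through an executor's output indices, as done by projection-like executors.
#[cfg(test)]
mod watermark_remap_example {
    use super::*;

    #[test]
    fn remap_watermark_column_through_output_indices() {
        let watermark = Watermark::new(2, DataType::Int64, ScalarImpl::Int64(42));
        // The output keeps input columns [2, 0], so input column 2 becomes output column 0.
        let remapped = watermark.clone().transform_with_indices(&[2, 0]).unwrap();
        assert_eq!(remapped.col_idx, 0);
        // A column not present in the output produces no watermark.
        assert!(watermark.transform_with_indices(&[1]).is_none());
    }
}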

#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageInner<M> {
    Chunk(StreamChunk),
    Barrier(BarrierInner<M>),
    Watermark(Watermark),
}

impl<M> MessageInner<M> {
    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
        match self {
            MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
            MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
            MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
        }
    }
}

pub type Message = MessageInner<BarrierMutationType>;
pub type DispatcherMessage = MessageInner<()>;

/// A batched variant of [`MessageInner`] used by the exchange service: barriers may be grouped
/// into a batch, while chunks and watermarks are sent one at a time.
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageBatchInner<M> {
    Chunk(StreamChunk),
    BarrierBatch(Vec<BarrierInner<M>>),
    Watermark(Watermark),
}
pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
pub type DispatcherBarriers = Vec<DispatcherBarrier>;
pub type DispatcherMessageBatch = MessageBatchInner<()>;

impl From<DispatcherMessage> for DispatcherMessageBatch {
    fn from(m: DispatcherMessage) -> Self {
        match m {
            DispatcherMessage::Chunk(c) => Self::Chunk(c),
            DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
            DispatcherMessage::Watermark(w) => Self::Watermark(w),
        }
    }
}

impl From<StreamChunk> for Message {
    fn from(chunk: StreamChunk) -> Self {
        Message::Chunk(chunk)
    }
}

impl<'a> TryFrom<&'a Message> for &'a Barrier {
    type Error = ();

    fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
        match m {
            Message::Chunk(_) => Err(()),
            Message::Barrier(b) => Ok(b),
            Message::Watermark(_) => Err(()),
        }
    }
}

impl Message {
    /// Whether this message is a barrier carrying a `Stop` mutation. For testing only.
    #[cfg(test)]
    pub fn is_stop(&self) -> bool {
        matches!(
            self,
            Message::Barrier(Barrier {
                mutation,
                ..
            }) if mutation.as_ref().unwrap().is_stop()
        )
    }
}

impl DispatcherMessageBatch {
    pub fn to_protobuf(&self) -> PbStreamMessageBatch {
        let prost = match self {
            Self::Chunk(stream_chunk) => {
                let prost_stream_chunk = stream_chunk.to_protobuf();
                StreamMessageBatch::StreamChunk(prost_stream_chunk)
            }
            Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
                barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
            }),
            Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
        };
        PbStreamMessageBatch {
            stream_message_batch: Some(prost),
        }
    }

    pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
        let res = match prost.get_stream_message_batch()? {
            StreamMessageBatch::StreamChunk(chunk) => {
                Self::Chunk(StreamChunk::from_protobuf(chunk)?)
            }
            StreamMessageBatch::BarrierBatch(barrier_batch) => {
                let barriers = barrier_batch
                    .barriers
                    .iter()
                    .map(|barrier| {
                        DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
                            if mutation.is_some() {
                                if cfg!(debug_assertions) {
                                    panic!("should not receive message of barrier with mutation");
                                } else {
                                    warn!(?barrier, "received barrier with mutation");
                                }
                            }
                            Ok(())
                        })
                    })
                    .try_collect()?;
                Self::BarrierBatch(barriers)
            }
            StreamMessageBatch::Watermark(watermark) => {
                Self::Watermark(Watermark::from_protobuf(watermark)?)
            }
        };
        Ok(res)
    }

    pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
        ::prost::Message::encoded_len(msg)
    }
}

pub type PkIndices = Vec<usize>;
pub type PkIndicesRef<'a> = &'a [usize];
pub type PkDataTypes = SmallVec<[DataType; 1]>;

/// Expect the first message of the given `stream` to be a barrier.
pub async fn expect_first_barrier<M: Debug>(
    stream: &mut (impl MessageStreamInner<M> + Unpin),
) -> StreamExecutorResult<BarrierInner<M>> {
    let message = stream
        .next()
        .instrument_await("expect_first_barrier")
        .await
        .context("failed to extract the first message: stream closed unexpectedly")??;
    let barrier = message
        .into_barrier()
        .expect("the first message must be a barrier");
    // The first barrier must be a checkpoint or initial barrier.
    assert!(matches!(
        barrier.kind,
        BarrierKind::Checkpoint | BarrierKind::Initial
    ));
    Ok(barrier)
}
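
// Illustrative sketch (not part of the original file): `expect_first_barrier` extracts the
// leading barrier from an executor's input stream. The `test_epoch` helper is assumed to be
// available at the path used below.
#[cfg(test)]
mod expect_first_barrier_example {
    use risingwave_common::util::epoch::test_epoch;

    use super::*;

    #[tokio::test]
    async fn first_message_must_be_a_barrier() {
        let barrier = Barrier::new_test_barrier(test_epoch(1));
        // A stream whose first (and only) message is a barrier.
        let messages: Vec<MessageStreamItem> = vec![Ok(Message::Barrier(barrier.clone()))];
        let mut stream = futures::stream::iter(messages).boxed();

        let first = expect_first_barrier(&mut stream).await.unwrap();
        assert_eq!(first.epoch, barrier.epoch);
    }
}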

/// Expect the first message of the given aligned `stream` to be a barrier.
pub async fn expect_first_barrier_from_aligned_stream(
    stream: &mut (impl AlignedMessageStream + Unpin),
) -> StreamExecutorResult<Barrier> {
    let message = stream
        .next()
        .instrument_await("expect_first_barrier")
        .await
        .context("failed to extract the first message: stream closed unexpectedly")??;
    let barrier = message
        .into_barrier()
        .expect("the first message must be a barrier");
    Ok(barrier)
}

/// `StreamConsumer` is the last step in an actor.
pub trait StreamConsumer: Send + 'static {
    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;

    fn execute(self: Box<Self>) -> Self::BarrierStream;
}

type BoxedMessageInput<InputId, M> = BoxedInput<InputId, MessageStreamItemInner<M>>;

/// A stream that merges messages from multiple upstream inputs. Chunks are forwarded as they
/// arrive, watermarks are aligned across inputs, and a barrier is yielded only after it has
/// been collected from every input.
pub struct DynamicReceivers<InputId, M> {
    /// The barrier we're aligning to. If `None`, we're not aligning to any barrier.
    barrier: Option<BarrierInner<M>>,
    /// The start time of the current barrier alignment.
    start_ts: Option<Instant>,
    /// The upstreams that're blocked by the `barrier`.
    blocked: Vec<BoxedMessageInput<InputId, M>>,
    /// The upstreams that're not blocked and can be polled.
    active: FuturesUnordered<StreamFuture<BoxedMessageInput<InputId, M>>>,
    /// Watermark column index -> buffered watermarks of all upstreams.
    buffered_watermarks: BTreeMap<usize, BufferedWatermarks<InputId>>,
    /// Accumulated barrier alignment duration, in nanoseconds.
    barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
    /// Additional histogram of the barrier alignment duration, specific to the merge executor.
    merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
}

impl<InputId: Clone + Ord + Hash + std::fmt::Debug + Unpin, M: Clone + Unpin> Stream
    for DynamicReceivers<InputId, M>
{
    type Item = MessageStreamItemInner<M>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if self.is_empty() {
            return Poll::Ready(None);
        }

        loop {
            match futures::ready!(self.active.poll_next_unpin(cx)) {
                // Directly forward the error.
                Some((Some(Err(e)), _)) => {
                    return Poll::Ready(Some(Err(e)));
                }
                // Handle a message from some upstream.
                Some((Some(Ok(message)), remaining)) => {
                    let input_id = remaining.id();
                    match message {
                        MessageInner::Chunk(chunk) => {
                            // Continue polling this upstream by pushing it back to `active`.
                            self.active.push(remaining.into_future());
                            return Poll::Ready(Some(Ok(MessageInner::Chunk(chunk))));
                        }
                        MessageInner::Watermark(watermark) => {
                            // Continue polling this upstream by pushing it back to `active`.
                            self.active.push(remaining.into_future());
                            if let Some(watermark) = self.handle_watermark(input_id, watermark) {
                                return Poll::Ready(Some(Ok(MessageInner::Watermark(watermark))));
                            }
                        }
                        MessageInner::Barrier(barrier) => {
                            // Block this upstream until all upstreams have received the barrier.
                            if self.blocked.is_empty() {
                                self.start_ts = Some(Instant::now());
                            }
                            self.blocked.push(remaining);
                            if let Some(current_barrier) = self.barrier.as_ref() {
                                if current_barrier.epoch != barrier.epoch {
                                    return Poll::Ready(Some(Err(
                                        StreamExecutorError::align_barrier(
                                            current_barrier.clone().map_mutation(|_| None),
                                            barrier.map_mutation(|_| None),
                                        ),
                                    )));
                                }
                            } else {
                                self.barrier = Some(barrier);
                            }
                        }
                    }
                }
                // An upstream input has ended before yielding a barrier, which is unexpected.
                Some((None, remaining)) => {
                    return Poll::Ready(Some(Err(StreamExecutorError::channel_closed(format!(
                        "upstream input {:?} unexpectedly closed",
                        remaining.id()
                    )))));
                }
                // All upstreams are blocked on the barrier: alignment is complete.
                None => {
                    assert!(!self.blocked.is_empty());

                    let start_ts = self
                        .start_ts
                        .take()
                        .expect("should have received at least one barrier");
                    if let Some(barrier_align_duration) = &self.barrier_align_duration {
                        barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
                    }
                    if let Some(merge_barrier_align_duration) = &self.merge_barrier_align_duration {
                        merge_barrier_align_duration.observe(start_ts.elapsed().as_secs_f64())
                    }

                    break;
                }
            }
        }

        assert!(self.active.is_terminated());

        let barrier = self.barrier.take().unwrap();

        let upstreams = std::mem::take(&mut self.blocked);
        self.extend_active(upstreams);
        assert!(!self.active.is_terminated());

        Poll::Ready(Some(Ok(MessageInner::Barrier(barrier))))
    }
}

impl<InputId: Clone + Ord + Hash + std::fmt::Debug, M> DynamicReceivers<InputId, M> {
    pub fn new(
        upstreams: Vec<BoxedMessageInput<InputId, M>>,
        barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
        merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
    ) -> Self {
        let mut this = Self {
            barrier: None,
            start_ts: None,
            blocked: Vec::with_capacity(upstreams.len()),
            active: Default::default(),
            buffered_watermarks: Default::default(),
            merge_barrier_align_duration,
            barrier_align_duration,
        };
        this.extend_active(upstreams);
        this
    }

    /// Extend the active upstreams with the given inputs. The stream must be in the clean state
    /// right after a barrier.
    pub fn extend_active(
        &mut self,
        upstreams: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
    ) {
        assert!(self.blocked.is_empty() && self.barrier.is_none());

        self.active
            .extend(upstreams.into_iter().map(|s| s.into_future()));
    }

    /// Handle a watermark from the upstream `input_id`, returning the aligned watermark to yield
    /// downstream, if any.
    pub fn handle_watermark(
        &mut self,
        input_id: InputId,
        watermark: Watermark,
    ) -> Option<Watermark> {
        let col_idx = watermark.col_idx;
        // Collect upstream ids eagerly to avoid borrowing `self` inside the closure below.
        let upstream_ids: Vec<_> = self.upstream_input_ids().collect();
        let watermarks = self
            .buffered_watermarks
            .entry(col_idx)
            .or_insert_with(|| BufferedWatermarks::with_ids(upstream_ids));
        watermarks.handle_watermark(input_id, watermark)
    }

    /// Add new upstream inputs. The stream must be in the clean state right after a barrier.
    pub fn add_upstreams_from(
        &mut self,
        new_inputs: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
    ) {
        assert!(self.blocked.is_empty() && self.barrier.is_none());

        let new_inputs: Vec<_> = new_inputs.into_iter().collect();
        let input_ids = new_inputs.iter().map(|input| input.id());
        self.buffered_watermarks.values_mut().for_each(|buffers| {
            buffers.add_buffers(input_ids.clone());
        });
        self.active
            .extend(new_inputs.into_iter().map(|s| s.into_future()));
    }

    /// Remove upstream inputs by their ids. The stream must be in the clean state right after a
    /// barrier.
    pub fn remove_upstreams(&mut self, upstream_input_ids: &HashSet<InputId>) {
        assert!(self.blocked.is_empty() && self.barrier.is_none());

        let new_upstreams = std::mem::take(&mut self.active)
            .into_iter()
            .map(|s| s.into_inner().unwrap())
            .filter(|u| !upstream_input_ids.contains(&u.id()));
        self.extend_active(new_upstreams);
        self.buffered_watermarks.values_mut().for_each(|buffers| {
            buffers.remove_buffer(upstream_input_ids.clone());
        });
    }

    pub fn merge_barrier_align_duration(&self) -> Option<LabelGuardedMetric<Histogram>> {
        self.merge_barrier_align_duration.clone()
    }

    pub fn flush_buffered_watermarks(&mut self) {
        self.buffered_watermarks
            .values_mut()
            .for_each(|buffers| buffers.clear());
    }

    pub fn upstream_input_ids(&self) -> impl Iterator<Item = InputId> + '_ {
        self.blocked
            .iter()
            .map(|s| s.id())
            .chain(self.active.iter().map(|s| s.get_ref().unwrap().id()))
    }

    pub fn is_empty(&self) -> bool {
        self.blocked.is_empty() && self.active.is_empty()
    }
}

/// Buffers barriers received from `barrier_rx` while the owner is awaiting messages from its
/// upstream. If a buffered barrier carries a merge update that adds upstream actors, the new
/// exchange inputs are created eagerly so that they are ready when the barrier is popped via
/// [`Self::pop_barrier_with_inputs`].
pub(crate) struct DispatchBarrierBuffer {
    buffer: VecDeque<(Barrier, Option<Vec<BoxedActorInput>>)>,
    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
    recv_state: BarrierReceiverState,
    curr_upstream_fragment_id: FragmentId,
    actor_id: ActorId,
    build_input_ctx: Arc<BuildInputContext>,
}

struct BuildInputContext {
    pub actor_id: ActorId,
    pub local_barrier_manager: LocalBarrierManager,
    pub metrics: Arc<StreamingMetrics>,
    pub fragment_id: FragmentId,
}

type BoxedNewInputsFuture =
    Pin<Box<dyn Future<Output = StreamExecutorResult<Vec<BoxedActorInput>>> + Send>>;

enum BarrierReceiverState {
    ReceivingBarrier,
    CreatingNewInput(Barrier, BoxedNewInputsFuture),
}

impl DispatchBarrierBuffer {
    pub fn new(
        barrier_rx: mpsc::UnboundedReceiver<Barrier>,
        actor_id: ActorId,
        curr_upstream_fragment_id: FragmentId,
        local_barrier_manager: LocalBarrierManager,
        metrics: Arc<StreamingMetrics>,
        fragment_id: FragmentId,
    ) -> Self {
        Self {
            buffer: VecDeque::new(),
            barrier_rx,
            recv_state: BarrierReceiverState::ReceivingBarrier,
            curr_upstream_fragment_id,
            actor_id,
            build_input_ctx: Arc::new(BuildInputContext {
                actor_id,
                local_barrier_manager,
                metrics,
                fragment_id,
            }),
        }
    }

    /// Await the next message from the upstream `stream`, while concurrently buffering barriers
    /// received from `barrier_rx`.
    pub async fn await_next_message(
        &mut self,
        stream: &mut (impl Stream<Item = StreamExecutorResult<DispatcherMessage>> + Unpin),
    ) -> StreamExecutorResult<DispatcherMessage> {
        tokio::select! {
            biased;

            msg = stream.try_next() => {
                msg?.ok_or_else(
                    || StreamExecutorError::channel_closed("upstream executor closed unexpectedly")
                )
            }

            e = self.continuously_fetch_barrier_rx() => {
                Err(e)
            }
        }
    }

    /// Pop the next buffered barrier, which must correspond to the given dispatcher `barrier`,
    /// along with the new inputs pre-built for it, if any.
    pub async fn pop_barrier_with_inputs(
        &mut self,
        barrier: DispatcherBarrier,
    ) -> StreamExecutorResult<(Barrier, Option<Vec<BoxedActorInput>>)> {
        while self.buffer.is_empty() {
            self.try_fetch_barrier_rx(false).await?;
        }
        let (mut recv_barrier, inputs) = self.buffer.pop_front().unwrap();
        apply_dispatcher_barrier(&mut recv_barrier, barrier);

        Ok((recv_barrier, inputs))
    }

    /// Keep fetching barriers from `barrier_rx` into the buffer; only returns when an error
    /// occurs.
    async fn continuously_fetch_barrier_rx(&mut self) -> StreamExecutorError {
        loop {
            if let Err(e) = self.try_fetch_barrier_rx(true).await {
                return e;
            }
        }
    }

    async fn try_fetch_barrier_rx(&mut self, pending_on_end: bool) -> StreamExecutorResult<()> {
        match &mut self.recv_state {
            BarrierReceiverState::ReceivingBarrier => {
                let Some(barrier) = self.barrier_rx.recv().await else {
                    if pending_on_end {
                        return pending().await;
                    } else {
                        return Err(StreamExecutorError::channel_closed(
                            "barrier channel closed unexpectedly",
                        ));
                    }
                };
                if let Some(fut) = self.pre_apply_barrier(&barrier) {
                    self.recv_state = BarrierReceiverState::CreatingNewInput(barrier, fut);
                } else {
                    self.buffer.push_back((barrier, None));
                }
            }
            BarrierReceiverState::CreatingNewInput(barrier, fut) => {
                let new_inputs = fut.await?;
                self.buffer.push_back((barrier.clone(), Some(new_inputs)));
                self.recv_state = BarrierReceiverState::ReceivingBarrier;
            }
        }
        Ok(())
    }

    fn pre_apply_barrier(&mut self, barrier: &Barrier) -> Option<BoxedNewInputsFuture> {
        if let Some(update) = barrier.as_update_merge(self.actor_id, self.curr_upstream_fragment_id)
            && !update.added_upstream_actors.is_empty()
        {
            // If the update switches to a new upstream fragment, record and use the new id.
            let upstream_fragment_id =
                if let Some(new_upstream_fragment_id) = update.new_upstream_fragment_id {
                    self.curr_upstream_fragment_id = new_upstream_fragment_id;
                    new_upstream_fragment_id
                } else {
                    self.curr_upstream_fragment_id
                };
            let ctx = self.build_input_ctx.clone();
            let added_upstream_actors = update.added_upstream_actors.clone();
            let barrier = barrier.clone();
            let fut = async move {
                try_join_all(added_upstream_actors.iter().map(|upstream_actor| async {
                    let mut new_input = new_input(
                        &ctx.local_barrier_manager,
                        ctx.metrics.clone(),
                        ctx.actor_id,
                        ctx.fragment_id,
                        upstream_actor,
                        upstream_fragment_id,
                    )
                    .await?;

                    // The new input must start with the same barrier that triggered this update.
                    let first_barrier = expect_first_barrier(&mut new_input).await?;
                    assert_equal_dispatcher_barrier(&barrier, &first_barrier);

                    StreamExecutorResult::Ok(new_input)
                }))
                .await
            }
            .boxed();

            Some(fut)
        } else {
            None
        }
    }
}