mod prelude;

use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::Debug;
use std::hash::Hash;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::vec;

use await_tree::InstrumentAwait;
use enum_as_inner::EnumAsInner;
use futures::stream::{BoxStream, FusedStream, FuturesUnordered, StreamFuture};
use futures::{Stream, StreamExt};
use itertools::Itertools;
use prometheus::Histogram;
use prometheus::core::{AtomicU64, GenericCounter};
use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{Field, Schema, TableId};
use risingwave_common::metrics::LabelGuardedMetric;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
use risingwave_common::util::epoch::{Epoch, EpochPair};
use risingwave_common::util::tracing::TracingContext;
use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
use risingwave_connector::source::SplitImpl;
use risingwave_expr::expr::{Expression, NonStrictExpression};
use risingwave_pb::data::PbEpoch;
use risingwave_pb::expr::PbInputRef;
use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
use risingwave_pb::stream_plan::stream_node::PbStreamKind;
use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
use risingwave_pb::stream_plan::{
    BarrierMutation, CombinedMutation, ConnectorPropsChangeMutation, Dispatchers,
    DropSubscriptionsMutation, PauseMutation, PbAddMutation, PbBarrier, PbBarrierMutation,
    PbDispatcher, PbSinkAddColumns, PbStopMutation, PbStreamMessageBatch, PbUpdateMutation,
    PbWatermark, ResumeMutation, SourceChangeSplitMutation, StartFragmentBackfillMutation,
    SubscriptionUpstreamInfo, ThrottleMutation,
};
use smallvec::SmallVec;
use tokio::time::Instant;

use crate::error::StreamResult;
use crate::executor::exchange::input::BoxedInput;
use crate::executor::watermark::BufferedWatermarks;
use crate::task::{ActorId, FragmentId};

mod actor;
mod barrier_align;
pub mod exchange;
pub mod monitor;

pub mod aggregate;
pub mod asof_join;
mod backfill;
mod barrier_recv;
mod batch_query;
mod chain;
mod changelog;
mod dedup;
mod dispatch;
pub mod dml;
mod dynamic_filter;
pub mod eowc;
pub mod error;
mod expand;
mod filter;
pub mod hash_join;
mod hop_window;
mod join;
mod lookup;
mod lookup_union;
mod merge;
mod mview;
mod nested_loop_temporal_join;
mod no_op;
mod now;
mod over_window;
pub mod project;
mod rearranged_chain;
mod receiver;
pub mod row_id_gen;
mod sink;
pub mod source;
mod stream_reader;
pub mod subtask;
mod temporal_join;
mod top_n;
mod troublemaker;
mod union;
mod upstream_sink_union;
mod values;
mod watermark;
mod watermark_filter;
mod wrapper;

mod approx_percentile;

mod row_merge;

#[cfg(test)]
mod integration_tests;
mod sync_kv_log_store;
#[cfg(any(test, feature = "test"))]
pub mod test_utils;
mod utils;
mod vector_index;

pub use actor::{Actor, ActorContext, ActorContextRef};
use anyhow::Context;
pub use approx_percentile::global::GlobalApproxPercentileExecutor;
pub use approx_percentile::local::LocalApproxPercentileExecutor;
pub use backfill::arrangement_backfill::*;
pub use backfill::cdc::{
    CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor,
};
pub use backfill::no_shuffle_backfill::*;
pub use backfill::snapshot_backfill::*;
pub use barrier_recv::BarrierRecvExecutor;
pub use batch_query::BatchQueryExecutor;
pub use chain::ChainExecutor;
pub use changelog::ChangeLogExecutor;
pub use dedup::AppendOnlyDedupExecutor;
pub use dispatch::{DispatchExecutor, DispatcherImpl};
pub use dynamic_filter::DynamicFilterExecutor;
pub use error::{StreamExecutorError, StreamExecutorResult};
pub use expand::ExpandExecutor;
pub use filter::FilterExecutor;
pub use hash_join::*;
pub use hop_window::HopWindowExecutor;
pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
pub use join::{AsOfDesc, AsOfJoinType, JoinType};
pub use lookup::*;
pub use lookup_union::LookupUnionExecutor;
pub use merge::MergeExecutor;
pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
pub use mview::*;
pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
pub use no_op::NoOpExecutor;
pub use now::*;
pub use over_window::*;
pub use rearranged_chain::RearrangedChainExecutor;
pub use receiver::ReceiverExecutor;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
pub use row_merge::RowMergeExecutor;
pub use sink::SinkExecutor;
pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
pub use temporal_join::TemporalJoinExecutor;
pub use top_n::{
    AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
};
pub use troublemaker::TroublemakerExecutor;
pub use union::UnionExecutor;
pub use upstream_sink_union::{UpstreamFragmentInfo, UpstreamSinkUnionExecutor};
pub use utils::DummyExecutor;
pub use values::ValuesExecutor;
pub use vector_index::VectorIndexWriteExecutor;
pub use watermark_filter::WatermarkFilterExecutor;
pub use wrapper::WrapperExecutor;

use self::barrier_align::AlignedMessageStream;

pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;

pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
use risingwave_connector::sink::catalog::SinkId;
use risingwave_connector::source::cdc::{
    CdcTableSnapshotSplitAssignmentWithGeneration,
    build_actor_cdc_table_snapshot_splits_with_generation,
    build_pb_actor_cdc_table_snapshot_splits_with_generation,
};
use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
use risingwave_pb::stream_plan::throttle_mutation::RateLimit;

pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;

/// Static information of an executor.
#[derive(Debug, Default, Clone)]
pub struct ExecutorInfo {
    /// The schema of the output of the executor.
    pub schema: Schema,

    /// The primary key indices of the output of the executor.
    pub pk_indices: PkIndices,

    /// The stream kind of the output of the executor.
    pub stream_kind: PbStreamKind,

    /// Identity of the executor, mainly for logging and debugging.
    pub identity: String,

    /// The id of the executor.
    pub id: u64,
}

impl ExecutorInfo {
    pub fn for_test(schema: Schema, pk_indices: PkIndices, identity: String, id: u64) -> Self {
        Self {
            schema,
            pk_indices,
            stream_kind: PbStreamKind::Retract,
            identity,
            id,
        }
    }
}

pub trait Execute: Send + 'static {
    fn execute(self: Box<Self>) -> BoxedMessageStream;

    fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
        self.execute()
    }

    fn boxed(self) -> Box<dyn Execute>
    where
        Self: Sized + Send + 'static,
    {
        Box::new(self)
    }
}

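// Illustrative sketch only (not part of this module's executor set): the smallest possible
// `Execute` implementation, just to show the shape of the trait. `EmptyExecutor` is a
// hypothetical name; real executors build their output stream from their inputs and yield
// chunks, barriers, and watermarks.
//
// struct EmptyExecutor;
//
// impl Execute for EmptyExecutor {
//     fn execute(self: Box<Self>) -> BoxedMessageStream {
//         futures::stream::empty().boxed()
//     }
// }
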
pub struct Executor {
    info: ExecutorInfo,
    execute: Box<dyn Execute>,
}

impl Executor {
    pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
        Self { info, execute }
    }

    pub fn info(&self) -> &ExecutorInfo {
        &self.info
    }

    pub fn schema(&self) -> &Schema {
        &self.info.schema
    }

    pub fn pk_indices(&self) -> PkIndicesRef<'_> {
        &self.info.pk_indices
    }

    pub fn stream_kind(&self) -> PbStreamKind {
        self.info.stream_kind
    }

    pub fn identity(&self) -> &str {
        &self.info.identity
    }

    pub fn execute(self) -> BoxedMessageStream {
        self.execute.execute()
    }

    pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
        self.execute.execute_with_epoch(epoch)
    }
}

impl std::fmt::Debug for Executor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.identity())
    }
}

impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
    fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
        Self::new(info, execute)
    }
}

impl<E> From<(ExecutorInfo, E)> for Executor
where
    E: Execute,
{
    fn from((info, execute): (ExecutorInfo, E)) -> Self {
        Self::new(info, execute.boxed())
    }
}

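// Usage sketch (with a hypothetical `MyExecutor: Execute`): an `Executor` is assembled by
// pairing the static `ExecutorInfo` with the boxed `Execute` implementation, either via
// `Executor::new` or the `From` impls above.
//
// let info = ExecutorInfo::for_test(schema, pk_indices, "MyExecutor".into(), executor_id);
// let executor: Executor = (info, MyExecutor { /* ... */ }).into();
// let mut stream = executor.execute();
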
pub const INVALID_EPOCH: u64 = 0;

type UpstreamFragmentId = FragmentId;
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;

#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct UpdateMutation {
    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
    pub dropped_actors: HashSet<ActorId>,
    pub actor_splits: SplitAssignments,
    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    pub sink_add_columns: HashMap<SinkId, Vec<Field>>,
}

#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct AddMutation {
    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
    pub added_actors: HashSet<ActorId>,
    pub splits: SplitAssignments,
    pub pause: bool,
    pub subscriptions_to_add: Vec<(TableId, u32)>,
    pub backfill_nodes_to_pause: HashSet<FragmentId>,
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    pub new_upstream_sinks: HashMap<FragmentId, PbNewUpstreamSink>,
}

#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct StopMutation {
    pub dropped_actors: HashSet<ActorId>,
    pub dropped_sink_fragments: HashSet<FragmentId>,
}

#[derive(Debug, Clone, PartialEq)]
pub enum Mutation {
    Stop(StopMutation),
    Update(UpdateMutation),
    Add(AddMutation),
    SourceChangeSplit(SplitAssignments),
    Pause,
    Resume,
    Throttle(HashMap<ActorId, Option<u32>>),
    AddAndUpdate(AddMutation, UpdateMutation),
    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
    DropSubscriptions {
        subscriptions_to_drop: Vec<(u32, TableId)>,
    },
    StartFragmentBackfill {
        fragment_ids: HashSet<FragmentId>,
    },
    RefreshStart {
        table_id: TableId,
        associated_source_id: TableId,
    },
    LoadFinish {
        associated_source_id: TableId,
    },
}

#[derive(Debug, Clone)]
pub struct BarrierInner<M> {
    pub epoch: EpochPair,
    pub mutation: M,
    pub kind: BarrierKind,

    pub tracing_context: TracingContext,

    pub passed_actors: Vec<ActorId>,
}

pub type BarrierMutationType = Option<Arc<Mutation>>;
pub type Barrier = BarrierInner<BarrierMutationType>;
pub type DispatcherBarrier = BarrierInner<()>;

impl<M: Default> BarrierInner<M> {
    pub fn new_test_barrier(epoch: u64) -> Self {
        Self {
            epoch: EpochPair::new_test_epoch(epoch),
            kind: BarrierKind::Checkpoint,
            tracing_context: TracingContext::none(),
            mutation: Default::default(),
            passed_actors: Default::default(),
        }
    }

    pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
        Self {
            epoch: EpochPair::new(epoch, prev_epoch),
            kind: BarrierKind::Checkpoint,
            tracing_context: TracingContext::none(),
            mutation: Default::default(),
            passed_actors: Default::default(),
        }
    }
}

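// Test-only usage sketch: build a checkpoint barrier and attach a mutation. The `test_epoch`
// helper (assumed here from `risingwave_common::util::epoch`) turns a small integer into a
// valid epoch value for tests.
//
// let barrier = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Pause);
// assert!(barrier.is_checkpoint());
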
impl Barrier {
    pub fn into_dispatcher(self) -> DispatcherBarrier {
        DispatcherBarrier {
            epoch: self.epoch,
            mutation: (),
            kind: self.kind,
            tracing_context: self.tracing_context,
            passed_actors: self.passed_actors,
        }
    }

    #[must_use]
    pub fn with_mutation(self, mutation: Mutation) -> Self {
        Self {
            mutation: Some(Arc::new(mutation)),
            ..self
        }
    }

    #[must_use]
    pub fn with_stop(self) -> Self {
        self.with_mutation(Mutation::Stop(StopMutation {
            dropped_actors: Default::default(),
            dropped_sink_fragments: Default::default(),
        }))
    }

    pub fn is_with_stop_mutation(&self) -> bool {
        matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
    }

    pub fn is_stop(&self, actor_id: ActorId) -> bool {
        self.all_stop_actors()
            .is_some_and(|actors| actors.contains(&actor_id))
    }

    pub fn is_checkpoint(&self) -> bool {
        self.kind == BarrierKind::Checkpoint
    }

    pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
        match self.mutation.as_deref()? {
            Mutation::Update(UpdateMutation { actor_splits, .. })
            | Mutation::Add(AddMutation {
                splits: actor_splits,
                ..
            }) => actor_splits.get(&actor_id),

            Mutation::AddAndUpdate(
                AddMutation {
                    splits: add_actor_splits,
                    ..
                },
                UpdateMutation {
                    actor_splits: update_actor_splits,
                    ..
                },
            ) => add_actor_splits
                .get(&actor_id)
                .or_else(|| update_actor_splits.get(&actor_id)),

            _ => {
                if cfg!(debug_assertions) {
                    panic!(
                        "the initial mutation of the barrier should not be {:?}",
                        self.mutation
                    );
                }
                None
            }
        }
        .map(|s| s.as_slice())
    }

    pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
        match self.mutation.as_deref() {
            Some(Mutation::Stop(StopMutation { dropped_actors, .. })) => Some(dropped_actors),
            Some(Mutation::Update(UpdateMutation { dropped_actors, .. }))
            | Some(Mutation::AddAndUpdate(_, UpdateMutation { dropped_actors, .. })) => {
                Some(dropped_actors)
            }
            _ => None,
        }
    }

    pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation { added_actors, .. }))
            | Some(Mutation::AddAndUpdate(AddMutation { added_actors, .. }, _)) => {
                added_actors.contains(&actor_id)
            }
            _ => false,
        }
    }

    pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
        if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
            fragment_ids.contains(&fragment_id)
        } else {
            false
        }
    }

    pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
        let Some(mutation) = self.mutation.as_deref() else {
            return false;
        };
        match mutation {
            Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
            Mutation::AddAndUpdate(
                AddMutation { adds, .. },
                UpdateMutation {
                    dispatchers,
                    actor_new_dispatchers,
                    ..
                },
            ) => {
                adds.get(&upstream_actor_id).is_some()
                    || actor_new_dispatchers.get(&upstream_actor_id).is_some()
                    || dispatchers.get(&upstream_actor_id).is_some()
            }
            Mutation::Update(_)
            | Mutation::Stop(_)
            | Mutation::Pause
            | Mutation::Resume
            | Mutation::SourceChangeSplit(_)
            | Mutation::Throttle(_)
            | Mutation::DropSubscriptions { .. }
            | Mutation::ConnectorPropsChange(_)
            | Mutation::StartFragmentBackfill { .. }
            | Mutation::RefreshStart { .. }
            | Mutation::LoadFinish { .. } => false,
        }
    }

    pub fn is_pause_on_startup(&self) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation { pause, .. }))
            | Some(Mutation::AddAndUpdate(AddMutation { pause, .. }, _)) => *pause,
            _ => false,
        }
    }

    pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation {
                backfill_nodes_to_pause,
                ..
            }))
            | Some(Mutation::AddAndUpdate(
                AddMutation {
                    backfill_nodes_to_pause,
                    ..
                },
                _,
            )) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
            _ => {
                tracing::warn!("expected an AddMutation on Startup, instead got {:?}", self);
                true
            }
        }
    }

    pub fn is_resume(&self) -> bool {
        matches!(self.mutation.as_deref(), Some(Mutation::Resume))
    }

    pub fn as_update_merge(
        &self,
        actor_id: ActorId,
        upstream_fragment_id: UpstreamFragmentId,
    ) -> Option<&MergeUpdate> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Update(UpdateMutation { merges, .. })
                | Mutation::AddAndUpdate(_, UpdateMutation { merges, .. }) => {
                    merges.get(&(actor_id, upstream_fragment_id))
                }
                _ => None,
            })
    }

    pub fn as_new_upstream_sink(&self, fragment_id: FragmentId) -> Option<&PbNewUpstreamSink> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Add(AddMutation {
                    new_upstream_sinks, ..
                }) => new_upstream_sinks.get(&fragment_id),
                _ => None,
            })
    }

    pub fn as_dropped_upstream_sinks(&self) -> Option<&HashSet<FragmentId>> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Stop(StopMutation {
                    dropped_sink_fragments,
                    ..
                }) => Some(dropped_sink_fragments),
                _ => None,
            })
    }

    pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Update(UpdateMutation { vnode_bitmaps, .. })
                | Mutation::AddAndUpdate(_, UpdateMutation { vnode_bitmaps, .. }) => {
                    vnode_bitmaps.get(&actor_id).cloned()
                }
                _ => None,
            })
    }

    pub fn as_sink_add_columns(&self, sink_id: SinkId) -> Option<Vec<Field>> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Update(UpdateMutation {
                    sink_add_columns, ..
                })
                | Mutation::AddAndUpdate(
                    _,
                    UpdateMutation {
                        sink_add_columns, ..
                    },
                ) => sink_add_columns.get(&sink_id).cloned(),
                _ => None,
            })
    }

    pub fn get_curr_epoch(&self) -> Epoch {
        Epoch(self.epoch.curr)
    }

    pub fn tracing_context(&self) -> &TracingContext {
        &self.tracing_context
    }

    pub fn added_subscriber_on_mv_table(
        &self,
        mv_table_id: TableId,
    ) -> impl Iterator<Item = u32> + '_ {
        if let Some(Mutation::Add(add)) | Some(Mutation::AddAndUpdate(add, _)) =
            self.mutation.as_deref()
        {
            Some(add)
        } else {
            None
        }
        .into_iter()
        .flat_map(move |add| {
            add.subscriptions_to_add.iter().filter_map(
                move |(upstream_mv_table_id, subscriber_id)| {
                    if *upstream_mv_table_id == mv_table_id {
                        Some(*subscriber_id)
                    } else {
                        None
                    }
                },
            )
        })
    }
}

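// Typical inspection pattern (sketch): executors receive the barrier, apply whatever part of
// its mutation concerns them, and forward it. For example, a stateful executor might react to
// a vnode redistribution roughly like this (`this_actor_id` and `state_table` are placeholders
// for the executor's own context and state handle):
//
// if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(this_actor_id) {
//     // rescale the executor's state to the newly assigned vnodes
//     state_table.update_vnode_bitmap(vnode_bitmap);
// }
// yield Message::Barrier(barrier);
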
impl<M: PartialEq> PartialEq for BarrierInner<M> {
    fn eq(&self, other: &Self) -> bool {
        self.epoch == other.epoch && self.mutation == other.mutation
    }
}

impl Mutation {
    #[cfg(test)]
    pub fn is_stop(&self) -> bool {
        matches!(self, Mutation::Stop(_))
    }

    fn to_protobuf(&self) -> PbMutation {
        let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
            actor_splits
                .iter()
                .map(|(&actor_id, splits)| {
                    (
                        actor_id,
                        ConnectorSplits {
                            splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
                        },
                    )
                })
                .collect::<HashMap<_, _>>()
        };

        match self {
            Mutation::Stop(StopMutation {
                dropped_actors,
                dropped_sink_fragments,
            }) => PbMutation::Stop(PbStopMutation {
                actors: dropped_actors.iter().copied().collect(),
                dropped_sink_fragments: dropped_sink_fragments.iter().copied().collect(),
            }),
            Mutation::Update(UpdateMutation {
                dispatchers,
                merges,
                vnode_bitmaps,
                dropped_actors,
                actor_splits,
                actor_new_dispatchers,
                actor_cdc_table_snapshot_splits,
                sink_add_columns,
            }) => PbMutation::Update(PbUpdateMutation {
                dispatcher_update: dispatchers.values().flatten().cloned().collect(),
                merge_update: merges.values().cloned().collect(),
                actor_vnode_bitmap_update: vnode_bitmaps
                    .iter()
                    .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
                    .collect(),
                dropped_actors: dropped_actors.iter().cloned().collect(),
                actor_splits: actor_splits_to_protobuf(actor_splits),
                actor_new_dispatchers: actor_new_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| {
                        (
                            actor_id,
                            Dispatchers {
                                dispatchers: dispatchers.clone(),
                            },
                        )
                    })
                    .collect(),
                actor_cdc_table_snapshot_splits:
                    build_pb_actor_cdc_table_snapshot_splits_with_generation(
                        actor_cdc_table_snapshot_splits.clone(),
                    )
                    .into(),
                sink_add_columns: sink_add_columns
                    .iter()
                    .map(|(sink_id, add_columns)| {
                        (
                            sink_id.sink_id,
                            PbSinkAddColumns {
                                fields: add_columns.iter().map(|field| field.to_prost()).collect(),
                            },
                        )
                    })
                    .collect(),
            }),
            Mutation::Add(AddMutation {
                adds,
                added_actors,
                splits,
                pause,
                subscriptions_to_add,
                backfill_nodes_to_pause,
                actor_cdc_table_snapshot_splits,
                new_upstream_sinks,
            }) => PbMutation::Add(PbAddMutation {
                actor_dispatchers: adds
                    .iter()
                    .map(|(&actor_id, dispatchers)| {
                        (
                            actor_id,
                            Dispatchers {
                                dispatchers: dispatchers.clone(),
                            },
                        )
                    })
                    .collect(),
                added_actors: added_actors.iter().copied().collect(),
                actor_splits: actor_splits_to_protobuf(splits),
                pause: *pause,
                subscriptions_to_add: subscriptions_to_add
                    .iter()
                    .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
                        subscriber_id: *subscriber_id,
                        upstream_mv_table_id: table_id.table_id,
                    })
                    .collect(),
                backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
                actor_cdc_table_snapshot_splits:
                    build_pb_actor_cdc_table_snapshot_splits_with_generation(
                        actor_cdc_table_snapshot_splits.clone(),
                    )
                    .into(),
                new_upstream_sinks: new_upstream_sinks
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
            }),
            Mutation::SourceChangeSplit(changes) => PbMutation::Splits(SourceChangeSplitMutation {
                actor_splits: changes
                    .iter()
                    .map(|(&actor_id, splits)| {
                        (
                            actor_id,
                            ConnectorSplits {
                                splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
                            },
                        )
                    })
                    .collect(),
            }),
            Mutation::Pause => PbMutation::Pause(PauseMutation {}),
            Mutation::Resume => PbMutation::Resume(ResumeMutation {}),
            Mutation::Throttle(changes) => PbMutation::Throttle(ThrottleMutation {
                actor_throttle: changes
                    .iter()
                    .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit }))
                    .collect(),
            }),

            Mutation::AddAndUpdate(add, update) => PbMutation::Combined(CombinedMutation {
                mutations: vec![
                    BarrierMutation {
                        mutation: Some(Mutation::Add(add.clone()).to_protobuf()),
                    },
                    BarrierMutation {
                        mutation: Some(Mutation::Update(update.clone()).to_protobuf()),
                    },
                ],
            }),
            Mutation::DropSubscriptions {
                subscriptions_to_drop,
            } => PbMutation::DropSubscriptions(DropSubscriptionsMutation {
                info: subscriptions_to_drop
                    .iter()
                    .map(
                        |(subscriber_id, upstream_mv_table_id)| SubscriptionUpstreamInfo {
                            subscriber_id: *subscriber_id,
                            upstream_mv_table_id: upstream_mv_table_id.table_id,
                        },
                    )
                    .collect(),
            }),
            Mutation::ConnectorPropsChange(map) => {
                PbMutation::ConnectorPropsChange(ConnectorPropsChangeMutation {
                    connector_props_infos: map
                        .iter()
                        .map(|(actor_id, options)| {
                            (
                                *actor_id,
                                ConnectorPropsInfo {
                                    connector_props_info: options
                                        .iter()
                                        .map(|(k, v)| (k.clone(), v.clone()))
                                        .collect(),
                                },
                            )
                        })
                        .collect(),
                })
            }
            Mutation::StartFragmentBackfill { fragment_ids } => {
                PbMutation::StartFragmentBackfill(StartFragmentBackfillMutation {
                    fragment_ids: fragment_ids.iter().copied().collect(),
                })
            }
            Mutation::RefreshStart {
                table_id,
                associated_source_id,
            } => PbMutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
                table_id: table_id.table_id,
                associated_source_id: associated_source_id.table_id,
            }),
            Mutation::LoadFinish {
                associated_source_id,
            } => PbMutation::LoadFinish(risingwave_pb::stream_plan::LoadFinishMutation {
                associated_source_id: associated_source_id.table_id,
            }),
        }
    }

    fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
        let mutation = match prost {
            PbMutation::Stop(stop) => Mutation::Stop(StopMutation {
                dropped_actors: stop.actors.iter().copied().collect(),
                dropped_sink_fragments: stop.dropped_sink_fragments.iter().copied().collect(),
            }),

            PbMutation::Update(update) => Mutation::Update(UpdateMutation {
                dispatchers: update
                    .dispatcher_update
                    .iter()
                    .map(|u| (u.actor_id, u.clone()))
                    .into_group_map(),
                merges: update
                    .merge_update
                    .iter()
                    .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
                    .collect(),
                vnode_bitmaps: update
                    .actor_vnode_bitmap_update
                    .iter()
                    .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
                    .collect(),
                dropped_actors: update.dropped_actors.iter().cloned().collect(),
                actor_splits: update
                    .actor_splits
                    .iter()
                    .map(|(&actor_id, splits)| {
                        (
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(|split| split.try_into().unwrap())
                                .collect(),
                        )
                    })
                    .collect(),
                actor_new_dispatchers: update
                    .actor_new_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
                    .collect(),
                actor_cdc_table_snapshot_splits:
                    build_actor_cdc_table_snapshot_splits_with_generation(
                        update
                            .actor_cdc_table_snapshot_splits
                            .clone()
                            .unwrap_or_default(),
                    ),
                sink_add_columns: update
                    .sink_add_columns
                    .iter()
                    .map(|(sink_id, add_columns)| {
                        (
                            SinkId::new(*sink_id),
                            add_columns.fields.iter().map(Field::from_prost).collect(),
                        )
                    })
                    .collect(),
            }),

            PbMutation::Add(add) => Mutation::Add(AddMutation {
                adds: add
                    .actor_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
                    .collect(),
                added_actors: add.added_actors.iter().copied().collect(),
                splits: add
                    .actor_splits
                    .iter()
                    .map(|(&actor_id, splits)| {
                        (
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(|split| split.try_into().unwrap())
                                .collect(),
                        )
                    })
                    .collect(),
                pause: add.pause,
                subscriptions_to_add: add
                    .subscriptions_to_add
                    .iter()
                    .map(
                        |SubscriptionUpstreamInfo {
                             subscriber_id,
                             upstream_mv_table_id,
                         }| {
                            (TableId::new(*upstream_mv_table_id), *subscriber_id)
                        },
                    )
                    .collect(),
                backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
                actor_cdc_table_snapshot_splits:
                    build_actor_cdc_table_snapshot_splits_with_generation(
                        add.actor_cdc_table_snapshot_splits
                            .clone()
                            .unwrap_or_default(),
                    ),
                new_upstream_sinks: add
                    .new_upstream_sinks
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
            }),

            PbMutation::Splits(s) => {
                let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
                    Vec::with_capacity(s.actor_splits.len());
                for (&actor_id, splits) in &s.actor_splits {
                    if !splits.splits.is_empty() {
                        change_splits.push((
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(SplitImpl::try_from)
                                .try_collect()?,
                        ));
                    }
                }
                Mutation::SourceChangeSplit(change_splits.into_iter().collect())
            }
            PbMutation::Pause(_) => Mutation::Pause,
            PbMutation::Resume(_) => Mutation::Resume,
            PbMutation::Throttle(changes) => Mutation::Throttle(
                changes
                    .actor_throttle
                    .iter()
                    .map(|(actor_id, limit)| (*actor_id, limit.rate_limit))
                    .collect(),
            ),
            PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
                subscriptions_to_drop: drop
                    .info
                    .iter()
                    .map(|info| (info.subscriber_id, TableId::new(info.upstream_mv_table_id)))
                    .collect(),
            },
            PbMutation::ConnectorPropsChange(alter_connector_props) => {
                Mutation::ConnectorPropsChange(
                    alter_connector_props
                        .connector_props_infos
                        .iter()
                        .map(|(actor_id, options)| {
                            (
                                *actor_id,
                                options
                                    .connector_props_info
                                    .iter()
                                    .map(|(k, v)| (k.clone(), v.clone()))
                                    .collect(),
                            )
                        })
                        .collect(),
                )
            }
            PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
                Mutation::StartFragmentBackfill {
                    fragment_ids: start_fragment_backfill
                        .fragment_ids
                        .iter()
                        .copied()
                        .collect(),
                }
            }
            PbMutation::RefreshStart(refresh_start) => Mutation::RefreshStart {
                table_id: TableId::new(refresh_start.table_id),
                associated_source_id: TableId::new(refresh_start.associated_source_id),
            },
            PbMutation::LoadFinish(load_finish) => Mutation::LoadFinish {
                associated_source_id: TableId::new(load_finish.associated_source_id),
            },
            PbMutation::Combined(CombinedMutation { mutations }) => match &mutations[..] {
                [
                    BarrierMutation {
                        mutation: Some(add),
                    },
                    BarrierMutation {
                        mutation: Some(update),
                    },
                ] => {
                    let Mutation::Add(add_mutation) = Mutation::from_protobuf(add)? else {
                        unreachable!();
                    };

                    let Mutation::Update(update_mutation) = Mutation::from_protobuf(update)? else {
                        unreachable!();
                    };

                    Mutation::AddAndUpdate(add_mutation, update_mutation)
                }

                _ => unreachable!(),
            },
        };
        Ok(mutation)
    }
}

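// Round-trip sketch: `to_protobuf` / `from_protobuf` are intended to be inverses for mutations
// crossing the actor boundary, e.g. (illustrative values only):
//
// let mutation = Mutation::Throttle(HashMap::from([(actor_id, Some(1000))]));
// let pb = mutation.to_protobuf();
// assert_eq!(Mutation::from_protobuf(&pb)?, mutation);
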
impl<M> BarrierInner<M> {
    fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
        let Self {
            epoch,
            mutation,
            kind,
            passed_actors,
            tracing_context,
            ..
        } = self;

        PbBarrier {
            epoch: Some(PbEpoch {
                curr: epoch.curr,
                prev: epoch.prev,
            }),
            mutation: Some(PbBarrierMutation {
                mutation: barrier_fn(mutation),
            }),
            tracing_context: tracing_context.to_protobuf(),
            kind: *kind as _,
            passed_actors: passed_actors.clone(),
        }
    }

    fn from_protobuf_inner(
        prost: &PbBarrier,
        mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
    ) -> StreamExecutorResult<Self> {
        let epoch = prost.get_epoch()?;

        Ok(Self {
            kind: prost.kind(),
            epoch: EpochPair::new(epoch.curr, epoch.prev),
            mutation: mutation_from_pb(
                prost
                    .mutation
                    .as_ref()
                    .and_then(|mutation| mutation.mutation.as_ref()),
            )?,
            passed_actors: prost.get_passed_actors().clone(),
            tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
        })
    }

    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
        BarrierInner {
            epoch: self.epoch,
            mutation: f(self.mutation),
            kind: self.kind,
            tracing_context: self.tracing_context,
            passed_actors: self.passed_actors,
        }
    }
}

impl DispatcherBarrier {
    pub fn to_protobuf(&self) -> PbBarrier {
        self.to_protobuf_inner(|_| None)
    }
}

impl Barrier {
    pub fn to_protobuf(&self) -> PbBarrier {
        self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
    }

    pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
        Self::from_protobuf_inner(prost, |mutation| {
            mutation
                .map(|m| Mutation::from_protobuf(m).map(Arc::new))
                .transpose()
        })
    }
}

#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
    pub col_idx: usize,
    pub data_type: DataType,
    pub val: ScalarImpl,
}

impl PartialOrd for Watermark {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Watermark {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.val.default_cmp(&other.val)
    }
}

impl Watermark {
    pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
        Self {
            col_idx,
            data_type,
            val,
        }
    }

    pub async fn transform_with_expr(
        self,
        expr: &NonStrictExpression<impl Expression>,
        new_col_idx: usize,
    ) -> Option<Self> {
        let Self { col_idx, val, .. } = self;
        let row = {
            let mut row = vec![None; col_idx + 1];
            row[col_idx] = Some(val);
            OwnedRow::new(row)
        };
        let val = expr.eval_row_infallible(&row).await?;
        Some(Self::new(new_col_idx, expr.inner().return_type(), val))
    }

    pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
        output_indices
            .iter()
            .position(|p| *p == self.col_idx)
            .map(|new_col_idx| self.with_idx(new_col_idx))
    }

    pub fn to_protobuf(&self) -> PbWatermark {
        PbWatermark {
            column: Some(PbInputRef {
                index: self.col_idx as _,
                r#type: Some(self.data_type.to_protobuf()),
            }),
            val: Some(&self.val).to_protobuf().into(),
        }
    }

    pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
        let col_ref = prost.get_column()?;
        let data_type = DataType::from(col_ref.get_type()?);
        let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
            .expect("watermark value cannot be null");
        Ok(Self::new(col_ref.get_index() as _, data_type, val))
    }

    pub fn with_idx(self, idx: usize) -> Self {
        Self::new(idx, self.data_type, self.val)
    }
}

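// Sketch of watermark column remapping: `transform_with_indices` rewrites `col_idx` to the
// position of the same column in the projected output, and drops the watermark if that column
// is not part of the output (illustrative values):
//
// let wm = Watermark::new(2, DataType::Int64, ScalarImpl::Int64(100));
// assert_eq!(wm.clone().transform_with_indices(&[2, 0]).map(|w| w.col_idx), Some(0));
// assert_eq!(wm.transform_with_indices(&[0, 1]), None);
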
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageInner<M> {
    Chunk(StreamChunk),
    Barrier(BarrierInner<M>),
    Watermark(Watermark),
}

impl<M> MessageInner<M> {
    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
        match self {
            MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
            MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
            MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
        }
    }
}

pub type Message = MessageInner<BarrierMutationType>;
pub type DispatcherMessage = MessageInner<()>;

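// Sketch: `map_mutation` is how a `DispatcherMessage` (barriers stripped of their mutations, as
// they travel through the exchange) can be turned back into a full `Message` once the mutation
// for that epoch is known from elsewhere (`mutation` below is a placeholder
// `Option<Arc<Mutation>>`):
//
// let message: Message = dispatcher_message.map_mutation(|()| mutation);
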
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageBatchInner<M> {
    Chunk(StreamChunk),
    BarrierBatch(Vec<BarrierInner<M>>),
    Watermark(Watermark),
}
pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
pub type DispatcherBarriers = Vec<DispatcherBarrier>;
pub type DispatcherMessageBatch = MessageBatchInner<()>;

impl From<DispatcherMessage> for DispatcherMessageBatch {
    fn from(m: DispatcherMessage) -> Self {
        match m {
            DispatcherMessage::Chunk(c) => Self::Chunk(c),
            DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
            DispatcherMessage::Watermark(w) => Self::Watermark(w),
        }
    }
}

impl From<StreamChunk> for Message {
    fn from(chunk: StreamChunk) -> Self {
        Message::Chunk(chunk)
    }
}

impl<'a> TryFrom<&'a Message> for &'a Barrier {
    type Error = ();

    fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
        match m {
            Message::Chunk(_) => Err(()),
            Message::Barrier(b) => Ok(b),
            Message::Watermark(_) => Err(()),
        }
    }
}

impl Message {
    #[cfg(test)]
    pub fn is_stop(&self) -> bool {
        matches!(
            self,
            Message::Barrier(Barrier {
                mutation,
                ..
            }) if mutation.as_ref().unwrap().is_stop()
        )
    }
}

impl DispatcherMessageBatch {
    pub fn to_protobuf(&self) -> PbStreamMessageBatch {
        let prost = match self {
            Self::Chunk(stream_chunk) => {
                let prost_stream_chunk = stream_chunk.to_protobuf();
                StreamMessageBatch::StreamChunk(prost_stream_chunk)
            }
            Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
                barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
            }),
            Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
        };
        PbStreamMessageBatch {
            stream_message_batch: Some(prost),
        }
    }

    pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
        let res = match prost.get_stream_message_batch()? {
            StreamMessageBatch::StreamChunk(chunk) => {
                Self::Chunk(StreamChunk::from_protobuf(chunk)?)
            }
            StreamMessageBatch::BarrierBatch(barrier_batch) => {
                let barriers = barrier_batch
                    .barriers
                    .iter()
                    .map(|barrier| {
                        DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
                            if mutation.is_some() {
                                if cfg!(debug_assertions) {
                                    panic!("should not receive message of barrier with mutation");
                                } else {
                                    warn!(?barrier, "receive message of barrier with mutation");
                                }
                            }
                            Ok(())
                        })
                    })
                    .try_collect()?;
                Self::BarrierBatch(barriers)
            }
            StreamMessageBatch::Watermark(watermark) => {
                Self::Watermark(Watermark::from_protobuf(watermark)?)
            }
        };
        Ok(res)
    }

    pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
        ::prost::Message::encoded_len(msg)
    }
}

pub type PkIndices = Vec<usize>;
pub type PkIndicesRef<'a> = &'a [usize];
pub type PkDataTypes = SmallVec<[DataType; 1]>;

pub async fn expect_first_barrier<M: Debug>(
    stream: &mut (impl MessageStreamInner<M> + Unpin),
) -> StreamExecutorResult<BarrierInner<M>> {
    let message = stream
        .next()
        .instrument_await("expect_first_barrier")
        .await
        .context("failed to extract the first message: stream closed unexpectedly")??;
    let barrier = message
        .into_barrier()
        .expect("the first message must be a barrier");
    assert!(matches!(
        barrier.kind,
        BarrierKind::Checkpoint | BarrierKind::Initial
    ));
    Ok(barrier)
}

pub async fn expect_first_barrier_from_aligned_stream(
    stream: &mut (impl AlignedMessageStream + Unpin),
) -> StreamExecutorResult<Barrier> {
    let message = stream
        .next()
        .instrument_await("expect_first_barrier")
        .await
        .context("failed to extract the first message: stream closed unexpectedly")??;
    let barrier = message
        .into_barrier()
        .expect("the first message must be a barrier");
    Ok(barrier)
}

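// Usage sketch, as commonly seen at the top of an executor's generated stream (`input` and
// `state_table` are placeholders for the executor's own upstream and state handle): consume the
// first barrier to learn the initial epoch before doing anything else, then forward it.
//
// let first_barrier = expect_first_barrier(&mut input).await?;
// let first_epoch = first_barrier.epoch;
// yield Message::Barrier(first_barrier);
// state_table.init_epoch(first_epoch).await?;
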
pub trait StreamConsumer: Send + 'static {
    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;

    fn execute(self: Box<Self>) -> Self::BarrierStream;
}

type BoxedMessageInput<InputId, M> = BoxedInput<InputId, MessageStreamItemInner<M>>;

pub struct DynamicReceivers<InputId, M> {
    /// The barrier currently being aligned. `None` if no barrier is in flight.
    barrier: Option<BarrierInner<M>>,
    /// When the current barrier alignment started, used for the metrics below.
    start_ts: Option<Instant>,
    /// Upstreams that have already yielded the current barrier and are blocked until alignment
    /// completes.
    blocked: Vec<BoxedMessageInput<InputId, M>>,
    /// Upstreams that are still being polled.
    active: FuturesUnordered<StreamFuture<BoxedMessageInput<InputId, M>>>,
    /// Watermark buffers, keyed by column index.
    buffered_watermarks: BTreeMap<usize, BufferedWatermarks<InputId>>,
    /// Counter of total time spent on barrier alignment, in nanoseconds.
    barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
    /// Histogram of per-barrier alignment durations, used by the merge executor.
    merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
}
impl<InputId: Clone + Ord + Hash + std::fmt::Debug + Unpin, M: Clone + Unpin> Stream
    for DynamicReceivers<InputId, M>
{
    type Item = MessageStreamItemInner<M>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if self.is_empty() {
            return Poll::Ready(None);
        }

        loop {
            match futures::ready!(self.active.poll_next_unpin(cx)) {
                Some((Some(Err(e)), _)) => {
                    return Poll::Ready(Some(Err(e)));
                }
                Some((Some(Ok(message)), remaining)) => {
                    let input_id = remaining.id();
                    match message {
                        MessageInner::Chunk(chunk) => {
                            self.active.push(remaining.into_future());
                            return Poll::Ready(Some(Ok(MessageInner::Chunk(chunk))));
                        }
                        MessageInner::Watermark(watermark) => {
                            self.active.push(remaining.into_future());
                            if let Some(watermark) = self.handle_watermark(input_id, watermark) {
                                return Poll::Ready(Some(Ok(MessageInner::Watermark(watermark))));
                            }
                        }
                        MessageInner::Barrier(barrier) => {
                            if self.blocked.is_empty() {
                                self.start_ts = Some(Instant::now());
                            }
                            self.blocked.push(remaining);
                            if let Some(current_barrier) = self.barrier.as_ref() {
                                if current_barrier.epoch != barrier.epoch {
                                    return Poll::Ready(Some(Err(
                                        StreamExecutorError::align_barrier(
                                            current_barrier.clone().map_mutation(|_| None),
                                            barrier.map_mutation(|_| None),
                                        ),
                                    )));
                                }
                            } else {
                                self.barrier = Some(barrier);
                            }
                        }
                    }
                }
                Some((None, remaining)) => {
                    return Poll::Ready(Some(Err(StreamExecutorError::channel_closed(format!(
                        "upstream input {:?} unexpectedly closed",
                        remaining.id()
                    )))));
                }
                None => {
                    assert!(!self.blocked.is_empty());

                    let start_ts = self
                        .start_ts
                        .take()
                        .expect("should have received at least one barrier");
                    if let Some(barrier_align_duration) = &self.barrier_align_duration {
                        barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
                    }
                    if let Some(merge_barrier_align_duration) = &self.merge_barrier_align_duration {
                        merge_barrier_align_duration.observe(start_ts.elapsed().as_secs_f64())
                    }

                    break;
                }
            }
        }

        assert!(self.active.is_terminated());

        let barrier = self.barrier.take().unwrap();

        let upstreams = std::mem::take(&mut self.blocked);
        self.extend_active(upstreams);
        assert!(!self.active.is_terminated());

        Poll::Ready(Some(Ok(MessageInner::Barrier(barrier))))
    }
}

impl<InputId: Clone + Ord + Hash + std::fmt::Debug, M> DynamicReceivers<InputId, M> {
    pub fn new(
        upstreams: Vec<BoxedMessageInput<InputId, M>>,
        barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
        merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
    ) -> Self {
        let mut this = Self {
            barrier: None,
            start_ts: None,
            blocked: Vec::with_capacity(upstreams.len()),
            active: Default::default(),
            buffered_watermarks: Default::default(),
            merge_barrier_align_duration,
            barrier_align_duration,
        };
        this.extend_active(upstreams);
        this
    }

    pub fn extend_active(
        &mut self,
        upstreams: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
    ) {
        assert!(self.blocked.is_empty() && self.barrier.is_none());

        self.active
            .extend(upstreams.into_iter().map(|s| s.into_future()));
    }

    pub fn handle_watermark(
        &mut self,
        input_id: InputId,
        watermark: Watermark,
    ) -> Option<Watermark> {
        let col_idx = watermark.col_idx;
        let upstream_ids: Vec<_> = self.upstream_input_ids().collect();
        let watermarks = self
            .buffered_watermarks
            .entry(col_idx)
            .or_insert_with(|| BufferedWatermarks::with_ids(upstream_ids));
        watermarks.handle_watermark(input_id, watermark)
    }

    pub fn add_upstreams_from(
        &mut self,
        new_inputs: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
    ) {
        assert!(self.blocked.is_empty() && self.barrier.is_none());

        let new_inputs: Vec<_> = new_inputs.into_iter().collect();
        let input_ids = new_inputs.iter().map(|input| input.id());
        self.buffered_watermarks.values_mut().for_each(|buffers| {
            buffers.add_buffers(input_ids.clone());
        });
        self.active
            .extend(new_inputs.into_iter().map(|s| s.into_future()));
    }

    pub fn remove_upstreams(&mut self, upstream_input_ids: &HashSet<InputId>) {
        assert!(self.blocked.is_empty() && self.barrier.is_none());

        let new_upstreams = std::mem::take(&mut self.active)
            .into_iter()
            .map(|s| s.into_inner().unwrap())
            .filter(|u| !upstream_input_ids.contains(&u.id()));
        self.extend_active(new_upstreams);
        self.buffered_watermarks.values_mut().for_each(|buffers| {
            buffers.remove_buffer(upstream_input_ids.clone());
        });
    }

    pub fn merge_barrier_align_duration(&self) -> Option<LabelGuardedMetric<Histogram>> {
        self.merge_barrier_align_duration.clone()
    }

    pub fn flush_buffered_watermarks(&mut self) {
        self.buffered_watermarks
            .values_mut()
            .for_each(|buffers| buffers.clear());
    }

    pub fn upstream_input_ids(&self) -> impl Iterator<Item = InputId> + '_ {
        self.blocked
            .iter()
            .map(|s| s.id())
            .chain(self.active.iter().map(|s| s.get_ref().unwrap().id()))
    }

    pub fn is_empty(&self) -> bool {
        self.blocked.is_empty() && self.active.is_empty()
    }
}
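
// Usage sketch (with a hypothetical `inputs: Vec<BoxedMessageInput<ActorId, ()>>`): merge-style
// executors build a `DynamicReceivers` over their upstream inputs and then consume it as a
// `Stream`, optionally passing metrics for barrier-alignment observability.
//
// let mut receivers = DynamicReceivers::new(inputs, None, None);
// while let Some(msg) = receivers.next().await {
//     match msg? {
//         MessageInner::Barrier(barrier) => { /* all upstreams aligned on this barrier */ }
//         MessageInner::Chunk(chunk) => { /* forward or process the chunk */ }
//         MessageInner::Watermark(wm) => { /* minimal watermark across upstreams */ }
//     }
// }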