1mod prelude;
16
17use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
18use std::fmt::Debug;
19use std::future::pending;
20use std::hash::Hash;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::Poll;
24use std::vec;
25
26use await_tree::InstrumentAwait;
27use enum_as_inner::EnumAsInner;
28use futures::future::try_join_all;
29use futures::stream::{BoxStream, FusedStream, FuturesUnordered, StreamFuture};
30use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
31use itertools::Itertools;
32use prometheus::core::{AtomicU64, GenericCounter};
33use risingwave_common::array::StreamChunk;
34use risingwave_common::bitmap::Bitmap;
35use risingwave_common::catalog::{Schema, TableId};
36use risingwave_common::config::StreamingConfig;
37use risingwave_common::metrics::LabelGuardedMetric;
38use risingwave_common::row::OwnedRow;
39use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
40use risingwave_common::util::epoch::{Epoch, EpochPair};
41use risingwave_common::util::tracing::TracingContext;
42use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
43use risingwave_connector::source::SplitImpl;
44use risingwave_expr::expr::{Expression, NonStrictExpression};
45use risingwave_pb::data::PbEpoch;
46use risingwave_pb::expr::PbInputRef;
47use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
48use risingwave_pb::stream_plan::barrier::BarrierKind;
49use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
50use risingwave_pb::stream_plan::stream_node::PbStreamKind;
51use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
52use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
53use risingwave_pb::stream_plan::{
54 PbBarrier, PbBarrierMutation, PbDispatcher, PbSinkSchemaChange, PbStreamMessageBatch,
55 PbWatermark, SubscriptionUpstreamInfo,
56};
57use smallvec::SmallVec;
58use tokio::sync::mpsc;
59use tokio::time::{Duration, Instant};
60
61use crate::error::StreamResult;
62use crate::executor::exchange::input::{
63 BoxedActorInput, BoxedInput, assert_equal_dispatcher_barrier, new_input,
64};
65use crate::executor::monitor::ActorInputMetrics;
66use crate::executor::prelude::StreamingMetrics;
67use crate::executor::watermark::BufferedWatermarks;
68use crate::task::{ActorId, FragmentId, LocalBarrierManager};
69
70mod actor;
71mod barrier_align;
72pub mod exchange;
73pub mod monitor;
74
75pub mod aggregate;
76pub mod asof_join;
77mod backfill;
78mod barrier_recv;
79mod batch_query;
80mod chain;
81mod changelog;
82mod dedup;
83mod dispatch;
84pub mod dml;
85mod dynamic_filter;
86pub mod eowc;
87pub mod error;
88mod expand;
89mod filter;
90mod gap_fill;
91pub mod hash_join;
92mod hop_window;
93mod iceberg_with_pk_index;
94mod join;
95pub mod locality_provider;
96mod lookup;
97mod lookup_union;
98mod merge;
99mod mview;
100mod nested_loop_temporal_join;
101mod no_op;
102mod now;
103mod over_window;
104pub mod project;
105mod rearranged_chain;
106mod receiver;
107pub mod row_id_gen;
108mod sink;
109pub mod source;
110mod stream_reader;
111pub mod subtask;
112mod temporal_join;
113mod top_n;
114mod troublemaker;
115mod union;
116mod upstream_sink_union;
117mod values;
118mod watermark;
119mod watermark_filter;
120mod wrapper;
121
122mod approx_percentile;
123
124mod row_merge;
125
126#[cfg(test)]
127mod integration_tests;
128mod sync_kv_log_store;
129#[cfg(any(test, feature = "test"))]
130pub mod test_utils;
131mod utils;
132mod vector;
133
134pub use actor::{Actor, ActorContext, ActorContextRef};
135use anyhow::{Context, anyhow};
136pub use approx_percentile::global::GlobalApproxPercentileExecutor;
137pub use approx_percentile::local::LocalApproxPercentileExecutor;
138pub use backfill::arrangement_backfill::*;
139pub use backfill::cdc::{
140 CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor,
141};
142pub use backfill::no_shuffle_backfill::*;
143pub use backfill::snapshot_backfill::*;
144pub use barrier_recv::BarrierRecvExecutor;
145pub use batch_query::BatchQueryExecutor;
146pub use chain::ChainExecutor;
147pub use changelog::ChangeLogExecutor;
148pub use dedup::AppendOnlyDedupExecutor;
149pub use dispatch::{DispatchExecutor, SyncLogStoreDispatchExecutor};
150pub use dynamic_filter::DynamicFilterExecutor;
151pub use error::{StreamExecutorError, StreamExecutorResult};
152pub use expand::ExpandExecutor;
153pub use filter::{FilterExecutor, UpsertFilterExecutor};
154pub use gap_fill::{GapFillExecutor, GapFillExecutorArgs};
155pub use hash_join::*;
156pub use hop_window::HopWindowExecutor;
157pub use iceberg_with_pk_index::{
158 DvHandlerImpl, DvMergerExecutor, IcebergWriterImpl, WriterExecutor,
159};
160pub use join::asof_join::{AsOfCpuEncoding, AsOfMemoryEncoding};
161pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
162pub use join::{AsOfDesc, AsOfJoinType, JoinType};
163pub use lookup::*;
164pub use lookup_union::LookupUnionExecutor;
165pub use merge::MergeExecutor;
166pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
167pub use mview::{MaterializeExecutor, RefreshableMaterializeArgs};
168pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
169pub use no_op::NoOpExecutor;
170pub use now::*;
171pub use over_window::*;
172pub use rearranged_chain::RearrangedChainExecutor;
173pub use receiver::ReceiverExecutor;
174use risingwave_common::id::SourceId;
175pub use row_merge::RowMergeExecutor;
176pub use sink::SinkExecutor;
177pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
178pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
179pub use temporal_join::TemporalJoinExecutor;
180pub use top_n::{
181 AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
182};
183pub use troublemaker::TroublemakerExecutor;
184pub use union::UnionExecutor;
185pub use upstream_sink_union::{UpstreamFragmentInfo, UpstreamSinkUnionExecutor};
186pub use utils::DummyExecutor;
187pub use values::ValuesExecutor;
188pub use vector::*;
189pub use watermark_filter::{UpsertWatermarkFilterExecutor, WatermarkFilterExecutor};
190pub use wrapper::WrapperExecutor;
191
192use self::barrier_align::AlignedMessageStream;
193
/// Item yielded by a message stream whose barriers carry a mutation payload of type `M`.
pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
/// Item yielded by an executor message stream; barriers carry a [`BarrierMutationType`].
pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
/// Item yielded by a dispatcher-side message stream.
pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
/// Boxed, `'static` stream of [`MessageStreamItem`]s, as returned by [`Execute::execute`].
pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;
198
199pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
200use risingwave_connector::sink::catalog::SinkId;
201use risingwave_connector::source::cdc::{
202 CdcTableSnapshotSplitAssignmentWithGeneration,
203 build_actor_cdc_table_snapshot_splits_with_generation,
204};
205use risingwave_pb::id::{ExecutorId, SubscriberId};
206use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
207
/// Any `Send` stream whose items are `MessageStreamItemInner<M>`.
pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
/// Any `Send` stream whose items are [`MessageStreamItem`]s.
pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
/// Any `Send` stream whose items are [`DispatcherMessageStreamItem`]s.
pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;
211
/// Static metadata of an executor, exposed through the accessors on [`Executor`].
#[derive(Debug, Default, Clone)]
pub struct ExecutorInfo {
    /// Schema returned by [`Executor::schema`].
    pub schema: Schema,

    /// Stream key returned by [`Executor::stream_key`].
    pub stream_key: StreamKey,

    /// Stream kind returned by [`Executor::stream_kind`] (`Retract` in
    /// [`ExecutorInfo::for_test`]).
    pub stream_kind: PbStreamKind,

    /// Human-readable identity, returned by [`Executor::identity`] and used as the
    /// `Debug` output of [`Executor`].
    pub identity: String,

    /// Executor id.
    pub id: ExecutorId,
}
230
231impl ExecutorInfo {
232 pub fn for_test(schema: Schema, stream_key: StreamKey, identity: String, id: u64) -> Self {
233 Self {
234 schema,
235 stream_key,
236 stream_kind: PbStreamKind::Retract, identity,
238 id: id.into(),
239 }
240 }
241}
242
243pub trait Execute: Send + 'static {
245 fn execute(self: Box<Self>) -> BoxedMessageStream;
246
247 fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
248 self.execute()
249 }
250
251 fn boxed(self) -> Box<dyn Execute>
252 where
253 Self: Sized + Send + 'static,
254 {
255 Box::new(self)
256 }
257}
258
/// An executor: its static [`ExecutorInfo`] paired with the boxed [`Execute`] body that
/// produces its message stream.
pub struct Executor {
    info: ExecutorInfo,
    execute: Box<dyn Execute>,
}
265
266impl Executor {
267 pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
268 Self { info, execute }
269 }
270
271 pub fn info(&self) -> &ExecutorInfo {
272 &self.info
273 }
274
275 pub fn schema(&self) -> &Schema {
276 &self.info.schema
277 }
278
279 pub fn stream_key(&self) -> StreamKeyRef<'_> {
280 &self.info.stream_key
281 }
282
283 pub fn stream_kind(&self) -> PbStreamKind {
284 self.info.stream_kind
285 }
286
287 pub fn identity(&self) -> &str {
288 &self.info.identity
289 }
290
291 pub fn execute(self) -> BoxedMessageStream {
292 self.execute.execute()
293 }
294
295 pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
296 self.execute.execute_with_epoch(epoch)
297 }
298}
299
300impl std::fmt::Debug for Executor {
301 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
302 f.write_str(self.identity())
303 }
304}
305
306impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
307 fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
308 Self::new(info, execute)
309 }
310}
311
312impl<E> From<(ExecutorInfo, E)> for Executor
313where
314 E: Execute,
315{
316 fn from((info, execute): (ExecutorInfo, E)) -> Self {
317 Self::new(info, execute.boxed())
318 }
319}
320
/// Epoch value `0`; by its name, used to mark an invalid / unset epoch.
pub const INVALID_EPOCH: u64 = 0;

/// Fragment id of an upstream fragment (alias of [`FragmentId`]).
type UpstreamFragmentId = FragmentId;
/// Source splits assigned to each actor.
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;
325
/// Payload of [`Mutation::Update`].
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct UpdateMutation {
    /// Dispatcher updates grouped by the `actor_id` of each [`DispatcherUpdate`].
    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
    /// Merge updates keyed by `(actor_id, upstream_fragment_id)`; looked up by
    /// [`Barrier::as_update_merge`].
    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
    /// New vnode bitmap per actor; looked up by [`Barrier::as_update_vnode_bitmap`].
    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
    /// Actors dropped by this update; reported by [`Barrier::all_stop_actors`].
    pub dropped_actors: HashSet<ActorId>,
    /// Split assignment per actor; read by [`Barrier::initial_split_assignment`].
    pub actor_splits: SplitAssignments,
    /// New dispatchers per actor.
    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
    /// CDC table snapshot split assignment (with generation).
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// Schema change per sink; looked up by [`Barrier::as_sink_schema_change`].
    pub sink_schema_change: HashMap<SinkId, PbSinkSchemaChange>,
    /// Subscriptions to drop; reported by [`Barrier::as_subscriptions_to_drop`].
    pub subscriptions_to_drop: Vec<SubscriptionUpstreamInfo>,
}
339
/// Payload of [`Mutation::Add`].
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct AddMutation {
    /// Dispatchers to add, keyed by upstream actor id; an entry makes
    /// [`Barrier::has_more_downstream_fragments`] return `true` for that actor.
    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
    /// Actors added by this mutation; checked by [`Barrier::is_newly_added`].
    pub added_actors: HashSet<ActorId>,
    /// Split assignment per actor; read by [`Barrier::initial_split_assignment`].
    pub splits: SplitAssignments,
    /// Reported by [`Barrier::is_pause_on_startup`].
    pub pause: bool,
    /// `(upstream mv table id, subscriber id)` pairs; filtered by
    /// [`Barrier::added_subscriber_on_mv_table`].
    pub subscriptions_to_add: Vec<(TableId, SubscriberId)>,
    /// Backfill fragments that should start paused; checked by
    /// [`Barrier::is_backfill_pause_on_startup`].
    pub backfill_nodes_to_pause: HashSet<FragmentId>,
    /// CDC table snapshot split assignment (with generation).
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// New upstream sinks per fragment; looked up by [`Barrier::as_new_upstream_sink`].
    pub new_upstream_sinks: HashMap<FragmentId, PbNewUpstreamSink>,
}
355
/// Payload of [`Mutation::Stop`].
#[derive(Debug, Clone)]
#[cfg_attr(any(test, feature = "test"), derive(Default, PartialEq))]
pub struct StopMutation {
    /// Actors to stop; reported by [`Barrier::all_stop_actors`].
    pub dropped_actors: HashSet<ActorId>,
    /// Dropped sink fragments; reported by [`Barrier::as_dropped_upstream_sinks`].
    pub dropped_sink_fragments: HashSet<FragmentId>,
}
362
/// Mutation that a [`Barrier`] may carry. Converted from `PbMutation` by
/// `Mutation::from_protobuf`.
#[cfg_attr(any(test, feature = "test"), derive(PartialEq))]
#[derive(Debug, Clone)]
pub enum Mutation {
    /// See [`StopMutation`].
    Stop(StopMutation),
    /// See [`UpdateMutation`].
    Update(UpdateMutation),
    /// See [`AddMutation`].
    Add(AddMutation),
    /// Per-actor split assignment change (`PbMutation::Splits`).
    SourceChangeSplit(SplitAssignments),
    /// `PbMutation::Pause`.
    Pause,
    /// `PbMutation::Resume`; detected by [`Barrier::is_resume`].
    Resume,
    /// Throttle configuration per fragment.
    Throttle(HashMap<FragmentId, ThrottleConfig>),
    /// Connector property overrides keyed by a `u32` id.
    // NOTE(review): the key is named `actor_id` in `to_protobuf` but `connector_id` in
    // `from_protobuf` — confirm which id this actually is.
    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
    /// Subscriptions to drop; reported by [`Barrier::as_subscriptions_to_drop`].
    DropSubscriptions {
        subscriptions_to_drop: Vec<SubscriptionUpstreamInfo>,
    },
    /// Fragments whose backfill should start; see
    /// [`Barrier::should_start_fragment_backfill`].
    StartFragmentBackfill {
        fragment_ids: HashSet<FragmentId>,
    },
    /// `PbMutation::RefreshStart` for `table_id` and its associated source.
    RefreshStart {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// `PbMutation::ListFinish` for the associated source.
    ListFinish {
        associated_source_id: SourceId,
    },
    /// `PbMutation::LoadFinish` for the associated source.
    LoadFinish {
        associated_source_id: SourceId,
    },
    /// `PbMutation::ResetSource` for `source_id`.
    ResetSource {
        source_id: SourceId,
    },
    /// `PbMutation::InjectSourceOffsets` for `source_id`.
    InjectSourceOffsets {
        source_id: SourceId,
        // presumably split id -> offset string; confirm against the producer.
        split_offsets: HashMap<String, String>,
    },
}
401
/// A barrier, generic over the type `M` of its mutation payload.
#[derive(Debug, Clone)]
pub struct BarrierInner<M> {
    /// Current (`curr`) and previous (`prev`) epoch of the barrier.
    pub epoch: EpochPair,
    /// Mutation payload: `Option<Arc<Mutation>>` for [`Barrier`], `()` for
    /// [`DispatcherBarrier`].
    pub mutation: M,
    /// Barrier kind; see [`Barrier::is_checkpoint`].
    pub kind: BarrierKind,

    /// Tracing context carried with the barrier.
    pub tracing_context: TracingContext,
}

/// Mutation payload of a [`Barrier`]; `None` means the barrier carries no mutation.
pub type BarrierMutationType = Option<Arc<Mutation>>;
/// Barrier carrying an optional shared [`Mutation`].
pub type Barrier = BarrierInner<BarrierMutationType>;
/// Barrier with the mutation stripped; produced by [`Barrier::into_dispatcher`].
pub type DispatcherBarrier = BarrierInner<()>;
419
420impl<M: Default> BarrierInner<M> {
421 pub fn new_test_barrier(epoch: u64) -> Self {
423 Self {
424 epoch: EpochPair::new_test_epoch(epoch),
425 kind: BarrierKind::Checkpoint,
426 tracing_context: TracingContext::none(),
427 mutation: Default::default(),
428 }
429 }
430
431 pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
432 Self {
433 epoch: EpochPair::new(epoch, prev_epoch),
434 kind: BarrierKind::Checkpoint,
435 tracing_context: TracingContext::none(),
436 mutation: Default::default(),
437 }
438 }
439}
440
441impl Barrier {
442 pub fn into_dispatcher(self) -> DispatcherBarrier {
443 DispatcherBarrier {
444 epoch: self.epoch,
445 mutation: (),
446 kind: self.kind,
447 tracing_context: self.tracing_context,
448 }
449 }
450
451 #[must_use]
452 pub fn with_mutation(self, mutation: Mutation) -> Self {
453 Self {
454 mutation: Some(Arc::new(mutation)),
455 ..self
456 }
457 }
458
459 #[must_use]
460 pub fn with_stop(self) -> Self {
461 self.with_mutation(Mutation::Stop(StopMutation {
462 dropped_actors: Default::default(),
463 dropped_sink_fragments: Default::default(),
464 }))
465 }
466
467 pub fn is_with_stop_mutation(&self) -> bool {
469 matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
470 }
471
472 pub fn is_stop(&self, actor_id: ActorId) -> bool {
474 self.all_stop_actors()
475 .is_some_and(|actors| actors.contains(&actor_id))
476 }
477
478 pub fn is_checkpoint(&self) -> bool {
479 self.kind == BarrierKind::Checkpoint
480 }
481
482 pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
492 match self.mutation.as_deref()? {
493 Mutation::Update(UpdateMutation { actor_splits, .. })
494 | Mutation::Add(AddMutation {
495 splits: actor_splits,
496 ..
497 }) => actor_splits.get(&actor_id),
498
499 _ => {
500 if cfg!(debug_assertions) {
501 panic!(
502 "the initial mutation of the barrier should not be {:?}",
503 self.mutation
504 );
505 }
506 None
507 }
508 }
509 .map(|s| s.as_slice())
510 }
511
512 pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
514 match self.mutation.as_deref() {
515 Some(Mutation::Stop(StopMutation { dropped_actors, .. })) => Some(dropped_actors),
516 Some(Mutation::Update(UpdateMutation { dropped_actors, .. })) => Some(dropped_actors),
517 _ => None,
518 }
519 }
520
521 pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
527 match self.mutation.as_deref() {
528 Some(Mutation::Add(AddMutation { added_actors, .. })) => {
529 added_actors.contains(&actor_id)
530 }
531 _ => false,
532 }
533 }
534
535 pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
536 if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
537 fragment_ids.contains(&fragment_id)
538 } else {
539 false
540 }
541 }
542
543 pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
561 let Some(mutation) = self.mutation.as_deref() else {
562 return false;
563 };
564 match mutation {
565 Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
567 Mutation::Update(_)
568 | Mutation::Stop(_)
569 | Mutation::Pause
570 | Mutation::Resume
571 | Mutation::SourceChangeSplit(_)
572 | Mutation::Throttle { .. }
573 | Mutation::DropSubscriptions { .. }
574 | Mutation::ConnectorPropsChange(_)
575 | Mutation::StartFragmentBackfill { .. }
576 | Mutation::RefreshStart { .. }
577 | Mutation::ListFinish { .. }
578 | Mutation::LoadFinish { .. }
579 | Mutation::ResetSource { .. }
580 | Mutation::InjectSourceOffsets { .. } => false,
581 }
582 }
583
584 pub fn is_pause_on_startup(&self) -> bool {
586 match self.mutation.as_deref() {
587 Some(Mutation::Add(AddMutation { pause, .. })) => *pause,
588 _ => false,
589 }
590 }
591
592 pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
593 match self.mutation.as_deref() {
594 Some(Mutation::Add(AddMutation {
595 backfill_nodes_to_pause,
596 ..
597 })) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
598 Some(Mutation::Update(_)) => false,
599 _ => {
600 tracing::warn!(
601 "expected an AddMutation or UpdateMutation on Startup, instead got {:?}",
602 self
603 );
604 false
605 }
606 }
607 }
608
609 pub fn is_resume(&self) -> bool {
611 matches!(self.mutation.as_deref(), Some(Mutation::Resume))
612 }
613
614 pub fn as_update_merge(
617 &self,
618 actor_id: ActorId,
619 upstream_fragment_id: UpstreamFragmentId,
620 ) -> Option<&MergeUpdate> {
621 self.mutation
622 .as_deref()
623 .and_then(|mutation| match mutation {
624 Mutation::Update(UpdateMutation { merges, .. }) => {
625 merges.get(&(actor_id, upstream_fragment_id))
626 }
627 _ => None,
628 })
629 }
630
631 pub fn as_new_upstream_sink(&self, fragment_id: FragmentId) -> Option<&PbNewUpstreamSink> {
634 self.mutation
635 .as_deref()
636 .and_then(|mutation| match mutation {
637 Mutation::Add(AddMutation {
638 new_upstream_sinks, ..
639 }) => new_upstream_sinks.get(&fragment_id),
640 _ => None,
641 })
642 }
643
644 pub fn as_dropped_upstream_sinks(&self) -> Option<&HashSet<FragmentId>> {
646 self.mutation
647 .as_deref()
648 .and_then(|mutation| match mutation {
649 Mutation::Stop(StopMutation {
650 dropped_sink_fragments,
651 ..
652 }) => Some(dropped_sink_fragments),
653 _ => None,
654 })
655 }
656
657 pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
663 self.mutation
664 .as_deref()
665 .and_then(|mutation| match mutation {
666 Mutation::Update(UpdateMutation { vnode_bitmaps, .. }) => {
667 vnode_bitmaps.get(&actor_id).cloned()
668 }
669 _ => None,
670 })
671 }
672
673 pub fn assume_no_update_vnode_bitmap(&self, actor_id: ActorId) -> StreamExecutorResult<()> {
674 if self.as_update_vnode_bitmap(actor_id).is_some() {
675 return Err(anyhow!("updating vnode bitmap in place is not supported").into());
676 }
677 Ok(())
678 }
679
680 pub fn as_sink_schema_change(&self, sink_id: SinkId) -> Option<PbSinkSchemaChange> {
681 self.mutation
682 .as_deref()
683 .and_then(|mutation| match mutation {
684 Mutation::Update(UpdateMutation {
685 sink_schema_change, ..
686 }) => sink_schema_change.get(&sink_id).cloned(),
687 _ => None,
688 })
689 }
690
691 pub fn as_subscriptions_to_drop(&self) -> Option<&[SubscriptionUpstreamInfo]> {
692 match self.mutation.as_deref() {
693 Some(Mutation::DropSubscriptions {
694 subscriptions_to_drop,
695 })
696 | Some(Mutation::Update(UpdateMutation {
697 subscriptions_to_drop,
698 ..
699 })) => Some(subscriptions_to_drop.as_slice()),
700 _ => None,
701 }
702 }
703
704 pub fn get_curr_epoch(&self) -> Epoch {
705 Epoch(self.epoch.curr)
706 }
707
708 pub fn tracing_context(&self) -> &TracingContext {
710 &self.tracing_context
711 }
712
713 pub fn added_subscriber_on_mv_table(
714 &self,
715 mv_table_id: TableId,
716 ) -> impl Iterator<Item = SubscriberId> + '_ {
717 if let Some(Mutation::Add(add)) = self.mutation.as_deref() {
718 Some(add)
719 } else {
720 None
721 }
722 .into_iter()
723 .flat_map(move |add| {
724 add.subscriptions_to_add.iter().filter_map(
725 move |(upstream_mv_table_id, subscriber_id)| {
726 if *upstream_mv_table_id == mv_table_id {
727 Some(*subscriber_id)
728 } else {
729 None
730 }
731 },
732 )
733 })
734 }
735}
736
737impl<M: PartialEq> PartialEq for BarrierInner<M> {
738 fn eq(&self, other: &Self) -> bool {
739 self.epoch == other.epoch && self.mutation == other.mutation
740 }
741}
742
743impl Mutation {
744 #[cfg(test)]
748 pub fn is_stop(&self) -> bool {
749 matches!(self, Mutation::Stop(_))
750 }
751
752 #[cfg(test)]
753 fn to_protobuf(&self) -> PbMutation {
754 use risingwave_pb::source::{
755 ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplitsWithGeneration,
756 };
757 use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
758 use risingwave_pb::stream_plan::{
759 PbAddMutation, PbConnectorPropsChangeMutation, PbDispatchers,
760 PbDropSubscriptionsMutation, PbPauseMutation, PbResumeMutation,
761 PbSourceChangeSplitMutation, PbStartFragmentBackfillMutation, PbStopMutation,
762 PbThrottleMutation, PbUpdateMutation,
763 };
764 let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
765 actor_splits
766 .iter()
767 .map(|(&actor_id, splits)| {
768 (
769 actor_id,
770 ConnectorSplits {
771 splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
772 },
773 )
774 })
775 .collect::<HashMap<_, _>>()
776 };
777
778 match self {
779 Mutation::Stop(StopMutation {
780 dropped_actors,
781 dropped_sink_fragments,
782 }) => PbMutation::Stop(PbStopMutation {
783 actors: dropped_actors.iter().copied().collect(),
784 dropped_sink_fragments: dropped_sink_fragments.iter().copied().collect(),
785 }),
786 Mutation::Update(UpdateMutation {
787 dispatchers,
788 merges,
789 vnode_bitmaps,
790 dropped_actors,
791 actor_splits,
792 actor_new_dispatchers,
793 actor_cdc_table_snapshot_splits,
794 sink_schema_change,
795 subscriptions_to_drop,
796 }) => PbMutation::Update(PbUpdateMutation {
797 dispatcher_update: dispatchers.values().flatten().cloned().collect(),
798 merge_update: merges.values().cloned().collect(),
799 actor_vnode_bitmap_update: vnode_bitmaps
800 .iter()
801 .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
802 .collect(),
803 dropped_actors: dropped_actors.iter().copied().collect(),
804 actor_splits: actor_splits_to_protobuf(actor_splits),
805 actor_new_dispatchers: actor_new_dispatchers
806 .iter()
807 .map(|(&actor_id, dispatchers)| {
808 (
809 actor_id,
810 PbDispatchers {
811 dispatchers: dispatchers.clone(),
812 },
813 )
814 })
815 .collect(),
816 actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
817 splits:actor_cdc_table_snapshot_splits.splits.iter().map(|(actor_id,(splits, generation))| {
818 (*actor_id, risingwave_pb::source::PbCdcTableSnapshotSplits {
819 splits: splits.iter().map(risingwave_connector::source::cdc::build_cdc_table_snapshot_split).collect(),
820 generation: *generation,
821 })
822 }).collect()
823 }),
824 sink_schema_change: sink_schema_change
825 .iter()
826 .map(|(sink_id, change)| ((*sink_id).as_raw_id(), change.clone()))
827 .collect(),
828 subscriptions_to_drop: subscriptions_to_drop.clone(),
829 }),
830 Mutation::Add(AddMutation {
831 adds,
832 added_actors,
833 splits,
834 pause,
835 subscriptions_to_add,
836 backfill_nodes_to_pause,
837 actor_cdc_table_snapshot_splits,
838 new_upstream_sinks,
839 }) => PbMutation::Add(PbAddMutation {
840 actor_dispatchers: adds
841 .iter()
842 .map(|(&actor_id, dispatchers)| {
843 (
844 actor_id,
845 PbDispatchers {
846 dispatchers: dispatchers.clone(),
847 },
848 )
849 })
850 .collect(),
851 added_actors: added_actors.iter().copied().collect(),
852 actor_splits: actor_splits_to_protobuf(splits),
853 pause: *pause,
854 subscriptions_to_add: subscriptions_to_add
855 .iter()
856 .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
857 subscriber_id: *subscriber_id,
858 upstream_mv_table_id: *table_id,
859 })
860 .collect(),
861 backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
862 actor_cdc_table_snapshot_splits:
863 Some(PbCdcTableSnapshotSplitsWithGeneration {
864 splits:actor_cdc_table_snapshot_splits.splits.iter().map(|(actor_id,(splits, generation))| {
865 (*actor_id, risingwave_pb::source::PbCdcTableSnapshotSplits {
866 splits: splits.iter().map(risingwave_connector::source::cdc::build_cdc_table_snapshot_split).collect(),
867 generation: *generation,
868 })
869 }).collect()
870 }),
871 new_upstream_sinks: new_upstream_sinks
872 .iter()
873 .map(|(k, v)| (*k, v.clone()))
874 .collect(),
875 }),
876 Mutation::SourceChangeSplit(changes) => {
877 PbMutation::Splits(PbSourceChangeSplitMutation {
878 actor_splits: changes
879 .iter()
880 .map(|(&actor_id, splits)| {
881 (
882 actor_id,
883 ConnectorSplits {
884 splits: splits
885 .clone()
886 .iter()
887 .map(ConnectorSplit::from)
888 .collect(),
889 },
890 )
891 })
892 .collect(),
893 })
894 }
895 Mutation::Pause => PbMutation::Pause(PbPauseMutation {}),
896 Mutation::Resume => PbMutation::Resume(PbResumeMutation {}),
897 Mutation::Throttle (changes) => PbMutation::Throttle(PbThrottleMutation {
898 fragment_throttle: changes.clone(),
899 }),
900 Mutation::DropSubscriptions {
901 subscriptions_to_drop,
902 } => PbMutation::DropSubscriptions(PbDropSubscriptionsMutation {
903 info: subscriptions_to_drop.clone(),
904 }),
905 Mutation::ConnectorPropsChange(map) => {
906 PbMutation::ConnectorPropsChange(PbConnectorPropsChangeMutation {
907 connector_props_infos: map
908 .iter()
909 .map(|(actor_id, options)| {
910 (
911 *actor_id,
912 ConnectorPropsInfo {
913 connector_props_info: options
914 .iter()
915 .map(|(k, v)| (k.clone(), v.clone()))
916 .collect(),
917 },
918 )
919 })
920 .collect(),
921 })
922 }
923 Mutation::StartFragmentBackfill { fragment_ids } => {
924 PbMutation::StartFragmentBackfill(PbStartFragmentBackfillMutation {
925 fragment_ids: fragment_ids.iter().copied().collect(),
926 })
927 }
928 Mutation::RefreshStart {
929 table_id,
930 associated_source_id,
931 } => PbMutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
932 table_id: *table_id,
933 associated_source_id: *associated_source_id,
934 }),
935 Mutation::ListFinish {
936 associated_source_id,
937 } => PbMutation::ListFinish(risingwave_pb::stream_plan::ListFinishMutation {
938 associated_source_id: *associated_source_id,
939 }),
940 Mutation::LoadFinish {
941 associated_source_id,
942 } => PbMutation::LoadFinish(risingwave_pb::stream_plan::LoadFinishMutation {
943 associated_source_id: *associated_source_id,
944 }),
945 Mutation::ResetSource { source_id } => {
946 PbMutation::ResetSource(risingwave_pb::stream_plan::ResetSourceMutation {
947 source_id: source_id.as_raw_id(),
948 })
949 }
950 Mutation::InjectSourceOffsets {
951 source_id,
952 split_offsets,
953 } => PbMutation::InjectSourceOffsets(
954 risingwave_pb::stream_plan::InjectSourceOffsetsMutation {
955 source_id: source_id.as_raw_id(),
956 split_offsets: split_offsets.clone(),
957 },
958 ),
959 }
960 }
961
962 fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
963 let mutation = match prost {
964 PbMutation::Stop(stop) => Mutation::Stop(StopMutation {
965 dropped_actors: stop.actors.iter().copied().collect(),
966 dropped_sink_fragments: stop.dropped_sink_fragments.iter().copied().collect(),
967 }),
968
969 PbMutation::Update(update) => Mutation::Update(UpdateMutation {
970 dispatchers: update
971 .dispatcher_update
972 .iter()
973 .map(|u| (u.actor_id, u.clone()))
974 .into_group_map(),
975 merges: update
976 .merge_update
977 .iter()
978 .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
979 .collect(),
980 vnode_bitmaps: update
981 .actor_vnode_bitmap_update
982 .iter()
983 .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
984 .collect(),
985 dropped_actors: update.dropped_actors.iter().copied().collect(),
986 actor_splits: update
987 .actor_splits
988 .iter()
989 .map(|(&actor_id, splits)| {
990 (
991 actor_id,
992 splits
993 .splits
994 .iter()
995 .map(|split| split.try_into().unwrap())
996 .collect(),
997 )
998 })
999 .collect(),
1000 actor_new_dispatchers: update
1001 .actor_new_dispatchers
1002 .iter()
1003 .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
1004 .collect(),
1005 actor_cdc_table_snapshot_splits:
1006 build_actor_cdc_table_snapshot_splits_with_generation(
1007 update
1008 .actor_cdc_table_snapshot_splits
1009 .clone()
1010 .unwrap_or_default(),
1011 ),
1012 sink_schema_change: update
1013 .sink_schema_change
1014 .iter()
1015 .map(|(sink_id, change)| (SinkId::from(*sink_id), change.clone()))
1016 .collect(),
1017 subscriptions_to_drop: update.subscriptions_to_drop.clone(),
1018 }),
1019
1020 PbMutation::Add(add) => Mutation::Add(AddMutation {
1021 adds: add
1022 .actor_dispatchers
1023 .iter()
1024 .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
1025 .collect(),
1026 added_actors: add.added_actors.iter().copied().collect(),
1027 splits: add
1030 .actor_splits
1031 .iter()
1032 .map(|(&actor_id, splits)| {
1033 (
1034 actor_id,
1035 splits
1036 .splits
1037 .iter()
1038 .map(|split| split.try_into().unwrap())
1039 .collect(),
1040 )
1041 })
1042 .collect(),
1043 pause: add.pause,
1044 subscriptions_to_add: add
1045 .subscriptions_to_add
1046 .iter()
1047 .map(
1048 |SubscriptionUpstreamInfo {
1049 subscriber_id,
1050 upstream_mv_table_id,
1051 }| { (*upstream_mv_table_id, *subscriber_id) },
1052 )
1053 .collect(),
1054 backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
1055 actor_cdc_table_snapshot_splits:
1056 build_actor_cdc_table_snapshot_splits_with_generation(
1057 add.actor_cdc_table_snapshot_splits
1058 .clone()
1059 .unwrap_or_default(),
1060 ),
1061 new_upstream_sinks: add
1062 .new_upstream_sinks
1063 .iter()
1064 .map(|(k, v)| (*k, v.clone()))
1065 .collect(),
1066 }),
1067
1068 PbMutation::Splits(s) => {
1069 let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
1070 Vec::with_capacity(s.actor_splits.len());
1071 for (&actor_id, splits) in &s.actor_splits {
1072 if !splits.splits.is_empty() {
1073 change_splits.push((
1074 actor_id,
1075 splits
1076 .splits
1077 .iter()
1078 .map(SplitImpl::try_from)
1079 .try_collect()?,
1080 ));
1081 }
1082 }
1083 Mutation::SourceChangeSplit(change_splits.into_iter().collect())
1084 }
1085 PbMutation::Pause(_) => Mutation::Pause,
1086 PbMutation::Resume(_) => Mutation::Resume,
1087 PbMutation::Throttle(changes) => Mutation::Throttle(changes.fragment_throttle.clone()),
1088 PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
1089 subscriptions_to_drop: drop.info.clone(),
1090 },
1091 PbMutation::ConnectorPropsChange(alter_connector_props) => {
1092 Mutation::ConnectorPropsChange(
1093 alter_connector_props
1094 .connector_props_infos
1095 .iter()
1096 .map(|(connector_id, options)| {
1097 (
1098 *connector_id,
1099 options
1100 .connector_props_info
1101 .iter()
1102 .map(|(k, v)| (k.clone(), v.clone()))
1103 .collect(),
1104 )
1105 })
1106 .collect(),
1107 )
1108 }
1109 PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
1110 Mutation::StartFragmentBackfill {
1111 fragment_ids: start_fragment_backfill
1112 .fragment_ids
1113 .iter()
1114 .copied()
1115 .collect(),
1116 }
1117 }
1118 PbMutation::RefreshStart(refresh_start) => Mutation::RefreshStart {
1119 table_id: refresh_start.table_id,
1120 associated_source_id: refresh_start.associated_source_id,
1121 },
1122 PbMutation::ListFinish(list_finish) => Mutation::ListFinish {
1123 associated_source_id: list_finish.associated_source_id,
1124 },
1125 PbMutation::LoadFinish(load_finish) => Mutation::LoadFinish {
1126 associated_source_id: load_finish.associated_source_id,
1127 },
1128 PbMutation::ResetSource(reset_source) => Mutation::ResetSource {
1129 source_id: SourceId::from(reset_source.source_id),
1130 },
1131 PbMutation::InjectSourceOffsets(inject) => Mutation::InjectSourceOffsets {
1132 source_id: SourceId::from(inject.source_id),
1133 split_offsets: inject.split_offsets.clone(),
1134 },
1135 };
1136 Ok(mutation)
1137 }
1138}
1139
1140impl<M> BarrierInner<M> {
1141 fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
1142 let Self {
1143 epoch,
1144 mutation,
1145 kind,
1146 tracing_context,
1147 ..
1148 } = self;
1149
1150 PbBarrier {
1151 epoch: Some(PbEpoch {
1152 curr: epoch.curr,
1153 prev: epoch.prev,
1154 }),
1155 mutation: barrier_fn(mutation).map(|mutation| PbBarrierMutation {
1156 mutation: Some(mutation),
1157 }),
1158 tracing_context: tracing_context.to_protobuf(),
1159 kind: *kind as _,
1160 }
1161 }
1162
1163 fn from_protobuf_inner(
1164 prost: &PbBarrier,
1165 mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
1166 ) -> StreamExecutorResult<Self> {
1167 let epoch = prost.get_epoch()?;
1168
1169 Ok(Self {
1170 kind: prost.kind(),
1171 epoch: EpochPair::new(epoch.curr, epoch.prev),
1172 mutation: mutation_from_pb(
1173 (prost.mutation.as_ref()).and_then(|mutation| mutation.mutation.as_ref()),
1174 )?,
1175 tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
1176 })
1177 }
1178
1179 pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
1180 BarrierInner {
1181 epoch: self.epoch,
1182 mutation: f(self.mutation),
1183 kind: self.kind,
1184 tracing_context: self.tracing_context,
1185 }
1186 }
1187}
1188
1189impl DispatcherBarrier {
1190 pub fn to_protobuf(&self) -> PbBarrier {
1191 self.to_protobuf_inner(|_| None)
1192 }
1193}
1194
1195impl Barrier {
1196 #[cfg(test)]
1197 pub fn to_protobuf(&self) -> PbBarrier {
1198 self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
1199 }
1200
1201 pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
1202 Self::from_protobuf_inner(prost, |mutation| {
1203 mutation
1204 .map(|m| Mutation::from_protobuf(m).map(Arc::new))
1205 .transpose()
1206 })
1207 }
1208}
1209
/// A watermark value `val` (of type `data_type`) for the column at index `col_idx`.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
    /// Index of the column this watermark applies to.
    pub col_idx: usize,
    /// Data type of the watermark column.
    pub data_type: DataType,
    /// The watermark value itself (never NULL).
    pub val: ScalarImpl,
}

/// Delegates to [`Ord::cmp`], so the ordering is total.
impl PartialOrd for Watermark {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

/// Orders watermarks by `val` only, using `DefaultOrd::default_cmp`.
///
/// NOTE(review): the derived `PartialEq`/`Eq` compare all three fields, while this ordering
/// ignores `col_idx` and `data_type`. `cmp` can therefore return `Equal` for watermarks that are
/// not `==` (e.g. the same value on different columns). Presumably callers only compare
/// watermarks of the same column; confirm.
impl Ord for Watermark {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.val.default_cmp(&other.val)
    }
}
1228
1229impl Watermark {
1230 pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
1231 Self {
1232 col_idx,
1233 data_type,
1234 val,
1235 }
1236 }
1237
1238 pub async fn transform_with_expr(
1239 self,
1240 expr: &NonStrictExpression<impl Expression>,
1241 new_col_idx: usize,
1242 ) -> Option<Self> {
1243 let Self { col_idx, val, .. } = self;
1244 let row = {
1245 let mut row = vec![None; col_idx + 1];
1246 row[col_idx] = Some(val);
1247 OwnedRow::new(row)
1248 };
1249 let val = expr.eval_row_infallible(&row).await?;
1250 Some(Self::new(new_col_idx, expr.inner().return_type(), val))
1251 }
1252
1253 pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
1256 output_indices
1257 .iter()
1258 .position(|p| *p == self.col_idx)
1259 .map(|new_col_idx| self.with_idx(new_col_idx))
1260 }
1261
1262 pub fn to_protobuf(&self) -> PbWatermark {
1263 PbWatermark {
1264 column: Some(PbInputRef {
1265 index: self.col_idx as _,
1266 r#type: Some(self.data_type.to_protobuf()),
1267 }),
1268 val: Some(&self.val).to_protobuf().into(),
1269 }
1270 }
1271
1272 pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
1273 let col_ref = prost.get_column()?;
1274 let data_type = DataType::from(col_ref.get_type()?);
1275 let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
1276 .expect("watermark value cannot be null");
1277 Ok(Self::new(col_ref.get_index() as _, data_type, val))
1278 }
1279
1280 pub fn with_idx(self, idx: usize) -> Self {
1281 Self::new(idx, self.data_type, self.val)
1282 }
1283}
1284
/// A single message in a stream, generic over the barrier mutation payload `M`.
///
/// `PartialEq` is only derived for tests (or with the `test` feature).
#[cfg_attr(any(test, feature = "test"), derive(PartialEq))]
#[derive(Debug, EnumAsInner, Clone)]
pub enum MessageInner<M> {
    /// A chunk of stream data.
    Chunk(StreamChunk),
    /// A barrier whose mutation payload has type `M`.
    Barrier(BarrierInner<M>),
    /// A watermark for one column.
    Watermark(Watermark),
}
1292
1293impl<M> MessageInner<M> {
1294 pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
1295 match self {
1296 MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
1297 MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
1298 MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
1299 }
1300 }
1301}
1302
/// A message whose barriers carry their mutation payload (`BarrierMutationType`).
pub type Message = MessageInner<BarrierMutationType>;
/// A message whose barriers carry no mutation payload (`M = ()`).
pub type DispatcherMessage = MessageInner<()>;
1305
/// Like [`MessageInner`], but barriers are carried as a batch (`Vec`) instead of one at a time.
///
/// A single [`MessageInner`] converts into a batch via `From`; a barrier becomes a one-element
/// `BarrierBatch`.
#[derive(Debug, EnumAsInner, Clone)]
pub enum MessageBatchInner<M> {
    /// A chunk of stream data.
    Chunk(StreamChunk),
    /// One or more barriers.
    BarrierBatch(Vec<BarrierInner<M>>),
    /// A watermark for one column.
    Watermark(Watermark),
}
/// A message batch whose barriers carry their mutation payload.
pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
/// A batch of barriers without mutation payload.
pub type DispatcherBarriers = Vec<DispatcherBarrier>;
/// A message batch whose barriers carry no mutation payload (`M = ()`).
pub type DispatcherMessageBatch = MessageBatchInner<()>;
1317
1318impl<M> From<MessageInner<M>> for MessageBatchInner<M> {
1319 fn from(m: MessageInner<M>) -> Self {
1320 match m {
1321 MessageInner::Chunk(c) => Self::Chunk(c),
1322 MessageInner::Barrier(b) => Self::BarrierBatch(vec![b]),
1323 MessageInner::Watermark(w) => Self::Watermark(w),
1324 }
1325 }
1326}
1327
1328impl From<StreamChunk> for Message {
1329 fn from(chunk: StreamChunk) -> Self {
1330 Message::Chunk(chunk)
1331 }
1332}
1333
1334impl<'a> TryFrom<&'a Message> for &'a Barrier {
1335 type Error = ();
1336
1337 fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
1338 match m {
1339 Message::Chunk(_) => Err(()),
1340 Message::Barrier(b) => Ok(b),
1341 Message::Watermark(_) => Err(()),
1342 }
1343 }
1344}
1345
1346impl Message {
1347 #[cfg(test)]
1352 pub fn is_stop(&self) -> bool {
1353 matches!(
1354 self,
1355 Message::Barrier(Barrier {
1356 mutation,
1357 ..
1358 }) if mutation.as_ref().unwrap().is_stop()
1359 )
1360 }
1361}
1362
1363impl DispatcherMessageBatch {
1364 pub fn to_protobuf(&self) -> PbStreamMessageBatch {
1365 let prost = match self {
1366 Self::Chunk(stream_chunk) => {
1367 let prost_stream_chunk = stream_chunk.to_protobuf();
1368 StreamMessageBatch::StreamChunk(prost_stream_chunk)
1369 }
1370 Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
1371 barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
1372 }),
1373 Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
1374 };
1375 PbStreamMessageBatch {
1376 stream_message_batch: Some(prost),
1377 }
1378 }
1379
1380 pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
1381 let res = match prost.get_stream_message_batch()? {
1382 StreamMessageBatch::StreamChunk(chunk) => {
1383 Self::Chunk(StreamChunk::from_protobuf(chunk)?)
1384 }
1385 StreamMessageBatch::BarrierBatch(barrier_batch) => {
1386 let barriers = barrier_batch
1387 .barriers
1388 .iter()
1389 .map(|barrier| {
1390 DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
1391 if mutation.is_some() {
1392 if cfg!(debug_assertions) {
1393 panic!("should not receive message of barrier with mutation");
1394 } else {
1395 warn!(?barrier, "receive message of barrier with mutation");
1396 }
1397 }
1398 Ok(())
1399 })
1400 })
1401 .try_collect()?;
1402 Self::BarrierBatch(barriers)
1403 }
1404 StreamMessageBatch::Watermark(watermark) => {
1405 Self::Watermark(Watermark::from_protobuf(watermark)?)
1406 }
1407 };
1408 Ok(res)
1409 }
1410
1411 pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
1412 ::prost::Message::encoded_len(msg)
1413 }
1414}
1415
/// Column indices that make up a stream key (presumably positions in the stream's schema; confirm).
pub type StreamKey = Vec<usize>;
/// Borrowed form of [`StreamKey`].
pub type StreamKeyRef<'a> = &'a [usize];
/// Data types of the stream key columns, stored inline (no heap allocation) for at most one.
pub type StreamKeyDataTypes = SmallVec<[DataType; 1]>;
1419
1420pub async fn expect_first_barrier<M: Debug>(
1422 stream: &mut (impl MessageStreamInner<M> + Unpin),
1423) -> StreamExecutorResult<BarrierInner<M>> {
1424 let message = stream
1425 .next()
1426 .instrument_await("expect_first_barrier")
1427 .await
1428 .context("failed to extract the first message: stream closed unexpectedly")??;
1429 let barrier = message
1430 .into_barrier()
1431 .expect("the first message must be a barrier");
1432 assert!(matches!(
1434 barrier.kind,
1435 BarrierKind::Checkpoint | BarrierKind::Initial
1436 ));
1437 Ok(barrier)
1438}
1439
1440pub async fn expect_first_barrier_from_aligned_stream(
1442 stream: &mut (impl AlignedMessageStream + Unpin),
1443) -> StreamExecutorResult<Barrier> {
1444 let message = stream
1445 .next()
1446 .instrument_await("expect_first_barrier")
1447 .await
1448 .context("failed to extract the first message: stream closed unexpectedly")??;
1449 let barrier = message
1450 .into_barrier()
1451 .expect("the first message must be a barrier");
1452 Ok(barrier)
1453}
1454
/// A boxed consumer that, once started with [`execute`](StreamConsumer::execute), yields a
/// stream of barriers (or errors).
///
/// NOTE(review): presumably the terminal consumer of an actor's executor chain, reporting
/// barriers it has processed; confirm with implementors.
pub trait StreamConsumer: Send + 'static {
    /// The stream of barriers produced while consuming.
    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;

    /// Starts consuming, taking ownership of the boxed consumer.
    fn execute(self: Box<Self>) -> Self::BarrierStream;
}
1461
/// A boxed upstream input, identified by `InputId`, that yields `MessageStreamItemInner<M>`.
type BoxedMessageInput<InputId, M> = BoxedInput<InputId, MessageStreamItemInner<M>>;
1463
/// Merges messages from a changeable set of upstream inputs into a single stream.
///
/// Chunks are forwarded as soon as they arrive. Watermarks are handed to a per-column
/// `BufferedWatermarks`, which decides what to emit.
///
/// Barriers are aligned. An input that yields a barrier is parked in `blocked` until all active
/// inputs have yielded one; then the barrier is emitted once and every input becomes active
/// again. A barrier whose epoch differs from the one being aligned is reported as an error.
pub struct DynamicReceivers<InputId, M> {
    /// The barrier currently being aligned, taken from the first input that yielded it.
    barrier: Option<BarrierInner<M>>,
    /// When the first input of the current alignment became blocked.
    start_ts: Option<Instant>,
    /// Inputs that have yielded the current barrier; not polled until alignment completes.
    blocked: Vec<BoxedMessageInput<InputId, M>>,
    /// Inputs that are currently being polled.
    active: FuturesUnordered<StreamFuture<BoxedMessageInput<InputId, M>>>,
    /// Per-column watermark buffers, keyed by column index.
    buffered_watermarks: BTreeMap<usize, BufferedWatermarks<InputId>>,
    /// Optional counter incremented by each barrier alignment time, in nanoseconds.
    barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
    /// Optional counter incremented by each barrier alignment time, in nanoseconds.
    merge_barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
}
1483
1484impl<InputId: Clone + Ord + Hash + std::fmt::Debug + Unpin, M: Clone + Unpin> Stream
1485 for DynamicReceivers<InputId, M>
1486{
1487 type Item = MessageStreamItemInner<M>;
1488
1489 fn poll_next(
1490 mut self: Pin<&mut Self>,
1491 cx: &mut std::task::Context<'_>,
1492 ) -> Poll<Option<Self::Item>> {
1493 if self.is_empty() {
1494 return Poll::Ready(None);
1495 }
1496
1497 loop {
1498 match futures::ready!(self.active.poll_next_unpin(cx)) {
1499 Some((Some(Err(e)), _)) => {
1501 return Poll::Ready(Some(Err(e)));
1502 }
1503 Some((Some(Ok(message)), remaining)) => {
1505 let input_id = remaining.id();
1506 match message {
1507 MessageInner::Chunk(chunk) => {
1508 self.active.push(remaining.into_future());
1510 return Poll::Ready(Some(Ok(MessageInner::Chunk(chunk))));
1511 }
1512 MessageInner::Watermark(watermark) => {
1513 self.active.push(remaining.into_future());
1515 if let Some(watermark) = self.handle_watermark(input_id, watermark) {
1516 return Poll::Ready(Some(Ok(MessageInner::Watermark(watermark))));
1517 }
1518 }
1519 MessageInner::Barrier(barrier) => {
1520 if self.blocked.is_empty() {
1522 self.start_ts = Some(Instant::now());
1523 }
1524 self.blocked.push(remaining);
1525 if let Some(current_barrier) = self.barrier.as_ref() {
1526 if current_barrier.epoch != barrier.epoch {
1527 return Poll::Ready(Some(Err(
1528 StreamExecutorError::align_barrier(
1529 current_barrier.clone().map_mutation(|_| None),
1530 barrier.map_mutation(|_| None),
1531 ),
1532 )));
1533 }
1534 } else {
1535 self.barrier = Some(barrier);
1536 }
1537 }
1538 }
1539 }
1540 Some((None, remaining)) => {
1548 return Poll::Ready(Some(Err(StreamExecutorError::channel_closed(format!(
1549 "upstream input {:?} unexpectedly closed",
1550 remaining.id()
1551 )))));
1552 }
1553 None => {
1555 assert!(!self.blocked.is_empty());
1556
1557 let start_ts = self
1558 .start_ts
1559 .take()
1560 .expect("should have received at least one barrier");
1561 if let Some(barrier_align_duration) = &self.barrier_align_duration {
1562 barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
1563 }
1564 if let Some(merge_barrier_align_duration) = &self.merge_barrier_align_duration {
1565 merge_barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
1566 }
1567
1568 break;
1569 }
1570 }
1571 }
1572
1573 assert!(self.active.is_terminated());
1574
1575 let barrier = self.barrier.take().unwrap();
1576
1577 let upstreams = std::mem::take(&mut self.blocked);
1578 self.extend_active(upstreams);
1579 assert!(!self.active.is_terminated());
1580
1581 Poll::Ready(Some(Ok(MessageInner::Barrier(barrier))))
1582 }
1583}
1584
1585impl<InputId: Clone + Ord + Hash + std::fmt::Debug, M> DynamicReceivers<InputId, M> {
1586 pub fn new(
1587 upstreams: Vec<BoxedMessageInput<InputId, M>>,
1588 barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1589 merge_barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1590 ) -> Self {
1591 let mut this = Self {
1592 barrier: None,
1593 start_ts: None,
1594 blocked: Vec::with_capacity(upstreams.len()),
1595 active: Default::default(),
1596 buffered_watermarks: Default::default(),
1597 merge_barrier_align_duration,
1598 barrier_align_duration,
1599 };
1600 this.extend_active(upstreams);
1601 this
1602 }
1603
1604 pub fn extend_active(
1607 &mut self,
1608 upstreams: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1609 ) {
1610 assert!(self.blocked.is_empty() && self.barrier.is_none());
1611
1612 self.active
1613 .extend(upstreams.into_iter().map(|s| s.into_future()));
1614 }
1615
1616 pub fn handle_watermark(
1618 &mut self,
1619 input_id: InputId,
1620 watermark: Watermark,
1621 ) -> Option<Watermark> {
1622 let col_idx = watermark.col_idx;
1623 let upstream_ids: Vec<_> = self.upstream_input_ids().collect();
1625 let watermarks = self
1626 .buffered_watermarks
1627 .entry(col_idx)
1628 .or_insert_with(|| BufferedWatermarks::with_ids(upstream_ids));
1629 watermarks.handle_watermark(input_id, watermark)
1630 }
1631
1632 pub fn add_upstreams_from(
1635 &mut self,
1636 new_inputs: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1637 ) {
1638 assert!(self.blocked.is_empty() && self.barrier.is_none());
1639
1640 let new_inputs: Vec<_> = new_inputs.into_iter().collect();
1641 let input_ids = new_inputs.iter().map(|input| input.id());
1642 self.buffered_watermarks.values_mut().for_each(|buffers| {
1643 buffers.add_buffers(input_ids.clone());
1645 });
1646 self.active
1647 .extend(new_inputs.into_iter().map(|s| s.into_future()));
1648 }
1649
1650 pub fn remove_upstreams(&mut self, upstream_input_ids: &HashSet<InputId>) {
1654 assert!(self.blocked.is_empty() && self.barrier.is_none());
1655
1656 let new_upstreams = std::mem::take(&mut self.active)
1657 .into_iter()
1658 .map(|s| s.into_inner().unwrap())
1659 .filter(|u| !upstream_input_ids.contains(&u.id()));
1660 self.extend_active(new_upstreams);
1661 self.buffered_watermarks.values_mut().for_each(|buffers| {
1662 buffers.remove_buffer(upstream_input_ids.clone());
1665 });
1666 }
1667
1668 pub fn merge_barrier_align_duration(
1669 &self,
1670 ) -> Option<LabelGuardedMetric<GenericCounter<AtomicU64>>> {
1671 self.merge_barrier_align_duration.clone()
1672 }
1673
1674 pub fn flush_buffered_watermarks(&mut self) {
1675 self.buffered_watermarks
1676 .values_mut()
1677 .for_each(|buffers| buffers.clear());
1678 }
1679
1680 pub fn upstream_input_ids(&self) -> impl Iterator<Item = InputId> + '_ {
1681 self.blocked
1682 .iter()
1683 .map(|s| s.id())
1684 .chain(self.active.iter().map(|s| s.get_ref().unwrap().id()))
1685 }
1686
1687 pub fn is_empty(&self) -> bool {
1688 self.blocked.is_empty() && self.active.is_empty()
1689 }
1690}
1691
/// Buffers barriers received from `barrier_rx` so that each barrier arriving through the data
/// stream can be paired with its locally received copy (see `pop_barrier_with_inputs`).
///
/// When a received barrier is an update-merge mutation for this actor that adds upstream actors,
/// the new upstream inputs are built as soon as the barrier is received. They are buffered
/// together with the barrier.
pub(crate) struct DispatchBarrierBuffer {
    /// Received barriers in arrival order, each with the new inputs built for it (if any).
    buffer: VecDeque<(Barrier, Option<Vec<BoxedActorInput>>)>,
    /// Channel from which barriers are received.
    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
    /// Whether we are waiting for a barrier or building inputs for one.
    recv_state: BarrierReceiverState,
    /// Upstream fragment this actor currently reads from. Updated when an update-merge mutation
    /// names a new upstream fragment.
    curr_upstream_fragment_id: FragmentId,
    /// The actor owning this buffer.
    actor_id: ActorId,
    /// Shared context used to build new upstream inputs.
    build_input_ctx: Arc<BuildInputContext>,
}
1721
/// Everything needed to build new upstream inputs for an actor. It is shared via `Arc` with the
/// futures created by `DispatchBarrierBuffer::pre_apply_barrier`.
struct BuildInputContext {
    /// The actor the new inputs are built for.
    pub actor_id: ActorId,
    /// Passed to `new_input` when creating an input.
    pub local_barrier_manager: LocalBarrierManager,
    /// Streaming metrics handed to each new input.
    pub metrics: Arc<StreamingMetrics>,
    /// Fragment of the owning actor.
    pub fragment_id: FragmentId,
    /// Streaming configuration handed to each new input.
    pub actor_config: Arc<StreamingConfig>,
}
1729
/// A boxed, `Send` future that resolves to newly built upstream inputs.
type BoxedNewInputsFuture =
    Pin<Box<dyn Future<Output = StreamExecutorResult<Vec<BoxedActorInput>>> + Send>>;
1732
/// Progress of `DispatchBarrierBuffer` in consuming its barrier channel.
enum BarrierReceiverState {
    /// Waiting for the next barrier from the channel.
    ReceivingBarrier,
    /// A received barrier needs new upstream inputs, which the future builds. The barrier is
    /// pushed into the buffer only once the future completes.
    CreatingNewInput(Barrier, BoxedNewInputsFuture),
}
1737
impl DispatchBarrierBuffer {
    /// Creates an empty buffer that receives barriers from `barrier_rx`.
    ///
    /// `curr_upstream_fragment_id` is the upstream fragment this actor currently reads from. The
    /// remaining arguments are stored in a shared [`BuildInputContext`], used to create upstream
    /// inputs when a barrier adds upstream actors.
    pub fn new(
        barrier_rx: mpsc::UnboundedReceiver<Barrier>,
        actor_id: ActorId,
        curr_upstream_fragment_id: FragmentId,
        local_barrier_manager: LocalBarrierManager,
        metrics: Arc<StreamingMetrics>,
        fragment_id: FragmentId,
        actor_config: Arc<StreamingConfig>,
    ) -> Self {
        Self {
            buffer: VecDeque::new(),
            barrier_rx,
            recv_state: BarrierReceiverState::ReceivingBarrier,
            curr_upstream_fragment_id,
            actor_id,
            build_input_ctx: Arc::new(BuildInputContext {
                actor_id,
                local_barrier_manager,
                metrics,
                fragment_id,
                actor_config,
            }),
        }
    }

    /// Awaits the next message from `stream`. Meanwhile it keeps draining `barrier_rx` into the
    /// buffer and builds any new inputs those barriers require.
    ///
    /// Waiting time is added to `metrics.actor_input_buffer_blocking_duration_ns`:
    /// - on every 15-second tick, the full interval is recorded and the start time is reset;
    /// - when a message arrives, the time since the last reset is recorded.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - `stream` yields an error;
    /// - `stream` ends (`channel_closed`);
    /// - processing received barriers fails.
    ///
    /// A closed barrier channel is not an error here; that branch just stays pending.
    pub async fn await_next_message(
        &mut self,
        stream: &mut (impl Stream<Item = StreamExecutorResult<DispatcherMessage>> + Unpin),
        metrics: &ActorInputMetrics,
    ) -> StreamExecutorResult<DispatcherMessage> {
        let mut start_time = Instant::now();
        let interval_duration = Duration::from_secs(15);
        let mut interval =
            tokio::time::interval_at(start_time + interval_duration, interval_duration);

        loop {
            // `biased`: branches are polled in order, so a ready data message takes priority
            // over fetching barriers. `continuously_fetch_barrier_rx` is re-created on every
            // iteration; its in-flight input-building future lives in `recv_state`, so dropping
            // the branch future does not lose it.
            tokio::select! {
                biased;
                msg = stream.try_next() => {
                    metrics
                        .actor_input_buffer_blocking_duration_ns
                        .inc_by(start_time.elapsed().as_nanos() as u64);
                    return msg?.ok_or_else(
                        || StreamExecutorError::channel_closed("upstream executor closed unexpectedly")
                    );
                }

                e = self.continuously_fetch_barrier_rx() => {
                    return Err(e);
                }

                _ = interval.tick() => {
                    start_time = Instant::now();
                    metrics.actor_input_buffer_blocking_duration_ns.inc_by(interval_duration.as_nanos() as u64);
                    continue;
                }
            }
        }
    }

    /// Pops the oldest buffered barrier together with any new inputs built for it. If the buffer
    /// is empty, it first fetches from `barrier_rx` until a barrier is available.
    ///
    /// `barrier` is the barrier received through the data stream. It is checked against the
    /// buffered one with `assert_equal_dispatcher_barrier` (presumably panicking on mismatch;
    /// confirm).
    ///
    /// # Errors
    ///
    /// Fails if the barrier channel closes before a barrier is available, or if building new
    /// inputs fails.
    pub async fn pop_barrier_with_inputs(
        &mut self,
        barrier: DispatcherBarrier,
    ) -> StreamExecutorResult<(Barrier, Option<Vec<BoxedActorInput>>)> {
        while self.buffer.is_empty() {
            self.try_fetch_barrier_rx(false).await?;
        }
        let (recv_barrier, inputs) = self.buffer.pop_front().unwrap();
        assert_equal_dispatcher_barrier(&recv_barrier, &barrier);

        Ok((recv_barrier, inputs))
    }

    /// Keeps fetching barriers into the buffer and only returns when an error occurs.
    ///
    /// It passes `pending_on_end = true`, so a closed barrier channel makes this pend forever
    /// instead of returning.
    async fn continuously_fetch_barrier_rx(&mut self) -> StreamExecutorError {
        loop {
            if let Err(e) = self.try_fetch_barrier_rx(true).await {
                return e;
            }
        }
    }

    /// Advances the receiving state machine by one step.
    ///
    /// - In `ReceivingBarrier`, it receives one barrier. If that barrier needs new inputs, it
    ///   moves to `CreatingNewInput`; otherwise it buffers the barrier right away.
    /// - In `CreatingNewInput`, it awaits the input-building future, buffers the barrier with the
    ///   new inputs, and returns to `ReceivingBarrier`.
    ///
    /// When the channel is closed, `pending_on_end` selects between pending forever (`true`) and
    /// returning a `channel_closed` error (`false`).
    ///
    /// NOTE(review): if the input-building future fails, `recv_state` stays `CreatingNewInput`
    /// holding an already-completed future. Calling this again would re-poll it. Presumably
    /// such errors are fatal to the actor; confirm.
    async fn try_fetch_barrier_rx(&mut self, pending_on_end: bool) -> StreamExecutorResult<()> {
        match &mut self.recv_state {
            BarrierReceiverState::ReceivingBarrier => {
                let Some(barrier) = self.barrier_rx.recv().await else {
                    if pending_on_end {
                        return pending().await;
                    } else {
                        return Err(StreamExecutorError::channel_closed(
                            "barrier channel closed unexpectedly",
                        ));
                    }
                };
                if let Some(fut) = self.pre_apply_barrier(&barrier) {
                    self.recv_state = BarrierReceiverState::CreatingNewInput(barrier, fut);
                } else {
                    self.buffer.push_back((barrier, None));
                }
            }
            BarrierReceiverState::CreatingNewInput(barrier, fut) => {
                // The future is polled by reference, so a cancelled await resumes it later.
                let new_inputs = fut.await?;
                self.buffer.push_back((barrier.clone(), Some(new_inputs)));
                self.recv_state = BarrierReceiverState::ReceivingBarrier;
            }
        }
        Ok(())
    }

    /// Handles a barrier that is an update-merge mutation for this actor (relative to the
    /// current upstream fragment) and adds upstream actors.
    ///
    /// For such a barrier, it returns a future that concurrently builds one input per added
    /// actor, and checks that each input's first barrier equals `barrier`. The future fails if
    /// building any input fails. Any other barrier returns `None`.
    ///
    /// Side effect: if the update names a new upstream fragment, `curr_upstream_fragment_id` is
    /// switched to it immediately.
    fn pre_apply_barrier(&mut self, barrier: &Barrier) -> Option<BoxedNewInputsFuture> {
        if let Some(update) = barrier.as_update_merge(self.actor_id, self.curr_upstream_fragment_id)
            && !update.added_upstream_actors.is_empty()
        {
            // A change of upstream fragment is only applied here, i.e. when actors are added.
            // Presumably such updates always add actors; confirm.
            let upstream_fragment_id =
                if let Some(new_upstream_fragment_id) = update.new_upstream_fragment_id {
                    self.curr_upstream_fragment_id = new_upstream_fragment_id;
                    new_upstream_fragment_id
                } else {
                    self.curr_upstream_fragment_id
                };
            let ctx = self.build_input_ctx.clone();
            let added_upstream_actors = update.added_upstream_actors.clone();
            let barrier = barrier.clone();
            let fut = async move {
                try_join_all(added_upstream_actors.iter().map(|upstream_actor| async {
                    let mut new_input = new_input(
                        &ctx.local_barrier_manager,
                        ctx.metrics.clone(),
                        ctx.actor_id,
                        ctx.fragment_id,
                        upstream_actor,
                        upstream_fragment_id,
                        ctx.actor_config.clone(),
                    )
                    .await?;

                    // The new input must start at the very barrier that introduced it.
                    let first_barrier = expect_first_barrier(&mut new_input).await?;
                    assert_equal_dispatcher_barrier(&barrier, &first_barrier);

                    StreamExecutorResult::Ok(new_input)
                }))
                .await
            }
            .boxed();

            Some(fut)
        } else {
            None
        }
    }
}