mod prelude;

use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::future::pending;
use std::hash::Hash;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::vec;

use await_tree::InstrumentAwait;
use enum_as_inner::EnumAsInner;
use futures::future::try_join_all;
use futures::stream::{BoxStream, FusedStream, FuturesUnordered, StreamFuture};
use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use prometheus::Histogram;
use prometheus::core::{AtomicU64, GenericCounter};
use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{Field, Schema, TableId};
use risingwave_common::config::StreamingConfig;
use risingwave_common::metrics::LabelGuardedMetric;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
use risingwave_common::util::epoch::{Epoch, EpochPair};
use risingwave_common::util::tracing::TracingContext;
use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
use risingwave_connector::source::SplitImpl;
use risingwave_expr::expr::{Expression, NonStrictExpression};
use risingwave_pb::data::PbEpoch;
use risingwave_pb::expr::PbInputRef;
use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
use risingwave_pb::stream_plan::stream_node::PbStreamKind;
use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
use risingwave_pb::stream_plan::{
    PbBarrier, PbBarrierMutation, PbDispatcher, PbStreamMessageBatch, PbWatermark,
    SubscriptionUpstreamInfo,
};
use smallvec::SmallVec;
use tokio::sync::mpsc;
use tokio::time::{Duration, Instant};

use crate::error::StreamResult;
use crate::executor::exchange::input::{
    BoxedActorInput, BoxedInput, assert_equal_dispatcher_barrier, new_input,
};
use crate::executor::monitor::ActorInputMetrics;
use crate::executor::prelude::StreamingMetrics;
use crate::executor::watermark::BufferedWatermarks;
use crate::task::{ActorId, FragmentId, LocalBarrierManager};

mod actor;
mod barrier_align;
pub mod exchange;
pub mod monitor;

pub mod aggregate;
pub mod asof_join;
mod backfill;
mod barrier_recv;
mod batch_query;
mod chain;
mod changelog;
mod dedup;
mod dispatch;
pub mod dml;
mod dynamic_filter;
pub mod eowc;
pub mod error;
mod expand;
mod filter;
mod gap_fill;
pub mod hash_join;
mod hop_window;
mod join;
pub mod locality_provider;
mod lookup;
mod lookup_union;
mod merge;
mod mview;
mod nested_loop_temporal_join;
mod no_op;
mod now;
mod over_window;
pub mod project;
mod rearranged_chain;
mod receiver;
pub mod row_id_gen;
mod sink;
pub mod source;
mod stream_reader;
pub mod subtask;
mod temporal_join;
mod top_n;
mod troublemaker;
mod union;
mod upstream_sink_union;
mod values;
mod watermark;
mod watermark_filter;
mod wrapper;

mod approx_percentile;

mod row_merge;

#[cfg(test)]
mod integration_tests;
mod sync_kv_log_store;
#[cfg(any(test, feature = "test"))]
pub mod test_utils;
mod utils;
mod vector;

pub use actor::{Actor, ActorContext, ActorContextRef};
use anyhow::Context;
pub use approx_percentile::global::GlobalApproxPercentileExecutor;
pub use approx_percentile::local::LocalApproxPercentileExecutor;
pub use backfill::arrangement_backfill::*;
pub use backfill::cdc::{
    CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor,
};
pub use backfill::no_shuffle_backfill::*;
pub use backfill::snapshot_backfill::*;
pub use barrier_recv::BarrierRecvExecutor;
pub use batch_query::BatchQueryExecutor;
pub use chain::ChainExecutor;
pub use changelog::ChangeLogExecutor;
pub use dedup::AppendOnlyDedupExecutor;
pub use dispatch::{DispatchExecutor, DispatcherImpl};
pub use dynamic_filter::DynamicFilterExecutor;
pub use error::{StreamExecutorError, StreamExecutorResult};
pub use expand::ExpandExecutor;
pub use filter::{FilterExecutor, UpsertFilterExecutor};
pub use gap_fill::{GapFillExecutor, GapFillExecutorArgs};
pub use hash_join::*;
pub use hop_window::HopWindowExecutor;
pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
pub use join::{AsOfDesc, AsOfJoinType, JoinType};
pub use lookup::*;
pub use lookup_union::LookupUnionExecutor;
pub use merge::MergeExecutor;
pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
pub use mview::{MaterializeExecutor, RefreshableMaterializeArgs};
pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
pub use no_op::NoOpExecutor;
pub use now::*;
pub use over_window::*;
pub use rearranged_chain::RearrangedChainExecutor;
pub use receiver::ReceiverExecutor;
use risingwave_common::id::SourceId;
pub use row_merge::RowMergeExecutor;
pub use sink::SinkExecutor;
pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
pub use temporal_join::TemporalJoinExecutor;
pub use top_n::{
    AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
};
pub use troublemaker::TroublemakerExecutor;
pub use union::UnionExecutor;
pub use upstream_sink_union::{UpstreamFragmentInfo, UpstreamSinkUnionExecutor};
pub use utils::DummyExecutor;
pub use values::ValuesExecutor;
pub use vector::*;
pub use watermark_filter::WatermarkFilterExecutor;
pub use wrapper::WrapperExecutor;

use self::barrier_align::AlignedMessageStream;

pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;

pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
use risingwave_connector::sink::catalog::SinkId;
use risingwave_connector::source::cdc::{
    CdcTableSnapshotSplitAssignmentWithGeneration,
    build_actor_cdc_table_snapshot_splits_with_generation,
};
use risingwave_pb::id::SubscriberId;
use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};

pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;

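/// Static information of an [`Executor`]: its output schema, stream key, stream kind,
/// a human-readable identity string, and the unique operator id.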
#[derive(Debug, Default, Clone)]
pub struct ExecutorInfo {
    pub schema: Schema,

    pub stream_key: StreamKey,

    pub stream_kind: PbStreamKind,

    pub identity: String,

    pub id: u64,
}

impl ExecutorInfo {
    pub fn for_test(schema: Schema, stream_key: StreamKey, identity: String, id: u64) -> Self {
        Self {
            schema,
            stream_key,
            stream_kind: PbStreamKind::Retract,
            identity,
            id,
        }
    }
}

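/// The trait implemented by every executor: consume upstream messages and produce a boxed
/// [`MessageStream`] of chunks, barriers and watermarks.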
pub trait Execute: Send + 'static {
    fn execute(self: Box<Self>) -> BoxedMessageStream;

    fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
        self.execute()
    }

    fn boxed(self) -> Box<dyn Execute>
    where
        Self: Sized + Send + 'static,
    {
        Box::new(self)
    }
}

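/// Pairs an [`Execute`] implementation with its [`ExecutorInfo`].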
pub struct Executor {
    info: ExecutorInfo,
    execute: Box<dyn Execute>,
}

impl Executor {
    pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
        Self { info, execute }
    }

    pub fn info(&self) -> &ExecutorInfo {
        &self.info
    }

    pub fn schema(&self) -> &Schema {
        &self.info.schema
    }

    pub fn stream_key(&self) -> StreamKeyRef<'_> {
        &self.info.stream_key
    }

    pub fn stream_kind(&self) -> PbStreamKind {
        self.info.stream_kind
    }

    pub fn identity(&self) -> &str {
        &self.info.identity
    }

    pub fn execute(self) -> BoxedMessageStream {
        self.execute.execute()
    }

    pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
        self.execute.execute_with_epoch(epoch)
    }
}

impl std::fmt::Debug for Executor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.identity())
    }
}

impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
    fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
        Self::new(info, execute)
    }
}

impl<E> From<(ExecutorInfo, E)> for Executor
where
    E: Execute,
{
    fn from((info, execute): (ExecutorInfo, E)) -> Self {
        Self::new(info, execute.boxed())
    }
}

pub const INVALID_EPOCH: u64 = 0;

type UpstreamFragmentId = FragmentId;
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;

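/// Configuration-change barrier payload: updated dispatchers and merges, new vnode bitmaps,
/// dropped actors, split reassignments and other scaling-related updates.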
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct UpdateMutation {
    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
    pub dropped_actors: HashSet<ActorId>,
    pub actor_splits: SplitAssignments,
    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    pub sink_add_columns: HashMap<SinkId, Vec<Field>>,
}

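/// Barrier payload for newly created actors: their dispatchers, initial split assignments,
/// subscriptions and upstream sinks, plus whether the new part of the graph starts paused.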
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct AddMutation {
    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
    pub added_actors: HashSet<ActorId>,
    pub splits: SplitAssignments,
    pub pause: bool,
    pub subscriptions_to_add: Vec<(TableId, SubscriberId)>,
    pub backfill_nodes_to_pause: HashSet<FragmentId>,
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    pub new_upstream_sinks: HashMap<FragmentId, PbNewUpstreamSink>,
}

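/// Barrier payload that stops (drops) a set of actors and sink fragments.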
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct StopMutation {
    pub dropped_actors: HashSet<ActorId>,
    pub dropped_sink_fragments: HashSet<FragmentId>,
}

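/// The set of configuration changes that a [`Barrier`] can carry through the stream graph.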
#[derive(Debug, Clone, PartialEq)]
pub enum Mutation {
    Stop(StopMutation),
    Update(UpdateMutation),
    Add(AddMutation),
    SourceChangeSplit(SplitAssignments),
    Pause,
    Resume,
    Throttle(HashMap<ActorId, Option<u32>>),
    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
    DropSubscriptions {
        subscriptions_to_drop: Vec<(SubscriberId, TableId)>,
    },
    StartFragmentBackfill {
        fragment_ids: HashSet<FragmentId>,
    },
    RefreshStart {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    ListFinish {
        associated_source_id: SourceId,
    },
    LoadFinish {
        associated_source_id: SourceId,
    },
}

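/// A barrier flowing through the stream graph, generic over the mutation payload `M`:
/// [`Barrier`] carries an optional [`Mutation`], while [`DispatcherBarrier`] carries none.
///
/// Illustrative sketch only (hypothetical test usage, not from the original docs;
/// `test_epoch` is assumed to be the test-only epoch helper):
///
/// ```ignore
/// let barrier = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Pause);
/// assert!(barrier.is_checkpoint());
/// ```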
#[derive(Debug, Clone)]
pub struct BarrierInner<M> {
    pub epoch: EpochPair,
    pub mutation: M,
    pub kind: BarrierKind,

    pub tracing_context: TracingContext,
}

pub type BarrierMutationType = Option<Arc<Mutation>>;
pub type Barrier = BarrierInner<BarrierMutationType>;
pub type DispatcherBarrier = BarrierInner<()>;

impl<M: Default> BarrierInner<M> {
    pub fn new_test_barrier(epoch: u64) -> Self {
        Self {
            epoch: EpochPair::new_test_epoch(epoch),
            kind: BarrierKind::Checkpoint,
            tracing_context: TracingContext::none(),
            mutation: Default::default(),
        }
    }

    pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
        Self {
            epoch: EpochPair::new(epoch, prev_epoch),
            kind: BarrierKind::Checkpoint,
            tracing_context: TracingContext::none(),
            mutation: Default::default(),
        }
    }
}

impl Barrier {
    pub fn into_dispatcher(self) -> DispatcherBarrier {
        DispatcherBarrier {
            epoch: self.epoch,
            mutation: (),
            kind: self.kind,
            tracing_context: self.tracing_context,
        }
    }

    #[must_use]
    pub fn with_mutation(self, mutation: Mutation) -> Self {
        Self {
            mutation: Some(Arc::new(mutation)),
            ..self
        }
    }

    #[must_use]
    pub fn with_stop(self) -> Self {
        self.with_mutation(Mutation::Stop(StopMutation {
            dropped_actors: Default::default(),
            dropped_sink_fragments: Default::default(),
        }))
    }

    pub fn is_with_stop_mutation(&self) -> bool {
        matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
    }

    pub fn is_stop(&self, actor_id: ActorId) -> bool {
        self.all_stop_actors()
            .is_some_and(|actors| actors.contains(&actor_id))
    }

    pub fn is_checkpoint(&self) -> bool {
        self.kind == BarrierKind::Checkpoint
    }

    pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
        match self.mutation.as_deref()? {
            Mutation::Update(UpdateMutation { actor_splits, .. })
            | Mutation::Add(AddMutation {
                splits: actor_splits,
                ..
            }) => actor_splits.get(&actor_id),

            _ => {
                if cfg!(debug_assertions) {
                    panic!(
                        "the initial mutation of the barrier should not be {:?}",
                        self.mutation
                    );
                }
                None
            }
        }
        .map(|s| s.as_slice())
    }

    pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
        match self.mutation.as_deref() {
            Some(Mutation::Stop(StopMutation { dropped_actors, .. })) => Some(dropped_actors),
            Some(Mutation::Update(UpdateMutation { dropped_actors, .. })) => Some(dropped_actors),
            _ => None,
        }
    }

    pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation { added_actors, .. })) => {
                added_actors.contains(&actor_id)
            }
            _ => false,
        }
    }

    pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
        if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
            fragment_ids.contains(&fragment_id)
        } else {
            false
        }
    }

    pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
        let Some(mutation) = self.mutation.as_deref() else {
            return false;
        };
        match mutation {
            Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
            Mutation::Update(_)
            | Mutation::Stop(_)
            | Mutation::Pause
            | Mutation::Resume
            | Mutation::SourceChangeSplit(_)
            | Mutation::Throttle(_)
            | Mutation::DropSubscriptions { .. }
            | Mutation::ConnectorPropsChange(_)
            | Mutation::StartFragmentBackfill { .. }
            | Mutation::RefreshStart { .. }
            | Mutation::ListFinish { .. }
            | Mutation::LoadFinish { .. } => false,
        }
    }

    pub fn is_pause_on_startup(&self) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation { pause, .. })) => *pause,
            _ => false,
        }
    }

    pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation {
                backfill_nodes_to_pause,
                ..
            })) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
            Some(Mutation::Update(_)) => false,
            _ => {
                tracing::warn!(
                    "expected an AddMutation or UpdateMutation on Startup, instead got {:?}",
                    self
                );
                false
            }
        }
    }

    pub fn is_resume(&self) -> bool {
        matches!(self.mutation.as_deref(), Some(Mutation::Resume))
    }

    pub fn as_update_merge(
        &self,
        actor_id: ActorId,
        upstream_fragment_id: UpstreamFragmentId,
    ) -> Option<&MergeUpdate> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Update(UpdateMutation { merges, .. }) => {
                    merges.get(&(actor_id, upstream_fragment_id))
                }
                _ => None,
            })
    }

    pub fn as_new_upstream_sink(&self, fragment_id: FragmentId) -> Option<&PbNewUpstreamSink> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Add(AddMutation {
                    new_upstream_sinks, ..
                }) => new_upstream_sinks.get(&fragment_id),
                _ => None,
            })
    }

    pub fn as_dropped_upstream_sinks(&self) -> Option<&HashSet<FragmentId>> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Stop(StopMutation {
                    dropped_sink_fragments,
                    ..
                }) => Some(dropped_sink_fragments),
                _ => None,
            })
    }

    pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Update(UpdateMutation { vnode_bitmaps, .. }) => {
                    vnode_bitmaps.get(&actor_id).cloned()
                }
                _ => None,
            })
    }

    pub fn as_sink_add_columns(&self, sink_id: SinkId) -> Option<Vec<Field>> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Update(UpdateMutation {
                    sink_add_columns, ..
                }) => sink_add_columns.get(&sink_id).cloned(),
                _ => None,
            })
    }

    pub fn get_curr_epoch(&self) -> Epoch {
        Epoch(self.epoch.curr)
    }

    pub fn tracing_context(&self) -> &TracingContext {
        &self.tracing_context
    }

    pub fn added_subscriber_on_mv_table(
        &self,
        mv_table_id: TableId,
    ) -> impl Iterator<Item = SubscriberId> + '_ {
        if let Some(Mutation::Add(add)) = self.mutation.as_deref() {
            Some(add)
        } else {
            None
        }
        .into_iter()
        .flat_map(move |add| {
            add.subscriptions_to_add.iter().filter_map(
                move |(upstream_mv_table_id, subscriber_id)| {
                    if *upstream_mv_table_id == mv_table_id {
                        Some(*subscriber_id)
                    } else {
                        None
                    }
                },
            )
        })
    }
}

impl<M: PartialEq> PartialEq for BarrierInner<M> {
    fn eq(&self, other: &Self) -> bool {
        self.epoch == other.epoch && self.mutation == other.mutation
    }
}

impl Mutation {
    #[cfg(test)]
    pub fn is_stop(&self) -> bool {
        matches!(self, Mutation::Stop(_))
    }

    #[cfg(test)]
    fn to_protobuf(&self) -> PbMutation {
        use risingwave_connector::source::cdc::build_pb_actor_cdc_table_snapshot_splits_with_generation;
        use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
        use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
        use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
        use risingwave_pb::stream_plan::{
            PbAddMutation, PbConnectorPropsChangeMutation, PbDispatchers,
            PbDropSubscriptionsMutation, PbPauseMutation, PbResumeMutation, PbSinkAddColumns,
            PbSourceChangeSplitMutation, PbStartFragmentBackfillMutation, PbStopMutation,
            PbThrottleMutation, PbUpdateMutation,
        };
        let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
            actor_splits
                .iter()
                .map(|(&actor_id, splits)| {
                    (
                        actor_id,
                        ConnectorSplits {
                            splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
                        },
                    )
                })
                .collect::<HashMap<_, _>>()
        };

        match self {
            Mutation::Stop(StopMutation {
                dropped_actors,
                dropped_sink_fragments,
            }) => PbMutation::Stop(PbStopMutation {
                actors: dropped_actors.iter().copied().collect(),
                dropped_sink_fragments: dropped_sink_fragments.iter().copied().collect(),
            }),
            Mutation::Update(UpdateMutation {
                dispatchers,
                merges,
                vnode_bitmaps,
                dropped_actors,
                actor_splits,
                actor_new_dispatchers,
                actor_cdc_table_snapshot_splits,
                sink_add_columns,
            }) => PbMutation::Update(PbUpdateMutation {
                dispatcher_update: dispatchers.values().flatten().cloned().collect(),
                merge_update: merges.values().cloned().collect(),
                actor_vnode_bitmap_update: vnode_bitmaps
                    .iter()
                    .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
                    .collect(),
                dropped_actors: dropped_actors.iter().copied().collect(),
                actor_splits: actor_splits_to_protobuf(actor_splits),
                actor_new_dispatchers: actor_new_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| {
                        (
                            actor_id,
                            PbDispatchers {
                                dispatchers: dispatchers.clone(),
                            },
                        )
                    })
                    .collect(),
                actor_cdc_table_snapshot_splits:
                    build_pb_actor_cdc_table_snapshot_splits_with_generation(
                        actor_cdc_table_snapshot_splits.clone(),
                    )
                    .into(),
                sink_add_columns: sink_add_columns
                    .iter()
                    .map(|(sink_id, add_columns)| {
                        (
                            *sink_id,
                            PbSinkAddColumns {
                                fields: add_columns.iter().map(|field| field.to_prost()).collect(),
                            },
                        )
                    })
                    .collect(),
            }),
            Mutation::Add(AddMutation {
                adds,
                added_actors,
                splits,
                pause,
                subscriptions_to_add,
                backfill_nodes_to_pause,
                actor_cdc_table_snapshot_splits,
                new_upstream_sinks,
            }) => PbMutation::Add(PbAddMutation {
                actor_dispatchers: adds
                    .iter()
                    .map(|(&actor_id, dispatchers)| {
                        (
                            actor_id,
                            PbDispatchers {
                                dispatchers: dispatchers.clone(),
                            },
                        )
                    })
                    .collect(),
                added_actors: added_actors.iter().copied().collect(),
                actor_splits: actor_splits_to_protobuf(splits),
                pause: *pause,
                subscriptions_to_add: subscriptions_to_add
                    .iter()
                    .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
                        subscriber_id: *subscriber_id,
                        upstream_mv_table_id: *table_id,
                    })
                    .collect(),
                backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
                actor_cdc_table_snapshot_splits:
                    build_pb_actor_cdc_table_snapshot_splits_with_generation(
                        actor_cdc_table_snapshot_splits.clone(),
                    )
                    .into(),
                new_upstream_sinks: new_upstream_sinks
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
            }),
            Mutation::SourceChangeSplit(changes) => {
                PbMutation::Splits(PbSourceChangeSplitMutation {
                    actor_splits: changes
                        .iter()
                        .map(|(&actor_id, splits)| {
                            (
                                actor_id,
                                ConnectorSplits {
                                    splits: splits
                                        .clone()
                                        .iter()
                                        .map(ConnectorSplit::from)
                                        .collect(),
                                },
                            )
                        })
                        .collect(),
                })
            }
            Mutation::Pause => PbMutation::Pause(PbPauseMutation {}),
            Mutation::Resume => PbMutation::Resume(PbResumeMutation {}),
            Mutation::Throttle(changes) => PbMutation::Throttle(PbThrottleMutation {
                actor_throttle: changes
                    .iter()
                    .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit }))
                    .collect(),
            }),
            Mutation::DropSubscriptions {
                subscriptions_to_drop,
            } => PbMutation::DropSubscriptions(PbDropSubscriptionsMutation {
                info: subscriptions_to_drop
                    .iter()
                    .map(
                        |(subscriber_id, upstream_mv_table_id)| SubscriptionUpstreamInfo {
                            subscriber_id: *subscriber_id,
                            upstream_mv_table_id: *upstream_mv_table_id,
                        },
                    )
                    .collect(),
            }),
            Mutation::ConnectorPropsChange(map) => {
                PbMutation::ConnectorPropsChange(PbConnectorPropsChangeMutation {
                    connector_props_infos: map
                        .iter()
                        .map(|(actor_id, options)| {
                            (
                                *actor_id,
                                ConnectorPropsInfo {
                                    connector_props_info: options
                                        .iter()
                                        .map(|(k, v)| (k.clone(), v.clone()))
                                        .collect(),
                                },
                            )
                        })
                        .collect(),
                })
            }
            Mutation::StartFragmentBackfill { fragment_ids } => {
                PbMutation::StartFragmentBackfill(PbStartFragmentBackfillMutation {
                    fragment_ids: fragment_ids.iter().copied().collect(),
                })
            }
            Mutation::RefreshStart {
                table_id,
                associated_source_id,
            } => PbMutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
                table_id: *table_id,
                associated_source_id: *associated_source_id,
            }),
            Mutation::ListFinish {
                associated_source_id,
            } => PbMutation::ListFinish(risingwave_pb::stream_plan::ListFinishMutation {
                associated_source_id: *associated_source_id,
            }),
            Mutation::LoadFinish {
                associated_source_id,
            } => PbMutation::LoadFinish(risingwave_pb::stream_plan::LoadFinishMutation {
                associated_source_id: *associated_source_id,
            }),
        }
    }

    fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
        let mutation = match prost {
            PbMutation::Stop(stop) => Mutation::Stop(StopMutation {
                dropped_actors: stop.actors.iter().copied().collect(),
                dropped_sink_fragments: stop.dropped_sink_fragments.iter().copied().collect(),
            }),

            PbMutation::Update(update) => Mutation::Update(UpdateMutation {
                dispatchers: update
                    .dispatcher_update
                    .iter()
                    .map(|u| (u.actor_id, u.clone()))
                    .into_group_map(),
                merges: update
                    .merge_update
                    .iter()
                    .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
                    .collect(),
                vnode_bitmaps: update
                    .actor_vnode_bitmap_update
                    .iter()
                    .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
                    .collect(),
                dropped_actors: update.dropped_actors.iter().copied().collect(),
                actor_splits: update
                    .actor_splits
                    .iter()
                    .map(|(&actor_id, splits)| {
                        (
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(|split| split.try_into().unwrap())
                                .collect(),
                        )
                    })
                    .collect(),
                actor_new_dispatchers: update
                    .actor_new_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
                    .collect(),
                actor_cdc_table_snapshot_splits:
                    build_actor_cdc_table_snapshot_splits_with_generation(
                        update
                            .actor_cdc_table_snapshot_splits
                            .clone()
                            .unwrap_or_default(),
                    ),
                sink_add_columns: update
                    .sink_add_columns
                    .iter()
                    .map(|(sink_id, add_columns)| {
                        (
                            *sink_id,
                            add_columns.fields.iter().map(Field::from_prost).collect(),
                        )
                    })
                    .collect(),
            }),

            PbMutation::Add(add) => Mutation::Add(AddMutation {
                adds: add
                    .actor_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
                    .collect(),
                added_actors: add.added_actors.iter().copied().collect(),
                splits: add
                    .actor_splits
                    .iter()
                    .map(|(&actor_id, splits)| {
                        (
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(|split| split.try_into().unwrap())
                                .collect(),
                        )
                    })
                    .collect(),
                pause: add.pause,
                subscriptions_to_add: add
                    .subscriptions_to_add
                    .iter()
                    .map(
                        |SubscriptionUpstreamInfo {
                             subscriber_id,
                             upstream_mv_table_id,
                         }| { (*upstream_mv_table_id, *subscriber_id) },
                    )
                    .collect(),
                backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
                actor_cdc_table_snapshot_splits:
                    build_actor_cdc_table_snapshot_splits_with_generation(
                        add.actor_cdc_table_snapshot_splits
                            .clone()
                            .unwrap_or_default(),
                    ),
                new_upstream_sinks: add
                    .new_upstream_sinks
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
            }),

            PbMutation::Splits(s) => {
                let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
                    Vec::with_capacity(s.actor_splits.len());
                for (&actor_id, splits) in &s.actor_splits {
                    if !splits.splits.is_empty() {
                        change_splits.push((
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(SplitImpl::try_from)
                                .try_collect()?,
                        ));
                    }
                }
                Mutation::SourceChangeSplit(change_splits.into_iter().collect())
            }
            PbMutation::Pause(_) => Mutation::Pause,
            PbMutation::Resume(_) => Mutation::Resume,
            PbMutation::Throttle(changes) => Mutation::Throttle(
                changes
                    .actor_throttle
                    .iter()
                    .map(|(actor_id, limit)| (*actor_id, limit.rate_limit))
                    .collect(),
            ),
            PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
                subscriptions_to_drop: drop
                    .info
                    .iter()
                    .map(|info| (info.subscriber_id, info.upstream_mv_table_id))
                    .collect(),
            },
            PbMutation::ConnectorPropsChange(alter_connector_props) => {
                Mutation::ConnectorPropsChange(
                    alter_connector_props
                        .connector_props_infos
                        .iter()
                        .map(|(connector_id, options)| {
                            (
                                *connector_id,
                                options
                                    .connector_props_info
                                    .iter()
                                    .map(|(k, v)| (k.clone(), v.clone()))
                                    .collect(),
                            )
                        })
                        .collect(),
                )
            }
            PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
                Mutation::StartFragmentBackfill {
                    fragment_ids: start_fragment_backfill
                        .fragment_ids
                        .iter()
                        .copied()
                        .collect(),
                }
            }
            PbMutation::RefreshStart(refresh_start) => Mutation::RefreshStart {
                table_id: refresh_start.table_id,
                associated_source_id: refresh_start.associated_source_id,
            },
            PbMutation::ListFinish(list_finish) => Mutation::ListFinish {
                associated_source_id: list_finish.associated_source_id,
            },
            PbMutation::LoadFinish(load_finish) => Mutation::LoadFinish {
                associated_source_id: load_finish.associated_source_id,
            },
        };
        Ok(mutation)
    }
}

impl<M> BarrierInner<M> {
    fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
        let Self {
            epoch,
            mutation,
            kind,
            tracing_context,
            ..
        } = self;

        PbBarrier {
            epoch: Some(PbEpoch {
                curr: epoch.curr,
                prev: epoch.prev,
            }),
            mutation: barrier_fn(mutation).map(|mutation| PbBarrierMutation {
                mutation: Some(mutation),
            }),
            tracing_context: tracing_context.to_protobuf(),
            kind: *kind as _,
        }
    }

    fn from_protobuf_inner(
        prost: &PbBarrier,
        mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
    ) -> StreamExecutorResult<Self> {
        let epoch = prost.get_epoch()?;

        Ok(Self {
            kind: prost.kind(),
            epoch: EpochPair::new(epoch.curr, epoch.prev),
            mutation: mutation_from_pb(
                (prost.mutation.as_ref()).and_then(|mutation| mutation.mutation.as_ref()),
            )?,
            tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
        })
    }

    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
        BarrierInner {
            epoch: self.epoch,
            mutation: f(self.mutation),
            kind: self.kind,
            tracing_context: self.tracing_context,
        }
    }
}

impl DispatcherBarrier {
    pub fn to_protobuf(&self) -> PbBarrier {
        self.to_protobuf_inner(|_| None)
    }
}

impl Barrier {
    #[cfg(test)]
    pub fn to_protobuf(&self) -> PbBarrier {
        self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
    }

    pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
        Self::from_protobuf_inner(prost, |mutation| {
            mutation
                .map(|m| Mutation::from_protobuf(m).map(Arc::new))
                .transpose()
        })
    }
}

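/// A watermark on one column: all values in column `col_idx` of subsequent messages are no
/// less than `val`.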
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
    pub col_idx: usize,
    pub data_type: DataType,
    pub val: ScalarImpl,
}

impl PartialOrd for Watermark {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Watermark {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.val.default_cmp(&other.val)
    }
}

impl Watermark {
    pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
        Self {
            col_idx,
            data_type,
            val,
        }
    }

    pub async fn transform_with_expr(
        self,
        expr: &NonStrictExpression<impl Expression>,
        new_col_idx: usize,
    ) -> Option<Self> {
        let Self { col_idx, val, .. } = self;
        let row = {
            let mut row = vec![None; col_idx + 1];
            row[col_idx] = Some(val);
            OwnedRow::new(row)
        };
        let val = expr.eval_row_infallible(&row).await?;
        Some(Self::new(new_col_idx, expr.inner().return_type(), val))
    }

    pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
        output_indices
            .iter()
            .position(|p| *p == self.col_idx)
            .map(|new_col_idx| self.with_idx(new_col_idx))
    }

    pub fn to_protobuf(&self) -> PbWatermark {
        PbWatermark {
            column: Some(PbInputRef {
                index: self.col_idx as _,
                r#type: Some(self.data_type.to_protobuf()),
            }),
            val: Some(&self.val).to_protobuf().into(),
        }
    }

    pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
        let col_ref = prost.get_column()?;
        let data_type = DataType::from(col_ref.get_type()?);
        let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
            .expect("watermark value cannot be null");
        Ok(Self::new(col_ref.get_index() as _, data_type, val))
    }

    pub fn with_idx(self, idx: usize) -> Self {
        Self::new(idx, self.data_type, self.val)
    }
}

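/// A message flowing between executors, generic over the barrier mutation type `M`.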
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageInner<M> {
    Chunk(StreamChunk),
    Barrier(BarrierInner<M>),
    Watermark(Watermark),
}

impl<M> MessageInner<M> {
    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
        match self {
            MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
            MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
            MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
        }
    }
}

pub type Message = MessageInner<BarrierMutationType>;
pub type DispatcherMessage = MessageInner<()>;

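/// Like [`MessageInner`], but consecutive barriers may be batched so that they can travel as
/// one [`PbStreamMessageBatch`] over the exchange.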
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageBatchInner<M> {
    Chunk(StreamChunk),
    BarrierBatch(Vec<BarrierInner<M>>),
    Watermark(Watermark),
}
pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
pub type DispatcherBarriers = Vec<DispatcherBarrier>;
pub type DispatcherMessageBatch = MessageBatchInner<()>;

impl From<DispatcherMessage> for DispatcherMessageBatch {
    fn from(m: DispatcherMessage) -> Self {
        match m {
            DispatcherMessage::Chunk(c) => Self::Chunk(c),
            DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
            DispatcherMessage::Watermark(w) => Self::Watermark(w),
        }
    }
}

impl From<StreamChunk> for Message {
    fn from(chunk: StreamChunk) -> Self {
        Message::Chunk(chunk)
    }
}

impl<'a> TryFrom<&'a Message> for &'a Barrier {
    type Error = ();

    fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
        match m {
            Message::Chunk(_) => Err(()),
            Message::Barrier(b) => Ok(b),
            Message::Watermark(_) => Err(()),
        }
    }
}

impl Message {
    #[cfg(test)]
    pub fn is_stop(&self) -> bool {
        matches!(
            self,
            Message::Barrier(Barrier {
                mutation,
                ..
            }) if mutation.as_ref().unwrap().is_stop()
        )
    }
}

impl DispatcherMessageBatch {
    pub fn to_protobuf(&self) -> PbStreamMessageBatch {
        let prost = match self {
            Self::Chunk(stream_chunk) => {
                let prost_stream_chunk = stream_chunk.to_protobuf();
                StreamMessageBatch::StreamChunk(prost_stream_chunk)
            }
            Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
                barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
            }),
            Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
        };
        PbStreamMessageBatch {
            stream_message_batch: Some(prost),
        }
    }

    pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
        let res = match prost.get_stream_message_batch()? {
            StreamMessageBatch::StreamChunk(chunk) => {
                Self::Chunk(StreamChunk::from_protobuf(chunk)?)
            }
            StreamMessageBatch::BarrierBatch(barrier_batch) => {
                let barriers = barrier_batch
                    .barriers
                    .iter()
                    .map(|barrier| {
                        DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
                            if mutation.is_some() {
                                if cfg!(debug_assertions) {
                                    panic!("should not receive message of barrier with mutation");
                                } else {
                                    warn!(?barrier, "receive message of barrier with mutation");
                                }
                            }
                            Ok(())
                        })
                    })
                    .try_collect()?;
                Self::BarrierBatch(barriers)
            }
            StreamMessageBatch::Watermark(watermark) => {
                Self::Watermark(Watermark::from_protobuf(watermark)?)
            }
        };
        Ok(res)
    }

    pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
        ::prost::Message::encoded_len(msg)
    }
}

pub type StreamKey = Vec<usize>;
pub type StreamKeyRef<'a> = &'a [usize];
pub type StreamKeyDataTypes = SmallVec<[DataType; 1]>;

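/// Expects the next message from `stream` to be the first barrier and returns it, panicking
/// otherwise. Executors typically call this once at startup to obtain the initial epoch
/// before yielding any data.
///
/// Illustrative sketch only (not part of the original docs; the surrounding `#[try_stream]`
/// executor body and its `input` stream are hypothetical):
///
/// ```ignore
/// let first_barrier = expect_first_barrier(&mut input).await?;
/// let first_epoch = first_barrier.epoch;
/// // Forward the barrier downstream before touching state at `first_epoch`.
/// yield Message::Barrier(first_barrier);
/// ```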
pub async fn expect_first_barrier<M: Debug>(
    stream: &mut (impl MessageStreamInner<M> + Unpin),
) -> StreamExecutorResult<BarrierInner<M>> {
    let message = stream
        .next()
        .instrument_await("expect_first_barrier")
        .await
        .context("failed to extract the first message: stream closed unexpectedly")??;
    let barrier = message
        .into_barrier()
        .expect("the first message must be a barrier");
    assert!(matches!(
        barrier.kind,
        BarrierKind::Checkpoint | BarrierKind::Initial
    ));
    Ok(barrier)
}

pub async fn expect_first_barrier_from_aligned_stream(
    stream: &mut (impl AlignedMessageStream + Unpin),
) -> StreamExecutorResult<Barrier> {
    let message = stream
        .next()
        .instrument_await("expect_first_barrier")
        .await
        .context("failed to extract the first message: stream closed unexpectedly")??;
    let barrier = message
        .into_barrier()
        .expect("the first message must be a barrier");
    Ok(barrier)
}

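/// The terminal consumer of an actor's message stream: it drains data messages internally and
/// exposes only the [`Barrier`]s it has finished processing.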
pub trait StreamConsumer: Send + 'static {
    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;

    fn execute(self: Box<Self>) -> Self::BarrierStream;
}

type BoxedMessageInput<InputId, M> = BoxedInput<InputId, MessageStreamItemInner<M>>;

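/// A barrier-aligned merge over a dynamic set of upstream inputs. Chunks and watermarks are
/// forwarded as they arrive; an input that has produced a barrier is parked in `blocked`
/// until every other input yields a barrier of the same epoch, at which point the aligned
/// barrier is emitted and all inputs become active again. Inputs can be added or removed
/// between barriers.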
pub struct DynamicReceivers<InputId, M> {
    barrier: Option<BarrierInner<M>>,
    start_ts: Option<Instant>,
    blocked: Vec<BoxedMessageInput<InputId, M>>,
    active: FuturesUnordered<StreamFuture<BoxedMessageInput<InputId, M>>>,
    buffered_watermarks: BTreeMap<usize, BufferedWatermarks<InputId>>,
    barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
    merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
}

impl<InputId: Clone + Ord + Hash + std::fmt::Debug + Unpin, M: Clone + Unpin> Stream
    for DynamicReceivers<InputId, M>
{
    type Item = MessageStreamItemInner<M>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if self.is_empty() {
            return Poll::Ready(None);
        }

        loop {
            match futures::ready!(self.active.poll_next_unpin(cx)) {
                Some((Some(Err(e)), _)) => {
                    return Poll::Ready(Some(Err(e)));
                }
                Some((Some(Ok(message)), remaining)) => {
                    let input_id = remaining.id();
                    match message {
                        MessageInner::Chunk(chunk) => {
                            self.active.push(remaining.into_future());
                            return Poll::Ready(Some(Ok(MessageInner::Chunk(chunk))));
                        }
                        MessageInner::Watermark(watermark) => {
                            self.active.push(remaining.into_future());
                            if let Some(watermark) = self.handle_watermark(input_id, watermark) {
                                return Poll::Ready(Some(Ok(MessageInner::Watermark(watermark))));
                            }
                        }
                        MessageInner::Barrier(barrier) => {
                            if self.blocked.is_empty() {
                                self.start_ts = Some(Instant::now());
                            }
                            self.blocked.push(remaining);
                            if let Some(current_barrier) = self.barrier.as_ref() {
                                if current_barrier.epoch != barrier.epoch {
                                    return Poll::Ready(Some(Err(
                                        StreamExecutorError::align_barrier(
                                            current_barrier.clone().map_mutation(|_| None),
                                            barrier.map_mutation(|_| None),
                                        ),
                                    )));
                                }
                            } else {
                                self.barrier = Some(barrier);
                            }
                        }
                    }
                }
                Some((None, remaining)) => {
                    return Poll::Ready(Some(Err(StreamExecutorError::channel_closed(format!(
                        "upstream input {:?} unexpectedly closed",
                        remaining.id()
                    )))));
                }
                None => {
                    assert!(!self.blocked.is_empty());

                    let start_ts = self
                        .start_ts
                        .take()
                        .expect("should have received at least one barrier");
                    if let Some(barrier_align_duration) = &self.barrier_align_duration {
                        barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
                    }
                    if let Some(merge_barrier_align_duration) = &self.merge_barrier_align_duration {
                        merge_barrier_align_duration.observe(start_ts.elapsed().as_secs_f64())
                    }

                    break;
                }
            }
        }

        assert!(self.active.is_terminated());

        let barrier = self.barrier.take().unwrap();

        let upstreams = std::mem::take(&mut self.blocked);
        self.extend_active(upstreams);
        assert!(!self.active.is_terminated());

        Poll::Ready(Some(Ok(MessageInner::Barrier(barrier))))
    }
}

impl<InputId: Clone + Ord + Hash + std::fmt::Debug, M> DynamicReceivers<InputId, M> {
    pub fn new(
        upstreams: Vec<BoxedMessageInput<InputId, M>>,
        barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
        merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
    ) -> Self {
        let mut this = Self {
            barrier: None,
            start_ts: None,
            blocked: Vec::with_capacity(upstreams.len()),
            active: Default::default(),
            buffered_watermarks: Default::default(),
            merge_barrier_align_duration,
            barrier_align_duration,
        };
        this.extend_active(upstreams);
        this
    }

    pub fn extend_active(
        &mut self,
        upstreams: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
    ) {
        assert!(self.blocked.is_empty() && self.barrier.is_none());

        self.active
            .extend(upstreams.into_iter().map(|s| s.into_future()));
    }

    pub fn handle_watermark(
        &mut self,
        input_id: InputId,
        watermark: Watermark,
    ) -> Option<Watermark> {
        let col_idx = watermark.col_idx;
        let upstream_ids: Vec<_> = self.upstream_input_ids().collect();
        let watermarks = self
            .buffered_watermarks
            .entry(col_idx)
            .or_insert_with(|| BufferedWatermarks::with_ids(upstream_ids));
        watermarks.handle_watermark(input_id, watermark)
    }

    pub fn add_upstreams_from(
        &mut self,
        new_inputs: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
    ) {
        assert!(self.blocked.is_empty() && self.barrier.is_none());

        let new_inputs: Vec<_> = new_inputs.into_iter().collect();
        let input_ids = new_inputs.iter().map(|input| input.id());
        self.buffered_watermarks.values_mut().for_each(|buffers| {
            buffers.add_buffers(input_ids.clone());
        });
        self.active
            .extend(new_inputs.into_iter().map(|s| s.into_future()));
    }

    pub fn remove_upstreams(&mut self, upstream_input_ids: &HashSet<InputId>) {
        assert!(self.blocked.is_empty() && self.barrier.is_none());

        let new_upstreams = std::mem::take(&mut self.active)
            .into_iter()
            .map(|s| s.into_inner().unwrap())
            .filter(|u| !upstream_input_ids.contains(&u.id()));
        self.extend_active(new_upstreams);
        self.buffered_watermarks.values_mut().for_each(|buffers| {
            buffers.remove_buffer(upstream_input_ids.clone());
        });
    }

    pub fn merge_barrier_align_duration(&self) -> Option<LabelGuardedMetric<Histogram>> {
        self.merge_barrier_align_duration.clone()
    }

    pub fn flush_buffered_watermarks(&mut self) {
        self.buffered_watermarks
            .values_mut()
            .for_each(|buffers| buffers.clear());
    }

    pub fn upstream_input_ids(&self) -> impl Iterator<Item = InputId> + '_ {
        self.blocked
            .iter()
            .map(|s| s.id())
            .chain(self.active.iter().map(|s| s.get_ref().unwrap().id()))
    }

    pub fn is_empty(&self) -> bool {
        self.blocked.is_empty() && self.active.is_empty()
    }
}

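/// Buffers full [`Barrier`]s received over `barrier_rx` and, when a barrier carries a merge
/// update that adds upstream actors, eagerly builds the new inputs for them. The buffered
/// (barrier, new inputs) pairs are consumed by `pop_barrier_with_inputs` once the matching
/// [`DispatcherBarrier`] arrives from the upstream data stream.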
pub(crate) struct DispatchBarrierBuffer {
    buffer: VecDeque<(Barrier, Option<Vec<BoxedActorInput>>)>,
    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
    recv_state: BarrierReceiverState,
    curr_upstream_fragment_id: FragmentId,
    actor_id: ActorId,
    build_input_ctx: Arc<BuildInputContext>,
}

struct BuildInputContext {
    pub actor_id: ActorId,
    pub local_barrier_manager: LocalBarrierManager,
    pub metrics: Arc<StreamingMetrics>,
    pub fragment_id: FragmentId,
    pub actor_config: Arc<StreamingConfig>,
}

type BoxedNewInputsFuture =
    Pin<Box<dyn Future<Output = StreamExecutorResult<Vec<BoxedActorInput>>> + Send>>;

enum BarrierReceiverState {
    ReceivingBarrier,
    CreatingNewInput(Barrier, BoxedNewInputsFuture),
}

impl DispatchBarrierBuffer {
    pub fn new(
        barrier_rx: mpsc::UnboundedReceiver<Barrier>,
        actor_id: ActorId,
        curr_upstream_fragment_id: FragmentId,
        local_barrier_manager: LocalBarrierManager,
        metrics: Arc<StreamingMetrics>,
        fragment_id: FragmentId,
        actor_config: Arc<StreamingConfig>,
    ) -> Self {
        Self {
            buffer: VecDeque::new(),
            barrier_rx,
            recv_state: BarrierReceiverState::ReceivingBarrier,
            curr_upstream_fragment_id,
            actor_id,
            build_input_ctx: Arc::new(BuildInputContext {
                actor_id,
                local_barrier_manager,
                metrics,
                fragment_id,
                actor_config,
            }),
        }
    }

    pub async fn await_next_message(
        &mut self,
        stream: &mut (impl Stream<Item = StreamExecutorResult<DispatcherMessage>> + Unpin),
        metrics: &ActorInputMetrics,
    ) -> StreamExecutorResult<DispatcherMessage> {
        let mut start_time = Instant::now();
        let interval_duration = Duration::from_secs(15);
        let mut interval =
            tokio::time::interval_at(start_time + interval_duration, interval_duration);

        loop {
            tokio::select! {
                biased;
                msg = stream.try_next() => {
                    metrics
                        .actor_input_buffer_blocking_duration_ns
                        .inc_by(start_time.elapsed().as_nanos() as u64);
                    return msg?.ok_or_else(
                        || StreamExecutorError::channel_closed("upstream executor closed unexpectedly")
                    );
                }

                e = self.continuously_fetch_barrier_rx() => {
                    return Err(e);
                }

                _ = interval.tick() => {
                    start_time = Instant::now();
                    metrics.actor_input_buffer_blocking_duration_ns.inc_by(interval_duration.as_nanos() as u64);
                    continue;
                }
            }
        }
    }

    pub async fn pop_barrier_with_inputs(
        &mut self,
        barrier: DispatcherBarrier,
    ) -> StreamExecutorResult<(Barrier, Option<Vec<BoxedActorInput>>)> {
        while self.buffer.is_empty() {
            self.try_fetch_barrier_rx(false).await?;
        }
        let (recv_barrier, inputs) = self.buffer.pop_front().unwrap();
        assert_equal_dispatcher_barrier(&recv_barrier, &barrier);

        Ok((recv_barrier, inputs))
    }

    async fn continuously_fetch_barrier_rx(&mut self) -> StreamExecutorError {
        loop {
            if let Err(e) = self.try_fetch_barrier_rx(true).await {
                return e;
            }
        }
    }

    async fn try_fetch_barrier_rx(&mut self, pending_on_end: bool) -> StreamExecutorResult<()> {
        match &mut self.recv_state {
            BarrierReceiverState::ReceivingBarrier => {
                let Some(barrier) = self.barrier_rx.recv().await else {
                    if pending_on_end {
                        return pending().await;
                    } else {
                        return Err(StreamExecutorError::channel_closed(
                            "barrier channel closed unexpectedly",
                        ));
                    }
                };
                if let Some(fut) = self.pre_apply_barrier(&barrier) {
                    self.recv_state = BarrierReceiverState::CreatingNewInput(barrier, fut);
                } else {
                    self.buffer.push_back((barrier, None));
                }
            }
            BarrierReceiverState::CreatingNewInput(barrier, fut) => {
                let new_inputs = fut.await?;
                self.buffer.push_back((barrier.clone(), Some(new_inputs)));
                self.recv_state = BarrierReceiverState::ReceivingBarrier;
            }
        }
        Ok(())
    }

    fn pre_apply_barrier(&mut self, barrier: &Barrier) -> Option<BoxedNewInputsFuture> {
        if let Some(update) = barrier.as_update_merge(self.actor_id, self.curr_upstream_fragment_id)
            && !update.added_upstream_actors.is_empty()
        {
            let upstream_fragment_id =
                if let Some(new_upstream_fragment_id) = update.new_upstream_fragment_id {
                    self.curr_upstream_fragment_id = new_upstream_fragment_id;
                    new_upstream_fragment_id
                } else {
                    self.curr_upstream_fragment_id
                };
            let ctx = self.build_input_ctx.clone();
            let added_upstream_actors = update.added_upstream_actors.clone();
            let barrier = barrier.clone();
            let fut = async move {
                try_join_all(added_upstream_actors.iter().map(|upstream_actor| async {
                    let mut new_input = new_input(
                        &ctx.local_barrier_manager,
                        ctx.metrics.clone(),
                        ctx.actor_id,
                        ctx.fragment_id,
                        upstream_actor,
                        upstream_fragment_id,
                        ctx.actor_config.clone(),
                    )
                    .await?;

                    let first_barrier = expect_first_barrier(&mut new_input).await?;
                    assert_equal_dispatcher_barrier(&barrier, &first_barrier);

                    StreamExecutorResult::Ok(new_input)
                }))
                .await
            }
            .boxed();

            Some(fut)
        } else {
            None
        }
    }
}