1mod prelude;
16
17use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
18use std::fmt::Debug;
19use std::future::pending;
20use std::hash::Hash;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::Poll;
24use std::vec;
25
26use await_tree::InstrumentAwait;
27use enum_as_inner::EnumAsInner;
28use futures::future::try_join_all;
29use futures::stream::{BoxStream, FusedStream, FuturesUnordered, StreamFuture};
30use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
31use itertools::Itertools;
32use prometheus::Histogram;
33use prometheus::core::{AtomicU64, GenericCounter};
34use risingwave_common::array::StreamChunk;
35use risingwave_common::bitmap::Bitmap;
36use risingwave_common::catalog::{Field, Schema, TableId};
37use risingwave_common::metrics::LabelGuardedMetric;
38use risingwave_common::row::OwnedRow;
39use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
40use risingwave_common::util::epoch::{Epoch, EpochPair};
41use risingwave_common::util::tracing::TracingContext;
42use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
43use risingwave_connector::source::SplitImpl;
44use risingwave_expr::expr::{Expression, NonStrictExpression};
45use risingwave_pb::data::PbEpoch;
46use risingwave_pb::expr::PbInputRef;
47use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
48use risingwave_pb::stream_plan::barrier::BarrierKind;
49use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
50use risingwave_pb::stream_plan::stream_node::PbStreamKind;
51use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
52use risingwave_pb::stream_plan::{
53 PbBarrier, PbBarrierMutation, PbDispatcher, PbStreamMessageBatch, PbWatermark,
54 SubscriptionUpstreamInfo,
55};
56use smallvec::SmallVec;
57use tokio::sync::mpsc;
58use tokio::time::{Duration, Instant};
59
60use crate::error::StreamResult;
61use crate::executor::exchange::input::{
62 BoxedActorInput, BoxedInput, apply_dispatcher_barrier, assert_equal_dispatcher_barrier,
63 new_input,
64};
65use crate::executor::monitor::ActorInputMetrics;
66use crate::executor::prelude::StreamingMetrics;
67use crate::executor::watermark::BufferedWatermarks;
68use crate::task::{ActorId, FragmentId, LocalBarrierManager};
69
70mod actor;
71mod barrier_align;
72pub mod exchange;
73pub mod monitor;
74
75pub mod aggregate;
76pub mod asof_join;
77mod backfill;
78mod barrier_recv;
79mod batch_query;
80mod chain;
81mod changelog;
82mod dedup;
83mod dispatch;
84pub mod dml;
85mod dynamic_filter;
86pub mod eowc;
87pub mod error;
88mod expand;
89mod filter;
90mod gap_fill;
91pub mod hash_join;
92mod hop_window;
93mod join;
94pub mod locality_provider;
95mod lookup;
96mod lookup_union;
97mod merge;
98mod mview;
99mod nested_loop_temporal_join;
100mod no_op;
101mod now;
102mod over_window;
103pub mod project;
104mod rearranged_chain;
105mod receiver;
106pub mod row_id_gen;
107mod sink;
108pub mod source;
109mod stream_reader;
110pub mod subtask;
111mod temporal_join;
112mod top_n;
113mod troublemaker;
114mod union;
115mod upstream_sink_union;
116mod values;
117mod watermark;
118mod watermark_filter;
119mod wrapper;
120
121mod approx_percentile;
122
123mod row_merge;
124
125#[cfg(test)]
126mod integration_tests;
127mod sync_kv_log_store;
128#[cfg(any(test, feature = "test"))]
129pub mod test_utils;
130mod utils;
131mod vector;
132
133pub use actor::{Actor, ActorContext, ActorContextRef};
134use anyhow::Context;
135pub use approx_percentile::global::GlobalApproxPercentileExecutor;
136pub use approx_percentile::local::LocalApproxPercentileExecutor;
137pub use backfill::arrangement_backfill::*;
138pub use backfill::cdc::{
139 CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor,
140};
141pub use backfill::no_shuffle_backfill::*;
142pub use backfill::snapshot_backfill::*;
143pub use barrier_recv::BarrierRecvExecutor;
144pub use batch_query::BatchQueryExecutor;
145pub use chain::ChainExecutor;
146pub use changelog::ChangeLogExecutor;
147pub use dedup::AppendOnlyDedupExecutor;
148pub use dispatch::{DispatchExecutor, DispatcherImpl};
149pub use dynamic_filter::DynamicFilterExecutor;
150pub use error::{StreamExecutorError, StreamExecutorResult};
151pub use expand::ExpandExecutor;
152pub use filter::{FilterExecutor, UpsertFilterExecutor};
153pub use gap_fill::{GapFillExecutor, GapFillExecutorArgs};
154pub use hash_join::*;
155pub use hop_window::HopWindowExecutor;
156pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
157pub use join::{AsOfDesc, AsOfJoinType, JoinType};
158pub use lookup::*;
159pub use lookup_union::LookupUnionExecutor;
160pub use merge::MergeExecutor;
161pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
162pub use mview::*;
163pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
164pub use no_op::NoOpExecutor;
165pub use now::*;
166pub use over_window::*;
167pub use rearranged_chain::RearrangedChainExecutor;
168pub use receiver::ReceiverExecutor;
169use risingwave_common::id::SourceId;
170pub use row_merge::RowMergeExecutor;
171pub use sink::SinkExecutor;
172pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
173pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
174pub use temporal_join::TemporalJoinExecutor;
175pub use top_n::{
176 AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
177};
178pub use troublemaker::TroublemakerExecutor;
179pub use union::UnionExecutor;
180pub use upstream_sink_union::{UpstreamFragmentInfo, UpstreamSinkUnionExecutor};
181pub use utils::DummyExecutor;
182pub use values::ValuesExecutor;
183pub use vector::*;
184pub use watermark_filter::WatermarkFilterExecutor;
185pub use wrapper::WrapperExecutor;
186
187use self::barrier_align::AlignedMessageStream;
188
/// Item of a message stream whose barriers carry a mutation payload of type `M`:
/// a [`MessageInner<M>`] wrapped in [`StreamExecutorResult`].
pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
/// [`MessageStreamItemInner`] specialized to [`BarrierMutationType`].
pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
/// Item of a dispatcher message stream: a `DispatcherMessage` wrapped in
/// [`StreamExecutorResult`].
pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
/// A boxed `'static` stream yielding [`MessageStreamItem`]s.
pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;
193
194pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
195use risingwave_connector::sink::catalog::SinkId;
196use risingwave_connector::source::cdc::{
197 CdcTableSnapshotSplitAssignmentWithGeneration,
198 build_actor_cdc_table_snapshot_splits_with_generation,
199};
200use risingwave_pb::id::SubscriberId;
201use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};
202
/// Trait alias for any `Send` stream of [`MessageStreamItemInner<M>`].
pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
/// Trait alias for any `Send` stream of [`MessageStreamItem`].
pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
/// Trait alias for any `Send` stream of [`DispatcherMessageStreamItem`].
pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;
206
/// Metadata describing an executor. It is stored next to the boxed [`Execute`]
/// implementation inside [`Executor`].
#[derive(Debug, Default, Clone)]
pub struct ExecutorInfo {
    /// Schema returned by [`Executor::schema`].
    /// NOTE(review): presumably the schema of the executor's output — confirm.
    pub schema: Schema,

    /// Stream key returned by [`Executor::stream_key`].
    pub stream_key: StreamKey,

    /// Stream kind returned by [`Executor::stream_kind`].
    pub stream_kind: PbStreamKind,

    /// Identity string; this is also what the `Debug` impl of [`Executor`] prints.
    pub identity: String,

    /// Numeric id of the executor.
    pub id: u64,
}
225
226impl ExecutorInfo {
227 pub fn for_test(schema: Schema, stream_key: StreamKey, identity: String, id: u64) -> Self {
228 Self {
229 schema,
230 stream_key,
231 stream_kind: PbStreamKind::Retract, identity,
233 id,
234 }
235 }
236}
237
238pub trait Execute: Send + 'static {
240 fn execute(self: Box<Self>) -> BoxedMessageStream;
241
242 fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
243 self.execute()
244 }
245
246 fn boxed(self) -> Box<dyn Execute>
247 where
248 Self: Sized + Send + 'static,
249 {
250 Box::new(self)
251 }
252}
253
/// An executor: its metadata ([`ExecutorInfo`]) paired with the boxed
/// [`Execute`] implementation that produces its message stream.
pub struct Executor {
    /// Metadata of the executor, exposed through the accessor methods.
    info: ExecutorInfo,
    /// Type-erased execution logic, consumed by `execute` / `execute_with_epoch`.
    execute: Box<dyn Execute>,
}
260
261impl Executor {
262 pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
263 Self { info, execute }
264 }
265
266 pub fn info(&self) -> &ExecutorInfo {
267 &self.info
268 }
269
270 pub fn schema(&self) -> &Schema {
271 &self.info.schema
272 }
273
274 pub fn stream_key(&self) -> StreamKeyRef<'_> {
275 &self.info.stream_key
276 }
277
278 pub fn stream_kind(&self) -> PbStreamKind {
279 self.info.stream_kind
280 }
281
282 pub fn identity(&self) -> &str {
283 &self.info.identity
284 }
285
286 pub fn execute(self) -> BoxedMessageStream {
287 self.execute.execute()
288 }
289
290 pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
291 self.execute.execute_with_epoch(epoch)
292 }
293}
294
295impl std::fmt::Debug for Executor {
296 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
297 f.write_str(self.identity())
298 }
299}
300
301impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
302 fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
303 Self::new(info, execute)
304 }
305}
306
307impl<E> From<(ExecutorInfo, E)> for Executor
308where
309 E: Execute,
310{
311 fn from((info, execute): (ExecutorInfo, E)) -> Self {
312 Self::new(info, execute.boxed())
313 }
314}
315
/// Epoch value `0`, named as the invalid epoch.
pub const INVALID_EPOCH: u64 = 0;

/// A [`FragmentId`] that identifies an upstream fragment.
type UpstreamFragmentId = FragmentId;
/// Source splits assigned to each actor, keyed by actor id.
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;
320
/// Payload of [`Mutation::Update`].
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct UpdateMutation {
    /// Dispatcher updates, grouped by the `actor_id` of each update.
    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
    /// Merge updates, keyed by the `(actor_id, upstream_fragment_id)` of each update.
    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
    /// Vnode bitmap per actor; looked up by `Barrier::as_update_vnode_bitmap`.
    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
    /// Actors reported as stopped by `Barrier::all_stop_actors` / `Barrier::is_stop`.
    pub dropped_actors: HashSet<ActorId>,
    /// Source splits per actor; looked up by `Barrier::initial_split_assignment`.
    pub actor_splits: SplitAssignments,
    /// Dispatchers per actor.
    /// NOTE(review): presumably dispatchers newly added to existing actors — confirm.
    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
    /// CDC table snapshot split assignment, together with its generation.
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// Fields to add per sink; looked up by `Barrier::as_sink_add_columns`.
    pub sink_add_columns: HashMap<SinkId, Vec<Field>>,
}
333
/// Payload of [`Mutation::Add`].
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct AddMutation {
    /// Dispatchers to add, keyed by (upstream) actor id; queried by
    /// `Barrier::has_more_downstream_fragments`.
    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
    /// Actors for which `Barrier::is_newly_added` returns `true`.
    pub added_actors: HashSet<ActorId>,
    /// Source splits per actor; looked up by `Barrier::initial_split_assignment`.
    pub splits: SplitAssignments,
    /// Value returned by `Barrier::is_pause_on_startup`.
    pub pause: bool,
    /// Subscriptions to add, as `(upstream_mv_table_id, subscriber_id)` pairs.
    /// Note that the order is the reverse of `Mutation::DropSubscriptions`.
    pub subscriptions_to_add: Vec<(TableId, SubscriberId)>,
    /// Fragments for which `Barrier::is_backfill_pause_on_startup` returns `true`.
    pub backfill_nodes_to_pause: HashSet<FragmentId>,
    /// CDC table snapshot split assignment, together with its generation.
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    /// New upstream sink per fragment; looked up by `Barrier::as_new_upstream_sink`.
    pub new_upstream_sinks: HashMap<FragmentId, PbNewUpstreamSink>,
}
349
/// Payload of [`Mutation::Stop`].
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct StopMutation {
    /// Actors reported as stopped by `Barrier::all_stop_actors` / `Barrier::is_stop`.
    pub dropped_actors: HashSet<ActorId>,
    /// Sink fragments returned by `Barrier::as_dropped_upstream_sinks`.
    pub dropped_sink_fragments: HashSet<FragmentId>,
}
356
/// In-memory form of the mutation optionally attached to a barrier.
/// `Mutation::from_protobuf` builds it from the protobuf mutation.
#[derive(Debug, Clone, PartialEq)]
pub enum Mutation {
    /// Stop the listed actors and drop the listed sink fragments.
    Stop(StopMutation),
    /// Update dispatchers, merges, vnode bitmaps, splits, etc. of existing actors.
    Update(UpdateMutation),
    /// Add new actors / dispatchers.
    Add(AddMutation),
    /// New split assignment per actor.
    SourceChangeSplit(SplitAssignments),
    /// Pause; carries no payload.
    Pause,
    /// Resume; carries no payload.
    Resume,
    /// Optional rate limit per actor.
    /// NOTE(review): presumably `None` means "no limit" — confirm.
    Throttle(HashMap<ActorId, Option<u32>>),
    /// Connector properties (key/value strings) keyed by a `u32` id.
    /// NOTE(review): the id is called `connector_id` when decoding — confirm
    /// what kind of object it identifies.
    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
    /// Drop subscriptions.
    DropSubscriptions {
        /// `(subscriber_id, upstream_mv_table_id)` pairs. Note that the order is
        /// the reverse of `AddMutation::subscriptions_to_add`.
        subscriptions_to_drop: Vec<(SubscriberId, TableId)>,
    },
    /// Start backfill on the given fragments; queried by
    /// `Barrier::should_start_fragment_backfill`.
    StartFragmentBackfill {
        fragment_ids: HashSet<FragmentId>,
    },
    /// Start of a refresh for `table_id` with its associated source.
    RefreshStart {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// "List finished" notification for the associated source.
    ListFinish {
        associated_source_id: SourceId,
    },
    /// "Load finished" notification for the associated source.
    LoadFinish {
        associated_source_id: SourceId,
    },
}
386
/// A barrier, generic over the type `M` of the mutation it carries.
///
/// Equality is implemented by hand elsewhere in this file and compares only
/// `epoch` and `mutation`.
#[derive(Debug, Clone)]
pub struct BarrierInner<M> {
    /// Current and previous epoch of this barrier.
    pub epoch: EpochPair,
    /// Mutation payload; see [`Barrier`] and [`DispatcherBarrier`] for the
    /// concrete instantiations.
    pub mutation: M,
    /// Kind of the barrier (e.g. [`BarrierKind::Checkpoint`]).
    pub kind: BarrierKind,

    /// Tracing context carried with the barrier.
    pub tracing_context: TracingContext,

    /// Actor ids recorded on the barrier.
    /// NOTE(review): presumably the actors this barrier has passed through — confirm.
    pub passed_actors: Vec<ActorId>,
}
403
/// Mutation payload of a [`Barrier`]: an optional, shared [`Mutation`].
pub type BarrierMutationType = Option<Arc<Mutation>>;
/// Barrier that carries an optional [`Mutation`].
pub type Barrier = BarrierInner<BarrierMutationType>;
/// Barrier without a mutation payload (the payload type is `()`).
pub type DispatcherBarrier = BarrierInner<()>;
407
408impl<M: Default> BarrierInner<M> {
409 pub fn new_test_barrier(epoch: u64) -> Self {
411 Self {
412 epoch: EpochPair::new_test_epoch(epoch),
413 kind: BarrierKind::Checkpoint,
414 tracing_context: TracingContext::none(),
415 mutation: Default::default(),
416 passed_actors: Default::default(),
417 }
418 }
419
420 pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
421 Self {
422 epoch: EpochPair::new(epoch, prev_epoch),
423 kind: BarrierKind::Checkpoint,
424 tracing_context: TracingContext::none(),
425 mutation: Default::default(),
426 passed_actors: Default::default(),
427 }
428 }
429}
430
431impl Barrier {
432 pub fn into_dispatcher(self) -> DispatcherBarrier {
433 DispatcherBarrier {
434 epoch: self.epoch,
435 mutation: (),
436 kind: self.kind,
437 tracing_context: self.tracing_context,
438 passed_actors: self.passed_actors,
439 }
440 }
441
442 #[must_use]
443 pub fn with_mutation(self, mutation: Mutation) -> Self {
444 Self {
445 mutation: Some(Arc::new(mutation)),
446 ..self
447 }
448 }
449
450 #[must_use]
451 pub fn with_stop(self) -> Self {
452 self.with_mutation(Mutation::Stop(StopMutation {
453 dropped_actors: Default::default(),
454 dropped_sink_fragments: Default::default(),
455 }))
456 }
457
458 pub fn is_with_stop_mutation(&self) -> bool {
460 matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
461 }
462
463 pub fn is_stop(&self, actor_id: ActorId) -> bool {
465 self.all_stop_actors()
466 .is_some_and(|actors| actors.contains(&actor_id))
467 }
468
469 pub fn is_checkpoint(&self) -> bool {
470 self.kind == BarrierKind::Checkpoint
471 }
472
473 pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
483 match self.mutation.as_deref()? {
484 Mutation::Update(UpdateMutation { actor_splits, .. })
485 | Mutation::Add(AddMutation {
486 splits: actor_splits,
487 ..
488 }) => actor_splits.get(&actor_id),
489
490 _ => {
491 if cfg!(debug_assertions) {
492 panic!(
493 "the initial mutation of the barrier should not be {:?}",
494 self.mutation
495 );
496 }
497 None
498 }
499 }
500 .map(|s| s.as_slice())
501 }
502
503 pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
505 match self.mutation.as_deref() {
506 Some(Mutation::Stop(StopMutation { dropped_actors, .. })) => Some(dropped_actors),
507 Some(Mutation::Update(UpdateMutation { dropped_actors, .. })) => Some(dropped_actors),
508 _ => None,
509 }
510 }
511
512 pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
518 match self.mutation.as_deref() {
519 Some(Mutation::Add(AddMutation { added_actors, .. })) => {
520 added_actors.contains(&actor_id)
521 }
522 _ => false,
523 }
524 }
525
526 pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
527 if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
528 fragment_ids.contains(&fragment_id)
529 } else {
530 false
531 }
532 }
533
534 pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
552 let Some(mutation) = self.mutation.as_deref() else {
553 return false;
554 };
555 match mutation {
556 Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
558 Mutation::Update(_)
559 | Mutation::Stop(_)
560 | Mutation::Pause
561 | Mutation::Resume
562 | Mutation::SourceChangeSplit(_)
563 | Mutation::Throttle(_)
564 | Mutation::DropSubscriptions { .. }
565 | Mutation::ConnectorPropsChange(_)
566 | Mutation::StartFragmentBackfill { .. }
567 | Mutation::RefreshStart { .. }
568 | Mutation::ListFinish { .. }
569 | Mutation::LoadFinish { .. } => false,
570 }
571 }
572
573 pub fn is_pause_on_startup(&self) -> bool {
575 match self.mutation.as_deref() {
576 Some(Mutation::Add(AddMutation { pause, .. })) => *pause,
577 _ => false,
578 }
579 }
580
581 pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
582 match self.mutation.as_deref() {
583 Some(Mutation::Add(AddMutation {
584 backfill_nodes_to_pause,
585 ..
586 })) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
587 Some(Mutation::Update(_)) => false,
588 _ => {
589 tracing::warn!(
590 "expected an AddMutation or UpdateMutation on Startup, instead got {:?}",
591 self
592 );
593 false
594 }
595 }
596 }
597
598 pub fn is_resume(&self) -> bool {
600 matches!(self.mutation.as_deref(), Some(Mutation::Resume))
601 }
602
603 pub fn as_update_merge(
606 &self,
607 actor_id: ActorId,
608 upstream_fragment_id: UpstreamFragmentId,
609 ) -> Option<&MergeUpdate> {
610 self.mutation
611 .as_deref()
612 .and_then(|mutation| match mutation {
613 Mutation::Update(UpdateMutation { merges, .. }) => {
614 merges.get(&(actor_id, upstream_fragment_id))
615 }
616 _ => None,
617 })
618 }
619
620 pub fn as_new_upstream_sink(&self, fragment_id: FragmentId) -> Option<&PbNewUpstreamSink> {
623 self.mutation
624 .as_deref()
625 .and_then(|mutation| match mutation {
626 Mutation::Add(AddMutation {
627 new_upstream_sinks, ..
628 }) => new_upstream_sinks.get(&fragment_id),
629 _ => None,
630 })
631 }
632
633 pub fn as_dropped_upstream_sinks(&self) -> Option<&HashSet<FragmentId>> {
635 self.mutation
636 .as_deref()
637 .and_then(|mutation| match mutation {
638 Mutation::Stop(StopMutation {
639 dropped_sink_fragments,
640 ..
641 }) => Some(dropped_sink_fragments),
642 _ => None,
643 })
644 }
645
646 pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
652 self.mutation
653 .as_deref()
654 .and_then(|mutation| match mutation {
655 Mutation::Update(UpdateMutation { vnode_bitmaps, .. }) => {
656 vnode_bitmaps.get(&actor_id).cloned()
657 }
658 _ => None,
659 })
660 }
661
662 pub fn as_sink_add_columns(&self, sink_id: SinkId) -> Option<Vec<Field>> {
663 self.mutation
664 .as_deref()
665 .and_then(|mutation| match mutation {
666 Mutation::Update(UpdateMutation {
667 sink_add_columns, ..
668 }) => sink_add_columns.get(&sink_id).cloned(),
669 _ => None,
670 })
671 }
672
673 pub fn get_curr_epoch(&self) -> Epoch {
674 Epoch(self.epoch.curr)
675 }
676
677 pub fn tracing_context(&self) -> &TracingContext {
679 &self.tracing_context
680 }
681
682 pub fn added_subscriber_on_mv_table(
683 &self,
684 mv_table_id: TableId,
685 ) -> impl Iterator<Item = SubscriberId> + '_ {
686 if let Some(Mutation::Add(add)) = self.mutation.as_deref() {
687 Some(add)
688 } else {
689 None
690 }
691 .into_iter()
692 .flat_map(move |add| {
693 add.subscriptions_to_add.iter().filter_map(
694 move |(upstream_mv_table_id, subscriber_id)| {
695 if *upstream_mv_table_id == mv_table_id {
696 Some(*subscriber_id)
697 } else {
698 None
699 }
700 },
701 )
702 })
703 }
704}
705
706impl<M: PartialEq> PartialEq for BarrierInner<M> {
707 fn eq(&self, other: &Self) -> bool {
708 self.epoch == other.epoch && self.mutation == other.mutation
709 }
710}
711
712impl Mutation {
713 #[cfg(test)]
717 pub fn is_stop(&self) -> bool {
718 matches!(self, Mutation::Stop(_))
719 }
720
721 #[cfg(test)]
722 fn to_protobuf(&self) -> PbMutation {
723 use risingwave_connector::source::cdc::build_pb_actor_cdc_table_snapshot_splits_with_generation;
724 use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
725 use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
726 use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
727 use risingwave_pb::stream_plan::{
728 PbAddMutation, PbConnectorPropsChangeMutation, PbDispatchers,
729 PbDropSubscriptionsMutation, PbPauseMutation, PbResumeMutation, PbSinkAddColumns,
730 PbSourceChangeSplitMutation, PbStartFragmentBackfillMutation, PbStopMutation,
731 PbThrottleMutation, PbUpdateMutation,
732 };
733 let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
734 actor_splits
735 .iter()
736 .map(|(&actor_id, splits)| {
737 (
738 actor_id,
739 ConnectorSplits {
740 splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
741 },
742 )
743 })
744 .collect::<HashMap<_, _>>()
745 };
746
747 match self {
748 Mutation::Stop(StopMutation {
749 dropped_actors,
750 dropped_sink_fragments,
751 }) => PbMutation::Stop(PbStopMutation {
752 actors: dropped_actors.iter().copied().collect(),
753 dropped_sink_fragments: dropped_sink_fragments.iter().copied().collect(),
754 }),
755 Mutation::Update(UpdateMutation {
756 dispatchers,
757 merges,
758 vnode_bitmaps,
759 dropped_actors,
760 actor_splits,
761 actor_new_dispatchers,
762 actor_cdc_table_snapshot_splits,
763 sink_add_columns,
764 }) => PbMutation::Update(PbUpdateMutation {
765 dispatcher_update: dispatchers.values().flatten().cloned().collect(),
766 merge_update: merges.values().cloned().collect(),
767 actor_vnode_bitmap_update: vnode_bitmaps
768 .iter()
769 .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
770 .collect(),
771 dropped_actors: dropped_actors.iter().copied().collect(),
772 actor_splits: actor_splits_to_protobuf(actor_splits),
773 actor_new_dispatchers: actor_new_dispatchers
774 .iter()
775 .map(|(&actor_id, dispatchers)| {
776 (
777 actor_id,
778 PbDispatchers {
779 dispatchers: dispatchers.clone(),
780 },
781 )
782 })
783 .collect(),
784 actor_cdc_table_snapshot_splits:
785 build_pb_actor_cdc_table_snapshot_splits_with_generation(
786 actor_cdc_table_snapshot_splits.clone(),
787 )
788 .into(),
789 sink_add_columns: sink_add_columns
790 .iter()
791 .map(|(sink_id, add_columns)| {
792 (
793 *sink_id,
794 PbSinkAddColumns {
795 fields: add_columns.iter().map(|field| field.to_prost()).collect(),
796 },
797 )
798 })
799 .collect(),
800 }),
801 Mutation::Add(AddMutation {
802 adds,
803 added_actors,
804 splits,
805 pause,
806 subscriptions_to_add,
807 backfill_nodes_to_pause,
808 actor_cdc_table_snapshot_splits,
809 new_upstream_sinks,
810 }) => PbMutation::Add(PbAddMutation {
811 actor_dispatchers: adds
812 .iter()
813 .map(|(&actor_id, dispatchers)| {
814 (
815 actor_id,
816 PbDispatchers {
817 dispatchers: dispatchers.clone(),
818 },
819 )
820 })
821 .collect(),
822 added_actors: added_actors.iter().copied().collect(),
823 actor_splits: actor_splits_to_protobuf(splits),
824 pause: *pause,
825 subscriptions_to_add: subscriptions_to_add
826 .iter()
827 .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
828 subscriber_id: *subscriber_id,
829 upstream_mv_table_id: *table_id,
830 })
831 .collect(),
832 backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
833 actor_cdc_table_snapshot_splits:
834 build_pb_actor_cdc_table_snapshot_splits_with_generation(
835 actor_cdc_table_snapshot_splits.clone(),
836 )
837 .into(),
838 new_upstream_sinks: new_upstream_sinks
839 .iter()
840 .map(|(k, v)| (*k, v.clone()))
841 .collect(),
842 }),
843 Mutation::SourceChangeSplit(changes) => {
844 PbMutation::Splits(PbSourceChangeSplitMutation {
845 actor_splits: changes
846 .iter()
847 .map(|(&actor_id, splits)| {
848 (
849 actor_id,
850 ConnectorSplits {
851 splits: splits
852 .clone()
853 .iter()
854 .map(ConnectorSplit::from)
855 .collect(),
856 },
857 )
858 })
859 .collect(),
860 })
861 }
862 Mutation::Pause => PbMutation::Pause(PbPauseMutation {}),
863 Mutation::Resume => PbMutation::Resume(PbResumeMutation {}),
864 Mutation::Throttle(changes) => PbMutation::Throttle(PbThrottleMutation {
865 actor_throttle: changes
866 .iter()
867 .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit }))
868 .collect(),
869 }),
870 Mutation::DropSubscriptions {
871 subscriptions_to_drop,
872 } => PbMutation::DropSubscriptions(PbDropSubscriptionsMutation {
873 info: subscriptions_to_drop
874 .iter()
875 .map(
876 |(subscriber_id, upstream_mv_table_id)| SubscriptionUpstreamInfo {
877 subscriber_id: *subscriber_id,
878 upstream_mv_table_id: *upstream_mv_table_id,
879 },
880 )
881 .collect(),
882 }),
883 Mutation::ConnectorPropsChange(map) => {
884 PbMutation::ConnectorPropsChange(PbConnectorPropsChangeMutation {
885 connector_props_infos: map
886 .iter()
887 .map(|(actor_id, options)| {
888 (
889 *actor_id,
890 ConnectorPropsInfo {
891 connector_props_info: options
892 .iter()
893 .map(|(k, v)| (k.clone(), v.clone()))
894 .collect(),
895 },
896 )
897 })
898 .collect(),
899 })
900 }
901 Mutation::StartFragmentBackfill { fragment_ids } => {
902 PbMutation::StartFragmentBackfill(PbStartFragmentBackfillMutation {
903 fragment_ids: fragment_ids.iter().copied().collect(),
904 })
905 }
906 Mutation::RefreshStart {
907 table_id,
908 associated_source_id,
909 } => PbMutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
910 table_id: *table_id,
911 associated_source_id: *associated_source_id,
912 }),
913 Mutation::ListFinish {
914 associated_source_id,
915 } => PbMutation::ListFinish(risingwave_pb::stream_plan::ListFinishMutation {
916 associated_source_id: *associated_source_id,
917 }),
918 Mutation::LoadFinish {
919 associated_source_id,
920 } => PbMutation::LoadFinish(risingwave_pb::stream_plan::LoadFinishMutation {
921 associated_source_id: *associated_source_id,
922 }),
923 }
924 }
925
926 fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
927 let mutation = match prost {
928 PbMutation::Stop(stop) => Mutation::Stop(StopMutation {
929 dropped_actors: stop.actors.iter().copied().collect(),
930 dropped_sink_fragments: stop.dropped_sink_fragments.iter().copied().collect(),
931 }),
932
933 PbMutation::Update(update) => Mutation::Update(UpdateMutation {
934 dispatchers: update
935 .dispatcher_update
936 .iter()
937 .map(|u| (u.actor_id, u.clone()))
938 .into_group_map(),
939 merges: update
940 .merge_update
941 .iter()
942 .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
943 .collect(),
944 vnode_bitmaps: update
945 .actor_vnode_bitmap_update
946 .iter()
947 .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
948 .collect(),
949 dropped_actors: update.dropped_actors.iter().copied().collect(),
950 actor_splits: update
951 .actor_splits
952 .iter()
953 .map(|(&actor_id, splits)| {
954 (
955 actor_id,
956 splits
957 .splits
958 .iter()
959 .map(|split| split.try_into().unwrap())
960 .collect(),
961 )
962 })
963 .collect(),
964 actor_new_dispatchers: update
965 .actor_new_dispatchers
966 .iter()
967 .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
968 .collect(),
969 actor_cdc_table_snapshot_splits:
970 build_actor_cdc_table_snapshot_splits_with_generation(
971 update
972 .actor_cdc_table_snapshot_splits
973 .clone()
974 .unwrap_or_default(),
975 ),
976 sink_add_columns: update
977 .sink_add_columns
978 .iter()
979 .map(|(sink_id, add_columns)| {
980 (
981 *sink_id,
982 add_columns.fields.iter().map(Field::from_prost).collect(),
983 )
984 })
985 .collect(),
986 }),
987
988 PbMutation::Add(add) => Mutation::Add(AddMutation {
989 adds: add
990 .actor_dispatchers
991 .iter()
992 .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
993 .collect(),
994 added_actors: add.added_actors.iter().copied().collect(),
995 splits: add
998 .actor_splits
999 .iter()
1000 .map(|(&actor_id, splits)| {
1001 (
1002 actor_id,
1003 splits
1004 .splits
1005 .iter()
1006 .map(|split| split.try_into().unwrap())
1007 .collect(),
1008 )
1009 })
1010 .collect(),
1011 pause: add.pause,
1012 subscriptions_to_add: add
1013 .subscriptions_to_add
1014 .iter()
1015 .map(
1016 |SubscriptionUpstreamInfo {
1017 subscriber_id,
1018 upstream_mv_table_id,
1019 }| { (*upstream_mv_table_id, *subscriber_id) },
1020 )
1021 .collect(),
1022 backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
1023 actor_cdc_table_snapshot_splits:
1024 build_actor_cdc_table_snapshot_splits_with_generation(
1025 add.actor_cdc_table_snapshot_splits
1026 .clone()
1027 .unwrap_or_default(),
1028 ),
1029 new_upstream_sinks: add
1030 .new_upstream_sinks
1031 .iter()
1032 .map(|(k, v)| (*k, v.clone()))
1033 .collect(),
1034 }),
1035
1036 PbMutation::Splits(s) => {
1037 let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
1038 Vec::with_capacity(s.actor_splits.len());
1039 for (&actor_id, splits) in &s.actor_splits {
1040 if !splits.splits.is_empty() {
1041 change_splits.push((
1042 actor_id,
1043 splits
1044 .splits
1045 .iter()
1046 .map(SplitImpl::try_from)
1047 .try_collect()?,
1048 ));
1049 }
1050 }
1051 Mutation::SourceChangeSplit(change_splits.into_iter().collect())
1052 }
1053 PbMutation::Pause(_) => Mutation::Pause,
1054 PbMutation::Resume(_) => Mutation::Resume,
1055 PbMutation::Throttle(changes) => Mutation::Throttle(
1056 changes
1057 .actor_throttle
1058 .iter()
1059 .map(|(actor_id, limit)| (*actor_id, limit.rate_limit))
1060 .collect(),
1061 ),
1062 PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
1063 subscriptions_to_drop: drop
1064 .info
1065 .iter()
1066 .map(|info| (info.subscriber_id, info.upstream_mv_table_id))
1067 .collect(),
1068 },
1069 PbMutation::ConnectorPropsChange(alter_connector_props) => {
1070 Mutation::ConnectorPropsChange(
1071 alter_connector_props
1072 .connector_props_infos
1073 .iter()
1074 .map(|(connector_id, options)| {
1075 (
1076 *connector_id,
1077 options
1078 .connector_props_info
1079 .iter()
1080 .map(|(k, v)| (k.clone(), v.clone()))
1081 .collect(),
1082 )
1083 })
1084 .collect(),
1085 )
1086 }
1087 PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
1088 Mutation::StartFragmentBackfill {
1089 fragment_ids: start_fragment_backfill
1090 .fragment_ids
1091 .iter()
1092 .copied()
1093 .collect(),
1094 }
1095 }
1096 PbMutation::RefreshStart(refresh_start) => Mutation::RefreshStart {
1097 table_id: refresh_start.table_id,
1098 associated_source_id: refresh_start.associated_source_id,
1099 },
1100 PbMutation::ListFinish(list_finish) => Mutation::ListFinish {
1101 associated_source_id: list_finish.associated_source_id,
1102 },
1103 PbMutation::LoadFinish(load_finish) => Mutation::LoadFinish {
1104 associated_source_id: load_finish.associated_source_id,
1105 },
1106 };
1107 Ok(mutation)
1108 }
1109}
1110
1111impl<M> BarrierInner<M> {
1112 fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
1113 let Self {
1114 epoch,
1115 mutation,
1116 kind,
1117 passed_actors,
1118 tracing_context,
1119 ..
1120 } = self;
1121
1122 PbBarrier {
1123 epoch: Some(PbEpoch {
1124 curr: epoch.curr,
1125 prev: epoch.prev,
1126 }),
1127 mutation: Some(PbBarrierMutation {
1128 mutation: barrier_fn(mutation),
1129 }),
1130 tracing_context: tracing_context.to_protobuf(),
1131 kind: *kind as _,
1132 passed_actors: passed_actors.clone(),
1133 }
1134 }
1135
1136 fn from_protobuf_inner(
1137 prost: &PbBarrier,
1138 mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
1139 ) -> StreamExecutorResult<Self> {
1140 let epoch = prost.get_epoch()?;
1141
1142 Ok(Self {
1143 kind: prost.kind(),
1144 epoch: EpochPair::new(epoch.curr, epoch.prev),
1145 mutation: mutation_from_pb(
1146 prost
1147 .mutation
1148 .as_ref()
1149 .and_then(|mutation| mutation.mutation.as_ref()),
1150 )?,
1151 passed_actors: prost.get_passed_actors().clone(),
1152 tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
1153 })
1154 }
1155
1156 pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
1157 BarrierInner {
1158 epoch: self.epoch,
1159 mutation: f(self.mutation),
1160 kind: self.kind,
1161 tracing_context: self.tracing_context,
1162 passed_actors: self.passed_actors,
1163 }
1164 }
1165}
1166
1167impl DispatcherBarrier {
1168 pub fn to_protobuf(&self) -> PbBarrier {
1169 self.to_protobuf_inner(|_| None)
1170 }
1171}
1172
1173impl Barrier {
1174 #[cfg(test)]
1175 pub fn to_protobuf(&self) -> PbBarrier {
1176 self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
1177 }
1178
1179 pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
1180 Self::from_protobuf_inner(prost, |mutation| {
1181 mutation
1182 .map(|m| Mutation::from_protobuf(m).map(Arc::new))
1183 .transpose()
1184 })
1185 }
1186}
1187
/// A watermark on a single column.
///
/// The derived equality compares all three fields, while the ordering
/// implemented elsewhere in this file compares `val` only.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
    /// Index of the column the watermark applies to.
    pub col_idx: usize,
    /// Data type of `val`.
    pub data_type: DataType,
    /// The watermark value.
    pub val: ScalarImpl,
}
1194
1195impl PartialOrd for Watermark {
1196 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1197 Some(self.cmp(other))
1198 }
1199}
1200
1201impl Ord for Watermark {
1202 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1203 self.val.default_cmp(&other.val)
1204 }
1205}
1206
1207impl Watermark {
1208 pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
1209 Self {
1210 col_idx,
1211 data_type,
1212 val,
1213 }
1214 }
1215
1216 pub async fn transform_with_expr(
1217 self,
1218 expr: &NonStrictExpression<impl Expression>,
1219 new_col_idx: usize,
1220 ) -> Option<Self> {
1221 let Self { col_idx, val, .. } = self;
1222 let row = {
1223 let mut row = vec![None; col_idx + 1];
1224 row[col_idx] = Some(val);
1225 OwnedRow::new(row)
1226 };
1227 let val = expr.eval_row_infallible(&row).await?;
1228 Some(Self::new(new_col_idx, expr.inner().return_type(), val))
1229 }
1230
1231 pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
1234 output_indices
1235 .iter()
1236 .position(|p| *p == self.col_idx)
1237 .map(|new_col_idx| self.with_idx(new_col_idx))
1238 }
1239
1240 pub fn to_protobuf(&self) -> PbWatermark {
1241 PbWatermark {
1242 column: Some(PbInputRef {
1243 index: self.col_idx as _,
1244 r#type: Some(self.data_type.to_protobuf()),
1245 }),
1246 val: Some(&self.val).to_protobuf().into(),
1247 }
1248 }
1249
1250 pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
1251 let col_ref = prost.get_column()?;
1252 let data_type = DataType::from(col_ref.get_type()?);
1253 let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
1254 .expect("watermark value cannot be null");
1255 Ok(Self::new(col_ref.get_index() as _, data_type, val))
1256 }
1257
1258 pub fn with_idx(self, idx: usize) -> Self {
1259 Self::new(idx, self.data_type, self.val)
1260 }
1261}
1262
/// A single item flowing through a stream: a data chunk, a barrier, or a watermark.
///
/// `M` is the mutation type carried by the barrier variant.
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageInner<M> {
    /// A chunk of stream data.
    Chunk(StreamChunk),
    /// A barrier whose mutation has type `M`.
    Barrier(BarrierInner<M>),
    /// A watermark on one column.
    Watermark(Watermark),
}
1269
1270impl<M> MessageInner<M> {
1271 pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
1272 match self {
1273 MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
1274 MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
1275 MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
1276 }
1277 }
1278}
1279
/// A message whose barriers carry a [`BarrierMutationType`].
pub type Message = MessageInner<BarrierMutationType>;
/// A message whose barriers carry no mutation payload (`()`).
pub type DispatcherMessage = MessageInner<()>;
1282
/// Like [`MessageInner`], except that the barrier variant holds a batch of barriers.
///
/// `M` is the mutation type carried by each barrier in the batch.
#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageBatchInner<M> {
    /// A chunk of stream data.
    Chunk(StreamChunk),
    /// One or more barriers grouped together.
    BarrierBatch(Vec<BarrierInner<M>>),
    /// A watermark on one column.
    Watermark(Watermark),
}
/// A message batch whose barriers carry a [`BarrierMutationType`].
pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
/// A list of barriers that carry no mutation payload.
pub type DispatcherBarriers = Vec<DispatcherBarrier>;
/// A message batch whose barriers carry no mutation payload (`()`).
pub type DispatcherMessageBatch = MessageBatchInner<()>;
1294
1295impl From<DispatcherMessage> for DispatcherMessageBatch {
1296 fn from(m: DispatcherMessage) -> Self {
1297 match m {
1298 DispatcherMessage::Chunk(c) => Self::Chunk(c),
1299 DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
1300 DispatcherMessage::Watermark(w) => Self::Watermark(w),
1301 }
1302 }
1303}
1304
1305impl From<StreamChunk> for Message {
1306 fn from(chunk: StreamChunk) -> Self {
1307 Message::Chunk(chunk)
1308 }
1309}
1310
1311impl<'a> TryFrom<&'a Message> for &'a Barrier {
1312 type Error = ();
1313
1314 fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
1315 match m {
1316 Message::Chunk(_) => Err(()),
1317 Message::Barrier(b) => Ok(b),
1318 Message::Watermark(_) => Err(()),
1319 }
1320 }
1321}
1322
1323impl Message {
1324 #[cfg(test)]
1329 pub fn is_stop(&self) -> bool {
1330 matches!(
1331 self,
1332 Message::Barrier(Barrier {
1333 mutation,
1334 ..
1335 }) if mutation.as_ref().unwrap().is_stop()
1336 )
1337 }
1338}
1339
1340impl DispatcherMessageBatch {
1341 pub fn to_protobuf(&self) -> PbStreamMessageBatch {
1342 let prost = match self {
1343 Self::Chunk(stream_chunk) => {
1344 let prost_stream_chunk = stream_chunk.to_protobuf();
1345 StreamMessageBatch::StreamChunk(prost_stream_chunk)
1346 }
1347 Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
1348 barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
1349 }),
1350 Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
1351 };
1352 PbStreamMessageBatch {
1353 stream_message_batch: Some(prost),
1354 }
1355 }
1356
1357 pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
1358 let res = match prost.get_stream_message_batch()? {
1359 StreamMessageBatch::StreamChunk(chunk) => {
1360 Self::Chunk(StreamChunk::from_protobuf(chunk)?)
1361 }
1362 StreamMessageBatch::BarrierBatch(barrier_batch) => {
1363 let barriers = barrier_batch
1364 .barriers
1365 .iter()
1366 .map(|barrier| {
1367 DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
1368 if mutation.is_some() {
1369 if cfg!(debug_assertions) {
1370 panic!("should not receive message of barrier with mutation");
1371 } else {
1372 warn!(?barrier, "receive message of barrier with mutation");
1373 }
1374 }
1375 Ok(())
1376 })
1377 })
1378 .try_collect()?;
1379 Self::BarrierBatch(barriers)
1380 }
1381 StreamMessageBatch::Watermark(watermark) => {
1382 Self::Watermark(Watermark::from_protobuf(watermark)?)
1383 }
1384 };
1385 Ok(res)
1386 }
1387
1388 pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
1389 ::prost::Message::encoded_len(msg)
1390 }
1391}
1392
/// Owned list of column indices — presumably those forming a stream key (TODO confirm against callers).
pub type StreamKey = Vec<usize>;
/// Borrowed counterpart of [`StreamKey`].
pub type StreamKeyRef<'a> = &'a [usize];
/// Data types held in a `SmallVec` with inline capacity for one element.
pub type StreamKeyDataTypes = SmallVec<[DataType; 1]>;
1396
1397pub async fn expect_first_barrier<M: Debug>(
1399 stream: &mut (impl MessageStreamInner<M> + Unpin),
1400) -> StreamExecutorResult<BarrierInner<M>> {
1401 let message = stream
1402 .next()
1403 .instrument_await("expect_first_barrier")
1404 .await
1405 .context("failed to extract the first message: stream closed unexpectedly")??;
1406 let barrier = message
1407 .into_barrier()
1408 .expect("the first message must be a barrier");
1409 assert!(matches!(
1411 barrier.kind,
1412 BarrierKind::Checkpoint | BarrierKind::Initial
1413 ));
1414 Ok(barrier)
1415}
1416
1417pub async fn expect_first_barrier_from_aligned_stream(
1419 stream: &mut (impl AlignedMessageStream + Unpin),
1420) -> StreamExecutorResult<Barrier> {
1421 let message = stream
1422 .next()
1423 .instrument_await("expect_first_barrier")
1424 .await
1425 .context("failed to extract the first message: stream closed unexpectedly")??;
1426 let barrier = message
1427 .into_barrier()
1428 .expect("the first message must be a barrier");
1429 Ok(barrier)
1430}
1431
/// A consumer that, once executed, produces a stream of barriers.
pub trait StreamConsumer: Send + 'static {
    /// Stream of barriers (or errors) returned by [`StreamConsumer::execute`].
    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;

    /// Consumes the boxed consumer and returns its barrier stream.
    fn execute(self: Box<Self>) -> Self::BarrierStream;
}
1438
/// A boxed input identified by `InputId` that yields `MessageStreamItemInner<M>` items.
type BoxedMessageInput<InputId, M> = BoxedInput<InputId, MessageStreamItemInner<M>>;
1440
/// Merges several upstream inputs into one message stream, aligning barriers across them.
///
/// As implemented by its `Stream` impl: chunks are forwarded as they arrive, watermarks go
/// through per-column buffers, and a barrier is emitted only once every upstream has
/// produced it.
pub struct DynamicReceivers<InputId, M> {
    /// The barrier currently being aligned; set when the first upstream yields a barrier and
    /// taken once all upstreams are blocked.
    barrier: Option<BarrierInner<M>>,
    /// Time at which the first upstream got blocked for the current barrier; used to report
    /// the alignment duration.
    start_ts: Option<Instant>,
    /// Upstreams that already yielded the current barrier and are not polled until the
    /// barrier is emitted.
    blocked: Vec<BoxedMessageInput<InputId, M>>,
    /// Upstreams that are still being polled.
    active: FuturesUnordered<StreamFuture<BoxedMessageInput<InputId, M>>>,
    /// Watermark buffers, keyed by the column index of the watermark.
    buffered_watermarks: BTreeMap<usize, BufferedWatermarks<InputId>>,
    /// Optional counter incremented by each alignment duration, in nanoseconds.
    barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
    /// Optional histogram observing each alignment duration, in seconds.
    merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
}
1460
impl<InputId: Clone + Ord + Hash + std::fmt::Debug + Unpin, M: Clone + Unpin> Stream
    for DynamicReceivers<InputId, M>
{
    type Item = MessageStreamItemInner<M>;

    /// Polls the merged stream.
    ///
    /// - Chunks are forwarded immediately.
    /// - Watermarks are passed to `handle_watermark` and emitted only if it returns one.
    /// - A barrier blocks the upstream it came from; the barrier is emitted once no active
    ///   upstream is left, after which all blocked upstreams become active again.
    ///
    /// Yields an error if an upstream yields an error, if an upstream ends, or if two
    /// upstreams deliver barriers with different epochs. Yields `None` when there are no
    /// upstreams at all.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // Without any upstream (blocked or active) the stream is finished.
        if self.is_empty() {
            return Poll::Ready(None);
        }

        loop {
            match futures::ready!(self.active.poll_next_unpin(cx)) {
                // An upstream yielded an error: forward it as is. The upstream itself is
                // dropped here, as it is not pushed back to `active`.
                Some((Some(Err(e)), _)) => {
                    return Poll::Ready(Some(Err(e)));
                }
                // An upstream yielded a message.
                Some((Some(Ok(message)), remaining)) => {
                    let input_id = remaining.id();
                    match message {
                        MessageInner::Chunk(chunk) => {
                            // Keep polling this upstream.
                            self.active.push(remaining.into_future());
                            return Poll::Ready(Some(Ok(MessageInner::Chunk(chunk))));
                        }
                        MessageInner::Watermark(watermark) => {
                            // Keep polling this upstream.
                            self.active.push(remaining.into_future());
                            // Emit only if the buffers produce a watermark; otherwise
                            // continue with the next ready upstream.
                            if let Some(watermark) = self.handle_watermark(input_id, watermark) {
                                return Poll::Ready(Some(Ok(MessageInner::Watermark(watermark))));
                            }
                        }
                        MessageInner::Barrier(barrier) => {
                            // The first blocked upstream marks the start of the alignment.
                            if self.blocked.is_empty() {
                                self.start_ts = Some(Instant::now());
                            }
                            // Stop polling this upstream until the barrier is emitted.
                            self.blocked.push(remaining);
                            if let Some(current_barrier) = self.barrier.as_ref() {
                                // All upstreams must deliver the barrier of the same epoch.
                                if current_barrier.epoch != barrier.epoch {
                                    return Poll::Ready(Some(Err(
                                        StreamExecutorError::align_barrier(
                                            current_barrier.clone().map_mutation(|_| None),
                                            barrier.map_mutation(|_| None),
                                        ),
                                    )));
                                }
                            } else {
                                // Remember the first barrier of this round.
                                self.barrier = Some(barrier);
                            }
                        }
                    }
                }
                // An upstream ended; this is reported as a channel-closed error.
                Some((None, remaining)) => {
                    return Poll::Ready(Some(Err(StreamExecutorError::channel_closed(format!(
                        "upstream input {:?} unexpectedly closed",
                        remaining.id()
                    )))));
                }
                // No active upstream is left, so every upstream is blocked on the barrier.
                None => {
                    assert!(!self.blocked.is_empty());

                    let start_ts = self
                        .start_ts
                        .take()
                        .expect("should have received at least one barrier");
                    // Report how long the alignment took to whichever metrics are configured.
                    if let Some(barrier_align_duration) = &self.barrier_align_duration {
                        barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
                    }
                    if let Some(merge_barrier_align_duration) = &self.merge_barrier_align_duration {
                        merge_barrier_align_duration.observe(start_ts.elapsed().as_secs_f64())
                    }

                    break;
                }
            }
        }

        assert!(self.active.is_terminated());

        let barrier = self.barrier.take().unwrap();

        // `blocked` must be emptied and `barrier` taken before calling `extend_active`, which
        // asserts exactly that.
        let upstreams = std::mem::take(&mut self.blocked);
        self.extend_active(upstreams);
        assert!(!self.active.is_terminated());

        Poll::Ready(Some(Ok(MessageInner::Barrier(barrier))))
    }
}
1562
1563impl<InputId: Clone + Ord + Hash + std::fmt::Debug, M> DynamicReceivers<InputId, M> {
1564 pub fn new(
1565 upstreams: Vec<BoxedMessageInput<InputId, M>>,
1566 barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
1567 merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
1568 ) -> Self {
1569 let mut this = Self {
1570 barrier: None,
1571 start_ts: None,
1572 blocked: Vec::with_capacity(upstreams.len()),
1573 active: Default::default(),
1574 buffered_watermarks: Default::default(),
1575 merge_barrier_align_duration,
1576 barrier_align_duration,
1577 };
1578 this.extend_active(upstreams);
1579 this
1580 }
1581
1582 pub fn extend_active(
1585 &mut self,
1586 upstreams: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1587 ) {
1588 assert!(self.blocked.is_empty() && self.barrier.is_none());
1589
1590 self.active
1591 .extend(upstreams.into_iter().map(|s| s.into_future()));
1592 }
1593
1594 pub fn handle_watermark(
1596 &mut self,
1597 input_id: InputId,
1598 watermark: Watermark,
1599 ) -> Option<Watermark> {
1600 let col_idx = watermark.col_idx;
1601 let upstream_ids: Vec<_> = self.upstream_input_ids().collect();
1603 let watermarks = self
1604 .buffered_watermarks
1605 .entry(col_idx)
1606 .or_insert_with(|| BufferedWatermarks::with_ids(upstream_ids));
1607 watermarks.handle_watermark(input_id, watermark)
1608 }
1609
1610 pub fn add_upstreams_from(
1613 &mut self,
1614 new_inputs: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
1615 ) {
1616 assert!(self.blocked.is_empty() && self.barrier.is_none());
1617
1618 let new_inputs: Vec<_> = new_inputs.into_iter().collect();
1619 let input_ids = new_inputs.iter().map(|input| input.id());
1620 self.buffered_watermarks.values_mut().for_each(|buffers| {
1621 buffers.add_buffers(input_ids.clone());
1623 });
1624 self.active
1625 .extend(new_inputs.into_iter().map(|s| s.into_future()));
1626 }
1627
1628 pub fn remove_upstreams(&mut self, upstream_input_ids: &HashSet<InputId>) {
1632 assert!(self.blocked.is_empty() && self.barrier.is_none());
1633
1634 let new_upstreams = std::mem::take(&mut self.active)
1635 .into_iter()
1636 .map(|s| s.into_inner().unwrap())
1637 .filter(|u| !upstream_input_ids.contains(&u.id()));
1638 self.extend_active(new_upstreams);
1639 self.buffered_watermarks.values_mut().for_each(|buffers| {
1640 buffers.remove_buffer(upstream_input_ids.clone());
1643 });
1644 }
1645
1646 pub fn merge_barrier_align_duration(&self) -> Option<LabelGuardedMetric<Histogram>> {
1647 self.merge_barrier_align_duration.clone()
1648 }
1649
1650 pub fn flush_buffered_watermarks(&mut self) {
1651 self.buffered_watermarks
1652 .values_mut()
1653 .for_each(|buffers| buffers.clear());
1654 }
1655
1656 pub fn upstream_input_ids(&self) -> impl Iterator<Item = InputId> + '_ {
1657 self.blocked
1658 .iter()
1659 .map(|s| s.id())
1660 .chain(self.active.iter().map(|s| s.get_ref().unwrap().id()))
1661 }
1662
1663 pub fn is_empty(&self) -> bool {
1664 self.blocked.is_empty() && self.active.is_empty()
1665 }
1666}
1667
/// Buffers barriers received from `barrier_rx` and, when a barrier adds upstream actors,
/// builds the corresponding new inputs ahead of time.
pub(crate) struct DispatchBarrierBuffer {
    /// Barriers received so far, in arrival order, each paired with the new inputs built for
    /// it (`None` if the barrier required none).
    buffer: VecDeque<(Barrier, Option<Vec<BoxedActorInput>>)>,
    /// Channel from which barriers are received.
    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
    /// Whether we are waiting for the next barrier or building inputs for the last one.
    recv_state: BarrierReceiverState,
    /// Upstream fragment id used to look up merge updates; replaced when an update carries a
    /// new upstream fragment id.
    curr_upstream_fragment_id: FragmentId,
    /// Actor id used to look up merge updates in barriers.
    actor_id: ActorId,
    /// Shared context handed to the futures that build new inputs.
    build_input_ctx: Arc<BuildInputContext>,
}
1697
/// Values passed to `new_input` when `DispatchBarrierBuffer` builds inputs for newly added
/// upstream actors.
struct BuildInputContext {
    /// Passed to `new_input` as the actor id.
    pub actor_id: ActorId,
    /// Passed to `new_input` by reference.
    pub local_barrier_manager: LocalBarrierManager,
    /// Streaming metrics; cloned for each new input.
    pub metrics: Arc<StreamingMetrics>,
    /// Passed to `new_input` as the fragment id.
    pub fragment_id: FragmentId,
}
1704
/// A boxed, sendable future that resolves to the newly built inputs or an error.
type BoxedNewInputsFuture =
    Pin<Box<dyn Future<Output = StreamExecutorResult<Vec<BoxedActorInput>>> + Send>>;
1707
/// State of the barrier-receiving loop of `DispatchBarrierBuffer`.
enum BarrierReceiverState {
    /// Waiting for the next barrier from the channel.
    ReceivingBarrier,
    /// A barrier was received that requires new inputs; the future builds them.
    CreatingNewInput(Barrier, BoxedNewInputsFuture),
}
1712
impl DispatchBarrierBuffer {
    /// Creates an empty buffer that starts out waiting for a barrier.
    ///
    /// `actor_id`, `local_barrier_manager`, `metrics` and `fragment_id` are stored in the
    /// shared context used to build new inputs; `actor_id` and `curr_upstream_fragment_id`
    /// are also used to look up merge updates in received barriers.
    pub fn new(
        barrier_rx: mpsc::UnboundedReceiver<Barrier>,
        actor_id: ActorId,
        curr_upstream_fragment_id: FragmentId,
        local_barrier_manager: LocalBarrierManager,
        metrics: Arc<StreamingMetrics>,
        fragment_id: FragmentId,
    ) -> Self {
        Self {
            buffer: VecDeque::new(),
            barrier_rx,
            recv_state: BarrierReceiverState::ReceivingBarrier,
            curr_upstream_fragment_id,
            actor_id,
            build_input_ctx: Arc::new(BuildInputContext {
                actor_id,
                local_barrier_manager,
                metrics,
                fragment_id,
            }),
        }
    }

    /// Waits for the next message from `stream`, receiving and buffering barriers from the
    /// barrier channel in the meantime.
    ///
    /// The time spent waiting is added to `metrics.actor_input_buffer_blocking_duration_ns`,
    /// both when the message arrives and every 15 seconds while still waiting.
    ///
    /// # Errors
    ///
    /// Fails if `stream` yields an error or ends, or if fetching barriers fails (for example
    /// because building new inputs failed).
    pub async fn await_next_message(
        &mut self,
        stream: &mut (impl Stream<Item = StreamExecutorResult<DispatcherMessage>> + Unpin),
        metrics: &ActorInputMetrics,
    ) -> StreamExecutorResult<DispatcherMessage> {
        let mut start_time = Instant::now();
        let interval_duration = Duration::from_secs(15);
        // The first tick fires one full interval from now, not immediately.
        let mut interval =
            tokio::time::interval_at(start_time + interval_duration, interval_duration);

        loop {
            tokio::select! {
                // Poll the branches in the order written, so an available message wins.
                biased;
                msg = stream.try_next() => {
                    // Account for the time waited since the start or the last tick.
                    metrics
                        .actor_input_buffer_blocking_duration_ns
                        .inc_by(start_time.elapsed().as_nanos() as u64);
                    return msg?.ok_or_else(
                        || StreamExecutorError::channel_closed("upstream executor closed unexpectedly")
                    );
                }

                // Only completes with an error; otherwise it keeps buffering barriers.
                e = self.continuously_fetch_barrier_rx() => {
                    return Err(e);
                }

                // Still waiting: report one interval of blocking time and restart the
                // measurement so that this interval is not counted again later.
                _ = interval.tick() => {
                    start_time = Instant::now();
                    metrics.actor_input_buffer_blocking_duration_ns.inc_by(interval_duration.as_nanos() as u64);
                    continue;
                }
            }
        }
    }

    /// Pops the oldest buffered barrier together with the new inputs built for it, waiting
    /// for one to arrive if the buffer is empty.
    ///
    /// `barrier` is applied to the popped barrier through `apply_dispatcher_barrier` before
    /// it is returned.
    ///
    /// # Errors
    ///
    /// Fails if the barrier channel is closed or if building new inputs fails.
    pub async fn pop_barrier_with_inputs(
        &mut self,
        barrier: DispatcherBarrier,
    ) -> StreamExecutorResult<(Barrier, Option<Vec<BoxedActorInput>>)> {
        while self.buffer.is_empty() {
            self.try_fetch_barrier_rx(false).await?;
        }
        let (mut recv_barrier, inputs) = self.buffer.pop_front().unwrap();
        apply_dispatcher_barrier(&mut recv_barrier, barrier);

        Ok((recv_barrier, inputs))
    }

    /// Keeps fetching barriers until an error occurs, and returns that error.
    ///
    /// A closed barrier channel is not treated as an error here: the future then stays
    /// pending forever.
    async fn continuously_fetch_barrier_rx(&mut self) -> StreamExecutorError {
        loop {
            if let Err(e) = self.try_fetch_barrier_rx(true).await {
                return e;
            }
        }
    }

    /// Performs one step of the receive state machine.
    ///
    /// - In `ReceivingBarrier`, waits for a barrier. It is buffered right away unless it
    ///   requires new inputs, in which case the state switches to `CreatingNewInput`.
    /// - In `CreatingNewInput`, waits for the inputs to be built, buffers the barrier along
    ///   with them, and switches back to `ReceivingBarrier`.
    ///
    /// When the barrier channel is closed, this stays pending forever if `pending_on_end` is
    /// `true` and returns a channel-closed error otherwise.
    ///
    /// # Errors
    ///
    /// Fails on a closed channel (see above) or if building new inputs fails.
    async fn try_fetch_barrier_rx(&mut self, pending_on_end: bool) -> StreamExecutorResult<()> {
        match &mut self.recv_state {
            BarrierReceiverState::ReceivingBarrier => {
                let Some(barrier) = self.barrier_rx.recv().await else {
                    if pending_on_end {
                        return pending().await;
                    } else {
                        return Err(StreamExecutorError::channel_closed(
                            "barrier channel closed unexpectedly",
                        ));
                    }
                };
                if let Some(fut) = self.pre_apply_barrier(&barrier) {
                    self.recv_state = BarrierReceiverState::CreatingNewInput(barrier, fut);
                } else {
                    self.buffer.push_back((barrier, None));
                }
            }
            BarrierReceiverState::CreatingNewInput(barrier, fut) => {
                // The future lives in `self.recv_state`, so progress is kept even if this
                // call is dropped before the future completes.
                let new_inputs = fut.await?;
                self.buffer.push_back((barrier.clone(), Some(new_inputs)));
                self.recv_state = BarrierReceiverState::ReceivingBarrier;
            }
        }
        Ok(())
    }

    /// Checks whether `barrier` adds upstream actors for this actor and, if so, returns a
    /// future that builds the inputs for them.
    ///
    /// If the update carries a new upstream fragment id, `curr_upstream_fragment_id` is
    /// updated to it immediately. Returns `None` if the barrier has no merge update for this
    /// actor and upstream fragment, or if the update adds no upstream actors.
    fn pre_apply_barrier(&mut self, barrier: &Barrier) -> Option<BoxedNewInputsFuture> {
        if let Some(update) = barrier.as_update_merge(self.actor_id, self.curr_upstream_fragment_id)
            && !update.added_upstream_actors.is_empty()
        {
            // The new inputs belong to the new upstream fragment if there is one.
            let upstream_fragment_id =
                if let Some(new_upstream_fragment_id) = update.new_upstream_fragment_id {
                    self.curr_upstream_fragment_id = new_upstream_fragment_id;
                    new_upstream_fragment_id
                } else {
                    self.curr_upstream_fragment_id
                };
            // Everything the future needs is cloned so that it does not borrow from `self`
            // or from `barrier`.
            let ctx = self.build_input_ctx.clone();
            let added_upstream_actors = update.added_upstream_actors.clone();
            let barrier = barrier.clone();
            let fut = async move {
                // Build all inputs concurrently; the first failure fails the whole future.
                try_join_all(added_upstream_actors.iter().map(|upstream_actor| async {
                    let mut new_input = new_input(
                        &ctx.local_barrier_manager,
                        ctx.metrics.clone(),
                        ctx.actor_id,
                        ctx.fragment_id,
                        upstream_actor,
                        upstream_fragment_id,
                    )
                    .await?;

                    // Consume the first barrier of the new input and check it against the
                    // barrier that introduced the input.
                    let first_barrier = expect_first_barrier(&mut new_input).await?;
                    assert_equal_dispatcher_barrier(&barrier, &first_barrier);

                    StreamExecutorResult::Ok(new_input)
                }))
                .await
            }
            .boxed();

            Some(fut)
        } else {
            None
        }
    }
}