mod prelude;

use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::fmt::Debug;
use std::future::pending;
use std::hash::Hash;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::vec;

use await_tree::InstrumentAwait;
use enum_as_inner::EnumAsInner;
use futures::future::try_join_all;
use futures::stream::{BoxStream, FusedStream, FuturesUnordered, StreamFuture};
use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use prometheus::Histogram;
use prometheus::core::{AtomicU64, GenericCounter};
use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{Field, Schema, TableId};
use risingwave_common::metrics::LabelGuardedMetric;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl};
use risingwave_common::util::epoch::{Epoch, EpochPair};
use risingwave_common::util::tracing::TracingContext;
use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
use risingwave_connector::source::SplitImpl;
use risingwave_expr::expr::{Expression, NonStrictExpression};
use risingwave_pb::data::PbEpoch;
use risingwave_pb::expr::PbInputRef;
use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
use risingwave_pb::stream_plan::stream_node::PbStreamKind;
use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
use risingwave_pb::stream_plan::{
    PbBarrier, PbBarrierMutation, PbDispatcher, PbStreamMessageBatch, PbWatermark,
    SubscriptionUpstreamInfo,
};
use smallvec::SmallVec;
use tokio::sync::mpsc;
use tokio::time::{Duration, Instant};

use crate::error::StreamResult;
use crate::executor::exchange::input::{
    BoxedActorInput, BoxedInput, apply_dispatcher_barrier, assert_equal_dispatcher_barrier,
    new_input,
};
use crate::executor::monitor::ActorInputMetrics;
use crate::executor::prelude::StreamingMetrics;
use crate::executor::watermark::BufferedWatermarks;
use crate::task::{ActorId, FragmentId, LocalBarrierManager};

mod actor;
mod barrier_align;
pub mod exchange;
pub mod monitor;

pub mod aggregate;
pub mod asof_join;
mod backfill;
mod barrier_recv;
mod batch_query;
mod chain;
mod changelog;
mod dedup;
mod dispatch;
pub mod dml;
mod dynamic_filter;
pub mod eowc;
pub mod error;
mod expand;
mod filter;
mod gap_fill;
pub mod hash_join;
mod hop_window;
mod join;
pub mod locality_provider;
mod lookup;
mod lookup_union;
mod merge;
mod mview;
mod nested_loop_temporal_join;
mod no_op;
mod now;
mod over_window;
pub mod project;
mod rearranged_chain;
mod receiver;
pub mod row_id_gen;
mod sink;
pub mod source;
mod stream_reader;
pub mod subtask;
mod temporal_join;
mod top_n;
mod troublemaker;
mod union;
mod upstream_sink_union;
mod values;
mod watermark;
mod watermark_filter;
mod wrapper;

mod approx_percentile;

mod row_merge;

#[cfg(test)]
mod integration_tests;
mod sync_kv_log_store;
#[cfg(any(test, feature = "test"))]
pub mod test_utils;
mod utils;
mod vector_index;

pub use actor::{Actor, ActorContext, ActorContextRef};
use anyhow::Context;
pub use approx_percentile::global::GlobalApproxPercentileExecutor;
pub use approx_percentile::local::LocalApproxPercentileExecutor;
pub use backfill::arrangement_backfill::*;
pub use backfill::cdc::{
    CdcBackfillExecutor, ExternalStorageTable, ParallelizedCdcBackfillExecutor,
};
pub use backfill::no_shuffle_backfill::*;
pub use backfill::snapshot_backfill::*;
pub use barrier_recv::BarrierRecvExecutor;
pub use batch_query::BatchQueryExecutor;
pub use chain::ChainExecutor;
pub use changelog::ChangeLogExecutor;
pub use dedup::AppendOnlyDedupExecutor;
pub use dispatch::{DispatchExecutor, DispatcherImpl};
pub use dynamic_filter::DynamicFilterExecutor;
pub use error::{StreamExecutorError, StreamExecutorResult};
pub use expand::ExpandExecutor;
pub use filter::{FilterExecutor, UpsertFilterExecutor};
pub use gap_fill::{GapFillExecutor, GapFillExecutorArgs};
pub use hash_join::*;
pub use hop_window::HopWindowExecutor;
pub use join::row::{CachedJoinRow, CpuEncoding, JoinEncoding, MemoryEncoding};
pub use join::{AsOfDesc, AsOfJoinType, JoinType};
pub use lookup::*;
pub use lookup_union::LookupUnionExecutor;
pub use merge::MergeExecutor;
pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream};
pub use mview::*;
pub use nested_loop_temporal_join::NestedLoopTemporalJoinExecutor;
pub use no_op::NoOpExecutor;
pub use now::*;
pub use over_window::*;
pub use rearranged_chain::RearrangedChainExecutor;
pub use receiver::ReceiverExecutor;
use risingwave_common::id::SourceId;
pub use row_merge::RowMergeExecutor;
pub use sink::SinkExecutor;
pub use sync_kv_log_store::SyncedKvLogStoreExecutor;
pub use sync_kv_log_store::metrics::SyncedKvLogStoreMetrics;
pub use temporal_join::TemporalJoinExecutor;
pub use top_n::{
    AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor,
};
pub use troublemaker::TroublemakerExecutor;
pub use union::UnionExecutor;
pub use upstream_sink_union::{UpstreamFragmentInfo, UpstreamSinkUnionExecutor};
pub use utils::DummyExecutor;
pub use values::ValuesExecutor;
pub use vector_index::VectorIndexWriteExecutor;
pub use watermark_filter::WatermarkFilterExecutor;
pub use wrapper::WrapperExecutor;

use self::barrier_align::AlignedMessageStream;

pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
pub type DispatcherMessageStreamItem = StreamExecutorResult<DispatcherMessage>;
pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;

pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
use risingwave_connector::sink::catalog::SinkId;
use risingwave_connector::source::cdc::{
    CdcTableSnapshotSplitAssignmentWithGeneration,
    build_actor_cdc_table_snapshot_splits_with_generation,
};
use risingwave_pb::stream_plan::stream_message_batch::{BarrierBatch, StreamMessageBatch};

pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;

#[derive(Debug, Default, Clone)]
pub struct ExecutorInfo {
    /// The schema of the executor's output.
    pub schema: Schema,

    /// The column indices of the stream key in the output.
    pub stream_key: StreamKey,

    /// The kind of the output stream (see [`PbStreamKind`]).
    pub stream_kind: PbStreamKind,

    /// A human-readable identity of the executor, mainly for logging and debugging.
    pub identity: String,

    /// The unique id of the executor.
    pub id: u64,
}

impl ExecutorInfo {
    pub fn for_test(schema: Schema, stream_key: StreamKey, identity: String, id: u64) -> Self {
        Self {
            schema,
            stream_key,
            stream_kind: PbStreamKind::Retract,
            identity,
            id,
        }
    }
}
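
/// The trait for the core logic of a streaming executor.
///
/// Implementors return a [`BoxedMessageStream`] that yields data chunks, barriers and
/// watermarks. A minimal sketch of an implementor (the `PassThrough` type below is
/// hypothetical and only for illustration):
///
/// ```ignore
/// struct PassThrough {
///     input: Executor,
/// }
///
/// impl Execute for PassThrough {
///     fn execute(self: Box<Self>) -> BoxedMessageStream {
///         // Forward every message from the upstream executor unchanged.
///         self.input.execute()
///     }
/// }
/// ```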
pub trait Execute: Send + 'static {
    fn execute(self: Box<Self>) -> BoxedMessageStream;

    fn execute_with_epoch(self: Box<Self>, _epoch: u64) -> BoxedMessageStream {
        self.execute()
    }

    fn boxed(self) -> Box<dyn Execute>
    where
        Self: Sized + Send + 'static,
    {
        Box::new(self)
    }
}

pub struct Executor {
    info: ExecutorInfo,
    execute: Box<dyn Execute>,
}

impl Executor {
    pub fn new(info: ExecutorInfo, execute: Box<dyn Execute>) -> Self {
        Self { info, execute }
    }

    pub fn info(&self) -> &ExecutorInfo {
        &self.info
    }

    pub fn schema(&self) -> &Schema {
        &self.info.schema
    }

    pub fn stream_key(&self) -> StreamKeyRef<'_> {
        &self.info.stream_key
    }

    pub fn stream_kind(&self) -> PbStreamKind {
        self.info.stream_kind
    }

    pub fn identity(&self) -> &str {
        &self.info.identity
    }

    pub fn execute(self) -> BoxedMessageStream {
        self.execute.execute()
    }

    pub fn execute_with_epoch(self, epoch: u64) -> BoxedMessageStream {
        self.execute.execute_with_epoch(epoch)
    }
}

impl std::fmt::Debug for Executor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.identity())
    }
}

impl From<(ExecutorInfo, Box<dyn Execute>)> for Executor {
    fn from((info, execute): (ExecutorInfo, Box<dyn Execute>)) -> Self {
        Self::new(info, execute)
    }
}

impl<E> From<(ExecutorInfo, E)> for Executor
where
    E: Execute,
{
    fn from((info, execute): (ExecutorInfo, E)) -> Self {
        Self::new(info, execute.boxed())
    }
}
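
// Usage sketch (names are illustrative): with the conversions above, an `Executor` can be
// built either explicitly via `Executor::new(info, exec.boxed())` or through `Into`:
//
//     let executor: Executor = (info, exec).into();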

pub const INVALID_EPOCH: u64 = 0;

type UpstreamFragmentId = FragmentId;
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;

#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct UpdateMutation {
    pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
    pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
    pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
    pub dropped_actors: HashSet<ActorId>,
    pub actor_splits: SplitAssignments,
    pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    pub sink_add_columns: HashMap<SinkId, Vec<Field>>,
}

#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct AddMutation {
    pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
    pub added_actors: HashSet<ActorId>,
    pub splits: SplitAssignments,
    pub pause: bool,
    pub subscriptions_to_add: Vec<(TableId, u32)>,
    pub backfill_nodes_to_pause: HashSet<FragmentId>,
    pub actor_cdc_table_snapshot_splits: CdcTableSnapshotSplitAssignmentWithGeneration,
    pub new_upstream_sinks: HashMap<FragmentId, PbNewUpstreamSink>,
}

#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(any(test, feature = "test"), derive(Default))]
pub struct StopMutation {
    pub dropped_actors: HashSet<ActorId>,
    pub dropped_sink_fragments: HashSet<FragmentId>,
}

#[derive(Debug, Clone, PartialEq)]
pub enum Mutation {
    Stop(StopMutation),
    Update(UpdateMutation),
    Add(AddMutation),
    SourceChangeSplit(SplitAssignments),
    Pause,
    Resume,
    Throttle(HashMap<ActorId, Option<u32>>),
    ConnectorPropsChange(HashMap<u32, HashMap<String, String>>),
    DropSubscriptions {
        subscriptions_to_drop: Vec<(u32, TableId)>,
    },
    StartFragmentBackfill {
        fragment_ids: HashSet<FragmentId>,
    },
    RefreshStart {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    ListFinish {
        associated_source_id: SourceId,
    },
    LoadFinish {
        associated_source_id: SourceId,
    },
}

#[derive(Debug, Clone)]
pub struct BarrierInner<M> {
    pub epoch: EpochPair,
    pub mutation: M,
    pub kind: BarrierKind,

    pub tracing_context: TracingContext,

    pub passed_actors: Vec<ActorId>,
}

pub type BarrierMutationType = Option<Arc<Mutation>>;
pub type Barrier = BarrierInner<BarrierMutationType>;
pub type DispatcherBarrier = BarrierInner<()>;

impl<M: Default> BarrierInner<M> {
    pub fn new_test_barrier(epoch: u64) -> Self {
        Self {
            epoch: EpochPair::new_test_epoch(epoch),
            kind: BarrierKind::Checkpoint,
            tracing_context: TracingContext::none(),
            mutation: Default::default(),
            passed_actors: Default::default(),
        }
    }

    pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
        Self {
            epoch: EpochPair::new(epoch, prev_epoch),
            kind: BarrierKind::Checkpoint,
            tracing_context: TracingContext::none(),
            mutation: Default::default(),
            passed_actors: Default::default(),
        }
    }
}

impl Barrier {
    pub fn into_dispatcher(self) -> DispatcherBarrier {
        DispatcherBarrier {
            epoch: self.epoch,
            mutation: (),
            kind: self.kind,
            tracing_context: self.tracing_context,
            passed_actors: self.passed_actors,
        }
    }

    #[must_use]
    pub fn with_mutation(self, mutation: Mutation) -> Self {
        Self {
            mutation: Some(Arc::new(mutation)),
            ..self
        }
    }

    #[must_use]
    pub fn with_stop(self) -> Self {
        self.with_mutation(Mutation::Stop(StopMutation {
            dropped_actors: Default::default(),
            dropped_sink_fragments: Default::default(),
        }))
    }

    pub fn is_with_stop_mutation(&self) -> bool {
        matches!(self.mutation.as_deref(), Some(Mutation::Stop(_)))
    }

    pub fn is_stop(&self, actor_id: ActorId) -> bool {
        self.all_stop_actors()
            .is_some_and(|actors| actors.contains(&actor_id))
    }

    pub fn is_checkpoint(&self) -> bool {
        self.kind == BarrierKind::Checkpoint
    }

    pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
        match self.mutation.as_deref()? {
            Mutation::Update(UpdateMutation { actor_splits, .. })
            | Mutation::Add(AddMutation {
                splits: actor_splits,
                ..
            }) => actor_splits.get(&actor_id),

            _ => {
                if cfg!(debug_assertions) {
                    panic!(
                        "the initial mutation of the barrier should not be {:?}",
                        self.mutation
                    );
                }
                None
            }
        }
        .map(|s| s.as_slice())
    }

    pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
        match self.mutation.as_deref() {
            Some(Mutation::Stop(StopMutation { dropped_actors, .. })) => Some(dropped_actors),
            Some(Mutation::Update(UpdateMutation { dropped_actors, .. })) => Some(dropped_actors),
            _ => None,
        }
    }

    pub fn is_newly_added(&self, actor_id: ActorId) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation { added_actors, .. })) => {
                added_actors.contains(&actor_id)
            }
            _ => false,
        }
    }

    pub fn should_start_fragment_backfill(&self, fragment_id: FragmentId) -> bool {
        if let Some(Mutation::StartFragmentBackfill { fragment_ids }) = self.mutation.as_deref() {
            fragment_ids.contains(&fragment_id)
        } else {
            false
        }
    }

    pub fn has_more_downstream_fragments(&self, upstream_actor_id: ActorId) -> bool {
        let Some(mutation) = self.mutation.as_deref() else {
            return false;
        };
        match mutation {
            Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(),
            Mutation::Update(_)
            | Mutation::Stop(_)
            | Mutation::Pause
            | Mutation::Resume
            | Mutation::SourceChangeSplit(_)
            | Mutation::Throttle(_)
            | Mutation::DropSubscriptions { .. }
            | Mutation::ConnectorPropsChange(_)
            | Mutation::StartFragmentBackfill { .. }
            | Mutation::RefreshStart { .. }
            | Mutation::ListFinish { .. }
            | Mutation::LoadFinish { .. } => false,
        }
    }

    pub fn is_pause_on_startup(&self) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation { pause, .. })) => *pause,
            _ => false,
        }
    }

    pub fn is_backfill_pause_on_startup(&self, backfill_fragment_id: FragmentId) -> bool {
        match self.mutation.as_deref() {
            Some(Mutation::Add(AddMutation {
                backfill_nodes_to_pause,
                ..
            })) => backfill_nodes_to_pause.contains(&backfill_fragment_id),
            Some(Mutation::Update(_)) => false,
            _ => {
                tracing::warn!(
                    "expected an AddMutation or UpdateMutation on Startup, instead got {:?}",
                    self
                );
                false
            }
        }
    }

    pub fn is_resume(&self) -> bool {
        matches!(self.mutation.as_deref(), Some(Mutation::Resume))
    }

    pub fn as_update_merge(
        &self,
        actor_id: ActorId,
        upstream_fragment_id: UpstreamFragmentId,
    ) -> Option<&MergeUpdate> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Update(UpdateMutation { merges, .. }) => {
                    merges.get(&(actor_id, upstream_fragment_id))
                }
                _ => None,
            })
    }

    pub fn as_new_upstream_sink(&self, fragment_id: FragmentId) -> Option<&PbNewUpstreamSink> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Add(AddMutation {
                    new_upstream_sinks, ..
                }) => new_upstream_sinks.get(&fragment_id),
                _ => None,
            })
    }

    pub fn as_dropped_upstream_sinks(&self) -> Option<&HashSet<FragmentId>> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Stop(StopMutation {
                    dropped_sink_fragments,
                    ..
                }) => Some(dropped_sink_fragments),
                _ => None,
            })
    }

    pub fn as_update_vnode_bitmap(&self, actor_id: ActorId) -> Option<Arc<Bitmap>> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Update(UpdateMutation { vnode_bitmaps, .. }) => {
                    vnode_bitmaps.get(&actor_id).cloned()
                }
                _ => None,
            })
    }

    pub fn as_sink_add_columns(&self, sink_id: SinkId) -> Option<Vec<Field>> {
        self.mutation
            .as_deref()
            .and_then(|mutation| match mutation {
                Mutation::Update(UpdateMutation {
                    sink_add_columns, ..
                }) => sink_add_columns.get(&sink_id).cloned(),
                _ => None,
            })
    }

    pub fn get_curr_epoch(&self) -> Epoch {
        Epoch(self.epoch.curr)
    }

    pub fn tracing_context(&self) -> &TracingContext {
        &self.tracing_context
    }

    pub fn added_subscriber_on_mv_table(
        &self,
        mv_table_id: TableId,
    ) -> impl Iterator<Item = u32> + '_ {
        if let Some(Mutation::Add(add)) = self.mutation.as_deref() {
            Some(add)
        } else {
            None
        }
        .into_iter()
        .flat_map(move |add| {
            add.subscriptions_to_add.iter().filter_map(
                move |(upstream_mv_table_id, subscriber_id)| {
                    if *upstream_mv_table_id == mv_table_id {
                        Some(*subscriber_id)
                    } else {
                        None
                    }
                },
            )
        })
    }
}
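
// A small test-style sketch of how the helpers above are typically used (assuming the
// `test_epoch` helper from `risingwave_common::util::epoch`):
//
//     let barrier = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Pause);
//     assert!(barrier.is_checkpoint());
//     assert!(!barrier.is_with_stop_mutation());
//     assert!(barrier.all_stop_actors().is_none());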

impl<M: PartialEq> PartialEq for BarrierInner<M> {
    fn eq(&self, other: &Self) -> bool {
        self.epoch == other.epoch && self.mutation == other.mutation
    }
}

impl Mutation {
    #[cfg(test)]
    pub fn is_stop(&self) -> bool {
        matches!(self, Mutation::Stop(_))
    }

    #[cfg(test)]
    fn to_protobuf(&self) -> PbMutation {
        use risingwave_connector::source::cdc::build_pb_actor_cdc_table_snapshot_splits_with_generation;
        use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
        use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
        use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
        use risingwave_pb::stream_plan::{
            PbAddMutation, PbConnectorPropsChangeMutation, PbDispatchers,
            PbDropSubscriptionsMutation, PbPauseMutation, PbResumeMutation, PbSinkAddColumns,
            PbSourceChangeSplitMutation, PbStartFragmentBackfillMutation, PbStopMutation,
            PbThrottleMutation, PbUpdateMutation,
        };
        let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
            actor_splits
                .iter()
                .map(|(&actor_id, splits)| {
                    (
                        actor_id,
                        ConnectorSplits {
                            splits: splits.clone().iter().map(ConnectorSplit::from).collect(),
                        },
                    )
                })
                .collect::<HashMap<_, _>>()
        };

        match self {
            Mutation::Stop(StopMutation {
                dropped_actors,
                dropped_sink_fragments,
            }) => PbMutation::Stop(PbStopMutation {
                actors: dropped_actors.iter().copied().collect(),
                dropped_sink_fragments: dropped_sink_fragments.iter().copied().collect(),
            }),
            Mutation::Update(UpdateMutation {
                dispatchers,
                merges,
                vnode_bitmaps,
                dropped_actors,
                actor_splits,
                actor_new_dispatchers,
                actor_cdc_table_snapshot_splits,
                sink_add_columns,
            }) => PbMutation::Update(PbUpdateMutation {
                dispatcher_update: dispatchers.values().flatten().cloned().collect(),
                merge_update: merges.values().cloned().collect(),
                actor_vnode_bitmap_update: vnode_bitmaps
                    .iter()
                    .map(|(&actor_id, bitmap)| (actor_id, bitmap.to_protobuf()))
                    .collect(),
                dropped_actors: dropped_actors.iter().copied().collect(),
                actor_splits: actor_splits_to_protobuf(actor_splits),
                actor_new_dispatchers: actor_new_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| {
                        (
                            actor_id,
                            PbDispatchers {
                                dispatchers: dispatchers.clone(),
                            },
                        )
                    })
                    .collect(),
                actor_cdc_table_snapshot_splits:
                    build_pb_actor_cdc_table_snapshot_splits_with_generation(
                        actor_cdc_table_snapshot_splits.clone(),
                    )
                    .into(),
                sink_add_columns: sink_add_columns
                    .iter()
                    .map(|(sink_id, add_columns)| {
                        (
                            *sink_id,
                            PbSinkAddColumns {
                                fields: add_columns.iter().map(|field| field.to_prost()).collect(),
                            },
                        )
                    })
                    .collect(),
            }),
            Mutation::Add(AddMutation {
                adds,
                added_actors,
                splits,
                pause,
                subscriptions_to_add,
                backfill_nodes_to_pause,
                actor_cdc_table_snapshot_splits,
                new_upstream_sinks,
            }) => PbMutation::Add(PbAddMutation {
                actor_dispatchers: adds
                    .iter()
                    .map(|(&actor_id, dispatchers)| {
                        (
                            actor_id,
                            PbDispatchers {
                                dispatchers: dispatchers.clone(),
                            },
                        )
                    })
                    .collect(),
                added_actors: added_actors.iter().copied().collect(),
                actor_splits: actor_splits_to_protobuf(splits),
                pause: *pause,
                subscriptions_to_add: subscriptions_to_add
                    .iter()
                    .map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
                        subscriber_id: *subscriber_id,
                        upstream_mv_table_id: *table_id,
                    })
                    .collect(),
                backfill_nodes_to_pause: backfill_nodes_to_pause.iter().copied().collect(),
                actor_cdc_table_snapshot_splits:
                    build_pb_actor_cdc_table_snapshot_splits_with_generation(
                        actor_cdc_table_snapshot_splits.clone(),
                    )
                    .into(),
                new_upstream_sinks: new_upstream_sinks
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
            }),
            Mutation::SourceChangeSplit(changes) => {
                PbMutation::Splits(PbSourceChangeSplitMutation {
                    actor_splits: changes
                        .iter()
                        .map(|(&actor_id, splits)| {
                            (
                                actor_id,
                                ConnectorSplits {
                                    splits: splits
                                        .clone()
                                        .iter()
                                        .map(ConnectorSplit::from)
                                        .collect(),
                                },
                            )
                        })
                        .collect(),
                })
            }
            Mutation::Pause => PbMutation::Pause(PbPauseMutation {}),
            Mutation::Resume => PbMutation::Resume(PbResumeMutation {}),
            Mutation::Throttle(changes) => PbMutation::Throttle(PbThrottleMutation {
                actor_throttle: changes
                    .iter()
                    .map(|(actor_id, limit)| (*actor_id, RateLimit { rate_limit: *limit }))
                    .collect(),
            }),
            Mutation::DropSubscriptions {
                subscriptions_to_drop,
            } => PbMutation::DropSubscriptions(PbDropSubscriptionsMutation {
                info: subscriptions_to_drop
                    .iter()
                    .map(
                        |(subscriber_id, upstream_mv_table_id)| SubscriptionUpstreamInfo {
                            subscriber_id: *subscriber_id,
                            upstream_mv_table_id: *upstream_mv_table_id,
                        },
                    )
                    .collect(),
            }),
            Mutation::ConnectorPropsChange(map) => {
                PbMutation::ConnectorPropsChange(PbConnectorPropsChangeMutation {
                    connector_props_infos: map
                        .iter()
                        .map(|(actor_id, options)| {
                            (
                                *actor_id,
                                ConnectorPropsInfo {
                                    connector_props_info: options
                                        .iter()
                                        .map(|(k, v)| (k.clone(), v.clone()))
                                        .collect(),
                                },
                            )
                        })
                        .collect(),
                })
            }
            Mutation::StartFragmentBackfill { fragment_ids } => {
                PbMutation::StartFragmentBackfill(PbStartFragmentBackfillMutation {
                    fragment_ids: fragment_ids.iter().copied().collect(),
                })
            }
            Mutation::RefreshStart {
                table_id,
                associated_source_id,
            } => PbMutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
                table_id: *table_id,
                associated_source_id: *associated_source_id,
            }),
            Mutation::ListFinish {
                associated_source_id,
            } => PbMutation::ListFinish(risingwave_pb::stream_plan::ListFinishMutation {
                associated_source_id: *associated_source_id,
            }),
            Mutation::LoadFinish {
                associated_source_id,
            } => PbMutation::LoadFinish(risingwave_pb::stream_plan::LoadFinishMutation {
                associated_source_id: *associated_source_id,
            }),
        }
    }

    fn from_protobuf(prost: &PbMutation) -> StreamExecutorResult<Self> {
        let mutation = match prost {
            PbMutation::Stop(stop) => Mutation::Stop(StopMutation {
                dropped_actors: stop.actors.iter().copied().collect(),
                dropped_sink_fragments: stop.dropped_sink_fragments.iter().copied().collect(),
            }),

            PbMutation::Update(update) => Mutation::Update(UpdateMutation {
                dispatchers: update
                    .dispatcher_update
                    .iter()
                    .map(|u| (u.actor_id, u.clone()))
                    .into_group_map(),
                merges: update
                    .merge_update
                    .iter()
                    .map(|u| ((u.actor_id, u.upstream_fragment_id), u.clone()))
                    .collect(),
                vnode_bitmaps: update
                    .actor_vnode_bitmap_update
                    .iter()
                    .map(|(&actor_id, bitmap)| (actor_id, Arc::new(bitmap.into())))
                    .collect(),
                dropped_actors: update.dropped_actors.iter().copied().collect(),
                actor_splits: update
                    .actor_splits
                    .iter()
                    .map(|(&actor_id, splits)| {
                        (
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(|split| split.try_into().unwrap())
                                .collect(),
                        )
                    })
                    .collect(),
                actor_new_dispatchers: update
                    .actor_new_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
                    .collect(),
                actor_cdc_table_snapshot_splits:
                    build_actor_cdc_table_snapshot_splits_with_generation(
                        update
                            .actor_cdc_table_snapshot_splits
                            .clone()
                            .unwrap_or_default(),
                    ),
                sink_add_columns: update
                    .sink_add_columns
                    .iter()
                    .map(|(sink_id, add_columns)| {
                        (
                            *sink_id,
                            add_columns.fields.iter().map(Field::from_prost).collect(),
                        )
                    })
                    .collect(),
            }),

            PbMutation::Add(add) => Mutation::Add(AddMutation {
                adds: add
                    .actor_dispatchers
                    .iter()
                    .map(|(&actor_id, dispatchers)| (actor_id, dispatchers.dispatchers.clone()))
                    .collect(),
                added_actors: add.added_actors.iter().copied().collect(),
                splits: add
                    .actor_splits
                    .iter()
                    .map(|(&actor_id, splits)| {
                        (
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(|split| split.try_into().unwrap())
                                .collect(),
                        )
                    })
                    .collect(),
                pause: add.pause,
                subscriptions_to_add: add
                    .subscriptions_to_add
                    .iter()
                    .map(
                        |SubscriptionUpstreamInfo {
                             subscriber_id,
                             upstream_mv_table_id,
                         }| { (*upstream_mv_table_id, *subscriber_id) },
                    )
                    .collect(),
                backfill_nodes_to_pause: add.backfill_nodes_to_pause.iter().copied().collect(),
                actor_cdc_table_snapshot_splits:
                    build_actor_cdc_table_snapshot_splits_with_generation(
                        add.actor_cdc_table_snapshot_splits
                            .clone()
                            .unwrap_or_default(),
                    ),
                new_upstream_sinks: add
                    .new_upstream_sinks
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
            }),

            PbMutation::Splits(s) => {
                let mut change_splits: Vec<(ActorId, Vec<SplitImpl>)> =
                    Vec::with_capacity(s.actor_splits.len());
                for (&actor_id, splits) in &s.actor_splits {
                    if !splits.splits.is_empty() {
                        change_splits.push((
                            actor_id,
                            splits
                                .splits
                                .iter()
                                .map(SplitImpl::try_from)
                                .try_collect()?,
                        ));
                    }
                }
                Mutation::SourceChangeSplit(change_splits.into_iter().collect())
            }
            PbMutation::Pause(_) => Mutation::Pause,
            PbMutation::Resume(_) => Mutation::Resume,
            PbMutation::Throttle(changes) => Mutation::Throttle(
                changes
                    .actor_throttle
                    .iter()
                    .map(|(actor_id, limit)| (*actor_id, limit.rate_limit))
                    .collect(),
            ),
            PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
                subscriptions_to_drop: drop
                    .info
                    .iter()
                    .map(|info| (info.subscriber_id, info.upstream_mv_table_id))
                    .collect(),
            },
            PbMutation::ConnectorPropsChange(alter_connector_props) => {
                Mutation::ConnectorPropsChange(
                    alter_connector_props
                        .connector_props_infos
                        .iter()
                        .map(|(connector_id, options)| {
                            (
                                *connector_id,
                                options
                                    .connector_props_info
                                    .iter()
                                    .map(|(k, v)| (k.clone(), v.clone()))
                                    .collect(),
                            )
                        })
                        .collect(),
                )
            }
            PbMutation::StartFragmentBackfill(start_fragment_backfill) => {
                Mutation::StartFragmentBackfill {
                    fragment_ids: start_fragment_backfill
                        .fragment_ids
                        .iter()
                        .copied()
                        .collect(),
                }
            }
            PbMutation::RefreshStart(refresh_start) => Mutation::RefreshStart {
                table_id: refresh_start.table_id,
                associated_source_id: refresh_start.associated_source_id,
            },
            PbMutation::ListFinish(list_finish) => Mutation::ListFinish {
                associated_source_id: list_finish.associated_source_id,
            },
            PbMutation::LoadFinish(load_finish) => Mutation::LoadFinish {
                associated_source_id: load_finish.associated_source_id,
            },
        };
        Ok(mutation)
    }
}

impl<M> BarrierInner<M> {
    fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option<PbMutation>) -> PbBarrier {
        let Self {
            epoch,
            mutation,
            kind,
            passed_actors,
            tracing_context,
            ..
        } = self;

        PbBarrier {
            epoch: Some(PbEpoch {
                curr: epoch.curr,
                prev: epoch.prev,
            }),
            mutation: Some(PbBarrierMutation {
                mutation: barrier_fn(mutation),
            }),
            tracing_context: tracing_context.to_protobuf(),
            kind: *kind as _,
            passed_actors: passed_actors.clone(),
        }
    }

    fn from_protobuf_inner(
        prost: &PbBarrier,
        mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult<M>,
    ) -> StreamExecutorResult<Self> {
        let epoch = prost.get_epoch()?;

        Ok(Self {
            kind: prost.kind(),
            epoch: EpochPair::new(epoch.curr, epoch.prev),
            mutation: mutation_from_pb(
                prost
                    .mutation
                    .as_ref()
                    .and_then(|mutation| mutation.mutation.as_ref()),
            )?,
            passed_actors: prost.get_passed_actors().clone(),
            tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
        })
    }

    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
        BarrierInner {
            epoch: self.epoch,
            mutation: f(self.mutation),
            kind: self.kind,
            tracing_context: self.tracing_context,
            passed_actors: self.passed_actors,
        }
    }
}

impl DispatcherBarrier {
    pub fn to_protobuf(&self) -> PbBarrier {
        self.to_protobuf_inner(|_| None)
    }
}

impl Barrier {
    #[cfg(test)]
    pub fn to_protobuf(&self) -> PbBarrier {
        self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf()))
    }

    pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult<Self> {
        Self::from_protobuf_inner(prost, |mutation| {
            mutation
                .map(|m| Mutation::from_protobuf(m).map(Arc::new))
                .transpose()
        })
    }
}

#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Watermark {
    pub col_idx: usize,
    pub data_type: DataType,
    pub val: ScalarImpl,
}

impl PartialOrd for Watermark {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Watermark {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.val.default_cmp(&other.val)
    }
}

impl Watermark {
    pub fn new(col_idx: usize, data_type: DataType, val: ScalarImpl) -> Self {
        Self {
            col_idx,
            data_type,
            val,
        }
    }

    pub async fn transform_with_expr(
        self,
        expr: &NonStrictExpression<impl Expression>,
        new_col_idx: usize,
    ) -> Option<Self> {
        let Self { col_idx, val, .. } = self;
        let row = {
            let mut row = vec![None; col_idx + 1];
            row[col_idx] = Some(val);
            OwnedRow::new(row)
        };
        let val = expr.eval_row_infallible(&row).await?;
        Some(Self::new(new_col_idx, expr.inner().return_type(), val))
    }

    pub fn transform_with_indices(self, output_indices: &[usize]) -> Option<Self> {
        output_indices
            .iter()
            .position(|p| *p == self.col_idx)
            .map(|new_col_idx| self.with_idx(new_col_idx))
    }

    pub fn to_protobuf(&self) -> PbWatermark {
        PbWatermark {
            column: Some(PbInputRef {
                index: self.col_idx as _,
                r#type: Some(self.data_type.to_protobuf()),
            }),
            val: Some(&self.val).to_protobuf().into(),
        }
    }

    pub fn from_protobuf(prost: &PbWatermark) -> StreamExecutorResult<Self> {
        let col_ref = prost.get_column()?;
        let data_type = DataType::from(col_ref.get_type()?);
        let val = Datum::from_protobuf(prost.get_val()?, &data_type)?
            .expect("watermark value cannot be null");
        Ok(Self::new(col_ref.get_index() as _, data_type, val))
    }

    pub fn with_idx(self, idx: usize) -> Self {
        Self::new(idx, self.data_type, self.val)
    }
}
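
// Example sketch for `transform_with_indices`: a watermark on input column 2 passing
// through a projection that outputs columns `[2, 0]` moves to output column 0, while a
// projection that drops the column discards the watermark:
//
//     let w = Watermark::new(2, DataType::Int64, ScalarImpl::Int64(100));
//     assert_eq!(w.clone().transform_with_indices(&[2, 0]).map(|w| w.col_idx), Some(0));
//     assert_eq!(w.transform_with_indices(&[0, 1]), None);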

#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageInner<M> {
    Chunk(StreamChunk),
    Barrier(BarrierInner<M>),
    Watermark(Watermark),
}

impl<M> MessageInner<M> {
    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
        match self {
            MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
            MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
            MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
        }
    }
}

pub type Message = MessageInner<BarrierMutationType>;
pub type DispatcherMessage = MessageInner<()>;

#[derive(Debug, EnumAsInner, PartialEq, Clone)]
pub enum MessageBatchInner<M> {
    Chunk(StreamChunk),
    BarrierBatch(Vec<BarrierInner<M>>),
    Watermark(Watermark),
}
pub type MessageBatch = MessageBatchInner<BarrierMutationType>;
pub type DispatcherBarriers = Vec<DispatcherBarrier>;
pub type DispatcherMessageBatch = MessageBatchInner<()>;

impl From<DispatcherMessage> for DispatcherMessageBatch {
    fn from(m: DispatcherMessage) -> Self {
        match m {
            DispatcherMessage::Chunk(c) => Self::Chunk(c),
            DispatcherMessage::Barrier(b) => Self::BarrierBatch(vec![b]),
            DispatcherMessage::Watermark(w) => Self::Watermark(w),
        }
    }
}

impl From<StreamChunk> for Message {
    fn from(chunk: StreamChunk) -> Self {
        Message::Chunk(chunk)
    }
}

impl<'a> TryFrom<&'a Message> for &'a Barrier {
    type Error = ();

    fn try_from(m: &'a Message) -> std::result::Result<Self, Self::Error> {
        match m {
            Message::Chunk(_) => Err(()),
            Message::Barrier(b) => Ok(b),
            Message::Watermark(_) => Err(()),
        }
    }
}

impl Message {
    #[cfg(test)]
    pub fn is_stop(&self) -> bool {
        matches!(
            self,
            Message::Barrier(Barrier {
                mutation,
                ..
            }) if mutation.as_ref().unwrap().is_stop()
        )
    }
}

impl DispatcherMessageBatch {
    pub fn to_protobuf(&self) -> PbStreamMessageBatch {
        let prost = match self {
            Self::Chunk(stream_chunk) => {
                let prost_stream_chunk = stream_chunk.to_protobuf();
                StreamMessageBatch::StreamChunk(prost_stream_chunk)
            }
            Self::BarrierBatch(barrier_batch) => StreamMessageBatch::BarrierBatch(BarrierBatch {
                barriers: barrier_batch.iter().map(|b| b.to_protobuf()).collect(),
            }),
            Self::Watermark(watermark) => StreamMessageBatch::Watermark(watermark.to_protobuf()),
        };
        PbStreamMessageBatch {
            stream_message_batch: Some(prost),
        }
    }

    pub fn from_protobuf(prost: &PbStreamMessageBatch) -> StreamExecutorResult<Self> {
        let res = match prost.get_stream_message_batch()? {
            StreamMessageBatch::StreamChunk(chunk) => {
                Self::Chunk(StreamChunk::from_protobuf(chunk)?)
            }
            StreamMessageBatch::BarrierBatch(barrier_batch) => {
                let barriers = barrier_batch
                    .barriers
                    .iter()
                    .map(|barrier| {
                        DispatcherBarrier::from_protobuf_inner(barrier, |mutation| {
                            if mutation.is_some() {
                                if cfg!(debug_assertions) {
                                    panic!("should not receive message of barrier with mutation");
                                } else {
                                    warn!(?barrier, "receive message of barrier with mutation");
                                }
                            }
                            Ok(())
                        })
                    })
                    .try_collect()?;
                Self::BarrierBatch(barriers)
            }
            StreamMessageBatch::Watermark(watermark) => {
                Self::Watermark(Watermark::from_protobuf(watermark)?)
            }
        };
        Ok(res)
    }

    pub fn get_encoded_len(msg: &impl ::prost::Message) -> usize {
        ::prost::Message::encoded_len(msg)
    }
}

pub type StreamKey = Vec<usize>;
pub type StreamKeyRef<'a> = &'a [usize];
pub type StreamKeyDataTypes = SmallVec<[DataType; 1]>;

pub async fn expect_first_barrier<M: Debug>(
    stream: &mut (impl MessageStreamInner<M> + Unpin),
) -> StreamExecutorResult<BarrierInner<M>> {
    let message = stream
        .next()
        .instrument_await("expect_first_barrier")
        .await
        .context("failed to extract the first message: stream closed unexpectedly")??;
    let barrier = message
        .into_barrier()
        .expect("the first message must be a barrier");
    assert!(matches!(
        barrier.kind,
        BarrierKind::Checkpoint | BarrierKind::Initial
    ));
    Ok(barrier)
}
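
// Typical usage sketch: at the start of an executor's stream, the first upstream message
// must be a barrier that carries the initial epoch:
//
//     let barrier = expect_first_barrier(&mut input).await?;
//     let first_epoch = barrier.epoch;
//     // ... initialize state (e.g. state tables) with `first_epoch`, then pass the
//     // barrier downstream before emitting any data.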

pub async fn expect_first_barrier_from_aligned_stream(
    stream: &mut (impl AlignedMessageStream + Unpin),
) -> StreamExecutorResult<Barrier> {
    let message = stream
        .next()
        .instrument_await("expect_first_barrier")
        .await
        .context("failed to extract the first message: stream closed unexpectedly")??;
    let barrier = message
        .into_barrier()
        .expect("the first message must be a barrier");
    Ok(barrier)
}

pub trait StreamConsumer: Send + 'static {
    type BarrierStream: Stream<Item = StreamResult<Barrier>> + Send;

    fn execute(self: Box<Self>) -> Self::BarrierStream;
}

type BoxedMessageInput<InputId, M> = BoxedInput<InputId, MessageStreamItemInner<M>>;

pub struct DynamicReceivers<InputId, M> {
    /// The barrier being aligned in the current round, taken from the first upstream that
    /// yields it.
    barrier: Option<BarrierInner<M>>,
    /// When the current alignment round started, used for the alignment metrics below.
    start_ts: Option<Instant>,
    /// Upstreams that have already yielded the current barrier and are parked until
    /// alignment completes.
    blocked: Vec<BoxedMessageInput<InputId, M>>,
    /// Upstreams that are still actively polled for messages.
    active: FuturesUnordered<StreamFuture<BoxedMessageInput<InputId, M>>>,
    /// Buffered watermarks from each upstream, keyed by column index.
    buffered_watermarks: BTreeMap<usize, BufferedWatermarks<InputId>>,
    /// Accumulated barrier alignment time in nanoseconds.
    barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
    /// Per-barrier alignment duration histogram, used by the merge executor.
    merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
}

impl<InputId: Clone + Ord + Hash + std::fmt::Debug + Unpin, M: Clone + Unpin> Stream
    for DynamicReceivers<InputId, M>
{
    type Item = MessageStreamItemInner<M>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if self.is_empty() {
            return Poll::Ready(None);
        }

        loop {
            match futures::ready!(self.active.poll_next_unpin(cx)) {
                Some((Some(Err(e)), _)) => {
                    return Poll::Ready(Some(Err(e)));
                }
                Some((Some(Ok(message)), remaining)) => {
                    let input_id = remaining.id();
                    match message {
                        MessageInner::Chunk(chunk) => {
                            self.active.push(remaining.into_future());
                            return Poll::Ready(Some(Ok(MessageInner::Chunk(chunk))));
                        }
                        MessageInner::Watermark(watermark) => {
                            self.active.push(remaining.into_future());
                            if let Some(watermark) = self.handle_watermark(input_id, watermark) {
                                return Poll::Ready(Some(Ok(MessageInner::Watermark(watermark))));
                            }
                        }
                        MessageInner::Barrier(barrier) => {
                            if self.blocked.is_empty() {
                                self.start_ts = Some(Instant::now());
                            }
                            self.blocked.push(remaining);
                            if let Some(current_barrier) = self.barrier.as_ref() {
                                if current_barrier.epoch != barrier.epoch {
                                    return Poll::Ready(Some(Err(
                                        StreamExecutorError::align_barrier(
                                            current_barrier.clone().map_mutation(|_| None),
                                            barrier.map_mutation(|_| None),
                                        ),
                                    )));
                                }
                            } else {
                                self.barrier = Some(barrier);
                            }
                        }
                    }
                }
                Some((None, remaining)) => {
                    return Poll::Ready(Some(Err(StreamExecutorError::channel_closed(format!(
                        "upstream input {:?} unexpectedly closed",
                        remaining.id()
                    )))));
                }
                None => {
                    assert!(!self.blocked.is_empty());

                    let start_ts = self
                        .start_ts
                        .take()
                        .expect("should have received at least one barrier");
                    if let Some(barrier_align_duration) = &self.barrier_align_duration {
                        barrier_align_duration.inc_by(start_ts.elapsed().as_nanos() as u64);
                    }
                    if let Some(merge_barrier_align_duration) = &self.merge_barrier_align_duration {
                        merge_barrier_align_duration.observe(start_ts.elapsed().as_secs_f64())
                    }

                    break;
                }
            }
        }

        assert!(self.active.is_terminated());

        let barrier = self.barrier.take().unwrap();

        let upstreams = std::mem::take(&mut self.blocked);
        self.extend_active(upstreams);
        assert!(!self.active.is_terminated());

        Poll::Ready(Some(Ok(MessageInner::Barrier(barrier))))
    }
}
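
// Barrier alignment in `poll_next`, in short: chunks and watermarks from any active upstream
// are forwarded immediately, but an upstream that yields a barrier is parked in `blocked`
// until every other upstream has produced a barrier for the same epoch. Only when `active`
// is fully drained is the single aligned barrier emitted and all upstreams re-activated.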

impl<InputId: Clone + Ord + Hash + std::fmt::Debug, M> DynamicReceivers<InputId, M> {
    pub fn new(
        upstreams: Vec<BoxedMessageInput<InputId, M>>,
        barrier_align_duration: Option<LabelGuardedMetric<GenericCounter<AtomicU64>>>,
        merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram>>,
    ) -> Self {
        let mut this = Self {
            barrier: None,
            start_ts: None,
            blocked: Vec::with_capacity(upstreams.len()),
            active: Default::default(),
            buffered_watermarks: Default::default(),
            merge_barrier_align_duration,
            barrier_align_duration,
        };
        this.extend_active(upstreams);
        this
    }

    pub fn extend_active(
        &mut self,
        upstreams: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
    ) {
        assert!(self.blocked.is_empty() && self.barrier.is_none());

        self.active
            .extend(upstreams.into_iter().map(|s| s.into_future()));
    }

    pub fn handle_watermark(
        &mut self,
        input_id: InputId,
        watermark: Watermark,
    ) -> Option<Watermark> {
        let col_idx = watermark.col_idx;
        let upstream_ids: Vec<_> = self.upstream_input_ids().collect();
        let watermarks = self
            .buffered_watermarks
            .entry(col_idx)
            .or_insert_with(|| BufferedWatermarks::with_ids(upstream_ids));
        watermarks.handle_watermark(input_id, watermark)
    }

    pub fn add_upstreams_from(
        &mut self,
        new_inputs: impl IntoIterator<Item = BoxedMessageInput<InputId, M>>,
    ) {
        assert!(self.blocked.is_empty() && self.barrier.is_none());

        let new_inputs: Vec<_> = new_inputs.into_iter().collect();
        let input_ids = new_inputs.iter().map(|input| input.id());
        self.buffered_watermarks.values_mut().for_each(|buffers| {
            buffers.add_buffers(input_ids.clone());
        });
        self.active
            .extend(new_inputs.into_iter().map(|s| s.into_future()));
    }

    pub fn remove_upstreams(&mut self, upstream_input_ids: &HashSet<InputId>) {
        assert!(self.blocked.is_empty() && self.barrier.is_none());

        let new_upstreams = std::mem::take(&mut self.active)
            .into_iter()
            .map(|s| s.into_inner().unwrap())
            .filter(|u| !upstream_input_ids.contains(&u.id()));
        self.extend_active(new_upstreams);
        self.buffered_watermarks.values_mut().for_each(|buffers| {
            buffers.remove_buffer(upstream_input_ids.clone());
        });
    }

    pub fn merge_barrier_align_duration(&self) -> Option<LabelGuardedMetric<Histogram>> {
        self.merge_barrier_align_duration.clone()
    }

    pub fn flush_buffered_watermarks(&mut self) {
        self.buffered_watermarks
            .values_mut()
            .for_each(|buffers| buffers.clear());
    }

    pub fn upstream_input_ids(&self) -> impl Iterator<Item = InputId> + '_ {
        self.blocked
            .iter()
            .map(|s| s.id())
            .chain(self.active.iter().map(|s| s.get_ref().unwrap().id()))
    }

    pub fn is_empty(&self) -> bool {
        self.blocked.is_empty() && self.active.is_empty()
    }
}
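
// `DispatchBarrierBuffer` (below) buffers barriers received from the local barrier manager
// while the executor is still waiting on upstream data. When a buffered barrier carries a
// merge update that adds upstream actors, the new inputs are built eagerly so that they are
// ready by the time the matching dispatcher barrier arrives from the data stream.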

pub(crate) struct DispatchBarrierBuffer {
    buffer: VecDeque<(Barrier, Option<Vec<BoxedActorInput>>)>,
    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
    recv_state: BarrierReceiverState,
    curr_upstream_fragment_id: FragmentId,
    actor_id: ActorId,
    build_input_ctx: Arc<BuildInputContext>,
}

struct BuildInputContext {
    pub actor_id: ActorId,
    pub local_barrier_manager: LocalBarrierManager,
    pub metrics: Arc<StreamingMetrics>,
    pub fragment_id: FragmentId,
}

type BoxedNewInputsFuture =
    Pin<Box<dyn Future<Output = StreamExecutorResult<Vec<BoxedActorInput>>> + Send>>;

enum BarrierReceiverState {
    ReceivingBarrier,
    CreatingNewInput(Barrier, BoxedNewInputsFuture),
}

impl DispatchBarrierBuffer {
    pub fn new(
        barrier_rx: mpsc::UnboundedReceiver<Barrier>,
        actor_id: ActorId,
        curr_upstream_fragment_id: FragmentId,
        local_barrier_manager: LocalBarrierManager,
        metrics: Arc<StreamingMetrics>,
        fragment_id: FragmentId,
    ) -> Self {
        Self {
            buffer: VecDeque::new(),
            barrier_rx,
            recv_state: BarrierReceiverState::ReceivingBarrier,
            curr_upstream_fragment_id,
            actor_id,
            build_input_ctx: Arc::new(BuildInputContext {
                actor_id,
                local_barrier_manager,
                metrics,
                fragment_id,
            }),
        }
    }

    pub async fn await_next_message(
        &mut self,
        stream: &mut (impl Stream<Item = StreamExecutorResult<DispatcherMessage>> + Unpin),
        metrics: &ActorInputMetrics,
    ) -> StreamExecutorResult<DispatcherMessage> {
        let mut start_time = Instant::now();
        let interval_duration = Duration::from_secs(15);
        let mut interval =
            tokio::time::interval_at(start_time + interval_duration, interval_duration);

        loop {
            tokio::select! {
                biased;
                msg = stream.try_next() => {
                    metrics
                        .actor_input_buffer_blocking_duration_ns
                        .inc_by(start_time.elapsed().as_nanos() as u64);
                    return msg?.ok_or_else(
                        || StreamExecutorError::channel_closed("upstream executor closed unexpectedly")
                    );
                }

                e = self.continuously_fetch_barrier_rx() => {
                    return Err(e);
                }

                _ = interval.tick() => {
                    start_time = Instant::now();
                    metrics.actor_input_buffer_blocking_duration_ns.inc_by(interval_duration.as_nanos() as u64);
                    continue;
                }
            }
        }
    }

    pub async fn pop_barrier_with_inputs(
        &mut self,
        barrier: DispatcherBarrier,
    ) -> StreamExecutorResult<(Barrier, Option<Vec<BoxedActorInput>>)> {
        while self.buffer.is_empty() {
            self.try_fetch_barrier_rx(false).await?;
        }
        let (mut recv_barrier, inputs) = self.buffer.pop_front().unwrap();
        apply_dispatcher_barrier(&mut recv_barrier, barrier);

        Ok((recv_barrier, inputs))
    }

    async fn continuously_fetch_barrier_rx(&mut self) -> StreamExecutorError {
        loop {
            if let Err(e) = self.try_fetch_barrier_rx(true).await {
                return e;
            }
        }
    }

    async fn try_fetch_barrier_rx(&mut self, pending_on_end: bool) -> StreamExecutorResult<()> {
        match &mut self.recv_state {
            BarrierReceiverState::ReceivingBarrier => {
                let Some(barrier) = self.barrier_rx.recv().await else {
                    if pending_on_end {
                        return pending().await;
                    } else {
                        return Err(StreamExecutorError::channel_closed(
                            "barrier channel closed unexpectedly",
                        ));
                    }
                };
                if let Some(fut) = self.pre_apply_barrier(&barrier) {
                    self.recv_state = BarrierReceiverState::CreatingNewInput(barrier, fut);
                } else {
                    self.buffer.push_back((barrier, None));
                }
            }
            BarrierReceiverState::CreatingNewInput(barrier, fut) => {
                let new_inputs = fut.await?;
                self.buffer.push_back((barrier.clone(), Some(new_inputs)));
                self.recv_state = BarrierReceiverState::ReceivingBarrier;
            }
        }
        Ok(())
    }

    fn pre_apply_barrier(&mut self, barrier: &Barrier) -> Option<BoxedNewInputsFuture> {
        if let Some(update) = barrier.as_update_merge(self.actor_id, self.curr_upstream_fragment_id)
            && !update.added_upstream_actors.is_empty()
        {
            let upstream_fragment_id =
                if let Some(new_upstream_fragment_id) = update.new_upstream_fragment_id {
                    self.curr_upstream_fragment_id = new_upstream_fragment_id;
                    new_upstream_fragment_id
                } else {
                    self.curr_upstream_fragment_id
                };
            let ctx = self.build_input_ctx.clone();
            let added_upstream_actors = update.added_upstream_actors.clone();
            let barrier = barrier.clone();
            let fut = async move {
                try_join_all(added_upstream_actors.iter().map(|upstream_actor| async {
                    let mut new_input = new_input(
                        &ctx.local_barrier_manager,
                        ctx.metrics.clone(),
                        ctx.actor_id,
                        ctx.fragment_id,
                        upstream_actor,
                        upstream_fragment_id,
                    )
                    .await?;

                    let first_barrier = expect_first_barrier(&mut new_input).await?;
                    assert_equal_dispatcher_barrier(&barrier, &first_barrier);

                    StreamExecutorResult::Ok(new_input)
                }))
                .await
            }
            .boxed();

            Some(fut)
        } else {
            None
        }
    }
}