use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::ops::{AddAssign, Deref};

use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::hash::{
    ActorAlignmentId, IsSingleton, VirtualNode, VnodeCount, VnodeCountCompat,
};
use risingwave_common::util::stream_graph_visitor::{self, visit_stream_node_body};
use risingwave_meta_model::{DispatcherType, SourceId, StreamingParallelism, WorkerId};
use risingwave_pb::catalog::Table;
use risingwave_pb::common::{ActorInfo, PbActorLocation};
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
use risingwave_pb::meta::table_fragments::fragment::{
    FragmentDistributionType, PbFragmentDistributionType,
};
use risingwave_pb::meta::table_fragments::{ActorStatus, PbFragment, State};
use risingwave_pb::meta::table_parallelism::{
    FixedParallelism, Parallelism, PbAdaptiveParallelism, PbCustomParallelism, PbFixedParallelism,
    PbParallelism,
};
use risingwave_pb::meta::{PbTableFragments, PbTableParallelism};
use risingwave_pb::plan_common::PbExprContext;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    DispatchStrategy, Dispatcher, PbDispatchOutputMapping, PbDispatcher, PbStreamActor,
    PbStreamContext, StreamNode,
};

use super::{ActorId, FragmentId};

/// The parallelism assigned to a streaming job.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum TableParallelism {
    /// The system decides the parallelism based on the available worker slots.
    Adaptive,
    /// All fragments of the job share this user-specified parallelism.
    Fixed(usize),
    /// Fragments within the job have been individually rescaled (e.g. through a
    /// low-level interface such as `risectl`), so no single parallelism
    /// describes the whole job.
    Custom,
}

impl From<PbTableParallelism> for TableParallelism {
    fn from(value: PbTableParallelism) -> Self {
        use Parallelism::*;
        match &value.parallelism {
            Some(Fixed(FixedParallelism { parallelism: n })) => Self::Fixed(*n as usize),
            Some(Adaptive(_)) | Some(Auto(_)) => Self::Adaptive,
            Some(Custom(_)) => Self::Custom,
            _ => unreachable!(),
        }
    }
}

impl From<TableParallelism> for PbTableParallelism {
    fn from(value: TableParallelism) -> Self {
        use TableParallelism::*;

        let parallelism = match value {
            Adaptive => PbParallelism::Adaptive(PbAdaptiveParallelism {}),
            Fixed(n) => PbParallelism::Fixed(PbFixedParallelism {
                parallelism: n as u32,
            }),
            Custom => PbParallelism::Custom(PbCustomParallelism {}),
        };

        Self {
            parallelism: Some(parallelism),
        }
    }
}

impl From<StreamingParallelism> for TableParallelism {
    fn from(value: StreamingParallelism) -> Self {
        match value {
            StreamingParallelism::Adaptive => TableParallelism::Adaptive,
            StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
            StreamingParallelism::Custom => TableParallelism::Custom,
        }
    }
}

impl From<TableParallelism> for StreamingParallelism {
    fn from(value: TableParallelism) -> Self {
        match value {
            TableParallelism::Adaptive => StreamingParallelism::Adaptive,
            TableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
            TableParallelism::Custom => StreamingParallelism::Custom,
        }
    }
}
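
// Illustrative sketch (added for documentation, not part of the original
// file): the conversions above are lossless round trips. A quick check for
// the `Fixed` and `Adaptive` cases, using only items defined in this module.
#[cfg(test)]
mod table_parallelism_conversion_tests {
    use super::*;

    #[test]
    fn fixed_round_trips_through_protobuf() {
        let original = TableParallelism::Fixed(4);
        let pb: PbTableParallelism = original.into();
        assert_eq!(TableParallelism::from(pb), original);
    }

    #[test]
    fn adaptive_round_trips_through_streaming_parallelism() {
        let streaming: StreamingParallelism = TableParallelism::Adaptive.into();
        assert_eq!(TableParallelism::from(streaming), TableParallelism::Adaptive);
    }
}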

pub type ActorUpstreams = BTreeMap<FragmentId, HashMap<ActorId, ActorInfo>>;
pub type StreamActorWithDispatchers = (StreamActor, Vec<PbDispatcher>);
pub type StreamActorWithUpDownstreams = (StreamActor, ActorUpstreams, Vec<PbDispatcher>);
pub type FragmentActorDispatchers = HashMap<FragmentId, HashMap<ActorId, Vec<PbDispatcher>>>;

pub type FragmentDownstreamRelation = HashMap<FragmentId, Vec<DownstreamFragmentRelation>>;
/// Replacement of an upstream fragment: downstream `fragment_id` ->
/// (original upstream `fragment_id` -> new upstream `fragment_id`).
pub type FragmentReplaceUpstream = HashMap<FragmentId, HashMap<FragmentId, FragmentId>>;
/// Newly created no-shuffle relation: upstream `fragment_id` ->
/// (downstream `fragment_id` -> (upstream `actor_id` -> downstream `actor_id`)).
pub type FragmentNewNoShuffle = HashMap<FragmentId, HashMap<FragmentId, HashMap<ActorId, ActorId>>>;

#[derive(Debug, Clone)]
pub struct DownstreamFragmentRelation {
    pub downstream_fragment_id: FragmentId,
    pub dispatcher_type: DispatcherType,
    pub dist_key_indices: Vec<u32>,
    pub output_mapping: PbDispatchOutputMapping,
}

impl From<(FragmentId, DispatchStrategy)> for DownstreamFragmentRelation {
    fn from((fragment_id, dispatch): (FragmentId, DispatchStrategy)) -> Self {
        Self {
            downstream_fragment_id: fragment_id,
            dispatcher_type: dispatch.get_type().unwrap().into(),
            dist_key_indices: dispatch.dist_key_indices,
            output_mapping: dispatch.output_mapping.unwrap(),
        }
    }
}

#[derive(Debug, Clone)]
pub struct StreamJobFragmentsToCreate {
    pub inner: StreamJobFragments,
    pub downstreams: FragmentDownstreamRelation,
}

impl Deref for StreamJobFragmentsToCreate {
    type Target = StreamJobFragments;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
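
// Illustrative sketch (added for documentation, not part of the original
// file): the `Deref` impl lets callers use `StreamJobFragments` accessors
// directly on a `StreamJobFragmentsToCreate`.
#[cfg(test)]
mod stream_job_fragments_to_create_tests {
    use super::*;

    #[test]
    fn deref_exposes_inner_accessors() {
        let to_create = StreamJobFragmentsToCreate {
            inner: StreamJobFragments::for_test(TableId::new(1), BTreeMap::new()),
            downstreams: FragmentDownstreamRelation::default(),
        };
        // `stream_job_id` is defined on `StreamJobFragments`, reachable here
        // through the `Deref` impl above.
        assert_eq!(to_create.stream_job_id(), TableId::new(1));
    }
}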

#[derive(Clone, Debug)]
pub struct StreamActor {
    pub actor_id: u32,
    pub fragment_id: u32,
    pub vnode_bitmap: Option<Bitmap>,
    pub mview_definition: String,
    pub expr_context: Option<PbExprContext>,
}

impl StreamActor {
    fn to_protobuf(&self, dispatchers: impl Iterator<Item = Dispatcher>) -> PbStreamActor {
        PbStreamActor {
            actor_id: self.actor_id,
            fragment_id: self.fragment_id,
            dispatcher: dispatchers.collect(),
            vnode_bitmap: self
                .vnode_bitmap
                .as_ref()
                .map(|bitmap| bitmap.to_protobuf()),
            mview_definition: self.mview_definition.clone(),
            expr_context: self.expr_context.clone(),
        }
    }
}
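
// Illustrative sketch (added for documentation, not part of the original
// file): converting a `StreamActor` with no dispatchers into its protobuf
// form. The child test module can call the private `to_protobuf`.
#[cfg(test)]
mod stream_actor_tests {
    use super::*;

    #[test]
    fn to_protobuf_without_dispatchers() {
        let actor = StreamActor {
            actor_id: 1,
            fragment_id: 2,
            vnode_bitmap: None,
            mview_definition: String::new(),
            expr_context: None,
        };
        let pb = actor.to_protobuf(std::iter::empty());
        assert_eq!(pb.actor_id, 1);
        assert_eq!(pb.fragment_id, 2);
        assert!(pb.dispatcher.is_empty());
        assert!(pb.vnode_bitmap.is_none());
    }
}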

#[derive(Clone, Debug, Default)]
pub struct Fragment {
    pub fragment_id: FragmentId,
    pub fragment_type_mask: FragmentTypeMask,
    pub distribution_type: PbFragmentDistributionType,
    pub actors: Vec<StreamActor>,
    pub state_table_ids: Vec<u32>,
    pub maybe_vnode_count: Option<u32>,
    pub nodes: StreamNode,
}

impl Fragment {
    pub fn to_protobuf(
        &self,
        upstream_fragments: impl Iterator<Item = FragmentId>,
        dispatchers: Option<&HashMap<ActorId, Vec<Dispatcher>>>,
    ) -> PbFragment {
        PbFragment {
            fragment_id: self.fragment_id,
            fragment_type_mask: self.fragment_type_mask.into(),
            distribution_type: self.distribution_type as _,
            actors: self
                .actors
                .iter()
                .map(|actor| {
                    actor.to_protobuf(
                        dispatchers
                            .and_then(|dispatchers| dispatchers.get(&(actor.actor_id as _)))
                            .into_iter()
                            .flatten()
                            .cloned(),
                    )
                })
                .collect(),
            state_table_ids: self.state_table_ids.clone(),
            upstream_fragment_ids: upstream_fragments.collect(),
            maybe_vnode_count: self.maybe_vnode_count,
            nodes: Some(self.nodes.clone()),
        }
    }
}

impl VnodeCountCompat for Fragment {
    fn vnode_count_inner(&self) -> VnodeCount {
        VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton())
    }
}

impl IsSingleton for Fragment {
    fn is_singleton(&self) -> bool {
        matches!(self.distribution_type, FragmentDistributionType::Single)
    }
}
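
// Illustrative sketch (added for documentation, not part of the original
// file): `is_singleton` is derived purely from the distribution type, which
// `Fragment::default()` lets us set in isolation.
#[cfg(test)]
mod fragment_distribution_tests {
    use super::*;

    #[test]
    fn singleton_follows_distribution_type() {
        let fragment = Fragment {
            distribution_type: FragmentDistributionType::Single,
            ..Default::default()
        };
        assert!(fragment.is_singleton());

        let fragment = Fragment {
            distribution_type: FragmentDistributionType::Hash,
            ..Default::default()
        };
        assert!(!fragment.is_singleton());
    }
}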

/// The fragment graph and actor placement of a single streaming job.
#[derive(Debug, Clone)]
pub struct StreamJobFragments {
    /// The id of the streaming job.
    pub stream_job_id: TableId,

    /// The state of the streaming job.
    pub state: State,

    /// The fragments of the job, keyed by fragment id.
    pub fragments: BTreeMap<FragmentId, Fragment>,

    /// The status (location and state) of each actor.
    pub actor_status: BTreeMap<ActorId, ActorStatus>,

    /// The streaming context shared by the job's fragments.
    pub ctx: StreamContext,

    /// The parallelism assigned to the job.
    pub assigned_parallelism: TableParallelism,

    /// The maximum parallelism specified when the job was created, i.e., the
    /// expected vnode count.
    pub max_parallelism: usize,
}

#[derive(Debug, Clone, Default)]
pub struct StreamContext {
    /// The timezone used to interpret timestamps and dates for conversion.
    pub timezone: Option<String>,
}

impl StreamContext {
    pub fn to_protobuf(&self) -> PbStreamContext {
        PbStreamContext {
            timezone: self.timezone.clone().unwrap_or("".into()),
        }
    }

    pub fn to_expr_context(&self) -> PbExprContext {
        PbExprContext {
            // The timezone is expected to always be set; a conspicuous
            // placeholder makes a missing value easy to spot.
            time_zone: self.timezone.clone().unwrap_or("Empty Time Zone".into()),
            strict_mode: false,
        }
    }

    pub fn from_protobuf(prost: &PbStreamContext) -> Self {
        Self {
            timezone: if prost.get_timezone().is_empty() {
                None
            } else {
                Some(prost.get_timezone().clone())
            },
        }
    }
}
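
// Illustrative sketch (added for documentation, not part of the original
// file): a `None` timezone is encoded as an empty string in protobuf and
// decoded back to `None`.
#[cfg(test)]
mod stream_context_tests {
    use super::*;

    #[test]
    fn empty_timezone_round_trips() {
        let pb = StreamContext { timezone: None }.to_protobuf();
        assert_eq!(pb.timezone, "");
        let ctx = StreamContext::from_protobuf(&pb);
        assert!(ctx.timezone.is_none());
    }
}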

impl StreamJobFragments {
    pub fn to_protobuf(
        &self,
        fragment_upstreams: &HashMap<FragmentId, HashSet<FragmentId>>,
        fragment_dispatchers: &FragmentActorDispatchers,
    ) -> PbTableFragments {
        PbTableFragments {
            table_id: self.stream_job_id.table_id(),
            state: self.state as _,
            fragments: self
                .fragments
                .iter()
                .map(|(id, fragment)| {
                    (
                        *id,
                        fragment.to_protobuf(
                            fragment_upstreams.get(id).into_iter().flatten().cloned(),
                            fragment_dispatchers.get(&(*id as _)),
                        ),
                    )
                })
                .collect(),
            actor_status: self.actor_status.clone().into_iter().collect(),
            ctx: Some(self.ctx.to_protobuf()),
            parallelism: Some(self.assigned_parallelism.into()),
            node_label: "".to_owned(),
            backfill_done: true,
            max_parallelism: Some(self.max_parallelism as _),
        }
    }
}

pub type StreamJobActorsToCreate =
    HashMap<WorkerId, HashMap<FragmentId, (StreamNode, Vec<StreamActorWithUpDownstreams>)>>;

impl StreamJobFragments {
    /// Creates a job for testing, with defaults filled in.
    pub fn for_test(table_id: TableId, fragments: BTreeMap<FragmentId, Fragment>) -> Self {
        Self::new(
            table_id,
            fragments,
            &BTreeMap::new(),
            StreamContext::default(),
            TableParallelism::Adaptive,
            VirtualNode::COUNT_FOR_TEST,
        )
    }

    /// Creates a new `StreamJobFragments` in the `Initial` state, with all
    /// actors marked as `Inactive`.
    pub fn new(
        stream_job_id: TableId,
        fragments: BTreeMap<FragmentId, Fragment>,
        actor_locations: &BTreeMap<ActorId, ActorAlignmentId>,
        ctx: StreamContext,
        table_parallelism: TableParallelism,
        max_parallelism: usize,
    ) -> Self {
        let actor_status = actor_locations
            .iter()
            .map(|(&actor_id, alignment_id)| {
                (
                    actor_id,
                    ActorStatus {
                        location: PbActorLocation::from_worker(alignment_id.worker_id()),
                        state: ActorState::Inactive as i32,
                    },
                )
            })
            .collect();

        Self {
            stream_job_id,
            state: State::Initial,
            fragments,
            actor_status,
            ctx,
            assigned_parallelism: table_parallelism,
            max_parallelism,
        }
    }

    pub fn fragment_ids(&self) -> impl Iterator<Item = FragmentId> + '_ {
        self.fragments.keys().cloned()
    }

    pub fn fragments(&self) -> impl Iterator<Item = &Fragment> {
        self.fragments.values()
    }

    /// Returns the actors of the given fragment, or an empty slice if the
    /// fragment is not part of this job.
    pub fn fragment_actors(&self, fragment_id: FragmentId) -> &[StreamActor] {
        self.fragments
            .get(&fragment_id)
            .map(|f| f.actors.as_slice())
            .unwrap_or_default()
    }

    pub fn stream_job_id(&self) -> TableId {
        self.stream_job_id
    }

    pub fn state(&self) -> State {
        self.state
    }

    pub fn timezone(&self) -> Option<String> {
        self.ctx.timezone.clone()
    }

    /// Returns whether the job is in the `Created` state.
    pub fn is_created(&self) -> bool {
        self.state == State::Created
    }

    /// Returns whether the job is in the `Initial` state.
    pub fn is_initial(&self) -> bool {
        self.state == State::Initial
    }

    pub fn set_state(&mut self, state: State) {
        self.state = state;
    }

    /// Sets the state of all actors in the job.
    pub fn update_actors_state(&mut self, state: ActorState) {
        for actor_status in self.actor_status.values_mut() {
            actor_status.set_state(state);
        }
    }

    /// Returns the ids of all actors in the job.
    pub fn actor_ids(&self) -> Vec<ActorId> {
        self.fragments
            .values()
            .flat_map(|fragment| fragment.actors.iter().map(|actor| actor.actor_id))
            .collect()
    }

    /// Returns the mapping from actor id to the id of its owning fragment.
    pub fn actor_fragment_mapping(&self) -> HashMap<ActorId, FragmentId> {
        self.fragments
            .values()
            .flat_map(|fragment| {
                fragment
                    .actors
                    .iter()
                    .map(|actor| (actor.actor_id, fragment.fragment_id))
            })
            .collect()
    }

    #[cfg(test)]
    pub fn actors(&self) -> Vec<StreamActor> {
        self.fragments
            .values()
            .flat_map(|fragment| fragment.actors.clone())
            .collect()
    }

    /// Returns the ids of all actors whose fragment satisfies `check_type`.
    pub fn filter_actor_ids(
        &self,
        check_type: impl Fn(FragmentTypeMask) -> bool + 'static,
    ) -> impl Iterator<Item = ActorId> + '_ {
        self.fragments
            .values()
            .filter(move |fragment| check_type(fragment.fragment_type_mask))
            .flat_map(|fragment| fragment.actors.iter().map(|actor| actor.actor_id))
    }

    /// Returns the ids of all actors in materialized-view fragments.
    pub fn mview_actor_ids(&self) -> Vec<ActorId> {
        Self::filter_actor_ids(self, |fragment_type_mask| {
            fragment_type_mask.contains(FragmentTypeFlag::Mview)
        })
        .collect()
    }

    /// Returns the actors whose backfill progress should be tracked, together
    /// with the kind of upstream they backfill from.
    pub fn tracking_progress_actor_ids(&self) -> Vec<(ActorId, BackfillUpstreamType)> {
        let mut actor_ids = vec![];
        for fragment in self.fragments.values() {
            if fragment
                .fragment_type_mask
                .contains(FragmentTypeFlag::CdcFilter)
            {
                // Jobs containing a `CdcFilter` fragment (CDC table jobs) do
                // not have their progress tracked through backfill actors.
                return vec![];
            }
            if fragment.fragment_type_mask.contains_any([
                FragmentTypeFlag::Values,
                FragmentTypeFlag::StreamScan,
                FragmentTypeFlag::SourceScan,
            ]) {
                actor_ids.extend(fragment.actors.iter().map(|actor| {
                    (
                        actor.actor_id,
                        BackfillUpstreamType::from_fragment_type_mask(fragment.fragment_type_mask),
                    )
                }));
            }
        }
        actor_ids
    }

    /// Returns the "root" fragment of the job: the mview fragment if present,
    /// otherwise the sink fragment, otherwise the source fragment.
    pub fn root_fragment(&self) -> Option<Fragment> {
        self.mview_fragment()
            .or_else(|| self.sink_fragment())
            .or_else(|| self.source_fragment())
    }

    /// Returns the fragment with the `Mview` flag, if any.
    pub fn mview_fragment(&self) -> Option<Fragment> {
        self.fragments
            .values()
            .find(|fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::Mview)
            })
            .cloned()
    }

    /// Returns the fragment with the `Source` flag, if any.
    pub fn source_fragment(&self) -> Option<Fragment> {
        self.fragments
            .values()
            .find(|fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::Source)
            })
            .cloned()
    }

    /// Returns the fragment with the `Sink` flag, if any.
    pub fn sink_fragment(&self) -> Option<Fragment> {
        self.fragments
            .values()
            .find(|fragment| fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink))
            .cloned()
    }

    /// Returns the ids of all actors performing snapshot backfill.
    pub fn snapshot_backfill_actor_ids(&self) -> HashSet<ActorId> {
        Self::filter_actor_ids(self, |mask| {
            mask.contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
        })
        .collect()
    }

    /// Extracts the fragments that contain a stream source node:
    /// `source_id` -> the ids of fragments reading from that source.
    pub fn stream_source_fragments(&self) -> HashMap<SourceId, BTreeSet<FragmentId>> {
        let mut source_fragments = HashMap::new();

        for fragment in self.fragments() {
            if let Some(source_id) = fragment.nodes.find_stream_source() {
                source_fragments
                    .entry(source_id as SourceId)
                    .or_insert_with(BTreeSet::new)
                    .insert(fragment.fragment_id as FragmentId);
            }
        }
        source_fragments
    }

    /// Extracts the fragments that contain a source backfill node:
    /// `source_id` -> (`backfill_fragment_id`, `upstream_source_fragment_id`).
    pub fn source_backfill_fragments(
        &self,
    ) -> HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>> {
        let mut source_backfill_fragments = HashMap::new();

        for fragment in self.fragments() {
            if let Some((source_id, upstream_source_fragment_id)) =
                fragment.nodes.find_source_backfill()
            {
                source_backfill_fragments
                    .entry(source_id as SourceId)
                    .or_insert_with(BTreeSet::new)
                    .insert((fragment.fragment_id, upstream_source_fragment_id));
            }
        }
        source_backfill_fragments
    }

    /// Finds the fragment that contains the `Union` node, which must be unique
    /// within the job. Panics if it is absent.
    pub fn union_fragment_for_table(&mut self) -> &mut Fragment {
        let mut union_fragment_id = None;
        for (fragment_id, fragment) in &self.fragments {
            visit_stream_node_body(&fragment.nodes, |body| {
                if let NodeBody::Union(_) = body {
                    if let Some(union_fragment_id) = union_fragment_id.as_mut() {
                        // The union fragment should be unique.
                        assert_eq!(*union_fragment_id, *fragment_id);
                    } else {
                        union_fragment_id = Some(*fragment_id);
                    }
                }
            });
        }

        let union_fragment_id =
            union_fragment_id.expect("fragment of placeholder merger not found");

        self.fragments
            .get_mut(&union_fragment_id)
            .unwrap_or_else(|| panic!("fragment {} not found", union_fragment_id))
    }

    /// Recursively counts, per table, the `StreamScan`/`StreamCdcScan` nodes
    /// under `stream_node` that reference it.
    fn resolve_dependent_table(stream_node: &StreamNode, table_ids: &mut HashMap<TableId, usize>) {
        let table_id = match stream_node.node_body.as_ref() {
            Some(NodeBody::StreamScan(stream_scan)) => Some(TableId::new(stream_scan.table_id)),
            Some(NodeBody::StreamCdcScan(stream_scan)) => Some(TableId::new(stream_scan.table_id)),
            _ => None,
        };
        if let Some(table_id) = table_id {
            table_ids.entry(table_id).or_default().add_assign(1);
        }

        for child in &stream_node.input {
            Self::resolve_dependent_table(child, table_ids);
        }
    }

    /// Returns the upstream table ids this job depends on, and how many times
    /// each is referenced by the job's scan nodes.
    pub fn upstream_table_counts(&self) -> HashMap<TableId, usize> {
        let mut table_ids = HashMap::new();
        self.fragments.values().for_each(|fragment| {
            Self::resolve_dependent_table(&fragment.nodes, &mut table_ids);
        });

        table_ids
    }

    /// Returns actor states grouped by the worker the actor runs on.
    pub fn worker_actor_states(&self) -> BTreeMap<WorkerId, Vec<(ActorId, ActorState)>> {
        let mut map = BTreeMap::default();
        for (&actor_id, actor_status) in &self.actor_status {
            let node_id = actor_status.worker_id() as WorkerId;
            map.entry(node_id)
                .or_insert_with(Vec::new)
                .push((actor_id, actor_status.state()));
        }
        map
    }

    /// Returns actor ids grouped by the worker the actor runs on.
    pub fn worker_actor_ids(&self) -> BTreeMap<WorkerId, Vec<ActorId>> {
        let mut map = BTreeMap::default();
        for (&actor_id, actor_status) in &self.actor_status {
            let node_id = actor_status.worker_id() as WorkerId;
            map.entry(node_id).or_insert_with(Vec::new).push(actor_id);
        }
        map
    }

    /// Returns the actors that are not in the `Inactive` state.
    pub fn active_actors(&self) -> Vec<StreamActor> {
        let mut actors = vec![];
        for fragment in self.fragments.values() {
            for actor in &fragment.actors {
                if self.actor_status[&actor.actor_id].state == ActorState::Inactive as i32 {
                    continue;
                }
                actors.push(actor.clone());
            }
        }
        actors
    }

    /// For each fragment, yields its id, its stream node plan, and the actors
    /// to create together with the worker each actor is assigned to.
    pub fn actors_to_create(
        &self,
    ) -> impl Iterator<
        Item = (
            FragmentId,
            &StreamNode,
            impl Iterator<Item = (&StreamActor, WorkerId)> + '_,
        ),
    > + '_ {
        self.fragments.values().map(move |fragment| {
            (
                fragment.fragment_id,
                &fragment.nodes,
                fragment.actors.iter().map(move |actor| {
                    let worker_id = self
                        .actor_status
                        .get(&actor.actor_id)
                        .expect("should exist")
                        .worker_id() as WorkerId;
                    (actor, worker_id)
                }),
            )
        })
    }

    /// Returns the id of the job's materialized table, if some fragment owns a
    /// state table whose id equals the job id.
    pub fn mv_table_id(&self) -> Option<u32> {
        if self
            .fragments
            .values()
            .flat_map(|f| f.state_table_ids.iter())
            .any(|table_id| *table_id == self.stream_job_id.table_id)
        {
            Some(self.stream_job_id.table_id)
        } else {
            None
        }
    }

    /// Collects all state tables referenced by the given fragments' nodes,
    /// keyed by table id. Panics on duplicated table ids.
    pub fn collect_tables(fragments: impl Iterator<Item = &Fragment>) -> BTreeMap<u32, Table> {
        let mut tables = BTreeMap::new();
        for fragment in fragments {
            stream_graph_visitor::visit_stream_node_tables_inner(
                &mut fragment.nodes.clone(),
                false,
                true,
                |table, _| {
                    let table_id = table.id;
                    tables
                        .try_insert(table_id, table.clone())
                        .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
                },
            );
        }
        tables
    }

    /// Returns the internal table ids of the job, excluding the materialized
    /// table itself.
    pub fn internal_table_ids(&self) -> Vec<u32> {
        self.fragments
            .values()
            .flat_map(|f| f.state_table_ids.clone())
            .filter(|&t| t != self.stream_job_id.table_id)
            .collect_vec()
    }

    /// Returns all state table ids, including the materialized table.
    pub fn all_table_ids(&self) -> impl Iterator<Item = u32> + '_ {
        self.fragments
            .values()
            .flat_map(|f| f.state_table_ids.clone())
    }

    /// Fills the expression context of every actor that does not have one,
    /// derived from the job's `StreamContext`.
    pub fn fill_expr_context(mut self) -> Self {
        self.fragments.values_mut().for_each(|fragment| {
            fragment.actors.iter_mut().for_each(|actor| {
                if actor.expr_context.is_none() {
                    actor.expr_context = Some(self.ctx.to_expr_context());
                }
            });
        });
        self
    }
}
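
// Illustrative sketch (added for documentation, not part of the original
// file): exercising a few accessors on an empty job built with `for_test`.
#[cfg(test)]
mod stream_job_fragments_tests {
    use super::*;

    #[test]
    fn empty_job_defaults() {
        let job = StreamJobFragments::for_test(TableId::new(1), BTreeMap::new());
        assert!(job.is_initial());
        assert!(job.actor_ids().is_empty());
        // No fragment owns a state table with the job's id, so there is no
        // materialized table.
        assert_eq!(job.mv_table_id(), None);
        assert!(job.tracking_progress_actor_ids().is_empty());
    }
}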

/// The kind of upstream that a backfill fragment reads from.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackfillUpstreamType {
    MView,
    Values,
    Source,
}

impl BackfillUpstreamType {
    pub fn from_fragment_type_mask(mask: FragmentTypeMask) -> Self {
        let is_mview = mask.contains(FragmentTypeFlag::StreamScan);
        let is_values = mask.contains(FragmentTypeFlag::Values);
        let is_source = mask.contains(FragmentTypeFlag::SourceScan);

        debug_assert!(
            // Exactly one backfill flavor is expected on a backfill fragment.
            is_mview as u8 + is_values as u8 + is_source as u8 == 1,
            "a backfill fragment should be exactly one of mview, values, or source, found {:?}",
            mask
        );

        if is_mview {
            BackfillUpstreamType::MView
        } else if is_values {
            BackfillUpstreamType::Values
        } else if is_source {
            BackfillUpstreamType::Source
        } else {
            unreachable!("invalid fragment type mask: {:?}", mask);
        }
    }
}