use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::ops::{AddAssign, Deref};
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::hash::{
    ActorAlignmentId, IsSingleton, VirtualNode, VnodeCount, VnodeCountCompat,
};
use risingwave_common::id::JobId;
use risingwave_common::util::stream_graph_visitor::{self, visit_stream_node_body};
use risingwave_meta_model::{DispatcherType, SourceId, StreamingParallelism, WorkerId};
use risingwave_pb::catalog::Table;
use risingwave_pb::common::{ActorInfo, PbActorLocation};
use risingwave_pb::id::SubscriberId;
use risingwave_pb::meta::table_fragments::fragment::{
    FragmentDistributionType, PbFragmentDistributionType,
};
use risingwave_pb::meta::table_fragments::{ActorStatus, PbFragment, State};
use risingwave_pb::meta::table_parallelism::{
    FixedParallelism, Parallelism, PbAdaptiveParallelism, PbCustomParallelism, PbFixedParallelism,
    PbParallelism,
};
use risingwave_pb::meta::{PbTableFragments, PbTableParallelism};
use risingwave_pb::plan_common::PbExprContext;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    DispatchStrategy, Dispatcher, PbDispatchOutputMapping, PbDispatcher, PbStreamActor,
    PbStreamContext, StreamNode,
};

use super::{ActorId, FragmentId};

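/// Parallelism policy of a streaming job, the in-memory counterpart of
/// [`PbTableParallelism`].
///
/// A minimal sketch of the round trip through the protobuf form (illustrative
/// only; it relies solely on the `From` impls below):
///
/// ```ignore
/// let pb: PbTableParallelism = TableParallelism::Fixed(4).into();
/// assert_eq!(TableParallelism::from(pb), TableParallelism::Fixed(4));
/// ```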
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum TableParallelism {
    Adaptive,
    Fixed(usize),
    Custom,
}

impl From<PbTableParallelism> for TableParallelism {
    fn from(value: PbTableParallelism) -> Self {
        use Parallelism::*;
        match &value.parallelism {
            Some(Fixed(FixedParallelism { parallelism: n })) => Self::Fixed(*n as usize),
            Some(Adaptive(_)) | Some(Auto(_)) => Self::Adaptive,
            Some(Custom(_)) => Self::Custom,
            _ => unreachable!(),
        }
    }
}

impl From<TableParallelism> for PbTableParallelism {
    fn from(value: TableParallelism) -> Self {
        use TableParallelism::*;

        let parallelism = match value {
            Adaptive => PbParallelism::Adaptive(PbAdaptiveParallelism {}),
            Fixed(n) => PbParallelism::Fixed(PbFixedParallelism {
                parallelism: n as u32,
            }),
            Custom => PbParallelism::Custom(PbCustomParallelism {}),
        };

        Self {
            parallelism: Some(parallelism),
        }
    }
}

impl From<StreamingParallelism> for TableParallelism {
    fn from(value: StreamingParallelism) -> Self {
        match value {
            StreamingParallelism::Adaptive => TableParallelism::Adaptive,
            StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
            StreamingParallelism::Custom => TableParallelism::Custom,
        }
    }
}

impl From<TableParallelism> for StreamingParallelism {
    fn from(value: TableParallelism) -> Self {
        match value {
            TableParallelism::Adaptive => StreamingParallelism::Adaptive,
            TableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
            TableParallelism::Custom => StreamingParallelism::Custom,
        }
    }
}

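// Convenience aliases describing the actor/fragment topology passed around the
// meta node. `ActorUpstreams` maps each upstream fragment to the `ActorInfo`s
// of its actors.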
pub type ActorUpstreams = BTreeMap<FragmentId, HashMap<ActorId, ActorInfo>>;
pub type StreamActorWithDispatchers = (StreamActor, Vec<PbDispatcher>);
pub type StreamActorWithUpDownstreams = (StreamActor, ActorUpstreams, Vec<PbDispatcher>);
pub type FragmentActorDispatchers = HashMap<FragmentId, HashMap<ActorId, Vec<PbDispatcher>>>;

pub type FragmentDownstreamRelation = HashMap<FragmentId, Vec<DownstreamFragmentRelation>>;
pub type FragmentReplaceUpstream = HashMap<FragmentId, HashMap<FragmentId, FragmentId>>;
pub type FragmentNewNoShuffle = HashMap<FragmentId, HashMap<FragmentId, HashMap<ActorId, ActorId>>>;

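/// How a fragment dispatches to one of its downstream fragments: the
/// dispatcher type, the distribution key, and the output column mapping.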
#[derive(Debug, Clone)]
pub struct DownstreamFragmentRelation {
    pub downstream_fragment_id: FragmentId,
    pub dispatcher_type: DispatcherType,
    pub dist_key_indices: Vec<u32>,
    pub output_mapping: PbDispatchOutputMapping,
}

impl From<(FragmentId, DispatchStrategy)> for DownstreamFragmentRelation {
    fn from((fragment_id, dispatch): (FragmentId, DispatchStrategy)) -> Self {
        Self {
            downstream_fragment_id: fragment_id,
            dispatcher_type: dispatch.get_type().unwrap().into(),
            dist_key_indices: dispatch.dist_key_indices,
            output_mapping: dispatch.output_mapping.unwrap(),
        }
    }
}

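/// A [`StreamJobFragments`] bundled with the downstream relations to be
/// registered when the job is created.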
#[derive(Debug, Clone)]
pub struct StreamJobFragmentsToCreate {
    pub inner: StreamJobFragments,
    pub downstreams: FragmentDownstreamRelation,
}

impl Deref for StreamJobFragmentsToCreate {
    type Target = StreamJobFragments;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

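/// In-memory representation of an actor. Unlike [`PbStreamActor`], the
/// dispatchers are kept separately and only attached when converting to
/// protobuf.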
#[derive(Clone, Debug)]
pub struct StreamActor {
    pub actor_id: ActorId,
    pub fragment_id: FragmentId,
    pub vnode_bitmap: Option<Bitmap>,
    pub mview_definition: String,
    pub expr_context: Option<PbExprContext>,
    pub config_override: Arc<str>,
}

impl StreamActor {
    fn to_protobuf(&self, dispatchers: impl Iterator<Item = Dispatcher>) -> PbStreamActor {
        PbStreamActor {
            actor_id: self.actor_id,
            fragment_id: self.fragment_id,
            dispatcher: dispatchers.collect(),
            vnode_bitmap: self
                .vnode_bitmap
                .as_ref()
                .map(|bitmap| bitmap.to_protobuf()),
            mview_definition: self.mview_definition.clone(),
            expr_context: self.expr_context.clone(),
            config_override: self.config_override.to_string(),
        }
    }
}

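/// In-memory representation of a fragment and its actors. `maybe_vnode_count`
/// is `None` when the vnode count is unset in the protobuf; in that case the
/// `VnodeCountCompat` impl below derives a fallback from whether the fragment
/// is a singleton.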
#[derive(Clone, Debug, Default)]
pub struct Fragment {
    pub fragment_id: FragmentId,
    pub fragment_type_mask: FragmentTypeMask,
    pub distribution_type: PbFragmentDistributionType,
    pub actors: Vec<StreamActor>,
    pub state_table_ids: Vec<TableId>,
    pub maybe_vnode_count: Option<u32>,
    pub nodes: StreamNode,
}

impl Fragment {
    pub fn to_protobuf(
        &self,
        upstream_fragments: impl Iterator<Item = FragmentId>,
        dispatchers: Option<&HashMap<ActorId, Vec<Dispatcher>>>,
    ) -> PbFragment {
        PbFragment {
            fragment_id: self.fragment_id,
            fragment_type_mask: self.fragment_type_mask.into(),
            distribution_type: self.distribution_type as _,
            actors: self
                .actors
                .iter()
                .map(|actor| {
                    actor.to_protobuf(
                        dispatchers
                            .and_then(|dispatchers| dispatchers.get(&(actor.actor_id as _)))
                            .into_iter()
                            .flatten()
                            .cloned(),
                    )
                })
                .collect(),
            state_table_ids: self.state_table_ids.clone(),
            upstream_fragment_ids: upstream_fragments.collect(),
            maybe_vnode_count: self.maybe_vnode_count,
            nodes: Some(self.nodes.clone()),
        }
    }
}

impl VnodeCountCompat for Fragment {
    fn vnode_count_inner(&self) -> VnodeCount {
        VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton())
    }
}

impl IsSingleton for Fragment {
    fn is_singleton(&self) -> bool {
        matches!(self.distribution_type, FragmentDistributionType::Single)
    }
}

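/// The fragments of a streaming job, together with the placement status of
/// each actor and the job's parallelism settings. Roughly the in-memory
/// counterpart of [`PbTableFragments`].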
#[derive(Debug, Clone)]
pub struct StreamJobFragments {
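    /// The id of the streaming job.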
    pub stream_job_id: JobId,

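    /// The lifecycle state of the job.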
    pub state: State,

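    /// The fragments of the job, keyed by fragment id.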
    pub fragments: BTreeMap<FragmentId, Fragment>,

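    /// The placement status of each actor.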
    pub actor_status: BTreeMap<ActorId, ActorStatus>,

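    /// The streaming context (timezone and config override) of the job.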
    pub ctx: StreamContext,

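    /// The parallelism assigned to the job.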
    pub assigned_parallelism: TableParallelism,

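    /// Upper bound on the job's parallelism. `for_test` passes
    /// `VirtualNode::COUNT_FOR_TEST` here, suggesting it tracks the job's
    /// vnode count.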
    pub max_parallelism: usize,
}

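/// The streaming context of a job: the timezone it was created under and its
/// per-job config override.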
#[derive(Debug, Clone, Default)]
pub struct StreamContext {
    pub timezone: Option<String>,

    pub config_override: Arc<str>,
}

impl StreamContext {
    pub fn to_protobuf(&self) -> PbStreamContext {
        PbStreamContext {
            timezone: self.timezone.clone().unwrap_or("".into()),
            config_override: self.config_override.to_string(),
        }
    }

    pub fn to_expr_context(&self) -> PbExprContext {
        PbExprContext {
            time_zone: self.timezone.clone().unwrap_or("Empty Time Zone".into()),
            strict_mode: false,
        }
    }

    pub fn from_protobuf(prost: &PbStreamContext) -> Self {
        Self {
            timezone: if prost.get_timezone().is_empty() {
                None
            } else {
                Some(prost.get_timezone().clone())
            },
            config_override: prost.get_config_override().as_str().into(),
        }
    }
}

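// Extension for constructing a `StreamContext` from the model of a streaming
// job.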
#[easy_ext::ext(StreamingJobModelContextExt)]
impl risingwave_meta_model::streaming_job::Model {
    pub fn stream_context(&self) -> StreamContext {
        StreamContext {
            timezone: self.timezone.clone(),
            config_override: self.config_override.clone().unwrap_or_default().into(),
        }
    }
}

impl StreamJobFragments {
    pub fn to_protobuf(
        &self,
        fragment_upstreams: &HashMap<FragmentId, HashSet<FragmentId>>,
        fragment_dispatchers: &FragmentActorDispatchers,
    ) -> PbTableFragments {
        PbTableFragments {
            table_id: self.stream_job_id,
            state: self.state as _,
            fragments: self
                .fragments
                .iter()
                .map(|(id, fragment)| {
                    (
                        *id,
                        fragment.to_protobuf(
                            fragment_upstreams.get(id).into_iter().flatten().cloned(),
                            fragment_dispatchers.get(&(*id as _)),
                        ),
                    )
                })
                .collect(),
            actor_status: self
                .actor_status
                .iter()
                .map(|(actor_id, status)| (*actor_id, status.clone()))
                .collect(),
            ctx: Some(self.ctx.to_protobuf()),
            parallelism: Some(self.assigned_parallelism.into()),
            node_label: "".to_owned(),
            backfill_done: true,
            max_parallelism: Some(self.max_parallelism as _),
        }
    }
}

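/// Actors to create, grouped by worker and then by fragment. Each fragment
/// entry carries the fragment's node body, the actors together with their
/// up/downstream connections, and the associated subscriber ids.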
pub type StreamJobActorsToCreate = HashMap<
    WorkerId,
    HashMap<
        FragmentId,
        (
            StreamNode,
            Vec<StreamActorWithUpDownstreams>,
            HashSet<SubscriberId>,
        ),
    >,
>;

impl StreamJobFragments {
    pub fn for_test(job_id: JobId, fragments: BTreeMap<FragmentId, Fragment>) -> Self {
        Self::new(
            job_id,
            fragments,
            &BTreeMap::new(),
            StreamContext::default(),
            TableParallelism::Adaptive,
            VirtualNode::COUNT_FOR_TEST,
        )
    }

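    /// Create a new `StreamJobFragments` in the `Initial` state with the given
    /// fragments and actor placements.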
    pub fn new(
        stream_job_id: JobId,
        fragments: BTreeMap<FragmentId, Fragment>,
        actor_locations: &BTreeMap<ActorId, ActorAlignmentId>,
        ctx: StreamContext,
        table_parallelism: TableParallelism,
        max_parallelism: usize,
    ) -> Self {
        let actor_status = actor_locations
            .iter()
            .map(|(&actor_id, alignment_id)| {
                (
                    actor_id,
                    ActorStatus {
                        location: PbActorLocation::from_worker(alignment_id.worker_id()),
                    },
                )
            })
            .collect();

        Self {
            stream_job_id,
            state: State::Initial,
            fragments,
            actor_status,
            ctx,
            assigned_parallelism: table_parallelism,
            max_parallelism,
        }
    }

    pub fn fragment_ids(&self) -> impl Iterator<Item = FragmentId> + '_ {
        self.fragments.keys().cloned()
    }

    pub fn fragments(&self) -> impl Iterator<Item = &Fragment> {
        self.fragments.values()
    }

    pub fn fragment_actors(&self, fragment_id: FragmentId) -> &[StreamActor] {
        self.fragments
            .get(&fragment_id)
            .map(|f| f.actors.as_slice())
            .unwrap_or_default()
    }

    pub fn stream_job_id(&self) -> JobId {
        self.stream_job_id
    }

    pub fn timezone(&self) -> Option<String> {
        self.ctx.timezone.clone()
    }

    pub fn is_created(&self) -> bool {
        self.state == State::Created
    }

    pub fn actor_ids(&self) -> impl Iterator<Item = ActorId> + '_ {
        self.fragments
            .values()
            .flat_map(|fragment| fragment.actors.iter().map(|actor| actor.actor_id))
    }

    pub fn actor_fragment_mapping(&self) -> HashMap<ActorId, FragmentId> {
        self.fragments
            .values()
            .flat_map(|fragment| {
                fragment
                    .actors
                    .iter()
                    .map(|actor| (actor.actor_id, fragment.fragment_id))
            })
            .collect()
    }

    #[cfg(test)]
    pub fn actors(&self) -> Vec<StreamActor> {
        self.fragments
            .values()
            .flat_map(|fragment| fragment.actors.clone())
            .collect()
    }

    #[cfg(test)]
    pub fn mview_fragment_ids(&self) -> Vec<FragmentId> {
        self.fragments
            .values()
            .filter(move |fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::Mview)
            })
            .map(|fragment| fragment.fragment_id)
            .collect()
    }

    pub fn tracking_progress_actor_ids(&self) -> Vec<(ActorId, BackfillUpstreamType)> {
        Self::tracking_progress_actor_ids_impl(self.fragments.values().map(|fragment| {
            (
                fragment.fragment_type_mask,
                fragment.actors.iter().map(|actor| actor.actor_id),
            )
        }))
    }

    pub fn tracking_progress_actor_ids_impl(
        fragments: impl IntoIterator<Item = (FragmentTypeMask, impl Iterator<Item = ActorId>)>,
    ) -> Vec<(ActorId, BackfillUpstreamType)> {
        let mut actor_ids = vec![];
        for (fragment_type_mask, actors) in fragments {
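            // A CDC table job presumably needs no backfill progress tracking:
            // bail out for the whole job as soon as a `CdcFilter` fragment is
            // seen.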
            if fragment_type_mask.contains(FragmentTypeFlag::CdcFilter) {
                return vec![];
            }
            if fragment_type_mask.contains_any([
                FragmentTypeFlag::Values,
                FragmentTypeFlag::StreamScan,
                FragmentTypeFlag::SourceScan,
                FragmentTypeFlag::LocalityProvider,
            ]) {
                actor_ids.extend(actors.map(|actor_id| {
                    (
                        actor_id,
                        BackfillUpstreamType::from_fragment_type_mask(fragment_type_mask),
                    )
                }));
            }
        }
        actor_ids
    }

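    /// The "root" fragment of the job: the mview fragment if any, otherwise
    /// the sink fragment, otherwise the source fragment.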
    pub fn root_fragment(&self) -> Option<Fragment> {
        self.mview_fragment()
            .or_else(|| self.sink_fragment())
            .or_else(|| self.source_fragment())
    }

    pub fn mview_fragment(&self) -> Option<Fragment> {
        self.fragments
            .values()
            .find(|fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::Mview)
            })
            .cloned()
    }

    pub fn source_fragment(&self) -> Option<Fragment> {
        self.fragments
            .values()
            .find(|fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::Source)
            })
            .cloned()
    }

    pub fn sink_fragment(&self) -> Option<Fragment> {
        self.fragments
            .values()
            .find(|fragment| fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink))
            .cloned()
    }

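    /// The fragments containing a stream source node, keyed by source id.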
    pub fn stream_source_fragments(&self) -> HashMap<SourceId, BTreeSet<FragmentId>> {
        let mut source_fragments = HashMap::new();

        for fragment in self.fragments() {
            if let Some(source_id) = fragment.nodes.find_stream_source() {
                source_fragments
                    .entry(source_id)
                    .or_insert(BTreeSet::new())
                    .insert(fragment.fragment_id as FragmentId);
            }
        }
        source_fragments
    }

    pub fn source_backfill_fragments(
        &self,
    ) -> HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>> {
        Self::source_backfill_fragments_impl(
            self.fragments
                .iter()
                .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
        )
    }

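    /// Maps each source id to the set of `(backfill fragment id, upstream
    /// source fragment id)` pairs reading from it.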
    pub fn source_backfill_fragments_impl(
        fragments: impl Iterator<Item = (FragmentId, &StreamNode)>,
    ) -> HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>> {
        let mut source_backfill_fragments = HashMap::new();

        for (fragment_id, fragment_node) in fragments {
            if let Some((source_id, upstream_source_fragment_id)) =
                fragment_node.find_source_backfill()
            {
                source_backfill_fragments
                    .entry(source_id)
                    .or_insert(BTreeSet::new())
                    .insert((fragment_id, upstream_source_fragment_id));
            }
        }
        source_backfill_fragments
    }

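    /// Find the fragment containing the `Union` node that merges inputs into
    /// the table (the panic message below calls it the "placeholder merger").
    /// Panics if no such fragment exists.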
    pub fn union_fragment_for_table(&mut self) -> &mut Fragment {
        let mut union_fragment_id = None;
        for (fragment_id, fragment) in &self.fragments {
            visit_stream_node_body(&fragment.nodes, |body| {
                if let NodeBody::Union(_) = body {
                    if let Some(union_fragment_id) = union_fragment_id.as_mut() {
                        assert_eq!(*union_fragment_id, *fragment_id);
                    } else {
                        union_fragment_id = Some(*fragment_id);
                    }
                }
            });
        }

        let union_fragment_id =
            union_fragment_id.expect("fragment of placeholder merger not found");

        self.fragments
            .get_mut(&union_fragment_id)
            .unwrap_or_else(|| panic!("fragment {} not found", union_fragment_id))
    }

    fn resolve_dependent_table(stream_node: &StreamNode, table_ids: &mut HashMap<TableId, usize>) {
        let table_id = match stream_node.node_body.as_ref() {
            Some(NodeBody::StreamScan(stream_scan)) => Some(stream_scan.table_id),
            Some(NodeBody::StreamCdcScan(stream_scan)) => Some(stream_scan.table_id),
            _ => None,
        };
        if let Some(table_id) = table_id {
            table_ids.entry(table_id).or_default().add_assign(1);
        }

        for child in &stream_node.input {
            Self::resolve_dependent_table(child, table_ids);
        }
    }

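    /// Count, for each upstream table, how many `StreamScan`/`StreamCdcScan`
    /// nodes in this job depend on it.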
    pub fn upstream_table_counts(&self) -> HashMap<TableId, usize> {
        Self::upstream_table_counts_impl(self.fragments.values().map(|fragment| &fragment.nodes))
    }

    pub fn upstream_table_counts_impl(
        fragment_nodes: impl Iterator<Item = &StreamNode>,
    ) -> HashMap<TableId, usize> {
        let mut table_ids = HashMap::new();
        fragment_nodes.for_each(|node| {
            Self::resolve_dependent_table(node, &mut table_ids);
        });

        table_ids
    }

    pub fn worker_actor_ids(&self) -> BTreeMap<WorkerId, Vec<ActorId>> {
        let mut map = BTreeMap::default();
        for (&actor_id, actor_status) in &self.actor_status {
            let node_id = actor_status.worker_id();
            map.entry(node_id).or_insert_with(Vec::new).push(actor_id);
        }
        map
    }

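    /// Iterate over the actors to create, each paired with the worker it is
    /// scheduled on according to `actor_status`.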
    pub fn actors_to_create(
        &self,
    ) -> impl Iterator<
        Item = (
            FragmentId,
            &StreamNode,
            impl Iterator<Item = (&StreamActor, WorkerId)> + '_,
        ),
    > + '_ {
        self.fragments.values().map(move |fragment| {
            (
                fragment.fragment_id,
                &fragment.nodes,
                fragment.actors.iter().map(move |actor| {
                    let worker_id: WorkerId = self
                        .actor_status
                        .get(&actor.actor_id)
                        .expect("should exist")
                        .worker_id();
                    (actor, worker_id)
                }),
            )
        })
    }

    pub fn mv_table_id(&self) -> Option<TableId> {
        self.fragments
            .values()
            .flat_map(|f| f.state_table_ids.iter().copied())
            .find(|table_id| self.stream_job_id.is_mv_table_id(*table_id))
    }

    pub fn collect_tables(fragments: impl Iterator<Item = &Fragment>) -> BTreeMap<TableId, Table> {
        let mut tables = BTreeMap::new();
        for fragment in fragments {
            stream_graph_visitor::visit_stream_node_tables_inner(
                &mut fragment.nodes.clone(),
                false,
                true,
                |table, _| {
                    let table_id = table.id;
                    tables
                        .try_insert(table_id, table.clone())
                        .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
                },
            );
        }
        tables
    }

    pub fn internal_table_ids(&self) -> Vec<TableId> {
        self.fragments
            .values()
            .flat_map(|f| f.state_table_ids.iter().copied())
            .filter(|&t| !self.stream_job_id.is_mv_table_id(t))
            .collect_vec()
    }

    pub fn all_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        self.fragments
            .values()
            .flat_map(|f| f.state_table_ids.clone())
    }

    pub fn fill_expr_context(mut self) -> Self {
        self.fragments.values_mut().for_each(|fragment| {
            fragment.actors.iter_mut().for_each(|actor| {
                if actor.expr_context.is_none() {
                    actor.expr_context = Some(self.ctx.to_expr_context());
                }
            });
        });
        self
    }
}

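/// The type of upstream a backfill executor reads from, derived from the
/// fragment's type mask.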
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackfillUpstreamType {
    MView,
    Values,
    Source,
    LocalityProvider,
}

impl BackfillUpstreamType {
    pub fn from_fragment_type_mask(mask: FragmentTypeMask) -> Self {
        let is_mview = mask.contains(FragmentTypeFlag::StreamScan);
        let is_values = mask.contains(FragmentTypeFlag::Values);
        let is_source = mask.contains(FragmentTypeFlag::SourceScan);
        let is_locality_provider = mask.contains(FragmentTypeFlag::LocalityProvider);

        debug_assert!(
            is_mview as u8 + is_values as u8 + is_source as u8 + is_locality_provider as u8 == 1,
            "a backfill fragment should be exactly one of mview, values, source, or locality provider, found {:?}",
            mask
        );

        if is_mview {
            BackfillUpstreamType::MView
        } else if is_values {
            BackfillUpstreamType::Values
        } else if is_source {
            BackfillUpstreamType::Source
        } else if is_locality_provider {
            BackfillUpstreamType::LocalityProvider
        } else {
            unreachable!("invalid fragment type mask: {:?}", mask);
        }
    }
}