use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::ops::{AddAssign, Deref};
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::hash::{
    ActorAlignmentId, IsSingleton, VirtualNode, VnodeCount, VnodeCountCompat,
};
use risingwave_common::id::JobId;
use risingwave_common::util::stream_graph_visitor::{self, visit_stream_node_body};
use risingwave_meta_model::{DispatcherType, SourceId, StreamingParallelism, WorkerId};
use risingwave_pb::catalog::Table;
use risingwave_pb::common::{ActorInfo, PbActorLocation};
use risingwave_pb::meta::table_fragments::fragment::{
    FragmentDistributionType, PbFragmentDistributionType,
};
use risingwave_pb::meta::table_fragments::{ActorStatus, PbFragment, State};
use risingwave_pb::meta::table_parallelism::{
    FixedParallelism, Parallelism, PbAdaptiveParallelism, PbCustomParallelism, PbFixedParallelism,
    PbParallelism,
};
use risingwave_pb::meta::{PbTableFragments, PbTableParallelism};
use risingwave_pb::plan_common::PbExprContext;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    DispatchStrategy, Dispatcher, PbDispatchOutputMapping, PbDispatcher, PbStreamActor,
    PbStreamContext, StreamNode,
};

use super::{ActorId, FragmentId};

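/// The parallelism configured for a streaming job, kept in sync with
/// [`PbTableParallelism`] and the meta-model [`StreamingParallelism`].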
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum TableParallelism {
    /// The scheduler decides the parallelism adaptively, based on the
    /// parallelism available on the workers.
    Adaptive,
    /// The job runs with a user-specified, fixed parallelism.
    Fixed(usize),
    /// The fragments of the job have individually customized parallelisms
    /// (e.g. after per-fragment scaling), so no single value applies.
    Custom,
}

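// A minimal round-trip sketch for the conversions below (illustrative only):
//
//     let pb: PbTableParallelism = TableParallelism::Fixed(4).into();
//     assert_eq!(TableParallelism::from(pb), TableParallelism::Fixed(4));
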
impl From<PbTableParallelism> for TableParallelism {
    fn from(value: PbTableParallelism) -> Self {
        use Parallelism::*;
        match &value.parallelism {
            Some(Fixed(FixedParallelism { parallelism: n })) => Self::Fixed(*n as usize),
            Some(Adaptive(_)) | Some(Auto(_)) => Self::Adaptive,
            Some(Custom(_)) => Self::Custom,
            _ => unreachable!(),
        }
    }
}

impl From<TableParallelism> for PbTableParallelism {
    fn from(value: TableParallelism) -> Self {
        use TableParallelism::*;

        let parallelism = match value {
            Adaptive => PbParallelism::Adaptive(PbAdaptiveParallelism {}),
            Fixed(n) => PbParallelism::Fixed(PbFixedParallelism {
                parallelism: n as u32,
            }),
            Custom => PbParallelism::Custom(PbCustomParallelism {}),
        };

        Self {
            parallelism: Some(parallelism),
        }
    }
}

impl From<StreamingParallelism> for TableParallelism {
    fn from(value: StreamingParallelism) -> Self {
        match value {
            StreamingParallelism::Adaptive => TableParallelism::Adaptive,
            StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
            StreamingParallelism::Custom => TableParallelism::Custom,
        }
    }
}

impl From<TableParallelism> for StreamingParallelism {
    fn from(value: TableParallelism) -> Self {
        match value {
            TableParallelism::Adaptive => StreamingParallelism::Adaptive,
            TableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
            TableParallelism::Custom => StreamingParallelism::Custom,
        }
    }
}

pub type ActorUpstreams = BTreeMap<FragmentId, HashMap<ActorId, ActorInfo>>;
pub type StreamActorWithDispatchers = (StreamActor, Vec<PbDispatcher>);
pub type StreamActorWithUpDownstreams = (StreamActor, ActorUpstreams, Vec<PbDispatcher>);
pub type FragmentActorDispatchers = HashMap<FragmentId, HashMap<ActorId, Vec<PbDispatcher>>>;

pub type FragmentDownstreamRelation = HashMap<FragmentId, Vec<DownstreamFragmentRelation>>;
/// Downstream `fragment_id` -> original upstream `fragment_id` -> new upstream `fragment_id`.
pub type FragmentReplaceUpstream = HashMap<FragmentId, HashMap<FragmentId, FragmentId>>;
/// Newly added no-shuffle relations:
/// upstream `fragment_id` -> downstream `fragment_id` -> upstream `actor_id` -> downstream `actor_id`.
pub type FragmentNewNoShuffle = HashMap<FragmentId, HashMap<FragmentId, HashMap<ActorId, ActorId>>>;

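/// An edge from a fragment to one of its downstream fragments, carrying the
/// information needed to build the dispatcher towards that downstream.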
#[derive(Debug, Clone)]
pub struct DownstreamFragmentRelation {
    pub downstream_fragment_id: FragmentId,
    pub dispatcher_type: DispatcherType,
    pub dist_key_indices: Vec<u32>,
    pub output_mapping: PbDispatchOutputMapping,
}

impl From<(FragmentId, DispatchStrategy)> for DownstreamFragmentRelation {
    fn from((fragment_id, dispatch): (FragmentId, DispatchStrategy)) -> Self {
        Self {
            downstream_fragment_id: fragment_id,
            dispatcher_type: dispatch.get_type().unwrap().into(),
            dist_key_indices: dispatch.dist_key_indices,
            output_mapping: dispatch.output_mapping.unwrap(),
        }
    }
}

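/// A [`StreamJobFragments`] together with the downstream relations of its
/// fragments, as required when creating the streaming job.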
#[derive(Debug, Clone)]
pub struct StreamJobFragmentsToCreate {
    pub inner: StreamJobFragments,
    pub downstreams: FragmentDownstreamRelation,
}

impl Deref for StreamJobFragmentsToCreate {
    type Target = StreamJobFragments;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

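/// Meta-side representation of a stream actor. Unlike [`PbStreamActor`], the
/// dispatchers are kept separately and only attached when converting to protobuf.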
#[derive(Clone, Debug)]
pub struct StreamActor {
    pub actor_id: ActorId,
    pub fragment_id: FragmentId,
    pub vnode_bitmap: Option<Bitmap>,
    pub mview_definition: String,
    pub expr_context: Option<PbExprContext>,
    pub config_override: Arc<str>,
}

impl StreamActor {
    /// Converts to [`PbStreamActor`], attaching the given dispatchers.
    fn to_protobuf(&self, dispatchers: impl Iterator<Item = Dispatcher>) -> PbStreamActor {
        PbStreamActor {
            actor_id: self.actor_id,
            fragment_id: self.fragment_id,
            dispatcher: dispatchers.collect(),
            vnode_bitmap: self
                .vnode_bitmap
                .as_ref()
                .map(|bitmap| bitmap.to_protobuf()),
            mview_definition: self.mview_definition.clone(),
            expr_context: self.expr_context.clone(),
            config_override: self.config_override.to_string(),
        }
    }
}

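/// Meta-side representation of a fragment, corresponding to [`PbFragment`].
/// The stream node body (`nodes`) is stored once per fragment rather than per actor.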
#[derive(Clone, Debug, Default)]
pub struct Fragment {
    pub fragment_id: FragmentId,
    pub fragment_type_mask: FragmentTypeMask,
    pub distribution_type: PbFragmentDistributionType,
    pub actors: Vec<StreamActor>,
    pub state_table_ids: Vec<TableId>,
    pub maybe_vnode_count: Option<u32>,
    pub nodes: StreamNode,
}

impl Fragment {
    pub fn to_protobuf(
        &self,
        upstream_fragments: impl Iterator<Item = FragmentId>,
        dispatchers: Option<&HashMap<ActorId, Vec<Dispatcher>>>,
    ) -> PbFragment {
        PbFragment {
            fragment_id: self.fragment_id,
            fragment_type_mask: self.fragment_type_mask.into(),
            distribution_type: self.distribution_type as _,
            actors: self
                .actors
                .iter()
                .map(|actor| {
                    actor.to_protobuf(
                        dispatchers
                            .and_then(|dispatchers| dispatchers.get(&(actor.actor_id as _)))
                            .into_iter()
                            .flatten()
                            .cloned(),
                    )
                })
                .collect(),
            state_table_ids: self.state_table_ids.clone(),
            upstream_fragment_ids: upstream_fragments.collect(),
            maybe_vnode_count: self.maybe_vnode_count,
            nodes: Some(self.nodes.clone()),
        }
    }
}

impl VnodeCountCompat for Fragment {
    fn vnode_count_inner(&self) -> VnodeCount {
        VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton())
    }
}

impl IsSingleton for Fragment {
    fn is_singleton(&self) -> bool {
        matches!(self.distribution_type, FragmentDistributionType::Single)
    }
}

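/// The fragments of a streaming job, together with the job-level metadata that
/// is serialized into [`PbTableFragments`].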
#[derive(Debug, Clone)]
pub struct StreamJobFragments {
    /// The id of the streaming job.
    pub stream_job_id: JobId,

    /// The state of the job fragments.
    pub state: State,

    /// The fragments of the job, keyed by fragment id.
    pub fragments: BTreeMap<FragmentId, Fragment>,

    /// The status (location) of each actor.
    pub actor_status: BTreeMap<ActorId, ActorStatus>,

    /// The streaming context associated with the job, e.g. timezone.
    pub ctx: StreamContext,

    /// The parallelism assigned to the job.
    pub assigned_parallelism: TableParallelism,

    /// The maximum parallelism specified when the job was created, i.e. the
    /// expected vnode count.
    pub max_parallelism: usize,
}

#[derive(Debug, Clone, Default)]
pub struct StreamContext {
    /// The timezone used to interpret timestamps and dates for conversion.
    pub timezone: Option<String>,

    /// The job-level override of the streaming config.
    pub config_override: Arc<str>,
}

impl StreamContext {
    pub fn to_protobuf(&self) -> PbStreamContext {
        PbStreamContext {
            timezone: self.timezone.clone().unwrap_or("".into()),
            config_override: self.config_override.to_string(),
        }
    }

    pub fn to_expr_context(&self) -> PbExprContext {
        PbExprContext {
            // `self.timezone` is expected to always be set; a conspicuous
            // placeholder is used here to ease debugging if it is not.
            time_zone: self.timezone.clone().unwrap_or("Empty Time Zone".into()),
            strict_mode: false,
        }
    }

    pub fn from_protobuf(prost: &PbStreamContext) -> Self {
        Self {
            timezone: if prost.get_timezone().is_empty() {
                None
            } else {
                Some(prost.get_timezone().clone())
            },
            config_override: prost.get_config_override().as_str().into(),
        }
    }
}

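/// Extension methods for deriving a [`StreamContext`] from the meta-model
/// streaming job record.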
#[easy_ext::ext(StreamingJobModelContextExt)]
impl risingwave_meta_model::streaming_job::Model {
    pub fn stream_context(&self) -> StreamContext {
        StreamContext {
            timezone: self.timezone.clone(),
            config_override: self.config_override.clone().unwrap_or_default().into(),
        }
    }
}

impl StreamJobFragments {
    pub fn to_protobuf(
        &self,
        fragment_upstreams: &HashMap<FragmentId, HashSet<FragmentId>>,
        fragment_dispatchers: &FragmentActorDispatchers,
    ) -> PbTableFragments {
        PbTableFragments {
            table_id: self.stream_job_id,
            state: self.state as _,
            fragments: self
                .fragments
                .iter()
                .map(|(id, fragment)| {
                    (
                        *id,
                        fragment.to_protobuf(
                            fragment_upstreams.get(id).into_iter().flatten().cloned(),
                            fragment_dispatchers.get(&(*id as _)),
                        ),
                    )
                })
                .collect(),
            actor_status: self
                .actor_status
                .iter()
                .map(|(actor_id, status)| (*actor_id, status.clone()))
                .collect(),
            ctx: Some(self.ctx.to_protobuf()),
            parallelism: Some(self.assigned_parallelism.into()),
            node_label: "".to_owned(),
            backfill_done: true,
            max_parallelism: Some(self.max_parallelism as _),
        }
    }
}

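/// Actors to create, grouped by worker id and then by fragment id, along with
/// the fragment's stream node body.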
pub type StreamJobActorsToCreate = HashMap<
    WorkerId,
    HashMap<FragmentId, (StreamNode, Vec<StreamActorWithUpDownstreams>, HashSet<u32>)>,
>;

impl StreamJobFragments {
    pub fn for_test(job_id: JobId, fragments: BTreeMap<FragmentId, Fragment>) -> Self {
        Self::new(
            job_id,
            fragments,
            &BTreeMap::new(),
            StreamContext::default(),
            TableParallelism::Adaptive,
            VirtualNode::COUNT_FOR_TEST,
        )
    }

    /// Creates a new `StreamJobFragments` in the `Initial` state, resolving
    /// actor locations from `actor_locations`.
    pub fn new(
        stream_job_id: JobId,
        fragments: BTreeMap<FragmentId, Fragment>,
        actor_locations: &BTreeMap<ActorId, ActorAlignmentId>,
        ctx: StreamContext,
        table_parallelism: TableParallelism,
        max_parallelism: usize,
    ) -> Self {
        let actor_status = actor_locations
            .iter()
            .map(|(&actor_id, alignment_id)| {
                (
                    actor_id,
                    ActorStatus {
                        location: PbActorLocation::from_worker(alignment_id.worker_id()),
                    },
                )
            })
            .collect();

        Self {
            stream_job_id,
            state: State::Initial,
            fragments,
            actor_status,
            ctx,
            assigned_parallelism: table_parallelism,
            max_parallelism,
        }
    }

    pub fn fragment_ids(&self) -> impl Iterator<Item = FragmentId> + '_ {
        self.fragments.keys().cloned()
    }

    pub fn fragments(&self) -> impl Iterator<Item = &Fragment> {
        self.fragments.values()
    }

    pub fn fragment_actors(&self, fragment_id: FragmentId) -> &[StreamActor] {
        self.fragments
            .get(&fragment_id)
            .map(|f| f.actors.as_slice())
            .unwrap_or_default()
    }

    /// Returns the id of the streaming job.
    pub fn stream_job_id(&self) -> JobId {
        self.stream_job_id
    }

    /// Returns the timezone of the streaming job.
    pub fn timezone(&self) -> Option<String> {
        self.ctx.timezone.clone()
    }

    /// Returns whether the job is in the `Created` state.
    pub fn is_created(&self) -> bool {
        self.state == State::Created
    }

    /// Returns the ids of all actors in the job.
    pub fn actor_ids(&self) -> impl Iterator<Item = ActorId> + '_ {
        self.fragments
            .values()
            .flat_map(|fragment| fragment.actors.iter().map(|actor| actor.actor_id))
    }

    pub fn actor_fragment_mapping(&self) -> HashMap<ActorId, FragmentId> {
        self.fragments
            .values()
            .flat_map(|fragment| {
                fragment
                    .actors
                    .iter()
                    .map(|actor| (actor.actor_id, fragment.fragment_id))
            })
            .collect()
    }

    #[cfg(test)]
    pub fn actors(&self) -> Vec<StreamActor> {
        self.fragments
            .values()
            .flat_map(|fragment| fragment.actors.clone())
            .collect()
    }

    #[cfg(test)]
    pub fn mview_fragment_ids(&self) -> Vec<FragmentId> {
        self.fragments
            .values()
            .filter(|fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::Mview)
            })
            .map(|fragment| fragment.fragment_id)
            .collect()
    }

    /// Returns the ids of the actors whose backfill progress should be
    /// tracked, along with the type of their backfill upstream.
    pub fn tracking_progress_actor_ids(&self) -> Vec<(ActorId, BackfillUpstreamType)> {
        Self::tracking_progress_actor_ids_impl(self.fragments.values().map(|fragment| {
            (
                fragment.fragment_type_mask,
                fragment.actors.iter().map(|actor| actor.actor_id),
            )
        }))
    }

    pub fn tracking_progress_actor_ids_impl(
        fragments: impl IntoIterator<Item = (FragmentTypeMask, impl Iterator<Item = ActorId>)>,
    ) -> Vec<(ActorId, BackfillUpstreamType)> {
        let mut actor_ids = vec![];
        for (fragment_type_mask, actors) in fragments {
            if fragment_type_mask.contains(FragmentTypeFlag::CdcFilter) {
                // CDC table jobs contain a `CdcFilter` fragment; we don't track
                // the progress of any of their fragments.
                return vec![];
            }
            if fragment_type_mask.contains_any([
                FragmentTypeFlag::Values,
                FragmentTypeFlag::StreamScan,
                FragmentTypeFlag::SourceScan,
                FragmentTypeFlag::LocalityProvider,
            ]) {
                actor_ids.extend(actors.map(|actor_id| {
                    (
                        actor_id,
                        BackfillUpstreamType::from_fragment_type_mask(fragment_type_mask),
                    )
                }));
            }
        }
        actor_ids
    }

    /// Returns the "root" fragment of the job: the mview fragment if present,
    /// otherwise the sink or source fragment.
    pub fn root_fragment(&self) -> Option<Fragment> {
        self.mview_fragment()
            .or_else(|| self.sink_fragment())
            .or_else(|| self.source_fragment())
    }

    /// Returns the fragment with the `Mview` type flag, if any.
    pub fn mview_fragment(&self) -> Option<Fragment> {
        self.fragments
            .values()
            .find(|fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::Mview)
            })
            .cloned()
    }

    /// Returns the fragment with the `Source` type flag, if any.
    pub fn source_fragment(&self) -> Option<Fragment> {
        self.fragments
            .values()
            .find(|fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::Source)
            })
            .cloned()
    }

    /// Returns the fragment with the `Sink` type flag, if any.
    pub fn sink_fragment(&self) -> Option<Fragment> {
        self.fragments
            .values()
            .find(|fragment| fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink))
            .cloned()
    }

    /// Extracts the fragments that contain a stream source, grouped by source id.
    pub fn stream_source_fragments(&self) -> HashMap<SourceId, BTreeSet<FragmentId>> {
        let mut source_fragments = HashMap::new();

        for fragment in self.fragments() {
            if let Some(source_id) = fragment.nodes.find_stream_source() {
                source_fragments
                    .entry(source_id)
                    .or_insert(BTreeSet::new())
                    .insert(fragment.fragment_id as FragmentId);
            }
        }
        source_fragments
    }

    pub fn source_backfill_fragments(
        &self,
    ) -> HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>> {
        Self::source_backfill_fragments_impl(
            self.fragments
                .iter()
                .map(|(fragment_id, fragment)| (*fragment_id, &fragment.nodes)),
        )
    }

    /// Returns a map from source id to the set of
    /// (`source_backfill_fragment_id`, `upstream_source_fragment_id`) pairs.
    pub fn source_backfill_fragments_impl(
        fragments: impl Iterator<Item = (FragmentId, &StreamNode)>,
    ) -> HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>> {
        let mut source_backfill_fragments = HashMap::new();

        for (fragment_id, fragment_node) in fragments {
            if let Some((source_id, upstream_source_fragment_id)) =
                fragment_node.find_source_backfill()
            {
                source_backfill_fragments
                    .entry(source_id)
                    .or_insert(BTreeSet::new())
                    .insert((fragment_id, upstream_source_fragment_id));
            }
        }
        source_backfill_fragments
    }

    /// Finds the unique `Union` fragment of the table job. Panics if it
    /// cannot be found.
    pub fn union_fragment_for_table(&mut self) -> &mut Fragment {
        let mut union_fragment_id = None;
        for (fragment_id, fragment) in &self.fragments {
            visit_stream_node_body(&fragment.nodes, |body| {
                if let NodeBody::Union(_) = body {
                    if let Some(union_fragment_id) = union_fragment_id.as_mut() {
                        // The union fragment should be unique.
                        assert_eq!(*union_fragment_id, *fragment_id);
                    } else {
                        union_fragment_id = Some(*fragment_id);
                    }
                }
            })
        }

        let union_fragment_id =
            union_fragment_id.expect("fragment of placeholder merger not found");

        (self
            .fragments
            .get_mut(&union_fragment_id)
            .unwrap_or_else(|| panic!("fragment {} not found", union_fragment_id))) as _
    }

    /// Recursively collects the upstream tables that `stream_node` depends on
    /// (via `StreamScan` or `StreamCdcScan`), counting how many times each is scanned.
    fn resolve_dependent_table(stream_node: &StreamNode, table_ids: &mut HashMap<TableId, usize>) {
        let table_id = match stream_node.node_body.as_ref() {
            Some(NodeBody::StreamScan(stream_scan)) => Some(stream_scan.table_id),
            Some(NodeBody::StreamCdcScan(stream_scan)) => Some(stream_scan.table_id),
            _ => None,
        };
        if let Some(table_id) = table_id {
            table_ids.entry(table_id).or_default().add_assign(1);
        }

        for child in &stream_node.input {
            Self::resolve_dependent_table(child, table_ids);
        }
    }

    /// Returns the upstream table ids of the job, together with the number of
    /// times each table is referenced.
    pub fn upstream_table_counts(&self) -> HashMap<TableId, usize> {
        Self::upstream_table_counts_impl(self.fragments.values().map(|fragment| &fragment.nodes))
    }

    pub fn upstream_table_counts_impl(
        fragment_nodes: impl Iterator<Item = &StreamNode>,
    ) -> HashMap<TableId, usize> {
        let mut table_ids = HashMap::new();
        fragment_nodes.for_each(|node| {
            Self::resolve_dependent_table(node, &mut table_ids);
        });

        table_ids
    }

    /// Returns the ids of all actors in the job, grouped by worker id.
    pub fn worker_actor_ids(&self) -> BTreeMap<WorkerId, Vec<ActorId>> {
        let mut map = BTreeMap::default();
        for (&actor_id, actor_status) in &self.actor_status {
            let node_id = actor_status.worker_id();
            map.entry(node_id).or_insert_with(Vec::new).push(actor_id);
        }
        map
    }

    pub fn actors_to_create(
        &self,
    ) -> impl Iterator<
        Item = (
            FragmentId,
            &StreamNode,
            impl Iterator<Item = (&StreamActor, WorkerId)> + '_,
        ),
    > + '_ {
        self.fragments.values().map(move |fragment| {
            (
                fragment.fragment_id,
                &fragment.nodes,
                fragment.actors.iter().map(move |actor| {
                    let worker_id: WorkerId = self
                        .actor_status
                        .get(&actor.actor_id)
                        .expect("should exist")
                        .worker_id();
                    (actor, worker_id)
                }),
            )
        })
    }

    /// Returns the state table id of the materialized view, if this job has one.
    pub fn mv_table_id(&self) -> Option<TableId> {
        self.fragments
            .values()
            .flat_map(|f| f.state_table_ids.iter().copied())
            .find(|table_id| self.stream_job_id.is_mv_table_id(*table_id))
    }

    /// Collects all tables referenced by the given fragments, keyed by table
    /// id. Panics on a duplicated table id.
    pub fn collect_tables(fragments: impl Iterator<Item = &Fragment>) -> BTreeMap<TableId, Table> {
        let mut tables = BTreeMap::new();
        for fragment in fragments {
            stream_graph_visitor::visit_stream_node_tables_inner(
                &mut fragment.nodes.clone(),
                false,
                true,
                |table, _| {
                    let table_id = table.id;
                    tables
                        .try_insert(table_id, table.clone())
                        .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
                },
            );
        }
        tables
    }

    /// Returns the internal table ids of the job, i.e. all state table ids
    /// excluding the mview table.
    pub fn internal_table_ids(&self) -> Vec<TableId> {
        self.fragments
            .values()
            .flat_map(|f| f.state_table_ids.iter().copied())
            .filter(|&t| !self.stream_job_id.is_mv_table_id(t))
            .collect_vec()
    }

    /// Returns all state table ids of the job, including the mview table.
    pub fn all_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        self.fragments
            .values()
            .flat_map(|f| f.state_table_ids.clone())
    }

    /// Fills the `expr_context` of each actor from the job's stream context if
    /// it is not already set. Used for compatibility.
    pub fn fill_expr_context(mut self) -> Self {
        self.fragments.values_mut().for_each(|fragment| {
            fragment.actors.iter_mut().for_each(|actor| {
                if actor.expr_context.is_none() {
                    actor.expr_context = Some(self.ctx.to_expr_context());
                }
            });
        });
        self
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackfillUpstreamType {
    MView,
    Values,
    Source,
    LocalityProvider,
}

impl BackfillUpstreamType {
    pub fn from_fragment_type_mask(mask: FragmentTypeMask) -> Self {
        let is_mview = mask.contains(FragmentTypeFlag::StreamScan);
        let is_values = mask.contains(FragmentTypeFlag::Values);
        let is_source = mask.contains(FragmentTypeFlag::SourceScan);
        let is_locality_provider = mask.contains(FragmentTypeFlag::LocalityProvider);

        // Note: in theory a mask could carry several of these flags, but a
        // backfill fragment is currently expected to carry exactly one.
        debug_assert!(
            is_mview as u8 + is_values as u8 + is_source as u8 + is_locality_provider as u8 == 1,
            "a backfill fragment should either be mview, value, source, or locality provider, found {:?}",
            mask
        );

        if is_mview {
            BackfillUpstreamType::MView
        } else if is_values {
            BackfillUpstreamType::Values
        } else if is_source {
            BackfillUpstreamType::Source
        } else if is_locality_provider {
            BackfillUpstreamType::LocalityProvider
        } else {
            unreachable!("invalid fragment type mask: {:?}", mask);
        }
    }
}