use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;
use std::ops::{Deref, DerefMut};
use std::sync::LazyLock;

use anyhow::{Context, anyhow};
use enum_as_inner::EnumAsInner;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{
    CDC_SOURCE_COLUMN_NUM, TableId, generate_internal_table_name_with_type,
};
use risingwave_common::hash::VnodeCount;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::stream_graph_visitor::{
    self, visit_stream_node_cont, visit_stream_node_cont_mut,
};
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::Table;
use risingwave_pb::ddl_service::TableJobType;
use risingwave_pb::stream_plan::stream_fragment_graph::{
    Parallelism, StreamFragment, StreamFragmentEdge as StreamFragmentEdgeProto,
};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    BackfillOrder, DispatchStrategy, DispatcherType, FragmentTypeFlag, PbStreamNode,
    StreamFragmentGraph as StreamFragmentGraphProto, StreamNode, StreamScanNode, StreamScanType,
};

use crate::barrier::SnapshotBackfillInfo;
use crate::manager::{MetaSrvEnv, StreamingJob, StreamingJobType};
use crate::model::{ActorId, Fragment, FragmentId, StreamActor};
use crate::stream::stream_graph::id::{GlobalFragmentId, GlobalFragmentIdGen, GlobalTableIdGen};
use crate::stream::stream_graph::schedule::Distribution;
use crate::{MetaError, MetaResult};

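/// A wrapper of [`StreamFragment`] that carries the extra information needed while building the
/// streaming job, such as the ID of the job and the required columns of the upstream tables.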
#[derive(Debug, Clone)]
pub(super) struct BuildingFragment {
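    /// The fragment structure from the frontend, with the global fragment ID.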
    inner: StreamFragment,

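    /// The ID of the streaming job, if this fragment contains the root node of the job, i.e. a
    /// materialize, sink, or (shared) source node.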
    job_id: Option<u32>,

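    /// The required column IDs of each upstream table. Cross-database snapshot backfill scans
    /// are excluded here, and CDC filters record an empty list since their output is fixed.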
    upstream_table_columns: HashMap<TableId, Vec<i32>>,
}

impl BuildingFragment {
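    /// Create a new [`BuildingFragment`] from a [`StreamFragment`], assigning the global
    /// fragment ID and filling the internal tables and the job information.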
    fn new(
        id: GlobalFragmentId,
        fragment: StreamFragment,
        job: &StreamingJob,
        table_id_gen: GlobalTableIdGen,
    ) -> Self {
        let mut fragment = StreamFragment {
            fragment_id: id.as_global_id(),
            ..fragment
        };

        Self::fill_internal_tables(&mut fragment, job, table_id_gen);

        let job_id = Self::fill_job(&mut fragment, job).then(|| job.id());
        let upstream_table_columns =
            Self::extract_upstream_table_columns_except_cross_db_backfill(&fragment);

        Self {
            inner: fragment,
            job_id,
            upstream_table_columns,
        }
    }

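    /// Extract the internal tables (i.e. state tables) used by this fragment.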
    fn extract_internal_tables(&self) -> Vec<Table> {
        let mut fragment = self.inner.to_owned();
        let mut tables = Vec::new();
        stream_graph_visitor::visit_internal_tables(&mut fragment, |table, _| {
            tables.push(table.clone());
        });
        tables
    }

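    /// Fill the information of the internal tables in the fragment: assign global table IDs and
    /// set the catalog metadata, such as the schema, database, name, and owner.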
    fn fill_internal_tables(
        fragment: &mut StreamFragment,
        job: &StreamingJob,
        table_id_gen: GlobalTableIdGen,
    ) {
        let fragment_id = fragment.fragment_id;
        stream_graph_visitor::visit_internal_tables(fragment, |table, table_type_name| {
            table.id = table_id_gen.to_global_id(table.id).as_global_id();
            table.schema_id = job.schema_id();
            table.database_id = job.database_id();
            table.name = generate_internal_table_name_with_type(
                &job.name(),
                fragment_id,
                table.id,
                table_type_name,
            );
            table.fragment_id = fragment_id;
            table.owner = job.owner();
        });
    }

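    /// Fill the information of the job into the fragment's nodes. Returns whether the fragment
    /// contains the root node of the job, i.e. a materialize, sink, or (shared) source node.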
    fn fill_job(fragment: &mut StreamFragment, job: &StreamingJob) -> bool {
        let job_id = job.id();
        let fragment_id = fragment.fragment_id;
        let mut has_job = false;

        stream_graph_visitor::visit_fragment_mut(fragment, |node_body| match node_body {
            NodeBody::Materialize(materialize_node) => {
                materialize_node.table_id = job_id;

                let table = materialize_node.table.as_mut().unwrap();
                table.id = job_id;
                table.database_id = job.database_id();
                table.schema_id = job.schema_id();
                table.fragment_id = fragment_id;
                #[cfg(not(debug_assertions))]
                {
                    table.definition = job.name();
                }

                has_job = true;
            }
            NodeBody::Sink(sink_node) => {
                sink_node.sink_desc.as_mut().unwrap().id = job_id;

                has_job = true;
            }
            NodeBody::Dml(dml_node) => {
                dml_node.table_id = job_id;
                dml_node.table_version_id = job.table_version_id().unwrap();
            }
            NodeBody::StreamFsFetch(fs_fetch_node) => {
                if let StreamingJob::Table(table_source, _, _) = job
                    && let Some(node_inner) = fs_fetch_node.node_inner.as_mut()
                    && let Some(source) = table_source
                {
                    node_inner.source_id = source.id;
                }
            }
            NodeBody::Source(source_node) => {
                match job {
                    StreamingJob::Table(source, _table, _table_job_type) => {
                        if let Some(source_inner) = source_node.source_inner.as_mut()
                            && let Some(source) = source
                        {
                            debug_assert_ne!(source.id, job_id);
                            source_inner.source_id = source.id;
                        }
                    }
                    StreamingJob::Source(source) => {
                        has_job = true;
                        if let Some(source_inner) = source_node.source_inner.as_mut() {
                            debug_assert_eq!(source.id, job_id);
                            source_inner.source_id = source.id;
                        }
                    }
                    _ => {}
                }
            }
            NodeBody::StreamCdcScan(node) => {
                if let Some(table_desc) = node.cdc_table_desc.as_mut() {
                    table_desc.table_id = job_id;
                }
            }
            _ => {}
        });

        has_job
    }

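    /// Extract the required column IDs of each upstream table, except for cross-database
    /// snapshot backfill scans, which are handled separately.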
    fn extract_upstream_table_columns_except_cross_db_backfill(
        fragment: &StreamFragment,
    ) -> HashMap<TableId, Vec<i32>> {
        let mut table_columns = HashMap::new();

        stream_graph_visitor::visit_fragment(fragment, |node_body| {
            let (table_id, column_ids) = match node_body {
                NodeBody::StreamScan(stream_scan) => {
                    if stream_scan.get_stream_scan_type().unwrap()
                        == StreamScanType::CrossDbSnapshotBackfill
                    {
                        return;
                    }
                    (
                        stream_scan.table_id.into(),
                        stream_scan.upstream_column_ids.clone(),
                    )
                }
                NodeBody::CdcFilter(cdc_filter) => (cdc_filter.upstream_source_id.into(), vec![]),
                NodeBody::SourceBackfill(backfill) => (
                    backfill.upstream_source_id.into(),
                    backfill.column_ids(),
                ),
                _ => return,
            };
            table_columns
                .try_insert(table_id, column_ids)
                .expect("currently there should be no two same upstream tables in a fragment");
        });

        table_columns
    }

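    /// Whether this fragment contains a stream scan that performs shuffled backfill, i.e.
    /// arrangement backfill or snapshot backfill.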
    pub fn has_shuffled_backfill(&self) -> bool {
        let stream_node = match self.inner.node.as_ref() {
            Some(node) => node,
            _ => return false,
        };
        let mut has_shuffled_backfill = false;
        visit_stream_node_cont(stream_node, |node| {
            let is_shuffled_backfill = if let Some(node) = &node.node_body
                && let Some(node) = node.as_stream_scan()
            {
                node.stream_scan_type == StreamScanType::ArrangementBackfill as i32
                    || node.stream_scan_type == StreamScanType::SnapshotBackfill as i32
            } else {
                false
            };
            if is_shuffled_backfill {
                has_shuffled_backfill = true;
                false
            } else {
                true
            }
        });
        has_shuffled_backfill
    }
}

impl Deref for BuildingFragment {
    type Target = StreamFragment;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl DerefMut for BuildingFragment {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}

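/// The ID of an edge between two fragments, which is either internal to the building graph or
/// connects to an external (existing) fragment on the upstream or downstream side.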
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumAsInner)]
pub(super) enum EdgeId {
    Internal {
        link_id: u64,
    },

    UpstreamExternal {
        upstream_table_id: TableId,
        downstream_fragment_id: GlobalFragmentId,
    },

    DownstreamExternal(DownstreamExternalEdgeId),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) struct DownstreamExternalEdgeId {
    pub(super) original_upstream_fragment_id: GlobalFragmentId,
    pub(super) downstream_fragment_id: GlobalFragmentId,
}

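/// The edge between two fragments in the graph, carrying the dispatch strategy from the
/// upstream fragment to the downstream fragment.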
#[derive(Debug, Clone)]
pub(super) struct StreamFragmentEdge {
    pub id: EdgeId,

    pub dispatch_strategy: DispatchStrategy,
}

impl StreamFragmentEdge {
    fn from_protobuf(edge: &StreamFragmentEdgeProto) -> Self {
        Self {
            id: EdgeId::Internal {
                link_id: edge.link_id,
            },
            dispatch_strategy: edge.get_dispatch_strategy().unwrap().clone(),
        }
    }
}

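/// The fragment-level backfill order: a map from a fragment to the fragments whose backfill
/// should start only after it finishes, derived from the relation-level [`BackfillOrder`].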
pub type FragmentBackfillOrder = HashMap<FragmentId, Vec<FragmentId>>;

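/// In-memory representation of the fragment graph of a streaming job, with all fragment IDs and
/// internal table IDs converted to global IDs.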
#[derive(Default, Debug)]
pub struct StreamFragmentGraph {
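    /// Stores all the fragments in the graph.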
    fragments: HashMap<GlobalFragmentId, BuildingFragment>,

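    /// Stores edges between fragments: upstream fragment ID -> downstream fragment ID.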
    downstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

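    /// Stores edges between fragments: downstream fragment ID -> upstream fragment ID.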
    upstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

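    /// The dependent relations (tables or sources) of the job.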
    dependent_table_ids: HashSet<TableId>,

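    /// The parallelism specified for the job, if any.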
    specified_parallelism: Option<NonZeroUsize>,

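    /// The maximum parallelism allowed for the job.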
    max_parallelism: usize,

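    /// The relation-level backfill order specified for the job.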
    backfill_order: BackfillOrder,
}

impl StreamFragmentGraph {
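    /// Create a new [`StreamFragmentGraph`] from the protobuf representation of the frontend,
    /// converting all fragment IDs and internal table IDs to global IDs.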
    pub fn new(
        env: &MetaSrvEnv,
        proto: StreamFragmentGraphProto,
        job: &StreamingJob,
    ) -> MetaResult<Self> {
        let fragment_id_gen =
            GlobalFragmentIdGen::new(env.id_gen_manager(), proto.fragments.len() as u64);
        let table_id_gen = GlobalTableIdGen::new(env.id_gen_manager(), proto.table_ids_cnt as u64);

        let fragments: HashMap<_, _> = proto
            .fragments
            .into_iter()
            .map(|(id, fragment)| {
                let id = fragment_id_gen.to_global_id(id);
                let fragment = BuildingFragment::new(id, fragment, job, table_id_gen);
                (id, fragment)
            })
            .collect();

        assert_eq!(
            fragments
                .values()
                .map(|f| f.extract_internal_tables().len() as u32)
                .sum::<u32>(),
            proto.table_ids_cnt
        );

        let mut downstreams = HashMap::new();
        let mut upstreams = HashMap::new();

        for edge in proto.edges {
            let upstream_id = fragment_id_gen.to_global_id(edge.upstream_id);
            let downstream_id = fragment_id_gen.to_global_id(edge.downstream_id);
            let edge = StreamFragmentEdge::from_protobuf(&edge);

            upstreams
                .entry(downstream_id)
                .or_insert_with(HashMap::new)
                .try_insert(upstream_id, edge.clone())
                .unwrap();
            downstreams
                .entry(upstream_id)
                .or_insert_with(HashMap::new)
                .try_insert(downstream_id, edge)
                .unwrap();
        }

        let dependent_table_ids = proto
            .dependent_table_ids
            .iter()
            .map(TableId::from)
            .collect();

        let specified_parallelism = if let Some(Parallelism { parallelism }) = proto.parallelism {
            Some(NonZeroUsize::new(parallelism as usize).context("parallelism should not be 0")?)
        } else {
            None
        };

        let max_parallelism = proto.max_parallelism as usize;
        let backfill_order = proto.backfill_order.unwrap_or(BackfillOrder {
            order: Default::default(),
        });

        Ok(Self {
            fragments,
            downstreams,
            upstreams,
            dependent_table_ids,
            specified_parallelism,
            max_parallelism,
            backfill_order,
        })
    }

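    /// Retrieve the internal tables map of the whole graph. The returned tables are incomplete,
    /// i.e. the fields that can only be determined later (such as the vnode count) are unfilled.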
    pub fn incomplete_internal_tables(&self) -> BTreeMap<u32, Table> {
        let mut tables = BTreeMap::new();
        for fragment in self.fragments.values() {
            for table in fragment.extract_internal_tables() {
                let table_id = table.id;
                tables
                    .try_insert(table_id, table)
                    .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
            }
        }
        tables
    }

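    /// Rewrite the IDs of the internal tables in the graph according to the given mapping.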
    pub fn refill_internal_table_ids(&mut self, table_id_map: HashMap<u32, u32>) {
        for fragment in self.fragments.values_mut() {
            stream_graph_visitor::visit_internal_tables(
                &mut fragment.inner,
                |table, _table_type_name| {
                    let target = table_id_map.get(&table.id).cloned().unwrap();
                    table.id = target;
                },
            );
        }
    }

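    /// Substitute the internal tables of the graph with the given old ones, by aligning the
    /// sorted new table IDs with the sorted old tables one by one.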
    pub fn fit_internal_table_ids(
        &mut self,
        mut old_internal_tables: Vec<Table>,
    ) -> MetaResult<()> {
        let mut new_internal_table_ids = Vec::new();
        for fragment in self.fragments.values() {
            for table in &fragment.extract_internal_tables() {
                new_internal_table_ids.push(table.id);
            }
        }

        if new_internal_table_ids.len() != old_internal_tables.len() {
            bail!(
                "Different number of internal tables. New: {}, Old: {}",
                new_internal_table_ids.len(),
                old_internal_tables.len()
            );
        }
        old_internal_tables.sort_by(|a, b| a.id.cmp(&b.id));
        new_internal_table_ids.sort();

        let internal_table_id_map = new_internal_table_ids
            .into_iter()
            .zip_eq_fast(old_internal_tables.into_iter())
            .collect::<HashMap<_, _>>();

        for fragment in self.fragments.values_mut() {
            stream_graph_visitor::visit_internal_tables(
                &mut fragment.inner,
                |table, _table_type_name| {
                    let target = internal_table_id_map.get(&table.id).cloned().unwrap();
                    *table = target;
                },
            );
        }

        Ok(())
    }

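    /// Returns the fragment ID of the fragment that contains the root node of the streaming
    /// job, e.g. the materialize node for a table or materialized view.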
    pub fn table_fragment_id(&self) -> FragmentId {
        self.fragments
            .values()
            .filter(|b| b.job_id.is_some())
            .map(|b| b.fragment_id)
            .exactly_one()
            .expect("require exactly 1 materialize/sink/cdc source node when creating the streaming job")
    }

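    /// Returns the fragment ID of the fragment that contains the DML node, if any.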
    pub fn dml_fragment_id(&self) -> Option<FragmentId> {
        self.fragments
            .values()
            .filter(|b| b.fragment_type_mask & FragmentTypeFlag::Dml as u32 != 0)
            .map(|b| b.fragment_id)
            .at_most_one()
            .expect("require at most 1 dml node when creating the streaming job")
    }

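    /// Get the dependent relations of the job.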
    pub fn dependent_table_ids(&self) -> &HashSet<TableId> {
        &self.dependent_table_ids
    }

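    /// Get the parallelism specified for the job, if any.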
    pub fn specified_parallelism(&self) -> Option<NonZeroUsize> {
        self.specified_parallelism
    }

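    /// Get the maximum parallelism of the job.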
    pub fn max_parallelism(&self) -> usize {
        self.max_parallelism
    }

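    /// Get the downstream edges of a fragment within the building graph.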
    fn get_downstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> &HashMap<GlobalFragmentId, StreamFragmentEdge> {
        self.downstreams.get(&fragment_id).unwrap_or(&EMPTY_HASHMAP)
    }

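    /// Get the upstream edges of a fragment within the building graph.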
    fn get_upstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> &HashMap<GlobalFragmentId, StreamFragmentEdge> {
        self.upstreams.get(&fragment_id).unwrap_or(&EMPTY_HASHMAP)
    }

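    /// Collect the snapshot backfill info of the graph: the same-database snapshot backfill
    /// info (if any), and the cross-database snapshot backfill info.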
    pub fn collect_snapshot_backfill_info(
        &self,
    ) -> MetaResult<(Option<SnapshotBackfillInfo>, SnapshotBackfillInfo)> {
        Self::collect_snapshot_backfill_info_impl(
            self.fragments
                .values()
                .map(|fragment| (fragment.node.as_ref().unwrap(), fragment.fragment_type_mask)),
        )
    }

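    /// Returns an error if snapshot backfill and other backfill types are mixed in one job,
    /// since a job must use either all snapshot backfill or none.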
    pub fn collect_snapshot_backfill_info_impl(
        fragments: impl IntoIterator<Item = (&PbStreamNode, u32)>,
    ) -> MetaResult<(Option<SnapshotBackfillInfo>, SnapshotBackfillInfo)> {
        let mut prev_stream_scan: Option<(Option<SnapshotBackfillInfo>, StreamScanNode)> = None;
        let mut cross_db_info = SnapshotBackfillInfo {
            upstream_mv_table_id_to_backfill_epoch: Default::default(),
        };
        let mut result = Ok(());
        for (node, fragment_type_mask) in fragments {
            visit_stream_node_cont(node, |node| {
                if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_ref() {
                    let stream_scan_type = StreamScanType::try_from(stream_scan.stream_scan_type)
                        .expect("invalid stream_scan_type");
                    let is_snapshot_backfill = match stream_scan_type {
                        StreamScanType::SnapshotBackfill => {
                            assert!(
                                (fragment_type_mask
                                    & (FragmentTypeFlag::SnapshotBackfillStreamScan as u32))
                                    > 0
                            );
                            true
                        }
                        StreamScanType::CrossDbSnapshotBackfill => {
                            assert!(
                                (fragment_type_mask
                                    & (FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan as u32))
                                    > 0
                            );
                            cross_db_info.upstream_mv_table_id_to_backfill_epoch.insert(
                                TableId::new(stream_scan.table_id),
                                stream_scan.snapshot_backfill_epoch,
                            );

                            return true;
                        }
                        _ => false,
                    };

                    match &mut prev_stream_scan {
                        Some((prev_snapshot_backfill_info, prev_stream_scan)) => {
                            match (prev_snapshot_backfill_info, is_snapshot_backfill) {
                                (Some(prev_snapshot_backfill_info), true) => {
                                    prev_snapshot_backfill_info
                                        .upstream_mv_table_id_to_backfill_epoch
                                        .insert(
                                            TableId::new(stream_scan.table_id),
                                            stream_scan.snapshot_backfill_epoch,
                                        );
                                    true
                                }
                                (None, false) => true,
                                (_, _) => {
                                    result = Err(anyhow!("must be either all snapshot_backfill or no snapshot_backfill. Curr: {stream_scan:?} Prev: {prev_stream_scan:?}").into());
                                    false
                                }
                            }
                        }
                        None => {
                            prev_stream_scan = Some((
                                if is_snapshot_backfill {
                                    Some(SnapshotBackfillInfo {
                                        upstream_mv_table_id_to_backfill_epoch: HashMap::from_iter(
                                            [(
                                                TableId::new(stream_scan.table_id),
                                                stream_scan.snapshot_backfill_epoch,
                                            )],
                                        ),
                                    })
                                } else {
                                    None
                                },
                                *stream_scan.clone(),
                            ));
                            true
                        }
                    }
                } else {
                    true
                }
            })
        }
        result.map(|_| {
            (
                prev_stream_scan
                    .map(|(snapshot_backfill_info, _)| snapshot_backfill_info)
                    .unwrap_or(None),
                cross_db_info,
            )
        })
    }

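    /// Collect a mapping from upstream table ID to the IDs of the fragments that backfill from
    /// it, by visiting the stream scan node of each fragment.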
    pub fn collect_backfill_mapping(&self) -> HashMap<u32, Vec<FragmentId>> {
        let mut mapping = HashMap::new();
        for (fragment_id, fragment) in &self.fragments {
            let fragment_id = fragment_id.as_global_id();
            let fragment_mask = fragment.fragment_type_mask;
            let candidates = [FragmentTypeFlag::StreamScan];
            let has_some_scan = candidates
                .into_iter()
                .any(|flag| (fragment_mask & flag as u32) > 0);
            if has_some_scan {
                visit_stream_node_cont(fragment.node.as_ref().unwrap(), |node| {
                    match node.node_body.as_ref() {
                        Some(NodeBody::StreamScan(stream_scan)) => {
                            let table_id = stream_scan.table_id;
                            let fragments: &mut Vec<_> = mapping.entry(table_id).or_default();
                            fragments.push(fragment_id);
                            false
                        }
                        _ => true,
                    }
                })
            }
        }
        mapping
    }

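    /// Translate the relation-level backfill order into a fragment-level one, by expanding each
    /// relation ID into the fragments that backfill from it.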
    pub fn create_fragment_backfill_ordering(&self) -> FragmentBackfillOrder {
        let mapping = self.collect_backfill_mapping();
        let mut fragment_ordering: HashMap<u32, Vec<u32>> = HashMap::new();
        for (rel_id, downstream_rel_ids) in &self.backfill_order.order {
            let fragment_ids = mapping.get(rel_id).unwrap();
            for fragment_id in fragment_ids {
                let downstream_fragment_ids = downstream_rel_ids
                    .data
                    .iter()
                    .flat_map(|downstream_rel_id| mapping.get(downstream_rel_id).unwrap().iter())
                    .copied()
                    .collect();
                fragment_ordering.insert(*fragment_id, downstream_fragment_ids);
            }
        }
        fragment_ordering
    }
}

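/// Fill the `snapshot_backfill_epoch` of the snapshot backfill stream scan nodes in the given
/// stream node. Returns whether any epoch is filled, or an error if an upstream table is not
/// covered by the backfill info or its epoch is set twice.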
pub fn fill_snapshot_backfill_epoch(
    node: &mut StreamNode,
    snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
    cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
) -> MetaResult<bool> {
    let mut result = Ok(());
    let mut applied = false;
    visit_stream_node_cont_mut(node, |node| {
        if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_mut()
            && (stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32
                || stream_scan.stream_scan_type == StreamScanType::CrossDbSnapshotBackfill as i32)
        {
            result = try {
                let table_id = TableId::new(stream_scan.table_id);
                let snapshot_epoch = cross_db_snapshot_backfill_info
                    .upstream_mv_table_id_to_backfill_epoch
                    .get(&table_id)
                    .or_else(|| {
                        snapshot_backfill_info.and_then(|snapshot_backfill_info| {
                            snapshot_backfill_info
                                .upstream_mv_table_id_to_backfill_epoch
                                .get(&table_id)
                        })
                    })
                    .ok_or_else(|| anyhow!("upstream table id not covered: {}", table_id))?
                    .ok_or_else(|| anyhow!("upstream table id not set: {}", table_id))?;
                if let Some(prev_snapshot_epoch) =
                    stream_scan.snapshot_backfill_epoch.replace(snapshot_epoch)
                {
                    Err(anyhow!(
                        "snapshot backfill epoch set again: {} {} {}",
                        table_id,
                        prev_snapshot_epoch,
                        snapshot_epoch
                    ))?;
                }
                applied = true;
            };
            result.is_ok()
        } else {
            true
        }
    });
    result.map(|_| applied)
}

static EMPTY_HASHMAP: LazyLock<HashMap<GlobalFragmentId, StreamFragmentEdge>> =
    LazyLock::new(HashMap::new);

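/// A fragment that is either being built in the current streaming job, or already exists in
/// the cluster.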
#[derive(Debug, Clone, EnumAsInner)]
pub(super) enum EitherFragment {
    Building(BuildingFragment),

    Existing(Fragment),
}

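/// A wrapper of [`StreamFragmentGraph`] that also contains the required information of the
/// existing fragments, i.e. the upstream root fragments the job depends on and the downstream
/// fragments that consume the job being replaced, forming a complete graph for scheduling.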
#[derive(Debug)]
pub struct CompleteStreamFragmentGraph {
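    /// The fragment graph of the streaming job being built.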
    building_graph: StreamFragmentGraph,

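    /// The required information of the existing fragments.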
    existing_fragments: HashMap<GlobalFragmentId, Fragment>,

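    /// The location of the actors in the existing fragments.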
    existing_actor_location: HashMap<ActorId, WorkerId>,

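    /// Extra downstream edges between the building fragments and the existing fragments.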
    extra_downstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

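    /// Extra upstream edges between the building fragments and the existing fragments.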
    extra_upstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,
}

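/// The upstream context of a fragment graph: the root fragments of the existing relations the
/// new job depends on, and the location of their actors.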
pub struct FragmentGraphUpstreamContext {
    upstream_root_fragments: HashMap<TableId, Fragment>,
    upstream_actor_location: HashMap<ActorId, WorkerId>,
}

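/// The downstream context of a fragment graph: the original root fragment of the job being
/// replaced, the fragments consuming it, and the location of their actors.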
pub struct FragmentGraphDownstreamContext {
    original_root_fragment_id: FragmentId,
    downstream_fragments: Vec<(DispatcherType, Fragment)>,
    downstream_actor_location: HashMap<ActorId, WorkerId>,
}

impl CompleteStreamFragmentGraph {
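    /// Create a [`CompleteStreamFragmentGraph`] with only the building graph, for testing.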
848 #[cfg(test)]
851 pub fn for_test(graph: StreamFragmentGraph) -> Self {
852 Self {
853 building_graph: graph,
854 existing_fragments: Default::default(),
855 existing_actor_location: Default::default(),
856 extra_downstreams: Default::default(),
857 extra_upstreams: Default::default(),
858 }
859 }
860
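    /// Create a new [`CompleteStreamFragmentGraph`] for a job that depends on existing
    /// relations, e.g. a materialized view on materialized views or sources.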
    pub fn with_upstreams(
        graph: StreamFragmentGraph,
        upstream_root_fragments: HashMap<TableId, Fragment>,
        existing_actor_location: HashMap<ActorId, WorkerId>,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        Self::build_helper(
            graph,
            Some(FragmentGraphUpstreamContext {
                upstream_root_fragments,
                upstream_actor_location: existing_actor_location,
            }),
            None,
            job_type,
        )
    }

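    /// Create a new [`CompleteStreamFragmentGraph`] for replacing an existing job, attaching
    /// the downstream fragments that consume the original job.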
    pub fn with_downstreams(
        graph: StreamFragmentGraph,
        original_root_fragment_id: FragmentId,
        downstream_fragments: Vec<(DispatcherType, Fragment)>,
        existing_actor_location: HashMap<ActorId, WorkerId>,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        Self::build_helper(
            graph,
            None,
            Some(FragmentGraphDownstreamContext {
                original_root_fragment_id,
                downstream_fragments,
                downstream_actor_location: existing_actor_location,
            }),
            job_type,
        )
    }

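    /// Create a new [`CompleteStreamFragmentGraph`] with both the upstream root fragments and
    /// the downstream fragments, e.g. for replacing a table that is also consumed downstream.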
    pub fn with_upstreams_and_downstreams(
        graph: StreamFragmentGraph,
        upstream_root_fragments: HashMap<TableId, Fragment>,
        upstream_actor_location: HashMap<ActorId, WorkerId>,
        original_root_fragment_id: FragmentId,
        downstream_fragments: Vec<(DispatcherType, Fragment)>,
        downstream_actor_location: HashMap<ActorId, WorkerId>,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        Self::build_helper(
            graph,
            Some(FragmentGraphUpstreamContext {
                upstream_root_fragments,
                upstream_actor_location,
            }),
            Some(FragmentGraphDownstreamContext {
                original_root_fragment_id,
                downstream_fragments,
                downstream_actor_location,
            }),
            job_type,
        )
    }

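    /// The core logic of building a [`CompleteStreamFragmentGraph`]: fill the extra edges
    /// between the building fragments and the existing upstream or downstream fragments.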
    fn build_helper(
        mut graph: StreamFragmentGraph,
        upstream_ctx: Option<FragmentGraphUpstreamContext>,
        downstream_ctx: Option<FragmentGraphDownstreamContext>,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        let mut extra_downstreams = HashMap::new();
        let mut extra_upstreams = HashMap::new();
        let mut existing_fragments = HashMap::new();

        let mut existing_actor_location = HashMap::new();

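        // Handle the upstream context: connect each building fragment that backfills to the
        // root fragment of the corresponding upstream relation.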
        if let Some(FragmentGraphUpstreamContext {
            upstream_root_fragments,
            upstream_actor_location,
        }) = upstream_ctx
        {
            for (&id, fragment) in &mut graph.fragments {
                let uses_shuffled_backfill = fragment.has_shuffled_backfill();

                for (&upstream_table_id, required_columns) in &fragment.upstream_table_columns {
                    let upstream_fragment = upstream_root_fragments
                        .get(&upstream_table_id)
                        .context("upstream fragment not found")?;
                    let upstream_root_fragment_id =
                        GlobalFragmentId::new(upstream_fragment.fragment_id);

                    let edge = match job_type {
                        StreamingJobType::Table(TableJobType::SharedCdcSource) => {
                            assert_ne!(
                                (fragment.fragment_type_mask & FragmentTypeFlag::CdcFilter as u32),
                                0
                            );

                            tracing::debug!(
                                ?upstream_root_fragment_id,
                                ?required_columns,
                                identity = ?fragment.inner.get_node().unwrap().get_identity(),
                                current_frag_id = ?id,
                                "CdcFilter with upstream source fragment"
                            );

                            StreamFragmentEdge {
                                id: EdgeId::UpstreamExternal {
                                    upstream_table_id,
                                    downstream_fragment_id: id,
                                },
                                dispatch_strategy: DispatchStrategy {
                                    r#type: DispatcherType::NoShuffle as _,
                                    dist_key_indices: vec![],
                                    output_indices: (0..CDC_SOURCE_COLUMN_NUM as _).collect(),
                                },
                            }
                        }

                        StreamingJobType::MaterializedView
                        | StreamingJobType::Sink
                        | StreamingJobType::Index => {
                            if upstream_fragment.fragment_type_mask & FragmentTypeFlag::Mview as u32
                                != 0
                            {
                                let (dist_key_indices, output_indices) = {
                                    let nodes = &upstream_fragment.nodes;
                                    let mview_node =
                                        nodes.get_node_body().unwrap().as_materialize().unwrap();
                                    let all_column_ids = mview_node.column_ids();
                                    let dist_key_indices = mview_node.dist_key_indices();
                                    let output_indices =
                                        gen_output_indices(required_columns, all_column_ids)
                                            .context(
                                                "BUG: column not found in the upstream materialized view",
                                            )?;
                                    (dist_key_indices, output_indices)
                                };
                                let dispatch_strategy = mv_on_mv_dispatch_strategy(
                                    uses_shuffled_backfill,
                                    dist_key_indices,
                                    output_indices,
                                );

                                StreamFragmentEdge {
                                    id: EdgeId::UpstreamExternal {
                                        upstream_table_id,
                                        downstream_fragment_id: id,
                                    },
                                    dispatch_strategy,
                                }
                            } else if upstream_fragment.fragment_type_mask
                                & FragmentTypeFlag::Source as u32
                                != 0
                            {
                                let output_indices = {
                                    let nodes = &upstream_fragment.nodes;
                                    let source_node =
                                        nodes.get_node_body().unwrap().as_source().unwrap();

                                    let all_column_ids = source_node.column_ids().unwrap();
                                    gen_output_indices(required_columns, all_column_ids).context(
                                        "BUG: column not found in the upstream source node",
                                    )?
                                };

                                StreamFragmentEdge {
                                    id: EdgeId::UpstreamExternal {
                                        upstream_table_id,
                                        downstream_fragment_id: id,
                                    },
                                    dispatch_strategy: DispatchStrategy {
                                        r#type: DispatcherType::NoShuffle as _,
                                        dist_key_indices: vec![],
                                        output_indices,
                                    },
                                }
                            } else {
                                bail!(
                                    "the upstream fragment should be a MView or Source, got fragment type: {:b}",
                                    upstream_fragment.fragment_type_mask
                                )
                            }
                        }
                        StreamingJobType::Source | StreamingJobType::Table(_) => {
                            bail!(
                                "the streaming job shouldn't have an upstream fragment, job_type: {:?}",
                                job_type
                            )
                        }
                    };

                    extra_downstreams
                        .entry(upstream_root_fragment_id)
                        .or_insert_with(HashMap::new)
                        .try_insert(id, edge.clone())
                        .unwrap();
                    extra_upstreams
                        .entry(id)
                        .or_insert_with(HashMap::new)
                        .try_insert(upstream_root_fragment_id, edge)
                        .unwrap();
                }
            }

            existing_fragments.extend(
                upstream_root_fragments
                    .into_values()
                    .map(|f| (GlobalFragmentId::new(f.fragment_id), f)),
            );

            existing_actor_location.extend(upstream_actor_location);
        }

1093
1094 if let Some(FragmentGraphDownstreamContext {
1095 original_root_fragment_id,
1096 downstream_fragments,
1097 downstream_actor_location,
1098 }) = downstream_ctx
1099 {
1100 let original_table_fragment_id = GlobalFragmentId::new(original_root_fragment_id);
1101 let table_fragment_id = GlobalFragmentId::new(graph.table_fragment_id());
1102
1103 for (dispatcher_type, fragment) in &downstream_fragments {
1106 let id = GlobalFragmentId::new(fragment.fragment_id);
1107
1108 let output_columns = {
1110 let mut res = None;
1111
1112 stream_graph_visitor::visit_stream_node(&fragment.nodes, |node_body| {
1113 let columns = match node_body {
1114 NodeBody::StreamScan(stream_scan) => {
1115 stream_scan.upstream_column_ids.clone()
1116 }
1117 NodeBody::SourceBackfill(source_backfill) => {
1118 source_backfill.column_ids()
1120 }
1121 _ => return,
1122 };
1123 res = Some(columns);
1124 });
1125
1126 res.context("failed to locate downstream scan")?
1127 };
1128
1129 let table_fragment = graph.fragments.get(&table_fragment_id).unwrap();
1130 let nodes = table_fragment.node.as_ref().unwrap();
1131
1132 let (dist_key_indices, output_indices) = match job_type {
1133 StreamingJobType::Table(_) => {
1134 let mview_node = nodes.get_node_body().unwrap().as_materialize().unwrap();
1135 let all_column_ids = mview_node.column_ids();
1136 let dist_key_indices = mview_node.dist_key_indices();
1137 let output_indices = gen_output_indices(&output_columns, all_column_ids)
1138 .ok_or_else(|| {
1139 MetaError::invalid_parameter(
1140 "unable to drop the column due to \
1141 being referenced by downstream materialized views or sinks",
1142 )
1143 })?;
1144 (dist_key_indices, output_indices)
1145 }
1146
1147 StreamingJobType::Source => {
1148 let source_node = nodes.get_node_body().unwrap().as_source().unwrap();
1149 let all_column_ids = source_node.column_ids().unwrap();
1150 let output_indices = gen_output_indices(&output_columns, all_column_ids)
1151 .ok_or_else(|| {
1152 MetaError::invalid_parameter(
1153 "unable to drop the column due to \
1154 being referenced by downstream materialized views or sinks",
1155 )
1156 })?;
1157 assert_eq!(*dispatcher_type, DispatcherType::NoShuffle);
1158 (
1159 vec![], output_indices,
1161 )
1162 }
1163
1164 _ => bail!("unsupported job type for replacement: {job_type:?}"),
1165 };
1166
1167 let edge = StreamFragmentEdge {
1168 id: EdgeId::DownstreamExternal(DownstreamExternalEdgeId {
1169 original_upstream_fragment_id: original_table_fragment_id,
1170 downstream_fragment_id: id,
1171 }),
1172 dispatch_strategy: DispatchStrategy {
1173 r#type: *dispatcher_type as i32,
1174 output_indices,
1175 dist_key_indices,
1176 },
1177 };
1178
1179 extra_downstreams
1180 .entry(table_fragment_id)
1181 .or_insert_with(HashMap::new)
1182 .try_insert(id, edge.clone())
1183 .unwrap();
1184 extra_upstreams
1185 .entry(id)
1186 .or_insert_with(HashMap::new)
1187 .try_insert(table_fragment_id, edge)
1188 .unwrap();
1189 }
1190
1191 existing_fragments.extend(
1192 downstream_fragments
1193 .into_iter()
1194 .map(|(_, f)| (GlobalFragmentId::new(f.fragment_id), f)),
1195 );
1196
1197 existing_actor_location.extend(downstream_actor_location);
1198 }
1199
1200 Ok(Self {
1201 building_graph: graph,
1202 existing_fragments,
1203 existing_actor_location,
1204 extra_downstreams,
1205 extra_upstreams,
1206 })
1207 }
1208}
1209
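/// Map each required column ID to its index in the upstream columns, returning the
/// `output_indices` for an edge, or `None` if some required column is not found.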
fn gen_output_indices(required_columns: &[i32], upstream_columns: Vec<i32>) -> Option<Vec<u32>> {
    required_columns
        .iter()
        .map(|c| {
            upstream_columns
                .iter()
                .position(|&id| id == *c)
                .map(|i| i as u32)
        })
        .collect()
}

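/// Decide the dispatch strategy for an MV-on-MV edge: shuffled backfill uses `Hash` (or
/// `Simple` for singleton upstreams), while non-shuffled backfill uses `NoShuffle`.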
fn mv_on_mv_dispatch_strategy(
    uses_shuffled_backfill: bool,
    dist_key_indices: Vec<u32>,
    output_indices: Vec<u32>,
) -> DispatchStrategy {
    if uses_shuffled_backfill {
        if !dist_key_indices.is_empty() {
            DispatchStrategy {
                r#type: DispatcherType::Hash as _,
                dist_key_indices,
                output_indices,
            }
        } else {
            DispatchStrategy {
                r#type: DispatcherType::Simple as _,
                dist_key_indices: vec![],
                output_indices,
            }
        }
    } else {
        DispatchStrategy {
            r#type: DispatcherType::NoShuffle as _,
            dist_key_indices: vec![],
            output_indices,
        }
    }
}

impl CompleteStreamFragmentGraph {
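    /// Returns the IDs of all fragments in the complete graph, including both the building
    /// fragments and the existing fragments.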
    pub(super) fn all_fragment_ids(&self) -> impl Iterator<Item = GlobalFragmentId> + '_ {
        self.building_graph
            .fragments
            .keys()
            .chain(self.existing_fragments.keys())
            .copied()
    }

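    /// Returns all edges in the complete graph, including the extra edges to or from the
    /// existing fragments.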
    pub(super) fn all_edges(
        &self,
    ) -> impl Iterator<Item = (GlobalFragmentId, GlobalFragmentId, &StreamFragmentEdge)> + '_ {
        self.building_graph
            .downstreams
            .iter()
            .chain(self.extra_downstreams.iter())
            .flat_map(|(&from, tos)| tos.iter().map(move |(&to, edge)| (from, to, edge)))
    }

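    /// Returns the distribution of the existing fragments, resolved from their actor locations.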
    pub(super) fn existing_distribution(&self) -> HashMap<GlobalFragmentId, Distribution> {
        self.existing_fragments
            .iter()
            .map(|(&id, f)| {
                (
                    id,
                    Distribution::from_fragment(f, &self.existing_actor_location),
                )
            })
            .collect()
    }

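    /// Generate a topological order of the fragments with a variant of Kahn's algorithm, where
    /// a fragment is emitted only after all of its downstream fragments, i.e. the graph is
    /// traversed from the most downstream fragments back to the sources. Errors on cycles.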
    pub(super) fn topo_order(&self) -> MetaResult<Vec<GlobalFragmentId>> {
        let mut topo = Vec::new();
        let mut downstream_cnts = HashMap::new();

        for fragment_id in self.all_fragment_ids() {
            let downstream_cnt = self.get_downstreams(fragment_id).count();
            if downstream_cnt == 0 {
                topo.push(fragment_id);
            } else {
                downstream_cnts.insert(fragment_id, downstream_cnt);
            }
        }

        let mut i = 0;
        while let Some(&fragment_id) = topo.get(i) {
            i += 1;
            for (upstream_id, _) in self.get_upstreams(fragment_id) {
                let downstream_cnt = downstream_cnts.get_mut(&upstream_id).unwrap();
                *downstream_cnt -= 1;
                if *downstream_cnt == 0 {
                    downstream_cnts.remove(&upstream_id);
                    topo.push(upstream_id);
                }
            }
        }

        if !downstream_cnts.is_empty() {
            bail!("graph is not a DAG");
        }

        Ok(topo)
    }

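    /// Seal a [`BuildingFragment`] from the graph into a [`Fragment`], with the given actors
    /// and the chosen distribution.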
    pub(super) fn seal_fragment(
        &self,
        id: GlobalFragmentId,
        actors: Vec<StreamActor>,
        distribution: Distribution,
        stream_node: StreamNode,
    ) -> Fragment {
        let building_fragment = self.get_fragment(id).into_building().unwrap();
        let internal_tables = building_fragment.extract_internal_tables();
        let BuildingFragment {
            inner,
            job_id,
            upstream_table_columns: _,
        } = building_fragment;

        let distribution_type = distribution.to_distribution_type();
        let vnode_count = distribution.vnode_count();

        let materialized_fragment_id =
            if inner.fragment_type_mask & FragmentTypeFlag::Mview as u32 != 0 {
                job_id
            } else {
                None
            };

        let state_table_ids = internal_tables
            .iter()
            .map(|t| t.id)
            .chain(materialized_fragment_id)
            .collect();

        Fragment {
            fragment_id: inner.fragment_id,
            fragment_type_mask: inner.fragment_type_mask,
            distribution_type,
            actors,
            state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        }
    }

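    /// Get a fragment from the complete graph, which is either a building fragment or an
    /// existing fragment.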
    pub(super) fn get_fragment(&self, fragment_id: GlobalFragmentId) -> EitherFragment {
        if let Some(fragment) = self.existing_fragments.get(&fragment_id) {
            EitherFragment::Existing(fragment.clone())
        } else {
            EitherFragment::Building(
                self.building_graph
                    .fragments
                    .get(&fragment_id)
                    .unwrap()
                    .clone(),
            )
        }
    }

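    /// Get all downstream edges of a fragment, including the extra ones to existing fragments.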
    pub(super) fn get_downstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> impl Iterator<Item = (GlobalFragmentId, &StreamFragmentEdge)> {
        self.building_graph
            .get_downstreams(fragment_id)
            .iter()
            .chain(
                self.extra_downstreams
                    .get(&fragment_id)
                    .into_iter()
                    .flatten(),
            )
            .map(|(&id, edge)| (id, edge))
    }

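    /// Get all upstream edges of a fragment, including the extra ones from existing fragments.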
    pub(super) fn get_upstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> impl Iterator<Item = (GlobalFragmentId, &StreamFragmentEdge)> {
        self.building_graph
            .get_upstreams(fragment_id)
            .iter()
            .chain(self.extra_upstreams.get(&fragment_id).into_iter().flatten())
            .map(|(&id, edge)| (id, edge))
    }

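    /// Returns all building fragments in the graph.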
    pub(super) fn building_fragments(&self) -> &HashMap<GlobalFragmentId, BuildingFragment> {
        &self.building_graph.fragments
    }

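    /// Returns mutable references to all building fragments in the graph.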
    pub(super) fn building_fragments_mut(
        &mut self,
    ) -> &mut HashMap<GlobalFragmentId, BuildingFragment> {
        &mut self.building_graph.fragments
    }

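    /// Get the maximum parallelism of the building graph.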
    pub(super) fn max_parallelism(&self) -> usize {
        self.building_graph.max_parallelism()
    }
}