1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::num::NonZeroUsize;
17use std::ops::{Deref, DerefMut};
18use std::sync::LazyLock;
19
20use anyhow::{Context, anyhow};
21use enum_as_inner::EnumAsInner;
22use itertools::Itertools;
23use risingwave_common::bail;
24use risingwave_common::catalog::{
25 CDC_SOURCE_COLUMN_NUM, FragmentTypeFlag, FragmentTypeMask, TableId,
26 generate_internal_table_name_with_type,
27};
28use risingwave_common::hash::VnodeCount;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::util::stream_graph_visitor::{
31 self, visit_stream_node_cont, visit_stream_node_cont_mut,
32};
33use risingwave_meta_model::WorkerId;
34use risingwave_pb::catalog::Table;
35use risingwave_pb::ddl_service::TableJobType;
36use risingwave_pb::plan_common::PbColumnDesc;
37use risingwave_pb::stream_plan::dispatch_output_mapping::TypePair;
38use risingwave_pb::stream_plan::stream_fragment_graph::{
39 Parallelism, StreamFragment, StreamFragmentEdge as StreamFragmentEdgeProto,
40};
41use risingwave_pb::stream_plan::stream_node::NodeBody;
42use risingwave_pb::stream_plan::{
43 BackfillOrder, DispatchOutputMapping, DispatchStrategy, DispatcherType, PbStreamNode,
44 StreamFragmentGraph as StreamFragmentGraphProto, StreamNode, StreamScanNode, StreamScanType,
45};
46
47use crate::barrier::SnapshotBackfillInfo;
48use crate::manager::{MetaSrvEnv, StreamingJob, StreamingJobType};
49use crate::model::{ActorId, Fragment, FragmentId, StreamActor};
50use crate::stream::stream_graph::id::{GlobalFragmentId, GlobalFragmentIdGen, GlobalTableIdGen};
51use crate::stream::stream_graph::schedule::Distribution;
52use crate::{MetaError, MetaResult};
53
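/// A fragment of a streaming job under construction. Wraps the [`StreamFragment`] received from
/// the frontend (with its ID rewritten to a global one), and additionally records the ID of the
/// job if this fragment contains the job's own node, as well as the columns required from each
/// upstream table.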
54#[derive(Debug, Clone)]
57pub(super) struct BuildingFragment {
58 inner: StreamFragment,
60
61 job_id: Option<u32>,
63
64 upstream_table_columns: HashMap<TableId, Vec<PbColumnDesc>>,
69}
70
71impl BuildingFragment {
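    /// Create a [`BuildingFragment`] from a frontend [`StreamFragment`], rewriting local fragment
    /// and internal table IDs to global ones and filling the job-related fields into the nodes.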
72 fn new(
75 id: GlobalFragmentId,
76 fragment: StreamFragment,
77 job: &StreamingJob,
78 table_id_gen: GlobalTableIdGen,
79 ) -> Self {
80 let mut fragment = StreamFragment {
81 fragment_id: id.as_global_id(),
82 ..fragment
83 };
84
85 Self::fill_internal_tables(&mut fragment, job, table_id_gen);
87
88 let job_id = Self::fill_job(&mut fragment, job).then(|| job.id());
89 let upstream_table_columns =
90 Self::extract_upstream_table_columns_except_cross_db_backfill(&fragment);
91
92 Self {
93 inner: fragment,
94 job_id,
95 upstream_table_columns,
96 }
97 }
98
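    /// Extract all internal (state) tables contained in the stream nodes of this fragment.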
99 fn extract_internal_tables(&self) -> Vec<Table> {
101 let mut fragment = self.inner.to_owned();
102 let mut tables = Vec::new();
103 stream_graph_visitor::visit_internal_tables(&mut fragment, |table, _| {
104 tables.push(table.clone());
105 });
106 tables
107 }
108
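    /// Fill the information of the internal tables in the fragment: global table ID, schema and
    /// database, generated name, owning fragment ID, and owner.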
109 fn fill_internal_tables(
111 fragment: &mut StreamFragment,
112 job: &StreamingJob,
113 table_id_gen: GlobalTableIdGen,
114 ) {
115 let fragment_id = fragment.fragment_id;
116 stream_graph_visitor::visit_internal_tables(fragment, |table, table_type_name| {
117 table.id = table_id_gen.to_global_id(table.id).as_global_id();
118 table.schema_id = job.schema_id();
119 table.database_id = job.database_id();
120 table.name = generate_internal_table_name_with_type(
121 &job.name(),
122 fragment_id,
123 table.id,
124 table_type_name,
125 );
126 table.fragment_id = fragment_id;
127 table.owner = job.owner();
128 });
129 }
130
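    /// Fill the job-related fields into the nodes of the fragment. Returns `true` if the fragment
    /// contains the node of the job itself (materialize, sink, or the source of a `CREATE SOURCE`
    /// job), in which case the job ID is recorded on this fragment.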
131 fn fill_job(fragment: &mut StreamFragment, job: &StreamingJob) -> bool {
133 let job_id = job.id();
134 let fragment_id = fragment.fragment_id;
135 let mut has_job = false;
136
137 stream_graph_visitor::visit_fragment_mut(fragment, |node_body| match node_body {
138 NodeBody::Materialize(materialize_node) => {
139 materialize_node.table_id = job_id;
140
141 let table = materialize_node.table.insert(job.table().unwrap().clone());
                table.fragment_id = fragment_id;

                // Only fill the definition with the job name in release builds.
                if cfg!(not(debug_assertions)) {
146 table.definition = job.name();
147 }
148
149 has_job = true;
150 }
151 NodeBody::Sink(sink_node) => {
152 sink_node.sink_desc.as_mut().unwrap().id = job_id;
153
154 has_job = true;
155 }
156 NodeBody::Dml(dml_node) => {
157 dml_node.table_id = job_id;
158 dml_node.table_version_id = job.table_version_id().unwrap();
159 }
160 NodeBody::StreamFsFetch(fs_fetch_node) => {
161 if let StreamingJob::Table(table_source, _, _) = job {
162 if let Some(node_inner) = fs_fetch_node.node_inner.as_mut()
163 && let Some(source) = table_source
164 {
165 node_inner.source_id = source.id;
166 }
167 }
168 }
169 NodeBody::Source(source_node) => {
170 match job {
171 StreamingJob::Table(source, _table, _table_job_type) => {
174 if let Some(source_inner) = source_node.source_inner.as_mut() {
175 if let Some(source) = source {
176 debug_assert_ne!(source.id, job_id);
177 source_inner.source_id = source.id;
178 }
179 }
180 }
181 StreamingJob::Source(source) => {
182 has_job = true;
183 if let Some(source_inner) = source_node.source_inner.as_mut() {
184 debug_assert_eq!(source.id, job_id);
185 source_inner.source_id = source.id;
186 }
187 }
188 _ => {}
190 }
191 }
192 NodeBody::StreamCdcScan(node) => {
193 if let Some(table_desc) = node.cdc_table_desc.as_mut() {
194 table_desc.table_id = job_id;
195 }
196 }
197 _ => {}
198 });
199
200 has_job
201 }
202
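    /// Extract the columns required from each upstream table, excluding upstreams that are
    /// scanned with cross-database snapshot backfill.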
203 fn extract_upstream_table_columns_except_cross_db_backfill(
205 fragment: &StreamFragment,
206 ) -> HashMap<TableId, Vec<PbColumnDesc>> {
207 let mut table_columns = HashMap::new();
208
209 stream_graph_visitor::visit_fragment(fragment, |node_body| {
210 let (table_id, column_ids) = match node_body {
211 NodeBody::StreamScan(stream_scan) => {
212 if stream_scan.get_stream_scan_type().unwrap()
213 == StreamScanType::CrossDbSnapshotBackfill
214 {
215 return;
216 }
217 (stream_scan.table_id.into(), stream_scan.upstream_columns())
218 }
219 NodeBody::CdcFilter(cdc_filter) => (cdc_filter.upstream_source_id.into(), vec![]),
220 NodeBody::SourceBackfill(backfill) => (
221 backfill.upstream_source_id.into(),
222 backfill.column_descs(),
224 ),
225 _ => return,
226 };
227 table_columns
228 .try_insert(table_id, column_ids)
229 .expect("currently there should be no two same upstream tables in a fragment");
230 });
231
232 table_columns
233 }
234
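    /// Whether this fragment contains a backfill stream scan that exchanges data with the
    /// upstream, i.e. an `ArrangementBackfill` or `SnapshotBackfill` scan.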
235 pub fn has_shuffled_backfill(&self) -> bool {
236 let stream_node = match self.inner.node.as_ref() {
237 Some(node) => node,
238 _ => return false,
239 };
240 let mut has_shuffled_backfill = false;
241 let has_shuffled_backfill_mut_ref = &mut has_shuffled_backfill;
242 visit_stream_node_cont(stream_node, |node| {
243 let is_shuffled_backfill = if let Some(node) = &node.node_body
244 && let Some(node) = node.as_stream_scan()
245 {
246 node.stream_scan_type == StreamScanType::ArrangementBackfill as i32
247 || node.stream_scan_type == StreamScanType::SnapshotBackfill as i32
248 } else {
249 false
250 };
251 if is_shuffled_backfill {
252 *has_shuffled_backfill_mut_ref = true;
253 false
254 } else {
255 true
256 }
257 });
258 has_shuffled_backfill
259 }
260}
261
262impl Deref for BuildingFragment {
263 type Target = StreamFragment;
264
265 fn deref(&self) -> &Self::Target {
266 &self.inner
267 }
268}
269
270impl DerefMut for BuildingFragment {
271 fn deref_mut(&mut self) -> &mut Self::Target {
272 &mut self.inner
273 }
274}
275
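/// The ID of an edge in the fragment graph: either an edge internal to the graph being built, or
/// an edge connecting a building fragment to an existing upstream or downstream fragment.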
276#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumAsInner)]
279pub(super) enum EdgeId {
280 Internal {
282 link_id: u64,
285 },
286
287 UpstreamExternal {
290 upstream_table_id: TableId,
292 downstream_fragment_id: GlobalFragmentId,
294 },
295
296 DownstreamExternal(DownstreamExternalEdgeId),
299}
300
301#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
302pub(super) struct DownstreamExternalEdgeId {
303 pub(super) original_upstream_fragment_id: GlobalFragmentId,
305 pub(super) downstream_fragment_id: GlobalFragmentId,
307}
308
309#[derive(Debug, Clone)]
313pub(super) struct StreamFragmentEdge {
314 pub id: EdgeId,
316
317 pub dispatch_strategy: DispatchStrategy,
319}
320
321impl StreamFragmentEdge {
322 fn from_protobuf(edge: &StreamFragmentEdgeProto) -> Self {
323 Self {
324 id: EdgeId::Internal {
327 link_id: edge.link_id,
328 },
329 dispatch_strategy: edge.get_dispatch_strategy().unwrap().clone(),
330 }
331 }
332}
333
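/// Fragment-level backfill order derived from the relation-level [`BackfillOrder`]: maps a
/// fragment to the fragments whose backfill is ordered after it.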
334pub type FragmentBackfillOrder = HashMap<FragmentId, Vec<FragmentId>>;
339
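/// In-memory representation of the fragment graph of a streaming job, with all IDs rewritten to
/// global ones. Records the fragments, the edges between them, the tables the job depends on, and
/// the parallelism settings specified for the job.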
340#[derive(Default, Debug)]
347pub struct StreamFragmentGraph {
348 fragments: HashMap<GlobalFragmentId, BuildingFragment>,
350
351 downstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,
353
354 upstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,
356
357 dependent_table_ids: HashSet<TableId>,
359
360 specified_parallelism: Option<NonZeroUsize>,
363
364 max_parallelism: usize,
374
375 backfill_order: BackfillOrder,
377}
378
379impl StreamFragmentGraph {
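    /// Create a new [`StreamFragmentGraph`] from the protobuf representation sent by the
    /// frontend, generating global IDs for the fragments and internal tables.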
380 pub fn new(
383 env: &MetaSrvEnv,
384 proto: StreamFragmentGraphProto,
385 job: &StreamingJob,
386 ) -> MetaResult<Self> {
387 let fragment_id_gen =
388 GlobalFragmentIdGen::new(env.id_gen_manager(), proto.fragments.len() as u64);
389 let table_id_gen = GlobalTableIdGen::new(env.id_gen_manager(), proto.table_ids_cnt as u64);
393
394 let fragments: HashMap<_, _> = proto
396 .fragments
397 .into_iter()
398 .map(|(id, fragment)| {
399 let id = fragment_id_gen.to_global_id(id);
400 let fragment = BuildingFragment::new(id, fragment, job, table_id_gen);
401 (id, fragment)
402 })
403 .collect();
404
405 assert_eq!(
406 fragments
407 .values()
408 .map(|f| f.extract_internal_tables().len() as u32)
409 .sum::<u32>(),
410 proto.table_ids_cnt
411 );
412
413 let mut downstreams = HashMap::new();
415 let mut upstreams = HashMap::new();
416
417 for edge in proto.edges {
418 let upstream_id = fragment_id_gen.to_global_id(edge.upstream_id);
419 let downstream_id = fragment_id_gen.to_global_id(edge.downstream_id);
420 let edge = StreamFragmentEdge::from_protobuf(&edge);
421
422 upstreams
423 .entry(downstream_id)
424 .or_insert_with(HashMap::new)
425 .try_insert(upstream_id, edge.clone())
426 .unwrap();
427 downstreams
428 .entry(upstream_id)
429 .or_insert_with(HashMap::new)
430 .try_insert(downstream_id, edge)
431 .unwrap();
432 }
433
434 let dependent_table_ids = proto
437 .dependent_table_ids
438 .iter()
439 .map(TableId::from)
440 .collect();
441
442 let specified_parallelism = if let Some(Parallelism { parallelism }) = proto.parallelism {
443 Some(NonZeroUsize::new(parallelism as usize).context("parallelism should not be 0")?)
444 } else {
445 None
446 };
447
448 let max_parallelism = proto.max_parallelism as usize;
449 let backfill_order = proto.backfill_order.unwrap_or(BackfillOrder {
450 order: Default::default(),
451 });
452
453 Ok(Self {
454 fragments,
455 downstreams,
456 upstreams,
457 dependent_table_ids,
458 specified_parallelism,
459 max_parallelism,
460 backfill_order,
461 })
462 }
463
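    /// Collect the internal tables of the whole graph, keyed by table ID. As the name suggests,
    /// some fields of the returned tables are not filled in yet at this stage.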
464 pub fn incomplete_internal_tables(&self) -> BTreeMap<u32, Table> {
472 let mut tables = BTreeMap::new();
473 for fragment in self.fragments.values() {
474 for table in fragment.extract_internal_tables() {
475 let table_id = table.id;
476 tables
477 .try_insert(table_id, table)
478 .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
479 }
480 }
481 tables
482 }
483
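    /// Rewrite the IDs of the internal tables in the graph according to the given ID mapping.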
484 pub fn refill_internal_table_ids(&mut self, table_id_map: HashMap<u32, u32>) {
487 for fragment in self.fragments.values_mut() {
488 stream_graph_visitor::visit_internal_tables(
489 &mut fragment.inner,
490 |table, _table_type_name| {
491 let target = table_id_map.get(&table.id).cloned().unwrap();
492 table.id = target;
493 },
494 );
495 }
496 }
497
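    /// Replace the internal tables in the graph with the given old internal tables, matched by
    /// the sorted order of table IDs. Errors if the numbers of tables differ.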
498 pub fn fit_internal_table_ids(
500 &mut self,
501 mut old_internal_tables: Vec<Table>,
502 ) -> MetaResult<()> {
503 let mut new_internal_table_ids = Vec::new();
504 for fragment in self.fragments.values() {
505 for table in &fragment.extract_internal_tables() {
506 new_internal_table_ids.push(table.id);
507 }
508 }
509
510 if new_internal_table_ids.len() != old_internal_tables.len() {
511 bail!(
512 "Different number of internal tables. New: {}, Old: {}",
513 new_internal_table_ids.len(),
514 old_internal_tables.len()
515 );
516 }
517 old_internal_tables.sort_by(|a, b| a.id.cmp(&b.id));
518 new_internal_table_ids.sort();
519
520 let internal_table_id_map = new_internal_table_ids
521 .into_iter()
522 .zip_eq_fast(old_internal_tables.into_iter())
523 .collect::<HashMap<_, _>>();
524
525 for fragment in self.fragments.values_mut() {
526 stream_graph_visitor::visit_internal_tables(
527 &mut fragment.inner,
528 |table, _table_type_name| {
529 let target = internal_table_id_map.get(&table.id).cloned().unwrap();
530 *table = target;
531 },
532 );
533 }
534
535 Ok(())
536 }
537
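    /// Return the ID of the fragment that contains the node of the streaming job itself. Panics
    /// if there is not exactly one such fragment.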
538 pub fn table_fragment_id(&self) -> FragmentId {
540 self.fragments
541 .values()
542 .filter(|b| b.job_id.is_some())
543 .map(|b| b.fragment_id)
544 .exactly_one()
545 .expect("require exactly 1 materialize/sink/cdc source node when creating the streaming job")
546 }
547
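    /// Return the ID of the fragment that contains the DML node, if any. Panics if there is more
    /// than one.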
548 pub fn dml_fragment_id(&self) -> Option<FragmentId> {
550 self.fragments
551 .values()
552 .filter(|b| {
553 FragmentTypeMask::from(b.fragment_type_mask).contains(FragmentTypeFlag::Dml)
554 })
555 .map(|b| b.fragment_id)
556 .at_most_one()
557 .expect("require at most 1 dml node when creating the streaming job")
558 }
559
560 pub fn dependent_table_ids(&self) -> &HashSet<TableId> {
562 &self.dependent_table_ids
563 }
564
565 pub fn specified_parallelism(&self) -> Option<NonZeroUsize> {
567 self.specified_parallelism
568 }
569
570 pub fn max_parallelism(&self) -> usize {
572 self.max_parallelism
573 }
574
575 fn get_downstreams(
577 &self,
578 fragment_id: GlobalFragmentId,
579 ) -> &HashMap<GlobalFragmentId, StreamFragmentEdge> {
580 self.downstreams.get(&fragment_id).unwrap_or(&EMPTY_HASHMAP)
581 }
582
583 fn get_upstreams(
585 &self,
586 fragment_id: GlobalFragmentId,
587 ) -> &HashMap<GlobalFragmentId, StreamFragmentEdge> {
588 self.upstreams.get(&fragment_id).unwrap_or(&EMPTY_HASHMAP)
589 }
590
591 pub fn collect_snapshot_backfill_info(
592 &self,
593 ) -> MetaResult<(Option<SnapshotBackfillInfo>, SnapshotBackfillInfo)> {
594 Self::collect_snapshot_backfill_info_impl(self.fragments.values().map(|fragment| {
595 (
596 fragment.node.as_ref().unwrap(),
597 fragment.fragment_type_mask.into(),
598 )
599 }))
600 }
601
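
    /// Collect the snapshot backfill info from the given fragments. Returns the info for
    /// same-database snapshot backfill (`None` if the job does not use it) together with the info
    /// for cross-database snapshot backfill, and errors if the stream scans mix snapshot-backfill
    /// and non-snapshot-backfill types.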
602 pub fn collect_snapshot_backfill_info_impl(
604 fragments: impl IntoIterator<Item = (&PbStreamNode, FragmentTypeMask)>,
605 ) -> MetaResult<(Option<SnapshotBackfillInfo>, SnapshotBackfillInfo)> {
606 let mut prev_stream_scan: Option<(Option<SnapshotBackfillInfo>, StreamScanNode)> = None;
607 let mut cross_db_info = SnapshotBackfillInfo {
608 upstream_mv_table_id_to_backfill_epoch: Default::default(),
609 };
610 let mut result = Ok(());
611 for (node, fragment_type_mask) in fragments {
612 visit_stream_node_cont(node, |node| {
613 if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_ref() {
614 let stream_scan_type = StreamScanType::try_from(stream_scan.stream_scan_type)
615 .expect("invalid stream_scan_type");
616 let is_snapshot_backfill = match stream_scan_type {
617 StreamScanType::SnapshotBackfill => {
618 assert!(
619 fragment_type_mask
620 .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
621 );
622 true
623 }
624 StreamScanType::CrossDbSnapshotBackfill => {
625 assert!(
626 fragment_type_mask
627 .contains(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan)
628 );
629 cross_db_info.upstream_mv_table_id_to_backfill_epoch.insert(
630 TableId::new(stream_scan.table_id),
631 stream_scan.snapshot_backfill_epoch,
632 );
633
634 return true;
635 }
636 _ => false,
637 };
638
639 match &mut prev_stream_scan {
640 Some((prev_snapshot_backfill_info, prev_stream_scan)) => {
641 match (prev_snapshot_backfill_info, is_snapshot_backfill) {
642 (Some(prev_snapshot_backfill_info), true) => {
643 prev_snapshot_backfill_info
644 .upstream_mv_table_id_to_backfill_epoch
645 .insert(
646 TableId::new(stream_scan.table_id),
647 stream_scan.snapshot_backfill_epoch,
648 );
649 true
650 }
651 (None, false) => true,
652 (_, _) => {
653 result = Err(anyhow!("must be either all snapshot_backfill or no snapshot_backfill. Curr: {stream_scan:?} Prev: {prev_stream_scan:?}").into());
654 false
655 }
656 }
657 }
658 None => {
659 prev_stream_scan = Some((
660 if is_snapshot_backfill {
661 Some(SnapshotBackfillInfo {
662 upstream_mv_table_id_to_backfill_epoch: HashMap::from_iter(
663 [(
664 TableId::new(stream_scan.table_id),
665 stream_scan.snapshot_backfill_epoch,
666 )],
667 ),
668 })
669 } else {
670 None
671 },
672 *stream_scan.clone(),
673 ));
674 true
675 }
676 }
677 } else {
678 true
679 }
680 })
681 }
682 result.map(|_| {
683 (
684 prev_stream_scan
685 .map(|(snapshot_backfill_info, _)| snapshot_backfill_info)
686 .unwrap_or(None),
687 cross_db_info,
688 )
689 })
690 }
691
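    /// Build a mapping from the ID of an upstream table or source to the IDs of the backfill
    /// fragments in this graph that scan it.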
692 pub fn collect_backfill_mapping(&self) -> HashMap<u32, Vec<FragmentId>> {
694 let mut mapping = HashMap::new();
695 for (fragment_id, fragment) in &self.fragments {
696 let fragment_id = fragment_id.as_global_id();
697 let fragment_mask = fragment.fragment_type_mask;
698 let candidates = [FragmentTypeFlag::StreamScan, FragmentTypeFlag::SourceScan];
699 let has_some_scan = candidates
700 .into_iter()
701 .any(|flag| (fragment_mask & flag as u32) > 0);
702 if has_some_scan {
703 visit_stream_node_cont(fragment.node.as_ref().unwrap(), |node| {
704 match node.node_body.as_ref() {
705 Some(NodeBody::StreamScan(stream_scan)) => {
706 let table_id = stream_scan.table_id;
707 let fragments: &mut Vec<_> = mapping.entry(table_id).or_default();
708 fragments.push(fragment_id);
709 false
711 }
712 Some(NodeBody::SourceBackfill(source_backfill)) => {
713 let source_id = source_backfill.upstream_source_id;
714 let fragments: &mut Vec<_> = mapping.entry(source_id).or_default();
715 fragments.push(fragment_id);
716 false
718 }
719 _ => true,
720 }
721 })
722 }
723 }
724 mapping
725 }
726
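    /// Translate the relation-level backfill order specified for the job into a fragment-level
    /// order, using [`Self::collect_backfill_mapping`].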
727 pub fn create_fragment_backfill_ordering(&self) -> FragmentBackfillOrder {
731 let mapping = self.collect_backfill_mapping();
732 let mut fragment_ordering: HashMap<u32, Vec<u32>> = HashMap::new();
733 for (rel_id, downstream_rel_ids) in &self.backfill_order.order {
734 let fragment_ids = mapping.get(rel_id).unwrap();
735 for fragment_id in fragment_ids {
736 let downstream_fragment_ids = downstream_rel_ids
737 .data
738 .iter()
739 .flat_map(|downstream_rel_id| mapping.get(downstream_rel_id).unwrap().iter())
740 .copied()
741 .collect();
742 fragment_ordering.insert(*fragment_id, downstream_fragment_ids);
743 }
744 }
745 fragment_ordering
746 }
747}
748
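/// Fill the backfill epoch into the snapshot-backfill stream scans of the given stream node.
/// Returns whether any epoch was filled, and errors if an upstream table is not covered by the
/// provided info or its epoch has already been set.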
749pub fn fill_snapshot_backfill_epoch(
752 node: &mut StreamNode,
753 snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
754 cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
755) -> MetaResult<bool> {
756 let mut result = Ok(());
757 let mut applied = false;
758 visit_stream_node_cont_mut(node, |node| {
759 if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_mut()
760 && (stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32
761 || stream_scan.stream_scan_type == StreamScanType::CrossDbSnapshotBackfill as i32)
762 {
763 result = try {
764 let table_id = TableId::new(stream_scan.table_id);
765 let snapshot_epoch = cross_db_snapshot_backfill_info
766 .upstream_mv_table_id_to_backfill_epoch
767 .get(&table_id)
768 .or_else(|| {
769 snapshot_backfill_info.and_then(|snapshot_backfill_info| {
770 snapshot_backfill_info
771 .upstream_mv_table_id_to_backfill_epoch
772 .get(&table_id)
773 })
774 })
775 .ok_or_else(|| anyhow!("upstream table id not covered: {}", table_id))?
776 .ok_or_else(|| anyhow!("upstream table id not set: {}", table_id))?;
777 if let Some(prev_snapshot_epoch) =
778 stream_scan.snapshot_backfill_epoch.replace(snapshot_epoch)
779 {
780 Err(anyhow!(
781 "snapshot backfill epoch set again: {} {} {}",
782 table_id,
783 prev_snapshot_epoch,
784 snapshot_epoch
785 ))?;
786 }
787 applied = true;
788 };
789 result.is_ok()
790 } else {
791 true
792 }
793 });
794 result.map(|_| applied)
795}
796
797static EMPTY_HASHMAP: LazyLock<HashMap<GlobalFragmentId, StreamFragmentEdge>> =
798 LazyLock::new(HashMap::new);
799
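/// A fragment in the complete graph: either a fragment being built for the current streaming job,
/// or an existing fragment of an upstream or downstream job.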
800#[derive(Debug, Clone, EnumAsInner)]
803pub(super) enum EitherFragment {
804 Building(BuildingFragment),
806
807 Existing(Fragment),
809}
810
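/// A wrapper of [`StreamFragmentGraph`] that additionally records the existing fragments of
/// upstream or downstream jobs, their actor locations, and the extra edges connecting them to the
/// building graph.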
811#[derive(Debug)]
820pub struct CompleteStreamFragmentGraph {
821 building_graph: StreamFragmentGraph,
823
824 existing_fragments: HashMap<GlobalFragmentId, Fragment>,
826
827 existing_actor_location: HashMap<ActorId, WorkerId>,
829
830 extra_downstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,
832
833 extra_upstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,
835}
836
837pub struct FragmentGraphUpstreamContext {
838 upstream_root_fragments: HashMap<TableId, Fragment>,
841 upstream_actor_location: HashMap<ActorId, WorkerId>,
842}
843
844pub struct FragmentGraphDownstreamContext {
845 original_root_fragment_id: FragmentId,
846 downstream_fragments: Vec<(DispatcherType, Fragment)>,
847 downstream_actor_location: HashMap<ActorId, WorkerId>,
848}
849
850impl CompleteStreamFragmentGraph {
851 #[cfg(test)]
854 pub fn for_test(graph: StreamFragmentGraph) -> Self {
855 Self {
856 building_graph: graph,
857 existing_fragments: Default::default(),
858 existing_actor_location: Default::default(),
859 extra_downstreams: Default::default(),
860 extra_upstreams: Default::default(),
861 }
862 }
863
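    /// Create a complete graph with the existing root fragments of the upstream jobs attached.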
864 pub fn with_upstreams(
868 graph: StreamFragmentGraph,
869 upstream_root_fragments: HashMap<TableId, Fragment>,
870 existing_actor_location: HashMap<ActorId, WorkerId>,
871 job_type: StreamingJobType,
872 ) -> MetaResult<Self> {
873 Self::build_helper(
874 graph,
875 Some(FragmentGraphUpstreamContext {
876 upstream_root_fragments,
877 upstream_actor_location: existing_actor_location,
878 }),
879 None,
880 job_type,
881 )
882 }
883
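    /// Create a complete graph with the existing downstream fragments attached, used when
    /// replacing the root fragment of an existing job.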
884 pub fn with_downstreams(
887 graph: StreamFragmentGraph,
888 original_root_fragment_id: FragmentId,
889 downstream_fragments: Vec<(DispatcherType, Fragment)>,
890 existing_actor_location: HashMap<ActorId, WorkerId>,
891 job_type: StreamingJobType,
892 ) -> MetaResult<Self> {
893 Self::build_helper(
894 graph,
895 None,
896 Some(FragmentGraphDownstreamContext {
897 original_root_fragment_id,
898 downstream_fragments,
899 downstream_actor_location: existing_actor_location,
900 }),
901 job_type,
902 )
903 }
904
905 pub fn with_upstreams_and_downstreams(
907 graph: StreamFragmentGraph,
908 upstream_root_fragments: HashMap<TableId, Fragment>,
909 upstream_actor_location: HashMap<ActorId, WorkerId>,
910 original_root_fragment_id: FragmentId,
911 downstream_fragments: Vec<(DispatcherType, Fragment)>,
912 downstream_actor_location: HashMap<ActorId, WorkerId>,
913 job_type: StreamingJobType,
914 ) -> MetaResult<Self> {
915 Self::build_helper(
916 graph,
917 Some(FragmentGraphUpstreamContext {
918 upstream_root_fragments,
919 upstream_actor_location,
920 }),
921 Some(FragmentGraphDownstreamContext {
922 original_root_fragment_id,
923 downstream_fragments,
924 downstream_actor_location,
925 }),
926 job_type,
927 )
928 }
929
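    /// Build the complete graph from the building graph and the optional upstream and downstream
    /// contexts, generating the extra edges that connect the building fragments to the existing
    /// ones.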
930 fn build_helper(
932 mut graph: StreamFragmentGraph,
933 upstream_ctx: Option<FragmentGraphUpstreamContext>,
934 downstream_ctx: Option<FragmentGraphDownstreamContext>,
935 job_type: StreamingJobType,
936 ) -> MetaResult<Self> {
937 let mut extra_downstreams = HashMap::new();
938 let mut extra_upstreams = HashMap::new();
939 let mut existing_fragments = HashMap::new();
940
941 let mut existing_actor_location = HashMap::new();
942
943 if let Some(FragmentGraphUpstreamContext {
944 upstream_root_fragments,
945 upstream_actor_location,
946 }) = upstream_ctx
947 {
948 for (&id, fragment) in &mut graph.fragments {
949 let uses_shuffled_backfill = fragment.has_shuffled_backfill();
950
951 for (&upstream_table_id, required_columns) in &fragment.upstream_table_columns {
952 let upstream_fragment = upstream_root_fragments
953 .get(&upstream_table_id)
954 .context("upstream fragment not found")?;
955 let upstream_root_fragment_id =
956 GlobalFragmentId::new(upstream_fragment.fragment_id);
957
958 let edge = match job_type {
959 StreamingJobType::Table(TableJobType::SharedCdcSource) => {
960 assert_ne!(
963 (fragment.fragment_type_mask & FragmentTypeFlag::CdcFilter as u32),
964 0
965 );
966
967 tracing::debug!(
968 ?upstream_root_fragment_id,
969 ?required_columns,
970 identity = ?fragment.inner.get_node().unwrap().get_identity(),
971 current_frag_id=?id,
972 "CdcFilter with upstream source fragment"
973 );
974
975 StreamFragmentEdge {
976 id: EdgeId::UpstreamExternal {
977 upstream_table_id,
978 downstream_fragment_id: id,
979 },
980 dispatch_strategy: DispatchStrategy {
983 r#type: DispatcherType::NoShuffle as _,
                                dist_key_indices: vec![], // not used for `NoShuffle`
                                output_mapping: DispatchOutputMapping::identical(
                                    CDC_SOURCE_COLUMN_NUM as _,
                                )
                                .into(),
989 },
990 }
991 }
992
993 StreamingJobType::MaterializedView
995 | StreamingJobType::Sink
996 | StreamingJobType::Index => {
997 if upstream_fragment
1000 .fragment_type_mask
1001 .contains(FragmentTypeFlag::Mview)
1002 {
1003 let (dist_key_indices, output_mapping) = {
1005 let nodes = &upstream_fragment.nodes;
1006 let mview_node =
1007 nodes.get_node_body().unwrap().as_materialize().unwrap();
1008 let all_columns = mview_node.column_descs();
1009 let dist_key_indices = mview_node.dist_key_indices();
1010 let output_mapping = gen_output_mapping(
1011 required_columns,
1012 &all_columns,
1013 )
1014 .context(
1015 "BUG: column not found in the upstream materialized view",
1016 )?;
1017 (dist_key_indices, output_mapping)
1018 };
1019 let dispatch_strategy = mv_on_mv_dispatch_strategy(
1020 uses_shuffled_backfill,
1021 dist_key_indices,
1022 output_mapping,
1023 );
1024
1025 StreamFragmentEdge {
1026 id: EdgeId::UpstreamExternal {
1027 upstream_table_id,
1028 downstream_fragment_id: id,
1029 },
1030 dispatch_strategy,
1031 }
1032 }
1033 else if upstream_fragment
1036 .fragment_type_mask
1037 .contains(FragmentTypeFlag::Source)
1038 {
1039 let output_mapping = {
1040 let nodes = &upstream_fragment.nodes;
1041 let source_node =
1042 nodes.get_node_body().unwrap().as_source().unwrap();
1043
1044 let all_columns = source_node.column_descs().unwrap();
1045 gen_output_mapping(required_columns, &all_columns).context(
1046 "BUG: column not found in the upstream source node",
1047 )?
1048 };
1049
1050 StreamFragmentEdge {
1051 id: EdgeId::UpstreamExternal {
1052 upstream_table_id,
1053 downstream_fragment_id: id,
1054 },
1055 dispatch_strategy: DispatchStrategy {
1058 r#type: DispatcherType::NoShuffle as _,
                                dist_key_indices: vec![], // not used for `NoShuffle`
                                output_mapping: Some(output_mapping),
1061 },
1062 }
1063 } else {
1064 bail!(
1065 "the upstream fragment should be a MView or Source, got fragment type: {:b}",
1066 upstream_fragment.fragment_type_mask
1067 )
1068 }
1069 }
1070 StreamingJobType::Source | StreamingJobType::Table(_) => {
1071 bail!(
1072 "the streaming job shouldn't have an upstream fragment, job_type: {:?}",
1073 job_type
1074 )
1075 }
1076 };
1077
1078 extra_downstreams
1080 .entry(upstream_root_fragment_id)
1081 .or_insert_with(HashMap::new)
1082 .try_insert(id, edge.clone())
1083 .unwrap();
1084 extra_upstreams
1085 .entry(id)
1086 .or_insert_with(HashMap::new)
1087 .try_insert(upstream_root_fragment_id, edge)
1088 .unwrap();
1089 }
1090 }
1091
1092 existing_fragments.extend(
1093 upstream_root_fragments
1094 .into_values()
1095 .map(|f| (GlobalFragmentId::new(f.fragment_id), f)),
1096 );
1097
1098 existing_actor_location.extend(upstream_actor_location);
1099 }
1100
1101 if let Some(FragmentGraphDownstreamContext {
1102 original_root_fragment_id,
1103 downstream_fragments,
1104 downstream_actor_location,
1105 }) = downstream_ctx
1106 {
1107 let original_table_fragment_id = GlobalFragmentId::new(original_root_fragment_id);
1108 let table_fragment_id = GlobalFragmentId::new(graph.table_fragment_id());
1109
1110 for (dispatcher_type, fragment) in &downstream_fragments {
1113 let id = GlobalFragmentId::new(fragment.fragment_id);
1114
1115 let output_columns = {
1117 let mut res = None;
1118
1119 stream_graph_visitor::visit_stream_node_body(&fragment.nodes, |node_body| {
1120 let columns = match node_body {
1121 NodeBody::StreamScan(stream_scan) => stream_scan.upstream_columns(),
1122 NodeBody::SourceBackfill(source_backfill) => {
1123 source_backfill.column_descs()
1125 }
1126 _ => return,
1127 };
1128 res = Some(columns);
1129 });
1130
1131 res.context("failed to locate downstream scan")?
1132 };
1133
1134 let table_fragment = graph.fragments.get(&table_fragment_id).unwrap();
1135 let nodes = table_fragment.node.as_ref().unwrap();
1136
1137 let (dist_key_indices, output_mapping) = match job_type {
1138 StreamingJobType::Table(_) => {
1139 let mview_node = nodes.get_node_body().unwrap().as_materialize().unwrap();
1140 let all_columns = mview_node.column_descs();
1141 let dist_key_indices = mview_node.dist_key_indices();
1142 let output_mapping = gen_output_mapping(&output_columns, &all_columns)
1143 .ok_or_else(|| {
1144 MetaError::invalid_parameter(
1145 "unable to drop the column due to \
1146 being referenced by downstream materialized views or sinks",
1147 )
1148 })?;
1149 (dist_key_indices, output_mapping)
1150 }
1151
1152 StreamingJobType::Source => {
1153 let source_node = nodes.get_node_body().unwrap().as_source().unwrap();
1154 let all_columns = source_node.column_descs().unwrap();
1155 let output_mapping = gen_output_mapping(&output_columns, &all_columns)
1156 .ok_or_else(|| {
1157 MetaError::invalid_parameter(
1158 "unable to drop the column due to \
1159 being referenced by downstream materialized views or sinks",
1160 )
1161 })?;
1162 assert_eq!(*dispatcher_type, DispatcherType::NoShuffle);
1163 (
                            vec![], // no distribution key for `NoShuffle`
                            output_mapping,
1166 )
1167 }
1168
1169 _ => bail!("unsupported job type for replacement: {job_type:?}"),
1170 };
1171
1172 let edge = StreamFragmentEdge {
1173 id: EdgeId::DownstreamExternal(DownstreamExternalEdgeId {
1174 original_upstream_fragment_id: original_table_fragment_id,
1175 downstream_fragment_id: id,
1176 }),
1177 dispatch_strategy: DispatchStrategy {
1178 r#type: *dispatcher_type as i32,
1179 output_mapping: Some(output_mapping),
1180 dist_key_indices,
1181 },
1182 };
1183
1184 extra_downstreams
1185 .entry(table_fragment_id)
1186 .or_insert_with(HashMap::new)
1187 .try_insert(id, edge.clone())
1188 .unwrap();
1189 extra_upstreams
1190 .entry(id)
1191 .or_insert_with(HashMap::new)
1192 .try_insert(table_fragment_id, edge)
1193 .unwrap();
1194 }
1195
1196 existing_fragments.extend(
1197 downstream_fragments
1198 .into_iter()
1199 .map(|(_, f)| (GlobalFragmentId::new(f.fragment_id), f)),
1200 );
1201
1202 existing_actor_location.extend(downstream_actor_location);
1203 }
1204
1205 Ok(Self {
1206 building_graph: graph,
1207 existing_fragments,
1208 existing_actor_location,
1209 extra_downstreams,
1210 extra_upstreams,
1211 })
1212 }
1213}
1214
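/// Generate the [`DispatchOutputMapping`] from the upstream columns to the required columns,
/// matching columns by ID and recording a type pair wherever the upstream and downstream types
/// differ. Returns `None` if a required column is not found in the upstream.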
1215fn gen_output_mapping(
1217 required_columns: &[PbColumnDesc],
1218 upstream_columns: &[PbColumnDesc],
1219) -> Option<DispatchOutputMapping> {
1220 let len = required_columns.len();
1221 let mut indices = vec![0; len];
1222 let mut types = None;
1223
1224 for (i, r) in required_columns.iter().enumerate() {
1225 let (ui, u) = upstream_columns
1226 .iter()
1227 .find_position(|&u| u.column_id == r.column_id)?;
1228 indices[i] = ui as u32;
1229
1230 if u.column_type != r.column_type {
1233 types.get_or_insert_with(|| vec![TypePair::default(); len])[i] = TypePair {
1234 upstream: u.column_type.clone(),
1235 downstream: r.column_type.clone(),
1236 };
1237 }
1238 }
1239
    // If all column types match, leave `types` empty so that no per-column conversion is recorded.
    let types = types.unwrap_or_default();
1242
1243 Some(DispatchOutputMapping { indices, types })
1244}
1245
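/// Decide the dispatch strategy from an upstream materialized view to its downstream backfill
/// fragment: shuffled backfill uses hash dispatch (or simple dispatch if the upstream has no
/// distribution key), while non-shuffled backfill uses no-shuffle dispatch.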
1246fn mv_on_mv_dispatch_strategy(
1247 uses_shuffled_backfill: bool,
1248 dist_key_indices: Vec<u32>,
1249 output_mapping: DispatchOutputMapping,
1250) -> DispatchStrategy {
1251 if uses_shuffled_backfill {
1252 if !dist_key_indices.is_empty() {
1253 DispatchStrategy {
1254 r#type: DispatcherType::Hash as _,
1255 dist_key_indices,
1256 output_mapping: Some(output_mapping),
1257 }
1258 } else {
1259 DispatchStrategy {
1260 r#type: DispatcherType::Simple as _,
                dist_key_indices: vec![], // not used for `Simple`
                output_mapping: Some(output_mapping),
1263 }
1264 }
1265 } else {
1266 DispatchStrategy {
1267 r#type: DispatcherType::NoShuffle as _,
                dist_key_indices: vec![], // not used for `NoShuffle`
                output_mapping: Some(output_mapping),
1270 }
1271 }
1272}
1273
1274impl CompleteStreamFragmentGraph {
1275 pub(super) fn all_fragment_ids(&self) -> impl Iterator<Item = GlobalFragmentId> + '_ {
1278 self.building_graph
1279 .fragments
1280 .keys()
1281 .chain(self.existing_fragments.keys())
1282 .copied()
1283 }
1284
1285 pub(super) fn all_edges(
1287 &self,
1288 ) -> impl Iterator<Item = (GlobalFragmentId, GlobalFragmentId, &StreamFragmentEdge)> + '_ {
1289 self.building_graph
1290 .downstreams
1291 .iter()
1292 .chain(self.extra_downstreams.iter())
1293 .flat_map(|(&from, tos)| tos.iter().map(move |(&to, edge)| (from, to, edge)))
1294 }
1295
1296 pub(super) fn existing_distribution(&self) -> HashMap<GlobalFragmentId, Distribution> {
1298 self.existing_fragments
1299 .iter()
1300 .map(|(&id, f)| {
1301 (
1302 id,
1303 Distribution::from_fragment(f, &self.existing_actor_location),
1304 )
1305 })
1306 .collect()
1307 }
1308
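    /// Return a topological order of all fragments in the graph, from the most downstream
    /// fragments (those with no downstreams) towards the upstream. Errors if the graph is not a
    /// DAG.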
1309 pub(super) fn topo_order(&self) -> MetaResult<Vec<GlobalFragmentId>> {
1316 let mut topo = Vec::new();
1317 let mut downstream_cnts = HashMap::new();
1318
1319 for fragment_id in self.all_fragment_ids() {
1321 let downstream_cnt = self.get_downstreams(fragment_id).count();
1323 if downstream_cnt == 0 {
1324 topo.push(fragment_id);
1325 } else {
1326 downstream_cnts.insert(fragment_id, downstream_cnt);
1327 }
1328 }
1329
1330 let mut i = 0;
1331 while let Some(&fragment_id) = topo.get(i) {
1332 i += 1;
1333 for (upstream_id, _) in self.get_upstreams(fragment_id) {
1335 let downstream_cnt = downstream_cnts.get_mut(&upstream_id).unwrap();
1336 *downstream_cnt -= 1;
1337 if *downstream_cnt == 0 {
1338 downstream_cnts.remove(&upstream_id);
1339 topo.push(upstream_id);
1340 }
1341 }
1342 }
1343
1344 if !downstream_cnts.is_empty() {
1345 bail!("graph is not a DAG");
1347 }
1348
1349 Ok(topo)
1350 }
1351
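    /// Seal a [`BuildingFragment`] from the graph into a [`Fragment`], with the given actors and
    /// the distribution determined by scheduling, collecting its state table IDs.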
1352 pub(super) fn seal_fragment(
1355 &self,
1356 id: GlobalFragmentId,
1357 actors: Vec<StreamActor>,
1358 distribution: Distribution,
1359 stream_node: StreamNode,
1360 ) -> Fragment {
1361 let building_fragment = self.get_fragment(id).into_building().unwrap();
1362 let internal_tables = building_fragment.extract_internal_tables();
1363 let BuildingFragment {
1364 inner,
1365 job_id,
1366 upstream_table_columns: _,
1367 } = building_fragment;
1368
1369 let distribution_type = distribution.to_distribution_type();
1370 let vnode_count = distribution.vnode_count();
1371
        // For an mview fragment, the table of the job itself is also one of its state tables.
        let materialized_table_id =
1373 if FragmentTypeMask::from(inner.fragment_type_mask).contains(FragmentTypeFlag::Mview) {
1374 job_id
1375 } else {
1376 None
1377 };
1378
1379 let state_table_ids = internal_tables
1380 .iter()
1381 .map(|t| t.id)
            .chain(materialized_table_id)
1383 .collect();
1384
1385 Fragment {
1386 fragment_id: inner.fragment_id,
1387 fragment_type_mask: inner.fragment_type_mask.into(),
1388 distribution_type,
1389 actors,
1390 state_table_ids,
1391 maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
1392 nodes: stream_node,
1393 }
1394 }
1395
1396 pub(super) fn get_fragment(&self, fragment_id: GlobalFragmentId) -> EitherFragment {
1399 if let Some(fragment) = self.existing_fragments.get(&fragment_id) {
1400 EitherFragment::Existing(fragment.clone())
1401 } else {
1402 EitherFragment::Building(
1403 self.building_graph
1404 .fragments
1405 .get(&fragment_id)
1406 .unwrap()
1407 .clone(),
1408 )
1409 }
1410 }
1411
1412 pub(super) fn get_downstreams(
1415 &self,
1416 fragment_id: GlobalFragmentId,
1417 ) -> impl Iterator<Item = (GlobalFragmentId, &StreamFragmentEdge)> {
1418 self.building_graph
1419 .get_downstreams(fragment_id)
1420 .iter()
1421 .chain(
1422 self.extra_downstreams
1423 .get(&fragment_id)
1424 .into_iter()
1425 .flatten(),
1426 )
1427 .map(|(&id, edge)| (id, edge))
1428 }
1429
1430 pub(super) fn get_upstreams(
1433 &self,
1434 fragment_id: GlobalFragmentId,
1435 ) -> impl Iterator<Item = (GlobalFragmentId, &StreamFragmentEdge)> {
1436 self.building_graph
1437 .get_upstreams(fragment_id)
1438 .iter()
1439 .chain(self.extra_upstreams.get(&fragment_id).into_iter().flatten())
1440 .map(|(&id, edge)| (id, edge))
1441 }
1442
1443 pub(super) fn building_fragments(&self) -> &HashMap<GlobalFragmentId, BuildingFragment> {
1445 &self.building_graph.fragments
1446 }
1447
1448 pub(super) fn building_fragments_mut(
1450 &mut self,
1451 ) -> &mut HashMap<GlobalFragmentId, BuildingFragment> {
1452 &mut self.building_graph.fragments
1453 }
1454
1455 pub(super) fn max_parallelism(&self) -> usize {
1457 self.building_graph.max_parallelism()
1458 }
1459}