1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::num::NonZeroUsize;
17use std::ops::{Deref, DerefMut};
18use std::sync::LazyLock;
19use std::sync::atomic::AtomicU32;
20
21use anyhow::{Context, anyhow};
22use enum_as_inner::EnumAsInner;
23use itertools::Itertools;
24use risingwave_common::bail;
25use risingwave_common::catalog::{
26 CDC_SOURCE_COLUMN_NUM, ColumnCatalog, Field, FragmentTypeFlag, FragmentTypeMask, TableId,
27 generate_internal_table_name_with_type,
28};
29use risingwave_common::hash::VnodeCount;
30use risingwave_common::id::JobId;
31use risingwave_common::util::iter_util::ZipEqFast;
32use risingwave_common::util::stream_graph_visitor::{
33 self, visit_stream_node_cont, visit_stream_node_cont_mut,
34};
35use risingwave_connector::sink::catalog::SinkType;
36use risingwave_meta_model::WorkerId;
37use risingwave_pb::catalog::{PbSink, PbTable, Table};
38use risingwave_pb::ddl_service::TableJobType;
39use risingwave_pb::plan_common::{PbColumnCatalog, PbColumnDesc};
40use risingwave_pb::stream_plan::dispatch_output_mapping::TypePair;
41use risingwave_pb::stream_plan::stream_fragment_graph::{
42 Parallelism, StreamFragment, StreamFragmentEdge as StreamFragmentEdgeProto,
43};
44use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
45use risingwave_pb::stream_plan::{
46 BackfillOrder, DispatchOutputMapping, DispatchStrategy, DispatcherType, PbStreamNode,
47 PbStreamScanType, StreamFragmentGraph as StreamFragmentGraphProto, StreamNode, StreamScanNode,
48 StreamScanType,
49};
50
51use crate::barrier::{SharedFragmentInfo, SnapshotBackfillInfo};
52use crate::controller::id::IdGeneratorManager;
53use crate::manager::{MetaSrvEnv, StreamingJob, StreamingJobType};
54use crate::model::{ActorId, Fragment, FragmentId, StreamActor};
55use crate::stream::stream_graph::id::{
56 GlobalActorIdGen, GlobalFragmentId, GlobalFragmentIdGen, GlobalTableIdGen,
57};
58use crate::stream::stream_graph::schedule::Distribution;
59use crate::{MetaError, MetaResult};
60
/// A fragment in the process of being built for a new streaming job.
///
/// Wraps the protobuf [`StreamFragment`] (with ids already globalized) together
/// with extra information collected while building.
#[derive(Debug, Clone)]
pub(super) struct BuildingFragment {
    // The protobuf fragment, with fragment id and internal table info filled in.
    inner: StreamFragment,

    // Set to the job's id iff this fragment contains the job's own node
    // (materialize / sink / shared source / vector index write).
    job_id: Option<JobId>,

    // Columns required from each upstream job this fragment reads, keyed by the
    // upstream job id. Cross-db snapshot backfill scans are excluded.
    upstream_job_columns: HashMap<JobId, Vec<PbColumnDesc>>,
}
77
impl BuildingFragment {
    /// Create a new [`BuildingFragment`] from a protobuf fragment: rewrite the
    /// fragment id to its global form, fill internal tables and job-specific
    /// fields, and record the columns required from upstream jobs.
    fn new(
        id: GlobalFragmentId,
        fragment: StreamFragment,
        job: &StreamingJob,
        table_id_gen: GlobalTableIdGen,
    ) -> Self {
        let mut fragment = StreamFragment {
            fragment_id: id.as_global_id(),
            ..fragment
        };

        Self::fill_internal_tables(&mut fragment, job, table_id_gen);

        // `fill_job` reports whether the job's own node lives in this fragment.
        let job_id = Self::fill_job(&mut fragment, job).then(|| job.id());
        let upstream_job_columns =
            Self::extract_upstream_columns_except_cross_db_backfill(&fragment);

        Self {
            inner: fragment,
            job_id,
            upstream_job_columns,
        }
    }

    /// Return a clone of every internal table contained in this fragment.
    fn extract_internal_tables(&self) -> Vec<Table> {
        // The visitor requires a mutable fragment, so visit a clone.
        let mut fragment = self.inner.clone();
        let mut tables = Vec::new();
        stream_graph_visitor::visit_internal_tables(&mut fragment, |table, _| {
            tables.push(table.clone());
        });
        tables
    }

    /// Fill each internal table of the fragment with its globalized table id and
    /// catalog info (schema, database, generated name, owner, fragment and job id).
    fn fill_internal_tables(
        fragment: &mut StreamFragment,
        job: &StreamingJob,
        table_id_gen: GlobalTableIdGen,
    ) {
        let fragment_id = fragment.fragment_id;
        stream_graph_visitor::visit_internal_tables(fragment, |table, table_type_name| {
            // Rewrite the frontend-local table id into the global one.
            table.id = table_id_gen
                .to_global_id(table.id.as_raw_id())
                .as_global_id();
            table.schema_id = job.schema_id();
            table.database_id = job.database_id();
            table.name = generate_internal_table_name_with_type(
                &job.name(),
                fragment_id,
                table.id,
                table_type_name,
            );
            table.fragment_id = fragment_id;
            table.owner = job.owner();
            table.job_id = Some(job.id());
        });
    }

    /// Fill job-specific ids into the nodes of the fragment (materialize table,
    /// sink desc, DML table, source ids, CDC table, vector index table).
    ///
    /// Returns `true` iff the job's own node is found in this fragment.
    fn fill_job(fragment: &mut StreamFragment, job: &StreamingJob) -> bool {
        let job_id = job.id();
        let fragment_id = fragment.fragment_id;
        let mut has_job = false;

        stream_graph_visitor::visit_fragment_mut(fragment, |node_body| match node_body {
            NodeBody::Materialize(materialize_node) => {
                materialize_node.table_id = job_id.as_mv_table_id();

                // Fill the table catalog from the job and bind it to this fragment.
                let table = materialize_node.table.insert(job.table().unwrap().clone());
                table.fragment_id = fragment_id;
                // In release builds, replace the table definition with the job name.
                if cfg!(not(debug_assertions)) {
                    table.definition = job.name();
                }

                has_job = true;
            }
            NodeBody::Sink(sink_node) => {
                sink_node.sink_desc.as_mut().unwrap().id = job_id.as_sink_id();

                has_job = true;
            }
            NodeBody::Dml(dml_node) => {
                dml_node.table_id = job_id.as_mv_table_id();
                dml_node.table_version_id = job.table_version_id().unwrap();
            }
            NodeBody::StreamFsFetch(fs_fetch_node) => {
                if let StreamingJob::Table(table_source, _, _) = job
                    && let Some(node_inner) = fs_fetch_node.node_inner.as_mut()
                    && let Some(source) = table_source
                {
                    node_inner.source_id = source.id;
                    if let Some(id) = source.optional_associated_table_id {
                        node_inner.associated_table_id = Some(id.into());
                    }
                }
            }
            NodeBody::Source(source_node) => {
                match job {
                    StreamingJob::Table(source, _table, _table_job_type) => {
                        // Source node inside a table job: the source id differs
                        // from the job id (asserted below).
                        if let Some(source_inner) = source_node.source_inner.as_mut()
                            && let Some(source) = source
                        {
                            debug_assert_ne!(source.id, job_id.as_raw_id());
                            source_inner.source_id = source.id;
                            if let Some(id) = source.optional_associated_table_id {
                                source_inner.associated_table_id = Some(id.into());
                            }
                        }
                    }
                    StreamingJob::Source(source) => {
                        // Standalone source job: this is the job's own node.
                        has_job = true;
                        if let Some(source_inner) = source_node.source_inner.as_mut() {
                            debug_assert_eq!(source.id, job_id.as_raw_id());
                            source_inner.source_id = source.id;
                            if let Some(id) = source.optional_associated_table_id {
                                source_inner.associated_table_id = Some(id.into());
                            }
                        }
                    }
                    // Source nodes in other job kinds need no filling here.
                    _ => {}
                }
            }
            NodeBody::StreamCdcScan(node) => {
                if let Some(table_desc) = node.cdc_table_desc.as_mut() {
                    table_desc.table_id = job_id.as_mv_table_id();
                }
            }
            NodeBody::VectorIndexWrite(node) => {
                let table = node.table.as_mut().unwrap();
                table.id = job_id.as_mv_table_id();
                table.database_id = job.database_id();
                table.schema_id = job.schema_id();
                table.fragment_id = fragment_id;
                // In release builds, replace the table definition with the job name.
                #[cfg(not(debug_assertions))]
                {
                    table.definition = job.name();
                }

                has_job = true;
            }
            _ => {}
        });

        has_job
    }

    /// Extract the columns required from each upstream job referenced by this
    /// fragment, keyed by upstream job id. Cross-db snapshot backfill scans are
    /// skipped.
    ///
    /// Panics if the same upstream job appears twice in one fragment.
    fn extract_upstream_columns_except_cross_db_backfill(
        fragment: &StreamFragment,
    ) -> HashMap<JobId, Vec<PbColumnDesc>> {
        let mut table_columns = HashMap::new();

        stream_graph_visitor::visit_fragment(fragment, |node_body| {
            let (table_id, column_ids) = match node_body {
                NodeBody::StreamScan(stream_scan) => {
                    if stream_scan.get_stream_scan_type().unwrap()
                        == StreamScanType::CrossDbSnapshotBackfill
                    {
                        return;
                    }
                    (
                        stream_scan.table_id.as_job_id(),
                        stream_scan.upstream_columns(),
                    )
                }
                // CDC filter references the shared source job but requires no columns.
                NodeBody::CdcFilter(cdc_filter) => (
                    cdc_filter.upstream_source_id.as_share_source_job_id(),
                    vec![],
                ),
                NodeBody::SourceBackfill(backfill) => (
                    backfill.upstream_source_id.as_share_source_job_id(),
                    backfill.column_descs(),
                ),
                _ => return,
            };
            table_columns
                .try_insert(table_id, column_ids)
                .expect("currently there should be no two same upstream tables in a fragment");
        });

        table_columns
    }

    /// Whether this fragment contains an `ArrangementBackfill` or
    /// `SnapshotBackfill` stream scan (i.e. a shuffled backfill). Traversal
    /// stops at the first such node.
    pub fn has_shuffled_backfill(&self) -> bool {
        let stream_node = match self.inner.node.as_ref() {
            Some(node) => node,
            _ => return false,
        };
        let mut has_shuffled_backfill = false;
        let has_shuffled_backfill_mut_ref = &mut has_shuffled_backfill;
        visit_stream_node_cont(stream_node, |node| {
            let is_shuffled_backfill = if let Some(node) = &node.node_body
                && let Some(node) = node.as_stream_scan()
            {
                node.stream_scan_type == StreamScanType::ArrangementBackfill as i32
                    || node.stream_scan_type == StreamScanType::SnapshotBackfill as i32
            } else {
                false
            };
            if is_shuffled_backfill {
                *has_shuffled_backfill_mut_ref = true;
                // Returning `false` stops the traversal.
                false
            } else {
                true
            }
        });
        has_shuffled_backfill
    }
}
298
// Allow read access to the inner protobuf fragment through `*building_fragment`.
impl Deref for BuildingFragment {
    type Target = StreamFragment;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
306
// Allow mutable access to the inner protobuf fragment through `*building_fragment`.
impl DerefMut for BuildingFragment {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}
312
/// The id of an edge in the fragment graph.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumAsInner)]
pub(super) enum EdgeId {
    /// An edge between two fragments of the job being built.
    Internal {
        // Identifier of the link, carried over from the frontend-generated graph.
        link_id: u64,
    },

    /// An edge whose upstream side is an existing job, external to the one
    /// being built.
    UpstreamExternal {
        // The id of the upstream job.
        upstream_job_id: JobId,
        // The fragment of the building job that consumes the upstream.
        downstream_fragment_id: GlobalFragmentId,
    },

    /// An edge whose downstream side is a fragment external to the job being
    /// built.
    DownstreamExternal(DownstreamExternalEdgeId),
}
337
/// Identifies an edge towards an external (already-existing) downstream fragment.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) struct DownstreamExternalEdgeId {
    // The original upstream fragment on the building-job side.
    pub(super) original_upstream_fragment_id: GlobalFragmentId,
    // The external downstream fragment.
    pub(super) downstream_fragment_id: GlobalFragmentId,
}
345
/// An edge between fragments in the fragment graph.
#[derive(Debug, Clone)]
pub(super) struct StreamFragmentEdge {
    // The id of the edge.
    pub id: EdgeId,

    // How data is dispatched from the upstream to the downstream fragment.
    pub dispatch_strategy: DispatchStrategy,
}
357
358impl StreamFragmentEdge {
359 fn from_protobuf(edge: &StreamFragmentEdgeProto) -> Self {
360 Self {
361 id: EdgeId::Internal {
364 link_id: edge.link_id,
365 },
366 dispatch_strategy: edge.get_dispatch_strategy().unwrap().clone(),
367 }
368 }
369}
370
371fn clone_fragment(
372 fragment: &Fragment,
373 id_generator_manager: &IdGeneratorManager,
374 actor_id_counter: &AtomicU32,
375) -> Fragment {
376 let fragment_id = GlobalFragmentIdGen::new(id_generator_manager, 1)
377 .to_global_id(0)
378 .as_global_id();
379 let actor_id_gen = GlobalActorIdGen::new(actor_id_counter, fragment.actors.len() as _);
380 Fragment {
381 fragment_id,
382 fragment_type_mask: fragment.fragment_type_mask,
383 distribution_type: fragment.distribution_type,
384 actors: fragment
385 .actors
386 .iter()
387 .enumerate()
388 .map(|(i, actor)| StreamActor {
389 actor_id: actor_id_gen.to_global_id(i as _).as_global_id(),
390 fragment_id,
391 vnode_bitmap: actor.vnode_bitmap.clone(),
392 mview_definition: actor.mview_definition.clone(),
393 expr_context: actor.expr_context.clone(),
394 config_override: actor.config_override.clone(),
395 })
396 .collect(),
397 state_table_ids: fragment.state_table_ids.clone(),
398 maybe_vnode_count: fragment.maybe_vnode_count,
399 nodes: fragment.nodes.clone(),
400 }
401}
402
403pub fn check_sink_fragments_support_refresh_schema(
404 fragments: &BTreeMap<FragmentId, Fragment>,
405) -> MetaResult<()> {
406 if fragments.len() != 1 {
407 return Err(anyhow!(
408 "sink with auto schema change should have only 1 fragment, but got {:?}",
409 fragments.len()
410 )
411 .into());
412 }
413 let (_, fragment) = fragments.first_key_value().expect("non-empty");
414 let sink_node = &fragment.nodes;
415 let PbNodeBody::Sink(_) = sink_node.node_body.as_ref().unwrap() else {
416 return Err(anyhow!("expect PbNodeBody::Sink but got: {:?}", sink_node.node_body).into());
417 };
418 let [stream_scan_node] = sink_node.input.as_slice() else {
419 panic!("Sink has more than 1 input: {:?}", sink_node.input);
420 };
421 let PbNodeBody::StreamScan(scan) = stream_scan_node.node_body.as_ref().unwrap() else {
422 return Err(anyhow!(
423 "expect PbNodeBody::StreamScan but got: {:?}",
424 stream_scan_node.node_body
425 )
426 .into());
427 };
428 let stream_scan_type = PbStreamScanType::try_from(scan.stream_scan_type).unwrap();
429 if stream_scan_type != PbStreamScanType::ArrangementBackfill {
430 return Err(anyhow!(
431 "unsupported stream_scan_type for auto refresh schema: {:?}",
432 stream_scan_type
433 )
434 .into());
435 }
436 let [merge_node, _batch_plan_node] = stream_scan_node.input.as_slice() else {
437 panic!(
438 "the number of StreamScan inputs is not 2: {:?}",
439 stream_scan_node.input
440 );
441 };
442 let NodeBody::Merge(_) = merge_node.node_body.as_ref().unwrap() else {
443 return Err(anyhow!(
444 "expect PbNodeBody::Merge but got: {:?}",
445 merge_node.node_body
446 )
447 .into());
448 };
449 Ok(())
450}
451
/// Rewrite a sink fragment to include `newly_added_columns` from the upstream
/// table, for auto schema refresh.
///
/// Clones the original fragment under fresh ids, then extends the sink columns,
/// the sink/scan field lists and identities, the scan's output indices and
/// upstream column ids, the scanned table desc, the merge node's fields, and
/// (if present) the sink's log store table.
///
/// Returns the rewritten fragment, the new full sink column list, and the new
/// log store table (if the sink has one).
///
/// The fragment is assumed to have already passed
/// `check_sink_fragments_support_refresh_schema`; shape violations panic or
/// return errors just like there.
pub fn rewrite_refresh_schema_sink_fragment(
    original_sink_fragment: &Fragment,
    sink: &PbSink,
    newly_added_columns: &[ColumnCatalog],
    upstream_table: &PbTable,
    upstream_table_fragment_id: FragmentId,
    id_generator_manager: &IdGeneratorManager,
    actor_id_counter: &AtomicU32,
) -> MetaResult<(Fragment, Vec<PbColumnCatalog>, Option<PbTable>)> {
    let mut new_sink_columns = sink.columns.clone();
    // Append `new_columns` to `sink_columns`, assigning fresh consecutive column
    // ids after the current maximum and renaming via `get_column_name`.
    fn extend_sink_columns(
        sink_columns: &mut Vec<PbColumnCatalog>,
        new_columns: &[ColumnCatalog],
        get_column_name: impl Fn(&String) -> String,
    ) {
        let next_column_id = sink_columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().column_id + 1)
            .max()
            .unwrap_or(1);
        sink_columns.extend(new_columns.iter().enumerate().map(|(i, col)| {
            let mut col = col.to_protobuf();
            let column_desc = col.column_desc.as_mut().unwrap();
            column_desc.column_id = next_column_id + (i as i32);
            column_desc.name = get_column_name(&column_desc.name);
            col
        }));
    }
    extend_sink_columns(&mut new_sink_columns, newly_added_columns, |name| {
        name.clone()
    });

    // Work on a deep copy with freshly generated fragment/actor ids.
    let mut new_sink_fragment = clone_fragment(
        original_sink_fragment,
        id_generator_manager,
        actor_id_counter,
    );
    // Destructure the expected plan shape: Sink -> StreamScan -> [Merge, BatchPlan].
    let sink_node = &mut new_sink_fragment.nodes;
    let PbNodeBody::Sink(sink_node_body) = sink_node.node_body.as_mut().unwrap() else {
        return Err(anyhow!("expect PbNodeBody::Sink but got: {:?}", sink_node.node_body).into());
    };
    let [stream_scan_node] = sink_node.input.as_mut_slice() else {
        panic!("Sink has more than 1 input: {:?}", sink_node.input);
    };
    let PbNodeBody::StreamScan(scan) = stream_scan_node.node_body.as_mut().unwrap() else {
        return Err(anyhow!(
            "expect PbNodeBody::StreamScan but got: {:?}",
            stream_scan_node.node_body
        )
        .into());
    };
    let [merge_node, _batch_plan_node] = stream_scan_node.input.as_mut_slice() else {
        panic!(
            "the number of StreamScan inputs is not 2: {:?}",
            stream_scan_node.input
        );
    };
    let NodeBody::Merge(merge) = merge_node.node_body.as_mut().unwrap() else {
        return Err(anyhow!(
            "expect PbNodeBody::Merge but got: {:?}",
            merge_node.node_body
        )
        .into());
    };
    // Rebuild the sink node's identity string from the extended column list.
    sink_node.identity = {
        let sink_type = SinkType::from_proto(sink.sink_type());
        let sink_type_str = sink_type.type_str();
        let column_names = new_sink_columns
            .iter()
            .map(|col| {
                ColumnCatalog::from(col.clone())
                    .name_with_hidden()
                    .to_string()
            })
            .join(", ");
        let downstream_pk = if !sink_type.is_append_only() {
            let downstream_pk = sink
                .downstream_pk
                .iter()
                .map(|i| &sink.columns[*i as usize].column_desc.as_ref().unwrap().name)
                .collect_vec();
            format!(", downstream_pk: {downstream_pk:?}")
        } else {
            "".to_owned()
        };
        format!("StreamSink {{ type: {sink_type_str}, columns: [{column_names}]{downstream_pk} }}")
    };
    // Append the new columns as `<table>.<column>` fields on the sink node.
    sink_node
        .fields
        .extend(newly_added_columns.iter().map(|col| {
            Field::new(
                format!("{}.{}", upstream_table.name, col.column_desc.name),
                col.data_type().clone(),
            )
            .to_prost()
        }));

    // If the sink has a log store table, extend its columns too, prefixed with
    // the upstream table name.
    let new_log_store_table = if let Some(log_store_table) = &mut sink_node_body.table {
        extend_sink_columns(&mut log_store_table.columns, newly_added_columns, |name| {
            format!("{}_{}", upstream_table.name, name)
        });
        Some(log_store_table.clone())
    } else {
        None
    };
    sink_node_body.sink_desc.as_mut().unwrap().column_catalogs = new_sink_columns.clone();

    // Append the new columns to the stream scan node's fields and identity.
    stream_scan_node
        .fields
        .extend(newly_added_columns.iter().map(|col| {
            Field::new(
                format!("{}.{}", upstream_table.name, col.column_desc.name),
                col.data_type().clone(),
            )
            .to_prost()
        }));
    stream_scan_node.identity = {
        let columns = stream_scan_node
            .fields
            .iter()
            .map(|col| &col.name)
            .join(", ");
        format!("StreamTableScan {{ table: t, columns: [{columns}] }}")
    };

    // Only arrangement backfill supports auto refresh schema.
    let stream_scan_type = PbStreamScanType::try_from(scan.stream_scan_type).unwrap();
    if stream_scan_type != PbStreamScanType::ArrangementBackfill {
        return Err(anyhow!(
            "unsupported stream_scan_type for auto refresh schema: {:?}",
            stream_scan_type
        )
        .into());
    }
    scan.arrangement_table = Some(upstream_table.clone());
    // Note: output indices are computed against the *old* length of
    // `upstream_column_ids`, before it is extended below.
    scan.output_indices.extend(
        (0..newly_added_columns.len()).map(|i| (i + scan.upstream_column_ids.len()) as u32),
    );
    scan.upstream_column_ids.extend(
        newly_added_columns
            .iter()
            .map(|col| col.column_id().get_id()),
    );
    // Extend the scanned table desc: new value indices first (against the old
    // column count), then the new column descs.
    let table_desc = scan.table_desc.as_mut().unwrap();
    table_desc
        .value_indices
        .extend((0..newly_added_columns.len()).map(|i| (i + table_desc.columns.len()) as u32));
    table_desc.columns.extend(
        newly_added_columns
            .iter()
            .map(|col| col.column_desc.to_protobuf()),
    );

    // Rebuild the merge node's fields from the (now extended) upstream column
    // ids, resolving each against the upstream table's catalog.
    merge_node.fields = scan
        .upstream_column_ids
        .iter()
        .map(|&column_id| {
            let col = upstream_table
                .columns
                .iter()
                .find(|c| c.column_desc.as_ref().unwrap().column_id == column_id)
                .unwrap();
            let col_desc = col.column_desc.as_ref().unwrap();
            Field::new(
                col_desc.name.clone(),
                col_desc.column_type.as_ref().unwrap().into(),
            )
            .to_prost()
        })
        .collect();
    // Point the merge at the upstream table's fragment.
    merge.upstream_fragment_id = upstream_table_fragment_id;
    Ok((new_sink_fragment, new_sink_columns, new_log_store_table))
}
629
/// Backfill ordering at fragment granularity: maps a backfill fragment id to
/// the fragment ids ordered after it (see `create_fragment_backfill_ordering`).
pub type FragmentBackfillOrder = HashMap<FragmentId, Vec<FragmentId>>;
635
/// In-memory representation of the fragment graph of a streaming job, built
/// from the frontend-provided protobuf with all ids globalized.
#[derive(Default, Debug)]
pub struct StreamFragmentGraph {
    // All fragments, keyed by their global fragment id.
    pub(super) fragments: HashMap<GlobalFragmentId, BuildingFragment>,

    // Downstream adjacency: upstream fragment -> (downstream fragment -> edge).
    pub(super) downstreams:
        HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

    // Upstream adjacency: downstream fragment -> (upstream fragment -> edge).
    pub(super) upstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

    // Ids of the tables this job depends on, as provided by the frontend.
    dependent_table_ids: HashSet<TableId>,

    // Parallelism explicitly specified for the job, if any.
    specified_parallelism: Option<NonZeroUsize>,

    // Maximum parallelism of the job, from the frontend proto.
    max_parallelism: usize,

    // Backfill order specified for the job (empty order if unspecified).
    backfill_order: BackfillOrder,
}
675
676impl StreamFragmentGraph {
677 pub fn new(
680 env: &MetaSrvEnv,
681 proto: StreamFragmentGraphProto,
682 job: &StreamingJob,
683 ) -> MetaResult<Self> {
684 let fragment_id_gen =
685 GlobalFragmentIdGen::new(env.id_gen_manager(), proto.fragments.len() as u64);
686 let table_id_gen = GlobalTableIdGen::new(env.id_gen_manager(), proto.table_ids_cnt as u64);
690
691 let fragments: HashMap<_, _> = proto
693 .fragments
694 .into_iter()
695 .map(|(id, fragment)| {
696 let id = fragment_id_gen.to_global_id(id.as_raw_id());
697 let fragment = BuildingFragment::new(id, fragment, job, table_id_gen);
698 (id, fragment)
699 })
700 .collect();
701
702 assert_eq!(
703 fragments
704 .values()
705 .map(|f| f.extract_internal_tables().len() as u32)
706 .sum::<u32>(),
707 proto.table_ids_cnt
708 );
709
710 let mut downstreams = HashMap::new();
712 let mut upstreams = HashMap::new();
713
714 for edge in proto.edges {
715 let upstream_id = fragment_id_gen.to_global_id(edge.upstream_id.as_raw_id());
716 let downstream_id = fragment_id_gen.to_global_id(edge.downstream_id.as_raw_id());
717 let edge = StreamFragmentEdge::from_protobuf(&edge);
718
719 upstreams
720 .entry(downstream_id)
721 .or_insert_with(HashMap::new)
722 .try_insert(upstream_id, edge.clone())
723 .unwrap();
724 downstreams
725 .entry(upstream_id)
726 .or_insert_with(HashMap::new)
727 .try_insert(downstream_id, edge)
728 .unwrap();
729 }
730
731 let dependent_table_ids = proto.dependent_table_ids.iter().copied().collect();
734
735 let specified_parallelism = if let Some(Parallelism { parallelism }) = proto.parallelism {
736 Some(NonZeroUsize::new(parallelism as usize).context("parallelism should not be 0")?)
737 } else {
738 None
739 };
740
741 let max_parallelism = proto.max_parallelism as usize;
742 let backfill_order = proto.backfill_order.unwrap_or(BackfillOrder {
743 order: Default::default(),
744 });
745
746 Ok(Self {
747 fragments,
748 downstreams,
749 upstreams,
750 dependent_table_ids,
751 specified_parallelism,
752 max_parallelism,
753 backfill_order,
754 })
755 }
756
757 pub fn incomplete_internal_tables(&self) -> BTreeMap<TableId, Table> {
763 let mut tables = BTreeMap::new();
764 for fragment in self.fragments.values() {
765 for table in fragment.extract_internal_tables() {
766 let table_id = table.id;
767 tables
768 .try_insert(table_id, table)
769 .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
770 }
771 }
772 tables
773 }
774
775 pub fn refill_internal_table_ids(&mut self, table_id_map: HashMap<TableId, TableId>) {
778 for fragment in self.fragments.values_mut() {
779 stream_graph_visitor::visit_internal_tables(
780 &mut fragment.inner,
781 |table, _table_type_name| {
782 let target = table_id_map.get(&table.id).cloned().unwrap();
783 table.id = target;
784 },
785 );
786 }
787 }
788
789 pub fn fit_internal_tables_trivial(
792 &mut self,
793 mut old_internal_tables: Vec<Table>,
794 ) -> MetaResult<()> {
795 let mut new_internal_table_ids = Vec::new();
796 for fragment in self.fragments.values() {
797 for table in &fragment.extract_internal_tables() {
798 new_internal_table_ids.push(table.id);
799 }
800 }
801
802 if new_internal_table_ids.len() != old_internal_tables.len() {
803 bail!(
804 "Different number of internal tables. New: {}, Old: {}",
805 new_internal_table_ids.len(),
806 old_internal_tables.len()
807 );
808 }
809 old_internal_tables.sort_by(|a, b| a.id.cmp(&b.id));
810 new_internal_table_ids.sort();
811
812 let internal_table_id_map = new_internal_table_ids
813 .into_iter()
814 .zip_eq_fast(old_internal_tables.into_iter())
815 .collect::<HashMap<_, _>>();
816
817 for fragment in self.fragments.values_mut() {
820 stream_graph_visitor::visit_internal_tables(
821 &mut fragment.inner,
822 |table, _table_type_name| {
823 let target = internal_table_id_map.get(&table.id).cloned().unwrap();
825 *table = target;
826 },
827 );
828 }
829
830 Ok(())
831 }
832
833 pub fn fit_internal_table_ids_with_mapping(&mut self, mut matches: HashMap<TableId, Table>) {
835 for fragment in self.fragments.values_mut() {
836 stream_graph_visitor::visit_internal_tables(
837 &mut fragment.inner,
838 |table, _table_type_name| {
839 let target = matches.remove(&table.id).unwrap_or_else(|| {
840 panic!("no matching table for table {}({})", table.id, table.name)
841 });
842 table.id = target.id;
843 table.maybe_vnode_count = target.maybe_vnode_count;
844 },
845 );
846 }
847 }
848
849 pub fn table_fragment_id(&self) -> FragmentId {
851 self.fragments
852 .values()
853 .filter(|b| b.job_id.is_some())
854 .map(|b| b.fragment_id)
855 .exactly_one()
856 .expect("require exactly 1 materialize/sink/cdc source node when creating the streaming job")
857 }
858
859 pub fn dml_fragment_id(&self) -> Option<FragmentId> {
861 self.fragments
862 .values()
863 .filter(|b| {
864 FragmentTypeMask::from(b.fragment_type_mask).contains(FragmentTypeFlag::Dml)
865 })
866 .map(|b| b.fragment_id)
867 .at_most_one()
868 .expect("require at most 1 dml node when creating the streaming job")
869 }
870
871 pub fn dependent_table_ids(&self) -> &HashSet<TableId> {
873 &self.dependent_table_ids
874 }
875
876 pub fn specified_parallelism(&self) -> Option<NonZeroUsize> {
878 self.specified_parallelism
879 }
880
881 pub fn max_parallelism(&self) -> usize {
883 self.max_parallelism
884 }
885
886 fn get_downstreams(
888 &self,
889 fragment_id: GlobalFragmentId,
890 ) -> &HashMap<GlobalFragmentId, StreamFragmentEdge> {
891 self.downstreams.get(&fragment_id).unwrap_or(&EMPTY_HASHMAP)
892 }
893
894 fn get_upstreams(
896 &self,
897 fragment_id: GlobalFragmentId,
898 ) -> &HashMap<GlobalFragmentId, StreamFragmentEdge> {
899 self.upstreams.get(&fragment_id).unwrap_or(&EMPTY_HASHMAP)
900 }
901
902 pub fn collect_snapshot_backfill_info(
903 &self,
904 ) -> MetaResult<(Option<SnapshotBackfillInfo>, SnapshotBackfillInfo)> {
905 Self::collect_snapshot_backfill_info_impl(self.fragments.values().map(|fragment| {
906 (
907 fragment.node.as_ref().unwrap(),
908 fragment.fragment_type_mask.into(),
909 )
910 }))
911 }
912
913 pub fn collect_snapshot_backfill_info_impl(
915 fragments: impl IntoIterator<Item = (&PbStreamNode, FragmentTypeMask)>,
916 ) -> MetaResult<(Option<SnapshotBackfillInfo>, SnapshotBackfillInfo)> {
917 let mut prev_stream_scan: Option<(Option<SnapshotBackfillInfo>, StreamScanNode)> = None;
918 let mut cross_db_info = SnapshotBackfillInfo {
919 upstream_mv_table_id_to_backfill_epoch: Default::default(),
920 };
921 let mut result = Ok(());
922 for (node, fragment_type_mask) in fragments {
923 visit_stream_node_cont(node, |node| {
924 if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_ref() {
925 let stream_scan_type = StreamScanType::try_from(stream_scan.stream_scan_type)
926 .expect("invalid stream_scan_type");
927 let is_snapshot_backfill = match stream_scan_type {
928 StreamScanType::SnapshotBackfill => {
929 assert!(
930 fragment_type_mask
931 .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
932 );
933 true
934 }
935 StreamScanType::CrossDbSnapshotBackfill => {
936 assert!(
937 fragment_type_mask
938 .contains(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan)
939 );
940 cross_db_info
941 .upstream_mv_table_id_to_backfill_epoch
942 .insert(stream_scan.table_id, stream_scan.snapshot_backfill_epoch);
943
944 return true;
945 }
946 _ => false,
947 };
948
949 match &mut prev_stream_scan {
950 Some((prev_snapshot_backfill_info, prev_stream_scan)) => {
951 match (prev_snapshot_backfill_info, is_snapshot_backfill) {
952 (Some(prev_snapshot_backfill_info), true) => {
953 prev_snapshot_backfill_info
954 .upstream_mv_table_id_to_backfill_epoch
955 .insert(
956 stream_scan.table_id,
957 stream_scan.snapshot_backfill_epoch,
958 );
959 true
960 }
961 (None, false) => true,
962 (_, _) => {
963 result = Err(anyhow!("must be either all snapshot_backfill or no snapshot_backfill. Curr: {stream_scan:?} Prev: {prev_stream_scan:?}").into());
964 false
965 }
966 }
967 }
968 None => {
969 prev_stream_scan = Some((
970 if is_snapshot_backfill {
971 Some(SnapshotBackfillInfo {
972 upstream_mv_table_id_to_backfill_epoch: HashMap::from_iter(
973 [(
974 stream_scan.table_id,
975 stream_scan.snapshot_backfill_epoch,
976 )],
977 ),
978 })
979 } else {
980 None
981 },
982 *stream_scan.clone(),
983 ));
984 true
985 }
986 }
987 } else {
988 true
989 }
990 })
991 }
992 result.map(|_| {
993 (
994 prev_stream_scan
995 .map(|(snapshot_backfill_info, _)| snapshot_backfill_info)
996 .unwrap_or(None),
997 cross_db_info,
998 )
999 })
1000 }
1001
1002 pub fn collect_backfill_mapping(&self) -> HashMap<u32, Vec<FragmentId>> {
1004 let mut mapping = HashMap::new();
1005 for (fragment_id, fragment) in &self.fragments {
1006 let fragment_id = fragment_id.as_global_id();
1007 let fragment_mask = fragment.fragment_type_mask;
1008 let candidates = [FragmentTypeFlag::StreamScan, FragmentTypeFlag::SourceScan];
1009 let has_some_scan = candidates
1010 .into_iter()
1011 .any(|flag| (fragment_mask & flag as u32) > 0);
1012 if has_some_scan {
1013 visit_stream_node_cont(fragment.node.as_ref().unwrap(), |node| {
1014 match node.node_body.as_ref() {
1015 Some(NodeBody::StreamScan(stream_scan)) => {
1016 let table_id = stream_scan.table_id;
1017 let fragments: &mut Vec<_> =
1018 mapping.entry(table_id.as_raw_id()).or_default();
1019 fragments.push(fragment_id);
1020 false
1022 }
1023 Some(NodeBody::SourceBackfill(source_backfill)) => {
1024 let source_id = source_backfill.upstream_source_id;
1025 let fragments: &mut Vec<_> =
1026 mapping.entry(source_id.as_raw_id()).or_default();
1027 fragments.push(fragment_id);
1028 false
1030 }
1031 _ => true,
1032 }
1033 })
1034 }
1035 }
1036 mapping
1037 }
1038
1039 pub fn create_fragment_backfill_ordering(&self) -> FragmentBackfillOrder {
1043 let mapping = self.collect_backfill_mapping();
1044 let mut fragment_ordering: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1045
1046 for (rel_id, downstream_rel_ids) in &self.backfill_order.order {
1048 let fragment_ids = mapping.get(rel_id).unwrap();
1049 for fragment_id in fragment_ids {
1050 let downstream_fragment_ids = downstream_rel_ids
1051 .data
1052 .iter()
1053 .flat_map(|downstream_rel_id| mapping.get(downstream_rel_id).unwrap().iter())
1054 .copied()
1055 .collect();
1056 fragment_ordering.insert(*fragment_id, downstream_fragment_ids);
1057 }
1058 }
1059
1060 if fragment_ordering.is_empty() {
1063 for value in mapping.values() {
1064 for &fragment_id in value {
1065 fragment_ordering.entry(fragment_id).or_default();
1066 }
1067 }
1068 }
1069
1070 let locality_provider_dependencies = self.find_locality_provider_dependencies();
1072
1073 let backfill_fragments: HashSet<FragmentId> = mapping.values().flatten().copied().collect();
1074
1075 let all_locality_provider_fragments: HashSet<FragmentId> =
1078 locality_provider_dependencies.keys().copied().collect();
1079 let downstream_locality_provider_fragments: HashSet<FragmentId> =
1080 locality_provider_dependencies
1081 .values()
1082 .flatten()
1083 .copied()
1084 .collect();
1085 let locality_provider_root_fragments: Vec<FragmentId> = all_locality_provider_fragments
1086 .difference(&downstream_locality_provider_fragments)
1087 .copied()
1088 .collect();
1089
1090 for &backfill_fragment_id in &backfill_fragments {
1093 fragment_ordering
1094 .entry(backfill_fragment_id)
1095 .or_default()
1096 .extend(locality_provider_root_fragments.iter().copied());
1097 }
1098
1099 for (fragment_id, downstream_fragments) in locality_provider_dependencies {
1101 fragment_ordering
1102 .entry(fragment_id)
1103 .or_default()
1104 .extend(downstream_fragments);
1105 }
1106
1107 fragment_ordering
1108 }
1109
1110 pub fn find_locality_provider_fragment_state_table_mapping(
1111 &self,
1112 ) -> HashMap<FragmentId, Vec<TableId>> {
1113 let mut mapping: HashMap<FragmentId, Vec<TableId>> = HashMap::new();
1114
1115 for (fragment_id, fragment) in &self.fragments {
1116 let fragment_id = fragment_id.as_global_id();
1117
1118 if let Some(node) = fragment.node.as_ref() {
1120 let mut state_table_ids = Vec::new();
1121
1122 visit_stream_node_cont(node, |stream_node| {
1123 if let Some(NodeBody::LocalityProvider(locality_provider)) =
1124 stream_node.node_body.as_ref()
1125 {
1126 let state_table_id = locality_provider
1128 .state_table
1129 .as_ref()
1130 .expect("must have state table")
1131 .id;
1132 state_table_ids.push(state_table_id);
1133 false } else {
1135 true }
1137 });
1138
1139 if !state_table_ids.is_empty() {
1140 mapping.insert(fragment_id, state_table_ids);
1141 }
1142 }
1143 }
1144
1145 mapping
1146 }
1147
1148 pub fn find_locality_provider_dependencies(&self) -> HashMap<FragmentId, Vec<FragmentId>> {
1156 let mut locality_provider_fragments = HashSet::new();
1157 let mut dependencies: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1158
1159 for (fragment_id, fragment) in &self.fragments {
1161 let fragment_id = fragment_id.as_global_id();
1162 let has_locality_provider = self.fragment_has_locality_provider(fragment);
1163
1164 if has_locality_provider {
1165 locality_provider_fragments.insert(fragment_id);
1166 dependencies.entry(fragment_id).or_default();
1167 }
1168 }
1169
1170 for &provider_fragment_id in &locality_provider_fragments {
1174 let provider_fragment_global_id = GlobalFragmentId::new(provider_fragment_id);
1175
1176 let mut visited = HashSet::new();
1178 let mut downstream_locality_providers = Vec::new();
1179
1180 self.collect_downstream_locality_providers(
1181 provider_fragment_global_id,
1182 &locality_provider_fragments,
1183 &mut visited,
1184 &mut downstream_locality_providers,
1185 );
1186
1187 dependencies
1189 .entry(provider_fragment_id)
1190 .or_default()
1191 .extend(downstream_locality_providers);
1192 }
1193
1194 dependencies
1195 }
1196
1197 fn fragment_has_locality_provider(&self, fragment: &BuildingFragment) -> bool {
1198 let mut has_locality_provider = false;
1199
1200 if let Some(node) = fragment.node.as_ref() {
1201 visit_stream_node_cont(node, |stream_node| {
1202 if let Some(NodeBody::LocalityProvider(_)) = stream_node.node_body.as_ref() {
1203 has_locality_provider = true;
1204 false } else {
1206 true }
1208 });
1209 }
1210
1211 has_locality_provider
1212 }
1213
1214 fn collect_downstream_locality_providers(
1216 &self,
1217 current_fragment_id: GlobalFragmentId,
1218 locality_provider_fragments: &HashSet<FragmentId>,
1219 visited: &mut HashSet<GlobalFragmentId>,
1220 downstream_providers: &mut Vec<FragmentId>,
1221 ) {
1222 if visited.contains(¤t_fragment_id) {
1223 return;
1224 }
1225 visited.insert(current_fragment_id);
1226
1227 for &downstream_id in self.get_downstreams(current_fragment_id).keys() {
1229 let downstream_fragment_id = downstream_id.as_global_id();
1230
1231 if locality_provider_fragments.contains(&downstream_fragment_id) {
1233 downstream_providers.push(downstream_fragment_id);
1234 }
1235
1236 self.collect_downstream_locality_providers(
1238 downstream_id,
1239 locality_provider_fragments,
1240 visited,
1241 downstream_providers,
1242 );
1243 }
1244 }
1245}
1246
/// Fills the `snapshot_backfill_epoch` field of every snapshot-backfill `StreamScan`
/// node (both `SnapshotBackfill` and `CrossDbSnapshotBackfill` scan types) in the
/// given node tree.
///
/// The epoch for each scanned upstream table is looked up first in
/// `cross_db_snapshot_backfill_info`, then (if provided) in `snapshot_backfill_info`.
///
/// Returns `Ok(true)` iff at least one node was filled. Returns an error if an
/// upstream table id is not covered by either info, if its epoch is not yet set,
/// or if a node's epoch had already been filled before.
pub fn fill_snapshot_backfill_epoch(
    node: &mut StreamNode,
    snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
    cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
) -> MetaResult<bool> {
    // The visitor closure cannot return a `Result` directly, so the first error
    // is captured in `result` and the traversal is stopped by returning `false`.
    let mut result = Ok(());
    let mut applied = false;
    visit_stream_node_cont_mut(node, |node| {
        if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_mut()
            && (stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32
                || stream_scan.stream_scan_type == StreamScanType::CrossDbSnapshotBackfill as i32)
        {
            result = try {
                let table_id = stream_scan.table_id;
                // Cross-db info takes precedence over the same-db info.
                let snapshot_epoch = cross_db_snapshot_backfill_info
                    .upstream_mv_table_id_to_backfill_epoch
                    .get(&table_id)
                    .or_else(|| {
                        snapshot_backfill_info.and_then(|snapshot_backfill_info| {
                            snapshot_backfill_info
                                .upstream_mv_table_id_to_backfill_epoch
                                .get(&table_id)
                        })
                    })
                    .ok_or_else(|| anyhow!("upstream table id not covered: {}", table_id))?
                    .ok_or_else(|| anyhow!("upstream table id not set: {}", table_id))?;
                // The epoch must be filled exactly once; `replace` returning
                // `Some` means it was already set, which is a bug.
                if let Some(prev_snapshot_epoch) =
                    stream_scan.snapshot_backfill_epoch.replace(snapshot_epoch)
                {
                    Err(anyhow!(
                        "snapshot backfill epoch set again: {} {} {}",
                        table_id,
                        prev_snapshot_epoch,
                        snapshot_epoch
                    ))?;
                }
                applied = true;
            };
            // Keep visiting only while no error has occurred.
            result.is_ok()
        } else {
            true
        }
    });
    result.map(|_| applied)
}
1294
/// A lazily-initialized, shared empty edge map. Presumably used to hand out a
/// `&HashMap` reference for fragments that have no recorded edges without
/// allocating — TODO(review): confirm against the accessors using it.
static EMPTY_HASHMAP: LazyLock<HashMap<GlobalFragmentId, StreamFragmentEdge>> =
    LazyLock::new(HashMap::new);
1297
/// A fragment in the complete graph: either one currently being built as part of
/// the new streaming job, or one that already exists in the cluster.
#[derive(Debug, Clone, EnumAsInner)]
pub(super) enum EitherFragment {
    /// A fragment of the streaming job under construction.
    Building(BuildingFragment),

    /// A fragment that already exists in the cluster.
    Existing(SharedFragmentInfo),
}
1308
/// A wrapper around [`StreamFragmentGraph`] that additionally records the existing
/// fragments external to the building graph (upstream roots and/or downstream
/// consumers) and the extra edges connecting them to the building graph.
#[derive(Debug)]
pub struct CompleteStreamFragmentGraph {
    /// The fragment graph of the streaming job being built.
    building_graph: StreamFragmentGraph,

    /// Existing fragments in the cluster that the building graph connects to,
    /// keyed by their global fragment id.
    existing_fragments: HashMap<GlobalFragmentId, SharedFragmentInfo>,

    /// Worker location of each actor belonging to the existing fragments.
    existing_actor_location: HashMap<ActorId, WorkerId>,

    /// Extra downstream edges (upstream fragment -> downstream fragment) in
    /// addition to the edges inside `building_graph`.
    extra_downstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

    /// Mirror of `extra_downstreams` in the upstream direction.
    extra_upstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,
}
1334
/// Context for connecting a building graph to the root fragments of its upstream jobs.
pub struct FragmentGraphUpstreamContext {
    /// Root fragment (and its node tree) of each upstream job, keyed by job id.
    pub upstream_root_fragments: HashMap<JobId, (SharedFragmentInfo, PbStreamNode)>,
    /// Worker location of each actor in the upstream fragments.
    pub upstream_actor_location: HashMap<ActorId, WorkerId>,
}
1341
/// Context for connecting a building graph to its existing downstream fragments,
/// e.g. when replacing the job that those fragments consume.
pub struct FragmentGraphDownstreamContext {
    /// The root fragment id of the original (replaced) job.
    pub original_root_fragment_id: FragmentId,
    /// Downstream fragments with the dispatcher type of their existing incoming
    /// edge and their node trees.
    pub downstream_fragments: Vec<(DispatcherType, SharedFragmentInfo, PbStreamNode)>,
    /// Worker location of each actor in the downstream fragments.
    pub downstream_actor_location: HashMap<ActorId, WorkerId>,
}
1347
impl CompleteStreamFragmentGraph {
    /// Creates a complete graph with no external context attached, i.e. the
    /// building graph stands alone. For testing purposes only.
    #[cfg(test)]
    pub fn for_test(graph: StreamFragmentGraph) -> Self {
        Self {
            building_graph: graph,
            existing_fragments: Default::default(),
            existing_actor_location: Default::default(),
            extra_downstreams: Default::default(),
            extra_upstreams: Default::default(),
        }
    }

    /// Creates a complete graph connected to the root fragments of the upstream
    /// jobs (e.g. MV on MV, or a CDC table on a shared CDC source).
    pub fn with_upstreams(
        graph: StreamFragmentGraph,
        upstream_context: FragmentGraphUpstreamContext,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        Self::build_helper(graph, Some(upstream_context), None, job_type)
    }

    /// Creates a complete graph connected to existing downstream fragments,
    /// e.g. when replacing a table or source job.
    pub fn with_downstreams(
        graph: StreamFragmentGraph,
        downstream_context: FragmentGraphDownstreamContext,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        Self::build_helper(graph, None, Some(downstream_context), job_type)
    }

    /// Creates a complete graph connected to both upstream root fragments and
    /// existing downstream fragments.
    pub fn with_upstreams_and_downstreams(
        graph: StreamFragmentGraph,
        upstream_context: FragmentGraphUpstreamContext,
        downstream_context: FragmentGraphDownstreamContext,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        Self::build_helper(
            graph,
            Some(upstream_context),
            Some(downstream_context),
            job_type,
        )
    }

    /// The core logic: collects the existing external fragments and synthesizes
    /// the extra edges that connect them to the building graph.
    fn build_helper(
        mut graph: StreamFragmentGraph,
        upstream_ctx: Option<FragmentGraphUpstreamContext>,
        downstream_ctx: Option<FragmentGraphDownstreamContext>,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        let mut extra_downstreams = HashMap::new();
        let mut extra_upstreams = HashMap::new();
        let mut existing_fragments = HashMap::new();

        let mut existing_actor_location = HashMap::new();

        if let Some(FragmentGraphUpstreamContext {
            upstream_root_fragments,
            upstream_actor_location,
        }) = upstream_ctx
        {
            // For each building fragment that requires columns from an upstream
            // job, synthesize an edge from that job's root fragment to it.
            for (&id, fragment) in &mut graph.fragments {
                let uses_shuffled_backfill = fragment.has_shuffled_backfill();

                for (&upstream_job_id, required_columns) in &fragment.upstream_job_columns {
                    let (upstream_fragment, nodes) = upstream_root_fragments
                        .get(&upstream_job_id)
                        .context("upstream fragment not found")?;
                    let upstream_root_fragment_id =
                        GlobalFragmentId::new(upstream_fragment.fragment_id);

                    let edge = match job_type {
                        StreamingJobType::Table(TableJobType::SharedCdcSource) => {
                            // CDC table on a shared CDC source: the downstream
                            // fragment must be the `CdcFilter` fragment.
                            assert_ne!(
                                (fragment.fragment_type_mask & FragmentTypeFlag::CdcFilter as u32),
                                0
                            );

                            tracing::debug!(
                                ?upstream_root_fragment_id,
                                ?required_columns,
                                identity = ?fragment.inner.get_node().unwrap().get_identity(),
                                current_frag_id=?id,
                                "CdcFilter with upstream source fragment"
                            );

                            // Connect with a 1-to-1 no-shuffle edge; the CDC
                            // source always emits a fixed set of columns.
                            StreamFragmentEdge {
                                id: EdgeId::UpstreamExternal {
                                    upstream_job_id,
                                    downstream_fragment_id: id,
                                },
                                dispatch_strategy: DispatchStrategy {
                                    r#type: DispatcherType::NoShuffle as _,
                                    dist_key_indices: vec![],
                                    output_mapping: DispatchOutputMapping::identical(
                                        CDC_SOURCE_COLUMN_NUM as _,
                                    )
                                    .into(),
                                },
                            }
                        }

                        StreamingJobType::MaterializedView
                        | StreamingJobType::Sink
                        | StreamingJobType::Index => {
                            if upstream_fragment
                                .fragment_type_mask
                                .contains(FragmentTypeFlag::Mview)
                            {
                                // MV/Sink/Index on MV: derive the distribution
                                // keys and the column mapping from the upstream
                                // `Materialize` node.
                                let (dist_key_indices, output_mapping) = {
                                    let mview_node =
                                        nodes.get_node_body().unwrap().as_materialize().unwrap();
                                    let all_columns = mview_node.column_descs();
                                    let dist_key_indices = mview_node.dist_key_indices();
                                    let output_mapping = gen_output_mapping(
                                        required_columns,
                                        &all_columns,
                                    )
                                    .context(
                                        "BUG: column not found in the upstream materialized view",
                                    )?;
                                    (dist_key_indices, output_mapping)
                                };
                                let dispatch_strategy = mv_on_mv_dispatch_strategy(
                                    uses_shuffled_backfill,
                                    dist_key_indices,
                                    output_mapping,
                                );

                                StreamFragmentEdge {
                                    id: EdgeId::UpstreamExternal {
                                        upstream_job_id,
                                        downstream_fragment_id: id,
                                    },
                                    dispatch_strategy,
                                }
                            }
                            // MV/Sink/Index on a (shared) source: the mapping is
                            // derived from the upstream `Source` node instead.
                            else if upstream_fragment
                                .fragment_type_mask
                                .contains(FragmentTypeFlag::Source)
                            {
                                let output_mapping = {
                                    let source_node =
                                        nodes.get_node_body().unwrap().as_source().unwrap();

                                    let all_columns = source_node.column_descs().unwrap();
                                    gen_output_mapping(required_columns, &all_columns).context(
                                        "BUG: column not found in the upstream source node",
                                    )?
                                };

                                StreamFragmentEdge {
                                    id: EdgeId::UpstreamExternal {
                                        upstream_job_id,
                                        downstream_fragment_id: id,
                                    },
                                    dispatch_strategy: DispatchStrategy {
                                        r#type: DispatcherType::NoShuffle as _,
                                        dist_key_indices: vec![],
                                        output_mapping: Some(output_mapping),
                                    },
                                }
                            } else {
                                bail!(
                                    "the upstream fragment should be a MView or Source, got fragment type: {:b}",
                                    upstream_fragment.fragment_type_mask
                                )
                            }
                        }
                        StreamingJobType::Source | StreamingJobType::Table(_) => {
                            bail!(
                                "the streaming job shouldn't have an upstream fragment, job_type: {:?}",
                                job_type
                            )
                        }
                    };

                    // Record the edge in both directions; `try_insert` panics on
                    // duplicates, which would indicate a bug in edge synthesis.
                    extra_downstreams
                        .entry(upstream_root_fragment_id)
                        .or_insert_with(HashMap::new)
                        .try_insert(id, edge.clone())
                        .unwrap();
                    extra_upstreams
                        .entry(id)
                        .or_insert_with(HashMap::new)
                        .try_insert(upstream_root_fragment_id, edge)
                        .unwrap();
                }
            }

            existing_fragments.extend(
                upstream_root_fragments
                    .into_values()
                    .map(|(f, _)| (GlobalFragmentId::new(f.fragment_id), f)),
            );

            existing_actor_location.extend(upstream_actor_location);
        }

        if let Some(FragmentGraphDownstreamContext {
            original_root_fragment_id,
            downstream_fragments,
            downstream_actor_location,
        }) = downstream_ctx
        {
            let original_table_fragment_id = GlobalFragmentId::new(original_root_fragment_id);
            let table_fragment_id = GlobalFragmentId::new(graph.table_fragment_id());

            // Attach each existing downstream fragment to the new root fragment
            // of the building graph (which replaces the original root).
            for (dispatcher_type, fragment, nodes) in &downstream_fragments {
                let id = GlobalFragmentId::new(fragment.fragment_id);

                // Find the columns the downstream scan/backfill actually reads
                // from the upstream.
                let output_columns = {
                    let mut res = None;

                    stream_graph_visitor::visit_stream_node_body(nodes, |node_body| {
                        let columns = match node_body {
                            NodeBody::StreamScan(stream_scan) => stream_scan.upstream_columns(),
                            NodeBody::SourceBackfill(source_backfill) => {
                                source_backfill.column_descs()
                            }
                            _ => return,
                        };
                        res = Some(columns);
                    });

                    res.context("failed to locate downstream scan")?
                };

                let table_fragment = graph.fragments.get(&table_fragment_id).unwrap();
                let nodes = table_fragment.node.as_ref().unwrap();

                let (dist_key_indices, output_mapping) = match job_type {
                    StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
                        // New root is a `Materialize` node: map the downstream's
                        // required columns onto the new table's columns. Failure
                        // means a still-referenced column was dropped.
                        let mview_node = nodes.get_node_body().unwrap().as_materialize().unwrap();
                        let all_columns = mview_node.column_descs();
                        let dist_key_indices = mview_node.dist_key_indices();
                        let output_mapping = gen_output_mapping(&output_columns, &all_columns)
                            .ok_or_else(|| {
                                MetaError::invalid_parameter(
                                    "unable to drop the column due to \
                            being referenced by downstream materialized views or sinks",
                                )
                            })?;
                        (dist_key_indices, output_mapping)
                    }

                    StreamingJobType::Source => {
                        // New root is a `Source` node; downstreams of a source
                        // are always connected via no-shuffle.
                        let source_node = nodes.get_node_body().unwrap().as_source().unwrap();
                        let all_columns = source_node.column_descs().unwrap();
                        let output_mapping = gen_output_mapping(&output_columns, &all_columns)
                            .ok_or_else(|| {
                                MetaError::invalid_parameter(
                                    "unable to drop the column due to \
                            being referenced by downstream materialized views or sinks",
                                )
                            })?;
                        assert_eq!(*dispatcher_type, DispatcherType::NoShuffle);
                        (
                            vec![],
                            output_mapping,
                        )
                    }

                    _ => bail!("unsupported job type for replacement: {job_type:?}"),
                };

                // Keep the original dispatcher type of the existing edge.
                let edge = StreamFragmentEdge {
                    id: EdgeId::DownstreamExternal(DownstreamExternalEdgeId {
                        original_upstream_fragment_id: original_table_fragment_id,
                        downstream_fragment_id: id,
                    }),
                    dispatch_strategy: DispatchStrategy {
                        r#type: *dispatcher_type as i32,
                        output_mapping: Some(output_mapping),
                        dist_key_indices,
                    },
                };

                extra_downstreams
                    .entry(table_fragment_id)
                    .or_insert_with(HashMap::new)
                    .try_insert(id, edge.clone())
                    .unwrap();
                extra_upstreams
                    .entry(id)
                    .or_insert_with(HashMap::new)
                    .try_insert(table_fragment_id, edge)
                    .unwrap();
            }

            existing_fragments.extend(
                downstream_fragments
                    .into_iter()
                    .map(|(_, f, _)| (GlobalFragmentId::new(f.fragment_id), f)),
            );

            existing_actor_location.extend(downstream_actor_location);
        }

        Ok(Self {
            building_graph: graph,
            existing_fragments,
            existing_actor_location,
            extra_downstreams,
            extra_upstreams,
        })
    }
}
1680
1681fn gen_output_mapping(
1683 required_columns: &[PbColumnDesc],
1684 upstream_columns: &[PbColumnDesc],
1685) -> Option<DispatchOutputMapping> {
1686 let len = required_columns.len();
1687 let mut indices = vec![0; len];
1688 let mut types = None;
1689
1690 for (i, r) in required_columns.iter().enumerate() {
1691 let (ui, u) = upstream_columns
1692 .iter()
1693 .find_position(|&u| u.column_id == r.column_id)?;
1694 indices[i] = ui as u32;
1695
1696 if u.column_type != r.column_type {
1699 types.get_or_insert_with(|| vec![TypePair::default(); len])[i] = TypePair {
1700 upstream: u.column_type.clone(),
1701 downstream: r.column_type.clone(),
1702 };
1703 }
1704 }
1705
1706 let types = types.unwrap_or(Vec::new());
1708
1709 Some(DispatchOutputMapping { indices, types })
1710}
1711
1712fn mv_on_mv_dispatch_strategy(
1713 uses_shuffled_backfill: bool,
1714 dist_key_indices: Vec<u32>,
1715 output_mapping: DispatchOutputMapping,
1716) -> DispatchStrategy {
1717 if uses_shuffled_backfill {
1718 if !dist_key_indices.is_empty() {
1719 DispatchStrategy {
1720 r#type: DispatcherType::Hash as _,
1721 dist_key_indices,
1722 output_mapping: Some(output_mapping),
1723 }
1724 } else {
1725 DispatchStrategy {
1726 r#type: DispatcherType::Simple as _,
1727 dist_key_indices: vec![], output_mapping: Some(output_mapping),
1729 }
1730 }
1731 } else {
1732 DispatchStrategy {
1733 r#type: DispatcherType::NoShuffle as _,
1734 dist_key_indices: vec![], output_mapping: Some(output_mapping),
1736 }
1737 }
1738}
1739
1740impl CompleteStreamFragmentGraph {
1741 pub(super) fn all_fragment_ids(&self) -> impl Iterator<Item = GlobalFragmentId> + '_ {
1744 self.building_graph
1745 .fragments
1746 .keys()
1747 .chain(self.existing_fragments.keys())
1748 .copied()
1749 }
1750
1751 pub(super) fn all_edges(
1753 &self,
1754 ) -> impl Iterator<Item = (GlobalFragmentId, GlobalFragmentId, &StreamFragmentEdge)> + '_ {
1755 self.building_graph
1756 .downstreams
1757 .iter()
1758 .chain(self.extra_downstreams.iter())
1759 .flat_map(|(&from, tos)| tos.iter().map(move |(&to, edge)| (from, to, edge)))
1760 }
1761
1762 pub(super) fn existing_distribution(&self) -> HashMap<GlobalFragmentId, Distribution> {
1764 self.existing_fragments
1765 .iter()
1766 .map(|(&id, f)| {
1767 (
1768 id,
1769 Distribution::from_fragment(f, &self.existing_actor_location),
1770 )
1771 })
1772 .collect()
1773 }
1774
1775 pub(super) fn topo_order(&self) -> MetaResult<Vec<GlobalFragmentId>> {
1782 let mut topo = Vec::new();
1783 let mut downstream_cnts = HashMap::new();
1784
1785 for fragment_id in self.all_fragment_ids() {
1787 let downstream_cnt = self.get_downstreams(fragment_id).count();
1789 if downstream_cnt == 0 {
1790 topo.push(fragment_id);
1791 } else {
1792 downstream_cnts.insert(fragment_id, downstream_cnt);
1793 }
1794 }
1795
1796 let mut i = 0;
1797 while let Some(&fragment_id) = topo.get(i) {
1798 i += 1;
1799 for (upstream_job_id, _) in self.get_upstreams(fragment_id) {
1801 let downstream_cnt = downstream_cnts.get_mut(&upstream_job_id).unwrap();
1802 *downstream_cnt -= 1;
1803 if *downstream_cnt == 0 {
1804 downstream_cnts.remove(&upstream_job_id);
1805 topo.push(upstream_job_id);
1806 }
1807 }
1808 }
1809
1810 if !downstream_cnts.is_empty() {
1811 bail!("graph is not a DAG");
1813 }
1814
1815 Ok(topo)
1816 }
1817
1818 pub(super) fn seal_fragment(
1821 &self,
1822 id: GlobalFragmentId,
1823 actors: Vec<StreamActor>,
1824 distribution: Distribution,
1825 stream_node: StreamNode,
1826 ) -> Fragment {
1827 let building_fragment = self.get_fragment(id).into_building().unwrap();
1828 let internal_tables = building_fragment.extract_internal_tables();
1829 let BuildingFragment {
1830 inner,
1831 job_id,
1832 upstream_job_columns: _,
1833 } = building_fragment;
1834
1835 let distribution_type = distribution.to_distribution_type();
1836 let vnode_count = distribution.vnode_count();
1837
1838 let materialized_fragment_id =
1839 if FragmentTypeMask::from(inner.fragment_type_mask).contains(FragmentTypeFlag::Mview) {
1840 job_id.map(JobId::as_mv_table_id)
1841 } else {
1842 None
1843 };
1844
1845 let vector_index_fragment_id =
1846 if inner.fragment_type_mask & FragmentTypeFlag::VectorIndexWrite as u32 != 0 {
1847 job_id.map(JobId::as_mv_table_id)
1848 } else {
1849 None
1850 };
1851
1852 let state_table_ids = internal_tables
1853 .iter()
1854 .map(|t| t.id)
1855 .chain(materialized_fragment_id)
1856 .chain(vector_index_fragment_id)
1857 .collect();
1858
1859 Fragment {
1860 fragment_id: inner.fragment_id,
1861 fragment_type_mask: inner.fragment_type_mask.into(),
1862 distribution_type,
1863 actors,
1864 state_table_ids,
1865 maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
1866 nodes: stream_node,
1867 }
1868 }
1869
1870 pub(super) fn get_fragment(&self, fragment_id: GlobalFragmentId) -> EitherFragment {
1873 if let Some(fragment) = self.existing_fragments.get(&fragment_id) {
1874 EitherFragment::Existing(fragment.clone())
1875 } else {
1876 EitherFragment::Building(
1877 self.building_graph
1878 .fragments
1879 .get(&fragment_id)
1880 .unwrap()
1881 .clone(),
1882 )
1883 }
1884 }
1885
1886 pub(super) fn get_downstreams(
1889 &self,
1890 fragment_id: GlobalFragmentId,
1891 ) -> impl Iterator<Item = (GlobalFragmentId, &StreamFragmentEdge)> {
1892 self.building_graph
1893 .get_downstreams(fragment_id)
1894 .iter()
1895 .chain(
1896 self.extra_downstreams
1897 .get(&fragment_id)
1898 .into_iter()
1899 .flatten(),
1900 )
1901 .map(|(&id, edge)| (id, edge))
1902 }
1903
1904 pub(super) fn get_upstreams(
1907 &self,
1908 fragment_id: GlobalFragmentId,
1909 ) -> impl Iterator<Item = (GlobalFragmentId, &StreamFragmentEdge)> {
1910 self.building_graph
1911 .get_upstreams(fragment_id)
1912 .iter()
1913 .chain(self.extra_upstreams.get(&fragment_id).into_iter().flatten())
1914 .map(|(&id, edge)| (id, edge))
1915 }
1916
1917 pub(super) fn building_fragments(&self) -> &HashMap<GlobalFragmentId, BuildingFragment> {
1919 &self.building_graph.fragments
1920 }
1921
1922 pub(super) fn building_fragments_mut(
1924 &mut self,
1925 ) -> &mut HashMap<GlobalFragmentId, BuildingFragment> {
1926 &mut self.building_graph.fragments
1927 }
1928
1929 pub(super) fn max_parallelism(&self) -> usize {
1931 self.building_graph.max_parallelism()
1932 }
1933}