use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;
use std::ops::{Deref, DerefMut};
use std::sync::LazyLock;
use std::sync::atomic::AtomicU32;

use anyhow::{Context, anyhow};
use enum_as_inner::EnumAsInner;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{
    CDC_SOURCE_COLUMN_NUM, ColumnCatalog, Field, FragmentTypeFlag, FragmentTypeMask, TableId,
    generate_internal_table_name_with_type,
};
use risingwave_common::hash::VnodeCount;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::stream_graph_visitor::{
    self, visit_stream_node_cont, visit_stream_node_cont_mut,
};
use risingwave_connector::sink::catalog::SinkType;
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::{PbSink, PbTable, Table};
use risingwave_pb::ddl_service::TableJobType;
use risingwave_pb::plan_common::{PbColumnCatalog, PbColumnDesc};
use risingwave_pb::stream_plan::dispatch_output_mapping::TypePair;
use risingwave_pb::stream_plan::stream_fragment_graph::{
    Parallelism, StreamFragment, StreamFragmentEdge as StreamFragmentEdgeProto,
};
use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
use risingwave_pb::stream_plan::{
    BackfillOrder, DispatchOutputMapping, DispatchStrategy, DispatcherType, PbStreamNode,
    PbStreamScanType, StreamFragmentGraph as StreamFragmentGraphProto, StreamNode, StreamScanNode,
    StreamScanType,
};

use crate::barrier::{SharedFragmentInfo, SnapshotBackfillInfo};
use crate::controller::id::IdGeneratorManager;
use crate::manager::{MetaSrvEnv, StreamingJob, StreamingJobType};
use crate::model::{ActorId, Fragment, FragmentId, StreamActor};
use crate::stream::stream_graph::id::{
    GlobalActorIdGen, GlobalFragmentId, GlobalFragmentIdGen, GlobalTableIdGen,
};
use crate::stream::stream_graph::schedule::Distribution;
use crate::{MetaError, MetaResult};

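/// A fragment in the building phase: a [`StreamFragment`] whose fragment ID has been rewritten to
/// a global one, with the internal tables and job information filled in, and the columns required
/// from each upstream table recorded.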
#[derive(Debug, Clone)]
pub(super) struct BuildingFragment {
    inner: StreamFragment,

    job_id: Option<u32>,

    upstream_table_columns: HashMap<TableId, Vec<PbColumnDesc>>,
}

impl BuildingFragment {
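    /// Create a new [`BuildingFragment`] from a [`StreamFragment`]: rewrite the fragment ID to
    /// the global one, fill the internal tables and the job info, and extract the required
    /// upstream table columns.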
    fn new(
        id: GlobalFragmentId,
        fragment: StreamFragment,
        job: &StreamingJob,
        table_id_gen: GlobalTableIdGen,
    ) -> Self {
        let mut fragment = StreamFragment {
            fragment_id: id.as_global_id(),
            ..fragment
        };

        Self::fill_internal_tables(&mut fragment, job, table_id_gen);

        let job_id = Self::fill_job(&mut fragment, job).then(|| job.id());
        let upstream_table_columns =
            Self::extract_upstream_table_columns_except_cross_db_backfill(&fragment);

        Self {
            inner: fragment,
            job_id,
            upstream_table_columns,
        }
    }

    fn extract_internal_tables(&self) -> Vec<Table> {
        let mut fragment = self.inner.clone();
        let mut tables = Vec::new();
        stream_graph_visitor::visit_internal_tables(&mut fragment, |table, _| {
            tables.push(table.clone());
        });
        tables
    }

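    /// Fill the information of the internal tables of the fragment: assign global table IDs and
    /// set the schema, database, name, owner and job ID according to the streaming job.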
    fn fill_internal_tables(
        fragment: &mut StreamFragment,
        job: &StreamingJob,
        table_id_gen: GlobalTableIdGen,
    ) {
        let fragment_id = fragment.fragment_id;
        stream_graph_visitor::visit_internal_tables(fragment, |table, table_type_name| {
            table.id = table_id_gen.to_global_id(table.id).as_global_id();
            table.schema_id = job.schema_id();
            table.database_id = job.database_id();
            table.name = generate_internal_table_name_with_type(
                &job.name(),
                fragment_id,
                table.id,
                table_type_name,
            );
            table.fragment_id = fragment_id;
            table.owner = job.owner();
            table.job_id = Some(job.id());
        });
    }

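    /// Fill the information of the streaming job into the fragment's nodes (e.g. the table ID of
    /// the materialize node). Returns `true` if the fragment contains the job's own node
    /// (materialize, sink, shared source, or vector index write).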
    fn fill_job(fragment: &mut StreamFragment, job: &StreamingJob) -> bool {
        let job_id = job.id();
        let fragment_id = fragment.fragment_id;
        let mut has_job = false;

        stream_graph_visitor::visit_fragment_mut(fragment, |node_body| match node_body {
            NodeBody::Materialize(materialize_node) => {
                materialize_node.table_id = job_id;

                let table = materialize_node.table.insert(job.table().unwrap().clone());
                table.fragment_id = fragment_id;
                if cfg!(not(debug_assertions)) {
                    table.definition = job.name();
                }

                has_job = true;
            }
            NodeBody::Sink(sink_node) => {
                sink_node.sink_desc.as_mut().unwrap().id = job_id;

                has_job = true;
            }
            NodeBody::Dml(dml_node) => {
                dml_node.table_id = job_id;
                dml_node.table_version_id = job.table_version_id().unwrap();
            }
            NodeBody::StreamFsFetch(fs_fetch_node) => {
                if let StreamingJob::Table(table_source, _, _) = job
                    && let Some(node_inner) = fs_fetch_node.node_inner.as_mut()
                    && let Some(source) = table_source
                {
                    node_inner.source_id = source.id;
                }
            }
            NodeBody::Source(source_node) => {
                match job {
                    StreamingJob::Table(source, _table, _table_job_type) => {
                        if let Some(source_inner) = source_node.source_inner.as_mut()
                            && let Some(source) = source
                        {
                            debug_assert_ne!(source.id, job_id);
                            source_inner.source_id = source.id;
                        }
                    }
                    StreamingJob::Source(source) => {
                        has_job = true;
                        if let Some(source_inner) = source_node.source_inner.as_mut() {
                            debug_assert_eq!(source.id, job_id);
                            source_inner.source_id = source.id;
                        }
                    }
                    _ => {}
                }
            }
            NodeBody::StreamCdcScan(node) => {
                if let Some(table_desc) = node.cdc_table_desc.as_mut() {
                    table_desc.table_id = job_id;
                }
            }
            NodeBody::VectorIndexWrite(node) => {
                let table = node.table.as_mut().unwrap();
                table.id = job_id;
                table.database_id = job.database_id();
                table.schema_id = job.schema_id();
                table.fragment_id = fragment_id;
                #[cfg(not(debug_assertions))]
                {
                    table.definition = job.name();
                }

                has_job = true;
            }
            _ => {}
        });

        has_job
    }

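    /// Extract the required column descriptors of each upstream table, except for cross-database
    /// snapshot backfill, whose upstream is not connected as a fragment edge in this graph.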
    fn extract_upstream_table_columns_except_cross_db_backfill(
        fragment: &StreamFragment,
    ) -> HashMap<TableId, Vec<PbColumnDesc>> {
        let mut table_columns = HashMap::new();

        stream_graph_visitor::visit_fragment(fragment, |node_body| {
            let (table_id, column_ids) = match node_body {
                NodeBody::StreamScan(stream_scan) => {
                    if stream_scan.get_stream_scan_type().unwrap()
                        == StreamScanType::CrossDbSnapshotBackfill
                    {
                        return;
                    }
                    (stream_scan.table_id.into(), stream_scan.upstream_columns())
                }
                NodeBody::CdcFilter(cdc_filter) => (cdc_filter.upstream_source_id.into(), vec![]),
                NodeBody::SourceBackfill(backfill) => (
                    backfill.upstream_source_id.into(),
                    backfill.column_descs(),
                ),
                _ => return,
            };
            table_columns
                .try_insert(table_id, column_ids)
                .expect("currently there should be no two same upstream tables in a fragment");
        });

        table_columns
    }

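    /// Returns whether the fragment contains a `StreamScan` node of type `ArrangementBackfill` or
    /// `SnapshotBackfill`.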
    pub fn has_shuffled_backfill(&self) -> bool {
        let stream_node = match self.inner.node.as_ref() {
            Some(node) => node,
            _ => return false,
        };
        let mut has_shuffled_backfill = false;
        let has_shuffled_backfill_mut_ref = &mut has_shuffled_backfill;
        visit_stream_node_cont(stream_node, |node| {
            let is_shuffled_backfill = if let Some(node) = &node.node_body
                && let Some(node) = node.as_stream_scan()
            {
                node.stream_scan_type == StreamScanType::ArrangementBackfill as i32
                    || node.stream_scan_type == StreamScanType::SnapshotBackfill as i32
            } else {
                false
            };
            if is_shuffled_backfill {
                *has_shuffled_backfill_mut_ref = true;
                false
            } else {
                true
            }
        });
        has_shuffled_backfill
    }
}

impl Deref for BuildingFragment {
    type Target = StreamFragment;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl DerefMut for BuildingFragment {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}

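/// The ID of an edge in the fragment graph: either an internal edge between two building
/// fragments, an edge from an existing upstream (table/source) fragment, or an edge to an
/// existing downstream fragment.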
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumAsInner)]
pub(super) enum EdgeId {
    Internal {
        link_id: u64,
    },

    UpstreamExternal {
        upstream_table_id: TableId,
        downstream_fragment_id: GlobalFragmentId,
    },

    DownstreamExternal(DownstreamExternalEdgeId),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) struct DownstreamExternalEdgeId {
    pub(super) original_upstream_fragment_id: GlobalFragmentId,
    pub(super) downstream_fragment_id: GlobalFragmentId,
}

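/// An edge between two fragments in the graph, carrying the dispatch strategy from the upstream
/// fragment to the downstream fragment.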
#[derive(Debug, Clone)]
pub(super) struct StreamFragmentEdge {
    pub id: EdgeId,

    pub dispatch_strategy: DispatchStrategy,
}

impl StreamFragmentEdge {
    fn from_protobuf(edge: &StreamFragmentEdgeProto) -> Self {
        Self {
            id: EdgeId::Internal {
                link_id: edge.link_id,
            },
            dispatch_strategy: edge.get_dispatch_strategy().unwrap().clone(),
        }
    }
}

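/// Clone a fragment with newly generated fragment and actor IDs, keeping everything else
/// (distribution, vnode bitmaps, state table IDs, nodes) unchanged.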
fn clone_fragment(
    fragment: &Fragment,
    id_generator_manager: &IdGeneratorManager,
    actor_id_counter: &AtomicU32,
) -> Fragment {
    let fragment_id = GlobalFragmentIdGen::new(id_generator_manager, 1)
        .to_global_id(0)
        .as_global_id();
    let actor_id_gen = GlobalActorIdGen::new(actor_id_counter, fragment.actors.len() as _);
    Fragment {
        fragment_id,
        fragment_type_mask: fragment.fragment_type_mask,
        distribution_type: fragment.distribution_type,
        actors: fragment
            .actors
            .iter()
            .enumerate()
            .map(|(i, actor)| StreamActor {
                actor_id: actor_id_gen.to_global_id(i as _).as_global_id() as _,
                fragment_id,
                vnode_bitmap: actor.vnode_bitmap.clone(),
                mview_definition: actor.mview_definition.clone(),
                expr_context: actor.expr_context.clone(),
            })
            .collect(),
        state_table_ids: fragment.state_table_ids.clone(),
        maybe_vnode_count: fragment.maybe_vnode_count,
        nodes: fragment.nodes.clone(),
    }
}

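/// Check that the fragments of a sink job have the shape expected for auto schema refresh: a
/// single fragment of `Sink -> StreamScan (arrangement backfill) -> [Merge, BatchPlan]`.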
pub fn check_sink_fragments_support_refresh_schema(
    fragments: &BTreeMap<FragmentId, Fragment>,
) -> MetaResult<()> {
    if fragments.len() != 1 {
        return Err(anyhow!(
            "sink with auto schema change should have only 1 fragment, but got {:?}",
            fragments.len()
        )
        .into());
    }
    let (_, fragment) = fragments.first_key_value().expect("non-empty");
    let sink_node = &fragment.nodes;
    let PbNodeBody::Sink(_) = sink_node.node_body.as_ref().unwrap() else {
        return Err(anyhow!("expect PbNodeBody::Sink but got: {:?}", sink_node.node_body).into());
    };
    let [stream_scan_node] = sink_node.input.as_slice() else {
        panic!("Sink has more than 1 input: {:?}", sink_node.input);
    };
    let PbNodeBody::StreamScan(scan) = stream_scan_node.node_body.as_ref().unwrap() else {
        return Err(anyhow!(
            "expect PbNodeBody::StreamScan but got: {:?}",
            stream_scan_node.node_body
        )
        .into());
    };
    let stream_scan_type = PbStreamScanType::try_from(scan.stream_scan_type).unwrap();
    if stream_scan_type != PbStreamScanType::ArrangementBackfill {
        return Err(anyhow!(
            "unsupported stream_scan_type for auto refresh schema: {:?}",
            stream_scan_type
        )
        .into());
    }
    let [merge_node, _batch_plan_node] = stream_scan_node.input.as_slice() else {
        panic!(
            "the number of StreamScan inputs is not 2: {:?}",
            stream_scan_node.input
        );
    };
    let NodeBody::Merge(_) = merge_node.node_body.as_ref().unwrap() else {
        return Err(anyhow!(
            "expect PbNodeBody::Merge but got: {:?}",
            merge_node.node_body
        )
        .into());
    };
    Ok(())
}

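/// Rebuild the sink fragment for auto schema refresh by appending the newly added upstream
/// columns to the sink, the stream scan and the merge node. Returns the new fragment, the new
/// sink columns, and the updated log store table (if any).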
pub fn rewrite_refresh_schema_sink_fragment(
    original_sink_fragment: &Fragment,
    sink: &PbSink,
    newly_added_columns: &[ColumnCatalog],
    upstream_table: &PbTable,
    upstream_table_fragment_id: FragmentId,
    id_generator_manager: &IdGeneratorManager,
    actor_id_counter: &AtomicU32,
) -> MetaResult<(Fragment, Vec<PbColumnCatalog>, Option<PbTable>)> {
    let mut new_sink_columns = sink.columns.clone();
    fn extend_sink_columns(
        sink_columns: &mut Vec<PbColumnCatalog>,
        new_columns: &[ColumnCatalog],
        get_column_name: impl Fn(&String) -> String,
    ) {
        let next_column_id = sink_columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().column_id + 1)
            .max()
            .unwrap_or(1);
        sink_columns.extend(new_columns.iter().enumerate().map(|(i, col)| {
            let mut col = col.to_protobuf();
            let column_desc = col.column_desc.as_mut().unwrap();
            column_desc.column_id = next_column_id + (i as i32);
            column_desc.name = get_column_name(&column_desc.name);
            col
        }));
    }
    extend_sink_columns(&mut new_sink_columns, newly_added_columns, |name| {
        name.clone()
    });

    let mut new_sink_fragment = clone_fragment(
        original_sink_fragment,
        id_generator_manager,
        actor_id_counter,
    );
    let sink_node = &mut new_sink_fragment.nodes;
    let PbNodeBody::Sink(sink_node_body) = sink_node.node_body.as_mut().unwrap() else {
        return Err(anyhow!("expect PbNodeBody::Sink but got: {:?}", sink_node.node_body).into());
    };
    let [stream_scan_node] = sink_node.input.as_mut_slice() else {
        panic!("Sink has more than 1 input: {:?}", sink_node.input);
    };
    let PbNodeBody::StreamScan(scan) = stream_scan_node.node_body.as_mut().unwrap() else {
        return Err(anyhow!(
            "expect PbNodeBody::StreamScan but got: {:?}",
            stream_scan_node.node_body
        )
        .into());
    };
    let [merge_node, _batch_plan_node] = stream_scan_node.input.as_mut_slice() else {
        panic!(
            "the number of StreamScan inputs is not 2: {:?}",
            stream_scan_node.input
        );
    };
    let NodeBody::Merge(merge) = merge_node.node_body.as_mut().unwrap() else {
        return Err(anyhow!(
            "expect PbNodeBody::Merge but got: {:?}",
            merge_node.node_body
        )
        .into());
    };
    sink_node.identity = {
        let sink_type = SinkType::from_proto(sink.sink_type());
        let sink_type_str = sink_type.type_str();
        let column_names = new_sink_columns
            .iter()
            .map(|col| {
                ColumnCatalog::from(col.clone())
                    .name_with_hidden()
                    .to_string()
            })
            .join(", ");
        let downstream_pk = if !sink_type.is_append_only() {
            let downstream_pk = sink
                .downstream_pk
                .iter()
                .map(|i| &sink.columns[*i as usize].column_desc.as_ref().unwrap().name)
                .collect_vec();
            format!(", downstream_pk: {downstream_pk:?}")
        } else {
            "".to_owned()
        };
        format!("StreamSink {{ type: {sink_type_str}, columns: [{column_names}]{downstream_pk} }}")
    };
    sink_node
        .fields
        .extend(newly_added_columns.iter().map(|col| {
            Field::new(
                format!("{}.{}", upstream_table.name, col.column_desc.name),
                col.data_type().clone(),
            )
            .to_prost()
        }));

    let new_log_store_table = if let Some(log_store_table) = &mut sink_node_body.table {
        extend_sink_columns(&mut log_store_table.columns, newly_added_columns, |name| {
            format!("{}_{}", upstream_table.name, name)
        });
        Some(log_store_table.clone())
    } else {
        None
    };
    sink_node_body.sink_desc.as_mut().unwrap().column_catalogs = new_sink_columns.clone();

    stream_scan_node
        .fields
        .extend(newly_added_columns.iter().map(|col| {
            Field::new(
                format!("{}.{}", upstream_table.name, col.column_desc.name),
                col.data_type().clone(),
            )
            .to_prost()
        }));
    stream_scan_node.identity = {
        let columns = stream_scan_node
            .fields
            .iter()
            .map(|col| &col.name)
            .join(", ");
        format!("StreamTableScan {{ table: t, columns: [{columns}] }}")
    };

    let stream_scan_type = PbStreamScanType::try_from(scan.stream_scan_type).unwrap();
    if stream_scan_type != PbStreamScanType::ArrangementBackfill {
        return Err(anyhow!(
            "unsupported stream_scan_type for auto refresh schema: {:?}",
            stream_scan_type
        )
        .into());
    }
    scan.arrangement_table = Some(upstream_table.clone());
    scan.output_indices.extend(
        (0..newly_added_columns.len()).map(|i| (i + scan.upstream_column_ids.len()) as u32),
    );
    scan.upstream_column_ids.extend(
        newly_added_columns
            .iter()
            .map(|col| col.column_id().get_id()),
    );
    let table_desc = scan.table_desc.as_mut().unwrap();
    table_desc
        .value_indices
        .extend((0..newly_added_columns.len()).map(|i| (i + table_desc.columns.len()) as u32));
    table_desc.columns.extend(
        newly_added_columns
            .iter()
            .map(|col| col.column_desc.to_protobuf()),
    );

    merge_node.fields = scan
        .upstream_column_ids
        .iter()
        .map(|&column_id| {
            let col = upstream_table
                .columns
                .iter()
                .find(|c| c.column_desc.as_ref().unwrap().column_id == column_id)
                .unwrap();
            let col_desc = col.column_desc.as_ref().unwrap();
            Field::new(
                col_desc.name.clone(),
                col_desc.column_type.as_ref().unwrap().into(),
            )
            .to_prost()
        })
        .collect();
    merge.upstream_fragment_id = upstream_table_fragment_id;
    Ok((new_sink_fragment, new_sink_columns, new_log_store_table))
}

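/// Fragment-level backfill ordering derived from the relation-level [`BackfillOrder`]: maps each
/// backfill fragment to the fragments whose backfill is ordered after it.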
pub type FragmentBackfillOrder = HashMap<FragmentId, Vec<FragmentId>>;

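/// In-memory representation of the fragment graph of a streaming job, built from the
/// [`StreamFragmentGraphProto`] from the frontend, with fragment and internal table IDs rewritten
/// to global ones.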
#[derive(Default, Debug)]
pub struct StreamFragmentGraph {
    pub(super) fragments: HashMap<GlobalFragmentId, BuildingFragment>,

    pub(super) downstreams:
        HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

    pub(super) upstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

    dependent_table_ids: HashSet<TableId>,

    specified_parallelism: Option<NonZeroUsize>,

    max_parallelism: usize,

    backfill_order: BackfillOrder,
}

impl StreamFragmentGraph {
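    /// Create a new [`StreamFragmentGraph`] from the protobuf representation: generate global
    /// fragment and table IDs, and build the edges between the fragments.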
    pub fn new(
        env: &MetaSrvEnv,
        proto: StreamFragmentGraphProto,
        job: &StreamingJob,
    ) -> MetaResult<Self> {
        let fragment_id_gen =
            GlobalFragmentIdGen::new(env.id_gen_manager(), proto.fragments.len() as u64);
        let table_id_gen = GlobalTableIdGen::new(env.id_gen_manager(), proto.table_ids_cnt as u64);

        let fragments: HashMap<_, _> = proto
            .fragments
            .into_iter()
            .map(|(id, fragment)| {
                let id = fragment_id_gen.to_global_id(id);
                let fragment = BuildingFragment::new(id, fragment, job, table_id_gen);
                (id, fragment)
            })
            .collect();

        assert_eq!(
            fragments
                .values()
                .map(|f| f.extract_internal_tables().len() as u32)
                .sum::<u32>(),
            proto.table_ids_cnt
        );

        let mut downstreams = HashMap::new();
        let mut upstreams = HashMap::new();

        for edge in proto.edges {
            let upstream_id = fragment_id_gen.to_global_id(edge.upstream_id);
            let downstream_id = fragment_id_gen.to_global_id(edge.downstream_id);
            let edge = StreamFragmentEdge::from_protobuf(&edge);

            upstreams
                .entry(downstream_id)
                .or_insert_with(HashMap::new)
                .try_insert(upstream_id, edge.clone())
                .unwrap();
            downstreams
                .entry(upstream_id)
                .or_insert_with(HashMap::new)
                .try_insert(downstream_id, edge)
                .unwrap();
        }

        let dependent_table_ids = proto
            .dependent_table_ids
            .iter()
            .map(TableId::from)
            .collect();

        let specified_parallelism = if let Some(Parallelism { parallelism }) = proto.parallelism {
            Some(NonZeroUsize::new(parallelism as usize).context("parallelism should not be 0")?)
        } else {
            None
        };

        let max_parallelism = proto.max_parallelism as usize;
        let backfill_order = proto.backfill_order.unwrap_or(BackfillOrder {
            order: Default::default(),
        });

        Ok(Self {
            fragments,
            downstreams,
            upstreams,
            dependent_table_ids,
            specified_parallelism,
            max_parallelism,
            backfill_order,
        })
    }

    pub fn incomplete_internal_tables(&self) -> BTreeMap<u32, Table> {
        let mut tables = BTreeMap::new();
        for fragment in self.fragments.values() {
            for table in fragment.extract_internal_tables() {
                let table_id = table.id;
                tables
                    .try_insert(table_id, table)
                    .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
            }
        }
        tables
    }

    pub fn refill_internal_table_ids(&mut self, table_id_map: HashMap<u32, u32>) {
        for fragment in self.fragments.values_mut() {
            stream_graph_visitor::visit_internal_tables(
                &mut fragment.inner,
                |table, _table_type_name| {
                    let target = table_id_map.get(&table.id).cloned().unwrap();
                    table.id = target;
                },
            );
        }
    }

    pub fn fit_internal_tables_trivial(
        &mut self,
        mut old_internal_tables: Vec<Table>,
    ) -> MetaResult<()> {
        let mut new_internal_table_ids = Vec::new();
        for fragment in self.fragments.values() {
            for table in &fragment.extract_internal_tables() {
                new_internal_table_ids.push(table.id);
            }
        }

        if new_internal_table_ids.len() != old_internal_tables.len() {
            bail!(
                "Different number of internal tables. New: {}, Old: {}",
                new_internal_table_ids.len(),
                old_internal_tables.len()
            );
        }
        old_internal_tables.sort_by(|a, b| a.id.cmp(&b.id));
        new_internal_table_ids.sort();

        let internal_table_id_map = new_internal_table_ids
            .into_iter()
            .zip_eq_fast(old_internal_tables.into_iter())
            .collect::<HashMap<_, _>>();

        for fragment in self.fragments.values_mut() {
            stream_graph_visitor::visit_internal_tables(
                &mut fragment.inner,
                |table, _table_type_name| {
                    let target = internal_table_id_map.get(&table.id).cloned().unwrap();
                    *table = target;
                },
            );
        }

        Ok(())
    }

    pub fn fit_internal_table_ids_with_mapping(&mut self, mut matches: HashMap<u32, Table>) {
        for fragment in self.fragments.values_mut() {
            stream_graph_visitor::visit_internal_tables(
                &mut fragment.inner,
                |table, _table_type_name| {
                    let target = matches.remove(&table.id).unwrap_or_else(|| {
                        panic!("no matching table for table {}({})", table.id, table.name)
                    });
                    table.id = target.id;
                    table.maybe_vnode_count = target.maybe_vnode_count;
                },
            );
        }
    }

    pub fn table_fragment_id(&self) -> FragmentId {
        self.fragments
            .values()
            .filter(|b| b.job_id.is_some())
            .map(|b| b.fragment_id)
            .exactly_one()
            .expect("require exactly 1 materialize/sink/cdc source node when creating the streaming job")
    }

    pub fn dml_fragment_id(&self) -> Option<FragmentId> {
        self.fragments
            .values()
            .filter(|b| {
                FragmentTypeMask::from(b.fragment_type_mask).contains(FragmentTypeFlag::Dml)
            })
            .map(|b| b.fragment_id)
            .at_most_one()
            .expect("require at most 1 dml node when creating the streaming job")
    }

    pub fn dependent_table_ids(&self) -> &HashSet<TableId> {
        &self.dependent_table_ids
    }

    pub fn specified_parallelism(&self) -> Option<NonZeroUsize> {
        self.specified_parallelism
    }

    pub fn max_parallelism(&self) -> usize {
        self.max_parallelism
    }

    fn get_downstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> &HashMap<GlobalFragmentId, StreamFragmentEdge> {
        self.downstreams.get(&fragment_id).unwrap_or(&EMPTY_HASHMAP)
    }

    fn get_upstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> &HashMap<GlobalFragmentId, StreamFragmentEdge> {
        self.upstreams.get(&fragment_id).unwrap_or(&EMPTY_HASHMAP)
    }

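    /// Collect the snapshot backfill info of the graph: the upstream MV tables backfilled with
    /// (same-database) snapshot backfill, and those backfilled with cross-database snapshot
    /// backfill, each mapped to its backfill epoch.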
    pub fn collect_snapshot_backfill_info(
        &self,
    ) -> MetaResult<(Option<SnapshotBackfillInfo>, SnapshotBackfillInfo)> {
        Self::collect_snapshot_backfill_info_impl(self.fragments.values().map(|fragment| {
            (
                fragment.node.as_ref().unwrap(),
                fragment.fragment_type_mask.into(),
            )
        }))
    }

    pub fn collect_snapshot_backfill_info_impl(
        fragments: impl IntoIterator<Item = (&PbStreamNode, FragmentTypeMask)>,
    ) -> MetaResult<(Option<SnapshotBackfillInfo>, SnapshotBackfillInfo)> {
        let mut prev_stream_scan: Option<(Option<SnapshotBackfillInfo>, StreamScanNode)> = None;
        let mut cross_db_info = SnapshotBackfillInfo {
            upstream_mv_table_id_to_backfill_epoch: Default::default(),
        };
        let mut result = Ok(());
        for (node, fragment_type_mask) in fragments {
            visit_stream_node_cont(node, |node| {
                if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_ref() {
                    let stream_scan_type = StreamScanType::try_from(stream_scan.stream_scan_type)
                        .expect("invalid stream_scan_type");
                    let is_snapshot_backfill = match stream_scan_type {
                        StreamScanType::SnapshotBackfill => {
                            assert!(
                                fragment_type_mask
                                    .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
                            );
                            true
                        }
                        StreamScanType::CrossDbSnapshotBackfill => {
                            assert!(
                                fragment_type_mask
                                    .contains(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan)
                            );
                            cross_db_info.upstream_mv_table_id_to_backfill_epoch.insert(
                                TableId::new(stream_scan.table_id),
                                stream_scan.snapshot_backfill_epoch,
                            );

                            return true;
                        }
                        _ => false,
                    };

                    match &mut prev_stream_scan {
                        Some((prev_snapshot_backfill_info, prev_stream_scan)) => {
                            match (prev_snapshot_backfill_info, is_snapshot_backfill) {
                                (Some(prev_snapshot_backfill_info), true) => {
                                    prev_snapshot_backfill_info
                                        .upstream_mv_table_id_to_backfill_epoch
                                        .insert(
                                            TableId::new(stream_scan.table_id),
                                            stream_scan.snapshot_backfill_epoch,
                                        );
                                    true
                                }
                                (None, false) => true,
                                (_, _) => {
                                    result = Err(anyhow!("must be either all snapshot_backfill or no snapshot_backfill. Curr: {stream_scan:?} Prev: {prev_stream_scan:?}").into());
                                    false
                                }
                            }
                        }
                        None => {
                            prev_stream_scan = Some((
                                if is_snapshot_backfill {
                                    Some(SnapshotBackfillInfo {
                                        upstream_mv_table_id_to_backfill_epoch: HashMap::from_iter(
                                            [(
                                                TableId::new(stream_scan.table_id),
                                                stream_scan.snapshot_backfill_epoch,
                                            )],
                                        ),
                                    })
                                } else {
                                    None
                                },
                                *stream_scan.clone(),
                            ));
                            true
                        }
                    }
                } else {
                    true
                }
            })
        }
        result.map(|_| {
            (
                prev_stream_scan
                    .map(|(snapshot_backfill_info, _)| snapshot_backfill_info)
                    .unwrap_or(None),
                cross_db_info,
            )
        })
    }

    pub fn collect_backfill_mapping(&self) -> HashMap<u32, Vec<FragmentId>> {
        let mut mapping = HashMap::new();
        for (fragment_id, fragment) in &self.fragments {
            let fragment_id = fragment_id.as_global_id();
            let fragment_mask = fragment.fragment_type_mask;
            let candidates = [FragmentTypeFlag::StreamScan, FragmentTypeFlag::SourceScan];
            let has_some_scan = candidates
                .into_iter()
                .any(|flag| (fragment_mask & flag as u32) > 0);
            if has_some_scan {
                visit_stream_node_cont(fragment.node.as_ref().unwrap(), |node| {
                    match node.node_body.as_ref() {
                        Some(NodeBody::StreamScan(stream_scan)) => {
                            let table_id = stream_scan.table_id;
                            let fragments: &mut Vec<_> = mapping.entry(table_id).or_default();
                            fragments.push(fragment_id);
                            false
                        }
                        Some(NodeBody::SourceBackfill(source_backfill)) => {
                            let source_id = source_backfill.upstream_source_id;
                            let fragments: &mut Vec<_> = mapping.entry(source_id).or_default();
                            fragments.push(fragment_id);
                            false
                        }
                        _ => true,
                    }
                })
            }
        }
        mapping
    }

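    /// Build the fragment-level backfill ordering from the relation-level [`BackfillOrder`], and
    /// additionally account for locality provider fragments and their downstream dependencies.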
    pub fn create_fragment_backfill_ordering(&self) -> FragmentBackfillOrder {
        let mapping = self.collect_backfill_mapping();
        let mut fragment_ordering: HashMap<u32, Vec<u32>> = HashMap::new();

        for (rel_id, downstream_rel_ids) in &self.backfill_order.order {
            let fragment_ids = mapping.get(rel_id).unwrap();
            for fragment_id in fragment_ids {
                let downstream_fragment_ids = downstream_rel_ids
                    .data
                    .iter()
                    .flat_map(|downstream_rel_id| mapping.get(downstream_rel_id).unwrap().iter())
                    .copied()
                    .collect();
                fragment_ordering.insert(*fragment_id, downstream_fragment_ids);
            }
        }

        if fragment_ordering.is_empty() {
            for value in mapping.values() {
                for &fragment_id in value {
                    fragment_ordering.entry(fragment_id).or_default();
                }
            }
        }

        let locality_provider_dependencies = self.find_locality_provider_dependencies();

        let backfill_fragments: HashSet<u32> = mapping.values().flatten().copied().collect();

        let all_locality_provider_fragments: HashSet<u32> =
            locality_provider_dependencies.keys().copied().collect();
        let downstream_locality_provider_fragments: HashSet<u32> = locality_provider_dependencies
            .values()
            .flatten()
            .copied()
            .collect();
        let locality_provider_root_fragments: Vec<u32> = all_locality_provider_fragments
            .difference(&downstream_locality_provider_fragments)
            .copied()
            .collect();

        for &backfill_fragment_id in &backfill_fragments {
            fragment_ordering
                .entry(backfill_fragment_id)
                .or_default()
                .extend(locality_provider_root_fragments.iter().copied());
        }

        for (fragment_id, downstream_fragments) in locality_provider_dependencies {
            fragment_ordering
                .entry(fragment_id)
                .or_default()
                .extend(downstream_fragments);
        }

        fragment_ordering
    }

    pub fn find_locality_provider_fragment_state_table_mapping(
        &self,
    ) -> HashMap<FragmentId, Vec<TableId>> {
        let mut mapping: HashMap<FragmentId, Vec<TableId>> = HashMap::new();

        for (fragment_id, fragment) in &self.fragments {
            let fragment_id = fragment_id.as_global_id();

            if let Some(node) = fragment.node.as_ref() {
                let mut state_table_ids = Vec::new();

                visit_stream_node_cont(node, |stream_node| {
                    if let Some(NodeBody::LocalityProvider(locality_provider)) =
                        stream_node.node_body.as_ref()
                    {
                        let state_table_id = locality_provider
                            .state_table
                            .as_ref()
                            .expect("must have state table")
                            .id;
                        state_table_ids.push(TableId::new(state_table_id));
                        false
                    } else {
                        true
                    }
                });

                if !state_table_ids.is_empty() {
                    mapping.insert(fragment_id, state_table_ids);
                }
            }
        }

        mapping
    }

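    /// Find the dependencies between locality provider fragments: each locality provider fragment
    /// maps to the locality provider fragments reachable downstream of it in the graph.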
    pub fn find_locality_provider_dependencies(&self) -> HashMap<FragmentId, Vec<FragmentId>> {
        let mut locality_provider_fragments = HashSet::new();
        let mut dependencies: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();

        for (fragment_id, fragment) in &self.fragments {
            let fragment_id = fragment_id.as_global_id();
            let has_locality_provider = self.fragment_has_locality_provider(fragment);

            if has_locality_provider {
                locality_provider_fragments.insert(fragment_id);
                dependencies.entry(fragment_id).or_default();
            }
        }

        for &provider_fragment_id in &locality_provider_fragments {
            let provider_fragment_global_id = GlobalFragmentId::new(provider_fragment_id);

            let mut visited = HashSet::new();
            let mut downstream_locality_providers = Vec::new();

            self.collect_downstream_locality_providers(
                provider_fragment_global_id,
                &locality_provider_fragments,
                &mut visited,
                &mut downstream_locality_providers,
            );

            dependencies
                .entry(provider_fragment_id)
                .or_default()
                .extend(downstream_locality_providers);
        }

        dependencies
    }

    fn fragment_has_locality_provider(&self, fragment: &BuildingFragment) -> bool {
        let mut has_locality_provider = false;

        if let Some(node) = fragment.node.as_ref() {
            visit_stream_node_cont(node, |stream_node| {
                if let Some(NodeBody::LocalityProvider(_)) = stream_node.node_body.as_ref() {
                    has_locality_provider = true;
                    false
                } else {
                    true
                }
            });
        }

        has_locality_provider
    }

    fn collect_downstream_locality_providers(
        &self,
        current_fragment_id: GlobalFragmentId,
        locality_provider_fragments: &HashSet<FragmentId>,
        visited: &mut HashSet<GlobalFragmentId>,
        downstream_providers: &mut Vec<FragmentId>,
    ) {
        if visited.contains(&current_fragment_id) {
            return;
        }
        visited.insert(current_fragment_id);

        for &downstream_id in self.get_downstreams(current_fragment_id).keys() {
            let downstream_fragment_id = downstream_id.as_global_id();

            if locality_provider_fragments.contains(&downstream_fragment_id) {
                downstream_providers.push(downstream_fragment_id);
            }

            self.collect_downstream_locality_providers(
                downstream_id,
                locality_provider_fragments,
                visited,
                downstream_providers,
            );
        }
    }
}

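/// Fill the `snapshot_backfill_epoch` of (cross-db) snapshot backfill stream scan nodes with the
/// epochs collected in the given [`SnapshotBackfillInfo`]s. Returns whether any node was filled,
/// or an error if an epoch is missing or set twice.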
pub fn fill_snapshot_backfill_epoch(
    node: &mut StreamNode,
    snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
    cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
) -> MetaResult<bool> {
    let mut result = Ok(());
    let mut applied = false;
    visit_stream_node_cont_mut(node, |node| {
        if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_mut()
            && (stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32
                || stream_scan.stream_scan_type == StreamScanType::CrossDbSnapshotBackfill as i32)
        {
            result = try {
                let table_id = TableId::new(stream_scan.table_id);
                let snapshot_epoch = cross_db_snapshot_backfill_info
                    .upstream_mv_table_id_to_backfill_epoch
                    .get(&table_id)
                    .or_else(|| {
                        snapshot_backfill_info.and_then(|snapshot_backfill_info| {
                            snapshot_backfill_info
                                .upstream_mv_table_id_to_backfill_epoch
                                .get(&table_id)
                        })
                    })
                    .ok_or_else(|| anyhow!("upstream table id not covered: {}", table_id))?
                    .ok_or_else(|| anyhow!("upstream table id not set: {}", table_id))?;
                if let Some(prev_snapshot_epoch) =
                    stream_scan.snapshot_backfill_epoch.replace(snapshot_epoch)
                {
                    Err(anyhow!(
                        "snapshot backfill epoch set again: {} {} {}",
                        table_id,
                        prev_snapshot_epoch,
                        snapshot_epoch
                    ))?;
                }
                applied = true;
            };
            result.is_ok()
        } else {
            true
        }
    });
    result.map(|_| applied)
}

static EMPTY_HASHMAP: LazyLock<HashMap<GlobalFragmentId, StreamFragmentEdge>> =
    LazyLock::new(HashMap::new);

#[derive(Debug, Clone, EnumAsInner)]
pub(super) enum EitherFragment {
    Building(BuildingFragment),

    Existing(SharedFragmentInfo),
}

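/// A wrapper of [`StreamFragmentGraph`] that also contains the existing fragments the new
/// streaming job connects to (the upstream root fragments and/or the downstream fragments of a
/// replaced job), together with the extra edges to them.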
#[derive(Debug)]
pub struct CompleteStreamFragmentGraph {
    building_graph: StreamFragmentGraph,

    existing_fragments: HashMap<GlobalFragmentId, SharedFragmentInfo>,

    existing_actor_location: HashMap<ActorId, WorkerId>,

    extra_downstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

    extra_upstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,
}

pub struct FragmentGraphUpstreamContext {
    pub upstream_root_fragments: HashMap<TableId, (SharedFragmentInfo, PbStreamNode)>,
    pub upstream_actor_location: HashMap<ActorId, WorkerId>,
}

pub struct FragmentGraphDownstreamContext {
    pub original_root_fragment_id: FragmentId,
    pub downstream_fragments: Vec<(DispatcherType, SharedFragmentInfo, PbStreamNode)>,
    pub downstream_actor_location: HashMap<ActorId, WorkerId>,
}

impl CompleteStreamFragmentGraph {
    #[cfg(test)]
    pub fn for_test(graph: StreamFragmentGraph) -> Self {
        Self {
            building_graph: graph,
            existing_fragments: Default::default(),
            existing_actor_location: Default::default(),
            extra_downstreams: Default::default(),
            extra_upstreams: Default::default(),
        }
    }

    pub fn with_upstreams(
        graph: StreamFragmentGraph,
        upstream_context: FragmentGraphUpstreamContext,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        Self::build_helper(graph, Some(upstream_context), None, job_type)
    }

    pub fn with_downstreams(
        graph: StreamFragmentGraph,
        downstream_context: FragmentGraphDownstreamContext,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        Self::build_helper(graph, None, Some(downstream_context), job_type)
    }

    pub fn with_upstreams_and_downstreams(
        graph: StreamFragmentGraph,
        upstream_context: FragmentGraphUpstreamContext,
        downstream_context: FragmentGraphDownstreamContext,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        Self::build_helper(
            graph,
            Some(upstream_context),
            Some(downstream_context),
            job_type,
        )
    }

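    /// The core logic of building a [`CompleteStreamFragmentGraph`]: connect the building graph
    /// to the given existing upstream root fragments and/or downstream fragments by generating
    /// the extra edges (with the proper dispatch strategies) between them.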
    fn build_helper(
        mut graph: StreamFragmentGraph,
        upstream_ctx: Option<FragmentGraphUpstreamContext>,
        downstream_ctx: Option<FragmentGraphDownstreamContext>,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        let mut extra_downstreams = HashMap::new();
        let mut extra_upstreams = HashMap::new();
        let mut existing_fragments = HashMap::new();

        let mut existing_actor_location = HashMap::new();

        if let Some(FragmentGraphUpstreamContext {
            upstream_root_fragments,
            upstream_actor_location,
        }) = upstream_ctx
        {
            for (&id, fragment) in &mut graph.fragments {
                let uses_shuffled_backfill = fragment.has_shuffled_backfill();

                for (&upstream_table_id, required_columns) in &fragment.upstream_table_columns {
                    let (upstream_fragment, nodes) = upstream_root_fragments
                        .get(&upstream_table_id)
                        .context("upstream fragment not found")?;
                    let upstream_root_fragment_id =
                        GlobalFragmentId::new(upstream_fragment.fragment_id);

                    let edge = match job_type {
                        StreamingJobType::Table(TableJobType::SharedCdcSource) => {
                            assert_ne!(
                                (fragment.fragment_type_mask & FragmentTypeFlag::CdcFilter as u32),
                                0
                            );

                            tracing::debug!(
                                ?upstream_root_fragment_id,
                                ?required_columns,
                                identity = ?fragment.inner.get_node().unwrap().get_identity(),
                                current_frag_id=?id,
                                "CdcFilter with upstream source fragment"
                            );

                            StreamFragmentEdge {
                                id: EdgeId::UpstreamExternal {
                                    upstream_table_id,
                                    downstream_fragment_id: id,
                                },
                                dispatch_strategy: DispatchStrategy {
                                    r#type: DispatcherType::NoShuffle as _,
                                    dist_key_indices: vec![],
                                    output_mapping: DispatchOutputMapping::identical(
                                        CDC_SOURCE_COLUMN_NUM as _,
                                    )
                                    .into(),
                                },
                            }
                        }

                        StreamingJobType::MaterializedView
                        | StreamingJobType::Sink
                        | StreamingJobType::Index => {
                            if upstream_fragment
                                .fragment_type_mask
                                .contains(FragmentTypeFlag::Mview)
                            {
                                let (dist_key_indices, output_mapping) = {
                                    let mview_node =
                                        nodes.get_node_body().unwrap().as_materialize().unwrap();
                                    let all_columns = mview_node.column_descs();
                                    let dist_key_indices = mview_node.dist_key_indices();
                                    let output_mapping = gen_output_mapping(
                                        required_columns,
                                        &all_columns,
                                    )
                                    .context(
                                        "BUG: column not found in the upstream materialized view",
                                    )?;
                                    (dist_key_indices, output_mapping)
                                };
                                let dispatch_strategy = mv_on_mv_dispatch_strategy(
                                    uses_shuffled_backfill,
                                    dist_key_indices,
                                    output_mapping,
                                );

                                StreamFragmentEdge {
                                    id: EdgeId::UpstreamExternal {
                                        upstream_table_id,
                                        downstream_fragment_id: id,
                                    },
                                    dispatch_strategy,
                                }
                            }
                            else if upstream_fragment
                                .fragment_type_mask
                                .contains(FragmentTypeFlag::Source)
                            {
                                let output_mapping = {
                                    let source_node =
                                        nodes.get_node_body().unwrap().as_source().unwrap();

                                    let all_columns = source_node.column_descs().unwrap();
                                    gen_output_mapping(required_columns, &all_columns).context(
                                        "BUG: column not found in the upstream source node",
                                    )?
                                };

                                StreamFragmentEdge {
                                    id: EdgeId::UpstreamExternal {
                                        upstream_table_id,
                                        downstream_fragment_id: id,
                                    },
                                    dispatch_strategy: DispatchStrategy {
                                        r#type: DispatcherType::NoShuffle as _,
                                        dist_key_indices: vec![],
                                        output_mapping: Some(output_mapping),
                                    },
                                }
                            } else {
                                bail!(
                                    "the upstream fragment should be a MView or Source, got fragment type: {:b}",
                                    upstream_fragment.fragment_type_mask
                                )
                            }
                        }
                        StreamingJobType::Source | StreamingJobType::Table(_) => {
                            bail!(
                                "the streaming job shouldn't have an upstream fragment, job_type: {:?}",
                                job_type
                            )
                        }
                    };

                    extra_downstreams
                        .entry(upstream_root_fragment_id)
                        .or_insert_with(HashMap::new)
                        .try_insert(id, edge.clone())
                        .unwrap();
                    extra_upstreams
                        .entry(id)
                        .or_insert_with(HashMap::new)
                        .try_insert(upstream_root_fragment_id, edge)
                        .unwrap();
                }
            }

            existing_fragments.extend(
                upstream_root_fragments
                    .into_values()
                    .map(|(f, _)| (GlobalFragmentId::new(f.fragment_id), f)),
            );

            existing_actor_location.extend(upstream_actor_location);
        }

        if let Some(FragmentGraphDownstreamContext {
            original_root_fragment_id,
            downstream_fragments,
            downstream_actor_location,
        }) = downstream_ctx
        {
            let original_table_fragment_id = GlobalFragmentId::new(original_root_fragment_id);
            let table_fragment_id = GlobalFragmentId::new(graph.table_fragment_id());

            for (dispatcher_type, fragment, nodes) in &downstream_fragments {
                let id = GlobalFragmentId::new(fragment.fragment_id);

                let output_columns = {
                    let mut res = None;

                    stream_graph_visitor::visit_stream_node_body(nodes, |node_body| {
                        let columns = match node_body {
                            NodeBody::StreamScan(stream_scan) => stream_scan.upstream_columns(),
                            NodeBody::SourceBackfill(source_backfill) => {
                                source_backfill.column_descs()
                            }
                            _ => return,
                        };
                        res = Some(columns);
                    });

                    res.context("failed to locate downstream scan")?
                };

                let table_fragment = graph.fragments.get(&table_fragment_id).unwrap();
                let nodes = table_fragment.node.as_ref().unwrap();

                let (dist_key_indices, output_mapping) = match job_type {
                    StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
                        let mview_node = nodes.get_node_body().unwrap().as_materialize().unwrap();
                        let all_columns = mview_node.column_descs();
                        let dist_key_indices = mview_node.dist_key_indices();
                        let output_mapping = gen_output_mapping(&output_columns, &all_columns)
                            .ok_or_else(|| {
                                MetaError::invalid_parameter(
                                    "unable to drop the column due to \
                                    being referenced by downstream materialized views or sinks",
                                )
                            })?;
                        (dist_key_indices, output_mapping)
                    }

                    StreamingJobType::Source => {
                        let source_node = nodes.get_node_body().unwrap().as_source().unwrap();
                        let all_columns = source_node.column_descs().unwrap();
                        let output_mapping = gen_output_mapping(&output_columns, &all_columns)
                            .ok_or_else(|| {
                                MetaError::invalid_parameter(
                                    "unable to drop the column due to \
                                    being referenced by downstream materialized views or sinks",
                                )
                            })?;
                        assert_eq!(*dispatcher_type, DispatcherType::NoShuffle);
                        (
                            vec![],
                            output_mapping,
                        )
                    }

                    _ => bail!("unsupported job type for replacement: {job_type:?}"),
                };

                let edge = StreamFragmentEdge {
                    id: EdgeId::DownstreamExternal(DownstreamExternalEdgeId {
                        original_upstream_fragment_id: original_table_fragment_id,
                        downstream_fragment_id: id,
                    }),
                    dispatch_strategy: DispatchStrategy {
                        r#type: *dispatcher_type as i32,
                        output_mapping: Some(output_mapping),
                        dist_key_indices,
                    },
                };

                extra_downstreams
                    .entry(table_fragment_id)
                    .or_insert_with(HashMap::new)
                    .try_insert(id, edge.clone())
                    .unwrap();
                extra_upstreams
                    .entry(id)
                    .or_insert_with(HashMap::new)
                    .try_insert(table_fragment_id, edge)
                    .unwrap();
            }

            existing_fragments.extend(
                downstream_fragments
                    .into_iter()
                    .map(|(_, f, _)| (GlobalFragmentId::new(f.fragment_id), f)),
            );

            existing_actor_location.extend(downstream_actor_location);
        }

        Ok(Self {
            building_graph: graph,
            existing_fragments,
            existing_actor_location,
            extra_downstreams,
            extra_upstreams,
        })
    }
}

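/// Generate the [`DispatchOutputMapping`] from the upstream columns to the required downstream
/// columns, matched by column ID. Type pairs are recorded for columns whose upstream and
/// downstream types differ. Returns `None` if a required column is missing upstream.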
fn gen_output_mapping(
    required_columns: &[PbColumnDesc],
    upstream_columns: &[PbColumnDesc],
) -> Option<DispatchOutputMapping> {
    let len = required_columns.len();
    let mut indices = vec![0; len];
    let mut types = None;

    for (i, r) in required_columns.iter().enumerate() {
        let (ui, u) = upstream_columns
            .iter()
            .find_position(|&u| u.column_id == r.column_id)?;
        indices[i] = ui as u32;

        if u.column_type != r.column_type {
            types.get_or_insert_with(|| vec![TypePair::default(); len])[i] = TypePair {
                upstream: u.column_type.clone(),
                downstream: r.column_type.clone(),
            };
        }
    }

    let types = types.unwrap_or(Vec::new());

    Some(DispatchOutputMapping { indices, types })
}

fn mv_on_mv_dispatch_strategy(
    uses_shuffled_backfill: bool,
    dist_key_indices: Vec<u32>,
    output_mapping: DispatchOutputMapping,
) -> DispatchStrategy {
    if uses_shuffled_backfill {
        if !dist_key_indices.is_empty() {
            DispatchStrategy {
                r#type: DispatcherType::Hash as _,
                dist_key_indices,
                output_mapping: Some(output_mapping),
            }
        } else {
            DispatchStrategy {
                r#type: DispatcherType::Simple as _,
                dist_key_indices: vec![],
                output_mapping: Some(output_mapping),
            }
        }
    } else {
        DispatchStrategy {
            r#type: DispatcherType::NoShuffle as _,
            dist_key_indices: vec![],
            output_mapping: Some(output_mapping),
        }
    }
}

impl CompleteStreamFragmentGraph {
    pub(super) fn all_fragment_ids(&self) -> impl Iterator<Item = GlobalFragmentId> + '_ {
        self.building_graph
            .fragments
            .keys()
            .chain(self.existing_fragments.keys())
            .copied()
    }

    pub(super) fn all_edges(
        &self,
    ) -> impl Iterator<Item = (GlobalFragmentId, GlobalFragmentId, &StreamFragmentEdge)> + '_ {
        self.building_graph
            .downstreams
            .iter()
            .chain(self.extra_downstreams.iter())
            .flat_map(|(&from, tos)| tos.iter().map(move |(&to, edge)| (from, to, edge)))
    }

    pub(super) fn existing_distribution(&self) -> HashMap<GlobalFragmentId, Distribution> {
        self.existing_fragments
            .iter()
            .map(|(&id, f)| {
                (
                    id,
                    Distribution::from_fragment(f, &self.existing_actor_location),
                )
            })
            .collect()
    }

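    /// Generate a topological order of the complete fragment graph, from the most downstream
    /// fragments to the most upstream ones. Returns an error if the graph is not a DAG.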
    pub(super) fn topo_order(&self) -> MetaResult<Vec<GlobalFragmentId>> {
        let mut topo = Vec::new();
        let mut downstream_cnts = HashMap::new();

        for fragment_id in self.all_fragment_ids() {
            let downstream_cnt = self.get_downstreams(fragment_id).count();
            if downstream_cnt == 0 {
                topo.push(fragment_id);
            } else {
                downstream_cnts.insert(fragment_id, downstream_cnt);
            }
        }

        let mut i = 0;
        while let Some(&fragment_id) = topo.get(i) {
            i += 1;
            for (upstream_id, _) in self.get_upstreams(fragment_id) {
                let downstream_cnt = downstream_cnts.get_mut(&upstream_id).unwrap();
                *downstream_cnt -= 1;
                if *downstream_cnt == 0 {
                    downstream_cnts.remove(&upstream_id);
                    topo.push(upstream_id);
                }
            }
        }

        if !downstream_cnts.is_empty() {
            bail!("graph is not a DAG");
        }

        Ok(topo)
    }

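    /// Convert a building fragment into a sealed [`Fragment`] with the given actors, distribution
    /// and stream node, collecting the state table IDs (internal tables, plus the job table for
    /// materialized view or vector index fragments).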
    pub(super) fn seal_fragment(
        &self,
        id: GlobalFragmentId,
        actors: Vec<StreamActor>,
        distribution: Distribution,
        stream_node: StreamNode,
    ) -> Fragment {
        let building_fragment = self.get_fragment(id).into_building().unwrap();
        let internal_tables = building_fragment.extract_internal_tables();
        let BuildingFragment {
            inner,
            job_id,
            upstream_table_columns: _,
        } = building_fragment;

        let distribution_type = distribution.to_distribution_type();
        let vnode_count = distribution.vnode_count();

        let materialized_fragment_id =
            if FragmentTypeMask::from(inner.fragment_type_mask).contains(FragmentTypeFlag::Mview) {
                job_id
            } else {
                None
            };

        let vector_index_fragment_id =
            if inner.fragment_type_mask & FragmentTypeFlag::VectorIndexWrite as u32 != 0 {
                job_id
            } else {
                None
            };

        let state_table_ids = internal_tables
            .iter()
            .map(|t| t.id)
            .chain(materialized_fragment_id)
            .chain(vector_index_fragment_id)
            .collect();

        Fragment {
            fragment_id: inner.fragment_id,
            fragment_type_mask: inner.fragment_type_mask.into(),
            distribution_type,
            actors,
            state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        }
    }

    pub(super) fn get_fragment(&self, fragment_id: GlobalFragmentId) -> EitherFragment {
        if let Some(fragment) = self.existing_fragments.get(&fragment_id) {
            EitherFragment::Existing(fragment.clone())
        } else {
            EitherFragment::Building(
                self.building_graph
                    .fragments
                    .get(&fragment_id)
                    .unwrap()
                    .clone(),
            )
        }
    }

    pub(super) fn get_downstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> impl Iterator<Item = (GlobalFragmentId, &StreamFragmentEdge)> {
        self.building_graph
            .get_downstreams(fragment_id)
            .iter()
            .chain(
                self.extra_downstreams
                    .get(&fragment_id)
                    .into_iter()
                    .flatten(),
            )
            .map(|(&id, edge)| (id, edge))
    }

    pub(super) fn get_upstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> impl Iterator<Item = (GlobalFragmentId, &StreamFragmentEdge)> {
        self.building_graph
            .get_upstreams(fragment_id)
            .iter()
            .chain(self.extra_upstreams.get(&fragment_id).into_iter().flatten())
            .map(|(&id, edge)| (id, edge))
    }

    pub(super) fn building_fragments(&self) -> &HashMap<GlobalFragmentId, BuildingFragment> {
        &self.building_graph.fragments
    }

    pub(super) fn building_fragments_mut(
        &mut self,
    ) -> &mut HashMap<GlobalFragmentId, BuildingFragment> {
        &mut self.building_graph.fragments
    }

    pub(super) fn max_parallelism(&self) -> usize {
        self.building_graph.max_parallelism()
    }
}