1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::num::NonZeroUsize;
17use std::ops::{Deref, DerefMut};
18use std::sync::LazyLock;
19
20use anyhow::{Context, anyhow};
21use enum_as_inner::EnumAsInner;
22use itertools::Itertools;
23use risingwave_common::bail;
24use risingwave_common::catalog::{
25 CDC_SOURCE_COLUMN_NUM, ColumnCatalog, Field, FragmentTypeFlag, FragmentTypeMask, TableId,
26 generate_internal_table_name_with_type,
27};
28use risingwave_common::hash::VnodeCount;
29use risingwave_common::util::iter_util::ZipEqFast;
30use risingwave_common::util::stream_graph_visitor::{
31 self, visit_stream_node_cont, visit_stream_node_cont_mut,
32};
33use risingwave_connector::sink::catalog::SinkType;
34use risingwave_meta_model::WorkerId;
35use risingwave_pb::catalog::{PbSink, PbTable, Table};
36use risingwave_pb::ddl_service::TableJobType;
37use risingwave_pb::plan_common::{PbColumnCatalog, PbColumnDesc};
38use risingwave_pb::stream_plan::dispatch_output_mapping::TypePair;
39use risingwave_pb::stream_plan::stream_fragment_graph::{
40 Parallelism, StreamFragment, StreamFragmentEdge as StreamFragmentEdgeProto,
41};
42use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
43use risingwave_pb::stream_plan::{
44 BackfillOrder, DispatchOutputMapping, DispatchStrategy, DispatcherType, PbStreamNode,
45 PbStreamScanType, StreamFragmentGraph as StreamFragmentGraphProto, StreamNode, StreamScanNode,
46 StreamScanType,
47};
48
49use crate::barrier::SnapshotBackfillInfo;
50use crate::controller::id::IdGeneratorManager;
51use crate::manager::{MetaSrvEnv, StreamingJob, StreamingJobType};
52use crate::model::{ActorId, Fragment, FragmentId, StreamActor};
53use crate::stream::stream_graph::id::{
54 GlobalActorIdGen, GlobalFragmentId, GlobalFragmentIdGen, GlobalTableIdGen,
55};
56use crate::stream::stream_graph::schedule::Distribution;
57use crate::{MetaError, MetaResult};
58
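/// A fragment of a streaming job under construction. It wraps the [`StreamFragment`] received
/// from the frontend with extra information resolved on the meta side: the ID of the job if this
/// fragment contains the job's root node (e.g. `Materialize` or `Sink`), and the columns required
/// from each upstream table.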
59#[derive(Debug, Clone)]
62pub(super) struct BuildingFragment {
63 inner: StreamFragment,
65
66 job_id: Option<u32>,
68
69 upstream_table_columns: HashMap<TableId, Vec<PbColumnDesc>>,
74}
75
76impl BuildingFragment {
77 fn new(
80 id: GlobalFragmentId,
81 fragment: StreamFragment,
82 job: &StreamingJob,
83 table_id_gen: GlobalTableIdGen,
84 ) -> Self {
85 let mut fragment = StreamFragment {
86 fragment_id: id.as_global_id(),
87 ..fragment
88 };
89
90 Self::fill_internal_tables(&mut fragment, job, table_id_gen);
92
93 let job_id = Self::fill_job(&mut fragment, job).then(|| job.id());
94 let upstream_table_columns =
95 Self::extract_upstream_table_columns_except_cross_db_backfill(&fragment);
96
97 Self {
98 inner: fragment,
99 job_id,
100 upstream_table_columns,
101 }
102 }
103
104 fn extract_internal_tables(&self) -> Vec<Table> {
106 let mut fragment = self.inner.to_owned();
107 let mut tables = Vec::new();
108 stream_graph_visitor::visit_internal_tables(&mut fragment, |table, _| {
109 tables.push(table.clone());
110 });
111 tables
112 }
113
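    /// Fill the information of the job (ID, schema, database, owner, name, etc.) into the
    /// internal tables of the fragment, and rewrite their local table IDs into global ones.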
114 fn fill_internal_tables(
116 fragment: &mut StreamFragment,
117 job: &StreamingJob,
118 table_id_gen: GlobalTableIdGen,
119 ) {
120 let fragment_id = fragment.fragment_id;
121 stream_graph_visitor::visit_internal_tables(fragment, |table, table_type_name| {
122 table.id = table_id_gen.to_global_id(table.id).as_global_id();
123 table.schema_id = job.schema_id();
124 table.database_id = job.database_id();
125 table.name = generate_internal_table_name_with_type(
126 &job.name(),
127 fragment_id,
128 table.id,
129 table_type_name,
130 );
131 table.fragment_id = fragment_id;
132 table.owner = job.owner();
133 table.job_id = Some(job.id());
134 });
135 }
136
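    /// Fill the job-related information (e.g. the job's table or sink ID) into the nodes of the
    /// fragment. Returns whether this fragment contains the root node of the job, such as
    /// `Materialize` for tables and materialized views or `Sink` for sinks.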
137 fn fill_job(fragment: &mut StreamFragment, job: &StreamingJob) -> bool {
139 let job_id = job.id();
140 let fragment_id = fragment.fragment_id;
141 let mut has_job = false;
142
143 stream_graph_visitor::visit_fragment_mut(fragment, |node_body| match node_body {
144 NodeBody::Materialize(materialize_node) => {
145 materialize_node.table_id = job_id;
146
147 let table = materialize_node.table.insert(job.table().unwrap().clone());
149 table.fragment_id = fragment_id;
 if cfg!(not(debug_assertions)) {
152 table.definition = job.name();
153 }
154
155 has_job = true;
156 }
157 NodeBody::Sink(sink_node) => {
158 sink_node.sink_desc.as_mut().unwrap().id = job_id;
159
160 has_job = true;
161 }
162 NodeBody::Dml(dml_node) => {
163 dml_node.table_id = job_id;
164 dml_node.table_version_id = job.table_version_id().unwrap();
165 }
166 NodeBody::StreamFsFetch(fs_fetch_node) => {
167 if let StreamingJob::Table(table_source, _, _) = job
168 && let Some(node_inner) = fs_fetch_node.node_inner.as_mut()
169 && let Some(source) = table_source
170 {
171 node_inner.source_id = source.id;
172 }
173 }
174 NodeBody::Source(source_node) => {
175 match job {
176 StreamingJob::Table(source, _table, _table_job_type) => {
179 if let Some(source_inner) = source_node.source_inner.as_mut()
180 && let Some(source) = source
181 {
182 debug_assert_ne!(source.id, job_id);
183 source_inner.source_id = source.id;
184 }
185 }
186 StreamingJob::Source(source) => {
187 has_job = true;
188 if let Some(source_inner) = source_node.source_inner.as_mut() {
189 debug_assert_eq!(source.id, job_id);
190 source_inner.source_id = source.id;
191 }
192 }
193 _ => {}
195 }
196 }
197 NodeBody::StreamCdcScan(node) => {
198 if let Some(table_desc) = node.cdc_table_desc.as_mut() {
199 table_desc.table_id = job_id;
200 }
201 }
202 NodeBody::VectorIndexWrite(node) => {
203 let table = node.table.as_mut().unwrap();
204 table.id = job_id;
205 table.database_id = job.database_id();
206 table.schema_id = job.schema_id();
207 table.fragment_id = fragment_id;
208 #[cfg(not(debug_assertions))]
209 {
210 table.definition = job.name();
211 }
212
213 has_job = true;
214 }
215 _ => {}
216 });
217
218 has_job
219 }
220
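    /// Extract the required column descriptions of each upstream table (or source), keyed by its
    /// ID. Scans that use cross-database snapshot backfill are skipped.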
221 fn extract_upstream_table_columns_except_cross_db_backfill(
223 fragment: &StreamFragment,
224 ) -> HashMap<TableId, Vec<PbColumnDesc>> {
225 let mut table_columns = HashMap::new();
226
227 stream_graph_visitor::visit_fragment(fragment, |node_body| {
228 let (table_id, column_ids) = match node_body {
229 NodeBody::StreamScan(stream_scan) => {
230 if stream_scan.get_stream_scan_type().unwrap()
231 == StreamScanType::CrossDbSnapshotBackfill
232 {
233 return;
234 }
235 (stream_scan.table_id.into(), stream_scan.upstream_columns())
236 }
237 NodeBody::CdcFilter(cdc_filter) => (cdc_filter.upstream_source_id.into(), vec![]),
238 NodeBody::SourceBackfill(backfill) => (
239 backfill.upstream_source_id.into(),
240 backfill.column_descs(),
242 ),
243 _ => return,
244 };
245 table_columns
246 .try_insert(table_id, column_ids)
247 .expect("currently a fragment should not scan the same upstream table twice");
248 });
249
250 table_columns
251 }
252
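    /// Returns whether this fragment contains a `StreamScan` of type `ArrangementBackfill` or
    /// `SnapshotBackfill`, i.e. a backfill whose upstream data is shuffled rather than connected
    /// with no-shuffle.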
253 pub fn has_shuffled_backfill(&self) -> bool {
254 let stream_node = match self.inner.node.as_ref() {
255 Some(node) => node,
256 _ => return false,
257 };
258 let mut has_shuffled_backfill = false;
259 let has_shuffled_backfill_mut_ref = &mut has_shuffled_backfill;
260 visit_stream_node_cont(stream_node, |node| {
261 let is_shuffled_backfill = if let Some(node) = &node.node_body
262 && let Some(node) = node.as_stream_scan()
263 {
264 node.stream_scan_type == StreamScanType::ArrangementBackfill as i32
265 || node.stream_scan_type == StreamScanType::SnapshotBackfill as i32
266 } else {
267 false
268 };
269 if is_shuffled_backfill {
270 *has_shuffled_backfill_mut_ref = true;
271 false
272 } else {
273 true
274 }
275 });
276 has_shuffled_backfill
277 }
278}
279
280impl Deref for BuildingFragment {
281 type Target = StreamFragment;
282
283 fn deref(&self) -> &Self::Target {
284 &self.inner
285 }
286}
287
288impl DerefMut for BuildingFragment {
289 fn deref_mut(&mut self) -> &mut Self::Target {
290 &mut self.inner
291 }
292}
293
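/// The ID of an edge in the fragment graph. Edges between two building fragments are `Internal`,
/// while edges that connect a building fragment to an existing upstream or downstream fragment
/// are `UpstreamExternal` / `DownstreamExternal`.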
294#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumAsInner)]
297pub(super) enum EdgeId {
298 Internal {
300 link_id: u64,
303 },
304
305 UpstreamExternal {
308 upstream_table_id: TableId,
310 downstream_fragment_id: GlobalFragmentId,
312 },
313
314 DownstreamExternal(DownstreamExternalEdgeId),
317}
318
319#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
320pub(super) struct DownstreamExternalEdgeId {
321 pub(super) original_upstream_fragment_id: GlobalFragmentId,
323 pub(super) downstream_fragment_id: GlobalFragmentId,
325}
326
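/// An edge in the fragment graph, identified by an [`EdgeId`] and carrying the dispatch strategy
/// used to exchange data from the upstream fragment to the downstream one.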
327#[derive(Debug, Clone)]
331pub(super) struct StreamFragmentEdge {
332 pub id: EdgeId,
334
335 pub dispatch_strategy: DispatchStrategy,
337}
338
339impl StreamFragmentEdge {
340 fn from_protobuf(edge: &StreamFragmentEdgeProto) -> Self {
341 Self {
342 id: EdgeId::Internal {
345 link_id: edge.link_id,
346 },
347 dispatch_strategy: edge.get_dispatch_strategy().unwrap().clone(),
348 }
349 }
350}
351
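/// Clone a fragment with freshly allocated fragment and actor IDs, keeping everything else
/// (distribution, vnode bitmaps, nodes, state table IDs) unchanged.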
352fn clone_fragment(fragment: &Fragment, id_generator_manager: &IdGeneratorManager) -> Fragment {
353 let fragment_id = GlobalFragmentIdGen::new(id_generator_manager, 1)
354 .to_global_id(0)
355 .as_global_id();
356 let actor_id_gen = GlobalActorIdGen::new(id_generator_manager, fragment.actors.len() as _);
357 Fragment {
358 fragment_id,
359 fragment_type_mask: fragment.fragment_type_mask,
360 distribution_type: fragment.distribution_type,
361 actors: fragment
362 .actors
363 .iter()
364 .enumerate()
365 .map(|(i, actor)| StreamActor {
366 actor_id: actor_id_gen.to_global_id(i as _).as_global_id() as _,
367 fragment_id,
368 vnode_bitmap: actor.vnode_bitmap.clone(),
369 mview_definition: actor.mview_definition.clone(),
370 expr_context: actor.expr_context.clone(),
371 })
372 .collect(),
373 state_table_ids: fragment.state_table_ids.clone(),
374 maybe_vnode_count: fragment.maybe_vnode_count,
375 nodes: fragment.nodes.clone(),
376 }
377}
378
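/// Check that the fragments of a sink job have the shape required for automatic schema refresh:
/// a single fragment whose root is a `Sink`, fed by a `StreamScan` of type `ArrangementBackfill`
/// whose inputs are a `Merge` node and a batch plan node.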
379pub fn check_sink_fragments_support_refresh_schema(
380 fragments: &BTreeMap<FragmentId, Fragment>,
381) -> MetaResult<()> {
382 if fragments.len() != 1 {
383 return Err(anyhow!(
384 "sink with auto schema change should have only 1 fragment, but got {:?}",
385 fragments.len()
386 )
387 .into());
388 }
389 let (_, fragment) = fragments.first_key_value().expect("non-empty");
390 let sink_node = &fragment.nodes;
391 let PbNodeBody::Sink(_) = sink_node.node_body.as_ref().unwrap() else {
392 return Err(anyhow!("expect PbNodeBody::Sink but got: {:?}", sink_node.node_body).into());
393 };
394 let [stream_scan_node] = sink_node.input.as_slice() else {
395 panic!("Sink should have exactly one input: {:?}", sink_node.input);
396 };
397 let PbNodeBody::StreamScan(scan) = stream_scan_node.node_body.as_ref().unwrap() else {
398 return Err(anyhow!(
399 "expect PbNodeBody::StreamScan but got: {:?}",
400 stream_scan_node.node_body
401 )
402 .into());
403 };
404 let stream_scan_type = PbStreamScanType::try_from(scan.stream_scan_type).unwrap();
405 if stream_scan_type != PbStreamScanType::ArrangementBackfill {
406 return Err(anyhow!(
407 "unsupported stream_scan_type for auto refresh schema: {:?}",
408 stream_scan_type
409 )
410 .into());
411 }
412 let [merge_node, _batch_plan_node] = stream_scan_node.input.as_slice() else {
413 panic!(
414 "the number of StreamScan inputs is not 2: {:?}",
415 stream_scan_node.input
416 );
417 };
418 let NodeBody::Merge(_) = merge_node.node_body.as_ref().unwrap() else {
419 return Err(anyhow!(
420 "expect PbNodeBody::Merge but got: {:?}",
421 merge_node.node_body
422 )
423 .into());
424 };
425 Ok(())
426}
427
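/// Rebuild the sink fragment after new columns are added to the upstream table: the sink columns,
/// the log store table (if any), and the scan and merge nodes are all extended with the newly
/// added columns. Returns the new fragment, the new sink columns, and the updated log store table
/// if one exists.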
428pub fn rewrite_refresh_schema_sink_fragment(
429 original_sink_fragment: &Fragment,
430 sink: &PbSink,
431 newly_added_columns: &[ColumnCatalog],
432 upstream_table: &PbTable,
433 upstream_table_fragment_id: FragmentId,
434 id_generator_manager: &IdGeneratorManager,
435) -> MetaResult<(Fragment, Vec<PbColumnCatalog>, Option<PbTable>)> {
436 let mut new_sink_columns = sink.columns.clone();
437 fn extend_sink_columns(
438 sink_columns: &mut Vec<PbColumnCatalog>,
439 new_columns: &[ColumnCatalog],
440 get_column_name: impl Fn(&String) -> String,
441 ) {
442 let next_column_id = sink_columns
443 .iter()
444 .map(|col| col.column_desc.as_ref().unwrap().column_id + 1)
445 .max()
446 .unwrap_or(1);
447 sink_columns.extend(new_columns.iter().enumerate().map(|(i, col)| {
448 let mut col = col.to_protobuf();
449 let column_desc = col.column_desc.as_mut().unwrap();
450 column_desc.column_id = next_column_id + (i as i32);
451 column_desc.name = get_column_name(&column_desc.name);
452 col
453 }));
454 }
455 extend_sink_columns(&mut new_sink_columns, newly_added_columns, |name| {
456 name.clone()
457 });
458
459 let mut new_sink_fragment = clone_fragment(original_sink_fragment, id_generator_manager);
460 let sink_node = &mut new_sink_fragment.nodes;
461 let PbNodeBody::Sink(sink_node_body) = sink_node.node_body.as_mut().unwrap() else {
462 return Err(anyhow!("expect PbNodeBody::Sink but got: {:?}", sink_node.node_body).into());
463 };
464 let [stream_scan_node] = sink_node.input.as_mut_slice() else {
465 panic!("Sink should have exactly one input: {:?}", sink_node.input);
466 };
467 let PbNodeBody::StreamScan(scan) = stream_scan_node.node_body.as_mut().unwrap() else {
468 return Err(anyhow!(
469 "expect PbNodeBody::StreamScan but got: {:?}",
470 stream_scan_node.node_body
471 )
472 .into());
473 };
474 let [merge_node, _batch_plan_node] = stream_scan_node.input.as_mut_slice() else {
475 panic!(
476 "the number of StreamScan inputs is not 2: {:?}",
477 stream_scan_node.input
478 );
479 };
480 let NodeBody::Merge(merge) = merge_node.node_body.as_mut().unwrap() else {
481 return Err(anyhow!(
482 "expect PbNodeBody::Merge but got: {:?}",
483 merge_node.node_body
484 )
485 .into());
486 };
487 sink_node.identity = {
490 let sink_type = SinkType::from_proto(sink.sink_type());
491 let sink_type_str = if sink_type.is_append_only() {
492 "append-only"
493 } else {
494 "upsert"
495 };
496 let column_names = new_sink_columns
497 .iter()
498 .map(|col| {
499 ColumnCatalog::from(col.clone())
500 .name_with_hidden()
501 .to_string()
502 })
503 .join(", ");
504 let downstream_pk = if sink_type.is_upsert() {
505 let downstream_pk = sink
506 .downstream_pk
507 .iter()
508 .map(|i| &sink.columns[*i as usize].column_desc.as_ref().unwrap().name)
509 .collect_vec();
510 format!(", downstream_pk: {downstream_pk:?}")
511 } else {
512 "".to_owned()
513 };
514 format!("StreamSink {{ type: {sink_type_str}, columns: [{column_names}]{downstream_pk} }}")
515 };
516 sink_node
517 .fields
518 .extend(newly_added_columns.iter().map(|col| {
519 Field::new(
520 format!("{}.{}", upstream_table.name, col.column_desc.name),
521 col.data_type().clone(),
522 )
523 .to_prost()
524 }));
525
526 let new_log_store_table = if let Some(log_store_table) = &mut sink_node_body.table {
527 extend_sink_columns(&mut log_store_table.columns, newly_added_columns, |name| {
528 format!("{}_{}", upstream_table.name, name)
529 });
530 Some(log_store_table.clone())
531 } else {
532 None
533 };
534 sink_node_body.sink_desc.as_mut().unwrap().column_catalogs = new_sink_columns.clone();
535
536 stream_scan_node
538 .fields
539 .extend(newly_added_columns.iter().map(|col| {
540 Field::new(
541 format!("{}.{}", upstream_table.name, col.column_desc.name),
542 col.data_type().clone(),
543 )
544 .to_prost()
545 }));
546 stream_scan_node.identity = {
548 let columns = stream_scan_node
549 .fields
550 .iter()
551 .map(|col| &col.name)
552 .join(", ");
553 format!("StreamTableScan {{ table: t, columns: [{columns}] }}")
554 };
555
556 let stream_scan_type = PbStreamScanType::try_from(scan.stream_scan_type).unwrap();
557 if stream_scan_type != PbStreamScanType::ArrangementBackfill {
558 return Err(anyhow!(
559 "unsupported stream_scan_type for auto refresh schema: {:?}",
560 stream_scan_type
561 )
562 .into());
563 }
564 scan.arrangement_table = Some(upstream_table.clone());
565 scan.output_indices.extend(
566 (0..newly_added_columns.len()).map(|i| (i + scan.upstream_column_ids.len()) as u32),
567 );
568 scan.upstream_column_ids.extend(
569 newly_added_columns
570 .iter()
571 .map(|col| col.column_id().get_id()),
572 );
573 let table_desc = scan.table_desc.as_mut().unwrap();
574 table_desc
575 .value_indices
576 .extend((0..newly_added_columns.len()).map(|i| (i + table_desc.columns.len()) as u32));
577 table_desc.columns.extend(
578 newly_added_columns
579 .iter()
580 .map(|col| col.column_desc.to_protobuf()),
581 );
582
583 merge_node.fields = scan
585 .upstream_column_ids
586 .iter()
587 .map(|&column_id| {
588 let col = upstream_table
589 .columns
590 .iter()
591 .find(|c| c.column_desc.as_ref().unwrap().column_id == column_id)
592 .unwrap();
593 let col_desc = col.column_desc.as_ref().unwrap();
594 Field::new(
595 col_desc.name.clone(),
596 col_desc.column_type.as_ref().unwrap().into(),
597 )
598 .to_prost()
599 })
600 .collect();
601 merge.upstream_fragment_id = upstream_table_fragment_id;
602 Ok((new_sink_fragment, new_sink_columns, new_log_store_table))
603}
604
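/// Backfill ordering at the fragment level: maps a backfill fragment to the fragments that are
/// ordered after it, derived from the relation-level `BackfillOrder`. For example (hypothetical
/// IDs), an order of `{1: [2, 3]}` places fragment 1 before fragments 2 and 3 in the backfill
/// order.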
605pub type FragmentBackfillOrder = HashMap<FragmentId, Vec<FragmentId>>;
610
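/// The fragment graph of a streaming job under construction, converted from the protobuf
/// representation generated by the frontend. Fragment IDs and internal table IDs are rewritten
/// into globally unique IDs during conversion, and the edges are indexed in both the downstream
/// and upstream directions.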
611#[derive(Default, Debug)]
618pub struct StreamFragmentGraph {
619 pub(super) fragments: HashMap<GlobalFragmentId, BuildingFragment>,
621
622 pub(super) downstreams:
624 HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,
625
626 pub(super) upstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,
628
629 dependent_table_ids: HashSet<TableId>,
631
632 specified_parallelism: Option<NonZeroUsize>,
635
636 max_parallelism: usize,
646
647 backfill_order: BackfillOrder,
649}
650
651impl StreamFragmentGraph {
652 pub fn new(
655 env: &MetaSrvEnv,
656 proto: StreamFragmentGraphProto,
657 job: &StreamingJob,
658 ) -> MetaResult<Self> {
659 let fragment_id_gen =
660 GlobalFragmentIdGen::new(env.id_gen_manager(), proto.fragments.len() as u64);
661 let table_id_gen = GlobalTableIdGen::new(env.id_gen_manager(), proto.table_ids_cnt as u64);
665
666 let fragments: HashMap<_, _> = proto
668 .fragments
669 .into_iter()
670 .map(|(id, fragment)| {
671 let id = fragment_id_gen.to_global_id(id);
672 let fragment = BuildingFragment::new(id, fragment, job, table_id_gen);
673 (id, fragment)
674 })
675 .collect();
676
677 assert_eq!(
678 fragments
679 .values()
680 .map(|f| f.extract_internal_tables().len() as u32)
681 .sum::<u32>(),
682 proto.table_ids_cnt
683 );
684
685 let mut downstreams = HashMap::new();
687 let mut upstreams = HashMap::new();
688
689 for edge in proto.edges {
690 let upstream_id = fragment_id_gen.to_global_id(edge.upstream_id);
691 let downstream_id = fragment_id_gen.to_global_id(edge.downstream_id);
692 let edge = StreamFragmentEdge::from_protobuf(&edge);
693
694 upstreams
695 .entry(downstream_id)
696 .or_insert_with(HashMap::new)
697 .try_insert(upstream_id, edge.clone())
698 .unwrap();
699 downstreams
700 .entry(upstream_id)
701 .or_insert_with(HashMap::new)
702 .try_insert(downstream_id, edge)
703 .unwrap();
704 }
705
706 let dependent_table_ids = proto
709 .dependent_table_ids
710 .iter()
711 .map(TableId::from)
712 .collect();
713
714 let specified_parallelism = if let Some(Parallelism { parallelism }) = proto.parallelism {
715 Some(NonZeroUsize::new(parallelism as usize).context("parallelism should not be 0")?)
716 } else {
717 None
718 };
719
720 let max_parallelism = proto.max_parallelism as usize;
721 let backfill_order = proto.backfill_order.unwrap_or(BackfillOrder {
722 order: Default::default(),
723 });
724
725 Ok(Self {
726 fragments,
727 downstreams,
728 upstreams,
729 dependent_table_ids,
730 specified_parallelism,
731 max_parallelism,
732 backfill_order,
733 })
734 }
735
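    /// Collect the internal tables of all fragments, keyed by table ID. As the name suggests,
    /// the returned catalogs may still be incomplete at this point and are filled in by later
    /// steps of the build process.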
736 pub fn incomplete_internal_tables(&self) -> BTreeMap<u32, Table> {
742 let mut tables = BTreeMap::new();
743 for fragment in self.fragments.values() {
744 for table in fragment.extract_internal_tables() {
745 let table_id = table.id;
746 tables
747 .try_insert(table_id, table)
748 .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
749 }
750 }
751 tables
752 }
753
754 pub fn refill_internal_table_ids(&mut self, table_id_map: HashMap<u32, u32>) {
757 for fragment in self.fragments.values_mut() {
758 stream_graph_visitor::visit_internal_tables(
759 &mut fragment.inner,
760 |table, _table_type_name| {
761 let target = table_id_map.get(&table.id).cloned().unwrap();
762 table.id = target;
763 },
764 );
765 }
766 }
767
768 pub fn fit_internal_tables_trivial(
771 &mut self,
772 mut old_internal_tables: Vec<Table>,
773 ) -> MetaResult<()> {
774 let mut new_internal_table_ids = Vec::new();
775 for fragment in self.fragments.values() {
776 for table in &fragment.extract_internal_tables() {
777 new_internal_table_ids.push(table.id);
778 }
779 }
780
781 if new_internal_table_ids.len() != old_internal_tables.len() {
782 bail!(
783 "Different number of internal tables. New: {}, Old: {}",
784 new_internal_table_ids.len(),
785 old_internal_tables.len()
786 );
787 }
788 old_internal_tables.sort_by(|a, b| a.id.cmp(&b.id));
789 new_internal_table_ids.sort();
790
791 let internal_table_id_map = new_internal_table_ids
792 .into_iter()
793 .zip_eq_fast(old_internal_tables.into_iter())
794 .collect::<HashMap<_, _>>();
795
796 for fragment in self.fragments.values_mut() {
799 stream_graph_visitor::visit_internal_tables(
800 &mut fragment.inner,
801 |table, _table_type_name| {
802 let target = internal_table_id_map.get(&table.id).cloned().unwrap();
804 *table = target;
805 },
806 );
807 }
808
809 Ok(())
810 }
811
812 pub fn fit_internal_table_ids_with_mapping(&mut self, mut matches: HashMap<u32, Table>) {
814 for fragment in self.fragments.values_mut() {
815 stream_graph_visitor::visit_internal_tables(
816 &mut fragment.inner,
817 |table, _table_type_name| {
818 let target = matches.remove(&table.id).unwrap_or_else(|| {
819 panic!("no matching table for table {}({})", table.id, table.name)
820 });
821 table.id = target.id;
822 table.maybe_vnode_count = target.maybe_vnode_count;
823 },
824 );
825 }
826 }
827
828 pub fn table_fragment_id(&self) -> FragmentId {
830 self.fragments
831 .values()
832 .filter(|b| b.job_id.is_some())
833 .map(|b| b.fragment_id)
834 .exactly_one()
835 .expect("require exactly 1 materialize/sink/cdc source node when creating the streaming job")
836 }
837
838 pub fn dml_fragment_id(&self) -> Option<FragmentId> {
840 self.fragments
841 .values()
842 .filter(|b| {
843 FragmentTypeMask::from(b.fragment_type_mask).contains(FragmentTypeFlag::Dml)
844 })
845 .map(|b| b.fragment_id)
846 .at_most_one()
847 .expect("require at most 1 dml node when creating the streaming job")
848 }
849
850 pub fn dependent_table_ids(&self) -> &HashSet<TableId> {
852 &self.dependent_table_ids
853 }
854
855 pub fn specified_parallelism(&self) -> Option<NonZeroUsize> {
857 self.specified_parallelism
858 }
859
860 pub fn max_parallelism(&self) -> usize {
862 self.max_parallelism
863 }
864
865 fn get_downstreams(
867 &self,
868 fragment_id: GlobalFragmentId,
869 ) -> &HashMap<GlobalFragmentId, StreamFragmentEdge> {
870 self.downstreams.get(&fragment_id).unwrap_or(&EMPTY_HASHMAP)
871 }
872
873 fn get_upstreams(
875 &self,
876 fragment_id: GlobalFragmentId,
877 ) -> &HashMap<GlobalFragmentId, StreamFragmentEdge> {
878 self.upstreams.get(&fragment_id).unwrap_or(&EMPTY_HASHMAP)
879 }
880
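    /// Collect the snapshot backfill info of this graph: the first element is the info for
    /// same-database snapshot backfill (or `None` if snapshot backfill is not used), and the
    /// second is the info for cross-database snapshot backfill. Returns an error if the stream
    /// scans mix snapshot backfill with other backfill types.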
881 pub fn collect_snapshot_backfill_info(
882 &self,
883 ) -> MetaResult<(Option<SnapshotBackfillInfo>, SnapshotBackfillInfo)> {
884 Self::collect_snapshot_backfill_info_impl(self.fragments.values().map(|fragment| {
885 (
886 fragment.node.as_ref().unwrap(),
887 fragment.fragment_type_mask.into(),
888 )
889 }))
890 }
891
892 pub fn collect_snapshot_backfill_info_impl(
894 fragments: impl IntoIterator<Item = (&PbStreamNode, FragmentTypeMask)>,
895 ) -> MetaResult<(Option<SnapshotBackfillInfo>, SnapshotBackfillInfo)> {
896 let mut prev_stream_scan: Option<(Option<SnapshotBackfillInfo>, StreamScanNode)> = None;
897 let mut cross_db_info = SnapshotBackfillInfo {
898 upstream_mv_table_id_to_backfill_epoch: Default::default(),
899 };
900 let mut result = Ok(());
901 for (node, fragment_type_mask) in fragments {
902 visit_stream_node_cont(node, |node| {
903 if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_ref() {
904 let stream_scan_type = StreamScanType::try_from(stream_scan.stream_scan_type)
905 .expect("invalid stream_scan_type");
906 let is_snapshot_backfill = match stream_scan_type {
907 StreamScanType::SnapshotBackfill => {
908 assert!(
909 fragment_type_mask
910 .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
911 );
912 true
913 }
914 StreamScanType::CrossDbSnapshotBackfill => {
915 assert!(
916 fragment_type_mask
917 .contains(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan)
918 );
919 cross_db_info.upstream_mv_table_id_to_backfill_epoch.insert(
920 TableId::new(stream_scan.table_id),
921 stream_scan.snapshot_backfill_epoch,
922 );
923
924 return true;
925 }
926 _ => false,
927 };
928
929 match &mut prev_stream_scan {
930 Some((prev_snapshot_backfill_info, prev_stream_scan)) => {
931 match (prev_snapshot_backfill_info, is_snapshot_backfill) {
932 (Some(prev_snapshot_backfill_info), true) => {
933 prev_snapshot_backfill_info
934 .upstream_mv_table_id_to_backfill_epoch
935 .insert(
936 TableId::new(stream_scan.table_id),
937 stream_scan.snapshot_backfill_epoch,
938 );
939 true
940 }
941 (None, false) => true,
942 (_, _) => {
943 result = Err(anyhow!("must be either all snapshot_backfill or no snapshot_backfill. Curr: {stream_scan:?} Prev: {prev_stream_scan:?}").into());
944 false
945 }
946 }
947 }
948 None => {
949 prev_stream_scan = Some((
950 if is_snapshot_backfill {
951 Some(SnapshotBackfillInfo {
952 upstream_mv_table_id_to_backfill_epoch: HashMap::from_iter(
953 [(
954 TableId::new(stream_scan.table_id),
955 stream_scan.snapshot_backfill_epoch,
956 )],
957 ),
958 })
959 } else {
960 None
961 },
962 *stream_scan.clone(),
963 ));
964 true
965 }
966 }
967 } else {
968 true
969 }
970 })
971 }
972 result.map(|_| {
973 (
974 prev_stream_scan
975 .map(|(snapshot_backfill_info, _)| snapshot_backfill_info)
976 .unwrap_or(None),
977 cross_db_info,
978 )
979 })
980 }
981
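    /// Build a mapping from upstream relation IDs (tables or sources) to the IDs of the
    /// fragments in this graph that backfill from them.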
982 pub fn collect_backfill_mapping(&self) -> HashMap<u32, Vec<FragmentId>> {
984 let mut mapping = HashMap::new();
985 for (fragment_id, fragment) in &self.fragments {
986 let fragment_id = fragment_id.as_global_id();
987 let fragment_mask = fragment.fragment_type_mask;
988 let candidates = [FragmentTypeFlag::StreamScan, FragmentTypeFlag::SourceScan];
989 let has_some_scan = candidates
990 .into_iter()
991 .any(|flag| (fragment_mask & flag as u32) > 0);
992 if has_some_scan {
993 visit_stream_node_cont(fragment.node.as_ref().unwrap(), |node| {
994 match node.node_body.as_ref() {
995 Some(NodeBody::StreamScan(stream_scan)) => {
996 let table_id = stream_scan.table_id;
997 let fragments: &mut Vec<_> = mapping.entry(table_id).or_default();
998 fragments.push(fragment_id);
999 false
1001 }
1002 Some(NodeBody::SourceBackfill(source_backfill)) => {
1003 let source_id = source_backfill.upstream_source_id;
1004 let fragments: &mut Vec<_> = mapping.entry(source_id).or_default();
1005 fragments.push(fragment_id);
1006 false
1008 }
1009 _ => true,
1010 }
1011 })
1012 }
1013 }
1014 mapping
1015 }
1016
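    /// Translate the relation-level backfill order of this graph into a fragment-level order,
    /// using the mapping from upstream relation IDs to backfill fragment IDs.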
1017 pub fn create_fragment_backfill_ordering(&self) -> FragmentBackfillOrder {
1021 let mapping = self.collect_backfill_mapping();
1022 let mut fragment_ordering: HashMap<u32, Vec<u32>> = HashMap::new();
1023 for (rel_id, downstream_rel_ids) in &self.backfill_order.order {
1024 let fragment_ids = mapping.get(rel_id).unwrap();
1025 for fragment_id in fragment_ids {
1026 let downstream_fragment_ids = downstream_rel_ids
1027 .data
1028 .iter()
1029 .flat_map(|downstream_rel_id| mapping.get(downstream_rel_id).unwrap().iter())
1030 .copied()
1031 .collect();
1032 fragment_ordering.insert(*fragment_id, downstream_fragment_ids);
1033 }
1034 }
1035 fragment_ordering
1036 }
1037}
1038
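/// Fill the resolved backfill epoch into all (cross-database) snapshot backfill stream scan nodes
/// of the given stream node. Returns whether any node was updated, and errors if an upstream
/// table is not covered by the provided info or if an epoch has already been set.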
1039pub fn fill_snapshot_backfill_epoch(
1042 node: &mut StreamNode,
1043 snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
1044 cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
1045) -> MetaResult<bool> {
1046 let mut result = Ok(());
1047 let mut applied = false;
1048 visit_stream_node_cont_mut(node, |node| {
1049 if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_mut()
1050 && (stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32
1051 || stream_scan.stream_scan_type == StreamScanType::CrossDbSnapshotBackfill as i32)
1052 {
1053 result = try {
1054 let table_id = TableId::new(stream_scan.table_id);
1055 let snapshot_epoch = cross_db_snapshot_backfill_info
1056 .upstream_mv_table_id_to_backfill_epoch
1057 .get(&table_id)
1058 .or_else(|| {
1059 snapshot_backfill_info.and_then(|snapshot_backfill_info| {
1060 snapshot_backfill_info
1061 .upstream_mv_table_id_to_backfill_epoch
1062 .get(&table_id)
1063 })
1064 })
1065 .ok_or_else(|| anyhow!("upstream table id not covered: {}", table_id))?
1066 .ok_or_else(|| anyhow!("upstream table id not set: {}", table_id))?;
1067 if let Some(prev_snapshot_epoch) =
1068 stream_scan.snapshot_backfill_epoch.replace(snapshot_epoch)
1069 {
1070 Err(anyhow!(
1071 "snapshot backfill epoch set again: {} {} {}",
1072 table_id,
1073 prev_snapshot_epoch,
1074 snapshot_epoch
1075 ))?;
1076 }
1077 applied = true;
1078 };
1079 result.is_ok()
1080 } else {
1081 true
1082 }
1083 });
1084 result.map(|_| applied)
1085}
1086
1087static EMPTY_HASHMAP: LazyLock<HashMap<GlobalFragmentId, StreamFragmentEdge>> =
1088 LazyLock::new(HashMap::new);
1089
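/// A fragment in the complete graph: either a fragment being built for the current streaming job,
/// or an existing fragment of an upstream/downstream job.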
1090#[derive(Debug, Clone, EnumAsInner)]
1093pub(super) enum EitherFragment {
1094 Building(BuildingFragment),
1096
1097 Existing(Fragment),
1099}
1100
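/// The building fragment graph together with the related existing fragments (e.g. the upstream
/// materialized views to scan from, or the downstream fragments when replacing a job), plus the
/// extra edges that connect the building graph to them.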
1101#[derive(Debug)]
1110pub struct CompleteStreamFragmentGraph {
1111 building_graph: StreamFragmentGraph,
1113
1114 existing_fragments: HashMap<GlobalFragmentId, Fragment>,
1116
1117 existing_actor_location: HashMap<ActorId, WorkerId>,
1119
1120 extra_downstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,
1122
1123 extra_upstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,
1125}
1126
1127pub struct FragmentGraphUpstreamContext {
1128 upstream_root_fragments: HashMap<TableId, Fragment>,
1131 upstream_actor_location: HashMap<ActorId, WorkerId>,
1132}
1133
1134pub struct FragmentGraphDownstreamContext {
1135 original_root_fragment_id: FragmentId,
1136 downstream_fragments: Vec<(DispatcherType, Fragment)>,
1137 downstream_actor_location: HashMap<ActorId, WorkerId>,
1138}
1139
1140impl CompleteStreamFragmentGraph {
1141 #[cfg(test)]
1144 pub fn for_test(graph: StreamFragmentGraph) -> Self {
1145 Self {
1146 building_graph: graph,
1147 existing_fragments: Default::default(),
1148 existing_actor_location: Default::default(),
1149 extra_downstreams: Default::default(),
1150 extra_upstreams: Default::default(),
1151 }
1152 }
1153
1154 pub fn with_upstreams(
1158 graph: StreamFragmentGraph,
1159 upstream_root_fragments: HashMap<TableId, Fragment>,
1160 existing_actor_location: HashMap<ActorId, WorkerId>,
1161 job_type: StreamingJobType,
1162 ) -> MetaResult<Self> {
1163 Self::build_helper(
1164 graph,
1165 Some(FragmentGraphUpstreamContext {
1166 upstream_root_fragments,
1167 upstream_actor_location: existing_actor_location,
1168 }),
1169 None,
1170 job_type,
1171 )
1172 }
1173
1174 pub fn with_downstreams(
1177 graph: StreamFragmentGraph,
1178 original_root_fragment_id: FragmentId,
1179 downstream_fragments: Vec<(DispatcherType, Fragment)>,
1180 existing_actor_location: HashMap<ActorId, WorkerId>,
1181 job_type: StreamingJobType,
1182 ) -> MetaResult<Self> {
1183 Self::build_helper(
1184 graph,
1185 None,
1186 Some(FragmentGraphDownstreamContext {
1187 original_root_fragment_id,
1188 downstream_fragments,
1189 downstream_actor_location: existing_actor_location,
1190 }),
1191 job_type,
1192 )
1193 }
1194
1195 pub fn with_upstreams_and_downstreams(
1197 graph: StreamFragmentGraph,
1198 upstream_root_fragments: HashMap<TableId, Fragment>,
1199 upstream_actor_location: HashMap<ActorId, WorkerId>,
1200 original_root_fragment_id: FragmentId,
1201 downstream_fragments: Vec<(DispatcherType, Fragment)>,
1202 downstream_actor_location: HashMap<ActorId, WorkerId>,
1203 job_type: StreamingJobType,
1204 ) -> MetaResult<Self> {
1205 Self::build_helper(
1206 graph,
1207 Some(FragmentGraphUpstreamContext {
1208 upstream_root_fragments,
1209 upstream_actor_location,
1210 }),
1211 Some(FragmentGraphDownstreamContext {
1212 original_root_fragment_id,
1213 downstream_fragments,
1214 downstream_actor_location,
1215 }),
1216 job_type,
1217 )
1218 }
1219
1220 fn build_helper(
1222 mut graph: StreamFragmentGraph,
1223 upstream_ctx: Option<FragmentGraphUpstreamContext>,
1224 downstream_ctx: Option<FragmentGraphDownstreamContext>,
1225 job_type: StreamingJobType,
1226 ) -> MetaResult<Self> {
1227 let mut extra_downstreams = HashMap::new();
1228 let mut extra_upstreams = HashMap::new();
1229 let mut existing_fragments = HashMap::new();
1230
1231 let mut existing_actor_location = HashMap::new();
1232
1233 if let Some(FragmentGraphUpstreamContext {
1234 upstream_root_fragments,
1235 upstream_actor_location,
1236 }) = upstream_ctx
1237 {
1238 for (&id, fragment) in &mut graph.fragments {
1239 let uses_shuffled_backfill = fragment.has_shuffled_backfill();
1240
1241 for (&upstream_table_id, required_columns) in &fragment.upstream_table_columns {
1242 let upstream_fragment = upstream_root_fragments
1243 .get(&upstream_table_id)
1244 .context("upstream fragment not found")?;
1245 let upstream_root_fragment_id =
1246 GlobalFragmentId::new(upstream_fragment.fragment_id);
1247
1248 let edge = match job_type {
1249 StreamingJobType::Table(TableJobType::SharedCdcSource) => {
1250 assert_ne!(
1253 (fragment.fragment_type_mask & FragmentTypeFlag::CdcFilter as u32),
1254 0
1255 );
1256
1257 tracing::debug!(
1258 ?upstream_root_fragment_id,
1259 ?required_columns,
1260 identity = ?fragment.inner.get_node().unwrap().get_identity(),
1261 current_frag_id=?id,
1262 "CdcFilter with upstream source fragment"
1263 );
1264
1265 StreamFragmentEdge {
1266 id: EdgeId::UpstreamExternal {
1267 upstream_table_id,
1268 downstream_fragment_id: id,
1269 },
1270 dispatch_strategy: DispatchStrategy {
1273 r#type: DispatcherType::NoShuffle as _,
1274 dist_key_indices: vec![],
 output_mapping: DispatchOutputMapping::identical(
1276 CDC_SOURCE_COLUMN_NUM as _,
1277 )
1278 .into(),
1279 },
1280 }
1281 }
1282
1283 StreamingJobType::MaterializedView
1285 | StreamingJobType::Sink
1286 | StreamingJobType::Index => {
1287 if upstream_fragment
1290 .fragment_type_mask
1291 .contains(FragmentTypeFlag::Mview)
1292 {
1293 let (dist_key_indices, output_mapping) = {
1295 let nodes = &upstream_fragment.nodes;
1296 let mview_node =
1297 nodes.get_node_body().unwrap().as_materialize().unwrap();
1298 let all_columns = mview_node.column_descs();
1299 let dist_key_indices = mview_node.dist_key_indices();
1300 let output_mapping = gen_output_mapping(
1301 required_columns,
1302 &all_columns,
1303 )
1304 .context(
1305 "BUG: column not found in the upstream materialized view",
1306 )?;
1307 (dist_key_indices, output_mapping)
1308 };
1309 let dispatch_strategy = mv_on_mv_dispatch_strategy(
1310 uses_shuffled_backfill,
1311 dist_key_indices,
1312 output_mapping,
1313 );
1314
1315 StreamFragmentEdge {
1316 id: EdgeId::UpstreamExternal {
1317 upstream_table_id,
1318 downstream_fragment_id: id,
1319 },
1320 dispatch_strategy,
1321 }
1322 }
1323 else if upstream_fragment
1326 .fragment_type_mask
1327 .contains(FragmentTypeFlag::Source)
1328 {
1329 let output_mapping = {
1330 let nodes = &upstream_fragment.nodes;
1331 let source_node =
1332 nodes.get_node_body().unwrap().as_source().unwrap();
1333
1334 let all_columns = source_node.column_descs().unwrap();
1335 gen_output_mapping(required_columns, &all_columns).context(
1336 "BUG: column not found in the upstream source node",
1337 )?
1338 };
1339
1340 StreamFragmentEdge {
1341 id: EdgeId::UpstreamExternal {
1342 upstream_table_id,
1343 downstream_fragment_id: id,
1344 },
1345 dispatch_strategy: DispatchStrategy {
1348 r#type: DispatcherType::NoShuffle as _,
1349 dist_key_indices: vec![],
 output_mapping: Some(output_mapping),
1351 },
1352 }
1353 } else {
1354 bail!(
1355 "the upstream fragment should be a MView or Source, got fragment type: {:b}",
1356 upstream_fragment.fragment_type_mask
1357 )
1358 }
1359 }
1360 StreamingJobType::Source | StreamingJobType::Table(_) => {
1361 bail!(
1362 "the streaming job shouldn't have an upstream fragment, job_type: {:?}",
1363 job_type
1364 )
1365 }
1366 };
1367
1368 extra_downstreams
1370 .entry(upstream_root_fragment_id)
1371 .or_insert_with(HashMap::new)
1372 .try_insert(id, edge.clone())
1373 .unwrap();
1374 extra_upstreams
1375 .entry(id)
1376 .or_insert_with(HashMap::new)
1377 .try_insert(upstream_root_fragment_id, edge)
1378 .unwrap();
1379 }
1380 }
1381
1382 existing_fragments.extend(
1383 upstream_root_fragments
1384 .into_values()
1385 .map(|f| (GlobalFragmentId::new(f.fragment_id), f)),
1386 );
1387
1388 existing_actor_location.extend(upstream_actor_location);
1389 }
1390
1391 if let Some(FragmentGraphDownstreamContext {
1392 original_root_fragment_id,
1393 downstream_fragments,
1394 downstream_actor_location,
1395 }) = downstream_ctx
1396 {
1397 let original_table_fragment_id = GlobalFragmentId::new(original_root_fragment_id);
1398 let table_fragment_id = GlobalFragmentId::new(graph.table_fragment_id());
1399
1400 for (dispatcher_type, fragment) in &downstream_fragments {
1403 let id = GlobalFragmentId::new(fragment.fragment_id);
1404
1405 let output_columns = {
1407 let mut res = None;
1408
1409 stream_graph_visitor::visit_stream_node_body(&fragment.nodes, |node_body| {
1410 let columns = match node_body {
1411 NodeBody::StreamScan(stream_scan) => stream_scan.upstream_columns(),
1412 NodeBody::SourceBackfill(source_backfill) => {
1413 source_backfill.column_descs()
1415 }
1416 _ => return,
1417 };
1418 res = Some(columns);
1419 });
1420
1421 res.context("failed to locate downstream scan")?
1422 };
1423
1424 let table_fragment = graph.fragments.get(&table_fragment_id).unwrap();
1425 let nodes = table_fragment.node.as_ref().unwrap();
1426
1427 let (dist_key_indices, output_mapping) = match job_type {
1428 StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1429 let mview_node = nodes.get_node_body().unwrap().as_materialize().unwrap();
1430 let all_columns = mview_node.column_descs();
1431 let dist_key_indices = mview_node.dist_key_indices();
1432 let output_mapping = gen_output_mapping(&output_columns, &all_columns)
1433 .ok_or_else(|| {
1434 MetaError::invalid_parameter(
1435 "unable to drop the column due to \
1436 being referenced by downstream materialized views or sinks",
1437 )
1438 })?;
1439 (dist_key_indices, output_mapping)
1440 }
1441
1442 StreamingJobType::Source => {
1443 let source_node = nodes.get_node_body().unwrap().as_source().unwrap();
1444 let all_columns = source_node.column_descs().unwrap();
1445 let output_mapping = gen_output_mapping(&output_columns, &all_columns)
1446 .ok_or_else(|| {
1447 MetaError::invalid_parameter(
1448 "unable to drop the column due to \
1449 being referenced by downstream materialized views or sinks",
1450 )
1451 })?;
1452 assert_eq!(*dispatcher_type, DispatcherType::NoShuffle);
1453 (
1454 vec![],
 output_mapping,
1456 )
1457 }
1458
1459 _ => bail!("unsupported job type for replacement: {job_type:?}"),
1460 };
1461
1462 let edge = StreamFragmentEdge {
1463 id: EdgeId::DownstreamExternal(DownstreamExternalEdgeId {
1464 original_upstream_fragment_id: original_table_fragment_id,
1465 downstream_fragment_id: id,
1466 }),
1467 dispatch_strategy: DispatchStrategy {
1468 r#type: *dispatcher_type as i32,
1469 output_mapping: Some(output_mapping),
1470 dist_key_indices,
1471 },
1472 };
1473
1474 extra_downstreams
1475 .entry(table_fragment_id)
1476 .or_insert_with(HashMap::new)
1477 .try_insert(id, edge.clone())
1478 .unwrap();
1479 extra_upstreams
1480 .entry(id)
1481 .or_insert_with(HashMap::new)
1482 .try_insert(table_fragment_id, edge)
1483 .unwrap();
1484 }
1485
1486 existing_fragments.extend(
1487 downstream_fragments
1488 .into_iter()
1489 .map(|(_, f)| (GlobalFragmentId::new(f.fragment_id), f)),
1490 );
1491
1492 existing_actor_location.extend(downstream_actor_location);
1493 }
1494
1495 Ok(Self {
1496 building_graph: graph,
1497 existing_fragments,
1498 existing_actor_location,
1499 extra_downstreams,
1500 extra_upstreams,
1501 })
1502 }
1503}
1504
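/// Generate the dispatch output mapping from the upstream columns to the columns required by the
/// downstream, matched by column ID. Returns `None` if any required column is missing upstream.
/// When a matched column has a different type on the two sides, the upstream/downstream type pair
/// is recorded in the mapping as well.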
1505fn gen_output_mapping(
1507 required_columns: &[PbColumnDesc],
1508 upstream_columns: &[PbColumnDesc],
1509) -> Option<DispatchOutputMapping> {
1510 let len = required_columns.len();
1511 let mut indices = vec![0; len];
1512 let mut types = None;
1513
1514 for (i, r) in required_columns.iter().enumerate() {
1515 let (ui, u) = upstream_columns
1516 .iter()
1517 .find_position(|&u| u.column_id == r.column_id)?;
1518 indices[i] = ui as u32;
1519
1520 if u.column_type != r.column_type {
1523 types.get_or_insert_with(|| vec![TypePair::default(); len])[i] = TypePair {
1524 upstream: u.column_type.clone(),
1525 downstream: r.column_type.clone(),
1526 };
1527 }
1528 }
1529
1530 let types = types.unwrap_or_default();
1532
1533 Some(DispatchOutputMapping { indices, types })
1534}
1535
1536fn mv_on_mv_dispatch_strategy(
1537 uses_shuffled_backfill: bool,
1538 dist_key_indices: Vec<u32>,
1539 output_mapping: DispatchOutputMapping,
1540) -> DispatchStrategy {
1541 if uses_shuffled_backfill {
1542 if !dist_key_indices.is_empty() {
1543 DispatchStrategy {
1544 r#type: DispatcherType::Hash as _,
1545 dist_key_indices,
1546 output_mapping: Some(output_mapping),
1547 }
1548 } else {
1549 DispatchStrategy {
1550 r#type: DispatcherType::Simple as _,
1551 dist_key_indices: vec![],
 output_mapping: Some(output_mapping),
1553 }
1554 }
1555 } else {
1556 DispatchStrategy {
1557 r#type: DispatcherType::NoShuffle as _,
1558 dist_key_indices: vec![],
 output_mapping: Some(output_mapping),
1560 }
1561 }
1562}
1563
1564impl CompleteStreamFragmentGraph {
1565 pub(super) fn all_fragment_ids(&self) -> impl Iterator<Item = GlobalFragmentId> + '_ {
1568 self.building_graph
1569 .fragments
1570 .keys()
1571 .chain(self.existing_fragments.keys())
1572 .copied()
1573 }
1574
1575 pub(super) fn all_edges(
1577 &self,
1578 ) -> impl Iterator<Item = (GlobalFragmentId, GlobalFragmentId, &StreamFragmentEdge)> + '_ {
1579 self.building_graph
1580 .downstreams
1581 .iter()
1582 .chain(self.extra_downstreams.iter())
1583 .flat_map(|(&from, tos)| tos.iter().map(move |(&to, edge)| (from, to, edge)))
1584 }
1585
1586 pub(super) fn existing_distribution(&self) -> HashMap<GlobalFragmentId, Distribution> {
1588 self.existing_fragments
1589 .iter()
1590 .map(|(&id, f)| {
1591 (
1592 id,
1593 Distribution::from_fragment(f, &self.existing_actor_location),
1594 )
1595 })
1596 .collect()
1597 }
1598
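    /// Generate a topological order over all fragments (both building and existing), where every
    /// fragment is listed before all of its upstream fragments. Returns an error if the graph is
    /// not a DAG.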
1599 pub(super) fn topo_order(&self) -> MetaResult<Vec<GlobalFragmentId>> {
1606 let mut topo = Vec::new();
1607 let mut downstream_cnts = HashMap::new();
1608
1609 for fragment_id in self.all_fragment_ids() {
1611 let downstream_cnt = self.get_downstreams(fragment_id).count();
1613 if downstream_cnt == 0 {
1614 topo.push(fragment_id);
1615 } else {
1616 downstream_cnts.insert(fragment_id, downstream_cnt);
1617 }
1618 }
1619
1620 let mut i = 0;
1621 while let Some(&fragment_id) = topo.get(i) {
1622 i += 1;
1623 for (upstream_id, _) in self.get_upstreams(fragment_id) {
1625 let downstream_cnt = downstream_cnts.get_mut(&upstream_id).unwrap();
1626 *downstream_cnt -= 1;
1627 if *downstream_cnt == 0 {
1628 downstream_cnts.remove(&upstream_id);
1629 topo.push(upstream_id);
1630 }
1631 }
1632 }
1633
1634 if !downstream_cnts.is_empty() {
1635 bail!("graph is not a DAG");
1637 }
1638
1639 Ok(topo)
1640 }
1641
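    /// Seal a building fragment into a complete [`Fragment`] with the given actors, distribution
    /// and stream node, filling in the state table IDs and vnode count.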
1642 pub(super) fn seal_fragment(
1645 &self,
1646 id: GlobalFragmentId,
1647 actors: Vec<StreamActor>,
1648 distribution: Distribution,
1649 stream_node: StreamNode,
1650 ) -> Fragment {
1651 let building_fragment = self.get_fragment(id).into_building().unwrap();
1652 let internal_tables = building_fragment.extract_internal_tables();
1653 let BuildingFragment {
1654 inner,
1655 job_id,
1656 upstream_table_columns: _,
1657 } = building_fragment;
1658
1659 let distribution_type = distribution.to_distribution_type();
1660 let vnode_count = distribution.vnode_count();
1661
1662 let materialized_fragment_id =
1663 if FragmentTypeMask::from(inner.fragment_type_mask).contains(FragmentTypeFlag::Mview) {
1664 job_id
1665 } else {
1666 None
1667 };
1668
1669 let vector_index_fragment_id =
1670 if inner.fragment_type_mask & FragmentTypeFlag::VectorIndexWrite as u32 != 0 {
1671 job_id
1672 } else {
1673 None
1674 };
1675
1676 let state_table_ids = internal_tables
1677 .iter()
1678 .map(|t| t.id)
1679 .chain(materialized_fragment_id)
1680 .chain(vector_index_fragment_id)
1681 .collect();
1682
1683 Fragment {
1684 fragment_id: inner.fragment_id,
1685 fragment_type_mask: inner.fragment_type_mask.into(),
1686 distribution_type,
1687 actors,
1688 state_table_ids,
1689 maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
1690 nodes: stream_node,
1691 }
1692 }
1693
1694 pub(super) fn get_fragment(&self, fragment_id: GlobalFragmentId) -> EitherFragment {
1697 if let Some(fragment) = self.existing_fragments.get(&fragment_id) {
1698 EitherFragment::Existing(fragment.clone())
1699 } else {
1700 EitherFragment::Building(
1701 self.building_graph
1702 .fragments
1703 .get(&fragment_id)
1704 .unwrap()
1705 .clone(),
1706 )
1707 }
1708 }
1709
1710 pub(super) fn get_downstreams(
1713 &self,
1714 fragment_id: GlobalFragmentId,
1715 ) -> impl Iterator<Item = (GlobalFragmentId, &StreamFragmentEdge)> {
1716 self.building_graph
1717 .get_downstreams(fragment_id)
1718 .iter()
1719 .chain(
1720 self.extra_downstreams
1721 .get(&fragment_id)
1722 .into_iter()
1723 .flatten(),
1724 )
1725 .map(|(&id, edge)| (id, edge))
1726 }
1727
1728 pub(super) fn get_upstreams(
1731 &self,
1732 fragment_id: GlobalFragmentId,
1733 ) -> impl Iterator<Item = (GlobalFragmentId, &StreamFragmentEdge)> {
1734 self.building_graph
1735 .get_upstreams(fragment_id)
1736 .iter()
1737 .chain(self.extra_upstreams.get(&fragment_id).into_iter().flatten())
1738 .map(|(&id, edge)| (id, edge))
1739 }
1740
1741 pub(super) fn building_fragments(&self) -> &HashMap<GlobalFragmentId, BuildingFragment> {
1743 &self.building_graph.fragments
1744 }
1745
1746 pub(super) fn building_fragments_mut(
1748 &mut self,
1749 ) -> &mut HashMap<GlobalFragmentId, BuildingFragment> {
1750 &mut self.building_graph.fragments
1751 }
1752
1753 pub(super) fn max_parallelism(&self) -> usize {
1755 self.building_graph.max_parallelism()
1756 }
1757}