1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::num::NonZeroUsize;
17use std::ops::{Deref, DerefMut};
18use std::sync::LazyLock;
19use std::sync::atomic::AtomicU32;
20
21use anyhow::{Context, anyhow};
22use enum_as_inner::EnumAsInner;
23use itertools::Itertools;
24use risingwave_common::bail;
25use risingwave_common::catalog::{
26 CDC_SOURCE_COLUMN_NUM, ColumnCatalog, Field, FragmentTypeFlag, FragmentTypeMask, TableId,
27 generate_internal_table_name_with_type,
28};
29use risingwave_common::hash::VnodeCount;
30use risingwave_common::id::JobId;
31use risingwave_common::util::iter_util::ZipEqFast;
32use risingwave_common::util::stream_graph_visitor::{
33 self, visit_stream_node_cont, visit_stream_node_cont_mut,
34};
35use risingwave_connector::sink::catalog::SinkType;
36use risingwave_meta_model::WorkerId;
37use risingwave_pb::catalog::{PbSink, PbTable, Table};
38use risingwave_pb::ddl_service::TableJobType;
39use risingwave_pb::id::StreamNodeLocalOperatorId;
40use risingwave_pb::plan_common::{PbColumnCatalog, PbColumnDesc};
41use risingwave_pb::stream_plan::dispatch_output_mapping::TypePair;
42use risingwave_pb::stream_plan::stream_fragment_graph::{
43 Parallelism, StreamFragment, StreamFragmentEdge as StreamFragmentEdgeProto,
44};
45use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
46use risingwave_pb::stream_plan::{
47 BackfillOrder, DispatchOutputMapping, DispatchStrategy, DispatcherType, PbStreamNode,
48 PbStreamScanType, StreamFragmentGraph as StreamFragmentGraphProto, StreamNode, StreamScanNode,
49 StreamScanType,
50};
51
52use crate::barrier::{SharedFragmentInfo, SnapshotBackfillInfo};
53use crate::controller::id::IdGeneratorManager;
54use crate::manager::{MetaSrvEnv, StreamingJob, StreamingJobType};
55use crate::model::{ActorId, Fragment, FragmentId, StreamActor};
56use crate::stream::stream_graph::id::{
57 GlobalActorIdGen, GlobalFragmentId, GlobalFragmentIdGen, GlobalTableIdGen,
58};
59use crate::stream::stream_graph::schedule::Distribution;
60use crate::{MetaError, MetaResult};
61
/// A fragment of the streaming job graph under construction.
///
/// Wraps the protobuf [`StreamFragment`] received from the frontend after its fragment id and
/// internal table ids have been rewritten to global ids, together with metadata extracted from
/// the fragment's stream nodes.
#[derive(Debug, Clone)]
pub(super) struct BuildingFragment {
    /// The protobuf fragment, with global fragment id / internal table ids filled in.
    inner: StreamFragment,

    /// The streaming job id, set only if this fragment contains the node that defines the job
    /// (a `Materialize`, `Sink`, `VectorIndexWrite`, or — for a source job — `Source` node).
    job_id: Option<JobId>,

    /// Columns read from each upstream job, collected from `StreamScan`, `CdcFilter` and
    /// `SourceBackfill` nodes. Cross-db snapshot backfill scans are not included.
    upstream_job_columns: HashMap<JobId, Vec<PbColumnDesc>>,
}
78
79impl BuildingFragment {
80 fn new(
83 id: GlobalFragmentId,
84 fragment: StreamFragment,
85 job: &StreamingJob,
86 table_id_gen: GlobalTableIdGen,
87 ) -> Self {
88 let mut fragment = StreamFragment {
89 fragment_id: id.as_global_id(),
90 ..fragment
91 };
92
93 Self::fill_internal_tables(&mut fragment, job, table_id_gen);
95
96 let job_id = Self::fill_job(&mut fragment, job).then(|| job.id());
97 let upstream_job_columns =
98 Self::extract_upstream_columns_except_cross_db_backfill(&fragment);
99
100 Self {
101 inner: fragment,
102 job_id,
103 upstream_job_columns,
104 }
105 }
106
107 fn extract_internal_tables(&self) -> Vec<Table> {
109 let mut fragment = self.inner.clone();
110 let mut tables = Vec::new();
111 stream_graph_visitor::visit_internal_tables(&mut fragment, |table, _| {
112 tables.push(table.clone());
113 });
114 tables
115 }
116
117 fn fill_internal_tables(
119 fragment: &mut StreamFragment,
120 job: &StreamingJob,
121 table_id_gen: GlobalTableIdGen,
122 ) {
123 let fragment_id = fragment.fragment_id;
124 stream_graph_visitor::visit_internal_tables(fragment, |table, table_type_name| {
125 table.id = table_id_gen
126 .to_global_id(table.id.as_raw_id())
127 .as_global_id();
128 table.schema_id = job.schema_id();
129 table.database_id = job.database_id();
130 table.name = generate_internal_table_name_with_type(
131 &job.name(),
132 fragment_id,
133 table.id,
134 table_type_name,
135 );
136 table.fragment_id = fragment_id;
137 table.owner = job.owner();
138 table.job_id = Some(job.id());
139 });
140 }
141
142 fn fill_job(fragment: &mut StreamFragment, job: &StreamingJob) -> bool {
144 let job_id = job.id();
145 let fragment_id = fragment.fragment_id;
146 let mut has_job = false;
147
148 stream_graph_visitor::visit_fragment_mut(fragment, |node_body| match node_body {
149 NodeBody::Materialize(materialize_node) => {
150 materialize_node.table_id = job_id.as_mv_table_id();
151
152 let table = materialize_node.table.insert(job.table().unwrap().clone());
154 table.fragment_id = fragment_id; if cfg!(not(debug_assertions)) {
157 table.definition = job.name();
158 }
159
160 has_job = true;
161 }
162 NodeBody::Sink(sink_node) => {
163 sink_node.sink_desc.as_mut().unwrap().id = job_id.as_sink_id();
164
165 has_job = true;
166 }
167 NodeBody::Dml(dml_node) => {
168 dml_node.table_id = job_id.as_mv_table_id();
169 dml_node.table_version_id = job.table_version_id().unwrap();
170 }
171 NodeBody::StreamFsFetch(fs_fetch_node) => {
172 if let StreamingJob::Table(table_source, _, _) = job
173 && let Some(node_inner) = fs_fetch_node.node_inner.as_mut()
174 && let Some(source) = table_source
175 {
176 node_inner.source_id = source.id;
177 if let Some(id) = source.optional_associated_table_id {
178 node_inner.associated_table_id = Some(id.into());
179 }
180 }
181 }
182 NodeBody::Source(source_node) => {
183 match job {
184 StreamingJob::Table(source, _table, _table_job_type) => {
187 if let Some(source_inner) = source_node.source_inner.as_mut()
188 && let Some(source) = source
189 {
190 debug_assert_ne!(source.id, job_id.as_raw_id());
191 source_inner.source_id = source.id;
192 if let Some(id) = source.optional_associated_table_id {
193 source_inner.associated_table_id = Some(id.into());
194 }
195 }
196 }
197 StreamingJob::Source(source) => {
198 has_job = true;
199 if let Some(source_inner) = source_node.source_inner.as_mut() {
200 debug_assert_eq!(source.id, job_id.as_raw_id());
201 source_inner.source_id = source.id;
202 if let Some(id) = source.optional_associated_table_id {
203 source_inner.associated_table_id = Some(id.into());
204 }
205 }
206 }
207 _ => {}
209 }
210 }
211 NodeBody::StreamCdcScan(node) => {
212 if let Some(table_desc) = node.cdc_table_desc.as_mut() {
213 table_desc.table_id = job_id.as_mv_table_id();
214 }
215 }
216 NodeBody::VectorIndexWrite(node) => {
217 let table = node.table.as_mut().unwrap();
218 table.id = job_id.as_mv_table_id();
219 table.database_id = job.database_id();
220 table.schema_id = job.schema_id();
221 table.fragment_id = fragment_id;
222 #[cfg(not(debug_assertions))]
223 {
224 table.definition = job.name();
225 }
226
227 has_job = true;
228 }
229 _ => {}
230 });
231
232 has_job
233 }
234
235 fn extract_upstream_columns_except_cross_db_backfill(
237 fragment: &StreamFragment,
238 ) -> HashMap<JobId, Vec<PbColumnDesc>> {
239 let mut table_columns = HashMap::new();
240
241 stream_graph_visitor::visit_fragment(fragment, |node_body| {
242 let (table_id, column_ids) = match node_body {
243 NodeBody::StreamScan(stream_scan) => {
244 if stream_scan.get_stream_scan_type().unwrap()
245 == StreamScanType::CrossDbSnapshotBackfill
246 {
247 return;
248 }
249 (
250 stream_scan.table_id.as_job_id(),
251 stream_scan.upstream_columns(),
252 )
253 }
254 NodeBody::CdcFilter(cdc_filter) => (
255 cdc_filter.upstream_source_id.as_share_source_job_id(),
256 vec![],
257 ),
258 NodeBody::SourceBackfill(backfill) => (
259 backfill.upstream_source_id.as_share_source_job_id(),
260 backfill.column_descs(),
262 ),
263 _ => return,
264 };
265 table_columns
266 .try_insert(table_id, column_ids)
267 .expect("currently there should be no two same upstream tables in a fragment");
268 });
269
270 table_columns
271 }
272
273 pub fn has_shuffled_backfill(&self) -> bool {
274 let stream_node = match self.inner.node.as_ref() {
275 Some(node) => node,
276 _ => return false,
277 };
278 let mut has_shuffled_backfill = false;
279 let has_shuffled_backfill_mut_ref = &mut has_shuffled_backfill;
280 visit_stream_node_cont(stream_node, |node| {
281 let is_shuffled_backfill = if let Some(node) = &node.node_body
282 && let Some(node) = node.as_stream_scan()
283 {
284 node.stream_scan_type == StreamScanType::ArrangementBackfill as i32
285 || node.stream_scan_type == StreamScanType::SnapshotBackfill as i32
286 } else {
287 false
288 };
289 if is_shuffled_backfill {
290 *has_shuffled_backfill_mut_ref = true;
291 false
292 } else {
293 true
294 }
295 });
296 has_shuffled_backfill
297 }
298}
299
300impl Deref for BuildingFragment {
301 type Target = StreamFragment;
302
303 fn deref(&self) -> &Self::Target {
304 &self.inner
305 }
306}
307
308impl DerefMut for BuildingFragment {
309 fn deref_mut(&mut self) -> &mut Self::Target {
310 &mut self.inner
311 }
312}
313
/// Identifies an edge of the fragment graph, including edges whose other end (presumably) lies
/// outside of the graph being built.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumAsInner)]
pub(super) enum EdgeId {
    /// An edge between two fragments of the graph built from the protobuf, identified by the
    /// `link_id` of the protobuf edge.
    Internal {
        link_id: u64,
    },

    /// An edge from an upstream streaming job into a fragment of this graph.
    UpstreamExternal {
        /// The id of the upstream job.
        upstream_job_id: JobId,
        /// The fragment on the downstream side of the edge.
        downstream_fragment_id: GlobalFragmentId,
    },

    /// An edge towards an external downstream fragment; see [`DownstreamExternalEdgeId`].
    DownstreamExternal(DownstreamExternalEdgeId),
}
338
/// Identifier of an edge leading to an external downstream fragment.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) struct DownstreamExternalEdgeId {
    /// The upstream fragment the edge originally came from.
    pub(super) original_upstream_fragment_id: GlobalFragmentId,
    /// The fragment on the downstream side of the edge.
    pub(super) downstream_fragment_id: GlobalFragmentId,
}
346
/// An edge of the fragment graph together with the way data is dispatched along it.
#[derive(Debug, Clone)]
pub(super) struct StreamFragmentEdge {
    /// Identifier of the edge.
    pub id: EdgeId,

    /// How records are dispatched from the upstream fragment to the downstream one.
    pub dispatch_strategy: DispatchStrategy,
}
358
359impl StreamFragmentEdge {
360 fn from_protobuf(edge: &StreamFragmentEdgeProto) -> Self {
361 Self {
362 id: EdgeId::Internal {
365 link_id: edge.link_id,
366 },
367 dispatch_strategy: edge.get_dispatch_strategy().unwrap().clone(),
368 }
369 }
370}
371
372fn clone_fragment(
373 fragment: &Fragment,
374 id_generator_manager: &IdGeneratorManager,
375 actor_id_counter: &AtomicU32,
376) -> Fragment {
377 let fragment_id = GlobalFragmentIdGen::new(id_generator_manager, 1)
378 .to_global_id(0)
379 .as_global_id();
380 let actor_id_gen = GlobalActorIdGen::new(actor_id_counter, fragment.actors.len() as _);
381 Fragment {
382 fragment_id,
383 fragment_type_mask: fragment.fragment_type_mask,
384 distribution_type: fragment.distribution_type,
385 actors: fragment
386 .actors
387 .iter()
388 .enumerate()
389 .map(|(i, actor)| StreamActor {
390 actor_id: actor_id_gen.to_global_id(i as _).as_global_id(),
391 fragment_id,
392 vnode_bitmap: actor.vnode_bitmap.clone(),
393 mview_definition: actor.mview_definition.clone(),
394 expr_context: actor.expr_context.clone(),
395 config_override: actor.config_override.clone(),
396 })
397 .collect(),
398 state_table_ids: fragment.state_table_ids.clone(),
399 maybe_vnode_count: fragment.maybe_vnode_count,
400 nodes: fragment.nodes.clone(),
401 }
402}
403
404pub fn check_sink_fragments_support_refresh_schema(
405 fragments: &BTreeMap<FragmentId, Fragment>,
406) -> MetaResult<()> {
407 if fragments.len() != 1 {
408 return Err(anyhow!(
409 "sink with auto schema change should have only 1 fragment, but got {:?}",
410 fragments.len()
411 )
412 .into());
413 }
414 let (_, fragment) = fragments.first_key_value().expect("non-empty");
415 let sink_node = &fragment.nodes;
416 let PbNodeBody::Sink(_) = sink_node.node_body.as_ref().unwrap() else {
417 return Err(anyhow!("expect PbNodeBody::Sink but got: {:?}", sink_node.node_body).into());
418 };
419 let [stream_scan_node] = sink_node.input.as_slice() else {
420 panic!("Sink has more than 1 input: {:?}", sink_node.input);
421 };
422 let PbNodeBody::StreamScan(scan) = stream_scan_node.node_body.as_ref().unwrap() else {
423 return Err(anyhow!(
424 "expect PbNodeBody::StreamScan but got: {:?}",
425 stream_scan_node.node_body
426 )
427 .into());
428 };
429 let stream_scan_type = PbStreamScanType::try_from(scan.stream_scan_type).unwrap();
430 if stream_scan_type != PbStreamScanType::ArrangementBackfill {
431 return Err(anyhow!(
432 "unsupported stream_scan_type for auto refresh schema: {:?}",
433 stream_scan_type
434 )
435 .into());
436 }
437 let [merge_node, _batch_plan_node] = stream_scan_node.input.as_slice() else {
438 panic!(
439 "the number of StreamScan inputs is not 2: {:?}",
440 stream_scan_node.input
441 );
442 };
443 let NodeBody::Merge(_) = merge_node.node_body.as_ref().unwrap() else {
444 return Err(anyhow!(
445 "expect PbNodeBody::Merge but got: {:?}",
446 merge_node.node_body
447 )
448 .into());
449 };
450 Ok(())
451}
452
453pub fn rewrite_refresh_schema_sink_fragment(
454 original_sink_fragment: &Fragment,
455 sink: &PbSink,
456 newly_added_columns: &[ColumnCatalog],
457 upstream_table: &PbTable,
458 upstream_table_fragment_id: FragmentId,
459 id_generator_manager: &IdGeneratorManager,
460 actor_id_counter: &AtomicU32,
461) -> MetaResult<(Fragment, Vec<PbColumnCatalog>, Option<PbTable>)> {
462 let mut new_sink_columns = sink.columns.clone();
463 fn extend_sink_columns(
464 sink_columns: &mut Vec<PbColumnCatalog>,
465 new_columns: &[ColumnCatalog],
466 get_column_name: impl Fn(&String) -> String,
467 ) {
468 let next_column_id = sink_columns
469 .iter()
470 .map(|col| col.column_desc.as_ref().unwrap().column_id + 1)
471 .max()
472 .unwrap_or(1);
473 sink_columns.extend(new_columns.iter().enumerate().map(|(i, col)| {
474 let mut col = col.to_protobuf();
475 let column_desc = col.column_desc.as_mut().unwrap();
476 column_desc.column_id = next_column_id + (i as i32);
477 column_desc.name = get_column_name(&column_desc.name);
478 col
479 }));
480 }
481 extend_sink_columns(&mut new_sink_columns, newly_added_columns, |name| {
482 name.clone()
483 });
484
485 let mut new_sink_fragment = clone_fragment(
486 original_sink_fragment,
487 id_generator_manager,
488 actor_id_counter,
489 );
490 let sink_node = &mut new_sink_fragment.nodes;
491 let PbNodeBody::Sink(sink_node_body) = sink_node.node_body.as_mut().unwrap() else {
492 return Err(anyhow!("expect PbNodeBody::Sink but got: {:?}", sink_node.node_body).into());
493 };
494 let [stream_scan_node] = sink_node.input.as_mut_slice() else {
495 panic!("Sink has more than 1 input: {:?}", sink_node.input);
496 };
497 let PbNodeBody::StreamScan(scan) = stream_scan_node.node_body.as_mut().unwrap() else {
498 return Err(anyhow!(
499 "expect PbNodeBody::StreamScan but got: {:?}",
500 stream_scan_node.node_body
501 )
502 .into());
503 };
504 let [merge_node, _batch_plan_node] = stream_scan_node.input.as_mut_slice() else {
505 panic!(
506 "the number of StreamScan inputs is not 2: {:?}",
507 stream_scan_node.input
508 );
509 };
510 let NodeBody::Merge(merge) = merge_node.node_body.as_mut().unwrap() else {
511 return Err(anyhow!(
512 "expect PbNodeBody::Merge but got: {:?}",
513 merge_node.node_body
514 )
515 .into());
516 };
517 sink_node.identity = {
520 let sink_type = SinkType::from_proto(sink.sink_type());
521 let sink_type_str = sink_type.type_str();
522 let column_names = new_sink_columns
523 .iter()
524 .map(|col| {
525 ColumnCatalog::from(col.clone())
526 .name_with_hidden()
527 .to_string()
528 })
529 .join(", ");
530 let downstream_pk = if !sink_type.is_append_only() {
531 let downstream_pk = sink
532 .downstream_pk
533 .iter()
534 .map(|i| &sink.columns[*i as usize].column_desc.as_ref().unwrap().name)
535 .collect_vec();
536 format!(", downstream_pk: {downstream_pk:?}")
537 } else {
538 "".to_owned()
539 };
540 format!("StreamSink {{ type: {sink_type_str}, columns: [{column_names}]{downstream_pk} }}")
541 };
542 sink_node
543 .fields
544 .extend(newly_added_columns.iter().map(|col| {
545 Field::new(
546 format!("{}.{}", upstream_table.name, col.column_desc.name),
547 col.data_type().clone(),
548 )
549 .to_prost()
550 }));
551
552 let new_log_store_table = if let Some(log_store_table) = &mut sink_node_body.table {
553 extend_sink_columns(&mut log_store_table.columns, newly_added_columns, |name| {
554 format!("{}_{}", upstream_table.name, name)
555 });
556 Some(log_store_table.clone())
557 } else {
558 None
559 };
560 sink_node_body.sink_desc.as_mut().unwrap().column_catalogs = new_sink_columns.clone();
561
562 stream_scan_node
564 .fields
565 .extend(newly_added_columns.iter().map(|col| {
566 Field::new(
567 format!("{}.{}", upstream_table.name, col.column_desc.name),
568 col.data_type().clone(),
569 )
570 .to_prost()
571 }));
572 stream_scan_node.identity = {
574 let columns = stream_scan_node
575 .fields
576 .iter()
577 .map(|col| &col.name)
578 .join(", ");
579 format!("StreamTableScan {{ table: t, columns: [{columns}] }}")
580 };
581
582 let stream_scan_type = PbStreamScanType::try_from(scan.stream_scan_type).unwrap();
583 if stream_scan_type != PbStreamScanType::ArrangementBackfill {
584 return Err(anyhow!(
585 "unsupported stream_scan_type for auto refresh schema: {:?}",
586 stream_scan_type
587 )
588 .into());
589 }
590 scan.arrangement_table = Some(upstream_table.clone());
591 scan.output_indices.extend(
592 (0..newly_added_columns.len()).map(|i| (i + scan.upstream_column_ids.len()) as u32),
593 );
594 scan.upstream_column_ids.extend(
595 newly_added_columns
596 .iter()
597 .map(|col| col.column_id().get_id()),
598 );
599 let table_desc = scan.table_desc.as_mut().unwrap();
600 table_desc
601 .value_indices
602 .extend((0..newly_added_columns.len()).map(|i| (i + table_desc.columns.len()) as u32));
603 table_desc.columns.extend(
604 newly_added_columns
605 .iter()
606 .map(|col| col.column_desc.to_protobuf()),
607 );
608
609 merge_node.fields = scan
611 .upstream_column_ids
612 .iter()
613 .map(|&column_id| {
614 let col = upstream_table
615 .columns
616 .iter()
617 .find(|c| c.column_desc.as_ref().unwrap().column_id == column_id)
618 .unwrap();
619 let col_desc = col.column_desc.as_ref().unwrap();
620 Field::new(
621 col_desc.name.clone(),
622 col_desc.column_type.as_ref().unwrap().into(),
623 )
624 .to_prost()
625 })
626 .collect();
627 merge.upstream_fragment_id = upstream_table_fragment_id;
628 Ok((new_sink_fragment, new_sink_columns, new_log_store_table))
629}
630
/// Backfill ordering between fragments: maps a fragment id to the ids of the fragments that are
/// downstream of it in the backfill order.
pub type FragmentBackfillOrder = HashMap<FragmentId, Vec<FragmentId>>;
636
/// The fragment graph of a streaming job being created, with global fragment ids assigned.
#[derive(Default, Debug)]
pub struct StreamFragmentGraph {
    /// All fragments of the graph, keyed by global fragment id.
    pub(super) fragments: HashMap<GlobalFragmentId, BuildingFragment>,

    /// For each fragment, its downstream fragments and the edges leading to them.
    pub(super) downstreams:
        HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

    /// For each fragment, its upstream fragments and the edges coming from them.
    pub(super) upstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

    /// Ids of the tables this job depends on, as listed in the protobuf graph.
    dependent_table_ids: HashSet<TableId>,

    /// Parallelism explicitly requested for the job, if any.
    specified_parallelism: Option<NonZeroUsize>,
    /// Parallelism explicitly requested for backfilling, if any.
    specified_backfill_parallelism: Option<NonZeroUsize>,

    /// Maximum parallelism, as given by the protobuf graph.
    max_parallelism: usize,

    /// Relation-level backfill order from the protobuf graph (empty if none was given).
    backfill_order: BackfillOrder,
}
679
680impl StreamFragmentGraph {
681 pub fn new(
684 env: &MetaSrvEnv,
685 proto: StreamFragmentGraphProto,
686 job: &StreamingJob,
687 ) -> MetaResult<Self> {
688 let fragment_id_gen =
689 GlobalFragmentIdGen::new(env.id_gen_manager(), proto.fragments.len() as u64);
690 let table_id_gen = GlobalTableIdGen::new(env.id_gen_manager(), proto.table_ids_cnt as u64);
694
695 let fragments: HashMap<_, _> = proto
697 .fragments
698 .into_iter()
699 .map(|(id, fragment)| {
700 let id = fragment_id_gen.to_global_id(id.as_raw_id());
701 let fragment = BuildingFragment::new(id, fragment, job, table_id_gen);
702 (id, fragment)
703 })
704 .collect();
705
706 assert_eq!(
707 fragments
708 .values()
709 .map(|f| f.extract_internal_tables().len() as u32)
710 .sum::<u32>(),
711 proto.table_ids_cnt
712 );
713
714 let mut downstreams = HashMap::new();
716 let mut upstreams = HashMap::new();
717
718 for edge in proto.edges {
719 let upstream_id = fragment_id_gen.to_global_id(edge.upstream_id.as_raw_id());
720 let downstream_id = fragment_id_gen.to_global_id(edge.downstream_id.as_raw_id());
721 let edge = StreamFragmentEdge::from_protobuf(&edge);
722
723 upstreams
724 .entry(downstream_id)
725 .or_insert_with(HashMap::new)
726 .try_insert(upstream_id, edge.clone())
727 .unwrap();
728 downstreams
729 .entry(upstream_id)
730 .or_insert_with(HashMap::new)
731 .try_insert(downstream_id, edge)
732 .unwrap();
733 }
734
735 let dependent_table_ids = proto.dependent_table_ids.iter().copied().collect();
738
739 let specified_parallelism = if let Some(Parallelism { parallelism }) = proto.parallelism {
740 Some(NonZeroUsize::new(parallelism as usize).context("parallelism should not be 0")?)
741 } else {
742 None
743 };
744 let specified_backfill_parallelism =
745 if let Some(Parallelism { parallelism }) = proto.backfill_parallelism {
746 Some(
747 NonZeroUsize::new(parallelism as usize)
748 .context("backfill parallelism should not be 0")?,
749 )
750 } else {
751 None
752 };
753
754 let max_parallelism = proto.max_parallelism as usize;
755 let backfill_order = proto.backfill_order.unwrap_or(BackfillOrder {
756 order: Default::default(),
757 });
758
759 Ok(Self {
760 fragments,
761 downstreams,
762 upstreams,
763 dependent_table_ids,
764 specified_parallelism,
765 specified_backfill_parallelism,
766 max_parallelism,
767 backfill_order,
768 })
769 }
770
771 pub fn incomplete_internal_tables(&self) -> BTreeMap<TableId, Table> {
777 let mut tables = BTreeMap::new();
778 for fragment in self.fragments.values() {
779 for table in fragment.extract_internal_tables() {
780 let table_id = table.id;
781 tables
782 .try_insert(table_id, table)
783 .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
784 }
785 }
786 tables
787 }
788
789 pub fn refill_internal_table_ids(&mut self, table_id_map: HashMap<TableId, TableId>) {
792 for fragment in self.fragments.values_mut() {
793 stream_graph_visitor::visit_internal_tables(
794 &mut fragment.inner,
795 |table, _table_type_name| {
796 let target = table_id_map.get(&table.id).cloned().unwrap();
797 table.id = target;
798 },
799 );
800 }
801 }
802
803 pub fn fit_internal_tables_trivial(
806 &mut self,
807 mut old_internal_tables: Vec<Table>,
808 ) -> MetaResult<()> {
809 let mut new_internal_table_ids = Vec::new();
810 for fragment in self.fragments.values() {
811 for table in &fragment.extract_internal_tables() {
812 new_internal_table_ids.push(table.id);
813 }
814 }
815
816 if new_internal_table_ids.len() != old_internal_tables.len() {
817 bail!(
818 "Different number of internal tables. New: {}, Old: {}",
819 new_internal_table_ids.len(),
820 old_internal_tables.len()
821 );
822 }
823 old_internal_tables.sort_by(|a, b| a.id.cmp(&b.id));
824 new_internal_table_ids.sort();
825
826 let internal_table_id_map = new_internal_table_ids
827 .into_iter()
828 .zip_eq_fast(old_internal_tables.into_iter())
829 .collect::<HashMap<_, _>>();
830
831 for fragment in self.fragments.values_mut() {
834 stream_graph_visitor::visit_internal_tables(
835 &mut fragment.inner,
836 |table, _table_type_name| {
837 let target = internal_table_id_map.get(&table.id).cloned().unwrap();
839 *table = target;
840 },
841 );
842 }
843
844 Ok(())
845 }
846
847 pub fn fit_internal_table_ids_with_mapping(&mut self, mut matches: HashMap<TableId, Table>) {
849 for fragment in self.fragments.values_mut() {
850 stream_graph_visitor::visit_internal_tables(
851 &mut fragment.inner,
852 |table, _table_type_name| {
853 let target = matches.remove(&table.id).unwrap_or_else(|| {
854 panic!("no matching table for table {}({})", table.id, table.name)
855 });
856 table.id = target.id;
857 table.maybe_vnode_count = target.maybe_vnode_count;
858 },
859 );
860 }
861 }
862
863 pub fn fit_snapshot_backfill_epochs(
864 &mut self,
865 mut snapshot_backfill_epochs: HashMap<StreamNodeLocalOperatorId, u64>,
866 ) {
867 for fragment in self.fragments.values_mut() {
868 visit_stream_node_cont_mut(fragment.node.as_mut().unwrap(), |node| {
869 if let PbNodeBody::StreamScan(scan) = node.node_body.as_mut().unwrap()
870 && let StreamScanType::SnapshotBackfill
871 | StreamScanType::CrossDbSnapshotBackfill = scan.stream_scan_type()
872 {
873 let Some(epoch) = snapshot_backfill_epochs.remove(&node.operator_id) else {
874 panic!("no snapshot epoch found for node {:?}", node)
875 };
876 scan.snapshot_backfill_epoch = Some(epoch);
877 }
878 true
879 })
880 }
881 }
882
883 pub fn table_fragment_id(&self) -> FragmentId {
885 self.fragments
886 .values()
887 .filter(|b| b.job_id.is_some())
888 .map(|b| b.fragment_id)
889 .exactly_one()
890 .expect("require exactly 1 materialize/sink/cdc source node when creating the streaming job")
891 }
892
893 pub fn dml_fragment_id(&self) -> Option<FragmentId> {
895 self.fragments
896 .values()
897 .filter(|b| {
898 FragmentTypeMask::from(b.fragment_type_mask).contains(FragmentTypeFlag::Dml)
899 })
900 .map(|b| b.fragment_id)
901 .at_most_one()
902 .expect("require at most 1 dml node when creating the streaming job")
903 }
904
    /// Ids of the tables this job depends on, as listed in the protobuf graph.
    pub fn dependent_table_ids(&self) -> &HashSet<TableId> {
        &self.dependent_table_ids
    }

    /// The parallelism explicitly specified for the job, if any.
    pub fn specified_parallelism(&self) -> Option<NonZeroUsize> {
        self.specified_parallelism
    }

    /// The parallelism explicitly specified for backfilling, if any.
    pub fn specified_backfill_parallelism(&self) -> Option<NonZeroUsize> {
        self.specified_backfill_parallelism
    }

    /// The maximum parallelism taken from the protobuf graph.
    pub fn max_parallelism(&self) -> usize {
        self.max_parallelism
    }
924
    /// Returns the downstream fragments of `fragment_id` and the edges to them, or an empty map
    /// if it has none.
    fn get_downstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> &HashMap<GlobalFragmentId, StreamFragmentEdge> {
        self.downstreams.get(&fragment_id).unwrap_or(&EMPTY_HASHMAP)
    }

    /// Returns the upstream fragments of `fragment_id` and the edges from them, or an empty map
    /// if it has none.
    fn get_upstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> &HashMap<GlobalFragmentId, StreamFragmentEdge> {
        self.upstreams.get(&fragment_id).unwrap_or(&EMPTY_HASHMAP)
    }
940
941 pub fn collect_snapshot_backfill_info(
942 &self,
943 ) -> MetaResult<(Option<SnapshotBackfillInfo>, SnapshotBackfillInfo)> {
944 Self::collect_snapshot_backfill_info_impl(self.fragments.values().map(|fragment| {
945 (
946 fragment.node.as_ref().unwrap(),
947 fragment.fragment_type_mask.into(),
948 )
949 }))
950 }
951
    /// Scans the `StreamScan` nodes of the given fragments and collects their snapshot backfill
    /// epochs.
    ///
    /// Returns a pair:
    /// - `Some(info)` if the non-cross-db `StreamScan` nodes are all snapshot backfills, where
    ///   `info` maps each upstream table id to the scan's `snapshot_backfill_epoch`. `None` if
    ///   none of them is a snapshot backfill, or if there are no such scans.
    /// - The cross-db info, mapping the upstream table id of each `CrossDbSnapshotBackfill`
    ///   scan to its `snapshot_backfill_epoch`.
    ///
    /// # Errors
    /// Returns an error if snapshot-backfill and non-snapshot-backfill scans are mixed.
    ///
    /// # Panics
    /// Panics on an invalid `stream_scan_type`, or if a (cross-db) snapshot backfill scan is in
    /// a fragment whose type mask lacks the matching flag.
    pub fn collect_snapshot_backfill_info_impl(
        fragments: impl IntoIterator<Item = (&PbStreamNode, FragmentTypeMask)>,
    ) -> MetaResult<(Option<SnapshotBackfillInfo>, SnapshotBackfillInfo)> {
        // The first non-cross-db scan seen (kept for error messages), together with the
        // snapshot backfill info accumulated so far (`Some` iff that first scan is a snapshot
        // backfill).
        let mut prev_stream_scan: Option<(Option<SnapshotBackfillInfo>, StreamScanNode)> = None;
        let mut cross_db_info = SnapshotBackfillInfo {
            upstream_mv_table_id_to_backfill_epoch: Default::default(),
        };
        let mut result = Ok(());
        for (node, fragment_type_mask) in fragments {
            visit_stream_node_cont(node, |node| {
                if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_ref() {
                    let stream_scan_type = StreamScanType::try_from(stream_scan.stream_scan_type)
                        .expect("invalid stream_scan_type");
                    let is_snapshot_backfill = match stream_scan_type {
                        StreamScanType::SnapshotBackfill => {
                            assert!(
                                fragment_type_mask
                                    .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
                            );
                            true
                        }
                        StreamScanType::CrossDbSnapshotBackfill => {
                            assert!(
                                fragment_type_mask
                                    .contains(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan)
                            );
                            cross_db_info
                                .upstream_mv_table_id_to_backfill_epoch
                                .insert(stream_scan.table_id, stream_scan.snapshot_backfill_epoch);

                            // Cross-db scans are collected separately and are excluded from the
                            // all-or-nothing consistency check below.
                            return true;
                        }
                        _ => false,
                    };

                    match &mut prev_stream_scan {
                        Some((prev_snapshot_backfill_info, prev_stream_scan)) => {
                            match (prev_snapshot_backfill_info, is_snapshot_backfill) {
                                (Some(prev_snapshot_backfill_info), true) => {
                                    prev_snapshot_backfill_info
                                        .upstream_mv_table_id_to_backfill_epoch
                                        .insert(
                                            stream_scan.table_id,
                                            stream_scan.snapshot_backfill_epoch,
                                        );
                                    true
                                }
                                (None, false) => true,
                                (_, _) => {
                                    // Mixed scan kinds: record the error and stop visiting this
                                    // fragment. Later fragments are still visited, but `result`
                                    // can only be overwritten with another `Err`.
                                    result = Err(anyhow!("must be either all snapshot_backfill or no snapshot_backfill. Curr: {stream_scan:?} Prev: {prev_stream_scan:?}").into());
                                    false
                                }
                            }
                        }
                        None => {
                            // First non-cross-db scan: it decides which kind all others must be.
                            prev_stream_scan = Some((
                                if is_snapshot_backfill {
                                    Some(SnapshotBackfillInfo {
                                        upstream_mv_table_id_to_backfill_epoch: HashMap::from_iter(
                                            [(
                                                stream_scan.table_id,
                                                stream_scan.snapshot_backfill_epoch,
                                            )],
                                        ),
                                    })
                                } else {
                                    None
                                },
                                *stream_scan.clone(),
                            ));
                            true
                        }
                    }
                } else {
                    true
                }
            })
        }
        result.map(|_| {
            (
                prev_stream_scan
                    .map(|(snapshot_backfill_info, _)| snapshot_backfill_info)
                    .unwrap_or(None),
                cross_db_info,
            )
        })
    }
1040
1041 pub fn collect_backfill_mapping(&self) -> HashMap<u32, Vec<FragmentId>> {
1043 let mut mapping = HashMap::new();
1044 for (fragment_id, fragment) in &self.fragments {
1045 let fragment_id = fragment_id.as_global_id();
1046 let fragment_mask = fragment.fragment_type_mask;
1047 let candidates = [FragmentTypeFlag::StreamScan, FragmentTypeFlag::SourceScan];
1048 let has_some_scan = candidates
1049 .into_iter()
1050 .any(|flag| (fragment_mask & flag as u32) > 0);
1051 if has_some_scan {
1052 visit_stream_node_cont(fragment.node.as_ref().unwrap(), |node| {
1053 match node.node_body.as_ref() {
1054 Some(NodeBody::StreamScan(stream_scan)) => {
1055 let table_id = stream_scan.table_id;
1056 let fragments: &mut Vec<_> =
1057 mapping.entry(table_id.as_raw_id()).or_default();
1058 fragments.push(fragment_id);
1059 false
1061 }
1062 Some(NodeBody::SourceBackfill(source_backfill)) => {
1063 let source_id = source_backfill.upstream_source_id;
1064 let fragments: &mut Vec<_> =
1065 mapping.entry(source_id.as_raw_id()).or_default();
1066 fragments.push(fragment_id);
1067 false
1069 }
1070 _ => true,
1071 }
1072 })
1073 }
1074 }
1075 mapping
1076 }
1077
    /// Translates the relation-level backfill order of the job into a fragment-level order, and
    /// adds ordering edges for `LocalityProvider` fragments.
    ///
    /// The resulting downstream lists contain no duplicates.
    ///
    /// # Panics
    /// Panics if a relation in the backfill order has no backfilling fragment in this graph
    /// (NOTE(review): presumably validated before reaching here — confirm).
    pub fn create_fragment_backfill_ordering(&self) -> FragmentBackfillOrder {
        let mapping = self.collect_backfill_mapping();
        let mut fragment_ordering: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();

        // Expand each relation -> downstream relations entry to the fragments scanning them.
        for (rel_id, downstream_rel_ids) in &self.backfill_order.order {
            let fragment_ids = mapping.get(rel_id).unwrap();
            for fragment_id in fragment_ids {
                let downstream_fragment_ids = downstream_rel_ids
                    .data
                    .iter()
                    .flat_map(|downstream_rel_id| mapping.get(downstream_rel_id).unwrap().iter())
                    .copied()
                    .collect();
                fragment_ordering.insert(*fragment_id, downstream_fragment_ids);
            }
        }

        // No explicit order: still give every backfill fragment an (empty) entry.
        if fragment_ordering.is_empty() {
            for value in mapping.values() {
                for &fragment_id in value {
                    fragment_ordering.entry(fragment_id).or_default();
                }
            }
        }

        let locality_provider_dependencies = self.find_locality_provider_dependencies();

        let backfill_fragments: HashSet<FragmentId> = mapping.values().flatten().copied().collect();

        // Root locality providers: those that are not downstream of any other provider.
        let all_locality_provider_fragments: HashSet<FragmentId> =
            locality_provider_dependencies.keys().copied().collect();
        let downstream_locality_provider_fragments: HashSet<FragmentId> =
            locality_provider_dependencies
                .values()
                .flatten()
                .copied()
                .collect();
        let locality_provider_root_fragments: Vec<FragmentId> = all_locality_provider_fragments
            .difference(&downstream_locality_provider_fragments)
            .copied()
            .collect();

        // Every backfill fragment gets the root locality providers as downstreams.
        for &backfill_fragment_id in &backfill_fragments {
            fragment_ordering
                .entry(backfill_fragment_id)
                .or_default()
                .extend(locality_provider_root_fragments.iter().copied());
        }

        // Chain locality providers according to their own dependencies.
        for (fragment_id, downstream_fragments) in locality_provider_dependencies {
            fragment_ordering
                .entry(fragment_id)
                .or_default()
                .extend(downstream_fragments);
        }

        // Deduplicate each downstream list, keeping the first occurrence.
        for downstream in fragment_ordering.values_mut() {
            let mut seen = HashSet::new();
            downstream.retain(|id| seen.insert(*id));
        }

        fragment_ordering
    }
1156
1157 pub fn find_locality_provider_fragment_state_table_mapping(
1158 &self,
1159 ) -> HashMap<FragmentId, Vec<TableId>> {
1160 let mut mapping: HashMap<FragmentId, Vec<TableId>> = HashMap::new();
1161
1162 for (fragment_id, fragment) in &self.fragments {
1163 let fragment_id = fragment_id.as_global_id();
1164
1165 if let Some(node) = fragment.node.as_ref() {
1167 let mut state_table_ids = Vec::new();
1168
1169 visit_stream_node_cont(node, |stream_node| {
1170 if let Some(NodeBody::LocalityProvider(locality_provider)) =
1171 stream_node.node_body.as_ref()
1172 {
1173 let state_table_id = locality_provider
1175 .state_table
1176 .as_ref()
1177 .expect("must have state table")
1178 .id;
1179 state_table_ids.push(state_table_id);
1180 false } else {
1182 true }
1184 });
1185
1186 if !state_table_ids.is_empty() {
1187 mapping.insert(fragment_id, state_table_ids);
1188 }
1189 }
1190 }
1191
1192 mapping
1193 }
1194
1195 pub fn find_locality_provider_dependencies(&self) -> HashMap<FragmentId, Vec<FragmentId>> {
1203 let mut locality_provider_fragments = HashSet::new();
1204 let mut dependencies: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1205
1206 for (fragment_id, fragment) in &self.fragments {
1208 let fragment_id = fragment_id.as_global_id();
1209 let has_locality_provider = self.fragment_has_locality_provider(fragment);
1210
1211 if has_locality_provider {
1212 locality_provider_fragments.insert(fragment_id);
1213 dependencies.entry(fragment_id).or_default();
1214 }
1215 }
1216
1217 for &provider_fragment_id in &locality_provider_fragments {
1221 let provider_fragment_global_id = GlobalFragmentId::new(provider_fragment_id);
1222
1223 let mut visited = HashSet::new();
1225 let mut downstream_locality_providers = Vec::new();
1226
1227 self.collect_downstream_locality_providers(
1228 provider_fragment_global_id,
1229 &locality_provider_fragments,
1230 &mut visited,
1231 &mut downstream_locality_providers,
1232 );
1233
1234 dependencies
1236 .entry(provider_fragment_id)
1237 .or_default()
1238 .extend(downstream_locality_providers);
1239 }
1240
1241 dependencies
1242 }
1243
1244 fn fragment_has_locality_provider(&self, fragment: &BuildingFragment) -> bool {
1245 let mut has_locality_provider = false;
1246
1247 if let Some(node) = fragment.node.as_ref() {
1248 visit_stream_node_cont(node, |stream_node| {
1249 if let Some(NodeBody::LocalityProvider(_)) = stream_node.node_body.as_ref() {
1250 has_locality_provider = true;
1251 false } else {
1253 true }
1255 });
1256 }
1257
1258 has_locality_provider
1259 }
1260
1261 fn collect_downstream_locality_providers(
1263 &self,
1264 current_fragment_id: GlobalFragmentId,
1265 locality_provider_fragments: &HashSet<FragmentId>,
1266 visited: &mut HashSet<GlobalFragmentId>,
1267 downstream_providers: &mut Vec<FragmentId>,
1268 ) {
1269 if visited.contains(¤t_fragment_id) {
1270 return;
1271 }
1272 visited.insert(current_fragment_id);
1273
1274 for &downstream_id in self.get_downstreams(current_fragment_id).keys() {
1276 let downstream_fragment_id = downstream_id.as_global_id();
1277
1278 if locality_provider_fragments.contains(&downstream_fragment_id) {
1280 downstream_providers.push(downstream_fragment_id);
1281 }
1282
1283 self.collect_downstream_locality_providers(
1285 downstream_id,
1286 locality_provider_fragments,
1287 visited,
1288 downstream_providers,
1289 );
1290 }
1291 }
1292}
1293
1294pub fn fill_snapshot_backfill_epoch(
1297 node: &mut StreamNode,
1298 snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
1299 cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
1300) -> MetaResult<bool> {
1301 let mut result = Ok(());
1302 let mut applied = false;
1303 visit_stream_node_cont_mut(node, |node| {
1304 if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_mut()
1305 && (stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32
1306 || stream_scan.stream_scan_type == StreamScanType::CrossDbSnapshotBackfill as i32)
1307 {
1308 result = try {
1309 let table_id = stream_scan.table_id;
1310 let snapshot_epoch = cross_db_snapshot_backfill_info
1311 .upstream_mv_table_id_to_backfill_epoch
1312 .get(&table_id)
1313 .or_else(|| {
1314 snapshot_backfill_info.and_then(|snapshot_backfill_info| {
1315 snapshot_backfill_info
1316 .upstream_mv_table_id_to_backfill_epoch
1317 .get(&table_id)
1318 })
1319 })
1320 .ok_or_else(|| anyhow!("upstream table id not covered: {}", table_id))?
1321 .ok_or_else(|| anyhow!("upstream table id not set: {}", table_id))?;
1322 if let Some(prev_snapshot_epoch) =
1323 stream_scan.snapshot_backfill_epoch.replace(snapshot_epoch)
1324 {
1325 Err(anyhow!(
1326 "snapshot backfill epoch set again: {} {} {}",
1327 table_id,
1328 prev_snapshot_epoch,
1329 snapshot_epoch
1330 ))?;
1331 }
1332 applied = true;
1333 };
1334 result.is_ok()
1335 } else {
1336 true
1337 }
1338 });
1339 result.map(|_| applied)
1340}
1341
1342static EMPTY_HASHMAP: LazyLock<HashMap<GlobalFragmentId, StreamFragmentEdge>> =
1343 LazyLock::new(HashMap::new);
1344
/// A fragment of a [`CompleteStreamFragmentGraph`]: either one being built for the current
/// streaming job, or an existing one that the building fragments are connected to.
#[derive(Debug, Clone, EnumAsInner)]
pub(super) enum EitherFragment {
    /// A fragment of the streaming job that is being built.
    Building(BuildingFragment),

    /// An already-existing fragment (an upstream root or a downstream fragment) that is
    /// connected to the building fragments by an external edge.
    Existing(SharedFragmentInfo),
}
1355
/// The fragment graph of a streaming job being built, together with the existing fragments it is
/// connected to and the extra edges that link the two.
///
/// Constructed through `with_upstreams`, `with_downstreams`, or
/// `with_upstreams_and_downstreams`.
#[derive(Debug)]
pub struct CompleteStreamFragmentGraph {
    /// The fragment graph of the streaming job being built.
    building_graph: StreamFragmentGraph,

    /// Existing fragments connected to `building_graph` (upstream root fragments and/or
    /// downstream fragments), keyed by fragment id.
    existing_fragments: HashMap<GlobalFragmentId, SharedFragmentInfo>,

    /// Worker location of the actors of the existing fragments.
    existing_actor_location: HashMap<ActorId, WorkerId>,

    /// External edges, keyed first by upstream fragment id and then by downstream fragment id.
    extra_downstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

    /// The same external edges as `extra_downstreams`, keyed first by downstream fragment id
    /// and then by upstream fragment id.
    extra_upstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,
}
1381
/// Existing upstream jobs that the fragments being built read from.
pub struct FragmentGraphUpstreamContext {
    /// Root fragment of each upstream job, keyed by job id, together with that root
    /// fragment's stream node.
    pub upstream_root_fragments: HashMap<JobId, (SharedFragmentInfo, PbStreamNode)>,
    /// Worker location of the actors of the upstream root fragments.
    pub upstream_actor_location: HashMap<ActorId, WorkerId>,
}
1388
/// Existing downstream fragments that consume the job being replaced.
pub struct FragmentGraphDownstreamContext {
    /// Id of the root fragment of the job being replaced. It is recorded in the
    /// `DownstreamExternal` edge ids.
    pub original_root_fragment_id: FragmentId,
    /// Each downstream fragment with the dispatcher type used to reach it and its stream node.
    pub downstream_fragments: Vec<(DispatcherType, SharedFragmentInfo, PbStreamNode)>,
    /// Worker location of the actors of the downstream fragments.
    pub downstream_actor_location: HashMap<ActorId, WorkerId>,
}
1394
1395impl CompleteStreamFragmentGraph {
1396 #[cfg(test)]
1399 pub fn for_test(graph: StreamFragmentGraph) -> Self {
1400 Self {
1401 building_graph: graph,
1402 existing_fragments: Default::default(),
1403 existing_actor_location: Default::default(),
1404 extra_downstreams: Default::default(),
1405 extra_upstreams: Default::default(),
1406 }
1407 }
1408
1409 pub fn with_upstreams(
1413 graph: StreamFragmentGraph,
1414 upstream_context: FragmentGraphUpstreamContext,
1415 job_type: StreamingJobType,
1416 ) -> MetaResult<Self> {
1417 Self::build_helper(graph, Some(upstream_context), None, job_type)
1418 }
1419
1420 pub fn with_downstreams(
1423 graph: StreamFragmentGraph,
1424 downstream_context: FragmentGraphDownstreamContext,
1425 job_type: StreamingJobType,
1426 ) -> MetaResult<Self> {
1427 Self::build_helper(graph, None, Some(downstream_context), job_type)
1428 }
1429
1430 pub fn with_upstreams_and_downstreams(
1432 graph: StreamFragmentGraph,
1433 upstream_context: FragmentGraphUpstreamContext,
1434 downstream_context: FragmentGraphDownstreamContext,
1435 job_type: StreamingJobType,
1436 ) -> MetaResult<Self> {
1437 Self::build_helper(
1438 graph,
1439 Some(upstream_context),
1440 Some(downstream_context),
1441 job_type,
1442 )
1443 }
1444
1445 fn build_helper(
1447 mut graph: StreamFragmentGraph,
1448 upstream_ctx: Option<FragmentGraphUpstreamContext>,
1449 downstream_ctx: Option<FragmentGraphDownstreamContext>,
1450 job_type: StreamingJobType,
1451 ) -> MetaResult<Self> {
1452 let mut extra_downstreams = HashMap::new();
1453 let mut extra_upstreams = HashMap::new();
1454 let mut existing_fragments = HashMap::new();
1455
1456 let mut existing_actor_location = HashMap::new();
1457
1458 if let Some(FragmentGraphUpstreamContext {
1459 upstream_root_fragments,
1460 upstream_actor_location,
1461 }) = upstream_ctx
1462 {
1463 for (&id, fragment) in &mut graph.fragments {
1464 let uses_shuffled_backfill = fragment.has_shuffled_backfill();
1465
1466 for (&upstream_job_id, required_columns) in &fragment.upstream_job_columns {
1467 let (upstream_fragment, nodes) = upstream_root_fragments
1468 .get(&upstream_job_id)
1469 .context("upstream fragment not found")?;
1470 let upstream_root_fragment_id =
1471 GlobalFragmentId::new(upstream_fragment.fragment_id);
1472
1473 let edge = match job_type {
1474 StreamingJobType::Table(TableJobType::SharedCdcSource) => {
1475 assert_ne!(
1478 (fragment.fragment_type_mask & FragmentTypeFlag::CdcFilter as u32),
1479 0
1480 );
1481
1482 tracing::debug!(
1483 ?upstream_root_fragment_id,
1484 ?required_columns,
1485 identity = ?fragment.inner.get_node().unwrap().get_identity(),
1486 current_frag_id=?id,
1487 "CdcFilter with upstream source fragment"
1488 );
1489
1490 StreamFragmentEdge {
1491 id: EdgeId::UpstreamExternal {
1492 upstream_job_id,
1493 downstream_fragment_id: id,
1494 },
1495 dispatch_strategy: DispatchStrategy {
1498 r#type: DispatcherType::NoShuffle as _,
1499 dist_key_indices: vec![], output_mapping: DispatchOutputMapping::identical(
1501 CDC_SOURCE_COLUMN_NUM as _,
1502 )
1503 .into(),
1504 },
1505 }
1506 }
1507
1508 StreamingJobType::MaterializedView
1510 | StreamingJobType::Sink
1511 | StreamingJobType::Index => {
1512 if upstream_fragment
1515 .fragment_type_mask
1516 .contains(FragmentTypeFlag::Mview)
1517 {
1518 let (dist_key_indices, output_mapping) = {
1520 let mview_node =
1521 nodes.get_node_body().unwrap().as_materialize().unwrap();
1522 let all_columns = mview_node.column_descs();
1523 let dist_key_indices = mview_node.dist_key_indices();
1524 let output_mapping = gen_output_mapping(
1525 required_columns,
1526 &all_columns,
1527 )
1528 .context(
1529 "BUG: column not found in the upstream materialized view",
1530 )?;
1531 (dist_key_indices, output_mapping)
1532 };
1533 let dispatch_strategy = mv_on_mv_dispatch_strategy(
1534 uses_shuffled_backfill,
1535 dist_key_indices,
1536 output_mapping,
1537 );
1538
1539 StreamFragmentEdge {
1540 id: EdgeId::UpstreamExternal {
1541 upstream_job_id,
1542 downstream_fragment_id: id,
1543 },
1544 dispatch_strategy,
1545 }
1546 }
1547 else if upstream_fragment
1550 .fragment_type_mask
1551 .contains(FragmentTypeFlag::Source)
1552 {
1553 let output_mapping = {
1554 let source_node =
1555 nodes.get_node_body().unwrap().as_source().unwrap();
1556
1557 let all_columns = source_node.column_descs().unwrap();
1558 gen_output_mapping(required_columns, &all_columns).context(
1559 "BUG: column not found in the upstream source node",
1560 )?
1561 };
1562
1563 StreamFragmentEdge {
1564 id: EdgeId::UpstreamExternal {
1565 upstream_job_id,
1566 downstream_fragment_id: id,
1567 },
1568 dispatch_strategy: DispatchStrategy {
1571 r#type: DispatcherType::NoShuffle as _,
1572 dist_key_indices: vec![], output_mapping: Some(output_mapping),
1574 },
1575 }
1576 } else {
1577 bail!(
1578 "the upstream fragment should be a MView or Source, got fragment type: {:b}",
1579 upstream_fragment.fragment_type_mask
1580 )
1581 }
1582 }
1583 StreamingJobType::Source | StreamingJobType::Table(_) => {
1584 bail!(
1585 "the streaming job shouldn't have an upstream fragment, job_type: {:?}",
1586 job_type
1587 )
1588 }
1589 };
1590
1591 extra_downstreams
1593 .entry(upstream_root_fragment_id)
1594 .or_insert_with(HashMap::new)
1595 .try_insert(id, edge.clone())
1596 .unwrap();
1597 extra_upstreams
1598 .entry(id)
1599 .or_insert_with(HashMap::new)
1600 .try_insert(upstream_root_fragment_id, edge)
1601 .unwrap();
1602 }
1603 }
1604
1605 existing_fragments.extend(
1606 upstream_root_fragments
1607 .into_values()
1608 .map(|(f, _)| (GlobalFragmentId::new(f.fragment_id), f)),
1609 );
1610
1611 existing_actor_location.extend(upstream_actor_location);
1612 }
1613
1614 if let Some(FragmentGraphDownstreamContext {
1615 original_root_fragment_id,
1616 downstream_fragments,
1617 downstream_actor_location,
1618 }) = downstream_ctx
1619 {
1620 let original_table_fragment_id = GlobalFragmentId::new(original_root_fragment_id);
1621 let table_fragment_id = GlobalFragmentId::new(graph.table_fragment_id());
1622
1623 for (dispatcher_type, fragment, nodes) in &downstream_fragments {
1626 let id = GlobalFragmentId::new(fragment.fragment_id);
1627
1628 let output_columns = {
1630 let mut res = None;
1631
1632 stream_graph_visitor::visit_stream_node_body(nodes, |node_body| {
1633 let columns = match node_body {
1634 NodeBody::StreamScan(stream_scan) => stream_scan.upstream_columns(),
1635 NodeBody::SourceBackfill(source_backfill) => {
1636 source_backfill.column_descs()
1638 }
1639 _ => return,
1640 };
1641 res = Some(columns);
1642 });
1643
1644 res.context("failed to locate downstream scan")?
1645 };
1646
1647 let table_fragment = graph.fragments.get(&table_fragment_id).unwrap();
1648 let nodes = table_fragment.node.as_ref().unwrap();
1649
1650 let (dist_key_indices, output_mapping) = match job_type {
1651 StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1652 let mview_node = nodes.get_node_body().unwrap().as_materialize().unwrap();
1653 let all_columns = mview_node.column_descs();
1654 let dist_key_indices = mview_node.dist_key_indices();
1655 let output_mapping = gen_output_mapping(&output_columns, &all_columns)
1656 .ok_or_else(|| {
1657 MetaError::invalid_parameter(
1658 "unable to drop the column due to \
1659 being referenced by downstream materialized views or sinks",
1660 )
1661 })?;
1662 (dist_key_indices, output_mapping)
1663 }
1664
1665 StreamingJobType::Source => {
1666 let source_node = nodes.get_node_body().unwrap().as_source().unwrap();
1667 let all_columns = source_node.column_descs().unwrap();
1668 let output_mapping = gen_output_mapping(&output_columns, &all_columns)
1669 .ok_or_else(|| {
1670 MetaError::invalid_parameter(
1671 "unable to drop the column due to \
1672 being referenced by downstream materialized views or sinks",
1673 )
1674 })?;
1675 assert_eq!(*dispatcher_type, DispatcherType::NoShuffle);
1676 (
1677 vec![], output_mapping,
1679 )
1680 }
1681
1682 _ => bail!("unsupported job type for replacement: {job_type:?}"),
1683 };
1684
1685 let edge = StreamFragmentEdge {
1686 id: EdgeId::DownstreamExternal(DownstreamExternalEdgeId {
1687 original_upstream_fragment_id: original_table_fragment_id,
1688 downstream_fragment_id: id,
1689 }),
1690 dispatch_strategy: DispatchStrategy {
1691 r#type: *dispatcher_type as i32,
1692 output_mapping: Some(output_mapping),
1693 dist_key_indices,
1694 },
1695 };
1696
1697 extra_downstreams
1698 .entry(table_fragment_id)
1699 .or_insert_with(HashMap::new)
1700 .try_insert(id, edge.clone())
1701 .unwrap();
1702 extra_upstreams
1703 .entry(id)
1704 .or_insert_with(HashMap::new)
1705 .try_insert(table_fragment_id, edge)
1706 .unwrap();
1707 }
1708
1709 existing_fragments.extend(
1710 downstream_fragments
1711 .into_iter()
1712 .map(|(_, f, _)| (GlobalFragmentId::new(f.fragment_id), f)),
1713 );
1714
1715 existing_actor_location.extend(downstream_actor_location);
1716 }
1717
1718 Ok(Self {
1719 building_graph: graph,
1720 existing_fragments,
1721 existing_actor_location,
1722 extra_downstreams,
1723 extra_upstreams,
1724 })
1725 }
1726}
1727
1728fn gen_output_mapping(
1730 required_columns: &[PbColumnDesc],
1731 upstream_columns: &[PbColumnDesc],
1732) -> Option<DispatchOutputMapping> {
1733 let len = required_columns.len();
1734 let mut indices = vec![0; len];
1735 let mut types = None;
1736
1737 for (i, r) in required_columns.iter().enumerate() {
1738 let (ui, u) = upstream_columns
1739 .iter()
1740 .find_position(|&u| u.column_id == r.column_id)?;
1741 indices[i] = ui as u32;
1742
1743 if u.column_type != r.column_type {
1746 types.get_or_insert_with(|| vec![TypePair::default(); len])[i] = TypePair {
1747 upstream: u.column_type.clone(),
1748 downstream: r.column_type.clone(),
1749 };
1750 }
1751 }
1752
1753 let types = types.unwrap_or(Vec::new());
1755
1756 Some(DispatchOutputMapping { indices, types })
1757}
1758
1759fn mv_on_mv_dispatch_strategy(
1760 uses_shuffled_backfill: bool,
1761 dist_key_indices: Vec<u32>,
1762 output_mapping: DispatchOutputMapping,
1763) -> DispatchStrategy {
1764 if uses_shuffled_backfill {
1765 if !dist_key_indices.is_empty() {
1766 DispatchStrategy {
1767 r#type: DispatcherType::Hash as _,
1768 dist_key_indices,
1769 output_mapping: Some(output_mapping),
1770 }
1771 } else {
1772 DispatchStrategy {
1773 r#type: DispatcherType::Simple as _,
1774 dist_key_indices: vec![], output_mapping: Some(output_mapping),
1776 }
1777 }
1778 } else {
1779 DispatchStrategy {
1780 r#type: DispatcherType::NoShuffle as _,
1781 dist_key_indices: vec![], output_mapping: Some(output_mapping),
1783 }
1784 }
1785}
1786
1787impl CompleteStreamFragmentGraph {
1788 pub(super) fn all_fragment_ids(&self) -> impl Iterator<Item = GlobalFragmentId> + '_ {
1791 self.building_graph
1792 .fragments
1793 .keys()
1794 .chain(self.existing_fragments.keys())
1795 .copied()
1796 }
1797
1798 pub(super) fn all_edges(
1800 &self,
1801 ) -> impl Iterator<Item = (GlobalFragmentId, GlobalFragmentId, &StreamFragmentEdge)> + '_ {
1802 self.building_graph
1803 .downstreams
1804 .iter()
1805 .chain(self.extra_downstreams.iter())
1806 .flat_map(|(&from, tos)| tos.iter().map(move |(&to, edge)| (from, to, edge)))
1807 }
1808
1809 pub(super) fn existing_distribution(&self) -> HashMap<GlobalFragmentId, Distribution> {
1811 self.existing_fragments
1812 .iter()
1813 .map(|(&id, f)| {
1814 (
1815 id,
1816 Distribution::from_fragment(f, &self.existing_actor_location),
1817 )
1818 })
1819 .collect()
1820 }
1821
1822 pub(super) fn topo_order(&self) -> MetaResult<Vec<GlobalFragmentId>> {
1829 let mut topo = Vec::new();
1830 let mut downstream_cnts = HashMap::new();
1831
1832 for fragment_id in self.all_fragment_ids() {
1834 let downstream_cnt = self.get_downstreams(fragment_id).count();
1836 if downstream_cnt == 0 {
1837 topo.push(fragment_id);
1838 } else {
1839 downstream_cnts.insert(fragment_id, downstream_cnt);
1840 }
1841 }
1842
1843 let mut i = 0;
1844 while let Some(&fragment_id) = topo.get(i) {
1845 i += 1;
1846 for (upstream_job_id, _) in self.get_upstreams(fragment_id) {
1848 let downstream_cnt = downstream_cnts.get_mut(&upstream_job_id).unwrap();
1849 *downstream_cnt -= 1;
1850 if *downstream_cnt == 0 {
1851 downstream_cnts.remove(&upstream_job_id);
1852 topo.push(upstream_job_id);
1853 }
1854 }
1855 }
1856
1857 if !downstream_cnts.is_empty() {
1858 bail!("graph is not a DAG");
1860 }
1861
1862 Ok(topo)
1863 }
1864
1865 pub(super) fn seal_fragment(
1868 &self,
1869 id: GlobalFragmentId,
1870 actors: Vec<StreamActor>,
1871 distribution: Distribution,
1872 stream_node: StreamNode,
1873 ) -> Fragment {
1874 let building_fragment = self.get_fragment(id).into_building().unwrap();
1875 let internal_tables = building_fragment.extract_internal_tables();
1876 let BuildingFragment {
1877 inner,
1878 job_id,
1879 upstream_job_columns: _,
1880 } = building_fragment;
1881
1882 let distribution_type = distribution.to_distribution_type();
1883 let vnode_count = distribution.vnode_count();
1884
1885 let materialized_fragment_id =
1886 if FragmentTypeMask::from(inner.fragment_type_mask).contains(FragmentTypeFlag::Mview) {
1887 job_id.map(JobId::as_mv_table_id)
1888 } else {
1889 None
1890 };
1891
1892 let vector_index_fragment_id =
1893 if inner.fragment_type_mask & FragmentTypeFlag::VectorIndexWrite as u32 != 0 {
1894 job_id.map(JobId::as_mv_table_id)
1895 } else {
1896 None
1897 };
1898
1899 let state_table_ids = internal_tables
1900 .iter()
1901 .map(|t| t.id)
1902 .chain(materialized_fragment_id)
1903 .chain(vector_index_fragment_id)
1904 .collect();
1905
1906 Fragment {
1907 fragment_id: inner.fragment_id,
1908 fragment_type_mask: inner.fragment_type_mask.into(),
1909 distribution_type,
1910 actors,
1911 state_table_ids,
1912 maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
1913 nodes: stream_node,
1914 }
1915 }
1916
1917 pub(super) fn get_fragment(&self, fragment_id: GlobalFragmentId) -> EitherFragment {
1920 if let Some(fragment) = self.existing_fragments.get(&fragment_id) {
1921 EitherFragment::Existing(fragment.clone())
1922 } else {
1923 EitherFragment::Building(
1924 self.building_graph
1925 .fragments
1926 .get(&fragment_id)
1927 .unwrap()
1928 .clone(),
1929 )
1930 }
1931 }
1932
1933 pub(super) fn get_downstreams(
1936 &self,
1937 fragment_id: GlobalFragmentId,
1938 ) -> impl Iterator<Item = (GlobalFragmentId, &StreamFragmentEdge)> {
1939 self.building_graph
1940 .get_downstreams(fragment_id)
1941 .iter()
1942 .chain(
1943 self.extra_downstreams
1944 .get(&fragment_id)
1945 .into_iter()
1946 .flatten(),
1947 )
1948 .map(|(&id, edge)| (id, edge))
1949 }
1950
1951 pub(super) fn get_upstreams(
1954 &self,
1955 fragment_id: GlobalFragmentId,
1956 ) -> impl Iterator<Item = (GlobalFragmentId, &StreamFragmentEdge)> {
1957 self.building_graph
1958 .get_upstreams(fragment_id)
1959 .iter()
1960 .chain(self.extra_upstreams.get(&fragment_id).into_iter().flatten())
1961 .map(|(&id, edge)| (id, edge))
1962 }
1963
1964 pub(super) fn building_fragments(&self) -> &HashMap<GlobalFragmentId, BuildingFragment> {
1966 &self.building_graph.fragments
1967 }
1968
1969 pub(super) fn building_fragments_mut(
1971 &mut self,
1972 ) -> &mut HashMap<GlobalFragmentId, BuildingFragment> {
1973 &mut self.building_graph.fragments
1974 }
1975
1976 pub(super) fn max_parallelism(&self) -> usize {
1978 self.building_graph.max_parallelism()
1979 }
1980}