risingwave_meta/stream/stream_graph/
state_match.rs1use std::collections::{HashMap, HashSet, VecDeque};
19use std::hash::{DefaultHasher, Hash as _, Hasher as _};
20
21use itertools::Itertools;
22use risingwave_common::catalog::{TableDesc, TableId};
23use risingwave_common::id::FragmentId;
24use risingwave_common::util::stream_graph_visitor::visit_stream_node_tables_inner;
25use risingwave_pb::catalog::PbTable;
26use risingwave_pb::stream_plan::StreamNode;
27use strum::IntoDiscriminant;
28
29use crate::model::StreamJobFragments;
30use crate::stream::StreamFragmentGraph;
31
/// Pre-rendered, human-readable description of a stream operator, used when reporting
/// operator-level matching errors.
pub(crate) struct StreamNodeDesc(Box<str>);
34
35impl From<&StreamNode> for StreamNodeDesc {
36 fn from(node: &StreamNode) -> Self {
37 let id = node.operator_id;
38 let identity = &node.identity;
39 let body = node.node_body.as_ref().unwrap();
40
41 Self(format!("{}({}, {})", body, id, identity).into_boxed_str())
42 }
43}
44
45impl std::fmt::Display for StreamNodeDesc {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 write!(f, "{}", self.0)
48 }
49}
50
51#[derive(thiserror::Error, thiserror_ext::Macro, thiserror_ext::ReportDebug)]
53pub(crate) enum Error {
54 #[error("failed to match graph: {message}")]
55 Graph { message: String },
56
57 #[error("failed to match fragment {id}: {message}")]
58 Fragment {
59 source: Option<Box<Error>>,
60 id: Id,
61 message: String,
62 },
63
64 #[error("failed to match operator {from} to {to}: {message}")]
65 Operator {
66 from: StreamNodeDesc,
67 to: StreamNodeDesc,
68 message: String,
69 },
70}
71
72type Result<T, E = Error> = std::result::Result<T, E>;
73
74type Id = FragmentId;
76
77struct Fragment {
79 id: Id,
81 root: StreamNode,
83}
84
85pub struct Graph {
87 nodes: HashMap<Id, Fragment>,
89 downstreams: HashMap<Id, Vec<Id>>,
91 upstreams: HashMap<Id, Vec<Id>>,
93}
94
95impl Graph {
96 fn len(&self) -> usize {
98 self.nodes.len()
99 }
100
101 fn downstreams(&self, id: Id) -> &[Id] {
103 self.downstreams.get(&id).map_or(&[], |v| v.as_slice())
104 }
105
106 fn upstreams(&self, id: Id) -> &[Id] {
108 self.upstreams.get(&id).map_or(&[], |v| v.as_slice())
109 }
110
111 fn topo_order(&self) -> Result<Vec<Id>> {
113 let mut topo = Vec::new();
114 let mut downstream_cnts = HashMap::new();
115
116 for node_id in self.nodes.keys() {
118 let downstream_cnt = self.downstreams(*node_id).len();
119 if downstream_cnt == 0 {
120 topo.push(*node_id);
121 } else {
122 downstream_cnts.insert(*node_id, downstream_cnt);
123 }
124 }
125
126 let mut i = 0;
127 while let Some(&node_id) = topo.get(i) {
128 i += 1;
129 for &upstream_id in self.upstreams(node_id) {
131 let downstream_cnt = downstream_cnts.get_mut(&upstream_id).unwrap();
132 *downstream_cnt -= 1;
133 if *downstream_cnt == 0 {
134 downstream_cnts.remove(&upstream_id);
135 topo.push(upstream_id);
136 }
137 }
138 }
139
140 if !downstream_cnts.is_empty() {
141 bail_graph!("fragment graph is not a DAG");
143 }
144 assert_eq!(topo.len(), self.len());
145
146 Ok(topo)
147 }
148
149 fn fingerprints(&self) -> Result<HashMap<Id, u64>> {
153 let mut fps = HashMap::new();
154
155 let order = self.topo_order()?;
156 for u in order.into_iter().rev() {
157 let upstream_fps = self
158 .upstreams(u)
159 .iter()
160 .map(|id| *fps.get(id).unwrap())
161 .sorted() .collect_vec();
163
164 let mut hasher = DefaultHasher::new();
167 (
168 self.upstreams(u).len(),
169 self.downstreams(u).len(),
170 upstream_fps,
171 )
172 .hash(&mut hasher);
173 let fingerprint = hasher.finish();
174
175 fps.insert(u, fingerprint);
176 }
177
178 Ok(fps)
179 }
180}
181
182struct Match {
184 target: Id,
186 table_matches: HashMap<TableId, PbTable>,
188}
189
190struct Matches {
192 inner: HashMap<Id, Match>,
194 matched_targets: HashSet<Id>,
196}
197
198impl Matches {
199 fn new() -> Self {
201 Self {
202 inner: HashMap::new(),
203 matched_targets: HashSet::new(),
204 }
205 }
206
207 fn target(&self, u: Id) -> Option<Id> {
209 self.inner.get(&u).map(|m| m.target)
210 }
211
212 fn len(&self) -> usize {
214 self.inner.len()
215 }
216
217 fn matched(&self, u: Id) -> bool {
219 self.inner.contains_key(&u)
220 }
221
222 fn target_matched(&self, v: Id) -> bool {
224 self.matched_targets.contains(&v)
225 }
226
227 fn try_match(&mut self, u: &Fragment, v: &Fragment) -> Result<()> {
232 if self.matched(u.id) {
233 panic!("fragment {} was already matched", u.id);
234 }
235
236 let collect_tables = |x: &StreamNode| {
238 let mut tables = Vec::new();
239 visit_stream_node_tables_inner(&mut x.clone(), true, false, |table, name| {
240 tables.push((name.to_owned(), table.clone()));
241 });
242 tables
243 };
244
245 let mut table_matches = HashMap::new();
246
247 let mut uq = VecDeque::from([&u.root]);
249 let mut vq = VecDeque::from([&v.root]);
250
251 while let Some(mut un) = uq.pop_front() {
252 let mut vn = vq.pop_front().unwrap();
255
256 let mut u_tables = collect_tables(un);
258 while u_tables.is_empty() && un.input.len() == 1 {
259 un = &un.input[0];
260 u_tables = collect_tables(un);
261 }
262 let mut v_tables = collect_tables(vn);
263 while v_tables.is_empty() && vn.input.len() == 1 {
264 vn = &vn.input[0];
265 v_tables = collect_tables(vn);
266 }
267
268 if un.input.is_empty() && vn.input.is_empty() {
270 continue;
271 }
272
273 if un.node_body.as_ref().unwrap().discriminant()
275 != vn.node_body.as_ref().unwrap().discriminant()
276 {
277 bail_operator!(from = un, to = vn, "operator has different type");
278 }
279 if un.input.len() != vn.input.len() {
280 bail_operator!(
281 from = un,
282 to = vn,
283 "operator has different number of inputs ({} vs {})",
284 un.input.len(),
285 vn.input.len()
286 );
287 }
288
289 uq.extend(un.input.iter());
291 vq.extend(vn.input.iter());
292
293 for (ut_name, ut) in u_tables {
294 let vt_cands = v_tables
295 .extract_if(.., |(vt_name, _)| *vt_name == ut_name)
296 .collect_vec();
297
298 if vt_cands.is_empty() {
299 bail_operator!(
300 from = un,
301 to = vn,
302 "cannot find a match for table `{ut_name}`",
303 );
304 } else if vt_cands.len() > 1 {
305 bail_operator!(
306 from = un,
307 to = vn,
308 "found multiple matches for table `{ut_name}`",
309 );
310 }
311
312 let (_, vt) = vt_cands.into_iter().next().unwrap();
313
314 let table_desc_for_compare = |table: &PbTable| {
317 let mut table = table.clone();
318 table.id = 0.into(); table.maybe_vnode_count = Some(42); TableDesc::from_pb_table(&table)
322 };
323
324 let ut_compare = table_desc_for_compare(&ut);
325 let vt_compare = table_desc_for_compare(&vt);
326
327 if ut_compare != vt_compare {
328 bail_operator!(
329 from = un,
330 to = vn,
331 "found a match for table `{ut_name}`, but they are incompatible, diff:\n{}",
332 pretty_assertions::Comparison::new(&ut_compare, &vt_compare)
333 );
334 }
335
336 table_matches.try_insert(ut.id, vt).unwrap_or_else(|_| {
337 panic!("duplicated table id {} in fragment {}", ut.id, u.id)
338 });
339 }
340 }
341
342 let m = Match {
343 target: v.id,
344 table_matches,
345 };
346 self.inner.insert(u.id, m);
347 self.matched_targets.insert(v.id);
348
349 Ok(())
350 }
351
352 fn undo_match(&mut self, u: Id) {
354 let target = self
355 .inner
356 .remove(&u)
357 .unwrap_or_else(|| panic!("fragment {} was not previously matched", u))
358 .target;
359
360 let target_removed = self.matched_targets.remove(&target);
361 assert!(target_removed);
362 }
363
364 fn into_table_mapping(self) -> HashMap<TableId, PbTable> {
366 self.inner
367 .into_iter()
368 .flat_map(|(_, m)| m.table_matches.into_iter())
369 .collect()
370 }
371}
372
373fn match_graph(g1: &Graph, g2: &Graph) -> Result<Matches> {
375 if g1.len() != g2.len() {
376 bail_graph!(
377 "graphs have different number of fragments ({} vs {})",
378 g1.len(),
379 g2.len()
380 );
381 }
382
383 let fps1 = g1.fingerprints()?;
384 let fps2 = g2.fingerprints()?;
385
386 let mut fp_cand = HashMap::with_capacity(g1.len());
388 for (&u, &f1) in &fps1 {
389 for (&v, &f2) in &fps2 {
390 if f1 == f2 {
391 fp_cand.entry(u).or_insert_with(HashSet::new).insert(v);
392 }
393 }
394 }
395
396 fn dfs(
397 g1: &Graph,
398 g2: &Graph,
399 fp_cand: &mut HashMap<Id, HashSet<Id>>,
400 matches: &mut Matches,
401 ) -> Result<()> {
402 if matches.len() == g1.len() {
404 return Ok(());
405 }
406
407 let (&u, u_cands) = fp_cand
409 .iter()
410 .filter(|(u, _)| !matches.matched(**u))
411 .min_by_key(|(_, cands)| cands.len())
412 .unwrap();
413 let u_cands = u_cands.clone();
414
415 let mut last_error = None;
416
417 for &v in &u_cands {
418 if matches.target_matched(v) {
420 continue;
421 }
422
423 let upstreams = g1.upstreams(u).to_vec();
425 for u_upstream in upstreams {
426 if let Some(v_upstream) = matches.target(u_upstream)
427 && !g2.upstreams(v).contains(&v_upstream)
428 {
429 continue;
431 }
432 }
433 let downstreams = g1.downstreams(u).to_vec();
435 for u_downstream in downstreams {
436 if let Some(v_downstream) = matches.target(u_downstream)
437 && !g2.downstreams(v).contains(&v_downstream)
438 {
439 continue;
441 }
442 }
443
444 match matches.try_match(&g1.nodes[&u], &g2.nodes[&v]) {
446 Ok(()) => {
447 let fp_cand_clone = fp_cand.clone();
448
449 for (_, u_cands) in fp_cand.iter_mut() {
451 u_cands.remove(&v);
452 }
453
454 match dfs(g1, g2, fp_cand, matches) {
456 Ok(()) => return Ok(()), Err(err) => {
458 last_error = Some(err);
459
460 *fp_cand = fp_cand_clone;
462 matches.undo_match(u);
463 }
464 }
465 }
466
467 Err(err) => last_error = Some(err),
468 }
469 }
470
471 if let Some(error) = last_error {
472 bail_fragment!(
473 source = Box::new(error),
474 id = u,
475 "tried against all {} candidates, but failed",
476 u_cands.len()
477 );
478 } else {
479 bail_fragment!(
480 id = u,
481 "cannot find a candidate with same topological position"
482 )
483 }
484 }
485
486 let mut matches = Matches::new();
487 dfs(g1, g2, &mut fp_cand, &mut matches)?;
488 Ok(matches)
489}
490
491pub(crate) fn match_graph_internal_tables(
493 g1: &Graph,
494 g2: &Graph,
495) -> Result<HashMap<TableId, PbTable>> {
496 match_graph(g1, g2).map(|matches| matches.into_table_mapping())
497}
498
499impl Graph {
500 pub(crate) fn from_building(graph: &StreamFragmentGraph) -> Self {
502 let nodes = graph
503 .fragments
504 .iter()
505 .map(|(&id, f)| {
506 let id = id.as_global_id();
507 (
508 id,
509 Fragment {
510 id,
511 root: f.node.clone().unwrap(),
512 },
513 )
514 })
515 .collect();
516
517 let downstreams = graph
518 .downstreams
519 .iter()
520 .map(|(&id, downstreams)| {
521 (
522 id.as_global_id(),
523 downstreams
524 .iter()
525 .map(|(&id, _)| id.as_global_id())
526 .collect(),
527 )
528 })
529 .collect();
530
531 let upstreams = graph
532 .upstreams
533 .iter()
534 .map(|(&id, upstreams)| {
535 (
536 id.as_global_id(),
537 upstreams.iter().map(|(&id, _)| id.as_global_id()).collect(),
538 )
539 })
540 .collect();
541
542 Self {
543 nodes,
544 downstreams,
545 upstreams,
546 }
547 }
548
549 pub(crate) fn from_existing(
551 fragments: &StreamJobFragments,
552 fragment_upstreams: &HashMap<Id, HashSet<Id>>,
553 ) -> Self {
554 let nodes: HashMap<_, _> = fragments
555 .fragments
556 .iter()
557 .map(|(&id, f)| {
558 (
559 id,
560 Fragment {
561 id,
562 root: f.nodes.clone(),
563 },
564 )
565 })
566 .collect();
567
568 let mut downstreams = HashMap::new();
569 let mut upstreams = HashMap::new();
570
571 for (&id, fragment_upstreams) in fragment_upstreams {
572 assert!(nodes.contains_key(&id));
573
574 for &upstream in fragment_upstreams {
575 if !nodes.contains_key(&upstream) {
576 continue;
578 }
579 downstreams
580 .entry(upstream)
581 .or_insert_with(Vec::new)
582 .push(id);
583 upstreams.entry(id).or_insert_with(Vec::new).push(upstream);
584 }
585 }
586
587 Self {
588 nodes,
589 downstreams,
590 upstreams,
591 }
592 }
593}