risingwave_meta/stream/stream_graph/
state_match.rs1use std::collections::{HashMap, HashSet, VecDeque};
19use std::hash::{DefaultHasher, Hash as _, Hasher as _};
20
21use itertools::Itertools;
22use risingwave_common::catalog::TableDesc;
23use risingwave_common::util::stream_graph_visitor::visit_stream_node_tables_inner;
24use risingwave_pb::catalog::PbTable;
25use risingwave_pb::stream_plan::StreamNode;
26use strum::IntoDiscriminant;
27
28use crate::model::StreamJobFragments;
29use crate::stream::StreamFragmentGraph;
30
31pub(crate) struct StreamNodeDesc(Box<str>);
33
34impl From<&StreamNode> for StreamNodeDesc {
35 fn from(node: &StreamNode) -> Self {
36 let id = node.operator_id;
37 let identity = &node.identity;
38 let body = node.node_body.as_ref().unwrap();
39
40 Self(format!("{}({}, {})", body, id, identity).into_boxed_str())
41 }
42}
43
44impl std::fmt::Display for StreamNodeDesc {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 write!(f, "{}", self.0)
47 }
48}
49
50#[derive(thiserror::Error, thiserror_ext::Macro, thiserror_ext::ReportDebug)]
52pub(crate) enum Error {
53 #[error("failed to match graph: {message}")]
54 Graph { message: String },
55
56 #[error("failed to match fragment {id}: {message}")]
57 Fragment {
58 source: Option<Box<Error>>,
59 id: Id,
60 message: String,
61 },
62
63 #[error("failed to match operator {from} to {to}: {message}")]
64 Operator {
65 from: StreamNodeDesc,
66 to: StreamNodeDesc,
67 message: String,
68 },
69}
70
71type Result<T, E = Error> = std::result::Result<T, E>;
72
73type Id = u32;
75
76type TableId = u32;
78
79struct Fragment {
81 id: Id,
83 root: StreamNode,
85}
86
87pub struct Graph {
89 nodes: HashMap<Id, Fragment>,
91 downstreams: HashMap<Id, Vec<Id>>,
93 upstreams: HashMap<Id, Vec<Id>>,
95}
96
97impl Graph {
98 fn len(&self) -> usize {
100 self.nodes.len()
101 }
102
103 fn downstreams(&self, id: Id) -> &[Id] {
105 self.downstreams.get(&id).map_or(&[], |v| v.as_slice())
106 }
107
108 fn upstreams(&self, id: Id) -> &[Id] {
110 self.upstreams.get(&id).map_or(&[], |v| v.as_slice())
111 }
112
113 fn topo_order(&self) -> Result<Vec<Id>> {
115 let mut topo = Vec::new();
116 let mut downstream_cnts = HashMap::new();
117
118 for node_id in self.nodes.keys() {
120 let downstream_cnt = self.downstreams(*node_id).len();
121 if downstream_cnt == 0 {
122 topo.push(*node_id);
123 } else {
124 downstream_cnts.insert(*node_id, downstream_cnt);
125 }
126 }
127
128 let mut i = 0;
129 while let Some(&node_id) = topo.get(i) {
130 i += 1;
131 for &upstream_id in self.upstreams(node_id) {
133 let downstream_cnt = downstream_cnts.get_mut(&upstream_id).unwrap();
134 *downstream_cnt -= 1;
135 if *downstream_cnt == 0 {
136 downstream_cnts.remove(&upstream_id);
137 topo.push(upstream_id);
138 }
139 }
140 }
141
142 if !downstream_cnts.is_empty() {
143 bail_graph!("fragment graph is not a DAG");
145 }
146 assert_eq!(topo.len(), self.len());
147
148 Ok(topo)
149 }
150
151 fn fingerprints(&self) -> Result<HashMap<Id, u64>> {
155 let mut fps = HashMap::new();
156
157 let order = self.topo_order()?;
158 for u in order.into_iter().rev() {
159 let upstream_fps = self
160 .upstreams(u)
161 .iter()
162 .map(|id| *fps.get(id).unwrap())
163 .sorted() .collect_vec();
165
166 let mut hasher = DefaultHasher::new();
169 (
170 self.upstreams(u).len(),
171 self.downstreams(u).len(),
172 upstream_fps,
173 )
174 .hash(&mut hasher);
175 let fingerprint = hasher.finish();
176
177 fps.insert(u, fingerprint);
178 }
179
180 Ok(fps)
181 }
182}
183
184struct Match {
186 target: Id,
188 table_matches: HashMap<TableId, PbTable>,
190}
191
192struct Matches {
194 inner: HashMap<Id, Match>,
196 matched_targets: HashSet<Id>,
198}
199
200impl Matches {
201 fn new() -> Self {
203 Self {
204 inner: HashMap::new(),
205 matched_targets: HashSet::new(),
206 }
207 }
208
209 fn target(&self, u: Id) -> Option<Id> {
211 self.inner.get(&u).map(|m| m.target)
212 }
213
214 fn len(&self) -> usize {
216 self.inner.len()
217 }
218
219 fn matched(&self, u: Id) -> bool {
221 self.inner.contains_key(&u)
222 }
223
224 fn target_matched(&self, v: Id) -> bool {
226 self.matched_targets.contains(&v)
227 }
228
229 fn try_match(&mut self, u: &Fragment, v: &Fragment) -> Result<()> {
234 if self.matched(u.id) {
235 panic!("fragment {} was already matched", u.id);
236 }
237
238 let collect_tables = |x: &StreamNode| {
240 let mut tables = Vec::new();
241 visit_stream_node_tables_inner(&mut x.clone(), true, false, |table, name| {
242 tables.push((name.to_owned(), table.clone()));
243 });
244 tables
245 };
246
247 let mut table_matches = HashMap::new();
248
249 let mut uq = VecDeque::from([&u.root]);
251 let mut vq = VecDeque::from([&v.root]);
252
253 while let Some(mut un) = uq.pop_front() {
254 let mut vn = vq.pop_front().unwrap();
257
258 let mut u_tables = collect_tables(un);
260 while u_tables.is_empty() && un.input.len() == 1 {
261 un = &un.input[0];
262 u_tables = collect_tables(un);
263 }
264 let mut v_tables = collect_tables(vn);
265 while v_tables.is_empty() && vn.input.len() == 1 {
266 vn = &vn.input[0];
267 v_tables = collect_tables(vn);
268 }
269
270 if un.input.is_empty() && vn.input.is_empty() {
272 continue;
273 }
274
275 if un.node_body.as_ref().unwrap().discriminant()
277 != vn.node_body.as_ref().unwrap().discriminant()
278 {
279 bail_operator!(from = un, to = vn, "operator has different type");
280 }
281 if un.input.len() != vn.input.len() {
282 bail_operator!(
283 from = un,
284 to = vn,
285 "operator has different number of inputs ({} vs {})",
286 un.input.len(),
287 vn.input.len()
288 );
289 }
290
291 uq.extend(un.input.iter());
293 vq.extend(vn.input.iter());
294
295 for (ut_name, ut) in u_tables {
296 let vt_cands = v_tables
297 .extract_if(.., |(vt_name, _)| *vt_name == ut_name)
298 .collect_vec();
299
300 if vt_cands.is_empty() {
301 bail_operator!(
302 from = un,
303 to = vn,
304 "cannot find a match for table `{ut_name}`",
305 );
306 } else if vt_cands.len() > 1 {
307 bail_operator!(
308 from = un,
309 to = vn,
310 "found multiple matches for table `{ut_name}`",
311 );
312 }
313
314 let (_, vt) = vt_cands.into_iter().next().unwrap();
315
316 let table_desc_for_compare = |table: &PbTable| {
319 let mut table = table.clone();
320 table.id = 0; table.maybe_vnode_count = Some(42); TableDesc::from_pb_table(&table)
324 };
325
326 let ut_compare = table_desc_for_compare(&ut);
327 let vt_compare = table_desc_for_compare(&vt);
328
329 if ut_compare != vt_compare {
330 bail_operator!(
331 from = un,
332 to = vn,
333 "found a match for table `{ut_name}`, but they are incompatible, diff:\n{}",
334 pretty_assertions::Comparison::new(&ut_compare, &vt_compare)
335 );
336 }
337
338 table_matches.try_insert(ut.id, vt).unwrap_or_else(|_| {
339 panic!("duplicated table id {} in fragment {}", ut.id, u.id)
340 });
341 }
342 }
343
344 let m = Match {
345 target: v.id,
346 table_matches,
347 };
348 self.inner.insert(u.id, m);
349 self.matched_targets.insert(v.id);
350
351 Ok(())
352 }
353
354 fn undo_match(&mut self, u: Id) {
356 let target = self
357 .inner
358 .remove(&u)
359 .unwrap_or_else(|| panic!("fragment {} was not previously matched", u))
360 .target;
361
362 let target_removed = self.matched_targets.remove(&target);
363 assert!(target_removed);
364 }
365
366 fn into_table_mapping(self) -> HashMap<TableId, PbTable> {
368 self.inner
369 .into_iter()
370 .flat_map(|(_, m)| m.table_matches.into_iter())
371 .collect()
372 }
373}
374
375fn match_graph(g1: &Graph, g2: &Graph) -> Result<Matches> {
377 if g1.len() != g2.len() {
378 bail_graph!(
379 "graphs have different number of fragments ({} vs {})",
380 g1.len(),
381 g2.len()
382 );
383 }
384
385 let fps1 = g1.fingerprints()?;
386 let fps2 = g2.fingerprints()?;
387
388 let mut fp_cand = HashMap::with_capacity(g1.len());
390 for (&u, &f1) in &fps1 {
391 for (&v, &f2) in &fps2 {
392 if f1 == f2 {
393 fp_cand.entry(u).or_insert_with(HashSet::new).insert(v);
394 }
395 }
396 }
397
398 fn dfs(
399 g1: &Graph,
400 g2: &Graph,
401 fp_cand: &mut HashMap<Id, HashSet<Id>>,
402 matches: &mut Matches,
403 ) -> Result<()> {
404 if matches.len() == g1.len() {
406 return Ok(());
407 }
408
409 let (&u, u_cands) = fp_cand
411 .iter()
412 .filter(|(u, _)| !matches.matched(**u))
413 .min_by_key(|(_, cands)| cands.len())
414 .unwrap();
415 let u_cands = u_cands.clone();
416
417 let mut last_error = None;
418
419 for &v in &u_cands {
420 if matches.target_matched(v) {
422 continue;
423 }
424
425 let upstreams = g1.upstreams(u).to_vec();
427 for u_upstream in upstreams {
428 if let Some(v_upstream) = matches.target(u_upstream)
429 && !g2.upstreams(v).contains(&v_upstream)
430 {
431 continue;
433 }
434 }
435 let downstreams = g1.downstreams(u).to_vec();
437 for u_downstream in downstreams {
438 if let Some(v_downstream) = matches.target(u_downstream)
439 && !g2.downstreams(v).contains(&v_downstream)
440 {
441 continue;
443 }
444 }
445
446 match matches.try_match(&g1.nodes[&u], &g2.nodes[&v]) {
448 Ok(()) => {
449 let fp_cand_clone = fp_cand.clone();
450
451 for (_, u_cands) in fp_cand.iter_mut() {
453 u_cands.remove(&v);
454 }
455
456 match dfs(g1, g2, fp_cand, matches) {
458 Ok(()) => return Ok(()), Err(err) => {
460 last_error = Some(err);
461
462 *fp_cand = fp_cand_clone;
464 matches.undo_match(u);
465 }
466 }
467 }
468
469 Err(err) => last_error = Some(err),
470 }
471 }
472
473 if let Some(error) = last_error {
474 bail_fragment!(
475 source = Box::new(error),
476 id = u,
477 "tried against all {} candidates, but failed",
478 u_cands.len()
479 );
480 } else {
481 bail_fragment!(
482 id = u,
483 "cannot find a candidate with same topological position"
484 )
485 }
486 }
487
488 let mut matches = Matches::new();
489 dfs(g1, g2, &mut fp_cand, &mut matches)?;
490 Ok(matches)
491}
492
493pub(crate) fn match_graph_internal_tables(
495 g1: &Graph,
496 g2: &Graph,
497) -> Result<HashMap<TableId, PbTable>> {
498 match_graph(g1, g2).map(|matches| matches.into_table_mapping())
499}
500
501impl Graph {
502 pub(crate) fn from_building(graph: &StreamFragmentGraph) -> Self {
504 let nodes = graph
505 .fragments
506 .iter()
507 .map(|(&id, f)| {
508 (
509 id.as_global_id(),
510 Fragment {
511 id: id.as_global_id(),
512 root: f.node.clone().unwrap(),
513 },
514 )
515 })
516 .collect();
517
518 let downstreams = graph
519 .downstreams
520 .iter()
521 .map(|(&id, downstreams)| {
522 (
523 id.as_global_id(),
524 downstreams
525 .iter()
526 .map(|(&id, _)| id.as_global_id())
527 .collect(),
528 )
529 })
530 .collect();
531
532 let upstreams = graph
533 .upstreams
534 .iter()
535 .map(|(&id, upstreams)| {
536 (
537 id.as_global_id(),
538 upstreams.iter().map(|(&id, _)| id.as_global_id()).collect(),
539 )
540 })
541 .collect();
542
543 Self {
544 nodes,
545 downstreams,
546 upstreams,
547 }
548 }
549
550 pub(crate) fn from_existing(
552 fragments: &StreamJobFragments,
553 fragment_upstreams: &HashMap<Id, HashSet<Id>>,
554 ) -> Self {
555 let nodes: HashMap<_, _> = fragments
556 .fragments
557 .iter()
558 .map(|(&id, f)| {
559 (
560 id,
561 Fragment {
562 id,
563 root: f.nodes.clone(),
564 },
565 )
566 })
567 .collect();
568
569 let mut downstreams = HashMap::new();
570 let mut upstreams = HashMap::new();
571
572 for (&id, fragment_upstreams) in fragment_upstreams {
573 assert!(nodes.contains_key(&id));
574
575 for &upstream in fragment_upstreams {
576 if !nodes.contains_key(&upstream) {
577 continue;
579 }
580 downstreams
581 .entry(upstream)
582 .or_insert_with(Vec::new)
583 .push(id);
584 upstreams.entry(id).or_insert_with(Vec::new).push(upstream);
585 }
586 }
587
588 Self {
589 nodes,
590 downstreams,
591 upstreams,
592 }
593 }
594}