risingwave_frontend/utils/
stream_graph_formatter.rs1use std::cmp::max;
16use std::collections::{BTreeMap, HashMap};
17
18use itertools::Itertools;
19use petgraph::Graph;
20use petgraph::dot::Dot;
21use petgraph::graph::NodeIndex;
22use pretty_xmlish::{Pretty, PrettyConfig};
23use risingwave_common::util::stream_graph_visitor;
24use risingwave_pb::catalog::Table;
25use risingwave_pb::stream_plan::stream_fragment_graph::StreamFragmentEdge;
26use risingwave_pb::stream_plan::{DispatcherType, StreamFragmentGraph, StreamNode, stream_node};
27
28use super::PrettySerde;
29use crate::TableCatalog;
30
31pub fn explain_stream_graph(graph: &StreamFragmentGraph, is_verbose: bool) -> String {
33 let mut output = String::with_capacity(2048);
34 let mut config = PrettyConfig {
35 need_boundaries: false,
36 width: 80,
37 ..Default::default()
38 };
39 StreamGraphFormatter::new(is_verbose).explain_graph(graph, &mut config, &mut output);
40 output
41}
42
43pub fn explain_stream_graph_as_dot(sg: &StreamFragmentGraph, is_verbose: bool) -> String {
44 let graph = StreamGraphFormatter::new(is_verbose).explain_graph_as_dot(sg);
45 let dot = Dot::new(&graph);
46 dot.to_string()
47}
48
49struct StreamGraphFormatter {
52 edges: HashMap<u64, StreamFragmentEdge>,
54 verbose: bool,
55 tables: BTreeMap<u32, Table>,
56}
57
58impl StreamGraphFormatter {
59 fn new(verbose: bool) -> Self {
60 StreamGraphFormatter {
61 edges: HashMap::default(),
62 tables: BTreeMap::default(),
63 verbose,
64 }
65 }
66
67 fn add_table(&mut self, tb: &Table) -> u32 {
69 self.tables.insert(tb.id, tb.clone());
70 tb.id
71 }
72
73 fn explain_graph(
74 &mut self,
75 graph: &StreamFragmentGraph,
76 config: &mut PrettyConfig,
77 output: &mut String,
78 ) {
79 self.edges.clear();
80 for edge in &graph.edges {
81 self.edges.insert(edge.link_id, edge.clone());
82 }
83 let mut max_width = 80;
84 for (_, fragment) in graph.fragments.iter().sorted_by_key(|(id, _)| **id) {
85 output.push_str("Fragment ");
86 output.push_str(&fragment.get_fragment_id().to_string());
87 output.push('\n');
88 let width = config.unicode(output, &self.explain_node(fragment.node.as_ref().unwrap()));
89 max_width = max(width, max_width);
90 config.width = max_width;
91 output.push_str("\n\n");
92 }
93 for tb in self.tables.values() {
94 let width = config.unicode(output, &self.explain_table(tb));
95 max_width = max(width, max_width);
96 config.width = max_width;
97 output.push_str("\n\n");
98 }
99 }
100
101 fn explain_graph_as_dot(&mut self, graph: &StreamFragmentGraph) -> Graph<String, String> {
102 self.edges.clear();
103 for edge in &graph.edges {
104 self.edges.insert(edge.link_id, edge.clone());
105 }
106
107 let mut g = Graph::<String, String>::new();
108 let mut nodes = HashMap::new();
109 for (_, fragment) in graph.fragments.iter().sorted_by_key(|(id, _)| **id) {
110 let mut label = String::new();
111 label.push_str("Fragment ");
112 label.push_str(&fragment.get_fragment_id().to_string());
113 label.push('\n');
114 nodes.insert(label.clone(), g.add_node(label.clone()));
115
116 build_graph_from_pretty(
117 &self.explain_node(fragment.node.as_ref().unwrap()),
118 &mut g,
119 &mut nodes,
120 Some(&label),
121 );
122 }
123 for tb in self.tables.values() {
124 build_graph_from_pretty(&self.explain_table(tb), &mut g, &mut nodes, None);
125 }
126 g
127 }
128
129 fn explain_table<'a>(&self, tb: &Table) -> Pretty<'a> {
130 let tb = TableCatalog::from(tb.clone());
131 let columns = tb
132 .columns
133 .iter()
134 .map(|c| {
135 let s = if self.verbose {
136 format!("{}: {}", c.name(), c.data_type())
137 } else {
138 c.name().to_owned()
139 };
140 Pretty::Text(s.into())
141 })
142 .collect();
143 let columns = Pretty::Array(columns);
144 let name = format!("Table {}", tb.id);
145 let mut fields = Vec::with_capacity(5);
146 fields.push(("columns", columns));
147 fields.push((
148 "primary key",
149 Pretty::Array(tb.pk.iter().map(Pretty::debug).collect()),
150 ));
151 fields.push((
152 "value indices",
153 Pretty::Array(tb.value_indices.iter().map(Pretty::debug).collect()),
154 ));
155 fields.push((
156 "distribution key",
157 Pretty::Array(tb.distribution_key.iter().map(Pretty::debug).collect()),
158 ));
159 fields.push((
160 "read pk prefix len hint",
161 Pretty::debug(&tb.read_prefix_len_hint),
162 ));
163 if let Some(vnode_col_idx) = tb.vnode_col_index {
164 fields.push(("vnode column idx", Pretty::debug(&vnode_col_idx)));
165 }
166 Pretty::childless_record(name, fields)
167 }
168
169 fn explain_node<'a>(&mut self, node: &StreamNode) -> Pretty<'a> {
170 let one_line_explain = match node.get_node_body().unwrap() {
171 stream_node::NodeBody::Exchange(_) => {
172 let edge = self.edges.get(&node.operator_id).unwrap();
173 let upstream_fragment_id = edge.upstream_id;
174 let dist = edge.dispatch_strategy.as_ref().unwrap();
175 format!(
176 "StreamExchange {} from {}",
177 match dist.r#type() {
178 DispatcherType::Unspecified => unreachable!(),
179 DispatcherType::Hash => format!("Hash({:?})", dist.dist_key_indices),
180 DispatcherType::Broadcast => "Broadcast".to_owned(),
181 DispatcherType::Simple => "Single".to_owned(),
182 DispatcherType::NoShuffle => "NoShuffle".to_owned(),
183 },
184 upstream_fragment_id
185 )
186 }
187 _ => node.identity.clone(),
188 };
189
190 let mut tables: Vec<(String, u32)> = Vec::with_capacity(7);
191 let mut node_copy = node.clone();
192
193 stream_graph_visitor::visit_stream_node_tables_inner(
194 &mut node_copy,
195 false,
196 false,
197 |table, table_name| {
198 tables.push((table_name.to_owned(), self.add_table(table)));
199 },
200 );
201
202 let mut fields = Vec::with_capacity(3);
203 if !tables.is_empty() {
204 fields.push((
205 "tables",
206 Pretty::Array(
207 tables
208 .into_iter()
209 .map(|(name, id)| Pretty::Text(format!("{}: {}", name, id).into()))
210 .collect(),
211 ),
212 ));
213 }
214 if self.verbose {
215 fields.push((
216 "output",
217 Pretty::Array(
218 node.fields
219 .iter()
220 .map(|f| Pretty::display(f.get_name()))
221 .collect(),
222 ),
223 ));
224 fields.push((
225 "stream key",
226 Pretty::Array(
227 node.stream_key
228 .iter()
229 .map(|i| Pretty::display(node.fields[*i as usize].get_name()))
230 .collect(),
231 ),
232 ));
233 }
234 let children = node
235 .input
236 .iter()
237 .map(|input| self.explain_node(input))
238 .collect();
239 Pretty::simple_record(one_line_explain, fields, children)
240 }
241}
242
243pub fn build_graph_from_pretty(
244 pretty: &Pretty<'_>,
245 graph: &mut Graph<String, String>,
246 nodes: &mut HashMap<String, NodeIndex>,
247 parent_label: Option<&str>,
248) {
249 if let Pretty::Record(r) = pretty {
250 let mut label = String::new();
251 label.push_str(&r.name);
252 for (k, v) in &r.fields {
253 label.push('\n');
254 label.push_str(k);
255 label.push_str(": ");
256 label.push_str(
257 &serde_json::to_string(&PrettySerde(v.clone(), false))
258 .expect("failed to serialize plan to dot"),
259 );
260 }
261 if !r.fields.is_empty() {
263 label.push('\n');
264 }
265
266 let current_node = *nodes
267 .entry(label.clone())
268 .or_insert_with(|| graph.add_node(label.clone()));
269
270 if let Some(parent_label) = parent_label {
271 if let Some(&parent_node) = nodes.get(parent_label) {
272 graph.add_edge(parent_node, current_node, "".to_owned());
273 }
274 }
275
276 for child in &r.children {
277 build_graph_from_pretty(child, graph, nodes, Some(&label));
278 }
279 }
280}