risingwave_frontend/utils/
stream_graph_formatter.rs1use std::cmp::max;
16use std::collections::{BTreeMap, HashMap};
17
18use itertools::Itertools;
19use petgraph::Graph;
20use petgraph::dot::Dot;
21use petgraph::graph::NodeIndex;
22use pretty_xmlish::{Pretty, PrettyConfig};
23use risingwave_common::util::stream_graph_visitor;
24use risingwave_pb::catalog::Table;
25use risingwave_pb::stream_plan::stream_fragment_graph::StreamFragmentEdge;
26use risingwave_pb::stream_plan::{DispatcherType, StreamFragmentGraph, StreamNode, stream_node};
27
28use super::PrettySerde;
29use crate::TableCatalog;
30
31pub fn explain_stream_graph(
33 graph: &StreamFragmentGraph,
34 table: Option<Table>,
35 is_verbose: bool,
36) -> String {
37 let mut output = String::with_capacity(2048);
38 let mut config = PrettyConfig {
39 need_boundaries: false,
40 width: 80,
41 ..Default::default()
42 };
43 let mut fmt = StreamGraphFormatter::new(is_verbose);
44 if let Some(tb) = table {
45 fmt.add_table(&tb);
46 }
47 fmt.explain_graph(graph, &mut config, &mut output);
48 output
49}
50
51pub fn explain_stream_graph_as_dot(
52 sg: &StreamFragmentGraph,
53 table: Option<Table>,
54 is_verbose: bool,
55) -> String {
56 let mut fmt = StreamGraphFormatter::new(is_verbose);
57 if let Some(tb) = table {
58 fmt.add_table(&tb);
59 }
60 let graph = fmt.explain_graph_as_dot(sg);
61 let dot = Dot::new(&graph);
62 dot.to_string()
63}
64
65struct StreamGraphFormatter {
68 edges: HashMap<u64, StreamFragmentEdge>,
70 verbose: bool,
71 tables: BTreeMap<u32, Table>,
72}
73
74impl StreamGraphFormatter {
75 fn new(verbose: bool) -> Self {
76 StreamGraphFormatter {
77 edges: HashMap::default(),
78 tables: BTreeMap::default(),
79 verbose,
80 }
81 }
82
83 fn add_table(&mut self, tb: &Table) -> u32 {
85 self.tables.insert(tb.id, tb.clone());
86 tb.id
87 }
88
89 fn explain_graph(
90 &mut self,
91 graph: &StreamFragmentGraph,
92 config: &mut PrettyConfig,
93 output: &mut String,
94 ) {
95 self.edges.clear();
96 for edge in &graph.edges {
97 self.edges.insert(edge.link_id, edge.clone());
98 }
99 let mut max_width = 80;
100 for (_, fragment) in graph.fragments.iter().sorted_by_key(|(id, _)| **id) {
101 output.push_str("Fragment ");
102 output.push_str(&fragment.get_fragment_id().to_string());
103 output.push('\n');
104 let width = config.unicode(output, &self.explain_node(fragment.node.as_ref().unwrap()));
105 max_width = max(width, max_width);
106 config.width = max_width;
107 output.push_str("\n\n");
108 }
109 for tb in self.tables.values() {
110 let width = config.unicode(output, &self.explain_table(tb));
111 max_width = max(width, max_width);
112 config.width = max_width;
113 output.push_str("\n\n");
114 }
115 }
116
117 fn explain_graph_as_dot(&mut self, graph: &StreamFragmentGraph) -> Graph<String, String> {
118 self.edges.clear();
119 for edge in &graph.edges {
120 self.edges.insert(edge.link_id, edge.clone());
121 }
122
123 let mut g = Graph::<String, String>::new();
124 let mut nodes = HashMap::new();
125 for (_, fragment) in graph.fragments.iter().sorted_by_key(|(id, _)| **id) {
126 let mut label = String::new();
127 label.push_str("Fragment ");
128 label.push_str(&fragment.get_fragment_id().to_string());
129 label.push('\n');
130 nodes.insert(label.clone(), g.add_node(label.clone()));
131
132 build_graph_from_pretty(
133 &self.explain_node(fragment.node.as_ref().unwrap()),
134 &mut g,
135 &mut nodes,
136 Some(&label),
137 );
138 }
139 for tb in self.tables.values() {
140 build_graph_from_pretty(&self.explain_table(tb), &mut g, &mut nodes, None);
141 }
142 g
143 }
144
145 fn explain_table<'a>(&self, tb: &Table) -> Pretty<'a> {
146 let tb = TableCatalog::from(tb.clone());
147 let columns = tb
148 .columns
149 .iter()
150 .map(|c| {
151 let s = if self.verbose {
152 format!("{}: {}", c.name(), c.data_type())
153 } else {
154 c.name().to_owned()
155 };
156 Pretty::Text(s.into())
157 })
158 .collect();
159 let columns = Pretty::Array(columns);
160 let name = format!("Table {}", tb.id);
161 let mut fields = Vec::with_capacity(5);
162 fields.push(("columns", columns));
163 fields.push((
164 "primary key",
165 Pretty::Array(tb.pk.iter().map(Pretty::debug).collect()),
166 ));
167 fields.push((
168 "value indices",
169 Pretty::Array(tb.value_indices.iter().map(Pretty::debug).collect()),
170 ));
171 fields.push((
172 "distribution key",
173 Pretty::Array(tb.distribution_key.iter().map(Pretty::debug).collect()),
174 ));
175 fields.push((
176 "read pk prefix len hint",
177 Pretty::debug(&tb.read_prefix_len_hint),
178 ));
179 if let Some(vnode_col_idx) = tb.vnode_col_index {
180 fields.push(("vnode column idx", Pretty::debug(&vnode_col_idx)));
181 }
182 Pretty::childless_record(name, fields)
183 }
184
185 fn explain_node<'a>(&mut self, node: &StreamNode) -> Pretty<'a> {
186 let one_line_explain = match node.get_node_body().unwrap() {
187 stream_node::NodeBody::Exchange(_) => {
188 let edge = self.edges.get(&node.operator_id).unwrap();
189 let upstream_fragment_id = edge.upstream_id;
190 let dist = edge.dispatch_strategy.as_ref().unwrap();
191 format!(
192 "StreamExchange {} from {}",
193 match dist.r#type() {
194 DispatcherType::Unspecified => unreachable!(),
195 DispatcherType::Hash => format!("Hash({:?})", dist.dist_key_indices),
196 DispatcherType::Broadcast => "Broadcast".to_owned(),
197 DispatcherType::Simple => "Single".to_owned(),
198 DispatcherType::NoShuffle => "NoShuffle".to_owned(),
199 },
200 upstream_fragment_id
201 )
202 }
203 _ => node.identity.clone(),
204 };
205
206 let mut tables: Vec<(String, u32)> = Vec::with_capacity(7);
207 let mut node_copy = node.clone();
208
209 stream_graph_visitor::visit_stream_node_tables_inner(
210 &mut node_copy,
211 true,
212 false,
213 |table, table_name| {
214 tables.push((table_name.to_owned(), self.add_table(table)));
215 },
216 );
217
218 let mut fields = Vec::with_capacity(3);
219 if !tables.is_empty() {
220 fields.push((
221 "tables",
222 Pretty::Array(
223 tables
224 .into_iter()
225 .map(|(name, id)| Pretty::Text(format!("{}: {}", name, id).into()))
226 .collect(),
227 ),
228 ));
229 }
230 if self.verbose {
231 fields.push((
232 "output",
233 Pretty::Array(
234 node.fields
235 .iter()
236 .map(|f| Pretty::display(f.get_name()))
237 .collect(),
238 ),
239 ));
240 fields.push((
241 "stream key",
242 Pretty::Array(
243 node.stream_key
244 .iter()
245 .map(|i| Pretty::display(node.fields[*i as usize].get_name()))
246 .collect(),
247 ),
248 ));
249 }
250 let children = node
251 .input
252 .iter()
253 .map(|input| self.explain_node(input))
254 .collect();
255 Pretty::simple_record(one_line_explain, fields, children)
256 }
257}
258
259pub fn build_graph_from_pretty(
260 pretty: &Pretty<'_>,
261 graph: &mut Graph<String, String>,
262 nodes: &mut HashMap<String, NodeIndex>,
263 parent_label: Option<&str>,
264) {
265 if let Pretty::Record(r) = pretty {
266 let mut label = String::new();
267 label.push_str(&r.name);
268 for (k, v) in &r.fields {
269 label.push('\n');
270 label.push_str(k);
271 label.push_str(": ");
272 label.push_str(
273 &serde_json::to_string(&PrettySerde(v.clone(), false))
274 .expect("failed to serialize plan to dot"),
275 );
276 }
277 if !r.fields.is_empty() {
279 label.push('\n');
280 }
281
282 let current_node = *nodes
283 .entry(label.clone())
284 .or_insert_with(|| graph.add_node(label.clone()));
285
286 if let Some(parent_label) = parent_label
287 && let Some(&parent_node) = nodes.get(parent_label)
288 {
289 graph.add_edge(parent_node, current_node, "".to_owned());
290 }
291
292 for child in &r.children {
293 build_graph_from_pretty(child, graph, nodes, Some(&label));
294 }
295 }
296}