risingwave_frontend/utils/
stream_graph_formatter.rs1use std::cmp::max;
16use std::collections::{BTreeMap, HashMap};
17
18use itertools::Itertools;
19use petgraph::Graph;
20use petgraph::dot::Dot;
21use petgraph::graph::NodeIndex;
22use pretty_xmlish::{Pretty, PrettyConfig};
23use risingwave_common::util::stream_graph_visitor;
24use risingwave_pb::catalog::Table;
25use risingwave_pb::stream_plan::stream_fragment_graph::StreamFragmentEdge;
26use risingwave_pb::stream_plan::{DispatcherType, StreamFragmentGraph, StreamNode, stream_node};
27
28use super::PrettySerde;
29use crate::TableCatalog;
30use crate::catalog::TableId;
31
32pub fn explain_stream_graph(
34 graph: &StreamFragmentGraph,
35 table: Option<Table>,
36 is_verbose: bool,
37) -> String {
38 let mut output = String::with_capacity(2048);
39 let mut config = PrettyConfig {
40 need_boundaries: false,
41 width: 80,
42 ..Default::default()
43 };
44 let mut fmt = StreamGraphFormatter::new(is_verbose);
45 if let Some(tb) = table {
46 fmt.add_table(&tb);
47 }
48 fmt.explain_graph(graph, &mut config, &mut output);
49 output
50}
51
52pub fn explain_stream_graph_as_dot(
53 sg: &StreamFragmentGraph,
54 table: Option<Table>,
55 is_verbose: bool,
56) -> String {
57 let mut fmt = StreamGraphFormatter::new(is_verbose);
58 if let Some(tb) = table {
59 fmt.add_table(&tb);
60 }
61 let graph = fmt.explain_graph_as_dot(sg);
62 let dot = Dot::new(&graph);
63 dot.to_string()
64}
65
66struct StreamGraphFormatter {
69 edges: HashMap<u64, StreamFragmentEdge>,
71 verbose: bool,
72 tables: BTreeMap<TableId, Table>,
73}
74
75impl StreamGraphFormatter {
76 fn new(verbose: bool) -> Self {
77 StreamGraphFormatter {
78 edges: HashMap::default(),
79 tables: BTreeMap::default(),
80 verbose,
81 }
82 }
83
84 fn add_table(&mut self, tb: &Table) -> TableId {
86 self.tables.insert(tb.id, tb.clone());
87 tb.id
88 }
89
90 fn explain_graph(
91 &mut self,
92 graph: &StreamFragmentGraph,
93 config: &mut PrettyConfig,
94 output: &mut String,
95 ) {
96 self.edges.clear();
97 for edge in &graph.edges {
98 self.edges.insert(edge.link_id, edge.clone());
99 }
100 let mut max_width = 80;
101 for (_, fragment) in graph.fragments.iter().sorted_by_key(|(id, _)| **id) {
102 output.push_str("Fragment ");
103 output.push_str(&fragment.get_fragment_id().to_string());
104 output.push('\n');
105 let width = config.unicode(output, &self.explain_node(fragment.node.as_ref().unwrap()));
106 max_width = max(width, max_width);
107 config.width = max_width;
108 output.push_str("\n\n");
109 }
110 for tb in self.tables.values() {
111 let width = config.unicode(output, &self.explain_table(tb));
112 max_width = max(width, max_width);
113 config.width = max_width;
114 output.push_str("\n\n");
115 }
116 }
117
118 fn explain_graph_as_dot(&mut self, graph: &StreamFragmentGraph) -> Graph<String, String> {
119 self.edges.clear();
120 for edge in &graph.edges {
121 self.edges.insert(edge.link_id, edge.clone());
122 }
123
124 let mut g = Graph::<String, String>::new();
125 let mut nodes = HashMap::new();
126 for (_, fragment) in graph.fragments.iter().sorted_by_key(|(id, _)| **id) {
127 let mut label = String::new();
128 label.push_str("Fragment ");
129 label.push_str(&fragment.get_fragment_id().to_string());
130 label.push('\n');
131 nodes.insert(label.clone(), g.add_node(label.clone()));
132
133 build_graph_from_pretty(
134 &self.explain_node(fragment.node.as_ref().unwrap()),
135 &mut g,
136 &mut nodes,
137 Some(&label),
138 );
139 }
140 for tb in self.tables.values() {
141 build_graph_from_pretty(&self.explain_table(tb), &mut g, &mut nodes, None);
142 }
143 g
144 }
145
146 fn explain_table<'a>(&self, tb: &Table) -> Pretty<'a> {
147 let tb = TableCatalog::from(tb.clone());
148 let columns = tb
149 .columns
150 .iter()
151 .map(|c| {
152 let s = if self.verbose {
153 format!("{}: {}", c.name(), c.data_type())
154 } else {
155 c.name().to_owned()
156 };
157 Pretty::Text(s.into())
158 })
159 .collect();
160 let columns = Pretty::Array(columns);
161 let name = format!("Table {}", tb.id);
162 let mut fields = Vec::with_capacity(5);
163 fields.push(("columns", columns));
164 fields.push((
165 "primary key",
166 Pretty::Array(tb.pk.iter().map(Pretty::debug).collect()),
167 ));
168 fields.push((
169 "value indices",
170 Pretty::Array(tb.value_indices.iter().map(Pretty::debug).collect()),
171 ));
172 fields.push((
173 "distribution key",
174 Pretty::Array(tb.distribution_key.iter().map(Pretty::debug).collect()),
175 ));
176 fields.push((
177 "read pk prefix len hint",
178 Pretty::debug(&tb.read_prefix_len_hint),
179 ));
180 if let Some(vnode_col_idx) = tb.vnode_col_index {
181 fields.push(("vnode column idx", Pretty::debug(&vnode_col_idx)));
182 }
183 if !tb.clean_watermark_indices.is_empty() {
184 fields.push((
185 "clean watermark indices",
186 Pretty::debug(&tb.clean_watermark_indices),
187 ));
188 }
189 Pretty::childless_record(name, fields)
190 }
191
192 fn explain_node<'a>(&mut self, node: &StreamNode) -> Pretty<'a> {
193 let one_line_explain = match node.get_node_body().unwrap() {
194 stream_node::NodeBody::Exchange(_) => {
195 let edge = self.edges.get(&node.operator_id.as_raw_id()).unwrap();
196 let upstream_fragment_id = edge.upstream_id;
197 let dist = edge.dispatch_strategy.as_ref().unwrap();
198 format!(
199 "StreamExchange {} from {}",
200 match dist.r#type() {
201 DispatcherType::Unspecified => unreachable!(),
202 DispatcherType::Hash => format!("Hash({:?})", dist.dist_key_indices),
203 DispatcherType::Broadcast => "Broadcast".to_owned(),
204 DispatcherType::Simple => "Single".to_owned(),
205 DispatcherType::NoShuffle => "NoShuffle".to_owned(),
206 },
207 upstream_fragment_id
208 )
209 }
210 _ => node.identity.clone(),
211 };
212
213 let mut tables: Vec<(String, TableId)> = Vec::with_capacity(7);
214 let mut node_copy = node.clone();
215
216 stream_graph_visitor::visit_stream_node_tables_inner(
217 &mut node_copy,
218 true,
219 false,
220 |table, table_name| {
221 tables.push((table_name.to_owned(), self.add_table(table)));
222 },
223 );
224
225 let mut fields = Vec::with_capacity(3);
226 if !tables.is_empty() {
227 fields.push((
228 "tables",
229 Pretty::Array(
230 tables
231 .into_iter()
232 .map(|(name, id)| Pretty::Text(format!("{}: {}", name, id).into()))
233 .collect(),
234 ),
235 ));
236 }
237 if self.verbose {
238 fields.push((
239 "output",
240 Pretty::Array(
241 node.fields
242 .iter()
243 .map(|f| Pretty::display(f.get_name()))
244 .collect(),
245 ),
246 ));
247 fields.push((
248 "stream key",
249 Pretty::Array(
250 node.stream_key
251 .iter()
252 .map(|i| Pretty::display(node.fields[*i as usize].get_name()))
253 .collect(),
254 ),
255 ));
256 }
257 let children = node
258 .input
259 .iter()
260 .map(|input| self.explain_node(input))
261 .collect();
262 Pretty::simple_record(one_line_explain, fields, children)
263 }
264}
265
266pub fn build_graph_from_pretty(
267 pretty: &Pretty<'_>,
268 graph: &mut Graph<String, String>,
269 nodes: &mut HashMap<String, NodeIndex>,
270 parent_label: Option<&str>,
271) {
272 if let Pretty::Record(r) = pretty {
273 let mut label = String::new();
274 label.push_str(&r.name);
275 for (k, v) in &r.fields {
276 label.push('\n');
277 label.push_str(k);
278 label.push_str(": ");
279 label.push_str(
280 &serde_json::to_string(&PrettySerde(v.clone(), false))
281 .expect("failed to serialize plan to dot"),
282 );
283 }
284 if !r.fields.is_empty() {
286 label.push('\n');
287 }
288
289 let current_node = *nodes
290 .entry(label.clone())
291 .or_insert_with(|| graph.add_node(label.clone()));
292
293 if let Some(parent_label) = parent_label
294 && let Some(&parent_node) = nodes.get(parent_label)
295 {
296 graph.add_edge(parent_node, current_node, "".to_owned());
297 }
298
299 for child in &r.children {
300 build_graph_from_pretty(child, graph, nodes, Some(&label));
301 }
302 }
303}