risingwave_frontend/utils/
stream_graph_formatter.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::max;
16use std::collections::{BTreeMap, HashMap};
17
18use itertools::Itertools;
19use petgraph::Graph;
20use petgraph::dot::Dot;
21use petgraph::graph::NodeIndex;
22use pretty_xmlish::{Pretty, PrettyConfig};
23use risingwave_common::util::stream_graph_visitor;
24use risingwave_pb::catalog::Table;
25use risingwave_pb::stream_plan::stream_fragment_graph::StreamFragmentEdge;
26use risingwave_pb::stream_plan::{DispatcherType, StreamFragmentGraph, StreamNode, stream_node};
27
28use super::PrettySerde;
29use crate::TableCatalog;
30use crate::catalog::TableId;
31
32/// ice: in the future, we may allow configurable width, boundaries, etc.
33pub fn explain_stream_graph(
34    graph: &StreamFragmentGraph,
35    table: Option<Table>,
36    is_verbose: bool,
37) -> String {
38    let mut output = String::with_capacity(2048);
39    let mut config = PrettyConfig {
40        need_boundaries: false,
41        width: 80,
42        ..Default::default()
43    };
44    let mut fmt = StreamGraphFormatter::new(is_verbose);
45    if let Some(tb) = table {
46        fmt.add_table(&tb);
47    }
48    fmt.explain_graph(graph, &mut config, &mut output);
49    output
50}
51
52pub fn explain_stream_graph_as_dot(
53    sg: &StreamFragmentGraph,
54    table: Option<Table>,
55    is_verbose: bool,
56) -> String {
57    let mut fmt = StreamGraphFormatter::new(is_verbose);
58    if let Some(tb) = table {
59        fmt.add_table(&tb);
60    }
61    let graph = fmt.explain_graph_as_dot(sg);
62    let dot = Dot::new(&graph);
63    dot.to_string()
64}
65
66/// A formatter to display the final stream plan graph, used for `explain (distsql) create
67/// materialized view ...`
68struct StreamGraphFormatter {
69    /// exchange's `operator_id` -> edge
70    edges: HashMap<u64, StreamFragmentEdge>,
71    verbose: bool,
72    tables: BTreeMap<TableId, Table>,
73}
74
75impl StreamGraphFormatter {
76    fn new(verbose: bool) -> Self {
77        StreamGraphFormatter {
78            edges: HashMap::default(),
79            tables: BTreeMap::default(),
80            verbose,
81        }
82    }
83
84    /// collect the table catalog and return the table id
85    fn add_table(&mut self, tb: &Table) -> TableId {
86        self.tables.insert(tb.id, tb.clone());
87        tb.id
88    }
89
90    fn explain_graph(
91        &mut self,
92        graph: &StreamFragmentGraph,
93        config: &mut PrettyConfig,
94        output: &mut String,
95    ) {
96        self.edges.clear();
97        for edge in &graph.edges {
98            self.edges.insert(edge.link_id, edge.clone());
99        }
100        let mut max_width = 80;
101        for (_, fragment) in graph.fragments.iter().sorted_by_key(|(id, _)| **id) {
102            output.push_str("Fragment ");
103            output.push_str(&fragment.get_fragment_id().to_string());
104            output.push('\n');
105            let width = config.unicode(output, &self.explain_node(fragment.node.as_ref().unwrap()));
106            max_width = max(width, max_width);
107            config.width = max_width;
108            output.push_str("\n\n");
109        }
110        for tb in self.tables.values() {
111            let width = config.unicode(output, &self.explain_table(tb));
112            max_width = max(width, max_width);
113            config.width = max_width;
114            output.push_str("\n\n");
115        }
116    }
117
118    fn explain_graph_as_dot(&mut self, graph: &StreamFragmentGraph) -> Graph<String, String> {
119        self.edges.clear();
120        for edge in &graph.edges {
121            self.edges.insert(edge.link_id, edge.clone());
122        }
123
124        let mut g = Graph::<String, String>::new();
125        let mut nodes = HashMap::new();
126        for (_, fragment) in graph.fragments.iter().sorted_by_key(|(id, _)| **id) {
127            let mut label = String::new();
128            label.push_str("Fragment ");
129            label.push_str(&fragment.get_fragment_id().to_string());
130            label.push('\n');
131            nodes.insert(label.clone(), g.add_node(label.clone()));
132
133            build_graph_from_pretty(
134                &self.explain_node(fragment.node.as_ref().unwrap()),
135                &mut g,
136                &mut nodes,
137                Some(&label),
138            );
139        }
140        for tb in self.tables.values() {
141            build_graph_from_pretty(&self.explain_table(tb), &mut g, &mut nodes, None);
142        }
143        g
144    }
145
146    fn explain_table<'a>(&self, tb: &Table) -> Pretty<'a> {
147        let tb = TableCatalog::from(tb.clone());
148        let columns = tb
149            .columns
150            .iter()
151            .map(|c| {
152                let s = if self.verbose {
153                    format!("{}: {}", c.name(), c.data_type())
154                } else {
155                    c.name().to_owned()
156                };
157                Pretty::Text(s.into())
158            })
159            .collect();
160        let columns = Pretty::Array(columns);
161        let name = format!("Table {}", tb.id);
162        let mut fields = Vec::with_capacity(5);
163        fields.push(("columns", columns));
164        fields.push((
165            "primary key",
166            Pretty::Array(tb.pk.iter().map(Pretty::debug).collect()),
167        ));
168        fields.push((
169            "value indices",
170            Pretty::Array(tb.value_indices.iter().map(Pretty::debug).collect()),
171        ));
172        fields.push((
173            "distribution key",
174            Pretty::Array(tb.distribution_key.iter().map(Pretty::debug).collect()),
175        ));
176        fields.push((
177            "read pk prefix len hint",
178            Pretty::debug(&tb.read_prefix_len_hint),
179        ));
180        if let Some(vnode_col_idx) = tb.vnode_col_index {
181            fields.push(("vnode column idx", Pretty::debug(&vnode_col_idx)));
182        }
183        if !tb.clean_watermark_indices.is_empty() {
184            fields.push((
185                "clean watermark indices",
186                Pretty::debug(&tb.clean_watermark_indices),
187            ));
188        }
189        Pretty::childless_record(name, fields)
190    }
191
192    fn explain_node<'a>(&mut self, node: &StreamNode) -> Pretty<'a> {
193        let one_line_explain = match node.get_node_body().unwrap() {
194            stream_node::NodeBody::Exchange(_) => {
195                let edge = self.edges.get(&node.operator_id.as_raw_id()).unwrap();
196                let upstream_fragment_id = edge.upstream_id;
197                let dist = edge.dispatch_strategy.as_ref().unwrap();
198                format!(
199                    "StreamExchange {} from {}",
200                    match dist.r#type() {
201                        DispatcherType::Unspecified => unreachable!(),
202                        DispatcherType::Hash => format!("Hash({:?})", dist.dist_key_indices),
203                        DispatcherType::Broadcast => "Broadcast".to_owned(),
204                        DispatcherType::Simple => "Single".to_owned(),
205                        DispatcherType::NoShuffle => "NoShuffle".to_owned(),
206                    },
207                    upstream_fragment_id
208                )
209            }
210            _ => node.identity.clone(),
211        };
212
213        let mut tables: Vec<(String, TableId)> = Vec::with_capacity(7);
214        let mut node_copy = node.clone();
215
216        stream_graph_visitor::visit_stream_node_tables_inner(
217            &mut node_copy,
218            true,
219            false,
220            |table, table_name| {
221                tables.push((table_name.to_owned(), self.add_table(table)));
222            },
223        );
224
225        let mut fields = Vec::with_capacity(3);
226        if !tables.is_empty() {
227            fields.push((
228                "tables",
229                Pretty::Array(
230                    tables
231                        .into_iter()
232                        .map(|(name, id)| Pretty::Text(format!("{}: {}", name, id).into()))
233                        .collect(),
234                ),
235            ));
236        }
237        if self.verbose {
238            fields.push((
239                "output",
240                Pretty::Array(
241                    node.fields
242                        .iter()
243                        .map(|f| Pretty::display(f.get_name()))
244                        .collect(),
245                ),
246            ));
247            fields.push((
248                "stream key",
249                Pretty::Array(
250                    node.stream_key
251                        .iter()
252                        .map(|i| Pretty::display(node.fields[*i as usize].get_name()))
253                        .collect(),
254                ),
255            ));
256        }
257        let children = node
258            .input
259            .iter()
260            .map(|input| self.explain_node(input))
261            .collect();
262        Pretty::simple_record(one_line_explain, fields, children)
263    }
264}
265
266pub fn build_graph_from_pretty(
267    pretty: &Pretty<'_>,
268    graph: &mut Graph<String, String>,
269    nodes: &mut HashMap<String, NodeIndex>,
270    parent_label: Option<&str>,
271) {
272    if let Pretty::Record(r) = pretty {
273        let mut label = String::new();
274        label.push_str(&r.name);
275        for (k, v) in &r.fields {
276            label.push('\n');
277            label.push_str(k);
278            label.push_str(": ");
279            label.push_str(
280                &serde_json::to_string(&PrettySerde(v.clone(), false))
281                    .expect("failed to serialize plan to dot"),
282            );
283        }
284        // output alignment.
285        if !r.fields.is_empty() {
286            label.push('\n');
287        }
288
289        let current_node = *nodes
290            .entry(label.clone())
291            .or_insert_with(|| graph.add_node(label.clone()));
292
293        if let Some(parent_label) = parent_label
294            && let Some(&parent_node) = nodes.get(parent_label)
295        {
296            graph.add_edge(parent_node, current_node, "".to_owned());
297        }
298
299        for child in &r.children {
300            build_graph_from_pretty(child, graph, nodes, Some(&label));
301        }
302    }
303}