risingwave_frontend/utils/
stream_graph_formatter.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::max;
16use std::collections::{BTreeMap, HashMap};
17
18use itertools::Itertools;
19use petgraph::Graph;
20use petgraph::dot::Dot;
21use petgraph::graph::NodeIndex;
22use pretty_xmlish::{Pretty, PrettyConfig};
23use risingwave_common::util::stream_graph_visitor;
24use risingwave_pb::catalog::Table;
25use risingwave_pb::stream_plan::stream_fragment_graph::StreamFragmentEdge;
26use risingwave_pb::stream_plan::{DispatcherType, StreamFragmentGraph, StreamNode, stream_node};
27
28use super::PrettySerde;
29use crate::TableCatalog;
30
31/// ice: in the future, we may allow configurable width, boundaries, etc.
32pub fn explain_stream_graph(
33    graph: &StreamFragmentGraph,
34    table: Option<Table>,
35    is_verbose: bool,
36) -> String {
37    let mut output = String::with_capacity(2048);
38    let mut config = PrettyConfig {
39        need_boundaries: false,
40        width: 80,
41        ..Default::default()
42    };
43    let mut fmt = StreamGraphFormatter::new(is_verbose);
44    if let Some(tb) = table {
45        fmt.add_table(&tb);
46    }
47    fmt.explain_graph(graph, &mut config, &mut output);
48    output
49}
50
51pub fn explain_stream_graph_as_dot(
52    sg: &StreamFragmentGraph,
53    table: Option<Table>,
54    is_verbose: bool,
55) -> String {
56    let mut fmt = StreamGraphFormatter::new(is_verbose);
57    if let Some(tb) = table {
58        fmt.add_table(&tb);
59    }
60    let graph = fmt.explain_graph_as_dot(sg);
61    let dot = Dot::new(&graph);
62    dot.to_string()
63}
64
65/// A formatter to display the final stream plan graph, used for `explain (distsql) create
66/// materialized view ...`
67struct StreamGraphFormatter {
68    /// exchange's `operator_id` -> edge
69    edges: HashMap<u64, StreamFragmentEdge>,
70    verbose: bool,
71    tables: BTreeMap<u32, Table>,
72}
73
74impl StreamGraphFormatter {
75    fn new(verbose: bool) -> Self {
76        StreamGraphFormatter {
77            edges: HashMap::default(),
78            tables: BTreeMap::default(),
79            verbose,
80        }
81    }
82
83    /// collect the table catalog and return the table id
84    fn add_table(&mut self, tb: &Table) -> u32 {
85        self.tables.insert(tb.id, tb.clone());
86        tb.id
87    }
88
89    fn explain_graph(
90        &mut self,
91        graph: &StreamFragmentGraph,
92        config: &mut PrettyConfig,
93        output: &mut String,
94    ) {
95        self.edges.clear();
96        for edge in &graph.edges {
97            self.edges.insert(edge.link_id, edge.clone());
98        }
99        let mut max_width = 80;
100        for (_, fragment) in graph.fragments.iter().sorted_by_key(|(id, _)| **id) {
101            output.push_str("Fragment ");
102            output.push_str(&fragment.get_fragment_id().to_string());
103            output.push('\n');
104            let width = config.unicode(output, &self.explain_node(fragment.node.as_ref().unwrap()));
105            max_width = max(width, max_width);
106            config.width = max_width;
107            output.push_str("\n\n");
108        }
109        for tb in self.tables.values() {
110            let width = config.unicode(output, &self.explain_table(tb));
111            max_width = max(width, max_width);
112            config.width = max_width;
113            output.push_str("\n\n");
114        }
115    }
116
117    fn explain_graph_as_dot(&mut self, graph: &StreamFragmentGraph) -> Graph<String, String> {
118        self.edges.clear();
119        for edge in &graph.edges {
120            self.edges.insert(edge.link_id, edge.clone());
121        }
122
123        let mut g = Graph::<String, String>::new();
124        let mut nodes = HashMap::new();
125        for (_, fragment) in graph.fragments.iter().sorted_by_key(|(id, _)| **id) {
126            let mut label = String::new();
127            label.push_str("Fragment ");
128            label.push_str(&fragment.get_fragment_id().to_string());
129            label.push('\n');
130            nodes.insert(label.clone(), g.add_node(label.clone()));
131
132            build_graph_from_pretty(
133                &self.explain_node(fragment.node.as_ref().unwrap()),
134                &mut g,
135                &mut nodes,
136                Some(&label),
137            );
138        }
139        for tb in self.tables.values() {
140            build_graph_from_pretty(&self.explain_table(tb), &mut g, &mut nodes, None);
141        }
142        g
143    }
144
145    fn explain_table<'a>(&self, tb: &Table) -> Pretty<'a> {
146        let tb = TableCatalog::from(tb.clone());
147        let columns = tb
148            .columns
149            .iter()
150            .map(|c| {
151                let s = if self.verbose {
152                    format!("{}: {}", c.name(), c.data_type())
153                } else {
154                    c.name().to_owned()
155                };
156                Pretty::Text(s.into())
157            })
158            .collect();
159        let columns = Pretty::Array(columns);
160        let name = format!("Table {}", tb.id);
161        let mut fields = Vec::with_capacity(5);
162        fields.push(("columns", columns));
163        fields.push((
164            "primary key",
165            Pretty::Array(tb.pk.iter().map(Pretty::debug).collect()),
166        ));
167        fields.push((
168            "value indices",
169            Pretty::Array(tb.value_indices.iter().map(Pretty::debug).collect()),
170        ));
171        fields.push((
172            "distribution key",
173            Pretty::Array(tb.distribution_key.iter().map(Pretty::debug).collect()),
174        ));
175        fields.push((
176            "read pk prefix len hint",
177            Pretty::debug(&tb.read_prefix_len_hint),
178        ));
179        if let Some(vnode_col_idx) = tb.vnode_col_index {
180            fields.push(("vnode column idx", Pretty::debug(&vnode_col_idx)));
181        }
182        Pretty::childless_record(name, fields)
183    }
184
185    fn explain_node<'a>(&mut self, node: &StreamNode) -> Pretty<'a> {
186        let one_line_explain = match node.get_node_body().unwrap() {
187            stream_node::NodeBody::Exchange(_) => {
188                let edge = self.edges.get(&node.operator_id).unwrap();
189                let upstream_fragment_id = edge.upstream_id;
190                let dist = edge.dispatch_strategy.as_ref().unwrap();
191                format!(
192                    "StreamExchange {} from {}",
193                    match dist.r#type() {
194                        DispatcherType::Unspecified => unreachable!(),
195                        DispatcherType::Hash => format!("Hash({:?})", dist.dist_key_indices),
196                        DispatcherType::Broadcast => "Broadcast".to_owned(),
197                        DispatcherType::Simple => "Single".to_owned(),
198                        DispatcherType::NoShuffle => "NoShuffle".to_owned(),
199                    },
200                    upstream_fragment_id
201                )
202            }
203            _ => node.identity.clone(),
204        };
205
206        let mut tables: Vec<(String, u32)> = Vec::with_capacity(7);
207        let mut node_copy = node.clone();
208
209        stream_graph_visitor::visit_stream_node_tables_inner(
210            &mut node_copy,
211            true,
212            false,
213            |table, table_name| {
214                tables.push((table_name.to_owned(), self.add_table(table)));
215            },
216        );
217
218        let mut fields = Vec::with_capacity(3);
219        if !tables.is_empty() {
220            fields.push((
221                "tables",
222                Pretty::Array(
223                    tables
224                        .into_iter()
225                        .map(|(name, id)| Pretty::Text(format!("{}: {}", name, id).into()))
226                        .collect(),
227                ),
228            ));
229        }
230        if self.verbose {
231            fields.push((
232                "output",
233                Pretty::Array(
234                    node.fields
235                        .iter()
236                        .map(|f| Pretty::display(f.get_name()))
237                        .collect(),
238                ),
239            ));
240            fields.push((
241                "stream key",
242                Pretty::Array(
243                    node.stream_key
244                        .iter()
245                        .map(|i| Pretty::display(node.fields[*i as usize].get_name()))
246                        .collect(),
247                ),
248            ));
249        }
250        let children = node
251            .input
252            .iter()
253            .map(|input| self.explain_node(input))
254            .collect();
255        Pretty::simple_record(one_line_explain, fields, children)
256    }
257}
258
259pub fn build_graph_from_pretty(
260    pretty: &Pretty<'_>,
261    graph: &mut Graph<String, String>,
262    nodes: &mut HashMap<String, NodeIndex>,
263    parent_label: Option<&str>,
264) {
265    if let Pretty::Record(r) = pretty {
266        let mut label = String::new();
267        label.push_str(&r.name);
268        for (k, v) in &r.fields {
269            label.push('\n');
270            label.push_str(k);
271            label.push_str(": ");
272            label.push_str(
273                &serde_json::to_string(&PrettySerde(v.clone(), false))
274                    .expect("failed to serialize plan to dot"),
275            );
276        }
277        // output alignment.
278        if !r.fields.is_empty() {
279            label.push('\n');
280        }
281
282        let current_node = *nodes
283            .entry(label.clone())
284            .or_insert_with(|| graph.add_node(label.clone()));
285
286        if let Some(parent_label) = parent_label
287            && let Some(&parent_node) = nodes.get(parent_label)
288        {
289            graph.add_edge(parent_node, current_node, "".to_owned());
290        }
291
292        for child in &r.children {
293            build_graph_from_pretty(child, graph, nodes, Some(&label));
294        }
295    }
296}