risingwave_frontend/utils/
stream_graph_formatter.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::max;
16use std::collections::{BTreeMap, HashMap};
17
18use itertools::Itertools;
19use petgraph::Graph;
20use petgraph::dot::Dot;
21use petgraph::graph::NodeIndex;
22use pretty_xmlish::{Pretty, PrettyConfig};
23use risingwave_common::util::stream_graph_visitor;
24use risingwave_pb::catalog::Table;
25use risingwave_pb::stream_plan::stream_fragment_graph::StreamFragmentEdge;
26use risingwave_pb::stream_plan::{DispatcherType, StreamFragmentGraph, StreamNode, stream_node};
27
28use super::PrettySerde;
29use crate::TableCatalog;
30
31/// ice: in the future, we may allow configurable width, boundaries, etc.
32pub fn explain_stream_graph(graph: &StreamFragmentGraph, is_verbose: bool) -> String {
33    let mut output = String::with_capacity(2048);
34    let mut config = PrettyConfig {
35        need_boundaries: false,
36        width: 80,
37        ..Default::default()
38    };
39    StreamGraphFormatter::new(is_verbose).explain_graph(graph, &mut config, &mut output);
40    output
41}
42
43pub fn explain_stream_graph_as_dot(sg: &StreamFragmentGraph, is_verbose: bool) -> String {
44    let graph = StreamGraphFormatter::new(is_verbose).explain_graph_as_dot(sg);
45    let dot = Dot::new(&graph);
46    dot.to_string()
47}
48
49/// A formatter to display the final stream plan graph, used for `explain (distsql) create
50/// materialized view ...`
51struct StreamGraphFormatter {
52    /// exchange's `operator_id` -> edge
53    edges: HashMap<u64, StreamFragmentEdge>,
54    verbose: bool,
55    tables: BTreeMap<u32, Table>,
56}
57
58impl StreamGraphFormatter {
59    fn new(verbose: bool) -> Self {
60        StreamGraphFormatter {
61            edges: HashMap::default(),
62            tables: BTreeMap::default(),
63            verbose,
64        }
65    }
66
67    /// collect the table catalog and return the table id
68    fn add_table(&mut self, tb: &Table) -> u32 {
69        self.tables.insert(tb.id, tb.clone());
70        tb.id
71    }
72
73    fn explain_graph(
74        &mut self,
75        graph: &StreamFragmentGraph,
76        config: &mut PrettyConfig,
77        output: &mut String,
78    ) {
79        self.edges.clear();
80        for edge in &graph.edges {
81            self.edges.insert(edge.link_id, edge.clone());
82        }
83        let mut max_width = 80;
84        for (_, fragment) in graph.fragments.iter().sorted_by_key(|(id, _)| **id) {
85            output.push_str("Fragment ");
86            output.push_str(&fragment.get_fragment_id().to_string());
87            output.push('\n');
88            let width = config.unicode(output, &self.explain_node(fragment.node.as_ref().unwrap()));
89            max_width = max(width, max_width);
90            config.width = max_width;
91            output.push_str("\n\n");
92        }
93        for tb in self.tables.values() {
94            let width = config.unicode(output, &self.explain_table(tb));
95            max_width = max(width, max_width);
96            config.width = max_width;
97            output.push_str("\n\n");
98        }
99    }
100
101    fn explain_graph_as_dot(&mut self, graph: &StreamFragmentGraph) -> Graph<String, String> {
102        self.edges.clear();
103        for edge in &graph.edges {
104            self.edges.insert(edge.link_id, edge.clone());
105        }
106
107        let mut g = Graph::<String, String>::new();
108        let mut nodes = HashMap::new();
109        for (_, fragment) in graph.fragments.iter().sorted_by_key(|(id, _)| **id) {
110            let mut label = String::new();
111            label.push_str("Fragment ");
112            label.push_str(&fragment.get_fragment_id().to_string());
113            label.push('\n');
114            nodes.insert(label.clone(), g.add_node(label.clone()));
115
116            build_graph_from_pretty(
117                &self.explain_node(fragment.node.as_ref().unwrap()),
118                &mut g,
119                &mut nodes,
120                Some(&label),
121            );
122        }
123        for tb in self.tables.values() {
124            build_graph_from_pretty(&self.explain_table(tb), &mut g, &mut nodes, None);
125        }
126        g
127    }
128
129    fn explain_table<'a>(&self, tb: &Table) -> Pretty<'a> {
130        let tb = TableCatalog::from(tb.clone());
131        let columns = tb
132            .columns
133            .iter()
134            .map(|c| {
135                let s = if self.verbose {
136                    format!("{}: {}", c.name(), c.data_type())
137                } else {
138                    c.name().to_owned()
139                };
140                Pretty::Text(s.into())
141            })
142            .collect();
143        let columns = Pretty::Array(columns);
144        let name = format!("Table {}", tb.id);
145        let mut fields = Vec::with_capacity(5);
146        fields.push(("columns", columns));
147        fields.push((
148            "primary key",
149            Pretty::Array(tb.pk.iter().map(Pretty::debug).collect()),
150        ));
151        fields.push((
152            "value indices",
153            Pretty::Array(tb.value_indices.iter().map(Pretty::debug).collect()),
154        ));
155        fields.push((
156            "distribution key",
157            Pretty::Array(tb.distribution_key.iter().map(Pretty::debug).collect()),
158        ));
159        fields.push((
160            "read pk prefix len hint",
161            Pretty::debug(&tb.read_prefix_len_hint),
162        ));
163        if let Some(vnode_col_idx) = tb.vnode_col_index {
164            fields.push(("vnode column idx", Pretty::debug(&vnode_col_idx)));
165        }
166        Pretty::childless_record(name, fields)
167    }
168
169    fn explain_node<'a>(&mut self, node: &StreamNode) -> Pretty<'a> {
170        let one_line_explain = match node.get_node_body().unwrap() {
171            stream_node::NodeBody::Exchange(_) => {
172                let edge = self.edges.get(&node.operator_id).unwrap();
173                let upstream_fragment_id = edge.upstream_id;
174                let dist = edge.dispatch_strategy.as_ref().unwrap();
175                format!(
176                    "StreamExchange {} from {}",
177                    match dist.r#type() {
178                        DispatcherType::Unspecified => unreachable!(),
179                        DispatcherType::Hash => format!("Hash({:?})", dist.dist_key_indices),
180                        DispatcherType::Broadcast => "Broadcast".to_owned(),
181                        DispatcherType::Simple => "Single".to_owned(),
182                        DispatcherType::NoShuffle => "NoShuffle".to_owned(),
183                    },
184                    upstream_fragment_id
185                )
186            }
187            _ => node.identity.clone(),
188        };
189
190        let mut tables: Vec<(String, u32)> = Vec::with_capacity(7);
191        let mut node_copy = node.clone();
192
193        stream_graph_visitor::visit_stream_node_tables_inner(
194            &mut node_copy,
195            false,
196            false,
197            |table, table_name| {
198                tables.push((table_name.to_owned(), self.add_table(table)));
199            },
200        );
201
202        let mut fields = Vec::with_capacity(3);
203        if !tables.is_empty() {
204            fields.push((
205                "tables",
206                Pretty::Array(
207                    tables
208                        .into_iter()
209                        .map(|(name, id)| Pretty::Text(format!("{}: {}", name, id).into()))
210                        .collect(),
211                ),
212            ));
213        }
214        if self.verbose {
215            fields.push((
216                "output",
217                Pretty::Array(
218                    node.fields
219                        .iter()
220                        .map(|f| Pretty::display(f.get_name()))
221                        .collect(),
222                ),
223            ));
224            fields.push((
225                "stream key",
226                Pretty::Array(
227                    node.stream_key
228                        .iter()
229                        .map(|i| Pretty::display(node.fields[*i as usize].get_name()))
230                        .collect(),
231                ),
232            ));
233        }
234        let children = node
235            .input
236            .iter()
237            .map(|input| self.explain_node(input))
238            .collect();
239        Pretty::simple_record(one_line_explain, fields, children)
240    }
241}
242
243pub fn build_graph_from_pretty(
244    pretty: &Pretty<'_>,
245    graph: &mut Graph<String, String>,
246    nodes: &mut HashMap<String, NodeIndex>,
247    parent_label: Option<&str>,
248) {
249    if let Pretty::Record(r) = pretty {
250        let mut label = String::new();
251        label.push_str(&r.name);
252        for (k, v) in &r.fields {
253            label.push('\n');
254            label.push_str(k);
255            label.push_str(": ");
256            label.push_str(
257                &serde_json::to_string(&PrettySerde(v.clone(), false))
258                    .expect("failed to serialize plan to dot"),
259            );
260        }
261        // output alignment.
262        if !r.fields.is_empty() {
263            label.push('\n');
264        }
265
266        let current_node = *nodes
267            .entry(label.clone())
268            .or_insert_with(|| graph.add_node(label.clone()));
269
270        if let Some(parent_label) = parent_label {
271            if let Some(&parent_node) = nodes.get(parent_label) {
272                graph.add_edge(parent_node, current_node, "".to_owned());
273            }
274        }
275
276        for child in &r.children {
277            build_graph_from_pretty(child, graph, nodes, Some(&label));
278        }
279    }
280}