risingwave_frontend/utils/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod pretty_serde;
16pub use pretty_serde::PrettySerde;
17mod column_index_mapping;
18use std::any::Any;
19use std::hash::{Hash, Hasher};
20use std::sync::LazyLock;
21
22pub use column_index_mapping::*;
23mod condition;
24pub mod data_type;
25pub use condition::*;
26mod connected_components;
27pub(crate) use connected_components::*;
28mod stream_graph_formatter;
29pub use stream_graph_formatter::*;
30mod with_options;
31use tokio::runtime::Runtime;
32pub use with_options::*;
33mod rewrite_index;
34pub use rewrite_index::*;
35mod index_set;
36pub use index_set::*;
37pub(crate) mod group_by;
38pub mod overwrite_options;
39pub use group_by::*;
40pub use overwrite_options::*;
41
42use crate::expr::{Expr, ExprImpl, ExprRewriter, InputRef};
43
44pub static FRONTEND_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
45    tokio::runtime::Builder::new_multi_thread()
46        .thread_name("rw-frontend")
47        .enable_all()
48        .build()
49        .expect("failed to build frontend runtime")
50});
51
52/// Substitute `InputRef` with corresponding `ExprImpl`.
53pub struct Substitute {
54    pub mapping: Vec<ExprImpl>,
55}
56
57impl ExprRewriter for Substitute {
58    fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
59        assert!(
60            input_ref
61                .return_type()
62                .equals_datatype(&self.mapping[input_ref.index()].return_type()),
63            "Type mismatch when substituting {:?} of {:?} with {:?} of {:?}",
64            input_ref,
65            input_ref.return_type(),
66            self.mapping[input_ref.index()],
67            self.mapping[input_ref.index()].return_type()
68        );
69        self.mapping[input_ref.index()].clone()
70    }
71}
72
73// Traits for easy manipulation of recursive structures
74
/// A `Layer` is a container whose direct subcomponents have type `Sub`.
///
/// We usually use a `Layer` to represent one level of a tree-like structure,
/// where the subcomponents are the recursive subtrees.
/// In general, however, `Sub` may be a different type than the `Layer` itself.
/// This structural relation between `Sub` and `Layer`
/// allows us to lift a transformation on `Sub` to one on `Layer`.
/// A related and even more general notion is `Functor`,
/// which might also be helpful to define in the future.
pub trait Layer: Sized {
    /// The type of the direct subcomponents held by this layer.
    type Sub;

    /// Lifts a transformation `f : Sub -> Sub` to the whole `Layer`
    /// by applying `f` to all subcomponents and returning the rebuilt layer.
    fn map<F: FnMut(Self::Sub) -> Self::Sub>(self, f: F) -> Self;

    /// Lifts a traversal `f : &Sub -> ()` to the whole `Layer`
    /// by visiting the subcomponents with `f` one after another.
    fn descent<F: FnMut(&Self::Sub)>(&self, f: F);
}
99
100/// A tree-like structure is a `Layer` where the subcomponents are recursively trees.
101pub trait Tree = Layer<Sub = Self>;
102
103/// Given a tree-like structure `T`,
104/// we usually can specify a transformation `T -> T`
105/// by providing a pre-order transformation `pre : T -> T`
106/// and a post-order transformation `post : T -> T`.
107/// Specifically, the derived transformation `apply : T -> T` first applies `pre`,
108/// then maps itself over the subtrees, and finally applies `post`.
109/// This allows us to obtain a global transformation acting recursively on all levels
110/// by specifying simpler transformations at acts locally.
111pub trait Endo<T: Tree> {
112    fn pre(&mut self, t: T) -> T {
113        t
114    }
115
116    fn post(&mut self, t: T) -> T {
117        t
118    }
119
120    /// The real application function is left undefined.
121    /// If we want the derived transformation
122    /// we can simply call `tree_apply` in the implementation.
123    /// But for more complicated requirements,
124    /// e.g. skipping over certain subtrees, custom logic can be added.
125    fn apply(&mut self, t: T) -> T;
126
127    /// The derived transformation based on `pre` and `post`.
128    fn tree_apply(&mut self, t: T) -> T {
129        let t = self.pre(t).map(|s| self.apply(s));
130        self.post(t)
131    }
132}
133
134/// A similar trait to generate traversal over tree-like structure.
135/// See `Endo` for more details.
136#[allow(unused_variables)]
137pub trait Visit<T: Tree> {
138    fn pre(&mut self, t: &T) {}
139
140    fn post(&mut self, t: &T) {}
141
142    fn visit(&mut self, t: &T);
143
144    fn tree_visit(&mut self, t: &T) {
145        self.pre(t);
146        t.descent(|i| self.visit(i));
147        self.post(t);
148    }
149}
150
// Work around the object-safety rules for `Eq` and `Hash`; adapted from
152// https://github.com/bevyengine/bevy/blob/f7fbfaf9c72035e98c6b6cec0c7d26ff9f5b1c82/crates/bevy_utils/src/label.rs
153
/// An object-safe counterpart of [`Eq`], usable through `dyn DynEq`.
///
/// A blanket implementation covers every `'static` type that implements `Eq`.
pub trait DynEq: Any {
    /// Returns `self` as a `&dyn Any` so that callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any;

    /// Compares `self` with a type-erased `other`.
    ///
    /// The blanket implementation returns `true` only when `other` has the same concrete
    /// type as `self` and the two values are equal under `Eq`.
    fn dyn_eq(&self, other: &dyn DynEq) -> bool;
}
160
161impl<T: Any + Eq> DynEq for T {
162    fn as_any(&self) -> &dyn Any {
163        self
164    }
165
166    fn dyn_eq(&self, other: &dyn DynEq) -> bool {
167        let other = other.as_any().downcast_ref::<T>();
168        other.is_some_and(|other| self == other)
169    }
170}
171
172impl PartialEq<dyn DynEq + 'static> for dyn DynEq {
173    fn eq(&self, other: &Self) -> bool {
174        self.dyn_eq(other)
175    }
176}
177
178impl Eq for dyn DynEq {
179    fn assert_receiver_is_total_eq(&self) {}
180}
181
182/// An object safe version of [`Hash`]. This trait is automatically implemented
183/// for any `'static` type that implements `Hash`.
184pub trait DynHash: DynEq {
185    fn as_dyn_eq(&self) -> &dyn DynEq;
186    fn dyn_hash(&self, state: &mut dyn Hasher);
187}
188
189impl<T: DynEq + Hash> DynHash for T {
190    fn as_dyn_eq(&self) -> &dyn DynEq {
191        self
192    }
193
194    fn dyn_hash(&self, mut state: &mut dyn Hasher) {
195        T::hash(self, &mut state);
196        self.type_id().hash(&mut state);
197    }
198}
199
200impl Hash for dyn DynHash {
201    fn hash<H: Hasher>(&self, state: &mut H) {
202        self.dyn_hash(state);
203    }
204}
205
/// Formats `i` as an English ordinal: `1` becomes `"1st"`, `2` becomes `"2nd"`,
/// `3` becomes `"3rd"`, `11` becomes `"11th"`, and so on.
pub fn ordinal(i: usize) -> String {
    let suffix = match (i % 10, i % 100) {
        // Numbers ending in 11, 12 and 13 are irregular and always take "th".
        (_, 11..=13) => "th",
        (1, _) => "st",
        (2, _) => "nd",
        (3, _) => "rd",
        _ => "th",
    };
    format!("{i}{suffix}")
}