risingwave_frontend/optimizer/
optimizer_context.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::fmt::Formatter;
16use std::cell::{Cell, RefCell, RefMut};
17use std::collections::HashMap;
18use std::marker::PhantomData;
19use std::rc::Rc;
20use std::sync::Arc;
21
22use risingwave_sqlparser::ast::{ExplainFormat, ExplainOptions, ExplainType};
23
24use super::property::WatermarkGroupId;
25use crate::PlanRef;
26use crate::binder::ShareId;
27use crate::expr::{CorrelatedId, SessionTimezone};
28use crate::handler::HandlerArgs;
29use crate::optimizer::plan_node::PlanNodeId;
30use crate::session::SessionImpl;
31use crate::utils::{OverwriteOptions, WithOptions};
32
/// Starting value of the plan node, expr display and watermark group ID counters
/// in [`OptimizerContext::new`].
///
/// The `next_*_id` methods increment before returning, so the first ID handed out
/// by a context created through `new` is `RESERVED_ID_NUM + 1`.
const RESERVED_ID_NUM: u16 = 10_000;
34
/// Zero-sized marker type. `Rc` is not `Send`, so a struct holding this marker is
/// not `Send` either.
type PhantomUnsend = PhantomData<Rc<()>>;
36
37pub struct OptimizerContext {
38    session_ctx: Arc<SessionImpl>,
39    /// The original SQL string, used for debugging.
40    sql: Arc<str>,
41    /// Normalized SQL string. See [`HandlerArgs::normalize_sql`].
42    normalized_sql: String,
43    /// Explain options
44    explain_options: ExplainOptions,
45    /// Store the trace of optimizer
46    optimizer_trace: RefCell<Vec<String>>,
47    /// Store the optimized logical plan of optimizer
48    logical_explain: RefCell<Option<String>>,
49    /// Store options or properties from the `with` clause
50    with_options: WithOptions,
51    /// Store the Session Timezone and whether it was used.
52    session_timezone: RefCell<SessionTimezone>,
53    /// Total number of optimization rules have been applied.
54    total_rule_applied: RefCell<usize>,
55    /// Store the configs can be overwritten in with clause
56    /// if not specified, use the value from session variable.
57    overwrite_options: OverwriteOptions,
58    /// Store the mapping between `share_id` and the corresponding
59    /// `PlanRef`, used by rcte's planning. (e.g., in `LogicalCteRef`)
60    rcte_cache: RefCell<HashMap<ShareId, PlanRef>>,
61
62    /// Last assigned plan node ID.
63    last_plan_node_id: Cell<i32>,
64    /// Last assigned correlated ID.
65    last_correlated_id: Cell<u32>,
66    /// Last assigned expr display ID.
67    last_expr_display_id: Cell<usize>,
68    /// Last assigned watermark group ID.
69    last_watermark_group_id: Cell<u32>,
70
71    _phantom: PhantomUnsend,
72}
73
74pub(in crate::optimizer) struct LastAssignedIds {
75    last_plan_node_id: i32,
76    last_correlated_id: u32,
77    last_expr_display_id: usize,
78    last_watermark_group_id: u32,
79}
80
81pub type OptimizerContextRef = Rc<OptimizerContext>;
82
83impl OptimizerContext {
84    /// Create a new [`OptimizerContext`] from the given [`HandlerArgs`], with empty
85    /// [`ExplainOptions`].
86    pub fn from_handler_args(handler_args: HandlerArgs) -> Self {
87        Self::new(handler_args, ExplainOptions::default())
88    }
89
90    /// Create a new [`OptimizerContext`] from the given [`HandlerArgs`] and [`ExplainOptions`].
91    pub fn new(mut handler_args: HandlerArgs, explain_options: ExplainOptions) -> Self {
92        let session_timezone = RefCell::new(SessionTimezone::new(
93            handler_args.session.config().timezone().to_owned(),
94        ));
95        let overwrite_options = OverwriteOptions::new(&mut handler_args);
96        Self {
97            session_ctx: handler_args.session,
98            sql: handler_args.sql,
99            normalized_sql: handler_args.normalized_sql,
100            explain_options,
101            optimizer_trace: RefCell::new(vec![]),
102            logical_explain: RefCell::new(None),
103            with_options: handler_args.with_options,
104            session_timezone,
105            total_rule_applied: RefCell::new(0),
106            overwrite_options,
107            rcte_cache: RefCell::new(HashMap::new()),
108
109            last_plan_node_id: Cell::new(RESERVED_ID_NUM.into()),
110            last_correlated_id: Cell::new(0),
111            last_expr_display_id: Cell::new(RESERVED_ID_NUM.into()),
112            last_watermark_group_id: Cell::new(RESERVED_ID_NUM.into()),
113
114            _phantom: Default::default(),
115        }
116    }
117
118    // TODO(TaoWu): Remove the async.
119    #[cfg(test)]
120    #[expect(clippy::unused_async)]
121    pub async fn mock() -> OptimizerContextRef {
122        Self {
123            session_ctx: Arc::new(SessionImpl::mock()),
124            sql: Arc::from(""),
125            normalized_sql: "".to_owned(),
126            explain_options: ExplainOptions::default(),
127            optimizer_trace: RefCell::new(vec![]),
128            logical_explain: RefCell::new(None),
129            with_options: Default::default(),
130            session_timezone: RefCell::new(SessionTimezone::new("UTC".into())),
131            total_rule_applied: RefCell::new(0),
132            overwrite_options: OverwriteOptions::default(),
133            rcte_cache: RefCell::new(HashMap::new()),
134
135            last_plan_node_id: Cell::new(0),
136            last_correlated_id: Cell::new(0),
137            last_expr_display_id: Cell::new(0),
138            last_watermark_group_id: Cell::new(0),
139
140            _phantom: Default::default(),
141        }
142        .into()
143    }
144
145    pub fn next_plan_node_id(&self) -> PlanNodeId {
146        PlanNodeId(self.last_plan_node_id.update(|id| id + 1))
147    }
148
149    pub fn next_correlated_id(&self) -> CorrelatedId {
150        self.last_correlated_id.update(|id| id + 1)
151    }
152
153    pub fn next_expr_display_id(&self) -> usize {
154        self.last_expr_display_id.update(|id| id + 1)
155    }
156
157    pub fn next_watermark_group_id(&self) -> WatermarkGroupId {
158        self.last_watermark_group_id.update(|id| id + 1)
159    }
160
161    pub(in crate::optimizer) fn backup_elem_ids(&self) -> LastAssignedIds {
162        LastAssignedIds {
163            last_plan_node_id: self.last_plan_node_id.get(),
164            last_correlated_id: self.last_correlated_id.get(),
165            last_expr_display_id: self.last_expr_display_id.get(),
166            last_watermark_group_id: self.last_watermark_group_id.get(),
167        }
168    }
169
170    /// This should only be called in [`crate::optimizer::plan_node::reorganize_elements_id`].
171    pub(in crate::optimizer) fn reset_elem_ids(&self) {
172        self.last_plan_node_id.set(0);
173        self.last_correlated_id.set(0);
174        self.last_expr_display_id.set(0);
175        self.last_watermark_group_id.set(0);
176    }
177
178    pub(in crate::optimizer) fn restore_elem_ids(&self, backup: LastAssignedIds) {
179        self.last_plan_node_id.set(backup.last_plan_node_id);
180        self.last_correlated_id.set(backup.last_correlated_id);
181        self.last_expr_display_id.set(backup.last_expr_display_id);
182        self.last_watermark_group_id
183            .set(backup.last_watermark_group_id);
184    }
185
186    pub fn add_rule_applied(&self, num: usize) {
187        *self.total_rule_applied.borrow_mut() += num;
188    }
189
190    pub fn total_rule_applied(&self) -> usize {
191        *self.total_rule_applied.borrow()
192    }
193
194    pub fn is_explain_verbose(&self) -> bool {
195        self.explain_options.verbose
196    }
197
198    pub fn is_explain_trace(&self) -> bool {
199        self.explain_options.trace
200    }
201
202    pub fn explain_type(&self) -> ExplainType {
203        self.explain_options.explain_type.clone()
204    }
205
206    pub fn explain_format(&self) -> ExplainFormat {
207        self.explain_options.explain_format.clone()
208    }
209
210    pub fn is_explain_logical(&self) -> bool {
211        self.explain_type() == ExplainType::Logical
212    }
213
214    pub fn trace(&self, str: impl Into<String>) {
215        // If explain type is logical, do not store the trace for any optimizations beyond logical.
216        if self.is_explain_logical() && self.logical_explain.borrow().is_some() {
217            return;
218        }
219        let mut optimizer_trace = self.optimizer_trace.borrow_mut();
220        let string = str.into();
221        tracing::trace!(target: "explain_trace", "\n{}", string);
222        optimizer_trace.push(string);
223        optimizer_trace.push("\n".to_owned());
224    }
225
226    pub fn warn_to_user(&self, str: impl Into<String>) {
227        self.session_ctx().notice_to_user(str);
228    }
229
230    pub fn store_logical(&self, str: impl Into<String>) {
231        *self.logical_explain.borrow_mut() = Some(str.into())
232    }
233
234    pub fn take_logical(&self) -> Option<String> {
235        self.logical_explain.borrow_mut().take()
236    }
237
238    pub fn take_trace(&self) -> Vec<String> {
239        self.optimizer_trace.borrow_mut().drain(..).collect()
240    }
241
242    pub fn with_options(&self) -> &WithOptions {
243        &self.with_options
244    }
245
246    pub fn overwrite_options(&self) -> &OverwriteOptions {
247        &self.overwrite_options
248    }
249
250    pub fn session_ctx(&self) -> &Arc<SessionImpl> {
251        &self.session_ctx
252    }
253
254    /// Return the original SQL.
255    pub fn sql(&self) -> &str {
256        &self.sql
257    }
258
259    /// Return the normalized SQL.
260    pub fn normalized_sql(&self) -> &str {
261        &self.normalized_sql
262    }
263
264    pub fn session_timezone(&self) -> RefMut<'_, SessionTimezone> {
265        self.session_timezone.borrow_mut()
266    }
267
268    pub fn get_session_timezone(&self) -> String {
269        self.session_timezone.borrow().timezone()
270    }
271
272    pub fn get_rcte_cache_plan(&self, id: &ShareId) -> Option<PlanRef> {
273        self.rcte_cache.borrow().get(id).cloned()
274    }
275
276    pub fn insert_rcte_cache_plan(&self, id: ShareId, plan: PlanRef) {
277        self.rcte_cache.borrow_mut().insert(id, plan);
278    }
279}
280
281impl std::fmt::Debug for OptimizerContext {
282    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
283        write!(
284            f,
285            "QueryContext {{ sql = {}, explain_options = {}, with_options = {:?}, last_plan_node_id = {}, last_correlated_id = {} }}",
286            self.sql,
287            self.explain_options,
288            self.with_options,
289            self.last_plan_node_id.get(),
290            self.last_correlated_id.get(),
291        )
292    }
293}