// risingwave_frontend/optimizer/optimizer_context.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use core::fmt::Formatter;
16use std::cell::{Cell, RefCell, RefMut};
17use std::collections::HashMap;
18use std::marker::PhantomData;
19use std::rc::Rc;
20use std::sync::Arc;
21
22use risingwave_sqlparser::ast::{ExplainFormat, ExplainOptions, ExplainType};
23
24use super::property::WatermarkGroupId;
25use crate::expr::{CorrelatedId, SessionTimezone};
26use crate::handler::HandlerArgs;
27use crate::optimizer::LogicalPlanRef;
28use crate::optimizer::plan_node::PlanNodeId;
29use crate::session::SessionImpl;
30use crate::utils::{OverwriteOptions, WithOptions};
31use crate::{Explain, TableCatalog};
32
/// Starting value of the plan-node, expression-display and watermark-group ID counters in
/// [`OptimizerContext::new`]; a freshly created context therefore hands out IDs above it and
/// leaves the range below it unused.
const RESERVED_ID_NUM: u16 = 10_000;

/// Zero-sized marker field type that opts its container out of `Send` and `Sync`,
/// since [`Rc`] implements neither.
type PhantomUnsend = PhantomData<Rc<()>>;

37pub struct OptimizerContext {
38    session_ctx: Arc<SessionImpl>,
39    /// The original SQL string, used for debugging.
40    sql: Arc<str>,
41    /// Normalized SQL string. See [`HandlerArgs::normalize_sql`].
42    normalized_sql: String,
43    /// Explain options
44    explain_options: ExplainOptions,
45    /// Store the trace of optimizer
46    optimizer_trace: RefCell<Vec<String>>,
47    /// Store the optimized logical plan of optimizer
48    logical_explain: RefCell<Option<String>>,
49    /// Store options or properties from the `with` clause
50    with_options: WithOptions,
51    /// Store the Session Timezone and whether it was used.
52    session_timezone: RefCell<SessionTimezone>,
53    /// Total number of optimization rules have been applied.
54    total_rule_applied: RefCell<usize>,
55    /// Store the configs can be overwritten in with clause
56    /// if not specified, use the value from session variable.
57    overwrite_options: OverwriteOptions,
58    /// Mapping from iceberg table identifier to current snapshot id.
59    /// Used to keep same snapshot id when multiple scans from the same iceberg table exist in a query.
60    iceberg_snapshot_id_map: RefCell<HashMap<String, Option<i64>>>,
61    /// Batch materialized view candidates for exact-match rewriting.
62    batch_mview_candidates: RefCell<Vec<MaterializedViewCandidate>>,
63
64    /// Last assigned plan node ID.
65    last_plan_node_id: Cell<i32>,
66    /// Last assigned correlated ID.
67    last_correlated_id: Cell<u32>,
68    /// Last assigned expr display ID.
69    last_expr_display_id: Cell<usize>,
70    /// Last assigned watermark group ID.
71    last_watermark_group_id: Cell<u32>,
72
73    _phantom: PhantomUnsend,
74}
75
76#[derive(Clone, Debug)]
77pub struct MaterializedViewCandidate {
78    pub plan: LogicalPlanRef,
79    pub table: Arc<TableCatalog>,
80}
81
82pub(in crate::optimizer) struct LastAssignedIds {
83    last_plan_node_id: i32,
84    last_correlated_id: u32,
85    last_expr_display_id: usize,
86    last_watermark_group_id: u32,
87}
88
89pub type OptimizerContextRef = Rc<OptimizerContext>;
90
91impl OptimizerContext {
92    /// Create a new [`OptimizerContext`] from the given [`HandlerArgs`], with empty
93    /// [`ExplainOptions`].
94    pub fn from_handler_args(handler_args: HandlerArgs) -> Self {
95        Self::new(handler_args, ExplainOptions::default())
96    }
97
98    /// Create a new [`OptimizerContext`] from the given [`HandlerArgs`] and [`ExplainOptions`].
99    pub fn new(mut handler_args: HandlerArgs, explain_options: ExplainOptions) -> Self {
100        let session_timezone = RefCell::new(SessionTimezone::new(
101            handler_args.session.config().timezone(),
102        ));
103        let overwrite_options = OverwriteOptions::new(&mut handler_args);
104        Self {
105            session_ctx: handler_args.session,
106            sql: handler_args.sql,
107            normalized_sql: handler_args.normalized_sql,
108            explain_options,
109            optimizer_trace: RefCell::new(vec![]),
110            logical_explain: RefCell::new(None),
111            with_options: handler_args.with_options,
112            session_timezone,
113            total_rule_applied: RefCell::new(0),
114            overwrite_options,
115            iceberg_snapshot_id_map: RefCell::new(HashMap::new()),
116            batch_mview_candidates: RefCell::new(Vec::new()),
117
118            last_plan_node_id: Cell::new(RESERVED_ID_NUM.into()),
119            last_correlated_id: Cell::new(0),
120            last_expr_display_id: Cell::new(RESERVED_ID_NUM.into()),
121            last_watermark_group_id: Cell::new(RESERVED_ID_NUM.into()),
122
123            _phantom: Default::default(),
124        }
125    }
126
127    // TODO(TaoWu): Remove the async.
128    #[cfg(test)]
129    #[expect(clippy::unused_async)]
130    pub async fn mock() -> OptimizerContextRef {
131        Self {
132            session_ctx: Arc::new(SessionImpl::mock()),
133            sql: Arc::from(""),
134            normalized_sql: "".to_owned(),
135            explain_options: ExplainOptions::default(),
136            optimizer_trace: RefCell::new(vec![]),
137            logical_explain: RefCell::new(None),
138            with_options: Default::default(),
139            session_timezone: RefCell::new(SessionTimezone::new("UTC".into())),
140            total_rule_applied: RefCell::new(0),
141            overwrite_options: OverwriteOptions::default(),
142            iceberg_snapshot_id_map: RefCell::new(HashMap::new()),
143            batch_mview_candidates: RefCell::new(Vec::new()),
144
145            last_plan_node_id: Cell::new(0),
146            last_correlated_id: Cell::new(0),
147            last_expr_display_id: Cell::new(0),
148            last_watermark_group_id: Cell::new(0),
149
150            _phantom: Default::default(),
151        }
152        .into()
153    }
154
155    pub fn next_plan_node_id(&self) -> PlanNodeId {
156        self.last_plan_node_id.update(|id| id + 1);
157        PlanNodeId(self.last_plan_node_id.get())
158    }
159
160    pub fn next_correlated_id(&self) -> CorrelatedId {
161        self.last_correlated_id.update(|id| id + 1);
162        self.last_correlated_id.get()
163    }
164
165    pub fn next_expr_display_id(&self) -> usize {
166        self.last_expr_display_id.update(|id| id + 1);
167        self.last_expr_display_id.get()
168    }
169
170    pub fn next_watermark_group_id(&self) -> WatermarkGroupId {
171        self.last_watermark_group_id.update(|id| id + 1);
172        self.last_watermark_group_id.get()
173    }
174
175    pub(in crate::optimizer) fn backup_elem_ids(&self) -> LastAssignedIds {
176        LastAssignedIds {
177            last_plan_node_id: self.last_plan_node_id.get(),
178            last_correlated_id: self.last_correlated_id.get(),
179            last_expr_display_id: self.last_expr_display_id.get(),
180            last_watermark_group_id: self.last_watermark_group_id.get(),
181        }
182    }
183
184    /// This should only be called in [`crate::optimizer::plan_node::reorganize_elements_id`].
185    pub(in crate::optimizer) fn reset_elem_ids(&self) {
186        self.last_plan_node_id.set(0);
187        self.last_correlated_id.set(0);
188        self.last_expr_display_id.set(0);
189        self.last_watermark_group_id.set(0);
190    }
191
192    pub(in crate::optimizer) fn restore_elem_ids(&self, backup: LastAssignedIds) {
193        self.last_plan_node_id.set(backup.last_plan_node_id);
194        self.last_correlated_id.set(backup.last_correlated_id);
195        self.last_expr_display_id.set(backup.last_expr_display_id);
196        self.last_watermark_group_id
197            .set(backup.last_watermark_group_id);
198    }
199
200    pub fn add_rule_applied(&self, num: usize) {
201        *self.total_rule_applied.borrow_mut() += num;
202    }
203
204    pub fn total_rule_applied(&self) -> usize {
205        *self.total_rule_applied.borrow()
206    }
207
208    pub fn is_explain_verbose(&self) -> bool {
209        self.explain_options.verbose
210    }
211
212    pub fn is_explain_trace(&self) -> bool {
213        self.explain_options.trace
214    }
215
216    fn is_explain_logical(&self) -> bool {
217        self.explain_options.explain_type == ExplainType::Logical
218    }
219
220    pub fn trace(&self, str: impl Into<String>) {
221        // If explain type is logical, do not store the trace for any optimizations beyond logical.
222        if self.is_explain_logical() && self.logical_explain.borrow().is_some() {
223            return;
224        }
225        let mut optimizer_trace = self.optimizer_trace.borrow_mut();
226        let string = str.into();
227        tracing::info!(target: "explain_trace", "\n{}", string);
228        optimizer_trace.push(string);
229        optimizer_trace.push("\n".to_owned());
230    }
231
232    pub fn warn_to_user(&self, str: impl Into<String>) {
233        self.session_ctx().notice_to_user(str);
234    }
235
236    fn explain_plan_impl(&self, plan: &impl Explain) -> String {
237        match self.explain_options.explain_format {
238            ExplainFormat::Text => plan.explain_to_string(),
239            ExplainFormat::Json => plan.explain_to_json(),
240            ExplainFormat::Xml => plan.explain_to_xml(),
241            ExplainFormat::Yaml => plan.explain_to_yaml(),
242            ExplainFormat::Dot => plan.explain_to_dot(),
243        }
244    }
245
246    pub fn may_store_explain_logical(&self, plan: &LogicalPlanRef) {
247        if self.is_explain_logical() {
248            let str = self.explain_plan_impl(plan);
249            *self.logical_explain.borrow_mut() = Some(str);
250        }
251    }
252
253    pub fn take_logical(&self) -> Option<String> {
254        self.logical_explain.borrow_mut().take()
255    }
256
257    pub fn take_trace(&self) -> Vec<String> {
258        self.optimizer_trace.borrow_mut().drain(..).collect()
259    }
260
261    pub fn with_options(&self) -> &WithOptions {
262        &self.with_options
263    }
264
265    pub fn overwrite_options(&self) -> &OverwriteOptions {
266        &self.overwrite_options
267    }
268
269    pub fn add_batch_mview_candidate(&self, table: Arc<TableCatalog>, plan: LogicalPlanRef) {
270        self.batch_mview_candidates
271            .borrow_mut()
272            .push(MaterializedViewCandidate { plan, table });
273    }
274
275    pub fn batch_mview_candidates(&self) -> std::cell::Ref<'_, Vec<MaterializedViewCandidate>> {
276        self.batch_mview_candidates.borrow()
277    }
278
279    pub fn session_ctx(&self) -> &Arc<SessionImpl> {
280        &self.session_ctx
281    }
282
283    /// Return the original SQL.
284    pub fn sql(&self) -> &str {
285        &self.sql
286    }
287
288    /// Return the normalized SQL.
289    pub fn normalized_sql(&self) -> &str {
290        &self.normalized_sql
291    }
292
293    pub fn session_timezone(&self) -> RefMut<'_, SessionTimezone> {
294        self.session_timezone.borrow_mut()
295    }
296
297    pub fn get_session_timezone(&self) -> String {
298        self.session_timezone.borrow().timezone()
299    }
300
301    pub fn iceberg_snapshot_id_map(&self) -> RefMut<'_, HashMap<String, Option<i64>>> {
302        self.iceberg_snapshot_id_map.borrow_mut()
303    }
304}
305
306impl std::fmt::Debug for OptimizerContext {
307    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
308        write!(
309            f,
310            "QueryContext {{ sql = {}, explain_options = {}, with_options = {:?}, last_plan_node_id = {}, last_correlated_id = {} }}",
311            self.sql,
312            self.explain_options,
313            self.with_options,
314            self.last_plan_node_id.get(),
315            self.last_correlated_id.get(),
316        )
317    }
318}