risingwave_frontend/optimizer/
optimizer_context.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::fmt::Formatter;
16use std::cell::{Cell, RefCell, RefMut};
17use std::collections::HashMap;
18use std::marker::PhantomData;
19use std::rc::Rc;
20use std::sync::Arc;
21
22use risingwave_sqlparser::ast::{ExplainFormat, ExplainOptions, ExplainType};
23
24use super::property::WatermarkGroupId;
25use crate::expr::{CorrelatedId, SessionTimezone};
26use crate::handler::HandlerArgs;
27use crate::optimizer::LogicalPlanRef;
28use crate::optimizer::plan_node::PlanNodeId;
29use crate::session::SessionImpl;
30use crate::utils::{OverwriteOptions, WithOptions};
31use crate::{Explain, TableCatalog};
32
/// Starting value of the plan node, expr display and watermark group ID counters in
/// `OptimizerContext::new` (the test-only `mock` constructor and `reset_elem_ids` use 0 instead).
///
/// Typed as `u16` so that `.into()` converts it losslessly into each counter's type
/// (`i32`, `usize`, `u32`).
// NOTE(review): presumably keeps the low ID range free for plans that are re-numbered from 0
// after `reset_elem_ids` — confirm.
const RESERVED_ID_NUM: u16 = 10_000;

/// Zero-sized marker field type: any struct containing it is neither `Send` nor `Sync`,
/// because `Rc` is neither.
type PhantomUnsend = PhantomData<Rc<()>>;
36
37pub struct OptimizerContext {
38    session_ctx: Arc<SessionImpl>,
39    /// The original SQL string, used for debugging.
40    sql: Arc<str>,
41    /// Normalized SQL string. See [`HandlerArgs::normalize_sql`].
42    normalized_sql: String,
43    /// Explain options
44    explain_options: ExplainOptions,
45    /// Store the trace of optimizer
46    optimizer_trace: RefCell<Vec<String>>,
47    /// Store the optimized logical plan of optimizer
48    logical_explain: RefCell<Option<String>>,
49    /// Store options or properties from the `with` clause
50    with_options: WithOptions,
51    /// Store the Session Timezone and whether it was used.
52    session_timezone: RefCell<SessionTimezone>,
53    /// Total number of optimization rules have been applied.
54    total_rule_applied: RefCell<usize>,
55    /// Store the configs can be overwritten in with clause
56    /// if not specified, use the value from session variable.
57    overwrite_options: OverwriteOptions,
58    /// Mapping from iceberg table identifier to current snapshot id.
59    /// Used to keep same snapshot id when multiple scans from the same iceberg table exist in a query.
60    iceberg_snapshot_id_map: RefCell<HashMap<String, Option<i64>>>,
61    /// Batch materialized view candidates for exact-match rewriting.
62    batch_mview_candidates: RefCell<Vec<MaterializedViewCandidate>>,
63
64    /// Last assigned plan node ID.
65    last_plan_node_id: Cell<i32>,
66    /// Last assigned correlated ID.
67    last_correlated_id: Cell<u32>,
68    /// Last assigned expr display ID.
69    last_expr_display_id: Cell<usize>,
70    /// Last assigned watermark group ID.
71    last_watermark_group_id: Cell<u32>,
72
73    // TODO: remove this when locality backfill is enabled by default
74    /// Count of places where locality backfill could have been applied but was not,
75    /// because `enable_locality_backfill` is off.
76    missed_locality_providers: Cell<usize>,
77
78    _phantom: PhantomUnsend,
79}
80
81#[derive(Clone, Debug)]
82pub struct MaterializedViewCandidate {
83    pub plan: LogicalPlanRef,
84    pub table: Arc<TableCatalog>,
85}
86
87pub(in crate::optimizer) struct LastAssignedIds {
88    last_plan_node_id: i32,
89    last_correlated_id: u32,
90    last_expr_display_id: usize,
91    last_watermark_group_id: u32,
92}
93
94pub type OptimizerContextRef = Rc<OptimizerContext>;
95
96impl OptimizerContext {
97    /// Create a new [`OptimizerContext`] from the given [`HandlerArgs`], with empty
98    /// [`ExplainOptions`].
99    pub fn from_handler_args(handler_args: HandlerArgs) -> Self {
100        Self::new(handler_args, ExplainOptions::default())
101    }
102
103    /// Create a new [`OptimizerContext`] from the given [`HandlerArgs`] and [`ExplainOptions`].
104    pub fn new(mut handler_args: HandlerArgs, explain_options: ExplainOptions) -> Self {
105        let session_timezone = RefCell::new(SessionTimezone::new(
106            handler_args.session.config().timezone(),
107        ));
108        let overwrite_options = OverwriteOptions::new(&mut handler_args);
109        Self {
110            session_ctx: handler_args.session,
111            sql: handler_args.sql,
112            normalized_sql: handler_args.normalized_sql,
113            explain_options,
114            optimizer_trace: RefCell::new(vec![]),
115            logical_explain: RefCell::new(None),
116            with_options: handler_args.with_options,
117            session_timezone,
118            total_rule_applied: RefCell::new(0),
119            overwrite_options,
120            iceberg_snapshot_id_map: RefCell::new(HashMap::new()),
121            batch_mview_candidates: RefCell::new(Vec::new()),
122
123            last_plan_node_id: Cell::new(RESERVED_ID_NUM.into()),
124            last_correlated_id: Cell::new(0),
125            last_expr_display_id: Cell::new(RESERVED_ID_NUM.into()),
126            last_watermark_group_id: Cell::new(RESERVED_ID_NUM.into()),
127
128            // TODO: remove this when locality backfill is enabled by default
129            missed_locality_providers: Cell::new(0),
130
131            _phantom: Default::default(),
132        }
133    }
134
135    // TODO(TaoWu): Remove the async.
136    #[cfg(test)]
137    #[expect(clippy::unused_async)]
138    pub async fn mock() -> OptimizerContextRef {
139        Self {
140            session_ctx: Arc::new(SessionImpl::mock()),
141            sql: Arc::from(""),
142            normalized_sql: "".to_owned(),
143            explain_options: ExplainOptions::default(),
144            optimizer_trace: RefCell::new(vec![]),
145            logical_explain: RefCell::new(None),
146            with_options: Default::default(),
147            session_timezone: RefCell::new(SessionTimezone::new("UTC".into())),
148            total_rule_applied: RefCell::new(0),
149            overwrite_options: OverwriteOptions::default(),
150            iceberg_snapshot_id_map: RefCell::new(HashMap::new()),
151            batch_mview_candidates: RefCell::new(Vec::new()),
152
153            last_plan_node_id: Cell::new(0),
154            last_correlated_id: Cell::new(0),
155            last_expr_display_id: Cell::new(0),
156            last_watermark_group_id: Cell::new(0),
157
158            missed_locality_providers: Cell::new(0),
159
160            _phantom: Default::default(),
161        }
162        .into()
163    }
164
165    pub fn next_plan_node_id(&self) -> PlanNodeId {
166        self.last_plan_node_id.update(|id| id + 1);
167        PlanNodeId(self.last_plan_node_id.get())
168    }
169
170    pub fn next_correlated_id(&self) -> CorrelatedId {
171        self.last_correlated_id.update(|id| id + 1);
172        self.last_correlated_id.get()
173    }
174
175    pub fn next_expr_display_id(&self) -> usize {
176        self.last_expr_display_id.update(|id| id + 1);
177        self.last_expr_display_id.get()
178    }
179
180    pub fn next_watermark_group_id(&self) -> WatermarkGroupId {
181        self.last_watermark_group_id.update(|id| id + 1);
182        self.last_watermark_group_id.get()
183    }
184
185    pub(in crate::optimizer) fn backup_elem_ids(&self) -> LastAssignedIds {
186        LastAssignedIds {
187            last_plan_node_id: self.last_plan_node_id.get(),
188            last_correlated_id: self.last_correlated_id.get(),
189            last_expr_display_id: self.last_expr_display_id.get(),
190            last_watermark_group_id: self.last_watermark_group_id.get(),
191        }
192    }
193
194    /// This should only be called in [`crate::optimizer::plan_node::reorganize_elements_id`].
195    pub(in crate::optimizer) fn reset_elem_ids(&self) {
196        self.last_plan_node_id.set(0);
197        self.last_correlated_id.set(0);
198        self.last_expr_display_id.set(0);
199        self.last_watermark_group_id.set(0);
200    }
201
202    pub(in crate::optimizer) fn restore_elem_ids(&self, backup: LastAssignedIds) {
203        self.last_plan_node_id.set(backup.last_plan_node_id);
204        self.last_correlated_id.set(backup.last_correlated_id);
205        self.last_expr_display_id.set(backup.last_expr_display_id);
206        self.last_watermark_group_id
207            .set(backup.last_watermark_group_id);
208    }
209
210    pub fn add_rule_applied(&self, num: usize) {
211        *self.total_rule_applied.borrow_mut() += num;
212    }
213
214    pub fn total_rule_applied(&self) -> usize {
215        *self.total_rule_applied.borrow()
216    }
217
218    pub fn is_explain_verbose(&self) -> bool {
219        self.explain_options.verbose
220    }
221
222    pub fn is_explain_trace(&self) -> bool {
223        self.explain_options.trace
224    }
225
226    fn is_explain_logical(&self) -> bool {
227        self.explain_options.explain_type == ExplainType::Logical
228    }
229
230    pub fn trace(&self, str: impl Into<String>) {
231        // If explain type is logical, do not store the trace for any optimizations beyond logical.
232        if self.is_explain_logical() && self.logical_explain.borrow().is_some() {
233            return;
234        }
235        let mut optimizer_trace = self.optimizer_trace.borrow_mut();
236        let string = str.into();
237        tracing::info!(target: "explain_trace", "\n{}", string);
238        optimizer_trace.push(string);
239        optimizer_trace.push("\n".to_owned());
240    }
241
242    pub fn warn_to_user(&self, str: impl Into<String>) {
243        self.session_ctx().notice_to_user(str);
244    }
245
246    // TODO: remove this when locality backfill is enabled by default
247    /// Increment the counter for missed locality providers.
248    /// Called when locality backfill could have been applied but `enable_locality_backfill` is off.
249    pub fn inc_missed_locality_providers(&self) {
250        self.missed_locality_providers
251            .set(self.missed_locality_providers.get() + 1);
252    }
253
254    // TODO: remove this when locality backfill is enabled by default
255    /// Get the number of missed locality providers.
256    pub fn missed_locality_providers(&self) -> usize {
257        self.missed_locality_providers.get()
258    }
259
260    fn explain_plan_impl(&self, plan: &impl Explain) -> String {
261        match self.explain_options.explain_format {
262            ExplainFormat::Text => plan.explain_to_string(),
263            ExplainFormat::Json => plan.explain_to_json(),
264            ExplainFormat::Xml => plan.explain_to_xml(),
265            ExplainFormat::Yaml => plan.explain_to_yaml(),
266            ExplainFormat::Dot => plan.explain_to_dot(),
267        }
268    }
269
270    pub fn may_store_explain_logical(&self, plan: &LogicalPlanRef) {
271        if self.is_explain_logical() {
272            let str = self.explain_plan_impl(plan);
273            *self.logical_explain.borrow_mut() = Some(str);
274        }
275    }
276
277    pub fn take_logical(&self) -> Option<String> {
278        self.logical_explain.borrow_mut().take()
279    }
280
281    pub fn take_trace(&self) -> Vec<String> {
282        self.optimizer_trace.borrow_mut().drain(..).collect()
283    }
284
285    pub fn with_options(&self) -> &WithOptions {
286        &self.with_options
287    }
288
289    pub fn overwrite_options(&self) -> &OverwriteOptions {
290        &self.overwrite_options
291    }
292
293    pub fn add_batch_mview_candidate(&self, table: Arc<TableCatalog>, plan: LogicalPlanRef) {
294        self.batch_mview_candidates
295            .borrow_mut()
296            .push(MaterializedViewCandidate { plan, table });
297    }
298
299    pub fn batch_mview_candidates(&self) -> std::cell::Ref<'_, Vec<MaterializedViewCandidate>> {
300        self.batch_mview_candidates.borrow()
301    }
302
303    pub fn session_ctx(&self) -> &Arc<SessionImpl> {
304        &self.session_ctx
305    }
306
307    /// Return the original SQL.
308    pub fn sql(&self) -> &str {
309        &self.sql
310    }
311
312    /// Return the normalized SQL.
313    pub fn normalized_sql(&self) -> &str {
314        &self.normalized_sql
315    }
316
317    pub fn session_timezone(&self) -> RefMut<'_, SessionTimezone> {
318        self.session_timezone.borrow_mut()
319    }
320
321    pub fn get_session_timezone(&self) -> String {
322        self.session_timezone.borrow().timezone()
323    }
324
325    pub fn iceberg_snapshot_id_map(&self) -> RefMut<'_, HashMap<String, Option<i64>>> {
326        self.iceberg_snapshot_id_map.borrow_mut()
327    }
328}
329
330impl std::fmt::Debug for OptimizerContext {
331    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
332        write!(
333            f,
334            "QueryContext {{ sql = {}, explain_options = {}, with_options = {:?}, last_plan_node_id = {}, last_correlated_id = {} }}",
335            self.sql,
336            self.explain_options,
337            self.with_options,
338            self.last_plan_node_id.get(),
339            self.last_correlated_id.get(),
340        )
341    }
342}