risingwave_frontend/optimizer/
optimizer_context.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::fmt::Formatter;
16use std::cell::{Cell, RefCell, RefMut};
17use std::collections::HashMap;
18use std::marker::PhantomData;
19use std::rc::Rc;
20use std::sync::Arc;
21
22use risingwave_sqlparser::ast::{ExplainFormat, ExplainOptions, ExplainType};
23
24use super::property::WatermarkGroupId;
25use crate::expr::{CorrelatedId, SessionTimezone};
26use crate::handler::HandlerArgs;
27use crate::optimizer::LogicalPlanRef;
28use crate::optimizer::plan_node::PlanNodeId;
29use crate::session::SessionImpl;
30use crate::utils::{OverwriteOptions, WithOptions};
31use crate::{Explain, TableCatalog};
32
/// Starting value of the plan-node, expr-display and watermark-group id counters in
/// [`OptimizerContext::new`]; the first id each of them hands out is therefore
/// `RESERVED_ID_NUM + 1`.
///
/// NOTE(review): presumably ids up to this value are reserved for use elsewhere — confirm.
const RESERVED_ID_NUM: u16 = 10000;
34
/// Zero-sized marker: `Rc<()>` is neither `Send` nor `Sync`, so any struct holding a field
/// of this type is not `Send`/`Sync` either.
type PhantomUnsend = PhantomData<Rc<()>>;
36
/// State shared across one optimizer run, normally handed around as [`OptimizerContextRef`].
///
/// Besides the immutable inputs (session, SQL text, explain and `with` options) it holds
/// several interior-mutable fields (`RefCell` / `Cell`) so that the trace buffer, caches and
/// id counters can be updated through a shared `&self`.
pub struct OptimizerContext {
    /// Shared handle to the [`SessionImpl`]; exposed through `session_ctx()`.
    session_ctx: Arc<SessionImpl>,
    /// The original SQL string, used for debugging.
    sql: Arc<str>,
    /// Normalized SQL string. See [`HandlerArgs::normalize_sql`].
    normalized_sql: String,
    /// Explain options (verbose / trace flags, explain type and output format).
    explain_options: ExplainOptions,
    /// Trace entries recorded by `trace()` and handed out by `take_trace()`.
    optimizer_trace: RefCell<Vec<String>>,
    /// Rendered logical plan; only filled by `may_store_explain_logical()` when the explain
    /// type is logical.
    logical_explain: RefCell<Option<String>>,
    /// Store options or properties from the `with` clause
    with_options: WithOptions,
    /// Store the Session Timezone and whether it was used.
    session_timezone: RefCell<SessionTimezone>,
    /// Total number of optimization rules that have been applied.
    total_rule_applied: RefCell<usize>,
    /// Configs that can be overwritten in the `with` clause;
    /// if not specified, the value from the session variable is used.
    overwrite_options: OverwriteOptions,
    /// Mapping from iceberg table identifier to current snapshot id.
    /// Used to keep same snapshot id when multiple scans from the same iceberg table exist in a query.
    iceberg_snapshot_id_map: RefCell<HashMap<String, Option<i64>>>,
    /// Batch materialized view candidates for exact-match rewriting.
    batch_mview_candidates: RefCell<Vec<MaterializedViewCandidate>>,

    /// Last assigned plan node ID.
    last_plan_node_id: Cell<i32>,
    /// Last assigned correlated ID.
    last_correlated_id: Cell<u32>,
    /// Last assigned expr display ID.
    last_expr_display_id: Cell<usize>,
    /// Last assigned watermark group ID.
    last_watermark_group_id: Cell<u32>,

    // TODO: remove this when locality backfill is enabled by default
    /// Count of places where locality backfill could have been applied but was not,
    /// because `enable_locality_backfill` is off.
    missed_locality_providers: Cell<usize>,

    /// Marker field that keeps the context `!Send` (see `PhantomUnsend`).
    _phantom: PhantomUnsend,
}
80
/// One entry of the batch materialized-view candidate list kept on [`OptimizerContext`]:
/// a logical plan paired with a table catalog.
#[derive(Clone, Debug)]
pub struct MaterializedViewCandidate {
    /// Logical plan registered for this candidate.
    /// NOTE(review): presumably the plan of the materialized view's defining query — confirm
    /// against the callers of `add_batch_mview_candidate`.
    pub plan: LogicalPlanRef,
    /// Table catalog registered together with `plan`.
    pub table: Arc<TableCatalog>,
}
86
/// Snapshot of the four id counters of [`OptimizerContext`].
///
/// Produced by `OptimizerContext::backup_elem_ids` and consumed by
/// `OptimizerContext::restore_elem_ids`.
pub(in crate::optimizer) struct LastAssignedIds {
    /// Saved value of the plan node id counter.
    last_plan_node_id: i32,
    /// Saved value of the correlated id counter.
    last_correlated_id: u32,
    /// Saved value of the expr display id counter.
    last_expr_display_id: usize,
    /// Saved value of the watermark group id counter.
    last_watermark_group_id: u32,
}
93
/// Reference-counted (`Rc`, hence single-threaded) shared handle to an [`OptimizerContext`].
pub type OptimizerContextRef = Rc<OptimizerContext>;
95
96impl OptimizerContext {
97    /// Create a new [`OptimizerContext`] from the given [`HandlerArgs`], with empty
98    /// [`ExplainOptions`].
99    pub fn from_handler_args(handler_args: HandlerArgs) -> Self {
100        Self::new(handler_args, ExplainOptions::default())
101    }
102
103    /// Create a new [`OptimizerContext`] from the given [`HandlerArgs`] and [`ExplainOptions`].
104    pub fn new(mut handler_args: HandlerArgs, explain_options: ExplainOptions) -> Self {
105        let session_timezone = RefCell::new(SessionTimezone::new(
106            handler_args.session.config().timezone(),
107        ));
108        let overwrite_options = OverwriteOptions::new(&mut handler_args);
109        Self {
110            session_ctx: handler_args.session,
111            sql: handler_args.sql,
112            normalized_sql: handler_args.normalized_sql,
113            explain_options,
114            optimizer_trace: RefCell::new(vec![]),
115            logical_explain: RefCell::new(None),
116            with_options: handler_args.with_options,
117            session_timezone,
118            total_rule_applied: RefCell::new(0),
119            overwrite_options,
120            iceberg_snapshot_id_map: RefCell::new(HashMap::new()),
121            batch_mview_candidates: RefCell::new(Vec::new()),
122
123            last_plan_node_id: Cell::new(RESERVED_ID_NUM.into()),
124            last_correlated_id: Cell::new(0),
125            last_expr_display_id: Cell::new(RESERVED_ID_NUM.into()),
126            last_watermark_group_id: Cell::new(RESERVED_ID_NUM.into()),
127
128            // TODO: remove this when locality backfill is enabled by default
129            missed_locality_providers: Cell::new(0),
130
131            _phantom: Default::default(),
132        }
133    }
134
135    #[cfg(test)]
136    pub fn mock() -> OptimizerContextRef {
137        Self {
138            session_ctx: Arc::new(SessionImpl::mock()),
139            sql: Arc::from(""),
140            normalized_sql: "".to_owned(),
141            explain_options: ExplainOptions::default(),
142            optimizer_trace: RefCell::new(vec![]),
143            logical_explain: RefCell::new(None),
144            with_options: Default::default(),
145            session_timezone: RefCell::new(SessionTimezone::new("UTC".into())),
146            total_rule_applied: RefCell::new(0),
147            overwrite_options: OverwriteOptions::default(),
148            iceberg_snapshot_id_map: RefCell::new(HashMap::new()),
149            batch_mview_candidates: RefCell::new(Vec::new()),
150
151            last_plan_node_id: Cell::new(0),
152            last_correlated_id: Cell::new(0),
153            last_expr_display_id: Cell::new(0),
154            last_watermark_group_id: Cell::new(0),
155
156            missed_locality_providers: Cell::new(0),
157
158            _phantom: Default::default(),
159        }
160        .into()
161    }
162
163    pub fn next_plan_node_id(&self) -> PlanNodeId {
164        self.last_plan_node_id.update(|id| id + 1);
165        PlanNodeId(self.last_plan_node_id.get())
166    }
167
168    pub fn next_correlated_id(&self) -> CorrelatedId {
169        self.last_correlated_id.update(|id| id + 1);
170        self.last_correlated_id.get()
171    }
172
173    pub fn next_expr_display_id(&self) -> usize {
174        self.last_expr_display_id.update(|id| id + 1);
175        self.last_expr_display_id.get()
176    }
177
178    pub fn next_watermark_group_id(&self) -> WatermarkGroupId {
179        self.last_watermark_group_id.update(|id| id + 1);
180        self.last_watermark_group_id.get()
181    }
182
183    pub(in crate::optimizer) fn backup_elem_ids(&self) -> LastAssignedIds {
184        LastAssignedIds {
185            last_plan_node_id: self.last_plan_node_id.get(),
186            last_correlated_id: self.last_correlated_id.get(),
187            last_expr_display_id: self.last_expr_display_id.get(),
188            last_watermark_group_id: self.last_watermark_group_id.get(),
189        }
190    }
191
192    /// This should only be called in [`crate::optimizer::plan_node::reorganize_elements_id`].
193    pub(in crate::optimizer) fn reset_elem_ids(&self) {
194        self.last_plan_node_id.set(0);
195        self.last_correlated_id.set(0);
196        self.last_expr_display_id.set(0);
197        self.last_watermark_group_id.set(0);
198    }
199
200    pub(in crate::optimizer) fn restore_elem_ids(&self, backup: LastAssignedIds) {
201        self.last_plan_node_id.set(backup.last_plan_node_id);
202        self.last_correlated_id.set(backup.last_correlated_id);
203        self.last_expr_display_id.set(backup.last_expr_display_id);
204        self.last_watermark_group_id
205            .set(backup.last_watermark_group_id);
206    }
207
208    pub fn add_rule_applied(&self, num: usize) {
209        *self.total_rule_applied.borrow_mut() += num;
210    }
211
212    pub fn total_rule_applied(&self) -> usize {
213        *self.total_rule_applied.borrow()
214    }
215
216    pub fn is_explain_verbose(&self) -> bool {
217        self.explain_options.verbose
218    }
219
220    pub fn is_explain_trace(&self) -> bool {
221        self.explain_options.trace
222    }
223
224    fn is_explain_logical(&self) -> bool {
225        self.explain_options.explain_type == ExplainType::Logical
226    }
227
228    pub fn trace(&self, str: impl Into<String>) {
229        // If explain type is logical, do not store the trace for any optimizations beyond logical.
230        if self.is_explain_logical() && self.logical_explain.borrow().is_some() {
231            return;
232        }
233        let mut optimizer_trace = self.optimizer_trace.borrow_mut();
234        let string = str.into();
235        tracing::info!(target: "explain_trace", "\n{}", string);
236        optimizer_trace.push(string);
237        optimizer_trace.push("\n".to_owned());
238    }
239
240    pub fn warn_to_user(&self, str: impl Into<String>) {
241        self.session_ctx().notice_to_user(str);
242    }
243
244    // TODO: remove this when locality backfill is enabled by default
245    /// Increment the counter for missed locality providers.
246    /// Called when locality backfill could have been applied but `enable_locality_backfill` is off.
247    pub fn inc_missed_locality_providers(&self) {
248        self.missed_locality_providers
249            .set(self.missed_locality_providers.get() + 1);
250    }
251
252    // TODO: remove this when locality backfill is enabled by default
253    /// Get the number of missed locality providers.
254    pub fn missed_locality_providers(&self) -> usize {
255        self.missed_locality_providers.get()
256    }
257
258    fn explain_plan_impl(&self, plan: &impl Explain) -> String {
259        match self.explain_options.explain_format {
260            ExplainFormat::Text => plan.explain_to_string(),
261            ExplainFormat::Json => plan.explain_to_json(),
262            ExplainFormat::Xml => plan.explain_to_xml(),
263            ExplainFormat::Yaml => plan.explain_to_yaml(),
264            ExplainFormat::Dot => plan.explain_to_dot(),
265        }
266    }
267
268    pub fn may_store_explain_logical(&self, plan: &LogicalPlanRef) {
269        if self.is_explain_logical() {
270            let str = self.explain_plan_impl(plan);
271            *self.logical_explain.borrow_mut() = Some(str);
272        }
273    }
274
275    pub fn take_logical(&self) -> Option<String> {
276        self.logical_explain.borrow_mut().take()
277    }
278
279    pub fn take_trace(&self) -> Vec<String> {
280        self.optimizer_trace.borrow_mut().drain(..).collect()
281    }
282
283    pub fn with_options(&self) -> &WithOptions {
284        &self.with_options
285    }
286
287    pub fn overwrite_options(&self) -> &OverwriteOptions {
288        &self.overwrite_options
289    }
290
291    pub fn add_batch_mview_candidate(&self, table: Arc<TableCatalog>, plan: LogicalPlanRef) {
292        self.batch_mview_candidates
293            .borrow_mut()
294            .push(MaterializedViewCandidate { plan, table });
295    }
296
297    pub fn batch_mview_candidates(&self) -> std::cell::Ref<'_, Vec<MaterializedViewCandidate>> {
298        self.batch_mview_candidates.borrow()
299    }
300
301    pub fn session_ctx(&self) -> &Arc<SessionImpl> {
302        &self.session_ctx
303    }
304
305    /// Return the original SQL.
306    pub fn sql(&self) -> &str {
307        &self.sql
308    }
309
310    /// Return the normalized SQL.
311    pub fn normalized_sql(&self) -> &str {
312        &self.normalized_sql
313    }
314
315    pub fn session_timezone(&self) -> RefMut<'_, SessionTimezone> {
316        self.session_timezone.borrow_mut()
317    }
318
319    pub fn get_session_timezone(&self) -> String {
320        self.session_timezone.borrow().timezone()
321    }
322
323    pub fn iceberg_snapshot_id_map(&self) -> RefMut<'_, HashMap<String, Option<i64>>> {
324        self.iceberg_snapshot_id_map.borrow_mut()
325    }
326}
327
328impl std::fmt::Debug for OptimizerContext {
329    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
330        write!(
331            f,
332            "QueryContext {{ sql = {}, explain_options = {}, with_options = {:?}, last_plan_node_id = {}, last_correlated_id = {} }}",
333            self.sql,
334            self.explain_options,
335            self.with_options,
336            self.last_plan_node_id.get(),
337            self.last_correlated_id.get(),
338        )
339    }
340}