risingwave_frontend/optimizer/
optimizer_context.rs1use core::fmt::Formatter;
16use std::cell::{Cell, RefCell, RefMut};
17use std::collections::HashMap;
18use std::marker::PhantomData;
19use std::rc::Rc;
20use std::sync::Arc;
21
22use risingwave_sqlparser::ast::{ExplainFormat, ExplainOptions, ExplainType};
23
24use super::property::WatermarkGroupId;
25use crate::expr::{CorrelatedId, SessionTimezone};
26use crate::handler::HandlerArgs;
27use crate::optimizer::LogicalPlanRef;
28use crate::optimizer::plan_node::PlanNodeId;
29use crate::session::SessionImpl;
30use crate::utils::{OverwriteOptions, WithOptions};
31use crate::{Explain, TableCatalog};
32
33const RESERVED_ID_NUM: u16 = 10000;
34
35type PhantomUnsend = PhantomData<Rc<()>>;
36
37pub struct OptimizerContext {
38 session_ctx: Arc<SessionImpl>,
39 sql: Arc<str>,
41 normalized_sql: String,
43 explain_options: ExplainOptions,
45 optimizer_trace: RefCell<Vec<String>>,
47 logical_explain: RefCell<Option<String>>,
49 with_options: WithOptions,
51 session_timezone: RefCell<SessionTimezone>,
53 total_rule_applied: RefCell<usize>,
55 overwrite_options: OverwriteOptions,
58 iceberg_snapshot_id_map: RefCell<HashMap<String, Option<i64>>>,
61 batch_mview_candidates: RefCell<Vec<MaterializedViewCandidate>>,
63
64 last_plan_node_id: Cell<i32>,
66 last_correlated_id: Cell<u32>,
68 last_expr_display_id: Cell<usize>,
70 last_watermark_group_id: Cell<u32>,
72
73 _phantom: PhantomUnsend,
74}
75
76#[derive(Clone, Debug)]
77pub struct MaterializedViewCandidate {
78 pub plan: LogicalPlanRef,
79 pub table: Arc<TableCatalog>,
80}
81
82pub(in crate::optimizer) struct LastAssignedIds {
83 last_plan_node_id: i32,
84 last_correlated_id: u32,
85 last_expr_display_id: usize,
86 last_watermark_group_id: u32,
87}
88
89pub type OptimizerContextRef = Rc<OptimizerContext>;
90
91impl OptimizerContext {
92 pub fn from_handler_args(handler_args: HandlerArgs) -> Self {
95 Self::new(handler_args, ExplainOptions::default())
96 }
97
98 pub fn new(mut handler_args: HandlerArgs, explain_options: ExplainOptions) -> Self {
100 let session_timezone = RefCell::new(SessionTimezone::new(
101 handler_args.session.config().timezone(),
102 ));
103 let overwrite_options = OverwriteOptions::new(&mut handler_args);
104 Self {
105 session_ctx: handler_args.session,
106 sql: handler_args.sql,
107 normalized_sql: handler_args.normalized_sql,
108 explain_options,
109 optimizer_trace: RefCell::new(vec![]),
110 logical_explain: RefCell::new(None),
111 with_options: handler_args.with_options,
112 session_timezone,
113 total_rule_applied: RefCell::new(0),
114 overwrite_options,
115 iceberg_snapshot_id_map: RefCell::new(HashMap::new()),
116 batch_mview_candidates: RefCell::new(Vec::new()),
117
118 last_plan_node_id: Cell::new(RESERVED_ID_NUM.into()),
119 last_correlated_id: Cell::new(0),
120 last_expr_display_id: Cell::new(RESERVED_ID_NUM.into()),
121 last_watermark_group_id: Cell::new(RESERVED_ID_NUM.into()),
122
123 _phantom: Default::default(),
124 }
125 }
126
127 #[cfg(test)]
129 #[expect(clippy::unused_async)]
130 pub async fn mock() -> OptimizerContextRef {
131 Self {
132 session_ctx: Arc::new(SessionImpl::mock()),
133 sql: Arc::from(""),
134 normalized_sql: "".to_owned(),
135 explain_options: ExplainOptions::default(),
136 optimizer_trace: RefCell::new(vec![]),
137 logical_explain: RefCell::new(None),
138 with_options: Default::default(),
139 session_timezone: RefCell::new(SessionTimezone::new("UTC".into())),
140 total_rule_applied: RefCell::new(0),
141 overwrite_options: OverwriteOptions::default(),
142 iceberg_snapshot_id_map: RefCell::new(HashMap::new()),
143 batch_mview_candidates: RefCell::new(Vec::new()),
144
145 last_plan_node_id: Cell::new(0),
146 last_correlated_id: Cell::new(0),
147 last_expr_display_id: Cell::new(0),
148 last_watermark_group_id: Cell::new(0),
149
150 _phantom: Default::default(),
151 }
152 .into()
153 }
154
155 pub fn next_plan_node_id(&self) -> PlanNodeId {
156 self.last_plan_node_id.update(|id| id + 1);
157 PlanNodeId(self.last_plan_node_id.get())
158 }
159
160 pub fn next_correlated_id(&self) -> CorrelatedId {
161 self.last_correlated_id.update(|id| id + 1);
162 self.last_correlated_id.get()
163 }
164
165 pub fn next_expr_display_id(&self) -> usize {
166 self.last_expr_display_id.update(|id| id + 1);
167 self.last_expr_display_id.get()
168 }
169
170 pub fn next_watermark_group_id(&self) -> WatermarkGroupId {
171 self.last_watermark_group_id.update(|id| id + 1);
172 self.last_watermark_group_id.get()
173 }
174
175 pub(in crate::optimizer) fn backup_elem_ids(&self) -> LastAssignedIds {
176 LastAssignedIds {
177 last_plan_node_id: self.last_plan_node_id.get(),
178 last_correlated_id: self.last_correlated_id.get(),
179 last_expr_display_id: self.last_expr_display_id.get(),
180 last_watermark_group_id: self.last_watermark_group_id.get(),
181 }
182 }
183
184 pub(in crate::optimizer) fn reset_elem_ids(&self) {
186 self.last_plan_node_id.set(0);
187 self.last_correlated_id.set(0);
188 self.last_expr_display_id.set(0);
189 self.last_watermark_group_id.set(0);
190 }
191
192 pub(in crate::optimizer) fn restore_elem_ids(&self, backup: LastAssignedIds) {
193 self.last_plan_node_id.set(backup.last_plan_node_id);
194 self.last_correlated_id.set(backup.last_correlated_id);
195 self.last_expr_display_id.set(backup.last_expr_display_id);
196 self.last_watermark_group_id
197 .set(backup.last_watermark_group_id);
198 }
199
200 pub fn add_rule_applied(&self, num: usize) {
201 *self.total_rule_applied.borrow_mut() += num;
202 }
203
204 pub fn total_rule_applied(&self) -> usize {
205 *self.total_rule_applied.borrow()
206 }
207
208 pub fn is_explain_verbose(&self) -> bool {
209 self.explain_options.verbose
210 }
211
212 pub fn is_explain_trace(&self) -> bool {
213 self.explain_options.trace
214 }
215
216 fn is_explain_logical(&self) -> bool {
217 self.explain_options.explain_type == ExplainType::Logical
218 }
219
220 pub fn trace(&self, str: impl Into<String>) {
221 if self.is_explain_logical() && self.logical_explain.borrow().is_some() {
223 return;
224 }
225 let mut optimizer_trace = self.optimizer_trace.borrow_mut();
226 let string = str.into();
227 tracing::info!(target: "explain_trace", "\n{}", string);
228 optimizer_trace.push(string);
229 optimizer_trace.push("\n".to_owned());
230 }
231
232 pub fn warn_to_user(&self, str: impl Into<String>) {
233 self.session_ctx().notice_to_user(str);
234 }
235
236 fn explain_plan_impl(&self, plan: &impl Explain) -> String {
237 match self.explain_options.explain_format {
238 ExplainFormat::Text => plan.explain_to_string(),
239 ExplainFormat::Json => plan.explain_to_json(),
240 ExplainFormat::Xml => plan.explain_to_xml(),
241 ExplainFormat::Yaml => plan.explain_to_yaml(),
242 ExplainFormat::Dot => plan.explain_to_dot(),
243 }
244 }
245
246 pub fn may_store_explain_logical(&self, plan: &LogicalPlanRef) {
247 if self.is_explain_logical() {
248 let str = self.explain_plan_impl(plan);
249 *self.logical_explain.borrow_mut() = Some(str);
250 }
251 }
252
253 pub fn take_logical(&self) -> Option<String> {
254 self.logical_explain.borrow_mut().take()
255 }
256
257 pub fn take_trace(&self) -> Vec<String> {
258 self.optimizer_trace.borrow_mut().drain(..).collect()
259 }
260
261 pub fn with_options(&self) -> &WithOptions {
262 &self.with_options
263 }
264
265 pub fn overwrite_options(&self) -> &OverwriteOptions {
266 &self.overwrite_options
267 }
268
269 pub fn add_batch_mview_candidate(&self, table: Arc<TableCatalog>, plan: LogicalPlanRef) {
270 self.batch_mview_candidates
271 .borrow_mut()
272 .push(MaterializedViewCandidate { plan, table });
273 }
274
275 pub fn batch_mview_candidates(&self) -> std::cell::Ref<'_, Vec<MaterializedViewCandidate>> {
276 self.batch_mview_candidates.borrow()
277 }
278
279 pub fn session_ctx(&self) -> &Arc<SessionImpl> {
280 &self.session_ctx
281 }
282
283 pub fn sql(&self) -> &str {
285 &self.sql
286 }
287
288 pub fn normalized_sql(&self) -> &str {
290 &self.normalized_sql
291 }
292
293 pub fn session_timezone(&self) -> RefMut<'_, SessionTimezone> {
294 self.session_timezone.borrow_mut()
295 }
296
297 pub fn get_session_timezone(&self) -> String {
298 self.session_timezone.borrow().timezone()
299 }
300
301 pub fn iceberg_snapshot_id_map(&self) -> RefMut<'_, HashMap<String, Option<i64>>> {
302 self.iceberg_snapshot_id_map.borrow_mut()
303 }
304}
305
306impl std::fmt::Debug for OptimizerContext {
307 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
308 write!(
309 f,
310 "QueryContext {{ sql = {}, explain_options = {}, with_options = {:?}, last_plan_node_id = {}, last_correlated_id = {} }}",
311 self.sql,
312 self.explain_options,
313 self.with_options,
314 self.last_plan_node_id.get(),
315 self.last_correlated_id.get(),
316 )
317 }
318}