risingwave_frontend/optimizer/
optimizer_context.rs1use core::fmt::Formatter;
16use std::cell::{Cell, RefCell, RefMut};
17use std::collections::HashMap;
18use std::marker::PhantomData;
19use std::rc::Rc;
20use std::sync::Arc;
21
22use risingwave_sqlparser::ast::{ExplainFormat, ExplainOptions, ExplainType};
23
24use super::property::WatermarkGroupId;
25use crate::expr::{CorrelatedId, SessionTimezone};
26use crate::handler::HandlerArgs;
27use crate::optimizer::LogicalPlanRef;
28use crate::optimizer::plan_node::PlanNodeId;
29use crate::session::SessionImpl;
30use crate::utils::{OverwriteOptions, WithOptions};
31use crate::{Explain, TableCatalog};
32
/// Initial value of the plan node, expression display and watermark group id counters in a
/// context built by `OptimizerContext::new`; the first id handed out by each of those counters
/// is therefore `RESERVED_ID_NUM + 1`.
///
/// NOTE(review): ids at or below this value are presumably reserved for another purpose —
/// confirm against the callers.
const RESERVED_ID_NUM: u16 = 10_000;
34
/// Zero-sized marker that mentions `Rc<()>`. Because `Rc` is neither `Send` nor `Sync`, any
/// struct holding a field of this type is neither `Send` nor `Sync` as well.
type PhantomUnsend = PhantomData<Rc<()>>;
36
37pub struct OptimizerContext {
38 session_ctx: Arc<SessionImpl>,
39 sql: Arc<str>,
41 normalized_sql: String,
43 explain_options: ExplainOptions,
45 optimizer_trace: RefCell<Vec<String>>,
47 logical_explain: RefCell<Option<String>>,
49 with_options: WithOptions,
51 session_timezone: RefCell<SessionTimezone>,
53 total_rule_applied: RefCell<usize>,
55 overwrite_options: OverwriteOptions,
58 iceberg_snapshot_id_map: RefCell<HashMap<String, Option<i64>>>,
61 batch_mview_candidates: RefCell<Vec<MaterializedViewCandidate>>,
63
64 last_plan_node_id: Cell<i32>,
66 last_correlated_id: Cell<u32>,
68 last_expr_display_id: Cell<usize>,
70 last_watermark_group_id: Cell<u32>,
72
73 missed_locality_providers: Cell<usize>,
77
78 _phantom: PhantomUnsend,
79}
80
81#[derive(Clone, Debug)]
82pub struct MaterializedViewCandidate {
83 pub plan: LogicalPlanRef,
84 pub table: Arc<TableCatalog>,
85}
86
87pub(in crate::optimizer) struct LastAssignedIds {
88 last_plan_node_id: i32,
89 last_correlated_id: u32,
90 last_expr_display_id: usize,
91 last_watermark_group_id: u32,
92}
93
94pub type OptimizerContextRef = Rc<OptimizerContext>;
95
96impl OptimizerContext {
97 pub fn from_handler_args(handler_args: HandlerArgs) -> Self {
100 Self::new(handler_args, ExplainOptions::default())
101 }
102
103 pub fn new(mut handler_args: HandlerArgs, explain_options: ExplainOptions) -> Self {
105 let session_timezone = RefCell::new(SessionTimezone::new(
106 handler_args.session.config().timezone(),
107 ));
108 let overwrite_options = OverwriteOptions::new(&mut handler_args);
109 Self {
110 session_ctx: handler_args.session,
111 sql: handler_args.sql,
112 normalized_sql: handler_args.normalized_sql,
113 explain_options,
114 optimizer_trace: RefCell::new(vec![]),
115 logical_explain: RefCell::new(None),
116 with_options: handler_args.with_options,
117 session_timezone,
118 total_rule_applied: RefCell::new(0),
119 overwrite_options,
120 iceberg_snapshot_id_map: RefCell::new(HashMap::new()),
121 batch_mview_candidates: RefCell::new(Vec::new()),
122
123 last_plan_node_id: Cell::new(RESERVED_ID_NUM.into()),
124 last_correlated_id: Cell::new(0),
125 last_expr_display_id: Cell::new(RESERVED_ID_NUM.into()),
126 last_watermark_group_id: Cell::new(RESERVED_ID_NUM.into()),
127
128 missed_locality_providers: Cell::new(0),
130
131 _phantom: Default::default(),
132 }
133 }
134
135 #[cfg(test)]
137 #[expect(clippy::unused_async)]
138 pub async fn mock() -> OptimizerContextRef {
139 Self {
140 session_ctx: Arc::new(SessionImpl::mock()),
141 sql: Arc::from(""),
142 normalized_sql: "".to_owned(),
143 explain_options: ExplainOptions::default(),
144 optimizer_trace: RefCell::new(vec![]),
145 logical_explain: RefCell::new(None),
146 with_options: Default::default(),
147 session_timezone: RefCell::new(SessionTimezone::new("UTC".into())),
148 total_rule_applied: RefCell::new(0),
149 overwrite_options: OverwriteOptions::default(),
150 iceberg_snapshot_id_map: RefCell::new(HashMap::new()),
151 batch_mview_candidates: RefCell::new(Vec::new()),
152
153 last_plan_node_id: Cell::new(0),
154 last_correlated_id: Cell::new(0),
155 last_expr_display_id: Cell::new(0),
156 last_watermark_group_id: Cell::new(0),
157
158 missed_locality_providers: Cell::new(0),
159
160 _phantom: Default::default(),
161 }
162 .into()
163 }
164
165 pub fn next_plan_node_id(&self) -> PlanNodeId {
166 self.last_plan_node_id.update(|id| id + 1);
167 PlanNodeId(self.last_plan_node_id.get())
168 }
169
170 pub fn next_correlated_id(&self) -> CorrelatedId {
171 self.last_correlated_id.update(|id| id + 1);
172 self.last_correlated_id.get()
173 }
174
175 pub fn next_expr_display_id(&self) -> usize {
176 self.last_expr_display_id.update(|id| id + 1);
177 self.last_expr_display_id.get()
178 }
179
180 pub fn next_watermark_group_id(&self) -> WatermarkGroupId {
181 self.last_watermark_group_id.update(|id| id + 1);
182 self.last_watermark_group_id.get()
183 }
184
185 pub(in crate::optimizer) fn backup_elem_ids(&self) -> LastAssignedIds {
186 LastAssignedIds {
187 last_plan_node_id: self.last_plan_node_id.get(),
188 last_correlated_id: self.last_correlated_id.get(),
189 last_expr_display_id: self.last_expr_display_id.get(),
190 last_watermark_group_id: self.last_watermark_group_id.get(),
191 }
192 }
193
194 pub(in crate::optimizer) fn reset_elem_ids(&self) {
196 self.last_plan_node_id.set(0);
197 self.last_correlated_id.set(0);
198 self.last_expr_display_id.set(0);
199 self.last_watermark_group_id.set(0);
200 }
201
202 pub(in crate::optimizer) fn restore_elem_ids(&self, backup: LastAssignedIds) {
203 self.last_plan_node_id.set(backup.last_plan_node_id);
204 self.last_correlated_id.set(backup.last_correlated_id);
205 self.last_expr_display_id.set(backup.last_expr_display_id);
206 self.last_watermark_group_id
207 .set(backup.last_watermark_group_id);
208 }
209
210 pub fn add_rule_applied(&self, num: usize) {
211 *self.total_rule_applied.borrow_mut() += num;
212 }
213
214 pub fn total_rule_applied(&self) -> usize {
215 *self.total_rule_applied.borrow()
216 }
217
218 pub fn is_explain_verbose(&self) -> bool {
219 self.explain_options.verbose
220 }
221
222 pub fn is_explain_trace(&self) -> bool {
223 self.explain_options.trace
224 }
225
226 fn is_explain_logical(&self) -> bool {
227 self.explain_options.explain_type == ExplainType::Logical
228 }
229
230 pub fn trace(&self, str: impl Into<String>) {
231 if self.is_explain_logical() && self.logical_explain.borrow().is_some() {
233 return;
234 }
235 let mut optimizer_trace = self.optimizer_trace.borrow_mut();
236 let string = str.into();
237 tracing::info!(target: "explain_trace", "\n{}", string);
238 optimizer_trace.push(string);
239 optimizer_trace.push("\n".to_owned());
240 }
241
242 pub fn warn_to_user(&self, str: impl Into<String>) {
243 self.session_ctx().notice_to_user(str);
244 }
245
246 pub fn inc_missed_locality_providers(&self) {
250 self.missed_locality_providers
251 .set(self.missed_locality_providers.get() + 1);
252 }
253
254 pub fn missed_locality_providers(&self) -> usize {
257 self.missed_locality_providers.get()
258 }
259
260 fn explain_plan_impl(&self, plan: &impl Explain) -> String {
261 match self.explain_options.explain_format {
262 ExplainFormat::Text => plan.explain_to_string(),
263 ExplainFormat::Json => plan.explain_to_json(),
264 ExplainFormat::Xml => plan.explain_to_xml(),
265 ExplainFormat::Yaml => plan.explain_to_yaml(),
266 ExplainFormat::Dot => plan.explain_to_dot(),
267 }
268 }
269
270 pub fn may_store_explain_logical(&self, plan: &LogicalPlanRef) {
271 if self.is_explain_logical() {
272 let str = self.explain_plan_impl(plan);
273 *self.logical_explain.borrow_mut() = Some(str);
274 }
275 }
276
277 pub fn take_logical(&self) -> Option<String> {
278 self.logical_explain.borrow_mut().take()
279 }
280
281 pub fn take_trace(&self) -> Vec<String> {
282 self.optimizer_trace.borrow_mut().drain(..).collect()
283 }
284
285 pub fn with_options(&self) -> &WithOptions {
286 &self.with_options
287 }
288
289 pub fn overwrite_options(&self) -> &OverwriteOptions {
290 &self.overwrite_options
291 }
292
293 pub fn add_batch_mview_candidate(&self, table: Arc<TableCatalog>, plan: LogicalPlanRef) {
294 self.batch_mview_candidates
295 .borrow_mut()
296 .push(MaterializedViewCandidate { plan, table });
297 }
298
299 pub fn batch_mview_candidates(&self) -> std::cell::Ref<'_, Vec<MaterializedViewCandidate>> {
300 self.batch_mview_candidates.borrow()
301 }
302
303 pub fn session_ctx(&self) -> &Arc<SessionImpl> {
304 &self.session_ctx
305 }
306
307 pub fn sql(&self) -> &str {
309 &self.sql
310 }
311
312 pub fn normalized_sql(&self) -> &str {
314 &self.normalized_sql
315 }
316
317 pub fn session_timezone(&self) -> RefMut<'_, SessionTimezone> {
318 self.session_timezone.borrow_mut()
319 }
320
321 pub fn get_session_timezone(&self) -> String {
322 self.session_timezone.borrow().timezone()
323 }
324
325 pub fn iceberg_snapshot_id_map(&self) -> RefMut<'_, HashMap<String, Option<i64>>> {
326 self.iceberg_snapshot_id_map.borrow_mut()
327 }
328}
329
330impl std::fmt::Debug for OptimizerContext {
331 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
332 write!(
333 f,
334 "QueryContext {{ sql = {}, explain_options = {}, with_options = {:?}, last_plan_node_id = {}, last_correlated_id = {} }}",
335 self.sql,
336 self.explain_options,
337 self.with_options,
338 self.last_plan_node_id.get(),
339 self.last_correlated_id.get(),
340 )
341 }
342}