risingwave_frontend/optimizer/
optimizer_context.rs1use core::fmt::Formatter;
16use std::cell::{Cell, RefCell, RefMut};
17use std::collections::HashMap;
18use std::marker::PhantomData;
19use std::rc::Rc;
20use std::sync::Arc;
21
22use risingwave_sqlparser::ast::{ExplainFormat, ExplainOptions, ExplainType};
23
24use super::property::WatermarkGroupId;
25use crate::Explain;
26use crate::expr::{CorrelatedId, SessionTimezone};
27use crate::handler::HandlerArgs;
28use crate::optimizer::LogicalPlanRef;
29use crate::optimizer::plan_node::PlanNodeId;
30use crate::session::SessionImpl;
31use crate::utils::{OverwriteOptions, WithOptions};
32
/// Initial value of the plan-node, expression-display and watermark-group id
/// counters of a context built by `OptimizerContext::new`.
///
/// The `next_*_id` methods increment before returning, so the first id handed
/// out by such a context is `RESERVED_ID_NUM + 1`. The correlated id counter
/// does not use this constant and starts at 0.
// NOTE(review): presumably ids up to this value are reserved for another
// purpose; that use is not visible in this file -- confirm.
const RESERVED_ID_NUM: u16 = 10000;
34
/// Zero-sized marker that makes any struct containing it `!Send` and `!Sync`,
/// because `Rc<()>` is neither.
type PhantomUnsend = PhantomData<Rc<()>>;
36
/// State carried through the optimizer for one statement: the session, the
/// SQL text, explain settings, collected trace output and the id counters.
///
/// All mutable state uses `Cell`/`RefCell`, so the context is mutated through
/// `&self`. The `_phantom` marker keeps the type `!Send`/`!Sync`.
pub struct OptimizerContext {
    /// Session this context belongs to (`HandlerArgs::session`).
    session_ctx: Arc<SessionImpl>,
    /// SQL text (`HandlerArgs::sql`).
    sql: Arc<str>,
    /// Normalized SQL text (`HandlerArgs::normalized_sql`).
    normalized_sql: String,
    /// Explain settings: verbose/trace flags, explain type and output format.
    explain_options: ExplainOptions,
    /// Trace entries appended by `trace` and handed out by `take_trace`.
    optimizer_trace: RefCell<Vec<String>>,
    /// Rendered logical plan, set by `may_store_explain_logical` and handed
    /// out by `take_logical`.
    logical_explain: RefCell<Option<String>>,
    /// Options taken from `HandlerArgs::with_options`.
    with_options: WithOptions,
    /// Session time zone, initialized from the session config.
    session_timezone: RefCell<SessionTimezone>,
    /// Running total accumulated by `add_rule_applied`.
    total_rule_applied: RefCell<usize>,
    /// Options built by `OverwriteOptions::new` from the handler arguments.
    overwrite_options: OverwriteOptions,
    /// Map from a string key to an optional `i64`.
    // NOTE(review): judging by the name, presumably an Iceberg table
    // identifier mapped to its snapshot id -- confirm against callers.
    iceberg_snapshot_id_map: RefCell<HashMap<String, Option<i64>>>,

    /// Most recently assigned plan node id.
    last_plan_node_id: Cell<i32>,
    /// Most recently assigned correlated id.
    last_correlated_id: Cell<u32>,
    /// Most recently assigned expression display id.
    last_expr_display_id: Cell<usize>,
    /// Most recently assigned watermark group id.
    last_watermark_group_id: Cell<u32>,

    /// Marker that makes the context `!Send` and `!Sync`.
    _phantom: PhantomUnsend,
}
73
/// Snapshot of the four id counters of an `OptimizerContext`.
///
/// Produced by `OptimizerContext::backup_elem_ids` and consumed by
/// `OptimizerContext::restore_elem_ids`.
pub(in crate::optimizer) struct LastAssignedIds {
    /// Saved value of the plan node id counter.
    last_plan_node_id: i32,
    /// Saved value of the correlated id counter.
    last_correlated_id: u32,
    /// Saved value of the expression display id counter.
    last_expr_display_id: usize,
    /// Saved value of the watermark group id counter.
    last_watermark_group_id: u32,
}
80
/// Shared, single-threaded handle to an [`OptimizerContext`].
pub type OptimizerContextRef = Rc<OptimizerContext>;
82
83impl OptimizerContext {
84 pub fn from_handler_args(handler_args: HandlerArgs) -> Self {
87 Self::new(handler_args, ExplainOptions::default())
88 }
89
90 pub fn new(mut handler_args: HandlerArgs, explain_options: ExplainOptions) -> Self {
92 let session_timezone = RefCell::new(SessionTimezone::new(
93 handler_args.session.config().timezone(),
94 ));
95 let overwrite_options = OverwriteOptions::new(&mut handler_args);
96 Self {
97 session_ctx: handler_args.session,
98 sql: handler_args.sql,
99 normalized_sql: handler_args.normalized_sql,
100 explain_options,
101 optimizer_trace: RefCell::new(vec![]),
102 logical_explain: RefCell::new(None),
103 with_options: handler_args.with_options,
104 session_timezone,
105 total_rule_applied: RefCell::new(0),
106 overwrite_options,
107 iceberg_snapshot_id_map: RefCell::new(HashMap::new()),
108
109 last_plan_node_id: Cell::new(RESERVED_ID_NUM.into()),
110 last_correlated_id: Cell::new(0),
111 last_expr_display_id: Cell::new(RESERVED_ID_NUM.into()),
112 last_watermark_group_id: Cell::new(RESERVED_ID_NUM.into()),
113
114 _phantom: Default::default(),
115 }
116 }
117
118 #[cfg(test)]
120 #[expect(clippy::unused_async)]
121 pub async fn mock() -> OptimizerContextRef {
122 Self {
123 session_ctx: Arc::new(SessionImpl::mock()),
124 sql: Arc::from(""),
125 normalized_sql: "".to_owned(),
126 explain_options: ExplainOptions::default(),
127 optimizer_trace: RefCell::new(vec![]),
128 logical_explain: RefCell::new(None),
129 with_options: Default::default(),
130 session_timezone: RefCell::new(SessionTimezone::new("UTC".into())),
131 total_rule_applied: RefCell::new(0),
132 overwrite_options: OverwriteOptions::default(),
133 iceberg_snapshot_id_map: RefCell::new(HashMap::new()),
134
135 last_plan_node_id: Cell::new(0),
136 last_correlated_id: Cell::new(0),
137 last_expr_display_id: Cell::new(0),
138 last_watermark_group_id: Cell::new(0),
139
140 _phantom: Default::default(),
141 }
142 .into()
143 }
144
145 pub fn next_plan_node_id(&self) -> PlanNodeId {
146 self.last_plan_node_id.update(|id| id + 1);
147 PlanNodeId(self.last_plan_node_id.get())
148 }
149
150 pub fn next_correlated_id(&self) -> CorrelatedId {
151 self.last_correlated_id.update(|id| id + 1);
152 self.last_correlated_id.get()
153 }
154
155 pub fn next_expr_display_id(&self) -> usize {
156 self.last_expr_display_id.update(|id| id + 1);
157 self.last_expr_display_id.get()
158 }
159
160 pub fn next_watermark_group_id(&self) -> WatermarkGroupId {
161 self.last_watermark_group_id.update(|id| id + 1);
162 self.last_watermark_group_id.get()
163 }
164
165 pub(in crate::optimizer) fn backup_elem_ids(&self) -> LastAssignedIds {
166 LastAssignedIds {
167 last_plan_node_id: self.last_plan_node_id.get(),
168 last_correlated_id: self.last_correlated_id.get(),
169 last_expr_display_id: self.last_expr_display_id.get(),
170 last_watermark_group_id: self.last_watermark_group_id.get(),
171 }
172 }
173
174 pub(in crate::optimizer) fn reset_elem_ids(&self) {
176 self.last_plan_node_id.set(0);
177 self.last_correlated_id.set(0);
178 self.last_expr_display_id.set(0);
179 self.last_watermark_group_id.set(0);
180 }
181
182 pub(in crate::optimizer) fn restore_elem_ids(&self, backup: LastAssignedIds) {
183 self.last_plan_node_id.set(backup.last_plan_node_id);
184 self.last_correlated_id.set(backup.last_correlated_id);
185 self.last_expr_display_id.set(backup.last_expr_display_id);
186 self.last_watermark_group_id
187 .set(backup.last_watermark_group_id);
188 }
189
190 pub fn add_rule_applied(&self, num: usize) {
191 *self.total_rule_applied.borrow_mut() += num;
192 }
193
194 pub fn total_rule_applied(&self) -> usize {
195 *self.total_rule_applied.borrow()
196 }
197
198 pub fn is_explain_verbose(&self) -> bool {
199 self.explain_options.verbose
200 }
201
202 pub fn is_explain_trace(&self) -> bool {
203 self.explain_options.trace
204 }
205
206 fn is_explain_logical(&self) -> bool {
207 self.explain_options.explain_type == ExplainType::Logical
208 }
209
210 pub fn trace(&self, str: impl Into<String>) {
211 if self.is_explain_logical() && self.logical_explain.borrow().is_some() {
213 return;
214 }
215 let mut optimizer_trace = self.optimizer_trace.borrow_mut();
216 let string = str.into();
217 tracing::info!(target: "explain_trace", "\n{}", string);
218 optimizer_trace.push(string);
219 optimizer_trace.push("\n".to_owned());
220 }
221
222 pub fn warn_to_user(&self, str: impl Into<String>) {
223 self.session_ctx().notice_to_user(str);
224 }
225
226 fn explain_plan_impl(&self, plan: &impl Explain) -> String {
227 match self.explain_options.explain_format {
228 ExplainFormat::Text => plan.explain_to_string(),
229 ExplainFormat::Json => plan.explain_to_json(),
230 ExplainFormat::Xml => plan.explain_to_xml(),
231 ExplainFormat::Yaml => plan.explain_to_yaml(),
232 ExplainFormat::Dot => plan.explain_to_dot(),
233 }
234 }
235
236 pub fn may_store_explain_logical(&self, plan: &LogicalPlanRef) {
237 if self.is_explain_logical() {
238 let str = self.explain_plan_impl(plan);
239 *self.logical_explain.borrow_mut() = Some(str);
240 }
241 }
242
243 pub fn take_logical(&self) -> Option<String> {
244 self.logical_explain.borrow_mut().take()
245 }
246
247 pub fn take_trace(&self) -> Vec<String> {
248 self.optimizer_trace.borrow_mut().drain(..).collect()
249 }
250
251 pub fn with_options(&self) -> &WithOptions {
252 &self.with_options
253 }
254
255 pub fn overwrite_options(&self) -> &OverwriteOptions {
256 &self.overwrite_options
257 }
258
259 pub fn session_ctx(&self) -> &Arc<SessionImpl> {
260 &self.session_ctx
261 }
262
263 pub fn sql(&self) -> &str {
265 &self.sql
266 }
267
268 pub fn normalized_sql(&self) -> &str {
270 &self.normalized_sql
271 }
272
273 pub fn session_timezone(&self) -> RefMut<'_, SessionTimezone> {
274 self.session_timezone.borrow_mut()
275 }
276
277 pub fn get_session_timezone(&self) -> String {
278 self.session_timezone.borrow().timezone()
279 }
280
281 pub fn iceberg_snapshot_id_map(&self) -> RefMut<'_, HashMap<String, Option<i64>>> {
282 self.iceberg_snapshot_id_map.borrow_mut()
283 }
284}
285
286impl std::fmt::Debug for OptimizerContext {
287 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
288 write!(
289 f,
290 "QueryContext {{ sql = {}, explain_options = {}, with_options = {:?}, last_plan_node_id = {}, last_correlated_id = {} }}",
291 self.sql,
292 self.explain_options,
293 self.with_options,
294 self.last_plan_node_id.get(),
295 self.last_correlated_id.get(),
296 )
297 }
298}