risingwave_frontend/optimizer/
optimizer_context.rsuse core::fmt::Formatter;
use std::cell::{RefCell, RefMut};
use std::collections::HashMap;
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::Arc;
use risingwave_sqlparser::ast::{ExplainFormat, ExplainOptions, ExplainType};
use crate::binder::ShareId;
use crate::expr::{CorrelatedId, SessionTimezone};
use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::PlanNodeId;
use crate::session::SessionImpl;
use crate::utils::{OverwriteOptions, WithOptions};
use crate::PlanRef;
const RESERVED_ID_NUM: u16 = 10000;
type PhantomUnsend = PhantomData<Rc<()>>;
pub struct OptimizerContext {
session_ctx: Arc<SessionImpl>,
next_plan_node_id: RefCell<i32>,
sql: Arc<str>,
normalized_sql: String,
explain_options: ExplainOptions,
optimizer_trace: RefCell<Vec<String>>,
logical_explain: RefCell<Option<String>>,
next_correlated_id: RefCell<u32>,
with_options: WithOptions,
session_timezone: RefCell<SessionTimezone>,
next_expr_display_id: RefCell<usize>,
total_rule_applied: RefCell<usize>,
overwrite_options: OverwriteOptions,
rcte_cache: RefCell<HashMap<ShareId, PlanRef>>,
_phantom: PhantomUnsend,
}
pub type OptimizerContextRef = Rc<OptimizerContext>;
impl OptimizerContext {
pub fn from_handler_args(handler_args: HandlerArgs) -> Self {
Self::new(handler_args, ExplainOptions::default())
}
pub fn new(mut handler_args: HandlerArgs, explain_options: ExplainOptions) -> Self {
let session_timezone = RefCell::new(SessionTimezone::new(
handler_args.session.config().timezone().to_owned(),
));
let overwrite_options = OverwriteOptions::new(&mut handler_args);
Self {
session_ctx: handler_args.session,
next_plan_node_id: RefCell::new(RESERVED_ID_NUM.into()),
sql: handler_args.sql,
normalized_sql: handler_args.normalized_sql,
explain_options,
optimizer_trace: RefCell::new(vec![]),
logical_explain: RefCell::new(None),
next_correlated_id: RefCell::new(0),
with_options: handler_args.with_options,
session_timezone,
next_expr_display_id: RefCell::new(RESERVED_ID_NUM.into()),
total_rule_applied: RefCell::new(0),
overwrite_options,
rcte_cache: RefCell::new(HashMap::new()),
_phantom: Default::default(),
}
}
#[cfg(test)]
#[expect(clippy::unused_async)]
pub async fn mock() -> OptimizerContextRef {
Self {
session_ctx: Arc::new(SessionImpl::mock()),
next_plan_node_id: RefCell::new(0),
sql: Arc::from(""),
normalized_sql: "".to_owned(),
explain_options: ExplainOptions::default(),
optimizer_trace: RefCell::new(vec![]),
logical_explain: RefCell::new(None),
next_correlated_id: RefCell::new(0),
with_options: Default::default(),
session_timezone: RefCell::new(SessionTimezone::new("UTC".into())),
next_expr_display_id: RefCell::new(0),
total_rule_applied: RefCell::new(0),
overwrite_options: OverwriteOptions::default(),
rcte_cache: RefCell::new(HashMap::new()),
_phantom: Default::default(),
}
.into()
}
pub fn next_plan_node_id(&self) -> PlanNodeId {
*self.next_plan_node_id.borrow_mut() += 1;
PlanNodeId(*self.next_plan_node_id.borrow())
}
pub fn get_plan_node_id(&self) -> i32 {
*self.next_plan_node_id.borrow()
}
pub fn set_plan_node_id(&self, next_plan_node_id: i32) {
*self.next_plan_node_id.borrow_mut() = next_plan_node_id;
}
pub fn next_expr_display_id(&self) -> usize {
*self.next_expr_display_id.borrow_mut() += 1;
*self.next_expr_display_id.borrow()
}
pub fn get_expr_display_id(&self) -> usize {
*self.next_expr_display_id.borrow()
}
pub fn set_expr_display_id(&self, expr_display_id: usize) {
*self.next_expr_display_id.borrow_mut() = expr_display_id;
}
pub fn next_correlated_id(&self) -> CorrelatedId {
*self.next_correlated_id.borrow_mut() += 1;
*self.next_correlated_id.borrow()
}
pub fn add_rule_applied(&self, num: usize) {
*self.total_rule_applied.borrow_mut() += num;
}
pub fn total_rule_applied(&self) -> usize {
*self.total_rule_applied.borrow()
}
pub fn is_explain_verbose(&self) -> bool {
self.explain_options.verbose
}
pub fn is_explain_trace(&self) -> bool {
self.explain_options.trace
}
pub fn explain_type(&self) -> ExplainType {
self.explain_options.explain_type.clone()
}
pub fn explain_format(&self) -> ExplainFormat {
self.explain_options.explain_format.clone()
}
pub fn is_explain_logical(&self) -> bool {
self.explain_type() == ExplainType::Logical
}
pub fn trace(&self, str: impl Into<String>) {
if self.is_explain_logical() && self.logical_explain.borrow().is_some() {
return;
}
let mut optimizer_trace = self.optimizer_trace.borrow_mut();
let string = str.into();
tracing::trace!(target: "explain_trace", "{}", string);
optimizer_trace.push(string);
optimizer_trace.push("\n".to_string());
}
pub fn warn_to_user(&self, str: impl Into<String>) {
self.session_ctx().notice_to_user(str);
}
pub fn store_logical(&self, str: impl Into<String>) {
*self.logical_explain.borrow_mut() = Some(str.into())
}
pub fn take_logical(&self) -> Option<String> {
self.logical_explain.borrow_mut().take()
}
pub fn take_trace(&self) -> Vec<String> {
self.optimizer_trace.borrow_mut().drain(..).collect()
}
pub fn with_options(&self) -> &WithOptions {
&self.with_options
}
pub fn overwrite_options(&self) -> &OverwriteOptions {
&self.overwrite_options
}
pub fn session_ctx(&self) -> &Arc<SessionImpl> {
&self.session_ctx
}
pub fn sql(&self) -> &str {
&self.sql
}
pub fn normalized_sql(&self) -> &str {
&self.normalized_sql
}
pub fn session_timezone(&self) -> RefMut<'_, SessionTimezone> {
self.session_timezone.borrow_mut()
}
pub fn get_session_timezone(&self) -> String {
self.session_timezone.borrow().timezone()
}
pub fn get_rcte_cache_plan(&self, id: &ShareId) -> Option<PlanRef> {
self.rcte_cache.borrow().get(id).cloned()
}
pub fn insert_rcte_cache_plan(&self, id: ShareId, plan: PlanRef) {
self.rcte_cache.borrow_mut().insert(id, plan);
}
}
impl std::fmt::Debug for OptimizerContext {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"QueryContext {{ next_plan_node_id = {}, sql = {}, explain_options = {}, next_correlated_id = {}, with_options = {:?} }}",
self.next_plan_node_id.borrow(),
self.sql,
self.explain_options,
self.next_correlated_id.borrow(),
&self.with_options
)
}
}