risingwave_frontend/optimizer/plan_visitor/
temporal_join_validator.rsuse risingwave_sqlparser::ast::AsOf;
use super::{DefaultBehavior, Merge};
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{
BatchSeqScan, LogicalScan, PlanTreeNodeBinary, StreamTableScan, StreamTemporalJoin,
};
use crate::optimizer::plan_visitor::PlanVisitor;
use crate::PlanRef;
#[derive(Debug, Clone, Default)]
pub struct TemporalJoinValidator {
found_non_append_only_temporal_join: bool,
}
impl TemporalJoinValidator {
pub fn exist_dangling_temporal_scan(plan: PlanRef) -> bool {
let mut decider = TemporalJoinValidator {
found_non_append_only_temporal_join: false,
};
let ctx = plan.ctx();
let has_dangling_temporal_scan = decider.visit(plan);
if decider.found_non_append_only_temporal_join {
ctx.session_ctx().notice_to_user("A non-append-only temporal join is used in the query. It would introduce a additional memo-table comparing to append-only temporal join.");
}
has_dangling_temporal_scan
}
}
impl PlanVisitor for TemporalJoinValidator {
type Result = bool;
type DefaultBehavior = impl DefaultBehavior<Self::Result>;
fn default_behavior() -> Self::DefaultBehavior {
Merge(|a, b| a | b)
}
fn visit_stream_table_scan(&mut self, stream_table_scan: &StreamTableScan) -> bool {
matches!(stream_table_scan.core().as_of, Some(AsOf::ProcessTime))
}
fn visit_batch_seq_scan(&mut self, batch_seq_scan: &BatchSeqScan) -> bool {
matches!(batch_seq_scan.core().as_of, Some(AsOf::ProcessTime))
}
fn visit_logical_scan(&mut self, logical_scan: &LogicalScan) -> bool {
matches!(logical_scan.as_of(), Some(AsOf::ProcessTime))
}
fn visit_stream_temporal_join(&mut self, stream_temporal_join: &StreamTemporalJoin) -> bool {
if !stream_temporal_join.append_only() {
self.found_non_append_only_temporal_join = true;
}
self.visit(stream_temporal_join.left())
}
}