risingwave_frontend/optimizer/plan_node/
logical_iceberg_scan.rsuse std::rc::Rc;
use pretty_xmlish::{Pretty, XmlNode};
use super::generic::GenericPlanRef;
use super::utils::{childless_record, Distill};
use super::{
generic, ColPrunable, ExprRewritable, Logical, LogicalProject, PlanBase, PlanRef,
PredicatePushdown, ToBatch, ToStream,
};
use crate::catalog::source_catalog::SourceCatalog;
use crate::error::Result;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::column_names_pretty;
use crate::optimizer::plan_node::{
BatchIcebergScan, ColumnPruningContext, LogicalFilter, LogicalSource, PredicatePushdownContext,
RewriteStreamContext, ToStreamContext,
};
use crate::utils::{ColIndexMapping, Condition};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalIcebergScan {
pub base: PlanBase<Logical>,
pub core: generic::Source,
}
impl LogicalIcebergScan {
pub fn new(logical_source: &LogicalSource) -> Self {
assert!(logical_source.core.is_iceberg_connector());
let core = logical_source.core.clone();
let base = PlanBase::new_logical_with_core(&core);
assert!(logical_source.output_exprs.is_none());
LogicalIcebergScan { base, core }
}
pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
self.core.catalog.clone()
}
pub fn clone_with_required_cols(&self, required_cols: &[usize]) -> Self {
assert!(!required_cols.is_empty());
let mut core = self.core.clone();
core.column_catalog = required_cols
.iter()
.map(|idx| core.column_catalog[*idx].clone())
.collect();
let base = PlanBase::new_logical_with_core(&core);
LogicalIcebergScan { base, core }
}
}
impl_plan_tree_node_for_leaf! {LogicalIcebergScan}
impl Distill for LogicalIcebergScan {
fn distill<'a>(&self) -> XmlNode<'a> {
let fields = if let Some(catalog) = self.source_catalog() {
let src = Pretty::from(catalog.name.clone());
vec![
("source", src),
("columns", column_names_pretty(self.schema())),
]
} else {
vec![]
};
childless_record("LogicalIcebergScan", fields)
}
}
impl ColPrunable for LogicalIcebergScan {
fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
if required_cols.is_empty() {
let mapping =
ColIndexMapping::with_remaining_columns(required_cols, self.schema().len());
LogicalProject::with_mapping(self.clone_with_required_cols(&[0]).into(), mapping).into()
} else {
self.clone_with_required_cols(required_cols).into()
}
}
}
impl ExprRewritable for LogicalIcebergScan {}
impl ExprVisitable for LogicalIcebergScan {}
impl PredicatePushdown for LogicalIcebergScan {
fn predicate_pushdown(
&self,
predicate: Condition,
_ctx: &mut PredicatePushdownContext,
) -> PlanRef {
LogicalFilter::create(self.clone().into(), predicate)
}
}
impl ToBatch for LogicalIcebergScan {
fn to_batch(&self) -> Result<PlanRef> {
let plan: PlanRef = BatchIcebergScan::new(self.core.clone()).into();
Ok(plan)
}
}
impl ToStream for LogicalIcebergScan {
fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
unreachable!()
}
fn logical_rewrite_for_stream(
&self,
_ctx: &mut RewriteStreamContext,
) -> Result<(PlanRef, ColIndexMapping)> {
unreachable!()
}
}