risingwave_frontend/optimizer/plan_node/
logical_iceberg_scan.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::hash::{Hash, Hasher};
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_connector::source::iceberg::IcebergFileScanTask;
19use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
20
21use super::generic::GenericPlanRef;
22use super::utils::{Distill, childless_record};
23use super::{
24    ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef, PlanBase, PredicatePushdown,
25    ToBatch, ToStream, generic,
26};
27use crate::catalog::source_catalog::SourceCatalog;
28use crate::error::Result;
29use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
30use crate::optimizer::plan_node::utils::column_names_pretty;
31use crate::optimizer::plan_node::{
32    BatchIcebergScan, ColumnPruningContext, LogicalFilter, PredicatePushdownContext,
33    RewriteStreamContext, ToStreamContext,
34};
35use crate::utils::{ColIndexMapping, Condition};
36
37/// `LogicalIcebergScan` is only used by batch queries.
38/// It represents a scan of Iceberg data files, with delete files handled via anti-joins
39/// added on top of this scan.
40///
41/// The conversion flow is:
42/// 1. `LogicalSource` (iceberg) -> `LogicalIcebergIntermediateScan`
43/// 2. Predicate pushdown and column pruning on `LogicalIcebergIntermediateScan`
44/// 3. `LogicalIcebergIntermediateScan` -> `LogicalIcebergScan`
45#[derive(Debug, Clone, PartialEq)]
46pub struct LogicalIcebergScan {
47    pub base: PlanBase<Logical>,
48    pub core: generic::Source,
49    pub task: IcebergFileScanTask,
50}
51
// `Eq` is a marker impl layered on top of the derived `PartialEq` of the struct.
impl Eq for LogicalIcebergScan {}
53
54impl Hash for LogicalIcebergScan {
55    fn hash<H: Hasher>(&self, state: &mut H) {
56        self.base.hash(state);
57        self.core.hash(state);
58        self.iceberg_scan_type().hash(state);
59    }
60}
61
62impl LogicalIcebergScan {
63    pub fn new(core: generic::Source, task: IcebergFileScanTask) -> Self {
64        let base = PlanBase::new_logical_with_core(&core);
65        LogicalIcebergScan { base, core, task }
66    }
67
68    pub fn source_catalog(&self) -> Option<&SourceCatalog> {
69        self.core.catalog.as_deref()
70    }
71
72    pub fn iceberg_scan_type(&self) -> IcebergScanType {
73        match &self.task {
74            IcebergFileScanTask::Data(_) => IcebergScanType::DataScan,
75            IcebergFileScanTask::EqualityDelete(_) => IcebergScanType::EqualityDeleteScan,
76            IcebergFileScanTask::PositionDelete(_) => IcebergScanType::PositionDeleteScan,
77        }
78    }
79}
80
// Plan-tree-node boilerplate for this node is generated by the leaf-node macro
// (defined elsewhere; presumably it marks the node as having no inputs — confirm).
impl_plan_tree_node_for_leaf! { Logical, LogicalIcebergScan}
82impl Distill for LogicalIcebergScan {
83    fn distill<'a>(&self) -> XmlNode<'a> {
84        let fields = if let Some(catalog) = self.source_catalog() {
85            let src = Pretty::from(catalog.name.clone());
86            vec![
87                ("source", src),
88                ("columns", column_names_pretty(self.schema())),
89                (
90                    "iceberg_scan_type",
91                    Pretty::debug(&self.iceberg_scan_type()),
92                ),
93            ]
94        } else {
95            vec![]
96        };
97        childless_record("LogicalIcebergScan", fields)
98    }
99}
100
101impl ColPrunable for LogicalIcebergScan {
102    fn prune_col(&self, _: &[usize], _: &mut ColumnPruningContext) -> PlanRef {
103        // Column pruning should have been done in LogicalIcebergIntermediateScan.
104        unreachable!()
105    }
106}
107
// Empty impl: every method comes from the trait's provided defaults
// (presumably no-ops here — confirm against the trait definition).
impl ExprRewritable<Logical> for LogicalIcebergScan {}
109
// Empty impl: every method comes from the trait's provided defaults
// (presumably nothing is visited here — confirm against the trait definition).
impl ExprVisitable for LogicalIcebergScan {}
111
112impl PredicatePushdown for LogicalIcebergScan {
113    fn predicate_pushdown(
114        &self,
115        predicate: Condition,
116        _ctx: &mut PredicatePushdownContext,
117    ) -> PlanRef {
118        // Predicate pushdown should have been done in LogicalIcebergIntermediateScan.
119        LogicalFilter::create(self.clone().into(), predicate)
120    }
121}
122
123impl ToBatch for LogicalIcebergScan {
124    fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
125        let plan = BatchIcebergScan::new(self.core.clone(), self.task.clone()).into();
126        Ok(plan)
127    }
128}
129
130impl ToStream for LogicalIcebergScan {
131    fn to_stream(
132        &self,
133        _ctx: &mut ToStreamContext,
134    ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
135        unreachable!()
136    }
137
138    fn logical_rewrite_for_stream(
139        &self,
140        _ctx: &mut RewriteStreamContext,
141    ) -> Result<(PlanRef, ColIndexMapping)> {
142        unreachable!()
143    }
144}