risingwave_frontend/optimizer/plan_node/
batch_iceberg_scan.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::hash::{Hash, Hasher};
16use std::rc::Rc;
17
18use pretty_xmlish::{Pretty, XmlNode};
19use risingwave_connector::source::iceberg::IcebergFileScanTask;
20use risingwave_pb::batch_plan::IcebergScanNode;
21use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
22use risingwave_pb::batch_plan::plan_node::NodeBody;
23use risingwave_sqlparser::ast::AsOf;
24
25use super::batch::prelude::*;
26use super::utils::{Distill, childless_record, column_names_pretty};
27use super::{
28    BatchPlanRef as PlanRef, ExprRewritable, PlanBase, ToBatchPb, ToDistributedBatch, ToLocalBatch,
29    generic,
30};
31use crate::catalog::source_catalog::SourceCatalog;
32use crate::error::Result;
33use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
34use crate::optimizer::property::{Distribution, Order};
35
36#[derive(Debug, Clone, PartialEq)]
37pub struct BatchIcebergScan {
38    pub base: PlanBase<Batch>,
39    pub core: generic::Source,
40    pub task: IcebergFileScanTask,
41}
42
// `PartialEq` is derived on the struct; this marker impl declares that equality total.
// The hand-written `Hash` impl hashes `base`, `core` and the scan type derived from `task`;
// equal nodes have equal `task`s and therefore equal scan types, so `Hash` agrees with `Eq`.
impl Eq for BatchIcebergScan {}
44
45impl Hash for BatchIcebergScan {
46    fn hash<H: Hasher>(&self, state: &mut H) {
47        self.base.hash(state);
48        self.core.hash(state);
49        self.iceberg_scan_type().hash(state);
50    }
51}
52
53impl BatchIcebergScan {
54    pub fn new(core: generic::Source, task: IcebergFileScanTask) -> Self {
55        let base = PlanBase::new_batch_with_core(
56            &core,
57            // Use `Single` by default, will be updated later with `clone_with_dist`.
58            Distribution::Single,
59            Order::any(),
60        );
61
62        Self { base, core, task }
63    }
64
65    pub fn iceberg_scan_type(&self) -> IcebergScanType {
66        match &self.task {
67            IcebergFileScanTask::Data(_) => IcebergScanType::DataScan,
68            IcebergFileScanTask::EqualityDelete(_) => IcebergScanType::EqualityDeleteScan,
69            IcebergFileScanTask::PositionDelete(_) => IcebergScanType::PositionDeleteScan,
70        }
71    }
72
73    pub fn predicate(&self) -> Option<String> {
74        let predicate = self.task.predicate()?;
75        Some(predicate.to_string())
76    }
77
78    pub fn column_names(&self) -> Vec<&str> {
79        self.schema().names_str()
80    }
81
82    pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
83        self.core.catalog.clone()
84    }
85
86    pub fn clone_with_dist(&self) -> Self {
87        let base = self
88            .base
89            .clone_with_new_distribution(Distribution::SomeShard);
90        Self {
91            base,
92            core: self.core.clone(),
93            task: self.task.clone(),
94        }
95    }
96
97    pub fn as_of(&self) -> Option<AsOf> {
98        self.core.as_of.clone()
99    }
100}
101
102impl_plan_tree_node_for_leaf! { Batch, BatchIcebergScan }
103
104impl Distill for BatchIcebergScan {
105    fn distill<'a>(&self) -> XmlNode<'a> {
106        let src = Pretty::from(self.source_catalog().unwrap().name.clone());
107        let mut fields = vec![
108            ("source", src),
109            ("columns", column_names_pretty(self.schema())),
110            (
111                "iceberg_scan_type",
112                Pretty::from(format!("{:?}", self.iceberg_scan_type())),
113            ),
114        ];
115        if let Some(predicate) = self.predicate() {
116            fields.push(("predicate", Pretty::from(predicate)));
117        }
118        childless_record("BatchIcebergScan", fields)
119    }
120}
121
122impl ToLocalBatch for BatchIcebergScan {
123    fn to_local(&self) -> Result<PlanRef> {
124        Ok(self.clone_with_dist().into())
125    }
126}
127
128impl ToDistributedBatch for BatchIcebergScan {
129    fn to_distributed(&self) -> Result<PlanRef> {
130        Ok(self.clone_with_dist().into())
131    }
132}
133
134impl ToBatchPb for BatchIcebergScan {
135    fn to_batch_prost_body(&self) -> NodeBody {
136        let source_catalog = self.source_catalog().unwrap();
137        let (with_properties, secret_refs) = source_catalog.with_properties.clone().into_parts();
138        NodeBody::IcebergScan(IcebergScanNode {
139            columns: self
140                .core
141                .column_catalog
142                .iter()
143                .map(|c| c.to_protobuf())
144                .collect(),
145            with_properties,
146            split: vec![],
147            secret_refs,
148            iceberg_scan_type: self.iceberg_scan_type() as i32,
149        })
150    }
151}
152
// No overrides: this node relies entirely on the trait's default method implementations.
impl ExprRewritable<Batch> for BatchIcebergScan {}
154
// No overrides: expression visiting uses the trait's default method implementations.
impl ExprVisitable for BatchIcebergScan {}