risingwave_frontend/optimizer/plan_node/
batch_iceberg_scan.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::hash::{Hash, Hasher};
16use std::rc::Rc;
17
18use pretty_xmlish::{Pretty, XmlNode};
19use risingwave_connector::source::iceberg::IcebergFileScanTask;
20use risingwave_pb::batch_plan::IcebergScanNode;
21use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType;
22use risingwave_pb::batch_plan::plan_node::NodeBody;
23use risingwave_sqlparser::ast::AsOf;
24
25use super::batch::prelude::*;
26use super::utils::{Distill, childless_record, column_names_pretty};
27use super::{
28    BatchPlanRef as PlanRef, ExprRewritable, PlanBase, ToBatchPb, ToDistributedBatch, ToLocalBatch,
29    generic,
30};
31use crate::catalog::source_catalog::SourceCatalog;
32use crate::error::Result;
33use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
34use crate::optimizer::property::{Distribution, Order};
35
/// Batch plan node that scans a single [`IcebergFileScanTask`].
///
/// Equality is derived field-by-field; hashing is implemented by hand and
/// covers the task only through its scan type.
#[derive(Debug, Clone, PartialEq)]
pub struct BatchIcebergScan {
    /// Shared batch plan-node state, built from `core` in [`BatchIcebergScan::new`].
    pub base: PlanBase<Batch>,
    /// Generic source description; supplies the source catalog, the column
    /// catalog and the `AS OF` clause used by the accessors of this node.
    pub core: generic::Source,
    /// The file scan task; its variant decides the reported `IcebergScanType`.
    pub task: IcebergFileScanTask,
    /// Optional limit, `None` on construction and set through `clone_with_limit`.
    /// NOTE(review): presumably a row-count limit — confirm against callers.
    limit: Option<u64>,
}
43
// Marker impl on top of the derived `PartialEq`: adds no methods, only asserts
// that the comparison is a full equivalence relation.
impl Eq for BatchIcebergScan {}
45
46impl Hash for BatchIcebergScan {
47    fn hash<H: Hasher>(&self, state: &mut H) {
48        self.base.hash(state);
49        self.core.hash(state);
50        self.iceberg_scan_type().hash(state);
51        self.limit.hash(state);
52    }
53}
54
55impl BatchIcebergScan {
56    pub fn new(core: generic::Source, task: IcebergFileScanTask) -> Self {
57        let base = PlanBase::new_batch_with_core(
58            &core,
59            // Use `Single` by default, will be updated later with `clone_with_dist`.
60            Distribution::Single,
61            Order::any(),
62        );
63
64        Self {
65            base,
66            core,
67            task,
68            limit: None,
69        }
70    }
71
72    pub fn clone_with_limit(&self, limit: Option<u64>) -> Self {
73        Self {
74            base: self.base.clone(),
75            core: self.core.clone(),
76            task: self.task.clone(),
77            limit,
78        }
79    }
80
81    pub fn limit(&self) -> Option<u64> {
82        self.limit
83    }
84
85    pub fn iceberg_scan_type(&self) -> IcebergScanType {
86        match &self.task {
87            IcebergFileScanTask::Data(_) => IcebergScanType::DataScan,
88            IcebergFileScanTask::EqualityDelete(_) => IcebergScanType::EqualityDeleteScan,
89            IcebergFileScanTask::PositionDelete(_) => IcebergScanType::PositionDeleteScan,
90        }
91    }
92
93    pub fn predicate(&self) -> Option<String> {
94        let predicate = self.task.predicate()?;
95        Some(predicate.to_string())
96    }
97
98    pub fn column_names(&self) -> Vec<&str> {
99        self.schema().names_str()
100    }
101
102    pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
103        self.core.catalog.clone()
104    }
105
106    pub fn clone_with_dist(&self) -> Self {
107        let base = self
108            .base
109            .clone_with_new_distribution(Distribution::SomeShard);
110        Self {
111            base,
112            core: self.core.clone(),
113            task: self.task.clone(),
114            limit: self.limit,
115        }
116    }
117
118    pub fn as_of(&self) -> Option<AsOf> {
119        self.core.as_of.clone()
120    }
121}
122
// NOTE(review): judging by the macro name, this generates the plan-tree-node
// impls for a node without inputs in the `Batch` convention — confirm against
// the macro definition.
impl_plan_tree_node_for_leaf! { Batch, BatchIcebergScan }
124
125impl Distill for BatchIcebergScan {
126    fn distill<'a>(&self) -> XmlNode<'a> {
127        let src = Pretty::from(self.source_catalog().unwrap().name.clone());
128        let mut fields = vec![
129            ("source", src),
130            ("columns", column_names_pretty(self.schema())),
131            (
132                "iceberg_scan_type",
133                Pretty::from(format!("{:?}", self.iceberg_scan_type())),
134            ),
135        ];
136        if let Some(predicate) = self.predicate() {
137            fields.push(("predicate", Pretty::from(predicate)));
138        }
139        if let Some(limit) = self.limit {
140            fields.push(("limit", Pretty::debug(&limit)));
141        }
142        childless_record("BatchIcebergScan", fields)
143    }
144}
145
146impl ToLocalBatch for BatchIcebergScan {
147    fn to_local(&self) -> Result<PlanRef> {
148        Ok(self.clone_with_dist().into())
149    }
150}
151
152impl ToDistributedBatch for BatchIcebergScan {
153    fn to_distributed(&self) -> Result<PlanRef> {
154        Ok(self.clone_with_dist().into())
155    }
156}
157
158impl ToBatchPb for BatchIcebergScan {
159    fn to_batch_prost_body(&self) -> NodeBody {
160        let source_catalog = self.source_catalog().unwrap();
161        let (with_properties, secret_refs) = source_catalog.with_properties.clone().into_parts();
162        NodeBody::IcebergScan(IcebergScanNode {
163            columns: self
164                .core
165                .column_catalog
166                .iter()
167                .map(|c| c.to_protobuf())
168                .collect(),
169            with_properties,
170            split: vec![],
171            secret_refs,
172            iceberg_scan_type: self.iceberg_scan_type() as i32,
173        })
174    }
175}
176
// Empty impl: every method comes from the trait's defaults.
// NOTE(review): presumably a no-op rewrite because this node keeps no
// expressions of its own — confirm against the trait definition.
impl ExprRewritable<Batch> for BatchIcebergScan {}
178
// Empty impl: every method comes from the trait's defaults.
// NOTE(review): presumably visits nothing because this node keeps no
// expressions of its own — confirm against the trait definition.
impl ExprVisitable for BatchIcebergScan {}