risingwave_frontend/optimizer/plan_node/
batch_source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::rc::Rc;
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_pb::batch_plan::SourceNode;
19use risingwave_pb::batch_plan::plan_node::NodeBody;
20use risingwave_sqlparser::ast::AsOf;
21
22use super::batch::prelude::*;
23use super::utils::{Distill, childless_record, column_names_pretty};
24use super::{
25    BatchPlanRef as PlanRef, ExprRewritable, PlanBase, ToBatchPb, ToDistributedBatch, ToLocalBatch,
26    generic,
27};
28use crate::catalog::source_catalog::SourceCatalog;
29use crate::error::Result;
30use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
31use crate::optimizer::property::{Distribution, Order};
32
33/// [`BatchSource`] represents a table/connector source at the very beginning of the graph.
34///
35/// For supported batch connectors, see [`crate::scheduler::plan_fragmenter::SourceScanInfo`].
36#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37pub struct BatchSource {
38    pub base: PlanBase<Batch>,
39    pub core: generic::Source,
40}
41
42impl BatchSource {
43    pub fn new(core: generic::Source) -> Self {
44        let base = PlanBase::new_batch_with_core(
45            &core,
46            // Use `Single` by default, will be updated later with `clone_with_dist`.
47            Distribution::Single,
48            Order::any(),
49        );
50
51        Self { base, core }
52    }
53
54    pub fn column_names(&self) -> Vec<&str> {
55        self.schema().names_str()
56    }
57
58    pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
59        self.core.catalog.clone()
60    }
61
62    pub fn as_of(&self) -> Option<AsOf> {
63        self.core.as_of.clone()
64    }
65
66    pub fn clone_with_dist(&self) -> Self {
67        let base = self
68            .base
69            .clone_with_new_distribution(Distribution::SomeShard);
70        Self {
71            base,
72            core: self.core.clone(),
73        }
74    }
75}
76
// Invokes `impl_plan_tree_node_for_leaf!` for `BatchSource` under the `Batch` convention.
// NOTE(review): presumably this generates the plan-tree-node impls for a node without
// inputs -- confirm against the macro definition, which is not visible in this file.
77impl_plan_tree_node_for_leaf! { Batch, BatchSource }
78
79impl Distill for BatchSource {
80    fn distill<'a>(&self) -> XmlNode<'a> {
81        let src = Pretty::from(self.source_catalog().unwrap().name.clone());
82        let mut fields = vec![
83            ("source", src),
84            ("columns", column_names_pretty(self.schema())),
85        ];
86        if let Some(as_of) = &self.core.as_of {
87            fields.push(("as_of", Pretty::debug(as_of)));
88        }
89        childless_record("BatchSource", fields)
90    }
91}
92
93impl ToLocalBatch for BatchSource {
94    fn to_local(&self) -> Result<PlanRef> {
95        Ok(self.clone_with_dist().into())
96    }
97}
98
99impl ToDistributedBatch for BatchSource {
100    fn to_distributed(&self) -> Result<PlanRef> {
101        Ok(self.clone_with_dist().into())
102    }
103}
104
105impl ToBatchPb for BatchSource {
106    fn to_batch_prost_body(&self) -> NodeBody {
107        let source_catalog = self.source_catalog().unwrap();
108        let (with_properties, secret_refs) = source_catalog.with_properties.clone().into_parts();
109        NodeBody::Source(SourceNode {
110            source_id: source_catalog.id,
111            info: Some(source_catalog.info.clone()),
112            columns: self
113                .core
114                .column_catalog
115                .iter()
116                .map(|c| c.to_protobuf())
117                .collect(),
118            with_properties,
119            split: vec![],
120            secret_refs,
121        })
122    }
123}
124
// Empty impl: `BatchSource` overrides nothing, so the trait's default methods are used.
// NOTE(review): presumably a source node holds no expressions to rewrite -- confirm.
125impl ExprRewritable<Batch> for BatchSource {}
126
// Empty impl: `BatchSource` overrides nothing, so the trait's default methods are used.
// NOTE(review): presumably a source node holds no expressions to visit -- confirm.
127impl ExprVisitable for BatchSource {}