risingwave_frontend/optimizer/plan_node/generic/
source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::rc::Rc;
16
17use educe::Educe;
18use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Field, Schema};
19use risingwave_common::types::DataType;
20use risingwave_common::util::sort_util::OrderType;
21use risingwave_connector::WithPropertiesExt;
22use risingwave_sqlparser::ast::AsOf;
23
24use super::super::utils::TableCatalogBuilder;
25use super::GenericPlanNode;
26use crate::TableCatalog;
27use crate::catalog::source_catalog::SourceCatalog;
28use crate::optimizer::optimizer_context::OptimizerContextRef;
29use crate::optimizer::property::FunctionalDependencySet;
30
/// In which scenario the source node is created
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[expect(clippy::enum_variant_names)]
pub enum SourceNodeKind {
    /// `CREATE TABLE` with a connector.
    CreateTable,
    /// `CREATE SOURCE` with a streaming job (shared source).
    CreateSharedSource,
    /// `CREATE MATERIALIZED VIEW` or batch scan from a source.
    ///
    /// Note:
    /// - For non-shared source, `CREATE SOURCE` will not create a source node, and `CREATE MATERIALIZED VIEW` will create a `StreamSource`.
    /// - For shared source, `CREATE MATERIALIZED VIEW` will create `StreamSourceScan` instead of `StreamSource`.
    CreateMViewOrBatch,
}
46
/// [`Source`] returns contents of a table or other equivalent object
#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct Source {
    /// If there is an external stream source, `catalog` will be `Some`. Otherwise, it is `None`.
    pub catalog: Option<Rc<SourceCatalog>>,

    // NOTE: Here we store `column_catalog` and `row_id_index`
    // because they are needed when `catalog` is None.
    // When `catalog` is Some, they are the same as these fields in `catalog`.
    pub column_catalog: Vec<ColumnCatalog>,
    pub row_id_index: Option<usize>,

    /// In which scenario this source node was created. See [`SourceNodeKind`].
    pub kind: SourceNodeKind,

    // The optimizer context is irrelevant to plan-node identity, so it is
    // excluded from the derived `PartialEq`/`Hash`.
    #[educe(PartialEq(ignore))]
    #[educe(Hash(ignore))]
    pub ctx: OptimizerContextRef,

    /// `AS OF` clause for time travel. See [`Self::support_time_travel`].
    pub as_of: Option<AsOf>,
}
68
69impl GenericPlanNode for Source {
70    fn schema(&self) -> Schema {
71        let fields = self
72            .column_catalog
73            .iter()
74            .map(|c| (&c.column_desc).into())
75            .collect();
76        Schema { fields }
77    }
78
79    fn stream_key(&self) -> Option<Vec<usize>> {
80        // FIXME: output col idx is not set. But iceberg source can prune cols.
81        // XXX: there's a RISINGWAVE_ICEBERG_ROW_ID. Should we use it?
82        self.row_id_index.map(|idx| vec![idx])
83    }
84
85    fn ctx(&self) -> OptimizerContextRef {
86        self.ctx.clone()
87    }
88
89    fn functional_dependency(&self) -> FunctionalDependencySet {
90        let pk_indices = self.stream_key();
91        match pk_indices {
92            Some(pk_indices) => {
93                debug_assert!(
94                    pk_indices
95                        .iter()
96                        .all(|idx| *idx < self.column_catalog.len())
97                );
98                FunctionalDependencySet::with_key(self.column_catalog.len(), &pk_indices)
99            }
100            None => FunctionalDependencySet::new(self.column_catalog.len()),
101        }
102    }
103}
104
105impl Source {
106    /// The output is [`risingwave_connector::source::filesystem::FsPageItem`] / [`iceberg::scan::FileScanTask`]
107    pub fn file_list_node(core: Self) -> Self {
108        let column_catalog = if core.is_iceberg_connector() {
109            vec![
110                ColumnCatalog {
111                    column_desc: ColumnDesc::from_field_with_column_id(
112                        &Field {
113                            name: "file_path".to_owned(),
114                            data_type: DataType::Varchar,
115                        },
116                        0,
117                    ),
118                    is_hidden: false,
119                },
120                ColumnCatalog {
121                    column_desc: ColumnDesc::from_field_with_column_id(
122                        &Field {
123                            name: "file_scan_task".to_owned(),
124                            data_type: DataType::Jsonb,
125                        },
126                        1,
127                    ),
128                    is_hidden: false,
129                },
130            ]
131        } else if core.is_new_fs_connector() {
132            vec![
133                ColumnCatalog {
134                    column_desc: ColumnDesc::from_field_with_column_id(
135                        &Field {
136                            name: "filename".to_owned(),
137                            data_type: DataType::Varchar,
138                        },
139                        0,
140                    ),
141                    is_hidden: false,
142                },
143                // This columns seems unused.
144                ColumnCatalog {
145                    column_desc: ColumnDesc::from_field_with_column_id(
146                        &Field {
147                            name: "last_edit_time".to_owned(),
148                            data_type: DataType::Timestamptz,
149                        },
150                        1,
151                    ),
152                    is_hidden: false,
153                },
154                ColumnCatalog {
155                    column_desc: ColumnDesc::from_field_with_column_id(
156                        &Field {
157                            name: "file_size".to_owned(),
158                            data_type: DataType::Int64,
159                        },
160                        2,
161                    ),
162                    is_hidden: false,
163                },
164            ]
165        } else {
166            unreachable!()
167        };
168        Self {
169            column_catalog,
170            row_id_index: None,
171            ..core
172        }
173    }
174
175    pub fn is_new_fs_connector(&self) -> bool {
176        self.catalog
177            .as_ref()
178            .is_some_and(|catalog| catalog.with_properties.is_new_fs_connector())
179    }
180
181    pub fn is_iceberg_connector(&self) -> bool {
182        self.catalog
183            .as_ref()
184            .is_some_and(|catalog| catalog.with_properties.is_iceberg_connector())
185    }
186
187    pub fn is_kafka_connector(&self) -> bool {
188        self.catalog
189            .as_ref()
190            .is_some_and(|catalog| catalog.with_properties.is_kafka_connector())
191    }
192
    /// Whether this source supports `AS OF` time travel.
    ///
    /// Currently, only iceberg source supports time travel.
    pub fn support_time_travel(&self) -> bool {
        self.is_iceberg_connector()
    }
197
198    pub fn exclude_iceberg_hidden_columns(mut self) -> Self {
199        let Some(catalog) = &mut self.catalog else {
200            return self;
201        };
202        if catalog.info.is_shared() {
203            // for shared source, we should produce all columns
204            return self;
205        }
206        if self.kind != SourceNodeKind::CreateMViewOrBatch {
207            return self;
208        }
209
210        let prune = |col: &ColumnCatalog| col.is_hidden() && !col.is_row_id_column();
211
212        // minus the number of hidden columns before row_id_index.
213        self.row_id_index = self.row_id_index.map(|idx| {
214            let mut cnt = 0;
215            for col in self.column_catalog.iter().take(idx + 1) {
216                if prune(col) {
217                    cnt += 1;
218                }
219            }
220            idx - cnt
221        });
222        self.column_catalog.retain(|c| !prune(c));
223        self
224    }
225
226    /// The columns in stream/batch source node indicate the actual columns it will produce,
227    /// instead of the columns defined in source catalog. The difference is generated columns.
228    pub fn exclude_generated_columns(mut self) -> (Self, Option<usize>) {
229        let original_row_id_index = self.row_id_index;
230        // minus the number of generated columns before row_id_index.
231        self.row_id_index = original_row_id_index.map(|idx| {
232            let mut cnt = 0;
233            for col in self.column_catalog.iter().take(idx + 1) {
234                if col.is_generated() {
235                    cnt += 1;
236                }
237            }
238            idx - cnt
239        });
240        self.column_catalog.retain(|c| !c.is_generated());
241        (self, original_row_id_index)
242    }
243
244    /// Source's state table is `partition_id -> offset_info`.
245    /// Its schema is irrelevant to the data's schema.
246    ///
247    /// ## Notes on the distribution of the state table (`is_distributed`)
248    ///
249    /// Source executors are always distributed, but their state tables are special.
250    ///
251    /// ### `StreamSourceExecutor`: singleton (only one vnode)
252    ///
253    /// Its states are not sharded by consistent hash.
254    ///
255    /// Each actor accesses (point get) some partitions (a.k.a splits).
256    /// They are assigned by `SourceManager` in meta,
257    /// instead of `vnode` computed from the `partition_id`.
258    ///
259    /// ### `StreamFsFetch`: distributed by `partition_id`
260    ///
261    /// Each actor accesses (range scan) splits according to the `vnode`
262    /// computed from `partition_id`.
263    /// This is a normal distributed table.
264    pub fn infer_internal_table_catalog(is_distributed: bool) -> TableCatalog {
265        let mut builder = TableCatalogBuilder::default();
266
267        let key = Field {
268            data_type: DataType::Varchar,
269            name: "partition_id".to_owned(),
270        };
271        let value = Field {
272            data_type: DataType::Jsonb,
273            name: "offset_info".to_owned(),
274        };
275
276        let ordered_col_idx = builder.add_column(&key);
277        builder.add_column(&value);
278        builder.add_order_column(ordered_col_idx, OrderType::ascending());
279
280        builder.build(
281            if is_distributed {
282                vec![ordered_col_idx]
283            } else {
284                vec![]
285            },
286            1,
287        )
288    }
289}