risingwave_frontend/optimizer/plan_node/generic/
source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::rc::Rc;
16
17use educe::Educe;
18use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Field, Schema};
19use risingwave_common::types::DataType;
20use risingwave_common::util::sort_util::OrderType;
21use risingwave_connector::WithPropertiesExt;
22use risingwave_sqlparser::ast::AsOf;
23
24use super::super::utils::TableCatalogBuilder;
25use super::GenericPlanNode;
26use crate::TableCatalog;
27use crate::catalog::source_catalog::SourceCatalog;
28use crate::optimizer::optimizer_context::OptimizerContextRef;
29use crate::optimizer::property::{FunctionalDependencySet, StreamKind};
30
/// In which scenario the source node is created
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[expect(clippy::enum_variant_names)]
pub enum SourceNodeKind {
    /// `CREATE TABLE` with a connector.
    CreateTable,
    /// `CREATE SOURCE` with a streaming job (shared source).
    CreateSharedSource,
    /// `CREATE MATERIALIZED VIEW` or batch scan from a source.
    ///
    /// Note:
    /// - For non-shared source, `CREATE SOURCE` will not create a source node, and `CREATE MATERIALIZED VIEW` will create a `StreamSource`.
    /// - For shared source, `CREATE MATERIALIZED VIEW` will create `StreamSourceScan` instead of `StreamSource`.
    CreateMViewOrBatch,
}
46
/// [`Source`] returns contents of a table or other equivalent object
#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct Source {
    /// If there is an external stream source, `catalog` will be `Some`. Otherwise, it is `None`.
    pub catalog: Option<Rc<SourceCatalog>>,

    // NOTE: Here we store `column_catalog` and `row_id_index`
    // because they are needed when `catalog` is None.
    // When `catalog` is Some, they are the same as these fields in `catalog`.
    /// The columns this node actually produces. May diverge from the catalog's
    /// columns after pruning (see `exclude_iceberg_hidden_columns` / `exclude_generated_columns`).
    pub column_catalog: Vec<ColumnCatalog>,
    /// Index of the row-id column within `column_catalog`, if one exists.
    pub row_id_index: Option<usize>,

    /// In which scenario this source node was created. See [`SourceNodeKind`].
    pub kind: SourceNodeKind,

    // The optimizer context is not part of the plan's semantics,
    // so it's excluded from equality and hashing.
    #[educe(PartialEq(ignore))]
    #[educe(Hash(ignore))]
    pub ctx: OptimizerContextRef,

    /// `AS OF` clause for time travel. Currently only the iceberg source
    /// supports time travel (see `support_time_travel`).
    pub as_of: Option<AsOf>,
}
68
69impl GenericPlanNode for Source {
70    fn schema(&self) -> Schema {
71        let fields = self
72            .column_catalog
73            .iter()
74            .map(|c| (&c.column_desc).into())
75            .collect();
76        Schema { fields }
77    }
78
79    fn stream_key(&self) -> Option<Vec<usize>> {
80        // FIXME: output col idx is not set. But iceberg source can prune cols.
81        // XXX: there's a RISINGWAVE_ICEBERG_ROW_ID. Should we use it?
82        if let Some(idx) = self.row_id_index {
83            Some(vec![idx])
84        } else if let Some(catalog) = &self.catalog {
85            catalog
86                .pk_col_ids
87                .iter()
88                .map(|id| {
89                    self.column_catalog
90                        .iter()
91                        .position(|c| c.column_id() == *id)
92                })
93                .collect::<Option<Vec<_>>>()
94        } else {
95            None
96        }
97    }
98
99    fn ctx(&self) -> OptimizerContextRef {
100        self.ctx.clone()
101    }
102
103    fn functional_dependency(&self) -> FunctionalDependencySet {
104        let pk_indices = self.stream_key();
105        match pk_indices {
106            Some(pk_indices) => {
107                debug_assert!(
108                    pk_indices
109                        .iter()
110                        .all(|idx| *idx < self.column_catalog.len())
111                );
112                FunctionalDependencySet::with_key(self.column_catalog.len(), &pk_indices)
113            }
114            None => FunctionalDependencySet::new(self.column_catalog.len()),
115        }
116    }
117}
118
119impl Source {
120    pub fn stream_kind(&self) -> StreamKind {
121        if let Some(catalog) = &self.catalog {
122            if catalog.append_only {
123                StreamKind::AppendOnly
124            } else {
125                // Always treat source as upsert, as we either don't parse the old record for `Update`, or we don't
126                // trust the old record from external source.
127                StreamKind::Upsert
128            }
129        } else {
130            // `Source` acts only as a barrier receiver. There's no data at all.
131            StreamKind::AppendOnly
132        }
133    }
134
135    /// The output is [`risingwave_connector::source::filesystem::FsPageItem`] / [`iceberg::scan::FileScanTask`]
136    pub fn file_list_node(core: Self) -> Self {
137        let column_catalog = if core.is_iceberg_connector() {
138            vec![
139                ColumnCatalog {
140                    column_desc: ColumnDesc::from_field_with_column_id(
141                        &Field {
142                            name: "file_path".to_owned(),
143                            data_type: DataType::Varchar,
144                        },
145                        0,
146                    ),
147                    is_hidden: false,
148                },
149                ColumnCatalog {
150                    column_desc: ColumnDesc::from_field_with_column_id(
151                        &Field {
152                            name: "file_scan_task".to_owned(),
153                            data_type: DataType::Jsonb,
154                        },
155                        1,
156                    ),
157                    is_hidden: false,
158                },
159            ]
160        } else if core.is_batch_connector() {
161            vec![
162                ColumnCatalog {
163                    column_desc: ColumnDesc::from_field_with_column_id(
164                        &Field {
165                            name: "batch_task_id".to_owned(),
166                            data_type: DataType::Varchar,
167                        },
168                        0,
169                    ),
170                    is_hidden: false,
171                },
172                ColumnCatalog {
173                    column_desc: ColumnDesc::from_field_with_column_id(
174                        &Field {
175                            name: "batch_task_info".to_owned(),
176                            data_type: DataType::Jsonb,
177                        },
178                        1,
179                    ),
180                    is_hidden: false,
181                },
182            ]
183        } else if core.is_new_fs_connector() {
184            vec![
185                ColumnCatalog {
186                    column_desc: ColumnDesc::from_field_with_column_id(
187                        &Field {
188                            name: "filename".to_owned(),
189                            data_type: DataType::Varchar,
190                        },
191                        0,
192                    ),
193                    is_hidden: false,
194                },
195                // This columns seems unused.
196                ColumnCatalog {
197                    column_desc: ColumnDesc::from_field_with_column_id(
198                        &Field {
199                            name: "last_edit_time".to_owned(),
200                            data_type: DataType::Timestamptz,
201                        },
202                        1,
203                    ),
204                    is_hidden: false,
205                },
206                ColumnCatalog {
207                    column_desc: ColumnDesc::from_field_with_column_id(
208                        &Field {
209                            name: "file_size".to_owned(),
210                            data_type: DataType::Int64,
211                        },
212                        2,
213                    ),
214                    is_hidden: false,
215                },
216            ]
217        } else {
218            unreachable!()
219        };
220        Self {
221            column_catalog,
222            row_id_index: None,
223            ..core
224        }
225    }
226
227    pub fn is_new_fs_connector(&self) -> bool {
228        self.catalog
229            .as_ref()
230            .is_some_and(|catalog| catalog.with_properties.is_new_fs_connector())
231    }
232
233    pub fn is_iceberg_connector(&self) -> bool {
234        self.catalog
235            .as_ref()
236            .is_some_and(|catalog| catalog.with_properties.is_iceberg_connector())
237    }
238
239    pub fn is_kafka_connector(&self) -> bool {
240        self.catalog
241            .as_ref()
242            .is_some_and(|catalog| catalog.with_properties.is_kafka_connector())
243    }
244
245    pub fn is_batch_connector(&self) -> bool {
246        self.catalog
247            .as_ref()
248            .is_some_and(|catalog| catalog.with_properties.is_batch_connector())
249    }
250
251    pub fn requires_singleton(&self) -> bool {
252        self.is_iceberg_connector() || self.is_batch_connector()
253    }
254
255    /// Currently, only iceberg source supports time travel.
256    pub fn support_time_travel(&self) -> bool {
257        self.is_iceberg_connector()
258    }
259
260    pub fn exclude_iceberg_hidden_columns(mut self) -> Self {
261        let Some(catalog) = &mut self.catalog else {
262            return self;
263        };
264        if catalog.info.is_shared() {
265            // for shared source, we should produce all columns
266            return self;
267        }
268        if self.kind != SourceNodeKind::CreateMViewOrBatch {
269            return self;
270        }
271
272        let prune = |col: &ColumnCatalog| col.is_hidden() && !col.is_row_id_column();
273
274        // minus the number of hidden columns before row_id_index.
275        self.row_id_index = self.row_id_index.map(|idx| {
276            let mut cnt = 0;
277            for col in self.column_catalog.iter().take(idx + 1) {
278                if prune(col) {
279                    cnt += 1;
280                }
281            }
282            idx - cnt
283        });
284        self.column_catalog.retain(|c| !prune(c));
285        self
286    }
287
288    /// The columns in stream/batch source node indicate the actual columns it will produce,
289    /// instead of the columns defined in source catalog. The difference is generated columns.
290    pub fn exclude_generated_columns(mut self) -> (Self, Option<usize>) {
291        let original_row_id_index = self.row_id_index;
292        // minus the number of generated columns before row_id_index.
293        self.row_id_index = original_row_id_index.map(|idx| {
294            let mut cnt = 0;
295            for col in self.column_catalog.iter().take(idx + 1) {
296                if col.is_generated() {
297                    cnt += 1;
298                }
299            }
300            idx - cnt
301        });
302        self.column_catalog.retain(|c| !c.is_generated());
303        (self, original_row_id_index)
304    }
305
306    /// Source's state table is `partition_id -> offset_info`.
307    /// Its schema is irrelevant to the data's schema.
308    ///
309    /// ## Notes on the distribution of the state table (`is_distributed`)
310    ///
311    /// Source executors are always distributed, but their state tables are special.
312    ///
313    /// ### `StreamSourceExecutor`: singleton (only one vnode)
314    ///
315    /// Its states are not sharded by consistent hash.
316    ///
317    /// Each actor accesses (point get) some partitions (a.k.a splits).
318    /// They are assigned by `SourceManager` in meta,
319    /// instead of `vnode` computed from the `partition_id`.
320    ///
321    /// ### `StreamFsFetch`: distributed by `partition_id`
322    ///
323    /// Each actor accesses (range scan) splits according to the `vnode`
324    /// computed from `partition_id`.
325    /// This is a normal distributed table.
326    pub fn infer_internal_table_catalog(is_distributed: bool) -> TableCatalog {
327        let mut builder = TableCatalogBuilder::default();
328
329        let key = Field {
330            data_type: DataType::Varchar,
331            name: "partition_id".to_owned(),
332        };
333        let value = Field {
334            data_type: DataType::Jsonb,
335            name: "offset_info".to_owned(),
336        };
337
338        let ordered_col_idx = builder.add_column(&key);
339        builder.add_column(&value);
340        builder.add_order_column(ordered_col_idx, OrderType::ascending());
341
342        builder.build(
343            if is_distributed {
344                vec![ordered_col_idx]
345            } else {
346                vec![]
347            },
348            1,
349        )
350    }
351}