risingwave_frontend/binder/relation/
table_or_source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use either::Either;
18use itertools::Itertools;
19use risingwave_common::acl::AclMode;
20use risingwave_common::bail_not_implemented;
21use risingwave_common::catalog::{Field, debug_assert_column_ids_distinct, is_system_schema};
22use risingwave_common::session_config::USER_NAME_WILD_CARD;
23use risingwave_connector::WithPropertiesExt;
24use risingwave_pb::user::grant_privilege::PbObject;
25use risingwave_sqlparser::ast::{AsOf, ObjectName, Statement, TableAlias};
26use risingwave_sqlparser::parser::Parser;
27use thiserror_ext::AsReport;
28
29use super::BoundShare;
30use crate::binder::relation::BoundShareInput;
31use crate::binder::{BindFor, Binder, Relation};
32use crate::catalog::root_catalog::SchemaPath;
33use crate::catalog::source_catalog::SourceCatalog;
34use crate::catalog::system_catalog::SystemTableCatalog;
35use crate::catalog::table_catalog::{TableCatalog, TableType};
36use crate::catalog::view_catalog::ViewCatalog;
37use crate::catalog::{CatalogError, DatabaseId, IndexCatalog, TableId};
38use crate::error::ErrorCode::PermissionDenied;
39use crate::error::{ErrorCode, Result, RwError};
40use crate::handler::privilege::ObjectCheckItem;
41
42#[derive(Debug, Clone)]
43pub struct BoundBaseTable {
44    pub table_id: TableId,
45    pub table_catalog: Arc<TableCatalog>,
46    pub table_indexes: Vec<Arc<IndexCatalog>>,
47    pub as_of: Option<AsOf>,
48}
49
50#[derive(Debug, Clone)]
51pub struct BoundSystemTable {
52    pub table_id: TableId,
53    pub sys_table_catalog: Arc<SystemTableCatalog>,
54}
55
56#[derive(Debug, Clone)]
57pub struct BoundSource {
58    pub catalog: SourceCatalog,
59    pub as_of: Option<AsOf>,
60}
61
62impl BoundSource {
63    pub fn is_shareable_cdc_connector(&self) -> bool {
64        self.catalog.with_properties.is_shareable_cdc_connector()
65    }
66
67    pub fn is_shared(&self) -> bool {
68        self.catalog.info.is_shared()
69    }
70}
71
72impl Binder {
73    pub fn bind_catalog_relation_by_object_name(
74        &mut self,
75        object_name: &ObjectName,
76        bind_creating_relations: bool,
77    ) -> Result<Relation> {
78        let (schema_name, table_name) =
79            Binder::resolve_schema_qualified_name(&self.db_name, object_name)?;
80        self.bind_catalog_relation_by_name(
81            None,
82            schema_name.as_deref(),
83            &table_name,
84            None,
85            None,
86            bind_creating_relations,
87        )
88    }
89
90    /// Binds table or source, or logical view according to what we get from the catalog.
91    pub fn bind_catalog_relation_by_name(
92        &mut self,
93        db_name: Option<&str>,
94        schema_name: Option<&str>,
95        table_name: &str,
96        alias: Option<&TableAlias>,
97        as_of: Option<&AsOf>,
98        bind_creating_relations: bool,
99    ) -> Result<Relation> {
100        // define some helper functions converting catalog to bound relation
101        let resolve_sys_table_relation = |sys_table_catalog: &Arc<SystemTableCatalog>| {
102            let table = BoundSystemTable {
103                table_id: sys_table_catalog.id(),
104                sys_table_catalog: sys_table_catalog.clone(),
105            };
106            (
107                Relation::SystemTable(Box::new(table)),
108                sys_table_catalog
109                    .columns
110                    .iter()
111                    .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
112                    .collect_vec(),
113            )
114        };
115
116        // check db_name if exists first
117        if let Some(db_name) = db_name {
118            let _ = self.catalog.get_database_by_name(db_name)?;
119        }
120
121        // start to bind
122        let (ret, columns) = {
123            match schema_name {
124                Some(schema_name) => {
125                    let db_name = db_name.unwrap_or(&self.db_name).to_owned();
126                    let schema_path = SchemaPath::Name(schema_name);
127                    if is_system_schema(schema_name) {
128                        if let Ok(sys_table_catalog) =
129                            self.catalog
130                                .get_sys_table_by_name(&db_name, schema_name, table_name)
131                        {
132                            resolve_sys_table_relation(sys_table_catalog)
133                        } else if let Ok((view_catalog, _)) =
134                            self.catalog
135                                .get_view_by_name(&db_name, schema_path, table_name)
136                        {
137                            self.resolve_view_relation(&view_catalog.clone())?
138                        } else {
139                            bail_not_implemented!(
140                                issue = 1695,
141                                r###"{}.{} is not supported, please use `SHOW` commands for now.
142`SHOW TABLES`,
143`SHOW MATERIALIZED VIEWS`,
144`DESCRIBE <table>`,
145`SHOW COLUMNS FROM [table]`
146"###,
147                                schema_name,
148                                table_name
149                            );
150                        }
151                    } else if let Some(source_catalog) =
152                        self.temporary_source_manager.get_source(table_name)
153                    // don't care about the database and schema
154                    {
155                        self.resolve_source_relation(&source_catalog.clone(), as_of, true)?
156                    } else if let Ok((table_catalog, schema_name)) = self
157                        .catalog
158                        .get_any_table_by_name(&db_name, schema_path, table_name)
159                        && (bind_creating_relations || table_catalog.is_created())
160                    {
161                        self.resolve_table_relation(
162                            table_catalog.clone(),
163                            &db_name,
164                            schema_name,
165                            as_of,
166                        )?
167                    } else if let Ok((source_catalog, _)) =
168                        self.catalog
169                            .get_source_by_name(&db_name, schema_path, table_name)
170                    {
171                        self.resolve_source_relation(&source_catalog.clone(), as_of, false)?
172                    } else if let Ok((view_catalog, _)) =
173                        self.catalog
174                            .get_view_by_name(&db_name, schema_path, table_name)
175                    {
176                        self.resolve_view_relation(&view_catalog.clone())?
177                    } else {
178                        return Err(CatalogError::NotFound(
179                            "table or source",
180                            table_name.to_owned(),
181                        )
182                        .into());
183                    }
184                }
185                None => (|| {
186                    // If schema is not specified, db must be unspecified.
187                    // So we should always use current database here.
188                    assert!(db_name.is_none());
189                    let db_name = self.db_name.clone();
190                    let user_name = self.auth_context.user_name.clone();
191
192                    for path in self.search_path.path() {
193                        if is_system_schema(path)
194                            && let Ok(sys_table_catalog) = self
195                                .catalog
196                                .get_sys_table_by_name(&db_name, path, table_name)
197                        {
198                            return Ok(resolve_sys_table_relation(sys_table_catalog));
199                        } else {
200                            let schema_name = if path == USER_NAME_WILD_CARD {
201                                &user_name
202                            } else {
203                                &path.clone()
204                            };
205
206                            if let Ok(schema) =
207                                self.catalog.get_schema_by_name(&db_name, schema_name)
208                            {
209                                if let Some(source_catalog) =
210                                    self.temporary_source_manager.get_source(table_name)
211                                // don't care about the database and schema
212                                {
213                                    return self.resolve_source_relation(
214                                        &source_catalog.clone(),
215                                        as_of,
216                                        true,
217                                    );
218                                } else if let Some(table_catalog) =
219                                    schema.get_any_table_by_name(table_name)
220                                    && (bind_creating_relations
221                                        || table_catalog.is_internal_table()
222                                        || table_catalog.is_created())
223                                {
224                                    return self.resolve_table_relation(
225                                        table_catalog.clone(),
226                                        &db_name,
227                                        schema_name,
228                                        as_of,
229                                    );
230                                } else if let Some(source_catalog) =
231                                    schema.get_source_by_name(table_name)
232                                {
233                                    return self.resolve_source_relation(
234                                        &source_catalog.clone(),
235                                        as_of,
236                                        false,
237                                    );
238                                } else if let Some(view_catalog) =
239                                    schema.get_view_by_name(table_name)
240                                {
241                                    return self.resolve_view_relation(&view_catalog.clone());
242                                }
243                            }
244                        }
245                    }
246
247                    Err(CatalogError::NotFound("table or source", table_name.to_owned()).into())
248                })()?,
249            }
250        };
251
252        self.bind_table_to_context(columns, table_name.to_owned(), alias)?;
253        Ok(ret)
254    }
255
256    pub(crate) fn check_privilege(
257        &self,
258        item: ObjectCheckItem,
259        database_id: DatabaseId,
260    ) -> Result<()> {
261        // security invoker is disabled for view, ignore privilege check.
262        if self.context.disable_security_invoker {
263            return Ok(());
264        }
265
266        match self.bind_for {
267            BindFor::Stream | BindFor::Batch => {
268                // reject sources for cross-db access
269                if matches!(self.bind_for, BindFor::Stream)
270                    && self.database_id != database_id
271                    && matches!(item.object, PbObject::SourceId(_))
272                {
273                    return Err(PermissionDenied(format!(
274                        "SOURCE \"{}\" is not allowed for cross-db access",
275                        item.name
276                    ))
277                    .into());
278                }
279                if let Some(user) = self.user.get_user_by_name(&self.auth_context.user_name) {
280                    if user.is_super || user.id == item.owner {
281                        return Ok(());
282                    }
283                    if !user.has_privilege(&item.object, item.mode) {
284                        return Err(PermissionDenied(item.error_message()).into());
285                    }
286
287                    // check CONNECT privilege for cross-db access
288                    if self.database_id != database_id
289                        && !user.has_privilege(&PbObject::DatabaseId(database_id), AclMode::Connect)
290                    {
291                        let db_name = self
292                            .catalog
293                            .get_database_by_id(&database_id)?
294                            .name
295                            .to_owned();
296
297                        return Err(PermissionDenied(format!(
298                            "permission denied for database \"{db_name}\""
299                        ))
300                        .into());
301                    }
302                } else {
303                    return Err(PermissionDenied("Session user is invalid".to_owned()).into());
304                }
305            }
306            BindFor::Ddl | BindFor::System => {
307                // do nothing.
308            }
309        }
310        Ok(())
311    }
312
313    fn resolve_table_relation(
314        &mut self,
315        table_catalog: Arc<TableCatalog>,
316        db_name: &str,
317        schema_name: &str,
318        as_of: Option<&AsOf>,
319    ) -> Result<(Relation, Vec<(bool, Field)>)> {
320        let table_id = table_catalog.id();
321        let columns = table_catalog
322            .columns
323            .iter()
324            .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
325            .collect_vec();
326        self.check_privilege(
327            ObjectCheckItem::new(
328                table_catalog.owner,
329                AclMode::Select,
330                table_catalog.name.clone(),
331                PbObject::TableId(table_id.table_id),
332            ),
333            table_catalog.database_id,
334        )?;
335        self.included_relations.insert(table_id);
336
337        let table_indexes = self.resolve_table_indexes(db_name, schema_name, table_id)?;
338
339        let table = BoundBaseTable {
340            table_id,
341            table_catalog,
342            table_indexes,
343            as_of: as_of.cloned(),
344        };
345
346        Ok::<_, RwError>((Relation::BaseTable(Box::new(table)), columns))
347    }
348
349    fn resolve_source_relation(
350        &mut self,
351        source_catalog: &SourceCatalog,
352        as_of: Option<&AsOf>,
353        is_temporary: bool,
354    ) -> Result<(Relation, Vec<(bool, Field)>)> {
355        debug_assert_column_ids_distinct(&source_catalog.columns);
356        if !is_temporary {
357            self.check_privilege(
358                ObjectCheckItem::new(
359                    source_catalog.owner,
360                    AclMode::Select,
361                    source_catalog.name.clone(),
362                    PbObject::SourceId(source_catalog.id),
363                ),
364                source_catalog.database_id,
365            )?;
366        }
367        self.included_relations.insert(source_catalog.id.into());
368        Ok((
369            Relation::Source(Box::new(BoundSource {
370                catalog: source_catalog.clone(),
371                as_of: as_of.cloned(),
372            })),
373            source_catalog
374                .columns
375                .iter()
376                .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
377                .collect_vec(),
378        ))
379    }
380
381    fn resolve_view_relation(
382        &mut self,
383        view_catalog: &ViewCatalog,
384    ) -> Result<(Relation, Vec<(bool, Field)>)> {
385        if !view_catalog.is_system_view() {
386            self.check_privilege(
387                ObjectCheckItem::new(
388                    view_catalog.owner,
389                    AclMode::Select,
390                    view_catalog.name.clone(),
391                    PbObject::ViewId(view_catalog.id),
392                ),
393                view_catalog.database_id,
394            )?;
395        }
396
397        let ast = Parser::parse_sql(&view_catalog.sql)
398            .expect("a view's sql should be parsed successfully");
399        let Statement::Query(query) = ast
400            .into_iter()
401            .exactly_one()
402            .expect("a view should contain only one statement")
403        else {
404            unreachable!("a view should contain a query statement");
405        };
406        let query = self.bind_query_for_view(&query).map_err(|e| {
407            ErrorCode::BindError(format!(
408                "failed to bind view {}, sql: {}\nerror: {}",
409                view_catalog.name,
410                view_catalog.sql,
411                e.as_report()
412            ))
413        })?;
414
415        let columns = view_catalog.columns.clone();
416
417        if !itertools::equal(
418            query.schema().fields().iter().map(|f| &f.data_type),
419            view_catalog.columns.iter().map(|f| &f.data_type),
420        ) {
421            return Err(ErrorCode::BindError(format!(
422                "failed to bind view {}. The SQL's schema is different from catalog's schema sql: {}, bound schema: {:?}, catalog schema: {:?}",
423                view_catalog.name, view_catalog.sql, query.schema(), columns
424            )).into());
425        }
426
427        let share_id = match self.shared_views.get(&view_catalog.id) {
428            Some(share_id) => *share_id,
429            None => {
430                let share_id = self.next_share_id();
431                self.shared_views.insert(view_catalog.id, share_id);
432                self.included_relations.insert(view_catalog.id.into());
433                share_id
434            }
435        };
436        let input = Either::Left(query);
437        Ok((
438            Relation::Share(Box::new(BoundShare {
439                share_id,
440                input: BoundShareInput::Query(input),
441            })),
442            columns.iter().map(|c| (false, c.clone())).collect_vec(),
443        ))
444    }
445
446    fn resolve_table_indexes(
447        &self,
448        db_name: &str,
449        schema_name: &str,
450        table_id: TableId,
451    ) -> Result<Vec<Arc<IndexCatalog>>> {
452        let schema = self.catalog.get_schema_by_name(db_name, schema_name)?;
453        assert!(
454            schema.get_table_by_id(&table_id).is_some(),
455            "table {table_id} not found in {db_name}.{schema_name}"
456        );
457
458        Ok(schema.get_indexes_by_table_id(&table_id))
459    }
460
461    pub(crate) fn bind_table(
462        &mut self,
463        schema_name: Option<&str>,
464        table_name: &str,
465    ) -> Result<BoundBaseTable> {
466        let db_name = &self.db_name;
467        let schema_path = self.bind_schema_path(schema_name);
468        let (table_catalog, schema_name) =
469            self.catalog
470                .get_created_table_by_name(db_name, schema_path, table_name)?;
471        let table_catalog = table_catalog.clone();
472
473        let table_id = table_catalog.id();
474        let table_indexes = self.resolve_table_indexes(db_name, schema_name, table_id)?;
475
476        let columns = table_catalog.columns.clone();
477
478        self.bind_table_to_context(
479            columns
480                .iter()
481                .map(|c| (c.is_hidden, (&c.column_desc).into())),
482            table_name.to_owned(),
483            None,
484        )?;
485
486        Ok(BoundBaseTable {
487            table_id,
488            table_catalog,
489            table_indexes,
490            as_of: None,
491        })
492    }
493
494    pub(crate) fn check_for_dml(table: &TableCatalog, is_insert: bool) -> Result<()> {
495        let table_name = &table.name;
496        match table.table_type() {
497            TableType::Table => {}
498            TableType::Index => {
499                return Err(ErrorCode::InvalidInputSyntax(format!(
500                    "cannot change index \"{table_name}\""
501                ))
502                .into());
503            }
504            TableType::MaterializedView => {
505                return Err(ErrorCode::InvalidInputSyntax(format!(
506                    "cannot change materialized view \"{table_name}\""
507                ))
508                .into());
509            }
510            TableType::Internal => {
511                return Err(ErrorCode::InvalidInputSyntax(format!(
512                    "cannot change internal table \"{table_name}\""
513                ))
514                .into());
515            }
516        }
517
518        if table.append_only && !is_insert {
519            return Err(ErrorCode::BindError(
520                "append-only table does not support update or delete".to_owned(),
521            )
522            .into());
523        }
524
525        Ok(())
526    }
527}