risingwave_frontend/binder/relation/
table_or_source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use either::Either;
18use itertools::Itertools;
19use risingwave_common::acl::AclMode;
20use risingwave_common::bail_not_implemented;
21use risingwave_common::catalog::{Field, debug_assert_column_ids_distinct, is_system_schema};
22use risingwave_common::session_config::USER_NAME_WILD_CARD;
23use risingwave_connector::WithPropertiesExt;
24use risingwave_pb::user::grant_privilege::PbObject;
25use risingwave_sqlparser::ast::{AsOf, ObjectName, Statement, TableAlias};
26use risingwave_sqlparser::parser::Parser;
27use thiserror_ext::AsReport;
28
29use super::BoundShare;
30use crate::binder::relation::BoundShareInput;
31use crate::binder::{BindFor, Binder, Relation};
32use crate::catalog::root_catalog::SchemaPath;
33use crate::catalog::source_catalog::SourceCatalog;
34use crate::catalog::system_catalog::SystemTableCatalog;
35use crate::catalog::table_catalog::{TableCatalog, TableType};
36use crate::catalog::view_catalog::ViewCatalog;
37use crate::catalog::{CatalogError, DatabaseId, IndexCatalog, TableId};
38use crate::error::ErrorCode::PermissionDenied;
39use crate::error::{ErrorCode, Result, RwError};
40use crate::handler::privilege::ObjectCheckItem;
41
/// A table relation bound from a [`TableCatalog`].
///
/// Produced by `Binder::resolve_table_relation` and `Binder::bind_table`.
#[derive(Debug, Clone)]
pub struct BoundBaseTable {
    /// Id of the bound table; both constructors in this file take it from `table_catalog.id()`.
    pub table_id: TableId,
    /// Shared handle to the catalog entry of the bound table.
    pub table_catalog: Arc<TableCatalog>,
    /// Indexes on the table, as returned by `Binder::resolve_table_indexes`
    /// (i.e. the schema's created indexes for `table_id`).
    pub table_indexes: Vec<Arc<IndexCatalog>>,
    /// The `AS OF` clause the relation was bound with, if any.
    /// `Binder::bind_table` always sets this to `None`.
    pub as_of: Option<AsOf>,
}
49
/// A system table relation bound from a [`SystemTableCatalog`].
#[derive(Debug, Clone)]
pub struct BoundSystemTable {
    /// Id of the system table, taken from `sys_table_catalog.id()` when bound.
    pub table_id: TableId,
    /// Shared handle to the catalog entry of the system table.
    pub sys_table_catalog: Arc<SystemTableCatalog>,
}
55
/// A source relation bound from a [`SourceCatalog`].
#[derive(Debug, Clone)]
pub struct BoundSource {
    /// Owned copy of the source's catalog entry (cloned when the source is bound).
    pub catalog: SourceCatalog,
    /// The `AS OF` clause the relation was bound with, if any.
    pub as_of: Option<AsOf>,
}
61
62impl BoundSource {
63    pub fn is_shareable_cdc_connector(&self) -> bool {
64        self.catalog.with_properties.is_shareable_cdc_connector()
65    }
66
67    pub fn is_shared(&self) -> bool {
68        self.catalog.info.is_shared()
69    }
70}
71
72impl Binder {
73    pub fn bind_catalog_relation_by_object_name(
74        &mut self,
75        object_name: &ObjectName,
76        bind_creating_relations: bool,
77    ) -> Result<Relation> {
78        let (schema_name, table_name) =
79            Binder::resolve_schema_qualified_name(&self.db_name, object_name)?;
80        self.bind_catalog_relation_by_name(
81            None,
82            schema_name.as_deref(),
83            &table_name,
84            None,
85            None,
86            bind_creating_relations,
87        )
88    }
89
    /// Binds table or source, or logical view according to what we get from the catalog.
    ///
    /// After the relation is resolved, its columns are registered in the binding context
    /// under `table_name` (and `alias`, if given) via `bind_table_to_context`.
    ///
    /// # Lookup order
    ///
    /// With an explicit `schema_name`:
    /// - If it is a system schema: system table, then view; otherwise a "not implemented"
    ///   error is returned.
    /// - Otherwise: temporary source (matched by `table_name` only), table, source, view,
    ///   and finally a table from the staging catalog manager (matched by `table_name` only).
    ///
    /// Without `schema_name`, every entry of the search path is tried in order. A system
    /// schema entry that contains a matching system table wins immediately; any other entry
    /// is resolved to a schema name (the user-name wildcard maps to the session user) and,
    /// if that schema exists, probed in the same non-system order as above.
    ///
    /// A table found in the catalog is only accepted when `bind_creating_relations` is
    /// `true`, or the table is an internal table, or the table is already created.
    ///
    /// # Errors
    ///
    /// - Propagates the error of `get_database_by_name` when `db_name` is given but the
    ///   lookup fails.
    /// - Returns a "not implemented" error for an unknown relation in a system schema.
    /// - Returns `CatalogError::not_found("table or source", ..)` when nothing matches.
    /// - Propagates errors from the `resolve_*_relation` helpers (e.g. privilege checks)
    ///   and from `bind_table_to_context`.
    ///
    /// # Panics
    ///
    /// Panics if `schema_name` is `None` while `db_name` is `Some`.
    pub fn bind_catalog_relation_by_name(
        &mut self,
        db_name: Option<&str>,
        schema_name: Option<&str>,
        table_name: &str,
        alias: Option<&TableAlias>,
        as_of: Option<&AsOf>,
        bind_creating_relations: bool,
    ) -> Result<Relation> {
        // define some helper functions converting catalog to bound relation
        let resolve_sys_table_relation = |sys_table_catalog: &Arc<SystemTableCatalog>| {
            let table = BoundSystemTable {
                table_id: sys_table_catalog.id(),
                sys_table_catalog: sys_table_catalog.clone(),
            };
            (
                Relation::SystemTable(Box::new(table)),
                sys_table_catalog
                    .columns
                    .iter()
                    .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
                    .collect_vec(),
            )
        };

        // check db_name if exists first
        if let Some(db_name) = db_name {
            let _ = self.catalog.get_database_by_name(db_name)?;
        }

        // start to bind
        let (ret, columns) = {
            match schema_name {
                Some(schema_name) => {
                    // Fall back to the binder's current database when none is given.
                    let db_name = db_name.unwrap_or(&self.db_name).to_owned();
                    let schema_path = SchemaPath::Name(schema_name);
                    if is_system_schema(schema_name) {
                        // System schemas only expose system tables and views.
                        if let Ok(sys_table_catalog) =
                            self.catalog
                                .get_sys_table_by_name(&db_name, schema_name, table_name)
                        {
                            resolve_sys_table_relation(sys_table_catalog)
                        } else if let Ok((view_catalog, _)) =
                            self.catalog
                                .get_view_by_name(&db_name, schema_path, table_name)
                        {
                            self.resolve_view_relation(&view_catalog.clone())?
                        } else {
                            bail_not_implemented!(
                                issue = 1695,
                                r###"{}.{} is not supported, please use `SHOW` commands for now.
`SHOW TABLES`,
`SHOW MATERIALIZED VIEWS`,
`DESCRIBE <table>`,
`SHOW COLUMNS FROM [table]`
"###,
                                schema_name,
                                table_name
                            );
                        }
                    } else if let Some(source_catalog) =
                        self.temporary_source_manager.get_source(table_name)
                    // don't care about the database and schema
                    {
                        self.resolve_source_relation(&source_catalog.clone(), as_of, true)?
                    } else if let Ok((table_catalog, schema_name)) = self
                        .catalog
                        .get_any_table_by_name(&db_name, schema_path, table_name)
                        // A table that is neither internal nor created is only visible
                        // when the caller opted in via `bind_creating_relations`.
                        && (bind_creating_relations
                            || table_catalog.is_internal_table()
                            || table_catalog.is_created())
                    {
                        self.resolve_table_relation(
                            table_catalog.clone(),
                            &db_name,
                            schema_name,
                            as_of,
                        )?
                    } else if let Ok((source_catalog, _)) =
                        self.catalog
                            .get_source_by_name(&db_name, schema_path, table_name)
                    {
                        self.resolve_source_relation(&source_catalog.clone(), as_of, false)?
                    } else if let Ok((view_catalog, _)) =
                        self.catalog
                            .get_view_by_name(&db_name, schema_path, table_name)
                    {
                        self.resolve_view_relation(&view_catalog.clone())?
                    } else if let Some(table_catalog) =
                        self.staging_catalog_manager.get_table(table_name)
                    {
                        // don't care about the database and schema
                        self.resolve_table_relation(
                            table_catalog.clone().into(),
                            &db_name,
                            schema_name,
                            as_of,
                        )?
                    } else {
                        return Err(CatalogError::not_found("table or source", table_name).into());
                    }
                }
                // Immediately-invoked closure: the `return`s below end only the search-path
                // scan, so the shared tail (`bind_table_to_context`) still runs on success.
                None => (|| {
                    // If schema is not specified, db must be unspecified.
                    // So we should always use current database here.
                    assert!(db_name.is_none());
                    let db_name = self.db_name.clone();
                    let user_name = self.auth_context.user_name.clone();

                    for path in self.search_path.path() {
                        if is_system_schema(path)
                            && let Ok(sys_table_catalog) = self
                                .catalog
                                .get_sys_table_by_name(&db_name, path, table_name)
                        {
                            return Ok(resolve_sys_table_relation(sys_table_catalog));
                        } else {
                            // Also reached for a system schema path that has no matching
                            // system table. The wildcard entry stands for the session user.
                            let schema_name = if path == USER_NAME_WILD_CARD {
                                &user_name
                            } else {
                                &path.clone()
                            };

                            // Search path entries whose schema does not exist are skipped.
                            if let Ok(schema) =
                                self.catalog.get_schema_by_name(&db_name, schema_name)
                            {
                                if let Some(source_catalog) =
                                    self.temporary_source_manager.get_source(table_name)
                                // don't care about the database and schema
                                {
                                    return self.resolve_source_relation(
                                        &source_catalog.clone(),
                                        as_of,
                                        true,
                                    );
                                } else if let Some(table_catalog) =
                                    schema.get_any_table_by_name(table_name)
                                    && (bind_creating_relations
                                        || table_catalog.is_internal_table()
                                        || table_catalog.is_created())
                                {
                                    return self.resolve_table_relation(
                                        table_catalog.clone(),
                                        &db_name,
                                        schema_name,
                                        as_of,
                                    );
                                } else if let Some(source_catalog) =
                                    schema.get_source_by_name(table_name)
                                {
                                    return self.resolve_source_relation(
                                        &source_catalog.clone(),
                                        as_of,
                                        false,
                                    );
                                } else if let Some(view_catalog) =
                                    schema.get_view_by_name(table_name)
                                {
                                    return self.resolve_view_relation(&view_catalog.clone());
                                } else if let Some(table_catalog) =
                                    self.staging_catalog_manager.get_table(table_name)
                                {
                                    // don't care about the database and schema
                                    return self.resolve_table_relation(
                                        table_catalog.clone().into(),
                                        &db_name,
                                        schema_name,
                                        as_of,
                                    );
                                }
                            }
                        }
                    }

                    // No search path entry produced a match.
                    Err(CatalogError::not_found("table or source", table_name).into())
                })()?,
            }
        };

        // Make the relation's columns visible to the rest of the query under its name/alias.
        self.bind_table_to_context(columns, table_name.to_owned(), alias)?;
        Ok(ret)
    }
273
274    pub(crate) fn check_privilege(
275        &self,
276        item: ObjectCheckItem,
277        database_id: DatabaseId,
278    ) -> Result<()> {
279        // security invoker is disabled for view, ignore privilege check.
280        if self.context.disable_security_invoker {
281            return Ok(());
282        }
283
284        match self.bind_for {
285            BindFor::Stream | BindFor::Batch => {
286                // reject sources for cross-db access
287                if matches!(self.bind_for, BindFor::Stream)
288                    && self.database_id != database_id
289                    && matches!(item.object, PbObject::SourceId(_))
290                {
291                    return Err(PermissionDenied(format!(
292                        "SOURCE \"{}\" is not allowed for cross-db access",
293                        item.name
294                    ))
295                    .into());
296                }
297                if let Some(user) = self.user.get_user_by_name(&self.auth_context.user_name) {
298                    if user.is_super || user.id == item.owner {
299                        return Ok(());
300                    }
301                    if !user.has_privilege(item.object, item.mode) {
302                        return Err(PermissionDenied(item.error_message()).into());
303                    }
304
305                    // check CONNECT privilege for cross-db access
306                    if self.database_id != database_id
307                        && !user.has_privilege(database_id, AclMode::Connect)
308                    {
309                        let db_name = self.catalog.get_database_by_id(database_id)?.name.clone();
310
311                        return Err(PermissionDenied(format!(
312                            "permission denied for database \"{db_name}\""
313                        ))
314                        .into());
315                    }
316                } else {
317                    return Err(PermissionDenied("Session user is invalid".to_owned()).into());
318                }
319            }
320            BindFor::Ddl | BindFor::System => {
321                // do nothing.
322            }
323        }
324        Ok(())
325    }
326
327    fn resolve_table_relation(
328        &mut self,
329        table_catalog: Arc<TableCatalog>,
330        db_name: &str,
331        schema_name: &str,
332        as_of: Option<&AsOf>,
333    ) -> Result<(Relation, Vec<(bool, Field)>)> {
334        let table_id = table_catalog.id();
335        let columns = table_catalog
336            .columns
337            .iter()
338            .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
339            .collect_vec();
340        self.check_privilege(
341            ObjectCheckItem::new(
342                table_catalog.owner,
343                AclMode::Select,
344                table_catalog.name.clone(),
345                table_id,
346            ),
347            table_catalog.database_id,
348        )?;
349        self.included_relations.insert(table_id.as_object_id());
350
351        let table_indexes = self.resolve_table_indexes(db_name, schema_name, table_id)?;
352
353        let table = BoundBaseTable {
354            table_id,
355            table_catalog,
356            table_indexes,
357            as_of: as_of.cloned(),
358        };
359
360        Ok::<_, RwError>((Relation::BaseTable(Box::new(table)), columns))
361    }
362
363    fn resolve_source_relation(
364        &mut self,
365        source_catalog: &SourceCatalog,
366        as_of: Option<&AsOf>,
367        is_temporary: bool,
368    ) -> Result<(Relation, Vec<(bool, Field)>)> {
369        debug_assert_column_ids_distinct(&source_catalog.columns);
370        if !is_temporary {
371            self.check_privilege(
372                ObjectCheckItem::new(
373                    source_catalog.owner,
374                    AclMode::Select,
375                    source_catalog.name.clone(),
376                    source_catalog.id,
377                ),
378                source_catalog.database_id,
379            )?;
380        }
381        self.included_relations
382            .insert(source_catalog.id.as_object_id());
383        Ok((
384            Relation::Source(Box::new(BoundSource {
385                catalog: source_catalog.clone(),
386                as_of: as_of.cloned(),
387            })),
388            source_catalog
389                .columns
390                .iter()
391                .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
392                .collect_vec(),
393        ))
394    }
395
396    fn resolve_view_relation(
397        &mut self,
398        view_catalog: &ViewCatalog,
399    ) -> Result<(Relation, Vec<(bool, Field)>)> {
400        if !view_catalog.is_system_view() {
401            self.check_privilege(
402                ObjectCheckItem::new(
403                    view_catalog.owner,
404                    AclMode::Select,
405                    view_catalog.name.clone(),
406                    view_catalog.id,
407                ),
408                view_catalog.database_id,
409            )?;
410        }
411
412        let ast = Parser::parse_sql(&view_catalog.sql)
413            .expect("a view's sql should be parsed successfully");
414        let Statement::Query(query) = ast
415            .into_iter()
416            .exactly_one()
417            .expect("a view should contain only one statement")
418        else {
419            unreachable!("a view should contain a query statement");
420        };
421        let query = self.bind_query_for_view(&query).map_err(|e| {
422            ErrorCode::BindError(format!(
423                "failed to bind view {}, sql: {}\nerror: {}",
424                view_catalog.name,
425                view_catalog.sql,
426                e.as_report()
427            ))
428        })?;
429
430        let columns = view_catalog.columns.clone();
431
432        if !itertools::equal(
433            query.schema().fields().iter().map(|f| &f.data_type),
434            view_catalog.columns.iter().map(|f| &f.data_type),
435        ) {
436            return Err(ErrorCode::BindError(format!(
437                "failed to bind view {}. The SQL's schema is different from catalog's schema sql: {}, bound schema: {:?}, catalog schema: {:?}",
438                view_catalog.name, view_catalog.sql, query.schema(), columns
439            )).into());
440        }
441
442        let share_id = match self.shared_views.get(&view_catalog.id) {
443            Some(share_id) => *share_id,
444            None => {
445                let share_id = self.next_share_id();
446                self.shared_views.insert(view_catalog.id, share_id);
447                self.included_relations
448                    .insert(view_catalog.id.as_object_id());
449                share_id
450            }
451        };
452        let input = Either::Left(query);
453        Ok((
454            Relation::Share(Box::new(BoundShare {
455                share_id,
456                input: BoundShareInput::Query(input),
457            })),
458            columns.iter().map(|c| (false, c.clone())).collect_vec(),
459        ))
460    }
461
462    fn resolve_table_indexes(
463        &self,
464        db_name: &str,
465        schema_name: &str,
466        table_id: TableId,
467    ) -> Result<Vec<Arc<IndexCatalog>>> {
468        let schema = self.catalog.get_schema_by_name(db_name, schema_name)?;
469        assert!(
470            schema.get_table_by_id(table_id).is_some() || table_id.is_placeholder(),
471            "table {table_id} not found in {db_name}.{schema_name}"
472        );
473
474        Ok(schema.get_created_indexes_by_table_id(table_id))
475    }
476
477    pub(crate) fn bind_table(
478        &mut self,
479        schema_name: Option<&str>,
480        table_name: &str,
481    ) -> Result<BoundBaseTable> {
482        let db_name = &self.db_name;
483        let schema_path = self.bind_schema_path(schema_name);
484        let (table_catalog, schema_name) =
485            self.catalog
486                .get_created_table_by_name(db_name, schema_path, table_name)?;
487        let table_catalog = table_catalog.clone();
488
489        let table_id = table_catalog.id();
490        let table_indexes = self.resolve_table_indexes(db_name, schema_name, table_id)?;
491
492        let columns = table_catalog.columns.clone();
493
494        self.bind_table_to_context(
495            columns
496                .iter()
497                .map(|c| (c.is_hidden, (&c.column_desc).into())),
498            table_name.to_owned(),
499            None,
500        )?;
501
502        Ok(BoundBaseTable {
503            table_id,
504            table_catalog,
505            table_indexes,
506            as_of: None,
507        })
508    }
509
510    pub(crate) fn check_for_dml(table: &TableCatalog, is_insert: bool) -> Result<()> {
511        let table_name = &table.name;
512        match table.table_type() {
513            TableType::Table => {}
514            TableType::Index | TableType::VectorIndex => {
515                return Err(ErrorCode::InvalidInputSyntax(format!(
516                    "cannot change index \"{table_name}\""
517                ))
518                .into());
519            }
520            TableType::MaterializedView => {
521                return Err(ErrorCode::InvalidInputSyntax(format!(
522                    "cannot change materialized view \"{table_name}\""
523                ))
524                .into());
525            }
526            TableType::Internal => {
527                return Err(ErrorCode::InvalidInputSyntax(format!(
528                    "cannot change internal table \"{table_name}\""
529                ))
530                .into());
531            }
532        }
533
534        if table.append_only && !is_insert {
535            return Err(ErrorCode::BindError(
536                "append-only table does not support update or delete".to_owned(),
537            )
538            .into());
539        }
540
541        Ok(())
542    }
543}