risingwave_frontend/binder/relation/
table_or_source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use either::Either;
18use itertools::Itertools;
19use risingwave_common::acl::AclMode;
20use risingwave_common::bail_not_implemented;
21use risingwave_common::catalog::{Field, debug_assert_column_ids_distinct, is_system_schema};
22use risingwave_common::session_config::USER_NAME_WILD_CARD;
23use risingwave_connector::WithPropertiesExt;
24use risingwave_pb::user::grant_privilege::PbObject;
25use risingwave_sqlparser::ast::{AsOf, ObjectName, Statement, TableAlias};
26use risingwave_sqlparser::parser::Parser;
27use thiserror_ext::AsReport;
28
29use super::BoundShare;
30use crate::binder::relation::BoundShareInput;
31use crate::binder::{BindFor, Binder, Relation};
32use crate::catalog::root_catalog::SchemaPath;
33use crate::catalog::source_catalog::SourceCatalog;
34use crate::catalog::system_catalog::SystemTableCatalog;
35use crate::catalog::table_catalog::{TableCatalog, TableType};
36use crate::catalog::view_catalog::ViewCatalog;
37use crate::catalog::{CatalogError, DatabaseId, IndexCatalog, TableId};
38use crate::error::ErrorCode::PermissionDenied;
39use crate::error::{ErrorCode, Result, RwError};
40use crate::handler::privilege::ObjectCheckItem;
41
42#[derive(Debug, Clone)]
43pub struct BoundBaseTable {
44    pub table_id: TableId,
45    pub table_catalog: Arc<TableCatalog>,
46    pub table_indexes: Vec<Arc<IndexCatalog>>,
47    pub as_of: Option<AsOf>,
48}
49
50#[derive(Debug, Clone)]
51pub struct BoundSystemTable {
52    pub table_id: TableId,
53    pub sys_table_catalog: Arc<SystemTableCatalog>,
54}
55
56#[derive(Debug, Clone)]
57pub struct BoundSource {
58    pub catalog: SourceCatalog,
59    pub as_of: Option<AsOf>,
60}
61
62impl BoundSource {
63    pub fn is_shareable_cdc_connector(&self) -> bool {
64        self.catalog.with_properties.is_shareable_cdc_connector()
65    }
66
67    pub fn is_shared(&self) -> bool {
68        self.catalog.info.is_shared()
69    }
70}
71
72impl Binder {
73    pub fn bind_catalog_relation_by_object_name(
74        &mut self,
75        object_name: ObjectName,
76        bind_creating_relations: bool,
77    ) -> Result<Relation> {
78        let (schema_name, table_name) =
79            Binder::resolve_schema_qualified_name(&self.db_name, object_name)?;
80        self.bind_catalog_relation_by_name(
81            None,
82            schema_name.as_deref(),
83            &table_name,
84            None,
85            None,
86            bind_creating_relations,
87        )
88    }
89
90    /// Binds table or source, or logical view according to what we get from the catalog.
91    pub fn bind_catalog_relation_by_name(
92        &mut self,
93        db_name: Option<&str>,
94        schema_name: Option<&str>,
95        table_name: &str,
96        alias: Option<TableAlias>,
97        as_of: Option<AsOf>,
98        bind_creating_relations: bool,
99    ) -> Result<Relation> {
100        // define some helper functions converting catalog to bound relation
101        let resolve_sys_table_relation = |sys_table_catalog: &Arc<SystemTableCatalog>| {
102            let table = BoundSystemTable {
103                table_id: sys_table_catalog.id(),
104                sys_table_catalog: sys_table_catalog.clone(),
105            };
106            (
107                Relation::SystemTable(Box::new(table)),
108                sys_table_catalog
109                    .columns
110                    .iter()
111                    .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
112                    .collect_vec(),
113            )
114        };
115
116        // check db_name if exists first
117        if let Some(db_name) = db_name {
118            let _ = self.catalog.get_database_by_name(db_name)?;
119        }
120
121        // start to bind
122        let (ret, columns) = {
123            match schema_name {
124                Some(schema_name) => {
125                    let db_name = db_name.unwrap_or(&self.db_name);
126                    let schema_path = SchemaPath::Name(schema_name);
127                    if is_system_schema(schema_name) {
128                        if let Ok(sys_table_catalog) =
129                            self.catalog
130                                .get_sys_table_by_name(db_name, schema_name, table_name)
131                        {
132                            resolve_sys_table_relation(sys_table_catalog)
133                        } else if let Ok((view_catalog, _)) =
134                            self.catalog
135                                .get_view_by_name(db_name, schema_path, table_name)
136                        {
137                            self.resolve_view_relation(&view_catalog.clone())?
138                        } else {
139                            bail_not_implemented!(
140                                issue = 1695,
141                                r###"{}.{} is not supported, please use `SHOW` commands for now.
142`SHOW TABLES`,
143`SHOW MATERIALIZED VIEWS`,
144`DESCRIBE <table>`,
145`SHOW COLUMNS FROM [table]`
146"###,
147                                schema_name,
148                                table_name
149                            );
150                        }
151                    } else if let Some(source_catalog) =
152                        self.temporary_source_manager.get_source(table_name)
153                    // don't care about the database and schema
154                    {
155                        self.resolve_source_relation(&source_catalog.clone(), as_of, true)?
156                    } else if let Ok((table_catalog, schema_name)) = self
157                        .catalog
158                        .get_any_table_by_name(db_name, schema_path, table_name)
159                        && (bind_creating_relations || table_catalog.is_created())
160                    {
161                        self.resolve_table_relation(table_catalog.clone(), schema_name, as_of)?
162                    } else if let Ok((source_catalog, _)) =
163                        self.catalog
164                            .get_source_by_name(db_name, schema_path, table_name)
165                    {
166                        self.resolve_source_relation(&source_catalog.clone(), as_of, false)?
167                    } else if let Ok((view_catalog, _)) =
168                        self.catalog
169                            .get_view_by_name(db_name, schema_path, table_name)
170                    {
171                        self.resolve_view_relation(&view_catalog.clone())?
172                    } else {
173                        return Err(CatalogError::NotFound(
174                            "table or source",
175                            table_name.to_owned(),
176                        )
177                        .into());
178                    }
179                }
180                None => (|| {
181                    let user_name = &self.auth_context.user_name;
182
183                    for path in self.search_path.path() {
184                        if is_system_schema(path)
185                            && let Ok(sys_table_catalog) =
186                                self.catalog
187                                    .get_sys_table_by_name(&self.db_name, path, table_name)
188                        {
189                            return Ok(resolve_sys_table_relation(sys_table_catalog));
190                        } else {
191                            let schema_name = if path == USER_NAME_WILD_CARD {
192                                user_name
193                            } else {
194                                path
195                            };
196
197                            if let Ok(schema) =
198                                self.catalog.get_schema_by_name(&self.db_name, schema_name)
199                            {
200                                if let Some(source_catalog) =
201                                    self.temporary_source_manager.get_source(table_name)
202                                // don't care about the database and schema
203                                {
204                                    return self.resolve_source_relation(
205                                        &source_catalog.clone(),
206                                        as_of,
207                                        true,
208                                    );
209                                } else if let Some(table_catalog) =
210                                    schema.get_any_table_by_name(table_name)
211                                    && (bind_creating_relations
212                                        || table_catalog.is_internal_table()
213                                        || table_catalog.is_created())
214                                {
215                                    return self.resolve_table_relation(
216                                        table_catalog.clone(),
217                                        &schema_name.clone(),
218                                        as_of,
219                                    );
220                                } else if let Some(source_catalog) =
221                                    schema.get_source_by_name(table_name)
222                                {
223                                    return self.resolve_source_relation(
224                                        &source_catalog.clone(),
225                                        as_of,
226                                        false,
227                                    );
228                                } else if let Some(view_catalog) =
229                                    schema.get_view_by_name(table_name)
230                                {
231                                    return self.resolve_view_relation(&view_catalog.clone());
232                                }
233                            }
234                        }
235                    }
236
237                    Err(CatalogError::NotFound("table or source", table_name.to_owned()).into())
238                })()?,
239            }
240        };
241
242        self.bind_table_to_context(columns, table_name.to_owned(), alias)?;
243        Ok(ret)
244    }
245
246    pub(crate) fn check_privilege(
247        &self,
248        item: ObjectCheckItem,
249        database_id: DatabaseId,
250    ) -> Result<()> {
251        // security invoker is disabled for view, ignore privilege check.
252        if self.context.disable_security_invoker {
253            return Ok(());
254        }
255
256        match self.bind_for {
257            BindFor::Stream | BindFor::Batch => {
258                // reject sources for cross-db access
259                if matches!(self.bind_for, BindFor::Stream)
260                    && self.database_id != database_id
261                    && matches!(item.object, PbObject::SourceId(_))
262                {
263                    return Err(PermissionDenied(format!(
264                        "SOURCE \"{}\" is not allowed for cross-db access",
265                        item.name
266                    ))
267                    .into());
268                }
269                if let Some(user) = self.user.get_user_by_name(&self.auth_context.user_name) {
270                    if user.is_super || user.id == item.owner {
271                        return Ok(());
272                    }
273                    if !user.has_privilege(&item.object, item.mode) {
274                        return Err(PermissionDenied(item.error_message()).into());
275                    }
276
277                    // check CONNECT privilege for cross-db access
278                    if self.database_id != database_id
279                        && !user.has_privilege(&PbObject::DatabaseId(database_id), AclMode::Connect)
280                    {
281                        let db_name = self
282                            .catalog
283                            .get_database_by_id(&database_id)?
284                            .name
285                            .to_owned();
286
287                        return Err(PermissionDenied(format!(
288                            "permission denied for database \"{db_name}\""
289                        ))
290                        .into());
291                    }
292                } else {
293                    return Err(PermissionDenied("Session user is invalid".to_owned()).into());
294                }
295            }
296            BindFor::Ddl | BindFor::System => {
297                // do nothing.
298            }
299        }
300        Ok(())
301    }
302
303    fn resolve_table_relation(
304        &mut self,
305        table_catalog: Arc<TableCatalog>,
306        schema_name: &str,
307        as_of: Option<AsOf>,
308    ) -> Result<(Relation, Vec<(bool, Field)>)> {
309        let table_id = table_catalog.id();
310        let columns = table_catalog
311            .columns
312            .iter()
313            .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
314            .collect_vec();
315        self.check_privilege(
316            ObjectCheckItem::new(
317                table_catalog.owner,
318                AclMode::Select,
319                table_catalog.name.clone(),
320                PbObject::TableId(table_id.table_id),
321            ),
322            table_catalog.database_id,
323        )?;
324        self.included_relations.insert(table_id);
325
326        let table_indexes = self.resolve_table_indexes(schema_name, table_id)?;
327
328        let table = BoundBaseTable {
329            table_id,
330            table_catalog,
331            table_indexes,
332            as_of,
333        };
334
335        Ok::<_, RwError>((Relation::BaseTable(Box::new(table)), columns))
336    }
337
338    fn resolve_source_relation(
339        &mut self,
340        source_catalog: &SourceCatalog,
341        as_of: Option<AsOf>,
342        is_temporary: bool,
343    ) -> Result<(Relation, Vec<(bool, Field)>)> {
344        debug_assert_column_ids_distinct(&source_catalog.columns);
345        if !is_temporary {
346            self.check_privilege(
347                ObjectCheckItem::new(
348                    source_catalog.owner,
349                    AclMode::Select,
350                    source_catalog.name.clone(),
351                    PbObject::SourceId(source_catalog.id),
352                ),
353                source_catalog.database_id,
354            )?;
355        }
356        self.included_relations.insert(source_catalog.id.into());
357        Ok((
358            Relation::Source(Box::new(BoundSource {
359                catalog: source_catalog.clone(),
360                as_of,
361            })),
362            source_catalog
363                .columns
364                .iter()
365                .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
366                .collect_vec(),
367        ))
368    }
369
370    fn resolve_view_relation(
371        &mut self,
372        view_catalog: &ViewCatalog,
373    ) -> Result<(Relation, Vec<(bool, Field)>)> {
374        if !view_catalog.is_system_view() {
375            self.check_privilege(
376                ObjectCheckItem::new(
377                    view_catalog.owner,
378                    AclMode::Select,
379                    view_catalog.name.clone(),
380                    PbObject::ViewId(view_catalog.id),
381                ),
382                view_catalog.database_id,
383            )?;
384        }
385
386        let ast = Parser::parse_sql(&view_catalog.sql)
387            .expect("a view's sql should be parsed successfully");
388        let Statement::Query(query) = ast
389            .into_iter()
390            .exactly_one()
391            .expect("a view should contain only one statement")
392        else {
393            unreachable!("a view should contain a query statement");
394        };
395        let query = self.bind_query_for_view(*query).map_err(|e| {
396            ErrorCode::BindError(format!(
397                "failed to bind view {}, sql: {}\nerror: {}",
398                view_catalog.name,
399                view_catalog.sql,
400                e.as_report()
401            ))
402        })?;
403
404        let columns = view_catalog.columns.clone();
405
406        if !itertools::equal(
407            query.schema().fields().iter().map(|f| &f.data_type),
408            view_catalog.columns.iter().map(|f| &f.data_type),
409        ) {
410            return Err(ErrorCode::BindError(format!(
411                "failed to bind view {}. The SQL's schema is different from catalog's schema sql: {}, bound schema: {:?}, catalog schema: {:?}",
412                view_catalog.name, view_catalog.sql, query.schema(), columns
413            )).into());
414        }
415
416        let share_id = match self.shared_views.get(&view_catalog.id) {
417            Some(share_id) => *share_id,
418            None => {
419                let share_id = self.next_share_id();
420                self.shared_views.insert(view_catalog.id, share_id);
421                self.included_relations.insert(view_catalog.id.into());
422                share_id
423            }
424        };
425        let input = Either::Left(query);
426        Ok((
427            Relation::Share(Box::new(BoundShare {
428                share_id,
429                input: BoundShareInput::Query(input),
430            })),
431            columns.iter().map(|c| (false, c.clone())).collect_vec(),
432        ))
433    }
434
435    fn resolve_table_indexes(
436        &self,
437        schema_name: &str,
438        table_id: TableId,
439    ) -> Result<Vec<Arc<IndexCatalog>>> {
440        Ok(self
441            .catalog
442            .get_schema_by_name(&self.db_name, schema_name)?
443            .get_indexes_by_table_id(&table_id))
444    }
445
446    pub(crate) fn bind_table(
447        &mut self,
448        schema_name: Option<&str>,
449        table_name: &str,
450    ) -> Result<BoundBaseTable> {
451        let db_name = &self.db_name;
452        let schema_path = self.bind_schema_path(schema_name);
453        let (table_catalog, schema_name) =
454            self.catalog
455                .get_created_table_by_name(db_name, schema_path, table_name)?;
456        let table_catalog = table_catalog.clone();
457
458        let table_id = table_catalog.id();
459        let table_indexes = self.resolve_table_indexes(schema_name, table_id)?;
460
461        let columns = table_catalog.columns.clone();
462
463        self.bind_table_to_context(
464            columns
465                .iter()
466                .map(|c| (c.is_hidden, (&c.column_desc).into())),
467            table_name.to_owned(),
468            None,
469        )?;
470
471        Ok(BoundBaseTable {
472            table_id,
473            table_catalog,
474            table_indexes,
475            as_of: None,
476        })
477    }
478
479    pub(crate) fn check_for_dml(table: &TableCatalog, is_insert: bool) -> Result<()> {
480        let table_name = &table.name;
481        match table.table_type() {
482            TableType::Table => {}
483            TableType::Index => {
484                return Err(ErrorCode::InvalidInputSyntax(format!(
485                    "cannot change index \"{table_name}\""
486                ))
487                .into());
488            }
489            TableType::MaterializedView => {
490                return Err(ErrorCode::InvalidInputSyntax(format!(
491                    "cannot change materialized view \"{table_name}\""
492                ))
493                .into());
494            }
495            TableType::Internal => {
496                return Err(ErrorCode::InvalidInputSyntax(format!(
497                    "cannot change internal table \"{table_name}\""
498                ))
499                .into());
500            }
501        }
502
503        if table.append_only && !is_insert {
504            return Err(ErrorCode::BindError(
505                "append-only table does not support update or delete".to_owned(),
506            )
507            .into());
508        }
509
510        Ok(())
511    }
512}