risingwave_frontend/binder/relation/
table_or_source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use either::Either;
18use itertools::Itertools;
19use risingwave_common::acl::AclMode;
20use risingwave_common::bail_not_implemented;
21use risingwave_common::catalog::{Field, debug_assert_column_ids_distinct, is_system_schema};
22use risingwave_common::session_config::USER_NAME_WILD_CARD;
23use risingwave_connector::WithPropertiesExt;
24use risingwave_pb::user::grant_privilege::PbObject;
25use risingwave_sqlparser::ast::{AsOf, ObjectName, Statement, TableAlias};
26use risingwave_sqlparser::parser::Parser;
27use thiserror_ext::AsReport;
28
29use super::BoundShare;
30use crate::binder::relation::BoundShareInput;
31use crate::binder::{BindFor, Binder, Relation};
32use crate::catalog::root_catalog::SchemaPath;
33use crate::catalog::source_catalog::SourceCatalog;
34use crate::catalog::system_catalog::SystemTableCatalog;
35use crate::catalog::table_catalog::{TableCatalog, TableType};
36use crate::catalog::view_catalog::ViewCatalog;
37use crate::catalog::{CatalogError, DatabaseId, IndexCatalog, TableId};
38use crate::error::ErrorCode::PermissionDenied;
39use crate::error::{ErrorCode, Result, RwError};
40use crate::user::UserId;
41
42#[derive(Debug, Clone)]
43pub struct BoundBaseTable {
44    pub table_id: TableId,
45    pub table_catalog: Arc<TableCatalog>,
46    pub table_indexes: Vec<Arc<IndexCatalog>>,
47    pub as_of: Option<AsOf>,
48}
49
50#[derive(Debug, Clone)]
51pub struct BoundSystemTable {
52    pub table_id: TableId,
53    pub sys_table_catalog: Arc<SystemTableCatalog>,
54}
55
56#[derive(Debug, Clone)]
57pub struct BoundSource {
58    pub catalog: SourceCatalog,
59    pub as_of: Option<AsOf>,
60}
61
62impl BoundSource {
63    pub fn is_shareable_cdc_connector(&self) -> bool {
64        self.catalog.with_properties.is_shareable_cdc_connector()
65    }
66
67    pub fn is_shared(&self) -> bool {
68        self.catalog.info.is_shared()
69    }
70}
71
72impl Binder {
73    pub fn bind_catalog_relation_by_object_name(
74        &mut self,
75        object_name: ObjectName,
76        bind_creating_relations: bool,
77    ) -> Result<Relation> {
78        let (schema_name, table_name) =
79            Binder::resolve_schema_qualified_name(&self.db_name, object_name)?;
80        self.bind_catalog_relation_by_name(
81            None,
82            schema_name.as_deref(),
83            &table_name,
84            None,
85            None,
86            bind_creating_relations,
87        )
88    }
89
90    /// Binds table or source, or logical view according to what we get from the catalog.
91    pub fn bind_catalog_relation_by_name(
92        &mut self,
93        db_name: Option<&str>,
94        schema_name: Option<&str>,
95        table_name: &str,
96        alias: Option<TableAlias>,
97        as_of: Option<AsOf>,
98        bind_creating_relations: bool,
99    ) -> Result<Relation> {
100        // define some helper functions converting catalog to bound relation
101        let resolve_sys_table_relation = |sys_table_catalog: &Arc<SystemTableCatalog>| {
102            let table = BoundSystemTable {
103                table_id: sys_table_catalog.id(),
104                sys_table_catalog: sys_table_catalog.clone(),
105            };
106            (
107                Relation::SystemTable(Box::new(table)),
108                sys_table_catalog
109                    .columns
110                    .iter()
111                    .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
112                    .collect_vec(),
113            )
114        };
115
116        // check db_name if exists first
117        if let Some(db_name) = db_name {
118            let _ = self.catalog.get_database_by_name(db_name)?;
119        }
120
121        // start to bind
122        let (ret, columns) = {
123            match schema_name {
124                Some(schema_name) => {
125                    let db_name = db_name.unwrap_or(&self.db_name);
126                    let schema_path = SchemaPath::Name(schema_name);
127                    if is_system_schema(schema_name) {
128                        if let Ok(sys_table_catalog) =
129                            self.catalog
130                                .get_sys_table_by_name(db_name, schema_name, table_name)
131                        {
132                            resolve_sys_table_relation(sys_table_catalog)
133                        } else if let Ok((view_catalog, _)) =
134                            self.catalog
135                                .get_view_by_name(db_name, schema_path, table_name)
136                        {
137                            self.resolve_view_relation(&view_catalog.clone())?
138                        } else {
139                            bail_not_implemented!(
140                                issue = 1695,
141                                r###"{}.{} is not supported, please use `SHOW` commands for now.
142`SHOW TABLES`,
143`SHOW MATERIALIZED VIEWS`,
144`DESCRIBE <table>`,
145`SHOW COLUMNS FROM [table]`
146"###,
147                                schema_name,
148                                table_name
149                            );
150                        }
151                    } else if let Some(source_catalog) =
152                        self.temporary_source_manager.get_source(table_name)
153                    // don't care about the database and schema
154                    {
155                        self.resolve_source_relation(&source_catalog.clone(), as_of, true)?
156                    } else if let Ok((table_catalog, schema_name)) = self
157                        .catalog
158                        .get_any_table_by_name(db_name, schema_path, table_name)
159                        && (bind_creating_relations || table_catalog.is_created())
160                    {
161                        self.resolve_table_relation(table_catalog.clone(), schema_name, as_of)?
162                    } else if let Ok((source_catalog, _)) =
163                        self.catalog
164                            .get_source_by_name(db_name, schema_path, table_name)
165                    {
166                        self.resolve_source_relation(&source_catalog.clone(), as_of, false)?
167                    } else if let Ok((view_catalog, _)) =
168                        self.catalog
169                            .get_view_by_name(db_name, schema_path, table_name)
170                    {
171                        self.resolve_view_relation(&view_catalog.clone())?
172                    } else {
173                        return Err(CatalogError::NotFound(
174                            "table or source",
175                            table_name.to_owned(),
176                        )
177                        .into());
178                    }
179                }
180                None => (|| {
181                    let user_name = &self.auth_context.user_name;
182
183                    for path in self.search_path.path() {
184                        if is_system_schema(path)
185                            && let Ok(sys_table_catalog) =
186                                self.catalog
187                                    .get_sys_table_by_name(&self.db_name, path, table_name)
188                        {
189                            return Ok(resolve_sys_table_relation(sys_table_catalog));
190                        } else {
191                            let schema_name = if path == USER_NAME_WILD_CARD {
192                                user_name
193                            } else {
194                                path
195                            };
196
197                            if let Ok(schema) =
198                                self.catalog.get_schema_by_name(&self.db_name, schema_name)
199                            {
200                                if let Some(source_catalog) =
201                                    self.temporary_source_manager.get_source(table_name)
202                                // don't care about the database and schema
203                                {
204                                    return self.resolve_source_relation(
205                                        &source_catalog.clone(),
206                                        as_of,
207                                        true,
208                                    );
209                                } else if let Some(table_catalog) =
210                                    schema.get_any_table_by_name(table_name)
211                                    && (bind_creating_relations
212                                        || table_catalog.is_internal_table()
213                                        || table_catalog.is_created())
214                                {
215                                    return self.resolve_table_relation(
216                                        table_catalog.clone(),
217                                        &schema_name.clone(),
218                                        as_of,
219                                    );
220                                } else if let Some(source_catalog) =
221                                    schema.get_source_by_name(table_name)
222                                {
223                                    return self.resolve_source_relation(
224                                        &source_catalog.clone(),
225                                        as_of,
226                                        false,
227                                    );
228                                } else if let Some(view_catalog) =
229                                    schema.get_view_by_name(table_name)
230                                {
231                                    return self.resolve_view_relation(&view_catalog.clone());
232                                }
233                            }
234                        }
235                    }
236
237                    Err(CatalogError::NotFound("table or source", table_name.to_owned()).into())
238                })()?,
239            }
240        };
241
242        self.bind_table_to_context(columns, table_name.to_owned(), alias)?;
243        Ok(ret)
244    }
245
246    pub(crate) fn check_privilege(
247        &self,
248        object: PbObject,
249        database_id: DatabaseId,
250        mode: AclMode,
251        owner: UserId,
252    ) -> Result<()> {
253        // security invoker is disabled for view, ignore privilege check.
254        if self.context.disable_security_invoker {
255            return Ok(());
256        }
257
258        match self.bind_for {
259            BindFor::Stream | BindFor::Batch => {
260                // reject sources for cross-db access
261                if matches!(self.bind_for, BindFor::Stream)
262                    && self.database_id != database_id
263                    && matches!(object, PbObject::SourceId(_))
264                {
265                    return Err(PermissionDenied(
266                        "SOURCE is not allowed for cross-db access".to_owned(),
267                    )
268                    .into());
269                }
270                if let Some(user) = self.user.get_user_by_name(&self.auth_context.user_name) {
271                    if user.is_super || user.id == owner {
272                        return Ok(());
273                    }
274                    if !user.has_privilege(&object, mode) {
275                        return Err(PermissionDenied("Do not have the privilege".to_owned()).into());
276                    }
277
278                    // check CONNECT privilege for cross-db access
279                    if self.database_id != database_id
280                        && !user.has_privilege(&PbObject::DatabaseId(database_id), AclMode::Connect)
281                    {
282                        return Err(
283                            PermissionDenied("Do not have CONNECT privilege".to_owned()).into()
284                        );
285                    }
286                } else {
287                    return Err(PermissionDenied("Session user is invalid".to_owned()).into());
288                }
289            }
290            BindFor::Ddl | BindFor::System => {
291                // do nothing.
292            }
293        }
294        Ok(())
295    }
296
297    fn resolve_table_relation(
298        &mut self,
299        table_catalog: Arc<TableCatalog>,
300        schema_name: &str,
301        as_of: Option<AsOf>,
302    ) -> Result<(Relation, Vec<(bool, Field)>)> {
303        let table_id = table_catalog.id();
304        let columns = table_catalog
305            .columns
306            .iter()
307            .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
308            .collect_vec();
309        self.check_privilege(
310            PbObject::TableId(table_id.table_id),
311            table_catalog.database_id,
312            AclMode::Select,
313            table_catalog.owner,
314        )?;
315        self.included_relations.insert(table_id);
316
317        let table_indexes = self.resolve_table_indexes(schema_name, table_id)?;
318
319        let table = BoundBaseTable {
320            table_id,
321            table_catalog,
322            table_indexes,
323            as_of,
324        };
325
326        Ok::<_, RwError>((Relation::BaseTable(Box::new(table)), columns))
327    }
328
329    fn resolve_source_relation(
330        &mut self,
331        source_catalog: &SourceCatalog,
332        as_of: Option<AsOf>,
333        is_temporary: bool,
334    ) -> Result<(Relation, Vec<(bool, Field)>)> {
335        debug_assert_column_ids_distinct(&source_catalog.columns);
336        if !is_temporary {
337            self.check_privilege(
338                PbObject::SourceId(source_catalog.id),
339                source_catalog.database_id,
340                AclMode::Select,
341                source_catalog.owner,
342            )?;
343        }
344        self.included_relations.insert(source_catalog.id.into());
345        Ok((
346            Relation::Source(Box::new(BoundSource {
347                catalog: source_catalog.clone(),
348                as_of,
349            })),
350            source_catalog
351                .columns
352                .iter()
353                .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
354                .collect_vec(),
355        ))
356    }
357
358    fn resolve_view_relation(
359        &mut self,
360        view_catalog: &ViewCatalog,
361    ) -> Result<(Relation, Vec<(bool, Field)>)> {
362        if !view_catalog.is_system_view() {
363            self.check_privilege(
364                PbObject::ViewId(view_catalog.id),
365                view_catalog.database_id,
366                AclMode::Select,
367                view_catalog.owner,
368            )?;
369        }
370
371        let ast = Parser::parse_sql(&view_catalog.sql)
372            .expect("a view's sql should be parsed successfully");
373        let Statement::Query(query) = ast
374            .into_iter()
375            .exactly_one()
376            .expect("a view should contain only one statement")
377        else {
378            unreachable!("a view should contain a query statement");
379        };
380        let query = self.bind_query_for_view(*query).map_err(|e| {
381            ErrorCode::BindError(format!(
382                "failed to bind view {}, sql: {}\nerror: {}",
383                view_catalog.name,
384                view_catalog.sql,
385                e.as_report()
386            ))
387        })?;
388
389        let columns = view_catalog.columns.clone();
390
391        if !itertools::equal(
392            query.schema().fields().iter().map(|f| &f.data_type),
393            view_catalog.columns.iter().map(|f| &f.data_type),
394        ) {
395            return Err(ErrorCode::BindError(format!(
396                "failed to bind view {}. The SQL's schema is different from catalog's schema sql: {}, bound schema: {:?}, catalog schema: {:?}",
397                view_catalog.name, view_catalog.sql, query.schema(), columns
398            )).into());
399        }
400
401        let share_id = match self.shared_views.get(&view_catalog.id) {
402            Some(share_id) => *share_id,
403            None => {
404                let share_id = self.next_share_id();
405                self.shared_views.insert(view_catalog.id, share_id);
406                self.included_relations.insert(view_catalog.id.into());
407                share_id
408            }
409        };
410        let input = Either::Left(query);
411        Ok((
412            Relation::Share(Box::new(BoundShare {
413                share_id,
414                input: BoundShareInput::Query(input),
415            })),
416            columns.iter().map(|c| (false, c.clone())).collect_vec(),
417        ))
418    }
419
420    fn resolve_table_indexes(
421        &self,
422        schema_name: &str,
423        table_id: TableId,
424    ) -> Result<Vec<Arc<IndexCatalog>>> {
425        Ok(self
426            .catalog
427            .get_schema_by_name(&self.db_name, schema_name)?
428            .get_indexes_by_table_id(&table_id))
429    }
430
431    pub(crate) fn bind_table(
432        &mut self,
433        schema_name: Option<&str>,
434        table_name: &str,
435    ) -> Result<BoundBaseTable> {
436        let db_name = &self.db_name;
437        let schema_path = self.bind_schema_path(schema_name);
438        let (table_catalog, schema_name) =
439            self.catalog
440                .get_created_table_by_name(db_name, schema_path, table_name)?;
441        let table_catalog = table_catalog.clone();
442
443        let table_id = table_catalog.id();
444        let table_indexes = self.resolve_table_indexes(schema_name, table_id)?;
445
446        let columns = table_catalog.columns.clone();
447
448        self.bind_table_to_context(
449            columns
450                .iter()
451                .map(|c| (c.is_hidden, (&c.column_desc).into())),
452            table_name.to_owned(),
453            None,
454        )?;
455
456        Ok(BoundBaseTable {
457            table_id,
458            table_catalog,
459            table_indexes,
460            as_of: None,
461        })
462    }
463
464    pub(crate) fn check_for_dml(table: &TableCatalog, is_insert: bool) -> Result<()> {
465        let table_name = &table.name;
466        match table.table_type() {
467            TableType::Table => {}
468            TableType::Index => {
469                return Err(ErrorCode::InvalidInputSyntax(format!(
470                    "cannot change index \"{table_name}\""
471                ))
472                .into());
473            }
474            TableType::MaterializedView => {
475                return Err(ErrorCode::InvalidInputSyntax(format!(
476                    "cannot change materialized view \"{table_name}\""
477                ))
478                .into());
479            }
480            TableType::Internal => {
481                return Err(ErrorCode::InvalidInputSyntax(format!(
482                    "cannot change internal table \"{table_name}\""
483                ))
484                .into());
485            }
486        }
487
488        if table.append_only && !is_insert {
489            return Err(ErrorCode::BindError(
490                "append-only table does not support update or delete".to_owned(),
491            )
492            .into());
493        }
494
495        Ok(())
496    }
497}