risingwave_frontend/binder/relation/
table_or_source.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use itertools::Itertools;
18use risingwave_common::acl::AclMode;
19use risingwave_common::bail_not_implemented;
20use risingwave_common::catalog::{Field, debug_assert_column_ids_distinct, is_system_schema};
21use risingwave_common::session_config::USER_NAME_WILD_CARD;
22use risingwave_connector::WithPropertiesExt;
23use risingwave_pb::user::grant_privilege::PbObject;
24use risingwave_sqlparser::ast::{AsOf, ObjectName, Statement, TableAlias};
25use risingwave_sqlparser::parser::Parser;
26use thiserror_ext::AsReport;
27
28use super::BoundShare;
29use crate::binder::relation::BoundShareInput;
30use crate::binder::{BindFor, Binder, Relation};
31use crate::catalog::root_catalog::SchemaPath;
32use crate::catalog::source_catalog::SourceCatalog;
33use crate::catalog::system_catalog::SystemTableCatalog;
34use crate::catalog::table_catalog::{TableCatalog, TableType};
35use crate::catalog::view_catalog::ViewCatalog;
36use crate::catalog::{CatalogError, DatabaseId, IndexCatalog, TableId};
37use crate::error::ErrorCode::PermissionDenied;
38use crate::error::{ErrorCode, Result, RwError};
39use crate::handler::privilege::ObjectCheckItem;
40
41#[derive(Debug, Clone)]
42pub struct BoundBaseTable {
43    pub table_id: TableId,
44    pub table_catalog: Arc<TableCatalog>,
45    pub table_indexes: Vec<Arc<IndexCatalog>>,
46    pub as_of: Option<AsOf>,
47}
48
49#[derive(Debug, Clone)]
50pub struct BoundSystemTable {
51    pub table_id: TableId,
52    pub sys_table_catalog: Arc<SystemTableCatalog>,
53}
54
55#[derive(Debug, Clone)]
56pub struct BoundSource {
57    pub catalog: SourceCatalog,
58    pub as_of: Option<AsOf>,
59}
60
61impl BoundSource {
62    pub fn is_shareable_cdc_connector(&self) -> bool {
63        self.catalog.with_properties.is_shareable_cdc_connector()
64    }
65
66    pub fn is_shared(&self) -> bool {
67        self.catalog.info.is_shared()
68    }
69}
70
71impl Binder {
72    pub fn bind_catalog_relation_by_object_name(
73        &mut self,
74        object_name: &ObjectName,
75        bind_creating_relations: bool,
76    ) -> Result<Relation> {
77        let (schema_name, table_name) =
78            Binder::resolve_schema_qualified_name(&self.db_name, object_name)?;
79        self.bind_catalog_relation_by_name(
80            None,
81            schema_name.as_deref(),
82            &table_name,
83            None,
84            None,
85            bind_creating_relations,
86        )
87    }
88
89    /// Binds table or source, or logical view according to what we get from the catalog.
90    pub fn bind_catalog_relation_by_name(
91        &mut self,
92        db_name: Option<&str>,
93        schema_name: Option<&str>,
94        table_name: &str,
95        alias: Option<&TableAlias>,
96        as_of: Option<&AsOf>,
97        bind_creating_relations: bool,
98    ) -> Result<Relation> {
99        // define some helper functions converting catalog to bound relation
100        let resolve_sys_table_relation = |sys_table_catalog: &Arc<SystemTableCatalog>| {
101            let table = BoundSystemTable {
102                table_id: sys_table_catalog.id(),
103                sys_table_catalog: sys_table_catalog.clone(),
104            };
105            (
106                Relation::SystemTable(Box::new(table)),
107                sys_table_catalog
108                    .columns
109                    .iter()
110                    .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
111                    .collect_vec(),
112            )
113        };
114
115        // check db_name if exists first
116        if let Some(db_name) = db_name {
117            let _ = self.catalog.get_database_by_name(db_name)?;
118        }
119
120        // start to bind
121        let (ret, columns) = {
122            match schema_name {
123                Some(schema_name) => {
124                    let db_name = db_name.unwrap_or(&self.db_name).to_owned();
125                    let schema_path = SchemaPath::Name(schema_name);
126                    if is_system_schema(schema_name) {
127                        if let Ok(sys_table_catalog) =
128                            self.catalog
129                                .get_sys_table_by_name(&db_name, schema_name, table_name)
130                        {
131                            resolve_sys_table_relation(sys_table_catalog)
132                        } else if let Ok((view_catalog, _)) =
133                            self.catalog
134                                .get_view_by_name(&db_name, schema_path, table_name)
135                        {
136                            self.resolve_view_relation(&view_catalog.clone())?
137                        } else {
138                            bail_not_implemented!(
139                                issue = 1695,
140                                r###"{}.{} is not supported, please use `SHOW` commands for now.
141`SHOW TABLES`,
142`SHOW MATERIALIZED VIEWS`,
143`DESCRIBE <table>`,
144`SHOW COLUMNS FROM [table]`
145"###,
146                                schema_name,
147                                table_name
148                            );
149                        }
150                    } else if let Some(source_catalog) =
151                        self.temporary_source_manager.get_source(table_name)
152                    // don't care about the database and schema
153                    {
154                        self.resolve_source_relation(&source_catalog.clone(), as_of, true)?
155                    } else if let Ok((table_catalog, schema_name)) = self
156                        .catalog
157                        .get_any_table_by_name(&db_name, schema_path, table_name)
158                        && (bind_creating_relations
159                            || table_catalog.is_internal_table()
160                            || table_catalog.is_created())
161                    {
162                        self.resolve_table_relation(
163                            table_catalog.clone(),
164                            &db_name,
165                            schema_name,
166                            as_of,
167                        )?
168                    } else if let Ok((source_catalog, _)) =
169                        self.catalog
170                            .get_source_by_name(&db_name, schema_path, table_name)
171                    {
172                        self.resolve_source_relation(&source_catalog.clone(), as_of, false)?
173                    } else if let Ok((view_catalog, _)) =
174                        self.catalog
175                            .get_view_by_name(&db_name, schema_path, table_name)
176                    {
177                        self.resolve_view_relation(&view_catalog.clone())?
178                    } else if let Some(table_catalog) =
179                        self.staging_catalog_manager.get_table(table_name)
180                    {
181                        // don't care about the database and schema
182                        self.resolve_table_relation(
183                            table_catalog.clone().into(),
184                            &db_name,
185                            schema_name,
186                            as_of,
187                        )?
188                    } else {
189                        return Err(CatalogError::not_found("table or source", table_name).into());
190                    }
191                }
192                None => (|| {
193                    // If schema is not specified, db must be unspecified.
194                    // So we should always use current database here.
195                    assert!(db_name.is_none());
196                    let db_name = self.db_name.clone();
197                    let user_name = self.auth_context.user_name.clone();
198
199                    for path in self.search_path.path() {
200                        if is_system_schema(path)
201                            && let Ok(sys_table_catalog) = self
202                                .catalog
203                                .get_sys_table_by_name(&db_name, path, table_name)
204                        {
205                            return Ok(resolve_sys_table_relation(sys_table_catalog));
206                        } else {
207                            let schema_name = if path == USER_NAME_WILD_CARD {
208                                &user_name
209                            } else {
210                                &path.clone()
211                            };
212
213                            if let Ok(schema) =
214                                self.catalog.get_schema_by_name(&db_name, schema_name)
215                            {
216                                if let Some(source_catalog) =
217                                    self.temporary_source_manager.get_source(table_name)
218                                // don't care about the database and schema
219                                {
220                                    return self.resolve_source_relation(
221                                        &source_catalog.clone(),
222                                        as_of,
223                                        true,
224                                    );
225                                } else if let Some(table_catalog) =
226                                    schema.get_any_table_by_name(table_name)
227                                    && (bind_creating_relations
228                                        || table_catalog.is_internal_table()
229                                        || table_catalog.is_created())
230                                {
231                                    return self.resolve_table_relation(
232                                        table_catalog.clone(),
233                                        &db_name,
234                                        schema_name,
235                                        as_of,
236                                    );
237                                } else if let Some(source_catalog) =
238                                    schema.get_source_by_name(table_name)
239                                {
240                                    return self.resolve_source_relation(
241                                        &source_catalog.clone(),
242                                        as_of,
243                                        false,
244                                    );
245                                } else if let Some(view_catalog) =
246                                    schema.get_view_by_name(table_name)
247                                {
248                                    return self.resolve_view_relation(&view_catalog.clone());
249                                } else if let Some(table_catalog) =
250                                    self.staging_catalog_manager.get_table(table_name)
251                                {
252                                    // don't care about the database and schema
253                                    return self.resolve_table_relation(
254                                        table_catalog.clone().into(),
255                                        &db_name,
256                                        schema_name,
257                                        as_of,
258                                    );
259                                }
260                            }
261                        }
262                    }
263
264                    Err(CatalogError::not_found("table or source", table_name).into())
265                })()?,
266            }
267        };
268
269        self.bind_table_to_context(columns, table_name.to_owned(), alias)?;
270        Ok(ret)
271    }
272
273    pub(crate) fn check_privilege(
274        &self,
275        item: ObjectCheckItem,
276        database_id: DatabaseId,
277    ) -> Result<()> {
278        // security invoker is disabled for view, ignore privilege check.
279        if self.context.disable_security_invoker {
280            return Ok(());
281        }
282
283        match self.bind_for {
284            BindFor::Stream | BindFor::Batch => {
285                // reject sources for cross-db access
286                if matches!(self.bind_for, BindFor::Stream)
287                    && self.database_id != database_id
288                    && matches!(item.object, PbObject::SourceId(_))
289                {
290                    return Err(PermissionDenied(format!(
291                        "SOURCE \"{}\" is not allowed for cross-db access",
292                        item.name
293                    ))
294                    .into());
295                }
296                if let Some(user) = self.user.get_user_by_name(&self.auth_context.user_name) {
297                    if user.is_super || user.id == item.owner {
298                        return Ok(());
299                    }
300                    if !user.has_privilege(item.object, item.mode) {
301                        return Err(PermissionDenied(item.error_message()).into());
302                    }
303
304                    // check CONNECT privilege for cross-db access
305                    if self.database_id != database_id
306                        && !user.has_privilege(database_id, AclMode::Connect)
307                    {
308                        let db_name = self.catalog.get_database_by_id(database_id)?.name.clone();
309
310                        return Err(PermissionDenied(format!(
311                            "permission denied for database \"{db_name}\""
312                        ))
313                        .into());
314                    }
315                } else {
316                    return Err(PermissionDenied("Session user is invalid".to_owned()).into());
317                }
318            }
319            BindFor::Ddl | BindFor::System => {
320                // do nothing.
321            }
322        }
323        Ok(())
324    }
325
326    fn resolve_table_relation(
327        &mut self,
328        table_catalog: Arc<TableCatalog>,
329        db_name: &str,
330        schema_name: &str,
331        as_of: Option<&AsOf>,
332    ) -> Result<(Relation, Vec<(bool, Field)>)> {
333        let table_id = table_catalog.id();
334        let columns = table_catalog
335            .columns
336            .iter()
337            .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
338            .collect_vec();
339        self.check_privilege(
340            ObjectCheckItem::new(
341                table_catalog.owner,
342                AclMode::Select,
343                table_catalog.name.clone(),
344                table_id,
345            ),
346            table_catalog.database_id,
347        )?;
348        self.included_relations.insert(table_id.as_object_id());
349
350        let table_indexes = self.resolve_table_indexes(db_name, schema_name, table_id)?;
351
352        let table = BoundBaseTable {
353            table_id,
354            table_catalog,
355            table_indexes,
356            as_of: as_of.cloned(),
357        };
358
359        Ok::<_, RwError>((Relation::BaseTable(Box::new(table)), columns))
360    }
361
362    fn resolve_source_relation(
363        &mut self,
364        source_catalog: &SourceCatalog,
365        as_of: Option<&AsOf>,
366        is_temporary: bool,
367    ) -> Result<(Relation, Vec<(bool, Field)>)> {
368        debug_assert_column_ids_distinct(&source_catalog.columns);
369        if !is_temporary {
370            self.check_privilege(
371                ObjectCheckItem::new(
372                    source_catalog.owner,
373                    AclMode::Select,
374                    source_catalog.name.clone(),
375                    source_catalog.id,
376                ),
377                source_catalog.database_id,
378            )?;
379        }
380        self.included_relations
381            .insert(source_catalog.id.as_object_id());
382        Ok((
383            Relation::Source(Box::new(BoundSource {
384                catalog: source_catalog.clone(),
385                as_of: as_of.cloned(),
386            })),
387            source_catalog
388                .columns
389                .iter()
390                .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
391                .collect_vec(),
392        ))
393    }
394
395    fn resolve_view_relation(
396        &mut self,
397        view_catalog: &ViewCatalog,
398    ) -> Result<(Relation, Vec<(bool, Field)>)> {
399        if !view_catalog.is_system_view() {
400            self.check_privilege(
401                ObjectCheckItem::new(
402                    view_catalog.owner,
403                    AclMode::Select,
404                    view_catalog.name.clone(),
405                    view_catalog.id,
406                ),
407                view_catalog.database_id,
408            )?;
409        }
410
411        let ast = Parser::parse_sql(&view_catalog.sql)
412            .expect("a view's sql should be parsed successfully");
413        let Statement::Query(query) = ast
414            .into_iter()
415            .exactly_one()
416            .expect("a view should contain only one statement")
417        else {
418            unreachable!("a view should contain a query statement");
419        };
420        let query = self.bind_query_for_view(&query).map_err(|e| {
421            ErrorCode::BindError(format!(
422                "failed to bind view {}, sql: {}\nerror: {}",
423                view_catalog.name,
424                view_catalog.sql,
425                e.as_report()
426            ))
427        })?;
428
429        let columns = view_catalog.columns.clone();
430
431        if !itertools::equal(
432            query.schema().fields().iter().map(|f| &f.data_type),
433            view_catalog.columns.iter().map(|f| &f.data_type),
434        ) {
435            return Err(ErrorCode::BindError(format!(
436                "failed to bind view {}. The SQL's schema is different from catalog's schema sql: {}, bound schema: {:?}, catalog schema: {:?}",
437                view_catalog.name, view_catalog.sql, query.schema(), columns
438            )).into());
439        }
440
441        let share_id = match self.shared_views.get(&view_catalog.id) {
442            Some(share_id) => *share_id,
443            None => {
444                let share_id = self.next_share_id();
445                self.shared_views.insert(view_catalog.id, share_id);
446                self.included_relations
447                    .insert(view_catalog.id.as_object_id());
448                share_id
449            }
450        };
451        Ok((
452            Relation::Share(Box::new(BoundShare {
453                share_id,
454                input: BoundShareInput::Query(query),
455            })),
456            columns.iter().map(|c| (false, c.clone())).collect_vec(),
457        ))
458    }
459
460    fn resolve_table_indexes(
461        &self,
462        db_name: &str,
463        schema_name: &str,
464        table_id: TableId,
465    ) -> Result<Vec<Arc<IndexCatalog>>> {
466        let schema = self.catalog.get_schema_by_name(db_name, schema_name)?;
467        assert!(
468            schema.get_table_by_id(table_id).is_some() || table_id.is_placeholder(),
469            "table {table_id} not found in {db_name}.{schema_name}"
470        );
471
472        Ok(schema.get_created_indexes_by_table_id(table_id))
473    }
474
475    pub(crate) fn bind_table(
476        &mut self,
477        schema_name: Option<&str>,
478        table_name: &str,
479    ) -> Result<BoundBaseTable> {
480        let db_name = &self.db_name;
481        let schema_path = self.bind_schema_path(schema_name);
482        let (table_catalog, schema_name) =
483            self.catalog
484                .get_created_table_by_name(db_name, schema_path, table_name)?;
485        let table_catalog = table_catalog.clone();
486
487        let table_id = table_catalog.id();
488        let table_indexes = self.resolve_table_indexes(db_name, schema_name, table_id)?;
489
490        let columns = table_catalog.columns.clone();
491
492        self.bind_table_to_context(
493            columns
494                .iter()
495                .map(|c| (c.is_hidden, (&c.column_desc).into())),
496            table_name.to_owned(),
497            None,
498        )?;
499
500        Ok(BoundBaseTable {
501            table_id,
502            table_catalog,
503            table_indexes,
504            as_of: None,
505        })
506    }
507
508    pub(crate) fn check_for_dml(table: &TableCatalog, is_insert: bool) -> Result<()> {
509        let table_name = &table.name;
510        match table.table_type() {
511            TableType::Table => {}
512            TableType::Index | TableType::VectorIndex => {
513                return Err(ErrorCode::InvalidInputSyntax(format!(
514                    "cannot change index \"{table_name}\""
515                ))
516                .into());
517            }
518            TableType::MaterializedView => {
519                return Err(ErrorCode::InvalidInputSyntax(format!(
520                    "cannot change materialized view \"{table_name}\""
521                ))
522                .into());
523            }
524            TableType::Internal => {
525                return Err(ErrorCode::InvalidInputSyntax(format!(
526                    "cannot change internal table \"{table_name}\""
527                ))
528                .into());
529            }
530        }
531
532        if table.append_only && !is_insert {
533            return Err(ErrorCode::BindError(
534                "append-only table does not support update or delete".to_owned(),
535            )
536            .into());
537        }
538
539        Ok(())
540    }
541}