risingwave_frontend/binder/relation/
table_or_source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use either::Either;
18use itertools::Itertools;
19use risingwave_common::acl::AclMode;
20use risingwave_common::bail_not_implemented;
21use risingwave_common::catalog::{Field, debug_assert_column_ids_distinct, is_system_schema};
22use risingwave_common::session_config::USER_NAME_WILD_CARD;
23use risingwave_connector::WithPropertiesExt;
24use risingwave_pb::user::grant_privilege::PbObject;
25use risingwave_sqlparser::ast::{AsOf, Statement, TableAlias};
26use risingwave_sqlparser::parser::Parser;
27use thiserror_ext::AsReport;
28
29use super::BoundShare;
30use crate::binder::relation::BoundShareInput;
31use crate::binder::{BindFor, Binder, Relation};
32use crate::catalog::root_catalog::SchemaPath;
33use crate::catalog::source_catalog::SourceCatalog;
34use crate::catalog::system_catalog::SystemTableCatalog;
35use crate::catalog::table_catalog::{TableCatalog, TableType};
36use crate::catalog::view_catalog::ViewCatalog;
37use crate::catalog::{CatalogError, DatabaseId, IndexCatalog, TableId};
38use crate::error::ErrorCode::PermissionDenied;
39use crate::error::{ErrorCode, Result, RwError};
40use crate::user::UserId;
41
42#[derive(Debug, Clone)]
43pub struct BoundBaseTable {
44    pub table_id: TableId,
45    pub table_catalog: Arc<TableCatalog>,
46    pub table_indexes: Vec<Arc<IndexCatalog>>,
47    pub as_of: Option<AsOf>,
48}
49
50#[derive(Debug, Clone)]
51pub struct BoundSystemTable {
52    pub table_id: TableId,
53    pub sys_table_catalog: Arc<SystemTableCatalog>,
54}
55
56#[derive(Debug, Clone)]
57pub struct BoundSource {
58    pub catalog: SourceCatalog,
59    pub as_of: Option<AsOf>,
60}
61
62impl BoundSource {
63    pub fn is_shareable_cdc_connector(&self) -> bool {
64        self.catalog.with_properties.is_shareable_cdc_connector()
65    }
66
67    pub fn is_shared(&self) -> bool {
68        self.catalog.info.is_shared()
69    }
70}
71
72impl Binder {
73    /// Binds table or source, or logical view according to what we get from the catalog.
74    pub fn bind_relation_by_name_inner(
75        &mut self,
76        db_name: Option<&str>,
77        schema_name: Option<&str>,
78        table_name: &str,
79        alias: Option<TableAlias>,
80        as_of: Option<AsOf>,
81    ) -> Result<Relation> {
82        // define some helper functions converting catalog to bound relation
83        let resolve_sys_table_relation = |sys_table_catalog: &Arc<SystemTableCatalog>| {
84            let table = BoundSystemTable {
85                table_id: sys_table_catalog.id(),
86                sys_table_catalog: sys_table_catalog.clone(),
87            };
88            (
89                Relation::SystemTable(Box::new(table)),
90                sys_table_catalog
91                    .columns
92                    .iter()
93                    .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
94                    .collect_vec(),
95            )
96        };
97
98        // check db_name if exists first
99        if let Some(db_name) = db_name {
100            let _ = self.catalog.get_database_by_name(db_name)?;
101        }
102
103        // start to bind
104        let (ret, columns) = {
105            match schema_name {
106                Some(schema_name) => {
107                    let db_name = db_name.unwrap_or(&self.db_name);
108                    let schema_path = SchemaPath::Name(schema_name);
109                    if is_system_schema(schema_name) {
110                        if let Ok(sys_table_catalog) =
111                            self.catalog
112                                .get_sys_table_by_name(db_name, schema_name, table_name)
113                        {
114                            resolve_sys_table_relation(sys_table_catalog)
115                        } else if let Ok((view_catalog, _)) =
116                            self.catalog
117                                .get_view_by_name(db_name, schema_path, table_name)
118                        {
119                            self.resolve_view_relation(&view_catalog.clone())?
120                        } else {
121                            bail_not_implemented!(
122                                issue = 1695,
123                                r###"{}.{} is not supported, please use `SHOW` commands for now.
124`SHOW TABLES`,
125`SHOW MATERIALIZED VIEWS`,
126`DESCRIBE <table>`,
127`SHOW COLUMNS FROM [table]`
128"###,
129                                schema_name,
130                                table_name
131                            );
132                        }
133                    } else if let Some(source_catalog) =
134                        self.temporary_source_manager.get_source(table_name)
135                    // don't care about the database and schema
136                    {
137                        self.resolve_source_relation(&source_catalog.clone(), as_of, true)?
138                    } else if let Ok((table_catalog, schema_name)) = self
139                        .catalog
140                        .get_created_table_by_name(db_name, schema_path, table_name)
141                    {
142                        self.resolve_table_relation(table_catalog.clone(), schema_name, as_of)?
143                    } else if let Ok((source_catalog, _)) =
144                        self.catalog
145                            .get_source_by_name(db_name, schema_path, table_name)
146                    {
147                        self.resolve_source_relation(&source_catalog.clone(), as_of, false)?
148                    } else if let Ok((view_catalog, _)) =
149                        self.catalog
150                            .get_view_by_name(db_name, schema_path, table_name)
151                    {
152                        self.resolve_view_relation(&view_catalog.clone())?
153                    } else {
154                        return Err(CatalogError::NotFound(
155                            "table or source",
156                            table_name.to_owned(),
157                        )
158                        .into());
159                    }
160                }
161                None => (|| {
162                    let user_name = &self.auth_context.user_name;
163
164                    for path in self.search_path.path() {
165                        if is_system_schema(path)
166                            && let Ok(sys_table_catalog) =
167                                self.catalog
168                                    .get_sys_table_by_name(&self.db_name, path, table_name)
169                        {
170                            return Ok(resolve_sys_table_relation(sys_table_catalog));
171                        } else {
172                            let schema_name = if path == USER_NAME_WILD_CARD {
173                                user_name
174                            } else {
175                                path
176                            };
177
178                            if let Ok(schema) =
179                                self.catalog.get_schema_by_name(&self.db_name, schema_name)
180                            {
181                                if let Some(source_catalog) =
182                                    self.temporary_source_manager.get_source(table_name)
183                                // don't care about the database and schema
184                                {
185                                    return self.resolve_source_relation(
186                                        &source_catalog.clone(),
187                                        as_of,
188                                        true,
189                                    );
190                                } else if let Some(table_catalog) = schema
191                                    .get_created_table_or_any_internal_table_by_name(table_name)
192                                {
193                                    return self.resolve_table_relation(
194                                        table_catalog.clone(),
195                                        &schema_name.clone(),
196                                        as_of,
197                                    );
198                                } else if let Some(source_catalog) =
199                                    schema.get_source_by_name(table_name)
200                                {
201                                    return self.resolve_source_relation(
202                                        &source_catalog.clone(),
203                                        as_of,
204                                        false,
205                                    );
206                                } else if let Some(view_catalog) =
207                                    schema.get_view_by_name(table_name)
208                                {
209                                    return self.resolve_view_relation(&view_catalog.clone());
210                                }
211                            }
212                        }
213                    }
214
215                    Err(CatalogError::NotFound("table or source", table_name.to_owned()).into())
216                })()?,
217            }
218        };
219
220        self.bind_table_to_context(columns, table_name.to_owned(), alias)?;
221        Ok(ret)
222    }
223
224    pub(crate) fn check_privilege(
225        &self,
226        object: PbObject,
227        database_id: DatabaseId,
228        mode: AclMode,
229        owner: UserId,
230    ) -> Result<()> {
231        // security invoker is disabled for view, ignore privilege check.
232        if self.context.disable_security_invoker {
233            return Ok(());
234        }
235
236        match self.bind_for {
237            BindFor::Stream | BindFor::Batch => {
238                // reject sources for cross-db access
239                if matches!(self.bind_for, BindFor::Stream)
240                    && self.database_id != database_id
241                    && matches!(object, PbObject::SourceId(_))
242                {
243                    return Err(PermissionDenied(
244                        "SOURCE is not allowed for cross-db access".to_owned(),
245                    )
246                    .into());
247                }
248                if let Some(user) = self.user.get_user_by_name(&self.auth_context.user_name) {
249                    if user.is_super || user.id == owner {
250                        return Ok(());
251                    }
252                    if !user.has_privilege(&object, mode) {
253                        return Err(PermissionDenied("Do not have the privilege".to_owned()).into());
254                    }
255
256                    // check CONNECT privilege for cross-db access
257                    if self.database_id != database_id
258                        && !user.has_privilege(&PbObject::DatabaseId(database_id), AclMode::Connect)
259                    {
260                        return Err(
261                            PermissionDenied("Do not have CONNECT privilege".to_owned()).into()
262                        );
263                    }
264                } else {
265                    return Err(PermissionDenied("Session user is invalid".to_owned()).into());
266                }
267            }
268            BindFor::Ddl | BindFor::System => {
269                // do nothing.
270            }
271        }
272        Ok(())
273    }
274
275    fn resolve_table_relation(
276        &mut self,
277        table_catalog: Arc<TableCatalog>,
278        schema_name: &str,
279        as_of: Option<AsOf>,
280    ) -> Result<(Relation, Vec<(bool, Field)>)> {
281        let table_id = table_catalog.id();
282        let columns = table_catalog
283            .columns
284            .iter()
285            .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
286            .collect_vec();
287        self.check_privilege(
288            PbObject::TableId(table_id.table_id),
289            table_catalog.database_id,
290            AclMode::Select,
291            table_catalog.owner,
292        )?;
293        self.included_relations.insert(table_id);
294
295        let table_indexes = self.resolve_table_indexes(schema_name, table_id)?;
296
297        let table = BoundBaseTable {
298            table_id,
299            table_catalog,
300            table_indexes,
301            as_of,
302        };
303
304        Ok::<_, RwError>((Relation::BaseTable(Box::new(table)), columns))
305    }
306
307    fn resolve_source_relation(
308        &mut self,
309        source_catalog: &SourceCatalog,
310        as_of: Option<AsOf>,
311        is_temporary: bool,
312    ) -> Result<(Relation, Vec<(bool, Field)>)> {
313        debug_assert_column_ids_distinct(&source_catalog.columns);
314        if !is_temporary {
315            self.check_privilege(
316                PbObject::SourceId(source_catalog.id),
317                source_catalog.database_id,
318                AclMode::Select,
319                source_catalog.owner,
320            )?;
321        }
322        self.included_relations.insert(source_catalog.id.into());
323        Ok((
324            Relation::Source(Box::new(BoundSource {
325                catalog: source_catalog.clone(),
326                as_of,
327            })),
328            source_catalog
329                .columns
330                .iter()
331                .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
332                .collect_vec(),
333        ))
334    }
335
336    fn resolve_view_relation(
337        &mut self,
338        view_catalog: &ViewCatalog,
339    ) -> Result<(Relation, Vec<(bool, Field)>)> {
340        if !view_catalog.is_system_view() {
341            self.check_privilege(
342                PbObject::ViewId(view_catalog.id),
343                view_catalog.database_id,
344                AclMode::Select,
345                view_catalog.owner,
346            )?;
347        }
348
349        let ast = Parser::parse_sql(&view_catalog.sql)
350            .expect("a view's sql should be parsed successfully");
351        let Statement::Query(query) = ast
352            .into_iter()
353            .exactly_one()
354            .expect("a view should contain only one statement")
355        else {
356            unreachable!("a view should contain a query statement");
357        };
358        let query = self.bind_query_for_view(*query).map_err(|e| {
359            ErrorCode::BindError(format!(
360                "failed to bind view {}, sql: {}\nerror: {}",
361                view_catalog.name,
362                view_catalog.sql,
363                e.as_report()
364            ))
365        })?;
366
367        let columns = view_catalog.columns.clone();
368
369        if !itertools::equal(
370            query.schema().fields().iter().map(|f| &f.data_type),
371            view_catalog.columns.iter().map(|f| &f.data_type),
372        ) {
373            return Err(ErrorCode::BindError(format!(
374                "failed to bind view {}. The SQL's schema is different from catalog's schema sql: {}, bound schema: {:?}, catalog schema: {:?}",
375                view_catalog.name, view_catalog.sql, query.schema(), columns
376            )).into());
377        }
378
379        let share_id = match self.shared_views.get(&view_catalog.id) {
380            Some(share_id) => *share_id,
381            None => {
382                let share_id = self.next_share_id();
383                self.shared_views.insert(view_catalog.id, share_id);
384                self.included_relations.insert(view_catalog.id.into());
385                share_id
386            }
387        };
388        let input = Either::Left(query);
389        Ok((
390            Relation::Share(Box::new(BoundShare {
391                share_id,
392                input: BoundShareInput::Query(input),
393            })),
394            columns.iter().map(|c| (false, c.clone())).collect_vec(),
395        ))
396    }
397
398    fn resolve_table_indexes(
399        &self,
400        schema_name: &str,
401        table_id: TableId,
402    ) -> Result<Vec<Arc<IndexCatalog>>> {
403        Ok(self
404            .catalog
405            .get_schema_by_name(&self.db_name, schema_name)?
406            .get_indexes_by_table_id(&table_id))
407    }
408
409    pub(crate) fn bind_table(
410        &mut self,
411        schema_name: Option<&str>,
412        table_name: &str,
413    ) -> Result<BoundBaseTable> {
414        let db_name = &self.db_name;
415        let schema_path = self.bind_schema_path(schema_name);
416        let (table_catalog, schema_name) =
417            self.catalog
418                .get_created_table_by_name(db_name, schema_path, table_name)?;
419        let table_catalog = table_catalog.clone();
420
421        let table_id = table_catalog.id();
422        let table_indexes = self.resolve_table_indexes(schema_name, table_id)?;
423
424        let columns = table_catalog.columns.clone();
425
426        self.bind_table_to_context(
427            columns
428                .iter()
429                .map(|c| (c.is_hidden, (&c.column_desc).into())),
430            table_name.to_owned(),
431            None,
432        )?;
433
434        Ok(BoundBaseTable {
435            table_id,
436            table_catalog,
437            table_indexes,
438            as_of: None,
439        })
440    }
441
442    pub(crate) fn check_for_dml(table: &TableCatalog, is_insert: bool) -> Result<()> {
443        let table_name = &table.name;
444        match table.table_type() {
445            TableType::Table => {}
446            TableType::Index => {
447                return Err(ErrorCode::InvalidInputSyntax(format!(
448                    "cannot change index \"{table_name}\""
449                ))
450                .into());
451            }
452            TableType::MaterializedView => {
453                return Err(ErrorCode::InvalidInputSyntax(format!(
454                    "cannot change materialized view \"{table_name}\""
455                ))
456                .into());
457            }
458            TableType::Internal => {
459                return Err(ErrorCode::InvalidInputSyntax(format!(
460                    "cannot change internal table \"{table_name}\""
461                ))
462                .into());
463            }
464        }
465
466        if table.append_only && !is_insert {
467            return Err(ErrorCode::BindError(
468                "append-only table does not support update or delete".to_owned(),
469            )
470            .into());
471        }
472
473        Ok(())
474    }
475}