1use std::sync::Arc;
16
17use itertools::Itertools;
18use risingwave_common::acl::AclMode;
19use risingwave_common::bail_not_implemented;
20use risingwave_common::catalog::{Field, debug_assert_column_ids_distinct, is_system_schema};
21use risingwave_common::session_config::USER_NAME_WILD_CARD;
22use risingwave_connector::WithPropertiesExt;
23use risingwave_pb::user::grant_privilege::PbObject;
24use risingwave_sqlparser::ast::{AsOf, ObjectName, Statement, TableAlias};
25use risingwave_sqlparser::parser::Parser;
26use thiserror_ext::AsReport;
27
28use super::BoundShare;
29use crate::binder::relation::BoundShareInput;
30use crate::binder::{BindFor, Binder, Relation};
31use crate::catalog::root_catalog::SchemaPath;
32use crate::catalog::source_catalog::SourceCatalog;
33use crate::catalog::system_catalog::SystemTableCatalog;
34use crate::catalog::table_catalog::{TableCatalog, TableType};
35use crate::catalog::view_catalog::ViewCatalog;
36use crate::catalog::{CatalogError, DatabaseId, IndexCatalog, TableId};
37use crate::error::ErrorCode::PermissionDenied;
38use crate::error::{ErrorCode, Result, RwError};
39use crate::handler::privilege::ObjectCheckItem;
40
/// A table resolved from the catalog, in the form the binder hands out as a relation.
#[derive(Debug, Clone)]
pub struct BoundBaseTable {
    /// Id of the bound table; the binder fills it from `table_catalog.id()`.
    pub table_id: TableId,
    /// Shared handle to the table's catalog entry.
    pub table_catalog: Arc<TableCatalog>,
    /// Index catalogs that were looked up for this table when it was bound.
    pub table_indexes: Vec<Arc<IndexCatalog>>,
    /// The `AsOf` clause supplied at bind time, if any.
    pub as_of: Option<AsOf>,
}
48
/// A system table resolved from the catalog.
#[derive(Debug, Clone)]
pub struct BoundSystemTable {
    /// Id of the system table; the binder fills it from `sys_table_catalog.id()`.
    pub table_id: TableId,
    /// Shared handle to the system table's catalog entry.
    pub sys_table_catalog: Arc<SystemTableCatalog>,
}
54
/// A source resolved from the catalog.
#[derive(Debug, Clone)]
pub struct BoundSource {
    /// Owned copy of the source's catalog entry.
    pub catalog: SourceCatalog,
    /// The `AsOf` clause supplied at bind time, if any.
    pub as_of: Option<AsOf>,
}
60
61impl BoundSource {
62 pub fn is_shareable_cdc_connector(&self) -> bool {
63 self.catalog.with_properties.is_shareable_cdc_connector()
64 }
65
66 pub fn is_shared(&self) -> bool {
67 self.catalog.info.is_shared()
68 }
69}
70
71impl Binder {
72 pub fn bind_catalog_relation_by_object_name(
73 &mut self,
74 object_name: &ObjectName,
75 bind_creating_relations: bool,
76 ) -> Result<Relation> {
77 let (schema_name, table_name) =
78 Binder::resolve_schema_qualified_name(&self.db_name, object_name)?;
79 self.bind_catalog_relation_by_name(
80 None,
81 schema_name.as_deref(),
82 &table_name,
83 None,
84 None,
85 bind_creating_relations,
86 )
87 }
88
/// Binds `table_name` to whichever catalog object it resolves to (system table, table,
/// source, or view) and registers its columns in the binding context.
///
/// * `db_name` - optional database; when given it must exist in the catalog.
/// * `schema_name` - optional schema; when absent the search path is walked instead.
/// * `alias` - optional alias forwarded to `bind_table_to_context`.
/// * `as_of` - optional `AsOf` clause forwarded to the table/source resolvers.
/// * `bind_creating_relations` - when `true`, tables are accepted regardless of the
///   `is_internal_table()` / `is_created()` filter applied otherwise.
///
/// # Errors
///
/// Returns a "table or source" not-found error when nothing matches, a
/// not-implemented error for unknown names inside a system schema, and propagates
/// errors from the individual resolvers (including privilege checks).
///
/// # Panics
///
/// Panics if `schema_name` is `None` while `db_name` is `Some`.
pub fn bind_catalog_relation_by_name(
    &mut self,
    db_name: Option<&str>,
    schema_name: Option<&str>,
    table_name: &str,
    alias: Option<&TableAlias>,
    as_of: Option<&AsOf>,
    bind_creating_relations: bool,
) -> Result<Relation> {
    // Helper: turn a system table catalog into a bound relation together with its
    // `(is_hidden, field)` column list.
    let resolve_sys_table_relation = |sys_table_catalog: &Arc<SystemTableCatalog>| {
        let table = BoundSystemTable {
            table_id: sys_table_catalog.id(),
            sys_table_catalog: sys_table_catalog.clone(),
        };
        (
            Relation::SystemTable(Box::new(table)),
            sys_table_catalog
                .columns
                .iter()
                .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
                .collect_vec(),
        )
    };

    // Fail early if an explicitly named database does not exist; the value itself
    // is not needed here.
    if let Some(db_name) = db_name {
        let _ = self.catalog.get_database_by_name(db_name)?;
    }

    let (ret, columns) = {
        match schema_name {
            // The schema was given explicitly: look only there (falling back to the
            // binder's current database when `db_name` is absent).
            Some(schema_name) => {
                let db_name = db_name.unwrap_or(&self.db_name).to_owned();
                let schema_path = SchemaPath::Name(schema_name);
                if is_system_schema(schema_name) {
                    // System schema: try a system table first, then a view; anything
                    // else is reported as not implemented.
                    if let Ok(sys_table_catalog) =
                        self.catalog
                            .get_sys_table_by_name(&db_name, schema_name, table_name)
                    {
                        resolve_sys_table_relation(sys_table_catalog)
                    } else if let Ok((view_catalog, _)) =
                        self.catalog
                            .get_view_by_name(&db_name, schema_path, table_name)
                    {
                        self.resolve_view_relation(&view_catalog.clone())?
                    } else {
                        bail_not_implemented!(
                            issue = 1695,
                            r###"{}.{} is not supported, please use `SHOW` commands for now.
`SHOW TABLES`,
`SHOW MATERIALIZED VIEWS`,
`DESCRIBE <table>`,
`SHOW COLUMNS FROM [table]`
"###,
                            schema_name,
                            table_name
                        );
                    }
                } else if let Some(source_catalog) =
                    self.temporary_source_manager.get_source(table_name)
                {
                    // Temporary sources are looked up by bare name only; database and
                    // schema are not consulted. `true` marks the source as temporary.
                    self.resolve_source_relation(&source_catalog.clone(), as_of, true)?
                } else if let Ok((table_catalog, schema_name)) = self
                    .catalog
                    .get_any_table_by_name(&db_name, schema_path, table_name)
                    && (bind_creating_relations
                        || table_catalog.is_internal_table()
                        || table_catalog.is_created())
                {
                    // A table matched and passed the creation-state filter. Note that
                    // `schema_name` here is the one returned by the catalog lookup,
                    // shadowing the caller-supplied value.
                    self.resolve_table_relation(
                        table_catalog.clone(),
                        &db_name,
                        schema_name,
                        as_of,
                    )?
                } else if let Ok((source_catalog, _)) =
                    self.catalog
                        .get_source_by_name(&db_name, schema_path, table_name)
                {
                    self.resolve_source_relation(&source_catalog.clone(), as_of, false)?
                } else if let Ok((view_catalog, _)) =
                    self.catalog
                        .get_view_by_name(&db_name, schema_path, table_name)
                {
                    self.resolve_view_relation(&view_catalog.clone())?
                } else if let Some(table_catalog) =
                    self.staging_catalog_manager.get_table(table_name)
                {
                    // Last resort: a table held by the staging catalog manager, which
                    // is also looked up by bare name only.
                    self.resolve_table_relation(
                        table_catalog.clone().into(),
                        &db_name,
                        schema_name,
                        as_of,
                    )?
                } else {
                    return Err(CatalogError::not_found("table or source", table_name).into());
                }
            }
            // No schema given: walk the search path. The closure is invoked
            // immediately so that `return` exits only this arm.
            None => (|| {
                // Without a schema there must be no explicit database either, so the
                // binder's current database is used.
                assert!(db_name.is_none());
                let db_name = self.db_name.clone();
                let user_name = self.auth_context.user_name.clone();

                for path in self.search_path.path() {
                    if is_system_schema(path)
                        && let Ok(sys_table_catalog) = self
                            .catalog
                            .get_sys_table_by_name(&db_name, path, table_name)
                    {
                        return Ok(resolve_sys_table_relation(sys_table_catalog));
                    } else {
                        // The wildcard search-path entry stands for the session
                        // user's name.
                        let schema_name = if path == USER_NAME_WILD_CARD {
                            &user_name
                        } else {
                            &path.clone()
                        };

                        // Search-path entries naming a missing schema are skipped.
                        if let Ok(schema) =
                            self.catalog.get_schema_by_name(&db_name, schema_name)
                        {
                            // Same precedence as the explicit-schema case: temporary
                            // source, table, source, view, then staging table.
                            if let Some(source_catalog) =
                                self.temporary_source_manager.get_source(table_name)
                            {
                                return self.resolve_source_relation(
                                    &source_catalog.clone(),
                                    as_of,
                                    true,
                                );
                            } else if let Some(table_catalog) =
                                schema.get_any_table_by_name(table_name)
                                && (bind_creating_relations
                                    || table_catalog.is_internal_table()
                                    || table_catalog.is_created())
                            {
                                return self.resolve_table_relation(
                                    table_catalog.clone(),
                                    &db_name,
                                    schema_name,
                                    as_of,
                                );
                            } else if let Some(source_catalog) =
                                schema.get_source_by_name(table_name)
                            {
                                return self.resolve_source_relation(
                                    &source_catalog.clone(),
                                    as_of,
                                    false,
                                );
                            } else if let Some(view_catalog) =
                                schema.get_view_by_name(table_name)
                            {
                                return self.resolve_view_relation(&view_catalog.clone());
                            } else if let Some(table_catalog) =
                                self.staging_catalog_manager.get_table(table_name)
                            {
                                return self.resolve_table_relation(
                                    table_catalog.clone().into(),
                                    &db_name,
                                    schema_name,
                                    as_of,
                                );
                            }
                        }
                    }
                }

                // Nothing on the search path matched.
                Err(CatalogError::not_found("table or source", table_name).into())
            })()?,
        }
    };

    // Make the relation's columns visible to the rest of the binding under
    // `table_name` (or `alias`, when provided).
    self.bind_table_to_context(columns, table_name.to_owned(), alias)?;
    Ok(ret)
}
272
273 pub(crate) fn check_privilege(
274 &self,
275 item: ObjectCheckItem,
276 database_id: DatabaseId,
277 ) -> Result<()> {
278 if self.context.disable_security_invoker {
280 return Ok(());
281 }
282
283 match self.bind_for {
284 BindFor::Stream | BindFor::Batch => {
285 if matches!(self.bind_for, BindFor::Stream)
287 && self.database_id != database_id
288 && matches!(item.object, PbObject::SourceId(_))
289 {
290 return Err(PermissionDenied(format!(
291 "SOURCE \"{}\" is not allowed for cross-db access",
292 item.name
293 ))
294 .into());
295 }
296 if let Some(user) = self.user.get_user_by_name(&self.auth_context.user_name) {
297 if user.is_super || user.id == item.owner {
298 return Ok(());
299 }
300 if !user.has_privilege(item.object, item.mode) {
301 return Err(PermissionDenied(item.error_message()).into());
302 }
303
304 if self.database_id != database_id
306 && !user.has_privilege(database_id, AclMode::Connect)
307 {
308 let db_name = self.catalog.get_database_by_id(database_id)?.name.clone();
309
310 return Err(PermissionDenied(format!(
311 "permission denied for database \"{db_name}\""
312 ))
313 .into());
314 }
315 } else {
316 return Err(PermissionDenied("Session user is invalid".to_owned()).into());
317 }
318 }
319 BindFor::Ddl | BindFor::System => {
320 }
322 }
323 Ok(())
324 }
325
326 fn resolve_table_relation(
327 &mut self,
328 table_catalog: Arc<TableCatalog>,
329 db_name: &str,
330 schema_name: &str,
331 as_of: Option<&AsOf>,
332 ) -> Result<(Relation, Vec<(bool, Field)>)> {
333 let table_id = table_catalog.id();
334 let columns = table_catalog
335 .columns
336 .iter()
337 .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
338 .collect_vec();
339 self.check_privilege(
340 ObjectCheckItem::new(
341 table_catalog.owner,
342 AclMode::Select,
343 table_catalog.name.clone(),
344 table_id,
345 ),
346 table_catalog.database_id,
347 )?;
348 self.included_relations.insert(table_id.as_object_id());
349
350 let table_indexes = self.resolve_table_indexes(db_name, schema_name, table_id)?;
351
352 let table = BoundBaseTable {
353 table_id,
354 table_catalog,
355 table_indexes,
356 as_of: as_of.cloned(),
357 };
358
359 Ok::<_, RwError>((Relation::BaseTable(Box::new(table)), columns))
360 }
361
362 fn resolve_source_relation(
363 &mut self,
364 source_catalog: &SourceCatalog,
365 as_of: Option<&AsOf>,
366 is_temporary: bool,
367 ) -> Result<(Relation, Vec<(bool, Field)>)> {
368 debug_assert_column_ids_distinct(&source_catalog.columns);
369 if !is_temporary {
370 self.check_privilege(
371 ObjectCheckItem::new(
372 source_catalog.owner,
373 AclMode::Select,
374 source_catalog.name.clone(),
375 source_catalog.id,
376 ),
377 source_catalog.database_id,
378 )?;
379 }
380 self.included_relations
381 .insert(source_catalog.id.as_object_id());
382 Ok((
383 Relation::Source(Box::new(BoundSource {
384 catalog: source_catalog.clone(),
385 as_of: as_of.cloned(),
386 })),
387 source_catalog
388 .columns
389 .iter()
390 .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
391 .collect_vec(),
392 ))
393 }
394
395 fn resolve_view_relation(
396 &mut self,
397 view_catalog: &ViewCatalog,
398 ) -> Result<(Relation, Vec<(bool, Field)>)> {
399 if !view_catalog.is_system_view() {
400 self.check_privilege(
401 ObjectCheckItem::new(
402 view_catalog.owner,
403 AclMode::Select,
404 view_catalog.name.clone(),
405 view_catalog.id,
406 ),
407 view_catalog.database_id,
408 )?;
409 }
410
411 let ast = Parser::parse_sql(&view_catalog.sql)
412 .expect("a view's sql should be parsed successfully");
413 let Statement::Query(query) = ast
414 .into_iter()
415 .exactly_one()
416 .expect("a view should contain only one statement")
417 else {
418 unreachable!("a view should contain a query statement");
419 };
420 let query = self.bind_query_for_view(&query).map_err(|e| {
421 ErrorCode::BindError(format!(
422 "failed to bind view {}, sql: {}\nerror: {}",
423 view_catalog.name,
424 view_catalog.sql,
425 e.as_report()
426 ))
427 })?;
428
429 let columns = view_catalog.columns.clone();
430
431 if !itertools::equal(
432 query.schema().fields().iter().map(|f| &f.data_type),
433 view_catalog.columns.iter().map(|f| &f.data_type),
434 ) {
435 return Err(ErrorCode::BindError(format!(
436 "failed to bind view {}. The SQL's schema is different from catalog's schema sql: {}, bound schema: {:?}, catalog schema: {:?}",
437 view_catalog.name, view_catalog.sql, query.schema(), columns
438 )).into());
439 }
440
441 let share_id = match self.shared_views.get(&view_catalog.id) {
442 Some(share_id) => *share_id,
443 None => {
444 let share_id = self.next_share_id();
445 self.shared_views.insert(view_catalog.id, share_id);
446 self.included_relations
447 .insert(view_catalog.id.as_object_id());
448 share_id
449 }
450 };
451 Ok((
452 Relation::Share(Box::new(BoundShare {
453 share_id,
454 input: BoundShareInput::Query(query),
455 })),
456 columns.iter().map(|c| (false, c.clone())).collect_vec(),
457 ))
458 }
459
460 fn resolve_table_indexes(
461 &self,
462 db_name: &str,
463 schema_name: &str,
464 table_id: TableId,
465 ) -> Result<Vec<Arc<IndexCatalog>>> {
466 let schema = self.catalog.get_schema_by_name(db_name, schema_name)?;
467 assert!(
468 schema.get_table_by_id(table_id).is_some() || table_id.is_placeholder(),
469 "table {table_id} not found in {db_name}.{schema_name}"
470 );
471
472 Ok(schema.get_created_indexes_by_table_id(table_id))
473 }
474
475 pub(crate) fn bind_table(
476 &mut self,
477 schema_name: Option<&str>,
478 table_name: &str,
479 ) -> Result<BoundBaseTable> {
480 let db_name = &self.db_name;
481 let schema_path = self.bind_schema_path(schema_name);
482 let (table_catalog, schema_name) =
483 self.catalog
484 .get_created_table_by_name(db_name, schema_path, table_name)?;
485 let table_catalog = table_catalog.clone();
486
487 let table_id = table_catalog.id();
488 let table_indexes = self.resolve_table_indexes(db_name, schema_name, table_id)?;
489
490 let columns = table_catalog.columns.clone();
491
492 self.bind_table_to_context(
493 columns
494 .iter()
495 .map(|c| (c.is_hidden, (&c.column_desc).into())),
496 table_name.to_owned(),
497 None,
498 )?;
499
500 Ok(BoundBaseTable {
501 table_id,
502 table_catalog,
503 table_indexes,
504 as_of: None,
505 })
506 }
507
508 pub(crate) fn check_for_dml(table: &TableCatalog, is_insert: bool) -> Result<()> {
509 let table_name = &table.name;
510 match table.table_type() {
511 TableType::Table => {}
512 TableType::Index | TableType::VectorIndex => {
513 return Err(ErrorCode::InvalidInputSyntax(format!(
514 "cannot change index \"{table_name}\""
515 ))
516 .into());
517 }
518 TableType::MaterializedView => {
519 return Err(ErrorCode::InvalidInputSyntax(format!(
520 "cannot change materialized view \"{table_name}\""
521 ))
522 .into());
523 }
524 TableType::Internal => {
525 return Err(ErrorCode::InvalidInputSyntax(format!(
526 "cannot change internal table \"{table_name}\""
527 ))
528 .into());
529 }
530 }
531
532 if table.append_only && !is_insert {
533 return Err(ErrorCode::BindError(
534 "append-only table does not support update or delete".to_owned(),
535 )
536 .into());
537 }
538
539 Ok(())
540 }
541}