1use std::sync::Arc;
16
17use either::Either;
18use itertools::Itertools;
19use risingwave_common::acl::AclMode;
20use risingwave_common::bail_not_implemented;
21use risingwave_common::catalog::{Field, debug_assert_column_ids_distinct, is_system_schema};
22use risingwave_common::session_config::USER_NAME_WILD_CARD;
23use risingwave_connector::WithPropertiesExt;
24use risingwave_pb::user::grant_privilege::PbObject;
25use risingwave_sqlparser::ast::{AsOf, ObjectName, Statement, TableAlias};
26use risingwave_sqlparser::parser::Parser;
27use thiserror_ext::AsReport;
28
29use super::BoundShare;
30use crate::binder::relation::BoundShareInput;
31use crate::binder::{BindFor, Binder, Relation};
32use crate::catalog::root_catalog::SchemaPath;
33use crate::catalog::source_catalog::SourceCatalog;
34use crate::catalog::system_catalog::SystemTableCatalog;
35use crate::catalog::table_catalog::{TableCatalog, TableType};
36use crate::catalog::view_catalog::ViewCatalog;
37use crate::catalog::{CatalogError, DatabaseId, IndexCatalog, TableId};
38use crate::error::ErrorCode::PermissionDenied;
39use crate::error::{ErrorCode, Result, RwError};
40use crate::handler::privilege::ObjectCheckItem;
41
42#[derive(Debug, Clone)]
43pub struct BoundBaseTable {
44 pub table_id: TableId,
45 pub table_catalog: Arc<TableCatalog>,
46 pub table_indexes: Vec<Arc<IndexCatalog>>,
47 pub as_of: Option<AsOf>,
48}
49
50#[derive(Debug, Clone)]
51pub struct BoundSystemTable {
52 pub table_id: TableId,
53 pub sys_table_catalog: Arc<SystemTableCatalog>,
54}
55
56#[derive(Debug, Clone)]
57pub struct BoundSource {
58 pub catalog: SourceCatalog,
59 pub as_of: Option<AsOf>,
60}
61
62impl BoundSource {
63 pub fn is_shareable_cdc_connector(&self) -> bool {
64 self.catalog.with_properties.is_shareable_cdc_connector()
65 }
66
67 pub fn is_shared(&self) -> bool {
68 self.catalog.info.is_shared()
69 }
70}
71
72impl Binder {
73 pub fn bind_catalog_relation_by_object_name(
74 &mut self,
75 object_name: &ObjectName,
76 bind_creating_relations: bool,
77 ) -> Result<Relation> {
78 let (schema_name, table_name) =
79 Binder::resolve_schema_qualified_name(&self.db_name, object_name)?;
80 self.bind_catalog_relation_by_name(
81 None,
82 schema_name.as_deref(),
83 &table_name,
84 None,
85 None,
86 bind_creating_relations,
87 )
88 }
89
90 pub fn bind_catalog_relation_by_name(
92 &mut self,
93 db_name: Option<&str>,
94 schema_name: Option<&str>,
95 table_name: &str,
96 alias: Option<&TableAlias>,
97 as_of: Option<&AsOf>,
98 bind_creating_relations: bool,
99 ) -> Result<Relation> {
100 let resolve_sys_table_relation = |sys_table_catalog: &Arc<SystemTableCatalog>| {
102 let table = BoundSystemTable {
103 table_id: sys_table_catalog.id(),
104 sys_table_catalog: sys_table_catalog.clone(),
105 };
106 (
107 Relation::SystemTable(Box::new(table)),
108 sys_table_catalog
109 .columns
110 .iter()
111 .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
112 .collect_vec(),
113 )
114 };
115
116 if let Some(db_name) = db_name {
118 let _ = self.catalog.get_database_by_name(db_name)?;
119 }
120
121 let (ret, columns) = {
123 match schema_name {
124 Some(schema_name) => {
125 let db_name = db_name.unwrap_or(&self.db_name).to_owned();
126 let schema_path = SchemaPath::Name(schema_name);
127 if is_system_schema(schema_name) {
128 if let Ok(sys_table_catalog) =
129 self.catalog
130 .get_sys_table_by_name(&db_name, schema_name, table_name)
131 {
132 resolve_sys_table_relation(sys_table_catalog)
133 } else if let Ok((view_catalog, _)) =
134 self.catalog
135 .get_view_by_name(&db_name, schema_path, table_name)
136 {
137 self.resolve_view_relation(&view_catalog.clone())?
138 } else {
139 bail_not_implemented!(
140 issue = 1695,
141 r###"{}.{} is not supported, please use `SHOW` commands for now.
142`SHOW TABLES`,
143`SHOW MATERIALIZED VIEWS`,
144`DESCRIBE <table>`,
145`SHOW COLUMNS FROM [table]`
146"###,
147 schema_name,
148 table_name
149 );
150 }
151 } else if let Some(source_catalog) =
152 self.temporary_source_manager.get_source(table_name)
153 {
155 self.resolve_source_relation(&source_catalog.clone(), as_of, true)?
156 } else if let Ok((table_catalog, schema_name)) = self
157 .catalog
158 .get_any_table_by_name(&db_name, schema_path, table_name)
159 && (bind_creating_relations
160 || table_catalog.is_internal_table()
161 || table_catalog.is_created())
162 {
163 self.resolve_table_relation(
164 table_catalog.clone(),
165 &db_name,
166 schema_name,
167 as_of,
168 )?
169 } else if let Ok((source_catalog, _)) =
170 self.catalog
171 .get_source_by_name(&db_name, schema_path, table_name)
172 {
173 self.resolve_source_relation(&source_catalog.clone(), as_of, false)?
174 } else if let Ok((view_catalog, _)) =
175 self.catalog
176 .get_view_by_name(&db_name, schema_path, table_name)
177 {
178 self.resolve_view_relation(&view_catalog.clone())?
179 } else if let Some(table_catalog) =
180 self.staging_catalog_manager.get_table(table_name)
181 {
182 self.resolve_table_relation(
184 table_catalog.clone().into(),
185 &db_name,
186 schema_name,
187 as_of,
188 )?
189 } else {
190 return Err(CatalogError::not_found("table or source", table_name).into());
191 }
192 }
193 None => (|| {
194 assert!(db_name.is_none());
197 let db_name = self.db_name.clone();
198 let user_name = self.auth_context.user_name.clone();
199
200 for path in self.search_path.path() {
201 if is_system_schema(path)
202 && let Ok(sys_table_catalog) = self
203 .catalog
204 .get_sys_table_by_name(&db_name, path, table_name)
205 {
206 return Ok(resolve_sys_table_relation(sys_table_catalog));
207 } else {
208 let schema_name = if path == USER_NAME_WILD_CARD {
209 &user_name
210 } else {
211 &path.clone()
212 };
213
214 if let Ok(schema) =
215 self.catalog.get_schema_by_name(&db_name, schema_name)
216 {
217 if let Some(source_catalog) =
218 self.temporary_source_manager.get_source(table_name)
219 {
221 return self.resolve_source_relation(
222 &source_catalog.clone(),
223 as_of,
224 true,
225 );
226 } else if let Some(table_catalog) =
227 schema.get_any_table_by_name(table_name)
228 && (bind_creating_relations
229 || table_catalog.is_internal_table()
230 || table_catalog.is_created())
231 {
232 return self.resolve_table_relation(
233 table_catalog.clone(),
234 &db_name,
235 schema_name,
236 as_of,
237 );
238 } else if let Some(source_catalog) =
239 schema.get_source_by_name(table_name)
240 {
241 return self.resolve_source_relation(
242 &source_catalog.clone(),
243 as_of,
244 false,
245 );
246 } else if let Some(view_catalog) =
247 schema.get_view_by_name(table_name)
248 {
249 return self.resolve_view_relation(&view_catalog.clone());
250 } else if let Some(table_catalog) =
251 self.staging_catalog_manager.get_table(table_name)
252 {
253 return self.resolve_table_relation(
255 table_catalog.clone().into(),
256 &db_name,
257 schema_name,
258 as_of,
259 );
260 }
261 }
262 }
263 }
264
265 Err(CatalogError::not_found("table or source", table_name).into())
266 })()?,
267 }
268 };
269
270 self.bind_table_to_context(columns, table_name.to_owned(), alias)?;
271 Ok(ret)
272 }
273
274 pub(crate) fn check_privilege(
275 &self,
276 item: ObjectCheckItem,
277 database_id: DatabaseId,
278 ) -> Result<()> {
279 if self.context.disable_security_invoker {
281 return Ok(());
282 }
283
284 match self.bind_for {
285 BindFor::Stream | BindFor::Batch => {
286 if matches!(self.bind_for, BindFor::Stream)
288 && self.database_id != database_id
289 && matches!(item.object, PbObject::SourceId(_))
290 {
291 return Err(PermissionDenied(format!(
292 "SOURCE \"{}\" is not allowed for cross-db access",
293 item.name
294 ))
295 .into());
296 }
297 if let Some(user) = self.user.get_user_by_name(&self.auth_context.user_name) {
298 if user.is_super || user.id == item.owner {
299 return Ok(());
300 }
301 if !user.has_privilege(item.object, item.mode) {
302 return Err(PermissionDenied(item.error_message()).into());
303 }
304
305 if self.database_id != database_id
307 && !user.has_privilege(database_id, AclMode::Connect)
308 {
309 let db_name = self.catalog.get_database_by_id(database_id)?.name.clone();
310
311 return Err(PermissionDenied(format!(
312 "permission denied for database \"{db_name}\""
313 ))
314 .into());
315 }
316 } else {
317 return Err(PermissionDenied("Session user is invalid".to_owned()).into());
318 }
319 }
320 BindFor::Ddl | BindFor::System => {
321 }
323 }
324 Ok(())
325 }
326
327 fn resolve_table_relation(
328 &mut self,
329 table_catalog: Arc<TableCatalog>,
330 db_name: &str,
331 schema_name: &str,
332 as_of: Option<&AsOf>,
333 ) -> Result<(Relation, Vec<(bool, Field)>)> {
334 let table_id = table_catalog.id();
335 let columns = table_catalog
336 .columns
337 .iter()
338 .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
339 .collect_vec();
340 self.check_privilege(
341 ObjectCheckItem::new(
342 table_catalog.owner,
343 AclMode::Select,
344 table_catalog.name.clone(),
345 table_id,
346 ),
347 table_catalog.database_id,
348 )?;
349 self.included_relations.insert(table_id.as_object_id());
350
351 let table_indexes = self.resolve_table_indexes(db_name, schema_name, table_id)?;
352
353 let table = BoundBaseTable {
354 table_id,
355 table_catalog,
356 table_indexes,
357 as_of: as_of.cloned(),
358 };
359
360 Ok::<_, RwError>((Relation::BaseTable(Box::new(table)), columns))
361 }
362
363 fn resolve_source_relation(
364 &mut self,
365 source_catalog: &SourceCatalog,
366 as_of: Option<&AsOf>,
367 is_temporary: bool,
368 ) -> Result<(Relation, Vec<(bool, Field)>)> {
369 debug_assert_column_ids_distinct(&source_catalog.columns);
370 if !is_temporary {
371 self.check_privilege(
372 ObjectCheckItem::new(
373 source_catalog.owner,
374 AclMode::Select,
375 source_catalog.name.clone(),
376 source_catalog.id,
377 ),
378 source_catalog.database_id,
379 )?;
380 }
381 self.included_relations
382 .insert(source_catalog.id.as_object_id());
383 Ok((
384 Relation::Source(Box::new(BoundSource {
385 catalog: source_catalog.clone(),
386 as_of: as_of.cloned(),
387 })),
388 source_catalog
389 .columns
390 .iter()
391 .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
392 .collect_vec(),
393 ))
394 }
395
396 fn resolve_view_relation(
397 &mut self,
398 view_catalog: &ViewCatalog,
399 ) -> Result<(Relation, Vec<(bool, Field)>)> {
400 if !view_catalog.is_system_view() {
401 self.check_privilege(
402 ObjectCheckItem::new(
403 view_catalog.owner,
404 AclMode::Select,
405 view_catalog.name.clone(),
406 view_catalog.id,
407 ),
408 view_catalog.database_id,
409 )?;
410 }
411
412 let ast = Parser::parse_sql(&view_catalog.sql)
413 .expect("a view's sql should be parsed successfully");
414 let Statement::Query(query) = ast
415 .into_iter()
416 .exactly_one()
417 .expect("a view should contain only one statement")
418 else {
419 unreachable!("a view should contain a query statement");
420 };
421 let query = self.bind_query_for_view(&query).map_err(|e| {
422 ErrorCode::BindError(format!(
423 "failed to bind view {}, sql: {}\nerror: {}",
424 view_catalog.name,
425 view_catalog.sql,
426 e.as_report()
427 ))
428 })?;
429
430 let columns = view_catalog.columns.clone();
431
432 if !itertools::equal(
433 query.schema().fields().iter().map(|f| &f.data_type),
434 view_catalog.columns.iter().map(|f| &f.data_type),
435 ) {
436 return Err(ErrorCode::BindError(format!(
437 "failed to bind view {}. The SQL's schema is different from catalog's schema sql: {}, bound schema: {:?}, catalog schema: {:?}",
438 view_catalog.name, view_catalog.sql, query.schema(), columns
439 )).into());
440 }
441
442 let share_id = match self.shared_views.get(&view_catalog.id) {
443 Some(share_id) => *share_id,
444 None => {
445 let share_id = self.next_share_id();
446 self.shared_views.insert(view_catalog.id, share_id);
447 self.included_relations
448 .insert(view_catalog.id.as_object_id());
449 share_id
450 }
451 };
452 let input = Either::Left(query);
453 Ok((
454 Relation::Share(Box::new(BoundShare {
455 share_id,
456 input: BoundShareInput::Query(input),
457 })),
458 columns.iter().map(|c| (false, c.clone())).collect_vec(),
459 ))
460 }
461
462 fn resolve_table_indexes(
463 &self,
464 db_name: &str,
465 schema_name: &str,
466 table_id: TableId,
467 ) -> Result<Vec<Arc<IndexCatalog>>> {
468 let schema = self.catalog.get_schema_by_name(db_name, schema_name)?;
469 assert!(
470 schema.get_table_by_id(table_id).is_some() || table_id.is_placeholder(),
471 "table {table_id} not found in {db_name}.{schema_name}"
472 );
473
474 Ok(schema.get_created_indexes_by_table_id(table_id))
475 }
476
477 pub(crate) fn bind_table(
478 &mut self,
479 schema_name: Option<&str>,
480 table_name: &str,
481 ) -> Result<BoundBaseTable> {
482 let db_name = &self.db_name;
483 let schema_path = self.bind_schema_path(schema_name);
484 let (table_catalog, schema_name) =
485 self.catalog
486 .get_created_table_by_name(db_name, schema_path, table_name)?;
487 let table_catalog = table_catalog.clone();
488
489 let table_id = table_catalog.id();
490 let table_indexes = self.resolve_table_indexes(db_name, schema_name, table_id)?;
491
492 let columns = table_catalog.columns.clone();
493
494 self.bind_table_to_context(
495 columns
496 .iter()
497 .map(|c| (c.is_hidden, (&c.column_desc).into())),
498 table_name.to_owned(),
499 None,
500 )?;
501
502 Ok(BoundBaseTable {
503 table_id,
504 table_catalog,
505 table_indexes,
506 as_of: None,
507 })
508 }
509
510 pub(crate) fn check_for_dml(table: &TableCatalog, is_insert: bool) -> Result<()> {
511 let table_name = &table.name;
512 match table.table_type() {
513 TableType::Table => {}
514 TableType::Index | TableType::VectorIndex => {
515 return Err(ErrorCode::InvalidInputSyntax(format!(
516 "cannot change index \"{table_name}\""
517 ))
518 .into());
519 }
520 TableType::MaterializedView => {
521 return Err(ErrorCode::InvalidInputSyntax(format!(
522 "cannot change materialized view \"{table_name}\""
523 ))
524 .into());
525 }
526 TableType::Internal => {
527 return Err(ErrorCode::InvalidInputSyntax(format!(
528 "cannot change internal table \"{table_name}\""
529 ))
530 .into());
531 }
532 }
533
534 if table.append_only && !is_insert {
535 return Err(ErrorCode::BindError(
536 "append-only table does not support update or delete".to_owned(),
537 )
538 .into());
539 }
540
541 Ok(())
542 }
543}