1use std::sync::Arc;
16
17use either::Either;
18use itertools::Itertools;
19use risingwave_common::acl::AclMode;
20use risingwave_common::bail_not_implemented;
21use risingwave_common::catalog::{Field, debug_assert_column_ids_distinct, is_system_schema};
22use risingwave_common::session_config::USER_NAME_WILD_CARD;
23use risingwave_connector::WithPropertiesExt;
24use risingwave_pb::user::grant_privilege::PbObject;
25use risingwave_sqlparser::ast::{AsOf, ObjectName, Statement, TableAlias};
26use risingwave_sqlparser::parser::Parser;
27use thiserror_ext::AsReport;
28
29use super::BoundShare;
30use crate::binder::relation::BoundShareInput;
31use crate::binder::{BindFor, Binder, Relation};
32use crate::catalog::root_catalog::SchemaPath;
33use crate::catalog::source_catalog::SourceCatalog;
34use crate::catalog::system_catalog::SystemTableCatalog;
35use crate::catalog::table_catalog::{TableCatalog, TableType};
36use crate::catalog::view_catalog::ViewCatalog;
37use crate::catalog::{CatalogError, DatabaseId, IndexCatalog, TableId};
38use crate::error::ErrorCode::PermissionDenied;
39use crate::error::{ErrorCode, Result, RwError};
40use crate::handler::privilege::ObjectCheckItem;
41
42#[derive(Debug, Clone)]
43pub struct BoundBaseTable {
44 pub table_id: TableId,
45 pub table_catalog: Arc<TableCatalog>,
46 pub table_indexes: Vec<Arc<IndexCatalog>>,
47 pub as_of: Option<AsOf>,
48}
49
50#[derive(Debug, Clone)]
51pub struct BoundSystemTable {
52 pub table_id: TableId,
53 pub sys_table_catalog: Arc<SystemTableCatalog>,
54}
55
56#[derive(Debug, Clone)]
57pub struct BoundSource {
58 pub catalog: SourceCatalog,
59 pub as_of: Option<AsOf>,
60}
61
62impl BoundSource {
63 pub fn is_shareable_cdc_connector(&self) -> bool {
64 self.catalog.with_properties.is_shareable_cdc_connector()
65 }
66
67 pub fn is_shared(&self) -> bool {
68 self.catalog.info.is_shared()
69 }
70}
71
72impl Binder {
73 pub fn bind_catalog_relation_by_object_name(
74 &mut self,
75 object_name: &ObjectName,
76 bind_creating_relations: bool,
77 ) -> Result<Relation> {
78 let (schema_name, table_name) =
79 Binder::resolve_schema_qualified_name(&self.db_name, object_name)?;
80 self.bind_catalog_relation_by_name(
81 None,
82 schema_name.as_deref(),
83 &table_name,
84 None,
85 None,
86 bind_creating_relations,
87 )
88 }
89
90 pub fn bind_catalog_relation_by_name(
92 &mut self,
93 db_name: Option<&str>,
94 schema_name: Option<&str>,
95 table_name: &str,
96 alias: Option<&TableAlias>,
97 as_of: Option<&AsOf>,
98 bind_creating_relations: bool,
99 ) -> Result<Relation> {
100 let resolve_sys_table_relation = |sys_table_catalog: &Arc<SystemTableCatalog>| {
102 let table = BoundSystemTable {
103 table_id: sys_table_catalog.id(),
104 sys_table_catalog: sys_table_catalog.clone(),
105 };
106 (
107 Relation::SystemTable(Box::new(table)),
108 sys_table_catalog
109 .columns
110 .iter()
111 .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
112 .collect_vec(),
113 )
114 };
115
116 if let Some(db_name) = db_name {
118 let _ = self.catalog.get_database_by_name(db_name)?;
119 }
120
121 let (ret, columns) = {
123 match schema_name {
124 Some(schema_name) => {
125 let db_name = db_name.unwrap_or(&self.db_name).to_owned();
126 let schema_path = SchemaPath::Name(schema_name);
127 if is_system_schema(schema_name) {
128 if let Ok(sys_table_catalog) =
129 self.catalog
130 .get_sys_table_by_name(&db_name, schema_name, table_name)
131 {
132 resolve_sys_table_relation(sys_table_catalog)
133 } else if let Ok((view_catalog, _)) =
134 self.catalog
135 .get_view_by_name(&db_name, schema_path, table_name)
136 {
137 self.resolve_view_relation(&view_catalog.clone())?
138 } else {
139 bail_not_implemented!(
140 issue = 1695,
141 r###"{}.{} is not supported, please use `SHOW` commands for now.
142`SHOW TABLES`,
143`SHOW MATERIALIZED VIEWS`,
144`DESCRIBE <table>`,
145`SHOW COLUMNS FROM [table]`
146"###,
147 schema_name,
148 table_name
149 );
150 }
151 } else if let Some(source_catalog) =
152 self.temporary_source_manager.get_source(table_name)
153 {
155 self.resolve_source_relation(&source_catalog.clone(), as_of, true)?
156 } else if let Ok((table_catalog, schema_name)) = self
157 .catalog
158 .get_any_table_by_name(&db_name, schema_path, table_name)
159 && (bind_creating_relations || table_catalog.is_created())
160 {
161 self.resolve_table_relation(
162 table_catalog.clone(),
163 &db_name,
164 schema_name,
165 as_of,
166 )?
167 } else if let Ok((source_catalog, _)) =
168 self.catalog
169 .get_source_by_name(&db_name, schema_path, table_name)
170 {
171 self.resolve_source_relation(&source_catalog.clone(), as_of, false)?
172 } else if let Ok((view_catalog, _)) =
173 self.catalog
174 .get_view_by_name(&db_name, schema_path, table_name)
175 {
176 self.resolve_view_relation(&view_catalog.clone())?
177 } else {
178 return Err(CatalogError::NotFound(
179 "table or source",
180 table_name.to_owned(),
181 )
182 .into());
183 }
184 }
185 None => (|| {
186 assert!(db_name.is_none());
189 let db_name = self.db_name.clone();
190 let user_name = self.auth_context.user_name.clone();
191
192 for path in self.search_path.path() {
193 if is_system_schema(path)
194 && let Ok(sys_table_catalog) = self
195 .catalog
196 .get_sys_table_by_name(&db_name, path, table_name)
197 {
198 return Ok(resolve_sys_table_relation(sys_table_catalog));
199 } else {
200 let schema_name = if path == USER_NAME_WILD_CARD {
201 &user_name
202 } else {
203 &path.clone()
204 };
205
206 if let Ok(schema) =
207 self.catalog.get_schema_by_name(&db_name, schema_name)
208 {
209 if let Some(source_catalog) =
210 self.temporary_source_manager.get_source(table_name)
211 {
213 return self.resolve_source_relation(
214 &source_catalog.clone(),
215 as_of,
216 true,
217 );
218 } else if let Some(table_catalog) =
219 schema.get_any_table_by_name(table_name)
220 && (bind_creating_relations
221 || table_catalog.is_internal_table()
222 || table_catalog.is_created())
223 {
224 return self.resolve_table_relation(
225 table_catalog.clone(),
226 &db_name,
227 schema_name,
228 as_of,
229 );
230 } else if let Some(source_catalog) =
231 schema.get_source_by_name(table_name)
232 {
233 return self.resolve_source_relation(
234 &source_catalog.clone(),
235 as_of,
236 false,
237 );
238 } else if let Some(view_catalog) =
239 schema.get_view_by_name(table_name)
240 {
241 return self.resolve_view_relation(&view_catalog.clone());
242 }
243 }
244 }
245 }
246
247 Err(CatalogError::NotFound("table or source", table_name.to_owned()).into())
248 })()?,
249 }
250 };
251
252 self.bind_table_to_context(columns, table_name.to_owned(), alias)?;
253 Ok(ret)
254 }
255
256 pub(crate) fn check_privilege(
257 &self,
258 item: ObjectCheckItem,
259 database_id: DatabaseId,
260 ) -> Result<()> {
261 if self.context.disable_security_invoker {
263 return Ok(());
264 }
265
266 match self.bind_for {
267 BindFor::Stream | BindFor::Batch => {
268 if matches!(self.bind_for, BindFor::Stream)
270 && self.database_id != database_id
271 && matches!(item.object, PbObject::SourceId(_))
272 {
273 return Err(PermissionDenied(format!(
274 "SOURCE \"{}\" is not allowed for cross-db access",
275 item.name
276 ))
277 .into());
278 }
279 if let Some(user) = self.user.get_user_by_name(&self.auth_context.user_name) {
280 if user.is_super || user.id == item.owner {
281 return Ok(());
282 }
283 if !user.has_privilege(&item.object, item.mode) {
284 return Err(PermissionDenied(item.error_message()).into());
285 }
286
287 if self.database_id != database_id
289 && !user.has_privilege(&PbObject::DatabaseId(database_id), AclMode::Connect)
290 {
291 let db_name = self
292 .catalog
293 .get_database_by_id(&database_id)?
294 .name
295 .to_owned();
296
297 return Err(PermissionDenied(format!(
298 "permission denied for database \"{db_name}\""
299 ))
300 .into());
301 }
302 } else {
303 return Err(PermissionDenied("Session user is invalid".to_owned()).into());
304 }
305 }
306 BindFor::Ddl | BindFor::System => {
307 }
309 }
310 Ok(())
311 }
312
313 fn resolve_table_relation(
314 &mut self,
315 table_catalog: Arc<TableCatalog>,
316 db_name: &str,
317 schema_name: &str,
318 as_of: Option<&AsOf>,
319 ) -> Result<(Relation, Vec<(bool, Field)>)> {
320 let table_id = table_catalog.id();
321 let columns = table_catalog
322 .columns
323 .iter()
324 .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
325 .collect_vec();
326 self.check_privilege(
327 ObjectCheckItem::new(
328 table_catalog.owner,
329 AclMode::Select,
330 table_catalog.name.clone(),
331 PbObject::TableId(table_id.table_id),
332 ),
333 table_catalog.database_id,
334 )?;
335 self.included_relations.insert(table_id);
336
337 let table_indexes = self.resolve_table_indexes(db_name, schema_name, table_id)?;
338
339 let table = BoundBaseTable {
340 table_id,
341 table_catalog,
342 table_indexes,
343 as_of: as_of.cloned(),
344 };
345
346 Ok::<_, RwError>((Relation::BaseTable(Box::new(table)), columns))
347 }
348
349 fn resolve_source_relation(
350 &mut self,
351 source_catalog: &SourceCatalog,
352 as_of: Option<&AsOf>,
353 is_temporary: bool,
354 ) -> Result<(Relation, Vec<(bool, Field)>)> {
355 debug_assert_column_ids_distinct(&source_catalog.columns);
356 if !is_temporary {
357 self.check_privilege(
358 ObjectCheckItem::new(
359 source_catalog.owner,
360 AclMode::Select,
361 source_catalog.name.clone(),
362 PbObject::SourceId(source_catalog.id),
363 ),
364 source_catalog.database_id,
365 )?;
366 }
367 self.included_relations.insert(source_catalog.id.into());
368 Ok((
369 Relation::Source(Box::new(BoundSource {
370 catalog: source_catalog.clone(),
371 as_of: as_of.cloned(),
372 })),
373 source_catalog
374 .columns
375 .iter()
376 .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
377 .collect_vec(),
378 ))
379 }
380
381 fn resolve_view_relation(
382 &mut self,
383 view_catalog: &ViewCatalog,
384 ) -> Result<(Relation, Vec<(bool, Field)>)> {
385 if !view_catalog.is_system_view() {
386 self.check_privilege(
387 ObjectCheckItem::new(
388 view_catalog.owner,
389 AclMode::Select,
390 view_catalog.name.clone(),
391 PbObject::ViewId(view_catalog.id),
392 ),
393 view_catalog.database_id,
394 )?;
395 }
396
397 let ast = Parser::parse_sql(&view_catalog.sql)
398 .expect("a view's sql should be parsed successfully");
399 let Statement::Query(query) = ast
400 .into_iter()
401 .exactly_one()
402 .expect("a view should contain only one statement")
403 else {
404 unreachable!("a view should contain a query statement");
405 };
406 let query = self.bind_query_for_view(&query).map_err(|e| {
407 ErrorCode::BindError(format!(
408 "failed to bind view {}, sql: {}\nerror: {}",
409 view_catalog.name,
410 view_catalog.sql,
411 e.as_report()
412 ))
413 })?;
414
415 let columns = view_catalog.columns.clone();
416
417 if !itertools::equal(
418 query.schema().fields().iter().map(|f| &f.data_type),
419 view_catalog.columns.iter().map(|f| &f.data_type),
420 ) {
421 return Err(ErrorCode::BindError(format!(
422 "failed to bind view {}. The SQL's schema is different from catalog's schema sql: {}, bound schema: {:?}, catalog schema: {:?}",
423 view_catalog.name, view_catalog.sql, query.schema(), columns
424 )).into());
425 }
426
427 let share_id = match self.shared_views.get(&view_catalog.id) {
428 Some(share_id) => *share_id,
429 None => {
430 let share_id = self.next_share_id();
431 self.shared_views.insert(view_catalog.id, share_id);
432 self.included_relations.insert(view_catalog.id.into());
433 share_id
434 }
435 };
436 let input = Either::Left(query);
437 Ok((
438 Relation::Share(Box::new(BoundShare {
439 share_id,
440 input: BoundShareInput::Query(input),
441 })),
442 columns.iter().map(|c| (false, c.clone())).collect_vec(),
443 ))
444 }
445
446 fn resolve_table_indexes(
447 &self,
448 db_name: &str,
449 schema_name: &str,
450 table_id: TableId,
451 ) -> Result<Vec<Arc<IndexCatalog>>> {
452 let schema = self.catalog.get_schema_by_name(db_name, schema_name)?;
453 assert!(
454 schema.get_table_by_id(&table_id).is_some(),
455 "table {table_id} not found in {db_name}.{schema_name}"
456 );
457
458 Ok(schema.get_indexes_by_table_id(&table_id))
459 }
460
461 pub(crate) fn bind_table(
462 &mut self,
463 schema_name: Option<&str>,
464 table_name: &str,
465 ) -> Result<BoundBaseTable> {
466 let db_name = &self.db_name;
467 let schema_path = self.bind_schema_path(schema_name);
468 let (table_catalog, schema_name) =
469 self.catalog
470 .get_created_table_by_name(db_name, schema_path, table_name)?;
471 let table_catalog = table_catalog.clone();
472
473 let table_id = table_catalog.id();
474 let table_indexes = self.resolve_table_indexes(db_name, schema_name, table_id)?;
475
476 let columns = table_catalog.columns.clone();
477
478 self.bind_table_to_context(
479 columns
480 .iter()
481 .map(|c| (c.is_hidden, (&c.column_desc).into())),
482 table_name.to_owned(),
483 None,
484 )?;
485
486 Ok(BoundBaseTable {
487 table_id,
488 table_catalog,
489 table_indexes,
490 as_of: None,
491 })
492 }
493
494 pub(crate) fn check_for_dml(table: &TableCatalog, is_insert: bool) -> Result<()> {
495 let table_name = &table.name;
496 match table.table_type() {
497 TableType::Table => {}
498 TableType::Index => {
499 return Err(ErrorCode::InvalidInputSyntax(format!(
500 "cannot change index \"{table_name}\""
501 ))
502 .into());
503 }
504 TableType::MaterializedView => {
505 return Err(ErrorCode::InvalidInputSyntax(format!(
506 "cannot change materialized view \"{table_name}\""
507 ))
508 .into());
509 }
510 TableType::Internal => {
511 return Err(ErrorCode::InvalidInputSyntax(format!(
512 "cannot change internal table \"{table_name}\""
513 ))
514 .into());
515 }
516 }
517
518 if table.append_only && !is_insert {
519 return Err(ErrorCode::BindError(
520 "append-only table does not support update or delete".to_owned(),
521 )
522 .into());
523 }
524
525 Ok(())
526 }
527}