1use std::sync::Arc;
16
17use either::Either;
18use itertools::Itertools;
19use risingwave_common::acl::AclMode;
20use risingwave_common::bail_not_implemented;
21use risingwave_common::catalog::{Field, debug_assert_column_ids_distinct, is_system_schema};
22use risingwave_common::session_config::USER_NAME_WILD_CARD;
23use risingwave_connector::WithPropertiesExt;
24use risingwave_pb::user::grant_privilege::PbObject;
25use risingwave_sqlparser::ast::{AsOf, ObjectName, Statement, TableAlias};
26use risingwave_sqlparser::parser::Parser;
27use thiserror_ext::AsReport;
28
29use super::BoundShare;
30use crate::binder::relation::BoundShareInput;
31use crate::binder::{BindFor, Binder, Relation};
32use crate::catalog::root_catalog::SchemaPath;
33use crate::catalog::source_catalog::SourceCatalog;
34use crate::catalog::system_catalog::SystemTableCatalog;
35use crate::catalog::table_catalog::{TableCatalog, TableType};
36use crate::catalog::view_catalog::ViewCatalog;
37use crate::catalog::{CatalogError, DatabaseId, IndexCatalog, TableId};
38use crate::error::ErrorCode::PermissionDenied;
39use crate::error::{ErrorCode, Result, RwError};
40use crate::handler::privilege::ObjectCheckItem;
41
42#[derive(Debug, Clone)]
43pub struct BoundBaseTable {
44 pub table_id: TableId,
45 pub table_catalog: Arc<TableCatalog>,
46 pub table_indexes: Vec<Arc<IndexCatalog>>,
47 pub as_of: Option<AsOf>,
48}
49
50#[derive(Debug, Clone)]
51pub struct BoundSystemTable {
52 pub table_id: TableId,
53 pub sys_table_catalog: Arc<SystemTableCatalog>,
54}
55
56#[derive(Debug, Clone)]
57pub struct BoundSource {
58 pub catalog: SourceCatalog,
59 pub as_of: Option<AsOf>,
60}
61
62impl BoundSource {
63 pub fn is_shareable_cdc_connector(&self) -> bool {
64 self.catalog.with_properties.is_shareable_cdc_connector()
65 }
66
67 pub fn is_shared(&self) -> bool {
68 self.catalog.info.is_shared()
69 }
70}
71
72impl Binder {
73 pub fn bind_catalog_relation_by_object_name(
74 &mut self,
75 object_name: ObjectName,
76 bind_creating_relations: bool,
77 ) -> Result<Relation> {
78 let (schema_name, table_name) =
79 Binder::resolve_schema_qualified_name(&self.db_name, object_name)?;
80 self.bind_catalog_relation_by_name(
81 None,
82 schema_name.as_deref(),
83 &table_name,
84 None,
85 None,
86 bind_creating_relations,
87 )
88 }
89
90 pub fn bind_catalog_relation_by_name(
92 &mut self,
93 db_name: Option<&str>,
94 schema_name: Option<&str>,
95 table_name: &str,
96 alias: Option<TableAlias>,
97 as_of: Option<AsOf>,
98 bind_creating_relations: bool,
99 ) -> Result<Relation> {
100 let resolve_sys_table_relation = |sys_table_catalog: &Arc<SystemTableCatalog>| {
102 let table = BoundSystemTable {
103 table_id: sys_table_catalog.id(),
104 sys_table_catalog: sys_table_catalog.clone(),
105 };
106 (
107 Relation::SystemTable(Box::new(table)),
108 sys_table_catalog
109 .columns
110 .iter()
111 .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
112 .collect_vec(),
113 )
114 };
115
116 if let Some(db_name) = db_name {
118 let _ = self.catalog.get_database_by_name(db_name)?;
119 }
120
121 let (ret, columns) = {
123 match schema_name {
124 Some(schema_name) => {
125 let db_name = db_name.unwrap_or(&self.db_name);
126 let schema_path = SchemaPath::Name(schema_name);
127 if is_system_schema(schema_name) {
128 if let Ok(sys_table_catalog) =
129 self.catalog
130 .get_sys_table_by_name(db_name, schema_name, table_name)
131 {
132 resolve_sys_table_relation(sys_table_catalog)
133 } else if let Ok((view_catalog, _)) =
134 self.catalog
135 .get_view_by_name(db_name, schema_path, table_name)
136 {
137 self.resolve_view_relation(&view_catalog.clone())?
138 } else {
139 bail_not_implemented!(
140 issue = 1695,
141 r###"{}.{} is not supported, please use `SHOW` commands for now.
142`SHOW TABLES`,
143`SHOW MATERIALIZED VIEWS`,
144`DESCRIBE <table>`,
145`SHOW COLUMNS FROM [table]`
146"###,
147 schema_name,
148 table_name
149 );
150 }
151 } else if let Some(source_catalog) =
152 self.temporary_source_manager.get_source(table_name)
153 {
155 self.resolve_source_relation(&source_catalog.clone(), as_of, true)?
156 } else if let Ok((table_catalog, schema_name)) = self
157 .catalog
158 .get_any_table_by_name(db_name, schema_path, table_name)
159 && (bind_creating_relations || table_catalog.is_created())
160 {
161 self.resolve_table_relation(table_catalog.clone(), schema_name, as_of)?
162 } else if let Ok((source_catalog, _)) =
163 self.catalog
164 .get_source_by_name(db_name, schema_path, table_name)
165 {
166 self.resolve_source_relation(&source_catalog.clone(), as_of, false)?
167 } else if let Ok((view_catalog, _)) =
168 self.catalog
169 .get_view_by_name(db_name, schema_path, table_name)
170 {
171 self.resolve_view_relation(&view_catalog.clone())?
172 } else {
173 return Err(CatalogError::NotFound(
174 "table or source",
175 table_name.to_owned(),
176 )
177 .into());
178 }
179 }
180 None => (|| {
181 let user_name = &self.auth_context.user_name;
182
183 for path in self.search_path.path() {
184 if is_system_schema(path)
185 && let Ok(sys_table_catalog) =
186 self.catalog
187 .get_sys_table_by_name(&self.db_name, path, table_name)
188 {
189 return Ok(resolve_sys_table_relation(sys_table_catalog));
190 } else {
191 let schema_name = if path == USER_NAME_WILD_CARD {
192 user_name
193 } else {
194 path
195 };
196
197 if let Ok(schema) =
198 self.catalog.get_schema_by_name(&self.db_name, schema_name)
199 {
200 if let Some(source_catalog) =
201 self.temporary_source_manager.get_source(table_name)
202 {
204 return self.resolve_source_relation(
205 &source_catalog.clone(),
206 as_of,
207 true,
208 );
209 } else if let Some(table_catalog) =
210 schema.get_any_table_by_name(table_name)
211 && (bind_creating_relations
212 || table_catalog.is_internal_table()
213 || table_catalog.is_created())
214 {
215 return self.resolve_table_relation(
216 table_catalog.clone(),
217 &schema_name.clone(),
218 as_of,
219 );
220 } else if let Some(source_catalog) =
221 schema.get_source_by_name(table_name)
222 {
223 return self.resolve_source_relation(
224 &source_catalog.clone(),
225 as_of,
226 false,
227 );
228 } else if let Some(view_catalog) =
229 schema.get_view_by_name(table_name)
230 {
231 return self.resolve_view_relation(&view_catalog.clone());
232 }
233 }
234 }
235 }
236
237 Err(CatalogError::NotFound("table or source", table_name.to_owned()).into())
238 })()?,
239 }
240 };
241
242 self.bind_table_to_context(columns, table_name.to_owned(), alias)?;
243 Ok(ret)
244 }
245
246 pub(crate) fn check_privilege(
247 &self,
248 item: ObjectCheckItem,
249 database_id: DatabaseId,
250 ) -> Result<()> {
251 if self.context.disable_security_invoker {
253 return Ok(());
254 }
255
256 match self.bind_for {
257 BindFor::Stream | BindFor::Batch => {
258 if matches!(self.bind_for, BindFor::Stream)
260 && self.database_id != database_id
261 && matches!(item.object, PbObject::SourceId(_))
262 {
263 return Err(PermissionDenied(format!(
264 "SOURCE \"{}\" is not allowed for cross-db access",
265 item.name
266 ))
267 .into());
268 }
269 if let Some(user) = self.user.get_user_by_name(&self.auth_context.user_name) {
270 if user.is_super || user.id == item.owner {
271 return Ok(());
272 }
273 if !user.has_privilege(&item.object, item.mode) {
274 return Err(PermissionDenied(item.error_message()).into());
275 }
276
277 if self.database_id != database_id
279 && !user.has_privilege(&PbObject::DatabaseId(database_id), AclMode::Connect)
280 {
281 let db_name = self
282 .catalog
283 .get_database_by_id(&database_id)?
284 .name
285 .to_owned();
286
287 return Err(PermissionDenied(format!(
288 "permission denied for database \"{db_name}\""
289 ))
290 .into());
291 }
292 } else {
293 return Err(PermissionDenied("Session user is invalid".to_owned()).into());
294 }
295 }
296 BindFor::Ddl | BindFor::System => {
297 }
299 }
300 Ok(())
301 }
302
303 fn resolve_table_relation(
304 &mut self,
305 table_catalog: Arc<TableCatalog>,
306 schema_name: &str,
307 as_of: Option<AsOf>,
308 ) -> Result<(Relation, Vec<(bool, Field)>)> {
309 let table_id = table_catalog.id();
310 let columns = table_catalog
311 .columns
312 .iter()
313 .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
314 .collect_vec();
315 self.check_privilege(
316 ObjectCheckItem::new(
317 table_catalog.owner,
318 AclMode::Select,
319 table_catalog.name.clone(),
320 PbObject::TableId(table_id.table_id),
321 ),
322 table_catalog.database_id,
323 )?;
324 self.included_relations.insert(table_id);
325
326 let table_indexes = self.resolve_table_indexes(schema_name, table_id)?;
327
328 let table = BoundBaseTable {
329 table_id,
330 table_catalog,
331 table_indexes,
332 as_of,
333 };
334
335 Ok::<_, RwError>((Relation::BaseTable(Box::new(table)), columns))
336 }
337
338 fn resolve_source_relation(
339 &mut self,
340 source_catalog: &SourceCatalog,
341 as_of: Option<AsOf>,
342 is_temporary: bool,
343 ) -> Result<(Relation, Vec<(bool, Field)>)> {
344 debug_assert_column_ids_distinct(&source_catalog.columns);
345 if !is_temporary {
346 self.check_privilege(
347 ObjectCheckItem::new(
348 source_catalog.owner,
349 AclMode::Select,
350 source_catalog.name.clone(),
351 PbObject::SourceId(source_catalog.id),
352 ),
353 source_catalog.database_id,
354 )?;
355 }
356 self.included_relations.insert(source_catalog.id.into());
357 Ok((
358 Relation::Source(Box::new(BoundSource {
359 catalog: source_catalog.clone(),
360 as_of,
361 })),
362 source_catalog
363 .columns
364 .iter()
365 .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
366 .collect_vec(),
367 ))
368 }
369
370 fn resolve_view_relation(
371 &mut self,
372 view_catalog: &ViewCatalog,
373 ) -> Result<(Relation, Vec<(bool, Field)>)> {
374 if !view_catalog.is_system_view() {
375 self.check_privilege(
376 ObjectCheckItem::new(
377 view_catalog.owner,
378 AclMode::Select,
379 view_catalog.name.clone(),
380 PbObject::ViewId(view_catalog.id),
381 ),
382 view_catalog.database_id,
383 )?;
384 }
385
386 let ast = Parser::parse_sql(&view_catalog.sql)
387 .expect("a view's sql should be parsed successfully");
388 let Statement::Query(query) = ast
389 .into_iter()
390 .exactly_one()
391 .expect("a view should contain only one statement")
392 else {
393 unreachable!("a view should contain a query statement");
394 };
395 let query = self.bind_query_for_view(*query).map_err(|e| {
396 ErrorCode::BindError(format!(
397 "failed to bind view {}, sql: {}\nerror: {}",
398 view_catalog.name,
399 view_catalog.sql,
400 e.as_report()
401 ))
402 })?;
403
404 let columns = view_catalog.columns.clone();
405
406 if !itertools::equal(
407 query.schema().fields().iter().map(|f| &f.data_type),
408 view_catalog.columns.iter().map(|f| &f.data_type),
409 ) {
410 return Err(ErrorCode::BindError(format!(
411 "failed to bind view {}. The SQL's schema is different from catalog's schema sql: {}, bound schema: {:?}, catalog schema: {:?}",
412 view_catalog.name, view_catalog.sql, query.schema(), columns
413 )).into());
414 }
415
416 let share_id = match self.shared_views.get(&view_catalog.id) {
417 Some(share_id) => *share_id,
418 None => {
419 let share_id = self.next_share_id();
420 self.shared_views.insert(view_catalog.id, share_id);
421 self.included_relations.insert(view_catalog.id.into());
422 share_id
423 }
424 };
425 let input = Either::Left(query);
426 Ok((
427 Relation::Share(Box::new(BoundShare {
428 share_id,
429 input: BoundShareInput::Query(input),
430 })),
431 columns.iter().map(|c| (false, c.clone())).collect_vec(),
432 ))
433 }
434
435 fn resolve_table_indexes(
436 &self,
437 schema_name: &str,
438 table_id: TableId,
439 ) -> Result<Vec<Arc<IndexCatalog>>> {
440 Ok(self
441 .catalog
442 .get_schema_by_name(&self.db_name, schema_name)?
443 .get_indexes_by_table_id(&table_id))
444 }
445
446 pub(crate) fn bind_table(
447 &mut self,
448 schema_name: Option<&str>,
449 table_name: &str,
450 ) -> Result<BoundBaseTable> {
451 let db_name = &self.db_name;
452 let schema_path = self.bind_schema_path(schema_name);
453 let (table_catalog, schema_name) =
454 self.catalog
455 .get_created_table_by_name(db_name, schema_path, table_name)?;
456 let table_catalog = table_catalog.clone();
457
458 let table_id = table_catalog.id();
459 let table_indexes = self.resolve_table_indexes(schema_name, table_id)?;
460
461 let columns = table_catalog.columns.clone();
462
463 self.bind_table_to_context(
464 columns
465 .iter()
466 .map(|c| (c.is_hidden, (&c.column_desc).into())),
467 table_name.to_owned(),
468 None,
469 )?;
470
471 Ok(BoundBaseTable {
472 table_id,
473 table_catalog,
474 table_indexes,
475 as_of: None,
476 })
477 }
478
479 pub(crate) fn check_for_dml(table: &TableCatalog, is_insert: bool) -> Result<()> {
480 let table_name = &table.name;
481 match table.table_type() {
482 TableType::Table => {}
483 TableType::Index => {
484 return Err(ErrorCode::InvalidInputSyntax(format!(
485 "cannot change index \"{table_name}\""
486 ))
487 .into());
488 }
489 TableType::MaterializedView => {
490 return Err(ErrorCode::InvalidInputSyntax(format!(
491 "cannot change materialized view \"{table_name}\""
492 ))
493 .into());
494 }
495 TableType::Internal => {
496 return Err(ErrorCode::InvalidInputSyntax(format!(
497 "cannot change internal table \"{table_name}\""
498 ))
499 .into());
500 }
501 }
502
503 if table.append_only && !is_insert {
504 return Err(ErrorCode::BindError(
505 "append-only table does not support update or delete".to_owned(),
506 )
507 .into());
508 }
509
510 Ok(())
511 }
512}