1use std::sync::Arc;
16
17use either::Either;
18use itertools::Itertools;
19use risingwave_common::acl::AclMode;
20use risingwave_common::bail_not_implemented;
21use risingwave_common::catalog::{Field, debug_assert_column_ids_distinct, is_system_schema};
22use risingwave_common::session_config::USER_NAME_WILD_CARD;
23use risingwave_connector::WithPropertiesExt;
24use risingwave_pb::user::grant_privilege::PbObject;
25use risingwave_sqlparser::ast::{AsOf, ObjectName, Statement, TableAlias};
26use risingwave_sqlparser::parser::Parser;
27use thiserror_ext::AsReport;
28
29use super::BoundShare;
30use crate::binder::relation::BoundShareInput;
31use crate::binder::{BindFor, Binder, Relation};
32use crate::catalog::root_catalog::SchemaPath;
33use crate::catalog::source_catalog::SourceCatalog;
34use crate::catalog::system_catalog::SystemTableCatalog;
35use crate::catalog::table_catalog::{TableCatalog, TableType};
36use crate::catalog::view_catalog::ViewCatalog;
37use crate::catalog::{CatalogError, DatabaseId, IndexCatalog, TableId};
38use crate::error::ErrorCode::PermissionDenied;
39use crate::error::{ErrorCode, Result, RwError};
40use crate::user::UserId;
41
42#[derive(Debug, Clone)]
43pub struct BoundBaseTable {
44 pub table_id: TableId,
45 pub table_catalog: Arc<TableCatalog>,
46 pub table_indexes: Vec<Arc<IndexCatalog>>,
47 pub as_of: Option<AsOf>,
48}
49
50#[derive(Debug, Clone)]
51pub struct BoundSystemTable {
52 pub table_id: TableId,
53 pub sys_table_catalog: Arc<SystemTableCatalog>,
54}
55
56#[derive(Debug, Clone)]
57pub struct BoundSource {
58 pub catalog: SourceCatalog,
59 pub as_of: Option<AsOf>,
60}
61
62impl BoundSource {
63 pub fn is_shareable_cdc_connector(&self) -> bool {
64 self.catalog.with_properties.is_shareable_cdc_connector()
65 }
66
67 pub fn is_shared(&self) -> bool {
68 self.catalog.info.is_shared()
69 }
70}
71
72impl Binder {
73 pub fn bind_catalog_relation_by_object_name(
74 &mut self,
75 object_name: ObjectName,
76 bind_creating_relations: bool,
77 ) -> Result<Relation> {
78 let (schema_name, table_name) =
79 Binder::resolve_schema_qualified_name(&self.db_name, object_name)?;
80 self.bind_catalog_relation_by_name(
81 None,
82 schema_name.as_deref(),
83 &table_name,
84 None,
85 None,
86 bind_creating_relations,
87 )
88 }
89
90 pub fn bind_catalog_relation_by_name(
92 &mut self,
93 db_name: Option<&str>,
94 schema_name: Option<&str>,
95 table_name: &str,
96 alias: Option<TableAlias>,
97 as_of: Option<AsOf>,
98 bind_creating_relations: bool,
99 ) -> Result<Relation> {
100 let resolve_sys_table_relation = |sys_table_catalog: &Arc<SystemTableCatalog>| {
102 let table = BoundSystemTable {
103 table_id: sys_table_catalog.id(),
104 sys_table_catalog: sys_table_catalog.clone(),
105 };
106 (
107 Relation::SystemTable(Box::new(table)),
108 sys_table_catalog
109 .columns
110 .iter()
111 .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
112 .collect_vec(),
113 )
114 };
115
116 if let Some(db_name) = db_name {
118 let _ = self.catalog.get_database_by_name(db_name)?;
119 }
120
121 let (ret, columns) = {
123 match schema_name {
124 Some(schema_name) => {
125 let db_name = db_name.unwrap_or(&self.db_name);
126 let schema_path = SchemaPath::Name(schema_name);
127 if is_system_schema(schema_name) {
128 if let Ok(sys_table_catalog) =
129 self.catalog
130 .get_sys_table_by_name(db_name, schema_name, table_name)
131 {
132 resolve_sys_table_relation(sys_table_catalog)
133 } else if let Ok((view_catalog, _)) =
134 self.catalog
135 .get_view_by_name(db_name, schema_path, table_name)
136 {
137 self.resolve_view_relation(&view_catalog.clone())?
138 } else {
139 bail_not_implemented!(
140 issue = 1695,
141 r###"{}.{} is not supported, please use `SHOW` commands for now.
142`SHOW TABLES`,
143`SHOW MATERIALIZED VIEWS`,
144`DESCRIBE <table>`,
145`SHOW COLUMNS FROM [table]`
146"###,
147 schema_name,
148 table_name
149 );
150 }
151 } else if let Some(source_catalog) =
152 self.temporary_source_manager.get_source(table_name)
153 {
155 self.resolve_source_relation(&source_catalog.clone(), as_of, true)?
156 } else if let Ok((table_catalog, schema_name)) = self
157 .catalog
158 .get_any_table_by_name(db_name, schema_path, table_name)
159 && (bind_creating_relations || table_catalog.is_created())
160 {
161 self.resolve_table_relation(table_catalog.clone(), schema_name, as_of)?
162 } else if let Ok((source_catalog, _)) =
163 self.catalog
164 .get_source_by_name(db_name, schema_path, table_name)
165 {
166 self.resolve_source_relation(&source_catalog.clone(), as_of, false)?
167 } else if let Ok((view_catalog, _)) =
168 self.catalog
169 .get_view_by_name(db_name, schema_path, table_name)
170 {
171 self.resolve_view_relation(&view_catalog.clone())?
172 } else {
173 return Err(CatalogError::NotFound(
174 "table or source",
175 table_name.to_owned(),
176 )
177 .into());
178 }
179 }
180 None => (|| {
181 let user_name = &self.auth_context.user_name;
182
183 for path in self.search_path.path() {
184 if is_system_schema(path)
185 && let Ok(sys_table_catalog) =
186 self.catalog
187 .get_sys_table_by_name(&self.db_name, path, table_name)
188 {
189 return Ok(resolve_sys_table_relation(sys_table_catalog));
190 } else {
191 let schema_name = if path == USER_NAME_WILD_CARD {
192 user_name
193 } else {
194 path
195 };
196
197 if let Ok(schema) =
198 self.catalog.get_schema_by_name(&self.db_name, schema_name)
199 {
200 if let Some(source_catalog) =
201 self.temporary_source_manager.get_source(table_name)
202 {
204 return self.resolve_source_relation(
205 &source_catalog.clone(),
206 as_of,
207 true,
208 );
209 } else if let Some(table_catalog) =
210 schema.get_any_table_by_name(table_name)
211 && (bind_creating_relations
212 || table_catalog.is_internal_table()
213 || table_catalog.is_created())
214 {
215 return self.resolve_table_relation(
216 table_catalog.clone(),
217 &schema_name.clone(),
218 as_of,
219 );
220 } else if let Some(source_catalog) =
221 schema.get_source_by_name(table_name)
222 {
223 return self.resolve_source_relation(
224 &source_catalog.clone(),
225 as_of,
226 false,
227 );
228 } else if let Some(view_catalog) =
229 schema.get_view_by_name(table_name)
230 {
231 return self.resolve_view_relation(&view_catalog.clone());
232 }
233 }
234 }
235 }
236
237 Err(CatalogError::NotFound("table or source", table_name.to_owned()).into())
238 })()?,
239 }
240 };
241
242 self.bind_table_to_context(columns, table_name.to_owned(), alias)?;
243 Ok(ret)
244 }
245
246 pub(crate) fn check_privilege(
247 &self,
248 object: PbObject,
249 database_id: DatabaseId,
250 mode: AclMode,
251 owner: UserId,
252 ) -> Result<()> {
253 if self.context.disable_security_invoker {
255 return Ok(());
256 }
257
258 match self.bind_for {
259 BindFor::Stream | BindFor::Batch => {
260 if matches!(self.bind_for, BindFor::Stream)
262 && self.database_id != database_id
263 && matches!(object, PbObject::SourceId(_))
264 {
265 return Err(PermissionDenied(
266 "SOURCE is not allowed for cross-db access".to_owned(),
267 )
268 .into());
269 }
270 if let Some(user) = self.user.get_user_by_name(&self.auth_context.user_name) {
271 if user.is_super || user.id == owner {
272 return Ok(());
273 }
274 if !user.has_privilege(&object, mode) {
275 return Err(PermissionDenied("Do not have the privilege".to_owned()).into());
276 }
277
278 if self.database_id != database_id
280 && !user.has_privilege(&PbObject::DatabaseId(database_id), AclMode::Connect)
281 {
282 return Err(
283 PermissionDenied("Do not have CONNECT privilege".to_owned()).into()
284 );
285 }
286 } else {
287 return Err(PermissionDenied("Session user is invalid".to_owned()).into());
288 }
289 }
290 BindFor::Ddl | BindFor::System => {
291 }
293 }
294 Ok(())
295 }
296
297 fn resolve_table_relation(
298 &mut self,
299 table_catalog: Arc<TableCatalog>,
300 schema_name: &str,
301 as_of: Option<AsOf>,
302 ) -> Result<(Relation, Vec<(bool, Field)>)> {
303 let table_id = table_catalog.id();
304 let columns = table_catalog
305 .columns
306 .iter()
307 .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
308 .collect_vec();
309 self.check_privilege(
310 PbObject::TableId(table_id.table_id),
311 table_catalog.database_id,
312 AclMode::Select,
313 table_catalog.owner,
314 )?;
315 self.included_relations.insert(table_id);
316
317 let table_indexes = self.resolve_table_indexes(schema_name, table_id)?;
318
319 let table = BoundBaseTable {
320 table_id,
321 table_catalog,
322 table_indexes,
323 as_of,
324 };
325
326 Ok::<_, RwError>((Relation::BaseTable(Box::new(table)), columns))
327 }
328
329 fn resolve_source_relation(
330 &mut self,
331 source_catalog: &SourceCatalog,
332 as_of: Option<AsOf>,
333 is_temporary: bool,
334 ) -> Result<(Relation, Vec<(bool, Field)>)> {
335 debug_assert_column_ids_distinct(&source_catalog.columns);
336 if !is_temporary {
337 self.check_privilege(
338 PbObject::SourceId(source_catalog.id),
339 source_catalog.database_id,
340 AclMode::Select,
341 source_catalog.owner,
342 )?;
343 }
344 self.included_relations.insert(source_catalog.id.into());
345 Ok((
346 Relation::Source(Box::new(BoundSource {
347 catalog: source_catalog.clone(),
348 as_of,
349 })),
350 source_catalog
351 .columns
352 .iter()
353 .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
354 .collect_vec(),
355 ))
356 }
357
358 fn resolve_view_relation(
359 &mut self,
360 view_catalog: &ViewCatalog,
361 ) -> Result<(Relation, Vec<(bool, Field)>)> {
362 if !view_catalog.is_system_view() {
363 self.check_privilege(
364 PbObject::ViewId(view_catalog.id),
365 view_catalog.database_id,
366 AclMode::Select,
367 view_catalog.owner,
368 )?;
369 }
370
371 let ast = Parser::parse_sql(&view_catalog.sql)
372 .expect("a view's sql should be parsed successfully");
373 let Statement::Query(query) = ast
374 .into_iter()
375 .exactly_one()
376 .expect("a view should contain only one statement")
377 else {
378 unreachable!("a view should contain a query statement");
379 };
380 let query = self.bind_query_for_view(*query).map_err(|e| {
381 ErrorCode::BindError(format!(
382 "failed to bind view {}, sql: {}\nerror: {}",
383 view_catalog.name,
384 view_catalog.sql,
385 e.as_report()
386 ))
387 })?;
388
389 let columns = view_catalog.columns.clone();
390
391 if !itertools::equal(
392 query.schema().fields().iter().map(|f| &f.data_type),
393 view_catalog.columns.iter().map(|f| &f.data_type),
394 ) {
395 return Err(ErrorCode::BindError(format!(
396 "failed to bind view {}. The SQL's schema is different from catalog's schema sql: {}, bound schema: {:?}, catalog schema: {:?}",
397 view_catalog.name, view_catalog.sql, query.schema(), columns
398 )).into());
399 }
400
401 let share_id = match self.shared_views.get(&view_catalog.id) {
402 Some(share_id) => *share_id,
403 None => {
404 let share_id = self.next_share_id();
405 self.shared_views.insert(view_catalog.id, share_id);
406 self.included_relations.insert(view_catalog.id.into());
407 share_id
408 }
409 };
410 let input = Either::Left(query);
411 Ok((
412 Relation::Share(Box::new(BoundShare {
413 share_id,
414 input: BoundShareInput::Query(input),
415 })),
416 columns.iter().map(|c| (false, c.clone())).collect_vec(),
417 ))
418 }
419
420 fn resolve_table_indexes(
421 &self,
422 schema_name: &str,
423 table_id: TableId,
424 ) -> Result<Vec<Arc<IndexCatalog>>> {
425 Ok(self
426 .catalog
427 .get_schema_by_name(&self.db_name, schema_name)?
428 .get_indexes_by_table_id(&table_id))
429 }
430
431 pub(crate) fn bind_table(
432 &mut self,
433 schema_name: Option<&str>,
434 table_name: &str,
435 ) -> Result<BoundBaseTable> {
436 let db_name = &self.db_name;
437 let schema_path = self.bind_schema_path(schema_name);
438 let (table_catalog, schema_name) =
439 self.catalog
440 .get_created_table_by_name(db_name, schema_path, table_name)?;
441 let table_catalog = table_catalog.clone();
442
443 let table_id = table_catalog.id();
444 let table_indexes = self.resolve_table_indexes(schema_name, table_id)?;
445
446 let columns = table_catalog.columns.clone();
447
448 self.bind_table_to_context(
449 columns
450 .iter()
451 .map(|c| (c.is_hidden, (&c.column_desc).into())),
452 table_name.to_owned(),
453 None,
454 )?;
455
456 Ok(BoundBaseTable {
457 table_id,
458 table_catalog,
459 table_indexes,
460 as_of: None,
461 })
462 }
463
464 pub(crate) fn check_for_dml(table: &TableCatalog, is_insert: bool) -> Result<()> {
465 let table_name = &table.name;
466 match table.table_type() {
467 TableType::Table => {}
468 TableType::Index => {
469 return Err(ErrorCode::InvalidInputSyntax(format!(
470 "cannot change index \"{table_name}\""
471 ))
472 .into());
473 }
474 TableType::MaterializedView => {
475 return Err(ErrorCode::InvalidInputSyntax(format!(
476 "cannot change materialized view \"{table_name}\""
477 ))
478 .into());
479 }
480 TableType::Internal => {
481 return Err(ErrorCode::InvalidInputSyntax(format!(
482 "cannot change internal table \"{table_name}\""
483 ))
484 .into());
485 }
486 }
487
488 if table.append_only && !is_insert {
489 return Err(ErrorCode::BindError(
490 "append-only table does not support update or delete".to_owned(),
491 )
492 .into());
493 }
494
495 Ok(())
496 }
497}