1use std::sync::Arc;
16
17use either::Either;
18use itertools::Itertools;
19use risingwave_common::acl::AclMode;
20use risingwave_common::bail_not_implemented;
21use risingwave_common::catalog::{Field, debug_assert_column_ids_distinct, is_system_schema};
22use risingwave_common::session_config::USER_NAME_WILD_CARD;
23use risingwave_connector::WithPropertiesExt;
24use risingwave_pb::user::grant_privilege::PbObject;
25use risingwave_sqlparser::ast::{AsOf, Statement, TableAlias};
26use risingwave_sqlparser::parser::Parser;
27use thiserror_ext::AsReport;
28
29use super::BoundShare;
30use crate::binder::relation::BoundShareInput;
31use crate::binder::{BindFor, Binder, Relation};
32use crate::catalog::root_catalog::SchemaPath;
33use crate::catalog::source_catalog::SourceCatalog;
34use crate::catalog::system_catalog::SystemTableCatalog;
35use crate::catalog::table_catalog::{TableCatalog, TableType};
36use crate::catalog::view_catalog::ViewCatalog;
37use crate::catalog::{CatalogError, DatabaseId, IndexCatalog, TableId};
38use crate::error::ErrorCode::PermissionDenied;
39use crate::error::{ErrorCode, Result, RwError};
40use crate::user::UserId;
41
42#[derive(Debug, Clone)]
43pub struct BoundBaseTable {
44 pub table_id: TableId,
45 pub table_catalog: Arc<TableCatalog>,
46 pub table_indexes: Vec<Arc<IndexCatalog>>,
47 pub as_of: Option<AsOf>,
48}
49
50#[derive(Debug, Clone)]
51pub struct BoundSystemTable {
52 pub table_id: TableId,
53 pub sys_table_catalog: Arc<SystemTableCatalog>,
54}
55
56#[derive(Debug, Clone)]
57pub struct BoundSource {
58 pub catalog: SourceCatalog,
59 pub as_of: Option<AsOf>,
60}
61
62impl BoundSource {
63 pub fn is_shareable_cdc_connector(&self) -> bool {
64 self.catalog.with_properties.is_shareable_cdc_connector()
65 }
66
67 pub fn is_shared(&self) -> bool {
68 self.catalog.info.is_shared()
69 }
70}
71
72impl Binder {
73 pub fn bind_relation_by_name_inner(
75 &mut self,
76 db_name: Option<&str>,
77 schema_name: Option<&str>,
78 table_name: &str,
79 alias: Option<TableAlias>,
80 as_of: Option<AsOf>,
81 ) -> Result<Relation> {
82 let resolve_sys_table_relation = |sys_table_catalog: &Arc<SystemTableCatalog>| {
84 let table = BoundSystemTable {
85 table_id: sys_table_catalog.id(),
86 sys_table_catalog: sys_table_catalog.clone(),
87 };
88 (
89 Relation::SystemTable(Box::new(table)),
90 sys_table_catalog
91 .columns
92 .iter()
93 .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
94 .collect_vec(),
95 )
96 };
97
98 if let Some(db_name) = db_name {
100 let _ = self.catalog.get_database_by_name(db_name)?;
101 }
102
103 let (ret, columns) = {
105 match schema_name {
106 Some(schema_name) => {
107 let db_name = db_name.unwrap_or(&self.db_name);
108 let schema_path = SchemaPath::Name(schema_name);
109 if is_system_schema(schema_name) {
110 if let Ok(sys_table_catalog) =
111 self.catalog
112 .get_sys_table_by_name(db_name, schema_name, table_name)
113 {
114 resolve_sys_table_relation(sys_table_catalog)
115 } else if let Ok((view_catalog, _)) =
116 self.catalog
117 .get_view_by_name(db_name, schema_path, table_name)
118 {
119 self.resolve_view_relation(&view_catalog.clone())?
120 } else {
121 bail_not_implemented!(
122 issue = 1695,
123 r###"{}.{} is not supported, please use `SHOW` commands for now.
124`SHOW TABLES`,
125`SHOW MATERIALIZED VIEWS`,
126`DESCRIBE <table>`,
127`SHOW COLUMNS FROM [table]`
128"###,
129 schema_name,
130 table_name
131 );
132 }
133 } else if let Some(source_catalog) =
134 self.temporary_source_manager.get_source(table_name)
135 {
137 self.resolve_source_relation(&source_catalog.clone(), as_of, true)?
138 } else if let Ok((table_catalog, schema_name)) = self
139 .catalog
140 .get_created_table_by_name(db_name, schema_path, table_name)
141 {
142 self.resolve_table_relation(table_catalog.clone(), schema_name, as_of)?
143 } else if let Ok((source_catalog, _)) =
144 self.catalog
145 .get_source_by_name(db_name, schema_path, table_name)
146 {
147 self.resolve_source_relation(&source_catalog.clone(), as_of, false)?
148 } else if let Ok((view_catalog, _)) =
149 self.catalog
150 .get_view_by_name(db_name, schema_path, table_name)
151 {
152 self.resolve_view_relation(&view_catalog.clone())?
153 } else {
154 return Err(CatalogError::NotFound(
155 "table or source",
156 table_name.to_owned(),
157 )
158 .into());
159 }
160 }
161 None => (|| {
162 let user_name = &self.auth_context.user_name;
163
164 for path in self.search_path.path() {
165 if is_system_schema(path)
166 && let Ok(sys_table_catalog) =
167 self.catalog
168 .get_sys_table_by_name(&self.db_name, path, table_name)
169 {
170 return Ok(resolve_sys_table_relation(sys_table_catalog));
171 } else {
172 let schema_name = if path == USER_NAME_WILD_CARD {
173 user_name
174 } else {
175 path
176 };
177
178 if let Ok(schema) =
179 self.catalog.get_schema_by_name(&self.db_name, schema_name)
180 {
181 if let Some(source_catalog) =
182 self.temporary_source_manager.get_source(table_name)
183 {
185 return self.resolve_source_relation(
186 &source_catalog.clone(),
187 as_of,
188 true,
189 );
190 } else if let Some(table_catalog) = schema
191 .get_created_table_or_any_internal_table_by_name(table_name)
192 {
193 return self.resolve_table_relation(
194 table_catalog.clone(),
195 &schema_name.clone(),
196 as_of,
197 );
198 } else if let Some(source_catalog) =
199 schema.get_source_by_name(table_name)
200 {
201 return self.resolve_source_relation(
202 &source_catalog.clone(),
203 as_of,
204 false,
205 );
206 } else if let Some(view_catalog) =
207 schema.get_view_by_name(table_name)
208 {
209 return self.resolve_view_relation(&view_catalog.clone());
210 }
211 }
212 }
213 }
214
215 Err(CatalogError::NotFound("table or source", table_name.to_owned()).into())
216 })()?,
217 }
218 };
219
220 self.bind_table_to_context(columns, table_name.to_owned(), alias)?;
221 Ok(ret)
222 }
223
224 pub(crate) fn check_privilege(
225 &self,
226 object: PbObject,
227 database_id: DatabaseId,
228 mode: AclMode,
229 owner: UserId,
230 ) -> Result<()> {
231 if self.context.disable_security_invoker {
233 return Ok(());
234 }
235
236 match self.bind_for {
237 BindFor::Stream | BindFor::Batch => {
238 if matches!(self.bind_for, BindFor::Stream)
240 && self.database_id != database_id
241 && matches!(object, PbObject::SourceId(_))
242 {
243 return Err(PermissionDenied(
244 "SOURCE is not allowed for cross-db access".to_owned(),
245 )
246 .into());
247 }
248 if let Some(user) = self.user.get_user_by_name(&self.auth_context.user_name) {
249 if user.is_super || user.id == owner {
250 return Ok(());
251 }
252 if !user.has_privilege(&object, mode) {
253 return Err(PermissionDenied("Do not have the privilege".to_owned()).into());
254 }
255
256 if self.database_id != database_id
258 && !user.has_privilege(&PbObject::DatabaseId(database_id), AclMode::Connect)
259 {
260 return Err(
261 PermissionDenied("Do not have CONNECT privilege".to_owned()).into()
262 );
263 }
264 } else {
265 return Err(PermissionDenied("Session user is invalid".to_owned()).into());
266 }
267 }
268 BindFor::Ddl | BindFor::System => {
269 }
271 }
272 Ok(())
273 }
274
275 fn resolve_table_relation(
276 &mut self,
277 table_catalog: Arc<TableCatalog>,
278 schema_name: &str,
279 as_of: Option<AsOf>,
280 ) -> Result<(Relation, Vec<(bool, Field)>)> {
281 let table_id = table_catalog.id();
282 let columns = table_catalog
283 .columns
284 .iter()
285 .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
286 .collect_vec();
287 self.check_privilege(
288 PbObject::TableId(table_id.table_id),
289 table_catalog.database_id,
290 AclMode::Select,
291 table_catalog.owner,
292 )?;
293 self.included_relations.insert(table_id);
294
295 let table_indexes = self.resolve_table_indexes(schema_name, table_id)?;
296
297 let table = BoundBaseTable {
298 table_id,
299 table_catalog,
300 table_indexes,
301 as_of,
302 };
303
304 Ok::<_, RwError>((Relation::BaseTable(Box::new(table)), columns))
305 }
306
307 fn resolve_source_relation(
308 &mut self,
309 source_catalog: &SourceCatalog,
310 as_of: Option<AsOf>,
311 is_temporary: bool,
312 ) -> Result<(Relation, Vec<(bool, Field)>)> {
313 debug_assert_column_ids_distinct(&source_catalog.columns);
314 if !is_temporary {
315 self.check_privilege(
316 PbObject::SourceId(source_catalog.id),
317 source_catalog.database_id,
318 AclMode::Select,
319 source_catalog.owner,
320 )?;
321 }
322 self.included_relations.insert(source_catalog.id.into());
323 Ok((
324 Relation::Source(Box::new(BoundSource {
325 catalog: source_catalog.clone(),
326 as_of,
327 })),
328 source_catalog
329 .columns
330 .iter()
331 .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
332 .collect_vec(),
333 ))
334 }
335
336 fn resolve_view_relation(
337 &mut self,
338 view_catalog: &ViewCatalog,
339 ) -> Result<(Relation, Vec<(bool, Field)>)> {
340 if !view_catalog.is_system_view() {
341 self.check_privilege(
342 PbObject::ViewId(view_catalog.id),
343 view_catalog.database_id,
344 AclMode::Select,
345 view_catalog.owner,
346 )?;
347 }
348
349 let ast = Parser::parse_sql(&view_catalog.sql)
350 .expect("a view's sql should be parsed successfully");
351 let Statement::Query(query) = ast
352 .into_iter()
353 .exactly_one()
354 .expect("a view should contain only one statement")
355 else {
356 unreachable!("a view should contain a query statement");
357 };
358 let query = self.bind_query_for_view(*query).map_err(|e| {
359 ErrorCode::BindError(format!(
360 "failed to bind view {}, sql: {}\nerror: {}",
361 view_catalog.name,
362 view_catalog.sql,
363 e.as_report()
364 ))
365 })?;
366
367 let columns = view_catalog.columns.clone();
368
369 if !itertools::equal(
370 query.schema().fields().iter().map(|f| &f.data_type),
371 view_catalog.columns.iter().map(|f| &f.data_type),
372 ) {
373 return Err(ErrorCode::BindError(format!(
374 "failed to bind view {}. The SQL's schema is different from catalog's schema sql: {}, bound schema: {:?}, catalog schema: {:?}",
375 view_catalog.name, view_catalog.sql, query.schema(), columns
376 )).into());
377 }
378
379 let share_id = match self.shared_views.get(&view_catalog.id) {
380 Some(share_id) => *share_id,
381 None => {
382 let share_id = self.next_share_id();
383 self.shared_views.insert(view_catalog.id, share_id);
384 self.included_relations.insert(view_catalog.id.into());
385 share_id
386 }
387 };
388 let input = Either::Left(query);
389 Ok((
390 Relation::Share(Box::new(BoundShare {
391 share_id,
392 input: BoundShareInput::Query(input),
393 })),
394 columns.iter().map(|c| (false, c.clone())).collect_vec(),
395 ))
396 }
397
398 fn resolve_table_indexes(
399 &self,
400 schema_name: &str,
401 table_id: TableId,
402 ) -> Result<Vec<Arc<IndexCatalog>>> {
403 Ok(self
404 .catalog
405 .get_schema_by_name(&self.db_name, schema_name)?
406 .get_indexes_by_table_id(&table_id))
407 }
408
409 pub(crate) fn bind_table(
410 &mut self,
411 schema_name: Option<&str>,
412 table_name: &str,
413 ) -> Result<BoundBaseTable> {
414 let db_name = &self.db_name;
415 let schema_path = self.bind_schema_path(schema_name);
416 let (table_catalog, schema_name) =
417 self.catalog
418 .get_created_table_by_name(db_name, schema_path, table_name)?;
419 let table_catalog = table_catalog.clone();
420
421 let table_id = table_catalog.id();
422 let table_indexes = self.resolve_table_indexes(schema_name, table_id)?;
423
424 let columns = table_catalog.columns.clone();
425
426 self.bind_table_to_context(
427 columns
428 .iter()
429 .map(|c| (c.is_hidden, (&c.column_desc).into())),
430 table_name.to_owned(),
431 None,
432 )?;
433
434 Ok(BoundBaseTable {
435 table_id,
436 table_catalog,
437 table_indexes,
438 as_of: None,
439 })
440 }
441
442 pub(crate) fn check_for_dml(table: &TableCatalog, is_insert: bool) -> Result<()> {
443 let table_name = &table.name;
444 match table.table_type() {
445 TableType::Table => {}
446 TableType::Index => {
447 return Err(ErrorCode::InvalidInputSyntax(format!(
448 "cannot change index \"{table_name}\""
449 ))
450 .into());
451 }
452 TableType::MaterializedView => {
453 return Err(ErrorCode::InvalidInputSyntax(format!(
454 "cannot change materialized view \"{table_name}\""
455 ))
456 .into());
457 }
458 TableType::Internal => {
459 return Err(ErrorCode::InvalidInputSyntax(format!(
460 "cannot change internal table \"{table_name}\""
461 ))
462 .into());
463 }
464 }
465
466 if table.append_only && !is_insert {
467 return Err(ErrorCode::BindError(
468 "append-only table does not support update or delete".to_owned(),
469 )
470 .into());
471 }
472
473 Ok(())
474 }
475}