1use std::sync::Arc;
16
17use either::Either;
18use itertools::Itertools;
19use risingwave_common::acl::AclMode;
20use risingwave_common::bail_not_implemented;
21use risingwave_common::catalog::{Field, debug_assert_column_ids_distinct, is_system_schema};
22use risingwave_common::session_config::USER_NAME_WILD_CARD;
23use risingwave_connector::WithPropertiesExt;
24use risingwave_pb::user::grant_privilege::PbObject;
25use risingwave_sqlparser::ast::{AsOf, Statement, TableAlias};
26use risingwave_sqlparser::parser::Parser;
27use thiserror_ext::AsReport;
28
29use super::BoundShare;
30use crate::binder::relation::BoundShareInput;
31use crate::binder::{BindFor, Binder, Relation};
32use crate::catalog::root_catalog::SchemaPath;
33use crate::catalog::source_catalog::SourceCatalog;
34use crate::catalog::system_catalog::SystemTableCatalog;
35use crate::catalog::table_catalog::{TableCatalog, TableType};
36use crate::catalog::view_catalog::ViewCatalog;
37use crate::catalog::{CatalogError, DatabaseId, IndexCatalog, TableId};
38use crate::error::ErrorCode::PermissionDenied;
39use crate::error::{ErrorCode, Result, RwError};
40use crate::user::UserId;
41
42#[derive(Debug, Clone)]
43pub struct BoundBaseTable {
44 pub table_id: TableId,
45 pub table_catalog: Arc<TableCatalog>,
46 pub table_indexes: Vec<Arc<IndexCatalog>>,
47 pub as_of: Option<AsOf>,
48}
49
50#[derive(Debug, Clone)]
51pub struct BoundSystemTable {
52 pub table_id: TableId,
53 pub sys_table_catalog: Arc<SystemTableCatalog>,
54}
55
56#[derive(Debug, Clone)]
57pub struct BoundSource {
58 pub catalog: SourceCatalog,
59 pub as_of: Option<AsOf>,
60}
61
62impl BoundSource {
63 pub fn is_shareable_cdc_connector(&self) -> bool {
64 self.catalog.with_properties.is_shareable_cdc_connector()
65 }
66
67 pub fn is_shared(&self) -> bool {
68 self.catalog.info.is_shared()
69 }
70}
71
72impl Binder {
73 pub fn bind_relation_by_name_inner(
75 &mut self,
76 db_name: Option<&str>,
77 schema_name: Option<&str>,
78 table_name: &str,
79 alias: Option<TableAlias>,
80 as_of: Option<AsOf>,
81 ) -> Result<Relation> {
82 let resolve_sys_table_relation = |sys_table_catalog: &Arc<SystemTableCatalog>| {
84 let table = BoundSystemTable {
85 table_id: sys_table_catalog.id(),
86 sys_table_catalog: sys_table_catalog.clone(),
87 };
88 (
89 Relation::SystemTable(Box::new(table)),
90 sys_table_catalog
91 .columns
92 .iter()
93 .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
94 .collect_vec(),
95 )
96 };
97
98 let (ret, columns) = {
100 match schema_name {
101 Some(schema_name) => {
102 let db_name = db_name.unwrap_or(&self.db_name);
103 let schema_path = SchemaPath::Name(schema_name);
104 if is_system_schema(schema_name) {
105 if let Ok(sys_table_catalog) =
106 self.catalog
107 .get_sys_table_by_name(db_name, schema_name, table_name)
108 {
109 resolve_sys_table_relation(sys_table_catalog)
110 } else if let Ok((view_catalog, _)) =
111 self.catalog
112 .get_view_by_name(db_name, schema_path, table_name)
113 {
114 self.resolve_view_relation(&view_catalog.clone())?
115 } else {
116 bail_not_implemented!(
117 issue = 1695,
118 r###"{}.{} is not supported, please use `SHOW` commands for now.
119`SHOW TABLES`,
120`SHOW MATERIALIZED VIEWS`,
121`DESCRIBE <table>`,
122`SHOW COLUMNS FROM [table]`
123"###,
124 schema_name,
125 table_name
126 );
127 }
128 } else if let Some(source_catalog) =
129 self.temporary_source_manager.get_source(table_name)
130 {
132 self.resolve_source_relation(&source_catalog.clone(), as_of, true)?
133 } else if let Ok((table_catalog, schema_name)) = self
134 .catalog
135 .get_created_table_by_name(db_name, schema_path, table_name)
136 {
137 self.resolve_table_relation(table_catalog.clone(), schema_name, as_of)?
138 } else if let Ok((source_catalog, _)) =
139 self.catalog
140 .get_source_by_name(db_name, schema_path, table_name)
141 {
142 self.resolve_source_relation(&source_catalog.clone(), as_of, false)?
143 } else if let Ok((view_catalog, _)) =
144 self.catalog
145 .get_view_by_name(db_name, schema_path, table_name)
146 {
147 self.resolve_view_relation(&view_catalog.clone())?
148 } else {
149 return Err(CatalogError::NotFound(
150 "table or source",
151 table_name.to_owned(),
152 )
153 .into());
154 }
155 }
156 None => (|| {
157 let user_name = &self.auth_context.user_name;
158
159 for path in self.search_path.path() {
160 if is_system_schema(path)
161 && let Ok(sys_table_catalog) =
162 self.catalog
163 .get_sys_table_by_name(&self.db_name, path, table_name)
164 {
165 return Ok(resolve_sys_table_relation(sys_table_catalog));
166 } else {
167 let schema_name = if path == USER_NAME_WILD_CARD {
168 user_name
169 } else {
170 path
171 };
172
173 if let Ok(schema) =
174 self.catalog.get_schema_by_name(&self.db_name, schema_name)
175 {
176 if let Some(source_catalog) =
177 self.temporary_source_manager.get_source(table_name)
178 {
180 return self.resolve_source_relation(
181 &source_catalog.clone(),
182 as_of,
183 true,
184 );
185 } else if let Some(table_catalog) = schema
186 .get_created_table_or_any_internal_table_by_name(table_name)
187 {
188 return self.resolve_table_relation(
189 table_catalog.clone(),
190 &schema_name.clone(),
191 as_of,
192 );
193 } else if let Some(source_catalog) =
194 schema.get_source_by_name(table_name)
195 {
196 return self.resolve_source_relation(
197 &source_catalog.clone(),
198 as_of,
199 false,
200 );
201 } else if let Some(view_catalog) =
202 schema.get_view_by_name(table_name)
203 {
204 return self.resolve_view_relation(&view_catalog.clone());
205 }
206 }
207 }
208 }
209
210 Err(CatalogError::NotFound("table or source", table_name.to_owned()).into())
211 })()?,
212 }
213 };
214
215 self.bind_table_to_context(columns, table_name.to_owned(), alias)?;
216 Ok(ret)
217 }
218
219 pub(crate) fn check_privilege(
220 &self,
221 object: PbObject,
222 database_id: DatabaseId,
223 mode: AclMode,
224 owner: UserId,
225 ) -> Result<()> {
226 if self.context.disable_security_invoker {
228 return Ok(());
229 }
230
231 match self.bind_for {
232 BindFor::Stream | BindFor::Batch => {
233 if let Some(user) = self.user.get_user_by_name(&self.auth_context.user_name) {
234 if user.is_super || user.id == owner {
235 return Ok(());
236 }
237 if !user.has_privilege(&object, mode) {
238 return Err(PermissionDenied("Do not have the privilege".to_owned()).into());
239 }
240
241 if self.database_id != database_id
243 && !user.has_privilege(&PbObject::DatabaseId(database_id), AclMode::Connect)
244 {
245 return Err(
246 PermissionDenied("Do not have CONNECT privilege".to_owned()).into()
247 );
248 }
249 } else {
250 return Err(PermissionDenied("Session user is invalid".to_owned()).into());
251 }
252 }
253 BindFor::Ddl | BindFor::System => {
254 }
256 }
257 Ok(())
258 }
259
260 fn resolve_table_relation(
261 &mut self,
262 table_catalog: Arc<TableCatalog>,
263 schema_name: &str,
264 as_of: Option<AsOf>,
265 ) -> Result<(Relation, Vec<(bool, Field)>)> {
266 let table_id = table_catalog.id();
267 let columns = table_catalog
268 .columns
269 .iter()
270 .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
271 .collect_vec();
272 self.check_privilege(
273 PbObject::TableId(table_id.table_id),
274 table_catalog.database_id,
275 AclMode::Select,
276 table_catalog.owner,
277 )?;
278 self.included_relations.insert(table_id);
279
280 let table_indexes = self.resolve_table_indexes(schema_name, table_id)?;
281
282 let table = BoundBaseTable {
283 table_id,
284 table_catalog,
285 table_indexes,
286 as_of,
287 };
288
289 Ok::<_, RwError>((Relation::BaseTable(Box::new(table)), columns))
290 }
291
292 fn resolve_source_relation(
293 &mut self,
294 source_catalog: &SourceCatalog,
295 as_of: Option<AsOf>,
296 is_temporary: bool,
297 ) -> Result<(Relation, Vec<(bool, Field)>)> {
298 debug_assert_column_ids_distinct(&source_catalog.columns);
299 if !is_temporary {
300 self.check_privilege(
301 PbObject::SourceId(source_catalog.id),
302 source_catalog.database_id,
303 AclMode::Select,
304 source_catalog.owner,
305 )?;
306 }
307 self.included_relations.insert(source_catalog.id.into());
308 Ok((
309 Relation::Source(Box::new(BoundSource {
310 catalog: source_catalog.clone(),
311 as_of,
312 })),
313 source_catalog
314 .columns
315 .iter()
316 .map(|c| (c.is_hidden, Field::from(&c.column_desc)))
317 .collect_vec(),
318 ))
319 }
320
321 fn resolve_view_relation(
322 &mut self,
323 view_catalog: &ViewCatalog,
324 ) -> Result<(Relation, Vec<(bool, Field)>)> {
325 if !view_catalog.is_system_view() {
326 self.check_privilege(
327 PbObject::ViewId(view_catalog.id),
328 view_catalog.database_id,
329 AclMode::Select,
330 view_catalog.owner,
331 )?;
332 }
333
334 let ast = Parser::parse_sql(&view_catalog.sql)
335 .expect("a view's sql should be parsed successfully");
336 let Statement::Query(query) = ast
337 .into_iter()
338 .exactly_one()
339 .expect("a view should contain only one statement")
340 else {
341 unreachable!("a view should contain a query statement");
342 };
343 let query = self.bind_query_for_view(*query).map_err(|e| {
344 ErrorCode::BindError(format!(
345 "failed to bind view {}, sql: {}\nerror: {}",
346 view_catalog.name,
347 view_catalog.sql,
348 e.as_report()
349 ))
350 })?;
351
352 let columns = view_catalog.columns.clone();
353
354 if !itertools::equal(
355 query.schema().fields().iter().map(|f| &f.data_type),
356 view_catalog.columns.iter().map(|f| &f.data_type),
357 ) {
358 return Err(ErrorCode::BindError(format!(
359 "failed to bind view {}. The SQL's schema is different from catalog's schema sql: {}, bound schema: {:?}, catalog schema: {:?}",
360 view_catalog.name, view_catalog.sql, query.schema(), columns
361 )).into());
362 }
363
364 let share_id = match self.shared_views.get(&view_catalog.id) {
365 Some(share_id) => *share_id,
366 None => {
367 let share_id = self.next_share_id();
368 self.shared_views.insert(view_catalog.id, share_id);
369 self.included_relations.insert(view_catalog.id.into());
370 share_id
371 }
372 };
373 let input = Either::Left(query);
374 Ok((
375 Relation::Share(Box::new(BoundShare {
376 share_id,
377 input: BoundShareInput::Query(input),
378 })),
379 columns.iter().map(|c| (false, c.clone())).collect_vec(),
380 ))
381 }
382
383 fn resolve_table_indexes(
384 &self,
385 schema_name: &str,
386 table_id: TableId,
387 ) -> Result<Vec<Arc<IndexCatalog>>> {
388 Ok(self
389 .catalog
390 .get_schema_by_name(&self.db_name, schema_name)?
391 .get_indexes_by_table_id(&table_id))
392 }
393
394 pub(crate) fn bind_table(
395 &mut self,
396 schema_name: Option<&str>,
397 table_name: &str,
398 ) -> Result<BoundBaseTable> {
399 let db_name = &self.db_name;
400 let schema_path = self.bind_schema_path(schema_name);
401 let (table_catalog, schema_name) =
402 self.catalog
403 .get_created_table_by_name(db_name, schema_path, table_name)?;
404 let table_catalog = table_catalog.clone();
405
406 let table_id = table_catalog.id();
407 let table_indexes = self.resolve_table_indexes(schema_name, table_id)?;
408
409 let columns = table_catalog.columns.clone();
410
411 self.bind_table_to_context(
412 columns
413 .iter()
414 .map(|c| (c.is_hidden, (&c.column_desc).into())),
415 table_name.to_owned(),
416 None,
417 )?;
418
419 Ok(BoundBaseTable {
420 table_id,
421 table_catalog,
422 table_indexes,
423 as_of: None,
424 })
425 }
426
427 pub(crate) fn check_for_dml(table: &TableCatalog, is_insert: bool) -> Result<()> {
428 let table_name = &table.name;
429 match table.table_type() {
430 TableType::Table => {}
431 TableType::Index => {
432 return Err(ErrorCode::InvalidInputSyntax(format!(
433 "cannot change index \"{table_name}\""
434 ))
435 .into());
436 }
437 TableType::MaterializedView => {
438 return Err(ErrorCode::InvalidInputSyntax(format!(
439 "cannot change materialized view \"{table_name}\""
440 ))
441 .into());
442 }
443 TableType::Internal => {
444 return Err(ErrorCode::InvalidInputSyntax(format!(
445 "cannot change internal table \"{table_name}\""
446 ))
447 .into());
448 }
449 }
450
451 if table.append_only && !is_insert {
452 return Err(ErrorCode::BindError(
453 "append-only table does not support update or delete".to_owned(),
454 )
455 .into());
456 }
457
458 Ok(())
459 }
460}