risingwave_frontend/handler/
alter_utils.rs1use pgwire::pg_response::StatementType;
16use risingwave_common::bail;
17use risingwave_pb::id::JobId;
18use risingwave_sqlparser::ast::ObjectName;
19
20use crate::Binder;
21use crate::catalog::CatalogError;
22use crate::catalog::root_catalog::SchemaPath;
23use crate::catalog::table_catalog::TableType;
24use crate::error::{Result, bail_invalid_input_syntax};
25use crate::session::SessionImpl;
26
27pub(super) fn resolve_streaming_job_id_for_alter(
32 session: &SessionImpl,
33 obj_name: ObjectName,
34 alter_stmt_type: StatementType,
35 alter_target: &str,
36) -> Result<JobId> {
37 resolve_streaming_job_id_for_alter_impl(session, obj_name, alter_stmt_type, alter_target, false)
38}
39
40pub(super) fn resolve_streaming_job_id_for_alter_parallelism(
41 session: &SessionImpl,
42 obj_name: ObjectName,
43 alter_stmt_type: StatementType,
44 alter_target: &str,
45) -> Result<JobId> {
46 resolve_streaming_job_id_for_alter_impl(session, obj_name, alter_stmt_type, alter_target, true)
47}
48
49fn resolve_streaming_job_id_for_alter_impl(
50 session: &SessionImpl,
51 obj_name: ObjectName,
52 alter_stmt_type: StatementType,
53 alter_target: &str,
54 include_creating_table: bool,
55) -> Result<JobId> {
56 let db_name = &session.database();
57 let (schema_name, real_table_name) = Binder::resolve_schema_qualified_name(db_name, &obj_name)?;
58 let search_path = session.config().search_path();
59 let user_name = &session.user_name();
60 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
61 let reader = session.env().catalog_reader().read_guard();
62
63 let job_id = match alter_stmt_type {
64 StatementType::ALTER_TABLE
65 | StatementType::ALTER_MATERIALIZED_VIEW
66 | StatementType::ALTER_INDEX => {
67 let (table, schema_name) = if include_creating_table {
68 reader.get_table_by_name(db_name, schema_path, &real_table_name, true)?
69 } else {
70 reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?
71 };
72
73 match (table.table_type(), alter_stmt_type) {
74 (TableType::Internal, _) => {
75 return Err(CatalogError::not_found("table", table.name()).into());
77 }
78 (TableType::Table, StatementType::ALTER_TABLE)
79 | (TableType::MaterializedView, StatementType::ALTER_MATERIALIZED_VIEW)
80 | (TableType::Index, StatementType::ALTER_INDEX) => {}
81 _ => {
82 bail_invalid_input_syntax!(
83 "cannot alter {alter_target} of {} {} by {}",
84 table.table_type().to_prost().as_str_name(),
85 table.name(),
86 alter_stmt_type,
87 );
88 }
89 }
90
91 session.check_privilege_for_drop_alter(schema_name, &**table)?;
92 table.id.as_job_id()
93 }
94 StatementType::ALTER_SOURCE => {
95 let (source, schema_name) =
96 reader.get_source_by_name(db_name, schema_path, &real_table_name)?;
97
98 if !source.info.is_shared() {
99 bail_invalid_input_syntax!(
100 "cannot alter {alter_target} of non-shared source.\n\
101 Use `ALTER MATERIALIZED VIEW` to alter the materialized view using the source instead."
102 );
103 }
104
105 session.check_privilege_for_drop_alter(schema_name, &**source)?;
106 source.id.as_share_source_job_id()
107 }
108 StatementType::ALTER_SINK => {
109 let (sink, schema_name) =
110 reader.get_created_sink_by_name(db_name, schema_path, &real_table_name)?;
111
112 session.check_privilege_for_drop_alter(schema_name, &**sink)?;
113 sink.id.as_job_id()
114 }
115 _ => bail!(
116 "invalid statement type for alter {alter_target}: {:?}",
117 alter_stmt_type
118 ),
119 };
120
121 Ok(job_id)
122}