risingwave_frontend/handler/
alter_utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::StatementType;
16use risingwave_common::bail;
17use risingwave_pb::id::JobId;
18use risingwave_sqlparser::ast::ObjectName;
19
20use crate::Binder;
21use crate::catalog::CatalogError;
22use crate::catalog::root_catalog::SchemaPath;
23use crate::catalog::table_catalog::TableType;
24use crate::error::{Result, bail_invalid_input_syntax};
25use crate::session::SessionImpl;
26
27/// Resolve the **streaming** job id for alter operations.
28///
29/// This function will decide which catalog to lookup based on the given statement type, which should
30/// be one of `ALTER TABLE`, `ALTER MATERIALIZED VIEW`, `ALTER SOURCE`, `ALTER SINK`, `ALTER INDEX`.
31pub(super) fn resolve_streaming_job_id_for_alter(
32    session: &SessionImpl,
33    obj_name: ObjectName,
34    alter_stmt_type: StatementType,
35    alter_target: &str,
36) -> Result<JobId> {
37    resolve_streaming_job_id_for_alter_impl(session, obj_name, alter_stmt_type, alter_target, false)
38}
39
40pub(super) fn resolve_streaming_job_id_for_alter_parallelism(
41    session: &SessionImpl,
42    obj_name: ObjectName,
43    alter_stmt_type: StatementType,
44    alter_target: &str,
45) -> Result<JobId> {
46    resolve_streaming_job_id_for_alter_impl(session, obj_name, alter_stmt_type, alter_target, true)
47}
48
49fn resolve_streaming_job_id_for_alter_impl(
50    session: &SessionImpl,
51    obj_name: ObjectName,
52    alter_stmt_type: StatementType,
53    alter_target: &str,
54    include_creating_table: bool,
55) -> Result<JobId> {
56    let db_name = &session.database();
57    let (schema_name, real_table_name) = Binder::resolve_schema_qualified_name(db_name, &obj_name)?;
58    let search_path = session.config().search_path();
59    let user_name = &session.user_name();
60    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
61    let reader = session.env().catalog_reader().read_guard();
62
63    let job_id = match alter_stmt_type {
64        StatementType::ALTER_TABLE
65        | StatementType::ALTER_MATERIALIZED_VIEW
66        | StatementType::ALTER_INDEX => {
67            let (table, schema_name) = if include_creating_table {
68                reader.get_table_by_name(db_name, schema_path, &real_table_name, true)?
69            } else {
70                reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?
71            };
72
73            match (table.table_type(), alter_stmt_type) {
74                (TableType::Internal, _) => {
75                    // we treat internal table as NOT FOUND
76                    return Err(CatalogError::not_found("table", table.name()).into());
77                }
78                (TableType::Table, StatementType::ALTER_TABLE)
79                | (TableType::MaterializedView, StatementType::ALTER_MATERIALIZED_VIEW)
80                | (TableType::Index, StatementType::ALTER_INDEX) => {}
81                _ => {
82                    bail_invalid_input_syntax!(
83                        "cannot alter {alter_target} of {} {} by {}",
84                        table.table_type().to_prost().as_str_name(),
85                        table.name(),
86                        alter_stmt_type,
87                    );
88                }
89            }
90
91            session.check_privilege_for_drop_alter(schema_name, &**table)?;
92            table.id.as_job_id()
93        }
94        StatementType::ALTER_SOURCE => {
95            let (source, schema_name) =
96                reader.get_source_by_name(db_name, schema_path, &real_table_name)?;
97
98            if !source.info.is_shared() {
99                bail_invalid_input_syntax!(
100                    "cannot alter {alter_target} of non-shared source.\n\
101                     Use `ALTER MATERIALIZED VIEW` to alter the materialized view using the source instead."
102                );
103            }
104
105            session.check_privilege_for_drop_alter(schema_name, &**source)?;
106            source.id.as_share_source_job_id()
107        }
108        StatementType::ALTER_SINK => {
109            let (sink, schema_name) =
110                reader.get_created_sink_by_name(db_name, schema_path, &real_table_name)?;
111
112            session.check_privilege_for_drop_alter(schema_name, &**sink)?;
113            sink.id.as_job_id()
114        }
115        _ => bail!(
116            "invalid statement type for alter {alter_target}: {:?}",
117            alter_stmt_type
118        ),
119    };
120
121    Ok(job_id)
122}