risingwave_frontend/handler/
wait.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::id::JobId;
17use risingwave_sqlparser::ast::WaitTarget;
18
19use super::RwPgResponse;
20use crate::Binder;
21use crate::catalog::root_catalog::SchemaPath;
22use crate::error::Result;
23use crate::handler::HandlerArgs;
24use crate::session::SessionImpl;
25
26pub(super) async fn handle_wait(
27    handler_args: HandlerArgs,
28    target: WaitTarget,
29) -> Result<RwPgResponse> {
30    do_wait(&handler_args.session, target).await?;
31    Ok(PgResponse::empty_result(StatementType::WAIT))
32}
33
34pub(crate) async fn do_wait(session: &SessionImpl, target: WaitTarget) -> Result<()> {
35    let catalog_writer = session.catalog_writer()?;
36    let job_id = resolve_wait_job_id(session, target)?;
37    catalog_writer.wait(job_id).await?;
38    Ok(())
39}
40
41fn resolve_wait_job_id(session: &SessionImpl, target: WaitTarget) -> Result<Option<JobId>> {
42    let db_name = &session.database();
43    let search_path = session.config().search_path();
44    let user_name = &session.user_name();
45    let reader = session.env().catalog_reader().read_guard();
46
47    match target {
48        WaitTarget::All => Ok(None),
49        WaitTarget::Table(name) => {
50            let (schema_name, real_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
51            let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
52            let (table, schema_name) =
53                reader.get_any_table_by_name(db_name, schema_path, &real_name)?;
54            if !table.is_user_table() {
55                return Err(table.bad_drop_error());
56            }
57            session.check_privilege_for_drop_alter(schema_name, &**table)?;
58            Ok(Some(table.id.as_job_id()))
59        }
60        WaitTarget::MaterializedView(name) => {
61            let (schema_name, real_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
62            let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
63            let (table, schema_name) =
64                reader.get_any_table_by_name(db_name, schema_path, &real_name)?;
65            if !table.is_mview() {
66                return Err(table.bad_drop_error());
67            }
68            session.check_privilege_for_drop_alter(schema_name, &**table)?;
69            Ok(Some(table.id.as_job_id()))
70        }
71        WaitTarget::Sink(name) => {
72            let (schema_name, real_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
73            let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
74            let (sink, schema_name) =
75                reader.get_any_sink_by_name(db_name, schema_path, &real_name)?;
76            session.check_privilege_for_drop_alter(schema_name, &**sink)?;
77            Ok(Some(sink.id.as_job_id()))
78        }
79        WaitTarget::Index(name) => {
80            let (schema_name, real_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
81            let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
82            let (index, schema_name) =
83                reader.get_any_index_by_name(db_name, schema_path, &real_name)?;
84            session.check_privilege_for_drop_alter(schema_name, &**index)?;
85            Ok(Some(index.id.as_job_id()))
86        }
87    }
88}