risingwave_frontend/handler/
wait.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::id::JobId;
17use risingwave_sqlparser::ast::WaitTarget;
18
19use super::RwPgResponse;
20use crate::Binder;
21use crate::catalog::root_catalog::SchemaPath;
22use crate::error::Result;
23use crate::handler::HandlerArgs;
24use crate::session::SessionImpl;
25
26pub(super) async fn handle_wait(
27 handler_args: HandlerArgs,
28 target: WaitTarget,
29) -> Result<RwPgResponse> {
30 do_wait(&handler_args.session, target).await?;
31 Ok(PgResponse::empty_result(StatementType::WAIT))
32}
33
34pub(crate) async fn do_wait(session: &SessionImpl, target: WaitTarget) -> Result<()> {
35 let catalog_writer = session.catalog_writer()?;
36 let job_id = resolve_wait_job_id(session, target)?;
37 catalog_writer.wait(job_id).await?;
38 Ok(())
39}
40
41fn resolve_wait_job_id(session: &SessionImpl, target: WaitTarget) -> Result<Option<JobId>> {
42 let db_name = &session.database();
43 let search_path = session.config().search_path();
44 let user_name = &session.user_name();
45 let reader = session.env().catalog_reader().read_guard();
46
47 match target {
48 WaitTarget::All => Ok(None),
49 WaitTarget::Table(name) => {
50 let (schema_name, real_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
51 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
52 let (table, schema_name) =
53 reader.get_any_table_by_name(db_name, schema_path, &real_name)?;
54 if !table.is_user_table() {
55 return Err(table.bad_drop_error());
56 }
57 session.check_privilege_for_drop_alter(schema_name, &**table)?;
58 Ok(Some(table.id.as_job_id()))
59 }
60 WaitTarget::MaterializedView(name) => {
61 let (schema_name, real_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
62 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
63 let (table, schema_name) =
64 reader.get_any_table_by_name(db_name, schema_path, &real_name)?;
65 if !table.is_mview() {
66 return Err(table.bad_drop_error());
67 }
68 session.check_privilege_for_drop_alter(schema_name, &**table)?;
69 Ok(Some(table.id.as_job_id()))
70 }
71 WaitTarget::Sink(name) => {
72 let (schema_name, real_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
73 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
74 let (sink, schema_name) =
75 reader.get_any_sink_by_name(db_name, schema_path, &real_name)?;
76 session.check_privilege_for_drop_alter(schema_name, &**sink)?;
77 Ok(Some(sink.id.as_job_id()))
78 }
79 WaitTarget::Index(name) => {
80 let (schema_name, real_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
81 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
82 let (index, schema_name) =
83 reader.get_any_index_by_name(db_name, schema_path, &real_name)?;
84 session.check_privilege_for_drop_alter(schema_name, &**index)?;
85 Ok(Some(index.id.as_job_id()))
86 }
87 }
88}