risingwave_frontend/handler/
cancel_job.rs1use pgwire::pg_response::StatementType;
16use risingwave_common::types::Fields;
17use risingwave_pb::meta::cancel_creating_jobs_request::{CreatingJobIds, PbJobs};
18use risingwave_sqlparser::ast::JobIdents;
19
20use super::RwPgResponseBuilderExt;
21use super::util::execute_with_long_running_notification;
22use crate::error::Result;
23use crate::handler::{HandlerArgs, RwPgResponse};
24
25pub(super) async fn handle_cancel(
26 handler_args: HandlerArgs,
27 jobs: JobIdents,
28) -> Result<RwPgResponse> {
29 let session = handler_args.session;
30 let job_ids = jobs.0;
31 let mut filtered_job_ids = vec![];
32 let mut notices = vec![];
33 {
34 let catalog_reader = session.env().catalog_reader().read_guard();
35 for job_id in job_ids {
36 let database_catalog = catalog_reader.get_database_by_name(&session.database())?;
37 let sink_catalog = database_catalog
38 .iter_schemas()
39 .find_map(|schema| schema.get_sink_by_id(&job_id));
40 if let Some(sink_catalog) = sink_catalog {
41 if sink_catalog.is_created() {
42 continue; } else if sink_catalog.target_table.is_some() {
44 notices.push(format!(
45 "Please use `DROP SINK {}` to cancel sink into table job.",
46 sink_catalog.name
47 ));
48 continue;
49 }
50 }
51 filtered_job_ids.push(job_id.into());
52 }
53 }
54
55 let mut response_builder = RwPgResponse::builder(StatementType::CANCEL_COMMAND);
56 for notice in notices {
57 response_builder = response_builder.notice(notice);
58 }
59
60 let canceled_jobs = if !filtered_job_ids.is_empty() {
61 execute_with_long_running_notification(
63 async {
64 session
65 .env()
66 .meta_client()
67 .cancel_creating_jobs(PbJobs::Ids(CreatingJobIds {
68 job_ids: filtered_job_ids,
69 }))
70 .await
71 .map_err(Into::into)
72 },
73 &session,
74 "CANCEL JOBS",
75 )
76 .await?
77 } else {
78 vec![]
79 };
80 let rows = canceled_jobs
81 .into_iter()
82 .map(|id| CancelRow { id: id.to_string() });
83 Ok(response_builder.rows(rows).into())
84}
85
86#[derive(Fields)]
87#[fields(style = "Title Case")]
88struct CancelRow {
89 id: String,
90}