risingwave_frontend/handler/
cancel_job.rs1use pgwire::pg_response::StatementType;
16use risingwave_common::types::Fields;
17use risingwave_pb::meta::cancel_creating_jobs_request::{CreatingJobIds, PbJobs};
18use risingwave_sqlparser::ast::JobIdents;
19
20use super::RwPgResponseBuilderExt;
21use crate::error::Result;
22use crate::handler::{HandlerArgs, RwPgResponse};
23
24pub(super) async fn handle_cancel(
25 handler_args: HandlerArgs,
26 jobs: JobIdents,
27) -> Result<RwPgResponse> {
28 let session = handler_args.session;
29 let job_ids = jobs.0;
30 let mut filtered_job_ids = vec![];
31 let mut notices = vec![];
32 {
33 let catalog_reader = session.env().catalog_reader().read_guard();
34 for job_id in job_ids {
35 let database_catalog = catalog_reader.get_database_by_name(&session.database())?;
36 let sink_catalog = database_catalog
37 .iter_schemas()
38 .find_map(|schema| schema.get_sink_by_id(&job_id));
39 if let Some(sink_catalog) = sink_catalog {
40 if sink_catalog.is_created() {
41 continue; } else if sink_catalog.target_table.is_some() {
43 notices.push(format!(
44 "Please use `DROP SINK {}` to cancel sink into table job.",
45 sink_catalog.name
46 ));
47 continue;
48 }
49 }
50 filtered_job_ids.push(job_id);
51 }
52 }
53
54 let mut response_builder = RwPgResponse::builder(StatementType::CANCEL_COMMAND);
55 for notice in notices {
56 response_builder = response_builder.notice(notice);
57 }
58
59 let canceled_jobs = if !filtered_job_ids.is_empty() {
60 session
61 .env()
62 .meta_client()
63 .cancel_creating_jobs(PbJobs::Ids(CreatingJobIds {
64 job_ids: filtered_job_ids,
65 }))
66 .await?
67 } else {
68 vec![]
69 };
70 let rows = canceled_jobs
71 .into_iter()
72 .map(|id| CancelRow { id: id.to_string() });
73 Ok(response_builder.rows(rows).into())
74}
75
76#[derive(Fields)]
77#[fields(style = "Title Case")]
78struct CancelRow {
79 id: String,
80}