risingwave_frontend/scheduler/streaming_manager.rs

use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use itertools::Itertools;
use parking_lot::RwLock;
use pgwire::pg_server::SessionId;
use risingwave_pb::meta::cancel_creating_jobs_request::{
    CreatingJobInfo, CreatingJobInfos, PbJobs,
};
use risingwave_rpc_client::error::Result;
use uuid::Uuid;

use crate::catalog::{DatabaseId, SchemaId};
use crate::meta_client::FrontendMetaClient;

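/// Identifier for one tracked "creating streaming job" entry; the `Default` impl below
/// generates a fresh UUID v4 string for each entry.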
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub struct TaskId {
    pub id: String,
}

impl std::fmt::Display for TaskId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "TaskId:{}", self.id)
    }
}

impl Default for TaskId {
    fn default() -> Self {
        Self {
            id: Uuid::new_v4().to_string(),
        }
    }
}

pub type StreamingJobTrackerRef = Arc<StreamingJobTracker>;

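/// Frontend-side registry of streaming jobs (such as materialized views) that are still being
/// created, keyed by a per-job `TaskId`, so they can be cancelled via the meta service when the
/// owning session aborts or disconnects.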
pub struct StreamingJobTracker {
    creating_streaming_job: RwLock<HashMap<TaskId, CreatingStreamingJobInfo>>,
    meta_client: Arc<dyn FrontendMetaClient>,
}

impl StreamingJobTracker {
    pub fn new(meta_client: Arc<dyn FrontendMetaClient>) -> Self {
        Self {
            creating_streaming_job: RwLock::new(HashMap::default()),
            meta_client,
        }
    }
}

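/// Metadata for one creating streaming job: the session that issued it and the
/// `CreatingJobInfo` (database, schema, name) sent to the meta service on cancellation.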
#[derive(Clone, Default)]
pub struct CreatingStreamingJobInfo {
    session_id: SessionId,
    info: CreatingJobInfo,
}

impl CreatingStreamingJobInfo {
    pub fn new(
        session_id: SessionId,
        database_id: DatabaseId,
        schema_id: SchemaId,
        name: String,
    ) -> Self {
        Self {
            session_id,
            info: CreatingJobInfo {
                database_id,
                schema_id,
                name,
            },
        }
    }
}

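/// RAII guard returned by `StreamingJobTracker::guard`; dropping it removes the tracked entry
/// from the tracker (it does not cancel the job itself).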
pub struct StreamingJobGuard<'a> {
    task_id: TaskId,
    tracker: &'a StreamingJobTracker,
}

impl Drop for StreamingJobGuard<'_> {
    fn drop(&mut self) {
        self.tracker.delete_job(&self.task_id);
    }
}

impl StreamingJobTracker {
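    /// Register `task_info` under a freshly generated `TaskId` and return a guard that
    /// unregisters it when dropped.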
    pub fn guard(&self, task_info: CreatingStreamingJobInfo) -> StreamingJobGuard<'_> {
        let task_id = TaskId::default();
        self.add_job(task_id.clone(), task_info);
        StreamingJobGuard {
            task_id,
            tracker: self,
        }
    }

    fn add_job(&self, task_id: TaskId, info: CreatingStreamingJobInfo) {
        self.creating_streaming_job.write().insert(task_id, info);
    }

    fn delete_job(&self, task_id: &TaskId) {
        self.creating_streaming_job.write().remove(task_id);
    }

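    /// Collect the `CreatingJobInfo`s of all jobs registered by the given session.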
    fn jobs_for_session(&self, session_id: SessionId) -> Vec<CreatingJobInfo> {
        self.creating_streaming_job
            .read()
            .values()
            .filter(|job| job.session_id == session_id)
            .map(|job| job.info.clone())
            .collect_vec()
    }

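    /// Cancel all creating jobs of `session_id` via the meta client, awaiting the RPC.
    /// Returns `Ok(false)` if the session has no creating jobs, `Ok(true)` once the cancel
    /// request has completed.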
    pub async fn cancel_jobs(&self, session_id: SessionId) -> Result<bool> {
        let jobs = self.jobs_for_session(session_id);
        if jobs.is_empty() {
            return Ok(false);
        }
        self.meta_client
            .cancel_creating_jobs(PbJobs::Infos(CreatingJobInfos { infos: jobs }))
            .await?;
        Ok(true)
    }

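    /// Fire-and-forget variant of `cancel_jobs`: spawns the cancel RPC on a Tokio task and
    /// neither awaits nor reports its result.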
    pub fn abort_jobs(&self, session_id: SessionId) {
        let jobs = self.jobs_for_session(session_id);
        if jobs.is_empty() {
            return;
        }

        let client = self.meta_client.clone();
        tokio::spawn(async move {
            client
                .cancel_creating_jobs(PbJobs::Infos(CreatingJobInfos { infos: jobs }))
                .await
        });
    }
}
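
// Usage sketch (not part of this file): how a DDL handler and session teardown might use the
// tracker. The `meta_client`, `session`, `db_id`, and `schema_id` bindings below are
// illustrative assumptions, not identifiers defined in this crate.
//
//     let tracker: StreamingJobTrackerRef = Arc::new(StreamingJobTracker::new(meta_client));
//
//     // While a `CREATE MATERIALIZED VIEW mv` statement runs, keep a guard alive:
//     let info = CreatingStreamingJobInfo::new(session.id(), db_id, schema_id, "mv".to_owned());
//     let _guard = tracker.guard(info); // entry is removed from the map when `_guard` drops
//
//     // When the session is cancelled or closed:
//     tracker.abort_jobs(session.id()); // or `tracker.cancel_jobs(session.id()).await?`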