risingwave_frontend/scheduler/
streaming_manager.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{Debug, Formatter};
17use std::sync::Arc;
18
19use itertools::Itertools;
20use parking_lot::RwLock;
21use pgwire::pg_server::SessionId;
22use risingwave_pb::meta::cancel_creating_jobs_request::{
23    CreatingJobInfo, CreatingJobInfos, PbJobs,
24};
25use risingwave_rpc_client::error::Result;
26use uuid::Uuid;
27
28use crate::catalog::{DatabaseId, SchemaId};
29use crate::meta_client::FrontendMetaClient;
30
31#[derive(Clone, Debug, Hash, Eq, PartialEq)]
32pub struct TaskId {
33    pub id: String,
34}
35
36impl std::fmt::Display for TaskId {
37    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
38        write!(f, "TaskId:{}", self.id)
39    }
40}
41
42impl Default for TaskId {
43    fn default() -> Self {
44        Self {
45            id: Uuid::new_v4().to_string(),
46        }
47    }
48}
49
50pub type StreamingJobTrackerRef = Arc<StreamingJobTracker>;
51
52pub struct StreamingJobTracker {
53    creating_streaming_job: RwLock<HashMap<TaskId, CreatingStreamingJobInfo>>,
54    meta_client: Arc<dyn FrontendMetaClient>,
55}
56
57impl StreamingJobTracker {
58    pub fn new(meta_client: Arc<dyn FrontendMetaClient>) -> Self {
59        Self {
60            creating_streaming_job: RwLock::new(HashMap::default()),
61            meta_client,
62        }
63    }
64}
65
66#[derive(Clone, Default)]
67pub struct CreatingStreamingJobInfo {
68    /// Identified by `process_id`, `secret_key`.
69    session_id: SessionId,
70    info: CreatingJobInfo,
71}
72
73impl CreatingStreamingJobInfo {
74    pub fn new(
75        session_id: SessionId,
76        database_id: DatabaseId,
77        schema_id: SchemaId,
78        name: String,
79    ) -> Self {
80        Self {
81            session_id,
82            info: CreatingJobInfo {
83                database_id,
84                schema_id,
85                name,
86            },
87        }
88    }
89}
90
91pub struct StreamingJobGuard<'a> {
92    task_id: TaskId,
93    tracker: &'a StreamingJobTracker,
94}
95
96impl Drop for StreamingJobGuard<'_> {
97    fn drop(&mut self) {
98        self.tracker.delete_job(&self.task_id);
99    }
100}
101
102impl StreamingJobTracker {
103    pub fn guard(&self, task_info: CreatingStreamingJobInfo) -> StreamingJobGuard<'_> {
104        let task_id = TaskId::default();
105        self.add_job(task_id.clone(), task_info);
106        StreamingJobGuard {
107            task_id,
108            tracker: self,
109        }
110    }
111
112    fn add_job(&self, task_id: TaskId, info: CreatingStreamingJobInfo) {
113        self.creating_streaming_job.write().insert(task_id, info);
114    }
115
116    fn delete_job(&self, task_id: &TaskId) {
117        self.creating_streaming_job.write().remove(task_id);
118    }
119
120    fn jobs_for_session(&self, session_id: SessionId) -> Vec<CreatingJobInfo> {
121        self.creating_streaming_job
122            .read()
123            .values()
124            .filter(|job| job.session_id == session_id)
125            .map(|job| job.info.clone())
126            .collect_vec()
127    }
128
129    pub async fn cancel_jobs(&self, session_id: SessionId) -> Result<bool> {
130        let jobs = self.jobs_for_session(session_id);
131        if jobs.is_empty() {
132            return Ok(false);
133        }
134        self.meta_client
135            .cancel_creating_jobs(PbJobs::Infos(CreatingJobInfos { infos: jobs }))
136            .await?;
137        Ok(true)
138    }
139
140    pub fn abort_jobs(&self, session_id: SessionId) {
141        let jobs = self.jobs_for_session(session_id);
142        if jobs.is_empty() {
143            return;
144        }
145
146        let client = self.meta_client.clone();
147        tokio::spawn(async move {
148            client
149                .cancel_creating_jobs(PbJobs::Infos(CreatingJobInfos { infos: jobs }))
150                .await
151        });
152    }
153}