1use std::collections::{HashMap, HashSet};
16
17use itertools::Itertools;
18use risingwave_common::catalog::{DatabaseId, TableId};
19use risingwave_connector::source::SplitMetaData;
20use risingwave_meta::barrier::BarrierManagerRef;
21use risingwave_meta::controller::fragment::StreamingJobInfo;
22use risingwave_meta::manager::MetadataManager;
23use risingwave_meta::model;
24use risingwave_meta::model::ActorId;
25use risingwave_meta::stream::{SourceManagerRunningInfo, ThrottleConfig};
26use risingwave_meta_model::{ObjectId, SinkId, SourceId, StreamingParallelism};
27use risingwave_pb::meta::cancel_creating_jobs_request::Jobs;
28use risingwave_pb::meta::list_actor_splits_response::FragmentType;
29use risingwave_pb::meta::list_table_fragments_response::{
30 ActorInfo, FragmentInfo, TableFragmentInfo,
31};
32use risingwave_pb::meta::stream_manager_service_server::StreamManagerService;
33use risingwave_pb::meta::table_fragments::PbState;
34use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
35use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
36use risingwave_pb::meta::*;
37use tonic::{Request, Response, Status};
38
39use crate::barrier::{BarrierScheduler, Command};
40use crate::manager::MetaSrvEnv;
41use crate::stream::GlobalStreamManagerRef;
42
/// Shorthand for the `Result` type returned by every tonic RPC handler in this service.
pub type TonicResponse<T> = Result<Response<T>, Status>;
44
/// gRPC implementation of `StreamManagerService`, exposing streaming-job control
/// operations (flush/pause/resume, throttling, job cancellation, and various
/// listing RPCs) over the meta node's internal managers.
#[derive(Clone)]
pub struct StreamServiceImpl {
    // Meta-server environment; used here for idle-activity tracking.
    env: MetaSrvEnv,
    // Schedules barrier commands (flush, pause/resume, throttle) per database.
    barrier_scheduler: BarrierScheduler,
    // Handle to the barrier manager; used for ad-hoc recovery.
    barrier_manager: BarrierManagerRef,
    // Global stream manager; used for job cancellation and source-manager info.
    stream_manager: GlobalStreamManagerRef,
    // Catalog/fragment metadata access (rate limits, fragments, actors, ...).
    metadata_manager: MetadataManager,
}
53
54impl StreamServiceImpl {
55 pub fn new(
56 env: MetaSrvEnv,
57 barrier_scheduler: BarrierScheduler,
58 barrier_manager: BarrierManagerRef,
59 stream_manager: GlobalStreamManagerRef,
60 metadata_manager: MetadataManager,
61 ) -> Self {
62 StreamServiceImpl {
63 env,
64 barrier_scheduler,
65 barrier_manager,
66 stream_manager,
67 metadata_manager,
68 }
69 }
70}
71
72#[async_trait::async_trait]
73impl StreamManagerService for StreamServiceImpl {
74 #[cfg_attr(coverage, coverage(off))]
75 async fn flush(&self, request: Request<FlushRequest>) -> TonicResponse<FlushResponse> {
76 self.env.idle_manager().record_activity();
77 let req = request.into_inner();
78
79 let version_id = self.barrier_scheduler.flush(req.database_id.into()).await?;
80 Ok(Response::new(FlushResponse {
81 status: None,
82 hummock_version_id: version_id.to_u64(),
83 }))
84 }
85
86 #[cfg_attr(coverage, coverage(off))]
87 async fn pause(&self, _: Request<PauseRequest>) -> Result<Response<PauseResponse>, Status> {
88 for database_id in self.metadata_manager.list_active_database_ids().await? {
89 self.barrier_scheduler
90 .run_command(database_id, Command::pause())
91 .await?;
92 }
93 Ok(Response::new(PauseResponse {}))
94 }
95
96 #[cfg_attr(coverage, coverage(off))]
97 async fn resume(&self, _: Request<ResumeRequest>) -> Result<Response<ResumeResponse>, Status> {
98 for database_id in self.metadata_manager.list_active_database_ids().await? {
99 self.barrier_scheduler
100 .run_command(database_id, Command::resume())
101 .await?;
102 }
103 Ok(Response::new(ResumeResponse {}))
104 }
105
106 #[cfg_attr(coverage, coverage(off))]
107 async fn apply_throttle(
108 &self,
109 request: Request<ApplyThrottleRequest>,
110 ) -> Result<Response<ApplyThrottleResponse>, Status> {
111 let request = request.into_inner();
112
113 let actor_to_apply = match request.kind() {
114 ThrottleTarget::Source | ThrottleTarget::TableWithSource => {
115 self.metadata_manager
116 .update_source_rate_limit_by_source_id(request.id as SourceId, request.rate)
117 .await?
118 }
119 ThrottleTarget::Mv => {
120 self.metadata_manager
121 .update_backfill_rate_limit_by_table_id(TableId::from(request.id), request.rate)
122 .await?
123 }
124 ThrottleTarget::CdcTable => {
125 self.metadata_manager
126 .update_backfill_rate_limit_by_table_id(TableId::from(request.id), request.rate)
127 .await?
128 }
129 ThrottleTarget::TableDml => {
130 self.metadata_manager
131 .update_dml_rate_limit_by_table_id(TableId::from(request.id), request.rate)
132 .await?
133 }
134 ThrottleTarget::Sink => {
135 self.metadata_manager
136 .update_sink_rate_limit_by_sink_id(request.id as SinkId, request.rate)
137 .await?
138 }
139 ThrottleTarget::Fragment => {
140 self.metadata_manager
141 .update_fragment_rate_limit_by_fragment_id(request.id as _, request.rate)
142 .await?
143 }
144 ThrottleTarget::Unspecified => {
145 return Err(Status::invalid_argument("unspecified throttle target"));
146 }
147 };
148
149 let request_id = if request.kind() == ThrottleTarget::Fragment {
150 self.metadata_manager
151 .catalog_controller
152 .get_fragment_streaming_job_id(request.id as _)
153 .await?
154 } else {
155 request.id as _
156 };
157
158 let database_id = self
159 .metadata_manager
160 .catalog_controller
161 .get_object_database_id(request_id as ObjectId)
162 .await?;
163 let database_id = DatabaseId::new(database_id as _);
164 let mutation: ThrottleConfig = actor_to_apply
166 .iter()
167 .map(|(fragment_id, actors)| {
168 (
169 *fragment_id,
170 actors
171 .iter()
172 .map(|actor_id| (*actor_id, request.rate))
173 .collect::<HashMap<ActorId, Option<u32>>>(),
174 )
175 })
176 .collect();
177 let _i = self
178 .barrier_scheduler
179 .run_command(database_id, Command::Throttle(mutation))
180 .await?;
181
182 Ok(Response::new(ApplyThrottleResponse { status: None }))
183 }
184
185 async fn cancel_creating_jobs(
186 &self,
187 request: Request<CancelCreatingJobsRequest>,
188 ) -> TonicResponse<CancelCreatingJobsResponse> {
189 let req = request.into_inner();
190 let table_ids = match req.jobs.unwrap() {
191 Jobs::Infos(infos) => self
192 .metadata_manager
193 .catalog_controller
194 .find_creating_streaming_job_ids(infos.infos)
195 .await?
196 .into_iter()
197 .map(|id| id as _)
198 .collect(),
199 Jobs::Ids(jobs) => jobs.job_ids,
200 };
201
202 let canceled_jobs = self
203 .stream_manager
204 .cancel_streaming_jobs(table_ids.into_iter().map(TableId::from).collect_vec())
205 .await
206 .into_iter()
207 .map(|id| id.table_id)
208 .collect_vec();
209 Ok(Response::new(CancelCreatingJobsResponse {
210 status: None,
211 canceled_jobs,
212 }))
213 }
214
215 #[cfg_attr(coverage, coverage(off))]
216 async fn list_table_fragments(
217 &self,
218 request: Request<ListTableFragmentsRequest>,
219 ) -> Result<Response<ListTableFragmentsResponse>, Status> {
220 let req = request.into_inner();
221 let table_ids = HashSet::<u32>::from_iter(req.table_ids);
222
223 let mut info = HashMap::new();
224 for job_id in table_ids {
225 let table_fragments = self
226 .metadata_manager
227 .catalog_controller
228 .get_job_fragments_by_id(job_id as _)
229 .await?;
230 let mut dispatchers = self
231 .metadata_manager
232 .catalog_controller
233 .get_fragment_actor_dispatchers(
234 table_fragments.fragment_ids().map(|id| id as _).collect(),
235 )
236 .await?;
237 let ctx = table_fragments.ctx.to_protobuf();
238 info.insert(
239 table_fragments.stream_job_id().table_id,
240 TableFragmentInfo {
241 fragments: table_fragments
242 .fragments
243 .into_iter()
244 .map(|(id, fragment)| FragmentInfo {
245 id,
246 actors: fragment
247 .actors
248 .into_iter()
249 .map(|actor| ActorInfo {
250 id: actor.actor_id,
251 node: Some(fragment.nodes.clone()),
252 dispatcher: dispatchers
253 .get_mut(&(fragment.fragment_id as _))
254 .and_then(|dispatchers| {
255 dispatchers.remove(&(actor.actor_id as _))
256 })
257 .unwrap_or_default(),
258 })
259 .collect_vec(),
260 })
261 .collect_vec(),
262 ctx: Some(ctx),
263 },
264 );
265 }
266
267 Ok(Response::new(ListTableFragmentsResponse {
268 table_fragments: info,
269 }))
270 }
271
272 #[cfg_attr(coverage, coverage(off))]
273 async fn list_streaming_job_states(
274 &self,
275 _request: Request<ListStreamingJobStatesRequest>,
276 ) -> Result<Response<ListStreamingJobStatesResponse>, Status> {
277 let job_infos = self
278 .metadata_manager
279 .catalog_controller
280 .list_streaming_job_infos()
281 .await?;
282 let states = job_infos
283 .into_iter()
284 .map(
285 |StreamingJobInfo {
286 job_id,
287 job_status,
288 name,
289 parallelism,
290 max_parallelism,
291 resource_group,
292 ..
293 }| {
294 let parallelism = match parallelism {
295 StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
296 StreamingParallelism::Custom => model::TableParallelism::Custom,
297 StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
298 };
299
300 list_streaming_job_states_response::StreamingJobState {
301 table_id: job_id as _,
302 name,
303 state: PbState::from(job_status) as _,
304 parallelism: Some(parallelism.into()),
305 max_parallelism: max_parallelism as _,
306 resource_group,
307 }
308 },
309 )
310 .collect_vec();
311
312 Ok(Response::new(ListStreamingJobStatesResponse { states }))
313 }
314
315 #[cfg_attr(coverage, coverage(off))]
316 async fn list_fragment_distribution(
317 &self,
318 _request: Request<ListFragmentDistributionRequest>,
319 ) -> Result<Response<ListFragmentDistributionResponse>, Status> {
320 let fragment_descs = self
321 .metadata_manager
322 .catalog_controller
323 .list_fragment_descs()
324 .await?;
325 let distributions = fragment_descs
326 .into_iter()
327 .map(|(fragment_desc, upstreams)| {
328 list_fragment_distribution_response::FragmentDistribution {
329 fragment_id: fragment_desc.fragment_id as _,
330 table_id: fragment_desc.job_id as _,
331 distribution_type: PbFragmentDistributionType::from(
332 fragment_desc.distribution_type,
333 ) as _,
334 state_table_ids: fragment_desc.state_table_ids.into_u32_array(),
335 upstream_fragment_ids: upstreams.iter().map(|id| *id as _).collect(),
336 fragment_type_mask: fragment_desc.fragment_type_mask as _,
337 parallelism: fragment_desc.parallelism as _,
338 vnode_count: fragment_desc.vnode_count as _,
339 node: Some(fragment_desc.stream_node.to_protobuf()),
340 }
341 })
342 .collect_vec();
343
344 Ok(Response::new(ListFragmentDistributionResponse {
345 distributions,
346 }))
347 }
348
349 #[cfg_attr(coverage, coverage(off))]
350 async fn list_actor_states(
351 &self,
352 _request: Request<ListActorStatesRequest>,
353 ) -> Result<Response<ListActorStatesResponse>, Status> {
354 let actor_locations = self
355 .metadata_manager
356 .catalog_controller
357 .list_actor_locations()
358 .await?;
359 let states = actor_locations
360 .into_iter()
361 .map(|actor_location| list_actor_states_response::ActorState {
362 actor_id: actor_location.actor_id as _,
363 fragment_id: actor_location.fragment_id as _,
364 state: PbActorState::from(actor_location.status) as _,
365 worker_id: actor_location.worker_id as _,
366 })
367 .collect_vec();
368
369 Ok(Response::new(ListActorStatesResponse { states }))
370 }
371
372 #[cfg_attr(coverage, coverage(off))]
373 async fn list_object_dependencies(
374 &self,
375 _request: Request<ListObjectDependenciesRequest>,
376 ) -> Result<Response<ListObjectDependenciesResponse>, Status> {
377 let dependencies = self
378 .metadata_manager
379 .catalog_controller
380 .list_created_object_dependencies()
381 .await?;
382
383 Ok(Response::new(ListObjectDependenciesResponse {
384 dependencies,
385 }))
386 }
387
388 #[cfg_attr(coverage, coverage(off))]
389 async fn recover(
390 &self,
391 _request: Request<RecoverRequest>,
392 ) -> Result<Response<RecoverResponse>, Status> {
393 self.barrier_manager.adhoc_recovery().await?;
394 Ok(Response::new(RecoverResponse {}))
395 }
396
397 async fn list_actor_splits(
398 &self,
399 _request: Request<ListActorSplitsRequest>,
400 ) -> Result<Response<ListActorSplitsResponse>, Status> {
401 let SourceManagerRunningInfo {
402 source_fragments,
403 backfill_fragments,
404 mut actor_splits,
405 } = self.stream_manager.source_manager.get_running_info().await;
406
407 let source_actors = self
408 .metadata_manager
409 .catalog_controller
410 .list_source_actors()
411 .await?;
412
413 let is_shared_source = self
414 .metadata_manager
415 .catalog_controller
416 .list_source_id_with_shared_types()
417 .await?;
418
419 let fragment_to_source: HashMap<_, _> = source_fragments
420 .into_iter()
421 .flat_map(|(source_id, fragment_ids)| {
422 let source_type = if is_shared_source
423 .get(&(source_id as _))
424 .copied()
425 .unwrap_or(false)
426 {
427 FragmentType::SharedSource
428 } else {
429 FragmentType::NonSharedSource
430 };
431
432 fragment_ids
433 .into_iter()
434 .map(move |fragment_id| (fragment_id, (source_id, source_type)))
435 })
436 .chain(
437 backfill_fragments
438 .into_iter()
439 .flat_map(|(source_id, fragment_ids)| {
440 fragment_ids.into_iter().flat_map(
441 move |(fragment_id, upstream_fragment_id)| {
442 [
443 (fragment_id, (source_id, FragmentType::SharedSourceBackfill)),
444 (
445 upstream_fragment_id,
446 (source_id, FragmentType::SharedSource),
447 ),
448 ]
449 },
450 )
451 }),
452 )
453 .collect();
454
455 let actor_splits = source_actors
456 .into_iter()
457 .flat_map(|(actor_id, fragment_id)| {
458 let (source_id, fragment_type) = fragment_to_source
459 .get(&(fragment_id as _))
460 .copied()
461 .unwrap_or_default();
462
463 actor_splits
464 .remove(&(actor_id as _))
465 .unwrap_or_default()
466 .into_iter()
467 .map(move |split| list_actor_splits_response::ActorSplit {
468 actor_id: actor_id as _,
469 source_id: source_id as _,
470 fragment_id: fragment_id as _,
471 split_id: split.id().to_string(),
472 fragment_type: fragment_type.into(),
473 })
474 })
475 .collect_vec();
476
477 Ok(Response::new(ListActorSplitsResponse { actor_splits }))
478 }
479
480 async fn list_rate_limits(
481 &self,
482 _request: Request<ListRateLimitsRequest>,
483 ) -> Result<Response<ListRateLimitsResponse>, Status> {
484 let rate_limits = self
485 .metadata_manager
486 .catalog_controller
487 .list_rate_limits()
488 .await?;
489 Ok(Response::new(ListRateLimitsResponse { rate_limits }))
490 }
491}