risingwave_meta_service/
stream_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use itertools::Itertools;
18use risingwave_common::catalog::{DatabaseId, TableId};
19use risingwave_connector::source::SplitMetaData;
20use risingwave_meta::barrier::BarrierManagerRef;
21use risingwave_meta::controller::fragment::StreamingJobInfo;
22use risingwave_meta::manager::MetadataManager;
23use risingwave_meta::model;
24use risingwave_meta::model::ActorId;
25use risingwave_meta::stream::{SourceManagerRunningInfo, ThrottleConfig};
26use risingwave_meta_model::{ObjectId, SinkId, SourceId, StreamingParallelism};
27use risingwave_pb::meta::cancel_creating_jobs_request::Jobs;
28use risingwave_pb::meta::list_actor_splits_response::FragmentType;
29use risingwave_pb::meta::list_table_fragments_response::{
30    ActorInfo, FragmentInfo, TableFragmentInfo,
31};
32use risingwave_pb::meta::stream_manager_service_server::StreamManagerService;
33use risingwave_pb::meta::table_fragments::PbState;
34use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
35use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
36use risingwave_pb::meta::*;
37use tonic::{Request, Response, Status};
38
39use crate::barrier::{BarrierScheduler, Command};
40use crate::manager::MetaSrvEnv;
41use crate::stream::GlobalStreamManagerRef;
42
43pub type TonicResponse<T> = Result<Response<T>, Status>;
44
45#[derive(Clone)]
46pub struct StreamServiceImpl {
47    env: MetaSrvEnv,
48    barrier_scheduler: BarrierScheduler,
49    barrier_manager: BarrierManagerRef,
50    stream_manager: GlobalStreamManagerRef,
51    metadata_manager: MetadataManager,
52}
53
54impl StreamServiceImpl {
55    pub fn new(
56        env: MetaSrvEnv,
57        barrier_scheduler: BarrierScheduler,
58        barrier_manager: BarrierManagerRef,
59        stream_manager: GlobalStreamManagerRef,
60        metadata_manager: MetadataManager,
61    ) -> Self {
62        StreamServiceImpl {
63            env,
64            barrier_scheduler,
65            barrier_manager,
66            stream_manager,
67            metadata_manager,
68        }
69    }
70}
71
72#[async_trait::async_trait]
73impl StreamManagerService for StreamServiceImpl {
74    #[cfg_attr(coverage, coverage(off))]
75    async fn flush(&self, request: Request<FlushRequest>) -> TonicResponse<FlushResponse> {
76        self.env.idle_manager().record_activity();
77        let req = request.into_inner();
78
79        let version_id = self.barrier_scheduler.flush(req.database_id.into()).await?;
80        Ok(Response::new(FlushResponse {
81            status: None,
82            hummock_version_id: version_id.to_u64(),
83        }))
84    }
85
86    #[cfg_attr(coverage, coverage(off))]
87    async fn pause(&self, _: Request<PauseRequest>) -> Result<Response<PauseResponse>, Status> {
88        for database_id in self.metadata_manager.list_active_database_ids().await? {
89            self.barrier_scheduler
90                .run_command(database_id, Command::pause())
91                .await?;
92        }
93        Ok(Response::new(PauseResponse {}))
94    }
95
96    #[cfg_attr(coverage, coverage(off))]
97    async fn resume(&self, _: Request<ResumeRequest>) -> Result<Response<ResumeResponse>, Status> {
98        for database_id in self.metadata_manager.list_active_database_ids().await? {
99            self.barrier_scheduler
100                .run_command(database_id, Command::resume())
101                .await?;
102        }
103        Ok(Response::new(ResumeResponse {}))
104    }
105
106    #[cfg_attr(coverage, coverage(off))]
107    async fn apply_throttle(
108        &self,
109        request: Request<ApplyThrottleRequest>,
110    ) -> Result<Response<ApplyThrottleResponse>, Status> {
111        let request = request.into_inner();
112
113        let actor_to_apply = match request.kind() {
114            ThrottleTarget::Source | ThrottleTarget::TableWithSource => {
115                self.metadata_manager
116                    .update_source_rate_limit_by_source_id(request.id as SourceId, request.rate)
117                    .await?
118            }
119            ThrottleTarget::Mv => {
120                self.metadata_manager
121                    .update_backfill_rate_limit_by_table_id(TableId::from(request.id), request.rate)
122                    .await?
123            }
124            ThrottleTarget::CdcTable => {
125                self.metadata_manager
126                    .update_backfill_rate_limit_by_table_id(TableId::from(request.id), request.rate)
127                    .await?
128            }
129            ThrottleTarget::TableDml => {
130                self.metadata_manager
131                    .update_dml_rate_limit_by_table_id(TableId::from(request.id), request.rate)
132                    .await?
133            }
134            ThrottleTarget::Sink => {
135                self.metadata_manager
136                    .update_sink_rate_limit_by_sink_id(request.id as SinkId, request.rate)
137                    .await?
138            }
139            ThrottleTarget::Fragment => {
140                self.metadata_manager
141                    .update_fragment_rate_limit_by_fragment_id(request.id as _, request.rate)
142                    .await?
143            }
144            ThrottleTarget::Unspecified => {
145                return Err(Status::invalid_argument("unspecified throttle target"));
146            }
147        };
148
149        let request_id = if request.kind() == ThrottleTarget::Fragment {
150            self.metadata_manager
151                .catalog_controller
152                .get_fragment_streaming_job_id(request.id as _)
153                .await?
154        } else {
155            request.id as _
156        };
157
158        let database_id = self
159            .metadata_manager
160            .catalog_controller
161            .get_object_database_id(request_id as ObjectId)
162            .await?;
163        let database_id = DatabaseId::new(database_id as _);
164        // TODO: check whether shared source is correct
165        let mutation: ThrottleConfig = actor_to_apply
166            .iter()
167            .map(|(fragment_id, actors)| {
168                (
169                    *fragment_id,
170                    actors
171                        .iter()
172                        .map(|actor_id| (*actor_id, request.rate))
173                        .collect::<HashMap<ActorId, Option<u32>>>(),
174                )
175            })
176            .collect();
177        let _i = self
178            .barrier_scheduler
179            .run_command(database_id, Command::Throttle(mutation))
180            .await?;
181
182        Ok(Response::new(ApplyThrottleResponse { status: None }))
183    }
184
185    async fn cancel_creating_jobs(
186        &self,
187        request: Request<CancelCreatingJobsRequest>,
188    ) -> TonicResponse<CancelCreatingJobsResponse> {
189        let req = request.into_inner();
190        let table_ids = match req.jobs.unwrap() {
191            Jobs::Infos(infos) => self
192                .metadata_manager
193                .catalog_controller
194                .find_creating_streaming_job_ids(infos.infos)
195                .await?
196                .into_iter()
197                .map(|id| id as _)
198                .collect(),
199            Jobs::Ids(jobs) => jobs.job_ids,
200        };
201
202        let canceled_jobs = self
203            .stream_manager
204            .cancel_streaming_jobs(table_ids.into_iter().map(TableId::from).collect_vec())
205            .await
206            .into_iter()
207            .map(|id| id.table_id)
208            .collect_vec();
209        Ok(Response::new(CancelCreatingJobsResponse {
210            status: None,
211            canceled_jobs,
212        }))
213    }
214
215    #[cfg_attr(coverage, coverage(off))]
216    async fn list_table_fragments(
217        &self,
218        request: Request<ListTableFragmentsRequest>,
219    ) -> Result<Response<ListTableFragmentsResponse>, Status> {
220        let req = request.into_inner();
221        let table_ids = HashSet::<u32>::from_iter(req.table_ids);
222
223        let mut info = HashMap::new();
224        for job_id in table_ids {
225            let table_fragments = self
226                .metadata_manager
227                .catalog_controller
228                .get_job_fragments_by_id(job_id as _)
229                .await?;
230            let mut dispatchers = self
231                .metadata_manager
232                .catalog_controller
233                .get_fragment_actor_dispatchers(
234                    table_fragments.fragment_ids().map(|id| id as _).collect(),
235                )
236                .await?;
237            let ctx = table_fragments.ctx.to_protobuf();
238            info.insert(
239                table_fragments.stream_job_id().table_id,
240                TableFragmentInfo {
241                    fragments: table_fragments
242                        .fragments
243                        .into_iter()
244                        .map(|(id, fragment)| FragmentInfo {
245                            id,
246                            actors: fragment
247                                .actors
248                                .into_iter()
249                                .map(|actor| ActorInfo {
250                                    id: actor.actor_id,
251                                    node: Some(fragment.nodes.clone()),
252                                    dispatcher: dispatchers
253                                        .get_mut(&(fragment.fragment_id as _))
254                                        .and_then(|dispatchers| {
255                                            dispatchers.remove(&(actor.actor_id as _))
256                                        })
257                                        .unwrap_or_default(),
258                                })
259                                .collect_vec(),
260                        })
261                        .collect_vec(),
262                    ctx: Some(ctx),
263                },
264            );
265        }
266
267        Ok(Response::new(ListTableFragmentsResponse {
268            table_fragments: info,
269        }))
270    }
271
272    #[cfg_attr(coverage, coverage(off))]
273    async fn list_streaming_job_states(
274        &self,
275        _request: Request<ListStreamingJobStatesRequest>,
276    ) -> Result<Response<ListStreamingJobStatesResponse>, Status> {
277        let job_infos = self
278            .metadata_manager
279            .catalog_controller
280            .list_streaming_job_infos()
281            .await?;
282        let states = job_infos
283            .into_iter()
284            .map(
285                |StreamingJobInfo {
286                     job_id,
287                     job_status,
288                     name,
289                     parallelism,
290                     max_parallelism,
291                     resource_group,
292                     ..
293                 }| {
294                    let parallelism = match parallelism {
295                        StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
296                        StreamingParallelism::Custom => model::TableParallelism::Custom,
297                        StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
298                    };
299
300                    list_streaming_job_states_response::StreamingJobState {
301                        table_id: job_id as _,
302                        name,
303                        state: PbState::from(job_status) as _,
304                        parallelism: Some(parallelism.into()),
305                        max_parallelism: max_parallelism as _,
306                        resource_group,
307                    }
308                },
309            )
310            .collect_vec();
311
312        Ok(Response::new(ListStreamingJobStatesResponse { states }))
313    }
314
315    #[cfg_attr(coverage, coverage(off))]
316    async fn list_fragment_distribution(
317        &self,
318        _request: Request<ListFragmentDistributionRequest>,
319    ) -> Result<Response<ListFragmentDistributionResponse>, Status> {
320        let fragment_descs = self
321            .metadata_manager
322            .catalog_controller
323            .list_fragment_descs()
324            .await?;
325        let distributions = fragment_descs
326            .into_iter()
327            .map(|(fragment_desc, upstreams)| {
328                list_fragment_distribution_response::FragmentDistribution {
329                    fragment_id: fragment_desc.fragment_id as _,
330                    table_id: fragment_desc.job_id as _,
331                    distribution_type: PbFragmentDistributionType::from(
332                        fragment_desc.distribution_type,
333                    ) as _,
334                    state_table_ids: fragment_desc.state_table_ids.into_u32_array(),
335                    upstream_fragment_ids: upstreams.iter().map(|id| *id as _).collect(),
336                    fragment_type_mask: fragment_desc.fragment_type_mask as _,
337                    parallelism: fragment_desc.parallelism as _,
338                    vnode_count: fragment_desc.vnode_count as _,
339                    node: Some(fragment_desc.stream_node.to_protobuf()),
340                }
341            })
342            .collect_vec();
343
344        Ok(Response::new(ListFragmentDistributionResponse {
345            distributions,
346        }))
347    }
348
349    #[cfg_attr(coverage, coverage(off))]
350    async fn list_actor_states(
351        &self,
352        _request: Request<ListActorStatesRequest>,
353    ) -> Result<Response<ListActorStatesResponse>, Status> {
354        let actor_locations = self
355            .metadata_manager
356            .catalog_controller
357            .list_actor_locations()
358            .await?;
359        let states = actor_locations
360            .into_iter()
361            .map(|actor_location| list_actor_states_response::ActorState {
362                actor_id: actor_location.actor_id as _,
363                fragment_id: actor_location.fragment_id as _,
364                state: PbActorState::from(actor_location.status) as _,
365                worker_id: actor_location.worker_id as _,
366            })
367            .collect_vec();
368
369        Ok(Response::new(ListActorStatesResponse { states }))
370    }
371
372    #[cfg_attr(coverage, coverage(off))]
373    async fn list_object_dependencies(
374        &self,
375        _request: Request<ListObjectDependenciesRequest>,
376    ) -> Result<Response<ListObjectDependenciesResponse>, Status> {
377        let dependencies = self
378            .metadata_manager
379            .catalog_controller
380            .list_created_object_dependencies()
381            .await?;
382
383        Ok(Response::new(ListObjectDependenciesResponse {
384            dependencies,
385        }))
386    }
387
388    #[cfg_attr(coverage, coverage(off))]
389    async fn recover(
390        &self,
391        _request: Request<RecoverRequest>,
392    ) -> Result<Response<RecoverResponse>, Status> {
393        self.barrier_manager.adhoc_recovery().await?;
394        Ok(Response::new(RecoverResponse {}))
395    }
396
397    async fn list_actor_splits(
398        &self,
399        _request: Request<ListActorSplitsRequest>,
400    ) -> Result<Response<ListActorSplitsResponse>, Status> {
401        let SourceManagerRunningInfo {
402            source_fragments,
403            backfill_fragments,
404            mut actor_splits,
405        } = self.stream_manager.source_manager.get_running_info().await;
406
407        let source_actors = self
408            .metadata_manager
409            .catalog_controller
410            .list_source_actors()
411            .await?;
412
413        let is_shared_source = self
414            .metadata_manager
415            .catalog_controller
416            .list_source_id_with_shared_types()
417            .await?;
418
419        let fragment_to_source: HashMap<_, _> = source_fragments
420            .into_iter()
421            .flat_map(|(source_id, fragment_ids)| {
422                let source_type = if is_shared_source
423                    .get(&(source_id as _))
424                    .copied()
425                    .unwrap_or(false)
426                {
427                    FragmentType::SharedSource
428                } else {
429                    FragmentType::NonSharedSource
430                };
431
432                fragment_ids
433                    .into_iter()
434                    .map(move |fragment_id| (fragment_id, (source_id, source_type)))
435            })
436            .chain(
437                backfill_fragments
438                    .into_iter()
439                    .flat_map(|(source_id, fragment_ids)| {
440                        fragment_ids.into_iter().flat_map(
441                            move |(fragment_id, upstream_fragment_id)| {
442                                [
443                                    (fragment_id, (source_id, FragmentType::SharedSourceBackfill)),
444                                    (
445                                        upstream_fragment_id,
446                                        (source_id, FragmentType::SharedSource),
447                                    ),
448                                ]
449                            },
450                        )
451                    }),
452            )
453            .collect();
454
455        let actor_splits = source_actors
456            .into_iter()
457            .flat_map(|(actor_id, fragment_id)| {
458                let (source_id, fragment_type) = fragment_to_source
459                    .get(&(fragment_id as _))
460                    .copied()
461                    .unwrap_or_default();
462
463                actor_splits
464                    .remove(&(actor_id as _))
465                    .unwrap_or_default()
466                    .into_iter()
467                    .map(move |split| list_actor_splits_response::ActorSplit {
468                        actor_id: actor_id as _,
469                        source_id: source_id as _,
470                        fragment_id: fragment_id as _,
471                        split_id: split.id().to_string(),
472                        fragment_type: fragment_type.into(),
473                    })
474            })
475            .collect_vec();
476
477        Ok(Response::new(ListActorSplitsResponse { actor_splits }))
478    }
479
480    async fn list_rate_limits(
481        &self,
482        _request: Request<ListRateLimitsRequest>,
483    ) -> Result<Response<ListRateLimitsResponse>, Status> {
484        let rate_limits = self
485            .metadata_manager
486            .catalog_controller
487            .list_rate_limits()
488            .await?;
489        Ok(Response::new(ListRateLimitsResponse { rate_limits }))
490    }
491}