risingwave_meta_service/
stream_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use itertools::Itertools;
18use risingwave_common::catalog::{DatabaseId, TableId};
19use risingwave_common::secret::LocalSecretManager;
20use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
21use risingwave_connector::source::SplitMetaData;
22use risingwave_meta::barrier::BarrierManagerRef;
23use risingwave_meta::controller::fragment::StreamingJobInfo;
24use risingwave_meta::controller::utils::FragmentDesc;
25use risingwave_meta::manager::MetadataManager;
26use risingwave_meta::model::ActorId;
27use risingwave_meta::stream::{SourceManagerRunningInfo, ThrottleConfig};
28use risingwave_meta::{MetaError, model};
29use risingwave_meta_model::{FragmentId, ObjectId, SinkId, SourceId, StreamingParallelism};
30use risingwave_pb::meta::alter_connector_props_request::AlterConnectorPropsObject;
31use risingwave_pb::meta::cancel_creating_jobs_request::Jobs;
32use risingwave_pb::meta::list_actor_splits_response::FragmentType;
33use risingwave_pb::meta::list_table_fragments_response::{
34    ActorInfo, FragmentInfo, TableFragmentInfo,
35};
36use risingwave_pb::meta::stream_manager_service_server::StreamManagerService;
37use risingwave_pb::meta::table_fragments::PbState;
38use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
39use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
40use risingwave_pb::meta::*;
41use risingwave_pb::stream_plan::stream_node::NodeBody;
42use tonic::{Request, Response, Status};
43
44use crate::barrier::{BarrierScheduler, Command};
45use crate::manager::MetaSrvEnv;
46use crate::stream::GlobalStreamManagerRef;
47
/// Shorthand for the `Result<Response<T>, Status>` type returned by the tonic handlers below.
pub type TonicResponse<T> = Result<Response<T>, Status>;
49
#[derive(Clone)]
pub struct StreamServiceImpl {
    /// Shared meta-server environment; used here to record activity on the idle manager.
    env: MetaSrvEnv,
    /// Schedules per-database barrier commands (flush/pause/resume/throttle/props-change).
    barrier_scheduler: BarrierScheduler,
    /// Barrier manager handle; used to trigger ad-hoc recovery.
    barrier_manager: BarrierManagerRef,
    /// Global stream manager; used for job cancellation and source-manager access.
    stream_manager: GlobalStreamManagerRef,
    /// Catalog / fragment metadata access (via `catalog_controller`).
    metadata_manager: MetadataManager,
}
58
59impl StreamServiceImpl {
60    pub fn new(
61        env: MetaSrvEnv,
62        barrier_scheduler: BarrierScheduler,
63        barrier_manager: BarrierManagerRef,
64        stream_manager: GlobalStreamManagerRef,
65        metadata_manager: MetadataManager,
66    ) -> Self {
67        StreamServiceImpl {
68            env,
69            barrier_scheduler,
70            barrier_manager,
71            stream_manager,
72            metadata_manager,
73        }
74    }
75}
76
77#[async_trait::async_trait]
78impl StreamManagerService for StreamServiceImpl {
79    async fn flush(&self, request: Request<FlushRequest>) -> TonicResponse<FlushResponse> {
80        self.env.idle_manager().record_activity();
81        let req = request.into_inner();
82
83        let version_id = self.barrier_scheduler.flush(req.database_id.into()).await?;
84        Ok(Response::new(FlushResponse {
85            status: None,
86            hummock_version_id: version_id.to_u64(),
87        }))
88    }
89
90    async fn pause(&self, _: Request<PauseRequest>) -> Result<Response<PauseResponse>, Status> {
91        for database_id in self.metadata_manager.list_active_database_ids().await? {
92            self.barrier_scheduler
93                .run_command(database_id, Command::pause())
94                .await?;
95        }
96        Ok(Response::new(PauseResponse {}))
97    }
98
99    async fn resume(&self, _: Request<ResumeRequest>) -> Result<Response<ResumeResponse>, Status> {
100        for database_id in self.metadata_manager.list_active_database_ids().await? {
101            self.barrier_scheduler
102                .run_command(database_id, Command::resume())
103                .await?;
104        }
105        Ok(Response::new(ResumeResponse {}))
106    }
107
108    async fn apply_throttle(
109        &self,
110        request: Request<ApplyThrottleRequest>,
111    ) -> Result<Response<ApplyThrottleResponse>, Status> {
112        let request = request.into_inner();
113
114        let actor_to_apply = match request.kind() {
115            ThrottleTarget::Source | ThrottleTarget::TableWithSource => {
116                self.metadata_manager
117                    .update_source_rate_limit_by_source_id(request.id as SourceId, request.rate)
118                    .await?
119            }
120            ThrottleTarget::Mv => {
121                self.metadata_manager
122                    .update_backfill_rate_limit_by_table_id(TableId::from(request.id), request.rate)
123                    .await?
124            }
125            ThrottleTarget::CdcTable => {
126                self.metadata_manager
127                    .update_backfill_rate_limit_by_table_id(TableId::from(request.id), request.rate)
128                    .await?
129            }
130            ThrottleTarget::TableDml => {
131                self.metadata_manager
132                    .update_dml_rate_limit_by_table_id(TableId::from(request.id), request.rate)
133                    .await?
134            }
135            ThrottleTarget::Sink => {
136                self.metadata_manager
137                    .update_sink_rate_limit_by_sink_id(request.id as SinkId, request.rate)
138                    .await?
139            }
140            ThrottleTarget::Fragment => {
141                self.metadata_manager
142                    .update_fragment_rate_limit_by_fragment_id(request.id as _, request.rate)
143                    .await?
144            }
145            ThrottleTarget::Unspecified => {
146                return Err(Status::invalid_argument("unspecified throttle target"));
147            }
148        };
149
150        let request_id = if request.kind() == ThrottleTarget::Fragment {
151            self.metadata_manager
152                .catalog_controller
153                .get_fragment_streaming_job_id(request.id as _)
154                .await?
155        } else {
156            request.id as _
157        };
158
159        let database_id = self
160            .metadata_manager
161            .catalog_controller
162            .get_object_database_id(request_id as ObjectId)
163            .await?;
164        let database_id = DatabaseId::new(database_id as _);
165        // TODO: check whether shared source is correct
166        let mutation: ThrottleConfig = actor_to_apply
167            .iter()
168            .map(|(fragment_id, actors)| {
169                (
170                    *fragment_id,
171                    actors
172                        .iter()
173                        .map(|actor_id| (*actor_id, request.rate))
174                        .collect::<HashMap<ActorId, Option<u32>>>(),
175                )
176            })
177            .collect();
178        let _i = self
179            .barrier_scheduler
180            .run_command(database_id, Command::Throttle(mutation))
181            .await?;
182
183        Ok(Response::new(ApplyThrottleResponse { status: None }))
184    }
185
186    async fn cancel_creating_jobs(
187        &self,
188        request: Request<CancelCreatingJobsRequest>,
189    ) -> TonicResponse<CancelCreatingJobsResponse> {
190        let req = request.into_inner();
191        let table_ids = match req.jobs.unwrap() {
192            Jobs::Infos(infos) => self
193                .metadata_manager
194                .catalog_controller
195                .find_creating_streaming_job_ids(infos.infos)
196                .await?
197                .into_iter()
198                .map(|id| id as _)
199                .collect(),
200            Jobs::Ids(jobs) => jobs.job_ids,
201        };
202
203        let canceled_jobs = self
204            .stream_manager
205            .cancel_streaming_jobs(table_ids.into_iter().map(TableId::from).collect_vec())
206            .await
207            .into_iter()
208            .map(|id| id.table_id)
209            .collect_vec();
210        Ok(Response::new(CancelCreatingJobsResponse {
211            status: None,
212            canceled_jobs,
213        }))
214    }
215
216    async fn list_table_fragments(
217        &self,
218        request: Request<ListTableFragmentsRequest>,
219    ) -> Result<Response<ListTableFragmentsResponse>, Status> {
220        let req = request.into_inner();
221        let table_ids = HashSet::<u32>::from_iter(req.table_ids);
222
223        let mut info = HashMap::new();
224        for job_id in table_ids {
225            let table_fragments = self
226                .metadata_manager
227                .catalog_controller
228                .get_job_fragments_by_id(job_id as _)
229                .await?;
230            let mut dispatchers = self
231                .metadata_manager
232                .catalog_controller
233                .get_fragment_actor_dispatchers(
234                    table_fragments.fragment_ids().map(|id| id as _).collect(),
235                )
236                .await?;
237            let ctx = table_fragments.ctx.to_protobuf();
238            info.insert(
239                table_fragments.stream_job_id().table_id,
240                TableFragmentInfo {
241                    fragments: table_fragments
242                        .fragments
243                        .into_iter()
244                        .map(|(id, fragment)| FragmentInfo {
245                            id,
246                            actors: fragment
247                                .actors
248                                .into_iter()
249                                .map(|actor| ActorInfo {
250                                    id: actor.actor_id,
251                                    node: Some(fragment.nodes.clone()),
252                                    dispatcher: dispatchers
253                                        .get_mut(&(fragment.fragment_id as _))
254                                        .and_then(|dispatchers| {
255                                            dispatchers.remove(&(actor.actor_id as _))
256                                        })
257                                        .unwrap_or_default(),
258                                })
259                                .collect_vec(),
260                        })
261                        .collect_vec(),
262                    ctx: Some(ctx),
263                },
264            );
265        }
266
267        Ok(Response::new(ListTableFragmentsResponse {
268            table_fragments: info,
269        }))
270    }
271
272    async fn list_streaming_job_states(
273        &self,
274        _request: Request<ListStreamingJobStatesRequest>,
275    ) -> Result<Response<ListStreamingJobStatesResponse>, Status> {
276        let job_infos = self
277            .metadata_manager
278            .catalog_controller
279            .list_streaming_job_infos()
280            .await?;
281        let states = job_infos
282            .into_iter()
283            .map(
284                |StreamingJobInfo {
285                     job_id,
286                     job_status,
287                     name,
288                     parallelism,
289                     max_parallelism,
290                     resource_group,
291                     database_id,
292                     schema_id,
293                     ..
294                 }| {
295                    let parallelism = match parallelism {
296                        StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
297                        StreamingParallelism::Custom => model::TableParallelism::Custom,
298                        StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
299                    };
300
301                    list_streaming_job_states_response::StreamingJobState {
302                        table_id: job_id as _,
303                        name,
304                        state: PbState::from(job_status) as _,
305                        parallelism: Some(parallelism.into()),
306                        max_parallelism: max_parallelism as _,
307                        resource_group,
308                        database_id: database_id as _,
309                        schema_id: schema_id as _,
310                    }
311                },
312            )
313            .collect_vec();
314
315        Ok(Response::new(ListStreamingJobStatesResponse { states }))
316    }
317
318    async fn list_fragment_distribution(
319        &self,
320        _request: Request<ListFragmentDistributionRequest>,
321    ) -> Result<Response<ListFragmentDistributionResponse>, Status> {
322        let fragment_descs = self
323            .metadata_manager
324            .catalog_controller
325            .list_fragment_descs()
326            .await?;
327        let distributions = fragment_descs
328            .into_iter()
329            .map(|(fragment_desc, upstreams)| {
330                fragment_desc_to_distribution(fragment_desc, upstreams)
331            })
332            .collect_vec();
333
334        Ok(Response::new(ListFragmentDistributionResponse {
335            distributions,
336        }))
337    }
338
339    async fn list_creating_fragment_distribution(
340        &self,
341        _request: Request<ListCreatingFragmentDistributionRequest>,
342    ) -> Result<Response<ListCreatingFragmentDistributionResponse>, Status> {
343        let fragment_descs = self
344            .metadata_manager
345            .catalog_controller
346            .list_creating_fragment_descs()
347            .await?;
348        let distributions = fragment_descs
349            .into_iter()
350            .map(|(fragment_desc, upstreams)| {
351                fragment_desc_to_distribution(fragment_desc, upstreams)
352            })
353            .collect_vec();
354
355        Ok(Response::new(ListCreatingFragmentDistributionResponse {
356            distributions,
357        }))
358    }
359
360    async fn get_fragment_by_id(
361        &self,
362        request: Request<GetFragmentByIdRequest>,
363    ) -> Result<Response<GetFragmentByIdResponse>, Status> {
364        let req = request.into_inner();
365        let fragment_desc = self
366            .metadata_manager
367            .catalog_controller
368            .get_fragment_desc_by_id(req.fragment_id as i32)
369            .await?;
370        let distribution =
371            fragment_desc.map(|(desc, upstreams)| fragment_desc_to_distribution(desc, upstreams));
372        Ok(Response::new(GetFragmentByIdResponse { distribution }))
373    }
374
375    async fn list_actor_states(
376        &self,
377        _request: Request<ListActorStatesRequest>,
378    ) -> Result<Response<ListActorStatesResponse>, Status> {
379        let actor_locations = self
380            .metadata_manager
381            .catalog_controller
382            .list_actor_locations()
383            .await?;
384        let states = actor_locations
385            .into_iter()
386            .map(|actor_location| list_actor_states_response::ActorState {
387                actor_id: actor_location.actor_id as _,
388                fragment_id: actor_location.fragment_id as _,
389                state: PbActorState::from(actor_location.status) as _,
390                worker_id: actor_location.worker_id as _,
391            })
392            .collect_vec();
393
394        Ok(Response::new(ListActorStatesResponse { states }))
395    }
396
397    async fn list_object_dependencies(
398        &self,
399        _request: Request<ListObjectDependenciesRequest>,
400    ) -> Result<Response<ListObjectDependenciesResponse>, Status> {
401        let dependencies = self
402            .metadata_manager
403            .catalog_controller
404            .list_created_object_dependencies()
405            .await?;
406
407        Ok(Response::new(ListObjectDependenciesResponse {
408            dependencies,
409        }))
410    }
411
412    async fn recover(
413        &self,
414        _request: Request<RecoverRequest>,
415    ) -> Result<Response<RecoverResponse>, Status> {
416        self.barrier_manager.adhoc_recovery().await?;
417        Ok(Response::new(RecoverResponse {}))
418    }
419
420    async fn list_actor_splits(
421        &self,
422        _request: Request<ListActorSplitsRequest>,
423    ) -> Result<Response<ListActorSplitsResponse>, Status> {
424        let SourceManagerRunningInfo {
425            source_fragments,
426            backfill_fragments,
427            mut actor_splits,
428        } = self.stream_manager.source_manager.get_running_info().await;
429
430        let source_actors = self
431            .metadata_manager
432            .catalog_controller
433            .list_source_actors()
434            .await?;
435
436        let is_shared_source = self
437            .metadata_manager
438            .catalog_controller
439            .list_source_id_with_shared_types()
440            .await?;
441
442        let fragment_to_source: HashMap<_, _> = source_fragments
443            .into_iter()
444            .flat_map(|(source_id, fragment_ids)| {
445                let source_type = if is_shared_source
446                    .get(&(source_id as _))
447                    .copied()
448                    .unwrap_or(false)
449                {
450                    FragmentType::SharedSource
451                } else {
452                    FragmentType::NonSharedSource
453                };
454
455                fragment_ids
456                    .into_iter()
457                    .map(move |fragment_id| (fragment_id, (source_id, source_type)))
458            })
459            .chain(
460                backfill_fragments
461                    .into_iter()
462                    .flat_map(|(source_id, fragment_ids)| {
463                        fragment_ids.into_iter().flat_map(
464                            move |(fragment_id, upstream_fragment_id)| {
465                                [
466                                    (fragment_id, (source_id, FragmentType::SharedSourceBackfill)),
467                                    (
468                                        upstream_fragment_id,
469                                        (source_id, FragmentType::SharedSource),
470                                    ),
471                                ]
472                            },
473                        )
474                    }),
475            )
476            .collect();
477
478        let actor_splits = source_actors
479            .into_iter()
480            .flat_map(|(actor_id, fragment_id)| {
481                let (source_id, fragment_type) = fragment_to_source
482                    .get(&(fragment_id as _))
483                    .copied()
484                    .unwrap_or_default();
485
486                actor_splits
487                    .remove(&(actor_id as _))
488                    .unwrap_or_default()
489                    .into_iter()
490                    .map(move |split| list_actor_splits_response::ActorSplit {
491                        actor_id: actor_id as _,
492                        source_id: source_id as _,
493                        fragment_id: fragment_id as _,
494                        split_id: split.id().to_string(),
495                        fragment_type: fragment_type.into(),
496                    })
497            })
498            .collect_vec();
499
500        Ok(Response::new(ListActorSplitsResponse { actor_splits }))
501    }
502
503    async fn list_rate_limits(
504        &self,
505        _request: Request<ListRateLimitsRequest>,
506    ) -> Result<Response<ListRateLimitsResponse>, Status> {
507        let rate_limits = self
508            .metadata_manager
509            .catalog_controller
510            .list_rate_limits()
511            .await?;
512        Ok(Response::new(ListRateLimitsResponse { rate_limits }))
513    }
514
515    async fn alter_connector_props(
516        &self,
517        request: Request<AlterConnectorPropsRequest>,
518    ) -> Result<Response<AlterConnectorPropsResponse>, Status> {
519        let request = request.into_inner();
520
521        let database_id = self
522            .metadata_manager
523            .catalog_controller
524            .get_object_database_id(request.object_id as ObjectId)
525            .await?;
526        let database_id = DatabaseId::new(database_id as _);
527
528        let secret_manager = LocalSecretManager::global();
529        let new_props_plaintext = match request.object_type() {
530            AlterConnectorPropsObject::Sink => {
531                self.metadata_manager
532                    .update_sink_props_by_sink_id(
533                        request.object_id as i32,
534                        request.changed_props.clone().into_iter().collect(),
535                    )
536                    .await?
537            }
538            AlterConnectorPropsObject::Source => {
539                // alter source and table's associated source
540                if request.connector_conn_ref.is_some() {
541                    return Err(Status::invalid_argument(
542                        "alter connector_conn_ref is not supported",
543                    ));
544                }
545                let options_with_secret = self
546                    .metadata_manager
547                    .catalog_controller
548                    .update_source_props_by_source_id(
549                        request.object_id as SourceId,
550                        request.changed_props.clone().into_iter().collect(),
551                        request.changed_secret_refs.clone().into_iter().collect(),
552                    )
553                    .await?;
554
555                self.stream_manager
556                    .source_manager
557                    .validate_source_once(request.object_id, options_with_secret.clone())
558                    .await?;
559
560                let (options, secret_refs) = options_with_secret.into_parts();
561                secret_manager
562                    .fill_secrets(options, secret_refs)
563                    .map_err(MetaError::from)?
564                    .into_iter()
565                    .collect()
566            }
567            AlterConnectorPropsObject::Connection => {
568                todo!()
569            }
570            AlterConnectorPropsObject::Unspecified => unreachable!(),
571        };
572
573        let mut mutation = HashMap::default();
574        mutation.insert(request.object_id, new_props_plaintext);
575
576        let _i = self
577            .barrier_scheduler
578            .run_command(database_id, Command::ConnectorPropsChange(mutation))
579            .await?;
580
581        Ok(Response::new(AlterConnectorPropsResponse {}))
582    }
583
584    async fn set_sync_log_store_aligned(
585        &self,
586        request: Request<SetSyncLogStoreAlignedRequest>,
587    ) -> Result<Response<SetSyncLogStoreAlignedResponse>, Status> {
588        let req = request.into_inner();
589        let job_id = req.job_id;
590        let aligned = req.aligned;
591
592        self.metadata_manager
593            .catalog_controller
594            .mutate_fragments_by_job_id(
595                job_id as _,
596                |_mask, stream_node| {
597                    let mut visited = false;
598                    visit_stream_node_mut(stream_node, |body| {
599                        if let NodeBody::SyncLogStore(sync_log_store) = body {
600                            sync_log_store.aligned = aligned;
601                            visited = true
602                        }
603                    });
604                    visited
605                },
606                "no fragments found with synced log store",
607            )
608            .await?;
609
610        Ok(Response::new(SetSyncLogStoreAlignedResponse {}))
611    }
612}
613
614fn fragment_desc_to_distribution(
615    fragment_desc: FragmentDesc,
616    upstreams: Vec<FragmentId>,
617) -> FragmentDistribution {
618    FragmentDistribution {
619        fragment_id: fragment_desc.fragment_id as _,
620        table_id: fragment_desc.job_id as _,
621        distribution_type: PbFragmentDistributionType::from(fragment_desc.distribution_type) as _,
622        state_table_ids: fragment_desc.state_table_ids.into_u32_array(),
623        upstream_fragment_ids: upstreams.iter().map(|id| *id as _).collect(),
624        fragment_type_mask: fragment_desc.fragment_type_mask as _,
625        parallelism: fragment_desc.parallelism as _,
626        vnode_count: fragment_desc.vnode_count as _,
627        node: Some(fragment_desc.stream_node.to_protobuf()),
628    }
629}