risingwave_meta_service/
stream_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use itertools::Itertools;
18use risingwave_common::catalog::{DatabaseId, TableId};
19use risingwave_common::id::JobId;
20use risingwave_common::secret::LocalSecretManager;
21use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
22use risingwave_connector::source::SplitMetaData;
23use risingwave_meta::barrier::BarrierManagerRef;
24use risingwave_meta::controller::fragment::StreamingJobInfo;
25use risingwave_meta::controller::utils::FragmentDesc;
26use risingwave_meta::manager::MetadataManager;
27use risingwave_meta::model::ActorId;
28use risingwave_meta::stream::{SourceManagerRunningInfo, ThrottleConfig};
29use risingwave_meta::{MetaError, model};
30use risingwave_meta_model::{FragmentId, ObjectId, SinkId, SourceId, StreamingParallelism};
31use risingwave_pb::meta::alter_connector_props_request::AlterConnectorPropsObject;
32use risingwave_pb::meta::cancel_creating_jobs_request::Jobs;
33use risingwave_pb::meta::list_actor_splits_response::FragmentType;
34use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
35use risingwave_pb::meta::list_table_fragments_response::{
36    ActorInfo, FragmentInfo, TableFragmentInfo,
37};
38use risingwave_pb::meta::stream_manager_service_server::StreamManagerService;
39use risingwave_pb::meta::table_fragments::PbState;
40use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
41use risingwave_pb::meta::*;
42use risingwave_pb::stream_plan::stream_node::NodeBody;
43use tonic::{Request, Response, Status};
44
45use crate::barrier::{BarrierScheduler, Command};
46use crate::manager::MetaSrvEnv;
47use crate::stream::GlobalStreamManagerRef;
48
/// Shorthand for the `Result<Response<T>, Status>` type returned by tonic RPC handlers.
pub type TonicResponse<T> = Result<Response<T>, Status>;
50
/// gRPC implementation of the meta node's `StreamManagerService`.
///
/// Holds (cheaply cloneable) handles to the meta subsystems the RPC handlers
/// delegate to.
#[derive(Clone)]
pub struct StreamServiceImpl {
    // Meta server environment; used here for the idle manager, shared actor
    // info, and the CDC backfill tracker.
    env: MetaSrvEnv,
    // Schedules barrier-borne commands (flush, pause/resume, throttle, ...).
    barrier_scheduler: BarrierScheduler,
    // Used to trigger ad-hoc recovery (`recover` RPC).
    barrier_manager: BarrierManagerRef,
    // Global stream manager; used for job cancellation and source management.
    stream_manager: GlobalStreamManagerRef,
    // Catalog/metadata access (fragments, rate limits, connector props, ...).
    metadata_manager: MetadataManager,
}
59
60impl StreamServiceImpl {
61    pub fn new(
62        env: MetaSrvEnv,
63        barrier_scheduler: BarrierScheduler,
64        barrier_manager: BarrierManagerRef,
65        stream_manager: GlobalStreamManagerRef,
66        metadata_manager: MetadataManager,
67    ) -> Self {
68        StreamServiceImpl {
69            env,
70            barrier_scheduler,
71            barrier_manager,
72            stream_manager,
73            metadata_manager,
74        }
75    }
76}
77
78#[async_trait::async_trait]
79impl StreamManagerService for StreamServiceImpl {
80    async fn flush(&self, request: Request<FlushRequest>) -> TonicResponse<FlushResponse> {
81        self.env.idle_manager().record_activity();
82        let req = request.into_inner();
83
84        let version_id = self.barrier_scheduler.flush(req.database_id.into()).await?;
85        Ok(Response::new(FlushResponse {
86            status: None,
87            hummock_version_id: version_id.to_u64(),
88        }))
89    }
90
91    async fn pause(&self, _: Request<PauseRequest>) -> Result<Response<PauseResponse>, Status> {
92        for database_id in self.metadata_manager.list_active_database_ids().await? {
93            self.barrier_scheduler
94                .run_command(database_id, Command::pause())
95                .await?;
96        }
97        Ok(Response::new(PauseResponse {}))
98    }
99
100    async fn resume(&self, _: Request<ResumeRequest>) -> Result<Response<ResumeResponse>, Status> {
101        for database_id in self.metadata_manager.list_active_database_ids().await? {
102            self.barrier_scheduler
103                .run_command(database_id, Command::resume())
104                .await?;
105        }
106        Ok(Response::new(ResumeResponse {}))
107    }
108
109    async fn apply_throttle(
110        &self,
111        request: Request<ApplyThrottleRequest>,
112    ) -> Result<Response<ApplyThrottleResponse>, Status> {
113        let request = request.into_inner();
114
115        let actor_to_apply = match request.kind() {
116            ThrottleTarget::Source | ThrottleTarget::TableWithSource => {
117                self.metadata_manager
118                    .update_source_rate_limit_by_source_id(request.id as SourceId, request.rate)
119                    .await?
120            }
121            ThrottleTarget::Mv => {
122                self.metadata_manager
123                    .update_backfill_rate_limit_by_job_id(JobId::from(request.id), request.rate)
124                    .await?
125            }
126            ThrottleTarget::CdcTable => {
127                self.metadata_manager
128                    .update_backfill_rate_limit_by_job_id(JobId::from(request.id), request.rate)
129                    .await?
130            }
131            ThrottleTarget::TableDml => {
132                self.metadata_manager
133                    .update_dml_rate_limit_by_job_id(JobId::from(request.id), request.rate)
134                    .await?
135            }
136            ThrottleTarget::Sink => {
137                self.metadata_manager
138                    .update_sink_rate_limit_by_sink_id(request.id as SinkId, request.rate)
139                    .await?
140            }
141            ThrottleTarget::Fragment => {
142                self.metadata_manager
143                    .update_fragment_rate_limit_by_fragment_id(request.id as _, request.rate)
144                    .await?
145            }
146            ThrottleTarget::Unspecified => {
147                return Err(Status::invalid_argument("unspecified throttle target"));
148            }
149        };
150
151        let request_id = if request.kind() == ThrottleTarget::Fragment {
152            self.metadata_manager
153                .catalog_controller
154                .get_fragment_streaming_job_id(request.id as _)
155                .await?
156        } else {
157            request.id as _
158        };
159
160        let database_id = self
161            .metadata_manager
162            .catalog_controller
163            .get_object_database_id(request_id as ObjectId)
164            .await?;
165        let database_id = DatabaseId::new(database_id as _);
166        // TODO: check whether shared source is correct
167        let mutation: ThrottleConfig = actor_to_apply
168            .iter()
169            .map(|(fragment_id, actors)| {
170                (
171                    *fragment_id,
172                    actors
173                        .iter()
174                        .map(|actor_id| (*actor_id, request.rate))
175                        .collect::<HashMap<ActorId, Option<u32>>>(),
176                )
177            })
178            .collect();
179        let _i = self
180            .barrier_scheduler
181            .run_command(database_id, Command::Throttle(mutation))
182            .await?;
183
184        Ok(Response::new(ApplyThrottleResponse { status: None }))
185    }
186
187    async fn cancel_creating_jobs(
188        &self,
189        request: Request<CancelCreatingJobsRequest>,
190    ) -> TonicResponse<CancelCreatingJobsResponse> {
191        let req = request.into_inner();
192        let table_ids = match req.jobs.unwrap() {
193            Jobs::Infos(infos) => self
194                .metadata_manager
195                .catalog_controller
196                .find_creating_streaming_job_ids(infos.infos)
197                .await?
198                .into_iter()
199                .map(|id| id as _)
200                .collect(),
201            Jobs::Ids(jobs) => jobs.job_ids,
202        };
203
204        let canceled_jobs = self
205            .stream_manager
206            .cancel_streaming_jobs(table_ids.into_iter().map(JobId::from).collect_vec())
207            .await
208            .into_iter()
209            .map(|id| id.as_raw_id())
210            .collect_vec();
211        Ok(Response::new(CancelCreatingJobsResponse {
212            status: None,
213            canceled_jobs,
214        }))
215    }
216
    /// Returns the fragment/actor topology for each requested job id,
    /// including each actor's stream-node plan and outgoing dispatchers.
    async fn list_table_fragments(
        &self,
        request: Request<ListTableFragmentsRequest>,
    ) -> Result<Response<ListTableFragmentsResponse>, Status> {
        let req = request.into_inner();
        // Deduplicate requested ids so each job is fetched at most once.
        let table_ids = HashSet::<u32>::from_iter(req.table_ids);

        let mut info = HashMap::new();
        for job_id in table_ids {
            let table_fragments = self
                .metadata_manager
                .catalog_controller
                .get_job_fragments_by_id(job_id.into())
                .await?;
            // Dispatchers keyed by fragment id, then actor id; entries are
            // `remove`d below as they are consumed, so each is used once.
            let mut dispatchers = self
                .metadata_manager
                .catalog_controller
                .get_fragment_actor_dispatchers(
                    table_fragments.fragment_ids().map(|id| id as _).collect(),
                )
                .await?;
            let ctx = table_fragments.ctx.to_protobuf();
            info.insert(
                table_fragments.stream_job_id().as_raw_id(),
                TableFragmentInfo {
                    fragments: table_fragments
                        .fragments
                        .into_iter()
                        .map(|(id, fragment)| FragmentInfo {
                            id,
                            actors: fragment
                                .actors
                                .into_iter()
                                .map(|actor| ActorInfo {
                                    id: actor.actor_id,
                                    // Every actor of a fragment shares the same
                                    // stream-node plan, hence the clone per actor.
                                    node: Some(fragment.nodes.clone()),
                                    // Move this actor's dispatchers out of the
                                    // prefetched map; absent entries mean the
                                    // actor has no downstream dispatchers.
                                    dispatcher: dispatchers
                                        .get_mut(&(fragment.fragment_id as _))
                                        .and_then(|dispatchers| {
                                            dispatchers.remove(&(actor.actor_id as _))
                                        })
                                        .unwrap_or_default(),
                                })
                                .collect_vec(),
                        })
                        .collect_vec(),
                    ctx: Some(ctx),
                },
            );
        }

        Ok(Response::new(ListTableFragmentsResponse {
            table_fragments: info,
        }))
    }
272
273    async fn list_streaming_job_states(
274        &self,
275        _request: Request<ListStreamingJobStatesRequest>,
276    ) -> Result<Response<ListStreamingJobStatesResponse>, Status> {
277        let job_infos = self
278            .metadata_manager
279            .catalog_controller
280            .list_streaming_job_infos()
281            .await?;
282        let states = job_infos
283            .into_iter()
284            .map(
285                |StreamingJobInfo {
286                     job_id,
287                     job_status,
288                     name,
289                     parallelism,
290                     max_parallelism,
291                     resource_group,
292                     database_id,
293                     schema_id,
294                     ..
295                 }| {
296                    let parallelism = match parallelism {
297                        StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
298                        StreamingParallelism::Custom => model::TableParallelism::Custom,
299                        StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
300                    };
301
302                    list_streaming_job_states_response::StreamingJobState {
303                        table_id: job_id as _,
304                        name,
305                        state: PbState::from(job_status) as _,
306                        parallelism: Some(parallelism.into()),
307                        max_parallelism: max_parallelism as _,
308                        resource_group,
309                        database_id: database_id as _,
310                        schema_id: schema_id as _,
311                    }
312                },
313            )
314            .collect_vec();
315
316        Ok(Response::new(ListStreamingJobStatesResponse { states }))
317    }
318
319    async fn list_fragment_distribution(
320        &self,
321        _request: Request<ListFragmentDistributionRequest>,
322    ) -> Result<Response<ListFragmentDistributionResponse>, Status> {
323        let distributions = self
324            .metadata_manager
325            .catalog_controller
326            .list_fragment_descs(false)
327            .await?
328            .into_iter()
329            .map(|(dist, _)| dist)
330            .collect();
331
332        Ok(Response::new(ListFragmentDistributionResponse {
333            distributions,
334        }))
335    }
336
337    async fn list_creating_fragment_distribution(
338        &self,
339        _request: Request<ListCreatingFragmentDistributionRequest>,
340    ) -> Result<Response<ListCreatingFragmentDistributionResponse>, Status> {
341        let distributions = self
342            .metadata_manager
343            .catalog_controller
344            .list_fragment_descs(true)
345            .await?
346            .into_iter()
347            .map(|(dist, _)| dist)
348            .collect();
349
350        Ok(Response::new(ListCreatingFragmentDistributionResponse {
351            distributions,
352        }))
353    }
354
355    async fn get_fragment_by_id(
356        &self,
357        request: Request<GetFragmentByIdRequest>,
358    ) -> Result<Response<GetFragmentByIdResponse>, Status> {
359        let req = request.into_inner();
360        let fragment_desc = self
361            .metadata_manager
362            .catalog_controller
363            .get_fragment_desc_by_id(req.fragment_id as i32)
364            .await?;
365        let distribution =
366            fragment_desc.map(|(desc, upstreams)| fragment_desc_to_distribution(desc, upstreams));
367        Ok(Response::new(GetFragmentByIdResponse { distribution }))
368    }
369
370    async fn list_actor_states(
371        &self,
372        _request: Request<ListActorStatesRequest>,
373    ) -> Result<Response<ListActorStatesResponse>, Status> {
374        let actor_locations = self
375            .metadata_manager
376            .catalog_controller
377            .list_actor_locations()?;
378        let states = actor_locations
379            .into_iter()
380            .map(|actor_location| list_actor_states_response::ActorState {
381                actor_id: actor_location.actor_id as _,
382                fragment_id: actor_location.fragment_id as _,
383                worker_id: actor_location.worker_id as _,
384            })
385            .collect_vec();
386
387        Ok(Response::new(ListActorStatesResponse { states }))
388    }
389
390    async fn list_object_dependencies(
391        &self,
392        _request: Request<ListObjectDependenciesRequest>,
393    ) -> Result<Response<ListObjectDependenciesResponse>, Status> {
394        let dependencies = self
395            .metadata_manager
396            .catalog_controller
397            .list_created_object_dependencies()
398            .await?;
399
400        Ok(Response::new(ListObjectDependenciesResponse {
401            dependencies,
402        }))
403    }
404
405    async fn recover(
406        &self,
407        _request: Request<RecoverRequest>,
408    ) -> Result<Response<RecoverResponse>, Status> {
409        self.barrier_manager.adhoc_recovery().await?;
410        Ok(Response::new(RecoverResponse {}))
411    }
412
    /// Lists the source-split assignment of every source-related actor,
    /// tagging each split with the kind of fragment it belongs to.
    async fn list_actor_splits(
        &self,
        _request: Request<ListActorSplitsRequest>,
    ) -> Result<Response<ListActorSplitsResponse>, Status> {
        // Snapshot of the source manager: source fragments per source id, and
        // backfill fragments per source id as (fragment, upstream fragment) pairs.
        let SourceManagerRunningInfo {
            source_fragments,
            backfill_fragments,
        } = self.stream_manager.source_manager.get_running_info().await;

        // Split assignments per actor; entries are `remove`d as they are
        // consumed in the final loop below.
        let mut actor_splits = self.env.shared_actor_infos().list_assignments();

        // Map actor id -> fragment id, restricted to fragments that are either
        // source fragments or (backfill fragment, upstream fragment) members.
        let source_actors: HashMap<_, _> = {
            let all_fragment_ids: HashSet<_> = backfill_fragments
                .values()
                .flat_map(|set| set.iter().flat_map(|&(id1, id2)| [id1, id2]))
                .chain(source_fragments.values().flatten().copied())
                .collect();

            let guard = self.env.shared_actor_info.read_guard();
            guard
                .iter_over_fragments()
                .filter(|(frag_id, _)| all_fragment_ids.contains(frag_id))
                .flat_map(|(fragment_id, fragment_info)| {
                    fragment_info
                        .actors
                        .keys()
                        .copied()
                        .map(|actor_id| (actor_id, *fragment_id))
                })
                .collect()
        };

        // source id -> whether the source is shared, used to classify plain
        // source fragments below.
        let is_shared_source = self
            .metadata_manager
            .catalog_controller
            .list_source_id_with_shared_types()
            .await?;

        // Map fragment id -> (source id, fragment type). Backfill pairs
        // contribute two entries: the backfill fragment itself and its
        // upstream fragment (classified as a shared source).
        let fragment_to_source: HashMap<_, _> = source_fragments
            .into_iter()
            .flat_map(|(source_id, fragment_ids)| {
                let source_type = if is_shared_source
                    .get(&(source_id as _))
                    .copied()
                    .unwrap_or(false)
                {
                    FragmentType::SharedSource
                } else {
                    FragmentType::NonSharedSource
                };

                fragment_ids
                    .into_iter()
                    .map(move |fragment_id| (fragment_id, (source_id, source_type)))
            })
            .chain(
                backfill_fragments
                    .into_iter()
                    .flat_map(|(source_id, fragment_ids)| {
                        fragment_ids.into_iter().flat_map(
                            move |(fragment_id, upstream_fragment_id)| {
                                [
                                    (fragment_id, (source_id, FragmentType::SharedSourceBackfill)),
                                    (
                                        upstream_fragment_id,
                                        (source_id, FragmentType::SharedSource),
                                    ),
                                ]
                            },
                        )
                    }),
            )
            .collect();

        // Join actors with their splits and fragment classification. NOTE:
        // fragments missing from `fragment_to_source` fall back to the default
        // (source id 0 / default fragment type) rather than being skipped.
        let actor_splits = source_actors
            .into_iter()
            .flat_map(|(actor_id, fragment_id)| {
                let (source_id, fragment_type) = fragment_to_source
                    .get(&(fragment_id as _))
                    .copied()
                    .unwrap_or_default();

                // Move this actor's splits out of the assignment map; actors
                // without an assignment produce no output rows.
                actor_splits
                    .remove(&(actor_id as _))
                    .unwrap_or_default()
                    .into_iter()
                    .map(move |split| list_actor_splits_response::ActorSplit {
                        actor_id: actor_id as _,
                        source_id: source_id as _,
                        fragment_id: fragment_id as _,
                        split_id: split.id().to_string(),
                        fragment_type: fragment_type.into(),
                    })
            })
            .collect_vec();

        Ok(Response::new(ListActorSplitsResponse { actor_splits }))
    }
511
512    async fn list_rate_limits(
513        &self,
514        _request: Request<ListRateLimitsRequest>,
515    ) -> Result<Response<ListRateLimitsResponse>, Status> {
516        let rate_limits = self
517            .metadata_manager
518            .catalog_controller
519            .list_rate_limits()
520            .await?;
521        Ok(Response::new(ListRateLimitsResponse { rate_limits }))
522    }
523
524    #[cfg_attr(coverage, coverage(off))]
525    async fn refresh(
526        &self,
527        request: Request<RefreshRequest>,
528    ) -> Result<Response<RefreshResponse>, Status> {
529        let req = request.into_inner();
530
531        tracing::info!("Refreshing table with id: {}", req.table_id);
532
533        // Create refresh manager and execute refresh
534        let refresh_manager = risingwave_meta::stream::RefreshManager::new(
535            self.metadata_manager.clone(),
536            self.barrier_scheduler.clone(),
537        );
538
539        let response = refresh_manager.refresh_table(req).await?;
540
541        Ok(Response::new(response))
542    }
543
    /// Alters connector properties of a sink, Iceberg table, or source, then
    /// broadcasts the new (secret-resolved) properties to running actors via a
    /// `ConnectorPropsChange` barrier command.
    async fn alter_connector_props(
        &self,
        request: Request<AlterConnectorPropsRequest>,
    ) -> Result<Response<AlterConnectorPropsResponse>, Status> {
        let request = request.into_inner();
        let secret_manager = LocalSecretManager::global();
        // Each arm updates the catalog and yields the plaintext props to
        // broadcast plus the object id they belong to.
        let (new_props_plaintext, object_id) =
            match AlterConnectorPropsObject::try_from(request.object_type) {
                Ok(AlterConnectorPropsObject::Sink) => (
                    self.metadata_manager
                        .update_sink_props_by_sink_id(
                            request.object_id as i32,
                            request.changed_props.clone().into_iter().collect(),
                        )
                        .await?,
                    request.object_id,
                ),
                // Iceberg tables: the manager returns the (props, object id)
                // pair directly.
                Ok(AlterConnectorPropsObject::IcebergTable) => {
                    self.metadata_manager
                        .update_iceberg_table_props_by_table_id(
                            TableId::from(request.object_id),
                            request.changed_props.clone().into_iter().collect(),
                            request.extra_options,
                        )
                        .await?
                }

                Ok(AlterConnectorPropsObject::Source) => {
                    // alter source and table's associated source
                    if request.connector_conn_ref.is_some() {
                        return Err(Status::invalid_argument(
                            "alter connector_conn_ref is not supported",
                        ));
                    }
                    // Persist the new props/secret refs in the catalog first...
                    let options_with_secret = self
                        .metadata_manager
                        .catalog_controller
                        .update_source_props_by_source_id(
                            request.object_id as SourceId,
                            request.changed_props.clone().into_iter().collect(),
                            request.changed_secret_refs.clone().into_iter().collect(),
                        )
                        .await?;

                    // ...then validate that the source still connects with them.
                    self.stream_manager
                        .source_manager
                        .validate_source_once(request.object_id, options_with_secret.clone())
                        .await?;

                    // Resolve secret refs to plaintext for the broadcast payload.
                    let (options, secret_refs) = options_with_secret.into_parts();
                    (
                        secret_manager
                            .fill_secrets(options, secret_refs)
                            .map_err(MetaError::from)?
                            .into_iter()
                            .collect(),
                        request.object_id,
                    )
                }

                _ => {
                    unimplemented!(
                        "Unsupported object type for AlterConnectorProps: {:?}",
                        request.object_type
                    );
                }
            };

        // The barrier command is scoped to the database owning the object.
        let database_id = self
            .metadata_manager
            .catalog_controller
            .get_object_database_id(object_id as ObjectId)
            .await?;
        let database_id = DatabaseId::new(database_id as _);

        let mut mutation = HashMap::default();
        mutation.insert(object_id, new_props_plaintext);

        let _i = self
            .barrier_scheduler
            .run_command(database_id, Command::ConnectorPropsChange(mutation))
            .await?;

        Ok(Response::new(AlterConnectorPropsResponse {}))
    }
629
630    async fn set_sync_log_store_aligned(
631        &self,
632        request: Request<SetSyncLogStoreAlignedRequest>,
633    ) -> Result<Response<SetSyncLogStoreAlignedResponse>, Status> {
634        let req = request.into_inner();
635        let job_id = req.job_id;
636        let aligned = req.aligned;
637
638        self.metadata_manager
639            .catalog_controller
640            .mutate_fragments_by_job_id(
641                job_id.into(),
642                |_mask, stream_node| {
643                    let mut visited = false;
644                    visit_stream_node_mut(stream_node, |body| {
645                        if let NodeBody::SyncLogStore(sync_log_store) = body {
646                            sync_log_store.aligned = aligned;
647                            visited = true
648                        }
649                    });
650                    Ok(visited)
651                },
652                "no fragments found with synced log store",
653            )
654            .await?;
655
656        Ok(Response::new(SetSyncLogStoreAlignedResponse {}))
657    }
658
659    async fn list_cdc_progress(
660        &self,
661        _request: Request<ListCdcProgressRequest>,
662    ) -> Result<Response<ListCdcProgressResponse>, Status> {
663        let cdc_progress = self
664            .env
665            .cdc_table_backfill_tracker()
666            .list_cdc_progress()
667            .into_iter()
668            .map(|(job_id, p)| {
669                (
670                    job_id.as_raw_id(),
671                    PbCdcProgress {
672                        split_total_count: p.split_total_count,
673                        split_backfilled_count: p.split_backfilled_count,
674                        split_completed_count: p.split_completed_count,
675                    },
676                )
677            })
678            .collect();
679        Ok(Response::new(ListCdcProgressResponse { cdc_progress }))
680    }
681}
682
683fn fragment_desc_to_distribution(
684    fragment_desc: FragmentDesc,
685    upstreams: Vec<FragmentId>,
686) -> FragmentDistribution {
687    FragmentDistribution {
688        fragment_id: fragment_desc.fragment_id as _,
689        table_id: fragment_desc.job_id.as_raw_id(),
690        distribution_type: PbFragmentDistributionType::from(fragment_desc.distribution_type) as _,
691        state_table_ids: fragment_desc.state_table_ids.into_u32_array(),
692        upstream_fragment_ids: upstreams.iter().map(|id| *id as _).collect(),
693        fragment_type_mask: fragment_desc.fragment_type_mask as _,
694        parallelism: fragment_desc.parallelism as _,
695        vnode_count: fragment_desc.vnode_count as _,
696        node: Some(fragment_desc.stream_node.to_protobuf()),
697    }
698}