risingwave_meta_service/stream_service.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};

use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
use risingwave_connector::source::SplitMetaData;
use risingwave_meta::barrier::BarrierManagerRef;
use risingwave_meta::controller::fragment::StreamingJobInfo;
use risingwave_meta::controller::utils::FragmentDesc;
use risingwave_meta::manager::MetadataManager;
use risingwave_meta::model::ActorId;
use risingwave_meta::stream::{SourceManagerRunningInfo, ThrottleConfig};
use risingwave_meta::{MetaError, model};
use risingwave_meta_model::{FragmentId, ObjectId, SinkId, SourceId, StreamingParallelism};
use risingwave_pb::meta::alter_connector_props_request::AlterConnectorPropsObject;
use risingwave_pb::meta::cancel_creating_jobs_request::Jobs;
use risingwave_pb::meta::list_actor_splits_response::FragmentType;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_table_fragments_response::{
    ActorInfo, FragmentInfo, TableFragmentInfo,
};
use risingwave_pb::meta::stream_manager_service_server::StreamManagerService;
use risingwave_pb::meta::table_fragments::PbState;
use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
use risingwave_pb::meta::*;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use tonic::{Request, Response, Status};

use crate::barrier::{BarrierScheduler, Command};
use crate::manager::MetaSrvEnv;
use crate::stream::GlobalStreamManagerRef;

pub type TonicResponse<T> = Result<Response<T>, Status>;

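/// gRPC handler for the meta node's `StreamManagerService`. Barrier-related requests are
/// delegated to the barrier scheduler/manager, streaming-job operations to the global stream
/// manager, and catalog lookups to the metadata manager.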
#[derive(Clone)]
pub struct StreamServiceImpl {
    env: MetaSrvEnv,
    barrier_scheduler: BarrierScheduler,
    barrier_manager: BarrierManagerRef,
    stream_manager: GlobalStreamManagerRef,
    metadata_manager: MetadataManager,
}

impl StreamServiceImpl {
    pub fn new(
        env: MetaSrvEnv,
        barrier_scheduler: BarrierScheduler,
        barrier_manager: BarrierManagerRef,
        stream_manager: GlobalStreamManagerRef,
        metadata_manager: MetadataManager,
    ) -> Self {
        StreamServiceImpl {
            env,
            barrier_scheduler,
            barrier_manager,
            stream_manager,
            metadata_manager,
        }
    }
}

#[async_trait::async_trait]
impl StreamManagerService for StreamServiceImpl {
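    /// Schedules a flush barrier for the requested database and returns the resulting Hummock
    /// version id.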
    async fn flush(&self, request: Request<FlushRequest>) -> TonicResponse<FlushResponse> {
        self.env.idle_manager().record_activity();
        let req = request.into_inner();

        let version_id = self.barrier_scheduler.flush(req.database_id.into()).await?;
        Ok(Response::new(FlushResponse {
            status: None,
            hummock_version_id: version_id.to_u64(),
        }))
    }

    async fn pause(&self, _: Request<PauseRequest>) -> Result<Response<PauseResponse>, Status> {
        for database_id in self.metadata_manager.list_active_database_ids().await? {
            self.barrier_scheduler
                .run_command(database_id, Command::pause())
                .await?;
        }
        Ok(Response::new(PauseResponse {}))
    }

    async fn resume(&self, _: Request<ResumeRequest>) -> Result<Response<ResumeResponse>, Status> {
        for database_id in self.metadata_manager.list_active_database_ids().await? {
            self.barrier_scheduler
                .run_command(database_id, Command::resume())
                .await?;
        }
        Ok(Response::new(ResumeResponse {}))
    }

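    /// Persists a new rate limit for the requested target (source, backfill, DML, sink, or
    /// fragment) and broadcasts it to the affected actors via a `Command::Throttle` barrier.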
    async fn apply_throttle(
        &self,
        request: Request<ApplyThrottleRequest>,
    ) -> Result<Response<ApplyThrottleResponse>, Status> {
        let request = request.into_inner();

        let actor_to_apply = match request.kind() {
            ThrottleTarget::Source | ThrottleTarget::TableWithSource => {
                self.metadata_manager
                    .update_source_rate_limit_by_source_id(request.id as SourceId, request.rate)
                    .await?
            }
            ThrottleTarget::Mv | ThrottleTarget::CdcTable => {
                self.metadata_manager
                    .update_backfill_rate_limit_by_table_id(TableId::from(request.id), request.rate)
                    .await?
            }
            ThrottleTarget::TableDml => {
                self.metadata_manager
                    .update_dml_rate_limit_by_table_id(TableId::from(request.id), request.rate)
                    .await?
            }
            ThrottleTarget::Sink => {
                self.metadata_manager
                    .update_sink_rate_limit_by_sink_id(request.id as SinkId, request.rate)
                    .await?
            }
            ThrottleTarget::Fragment => {
                self.metadata_manager
                    .update_fragment_rate_limit_by_fragment_id(request.id as _, request.rate)
                    .await?
            }
            ThrottleTarget::Unspecified => {
                return Err(Status::invalid_argument("unspecified throttle target"));
            }
        };

        // For fragment-level throttling, resolve the owning streaming job first so that the
        // database lookup below operates on a job id rather than a fragment id.
        let request_id = if request.kind() == ThrottleTarget::Fragment {
            self.metadata_manager
                .catalog_controller
                .get_fragment_streaming_job_id(request.id as _)
                .await?
        } else {
            request.id as _
        };

        let database_id = self
            .metadata_manager
            .catalog_controller
            .get_object_database_id(request_id as ObjectId)
            .await?;
        let database_id = DatabaseId::new(database_id as _);
        // TODO: check whether shared source is correct
        let mutation: ThrottleConfig = actor_to_apply
            .iter()
            .map(|(fragment_id, actors)| {
                (
                    *fragment_id,
                    actors
                        .iter()
                        .map(|actor_id| (*actor_id, request.rate))
                        .collect::<HashMap<ActorId, Option<u32>>>(),
                )
            })
            .collect();
        let _i = self
            .barrier_scheduler
            .run_command(database_id, Command::Throttle(mutation))
            .await?;

        Ok(Response::new(ApplyThrottleResponse { status: None }))
    }

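    /// Cancels creating streaming jobs, specified either directly by id or by creating-job
    /// infos, and returns the ids of the jobs that were actually canceled.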
    async fn cancel_creating_jobs(
        &self,
        request: Request<CancelCreatingJobsRequest>,
    ) -> TonicResponse<CancelCreatingJobsResponse> {
        let req = request.into_inner();
        let jobs = req
            .jobs
            .ok_or_else(|| Status::invalid_argument("jobs must be specified"))?;
        let table_ids = match jobs {
            Jobs::Infos(infos) => self
                .metadata_manager
                .catalog_controller
                .find_creating_streaming_job_ids(infos.infos)
                .await?
                .into_iter()
                .map(|id| id as _)
                .collect(),
            Jobs::Ids(jobs) => jobs.job_ids,
        };

        let canceled_jobs = self
            .stream_manager
            .cancel_streaming_jobs(table_ids.into_iter().map(TableId::from).collect_vec())
            .await
            .into_iter()
            .map(|id| id.table_id)
            .collect_vec();
        Ok(Response::new(CancelCreatingJobsResponse {
            status: None,
            canceled_jobs,
        }))
    }

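    /// Returns the fragment/actor topology, per-actor dispatchers, and stream context for each
    /// requested job id.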
    async fn list_table_fragments(
        &self,
        request: Request<ListTableFragmentsRequest>,
    ) -> Result<Response<ListTableFragmentsResponse>, Status> {
        let req = request.into_inner();
        let table_ids = HashSet::<u32>::from_iter(req.table_ids);

        let mut info = HashMap::new();
        for job_id in table_ids {
            let table_fragments = self
                .metadata_manager
                .catalog_controller
                .get_job_fragments_by_id(job_id as _)
                .await?;
            let mut dispatchers = self
                .metadata_manager
                .catalog_controller
                .get_fragment_actor_dispatchers(
                    table_fragments.fragment_ids().map(|id| id as _).collect(),
                )
                .await?;
            let ctx = table_fragments.ctx.to_protobuf();
            info.insert(
                table_fragments.stream_job_id().table_id,
                TableFragmentInfo {
                    fragments: table_fragments
                        .fragments
                        .into_iter()
                        .map(|(id, fragment)| FragmentInfo {
                            id,
                            actors: fragment
                                .actors
                                .into_iter()
                                .map(|actor| ActorInfo {
                                    id: actor.actor_id,
                                    node: Some(fragment.nodes.clone()),
                                    dispatcher: dispatchers
                                        .get_mut(&(fragment.fragment_id as _))
                                        .and_then(|dispatchers| {
                                            dispatchers.remove(&(actor.actor_id as _))
                                        })
                                        .unwrap_or_default(),
                                })
                                .collect_vec(),
                        })
                        .collect_vec(),
                    ctx: Some(ctx),
                },
            );
        }

        Ok(Response::new(ListTableFragmentsResponse {
            table_fragments: info,
        }))
    }

    async fn list_streaming_job_states(
        &self,
        _request: Request<ListStreamingJobStatesRequest>,
    ) -> Result<Response<ListStreamingJobStatesResponse>, Status> {
        let job_infos = self
            .metadata_manager
            .catalog_controller
            .list_streaming_job_infos()
            .await?;
        let states = job_infos
            .into_iter()
            .map(
                |StreamingJobInfo {
                     job_id,
                     job_status,
                     name,
                     parallelism,
                     max_parallelism,
                     resource_group,
                     database_id,
                     schema_id,
                     ..
                 }| {
                    let parallelism = match parallelism {
                        StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
                        StreamingParallelism::Custom => model::TableParallelism::Custom,
                        StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
                    };

                    list_streaming_job_states_response::StreamingJobState {
                        table_id: job_id as _,
                        name,
                        state: PbState::from(job_status) as _,
                        parallelism: Some(parallelism.into()),
                        max_parallelism: max_parallelism as _,
                        resource_group,
                        database_id: database_id as _,
                        schema_id: schema_id as _,
                    }
                },
            )
            .collect_vec();

        Ok(Response::new(ListStreamingJobStatesResponse { states }))
    }

    async fn list_fragment_distribution(
        &self,
        _request: Request<ListFragmentDistributionRequest>,
    ) -> Result<Response<ListFragmentDistributionResponse>, Status> {
        let fragment_descs = self
            .metadata_manager
            .catalog_controller
            .list_fragment_descs()
            .await?;
        let distributions = fragment_descs
            .into_iter()
            .map(|(fragment_desc, upstreams)| {
                fragment_desc_to_distribution(fragment_desc, upstreams)
            })
            .collect_vec();

        Ok(Response::new(ListFragmentDistributionResponse {
            distributions,
        }))
    }

    async fn list_creating_fragment_distribution(
        &self,
        _request: Request<ListCreatingFragmentDistributionRequest>,
    ) -> Result<Response<ListCreatingFragmentDistributionResponse>, Status> {
        let fragment_descs = self
            .metadata_manager
            .catalog_controller
            .list_creating_fragment_descs()
            .await?;
        let distributions = fragment_descs
            .into_iter()
            .map(|(fragment_desc, upstreams)| {
                fragment_desc_to_distribution(fragment_desc, upstreams)
            })
            .collect_vec();

        Ok(Response::new(ListCreatingFragmentDistributionResponse {
            distributions,
        }))
    }

    async fn get_fragment_by_id(
        &self,
        request: Request<GetFragmentByIdRequest>,
    ) -> Result<Response<GetFragmentByIdResponse>, Status> {
        let req = request.into_inner();
        let fragment_desc = self
            .metadata_manager
            .catalog_controller
            .get_fragment_desc_by_id(req.fragment_id as i32)
            .await?;
        let distribution =
            fragment_desc.map(|(desc, upstreams)| fragment_desc_to_distribution(desc, upstreams));
        Ok(Response::new(GetFragmentByIdResponse { distribution }))
    }

    async fn list_actor_states(
        &self,
        _request: Request<ListActorStatesRequest>,
    ) -> Result<Response<ListActorStatesResponse>, Status> {
        let actor_locations = self
            .metadata_manager
            .catalog_controller
            .list_actor_locations()
            .await?;
        let states = actor_locations
            .into_iter()
            .map(|actor_location| list_actor_states_response::ActorState {
                actor_id: actor_location.actor_id as _,
                fragment_id: actor_location.fragment_id as _,
                state: PbActorState::from(actor_location.status) as _,
                worker_id: actor_location.worker_id as _,
            })
            .collect_vec();

        Ok(Response::new(ListActorStatesResponse { states }))
    }

    async fn list_object_dependencies(
        &self,
        _request: Request<ListObjectDependenciesRequest>,
    ) -> Result<Response<ListObjectDependenciesResponse>, Status> {
        let dependencies = self
            .metadata_manager
            .catalog_controller
            .list_created_object_dependencies()
            .await?;

        Ok(Response::new(ListObjectDependenciesResponse {
            dependencies,
        }))
    }

    async fn recover(
        &self,
        _request: Request<RecoverRequest>,
    ) -> Result<Response<RecoverResponse>, Status> {
        self.barrier_manager.adhoc_recovery().await?;
        Ok(Response::new(RecoverResponse {}))
    }

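    /// Lists the current split assignment of every source-related actor. Source and backfill
    /// fragments are first mapped to their owning source so that each split can be reported
    /// together with its source id and fragment type.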
    async fn list_actor_splits(
        &self,
        _request: Request<ListActorSplitsRequest>,
    ) -> Result<Response<ListActorSplitsResponse>, Status> {
        let SourceManagerRunningInfo {
            source_fragments,
            backfill_fragments,
        } = self.stream_manager.source_manager.get_running_info().await;

        let mut actor_splits = self.env.shared_actor_infos().list_assignments();

        let source_actors = self
            .metadata_manager
            .catalog_controller
            .list_source_actors()
            .await?;

        let is_shared_source = self
            .metadata_manager
            .catalog_controller
            .list_source_id_with_shared_types()
            .await?;

        let fragment_to_source: HashMap<_, _> = source_fragments
            .into_iter()
            .flat_map(|(source_id, fragment_ids)| {
                let source_type = if is_shared_source
                    .get(&(source_id as _))
                    .copied()
                    .unwrap_or(false)
                {
                    FragmentType::SharedSource
                } else {
                    FragmentType::NonSharedSource
                };

                fragment_ids
                    .into_iter()
                    .map(move |fragment_id| (fragment_id, (source_id, source_type)))
            })
            .chain(
                backfill_fragments
                    .into_iter()
                    .flat_map(|(source_id, fragment_ids)| {
                        fragment_ids.into_iter().flat_map(
                            move |(fragment_id, upstream_fragment_id)| {
                                [
                                    (fragment_id, (source_id, FragmentType::SharedSourceBackfill)),
                                    (
                                        upstream_fragment_id,
                                        (source_id, FragmentType::SharedSource),
                                    ),
                                ]
                            },
                        )
                    }),
            )
            .collect();

        let actor_splits = source_actors
            .into_iter()
            .flat_map(|(actor_id, fragment_id)| {
                let (source_id, fragment_type) = fragment_to_source
                    .get(&(fragment_id as _))
                    .copied()
                    .unwrap_or_default();

                actor_splits
                    .remove(&(actor_id as _))
                    .unwrap_or_default()
                    .into_iter()
                    .map(move |split| list_actor_splits_response::ActorSplit {
                        actor_id: actor_id as _,
                        source_id: source_id as _,
                        fragment_id: fragment_id as _,
                        split_id: split.id().to_string(),
                        fragment_type: fragment_type.into(),
                    })
            })
            .collect_vec();

        Ok(Response::new(ListActorSplitsResponse { actor_splits }))
    }

    async fn list_rate_limits(
        &self,
        _request: Request<ListRateLimitsRequest>,
    ) -> Result<Response<ListRateLimitsResponse>, Status> {
        let rate_limits = self
            .metadata_manager
            .catalog_controller
            .list_rate_limits()
            .await?;
        Ok(Response::new(ListRateLimitsResponse { rate_limits }))
    }

    #[cfg_attr(coverage, coverage(off))]
    async fn refresh(
        &self,
        request: Request<RefreshRequest>,
    ) -> Result<Response<RefreshResponse>, Status> {
        let req = request.into_inner();

        tracing::info!("Refreshing table with id: {}", req.table_id);

        // Create refresh manager and execute refresh
        let refresh_manager = risingwave_meta::stream::RefreshManager::new(
            self.metadata_manager.clone(),
            self.barrier_scheduler.clone(),
        );

        let response = refresh_manager.refresh_table(req).await?;

        Ok(Response::new(response))
    }

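    /// Updates connector properties for a sink, Iceberg table, or source, then notifies the
    /// running actors of the new (plaintext) properties via a `Command::ConnectorPropsChange`
    /// barrier.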
    async fn alter_connector_props(
        &self,
        request: Request<AlterConnectorPropsRequest>,
    ) -> Result<Response<AlterConnectorPropsResponse>, Status> {
        let request = request.into_inner();
        let secret_manager = LocalSecretManager::global();
        let (new_props_plaintext, object_id) =
            match AlterConnectorPropsObject::try_from(request.object_type) {
                Ok(AlterConnectorPropsObject::Sink) => (
                    self.metadata_manager
                        .update_sink_props_by_sink_id(
                            request.object_id as i32,
                            request.changed_props.clone().into_iter().collect(),
                        )
                        .await?,
                    request.object_id,
                ),
                Ok(AlterConnectorPropsObject::IcebergTable) => {
                    self.metadata_manager
                        .update_iceberg_table_props_by_table_id(
                            TableId::from(request.object_id),
                            request.changed_props.clone().into_iter().collect(),
                            request.extra_options,
                        )
                        .await?
                }

                Ok(AlterConnectorPropsObject::Source) => {
                    // Handles both standalone sources and a table's associated source.
                    if request.connector_conn_ref.is_some() {
                        return Err(Status::invalid_argument(
                            "alter connector_conn_ref is not supported",
                        ));
                    }
                    let options_with_secret = self
                        .metadata_manager
                        .catalog_controller
                        .update_source_props_by_source_id(
                            request.object_id as SourceId,
                            request.changed_props.clone().into_iter().collect(),
                            request.changed_secret_refs.clone().into_iter().collect(),
                        )
                        .await?;

                    self.stream_manager
                        .source_manager
                        .validate_source_once(request.object_id, options_with_secret.clone())
                        .await?;

                    let (options, secret_refs) = options_with_secret.into_parts();
                    (
                        secret_manager
                            .fill_secrets(options, secret_refs)
                            .map_err(MetaError::from)?
                            .into_iter()
                            .collect(),
                        request.object_id,
                    )
                }

                _ => {
                    return Err(Status::unimplemented(format!(
                        "unsupported object type for AlterConnectorProps: {:?}",
                        request.object_type
                    )));
                }
            };

        let database_id = self
            .metadata_manager
            .catalog_controller
            .get_object_database_id(object_id as ObjectId)
            .await?;
        let database_id = DatabaseId::new(database_id as _);

        let mut mutation = HashMap::default();
        mutation.insert(object_id, new_props_plaintext);

        let _i = self
            .barrier_scheduler
            .run_command(database_id, Command::ConnectorPropsChange(mutation))
            .await?;

        Ok(Response::new(AlterConnectorPropsResponse {}))
    }

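    /// Sets the `aligned` flag on every `SyncLogStore` node of the given job's fragments to the
    /// requested value.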
    async fn set_sync_log_store_aligned(
        &self,
        request: Request<SetSyncLogStoreAlignedRequest>,
    ) -> Result<Response<SetSyncLogStoreAlignedResponse>, Status> {
        let req = request.into_inner();
        let job_id = req.job_id;
        let aligned = req.aligned;

        self.metadata_manager
            .catalog_controller
            .mutate_fragments_by_job_id(
                job_id as _,
                |_mask, stream_node| {
                    let mut visited = false;
                    visit_stream_node_mut(stream_node, |body| {
                        if let NodeBody::SyncLogStore(sync_log_store) = body {
                            sync_log_store.aligned = aligned;
                            visited = true;
                        }
                    });
                    Ok(visited)
                },
                "no fragments found with synced log store",
            )
            .await?;

        Ok(Response::new(SetSyncLogStoreAlignedResponse {}))
    }

    async fn list_cdc_progress(
        &self,
        _request: Request<ListCdcProgressRequest>,
    ) -> Result<Response<ListCdcProgressResponse>, Status> {
        let cdc_progress = self
            .env
            .cdc_table_backfill_tracker()
            .list_cdc_progress()
            .into_iter()
            .map(|(job_id, p)| {
                (
                    job_id,
                    PbCdcProgress {
                        split_total_count: p.split_total_count,
                        split_backfilled_count: p.split_backfilled_count,
                        split_completed_count: p.split_completed_count,
                    },
                )
            })
            .collect();
        Ok(Response::new(ListCdcProgressResponse { cdc_progress }))
    }
}

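/// Converts a catalog `FragmentDesc` plus its upstream fragment ids into the protobuf
/// `FragmentDistribution` returned by the fragment listing RPCs.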
fn fragment_desc_to_distribution(
    fragment_desc: FragmentDesc,
    upstreams: Vec<FragmentId>,
) -> FragmentDistribution {
    FragmentDistribution {
        fragment_id: fragment_desc.fragment_id as _,
        table_id: fragment_desc.job_id as _,
        distribution_type: PbFragmentDistributionType::from(fragment_desc.distribution_type) as _,
        state_table_ids: fragment_desc.state_table_ids.into_u32_array(),
        upstream_fragment_ids: upstreams.iter().map(|id| *id as _).collect(),
        fragment_type_mask: fragment_desc.fragment_type_mask as _,
        parallelism: fragment_desc.parallelism as _,
        vnode_count: fragment_desc.vnode_count as _,
        node: Some(fragment_desc.stream_node.to_protobuf()),
    }
}