Skip to main content

risingwave_meta_service/
stream_service.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use chrono::DateTime;
18use itertools::Itertools;
19use risingwave_common::id::JobId;
20use risingwave_common::secret::LocalSecretManager;
21use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
22use risingwave_connector::source::SplitMetaData;
23use risingwave_meta::barrier::BarrierManagerRef;
24use risingwave_meta::controller::fragment::StreamingJobInfo;
25use risingwave_meta::controller::utils::FragmentDesc;
26use risingwave_meta::manager::MetadataManager;
27use risingwave_meta::manager::iceberg_compaction::IcebergCompactionManagerRef;
28use risingwave_meta::stream::{GlobalRefreshManagerRef, SourceManagerRunningInfo};
29use risingwave_meta::{MetaError, model};
30use risingwave_meta_model::{ConnectionId, FragmentId, JobStatus, SourceId, StreamingParallelism};
31use risingwave_pb::common::ThrottleType;
32use risingwave_pb::meta::alter_connector_props_request::AlterConnectorPropsObject;
33use risingwave_pb::meta::cancel_creating_jobs_request::Jobs;
34use risingwave_pb::meta::list_actor_splits_response::FragmentType;
35use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
36use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
37use risingwave_pb::meta::list_table_fragments_response::{
38    ActorInfo, FragmentInfo, TableFragmentInfo,
39};
40use risingwave_pb::meta::stream_manager_service_server::StreamManagerService;
41use risingwave_pb::meta::table_fragments::PbState;
42use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
43use risingwave_pb::meta::*;
44use risingwave_pb::stream_plan::stream_node::NodeBody;
45use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
46use tonic::{Request, Response, Status};
47
48use crate::barrier::{BarrierScheduler, Command};
49use crate::manager::MetaSrvEnv;
50use crate::stream::GlobalStreamManagerRef;
51
/// Shorthand for the result type of a gRPC handler: a tonic [`Response`] wrapping `T` on
/// success, or a tonic [`Status`] on failure.
pub type TonicResponse<T> = Result<Response<T>, Status>;
53
/// Implementation of the [`StreamManagerService`] gRPC service.
///
/// The struct only bundles handles to other meta-node components; the handlers delegate the
/// actual work to them.
#[derive(Clone)]
pub struct StreamServiceImpl {
    /// Meta-node environment; handlers read the idle manager and the shared actor infos from it.
    env: MetaSrvEnv,
    /// Used to flush a database and to run barrier commands (pause, resume, throttle).
    barrier_scheduler: BarrierScheduler,
    /// Used to trigger an ad-hoc recovery.
    barrier_manager: BarrierManagerRef,
    /// Used to cancel streaming jobs and to reach the source manager.
    stream_manager: GlobalStreamManagerRef,
    /// Access to catalog metadata (jobs, fragments, rate limits, connector properties).
    metadata_manager: MetadataManager,
    /// Used to trigger manual table refreshes.
    refresh_manager: GlobalRefreshManagerRef,
    /// Used to list Iceberg compaction statuses.
    iceberg_compaction_manager: IcebergCompactionManagerRef,
}
64
65impl StreamServiceImpl {
66    pub fn new(
67        env: MetaSrvEnv,
68        barrier_scheduler: BarrierScheduler,
69        barrier_manager: BarrierManagerRef,
70        stream_manager: GlobalStreamManagerRef,
71        metadata_manager: MetadataManager,
72        refresh_manager: GlobalRefreshManagerRef,
73        iceberg_compaction_manager: IcebergCompactionManagerRef,
74    ) -> Self {
75        StreamServiceImpl {
76            env,
77            barrier_scheduler,
78            barrier_manager,
79            stream_manager,
80            metadata_manager,
81            refresh_manager,
82            iceberg_compaction_manager,
83        }
84    }
85}
86
87fn effective_streaming_job_parallelism(
88    job_status: JobStatus,
89    parallelism: StreamingParallelism,
90    adaptive_parallelism_strategy: Option<String>,
91    backfill_parallelism: Option<StreamingParallelism>,
92    backfill_adaptive_parallelism_strategy: Option<String>,
93) -> (StreamingParallelism, Option<String>) {
94    if job_status != JobStatus::Created {
95        (
96            backfill_parallelism.unwrap_or(parallelism),
97            backfill_adaptive_parallelism_strategy.or(adaptive_parallelism_strategy),
98        )
99    } else {
100        (parallelism, adaptive_parallelism_strategy)
101    }
102}
103
104#[async_trait::async_trait]
105impl StreamManagerService for StreamServiceImpl {
106    async fn flush(&self, request: Request<FlushRequest>) -> TonicResponse<FlushResponse> {
107        self.env.idle_manager().record_activity();
108        let req = request.into_inner();
109
110        let version_id = self.barrier_scheduler.flush(req.database_id).await?;
111        Ok(Response::new(FlushResponse {
112            status: None,
113            hummock_version_id: version_id,
114        }))
115    }
116
117    async fn list_refresh_table_states(
118        &self,
119        _request: Request<ListRefreshTableStatesRequest>,
120    ) -> TonicResponse<ListRefreshTableStatesResponse> {
121        let refresh_jobs = self.metadata_manager.list_refresh_jobs().await?;
122        let refresh_table_states = refresh_jobs
123            .into_iter()
124            .map(|job| RefreshTableState {
125                table_id: job.table_id,
126                current_status: job.current_status.to_string(),
127                last_trigger_time: job
128                    .last_trigger_time
129                    .map(|time| DateTime::from_timestamp_millis(time).unwrap().to_string()),
130                trigger_interval_secs: job.trigger_interval_secs,
131                last_success_time: job
132                    .last_success_time
133                    .map(|time| DateTime::from_timestamp_millis(time).unwrap().to_string()),
134            })
135            .collect();
136        Ok(Response::new(ListRefreshTableStatesResponse {
137            states: refresh_table_states,
138        }))
139    }
140
141    async fn list_iceberg_compaction_status(
142        &self,
143        _request: Request<ListIcebergCompactionStatusRequest>,
144    ) -> TonicResponse<ListIcebergCompactionStatusResponse> {
145        let statuses = self
146            .iceberg_compaction_manager
147            .list_compaction_statuses()
148            .into_iter()
149            .map(
150                |status| list_iceberg_compaction_status_response::IcebergCompactionStatus {
151                    sink_id: status.sink_id.as_raw_id(),
152                    task_type: status.task_type,
153                    trigger_interval_sec: status.trigger_interval_sec,
154                    trigger_snapshot_count: status.trigger_snapshot_count as u64,
155                    schedule_state: status.schedule_state,
156                    next_compaction_after_sec: status.next_compaction_after_sec,
157                    pending_snapshot_count: status.pending_snapshot_count.map(|count| count as u64),
158                    is_triggerable: status.is_triggerable,
159                },
160            )
161            .collect();
162
163        Ok(Response::new(ListIcebergCompactionStatusResponse {
164            statuses,
165        }))
166    }
167
168    async fn pause(&self, _: Request<PauseRequest>) -> Result<Response<PauseResponse>, Status> {
169        for database_id in self.metadata_manager.list_active_database_ids().await? {
170            self.barrier_scheduler
171                .run_command(database_id, Command::pause())
172                .await?;
173        }
174        Ok(Response::new(PauseResponse {}))
175    }
176
177    async fn resume(&self, _: Request<ResumeRequest>) -> Result<Response<ResumeResponse>, Status> {
178        for database_id in self.metadata_manager.list_active_database_ids().await? {
179            self.barrier_scheduler
180                .run_command(database_id, Command::resume())
181                .await?;
182        }
183        Ok(Response::new(ResumeResponse {}))
184    }
185
186    async fn apply_throttle(
187        &self,
188        request: Request<ApplyThrottleRequest>,
189    ) -> Result<Response<ApplyThrottleResponse>, Status> {
190        let request = request.into_inner();
191
192        // Decode enums from raw i32 fields to handle decoupled target/type.
193        let throttle_target = request.throttle_target();
194        let throttle_type = request.throttle_type();
195
196        let raw_object_id: u32;
197        let jobs: HashSet<JobId>;
198        let fragments: HashSet<FragmentId>;
199
200        match (throttle_type, throttle_target) {
201            (ThrottleType::Source, ThrottleTarget::Source | ThrottleTarget::Table) => {
202                (jobs, fragments) = self
203                    .metadata_manager
204                    .update_source_rate_limit_by_source_id(request.id.into(), request.rate)
205                    .await?;
206                raw_object_id = request.id;
207            }
208            (ThrottleType::Backfill, ThrottleTarget::Mv)
209            | (ThrottleType::Backfill, ThrottleTarget::Sink)
210            | (ThrottleType::Backfill, ThrottleTarget::Table) => {
211                fragments = self
212                    .metadata_manager
213                    .update_backfill_rate_limit_by_job_id(JobId::from(request.id), request.rate)
214                    .await?;
215                jobs = [request.id.into()].into_iter().collect();
216                raw_object_id = request.id;
217            }
218            (ThrottleType::Dml, ThrottleTarget::Table) => {
219                fragments = self
220                    .metadata_manager
221                    .update_dml_rate_limit_by_job_id(JobId::from(request.id), request.rate)
222                    .await?;
223                jobs = [request.id.into()].into_iter().collect();
224                raw_object_id = request.id;
225            }
226            (ThrottleType::Sink, ThrottleTarget::Sink) => {
227                fragments = self
228                    .metadata_manager
229                    .update_sink_rate_limit_by_sink_id(request.id.into(), request.rate)
230                    .await?;
231                jobs = [request.id.into()].into_iter().collect();
232                raw_object_id = request.id;
233            }
234            // FIXME(kwannoel): specialize for throttle type x target
235            (_, ThrottleTarget::Fragment) => {
236                self.metadata_manager
237                    .update_fragment_rate_limit_by_fragment_id(request.id.into(), request.rate)
238                    .await?;
239                let fragment_id = request.id.into();
240                fragments = [fragment_id].into_iter().collect();
241                let job_id = self
242                    .metadata_manager
243                    .catalog_controller
244                    .get_fragment_streaming_job_id(fragment_id)
245                    .await?;
246                jobs = [job_id].into_iter().collect();
247                raw_object_id = job_id.as_raw_id();
248            }
249            _ => {
250                return Err(Status::invalid_argument(format!(
251                    "unsupported throttle target/type: {:?}/{:?}",
252                    throttle_target, throttle_type
253                )));
254            }
255        };
256
257        let database_id = self
258            .metadata_manager
259            .catalog_controller
260            .get_object_database_id(raw_object_id)
261            .await?;
262
263        let throttle_config = ThrottleConfig {
264            rate_limit: request.rate,
265            throttle_type: throttle_type.into(),
266        };
267        let _i = self
268            .barrier_scheduler
269            .run_command(
270                database_id,
271                Command::Throttle {
272                    jobs,
273                    config: fragments
274                        .into_iter()
275                        .map(|fragment_id| (fragment_id, throttle_config))
276                        .collect(),
277                },
278            )
279            .await?;
280
281        Ok(Response::new(ApplyThrottleResponse { status: None }))
282    }
283
284    async fn cancel_creating_jobs(
285        &self,
286        request: Request<CancelCreatingJobsRequest>,
287    ) -> TonicResponse<CancelCreatingJobsResponse> {
288        let req = request.into_inner();
289        let job_ids = match req.jobs.unwrap() {
290            Jobs::Infos(infos) => self
291                .metadata_manager
292                .catalog_controller
293                .find_creating_streaming_job_ids(infos.infos)
294                .await?
295                .into_iter()
296                .map(|id| id.as_job_id())
297                .collect(),
298            Jobs::Ids(jobs) => jobs.job_ids,
299        };
300
301        let canceled_jobs = self
302            .stream_manager
303            .cancel_streaming_jobs(job_ids)
304            .await?
305            .into_iter()
306            .map(|id| id.as_raw_id())
307            .collect_vec();
308        Ok(Response::new(CancelCreatingJobsResponse {
309            status: None,
310            canceled_jobs,
311        }))
312    }
313
314    async fn list_table_fragments(
315        &self,
316        request: Request<ListTableFragmentsRequest>,
317    ) -> Result<Response<ListTableFragmentsResponse>, Status> {
318        let req = request.into_inner();
319        let table_ids = HashSet::<JobId>::from_iter(req.table_ids);
320
321        let mut info = HashMap::new();
322        for job_id in table_ids {
323            let (table_fragments, fragment_actors, _actor_status) = self
324                .metadata_manager
325                .catalog_controller
326                .get_job_fragments_by_id(job_id)
327                .await?;
328            let mut dispatchers = self
329                .metadata_manager
330                .catalog_controller
331                .get_fragment_actor_dispatchers(
332                    table_fragments.fragment_ids().map(|id| id as _).collect(),
333                )
334                .await?;
335            let ctx = table_fragments.ctx.to_protobuf();
336            info.insert(
337                table_fragments.stream_job_id(),
338                TableFragmentInfo {
339                    fragments: table_fragments
340                        .fragments
341                        .into_iter()
342                        .map(|(id, fragment)| FragmentInfo {
343                            id,
344                            actors: fragment_actors
345                                .get(&id)
346                                .into_iter()
347                                .flat_map(|actors| actors.iter().map(|actor| actor.actor_id))
348                                .map(|actor_id| ActorInfo {
349                                    id: actor_id,
350                                    node: Some(fragment.nodes.clone()),
351                                    dispatcher: dispatchers
352                                        .get_mut(&fragment.fragment_id)
353                                        .and_then(|dispatchers| dispatchers.remove(&actor_id))
354                                        .unwrap_or_default(),
355                                })
356                                .collect_vec(),
357                        })
358                        .collect_vec(),
359                    ctx: Some(ctx),
360                },
361            );
362        }
363
364        Ok(Response::new(ListTableFragmentsResponse {
365            table_fragments: info,
366        }))
367    }
368
369    async fn list_streaming_job_states(
370        &self,
371        _request: Request<ListStreamingJobStatesRequest>,
372    ) -> Result<Response<ListStreamingJobStatesResponse>, Status> {
373        let job_infos = self
374            .metadata_manager
375            .catalog_controller
376            .list_streaming_job_infos()
377            .await?;
378        let states = job_infos
379            .into_iter()
380            .map(
381                |StreamingJobInfo {
382                     job_id,
383                     job_status,
384                     name,
385                     parallelism,
386                     adaptive_parallelism_strategy,
387                     backfill_parallelism,
388                     backfill_adaptive_parallelism_strategy,
389                     max_parallelism,
390                     resource_group,
391                     database_id,
392                     schema_id,
393                     config_override,
394                     ..
395                 }| {
396                    // While a job is still being created, system tables should surface the
397                    // temporary backfill override instead of the post-backfill target value.
398                    let (parallelism, adaptive_parallelism_strategy) =
399                        effective_streaming_job_parallelism(
400                            job_status,
401                            parallelism,
402                            adaptive_parallelism_strategy,
403                            backfill_parallelism,
404                            backfill_adaptive_parallelism_strategy,
405                        );
406                    let parallelism = match parallelism {
407                        StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
408                        StreamingParallelism::Custom => model::TableParallelism::Custom,
409                        StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
410                    };
411
412                    list_streaming_job_states_response::StreamingJobState {
413                        table_id: job_id,
414                        name,
415                        state: PbState::from(job_status) as _,
416                        parallelism: Some(parallelism.into()),
417                        max_parallelism: max_parallelism as _,
418                        resource_group,
419                        database_id,
420                        schema_id,
421                        config_override,
422                        adaptive_parallelism_strategy,
423                    }
424                },
425            )
426            .collect_vec();
427
428        Ok(Response::new(ListStreamingJobStatesResponse { states }))
429    }
430
431    async fn list_fragment_distribution(
432        &self,
433        _request: Request<ListFragmentDistributionRequest>,
434    ) -> Result<Response<ListFragmentDistributionResponse>, Status> {
435        let include_node = _request.into_inner().include_node.unwrap_or(true);
436        let distributions = if include_node {
437            self.metadata_manager
438                .catalog_controller
439                .list_fragment_descs_with_node(false)
440                .await?
441        } else {
442            self.metadata_manager
443                .catalog_controller
444                .list_fragment_descs_without_node(false)
445                .await?
446        }
447        .into_iter()
448        .map(|(dist, _)| dist)
449        .collect();
450
451        Ok(Response::new(ListFragmentDistributionResponse {
452            distributions,
453        }))
454    }
455
456    async fn list_creating_fragment_distribution(
457        &self,
458        _request: Request<ListCreatingFragmentDistributionRequest>,
459    ) -> Result<Response<ListCreatingFragmentDistributionResponse>, Status> {
460        let include_node = _request.into_inner().include_node.unwrap_or(true);
461        let distributions = if include_node {
462            self.metadata_manager
463                .catalog_controller
464                .list_fragment_descs_with_node(true)
465                .await?
466        } else {
467            self.metadata_manager
468                .catalog_controller
469                .list_fragment_descs_without_node(true)
470                .await?
471        }
472        .into_iter()
473        .map(|(dist, _)| dist)
474        .collect();
475
476        Ok(Response::new(ListCreatingFragmentDistributionResponse {
477            distributions,
478        }))
479    }
480
481    async fn list_sink_log_store_tables(
482        &self,
483        _request: Request<ListSinkLogStoreTablesRequest>,
484    ) -> Result<Response<ListSinkLogStoreTablesResponse>, Status> {
485        let tables = self
486            .metadata_manager
487            .catalog_controller
488            .list_sink_log_store_tables()
489            .await?
490            .into_iter()
491            .map(|(sink_id, internal_table_id)| {
492                list_sink_log_store_tables_response::SinkLogStoreTable {
493                    sink_id: sink_id.as_raw_id(),
494                    internal_table_id: internal_table_id.as_raw_id(),
495                }
496            })
497            .collect();
498
499        Ok(Response::new(ListSinkLogStoreTablesResponse { tables }))
500    }
501
502    async fn get_fragment_by_id(
503        &self,
504        request: Request<GetFragmentByIdRequest>,
505    ) -> Result<Response<GetFragmentByIdResponse>, Status> {
506        let req = request.into_inner();
507        let fragment_desc = self
508            .metadata_manager
509            .catalog_controller
510            .get_fragment_desc_by_id(req.fragment_id)
511            .await?;
512        let distribution = fragment_desc
513            .map(|(desc, upstreams)| fragment_desc_to_distribution(desc, upstreams, true));
514        Ok(Response::new(GetFragmentByIdResponse { distribution }))
515    }
516
517    async fn get_fragment_vnodes(
518        &self,
519        request: Request<GetFragmentVnodesRequest>,
520    ) -> Result<Response<GetFragmentVnodesResponse>, Status> {
521        let req = request.into_inner();
522        let fragment_id = req.fragment_id;
523
524        let shared_actor_infos = self.env.shared_actor_infos();
525        let guard = shared_actor_infos.read_guard();
526
527        let fragment_info = guard
528            .get_fragment(fragment_id)
529            .ok_or_else(|| Status::not_found(format!("Fragment {} not found", fragment_id)))?;
530
531        let actor_vnodes = fragment_info
532            .actors
533            .iter()
534            .map(|(actor_id, actor_info)| {
535                let vnode_indices = if let Some(ref vnode_bitmap) = actor_info.vnode_bitmap {
536                    vnode_bitmap.iter_ones().map(|v| v as u32).collect()
537                } else {
538                    vec![]
539                };
540
541                get_fragment_vnodes_response::ActorVnodes {
542                    actor_id: *actor_id,
543                    vnode_indices,
544                }
545            })
546            .collect();
547
548        Ok(Response::new(GetFragmentVnodesResponse { actor_vnodes }))
549    }
550
551    async fn get_actor_vnodes(
552        &self,
553        request: Request<GetActorVnodesRequest>,
554    ) -> Result<Response<GetActorVnodesResponse>, Status> {
555        let req = request.into_inner();
556        let actor_id = req.actor_id;
557
558        let shared_actor_infos = self.env.shared_actor_infos();
559        let guard = shared_actor_infos.read_guard();
560
561        // Find the actor across all fragments
562        let actor_info = guard
563            .iter_over_fragments()
564            .find_map(|(_, fragment_info)| fragment_info.actors.get(&actor_id))
565            .ok_or_else(|| Status::not_found(format!("Actor {} not found", actor_id)))?;
566
567        let vnode_indices = if let Some(ref vnode_bitmap) = actor_info.vnode_bitmap {
568            vnode_bitmap.iter_ones().map(|v| v as u32).collect()
569        } else {
570            vec![]
571        };
572
573        Ok(Response::new(GetActorVnodesResponse { vnode_indices }))
574    }
575
576    async fn list_actor_states(
577        &self,
578        _request: Request<ListActorStatesRequest>,
579    ) -> Result<Response<ListActorStatesResponse>, Status> {
580        let actor_locations = self
581            .metadata_manager
582            .catalog_controller
583            .list_actor_locations()?;
584        let states = actor_locations
585            .into_iter()
586            .map(|actor_location| list_actor_states_response::ActorState {
587                actor_id: actor_location.actor_id,
588                fragment_id: actor_location.fragment_id,
589                worker_id: actor_location.worker_id,
590            })
591            .collect_vec();
592
593        Ok(Response::new(ListActorStatesResponse { states }))
594    }
595
596    async fn recover(
597        &self,
598        _request: Request<RecoverRequest>,
599    ) -> Result<Response<RecoverResponse>, Status> {
600        self.barrier_manager.adhoc_recovery().await?;
601        Ok(Response::new(RecoverResponse {}))
602    }
603
    /// Lists the splits currently assigned to the actors of source and source-backfill
    /// fragments.
    ///
    /// Each returned entry names the actor, its fragment, the source the fragment belongs to,
    /// the fragment's type (shared source, non-shared source, or shared source backfill) and
    /// the split id. Actors without any assigned split contribute no entries.
    async fn list_actor_splits(
        &self,
        _request: Request<ListActorSplitsRequest>,
    ) -> Result<Response<ListActorSplitsResponse>, Status> {
        // Both maps are keyed by source id: `source_fragments` holds fragment ids,
        // `backfill_fragments` holds `(fragment id, upstream fragment id)` pairs.
        let SourceManagerRunningInfo {
            source_fragments,
            backfill_fragments,
        } = self.stream_manager.source_manager.get_running_info().await;

        // Split assignments keyed by actor id; entries are removed as they are consumed below.
        let mut actor_splits = self.env.shared_actor_infos().list_assignments();

        // Actor id -> fragment id, for every actor of a fragment mentioned in either map.
        let source_actors: HashMap<_, _> = {
            // Both ids of each backfill pair count, in addition to the plain source fragments.
            let all_fragment_ids: HashSet<_> = backfill_fragments
                .values()
                .flat_map(|set| set.iter().flat_map(|&(id1, id2)| [id1, id2]))
                .chain(source_fragments.values().flatten().copied())
                .collect();

            let guard = self.env.shared_actor_infos().read_guard();
            guard
                .iter_over_fragments()
                .filter(|(frag_id, _)| all_fragment_ids.contains(*frag_id))
                .flat_map(|(fragment_id, fragment_info)| {
                    fragment_info
                        .actors
                        .keys()
                        .copied()
                        .map(|actor_id| (actor_id, *fragment_id))
                })
                .collect()
        };

        // Source id -> whether the source is shared; a missing entry is treated as not shared.
        let is_shared_source = self
            .metadata_manager
            .catalog_controller
            .list_source_id_with_shared_types()
            .await?;

        // Fragment id -> (source id, fragment type). The backfill entries are chained after the
        // source entries, so when both produce the same fragment id the backfill-derived entry
        // is the one kept by the map.
        let fragment_to_source: HashMap<_, _> = source_fragments
            .into_iter()
            .flat_map(|(source_id, fragment_ids)| {
                let source_type = if is_shared_source.get(&source_id).copied().unwrap_or(false) {
                    FragmentType::SharedSource
                } else {
                    FragmentType::NonSharedSource
                };

                fragment_ids
                    .into_iter()
                    .map(move |fragment_id| (fragment_id, (source_id, source_type)))
            })
            .chain(
                backfill_fragments
                    .into_iter()
                    .flat_map(|(source_id, fragment_ids)| {
                        // Each pair yields two entries: the backfill fragment itself and its
                        // upstream fragment, the latter typed as a shared source.
                        fragment_ids.into_iter().flat_map(
                            move |(fragment_id, upstream_fragment_id)| {
                                [
                                    (fragment_id, (source_id, FragmentType::SharedSourceBackfill)),
                                    (
                                        upstream_fragment_id,
                                        (source_id, FragmentType::SharedSource),
                                    ),
                                ]
                            },
                        )
                    }),
            )
            .collect();

        // This binding shadows the mutable `actor_splits` map above; inside the closure the name
        // still refers to that map, since the new binding only exists after this statement.
        let actor_splits = source_actors
            .into_iter()
            .flat_map(|(actor_id, fragment_id)| {
                // A fragment with no entry falls back to the default source id and fragment type.
                let (source_id, fragment_type) = fragment_to_source
                    .get(&fragment_id)
                    .copied()
                    .unwrap_or_default();

                // One output entry per split assigned to the actor.
                actor_splits
                    .remove(&actor_id)
                    .unwrap_or_default()
                    .into_iter()
                    .map(move |split| list_actor_splits_response::ActorSplit {
                        actor_id,
                        source_id,
                        fragment_id,
                        split_id: split.id().to_string(),
                        fragment_type: fragment_type.into(),
                    })
            })
            .collect_vec();

        Ok(Response::new(ListActorSplitsResponse { actor_splits }))
    }
698
699    async fn list_rate_limits(
700        &self,
701        _request: Request<ListRateLimitsRequest>,
702    ) -> Result<Response<ListRateLimitsResponse>, Status> {
703        let rate_limits = self
704            .metadata_manager
705            .catalog_controller
706            .list_rate_limits()
707            .await?;
708        Ok(Response::new(ListRateLimitsResponse { rate_limits }))
709    }
710
711    #[cfg_attr(coverage, coverage(off))]
712    async fn refresh(
713        &self,
714        request: Request<RefreshRequest>,
715    ) -> Result<Response<RefreshResponse>, Status> {
716        let req = request.into_inner();
717
718        tracing::info!("Refreshing table with id: {}", req.table_id);
719
720        let response = self
721            .refresh_manager
722            .trigger_manual_refresh(req, self.env.shared_actor_infos())
723            .await?;
724
725        Ok(Response::new(response))
726    }
727
728    async fn alter_connector_props(
729        &self,
730        request: Request<AlterConnectorPropsRequest>,
731    ) -> Result<Response<AlterConnectorPropsResponse>, Status> {
732        let request = request.into_inner();
733        let secret_manager = LocalSecretManager::global();
734        let (new_props_plaintext, object_id) = match AlterConnectorPropsObject::try_from(
735            request.object_type,
736        ) {
737            Ok(AlterConnectorPropsObject::Sink) => (
738                self.metadata_manager
739                    .update_sink_props_by_sink_id(
740                        request.object_id.into(),
741                        request.changed_props.clone().into_iter().collect(),
742                    )
743                    .await?,
744                request.object_id.into(),
745            ),
746            Ok(AlterConnectorPropsObject::IcebergTable) => {
747                let (prop, sink_id) = self
748                    .metadata_manager
749                    .update_iceberg_table_props_by_table_id(
750                        request.object_id.into(),
751                        request.changed_props.clone().into_iter().collect(),
752                        request.extra_options,
753                    )
754                    .await?;
755                (prop, sink_id.as_object_id())
756            }
757
758            Ok(AlterConnectorPropsObject::Source) => {
759                // alter source and table's associated source
760                if request.connector_conn_ref.is_some() {
761                    return Err(Status::invalid_argument(
762                        "alter connector_conn_ref is not supported",
763                    ));
764                }
765                let options_with_secret = self
766                    .metadata_manager
767                    .catalog_controller
768                    .update_source_props_by_source_id(
769                        request.object_id.into(),
770                        request.changed_props.clone().into_iter().collect(),
771                        request.changed_secret_refs.clone().into_iter().collect(),
772                        false, // SQL ALTER SOURCE enforces alter-on-fly check
773                    )
774                    .await?;
775
776                self.stream_manager
777                    .source_manager
778                    .validate_source_once(request.object_id.into(), options_with_secret.clone())
779                    .await?;
780
781                let (options, secret_refs) = options_with_secret.into_parts();
782                (
783                    secret_manager
784                        .fill_secrets(options, secret_refs)
785                        .map_err(MetaError::from)?
786                        .into_iter()
787                        .collect(),
788                    request.object_id.into(),
789                )
790            }
791            Ok(AlterConnectorPropsObject::Connection) => {
792                // Update the connection and all dependent sources/sinks atomically, and later broadcast
793                // the complete plaintext properties to dependents via barrier.
794                let (
795                    connection_options_with_secret,
796                    updated_sources_with_props,
797                    updated_sinks_with_props,
798                ) = self
799                    .metadata_manager
800                    .catalog_controller
801                    .update_connection_and_dependent_objects_props(
802                        ConnectionId::from(request.object_id),
803                        request.changed_props.clone().into_iter().collect(),
804                        request.changed_secret_refs.clone().into_iter().collect(),
805                    )
806                    .await?;
807
808                // Materialize connection plaintext for observability/debugging (not broadcast directly).
809                let (options, secret_refs) = connection_options_with_secret.into_parts();
810                let new_props_plaintext = secret_manager
811                    .fill_secrets(options, secret_refs)
812                    .map_err(MetaError::from)?
813                    .into_iter()
814                    .collect::<HashMap<String, String>>();
815
816                // Broadcast changes to dependent sources and sinks if any exist.
817                let mut dependent_mutation = HashMap::default();
818                for (source_id, complete_source_props) in updated_sources_with_props {
819                    dependent_mutation.insert(source_id.as_object_id(), complete_source_props);
820                }
821                for (sink_id, complete_sink_props) in updated_sinks_with_props {
822                    dependent_mutation.insert(sink_id.as_object_id(), complete_sink_props);
823                }
824
825                if !dependent_mutation.is_empty() {
826                    let database_id = self
827                        .metadata_manager
828                        .catalog_controller
829                        .get_object_database_id(ConnectionId::from(request.object_id))
830                        .await?;
831                    tracing::info!(
832                        "broadcasting connection {} property changes to dependent object ids: {:?}",
833                        request.object_id,
834                        dependent_mutation.keys().collect_vec()
835                    );
836                    let _version = self
837                        .barrier_scheduler
838                        .run_command(
839                            database_id,
840                            Command::ConnectorPropsChange(dependent_mutation),
841                        )
842                        .await?;
843                }
844
845                (
846                    new_props_plaintext,
847                    ConnectionId::from(request.object_id).as_object_id(),
848                )
849            }
850
851            _ => {
852                unimplemented!(
853                    "Unsupported object type for AlterConnectorProps: {:?}",
854                    request.object_type
855                );
856            }
857        };
858
859        let database_id = self
860            .metadata_manager
861            .catalog_controller
862            .get_object_database_id(object_id)
863            .await?;
864        // Connection updates are broadcast to dependent sources/sinks inside the `Connection` branch above.
865        // For sources/sinks/iceberg-table updates, broadcast the change to the object itself.
866        if AlterConnectorPropsObject::try_from(request.object_type)
867            .is_ok_and(|t| t != AlterConnectorPropsObject::Connection)
868        {
869            let mut mutation = HashMap::default();
870            mutation.insert(object_id, new_props_plaintext);
871            let _version = self
872                .barrier_scheduler
873                .run_command(database_id, Command::ConnectorPropsChange(mutation))
874                .await?;
875        }
876
877        Ok(Response::new(AlterConnectorPropsResponse {}))
878    }
879
880    async fn set_sync_log_store_aligned(
881        &self,
882        request: Request<SetSyncLogStoreAlignedRequest>,
883    ) -> Result<Response<SetSyncLogStoreAlignedResponse>, Status> {
884        let req = request.into_inner();
885        let job_id = req.job_id;
886        let aligned = req.aligned;
887
888        self.metadata_manager
889            .catalog_controller
890            .mutate_fragments_by_job_id(
891                job_id,
892                |_mask, stream_node| {
893                    let mut visited = false;
894                    visit_stream_node_mut(stream_node, |body| {
895                        if let NodeBody::SyncLogStore(sync_log_store) = body {
896                            sync_log_store.aligned = aligned;
897                            visited = true
898                        }
899                    });
900                    Ok(visited)
901                },
902                "no fragments found with synced log store",
903            )
904            .await?;
905
906        Ok(Response::new(SetSyncLogStoreAlignedResponse {}))
907    }
908
909    async fn list_cdc_progress(
910        &self,
911        _request: Request<ListCdcProgressRequest>,
912    ) -> Result<Response<ListCdcProgressResponse>, Status> {
913        let cdc_progress = self
914            .barrier_manager
915            .get_cdc_progress()
916            .await?
917            .into_iter()
918            .map(|(job_id, p)| {
919                (
920                    job_id,
921                    PbCdcProgress {
922                        split_total_count: p.split_total_count,
923                        split_backfilled_count: p.split_backfilled_count,
924                        split_completed_count: p.split_completed_count,
925                    },
926                )
927            })
928            .collect();
929        Ok(Response::new(ListCdcProgressResponse { cdc_progress }))
930    }
931
932    async fn list_unmigrated_tables(
933        &self,
934        _request: Request<ListUnmigratedTablesRequest>,
935    ) -> Result<Response<ListUnmigratedTablesResponse>, Status> {
936        let unmigrated_tables = self
937            .metadata_manager
938            .catalog_controller
939            .list_unmigrated_tables()
940            .await?
941            .into_iter()
942            .map(|table| list_unmigrated_tables_response::UnmigratedTable {
943                table_id: table.id,
944                table_name: table.name,
945            })
946            .collect();
947
948        Ok(Response::new(ListUnmigratedTablesResponse {
949            tables: unmigrated_tables,
950        }))
951    }
952
953    /// Orchestrated source property update with pause/update/resume workflow.
954    /// This is the "safe" version that pauses sources before updating and resumes after.
955    async fn alter_source_properties_safe(
956        &self,
957        request: Request<AlterSourcePropertiesSafeRequest>,
958    ) -> Result<Response<AlterSourcePropertiesSafeResponse>, Status> {
959        let request = request.into_inner();
960        let source_id = request.source_id;
961        let options = request.options.unwrap_or_default();
962
963        tracing::info!(
964            source_id = source_id,
965            reset_splits = options.reset_splits,
966            "Starting orchestrated source property update"
967        );
968
969        // Get the database ID for the source
970        let database_id = self
971            .metadata_manager
972            .catalog_controller
973            .get_object_database_id(SourceId::from(source_id))
974            .await?;
975
976        // Step 1: Pause the stream (already commits state)
977        tracing::info!(source_id = source_id, "Pausing stream");
978        self.barrier_scheduler
979            .run_command(database_id, Command::Pause)
980            .await?;
981
982        // Step 2: Update catalog and get the new properties
983        let result = async {
984            let secret_manager = LocalSecretManager::global();
985
986            let options_with_secret = self
987                .metadata_manager
988                .catalog_controller
989                .update_source_props_by_source_id(
990                    source_id.into(),
991                    request.changed_props.clone().into_iter().collect(),
992                    request.changed_secret_refs.clone().into_iter().collect(),
993                    true, // risectl admin operation skips alter-on-fly check
994                )
995                .await?;
996
997            // Validate the source
998            self.stream_manager
999                .source_manager
1000                .validate_source_once(source_id.into(), options_with_secret.clone())
1001                .await?;
1002
1003            let (props, secret_refs) = options_with_secret.into_parts();
1004            let new_props_plaintext: HashMap<String, String> = secret_manager
1005                .fill_secrets(props, secret_refs)
1006                .map_err(MetaError::from)?
1007                .into_iter()
1008                .collect();
1009
1010            // Step 3: Issue ConnectorPropsChange barrier
1011            tracing::info!(
1012                source_id = source_id,
1013                "Issuing ConnectorPropsChange barrier"
1014            );
1015            let mut mutation = HashMap::default();
1016            mutation.insert(source_id.into(), new_props_plaintext);
1017            self.barrier_scheduler
1018                .run_command(database_id, Command::ConnectorPropsChange(mutation))
1019                .await?;
1020
1021            // Step 4: Optional split reset
1022            if options.reset_splits {
1023                tracing::info!(source_id = source_id, "Resetting source splits");
1024                self.stream_manager
1025                    .source_manager
1026                    .reset_source_splits(source_id.into())
1027                    .await?;
1028            }
1029
1030            Ok::<_, MetaError>(())
1031        }
1032        .await;
1033
1034        // Step 5: Resume the stream (even if previous steps failed)
1035        tracing::info!(source_id = source_id, "Resuming stream");
1036        let resume_result = self
1037            .barrier_scheduler
1038            .run_command(database_id, Command::Resume)
1039            .await;
1040
1041        // Return the first error if any
1042        result?;
1043        resume_result?;
1044
1045        tracing::info!(
1046            source_id = source_id,
1047            "Orchestrated source property update completed successfully"
1048        );
1049
1050        Ok(Response::new(AlterSourcePropertiesSafeResponse {}))
1051    }
1052
1053    /// Reset source split assignments (UNSAFE - admin only).
1054    /// This clears persisted split metadata and triggers re-discovery.
1055    async fn reset_source_splits(
1056        &self,
1057        request: Request<ResetSourceSplitsRequest>,
1058    ) -> Result<Response<ResetSourceSplitsResponse>, Status> {
1059        let request = request.into_inner();
1060        let source_id = request.source_id;
1061
1062        tracing::warn!(
1063            source_id = source_id,
1064            "UNSAFE: Resetting source splits - this may cause data duplication or loss"
1065        );
1066
1067        self.stream_manager
1068            .source_manager
1069            .reset_source_splits(source_id.into())
1070            .await?;
1071
1072        Ok(Response::new(ResetSourceSplitsResponse {}))
1073    }
1074
1075    /// Inject specific offsets into source splits (UNSAFE - admin only).
1076    /// This can cause data duplication or loss depending on the correctness of the provided offsets.
1077    async fn inject_source_offsets(
1078        &self,
1079        request: Request<InjectSourceOffsetsRequest>,
1080    ) -> Result<Response<InjectSourceOffsetsResponse>, Status> {
1081        let request = request.into_inner();
1082        let source_id = request.source_id;
1083        let split_offsets = request.split_offsets;
1084
1085        // Validate split IDs exist before proceeding
1086        let applied_split_ids = self
1087            .stream_manager
1088            .source_manager
1089            .validate_inject_source_offsets(source_id.into(), &split_offsets)
1090            .await?;
1091
1092        tracing::warn!(
1093            source_id = source_id,
1094            num_offsets = split_offsets.len(),
1095            "UNSAFE: Injecting source offsets - this may cause data duplication or loss"
1096        );
1097
1098        let database_id = self
1099            .metadata_manager
1100            .catalog_controller
1101            .get_object_database_id(SourceId::from(source_id))
1102            .await?;
1103
1104        self.barrier_scheduler
1105            .run_command(
1106                database_id,
1107                Command::InjectSourceOffsets {
1108                    source_id: SourceId::from(source_id),
1109                    split_offsets,
1110                },
1111            )
1112            .await?;
1113
1114        Ok(Response::new(InjectSourceOffsetsResponse {
1115            applied_split_ids,
1116        }))
1117    }
1118}
1119
1120fn fragment_desc_to_distribution(
1121    fragment_desc: FragmentDesc,
1122    upstreams: Vec<FragmentId>,
1123    include_node: bool,
1124) -> FragmentDistribution {
1125    let node = include_node.then(|| fragment_desc.stream_node.to_protobuf());
1126    FragmentDistribution {
1127        fragment_id: fragment_desc.fragment_id,
1128        table_id: fragment_desc.job_id,
1129        distribution_type: PbFragmentDistributionType::from(fragment_desc.distribution_type) as _,
1130        state_table_ids: fragment_desc.state_table_ids.0,
1131        upstream_fragment_ids: upstreams,
1132        fragment_type_mask: fragment_desc.fragment_type_mask as _,
1133        parallelism: fragment_desc.parallelism as _,
1134        vnode_count: fragment_desc.vnode_count as _,
1135        node,
1136        parallelism_policy: fragment_desc.parallelism_policy,
1137    }
1138}
1139
#[cfg(test)]
mod tests {
    use risingwave_meta_model::{JobStatus, StreamingParallelism};

    use super::effective_streaming_job_parallelism;

    // NOTE(review): judging by the test names and asserted values, arguments 2-3 are the job's
    // own parallelism/strategy and arguments 4-5 the backfill override — confirm against
    // `effective_streaming_job_parallelism`.

    /// A `Creating` job with both strategies set resolves to the last (backfill) strategy.
    #[test]
    fn test_effective_streaming_job_parallelism_prefers_backfill_override() {
        let job_strategy = Some("BOUNDED(4)".to_owned());
        let backfill_strategy = Some("BOUNDED(2)".to_owned());

        let (resolved_parallelism, resolved_strategy) = effective_streaming_job_parallelism(
            JobStatus::Creating,
            StreamingParallelism::Adaptive,
            job_strategy,
            Some(StreamingParallelism::Adaptive),
            backfill_strategy,
        );

        assert_eq!(resolved_parallelism, StreamingParallelism::Adaptive);
        assert_eq!(resolved_strategy.as_deref(), Some("BOUNDED(2)"));
    }

    /// An `Initial` job without a trailing strategy resolves to the job's own strategy.
    #[test]
    fn test_effective_streaming_job_parallelism_falls_back_to_job_strategy() {
        let job_strategy = Some("RATIO(0.5)".to_owned());

        let (resolved_parallelism, resolved_strategy) = effective_streaming_job_parallelism(
            JobStatus::Initial,
            StreamingParallelism::Adaptive,
            job_strategy,
            Some(StreamingParallelism::Adaptive),
            None,
        );

        assert_eq!(resolved_parallelism, StreamingParallelism::Adaptive);
        assert_eq!(resolved_strategy.as_deref(), Some("RATIO(0.5)"));
    }

    /// A `Created` job keeps its own parallelism and (absent) strategy.
    #[test]
    fn test_effective_streaming_job_parallelism_ignores_backfill_after_creation() {
        let backfill_parallelism = Some(StreamingParallelism::Fixed(2));
        let backfill_strategy = Some("BOUNDED(2)".to_owned());

        let (resolved_parallelism, resolved_strategy) = effective_streaming_job_parallelism(
            JobStatus::Created,
            StreamingParallelism::Fixed(4),
            None,
            backfill_parallelism,
            backfill_strategy,
        );

        assert_eq!(resolved_parallelism, StreamingParallelism::Fixed(4));
        assert!(resolved_strategy.is_none());
    }
}