risingwave_meta_service/stream_service.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};

use chrono::DateTime;
use itertools::Itertools;
use risingwave_common::id::JobId;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
use risingwave_connector::source::SplitMetaData;
use risingwave_meta::barrier::BarrierManagerRef;
use risingwave_meta::controller::fragment::StreamingJobInfo;
use risingwave_meta::controller::utils::FragmentDesc;
use risingwave_meta::manager::MetadataManager;
use risingwave_meta::manager::iceberg_compaction::IcebergCompactionManagerRef;
use risingwave_meta::stream::{GlobalRefreshManagerRef, SourceManagerRunningInfo};
use risingwave_meta::{MetaError, model};
use risingwave_meta_model::{ConnectionId, FragmentId, SourceId, StreamingParallelism};
use risingwave_pb::common::ThrottleType;
use risingwave_pb::meta::alter_connector_props_request::AlterConnectorPropsObject;
use risingwave_pb::meta::cancel_creating_jobs_request::Jobs;
use risingwave_pb::meta::list_actor_splits_response::FragmentType;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
use risingwave_pb::meta::list_table_fragments_response::{
    ActorInfo, FragmentInfo, TableFragmentInfo,
};
use risingwave_pb::meta::stream_manager_service_server::StreamManagerService;
use risingwave_pb::meta::table_fragments::PbState;
use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
use risingwave_pb::meta::*;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
use tonic::{Request, Response, Status};

use crate::barrier::{BarrierScheduler, Command};
use crate::manager::MetaSrvEnv;
use crate::stream::GlobalStreamManagerRef;

pub type TonicResponse<T> = Result<Response<T>, Status>;

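/// gRPC implementation of [`StreamManagerService`]: the meta-node endpoint for streaming
/// control operations such as flush, pause/resume, throttling, job cancellation,
/// fragment/actor introspection, and connector property changes.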
#[derive(Clone)]
pub struct StreamServiceImpl {
    env: MetaSrvEnv,
    barrier_scheduler: BarrierScheduler,
    barrier_manager: BarrierManagerRef,
    stream_manager: GlobalStreamManagerRef,
    metadata_manager: MetadataManager,
    refresh_manager: GlobalRefreshManagerRef,
    iceberg_compaction_manager: IcebergCompactionManagerRef,
}

impl StreamServiceImpl {
    pub fn new(
        env: MetaSrvEnv,
        barrier_scheduler: BarrierScheduler,
        barrier_manager: BarrierManagerRef,
        stream_manager: GlobalStreamManagerRef,
        metadata_manager: MetadataManager,
        refresh_manager: GlobalRefreshManagerRef,
        iceberg_compaction_manager: IcebergCompactionManagerRef,
    ) -> Self {
        StreamServiceImpl {
            env,
            barrier_scheduler,
            barrier_manager,
            stream_manager,
            metadata_manager,
            refresh_manager,
            iceberg_compaction_manager,
        }
    }
}

#[async_trait::async_trait]
impl StreamManagerService for StreamServiceImpl {
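    /// Flushes the given database by scheduling a flush barrier, returning the resulting
    /// Hummock version id.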
    async fn flush(&self, request: Request<FlushRequest>) -> TonicResponse<FlushResponse> {
        self.env.idle_manager().record_activity();
        let req = request.into_inner();

        let version_id = self.barrier_scheduler.flush(req.database_id).await?;
        Ok(Response::new(FlushResponse {
            status: None,
            hummock_version_id: version_id,
        }))
    }

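    /// Lists the refresh state of all refreshable table jobs, rendering the stored
    /// millisecond timestamps as human-readable strings.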
    async fn list_refresh_table_states(
        &self,
        _request: Request<ListRefreshTableStatesRequest>,
    ) -> TonicResponse<ListRefreshTableStatesResponse> {
        let refresh_jobs = self.metadata_manager.list_refresh_jobs().await?;
        let refresh_table_states = refresh_jobs
            .into_iter()
            .map(|job| RefreshTableState {
                table_id: job.table_id,
                current_status: job.current_status.to_string(),
                last_trigger_time: job
                    .last_trigger_time
                    .map(|time| DateTime::from_timestamp_millis(time).unwrap().to_string()),
                trigger_interval_secs: job.trigger_interval_secs,
                last_success_time: job
                    .last_success_time
                    .map(|time| DateTime::from_timestamp_millis(time).unwrap().to_string()),
            })
            .collect();
        Ok(Response::new(ListRefreshTableStatesResponse {
            states: refresh_table_states,
        }))
    }

    async fn list_iceberg_compaction_status(
        &self,
        _request: Request<ListIcebergCompactionStatusRequest>,
    ) -> TonicResponse<ListIcebergCompactionStatusResponse> {
        let statuses = self
            .iceberg_compaction_manager
            .list_compaction_statuses()
            .await
            .into_iter()
            .map(
                |status| list_iceberg_compaction_status_response::IcebergCompactionStatus {
                    sink_id: status.sink_id.as_raw_id(),
                    task_type: status.task_type,
                    trigger_interval_sec: status.trigger_interval_sec,
                    trigger_snapshot_count: status.trigger_snapshot_count as u64,
                    schedule_state: status.schedule_state,
                    next_compaction_after_sec: status.next_compaction_after_sec,
                    pending_snapshot_count: status.pending_snapshot_count.map(|count| count as u64),
                    is_triggerable: status.is_triggerable,
                },
            )
            .collect();

        Ok(Response::new(ListIcebergCompactionStatusResponse {
            statuses,
        }))
    }

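    /// Pauses streaming in every active database by issuing a pause command to each.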
    async fn pause(&self, _: Request<PauseRequest>) -> Result<Response<PauseResponse>, Status> {
        for database_id in self.metadata_manager.list_active_database_ids().await? {
            self.barrier_scheduler
                .run_command(database_id, Command::pause())
                .await?;
        }
        Ok(Response::new(PauseResponse {}))
    }

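    /// Resumes streaming in every active database by issuing a resume command to each.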
    async fn resume(&self, _: Request<ResumeRequest>) -> Result<Response<ResumeResponse>, Status> {
        for database_id in self.metadata_manager.list_active_database_ids().await? {
            self.barrier_scheduler
                .run_command(database_id, Command::resume())
                .await?;
        }
        Ok(Response::new(ResumeResponse {}))
    }

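    /// Applies a rate limit to a source, backfill, DML, or sink job, or to a single
    /// fragment, depending on the (throttle type, throttle target) pair, then broadcasts
    /// the new limit to the affected fragments via a `Throttle` barrier command.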
    async fn apply_throttle(
        &self,
        request: Request<ApplyThrottleRequest>,
    ) -> Result<Response<ApplyThrottleResponse>, Status> {
        let request = request.into_inner();

        // Decode enums from raw i32 fields to handle decoupled target/type.
        let throttle_target = request.throttle_target();
        let throttle_type = request.throttle_type();

        let raw_object_id: u32;
        let jobs: HashSet<JobId>;
        let fragments: HashSet<FragmentId>;

        match (throttle_type, throttle_target) {
            (ThrottleType::Source, ThrottleTarget::Source | ThrottleTarget::Table) => {
                (jobs, fragments) = self
                    .metadata_manager
                    .update_source_rate_limit_by_source_id(request.id.into(), request.rate)
                    .await?;
                raw_object_id = request.id;
            }
            (ThrottleType::Backfill, ThrottleTarget::Mv)
            | (ThrottleType::Backfill, ThrottleTarget::Sink)
            | (ThrottleType::Backfill, ThrottleTarget::Table) => {
                fragments = self
                    .metadata_manager
                    .update_backfill_rate_limit_by_job_id(JobId::from(request.id), request.rate)
                    .await?;
                jobs = [request.id.into()].into_iter().collect();
                raw_object_id = request.id;
            }
            (ThrottleType::Dml, ThrottleTarget::Table) => {
                fragments = self
                    .metadata_manager
                    .update_dml_rate_limit_by_job_id(JobId::from(request.id), request.rate)
                    .await?;
                jobs = [request.id.into()].into_iter().collect();
                raw_object_id = request.id;
            }
            (ThrottleType::Sink, ThrottleTarget::Sink) => {
                fragments = self
                    .metadata_manager
                    .update_sink_rate_limit_by_sink_id(request.id.into(), request.rate)
                    .await?;
                jobs = [request.id.into()].into_iter().collect();
                raw_object_id = request.id;
            }
            // FIXME(kwannoel): specialize for throttle type x target
            (_, ThrottleTarget::Fragment) => {
                self.metadata_manager
                    .update_fragment_rate_limit_by_fragment_id(request.id.into(), request.rate)
                    .await?;
                let fragment_id = request.id.into();
                fragments = [fragment_id].into_iter().collect();
                let job_id = self
                    .metadata_manager
                    .catalog_controller
                    .get_fragment_streaming_job_id(fragment_id)
                    .await?;
                jobs = [job_id].into_iter().collect();
                raw_object_id = job_id.as_raw_id();
            }
            _ => {
                return Err(Status::invalid_argument(format!(
                    "unsupported throttle target/type: {:?}/{:?}",
                    throttle_target, throttle_type
                )));
            }
        };

        let database_id = self
            .metadata_manager
            .catalog_controller
            .get_object_database_id(raw_object_id)
            .await?;

        let throttle_config = ThrottleConfig {
            rate_limit: request.rate,
            throttle_type: throttle_type.into(),
        };
        let _version = self
            .barrier_scheduler
            .run_command(
                database_id,
                Command::Throttle {
                    jobs,
                    config: fragments
                        .into_iter()
                        .map(|fragment_id| (fragment_id, throttle_config))
                        .collect(),
                },
            )
            .await?;

        Ok(Response::new(ApplyThrottleResponse { status: None }))
    }

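    /// Cancels streaming jobs that are still being created, identified either by catalog
    /// infos or by explicit job ids, and returns the ids that were actually canceled.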
    async fn cancel_creating_jobs(
        &self,
        request: Request<CancelCreatingJobsRequest>,
    ) -> TonicResponse<CancelCreatingJobsResponse> {
        let req = request.into_inner();
        let job_ids = match req.jobs.unwrap() {
            Jobs::Infos(infos) => self
                .metadata_manager
                .catalog_controller
                .find_creating_streaming_job_ids(infos.infos)
                .await?
                .into_iter()
                .map(|id| id.as_job_id())
                .collect(),
            Jobs::Ids(jobs) => jobs.job_ids,
        };

        let canceled_jobs = self
            .stream_manager
            .cancel_streaming_jobs(job_ids)
            .await?
            .into_iter()
            .map(|id| id.as_raw_id())
            .collect_vec();
        Ok(Response::new(CancelCreatingJobsResponse {
            status: None,
            canceled_jobs,
        }))
    }

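    /// Returns, for each requested table id, its fragments together with their actors,
    /// per-actor dispatchers, and the streaming context of the job.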
    async fn list_table_fragments(
        &self,
        request: Request<ListTableFragmentsRequest>,
    ) -> Result<Response<ListTableFragmentsResponse>, Status> {
        let req = request.into_inner();
        let table_ids = HashSet::<JobId>::from_iter(req.table_ids);

        let mut info = HashMap::new();
        for job_id in table_ids {
            let (table_fragments, fragment_actors, _actor_status) = self
                .metadata_manager
                .catalog_controller
                .get_job_fragments_by_id(job_id)
                .await?;
            let mut dispatchers = self
                .metadata_manager
                .catalog_controller
                .get_fragment_actor_dispatchers(
                    table_fragments.fragment_ids().map(|id| id as _).collect(),
                )
                .await?;
            let ctx = table_fragments.ctx.to_protobuf();
            info.insert(
                table_fragments.stream_job_id(),
                TableFragmentInfo {
                    fragments: table_fragments
                        .fragments
                        .into_iter()
                        .map(|(id, fragment)| FragmentInfo {
                            id,
                            actors: fragment_actors
                                .get(&id)
                                .into_iter()
                                .flat_map(|actors| actors.iter().map(|actor| actor.actor_id))
                                .map(|actor_id| ActorInfo {
                                    id: actor_id,
                                    node: Some(fragment.nodes.clone()),
                                    dispatcher: dispatchers
                                        .get_mut(&fragment.fragment_id)
                                        .and_then(|dispatchers| dispatchers.remove(&actor_id))
                                        .unwrap_or_default(),
                                })
                                .collect_vec(),
                        })
                        .collect_vec(),
                    ctx: Some(ctx),
                },
            );
        }

        Ok(Response::new(ListTableFragmentsResponse {
            table_fragments: info,
        }))
    }

    async fn list_streaming_job_states(
        &self,
        _request: Request<ListStreamingJobStatesRequest>,
    ) -> Result<Response<ListStreamingJobStatesResponse>, Status> {
        let job_infos = self
            .metadata_manager
            .catalog_controller
            .list_streaming_job_infos()
            .await?;
        let states = job_infos
            .into_iter()
            .map(
                |StreamingJobInfo {
                     job_id,
                     job_status,
                     name,
                     parallelism,
                     max_parallelism,
                     resource_group,
                     database_id,
                     schema_id,
                     config_override,
                     ..
                 }| {
                    let parallelism = match parallelism {
                        StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
                        StreamingParallelism::Custom => model::TableParallelism::Custom,
                        StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
                    };

                    list_streaming_job_states_response::StreamingJobState {
                        table_id: job_id,
                        name,
                        state: PbState::from(job_status) as _,
                        parallelism: Some(parallelism.into()),
                        max_parallelism: max_parallelism as _,
                        resource_group,
                        database_id,
                        schema_id,
                        config_override,
                        adaptive_parallelism_strategy: None,
                    }
                },
            )
            .collect_vec();

        Ok(Response::new(ListStreamingJobStatesResponse { states }))
    }

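    /// Lists the distribution of all fragments. The stream node body is fetched only when
    /// `include_node` is true (the default); `list_creating_fragment_distribution` below
    /// is the same query restricted to jobs that are still being created.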
    async fn list_fragment_distribution(
        &self,
        request: Request<ListFragmentDistributionRequest>,
    ) -> Result<Response<ListFragmentDistributionResponse>, Status> {
        let include_node = request.into_inner().include_node.unwrap_or(true);
        let distributions = if include_node {
            self.metadata_manager
                .catalog_controller
                .list_fragment_descs_with_node(false)
                .await?
        } else {
            self.metadata_manager
                .catalog_controller
                .list_fragment_descs_without_node(false)
                .await?
        }
        .into_iter()
        .map(|(dist, _)| dist)
        .collect();

        Ok(Response::new(ListFragmentDistributionResponse {
            distributions,
        }))
    }

    async fn list_creating_fragment_distribution(
        &self,
        request: Request<ListCreatingFragmentDistributionRequest>,
    ) -> Result<Response<ListCreatingFragmentDistributionResponse>, Status> {
        let include_node = request.into_inner().include_node.unwrap_or(true);
        let distributions = if include_node {
            self.metadata_manager
                .catalog_controller
                .list_fragment_descs_with_node(true)
                .await?
        } else {
            self.metadata_manager
                .catalog_controller
                .list_fragment_descs_without_node(true)
                .await?
        }
        .into_iter()
        .map(|(dist, _)| dist)
        .collect();

        Ok(Response::new(ListCreatingFragmentDistributionResponse {
            distributions,
        }))
    }

    async fn list_sink_log_store_tables(
        &self,
        _request: Request<ListSinkLogStoreTablesRequest>,
    ) -> Result<Response<ListSinkLogStoreTablesResponse>, Status> {
        let tables = self
            .metadata_manager
            .catalog_controller
            .list_sink_log_store_tables()
            .await?
            .into_iter()
            .map(|(sink_id, internal_table_id)| {
                list_sink_log_store_tables_response::SinkLogStoreTable {
                    sink_id: sink_id.as_raw_id(),
                    internal_table_id: internal_table_id.as_raw_id(),
                }
            })
            .collect();

        Ok(Response::new(ListSinkLogStoreTablesResponse { tables }))
    }

    async fn get_fragment_by_id(
        &self,
        request: Request<GetFragmentByIdRequest>,
    ) -> Result<Response<GetFragmentByIdResponse>, Status> {
        let req = request.into_inner();
        let fragment_desc = self
            .metadata_manager
            .catalog_controller
            .get_fragment_desc_by_id(req.fragment_id)
            .await?;
        let distribution = fragment_desc
            .map(|(desc, upstreams)| fragment_desc_to_distribution(desc, upstreams, true));
        Ok(Response::new(GetFragmentByIdResponse { distribution }))
    }

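    /// Returns, for each actor of the given fragment, the indices of the vnodes it owns,
    /// resolved from the in-memory shared actor info.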
    async fn get_fragment_vnodes(
        &self,
        request: Request<GetFragmentVnodesRequest>,
    ) -> Result<Response<GetFragmentVnodesResponse>, Status> {
        let req = request.into_inner();
        let fragment_id = req.fragment_id;

        let shared_actor_infos = self.env.shared_actor_infos();
        let guard = shared_actor_infos.read_guard();

        let fragment_info = guard
            .get_fragment(fragment_id)
            .ok_or_else(|| Status::not_found(format!("Fragment {} not found", fragment_id)))?;

        let actor_vnodes = fragment_info
            .actors
            .iter()
            .map(|(actor_id, actor_info)| {
                let vnode_indices = if let Some(ref vnode_bitmap) = actor_info.vnode_bitmap {
                    vnode_bitmap.iter_ones().map(|v| v as u32).collect()
                } else {
                    vec![]
                };

                get_fragment_vnodes_response::ActorVnodes {
                    actor_id: *actor_id,
                    vnode_indices,
                }
            })
            .collect();

        Ok(Response::new(GetFragmentVnodesResponse { actor_vnodes }))
    }

    async fn get_actor_vnodes(
        &self,
        request: Request<GetActorVnodesRequest>,
    ) -> Result<Response<GetActorVnodesResponse>, Status> {
        let req = request.into_inner();
        let actor_id = req.actor_id;

        let shared_actor_infos = self.env.shared_actor_infos();
        let guard = shared_actor_infos.read_guard();

        // Find the actor across all fragments
        let actor_info = guard
            .iter_over_fragments()
            .find_map(|(_, fragment_info)| fragment_info.actors.get(&actor_id))
            .ok_or_else(|| Status::not_found(format!("Actor {} not found", actor_id)))?;

        let vnode_indices = if let Some(ref vnode_bitmap) = actor_info.vnode_bitmap {
            vnode_bitmap.iter_ones().map(|v| v as u32).collect()
        } else {
            vec![]
        };

        Ok(Response::new(GetActorVnodesResponse { vnode_indices }))
    }

    async fn list_actor_states(
        &self,
        _request: Request<ListActorStatesRequest>,
    ) -> Result<Response<ListActorStatesResponse>, Status> {
        let actor_locations = self
            .metadata_manager
            .catalog_controller
            .list_actor_locations()?;
        let states = actor_locations
            .into_iter()
            .map(|actor_location| list_actor_states_response::ActorState {
                actor_id: actor_location.actor_id,
                fragment_id: actor_location.fragment_id,
                worker_id: actor_location.worker_id,
            })
            .collect_vec();

        Ok(Response::new(ListActorStatesResponse { states }))
    }

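    /// Triggers an ad-hoc recovery via the barrier manager.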
    async fn recover(
        &self,
        _request: Request<RecoverRequest>,
    ) -> Result<Response<RecoverResponse>, Status> {
        self.barrier_manager.adhoc_recovery().await?;
        Ok(Response::new(RecoverResponse {}))
    }

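    /// Lists the current split assignment of every source-related actor: source and
    /// backfill fragments are first mapped to their owning source (tagged as shared,
    /// non-shared, or backfill), and each actor's splits are then resolved from the
    /// shared actor info.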
    async fn list_actor_splits(
        &self,
        _request: Request<ListActorSplitsRequest>,
    ) -> Result<Response<ListActorSplitsResponse>, Status> {
        let SourceManagerRunningInfo {
            source_fragments,
            backfill_fragments,
        } = self.stream_manager.source_manager.get_running_info().await;

        let mut actor_splits = self.env.shared_actor_infos().list_assignments();

        let source_actors: HashMap<_, _> = {
            let all_fragment_ids: HashSet<_> = backfill_fragments
                .values()
                .flat_map(|set| set.iter().flat_map(|&(id1, id2)| [id1, id2]))
                .chain(source_fragments.values().flatten().copied())
                .collect();

            let guard = self.env.shared_actor_infos().read_guard();
            guard
                .iter_over_fragments()
                .filter(|(frag_id, _)| all_fragment_ids.contains(*frag_id))
                .flat_map(|(fragment_id, fragment_info)| {
                    fragment_info
                        .actors
                        .keys()
                        .copied()
                        .map(|actor_id| (actor_id, *fragment_id))
                })
                .collect()
        };

        let is_shared_source = self
            .metadata_manager
            .catalog_controller
            .list_source_id_with_shared_types()
            .await?;

        let fragment_to_source: HashMap<_, _> = source_fragments
            .into_iter()
            .flat_map(|(source_id, fragment_ids)| {
                let source_type = if is_shared_source.get(&source_id).copied().unwrap_or(false) {
                    FragmentType::SharedSource
                } else {
                    FragmentType::NonSharedSource
                };

                fragment_ids
                    .into_iter()
                    .map(move |fragment_id| (fragment_id, (source_id, source_type)))
            })
            .chain(
                backfill_fragments
                    .into_iter()
                    .flat_map(|(source_id, fragment_ids)| {
                        fragment_ids.into_iter().flat_map(
                            move |(fragment_id, upstream_fragment_id)| {
                                [
                                    (fragment_id, (source_id, FragmentType::SharedSourceBackfill)),
                                    (
                                        upstream_fragment_id,
                                        (source_id, FragmentType::SharedSource),
                                    ),
                                ]
                            },
                        )
                    }),
            )
            .collect();

        let actor_splits = source_actors
            .into_iter()
            .flat_map(|(actor_id, fragment_id)| {
                let (source_id, fragment_type) = fragment_to_source
                    .get(&fragment_id)
                    .copied()
                    .unwrap_or_default();

                actor_splits
                    .remove(&actor_id)
                    .unwrap_or_default()
                    .into_iter()
                    .map(move |split| list_actor_splits_response::ActorSplit {
                        actor_id,
                        source_id,
                        fragment_id,
                        split_id: split.id().to_string(),
                        fragment_type: fragment_type.into(),
                    })
            })
            .collect_vec();

        Ok(Response::new(ListActorSplitsResponse { actor_splits }))
    }

    async fn list_rate_limits(
        &self,
        _request: Request<ListRateLimitsRequest>,
    ) -> Result<Response<ListRateLimitsResponse>, Status> {
        let rate_limits = self
            .metadata_manager
            .catalog_controller
            .list_rate_limits()
            .await?;
        Ok(Response::new(ListRateLimitsResponse { rate_limits }))
    }

    #[cfg_attr(coverage, coverage(off))]
    async fn refresh(
        &self,
        request: Request<RefreshRequest>,
    ) -> Result<Response<RefreshResponse>, Status> {
        let req = request.into_inner();

        tracing::info!("Refreshing table with id: {}", req.table_id);

        let response = self
            .refresh_manager
            .trigger_manual_refresh(req, self.env.shared_actor_infos())
            .await?;

        Ok(Response::new(response))
    }

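    /// Alters connector properties of a sink, iceberg table, source, or connection.
    /// The catalog is updated first; the resulting plaintext properties (with secrets
    /// filled in) are then broadcast to the affected objects via a
    /// `ConnectorPropsChange` barrier command.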
    async fn alter_connector_props(
        &self,
        request: Request<AlterConnectorPropsRequest>,
    ) -> Result<Response<AlterConnectorPropsResponse>, Status> {
        let request = request.into_inner();
        let secret_manager = LocalSecretManager::global();
        let (new_props_plaintext, object_id) = match AlterConnectorPropsObject::try_from(
            request.object_type,
        ) {
            Ok(AlterConnectorPropsObject::Sink) => (
                self.metadata_manager
                    .update_sink_props_by_sink_id(
                        request.object_id.into(),
                        request.changed_props.clone().into_iter().collect(),
                    )
                    .await?,
                request.object_id.into(),
            ),
            Ok(AlterConnectorPropsObject::IcebergTable) => {
                let (prop, sink_id) = self
                    .metadata_manager
                    .update_iceberg_table_props_by_table_id(
                        request.object_id.into(),
                        request.changed_props.clone().into_iter().collect(),
                        request.extra_options,
                    )
                    .await?;
                (prop, sink_id.as_object_id())
            }

            Ok(AlterConnectorPropsObject::Source) => {
                // Alters a standalone source or a table's associated source.
                if request.connector_conn_ref.is_some() {
                    return Err(Status::invalid_argument(
                        "alter connector_conn_ref is not supported",
                    ));
                }
                let options_with_secret = self
                    .metadata_manager
                    .catalog_controller
                    .update_source_props_by_source_id(
                        request.object_id.into(),
                        request.changed_props.clone().into_iter().collect(),
                        request.changed_secret_refs.clone().into_iter().collect(),
                        false, // SQL ALTER SOURCE enforces alter-on-fly check
                    )
                    .await?;

                self.stream_manager
                    .source_manager
                    .validate_source_once(request.object_id.into(), options_with_secret.clone())
                    .await?;

                let (options, secret_refs) = options_with_secret.into_parts();
                (
                    secret_manager
                        .fill_secrets(options, secret_refs)
                        .map_err(MetaError::from)?
                        .into_iter()
                        .collect(),
                    request.object_id.into(),
                )
            }
            Ok(AlterConnectorPropsObject::Connection) => {
                // Update the connection and all dependent sources/sinks atomically, and later broadcast
                // the complete plaintext properties to dependents via barrier.
                let (
                    connection_options_with_secret,
                    updated_sources_with_props,
                    updated_sinks_with_props,
                ) = self
                    .metadata_manager
                    .catalog_controller
                    .update_connection_and_dependent_objects_props(
                        ConnectionId::from(request.object_id),
                        request.changed_props.clone().into_iter().collect(),
                        request.changed_secret_refs.clone().into_iter().collect(),
                    )
                    .await?;

                // Materialize connection plaintext for observability/debugging (not broadcast directly).
                let (options, secret_refs) = connection_options_with_secret.into_parts();
                let new_props_plaintext = secret_manager
                    .fill_secrets(options, secret_refs)
                    .map_err(MetaError::from)?
                    .into_iter()
                    .collect::<HashMap<String, String>>();

                // Broadcast changes to dependent sources and sinks if any exist.
                let mut dependent_mutation = HashMap::default();
                for (source_id, complete_source_props) in updated_sources_with_props {
                    dependent_mutation.insert(source_id.as_object_id(), complete_source_props);
                }
                for (sink_id, complete_sink_props) in updated_sinks_with_props {
                    dependent_mutation.insert(sink_id.as_object_id(), complete_sink_props);
                }

                if !dependent_mutation.is_empty() {
                    let database_id = self
                        .metadata_manager
                        .catalog_controller
                        .get_object_database_id(ConnectionId::from(request.object_id))
                        .await?;
                    tracing::info!(
                        "broadcasting connection {} property changes to dependent object ids: {:?}",
                        request.object_id,
                        dependent_mutation.keys().collect_vec()
                    );
                    let _version = self
                        .barrier_scheduler
                        .run_command(
                            database_id,
                            Command::ConnectorPropsChange(dependent_mutation),
                        )
                        .await?;
                }

                (
                    new_props_plaintext,
                    ConnectionId::from(request.object_id).as_object_id(),
                )
            }

            _ => {
                return Err(Status::invalid_argument(format!(
                    "unsupported object type for AlterConnectorProps: {:?}",
                    request.object_type
                )));
            }
        };

        let database_id = self
            .metadata_manager
            .catalog_controller
            .get_object_database_id(object_id)
            .await?;
        // Connection updates are broadcast to dependent sources/sinks inside the `Connection` branch above.
        // For sources/sinks/iceberg-table updates, broadcast the change to the object itself.
        if AlterConnectorPropsObject::try_from(request.object_type)
            .is_ok_and(|t| t != AlterConnectorPropsObject::Connection)
        {
            let mut mutation = HashMap::default();
            mutation.insert(object_id, new_props_plaintext);
            let _version = self
                .barrier_scheduler
                .run_command(database_id, Command::ConnectorPropsChange(mutation))
                .await?;
        }

        Ok(Response::new(AlterConnectorPropsResponse {}))
    }

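    /// Sets the `aligned` flag on every `SyncLogStore` node in the given job's fragments,
    /// returning an error if the job contains no such node.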
    async fn set_sync_log_store_aligned(
        &self,
        request: Request<SetSyncLogStoreAlignedRequest>,
    ) -> Result<Response<SetSyncLogStoreAlignedResponse>, Status> {
        let req = request.into_inner();
        let job_id = req.job_id;
        let aligned = req.aligned;

        self.metadata_manager
            .catalog_controller
            .mutate_fragments_by_job_id(
                job_id,
                |_mask, stream_node| {
                    let mut visited = false;
                    visit_stream_node_mut(stream_node, |body| {
                        if let NodeBody::SyncLogStore(sync_log_store) = body {
                            sync_log_store.aligned = aligned;
                            visited = true;
                        }
                    });
                    Ok(visited)
                },
                "no fragments found with synced log store",
            )
            .await?;

        Ok(Response::new(SetSyncLogStoreAlignedResponse {}))
    }

    async fn list_cdc_progress(
        &self,
        _request: Request<ListCdcProgressRequest>,
    ) -> Result<Response<ListCdcProgressResponse>, Status> {
        let cdc_progress = self
            .barrier_manager
            .get_cdc_progress()
            .await?
            .into_iter()
            .map(|(job_id, p)| {
                (
                    job_id,
                    PbCdcProgress {
                        split_total_count: p.split_total_count,
                        split_backfilled_count: p.split_backfilled_count,
                        split_completed_count: p.split_completed_count,
                    },
                )
            })
            .collect();
        Ok(Response::new(ListCdcProgressResponse { cdc_progress }))
    }

    async fn list_unmigrated_tables(
        &self,
        _request: Request<ListUnmigratedTablesRequest>,
    ) -> Result<Response<ListUnmigratedTablesResponse>, Status> {
        let unmigrated_tables = self
            .metadata_manager
            .catalog_controller
            .list_unmigrated_tables()
            .await?
            .into_iter()
            .map(|table| list_unmigrated_tables_response::UnmigratedTable {
                table_id: table.id,
                table_name: table.name,
            })
            .collect();

        Ok(Response::new(ListUnmigratedTablesResponse {
            tables: unmigrated_tables,
        }))
    }

    /// Orchestrated source property update with a pause/update/resume workflow.
    /// This is the "safe" variant: it pauses the database's streams before updating and
    /// resumes them afterwards, even if an intermediate step fails.
    async fn alter_source_properties_safe(
        &self,
        request: Request<AlterSourcePropertiesSafeRequest>,
    ) -> Result<Response<AlterSourcePropertiesSafeResponse>, Status> {
        let request = request.into_inner();
        let source_id = request.source_id;
        let options = request.options.unwrap_or_default();

        tracing::info!(
            source_id = source_id,
            reset_splits = options.reset_splits,
            "Starting orchestrated source property update"
        );

        // Get the database ID for the source
        let database_id = self
            .metadata_manager
            .catalog_controller
            .get_object_database_id(SourceId::from(source_id))
            .await?;

        // Step 1: Pause the stream (already commits state)
        tracing::info!(source_id = source_id, "Pausing stream");
        self.barrier_scheduler
            .run_command(database_id, Command::Pause)
            .await?;

        // Step 2: Update catalog and get the new properties
        let result = async {
            let secret_manager = LocalSecretManager::global();

            let options_with_secret = self
                .metadata_manager
                .catalog_controller
                .update_source_props_by_source_id(
                    source_id.into(),
                    request.changed_props.clone().into_iter().collect(),
                    request.changed_secret_refs.clone().into_iter().collect(),
                    true, // risectl admin operation skips alter-on-fly check
                )
                .await?;

            // Validate the source
            self.stream_manager
                .source_manager
                .validate_source_once(source_id.into(), options_with_secret.clone())
                .await?;

            let (props, secret_refs) = options_with_secret.into_parts();
            let new_props_plaintext: HashMap<String, String> = secret_manager
                .fill_secrets(props, secret_refs)
                .map_err(MetaError::from)?
                .into_iter()
                .collect();

            // Step 3: Issue ConnectorPropsChange barrier
            tracing::info!(
                source_id = source_id,
                "Issuing ConnectorPropsChange barrier"
            );
            let mut mutation = HashMap::default();
            mutation.insert(source_id.into(), new_props_plaintext);
            self.barrier_scheduler
                .run_command(database_id, Command::ConnectorPropsChange(mutation))
                .await?;

            // Step 4: Optional split reset
            if options.reset_splits {
                tracing::info!(source_id = source_id, "Resetting source splits");
                self.stream_manager
                    .source_manager
                    .reset_source_splits(source_id.into())
                    .await?;
            }

            Ok::<_, MetaError>(())
        }
        .await;

        // Step 5: Resume the stream (even if previous steps failed)
        tracing::info!(source_id = source_id, "Resuming stream");
        let resume_result = self
            .barrier_scheduler
            .run_command(database_id, Command::Resume)
            .await;

        // Return the first error if any
        result?;
        resume_result?;

        tracing::info!(
            source_id = source_id,
            "Orchestrated source property update completed successfully"
        );

        Ok(Response::new(AlterSourcePropertiesSafeResponse {}))
    }

    /// Reset source split assignments (UNSAFE - admin only).
    /// This clears persisted split metadata and triggers re-discovery.
    async fn reset_source_splits(
        &self,
        request: Request<ResetSourceSplitsRequest>,
    ) -> Result<Response<ResetSourceSplitsResponse>, Status> {
        let request = request.into_inner();
        let source_id = request.source_id;

        tracing::warn!(
            source_id = source_id,
            "UNSAFE: Resetting source splits - this may cause data duplication or loss"
        );

        self.stream_manager
            .source_manager
            .reset_source_splits(source_id.into())
            .await?;

        Ok(Response::new(ResetSourceSplitsResponse {}))
    }

    /// Inject specific offsets into source splits (UNSAFE - admin only).
    /// This can cause data duplication or loss depending on the correctness of the provided offsets.
    async fn inject_source_offsets(
        &self,
        request: Request<InjectSourceOffsetsRequest>,
    ) -> Result<Response<InjectSourceOffsetsResponse>, Status> {
        let request = request.into_inner();
        let source_id = request.source_id;
        let split_offsets = request.split_offsets;

        // Validate split IDs exist before proceeding
        let applied_split_ids = self
            .stream_manager
            .source_manager
            .validate_inject_source_offsets(source_id.into(), &split_offsets)
            .await?;

        tracing::warn!(
            source_id = source_id,
            num_offsets = split_offsets.len(),
            "UNSAFE: Injecting source offsets - this may cause data duplication or loss"
        );

        let database_id = self
            .metadata_manager
            .catalog_controller
            .get_object_database_id(SourceId::from(source_id))
            .await?;

        self.barrier_scheduler
            .run_command(
                database_id,
                Command::InjectSourceOffsets {
                    source_id: SourceId::from(source_id),
                    split_offsets,
                },
            )
            .await?;

        Ok(Response::new(InjectSourceOffsetsResponse {
            applied_split_ids,
        }))
    }
}

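/// Converts a [`FragmentDesc`] row and its upstream fragment ids into a protobuf
/// [`FragmentDistribution`], optionally including the fragment's stream node body.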
fn fragment_desc_to_distribution(
    fragment_desc: FragmentDesc,
    upstreams: Vec<FragmentId>,
    include_node: bool,
) -> FragmentDistribution {
    let node = include_node.then(|| fragment_desc.stream_node.to_protobuf());
    FragmentDistribution {
        fragment_id: fragment_desc.fragment_id,
        table_id: fragment_desc.job_id,
        distribution_type: PbFragmentDistributionType::from(fragment_desc.distribution_type) as _,
        state_table_ids: fragment_desc.state_table_ids.0,
        upstream_fragment_ids: upstreams,
        fragment_type_mask: fragment_desc.fragment_type_mask as _,
        parallelism: fragment_desc.parallelism as _,
        vnode_count: fragment_desc.vnode_count as _,
        node,
        parallelism_policy: fragment_desc.parallelism_policy,
    }
}