risingwave_meta_service/
stream_service.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use chrono::DateTime;
18use itertools::Itertools;
19use risingwave_common::id::JobId;
20use risingwave_common::secret::LocalSecretManager;
21use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
22use risingwave_connector::source::SplitMetaData;
23use risingwave_meta::barrier::BarrierManagerRef;
24use risingwave_meta::controller::fragment::StreamingJobInfo;
25use risingwave_meta::controller::utils::FragmentDesc;
26use risingwave_meta::manager::MetadataManager;
27use risingwave_meta::stream::{GlobalRefreshManagerRef, SourceManagerRunningInfo};
28use risingwave_meta::{MetaError, model};
29use risingwave_meta_model::{ConnectionId, FragmentId, StreamingParallelism};
30use risingwave_pb::common::ThrottleType;
31use risingwave_pb::meta::alter_connector_props_request::AlterConnectorPropsObject;
32use risingwave_pb::meta::cancel_creating_jobs_request::Jobs;
33use risingwave_pb::meta::list_actor_splits_response::FragmentType;
34use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
35use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
36use risingwave_pb::meta::list_table_fragments_response::{
37    ActorInfo, FragmentInfo, TableFragmentInfo,
38};
39use risingwave_pb::meta::stream_manager_service_server::StreamManagerService;
40use risingwave_pb::meta::table_fragments::PbState;
41use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
42use risingwave_pb::meta::*;
43use risingwave_pb::stream_plan::stream_node::NodeBody;
44use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
45use tonic::{Request, Response, Status};
46
47use crate::barrier::{BarrierScheduler, Command};
48use crate::manager::MetaSrvEnv;
49use crate::stream::GlobalStreamManagerRef;
50
51pub type TonicResponse<T> = Result<Response<T>, Status>;
52
53#[derive(Clone)]
54pub struct StreamServiceImpl {
55    env: MetaSrvEnv,
56    barrier_scheduler: BarrierScheduler,
57    barrier_manager: BarrierManagerRef,
58    stream_manager: GlobalStreamManagerRef,
59    metadata_manager: MetadataManager,
60    refresh_manager: GlobalRefreshManagerRef,
61}
62
63impl StreamServiceImpl {
64    pub fn new(
65        env: MetaSrvEnv,
66        barrier_scheduler: BarrierScheduler,
67        barrier_manager: BarrierManagerRef,
68        stream_manager: GlobalStreamManagerRef,
69        metadata_manager: MetadataManager,
70        refresh_manager: GlobalRefreshManagerRef,
71    ) -> Self {
72        StreamServiceImpl {
73            env,
74            barrier_scheduler,
75            barrier_manager,
76            stream_manager,
77            metadata_manager,
78            refresh_manager,
79        }
80    }
81}
82
83#[async_trait::async_trait]
84impl StreamManagerService for StreamServiceImpl {
85    async fn flush(&self, request: Request<FlushRequest>) -> TonicResponse<FlushResponse> {
86        self.env.idle_manager().record_activity();
87        let req = request.into_inner();
88
89        let version_id = self.barrier_scheduler.flush(req.database_id).await?;
90        Ok(Response::new(FlushResponse {
91            status: None,
92            hummock_version_id: version_id.to_u64(),
93        }))
94    }
95
96    async fn list_refresh_table_states(
97        &self,
98        _request: Request<ListRefreshTableStatesRequest>,
99    ) -> TonicResponse<ListRefreshTableStatesResponse> {
100        let refresh_jobs = self.metadata_manager.list_refresh_jobs().await?;
101        let refresh_table_states = refresh_jobs
102            .into_iter()
103            .map(|job| RefreshTableState {
104                table_id: job.table_id,
105                current_status: job.current_status.to_string(),
106                last_trigger_time: job
107                    .last_trigger_time
108                    .map(|time| DateTime::from_timestamp_millis(time).unwrap().to_string()),
109                trigger_interval_secs: job.trigger_interval_secs,
110                last_success_time: job
111                    .last_success_time
112                    .map(|time| DateTime::from_timestamp_millis(time).unwrap().to_string()),
113            })
114            .collect();
115        Ok(Response::new(ListRefreshTableStatesResponse {
116            states: refresh_table_states,
117        }))
118    }
119
120    async fn pause(&self, _: Request<PauseRequest>) -> Result<Response<PauseResponse>, Status> {
121        for database_id in self.metadata_manager.list_active_database_ids().await? {
122            self.barrier_scheduler
123                .run_command(database_id, Command::pause())
124                .await?;
125        }
126        Ok(Response::new(PauseResponse {}))
127    }
128
129    async fn resume(&self, _: Request<ResumeRequest>) -> Result<Response<ResumeResponse>, Status> {
130        for database_id in self.metadata_manager.list_active_database_ids().await? {
131            self.barrier_scheduler
132                .run_command(database_id, Command::resume())
133                .await?;
134        }
135        Ok(Response::new(ResumeResponse {}))
136    }
137
138    async fn apply_throttle(
139        &self,
140        request: Request<ApplyThrottleRequest>,
141    ) -> Result<Response<ApplyThrottleResponse>, Status> {
142        let request = request.into_inner();
143
144        // Decode enums from raw i32 fields to handle decoupled target/type.
145        let throttle_target = request.throttle_target();
146        let throttle_type = request.throttle_type();
147
148        let raw_object_id: u32;
149        let jobs: HashSet<JobId>;
150        let fragments: HashSet<FragmentId>;
151
152        match (throttle_type, throttle_target) {
153            (ThrottleType::Source, ThrottleTarget::Source | ThrottleTarget::Table) => {
154                (jobs, fragments) = self
155                    .metadata_manager
156                    .update_source_rate_limit_by_source_id(request.id.into(), request.rate)
157                    .await?;
158                raw_object_id = request.id;
159            }
160            (ThrottleType::Backfill, ThrottleTarget::Mv)
161            | (ThrottleType::Backfill, ThrottleTarget::Sink)
162            | (ThrottleType::Backfill, ThrottleTarget::Table) => {
163                fragments = self
164                    .metadata_manager
165                    .update_backfill_rate_limit_by_job_id(JobId::from(request.id), request.rate)
166                    .await?;
167                jobs = [request.id.into()].into_iter().collect();
168                raw_object_id = request.id;
169            }
170            (ThrottleType::Dml, ThrottleTarget::Table) => {
171                fragments = self
172                    .metadata_manager
173                    .update_dml_rate_limit_by_job_id(JobId::from(request.id), request.rate)
174                    .await?;
175                jobs = [request.id.into()].into_iter().collect();
176                raw_object_id = request.id;
177            }
178            (ThrottleType::Sink, ThrottleTarget::Sink) => {
179                fragments = self
180                    .metadata_manager
181                    .update_sink_rate_limit_by_sink_id(request.id.into(), request.rate)
182                    .await?;
183                jobs = [request.id.into()].into_iter().collect();
184                raw_object_id = request.id;
185            }
186            // FIXME(kwannoel): specialize for throttle type x target
187            (_, ThrottleTarget::Fragment) => {
188                self.metadata_manager
189                    .update_fragment_rate_limit_by_fragment_id(request.id.into(), request.rate)
190                    .await?;
191                let fragment_id = request.id.into();
192                fragments = [fragment_id].into_iter().collect();
193                let job_id = self
194                    .metadata_manager
195                    .catalog_controller
196                    .get_fragment_streaming_job_id(fragment_id)
197                    .await?;
198                jobs = [job_id].into_iter().collect();
199                raw_object_id = job_id.as_raw_id();
200            }
201            _ => {
202                return Err(Status::invalid_argument(format!(
203                    "unsupported throttle target/type: {:?}/{:?}",
204                    throttle_target, throttle_type
205                )));
206            }
207        };
208
209        let database_id = self
210            .metadata_manager
211            .catalog_controller
212            .get_object_database_id(raw_object_id)
213            .await?;
214
215        let throttle_config = ThrottleConfig {
216            rate_limit: request.rate,
217            throttle_type: throttle_type.into(),
218        };
219        let _i = self
220            .barrier_scheduler
221            .run_command(
222                database_id,
223                Command::Throttle {
224                    jobs,
225                    config: fragments
226                        .into_iter()
227                        .map(|fragment_id| (fragment_id, throttle_config))
228                        .collect(),
229                },
230            )
231            .await?;
232
233        Ok(Response::new(ApplyThrottleResponse { status: None }))
234    }
235
236    async fn cancel_creating_jobs(
237        &self,
238        request: Request<CancelCreatingJobsRequest>,
239    ) -> TonicResponse<CancelCreatingJobsResponse> {
240        let req = request.into_inner();
241        let job_ids = match req.jobs.unwrap() {
242            Jobs::Infos(infos) => self
243                .metadata_manager
244                .catalog_controller
245                .find_creating_streaming_job_ids(infos.infos)
246                .await?
247                .into_iter()
248                .map(|id| id.as_job_id())
249                .collect(),
250            Jobs::Ids(jobs) => jobs.job_ids,
251        };
252
253        let canceled_jobs = self
254            .stream_manager
255            .cancel_streaming_jobs(job_ids)
256            .await?
257            .into_iter()
258            .map(|id| id.as_raw_id())
259            .collect_vec();
260        Ok(Response::new(CancelCreatingJobsResponse {
261            status: None,
262            canceled_jobs,
263        }))
264    }
265
266    async fn list_table_fragments(
267        &self,
268        request: Request<ListTableFragmentsRequest>,
269    ) -> Result<Response<ListTableFragmentsResponse>, Status> {
270        let req = request.into_inner();
271        let table_ids = HashSet::<JobId>::from_iter(req.table_ids);
272
273        let mut info = HashMap::new();
274        for job_id in table_ids {
275            let table_fragments = self
276                .metadata_manager
277                .catalog_controller
278                .get_job_fragments_by_id(job_id)
279                .await?;
280            let mut dispatchers = self
281                .metadata_manager
282                .catalog_controller
283                .get_fragment_actor_dispatchers(
284                    table_fragments.fragment_ids().map(|id| id as _).collect(),
285                )
286                .await?;
287            let ctx = table_fragments.ctx.to_protobuf();
288            info.insert(
289                table_fragments.stream_job_id(),
290                TableFragmentInfo {
291                    fragments: table_fragments
292                        .fragments
293                        .into_iter()
294                        .map(|(id, fragment)| FragmentInfo {
295                            id,
296                            actors: fragment
297                                .actors
298                                .into_iter()
299                                .map(|actor| ActorInfo {
300                                    id: actor.actor_id,
301                                    node: Some(fragment.nodes.clone()),
302                                    dispatcher: dispatchers
303                                        .get_mut(&(fragment.fragment_id as _))
304                                        .and_then(|dispatchers| {
305                                            dispatchers.remove(&(actor.actor_id as _))
306                                        })
307                                        .unwrap_or_default(),
308                                })
309                                .collect_vec(),
310                        })
311                        .collect_vec(),
312                    ctx: Some(ctx),
313                },
314            );
315        }
316
317        Ok(Response::new(ListTableFragmentsResponse {
318            table_fragments: info,
319        }))
320    }
321
322    async fn list_streaming_job_states(
323        &self,
324        _request: Request<ListStreamingJobStatesRequest>,
325    ) -> Result<Response<ListStreamingJobStatesResponse>, Status> {
326        let job_infos = self
327            .metadata_manager
328            .catalog_controller
329            .list_streaming_job_infos()
330            .await?;
331        let states = job_infos
332            .into_iter()
333            .map(
334                |StreamingJobInfo {
335                     job_id,
336                     job_status,
337                     name,
338                     parallelism,
339                     max_parallelism,
340                     resource_group,
341                     database_id,
342                     schema_id,
343                     config_override,
344                     ..
345                 }| {
346                    let parallelism = match parallelism {
347                        StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
348                        StreamingParallelism::Custom => model::TableParallelism::Custom,
349                        StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
350                    };
351
352                    list_streaming_job_states_response::StreamingJobState {
353                        table_id: job_id,
354                        name,
355                        state: PbState::from(job_status) as _,
356                        parallelism: Some(parallelism.into()),
357                        max_parallelism: max_parallelism as _,
358                        resource_group,
359                        database_id,
360                        schema_id,
361                        config_override,
362                    }
363                },
364            )
365            .collect_vec();
366
367        Ok(Response::new(ListStreamingJobStatesResponse { states }))
368    }
369
370    async fn list_fragment_distribution(
371        &self,
372        _request: Request<ListFragmentDistributionRequest>,
373    ) -> Result<Response<ListFragmentDistributionResponse>, Status> {
374        let distributions = self
375            .metadata_manager
376            .catalog_controller
377            .list_fragment_descs(false)
378            .await?
379            .into_iter()
380            .map(|(dist, _)| dist)
381            .collect();
382
383        Ok(Response::new(ListFragmentDistributionResponse {
384            distributions,
385        }))
386    }
387
388    async fn list_creating_fragment_distribution(
389        &self,
390        _request: Request<ListCreatingFragmentDistributionRequest>,
391    ) -> Result<Response<ListCreatingFragmentDistributionResponse>, Status> {
392        let distributions = self
393            .metadata_manager
394            .catalog_controller
395            .list_fragment_descs(true)
396            .await?
397            .into_iter()
398            .map(|(dist, _)| dist)
399            .collect();
400
401        Ok(Response::new(ListCreatingFragmentDistributionResponse {
402            distributions,
403        }))
404    }
405
406    async fn get_fragment_by_id(
407        &self,
408        request: Request<GetFragmentByIdRequest>,
409    ) -> Result<Response<GetFragmentByIdResponse>, Status> {
410        let req = request.into_inner();
411        let fragment_desc = self
412            .metadata_manager
413            .catalog_controller
414            .get_fragment_desc_by_id(req.fragment_id)
415            .await?;
416        let distribution =
417            fragment_desc.map(|(desc, upstreams)| fragment_desc_to_distribution(desc, upstreams));
418        Ok(Response::new(GetFragmentByIdResponse { distribution }))
419    }
420
421    async fn get_fragment_vnodes(
422        &self,
423        request: Request<GetFragmentVnodesRequest>,
424    ) -> Result<Response<GetFragmentVnodesResponse>, Status> {
425        let req = request.into_inner();
426        let fragment_id = req.fragment_id;
427
428        let shared_actor_infos = self.env.shared_actor_infos();
429        let guard = shared_actor_infos.read_guard();
430
431        let fragment_info = guard
432            .get_fragment(fragment_id)
433            .ok_or_else(|| Status::not_found(format!("Fragment {} not found", fragment_id)))?;
434
435        let actor_vnodes = fragment_info
436            .actors
437            .iter()
438            .map(|(actor_id, actor_info)| {
439                let vnode_indices = if let Some(ref vnode_bitmap) = actor_info.vnode_bitmap {
440                    vnode_bitmap.iter_ones().map(|v| v as u32).collect()
441                } else {
442                    vec![]
443                };
444
445                get_fragment_vnodes_response::ActorVnodes {
446                    actor_id: *actor_id,
447                    vnode_indices,
448                }
449            })
450            .collect();
451
452        Ok(Response::new(GetFragmentVnodesResponse { actor_vnodes }))
453    }
454
455    async fn get_actor_vnodes(
456        &self,
457        request: Request<GetActorVnodesRequest>,
458    ) -> Result<Response<GetActorVnodesResponse>, Status> {
459        let req = request.into_inner();
460        let actor_id = req.actor_id;
461
462        let shared_actor_infos = self.env.shared_actor_infos();
463        let guard = shared_actor_infos.read_guard();
464
465        // Find the actor across all fragments
466        let actor_info = guard
467            .iter_over_fragments()
468            .find_map(|(_, fragment_info)| fragment_info.actors.get(&actor_id))
469            .ok_or_else(|| Status::not_found(format!("Actor {} not found", actor_id)))?;
470
471        let vnode_indices = if let Some(ref vnode_bitmap) = actor_info.vnode_bitmap {
472            vnode_bitmap.iter_ones().map(|v| v as u32).collect()
473        } else {
474            vec![]
475        };
476
477        Ok(Response::new(GetActorVnodesResponse { vnode_indices }))
478    }
479
480    async fn list_actor_states(
481        &self,
482        _request: Request<ListActorStatesRequest>,
483    ) -> Result<Response<ListActorStatesResponse>, Status> {
484        let actor_locations = self
485            .metadata_manager
486            .catalog_controller
487            .list_actor_locations()?;
488        let states = actor_locations
489            .into_iter()
490            .map(|actor_location| list_actor_states_response::ActorState {
491                actor_id: actor_location.actor_id,
492                fragment_id: actor_location.fragment_id,
493                worker_id: actor_location.worker_id,
494            })
495            .collect_vec();
496
497        Ok(Response::new(ListActorStatesResponse { states }))
498    }
499
500    async fn list_object_dependencies(
501        &self,
502        _request: Request<ListObjectDependenciesRequest>,
503    ) -> Result<Response<ListObjectDependenciesResponse>, Status> {
504        let dependencies = self
505            .metadata_manager
506            .catalog_controller
507            .list_created_object_dependencies()
508            .await?;
509
510        Ok(Response::new(ListObjectDependenciesResponse {
511            dependencies,
512        }))
513    }
514
515    async fn recover(
516        &self,
517        _request: Request<RecoverRequest>,
518    ) -> Result<Response<RecoverResponse>, Status> {
519        self.barrier_manager.adhoc_recovery().await?;
520        Ok(Response::new(RecoverResponse {}))
521    }
522
523    async fn list_actor_splits(
524        &self,
525        _request: Request<ListActorSplitsRequest>,
526    ) -> Result<Response<ListActorSplitsResponse>, Status> {
527        let SourceManagerRunningInfo {
528            source_fragments,
529            backfill_fragments,
530        } = self.stream_manager.source_manager.get_running_info().await;
531
532        let mut actor_splits = self.env.shared_actor_infos().list_assignments();
533
534        let source_actors: HashMap<_, _> = {
535            let all_fragment_ids: HashSet<_> = backfill_fragments
536                .values()
537                .flat_map(|set| set.iter().flat_map(|&(id1, id2)| [id1, id2]))
538                .chain(source_fragments.values().flatten().copied())
539                .collect();
540
541            let guard = self.env.shared_actor_infos().read_guard();
542            guard
543                .iter_over_fragments()
544                .filter(|(frag_id, _)| all_fragment_ids.contains(frag_id))
545                .flat_map(|(fragment_id, fragment_info)| {
546                    fragment_info
547                        .actors
548                        .keys()
549                        .copied()
550                        .map(|actor_id| (actor_id, *fragment_id))
551                })
552                .collect()
553        };
554
555        let is_shared_source = self
556            .metadata_manager
557            .catalog_controller
558            .list_source_id_with_shared_types()
559            .await?;
560
561        let fragment_to_source: HashMap<_, _> = source_fragments
562            .into_iter()
563            .flat_map(|(source_id, fragment_ids)| {
564                let source_type = if is_shared_source
565                    .get(&(source_id as _))
566                    .copied()
567                    .unwrap_or(false)
568                {
569                    FragmentType::SharedSource
570                } else {
571                    FragmentType::NonSharedSource
572                };
573
574                fragment_ids
575                    .into_iter()
576                    .map(move |fragment_id| (fragment_id, (source_id, source_type)))
577            })
578            .chain(
579                backfill_fragments
580                    .into_iter()
581                    .flat_map(|(source_id, fragment_ids)| {
582                        fragment_ids.into_iter().flat_map(
583                            move |(fragment_id, upstream_fragment_id)| {
584                                [
585                                    (fragment_id, (source_id, FragmentType::SharedSourceBackfill)),
586                                    (
587                                        upstream_fragment_id,
588                                        (source_id, FragmentType::SharedSource),
589                                    ),
590                                ]
591                            },
592                        )
593                    }),
594            )
595            .collect();
596
597        let actor_splits = source_actors
598            .into_iter()
599            .flat_map(|(actor_id, fragment_id)| {
600                let (source_id, fragment_type) = fragment_to_source
601                    .get(&(fragment_id as _))
602                    .copied()
603                    .unwrap_or_default();
604
605                actor_splits
606                    .remove(&(actor_id as _))
607                    .unwrap_or_default()
608                    .into_iter()
609                    .map(move |split| list_actor_splits_response::ActorSplit {
610                        actor_id,
611                        source_id,
612                        fragment_id,
613                        split_id: split.id().to_string(),
614                        fragment_type: fragment_type.into(),
615                    })
616            })
617            .collect_vec();
618
619        Ok(Response::new(ListActorSplitsResponse { actor_splits }))
620    }
621
622    async fn list_rate_limits(
623        &self,
624        _request: Request<ListRateLimitsRequest>,
625    ) -> Result<Response<ListRateLimitsResponse>, Status> {
626        let rate_limits = self
627            .metadata_manager
628            .catalog_controller
629            .list_rate_limits()
630            .await?;
631        Ok(Response::new(ListRateLimitsResponse { rate_limits }))
632    }
633
634    #[cfg_attr(coverage, coverage(off))]
635    async fn refresh(
636        &self,
637        request: Request<RefreshRequest>,
638    ) -> Result<Response<RefreshResponse>, Status> {
639        let req = request.into_inner();
640
641        tracing::info!("Refreshing table with id: {}", req.table_id);
642
643        let response = self
644            .refresh_manager
645            .trigger_manual_refresh(req, self.env.shared_actor_infos())
646            .await?;
647
648        Ok(Response::new(response))
649    }
650
651    async fn alter_connector_props(
652        &self,
653        request: Request<AlterConnectorPropsRequest>,
654    ) -> Result<Response<AlterConnectorPropsResponse>, Status> {
655        let request = request.into_inner();
656        let secret_manager = LocalSecretManager::global();
657        let (new_props_plaintext, object_id) = match AlterConnectorPropsObject::try_from(
658            request.object_type,
659        ) {
660            Ok(AlterConnectorPropsObject::Sink) => (
661                self.metadata_manager
662                    .update_sink_props_by_sink_id(
663                        request.object_id.into(),
664                        request.changed_props.clone().into_iter().collect(),
665                    )
666                    .await?,
667                request.object_id.into(),
668            ),
669            Ok(AlterConnectorPropsObject::IcebergTable) => {
670                let (prop, sink_id) = self
671                    .metadata_manager
672                    .update_iceberg_table_props_by_table_id(
673                        request.object_id.into(),
674                        request.changed_props.clone().into_iter().collect(),
675                        request.extra_options,
676                    )
677                    .await?;
678                (prop, sink_id.as_object_id())
679            }
680
681            Ok(AlterConnectorPropsObject::Source) => {
682                // alter source and table's associated source
683                if request.connector_conn_ref.is_some() {
684                    return Err(Status::invalid_argument(
685                        "alter connector_conn_ref is not supported",
686                    ));
687                }
688                let options_with_secret = self
689                    .metadata_manager
690                    .catalog_controller
691                    .update_source_props_by_source_id(
692                        request.object_id.into(),
693                        request.changed_props.clone().into_iter().collect(),
694                        request.changed_secret_refs.clone().into_iter().collect(),
695                    )
696                    .await?;
697
698                self.stream_manager
699                    .source_manager
700                    .validate_source_once(request.object_id.into(), options_with_secret.clone())
701                    .await?;
702
703                let (options, secret_refs) = options_with_secret.into_parts();
704                (
705                    secret_manager
706                        .fill_secrets(options, secret_refs)
707                        .map_err(MetaError::from)?
708                        .into_iter()
709                        .collect(),
710                    request.object_id.into(),
711                )
712            }
713            Ok(AlterConnectorPropsObject::Connection) => {
714                // Update the connection and all dependent sources/sinks atomically, and later broadcast
715                // the complete plaintext properties to dependents via barrier.
716                let (
717                    connection_options_with_secret,
718                    updated_sources_with_props,
719                    updated_sinks_with_props,
720                ) = self
721                    .metadata_manager
722                    .catalog_controller
723                    .update_connection_and_dependent_objects_props(
724                        ConnectionId::from(request.object_id),
725                        request.changed_props.clone().into_iter().collect(),
726                        request.changed_secret_refs.clone().into_iter().collect(),
727                    )
728                    .await?;
729
730                // Materialize connection plaintext for observability/debugging (not broadcast directly).
731                let (options, secret_refs) = connection_options_with_secret.into_parts();
732                let new_props_plaintext = secret_manager
733                    .fill_secrets(options, secret_refs)
734                    .map_err(MetaError::from)?
735                    .into_iter()
736                    .collect::<HashMap<String, String>>();
737
738                // Broadcast changes to dependent sources and sinks if any exist.
739                let mut dependent_mutation = HashMap::default();
740                for (source_id, complete_source_props) in updated_sources_with_props {
741                    dependent_mutation.insert(source_id.as_object_id(), complete_source_props);
742                }
743                for (sink_id, complete_sink_props) in updated_sinks_with_props {
744                    dependent_mutation.insert(sink_id.as_object_id(), complete_sink_props);
745                }
746
747                if !dependent_mutation.is_empty() {
748                    let database_id = self
749                        .metadata_manager
750                        .catalog_controller
751                        .get_object_database_id(ConnectionId::from(request.object_id))
752                        .await?;
753                    tracing::info!(
754                        "broadcasting connection {} property changes to dependent object ids: {:?}",
755                        request.object_id,
756                        dependent_mutation.keys().collect_vec()
757                    );
758                    let _version = self
759                        .barrier_scheduler
760                        .run_command(
761                            database_id,
762                            Command::ConnectorPropsChange(dependent_mutation),
763                        )
764                        .await?;
765                }
766
767                (
768                    new_props_plaintext,
769                    ConnectionId::from(request.object_id).as_object_id(),
770                )
771            }
772
773            _ => {
774                unimplemented!(
775                    "Unsupported object type for AlterConnectorProps: {:?}",
776                    request.object_type
777                );
778            }
779        };
780
781        let database_id = self
782            .metadata_manager
783            .catalog_controller
784            .get_object_database_id(object_id)
785            .await?;
786        // Connection updates are broadcast to dependent sources/sinks inside the `Connection` branch above.
787        // For sources/sinks/iceberg-table updates, broadcast the change to the object itself.
788        if AlterConnectorPropsObject::try_from(request.object_type)
789            .is_ok_and(|t| t != AlterConnectorPropsObject::Connection)
790        {
791            let mut mutation = HashMap::default();
792            mutation.insert(object_id, new_props_plaintext);
793            let _version = self
794                .barrier_scheduler
795                .run_command(database_id, Command::ConnectorPropsChange(mutation))
796                .await?;
797        }
798
799        Ok(Response::new(AlterConnectorPropsResponse {}))
800    }
801
802    async fn set_sync_log_store_aligned(
803        &self,
804        request: Request<SetSyncLogStoreAlignedRequest>,
805    ) -> Result<Response<SetSyncLogStoreAlignedResponse>, Status> {
806        let req = request.into_inner();
807        let job_id = req.job_id;
808        let aligned = req.aligned;
809
810        self.metadata_manager
811            .catalog_controller
812            .mutate_fragments_by_job_id(
813                job_id,
814                |_mask, stream_node| {
815                    let mut visited = false;
816                    visit_stream_node_mut(stream_node, |body| {
817                        if let NodeBody::SyncLogStore(sync_log_store) = body {
818                            sync_log_store.aligned = aligned;
819                            visited = true
820                        }
821                    });
822                    Ok(visited)
823                },
824                "no fragments found with synced log store",
825            )
826            .await?;
827
828        Ok(Response::new(SetSyncLogStoreAlignedResponse {}))
829    }
830
831    async fn list_cdc_progress(
832        &self,
833        _request: Request<ListCdcProgressRequest>,
834    ) -> Result<Response<ListCdcProgressResponse>, Status> {
835        let cdc_progress = self
836            .barrier_manager
837            .get_cdc_progress()
838            .await?
839            .into_iter()
840            .map(|(job_id, p)| {
841                (
842                    job_id,
843                    PbCdcProgress {
844                        split_total_count: p.split_total_count,
845                        split_backfilled_count: p.split_backfilled_count,
846                        split_completed_count: p.split_completed_count,
847                    },
848                )
849            })
850            .collect();
851        Ok(Response::new(ListCdcProgressResponse { cdc_progress }))
852    }
853
854    async fn list_unmigrated_tables(
855        &self,
856        _request: Request<ListUnmigratedTablesRequest>,
857    ) -> Result<Response<ListUnmigratedTablesResponse>, Status> {
858        let unmigrated_tables = self
859            .metadata_manager
860            .catalog_controller
861            .list_unmigrated_tables()
862            .await?
863            .into_iter()
864            .map(|table| list_unmigrated_tables_response::UnmigratedTable {
865                table_id: table.id,
866                table_name: table.name,
867            })
868            .collect();
869
870        Ok(Response::new(ListUnmigratedTablesResponse {
871            tables: unmigrated_tables,
872        }))
873    }
874}
875
876fn fragment_desc_to_distribution(
877    fragment_desc: FragmentDesc,
878    upstreams: Vec<FragmentId>,
879) -> FragmentDistribution {
880    FragmentDistribution {
881        fragment_id: fragment_desc.fragment_id,
882        table_id: fragment_desc.job_id,
883        distribution_type: PbFragmentDistributionType::from(fragment_desc.distribution_type) as _,
884        state_table_ids: fragment_desc.state_table_ids.0,
885        upstream_fragment_ids: upstreams,
886        fragment_type_mask: fragment_desc.fragment_type_mask as _,
887        parallelism: fragment_desc.parallelism as _,
888        vnode_count: fragment_desc.vnode_count as _,
889        node: Some(fragment_desc.stream_node.to_protobuf()),
890        parallelism_policy: fragment_desc.parallelism_policy,
891    }
892}