risingwave_meta_service/stream_service.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};

use chrono::DateTime;
use itertools::Itertools;
use risingwave_common::id::JobId;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
use risingwave_connector::source::SplitMetaData;
use risingwave_meta::barrier::BarrierManagerRef;
use risingwave_meta::controller::fragment::StreamingJobInfo;
use risingwave_meta::controller::utils::FragmentDesc;
use risingwave_meta::manager::MetadataManager;
use risingwave_meta::manager::iceberg_compaction::IcebergCompactionManagerRef;
use risingwave_meta::stream::{GlobalRefreshManagerRef, SourceManagerRunningInfo};
use risingwave_meta::{MetaError, model};
use risingwave_meta_model::{ConnectionId, FragmentId, SourceId, StreamingParallelism};
use risingwave_pb::common::ThrottleType;
use risingwave_pb::meta::alter_connector_props_request::AlterConnectorPropsObject;
use risingwave_pb::meta::cancel_creating_jobs_request::Jobs;
use risingwave_pb::meta::list_actor_splits_response::FragmentType;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
use risingwave_pb::meta::list_table_fragments_response::{
    ActorInfo, FragmentInfo, TableFragmentInfo,
};
use risingwave_pb::meta::stream_manager_service_server::StreamManagerService;
use risingwave_pb::meta::table_fragments::PbState;
use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
use risingwave_pb::meta::*;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
use tonic::{Request, Response, Status};

use crate::barrier::{BarrierScheduler, Command};
use crate::manager::MetaSrvEnv;
use crate::stream::GlobalStreamManagerRef;

pub type TonicResponse<T> = Result<Response<T>, Status>;

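/// gRPC handler backing the meta node's `StreamManagerService`, bundling the
/// subsystems the RPCs need: barrier scheduling and management, stream and
/// metadata management, table refresh, and Iceberg compaction.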
#[derive(Clone)]
pub struct StreamServiceImpl {
    env: MetaSrvEnv,
    barrier_scheduler: BarrierScheduler,
    barrier_manager: BarrierManagerRef,
    stream_manager: GlobalStreamManagerRef,
    metadata_manager: MetadataManager,
    refresh_manager: GlobalRefreshManagerRef,
    iceberg_compaction_manager: IcebergCompactionManagerRef,
}

impl StreamServiceImpl {
    pub fn new(
        env: MetaSrvEnv,
        barrier_scheduler: BarrierScheduler,
        barrier_manager: BarrierManagerRef,
        stream_manager: GlobalStreamManagerRef,
        metadata_manager: MetadataManager,
        refresh_manager: GlobalRefreshManagerRef,
        iceberg_compaction_manager: IcebergCompactionManagerRef,
    ) -> Self {
        StreamServiceImpl {
            env,
            barrier_scheduler,
            barrier_manager,
            stream_manager,
            metadata_manager,
            refresh_manager,
            iceberg_compaction_manager,
        }
    }
}

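// `StreamManagerService` is the tonic-generated server trait for the stream
// manager RPCs; each method below handles one RPC.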
#[async_trait::async_trait]
impl StreamManagerService for StreamServiceImpl {
    async fn flush(&self, request: Request<FlushRequest>) -> TonicResponse<FlushResponse> {
        self.env.idle_manager().record_activity();
        let req = request.into_inner();

        let version_id = self.barrier_scheduler.flush(req.database_id).await?;
        Ok(Response::new(FlushResponse {
            status: None,
            hummock_version_id: version_id,
        }))
    }

    async fn list_refresh_table_states(
        &self,
        _request: Request<ListRefreshTableStatesRequest>,
    ) -> TonicResponse<ListRefreshTableStatesResponse> {
        let refresh_jobs = self.metadata_manager.list_refresh_jobs().await?;
        let refresh_table_states = refresh_jobs
            .into_iter()
            .map(|job| RefreshTableState {
                table_id: job.table_id,
                current_status: job.current_status.to_string(),
                last_trigger_time: job
                    .last_trigger_time
                    .map(|time| DateTime::from_timestamp_millis(time).unwrap().to_string()),
                trigger_interval_secs: job.trigger_interval_secs,
                last_success_time: job
                    .last_success_time
                    .map(|time| DateTime::from_timestamp_millis(time).unwrap().to_string()),
            })
            .collect();
        Ok(Response::new(ListRefreshTableStatesResponse {
            states: refresh_table_states,
        }))
    }

    async fn list_iceberg_compaction_status(
        &self,
        _request: Request<ListIcebergCompactionStatusRequest>,
    ) -> TonicResponse<ListIcebergCompactionStatusResponse> {
        let statuses = self
            .iceberg_compaction_manager
            .list_compaction_statuses()
            .await
            .into_iter()
            .map(
                |status| list_iceberg_compaction_status_response::IcebergCompactionStatus {
                    sink_id: status.sink_id.as_raw_id(),
                    task_type: status.task_type,
                    trigger_interval_sec: status.trigger_interval_sec,
                    trigger_snapshot_count: status.trigger_snapshot_count as u64,
                    schedule_state: status.schedule_state,
                    next_compaction_after_sec: status.next_compaction_after_sec,
                    pending_snapshot_count: status.pending_snapshot_count.map(|count| count as u64),
                    is_triggerable: status.is_triggerable,
                },
            )
            .collect();

        Ok(Response::new(ListIcebergCompactionStatusResponse {
            statuses,
        }))
    }

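    // Pause and resume are cluster-wide: they schedule one barrier command per
    // active database rather than targeting a single streaming job.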
    async fn pause(&self, _: Request<PauseRequest>) -> Result<Response<PauseResponse>, Status> {
        for database_id in self.metadata_manager.list_active_database_ids().await? {
            self.barrier_scheduler
                .run_command(database_id, Command::pause())
                .await?;
        }
        Ok(Response::new(PauseResponse {}))
    }

    async fn resume(&self, _: Request<ResumeRequest>) -> Result<Response<ResumeResponse>, Status> {
        for database_id in self.metadata_manager.list_active_database_ids().await? {
            self.barrier_scheduler
                .run_command(database_id, Command::resume())
                .await?;
        }
        Ok(Response::new(ResumeResponse {}))
    }

    async fn apply_throttle(
        &self,
        request: Request<ApplyThrottleRequest>,
    ) -> Result<Response<ApplyThrottleResponse>, Status> {
        let request = request.into_inner();

        // Decode enums from raw i32 fields to handle decoupled target/type.
        let throttle_target = request.throttle_target();
        let throttle_type = request.throttle_type();

        let raw_object_id: u32;
        let jobs: HashSet<JobId>;
        let fragments: HashSet<FragmentId>;

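        // Each supported (type, target) pair resolves to the set of jobs and
        // fragments whose rate limits change; unsupported pairs are rejected.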
        match (throttle_type, throttle_target) {
            (ThrottleType::Source, ThrottleTarget::Source | ThrottleTarget::Table) => {
                (jobs, fragments) = self
                    .metadata_manager
                    .update_source_rate_limit_by_source_id(request.id.into(), request.rate)
                    .await?;
                raw_object_id = request.id;
            }
            (ThrottleType::Backfill, ThrottleTarget::Mv)
            | (ThrottleType::Backfill, ThrottleTarget::Sink)
            | (ThrottleType::Backfill, ThrottleTarget::Table) => {
                fragments = self
                    .metadata_manager
                    .update_backfill_rate_limit_by_job_id(JobId::from(request.id), request.rate)
                    .await?;
                jobs = [request.id.into()].into_iter().collect();
                raw_object_id = request.id;
            }
            (ThrottleType::Dml, ThrottleTarget::Table) => {
                fragments = self
                    .metadata_manager
                    .update_dml_rate_limit_by_job_id(JobId::from(request.id), request.rate)
                    .await?;
                jobs = [request.id.into()].into_iter().collect();
                raw_object_id = request.id;
            }
            (ThrottleType::Sink, ThrottleTarget::Sink) => {
                fragments = self
                    .metadata_manager
                    .update_sink_rate_limit_by_sink_id(request.id.into(), request.rate)
                    .await?;
                jobs = [request.id.into()].into_iter().collect();
                raw_object_id = request.id;
            }
            // FIXME(kwannoel): specialize for throttle type x target
            (_, ThrottleTarget::Fragment) => {
                self.metadata_manager
                    .update_fragment_rate_limit_by_fragment_id(request.id.into(), request.rate)
                    .await?;
                let fragment_id = request.id.into();
                fragments = [fragment_id].into_iter().collect();
                let job_id = self
                    .metadata_manager
                    .catalog_controller
                    .get_fragment_streaming_job_id(fragment_id)
                    .await?;
                jobs = [job_id].into_iter().collect();
                raw_object_id = job_id.as_raw_id();
            }
            _ => {
                return Err(Status::invalid_argument(format!(
                    "unsupported throttle target/type: {:?}/{:?}",
                    throttle_target, throttle_type
                )));
            }
        };

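        // All variants funnel into a single Throttle barrier command, scoped
        // to the database that owns the throttled object.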
        let database_id = self
            .metadata_manager
            .catalog_controller
            .get_object_database_id(raw_object_id)
            .await?;

        let throttle_config = ThrottleConfig {
            rate_limit: request.rate,
            throttle_type: throttle_type.into(),
        };
        let _version = self
            .barrier_scheduler
            .run_command(
                database_id,
                Command::Throttle {
                    jobs,
                    config: fragments
                        .into_iter()
                        .map(|fragment_id| (fragment_id, throttle_config))
                        .collect(),
                },
            )
            .await?;

        Ok(Response::new(ApplyThrottleResponse { status: None }))
    }

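    /// Cancels creating streaming jobs, identified either by partial job infos
    /// (resolved against the catalog) or by explicit job ids.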
    async fn cancel_creating_jobs(
        &self,
        request: Request<CancelCreatingJobsRequest>,
    ) -> TonicResponse<CancelCreatingJobsResponse> {
        let req = request.into_inner();
        let jobs = req
            .jobs
            .ok_or_else(|| Status::invalid_argument("jobs must be specified"))?;
        let job_ids = match jobs {
            Jobs::Infos(infos) => self
                .metadata_manager
                .catalog_controller
                .find_creating_streaming_job_ids(infos.infos)
                .await?
                .into_iter()
                .map(|id| id.as_job_id())
                .collect(),
            Jobs::Ids(jobs) => jobs.job_ids,
        };

        let canceled_jobs = self
            .stream_manager
            .cancel_streaming_jobs(job_ids)
            .await?
            .into_iter()
            .map(|id| id.as_raw_id())
            .collect_vec();
        Ok(Response::new(CancelCreatingJobsResponse {
            status: None,
            canceled_jobs,
        }))
    }

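    /// Builds a per-job snapshot of fragments, actors, and dispatchers. Each
    /// actor carries its fragment's node body, and its dispatchers are moved
    /// out of the prefetched map rather than cloned.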
    async fn list_table_fragments(
        &self,
        request: Request<ListTableFragmentsRequest>,
    ) -> Result<Response<ListTableFragmentsResponse>, Status> {
        let req = request.into_inner();
        let table_ids = HashSet::<JobId>::from_iter(req.table_ids);

        let mut info = HashMap::new();
        for job_id in table_ids {
            let table_fragments = self
                .metadata_manager
                .catalog_controller
                .get_job_fragments_by_id(job_id)
                .await?;
            let mut dispatchers = self
                .metadata_manager
                .catalog_controller
                .get_fragment_actor_dispatchers(
                    table_fragments.fragment_ids().map(|id| id as _).collect(),
                )
                .await?;
            let ctx = table_fragments.ctx.to_protobuf();
            info.insert(
                table_fragments.stream_job_id(),
                TableFragmentInfo {
                    fragments: table_fragments
                        .fragments
                        .into_iter()
                        .map(|(id, fragment)| FragmentInfo {
                            id,
                            actors: fragment
                                .actors
                                .into_iter()
                                .map(|actor| ActorInfo {
                                    id: actor.actor_id,
                                    node: Some(fragment.nodes.clone()),
                                    dispatcher: dispatchers
                                        .get_mut(&fragment.fragment_id)
                                        .and_then(|dispatchers| dispatchers.remove(&actor.actor_id))
                                        .unwrap_or_default(),
                                })
                                .collect_vec(),
                        })
                        .collect_vec(),
                    ctx: Some(ctx),
                },
            );
        }

        Ok(Response::new(ListTableFragmentsResponse {
            table_fragments: info,
        }))
    }

    async fn list_streaming_job_states(
        &self,
        _request: Request<ListStreamingJobStatesRequest>,
    ) -> Result<Response<ListStreamingJobStatesResponse>, Status> {
        let job_infos = self
            .metadata_manager
            .catalog_controller
            .list_streaming_job_infos()
            .await?;
        let states = job_infos
            .into_iter()
            .map(
                |StreamingJobInfo {
                     job_id,
                     job_status,
                     name,
                     parallelism,
                     max_parallelism,
                     resource_group,
                     database_id,
                     schema_id,
                     config_override,
                     ..
                 }| {
                    let parallelism = match parallelism {
                        StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
                        StreamingParallelism::Custom => model::TableParallelism::Custom,
                        StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
                    };

                    list_streaming_job_states_response::StreamingJobState {
                        table_id: job_id,
                        name,
                        state: PbState::from(job_status) as _,
                        parallelism: Some(parallelism.into()),
                        max_parallelism: max_parallelism as _,
                        resource_group,
                        database_id,
                        schema_id,
                        config_override,
                    }
                },
            )
            .collect_vec();

        Ok(Response::new(ListStreamingJobStatesResponse { states }))
    }

    async fn list_fragment_distribution(
        &self,
        request: Request<ListFragmentDistributionRequest>,
    ) -> Result<Response<ListFragmentDistributionResponse>, Status> {
        let include_node = request.into_inner().include_node.unwrap_or(true);
        let distributions = if include_node {
            self.metadata_manager
                .catalog_controller
                .list_fragment_descs_with_node(false)
                .await?
        } else {
            self.metadata_manager
                .catalog_controller
                .list_fragment_descs_without_node(false)
                .await?
        }
        .into_iter()
        .map(|(dist, _)| dist)
        .collect();

        Ok(Response::new(ListFragmentDistributionResponse {
            distributions,
        }))
    }

    async fn list_creating_fragment_distribution(
        &self,
        request: Request<ListCreatingFragmentDistributionRequest>,
    ) -> Result<Response<ListCreatingFragmentDistributionResponse>, Status> {
        let include_node = request.into_inner().include_node.unwrap_or(true);
        let distributions = if include_node {
            self.metadata_manager
                .catalog_controller
                .list_fragment_descs_with_node(true)
                .await?
        } else {
            self.metadata_manager
                .catalog_controller
                .list_fragment_descs_without_node(true)
                .await?
        }
        .into_iter()
        .map(|(dist, _)| dist)
        .collect();

        Ok(Response::new(ListCreatingFragmentDistributionResponse {
            distributions,
        }))
    }

    async fn list_sink_log_store_tables(
        &self,
        _request: Request<ListSinkLogStoreTablesRequest>,
    ) -> Result<Response<ListSinkLogStoreTablesResponse>, Status> {
        let tables = self
            .metadata_manager
            .catalog_controller
            .list_sink_log_store_tables()
            .await?
            .into_iter()
            .map(|(sink_id, internal_table_id)| {
                list_sink_log_store_tables_response::SinkLogStoreTable {
                    sink_id: sink_id.as_raw_id(),
                    internal_table_id: internal_table_id.as_raw_id(),
                }
            })
            .collect();

        Ok(Response::new(ListSinkLogStoreTablesResponse { tables }))
    }

    async fn get_fragment_by_id(
        &self,
        request: Request<GetFragmentByIdRequest>,
    ) -> Result<Response<GetFragmentByIdResponse>, Status> {
        let req = request.into_inner();
        let fragment_desc = self
            .metadata_manager
            .catalog_controller
            .get_fragment_desc_by_id(req.fragment_id)
            .await?;
        let distribution = fragment_desc
            .map(|(desc, upstreams)| fragment_desc_to_distribution(desc, upstreams, true));
        Ok(Response::new(GetFragmentByIdResponse { distribution }))
    }

    async fn get_fragment_vnodes(
        &self,
        request: Request<GetFragmentVnodesRequest>,
    ) -> Result<Response<GetFragmentVnodesResponse>, Status> {
        let req = request.into_inner();
        let fragment_id = req.fragment_id;

        let shared_actor_infos = self.env.shared_actor_infos();
        let guard = shared_actor_infos.read_guard();

        let fragment_info = guard
            .get_fragment(fragment_id)
            .ok_or_else(|| Status::not_found(format!("Fragment {} not found", fragment_id)))?;

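        // Expand each actor's vnode bitmap into an explicit list of vnode
        // indices; actors without a bitmap report an empty list.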
        let actor_vnodes = fragment_info
            .actors
            .iter()
            .map(|(actor_id, actor_info)| {
                let vnode_indices = if let Some(ref vnode_bitmap) = actor_info.vnode_bitmap {
                    vnode_bitmap.iter_ones().map(|v| v as u32).collect()
                } else {
                    vec![]
                };

                get_fragment_vnodes_response::ActorVnodes {
                    actor_id: *actor_id,
                    vnode_indices,
                }
            })
            .collect();

        Ok(Response::new(GetFragmentVnodesResponse { actor_vnodes }))
    }

    async fn get_actor_vnodes(
        &self,
        request: Request<GetActorVnodesRequest>,
    ) -> Result<Response<GetActorVnodesResponse>, Status> {
        let req = request.into_inner();
        let actor_id = req.actor_id;

        let shared_actor_infos = self.env.shared_actor_infos();
        let guard = shared_actor_infos.read_guard();

        // Find the actor across all fragments
        let actor_info = guard
            .iter_over_fragments()
            .find_map(|(_, fragment_info)| fragment_info.actors.get(&actor_id))
            .ok_or_else(|| Status::not_found(format!("Actor {} not found", actor_id)))?;

        let vnode_indices = if let Some(ref vnode_bitmap) = actor_info.vnode_bitmap {
            vnode_bitmap.iter_ones().map(|v| v as u32).collect()
        } else {
            vec![]
        };

        Ok(Response::new(GetActorVnodesResponse { vnode_indices }))
    }

    async fn list_actor_states(
        &self,
        _request: Request<ListActorStatesRequest>,
    ) -> Result<Response<ListActorStatesResponse>, Status> {
        let actor_locations = self
            .metadata_manager
            .catalog_controller
            .list_actor_locations()?;
        let states = actor_locations
            .into_iter()
            .map(|actor_location| list_actor_states_response::ActorState {
                actor_id: actor_location.actor_id,
                fragment_id: actor_location.fragment_id,
                worker_id: actor_location.worker_id,
            })
            .collect_vec();

        Ok(Response::new(ListActorStatesResponse { states }))
    }

    async fn recover(
        &self,
        _request: Request<RecoverRequest>,
    ) -> Result<Response<RecoverResponse>, Status> {
        self.barrier_manager.adhoc_recovery().await?;
        Ok(Response::new(RecoverResponse {}))
    }

    async fn list_actor_splits(
        &self,
        _request: Request<ListActorSplitsRequest>,
    ) -> Result<Response<ListActorSplitsResponse>, Status> {
        let SourceManagerRunningInfo {
            source_fragments,
            backfill_fragments,
        } = self.stream_manager.source_manager.get_running_info().await;

        let mut actor_splits = self.env.shared_actor_infos().list_assignments();

        let source_actors: HashMap<_, _> = {
            let all_fragment_ids: HashSet<_> = backfill_fragments
                .values()
                .flat_map(|set| set.iter().flat_map(|&(id1, id2)| [id1, id2]))
                .chain(source_fragments.values().flatten().copied())
                .collect();

            let guard = self.env.shared_actor_infos().read_guard();
            guard
                .iter_over_fragments()
                .filter(|(frag_id, _)| all_fragment_ids.contains(*frag_id))
                .flat_map(|(fragment_id, fragment_info)| {
                    fragment_info
                        .actors
                        .keys()
                        .copied()
                        .map(|actor_id| (actor_id, *fragment_id))
                })
                .collect()
        };

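        // Map every relevant fragment to its source and fragment type (shared
        // source, non-shared source, or shared-source backfill) so each split
        // can be labeled below.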
        let is_shared_source = self
            .metadata_manager
            .catalog_controller
            .list_source_id_with_shared_types()
            .await?;

        let fragment_to_source: HashMap<_, _> = source_fragments
            .into_iter()
            .flat_map(|(source_id, fragment_ids)| {
                let source_type = if is_shared_source.get(&source_id).copied().unwrap_or(false) {
                    FragmentType::SharedSource
                } else {
                    FragmentType::NonSharedSource
                };

                fragment_ids
                    .into_iter()
                    .map(move |fragment_id| (fragment_id, (source_id, source_type)))
            })
            .chain(
                backfill_fragments
                    .into_iter()
                    .flat_map(|(source_id, fragment_ids)| {
                        fragment_ids.into_iter().flat_map(
                            move |(fragment_id, upstream_fragment_id)| {
                                [
                                    (fragment_id, (source_id, FragmentType::SharedSourceBackfill)),
                                    (
                                        upstream_fragment_id,
                                        (source_id, FragmentType::SharedSource),
                                    ),
                                ]
                            },
                        )
                    }),
            )
            .collect();

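        // Join the actor -> fragment mapping with the split assignments,
        // draining each actor's splits from the shared assignment map.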
        let actor_splits = source_actors
            .into_iter()
            .flat_map(|(actor_id, fragment_id)| {
                let (source_id, fragment_type) = fragment_to_source
                    .get(&fragment_id)
                    .copied()
                    .unwrap_or_default();

                actor_splits
                    .remove(&actor_id)
                    .unwrap_or_default()
                    .into_iter()
                    .map(move |split| list_actor_splits_response::ActorSplit {
                        actor_id,
                        source_id,
                        fragment_id,
                        split_id: split.id().to_string(),
                        fragment_type: fragment_type.into(),
                    })
            })
            .collect_vec();

        Ok(Response::new(ListActorSplitsResponse { actor_splits }))
    }

    async fn list_rate_limits(
        &self,
        _request: Request<ListRateLimitsRequest>,
    ) -> Result<Response<ListRateLimitsResponse>, Status> {
        let rate_limits = self
            .metadata_manager
            .catalog_controller
            .list_rate_limits()
            .await?;
        Ok(Response::new(ListRateLimitsResponse { rate_limits }))
    }

    #[cfg_attr(coverage, coverage(off))]
    async fn refresh(
        &self,
        request: Request<RefreshRequest>,
    ) -> Result<Response<RefreshResponse>, Status> {
        let req = request.into_inner();

        tracing::info!("Refreshing table with id: {}", req.table_id);

        let response = self
            .refresh_manager
            .trigger_manual_refresh(req, self.env.shared_actor_infos())
            .await?;

        Ok(Response::new(response))
    }

    async fn alter_connector_props(
        &self,
        request: Request<AlterConnectorPropsRequest>,
    ) -> Result<Response<AlterConnectorPropsResponse>, Status> {
        let request = request.into_inner();
        let secret_manager = LocalSecretManager::global();
        let (new_props_plaintext, object_id) = match AlterConnectorPropsObject::try_from(
            request.object_type,
        ) {
            Ok(AlterConnectorPropsObject::Sink) => (
                self.metadata_manager
                    .update_sink_props_by_sink_id(
                        request.object_id.into(),
                        request.changed_props.clone().into_iter().collect(),
                    )
                    .await?,
                request.object_id.into(),
            ),
            Ok(AlterConnectorPropsObject::IcebergTable) => {
                let (prop, sink_id) = self
                    .metadata_manager
                    .update_iceberg_table_props_by_table_id(
                        request.object_id.into(),
                        request.changed_props.clone().into_iter().collect(),
                        request.extra_options,
                    )
                    .await?;
                (prop, sink_id.as_object_id())
            }

            Ok(AlterConnectorPropsObject::Source) => {
                // alter source and table's associated source
                if request.connector_conn_ref.is_some() {
                    return Err(Status::invalid_argument(
                        "alter connector_conn_ref is not supported",
                    ));
                }
                let options_with_secret = self
                    .metadata_manager
                    .catalog_controller
                    .update_source_props_by_source_id(
                        request.object_id.into(),
                        request.changed_props.clone().into_iter().collect(),
                        request.changed_secret_refs.clone().into_iter().collect(),
                        false, // SQL ALTER SOURCE enforces alter-on-fly check
                    )
                    .await?;

                self.stream_manager
                    .source_manager
                    .validate_source_once(request.object_id.into(), options_with_secret.clone())
                    .await?;

                let (options, secret_refs) = options_with_secret.into_parts();
                (
                    secret_manager
                        .fill_secrets(options, secret_refs)
                        .map_err(MetaError::from)?
                        .into_iter()
                        .collect(),
                    request.object_id.into(),
                )
            }
            Ok(AlterConnectorPropsObject::Connection) => {
                // Update the connection and all dependent sources/sinks atomically, and later broadcast
                // the complete plaintext properties to dependents via barrier.
                let (
                    connection_options_with_secret,
                    updated_sources_with_props,
                    updated_sinks_with_props,
                ) = self
                    .metadata_manager
                    .catalog_controller
                    .update_connection_and_dependent_objects_props(
                        ConnectionId::from(request.object_id),
                        request.changed_props.clone().into_iter().collect(),
                        request.changed_secret_refs.clone().into_iter().collect(),
                    )
                    .await?;

                // Materialize connection plaintext for observability/debugging (not broadcast directly).
                let (options, secret_refs) = connection_options_with_secret.into_parts();
                let new_props_plaintext = secret_manager
                    .fill_secrets(options, secret_refs)
                    .map_err(MetaError::from)?
                    .into_iter()
                    .collect::<HashMap<String, String>>();

                // Broadcast changes to dependent sources and sinks if any exist.
                let mut dependent_mutation = HashMap::default();
                for (source_id, complete_source_props) in updated_sources_with_props {
                    dependent_mutation.insert(source_id.as_object_id(), complete_source_props);
                }
                for (sink_id, complete_sink_props) in updated_sinks_with_props {
                    dependent_mutation.insert(sink_id.as_object_id(), complete_sink_props);
                }

                if !dependent_mutation.is_empty() {
                    let database_id = self
                        .metadata_manager
                        .catalog_controller
                        .get_object_database_id(ConnectionId::from(request.object_id))
                        .await?;
                    tracing::info!(
                        "broadcasting connection {} property changes to dependent object ids: {:?}",
                        request.object_id,
                        dependent_mutation.keys().collect_vec()
                    );
                    let _version = self
                        .barrier_scheduler
                        .run_command(
                            database_id,
                            Command::ConnectorPropsChange(dependent_mutation),
                        )
                        .await?;
                }

                (
                    new_props_plaintext,
                    ConnectionId::from(request.object_id).as_object_id(),
                )
            }

            _ => {
                unimplemented!(
                    "Unsupported object type for AlterConnectorProps: {:?}",
                    request.object_type
                );
            }
        };

        let database_id = self
            .metadata_manager
            .catalog_controller
            .get_object_database_id(object_id)
            .await?;
        // Connection updates are broadcast to dependent sources/sinks inside the `Connection` branch above.
        // For sources/sinks/iceberg-table updates, broadcast the change to the object itself.
        if AlterConnectorPropsObject::try_from(request.object_type)
            .is_ok_and(|t| t != AlterConnectorPropsObject::Connection)
        {
            let mut mutation = HashMap::default();
            mutation.insert(object_id, new_props_plaintext);
            let _version = self
                .barrier_scheduler
                .run_command(database_id, Command::ConnectorPropsChange(mutation))
                .await?;
        }

        Ok(Response::new(AlterConnectorPropsResponse {}))
    }

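    /// Toggles the `aligned` flag on every `SyncLogStore` node in the job's
    /// fragments, erroring if the job contains none.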
    async fn set_sync_log_store_aligned(
        &self,
        request: Request<SetSyncLogStoreAlignedRequest>,
    ) -> Result<Response<SetSyncLogStoreAlignedResponse>, Status> {
        let req = request.into_inner();
        let job_id = req.job_id;
        let aligned = req.aligned;

        self.metadata_manager
            .catalog_controller
            .mutate_fragments_by_job_id(
                job_id,
                |_mask, stream_node| {
                    let mut visited = false;
                    visit_stream_node_mut(stream_node, |body| {
                        if let NodeBody::SyncLogStore(sync_log_store) = body {
                            sync_log_store.aligned = aligned;
                            visited = true;
                        }
                    });
                    Ok(visited)
                },
                "no fragments found with synced log store",
            )
            .await?;

        Ok(Response::new(SetSyncLogStoreAlignedResponse {}))
    }

    async fn list_cdc_progress(
        &self,
        _request: Request<ListCdcProgressRequest>,
    ) -> Result<Response<ListCdcProgressResponse>, Status> {
        let cdc_progress = self
            .barrier_manager
            .get_cdc_progress()
            .await?
            .into_iter()
            .map(|(job_id, p)| {
                (
                    job_id,
                    PbCdcProgress {
                        split_total_count: p.split_total_count,
                        split_backfilled_count: p.split_backfilled_count,
                        split_completed_count: p.split_completed_count,
                    },
                )
            })
            .collect();
        Ok(Response::new(ListCdcProgressResponse { cdc_progress }))
    }

    async fn list_unmigrated_tables(
        &self,
        _request: Request<ListUnmigratedTablesRequest>,
    ) -> Result<Response<ListUnmigratedTablesResponse>, Status> {
        let unmigrated_tables = self
            .metadata_manager
            .catalog_controller
            .list_unmigrated_tables()
            .await?
            .into_iter()
            .map(|table| list_unmigrated_tables_response::UnmigratedTable {
                table_id: table.id,
                table_name: table.name,
            })
            .collect();

        Ok(Response::new(ListUnmigratedTablesResponse {
            tables: unmigrated_tables,
        }))
    }

    /// Orchestrated source property update with a pause/update/resume workflow.
    /// This is the "safe" version that pauses sources before updating and resumes after.
    async fn alter_source_properties_safe(
        &self,
        request: Request<AlterSourcePropertiesSafeRequest>,
    ) -> Result<Response<AlterSourcePropertiesSafeResponse>, Status> {
        let request = request.into_inner();
        let source_id = request.source_id;
        let options = request.options.unwrap_or_default();

        tracing::info!(
            source_id = source_id,
            reset_splits = options.reset_splits,
            "Starting orchestrated source property update"
        );

        // Get the database ID for the source
        let database_id = self
            .metadata_manager
            .catalog_controller
            .get_object_database_id(SourceId::from(source_id))
            .await?;

        // Step 1: Pause the stream (already commits state)
        tracing::info!(source_id = source_id, "Pausing stream");
        self.barrier_scheduler
            .run_command(database_id, Command::Pause)
            .await?;

        // Step 2: Update catalog and get the new properties
        let result = async {
            let secret_manager = LocalSecretManager::global();

            let options_with_secret = self
                .metadata_manager
                .catalog_controller
                .update_source_props_by_source_id(
                    source_id.into(),
                    request.changed_props.clone().into_iter().collect(),
                    request.changed_secret_refs.clone().into_iter().collect(),
                    true, // risectl admin operation skips alter-on-fly check
                )
                .await?;

            // Validate the source
            self.stream_manager
                .source_manager
                .validate_source_once(source_id.into(), options_with_secret.clone())
                .await?;

            let (props, secret_refs) = options_with_secret.into_parts();
            let new_props_plaintext: HashMap<String, String> = secret_manager
                .fill_secrets(props, secret_refs)
                .map_err(MetaError::from)?
                .into_iter()
                .collect();

            // Step 3: Issue ConnectorPropsChange barrier
            tracing::info!(
                source_id = source_id,
                "Issuing ConnectorPropsChange barrier"
            );
            let mut mutation = HashMap::default();
            mutation.insert(source_id.into(), new_props_plaintext);
            self.barrier_scheduler
                .run_command(database_id, Command::ConnectorPropsChange(mutation))
                .await?;

            // Step 4: Optional split reset
            if options.reset_splits {
                tracing::info!(source_id = source_id, "Resetting source splits");
                self.stream_manager
                    .source_manager
                    .reset_source_splits(source_id.into())
                    .await?;
            }

            Ok::<_, MetaError>(())
        }
        .await;

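        // Steps 2-4 run in an inner async block so that any failure still
        // falls through to the resume below instead of leaving the stream paused.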
        // Step 5: Resume the stream (even if previous steps failed)
        tracing::info!(source_id = source_id, "Resuming stream");
        let resume_result = self
            .barrier_scheduler
            .run_command(database_id, Command::Resume)
            .await;

        // Return the first error if any
        result?;
        resume_result?;

        tracing::info!(
            source_id = source_id,
            "Orchestrated source property update completed successfully"
        );

        Ok(Response::new(AlterSourcePropertiesSafeResponse {}))
    }

    /// Reset source split assignments (UNSAFE - admin only).
    /// This clears persisted split metadata and triggers re-discovery.
    async fn reset_source_splits(
        &self,
        request: Request<ResetSourceSplitsRequest>,
    ) -> Result<Response<ResetSourceSplitsResponse>, Status> {
        let request = request.into_inner();
        let source_id = request.source_id;

        tracing::warn!(
            source_id = source_id,
            "UNSAFE: Resetting source splits - this may cause data duplication or loss"
        );

        self.stream_manager
            .source_manager
            .reset_source_splits(source_id.into())
            .await?;

        Ok(Response::new(ResetSourceSplitsResponse {}))
    }

    /// Inject specific offsets into source splits (UNSAFE - admin only).
    /// This can cause data duplication or loss depending on the correctness of the provided offsets.
    async fn inject_source_offsets(
        &self,
        request: Request<InjectSourceOffsetsRequest>,
    ) -> Result<Response<InjectSourceOffsetsResponse>, Status> {
        let request = request.into_inner();
        let source_id = request.source_id;
        let split_offsets = request.split_offsets;

        // Validate split IDs exist before proceeding
        let applied_split_ids = self
            .stream_manager
            .source_manager
            .validate_inject_source_offsets(source_id.into(), &split_offsets)
            .await?;

        tracing::warn!(
            source_id = source_id,
            num_offsets = split_offsets.len(),
            "UNSAFE: Injecting source offsets - this may cause data duplication or loss"
        );

        let database_id = self
            .metadata_manager
            .catalog_controller
            .get_object_database_id(SourceId::from(source_id))
            .await?;

        self.barrier_scheduler
            .run_command(
                database_id,
                Command::InjectSourceOffsets {
                    source_id: SourceId::from(source_id),
                    split_offsets,
                },
            )
            .await?;

        Ok(Response::new(InjectSourceOffsetsResponse {
            applied_split_ids,
        }))
    }
}

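/// Converts a catalog `FragmentDesc` (plus its upstream fragment ids) into the
/// protobuf `FragmentDistribution`, optionally materializing the stream node body.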
fn fragment_desc_to_distribution(
    fragment_desc: FragmentDesc,
    upstreams: Vec<FragmentId>,
    include_node: bool,
) -> FragmentDistribution {
    let node = include_node.then(|| fragment_desc.stream_node.to_protobuf());
    FragmentDistribution {
        fragment_id: fragment_desc.fragment_id,
        table_id: fragment_desc.job_id,
        distribution_type: PbFragmentDistributionType::from(fragment_desc.distribution_type) as _,
        state_table_ids: fragment_desc.state_table_ids.0,
        upstream_fragment_ids: upstreams,
        fragment_type_mask: fragment_desc.fragment_type_mask as _,
        parallelism: fragment_desc.parallelism as _,
        vnode_count: fragment_desc.vnode_count as _,
        node,
        parallelism_policy: fragment_desc.parallelism_policy,
    }
}