use std::collections::{HashMap, HashSet};

use chrono::DateTime;
use itertools::Itertools;
use risingwave_common::id::JobId;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
use risingwave_connector::source::SplitMetaData;
use risingwave_meta::barrier::BarrierManagerRef;
use risingwave_meta::controller::fragment::StreamingJobInfo;
use risingwave_meta::controller::utils::FragmentDesc;
use risingwave_meta::manager::MetadataManager;
use risingwave_meta::stream::{GlobalRefreshManagerRef, SourceManagerRunningInfo};
use risingwave_meta::{MetaError, model};
use risingwave_meta_model::{ConnectionId, FragmentId, StreamingParallelism};
use risingwave_pb::common::ThrottleType;
use risingwave_pb::meta::alter_connector_props_request::AlterConnectorPropsObject;
use risingwave_pb::meta::cancel_creating_jobs_request::Jobs;
use risingwave_pb::meta::list_actor_splits_response::FragmentType;
use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
use risingwave_pb::meta::list_table_fragments_response::{
    ActorInfo, FragmentInfo, TableFragmentInfo,
};
use risingwave_pb::meta::stream_manager_service_server::StreamManagerService;
use risingwave_pb::meta::table_fragments::PbState;
use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
use risingwave_pb::meta::*;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
use tonic::{Request, Response, Status};

use crate::barrier::{BarrierScheduler, Command};
use crate::manager::MetaSrvEnv;
use crate::stream::GlobalStreamManagerRef;

pub type TonicResponse<T> = Result<Response<T>, Status>;

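/// Handler for the meta node's `StreamManagerService` gRPC API, wiring together
/// the barrier scheduler/manager, the global stream manager, the refresh
/// manager, and catalog metadata.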
#[derive(Clone)]
pub struct StreamServiceImpl {
    env: MetaSrvEnv,
    barrier_scheduler: BarrierScheduler,
    barrier_manager: BarrierManagerRef,
    stream_manager: GlobalStreamManagerRef,
    metadata_manager: MetadataManager,
    refresh_manager: GlobalRefreshManagerRef,
}

impl StreamServiceImpl {
    pub fn new(
        env: MetaSrvEnv,
        barrier_scheduler: BarrierScheduler,
        barrier_manager: BarrierManagerRef,
        stream_manager: GlobalStreamManagerRef,
        metadata_manager: MetadataManager,
        refresh_manager: GlobalRefreshManagerRef,
    ) -> Self {
        StreamServiceImpl {
            env,
            barrier_scheduler,
            barrier_manager,
            stream_manager,
            metadata_manager,
            refresh_manager,
        }
    }
}

#[async_trait::async_trait]
impl StreamManagerService for StreamServiceImpl {
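    /// Flushes the given database by scheduling a barrier and waiting for it
    /// to be committed, returning the resulting hummock version id.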
    async fn flush(&self, request: Request<FlushRequest>) -> TonicResponse<FlushResponse> {
        self.env.idle_manager().record_activity();
        let req = request.into_inner();

        let version_id = self.barrier_scheduler.flush(req.database_id).await?;
        Ok(Response::new(FlushResponse {
            status: None,
            hummock_version_id: version_id.to_u64(),
        }))
    }

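    /// Lists the refresh state of all refreshable tables, rendering the
    /// stored millisecond timestamps as human-readable datetimes.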
    async fn list_refresh_table_states(
        &self,
        _request: Request<ListRefreshTableStatesRequest>,
    ) -> TonicResponse<ListRefreshTableStatesResponse> {
        let refresh_jobs = self.metadata_manager.list_refresh_jobs().await?;
        let refresh_table_states = refresh_jobs
            .into_iter()
            .map(|job| RefreshTableState {
                table_id: job.table_id,
                current_status: job.current_status.to_string(),
                last_trigger_time: job
                    .last_trigger_time
                    .map(|time| DateTime::from_timestamp_millis(time).unwrap().to_string()),
                trigger_interval_secs: job.trigger_interval_secs,
                last_success_time: job
                    .last_success_time
                    .map(|time| DateTime::from_timestamp_millis(time).unwrap().to_string()),
            })
            .collect();
        Ok(Response::new(ListRefreshTableStatesResponse {
            states: refresh_table_states,
        }))
    }

    async fn pause(&self, _: Request<PauseRequest>) -> Result<Response<PauseResponse>, Status> {
        for database_id in self.metadata_manager.list_active_database_ids().await? {
            self.barrier_scheduler
                .run_command(database_id, Command::pause())
                .await?;
        }
        Ok(Response::new(PauseResponse {}))
    }

    async fn resume(&self, _: Request<ResumeRequest>) -> Result<Response<ResumeResponse>, Status> {
        for database_id in self.metadata_manager.list_active_database_ids().await? {
            self.barrier_scheduler
                .run_command(database_id, Command::resume())
                .await?;
        }
        Ok(Response::new(ResumeResponse {}))
    }

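    /// Applies a rate limit to a source, backfill, DML, sink, or individual
    /// fragment. The limit is persisted through the metadata manager and then
    /// propagated to the affected actors via a `Throttle` barrier command.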
    async fn apply_throttle(
        &self,
        request: Request<ApplyThrottleRequest>,
    ) -> Result<Response<ApplyThrottleResponse>, Status> {
        let request = request.into_inner();

        let throttle_target = request.throttle_target();
        let throttle_type = request.throttle_type();

        let raw_object_id: u32;
        let jobs: HashSet<JobId>;
        let fragments: HashSet<FragmentId>;

        match (throttle_type, throttle_target) {
            (ThrottleType::Source, ThrottleTarget::Source | ThrottleTarget::Table) => {
                (jobs, fragments) = self
                    .metadata_manager
                    .update_source_rate_limit_by_source_id(request.id.into(), request.rate)
                    .await?;
                raw_object_id = request.id;
            }
            (ThrottleType::Backfill, ThrottleTarget::Mv)
            | (ThrottleType::Backfill, ThrottleTarget::Sink)
            | (ThrottleType::Backfill, ThrottleTarget::Table) => {
                fragments = self
                    .metadata_manager
                    .update_backfill_rate_limit_by_job_id(JobId::from(request.id), request.rate)
                    .await?;
                jobs = [request.id.into()].into_iter().collect();
                raw_object_id = request.id;
            }
            (ThrottleType::Dml, ThrottleTarget::Table) => {
                fragments = self
                    .metadata_manager
                    .update_dml_rate_limit_by_job_id(JobId::from(request.id), request.rate)
                    .await?;
                jobs = [request.id.into()].into_iter().collect();
                raw_object_id = request.id;
            }
            (ThrottleType::Sink, ThrottleTarget::Sink) => {
                fragments = self
                    .metadata_manager
                    .update_sink_rate_limit_by_sink_id(request.id.into(), request.rate)
                    .await?;
                jobs = [request.id.into()].into_iter().collect();
                raw_object_id = request.id;
            }
            (_, ThrottleTarget::Fragment) => {
                self.metadata_manager
                    .update_fragment_rate_limit_by_fragment_id(request.id.into(), request.rate)
                    .await?;
                let fragment_id = request.id.into();
                fragments = [fragment_id].into_iter().collect();
                let job_id = self
                    .metadata_manager
                    .catalog_controller
                    .get_fragment_streaming_job_id(fragment_id)
                    .await?;
                jobs = [job_id].into_iter().collect();
                raw_object_id = job_id.as_raw_id();
            }
            _ => {
                return Err(Status::invalid_argument(format!(
                    "unsupported throttle target/type: {:?}/{:?}",
                    throttle_target, throttle_type
                )));
            }
        };

        let database_id = self
            .metadata_manager
            .catalog_controller
            .get_object_database_id(raw_object_id)
            .await?;

        let throttle_config = ThrottleConfig {
            rate_limit: request.rate,
            throttle_type: throttle_type.into(),
        };
        let _ = self
            .barrier_scheduler
            .run_command(
                database_id,
                Command::Throttle {
                    jobs,
                    config: fragments
                        .into_iter()
                        .map(|fragment_id| (fragment_id, throttle_config))
                        .collect(),
                },
            )
            .await?;

        Ok(Response::new(ApplyThrottleResponse { status: None }))
    }

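    /// Cancels creating streaming jobs, given either explicit job ids or
    /// catalog infos that are first resolved to the ids of creating jobs.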
    async fn cancel_creating_jobs(
        &self,
        request: Request<CancelCreatingJobsRequest>,
    ) -> TonicResponse<CancelCreatingJobsResponse> {
        let req = request.into_inner();
        // Reject malformed requests instead of panicking on a missing field.
        let jobs = req
            .jobs
            .ok_or_else(|| Status::invalid_argument("jobs must be specified"))?;
        let job_ids = match jobs {
            Jobs::Infos(infos) => self
                .metadata_manager
                .catalog_controller
                .find_creating_streaming_job_ids(infos.infos)
                .await?
                .into_iter()
                .map(|id| id.as_job_id())
                .collect(),
            Jobs::Ids(jobs) => jobs.job_ids,
        };

        let canceled_jobs = self
            .stream_manager
            .cancel_streaming_jobs(job_ids)
            .await?
            .into_iter()
            .map(|id| id.as_raw_id())
            .collect_vec();
        Ok(Response::new(CancelCreatingJobsResponse {
            status: None,
            canceled_jobs,
        }))
    }

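    /// Returns the fragment and actor layout of each requested job, with
    /// per-actor dispatchers fetched in bulk and attached to each actor.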
    async fn list_table_fragments(
        &self,
        request: Request<ListTableFragmentsRequest>,
    ) -> Result<Response<ListTableFragmentsResponse>, Status> {
        let req = request.into_inner();
        let table_ids = HashSet::<JobId>::from_iter(req.table_ids);

        let mut info = HashMap::new();
        for job_id in table_ids {
            let table_fragments = self
                .metadata_manager
                .catalog_controller
                .get_job_fragments_by_id(job_id)
                .await?;
            let mut dispatchers = self
                .metadata_manager
                .catalog_controller
                .get_fragment_actor_dispatchers(
                    table_fragments.fragment_ids().map(|id| id as _).collect(),
                )
                .await?;
            let ctx = table_fragments.ctx.to_protobuf();
            info.insert(
                table_fragments.stream_job_id(),
                TableFragmentInfo {
                    fragments: table_fragments
                        .fragments
                        .into_iter()
                        .map(|(id, fragment)| FragmentInfo {
                            id,
                            actors: fragment
                                .actors
                                .into_iter()
                                .map(|actor| ActorInfo {
                                    id: actor.actor_id,
                                    node: Some(fragment.nodes.clone()),
                                    dispatcher: dispatchers
                                        .get_mut(&(fragment.fragment_id as _))
                                        .and_then(|dispatchers| {
                                            dispatchers.remove(&(actor.actor_id as _))
                                        })
                                        .unwrap_or_default(),
                                })
                                .collect_vec(),
                        })
                        .collect_vec(),
                    ctx: Some(ctx),
                },
            );
        }

        Ok(Response::new(ListTableFragmentsResponse {
            table_fragments: info,
        }))
    }

    async fn list_streaming_job_states(
        &self,
        _request: Request<ListStreamingJobStatesRequest>,
    ) -> Result<Response<ListStreamingJobStatesResponse>, Status> {
        let job_infos = self
            .metadata_manager
            .catalog_controller
            .list_streaming_job_infos()
            .await?;
        let states = job_infos
            .into_iter()
            .map(
                |StreamingJobInfo {
                     job_id,
                     job_status,
                     name,
                     parallelism,
                     max_parallelism,
                     resource_group,
                     database_id,
                     schema_id,
                     config_override,
                     ..
                 }| {
                    let parallelism = match parallelism {
                        StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
                        StreamingParallelism::Custom => model::TableParallelism::Custom,
                        StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
                    };

                    list_streaming_job_states_response::StreamingJobState {
                        table_id: job_id,
                        name,
                        state: PbState::from(job_status) as _,
                        parallelism: Some(parallelism.into()),
                        max_parallelism: max_parallelism as _,
                        resource_group,
                        database_id,
                        schema_id,
                        config_override,
                    }
                },
            )
            .collect_vec();

        Ok(Response::new(ListStreamingJobStatesResponse { states }))
    }

    async fn list_fragment_distribution(
        &self,
        _request: Request<ListFragmentDistributionRequest>,
    ) -> Result<Response<ListFragmentDistributionResponse>, Status> {
        let distributions = self
            .metadata_manager
            .catalog_controller
            .list_fragment_descs(false)
            .await?
            .into_iter()
            .map(|(dist, _)| dist)
            .collect();

        Ok(Response::new(ListFragmentDistributionResponse {
            distributions,
        }))
    }

    async fn list_creating_fragment_distribution(
        &self,
        _request: Request<ListCreatingFragmentDistributionRequest>,
    ) -> Result<Response<ListCreatingFragmentDistributionResponse>, Status> {
        let distributions = self
            .metadata_manager
            .catalog_controller
            .list_fragment_descs(true)
            .await?
            .into_iter()
            .map(|(dist, _)| dist)
            .collect();

        Ok(Response::new(ListCreatingFragmentDistributionResponse {
            distributions,
        }))
    }

    async fn get_fragment_by_id(
        &self,
        request: Request<GetFragmentByIdRequest>,
    ) -> Result<Response<GetFragmentByIdResponse>, Status> {
        let req = request.into_inner();
        let fragment_desc = self
            .metadata_manager
            .catalog_controller
            .get_fragment_desc_by_id(req.fragment_id)
            .await?;
        let distribution =
            fragment_desc.map(|(desc, upstreams)| fragment_desc_to_distribution(desc, upstreams));
        Ok(Response::new(GetFragmentByIdResponse { distribution }))
    }

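    /// Reports the vnode indices owned by each actor of the given fragment,
    /// read from the in-memory shared actor info rather than the catalog.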
    async fn get_fragment_vnodes(
        &self,
        request: Request<GetFragmentVnodesRequest>,
    ) -> Result<Response<GetFragmentVnodesResponse>, Status> {
        let req = request.into_inner();
        let fragment_id = req.fragment_id;

        let shared_actor_infos = self.env.shared_actor_infos();
        let guard = shared_actor_infos.read_guard();

        let fragment_info = guard
            .get_fragment(fragment_id)
            .ok_or_else(|| Status::not_found(format!("Fragment {} not found", fragment_id)))?;

        let actor_vnodes = fragment_info
            .actors
            .iter()
            .map(|(actor_id, actor_info)| {
                let vnode_indices = if let Some(ref vnode_bitmap) = actor_info.vnode_bitmap {
                    vnode_bitmap.iter_ones().map(|v| v as u32).collect()
                } else {
                    vec![]
                };

                get_fragment_vnodes_response::ActorVnodes {
                    actor_id: *actor_id,
                    vnode_indices,
                }
            })
            .collect();

        Ok(Response::new(GetFragmentVnodesResponse { actor_vnodes }))
    }

    async fn get_actor_vnodes(
        &self,
        request: Request<GetActorVnodesRequest>,
    ) -> Result<Response<GetActorVnodesResponse>, Status> {
        let req = request.into_inner();
        let actor_id = req.actor_id;

        let shared_actor_infos = self.env.shared_actor_infos();
        let guard = shared_actor_infos.read_guard();

        let actor_info = guard
            .iter_over_fragments()
            .find_map(|(_, fragment_info)| fragment_info.actors.get(&actor_id))
            .ok_or_else(|| Status::not_found(format!("Actor {} not found", actor_id)))?;

        let vnode_indices = if let Some(ref vnode_bitmap) = actor_info.vnode_bitmap {
            vnode_bitmap.iter_ones().map(|v| v as u32).collect()
        } else {
            vec![]
        };

        Ok(Response::new(GetActorVnodesResponse { vnode_indices }))
    }

    async fn list_actor_states(
        &self,
        _request: Request<ListActorStatesRequest>,
    ) -> Result<Response<ListActorStatesResponse>, Status> {
        let actor_locations = self
            .metadata_manager
            .catalog_controller
            .list_actor_locations()?;
        let states = actor_locations
            .into_iter()
            .map(|actor_location| list_actor_states_response::ActorState {
                actor_id: actor_location.actor_id,
                fragment_id: actor_location.fragment_id,
                worker_id: actor_location.worker_id,
            })
            .collect_vec();

        Ok(Response::new(ListActorStatesResponse { states }))
    }

    async fn list_object_dependencies(
        &self,
        _request: Request<ListObjectDependenciesRequest>,
    ) -> Result<Response<ListObjectDependenciesResponse>, Status> {
        let dependencies = self
            .metadata_manager
            .catalog_controller
            .list_created_object_dependencies()
            .await?;

        Ok(Response::new(ListObjectDependenciesResponse {
            dependencies,
        }))
    }

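    /// Triggers an ad-hoc recovery through the barrier manager.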
    async fn recover(
        &self,
        _request: Request<RecoverRequest>,
    ) -> Result<Response<RecoverResponse>, Status> {
        self.barrier_manager.adhoc_recovery().await?;
        Ok(Response::new(RecoverResponse {}))
    }

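    /// Lists the current split assignment of all source-related actors,
    /// classifying each fragment as shared source, non-shared source, or
    /// source backfill so the response shows where each split is consumed.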
    async fn list_actor_splits(
        &self,
        _request: Request<ListActorSplitsRequest>,
    ) -> Result<Response<ListActorSplitsResponse>, Status> {
        let SourceManagerRunningInfo {
            source_fragments,
            backfill_fragments,
        } = self.stream_manager.source_manager.get_running_info().await;

        let mut actor_splits = self.env.shared_actor_infos().list_assignments();

        let source_actors: HashMap<_, _> = {
            let all_fragment_ids: HashSet<_> = backfill_fragments
                .values()
                .flat_map(|set| set.iter().flat_map(|&(id1, id2)| [id1, id2]))
                .chain(source_fragments.values().flatten().copied())
                .collect();

            let guard = self.env.shared_actor_infos().read_guard();
            guard
                .iter_over_fragments()
                .filter(|(frag_id, _)| all_fragment_ids.contains(frag_id))
                .flat_map(|(fragment_id, fragment_info)| {
                    fragment_info
                        .actors
                        .keys()
                        .copied()
                        .map(|actor_id| (actor_id, *fragment_id))
                })
                .collect()
        };

        let is_shared_source = self
            .metadata_manager
            .catalog_controller
            .list_source_id_with_shared_types()
            .await?;

        let fragment_to_source: HashMap<_, _> = source_fragments
            .into_iter()
            .flat_map(|(source_id, fragment_ids)| {
                let source_type = if is_shared_source
                    .get(&(source_id as _))
                    .copied()
                    .unwrap_or(false)
                {
                    FragmentType::SharedSource
                } else {
                    FragmentType::NonSharedSource
                };

                fragment_ids
                    .into_iter()
                    .map(move |fragment_id| (fragment_id, (source_id, source_type)))
            })
            .chain(
                backfill_fragments
                    .into_iter()
                    .flat_map(|(source_id, fragment_ids)| {
                        fragment_ids.into_iter().flat_map(
                            move |(fragment_id, upstream_fragment_id)| {
                                [
                                    (fragment_id, (source_id, FragmentType::SharedSourceBackfill)),
                                    (
                                        upstream_fragment_id,
                                        (source_id, FragmentType::SharedSource),
                                    ),
                                ]
                            },
                        )
                    }),
            )
            .collect();

        let actor_splits = source_actors
            .into_iter()
            .flat_map(|(actor_id, fragment_id)| {
                let (source_id, fragment_type) = fragment_to_source
                    .get(&(fragment_id as _))
                    .copied()
                    .unwrap_or_default();

                actor_splits
                    .remove(&(actor_id as _))
                    .unwrap_or_default()
                    .into_iter()
                    .map(move |split| list_actor_splits_response::ActorSplit {
                        actor_id,
                        source_id,
                        fragment_id,
                        split_id: split.id().to_string(),
                        fragment_type: fragment_type.into(),
                    })
            })
            .collect_vec();

        Ok(Response::new(ListActorSplitsResponse { actor_splits }))
    }

    async fn list_rate_limits(
        &self,
        _request: Request<ListRateLimitsRequest>,
    ) -> Result<Response<ListRateLimitsResponse>, Status> {
        let rate_limits = self
            .metadata_manager
            .catalog_controller
            .list_rate_limits()
            .await?;
        Ok(Response::new(ListRateLimitsResponse { rate_limits }))
    }

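    /// Manually triggers a refresh of the given table via the refresh manager.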
    #[cfg_attr(coverage, coverage(off))]
    async fn refresh(
        &self,
        request: Request<RefreshRequest>,
    ) -> Result<Response<RefreshResponse>, Status> {
        let req = request.into_inner();

        tracing::info!("Refreshing table with id: {}", req.table_id);

        let response = self
            .refresh_manager
            .trigger_manual_refresh(req, self.env.shared_actor_infos())
            .await?;

        Ok(Response::new(response))
    }

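    /// Alters connector properties of a sink, Iceberg table, source, or
    /// connection. Secret references are filled in before the new properties
    /// are broadcast to running actors via a `ConnectorPropsChange` barrier
    /// command; connection changes also update and notify all dependent
    /// sources and sinks.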
    async fn alter_connector_props(
        &self,
        request: Request<AlterConnectorPropsRequest>,
    ) -> Result<Response<AlterConnectorPropsResponse>, Status> {
        let request = request.into_inner();
        let secret_manager = LocalSecretManager::global();
        let (new_props_plaintext, object_id) = match AlterConnectorPropsObject::try_from(
            request.object_type,
        ) {
            Ok(AlterConnectorPropsObject::Sink) => (
                self.metadata_manager
                    .update_sink_props_by_sink_id(
                        request.object_id.into(),
                        request.changed_props.clone().into_iter().collect(),
                    )
                    .await?,
                request.object_id.into(),
            ),
            Ok(AlterConnectorPropsObject::IcebergTable) => {
                let (prop, sink_id) = self
                    .metadata_manager
                    .update_iceberg_table_props_by_table_id(
                        request.object_id.into(),
                        request.changed_props.clone().into_iter().collect(),
                        request.extra_options,
                    )
                    .await?;
                (prop, sink_id.as_object_id())
            }

            Ok(AlterConnectorPropsObject::Source) => {
                if request.connector_conn_ref.is_some() {
                    return Err(Status::invalid_argument(
                        "alter connector_conn_ref is not supported",
                    ));
                }
                let options_with_secret = self
                    .metadata_manager
                    .catalog_controller
                    .update_source_props_by_source_id(
                        request.object_id.into(),
                        request.changed_props.clone().into_iter().collect(),
                        request.changed_secret_refs.clone().into_iter().collect(),
                    )
                    .await?;

                self.stream_manager
                    .source_manager
                    .validate_source_once(request.object_id.into(), options_with_secret.clone())
                    .await?;

                let (options, secret_refs) = options_with_secret.into_parts();
                (
                    secret_manager
                        .fill_secrets(options, secret_refs)
                        .map_err(MetaError::from)?
                        .into_iter()
                        .collect(),
                    request.object_id.into(),
                )
            }
            Ok(AlterConnectorPropsObject::Connection) => {
                let (
                    connection_options_with_secret,
                    updated_sources_with_props,
                    updated_sinks_with_props,
                ) = self
                    .metadata_manager
                    .catalog_controller
                    .update_connection_and_dependent_objects_props(
                        ConnectionId::from(request.object_id),
                        request.changed_props.clone().into_iter().collect(),
                        request.changed_secret_refs.clone().into_iter().collect(),
                    )
                    .await?;

                let (options, secret_refs) = connection_options_with_secret.into_parts();
                let new_props_plaintext = secret_manager
                    .fill_secrets(options, secret_refs)
                    .map_err(MetaError::from)?
                    .into_iter()
                    .collect::<HashMap<String, String>>();

                let mut dependent_mutation = HashMap::default();
                for (source_id, complete_source_props) in updated_sources_with_props {
                    dependent_mutation.insert(source_id.as_object_id(), complete_source_props);
                }
                for (sink_id, complete_sink_props) in updated_sinks_with_props {
                    dependent_mutation.insert(sink_id.as_object_id(), complete_sink_props);
                }

                if !dependent_mutation.is_empty() {
                    let database_id = self
                        .metadata_manager
                        .catalog_controller
                        .get_object_database_id(ConnectionId::from(request.object_id))
                        .await?;
                    tracing::info!(
                        "broadcasting connection {} property changes to dependent object ids: {:?}",
                        request.object_id,
                        dependent_mutation.keys().collect_vec()
                    );
                    let _version = self
                        .barrier_scheduler
                        .run_command(
                            database_id,
                            Command::ConnectorPropsChange(dependent_mutation),
                        )
                        .await?;
                }

                (
                    new_props_plaintext,
                    ConnectionId::from(request.object_id).as_object_id(),
                )
            }

            _ => {
                // Return a gRPC status instead of panicking on an unknown object type.
                return Err(Status::unimplemented(format!(
                    "unsupported object type for AlterConnectorProps: {:?}",
                    request.object_type
                )));
            }
        };

        let database_id = self
            .metadata_manager
            .catalog_controller
            .get_object_database_id(object_id)
            .await?;
        // Connection changes were already broadcast to their dependent sources
        // and sinks above, so only the remaining object types are notified here.
        if AlterConnectorPropsObject::try_from(request.object_type)
            .is_ok_and(|t| t != AlterConnectorPropsObject::Connection)
        {
            let mut mutation = HashMap::default();
            mutation.insert(object_id, new_props_plaintext);
            let _version = self
                .barrier_scheduler
                .run_command(database_id, Command::ConnectorPropsChange(mutation))
                .await?;
        }

        Ok(Response::new(AlterConnectorPropsResponse {}))
    }

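    /// Sets the `aligned` flag on every `SyncLogStore` node in the given
    /// job's fragments, failing if the job has no such node.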
    async fn set_sync_log_store_aligned(
        &self,
        request: Request<SetSyncLogStoreAlignedRequest>,
    ) -> Result<Response<SetSyncLogStoreAlignedResponse>, Status> {
        let req = request.into_inner();
        let job_id = req.job_id;
        let aligned = req.aligned;

        self.metadata_manager
            .catalog_controller
            .mutate_fragments_by_job_id(
                job_id,
                |_mask, stream_node| {
                    let mut visited = false;
                    visit_stream_node_mut(stream_node, |body| {
                        if let NodeBody::SyncLogStore(sync_log_store) = body {
                            sync_log_store.aligned = aligned;
                            visited = true;
                        }
                    });
                    Ok(visited)
                },
                "no fragments found with synced log store",
            )
            .await?;

        Ok(Response::new(SetSyncLogStoreAlignedResponse {}))
    }

    async fn list_cdc_progress(
        &self,
        _request: Request<ListCdcProgressRequest>,
    ) -> Result<Response<ListCdcProgressResponse>, Status> {
        let cdc_progress = self
            .barrier_manager
            .get_cdc_progress()
            .await?
            .into_iter()
            .map(|(job_id, p)| {
                (
                    job_id,
                    PbCdcProgress {
                        split_total_count: p.split_total_count,
                        split_backfilled_count: p.split_backfilled_count,
                        split_completed_count: p.split_completed_count,
                    },
                )
            })
            .collect();
        Ok(Response::new(ListCdcProgressResponse { cdc_progress }))
    }

    async fn list_unmigrated_tables(
        &self,
        _request: Request<ListUnmigratedTablesRequest>,
    ) -> Result<Response<ListUnmigratedTablesResponse>, Status> {
        let unmigrated_tables = self
            .metadata_manager
            .catalog_controller
            .list_unmigrated_tables()
            .await?
            .into_iter()
            .map(|table| list_unmigrated_tables_response::UnmigratedTable {
                table_id: table.id,
                table_name: table.name,
            })
            .collect();

        Ok(Response::new(ListUnmigratedTablesResponse {
            tables: unmigrated_tables,
        }))
    }
}

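/// Converts a catalog `FragmentDesc` and its upstream fragment ids into the
/// protobuf `FragmentDistribution` returned by the fragment listing RPCs.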
fn fragment_desc_to_distribution(
    fragment_desc: FragmentDesc,
    upstreams: Vec<FragmentId>,
) -> FragmentDistribution {
    FragmentDistribution {
        fragment_id: fragment_desc.fragment_id,
        table_id: fragment_desc.job_id,
        distribution_type: PbFragmentDistributionType::from(fragment_desc.distribution_type) as _,
        state_table_ids: fragment_desc.state_table_ids.0,
        upstream_fragment_ids: upstreams,
        fragment_type_mask: fragment_desc.fragment_type_mask as _,
        parallelism: fragment_desc.parallelism as _,
        vnode_count: fragment_desc.vnode_count as _,
        node: Some(fragment_desc.stream_node.to_protobuf()),
        parallelism_policy: fragment_desc.parallelism_policy,
    }
}