1use std::collections::{HashMap, HashSet};
16
17use chrono::DateTime;
18use itertools::Itertools;
19use risingwave_common::id::JobId;
20use risingwave_common::secret::LocalSecretManager;
21use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
22use risingwave_connector::source::SplitMetaData;
23use risingwave_meta::barrier::BarrierManagerRef;
24use risingwave_meta::controller::fragment::StreamingJobInfo;
25use risingwave_meta::controller::utils::FragmentDesc;
26use risingwave_meta::manager::MetadataManager;
27use risingwave_meta::manager::iceberg_compaction::IcebergCompactionManagerRef;
28use risingwave_meta::stream::{GlobalRefreshManagerRef, SourceManagerRunningInfo};
29use risingwave_meta::{MetaError, model};
30use risingwave_meta_model::{ConnectionId, FragmentId, SourceId, StreamingParallelism};
31use risingwave_pb::common::ThrottleType;
32use risingwave_pb::meta::alter_connector_props_request::AlterConnectorPropsObject;
33use risingwave_pb::meta::cancel_creating_jobs_request::Jobs;
34use risingwave_pb::meta::list_actor_splits_response::FragmentType;
35use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
36use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
37use risingwave_pb::meta::list_table_fragments_response::{
38 ActorInfo, FragmentInfo, TableFragmentInfo,
39};
40use risingwave_pb::meta::stream_manager_service_server::StreamManagerService;
41use risingwave_pb::meta::table_fragments::PbState;
42use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
43use risingwave_pb::meta::*;
44use risingwave_pb::stream_plan::stream_node::NodeBody;
45use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
46use tonic::{Request, Response, Status};
47
48use crate::barrier::{BarrierScheduler, Command};
49use crate::manager::MetaSrvEnv;
50use crate::stream::GlobalStreamManagerRef;
51
52pub type TonicResponse<T> = Result<Response<T>, Status>;
53
54#[derive(Clone)]
55pub struct StreamServiceImpl {
56 env: MetaSrvEnv,
57 barrier_scheduler: BarrierScheduler,
58 barrier_manager: BarrierManagerRef,
59 stream_manager: GlobalStreamManagerRef,
60 metadata_manager: MetadataManager,
61 refresh_manager: GlobalRefreshManagerRef,
62 iceberg_compaction_manager: IcebergCompactionManagerRef,
63}
64
65impl StreamServiceImpl {
66 pub fn new(
67 env: MetaSrvEnv,
68 barrier_scheduler: BarrierScheduler,
69 barrier_manager: BarrierManagerRef,
70 stream_manager: GlobalStreamManagerRef,
71 metadata_manager: MetadataManager,
72 refresh_manager: GlobalRefreshManagerRef,
73 iceberg_compaction_manager: IcebergCompactionManagerRef,
74 ) -> Self {
75 StreamServiceImpl {
76 env,
77 barrier_scheduler,
78 barrier_manager,
79 stream_manager,
80 metadata_manager,
81 refresh_manager,
82 iceberg_compaction_manager,
83 }
84 }
85}
86
87#[async_trait::async_trait]
88impl StreamManagerService for StreamServiceImpl {
89 async fn flush(&self, request: Request<FlushRequest>) -> TonicResponse<FlushResponse> {
90 self.env.idle_manager().record_activity();
91 let req = request.into_inner();
92
93 let version_id = self.barrier_scheduler.flush(req.database_id).await?;
94 Ok(Response::new(FlushResponse {
95 status: None,
96 hummock_version_id: version_id,
97 }))
98 }
99
100 async fn list_refresh_table_states(
101 &self,
102 _request: Request<ListRefreshTableStatesRequest>,
103 ) -> TonicResponse<ListRefreshTableStatesResponse> {
104 let refresh_jobs = self.metadata_manager.list_refresh_jobs().await?;
105 let refresh_table_states = refresh_jobs
106 .into_iter()
107 .map(|job| RefreshTableState {
108 table_id: job.table_id,
109 current_status: job.current_status.to_string(),
110 last_trigger_time: job
111 .last_trigger_time
112 .map(|time| DateTime::from_timestamp_millis(time).unwrap().to_string()),
113 trigger_interval_secs: job.trigger_interval_secs,
114 last_success_time: job
115 .last_success_time
116 .map(|time| DateTime::from_timestamp_millis(time).unwrap().to_string()),
117 })
118 .collect();
119 Ok(Response::new(ListRefreshTableStatesResponse {
120 states: refresh_table_states,
121 }))
122 }
123
124 async fn list_iceberg_compaction_status(
125 &self,
126 _request: Request<ListIcebergCompactionStatusRequest>,
127 ) -> TonicResponse<ListIcebergCompactionStatusResponse> {
128 let statuses = self
129 .iceberg_compaction_manager
130 .list_compaction_statuses()
131 .await
132 .into_iter()
133 .map(
134 |status| list_iceberg_compaction_status_response::IcebergCompactionStatus {
135 sink_id: status.sink_id.as_raw_id(),
136 task_type: status.task_type,
137 trigger_interval_sec: status.trigger_interval_sec,
138 trigger_snapshot_count: status.trigger_snapshot_count as u64,
139 schedule_state: status.schedule_state,
140 next_compaction_after_sec: status.next_compaction_after_sec,
141 pending_snapshot_count: status.pending_snapshot_count.map(|count| count as u64),
142 is_triggerable: status.is_triggerable,
143 },
144 )
145 .collect();
146
147 Ok(Response::new(ListIcebergCompactionStatusResponse {
148 statuses,
149 }))
150 }
151
152 async fn pause(&self, _: Request<PauseRequest>) -> Result<Response<PauseResponse>, Status> {
153 for database_id in self.metadata_manager.list_active_database_ids().await? {
154 self.barrier_scheduler
155 .run_command(database_id, Command::pause())
156 .await?;
157 }
158 Ok(Response::new(PauseResponse {}))
159 }
160
161 async fn resume(&self, _: Request<ResumeRequest>) -> Result<Response<ResumeResponse>, Status> {
162 for database_id in self.metadata_manager.list_active_database_ids().await? {
163 self.barrier_scheduler
164 .run_command(database_id, Command::resume())
165 .await?;
166 }
167 Ok(Response::new(ResumeResponse {}))
168 }
169
170 async fn apply_throttle(
171 &self,
172 request: Request<ApplyThrottleRequest>,
173 ) -> Result<Response<ApplyThrottleResponse>, Status> {
174 let request = request.into_inner();
175
176 let throttle_target = request.throttle_target();
178 let throttle_type = request.throttle_type();
179
180 let raw_object_id: u32;
181 let jobs: HashSet<JobId>;
182 let fragments: HashSet<FragmentId>;
183
184 match (throttle_type, throttle_target) {
185 (ThrottleType::Source, ThrottleTarget::Source | ThrottleTarget::Table) => {
186 (jobs, fragments) = self
187 .metadata_manager
188 .update_source_rate_limit_by_source_id(request.id.into(), request.rate)
189 .await?;
190 raw_object_id = request.id;
191 }
192 (ThrottleType::Backfill, ThrottleTarget::Mv)
193 | (ThrottleType::Backfill, ThrottleTarget::Sink)
194 | (ThrottleType::Backfill, ThrottleTarget::Table) => {
195 fragments = self
196 .metadata_manager
197 .update_backfill_rate_limit_by_job_id(JobId::from(request.id), request.rate)
198 .await?;
199 jobs = [request.id.into()].into_iter().collect();
200 raw_object_id = request.id;
201 }
202 (ThrottleType::Dml, ThrottleTarget::Table) => {
203 fragments = self
204 .metadata_manager
205 .update_dml_rate_limit_by_job_id(JobId::from(request.id), request.rate)
206 .await?;
207 jobs = [request.id.into()].into_iter().collect();
208 raw_object_id = request.id;
209 }
210 (ThrottleType::Sink, ThrottleTarget::Sink) => {
211 fragments = self
212 .metadata_manager
213 .update_sink_rate_limit_by_sink_id(request.id.into(), request.rate)
214 .await?;
215 jobs = [request.id.into()].into_iter().collect();
216 raw_object_id = request.id;
217 }
218 (_, ThrottleTarget::Fragment) => {
220 self.metadata_manager
221 .update_fragment_rate_limit_by_fragment_id(request.id.into(), request.rate)
222 .await?;
223 let fragment_id = request.id.into();
224 fragments = [fragment_id].into_iter().collect();
225 let job_id = self
226 .metadata_manager
227 .catalog_controller
228 .get_fragment_streaming_job_id(fragment_id)
229 .await?;
230 jobs = [job_id].into_iter().collect();
231 raw_object_id = job_id.as_raw_id();
232 }
233 _ => {
234 return Err(Status::invalid_argument(format!(
235 "unsupported throttle target/type: {:?}/{:?}",
236 throttle_target, throttle_type
237 )));
238 }
239 };
240
241 let database_id = self
242 .metadata_manager
243 .catalog_controller
244 .get_object_database_id(raw_object_id)
245 .await?;
246
247 let throttle_config = ThrottleConfig {
248 rate_limit: request.rate,
249 throttle_type: throttle_type.into(),
250 };
251 let _i = self
252 .barrier_scheduler
253 .run_command(
254 database_id,
255 Command::Throttle {
256 jobs,
257 config: fragments
258 .into_iter()
259 .map(|fragment_id| (fragment_id, throttle_config))
260 .collect(),
261 },
262 )
263 .await?;
264
265 Ok(Response::new(ApplyThrottleResponse { status: None }))
266 }
267
268 async fn cancel_creating_jobs(
269 &self,
270 request: Request<CancelCreatingJobsRequest>,
271 ) -> TonicResponse<CancelCreatingJobsResponse> {
272 let req = request.into_inner();
273 let job_ids = match req.jobs.unwrap() {
274 Jobs::Infos(infos) => self
275 .metadata_manager
276 .catalog_controller
277 .find_creating_streaming_job_ids(infos.infos)
278 .await?
279 .into_iter()
280 .map(|id| id.as_job_id())
281 .collect(),
282 Jobs::Ids(jobs) => jobs.job_ids,
283 };
284
285 let canceled_jobs = self
286 .stream_manager
287 .cancel_streaming_jobs(job_ids)
288 .await?
289 .into_iter()
290 .map(|id| id.as_raw_id())
291 .collect_vec();
292 Ok(Response::new(CancelCreatingJobsResponse {
293 status: None,
294 canceled_jobs,
295 }))
296 }
297
298 async fn list_table_fragments(
299 &self,
300 request: Request<ListTableFragmentsRequest>,
301 ) -> Result<Response<ListTableFragmentsResponse>, Status> {
302 let req = request.into_inner();
303 let table_ids = HashSet::<JobId>::from_iter(req.table_ids);
304
305 let mut info = HashMap::new();
306 for job_id in table_ids {
307 let (table_fragments, fragment_actors, _actor_status) = self
308 .metadata_manager
309 .catalog_controller
310 .get_job_fragments_by_id(job_id)
311 .await?;
312 let mut dispatchers = self
313 .metadata_manager
314 .catalog_controller
315 .get_fragment_actor_dispatchers(
316 table_fragments.fragment_ids().map(|id| id as _).collect(),
317 )
318 .await?;
319 let ctx = table_fragments.ctx.to_protobuf();
320 info.insert(
321 table_fragments.stream_job_id(),
322 TableFragmentInfo {
323 fragments: table_fragments
324 .fragments
325 .into_iter()
326 .map(|(id, fragment)| FragmentInfo {
327 id,
328 actors: fragment_actors
329 .get(&id)
330 .into_iter()
331 .flat_map(|actors| actors.iter().map(|actor| actor.actor_id))
332 .map(|actor_id| ActorInfo {
333 id: actor_id,
334 node: Some(fragment.nodes.clone()),
335 dispatcher: dispatchers
336 .get_mut(&fragment.fragment_id)
337 .and_then(|dispatchers| dispatchers.remove(&actor_id))
338 .unwrap_or_default(),
339 })
340 .collect_vec(),
341 })
342 .collect_vec(),
343 ctx: Some(ctx),
344 },
345 );
346 }
347
348 Ok(Response::new(ListTableFragmentsResponse {
349 table_fragments: info,
350 }))
351 }
352
353 async fn list_streaming_job_states(
354 &self,
355 _request: Request<ListStreamingJobStatesRequest>,
356 ) -> Result<Response<ListStreamingJobStatesResponse>, Status> {
357 let job_infos = self
358 .metadata_manager
359 .catalog_controller
360 .list_streaming_job_infos()
361 .await?;
362 let states = job_infos
363 .into_iter()
364 .map(
365 |StreamingJobInfo {
366 job_id,
367 job_status,
368 name,
369 parallelism,
370 max_parallelism,
371 resource_group,
372 database_id,
373 schema_id,
374 config_override,
375 ..
376 }| {
377 let parallelism = match parallelism {
378 StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
379 StreamingParallelism::Custom => model::TableParallelism::Custom,
380 StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
381 };
382
383 list_streaming_job_states_response::StreamingJobState {
384 table_id: job_id,
385 name,
386 state: PbState::from(job_status) as _,
387 parallelism: Some(parallelism.into()),
388 max_parallelism: max_parallelism as _,
389 resource_group,
390 database_id,
391 schema_id,
392 config_override,
393 adaptive_parallelism_strategy: None,
394 }
395 },
396 )
397 .collect_vec();
398
399 Ok(Response::new(ListStreamingJobStatesResponse { states }))
400 }
401
402 async fn list_fragment_distribution(
403 &self,
404 _request: Request<ListFragmentDistributionRequest>,
405 ) -> Result<Response<ListFragmentDistributionResponse>, Status> {
406 let include_node = _request.into_inner().include_node.unwrap_or(true);
407 let distributions = if include_node {
408 self.metadata_manager
409 .catalog_controller
410 .list_fragment_descs_with_node(false)
411 .await?
412 } else {
413 self.metadata_manager
414 .catalog_controller
415 .list_fragment_descs_without_node(false)
416 .await?
417 }
418 .into_iter()
419 .map(|(dist, _)| dist)
420 .collect();
421
422 Ok(Response::new(ListFragmentDistributionResponse {
423 distributions,
424 }))
425 }
426
427 async fn list_creating_fragment_distribution(
428 &self,
429 _request: Request<ListCreatingFragmentDistributionRequest>,
430 ) -> Result<Response<ListCreatingFragmentDistributionResponse>, Status> {
431 let include_node = _request.into_inner().include_node.unwrap_or(true);
432 let distributions = if include_node {
433 self.metadata_manager
434 .catalog_controller
435 .list_fragment_descs_with_node(true)
436 .await?
437 } else {
438 self.metadata_manager
439 .catalog_controller
440 .list_fragment_descs_without_node(true)
441 .await?
442 }
443 .into_iter()
444 .map(|(dist, _)| dist)
445 .collect();
446
447 Ok(Response::new(ListCreatingFragmentDistributionResponse {
448 distributions,
449 }))
450 }
451
452 async fn list_sink_log_store_tables(
453 &self,
454 _request: Request<ListSinkLogStoreTablesRequest>,
455 ) -> Result<Response<ListSinkLogStoreTablesResponse>, Status> {
456 let tables = self
457 .metadata_manager
458 .catalog_controller
459 .list_sink_log_store_tables()
460 .await?
461 .into_iter()
462 .map(|(sink_id, internal_table_id)| {
463 list_sink_log_store_tables_response::SinkLogStoreTable {
464 sink_id: sink_id.as_raw_id(),
465 internal_table_id: internal_table_id.as_raw_id(),
466 }
467 })
468 .collect();
469
470 Ok(Response::new(ListSinkLogStoreTablesResponse { tables }))
471 }
472
473 async fn get_fragment_by_id(
474 &self,
475 request: Request<GetFragmentByIdRequest>,
476 ) -> Result<Response<GetFragmentByIdResponse>, Status> {
477 let req = request.into_inner();
478 let fragment_desc = self
479 .metadata_manager
480 .catalog_controller
481 .get_fragment_desc_by_id(req.fragment_id)
482 .await?;
483 let distribution = fragment_desc
484 .map(|(desc, upstreams)| fragment_desc_to_distribution(desc, upstreams, true));
485 Ok(Response::new(GetFragmentByIdResponse { distribution }))
486 }
487
488 async fn get_fragment_vnodes(
489 &self,
490 request: Request<GetFragmentVnodesRequest>,
491 ) -> Result<Response<GetFragmentVnodesResponse>, Status> {
492 let req = request.into_inner();
493 let fragment_id = req.fragment_id;
494
495 let shared_actor_infos = self.env.shared_actor_infos();
496 let guard = shared_actor_infos.read_guard();
497
498 let fragment_info = guard
499 .get_fragment(fragment_id)
500 .ok_or_else(|| Status::not_found(format!("Fragment {} not found", fragment_id)))?;
501
502 let actor_vnodes = fragment_info
503 .actors
504 .iter()
505 .map(|(actor_id, actor_info)| {
506 let vnode_indices = if let Some(ref vnode_bitmap) = actor_info.vnode_bitmap {
507 vnode_bitmap.iter_ones().map(|v| v as u32).collect()
508 } else {
509 vec![]
510 };
511
512 get_fragment_vnodes_response::ActorVnodes {
513 actor_id: *actor_id,
514 vnode_indices,
515 }
516 })
517 .collect();
518
519 Ok(Response::new(GetFragmentVnodesResponse { actor_vnodes }))
520 }
521
522 async fn get_actor_vnodes(
523 &self,
524 request: Request<GetActorVnodesRequest>,
525 ) -> Result<Response<GetActorVnodesResponse>, Status> {
526 let req = request.into_inner();
527 let actor_id = req.actor_id;
528
529 let shared_actor_infos = self.env.shared_actor_infos();
530 let guard = shared_actor_infos.read_guard();
531
532 let actor_info = guard
534 .iter_over_fragments()
535 .find_map(|(_, fragment_info)| fragment_info.actors.get(&actor_id))
536 .ok_or_else(|| Status::not_found(format!("Actor {} not found", actor_id)))?;
537
538 let vnode_indices = if let Some(ref vnode_bitmap) = actor_info.vnode_bitmap {
539 vnode_bitmap.iter_ones().map(|v| v as u32).collect()
540 } else {
541 vec![]
542 };
543
544 Ok(Response::new(GetActorVnodesResponse { vnode_indices }))
545 }
546
547 async fn list_actor_states(
548 &self,
549 _request: Request<ListActorStatesRequest>,
550 ) -> Result<Response<ListActorStatesResponse>, Status> {
551 let actor_locations = self
552 .metadata_manager
553 .catalog_controller
554 .list_actor_locations()?;
555 let states = actor_locations
556 .into_iter()
557 .map(|actor_location| list_actor_states_response::ActorState {
558 actor_id: actor_location.actor_id,
559 fragment_id: actor_location.fragment_id,
560 worker_id: actor_location.worker_id,
561 })
562 .collect_vec();
563
564 Ok(Response::new(ListActorStatesResponse { states }))
565 }
566
567 async fn recover(
568 &self,
569 _request: Request<RecoverRequest>,
570 ) -> Result<Response<RecoverResponse>, Status> {
571 self.barrier_manager.adhoc_recovery().await?;
572 Ok(Response::new(RecoverResponse {}))
573 }
574
575 async fn list_actor_splits(
576 &self,
577 _request: Request<ListActorSplitsRequest>,
578 ) -> Result<Response<ListActorSplitsResponse>, Status> {
579 let SourceManagerRunningInfo {
580 source_fragments,
581 backfill_fragments,
582 } = self.stream_manager.source_manager.get_running_info().await;
583
584 let mut actor_splits = self.env.shared_actor_infos().list_assignments();
585
586 let source_actors: HashMap<_, _> = {
587 let all_fragment_ids: HashSet<_> = backfill_fragments
588 .values()
589 .flat_map(|set| set.iter().flat_map(|&(id1, id2)| [id1, id2]))
590 .chain(source_fragments.values().flatten().copied())
591 .collect();
592
593 let guard = self.env.shared_actor_infos().read_guard();
594 guard
595 .iter_over_fragments()
596 .filter(|(frag_id, _)| all_fragment_ids.contains(*frag_id))
597 .flat_map(|(fragment_id, fragment_info)| {
598 fragment_info
599 .actors
600 .keys()
601 .copied()
602 .map(|actor_id| (actor_id, *fragment_id))
603 })
604 .collect()
605 };
606
607 let is_shared_source = self
608 .metadata_manager
609 .catalog_controller
610 .list_source_id_with_shared_types()
611 .await?;
612
613 let fragment_to_source: HashMap<_, _> = source_fragments
614 .into_iter()
615 .flat_map(|(source_id, fragment_ids)| {
616 let source_type = if is_shared_source.get(&source_id).copied().unwrap_or(false) {
617 FragmentType::SharedSource
618 } else {
619 FragmentType::NonSharedSource
620 };
621
622 fragment_ids
623 .into_iter()
624 .map(move |fragment_id| (fragment_id, (source_id, source_type)))
625 })
626 .chain(
627 backfill_fragments
628 .into_iter()
629 .flat_map(|(source_id, fragment_ids)| {
630 fragment_ids.into_iter().flat_map(
631 move |(fragment_id, upstream_fragment_id)| {
632 [
633 (fragment_id, (source_id, FragmentType::SharedSourceBackfill)),
634 (
635 upstream_fragment_id,
636 (source_id, FragmentType::SharedSource),
637 ),
638 ]
639 },
640 )
641 }),
642 )
643 .collect();
644
645 let actor_splits = source_actors
646 .into_iter()
647 .flat_map(|(actor_id, fragment_id)| {
648 let (source_id, fragment_type) = fragment_to_source
649 .get(&fragment_id)
650 .copied()
651 .unwrap_or_default();
652
653 actor_splits
654 .remove(&actor_id)
655 .unwrap_or_default()
656 .into_iter()
657 .map(move |split| list_actor_splits_response::ActorSplit {
658 actor_id,
659 source_id,
660 fragment_id,
661 split_id: split.id().to_string(),
662 fragment_type: fragment_type.into(),
663 })
664 })
665 .collect_vec();
666
667 Ok(Response::new(ListActorSplitsResponse { actor_splits }))
668 }
669
670 async fn list_rate_limits(
671 &self,
672 _request: Request<ListRateLimitsRequest>,
673 ) -> Result<Response<ListRateLimitsResponse>, Status> {
674 let rate_limits = self
675 .metadata_manager
676 .catalog_controller
677 .list_rate_limits()
678 .await?;
679 Ok(Response::new(ListRateLimitsResponse { rate_limits }))
680 }
681
682 #[cfg_attr(coverage, coverage(off))]
683 async fn refresh(
684 &self,
685 request: Request<RefreshRequest>,
686 ) -> Result<Response<RefreshResponse>, Status> {
687 let req = request.into_inner();
688
689 tracing::info!("Refreshing table with id: {}", req.table_id);
690
691 let response = self
692 .refresh_manager
693 .trigger_manual_refresh(req, self.env.shared_actor_infos())
694 .await?;
695
696 Ok(Response::new(response))
697 }
698
699 async fn alter_connector_props(
700 &self,
701 request: Request<AlterConnectorPropsRequest>,
702 ) -> Result<Response<AlterConnectorPropsResponse>, Status> {
703 let request = request.into_inner();
704 let secret_manager = LocalSecretManager::global();
705 let (new_props_plaintext, object_id) = match AlterConnectorPropsObject::try_from(
706 request.object_type,
707 ) {
708 Ok(AlterConnectorPropsObject::Sink) => (
709 self.metadata_manager
710 .update_sink_props_by_sink_id(
711 request.object_id.into(),
712 request.changed_props.clone().into_iter().collect(),
713 )
714 .await?,
715 request.object_id.into(),
716 ),
717 Ok(AlterConnectorPropsObject::IcebergTable) => {
718 let (prop, sink_id) = self
719 .metadata_manager
720 .update_iceberg_table_props_by_table_id(
721 request.object_id.into(),
722 request.changed_props.clone().into_iter().collect(),
723 request.extra_options,
724 )
725 .await?;
726 (prop, sink_id.as_object_id())
727 }
728
729 Ok(AlterConnectorPropsObject::Source) => {
730 if request.connector_conn_ref.is_some() {
732 return Err(Status::invalid_argument(
733 "alter connector_conn_ref is not supported",
734 ));
735 }
736 let options_with_secret = self
737 .metadata_manager
738 .catalog_controller
739 .update_source_props_by_source_id(
740 request.object_id.into(),
741 request.changed_props.clone().into_iter().collect(),
742 request.changed_secret_refs.clone().into_iter().collect(),
743 false, )
745 .await?;
746
747 self.stream_manager
748 .source_manager
749 .validate_source_once(request.object_id.into(), options_with_secret.clone())
750 .await?;
751
752 let (options, secret_refs) = options_with_secret.into_parts();
753 (
754 secret_manager
755 .fill_secrets(options, secret_refs)
756 .map_err(MetaError::from)?
757 .into_iter()
758 .collect(),
759 request.object_id.into(),
760 )
761 }
762 Ok(AlterConnectorPropsObject::Connection) => {
763 let (
766 connection_options_with_secret,
767 updated_sources_with_props,
768 updated_sinks_with_props,
769 ) = self
770 .metadata_manager
771 .catalog_controller
772 .update_connection_and_dependent_objects_props(
773 ConnectionId::from(request.object_id),
774 request.changed_props.clone().into_iter().collect(),
775 request.changed_secret_refs.clone().into_iter().collect(),
776 )
777 .await?;
778
779 let (options, secret_refs) = connection_options_with_secret.into_parts();
781 let new_props_plaintext = secret_manager
782 .fill_secrets(options, secret_refs)
783 .map_err(MetaError::from)?
784 .into_iter()
785 .collect::<HashMap<String, String>>();
786
787 let mut dependent_mutation = HashMap::default();
789 for (source_id, complete_source_props) in updated_sources_with_props {
790 dependent_mutation.insert(source_id.as_object_id(), complete_source_props);
791 }
792 for (sink_id, complete_sink_props) in updated_sinks_with_props {
793 dependent_mutation.insert(sink_id.as_object_id(), complete_sink_props);
794 }
795
796 if !dependent_mutation.is_empty() {
797 let database_id = self
798 .metadata_manager
799 .catalog_controller
800 .get_object_database_id(ConnectionId::from(request.object_id))
801 .await?;
802 tracing::info!(
803 "broadcasting connection {} property changes to dependent object ids: {:?}",
804 request.object_id,
805 dependent_mutation.keys().collect_vec()
806 );
807 let _version = self
808 .barrier_scheduler
809 .run_command(
810 database_id,
811 Command::ConnectorPropsChange(dependent_mutation),
812 )
813 .await?;
814 }
815
816 (
817 new_props_plaintext,
818 ConnectionId::from(request.object_id).as_object_id(),
819 )
820 }
821
822 _ => {
823 unimplemented!(
824 "Unsupported object type for AlterConnectorProps: {:?}",
825 request.object_type
826 );
827 }
828 };
829
830 let database_id = self
831 .metadata_manager
832 .catalog_controller
833 .get_object_database_id(object_id)
834 .await?;
835 if AlterConnectorPropsObject::try_from(request.object_type)
838 .is_ok_and(|t| t != AlterConnectorPropsObject::Connection)
839 {
840 let mut mutation = HashMap::default();
841 mutation.insert(object_id, new_props_plaintext);
842 let _version = self
843 .barrier_scheduler
844 .run_command(database_id, Command::ConnectorPropsChange(mutation))
845 .await?;
846 }
847
848 Ok(Response::new(AlterConnectorPropsResponse {}))
849 }
850
851 async fn set_sync_log_store_aligned(
852 &self,
853 request: Request<SetSyncLogStoreAlignedRequest>,
854 ) -> Result<Response<SetSyncLogStoreAlignedResponse>, Status> {
855 let req = request.into_inner();
856 let job_id = req.job_id;
857 let aligned = req.aligned;
858
859 self.metadata_manager
860 .catalog_controller
861 .mutate_fragments_by_job_id(
862 job_id,
863 |_mask, stream_node| {
864 let mut visited = false;
865 visit_stream_node_mut(stream_node, |body| {
866 if let NodeBody::SyncLogStore(sync_log_store) = body {
867 sync_log_store.aligned = aligned;
868 visited = true
869 }
870 });
871 Ok(visited)
872 },
873 "no fragments found with synced log store",
874 )
875 .await?;
876
877 Ok(Response::new(SetSyncLogStoreAlignedResponse {}))
878 }
879
880 async fn list_cdc_progress(
881 &self,
882 _request: Request<ListCdcProgressRequest>,
883 ) -> Result<Response<ListCdcProgressResponse>, Status> {
884 let cdc_progress = self
885 .barrier_manager
886 .get_cdc_progress()
887 .await?
888 .into_iter()
889 .map(|(job_id, p)| {
890 (
891 job_id,
892 PbCdcProgress {
893 split_total_count: p.split_total_count,
894 split_backfilled_count: p.split_backfilled_count,
895 split_completed_count: p.split_completed_count,
896 },
897 )
898 })
899 .collect();
900 Ok(Response::new(ListCdcProgressResponse { cdc_progress }))
901 }
902
903 async fn list_unmigrated_tables(
904 &self,
905 _request: Request<ListUnmigratedTablesRequest>,
906 ) -> Result<Response<ListUnmigratedTablesResponse>, Status> {
907 let unmigrated_tables = self
908 .metadata_manager
909 .catalog_controller
910 .list_unmigrated_tables()
911 .await?
912 .into_iter()
913 .map(|table| list_unmigrated_tables_response::UnmigratedTable {
914 table_id: table.id,
915 table_name: table.name,
916 })
917 .collect();
918
919 Ok(Response::new(ListUnmigratedTablesResponse {
920 tables: unmigrated_tables,
921 }))
922 }
923
924 async fn alter_source_properties_safe(
927 &self,
928 request: Request<AlterSourcePropertiesSafeRequest>,
929 ) -> Result<Response<AlterSourcePropertiesSafeResponse>, Status> {
930 let request = request.into_inner();
931 let source_id = request.source_id;
932 let options = request.options.unwrap_or_default();
933
934 tracing::info!(
935 source_id = source_id,
936 reset_splits = options.reset_splits,
937 "Starting orchestrated source property update"
938 );
939
940 let database_id = self
942 .metadata_manager
943 .catalog_controller
944 .get_object_database_id(SourceId::from(source_id))
945 .await?;
946
947 tracing::info!(source_id = source_id, "Pausing stream");
949 self.barrier_scheduler
950 .run_command(database_id, Command::Pause)
951 .await?;
952
953 let result = async {
955 let secret_manager = LocalSecretManager::global();
956
957 let options_with_secret = self
958 .metadata_manager
959 .catalog_controller
960 .update_source_props_by_source_id(
961 source_id.into(),
962 request.changed_props.clone().into_iter().collect(),
963 request.changed_secret_refs.clone().into_iter().collect(),
964 true, )
966 .await?;
967
968 self.stream_manager
970 .source_manager
971 .validate_source_once(source_id.into(), options_with_secret.clone())
972 .await?;
973
974 let (props, secret_refs) = options_with_secret.into_parts();
975 let new_props_plaintext: HashMap<String, String> = secret_manager
976 .fill_secrets(props, secret_refs)
977 .map_err(MetaError::from)?
978 .into_iter()
979 .collect();
980
981 tracing::info!(
983 source_id = source_id,
984 "Issuing ConnectorPropsChange barrier"
985 );
986 let mut mutation = HashMap::default();
987 mutation.insert(source_id.into(), new_props_plaintext);
988 self.barrier_scheduler
989 .run_command(database_id, Command::ConnectorPropsChange(mutation))
990 .await?;
991
992 if options.reset_splits {
994 tracing::info!(source_id = source_id, "Resetting source splits");
995 self.stream_manager
996 .source_manager
997 .reset_source_splits(source_id.into())
998 .await?;
999 }
1000
1001 Ok::<_, MetaError>(())
1002 }
1003 .await;
1004
1005 tracing::info!(source_id = source_id, "Resuming stream");
1007 let resume_result = self
1008 .barrier_scheduler
1009 .run_command(database_id, Command::Resume)
1010 .await;
1011
1012 result?;
1014 resume_result?;
1015
1016 tracing::info!(
1017 source_id = source_id,
1018 "Orchestrated source property update completed successfully"
1019 );
1020
1021 Ok(Response::new(AlterSourcePropertiesSafeResponse {}))
1022 }
1023
1024 async fn reset_source_splits(
1027 &self,
1028 request: Request<ResetSourceSplitsRequest>,
1029 ) -> Result<Response<ResetSourceSplitsResponse>, Status> {
1030 let request = request.into_inner();
1031 let source_id = request.source_id;
1032
1033 tracing::warn!(
1034 source_id = source_id,
1035 "UNSAFE: Resetting source splits - this may cause data duplication or loss"
1036 );
1037
1038 self.stream_manager
1039 .source_manager
1040 .reset_source_splits(source_id.into())
1041 .await?;
1042
1043 Ok(Response::new(ResetSourceSplitsResponse {}))
1044 }
1045
1046 async fn inject_source_offsets(
1049 &self,
1050 request: Request<InjectSourceOffsetsRequest>,
1051 ) -> Result<Response<InjectSourceOffsetsResponse>, Status> {
1052 let request = request.into_inner();
1053 let source_id = request.source_id;
1054 let split_offsets = request.split_offsets;
1055
1056 let applied_split_ids = self
1058 .stream_manager
1059 .source_manager
1060 .validate_inject_source_offsets(source_id.into(), &split_offsets)
1061 .await?;
1062
1063 tracing::warn!(
1064 source_id = source_id,
1065 num_offsets = split_offsets.len(),
1066 "UNSAFE: Injecting source offsets - this may cause data duplication or loss"
1067 );
1068
1069 let database_id = self
1070 .metadata_manager
1071 .catalog_controller
1072 .get_object_database_id(SourceId::from(source_id))
1073 .await?;
1074
1075 self.barrier_scheduler
1076 .run_command(
1077 database_id,
1078 Command::InjectSourceOffsets {
1079 source_id: SourceId::from(source_id),
1080 split_offsets,
1081 },
1082 )
1083 .await?;
1084
1085 Ok(Response::new(InjectSourceOffsetsResponse {
1086 applied_split_ids,
1087 }))
1088 }
1089}
1090
1091fn fragment_desc_to_distribution(
1092 fragment_desc: FragmentDesc,
1093 upstreams: Vec<FragmentId>,
1094 include_node: bool,
1095) -> FragmentDistribution {
1096 let node = include_node.then(|| fragment_desc.stream_node.to_protobuf());
1097 FragmentDistribution {
1098 fragment_id: fragment_desc.fragment_id,
1099 table_id: fragment_desc.job_id,
1100 distribution_type: PbFragmentDistributionType::from(fragment_desc.distribution_type) as _,
1101 state_table_ids: fragment_desc.state_table_ids.0,
1102 upstream_fragment_ids: upstreams,
1103 fragment_type_mask: fragment_desc.fragment_type_mask as _,
1104 parallelism: fragment_desc.parallelism as _,
1105 vnode_count: fragment_desc.vnode_count as _,
1106 node,
1107 parallelism_policy: fragment_desc.parallelism_policy,
1108 }
1109}