1use std::collections::{HashMap, HashSet};
16
17use chrono::DateTime;
18use itertools::Itertools;
19use risingwave_common::id::JobId;
20use risingwave_common::secret::LocalSecretManager;
21use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
22use risingwave_connector::source::SplitMetaData;
23use risingwave_meta::barrier::BarrierManagerRef;
24use risingwave_meta::controller::fragment::StreamingJobInfo;
25use risingwave_meta::controller::utils::FragmentDesc;
26use risingwave_meta::manager::MetadataManager;
27use risingwave_meta::manager::iceberg_compaction::IcebergCompactionManagerRef;
28use risingwave_meta::stream::{GlobalRefreshManagerRef, SourceManagerRunningInfo};
29use risingwave_meta::{MetaError, model};
30use risingwave_meta_model::{ConnectionId, FragmentId, SourceId, StreamingParallelism};
31use risingwave_pb::common::ThrottleType;
32use risingwave_pb::meta::alter_connector_props_request::AlterConnectorPropsObject;
33use risingwave_pb::meta::cancel_creating_jobs_request::Jobs;
34use risingwave_pb::meta::list_actor_splits_response::FragmentType;
35use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
36use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
37use risingwave_pb::meta::list_table_fragments_response::{
38 ActorInfo, FragmentInfo, TableFragmentInfo,
39};
40use risingwave_pb::meta::stream_manager_service_server::StreamManagerService;
41use risingwave_pb::meta::table_fragments::PbState;
42use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
43use risingwave_pb::meta::*;
44use risingwave_pb::stream_plan::stream_node::NodeBody;
45use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
46use tonic::{Request, Response, Status};
47
48use crate::barrier::{BarrierScheduler, Command};
49use crate::manager::MetaSrvEnv;
50use crate::stream::GlobalStreamManagerRef;
51
/// Shorthand for the result type of a gRPC handler: a tonic [`Response`] on success,
/// a [`Status`] on failure.
pub type TonicResponse<T> = Result<Response<T>, Status>;
53
/// gRPC service implementation behind [`StreamManagerService`].
///
/// It owns handles to the meta-node components that the individual RPC handlers delegate to.
#[derive(Clone)]
pub struct StreamServiceImpl {
    /// Meta server environment; handlers use it for the idle manager and the shared actor infos.
    env: MetaSrvEnv,
    /// Used to flush and to run barrier commands (pause, resume, throttle, props change, ...).
    barrier_scheduler: BarrierScheduler,
    /// Used for ad-hoc recovery and for querying CDC progress.
    barrier_manager: BarrierManagerRef,
    /// Used to cancel streaming jobs and to reach the source manager.
    stream_manager: GlobalStreamManagerRef,
    /// Metadata / catalog access used by most handlers.
    metadata_manager: MetadataManager,
    /// Used to list and trigger table refreshes.
    refresh_manager: GlobalRefreshManagerRef,
    /// Used to list Iceberg compaction statuses.
    iceberg_compaction_manager: IcebergCompactionManagerRef,
}
64
65impl StreamServiceImpl {
66 pub fn new(
67 env: MetaSrvEnv,
68 barrier_scheduler: BarrierScheduler,
69 barrier_manager: BarrierManagerRef,
70 stream_manager: GlobalStreamManagerRef,
71 metadata_manager: MetadataManager,
72 refresh_manager: GlobalRefreshManagerRef,
73 iceberg_compaction_manager: IcebergCompactionManagerRef,
74 ) -> Self {
75 StreamServiceImpl {
76 env,
77 barrier_scheduler,
78 barrier_manager,
79 stream_manager,
80 metadata_manager,
81 refresh_manager,
82 iceberg_compaction_manager,
83 }
84 }
85}
86
87#[async_trait::async_trait]
88impl StreamManagerService for StreamServiceImpl {
89 async fn flush(&self, request: Request<FlushRequest>) -> TonicResponse<FlushResponse> {
90 self.env.idle_manager().record_activity();
91 let req = request.into_inner();
92
93 let version_id = self.barrier_scheduler.flush(req.database_id).await?;
94 Ok(Response::new(FlushResponse {
95 status: None,
96 hummock_version_id: version_id,
97 }))
98 }
99
100 async fn list_refresh_table_states(
101 &self,
102 _request: Request<ListRefreshTableStatesRequest>,
103 ) -> TonicResponse<ListRefreshTableStatesResponse> {
104 let refresh_jobs = self.metadata_manager.list_refresh_jobs().await?;
105 let refresh_table_states = refresh_jobs
106 .into_iter()
107 .map(|job| RefreshTableState {
108 table_id: job.table_id,
109 current_status: job.current_status.to_string(),
110 last_trigger_time: job
111 .last_trigger_time
112 .map(|time| DateTime::from_timestamp_millis(time).unwrap().to_string()),
113 trigger_interval_secs: job.trigger_interval_secs,
114 last_success_time: job
115 .last_success_time
116 .map(|time| DateTime::from_timestamp_millis(time).unwrap().to_string()),
117 })
118 .collect();
119 Ok(Response::new(ListRefreshTableStatesResponse {
120 states: refresh_table_states,
121 }))
122 }
123
124 async fn list_iceberg_compaction_status(
125 &self,
126 _request: Request<ListIcebergCompactionStatusRequest>,
127 ) -> TonicResponse<ListIcebergCompactionStatusResponse> {
128 let statuses = self
129 .iceberg_compaction_manager
130 .list_compaction_statuses()
131 .await
132 .into_iter()
133 .map(
134 |status| list_iceberg_compaction_status_response::IcebergCompactionStatus {
135 sink_id: status.sink_id.as_raw_id(),
136 task_type: status.task_type,
137 trigger_interval_sec: status.trigger_interval_sec,
138 trigger_snapshot_count: status.trigger_snapshot_count as u64,
139 schedule_state: status.schedule_state,
140 next_compaction_after_sec: status.next_compaction_after_sec,
141 pending_snapshot_count: status.pending_snapshot_count.map(|count| count as u64),
142 is_triggerable: status.is_triggerable,
143 },
144 )
145 .collect();
146
147 Ok(Response::new(ListIcebergCompactionStatusResponse {
148 statuses,
149 }))
150 }
151
152 async fn pause(&self, _: Request<PauseRequest>) -> Result<Response<PauseResponse>, Status> {
153 for database_id in self.metadata_manager.list_active_database_ids().await? {
154 self.barrier_scheduler
155 .run_command(database_id, Command::pause())
156 .await?;
157 }
158 Ok(Response::new(PauseResponse {}))
159 }
160
161 async fn resume(&self, _: Request<ResumeRequest>) -> Result<Response<ResumeResponse>, Status> {
162 for database_id in self.metadata_manager.list_active_database_ids().await? {
163 self.barrier_scheduler
164 .run_command(database_id, Command::resume())
165 .await?;
166 }
167 Ok(Response::new(ResumeResponse {}))
168 }
169
170 async fn apply_throttle(
171 &self,
172 request: Request<ApplyThrottleRequest>,
173 ) -> Result<Response<ApplyThrottleResponse>, Status> {
174 let request = request.into_inner();
175
176 let throttle_target = request.throttle_target();
178 let throttle_type = request.throttle_type();
179
180 let raw_object_id: u32;
181 let jobs: HashSet<JobId>;
182 let fragments: HashSet<FragmentId>;
183
184 match (throttle_type, throttle_target) {
185 (ThrottleType::Source, ThrottleTarget::Source | ThrottleTarget::Table) => {
186 (jobs, fragments) = self
187 .metadata_manager
188 .update_source_rate_limit_by_source_id(request.id.into(), request.rate)
189 .await?;
190 raw_object_id = request.id;
191 }
192 (ThrottleType::Backfill, ThrottleTarget::Mv)
193 | (ThrottleType::Backfill, ThrottleTarget::Sink)
194 | (ThrottleType::Backfill, ThrottleTarget::Table) => {
195 fragments = self
196 .metadata_manager
197 .update_backfill_rate_limit_by_job_id(JobId::from(request.id), request.rate)
198 .await?;
199 jobs = [request.id.into()].into_iter().collect();
200 raw_object_id = request.id;
201 }
202 (ThrottleType::Dml, ThrottleTarget::Table) => {
203 fragments = self
204 .metadata_manager
205 .update_dml_rate_limit_by_job_id(JobId::from(request.id), request.rate)
206 .await?;
207 jobs = [request.id.into()].into_iter().collect();
208 raw_object_id = request.id;
209 }
210 (ThrottleType::Sink, ThrottleTarget::Sink) => {
211 fragments = self
212 .metadata_manager
213 .update_sink_rate_limit_by_sink_id(request.id.into(), request.rate)
214 .await?;
215 jobs = [request.id.into()].into_iter().collect();
216 raw_object_id = request.id;
217 }
218 (_, ThrottleTarget::Fragment) => {
220 self.metadata_manager
221 .update_fragment_rate_limit_by_fragment_id(request.id.into(), request.rate)
222 .await?;
223 let fragment_id = request.id.into();
224 fragments = [fragment_id].into_iter().collect();
225 let job_id = self
226 .metadata_manager
227 .catalog_controller
228 .get_fragment_streaming_job_id(fragment_id)
229 .await?;
230 jobs = [job_id].into_iter().collect();
231 raw_object_id = job_id.as_raw_id();
232 }
233 _ => {
234 return Err(Status::invalid_argument(format!(
235 "unsupported throttle target/type: {:?}/{:?}",
236 throttle_target, throttle_type
237 )));
238 }
239 };
240
241 let database_id = self
242 .metadata_manager
243 .catalog_controller
244 .get_object_database_id(raw_object_id)
245 .await?;
246
247 let throttle_config = ThrottleConfig {
248 rate_limit: request.rate,
249 throttle_type: throttle_type.into(),
250 };
251 let _i = self
252 .barrier_scheduler
253 .run_command(
254 database_id,
255 Command::Throttle {
256 jobs,
257 config: fragments
258 .into_iter()
259 .map(|fragment_id| (fragment_id, throttle_config))
260 .collect(),
261 },
262 )
263 .await?;
264
265 Ok(Response::new(ApplyThrottleResponse { status: None }))
266 }
267
268 async fn cancel_creating_jobs(
269 &self,
270 request: Request<CancelCreatingJobsRequest>,
271 ) -> TonicResponse<CancelCreatingJobsResponse> {
272 let req = request.into_inner();
273 let job_ids = match req.jobs.unwrap() {
274 Jobs::Infos(infos) => self
275 .metadata_manager
276 .catalog_controller
277 .find_creating_streaming_job_ids(infos.infos)
278 .await?
279 .into_iter()
280 .map(|id| id.as_job_id())
281 .collect(),
282 Jobs::Ids(jobs) => jobs.job_ids,
283 };
284
285 let canceled_jobs = self
286 .stream_manager
287 .cancel_streaming_jobs(job_ids)
288 .await?
289 .into_iter()
290 .map(|id| id.as_raw_id())
291 .collect_vec();
292 Ok(Response::new(CancelCreatingJobsResponse {
293 status: None,
294 canceled_jobs,
295 }))
296 }
297
298 async fn list_table_fragments(
299 &self,
300 request: Request<ListTableFragmentsRequest>,
301 ) -> Result<Response<ListTableFragmentsResponse>, Status> {
302 let req = request.into_inner();
303 let table_ids = HashSet::<JobId>::from_iter(req.table_ids);
304
305 let mut info = HashMap::new();
306 for job_id in table_ids {
307 let table_fragments = self
308 .metadata_manager
309 .catalog_controller
310 .get_job_fragments_by_id(job_id)
311 .await?;
312 let mut dispatchers = self
313 .metadata_manager
314 .catalog_controller
315 .get_fragment_actor_dispatchers(
316 table_fragments.fragment_ids().map(|id| id as _).collect(),
317 )
318 .await?;
319 let ctx = table_fragments.ctx.to_protobuf();
320 info.insert(
321 table_fragments.stream_job_id(),
322 TableFragmentInfo {
323 fragments: table_fragments
324 .fragments
325 .into_iter()
326 .map(|(id, fragment)| FragmentInfo {
327 id,
328 actors: fragment
329 .actors
330 .into_iter()
331 .map(|actor| ActorInfo {
332 id: actor.actor_id,
333 node: Some(fragment.nodes.clone()),
334 dispatcher: dispatchers
335 .get_mut(&fragment.fragment_id)
336 .and_then(|dispatchers| dispatchers.remove(&actor.actor_id))
337 .unwrap_or_default(),
338 })
339 .collect_vec(),
340 })
341 .collect_vec(),
342 ctx: Some(ctx),
343 },
344 );
345 }
346
347 Ok(Response::new(ListTableFragmentsResponse {
348 table_fragments: info,
349 }))
350 }
351
352 async fn list_streaming_job_states(
353 &self,
354 _request: Request<ListStreamingJobStatesRequest>,
355 ) -> Result<Response<ListStreamingJobStatesResponse>, Status> {
356 let job_infos = self
357 .metadata_manager
358 .catalog_controller
359 .list_streaming_job_infos()
360 .await?;
361 let states = job_infos
362 .into_iter()
363 .map(
364 |StreamingJobInfo {
365 job_id,
366 job_status,
367 name,
368 parallelism,
369 max_parallelism,
370 resource_group,
371 database_id,
372 schema_id,
373 config_override,
374 ..
375 }| {
376 let parallelism = match parallelism {
377 StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
378 StreamingParallelism::Custom => model::TableParallelism::Custom,
379 StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
380 };
381
382 list_streaming_job_states_response::StreamingJobState {
383 table_id: job_id,
384 name,
385 state: PbState::from(job_status) as _,
386 parallelism: Some(parallelism.into()),
387 max_parallelism: max_parallelism as _,
388 resource_group,
389 database_id,
390 schema_id,
391 config_override,
392 }
393 },
394 )
395 .collect_vec();
396
397 Ok(Response::new(ListStreamingJobStatesResponse { states }))
398 }
399
400 async fn list_fragment_distribution(
401 &self,
402 _request: Request<ListFragmentDistributionRequest>,
403 ) -> Result<Response<ListFragmentDistributionResponse>, Status> {
404 let include_node = _request.into_inner().include_node.unwrap_or(true);
405 let distributions = if include_node {
406 self.metadata_manager
407 .catalog_controller
408 .list_fragment_descs_with_node(false)
409 .await?
410 } else {
411 self.metadata_manager
412 .catalog_controller
413 .list_fragment_descs_without_node(false)
414 .await?
415 }
416 .into_iter()
417 .map(|(dist, _)| dist)
418 .collect();
419
420 Ok(Response::new(ListFragmentDistributionResponse {
421 distributions,
422 }))
423 }
424
425 async fn list_creating_fragment_distribution(
426 &self,
427 _request: Request<ListCreatingFragmentDistributionRequest>,
428 ) -> Result<Response<ListCreatingFragmentDistributionResponse>, Status> {
429 let include_node = _request.into_inner().include_node.unwrap_or(true);
430 let distributions = if include_node {
431 self.metadata_manager
432 .catalog_controller
433 .list_fragment_descs_with_node(true)
434 .await?
435 } else {
436 self.metadata_manager
437 .catalog_controller
438 .list_fragment_descs_without_node(true)
439 .await?
440 }
441 .into_iter()
442 .map(|(dist, _)| dist)
443 .collect();
444
445 Ok(Response::new(ListCreatingFragmentDistributionResponse {
446 distributions,
447 }))
448 }
449
450 async fn list_sink_log_store_tables(
451 &self,
452 _request: Request<ListSinkLogStoreTablesRequest>,
453 ) -> Result<Response<ListSinkLogStoreTablesResponse>, Status> {
454 let tables = self
455 .metadata_manager
456 .catalog_controller
457 .list_sink_log_store_tables()
458 .await?
459 .into_iter()
460 .map(|(sink_id, internal_table_id)| {
461 list_sink_log_store_tables_response::SinkLogStoreTable {
462 sink_id: sink_id.as_raw_id(),
463 internal_table_id: internal_table_id.as_raw_id(),
464 }
465 })
466 .collect();
467
468 Ok(Response::new(ListSinkLogStoreTablesResponse { tables }))
469 }
470
471 async fn get_fragment_by_id(
472 &self,
473 request: Request<GetFragmentByIdRequest>,
474 ) -> Result<Response<GetFragmentByIdResponse>, Status> {
475 let req = request.into_inner();
476 let fragment_desc = self
477 .metadata_manager
478 .catalog_controller
479 .get_fragment_desc_by_id(req.fragment_id)
480 .await?;
481 let distribution = fragment_desc
482 .map(|(desc, upstreams)| fragment_desc_to_distribution(desc, upstreams, true));
483 Ok(Response::new(GetFragmentByIdResponse { distribution }))
484 }
485
486 async fn get_fragment_vnodes(
487 &self,
488 request: Request<GetFragmentVnodesRequest>,
489 ) -> Result<Response<GetFragmentVnodesResponse>, Status> {
490 let req = request.into_inner();
491 let fragment_id = req.fragment_id;
492
493 let shared_actor_infos = self.env.shared_actor_infos();
494 let guard = shared_actor_infos.read_guard();
495
496 let fragment_info = guard
497 .get_fragment(fragment_id)
498 .ok_or_else(|| Status::not_found(format!("Fragment {} not found", fragment_id)))?;
499
500 let actor_vnodes = fragment_info
501 .actors
502 .iter()
503 .map(|(actor_id, actor_info)| {
504 let vnode_indices = if let Some(ref vnode_bitmap) = actor_info.vnode_bitmap {
505 vnode_bitmap.iter_ones().map(|v| v as u32).collect()
506 } else {
507 vec![]
508 };
509
510 get_fragment_vnodes_response::ActorVnodes {
511 actor_id: *actor_id,
512 vnode_indices,
513 }
514 })
515 .collect();
516
517 Ok(Response::new(GetFragmentVnodesResponse { actor_vnodes }))
518 }
519
520 async fn get_actor_vnodes(
521 &self,
522 request: Request<GetActorVnodesRequest>,
523 ) -> Result<Response<GetActorVnodesResponse>, Status> {
524 let req = request.into_inner();
525 let actor_id = req.actor_id;
526
527 let shared_actor_infos = self.env.shared_actor_infos();
528 let guard = shared_actor_infos.read_guard();
529
530 let actor_info = guard
532 .iter_over_fragments()
533 .find_map(|(_, fragment_info)| fragment_info.actors.get(&actor_id))
534 .ok_or_else(|| Status::not_found(format!("Actor {} not found", actor_id)))?;
535
536 let vnode_indices = if let Some(ref vnode_bitmap) = actor_info.vnode_bitmap {
537 vnode_bitmap.iter_ones().map(|v| v as u32).collect()
538 } else {
539 vec![]
540 };
541
542 Ok(Response::new(GetActorVnodesResponse { vnode_indices }))
543 }
544
545 async fn list_actor_states(
546 &self,
547 _request: Request<ListActorStatesRequest>,
548 ) -> Result<Response<ListActorStatesResponse>, Status> {
549 let actor_locations = self
550 .metadata_manager
551 .catalog_controller
552 .list_actor_locations()?;
553 let states = actor_locations
554 .into_iter()
555 .map(|actor_location| list_actor_states_response::ActorState {
556 actor_id: actor_location.actor_id,
557 fragment_id: actor_location.fragment_id,
558 worker_id: actor_location.worker_id,
559 })
560 .collect_vec();
561
562 Ok(Response::new(ListActorStatesResponse { states }))
563 }
564
565 async fn recover(
566 &self,
567 _request: Request<RecoverRequest>,
568 ) -> Result<Response<RecoverResponse>, Status> {
569 self.barrier_manager.adhoc_recovery().await?;
570 Ok(Response::new(RecoverResponse {}))
571 }
572
573 async fn list_actor_splits(
574 &self,
575 _request: Request<ListActorSplitsRequest>,
576 ) -> Result<Response<ListActorSplitsResponse>, Status> {
577 let SourceManagerRunningInfo {
578 source_fragments,
579 backfill_fragments,
580 } = self.stream_manager.source_manager.get_running_info().await;
581
582 let mut actor_splits = self.env.shared_actor_infos().list_assignments();
583
584 let source_actors: HashMap<_, _> = {
585 let all_fragment_ids: HashSet<_> = backfill_fragments
586 .values()
587 .flat_map(|set| set.iter().flat_map(|&(id1, id2)| [id1, id2]))
588 .chain(source_fragments.values().flatten().copied())
589 .collect();
590
591 let guard = self.env.shared_actor_infos().read_guard();
592 guard
593 .iter_over_fragments()
594 .filter(|(frag_id, _)| all_fragment_ids.contains(*frag_id))
595 .flat_map(|(fragment_id, fragment_info)| {
596 fragment_info
597 .actors
598 .keys()
599 .copied()
600 .map(|actor_id| (actor_id, *fragment_id))
601 })
602 .collect()
603 };
604
605 let is_shared_source = self
606 .metadata_manager
607 .catalog_controller
608 .list_source_id_with_shared_types()
609 .await?;
610
611 let fragment_to_source: HashMap<_, _> = source_fragments
612 .into_iter()
613 .flat_map(|(source_id, fragment_ids)| {
614 let source_type = if is_shared_source.get(&source_id).copied().unwrap_or(false) {
615 FragmentType::SharedSource
616 } else {
617 FragmentType::NonSharedSource
618 };
619
620 fragment_ids
621 .into_iter()
622 .map(move |fragment_id| (fragment_id, (source_id, source_type)))
623 })
624 .chain(
625 backfill_fragments
626 .into_iter()
627 .flat_map(|(source_id, fragment_ids)| {
628 fragment_ids.into_iter().flat_map(
629 move |(fragment_id, upstream_fragment_id)| {
630 [
631 (fragment_id, (source_id, FragmentType::SharedSourceBackfill)),
632 (
633 upstream_fragment_id,
634 (source_id, FragmentType::SharedSource),
635 ),
636 ]
637 },
638 )
639 }),
640 )
641 .collect();
642
643 let actor_splits = source_actors
644 .into_iter()
645 .flat_map(|(actor_id, fragment_id)| {
646 let (source_id, fragment_type) = fragment_to_source
647 .get(&fragment_id)
648 .copied()
649 .unwrap_or_default();
650
651 actor_splits
652 .remove(&actor_id)
653 .unwrap_or_default()
654 .into_iter()
655 .map(move |split| list_actor_splits_response::ActorSplit {
656 actor_id,
657 source_id,
658 fragment_id,
659 split_id: split.id().to_string(),
660 fragment_type: fragment_type.into(),
661 })
662 })
663 .collect_vec();
664
665 Ok(Response::new(ListActorSplitsResponse { actor_splits }))
666 }
667
668 async fn list_rate_limits(
669 &self,
670 _request: Request<ListRateLimitsRequest>,
671 ) -> Result<Response<ListRateLimitsResponse>, Status> {
672 let rate_limits = self
673 .metadata_manager
674 .catalog_controller
675 .list_rate_limits()
676 .await?;
677 Ok(Response::new(ListRateLimitsResponse { rate_limits }))
678 }
679
680 #[cfg_attr(coverage, coverage(off))]
681 async fn refresh(
682 &self,
683 request: Request<RefreshRequest>,
684 ) -> Result<Response<RefreshResponse>, Status> {
685 let req = request.into_inner();
686
687 tracing::info!("Refreshing table with id: {}", req.table_id);
688
689 let response = self
690 .refresh_manager
691 .trigger_manual_refresh(req, self.env.shared_actor_infos())
692 .await?;
693
694 Ok(Response::new(response))
695 }
696
697 async fn alter_connector_props(
698 &self,
699 request: Request<AlterConnectorPropsRequest>,
700 ) -> Result<Response<AlterConnectorPropsResponse>, Status> {
701 let request = request.into_inner();
702 let secret_manager = LocalSecretManager::global();
703 let (new_props_plaintext, object_id) = match AlterConnectorPropsObject::try_from(
704 request.object_type,
705 ) {
706 Ok(AlterConnectorPropsObject::Sink) => (
707 self.metadata_manager
708 .update_sink_props_by_sink_id(
709 request.object_id.into(),
710 request.changed_props.clone().into_iter().collect(),
711 )
712 .await?,
713 request.object_id.into(),
714 ),
715 Ok(AlterConnectorPropsObject::IcebergTable) => {
716 let (prop, sink_id) = self
717 .metadata_manager
718 .update_iceberg_table_props_by_table_id(
719 request.object_id.into(),
720 request.changed_props.clone().into_iter().collect(),
721 request.extra_options,
722 )
723 .await?;
724 (prop, sink_id.as_object_id())
725 }
726
727 Ok(AlterConnectorPropsObject::Source) => {
728 if request.connector_conn_ref.is_some() {
730 return Err(Status::invalid_argument(
731 "alter connector_conn_ref is not supported",
732 ));
733 }
734 let options_with_secret = self
735 .metadata_manager
736 .catalog_controller
737 .update_source_props_by_source_id(
738 request.object_id.into(),
739 request.changed_props.clone().into_iter().collect(),
740 request.changed_secret_refs.clone().into_iter().collect(),
741 false, )
743 .await?;
744
745 self.stream_manager
746 .source_manager
747 .validate_source_once(request.object_id.into(), options_with_secret.clone())
748 .await?;
749
750 let (options, secret_refs) = options_with_secret.into_parts();
751 (
752 secret_manager
753 .fill_secrets(options, secret_refs)
754 .map_err(MetaError::from)?
755 .into_iter()
756 .collect(),
757 request.object_id.into(),
758 )
759 }
760 Ok(AlterConnectorPropsObject::Connection) => {
761 let (
764 connection_options_with_secret,
765 updated_sources_with_props,
766 updated_sinks_with_props,
767 ) = self
768 .metadata_manager
769 .catalog_controller
770 .update_connection_and_dependent_objects_props(
771 ConnectionId::from(request.object_id),
772 request.changed_props.clone().into_iter().collect(),
773 request.changed_secret_refs.clone().into_iter().collect(),
774 )
775 .await?;
776
777 let (options, secret_refs) = connection_options_with_secret.into_parts();
779 let new_props_plaintext = secret_manager
780 .fill_secrets(options, secret_refs)
781 .map_err(MetaError::from)?
782 .into_iter()
783 .collect::<HashMap<String, String>>();
784
785 let mut dependent_mutation = HashMap::default();
787 for (source_id, complete_source_props) in updated_sources_with_props {
788 dependent_mutation.insert(source_id.as_object_id(), complete_source_props);
789 }
790 for (sink_id, complete_sink_props) in updated_sinks_with_props {
791 dependent_mutation.insert(sink_id.as_object_id(), complete_sink_props);
792 }
793
794 if !dependent_mutation.is_empty() {
795 let database_id = self
796 .metadata_manager
797 .catalog_controller
798 .get_object_database_id(ConnectionId::from(request.object_id))
799 .await?;
800 tracing::info!(
801 "broadcasting connection {} property changes to dependent object ids: {:?}",
802 request.object_id,
803 dependent_mutation.keys().collect_vec()
804 );
805 let _version = self
806 .barrier_scheduler
807 .run_command(
808 database_id,
809 Command::ConnectorPropsChange(dependent_mutation),
810 )
811 .await?;
812 }
813
814 (
815 new_props_plaintext,
816 ConnectionId::from(request.object_id).as_object_id(),
817 )
818 }
819
820 _ => {
821 unimplemented!(
822 "Unsupported object type for AlterConnectorProps: {:?}",
823 request.object_type
824 );
825 }
826 };
827
828 let database_id = self
829 .metadata_manager
830 .catalog_controller
831 .get_object_database_id(object_id)
832 .await?;
833 if AlterConnectorPropsObject::try_from(request.object_type)
836 .is_ok_and(|t| t != AlterConnectorPropsObject::Connection)
837 {
838 let mut mutation = HashMap::default();
839 mutation.insert(object_id, new_props_plaintext);
840 let _version = self
841 .barrier_scheduler
842 .run_command(database_id, Command::ConnectorPropsChange(mutation))
843 .await?;
844 }
845
846 Ok(Response::new(AlterConnectorPropsResponse {}))
847 }
848
849 async fn set_sync_log_store_aligned(
850 &self,
851 request: Request<SetSyncLogStoreAlignedRequest>,
852 ) -> Result<Response<SetSyncLogStoreAlignedResponse>, Status> {
853 let req = request.into_inner();
854 let job_id = req.job_id;
855 let aligned = req.aligned;
856
857 self.metadata_manager
858 .catalog_controller
859 .mutate_fragments_by_job_id(
860 job_id,
861 |_mask, stream_node| {
862 let mut visited = false;
863 visit_stream_node_mut(stream_node, |body| {
864 if let NodeBody::SyncLogStore(sync_log_store) = body {
865 sync_log_store.aligned = aligned;
866 visited = true
867 }
868 });
869 Ok(visited)
870 },
871 "no fragments found with synced log store",
872 )
873 .await?;
874
875 Ok(Response::new(SetSyncLogStoreAlignedResponse {}))
876 }
877
878 async fn list_cdc_progress(
879 &self,
880 _request: Request<ListCdcProgressRequest>,
881 ) -> Result<Response<ListCdcProgressResponse>, Status> {
882 let cdc_progress = self
883 .barrier_manager
884 .get_cdc_progress()
885 .await?
886 .into_iter()
887 .map(|(job_id, p)| {
888 (
889 job_id,
890 PbCdcProgress {
891 split_total_count: p.split_total_count,
892 split_backfilled_count: p.split_backfilled_count,
893 split_completed_count: p.split_completed_count,
894 },
895 )
896 })
897 .collect();
898 Ok(Response::new(ListCdcProgressResponse { cdc_progress }))
899 }
900
901 async fn list_unmigrated_tables(
902 &self,
903 _request: Request<ListUnmigratedTablesRequest>,
904 ) -> Result<Response<ListUnmigratedTablesResponse>, Status> {
905 let unmigrated_tables = self
906 .metadata_manager
907 .catalog_controller
908 .list_unmigrated_tables()
909 .await?
910 .into_iter()
911 .map(|table| list_unmigrated_tables_response::UnmigratedTable {
912 table_id: table.id,
913 table_name: table.name,
914 })
915 .collect();
916
917 Ok(Response::new(ListUnmigratedTablesResponse {
918 tables: unmigrated_tables,
919 }))
920 }
921
922 async fn alter_source_properties_safe(
925 &self,
926 request: Request<AlterSourcePropertiesSafeRequest>,
927 ) -> Result<Response<AlterSourcePropertiesSafeResponse>, Status> {
928 let request = request.into_inner();
929 let source_id = request.source_id;
930 let options = request.options.unwrap_or_default();
931
932 tracing::info!(
933 source_id = source_id,
934 reset_splits = options.reset_splits,
935 "Starting orchestrated source property update"
936 );
937
938 let database_id = self
940 .metadata_manager
941 .catalog_controller
942 .get_object_database_id(SourceId::from(source_id))
943 .await?;
944
945 tracing::info!(source_id = source_id, "Pausing stream");
947 self.barrier_scheduler
948 .run_command(database_id, Command::Pause)
949 .await?;
950
951 let result = async {
953 let secret_manager = LocalSecretManager::global();
954
955 let options_with_secret = self
956 .metadata_manager
957 .catalog_controller
958 .update_source_props_by_source_id(
959 source_id.into(),
960 request.changed_props.clone().into_iter().collect(),
961 request.changed_secret_refs.clone().into_iter().collect(),
962 true, )
964 .await?;
965
966 self.stream_manager
968 .source_manager
969 .validate_source_once(source_id.into(), options_with_secret.clone())
970 .await?;
971
972 let (props, secret_refs) = options_with_secret.into_parts();
973 let new_props_plaintext: HashMap<String, String> = secret_manager
974 .fill_secrets(props, secret_refs)
975 .map_err(MetaError::from)?
976 .into_iter()
977 .collect();
978
979 tracing::info!(
981 source_id = source_id,
982 "Issuing ConnectorPropsChange barrier"
983 );
984 let mut mutation = HashMap::default();
985 mutation.insert(source_id.into(), new_props_plaintext);
986 self.barrier_scheduler
987 .run_command(database_id, Command::ConnectorPropsChange(mutation))
988 .await?;
989
990 if options.reset_splits {
992 tracing::info!(source_id = source_id, "Resetting source splits");
993 self.stream_manager
994 .source_manager
995 .reset_source_splits(source_id.into())
996 .await?;
997 }
998
999 Ok::<_, MetaError>(())
1000 }
1001 .await;
1002
1003 tracing::info!(source_id = source_id, "Resuming stream");
1005 let resume_result = self
1006 .barrier_scheduler
1007 .run_command(database_id, Command::Resume)
1008 .await;
1009
1010 result?;
1012 resume_result?;
1013
1014 tracing::info!(
1015 source_id = source_id,
1016 "Orchestrated source property update completed successfully"
1017 );
1018
1019 Ok(Response::new(AlterSourcePropertiesSafeResponse {}))
1020 }
1021
1022 async fn reset_source_splits(
1025 &self,
1026 request: Request<ResetSourceSplitsRequest>,
1027 ) -> Result<Response<ResetSourceSplitsResponse>, Status> {
1028 let request = request.into_inner();
1029 let source_id = request.source_id;
1030
1031 tracing::warn!(
1032 source_id = source_id,
1033 "UNSAFE: Resetting source splits - this may cause data duplication or loss"
1034 );
1035
1036 self.stream_manager
1037 .source_manager
1038 .reset_source_splits(source_id.into())
1039 .await?;
1040
1041 Ok(Response::new(ResetSourceSplitsResponse {}))
1042 }
1043
1044 async fn inject_source_offsets(
1047 &self,
1048 request: Request<InjectSourceOffsetsRequest>,
1049 ) -> Result<Response<InjectSourceOffsetsResponse>, Status> {
1050 let request = request.into_inner();
1051 let source_id = request.source_id;
1052 let split_offsets = request.split_offsets;
1053
1054 let applied_split_ids = self
1056 .stream_manager
1057 .source_manager
1058 .validate_inject_source_offsets(source_id.into(), &split_offsets)
1059 .await?;
1060
1061 tracing::warn!(
1062 source_id = source_id,
1063 num_offsets = split_offsets.len(),
1064 "UNSAFE: Injecting source offsets - this may cause data duplication or loss"
1065 );
1066
1067 let database_id = self
1068 .metadata_manager
1069 .catalog_controller
1070 .get_object_database_id(SourceId::from(source_id))
1071 .await?;
1072
1073 self.barrier_scheduler
1074 .run_command(
1075 database_id,
1076 Command::InjectSourceOffsets {
1077 source_id: SourceId::from(source_id),
1078 split_offsets,
1079 },
1080 )
1081 .await?;
1082
1083 Ok(Response::new(InjectSourceOffsetsResponse {
1084 applied_split_ids,
1085 }))
1086 }
1087}
1088
1089fn fragment_desc_to_distribution(
1090 fragment_desc: FragmentDesc,
1091 upstreams: Vec<FragmentId>,
1092 include_node: bool,
1093) -> FragmentDistribution {
1094 let node = include_node.then(|| fragment_desc.stream_node.to_protobuf());
1095 FragmentDistribution {
1096 fragment_id: fragment_desc.fragment_id,
1097 table_id: fragment_desc.job_id,
1098 distribution_type: PbFragmentDistributionType::from(fragment_desc.distribution_type) as _,
1099 state_table_ids: fragment_desc.state_table_ids.0,
1100 upstream_fragment_ids: upstreams,
1101 fragment_type_mask: fragment_desc.fragment_type_mask as _,
1102 parallelism: fragment_desc.parallelism as _,
1103 vnode_count: fragment_desc.vnode_count as _,
1104 node,
1105 parallelism_policy: fragment_desc.parallelism_policy,
1106 }
1107}