1use std::collections::{HashMap, HashSet};
16
17use chrono::DateTime;
18use itertools::Itertools;
19use risingwave_common::id::JobId;
20use risingwave_common::secret::LocalSecretManager;
21use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
22use risingwave_connector::source::SplitMetaData;
23use risingwave_meta::barrier::BarrierManagerRef;
24use risingwave_meta::controller::fragment::StreamingJobInfo;
25use risingwave_meta::controller::utils::FragmentDesc;
26use risingwave_meta::manager::MetadataManager;
27use risingwave_meta::manager::iceberg_compaction::IcebergCompactionManagerRef;
28use risingwave_meta::stream::{GlobalRefreshManagerRef, SourceManagerRunningInfo};
29use risingwave_meta::{MetaError, model};
30use risingwave_meta_model::{ConnectionId, FragmentId, JobStatus, SourceId, StreamingParallelism};
31use risingwave_pb::common::ThrottleType;
32use risingwave_pb::meta::alter_connector_props_request::AlterConnectorPropsObject;
33use risingwave_pb::meta::cancel_creating_jobs_request::Jobs;
34use risingwave_pb::meta::list_actor_splits_response::FragmentType;
35use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
36use risingwave_pb::meta::list_refresh_table_states_response::RefreshTableState;
37use risingwave_pb::meta::list_table_fragments_response::{
38 ActorInfo, FragmentInfo, TableFragmentInfo,
39};
40use risingwave_pb::meta::stream_manager_service_server::StreamManagerService;
41use risingwave_pb::meta::table_fragments::PbState;
42use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
43use risingwave_pb::meta::*;
44use risingwave_pb::stream_plan::stream_node::NodeBody;
45use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
46use tonic::{Request, Response, Status};
47
48use crate::barrier::{BarrierScheduler, Command};
49use crate::manager::MetaSrvEnv;
50use crate::stream::GlobalStreamManagerRef;
51
52pub type TonicResponse<T> = Result<Response<T>, Status>;
53
54#[derive(Clone)]
55pub struct StreamServiceImpl {
56 env: MetaSrvEnv,
57 barrier_scheduler: BarrierScheduler,
58 barrier_manager: BarrierManagerRef,
59 stream_manager: GlobalStreamManagerRef,
60 metadata_manager: MetadataManager,
61 refresh_manager: GlobalRefreshManagerRef,
62 iceberg_compaction_manager: IcebergCompactionManagerRef,
63}
64
65impl StreamServiceImpl {
66 pub fn new(
67 env: MetaSrvEnv,
68 barrier_scheduler: BarrierScheduler,
69 barrier_manager: BarrierManagerRef,
70 stream_manager: GlobalStreamManagerRef,
71 metadata_manager: MetadataManager,
72 refresh_manager: GlobalRefreshManagerRef,
73 iceberg_compaction_manager: IcebergCompactionManagerRef,
74 ) -> Self {
75 StreamServiceImpl {
76 env,
77 barrier_scheduler,
78 barrier_manager,
79 stream_manager,
80 metadata_manager,
81 refresh_manager,
82 iceberg_compaction_manager,
83 }
84 }
85}
86
87fn effective_streaming_job_parallelism(
88 job_status: JobStatus,
89 parallelism: StreamingParallelism,
90 adaptive_parallelism_strategy: Option<String>,
91 backfill_parallelism: Option<StreamingParallelism>,
92 backfill_adaptive_parallelism_strategy: Option<String>,
93) -> (StreamingParallelism, Option<String>) {
94 if job_status != JobStatus::Created {
95 (
96 backfill_parallelism.unwrap_or(parallelism),
97 backfill_adaptive_parallelism_strategy.or(adaptive_parallelism_strategy),
98 )
99 } else {
100 (parallelism, adaptive_parallelism_strategy)
101 }
102}
103
104#[async_trait::async_trait]
105impl StreamManagerService for StreamServiceImpl {
106 async fn flush(&self, request: Request<FlushRequest>) -> TonicResponse<FlushResponse> {
107 self.env.idle_manager().record_activity();
108 let req = request.into_inner();
109
110 let version_id = self.barrier_scheduler.flush(req.database_id).await?;
111 Ok(Response::new(FlushResponse {
112 status: None,
113 hummock_version_id: version_id,
114 }))
115 }
116
117 async fn list_refresh_table_states(
118 &self,
119 _request: Request<ListRefreshTableStatesRequest>,
120 ) -> TonicResponse<ListRefreshTableStatesResponse> {
121 let refresh_jobs = self.metadata_manager.list_refresh_jobs().await?;
122 let refresh_table_states = refresh_jobs
123 .into_iter()
124 .map(|job| RefreshTableState {
125 table_id: job.table_id,
126 current_status: job.current_status.to_string(),
127 last_trigger_time: job
128 .last_trigger_time
129 .map(|time| DateTime::from_timestamp_millis(time).unwrap().to_string()),
130 trigger_interval_secs: job.trigger_interval_secs,
131 last_success_time: job
132 .last_success_time
133 .map(|time| DateTime::from_timestamp_millis(time).unwrap().to_string()),
134 })
135 .collect();
136 Ok(Response::new(ListRefreshTableStatesResponse {
137 states: refresh_table_states,
138 }))
139 }
140
141 async fn list_iceberg_compaction_status(
142 &self,
143 _request: Request<ListIcebergCompactionStatusRequest>,
144 ) -> TonicResponse<ListIcebergCompactionStatusResponse> {
145 let statuses = self
146 .iceberg_compaction_manager
147 .list_compaction_statuses()
148 .into_iter()
149 .map(
150 |status| list_iceberg_compaction_status_response::IcebergCompactionStatus {
151 sink_id: status.sink_id.as_raw_id(),
152 task_type: status.task_type,
153 trigger_interval_sec: status.trigger_interval_sec,
154 trigger_snapshot_count: status.trigger_snapshot_count as u64,
155 schedule_state: status.schedule_state,
156 next_compaction_after_sec: status.next_compaction_after_sec,
157 pending_snapshot_count: status.pending_snapshot_count.map(|count| count as u64),
158 is_triggerable: status.is_triggerable,
159 },
160 )
161 .collect();
162
163 Ok(Response::new(ListIcebergCompactionStatusResponse {
164 statuses,
165 }))
166 }
167
168 async fn pause(&self, _: Request<PauseRequest>) -> Result<Response<PauseResponse>, Status> {
169 for database_id in self.metadata_manager.list_active_database_ids().await? {
170 self.barrier_scheduler
171 .run_command(database_id, Command::pause())
172 .await?;
173 }
174 Ok(Response::new(PauseResponse {}))
175 }
176
177 async fn resume(&self, _: Request<ResumeRequest>) -> Result<Response<ResumeResponse>, Status> {
178 for database_id in self.metadata_manager.list_active_database_ids().await? {
179 self.barrier_scheduler
180 .run_command(database_id, Command::resume())
181 .await?;
182 }
183 Ok(Response::new(ResumeResponse {}))
184 }
185
186 async fn apply_throttle(
187 &self,
188 request: Request<ApplyThrottleRequest>,
189 ) -> Result<Response<ApplyThrottleResponse>, Status> {
190 let request = request.into_inner();
191
192 let throttle_target = request.throttle_target();
194 let throttle_type = request.throttle_type();
195
196 let raw_object_id: u32;
197 let jobs: HashSet<JobId>;
198 let fragments: HashSet<FragmentId>;
199
200 match (throttle_type, throttle_target) {
201 (ThrottleType::Source, ThrottleTarget::Source | ThrottleTarget::Table) => {
202 (jobs, fragments) = self
203 .metadata_manager
204 .update_source_rate_limit_by_source_id(request.id.into(), request.rate)
205 .await?;
206 raw_object_id = request.id;
207 }
208 (ThrottleType::Backfill, ThrottleTarget::Mv)
209 | (ThrottleType::Backfill, ThrottleTarget::Sink)
210 | (ThrottleType::Backfill, ThrottleTarget::Table) => {
211 fragments = self
212 .metadata_manager
213 .update_backfill_rate_limit_by_job_id(JobId::from(request.id), request.rate)
214 .await?;
215 jobs = [request.id.into()].into_iter().collect();
216 raw_object_id = request.id;
217 }
218 (ThrottleType::Dml, ThrottleTarget::Table) => {
219 fragments = self
220 .metadata_manager
221 .update_dml_rate_limit_by_job_id(JobId::from(request.id), request.rate)
222 .await?;
223 jobs = [request.id.into()].into_iter().collect();
224 raw_object_id = request.id;
225 }
226 (ThrottleType::Sink, ThrottleTarget::Sink) => {
227 fragments = self
228 .metadata_manager
229 .update_sink_rate_limit_by_sink_id(request.id.into(), request.rate)
230 .await?;
231 jobs = [request.id.into()].into_iter().collect();
232 raw_object_id = request.id;
233 }
234 (_, ThrottleTarget::Fragment) => {
236 self.metadata_manager
237 .update_fragment_rate_limit_by_fragment_id(request.id.into(), request.rate)
238 .await?;
239 let fragment_id = request.id.into();
240 fragments = [fragment_id].into_iter().collect();
241 let job_id = self
242 .metadata_manager
243 .catalog_controller
244 .get_fragment_streaming_job_id(fragment_id)
245 .await?;
246 jobs = [job_id].into_iter().collect();
247 raw_object_id = job_id.as_raw_id();
248 }
249 _ => {
250 return Err(Status::invalid_argument(format!(
251 "unsupported throttle target/type: {:?}/{:?}",
252 throttle_target, throttle_type
253 )));
254 }
255 };
256
257 let database_id = self
258 .metadata_manager
259 .catalog_controller
260 .get_object_database_id(raw_object_id)
261 .await?;
262
263 let throttle_config = ThrottleConfig {
264 rate_limit: request.rate,
265 throttle_type: throttle_type.into(),
266 };
267 let _i = self
268 .barrier_scheduler
269 .run_command(
270 database_id,
271 Command::Throttle {
272 jobs,
273 config: fragments
274 .into_iter()
275 .map(|fragment_id| (fragment_id, throttle_config))
276 .collect(),
277 },
278 )
279 .await?;
280
281 Ok(Response::new(ApplyThrottleResponse { status: None }))
282 }
283
284 async fn cancel_creating_jobs(
285 &self,
286 request: Request<CancelCreatingJobsRequest>,
287 ) -> TonicResponse<CancelCreatingJobsResponse> {
288 let req = request.into_inner();
289 let job_ids = match req.jobs.unwrap() {
290 Jobs::Infos(infos) => self
291 .metadata_manager
292 .catalog_controller
293 .find_creating_streaming_job_ids(infos.infos)
294 .await?
295 .into_iter()
296 .map(|id| id.as_job_id())
297 .collect(),
298 Jobs::Ids(jobs) => jobs.job_ids,
299 };
300
301 let canceled_jobs = self
302 .stream_manager
303 .cancel_streaming_jobs(job_ids)
304 .await?
305 .into_iter()
306 .map(|id| id.as_raw_id())
307 .collect_vec();
308 Ok(Response::new(CancelCreatingJobsResponse {
309 status: None,
310 canceled_jobs,
311 }))
312 }
313
314 async fn list_table_fragments(
315 &self,
316 request: Request<ListTableFragmentsRequest>,
317 ) -> Result<Response<ListTableFragmentsResponse>, Status> {
318 let req = request.into_inner();
319 let table_ids = HashSet::<JobId>::from_iter(req.table_ids);
320
321 let mut info = HashMap::new();
322 for job_id in table_ids {
323 let (table_fragments, fragment_actors, _actor_status) = self
324 .metadata_manager
325 .catalog_controller
326 .get_job_fragments_by_id(job_id)
327 .await?;
328 let mut dispatchers = self
329 .metadata_manager
330 .catalog_controller
331 .get_fragment_actor_dispatchers(
332 table_fragments.fragment_ids().map(|id| id as _).collect(),
333 )
334 .await?;
335 let ctx = table_fragments.ctx.to_protobuf();
336 info.insert(
337 table_fragments.stream_job_id(),
338 TableFragmentInfo {
339 fragments: table_fragments
340 .fragments
341 .into_iter()
342 .map(|(id, fragment)| FragmentInfo {
343 id,
344 actors: fragment_actors
345 .get(&id)
346 .into_iter()
347 .flat_map(|actors| actors.iter().map(|actor| actor.actor_id))
348 .map(|actor_id| ActorInfo {
349 id: actor_id,
350 node: Some(fragment.nodes.clone()),
351 dispatcher: dispatchers
352 .get_mut(&fragment.fragment_id)
353 .and_then(|dispatchers| dispatchers.remove(&actor_id))
354 .unwrap_or_default(),
355 })
356 .collect_vec(),
357 })
358 .collect_vec(),
359 ctx: Some(ctx),
360 },
361 );
362 }
363
364 Ok(Response::new(ListTableFragmentsResponse {
365 table_fragments: info,
366 }))
367 }
368
369 async fn list_streaming_job_states(
370 &self,
371 _request: Request<ListStreamingJobStatesRequest>,
372 ) -> Result<Response<ListStreamingJobStatesResponse>, Status> {
373 let job_infos = self
374 .metadata_manager
375 .catalog_controller
376 .list_streaming_job_infos()
377 .await?;
378 let states = job_infos
379 .into_iter()
380 .map(
381 |StreamingJobInfo {
382 job_id,
383 job_status,
384 name,
385 parallelism,
386 adaptive_parallelism_strategy,
387 backfill_parallelism,
388 backfill_adaptive_parallelism_strategy,
389 max_parallelism,
390 resource_group,
391 database_id,
392 schema_id,
393 config_override,
394 ..
395 }| {
396 let (parallelism, adaptive_parallelism_strategy) =
399 effective_streaming_job_parallelism(
400 job_status,
401 parallelism,
402 adaptive_parallelism_strategy,
403 backfill_parallelism,
404 backfill_adaptive_parallelism_strategy,
405 );
406 let parallelism = match parallelism {
407 StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
408 StreamingParallelism::Custom => model::TableParallelism::Custom,
409 StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
410 };
411
412 list_streaming_job_states_response::StreamingJobState {
413 table_id: job_id,
414 name,
415 state: PbState::from(job_status) as _,
416 parallelism: Some(parallelism.into()),
417 max_parallelism: max_parallelism as _,
418 resource_group,
419 database_id,
420 schema_id,
421 config_override,
422 adaptive_parallelism_strategy,
423 }
424 },
425 )
426 .collect_vec();
427
428 Ok(Response::new(ListStreamingJobStatesResponse { states }))
429 }
430
431 async fn list_fragment_distribution(
432 &self,
433 _request: Request<ListFragmentDistributionRequest>,
434 ) -> Result<Response<ListFragmentDistributionResponse>, Status> {
435 let include_node = _request.into_inner().include_node.unwrap_or(true);
436 let distributions = if include_node {
437 self.metadata_manager
438 .catalog_controller
439 .list_fragment_descs_with_node(false)
440 .await?
441 } else {
442 self.metadata_manager
443 .catalog_controller
444 .list_fragment_descs_without_node(false)
445 .await?
446 }
447 .into_iter()
448 .map(|(dist, _)| dist)
449 .collect();
450
451 Ok(Response::new(ListFragmentDistributionResponse {
452 distributions,
453 }))
454 }
455
456 async fn list_creating_fragment_distribution(
457 &self,
458 _request: Request<ListCreatingFragmentDistributionRequest>,
459 ) -> Result<Response<ListCreatingFragmentDistributionResponse>, Status> {
460 let include_node = _request.into_inner().include_node.unwrap_or(true);
461 let distributions = if include_node {
462 self.metadata_manager
463 .catalog_controller
464 .list_fragment_descs_with_node(true)
465 .await?
466 } else {
467 self.metadata_manager
468 .catalog_controller
469 .list_fragment_descs_without_node(true)
470 .await?
471 }
472 .into_iter()
473 .map(|(dist, _)| dist)
474 .collect();
475
476 Ok(Response::new(ListCreatingFragmentDistributionResponse {
477 distributions,
478 }))
479 }
480
481 async fn list_sink_log_store_tables(
482 &self,
483 _request: Request<ListSinkLogStoreTablesRequest>,
484 ) -> Result<Response<ListSinkLogStoreTablesResponse>, Status> {
485 let tables = self
486 .metadata_manager
487 .catalog_controller
488 .list_sink_log_store_tables()
489 .await?
490 .into_iter()
491 .map(|(sink_id, internal_table_id)| {
492 list_sink_log_store_tables_response::SinkLogStoreTable {
493 sink_id: sink_id.as_raw_id(),
494 internal_table_id: internal_table_id.as_raw_id(),
495 }
496 })
497 .collect();
498
499 Ok(Response::new(ListSinkLogStoreTablesResponse { tables }))
500 }
501
502 async fn get_fragment_by_id(
503 &self,
504 request: Request<GetFragmentByIdRequest>,
505 ) -> Result<Response<GetFragmentByIdResponse>, Status> {
506 let req = request.into_inner();
507 let fragment_desc = self
508 .metadata_manager
509 .catalog_controller
510 .get_fragment_desc_by_id(req.fragment_id)
511 .await?;
512 let distribution = fragment_desc
513 .map(|(desc, upstreams)| fragment_desc_to_distribution(desc, upstreams, true));
514 Ok(Response::new(GetFragmentByIdResponse { distribution }))
515 }
516
517 async fn get_fragment_vnodes(
518 &self,
519 request: Request<GetFragmentVnodesRequest>,
520 ) -> Result<Response<GetFragmentVnodesResponse>, Status> {
521 let req = request.into_inner();
522 let fragment_id = req.fragment_id;
523
524 let shared_actor_infos = self.env.shared_actor_infos();
525 let guard = shared_actor_infos.read_guard();
526
527 let fragment_info = guard
528 .get_fragment(fragment_id)
529 .ok_or_else(|| Status::not_found(format!("Fragment {} not found", fragment_id)))?;
530
531 let actor_vnodes = fragment_info
532 .actors
533 .iter()
534 .map(|(actor_id, actor_info)| {
535 let vnode_indices = if let Some(ref vnode_bitmap) = actor_info.vnode_bitmap {
536 vnode_bitmap.iter_ones().map(|v| v as u32).collect()
537 } else {
538 vec![]
539 };
540
541 get_fragment_vnodes_response::ActorVnodes {
542 actor_id: *actor_id,
543 vnode_indices,
544 }
545 })
546 .collect();
547
548 Ok(Response::new(GetFragmentVnodesResponse { actor_vnodes }))
549 }
550
551 async fn get_actor_vnodes(
552 &self,
553 request: Request<GetActorVnodesRequest>,
554 ) -> Result<Response<GetActorVnodesResponse>, Status> {
555 let req = request.into_inner();
556 let actor_id = req.actor_id;
557
558 let shared_actor_infos = self.env.shared_actor_infos();
559 let guard = shared_actor_infos.read_guard();
560
561 let actor_info = guard
563 .iter_over_fragments()
564 .find_map(|(_, fragment_info)| fragment_info.actors.get(&actor_id))
565 .ok_or_else(|| Status::not_found(format!("Actor {} not found", actor_id)))?;
566
567 let vnode_indices = if let Some(ref vnode_bitmap) = actor_info.vnode_bitmap {
568 vnode_bitmap.iter_ones().map(|v| v as u32).collect()
569 } else {
570 vec![]
571 };
572
573 Ok(Response::new(GetActorVnodesResponse { vnode_indices }))
574 }
575
576 async fn list_actor_states(
577 &self,
578 _request: Request<ListActorStatesRequest>,
579 ) -> Result<Response<ListActorStatesResponse>, Status> {
580 let actor_locations = self
581 .metadata_manager
582 .catalog_controller
583 .list_actor_locations()?;
584 let states = actor_locations
585 .into_iter()
586 .map(|actor_location| list_actor_states_response::ActorState {
587 actor_id: actor_location.actor_id,
588 fragment_id: actor_location.fragment_id,
589 worker_id: actor_location.worker_id,
590 })
591 .collect_vec();
592
593 Ok(Response::new(ListActorStatesResponse { states }))
594 }
595
596 async fn recover(
597 &self,
598 _request: Request<RecoverRequest>,
599 ) -> Result<Response<RecoverResponse>, Status> {
600 self.barrier_manager.adhoc_recovery().await?;
601 Ok(Response::new(RecoverResponse {}))
602 }
603
604 async fn list_actor_splits(
605 &self,
606 _request: Request<ListActorSplitsRequest>,
607 ) -> Result<Response<ListActorSplitsResponse>, Status> {
608 let SourceManagerRunningInfo {
609 source_fragments,
610 backfill_fragments,
611 } = self.stream_manager.source_manager.get_running_info().await;
612
613 let mut actor_splits = self.env.shared_actor_infos().list_assignments();
614
615 let source_actors: HashMap<_, _> = {
616 let all_fragment_ids: HashSet<_> = backfill_fragments
617 .values()
618 .flat_map(|set| set.iter().flat_map(|&(id1, id2)| [id1, id2]))
619 .chain(source_fragments.values().flatten().copied())
620 .collect();
621
622 let guard = self.env.shared_actor_infos().read_guard();
623 guard
624 .iter_over_fragments()
625 .filter(|(frag_id, _)| all_fragment_ids.contains(*frag_id))
626 .flat_map(|(fragment_id, fragment_info)| {
627 fragment_info
628 .actors
629 .keys()
630 .copied()
631 .map(|actor_id| (actor_id, *fragment_id))
632 })
633 .collect()
634 };
635
636 let is_shared_source = self
637 .metadata_manager
638 .catalog_controller
639 .list_source_id_with_shared_types()
640 .await?;
641
642 let fragment_to_source: HashMap<_, _> = source_fragments
643 .into_iter()
644 .flat_map(|(source_id, fragment_ids)| {
645 let source_type = if is_shared_source.get(&source_id).copied().unwrap_or(false) {
646 FragmentType::SharedSource
647 } else {
648 FragmentType::NonSharedSource
649 };
650
651 fragment_ids
652 .into_iter()
653 .map(move |fragment_id| (fragment_id, (source_id, source_type)))
654 })
655 .chain(
656 backfill_fragments
657 .into_iter()
658 .flat_map(|(source_id, fragment_ids)| {
659 fragment_ids.into_iter().flat_map(
660 move |(fragment_id, upstream_fragment_id)| {
661 [
662 (fragment_id, (source_id, FragmentType::SharedSourceBackfill)),
663 (
664 upstream_fragment_id,
665 (source_id, FragmentType::SharedSource),
666 ),
667 ]
668 },
669 )
670 }),
671 )
672 .collect();
673
674 let actor_splits = source_actors
675 .into_iter()
676 .flat_map(|(actor_id, fragment_id)| {
677 let (source_id, fragment_type) = fragment_to_source
678 .get(&fragment_id)
679 .copied()
680 .unwrap_or_default();
681
682 actor_splits
683 .remove(&actor_id)
684 .unwrap_or_default()
685 .into_iter()
686 .map(move |split| list_actor_splits_response::ActorSplit {
687 actor_id,
688 source_id,
689 fragment_id,
690 split_id: split.id().to_string(),
691 fragment_type: fragment_type.into(),
692 })
693 })
694 .collect_vec();
695
696 Ok(Response::new(ListActorSplitsResponse { actor_splits }))
697 }
698
699 async fn list_rate_limits(
700 &self,
701 _request: Request<ListRateLimitsRequest>,
702 ) -> Result<Response<ListRateLimitsResponse>, Status> {
703 let rate_limits = self
704 .metadata_manager
705 .catalog_controller
706 .list_rate_limits()
707 .await?;
708 Ok(Response::new(ListRateLimitsResponse { rate_limits }))
709 }
710
711 #[cfg_attr(coverage, coverage(off))]
712 async fn refresh(
713 &self,
714 request: Request<RefreshRequest>,
715 ) -> Result<Response<RefreshResponse>, Status> {
716 let req = request.into_inner();
717
718 tracing::info!("Refreshing table with id: {}", req.table_id);
719
720 let response = self
721 .refresh_manager
722 .trigger_manual_refresh(req, self.env.shared_actor_infos())
723 .await?;
724
725 Ok(Response::new(response))
726 }
727
728 async fn alter_connector_props(
729 &self,
730 request: Request<AlterConnectorPropsRequest>,
731 ) -> Result<Response<AlterConnectorPropsResponse>, Status> {
732 let request = request.into_inner();
733 let secret_manager = LocalSecretManager::global();
734 let (new_props_plaintext, object_id) = match AlterConnectorPropsObject::try_from(
735 request.object_type,
736 ) {
737 Ok(AlterConnectorPropsObject::Sink) => (
738 self.metadata_manager
739 .update_sink_props_by_sink_id(
740 request.object_id.into(),
741 request.changed_props.clone().into_iter().collect(),
742 )
743 .await?,
744 request.object_id.into(),
745 ),
746 Ok(AlterConnectorPropsObject::IcebergTable) => {
747 let (prop, sink_id) = self
748 .metadata_manager
749 .update_iceberg_table_props_by_table_id(
750 request.object_id.into(),
751 request.changed_props.clone().into_iter().collect(),
752 request.extra_options,
753 )
754 .await?;
755 (prop, sink_id.as_object_id())
756 }
757
758 Ok(AlterConnectorPropsObject::Source) => {
759 if request.connector_conn_ref.is_some() {
761 return Err(Status::invalid_argument(
762 "alter connector_conn_ref is not supported",
763 ));
764 }
765 let options_with_secret = self
766 .metadata_manager
767 .catalog_controller
768 .update_source_props_by_source_id(
769 request.object_id.into(),
770 request.changed_props.clone().into_iter().collect(),
771 request.changed_secret_refs.clone().into_iter().collect(),
772 false, )
774 .await?;
775
776 self.stream_manager
777 .source_manager
778 .validate_source_once(request.object_id.into(), options_with_secret.clone())
779 .await?;
780
781 let (options, secret_refs) = options_with_secret.into_parts();
782 (
783 secret_manager
784 .fill_secrets(options, secret_refs)
785 .map_err(MetaError::from)?
786 .into_iter()
787 .collect(),
788 request.object_id.into(),
789 )
790 }
791 Ok(AlterConnectorPropsObject::Connection) => {
792 let (
795 connection_options_with_secret,
796 updated_sources_with_props,
797 updated_sinks_with_props,
798 ) = self
799 .metadata_manager
800 .catalog_controller
801 .update_connection_and_dependent_objects_props(
802 ConnectionId::from(request.object_id),
803 request.changed_props.clone().into_iter().collect(),
804 request.changed_secret_refs.clone().into_iter().collect(),
805 )
806 .await?;
807
808 let (options, secret_refs) = connection_options_with_secret.into_parts();
810 let new_props_plaintext = secret_manager
811 .fill_secrets(options, secret_refs)
812 .map_err(MetaError::from)?
813 .into_iter()
814 .collect::<HashMap<String, String>>();
815
816 let mut dependent_mutation = HashMap::default();
818 for (source_id, complete_source_props) in updated_sources_with_props {
819 dependent_mutation.insert(source_id.as_object_id(), complete_source_props);
820 }
821 for (sink_id, complete_sink_props) in updated_sinks_with_props {
822 dependent_mutation.insert(sink_id.as_object_id(), complete_sink_props);
823 }
824
825 if !dependent_mutation.is_empty() {
826 let database_id = self
827 .metadata_manager
828 .catalog_controller
829 .get_object_database_id(ConnectionId::from(request.object_id))
830 .await?;
831 tracing::info!(
832 "broadcasting connection {} property changes to dependent object ids: {:?}",
833 request.object_id,
834 dependent_mutation.keys().collect_vec()
835 );
836 let _version = self
837 .barrier_scheduler
838 .run_command(
839 database_id,
840 Command::ConnectorPropsChange(dependent_mutation),
841 )
842 .await?;
843 }
844
845 (
846 new_props_plaintext,
847 ConnectionId::from(request.object_id).as_object_id(),
848 )
849 }
850
851 _ => {
852 unimplemented!(
853 "Unsupported object type for AlterConnectorProps: {:?}",
854 request.object_type
855 );
856 }
857 };
858
859 let database_id = self
860 .metadata_manager
861 .catalog_controller
862 .get_object_database_id(object_id)
863 .await?;
864 if AlterConnectorPropsObject::try_from(request.object_type)
867 .is_ok_and(|t| t != AlterConnectorPropsObject::Connection)
868 {
869 let mut mutation = HashMap::default();
870 mutation.insert(object_id, new_props_plaintext);
871 let _version = self
872 .barrier_scheduler
873 .run_command(database_id, Command::ConnectorPropsChange(mutation))
874 .await?;
875 }
876
877 Ok(Response::new(AlterConnectorPropsResponse {}))
878 }
879
880 async fn set_sync_log_store_aligned(
881 &self,
882 request: Request<SetSyncLogStoreAlignedRequest>,
883 ) -> Result<Response<SetSyncLogStoreAlignedResponse>, Status> {
884 let req = request.into_inner();
885 let job_id = req.job_id;
886 let aligned = req.aligned;
887
888 self.metadata_manager
889 .catalog_controller
890 .mutate_fragments_by_job_id(
891 job_id,
892 |_mask, stream_node| {
893 let mut visited = false;
894 visit_stream_node_mut(stream_node, |body| {
895 if let NodeBody::SyncLogStore(sync_log_store) = body {
896 sync_log_store.aligned = aligned;
897 visited = true
898 }
899 });
900 Ok(visited)
901 },
902 "no fragments found with synced log store",
903 )
904 .await?;
905
906 Ok(Response::new(SetSyncLogStoreAlignedResponse {}))
907 }
908
909 async fn list_cdc_progress(
910 &self,
911 _request: Request<ListCdcProgressRequest>,
912 ) -> Result<Response<ListCdcProgressResponse>, Status> {
913 let cdc_progress = self
914 .barrier_manager
915 .get_cdc_progress()
916 .await?
917 .into_iter()
918 .map(|(job_id, p)| {
919 (
920 job_id,
921 PbCdcProgress {
922 split_total_count: p.split_total_count,
923 split_backfilled_count: p.split_backfilled_count,
924 split_completed_count: p.split_completed_count,
925 },
926 )
927 })
928 .collect();
929 Ok(Response::new(ListCdcProgressResponse { cdc_progress }))
930 }
931
932 async fn list_unmigrated_tables(
933 &self,
934 _request: Request<ListUnmigratedTablesRequest>,
935 ) -> Result<Response<ListUnmigratedTablesResponse>, Status> {
936 let unmigrated_tables = self
937 .metadata_manager
938 .catalog_controller
939 .list_unmigrated_tables()
940 .await?
941 .into_iter()
942 .map(|table| list_unmigrated_tables_response::UnmigratedTable {
943 table_id: table.id,
944 table_name: table.name,
945 })
946 .collect();
947
948 Ok(Response::new(ListUnmigratedTablesResponse {
949 tables: unmigrated_tables,
950 }))
951 }
952
953 async fn alter_source_properties_safe(
956 &self,
957 request: Request<AlterSourcePropertiesSafeRequest>,
958 ) -> Result<Response<AlterSourcePropertiesSafeResponse>, Status> {
959 let request = request.into_inner();
960 let source_id = request.source_id;
961 let options = request.options.unwrap_or_default();
962
963 tracing::info!(
964 source_id = source_id,
965 reset_splits = options.reset_splits,
966 "Starting orchestrated source property update"
967 );
968
969 let database_id = self
971 .metadata_manager
972 .catalog_controller
973 .get_object_database_id(SourceId::from(source_id))
974 .await?;
975
976 tracing::info!(source_id = source_id, "Pausing stream");
978 self.barrier_scheduler
979 .run_command(database_id, Command::Pause)
980 .await?;
981
982 let result = async {
984 let secret_manager = LocalSecretManager::global();
985
986 let options_with_secret = self
987 .metadata_manager
988 .catalog_controller
989 .update_source_props_by_source_id(
990 source_id.into(),
991 request.changed_props.clone().into_iter().collect(),
992 request.changed_secret_refs.clone().into_iter().collect(),
993 true, )
995 .await?;
996
997 self.stream_manager
999 .source_manager
1000 .validate_source_once(source_id.into(), options_with_secret.clone())
1001 .await?;
1002
1003 let (props, secret_refs) = options_with_secret.into_parts();
1004 let new_props_plaintext: HashMap<String, String> = secret_manager
1005 .fill_secrets(props, secret_refs)
1006 .map_err(MetaError::from)?
1007 .into_iter()
1008 .collect();
1009
1010 tracing::info!(
1012 source_id = source_id,
1013 "Issuing ConnectorPropsChange barrier"
1014 );
1015 let mut mutation = HashMap::default();
1016 mutation.insert(source_id.into(), new_props_plaintext);
1017 self.barrier_scheduler
1018 .run_command(database_id, Command::ConnectorPropsChange(mutation))
1019 .await?;
1020
1021 if options.reset_splits {
1023 tracing::info!(source_id = source_id, "Resetting source splits");
1024 self.stream_manager
1025 .source_manager
1026 .reset_source_splits(source_id.into())
1027 .await?;
1028 }
1029
1030 Ok::<_, MetaError>(())
1031 }
1032 .await;
1033
1034 tracing::info!(source_id = source_id, "Resuming stream");
1036 let resume_result = self
1037 .barrier_scheduler
1038 .run_command(database_id, Command::Resume)
1039 .await;
1040
1041 result?;
1043 resume_result?;
1044
1045 tracing::info!(
1046 source_id = source_id,
1047 "Orchestrated source property update completed successfully"
1048 );
1049
1050 Ok(Response::new(AlterSourcePropertiesSafeResponse {}))
1051 }
1052
1053 async fn reset_source_splits(
1056 &self,
1057 request: Request<ResetSourceSplitsRequest>,
1058 ) -> Result<Response<ResetSourceSplitsResponse>, Status> {
1059 let request = request.into_inner();
1060 let source_id = request.source_id;
1061
1062 tracing::warn!(
1063 source_id = source_id,
1064 "UNSAFE: Resetting source splits - this may cause data duplication or loss"
1065 );
1066
1067 self.stream_manager
1068 .source_manager
1069 .reset_source_splits(source_id.into())
1070 .await?;
1071
1072 Ok(Response::new(ResetSourceSplitsResponse {}))
1073 }
1074
1075 async fn inject_source_offsets(
1078 &self,
1079 request: Request<InjectSourceOffsetsRequest>,
1080 ) -> Result<Response<InjectSourceOffsetsResponse>, Status> {
1081 let request = request.into_inner();
1082 let source_id = request.source_id;
1083 let split_offsets = request.split_offsets;
1084
1085 let applied_split_ids = self
1087 .stream_manager
1088 .source_manager
1089 .validate_inject_source_offsets(source_id.into(), &split_offsets)
1090 .await?;
1091
1092 tracing::warn!(
1093 source_id = source_id,
1094 num_offsets = split_offsets.len(),
1095 "UNSAFE: Injecting source offsets - this may cause data duplication or loss"
1096 );
1097
1098 let database_id = self
1099 .metadata_manager
1100 .catalog_controller
1101 .get_object_database_id(SourceId::from(source_id))
1102 .await?;
1103
1104 self.barrier_scheduler
1105 .run_command(
1106 database_id,
1107 Command::InjectSourceOffsets {
1108 source_id: SourceId::from(source_id),
1109 split_offsets,
1110 },
1111 )
1112 .await?;
1113
1114 Ok(Response::new(InjectSourceOffsetsResponse {
1115 applied_split_ids,
1116 }))
1117 }
1118}
1119
1120fn fragment_desc_to_distribution(
1121 fragment_desc: FragmentDesc,
1122 upstreams: Vec<FragmentId>,
1123 include_node: bool,
1124) -> FragmentDistribution {
1125 let node = include_node.then(|| fragment_desc.stream_node.to_protobuf());
1126 FragmentDistribution {
1127 fragment_id: fragment_desc.fragment_id,
1128 table_id: fragment_desc.job_id,
1129 distribution_type: PbFragmentDistributionType::from(fragment_desc.distribution_type) as _,
1130 state_table_ids: fragment_desc.state_table_ids.0,
1131 upstream_fragment_ids: upstreams,
1132 fragment_type_mask: fragment_desc.fragment_type_mask as _,
1133 parallelism: fragment_desc.parallelism as _,
1134 vnode_count: fragment_desc.vnode_count as _,
1135 node,
1136 parallelism_policy: fragment_desc.parallelism_policy,
1137 }
1138}
1139
#[cfg(test)]
mod tests {
    use risingwave_meta_model::{JobStatus, StreamingParallelism};

    use super::effective_streaming_job_parallelism;

    #[test]
    fn test_effective_streaming_job_parallelism_prefers_backfill_override() {
        let (parallelism, strategy) = effective_streaming_job_parallelism(
            JobStatus::Creating,
            StreamingParallelism::Adaptive,
            Some("BOUNDED(4)".to_owned()),
            Some(StreamingParallelism::Adaptive),
            Some("BOUNDED(2)".to_owned()),
        );

        assert_eq!(parallelism, StreamingParallelism::Adaptive);
        assert_eq!(strategy.as_deref(), Some("BOUNDED(2)"));
    }

    /// The job and backfill parallelism differ here, so this actually pins that the backfill
    /// parallelism wins while the job is still being created.
    #[test]
    fn test_effective_streaming_job_parallelism_prefers_backfill_parallelism_while_creating() {
        let (parallelism, strategy) = effective_streaming_job_parallelism(
            JobStatus::Creating,
            StreamingParallelism::Fixed(8),
            None,
            Some(StreamingParallelism::Fixed(2)),
            None,
        );

        assert_eq!(parallelism, StreamingParallelism::Fixed(2));
        assert_eq!(strategy, None);
    }

    #[test]
    fn test_effective_streaming_job_parallelism_falls_back_to_job_strategy() {
        let (parallelism, strategy) = effective_streaming_job_parallelism(
            JobStatus::Initial,
            StreamingParallelism::Adaptive,
            Some("RATIO(0.5)".to_owned()),
            Some(StreamingParallelism::Adaptive),
            None,
        );

        assert_eq!(parallelism, StreamingParallelism::Adaptive);
        assert_eq!(strategy.as_deref(), Some("RATIO(0.5)"));
    }

    /// Without any backfill override, a creating job reports its regular values.
    #[test]
    fn test_effective_streaming_job_parallelism_falls_back_to_job_parallelism() {
        let (parallelism, strategy) = effective_streaming_job_parallelism(
            JobStatus::Initial,
            StreamingParallelism::Fixed(8),
            Some("BOUNDED(4)".to_owned()),
            None,
            None,
        );

        assert_eq!(parallelism, StreamingParallelism::Fixed(8));
        assert_eq!(strategy.as_deref(), Some("BOUNDED(4)"));
    }

    #[test]
    fn test_effective_streaming_job_parallelism_ignores_backfill_after_creation() {
        let (parallelism, strategy) = effective_streaming_job_parallelism(
            JobStatus::Created,
            StreamingParallelism::Fixed(4),
            None,
            Some(StreamingParallelism::Fixed(2)),
            Some("BOUNDED(2)".to_owned()),
        );

        assert_eq!(parallelism, StreamingParallelism::Fixed(4));
        assert_eq!(strategy, None);
    }
}