1use std::collections::{HashMap, HashSet};
16
17use itertools::Itertools;
18use risingwave_common::catalog::{DatabaseId, TableId};
19use risingwave_common::secret::LocalSecretManager;
20use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
21use risingwave_connector::source::SplitMetaData;
22use risingwave_meta::barrier::BarrierManagerRef;
23use risingwave_meta::controller::fragment::StreamingJobInfo;
24use risingwave_meta::controller::utils::FragmentDesc;
25use risingwave_meta::manager::MetadataManager;
26use risingwave_meta::model::ActorId;
27use risingwave_meta::stream::{SourceManagerRunningInfo, ThrottleConfig};
28use risingwave_meta::{MetaError, model};
29use risingwave_meta_model::{FragmentId, ObjectId, SinkId, SourceId, StreamingParallelism};
30use risingwave_pb::meta::alter_connector_props_request::AlterConnectorPropsObject;
31use risingwave_pb::meta::cancel_creating_jobs_request::Jobs;
32use risingwave_pb::meta::list_actor_splits_response::FragmentType;
33use risingwave_pb::meta::list_cdc_progress_response::PbCdcProgress;
34use risingwave_pb::meta::list_table_fragments_response::{
35 ActorInfo, FragmentInfo, TableFragmentInfo,
36};
37use risingwave_pb::meta::stream_manager_service_server::StreamManagerService;
38use risingwave_pb::meta::table_fragments::PbState;
39use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
40use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
41use risingwave_pb::meta::*;
42use risingwave_pb::stream_plan::stream_node::NodeBody;
43use tonic::{Request, Response, Status};
44
45use crate::barrier::{BarrierScheduler, Command};
46use crate::manager::MetaSrvEnv;
47use crate::stream::GlobalStreamManagerRef;
48
/// Shorthand for the `tonic` result type returned by the RPC handlers in this service.
pub type TonicResponse<T> = Result<Response<T>, Status>;
50
/// gRPC implementation of `StreamManagerService`, bundling the meta-node
/// components the stream RPCs delegate to.
#[derive(Clone)]
pub struct StreamServiceImpl {
    // Meta server environment (idle manager, shared actor infos, CDC tracker, ...).
    env: MetaSrvEnv,
    // Schedules barrier commands (flush/pause/resume/throttle/...) per database.
    barrier_scheduler: BarrierScheduler,
    // Handle to the barrier manager, used for ad-hoc recovery.
    barrier_manager: BarrierManagerRef,
    // Global streaming job manager (job cancellation, source manager access).
    stream_manager: GlobalStreamManagerRef,
    // Catalog / fragment metadata access.
    metadata_manager: MetadataManager,
}
59
60impl StreamServiceImpl {
61 pub fn new(
62 env: MetaSrvEnv,
63 barrier_scheduler: BarrierScheduler,
64 barrier_manager: BarrierManagerRef,
65 stream_manager: GlobalStreamManagerRef,
66 metadata_manager: MetadataManager,
67 ) -> Self {
68 StreamServiceImpl {
69 env,
70 barrier_scheduler,
71 barrier_manager,
72 stream_manager,
73 metadata_manager,
74 }
75 }
76}
77
78#[async_trait::async_trait]
79impl StreamManagerService for StreamServiceImpl {
80 async fn flush(&self, request: Request<FlushRequest>) -> TonicResponse<FlushResponse> {
81 self.env.idle_manager().record_activity();
82 let req = request.into_inner();
83
84 let version_id = self.barrier_scheduler.flush(req.database_id.into()).await?;
85 Ok(Response::new(FlushResponse {
86 status: None,
87 hummock_version_id: version_id.to_u64(),
88 }))
89 }
90
91 async fn pause(&self, _: Request<PauseRequest>) -> Result<Response<PauseResponse>, Status> {
92 for database_id in self.metadata_manager.list_active_database_ids().await? {
93 self.barrier_scheduler
94 .run_command(database_id, Command::pause())
95 .await?;
96 }
97 Ok(Response::new(PauseResponse {}))
98 }
99
100 async fn resume(&self, _: Request<ResumeRequest>) -> Result<Response<ResumeResponse>, Status> {
101 for database_id in self.metadata_manager.list_active_database_ids().await? {
102 self.barrier_scheduler
103 .run_command(database_id, Command::resume())
104 .await?;
105 }
106 Ok(Response::new(ResumeResponse {}))
107 }
108
109 async fn apply_throttle(
110 &self,
111 request: Request<ApplyThrottleRequest>,
112 ) -> Result<Response<ApplyThrottleResponse>, Status> {
113 let request = request.into_inner();
114
115 let actor_to_apply = match request.kind() {
116 ThrottleTarget::Source | ThrottleTarget::TableWithSource => {
117 self.metadata_manager
118 .update_source_rate_limit_by_source_id(request.id as SourceId, request.rate)
119 .await?
120 }
121 ThrottleTarget::Mv => {
122 self.metadata_manager
123 .update_backfill_rate_limit_by_table_id(TableId::from(request.id), request.rate)
124 .await?
125 }
126 ThrottleTarget::CdcTable => {
127 self.metadata_manager
128 .update_backfill_rate_limit_by_table_id(TableId::from(request.id), request.rate)
129 .await?
130 }
131 ThrottleTarget::TableDml => {
132 self.metadata_manager
133 .update_dml_rate_limit_by_table_id(TableId::from(request.id), request.rate)
134 .await?
135 }
136 ThrottleTarget::Sink => {
137 self.metadata_manager
138 .update_sink_rate_limit_by_sink_id(request.id as SinkId, request.rate)
139 .await?
140 }
141 ThrottleTarget::Fragment => {
142 self.metadata_manager
143 .update_fragment_rate_limit_by_fragment_id(request.id as _, request.rate)
144 .await?
145 }
146 ThrottleTarget::Unspecified => {
147 return Err(Status::invalid_argument("unspecified throttle target"));
148 }
149 };
150
151 let request_id = if request.kind() == ThrottleTarget::Fragment {
152 self.metadata_manager
153 .catalog_controller
154 .get_fragment_streaming_job_id(request.id as _)
155 .await?
156 } else {
157 request.id as _
158 };
159
160 let database_id = self
161 .metadata_manager
162 .catalog_controller
163 .get_object_database_id(request_id as ObjectId)
164 .await?;
165 let database_id = DatabaseId::new(database_id as _);
166 let mutation: ThrottleConfig = actor_to_apply
168 .iter()
169 .map(|(fragment_id, actors)| {
170 (
171 *fragment_id,
172 actors
173 .iter()
174 .map(|actor_id| (*actor_id, request.rate))
175 .collect::<HashMap<ActorId, Option<u32>>>(),
176 )
177 })
178 .collect();
179 let _i = self
180 .barrier_scheduler
181 .run_command(database_id, Command::Throttle(mutation))
182 .await?;
183
184 Ok(Response::new(ApplyThrottleResponse { status: None }))
185 }
186
187 async fn cancel_creating_jobs(
188 &self,
189 request: Request<CancelCreatingJobsRequest>,
190 ) -> TonicResponse<CancelCreatingJobsResponse> {
191 let req = request.into_inner();
192 let table_ids = match req.jobs.unwrap() {
193 Jobs::Infos(infos) => self
194 .metadata_manager
195 .catalog_controller
196 .find_creating_streaming_job_ids(infos.infos)
197 .await?
198 .into_iter()
199 .map(|id| id as _)
200 .collect(),
201 Jobs::Ids(jobs) => jobs.job_ids,
202 };
203
204 let canceled_jobs = self
205 .stream_manager
206 .cancel_streaming_jobs(table_ids.into_iter().map(TableId::from).collect_vec())
207 .await
208 .into_iter()
209 .map(|id| id.table_id)
210 .collect_vec();
211 Ok(Response::new(CancelCreatingJobsResponse {
212 status: None,
213 canceled_jobs,
214 }))
215 }
216
217 async fn list_table_fragments(
218 &self,
219 request: Request<ListTableFragmentsRequest>,
220 ) -> Result<Response<ListTableFragmentsResponse>, Status> {
221 let req = request.into_inner();
222 let table_ids = HashSet::<u32>::from_iter(req.table_ids);
223
224 let mut info = HashMap::new();
225 for job_id in table_ids {
226 let table_fragments = self
227 .metadata_manager
228 .catalog_controller
229 .get_job_fragments_by_id(job_id as _)
230 .await?;
231 let mut dispatchers = self
232 .metadata_manager
233 .catalog_controller
234 .get_fragment_actor_dispatchers(
235 table_fragments.fragment_ids().map(|id| id as _).collect(),
236 )
237 .await?;
238 let ctx = table_fragments.ctx.to_protobuf();
239 info.insert(
240 table_fragments.stream_job_id().table_id,
241 TableFragmentInfo {
242 fragments: table_fragments
243 .fragments
244 .into_iter()
245 .map(|(id, fragment)| FragmentInfo {
246 id,
247 actors: fragment
248 .actors
249 .into_iter()
250 .map(|actor| ActorInfo {
251 id: actor.actor_id,
252 node: Some(fragment.nodes.clone()),
253 dispatcher: dispatchers
254 .get_mut(&(fragment.fragment_id as _))
255 .and_then(|dispatchers| {
256 dispatchers.remove(&(actor.actor_id as _))
257 })
258 .unwrap_or_default(),
259 })
260 .collect_vec(),
261 })
262 .collect_vec(),
263 ctx: Some(ctx),
264 },
265 );
266 }
267
268 Ok(Response::new(ListTableFragmentsResponse {
269 table_fragments: info,
270 }))
271 }
272
273 async fn list_streaming_job_states(
274 &self,
275 _request: Request<ListStreamingJobStatesRequest>,
276 ) -> Result<Response<ListStreamingJobStatesResponse>, Status> {
277 let job_infos = self
278 .metadata_manager
279 .catalog_controller
280 .list_streaming_job_infos()
281 .await?;
282 let states = job_infos
283 .into_iter()
284 .map(
285 |StreamingJobInfo {
286 job_id,
287 job_status,
288 name,
289 parallelism,
290 max_parallelism,
291 resource_group,
292 database_id,
293 schema_id,
294 ..
295 }| {
296 let parallelism = match parallelism {
297 StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
298 StreamingParallelism::Custom => model::TableParallelism::Custom,
299 StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
300 };
301
302 list_streaming_job_states_response::StreamingJobState {
303 table_id: job_id as _,
304 name,
305 state: PbState::from(job_status) as _,
306 parallelism: Some(parallelism.into()),
307 max_parallelism: max_parallelism as _,
308 resource_group,
309 database_id: database_id as _,
310 schema_id: schema_id as _,
311 }
312 },
313 )
314 .collect_vec();
315
316 Ok(Response::new(ListStreamingJobStatesResponse { states }))
317 }
318
319 async fn list_fragment_distribution(
320 &self,
321 _request: Request<ListFragmentDistributionRequest>,
322 ) -> Result<Response<ListFragmentDistributionResponse>, Status> {
323 let fragment_descs = self
324 .metadata_manager
325 .catalog_controller
326 .list_fragment_descs()
327 .await?;
328 let distributions = fragment_descs
329 .into_iter()
330 .map(|(fragment_desc, upstreams)| {
331 fragment_desc_to_distribution(fragment_desc, upstreams)
332 })
333 .collect_vec();
334
335 Ok(Response::new(ListFragmentDistributionResponse {
336 distributions,
337 }))
338 }
339
340 async fn list_creating_fragment_distribution(
341 &self,
342 _request: Request<ListCreatingFragmentDistributionRequest>,
343 ) -> Result<Response<ListCreatingFragmentDistributionResponse>, Status> {
344 let fragment_descs = self
345 .metadata_manager
346 .catalog_controller
347 .list_creating_fragment_descs()
348 .await?;
349 let distributions = fragment_descs
350 .into_iter()
351 .map(|(fragment_desc, upstreams)| {
352 fragment_desc_to_distribution(fragment_desc, upstreams)
353 })
354 .collect_vec();
355
356 Ok(Response::new(ListCreatingFragmentDistributionResponse {
357 distributions,
358 }))
359 }
360
361 async fn get_fragment_by_id(
362 &self,
363 request: Request<GetFragmentByIdRequest>,
364 ) -> Result<Response<GetFragmentByIdResponse>, Status> {
365 let req = request.into_inner();
366 let fragment_desc = self
367 .metadata_manager
368 .catalog_controller
369 .get_fragment_desc_by_id(req.fragment_id as i32)
370 .await?;
371 let distribution =
372 fragment_desc.map(|(desc, upstreams)| fragment_desc_to_distribution(desc, upstreams));
373 Ok(Response::new(GetFragmentByIdResponse { distribution }))
374 }
375
376 async fn list_actor_states(
377 &self,
378 _request: Request<ListActorStatesRequest>,
379 ) -> Result<Response<ListActorStatesResponse>, Status> {
380 let actor_locations = self
381 .metadata_manager
382 .catalog_controller
383 .list_actor_locations()
384 .await?;
385 let states = actor_locations
386 .into_iter()
387 .map(|actor_location| list_actor_states_response::ActorState {
388 actor_id: actor_location.actor_id as _,
389 fragment_id: actor_location.fragment_id as _,
390 state: PbActorState::from(actor_location.status) as _,
391 worker_id: actor_location.worker_id as _,
392 })
393 .collect_vec();
394
395 Ok(Response::new(ListActorStatesResponse { states }))
396 }
397
398 async fn list_object_dependencies(
399 &self,
400 _request: Request<ListObjectDependenciesRequest>,
401 ) -> Result<Response<ListObjectDependenciesResponse>, Status> {
402 let dependencies = self
403 .metadata_manager
404 .catalog_controller
405 .list_created_object_dependencies()
406 .await?;
407
408 Ok(Response::new(ListObjectDependenciesResponse {
409 dependencies,
410 }))
411 }
412
413 async fn recover(
414 &self,
415 _request: Request<RecoverRequest>,
416 ) -> Result<Response<RecoverResponse>, Status> {
417 self.barrier_manager.adhoc_recovery().await?;
418 Ok(Response::new(RecoverResponse {}))
419 }
420
421 async fn list_actor_splits(
422 &self,
423 _request: Request<ListActorSplitsRequest>,
424 ) -> Result<Response<ListActorSplitsResponse>, Status> {
425 let SourceManagerRunningInfo {
426 source_fragments,
427 backfill_fragments,
428 } = self.stream_manager.source_manager.get_running_info().await;
429
430 let mut actor_splits = self.env.shared_actor_infos().list_assignments();
431
432 let source_actors = self
433 .metadata_manager
434 .catalog_controller
435 .list_source_actors()
436 .await?;
437
438 let is_shared_source = self
439 .metadata_manager
440 .catalog_controller
441 .list_source_id_with_shared_types()
442 .await?;
443
444 let fragment_to_source: HashMap<_, _> = source_fragments
445 .into_iter()
446 .flat_map(|(source_id, fragment_ids)| {
447 let source_type = if is_shared_source
448 .get(&(source_id as _))
449 .copied()
450 .unwrap_or(false)
451 {
452 FragmentType::SharedSource
453 } else {
454 FragmentType::NonSharedSource
455 };
456
457 fragment_ids
458 .into_iter()
459 .map(move |fragment_id| (fragment_id, (source_id, source_type)))
460 })
461 .chain(
462 backfill_fragments
463 .into_iter()
464 .flat_map(|(source_id, fragment_ids)| {
465 fragment_ids.into_iter().flat_map(
466 move |(fragment_id, upstream_fragment_id)| {
467 [
468 (fragment_id, (source_id, FragmentType::SharedSourceBackfill)),
469 (
470 upstream_fragment_id,
471 (source_id, FragmentType::SharedSource),
472 ),
473 ]
474 },
475 )
476 }),
477 )
478 .collect();
479
480 let actor_splits = source_actors
481 .into_iter()
482 .flat_map(|(actor_id, fragment_id)| {
483 let (source_id, fragment_type) = fragment_to_source
484 .get(&(fragment_id as _))
485 .copied()
486 .unwrap_or_default();
487
488 actor_splits
489 .remove(&(actor_id as _))
490 .unwrap_or_default()
491 .into_iter()
492 .map(move |split| list_actor_splits_response::ActorSplit {
493 actor_id: actor_id as _,
494 source_id: source_id as _,
495 fragment_id: fragment_id as _,
496 split_id: split.id().to_string(),
497 fragment_type: fragment_type.into(),
498 })
499 })
500 .collect_vec();
501
502 Ok(Response::new(ListActorSplitsResponse { actor_splits }))
503 }
504
505 async fn list_rate_limits(
506 &self,
507 _request: Request<ListRateLimitsRequest>,
508 ) -> Result<Response<ListRateLimitsResponse>, Status> {
509 let rate_limits = self
510 .metadata_manager
511 .catalog_controller
512 .list_rate_limits()
513 .await?;
514 Ok(Response::new(ListRateLimitsResponse { rate_limits }))
515 }
516
517 #[cfg_attr(coverage, coverage(off))]
518 async fn refresh(
519 &self,
520 request: Request<RefreshRequest>,
521 ) -> Result<Response<RefreshResponse>, Status> {
522 let req = request.into_inner();
523
524 tracing::info!("Refreshing table with id: {}", req.table_id);
525
526 let refresh_manager = risingwave_meta::stream::RefreshManager::new(
528 self.metadata_manager.clone(),
529 self.barrier_scheduler.clone(),
530 );
531
532 let response = refresh_manager.refresh_table(req).await?;
533
534 Ok(Response::new(response))
535 }
536
537 async fn alter_connector_props(
538 &self,
539 request: Request<AlterConnectorPropsRequest>,
540 ) -> Result<Response<AlterConnectorPropsResponse>, Status> {
541 let request = request.into_inner();
542 let secret_manager = LocalSecretManager::global();
543 let (new_props_plaintext, object_id) =
544 match AlterConnectorPropsObject::try_from(request.object_type) {
545 Ok(AlterConnectorPropsObject::Sink) => (
546 self.metadata_manager
547 .update_sink_props_by_sink_id(
548 request.object_id as i32,
549 request.changed_props.clone().into_iter().collect(),
550 )
551 .await?,
552 request.object_id,
553 ),
554 Ok(AlterConnectorPropsObject::IcebergTable) => {
555 self.metadata_manager
556 .update_iceberg_table_props_by_table_id(
557 TableId::from(request.object_id),
558 request.changed_props.clone().into_iter().collect(),
559 request.extra_options,
560 )
561 .await?
562 }
563
564 Ok(AlterConnectorPropsObject::Source) => {
565 if request.connector_conn_ref.is_some() {
567 return Err(Status::invalid_argument(
568 "alter connector_conn_ref is not supported",
569 ));
570 }
571 let options_with_secret = self
572 .metadata_manager
573 .catalog_controller
574 .update_source_props_by_source_id(
575 request.object_id as SourceId,
576 request.changed_props.clone().into_iter().collect(),
577 request.changed_secret_refs.clone().into_iter().collect(),
578 )
579 .await?;
580
581 self.stream_manager
582 .source_manager
583 .validate_source_once(request.object_id, options_with_secret.clone())
584 .await?;
585
586 let (options, secret_refs) = options_with_secret.into_parts();
587 (
588 secret_manager
589 .fill_secrets(options, secret_refs)
590 .map_err(MetaError::from)?
591 .into_iter()
592 .collect(),
593 request.object_id,
594 )
595 }
596
597 _ => {
598 unimplemented!(
599 "Unsupported object type for AlterConnectorProps: {:?}",
600 request.object_type
601 );
602 }
603 };
604
605 let database_id = self
606 .metadata_manager
607 .catalog_controller
608 .get_object_database_id(object_id as ObjectId)
609 .await?;
610 let database_id = DatabaseId::new(database_id as _);
611
612 let mut mutation = HashMap::default();
613 mutation.insert(object_id, new_props_plaintext);
614
615 let _i = self
616 .barrier_scheduler
617 .run_command(database_id, Command::ConnectorPropsChange(mutation))
618 .await?;
619
620 Ok(Response::new(AlterConnectorPropsResponse {}))
621 }
622
623 async fn set_sync_log_store_aligned(
624 &self,
625 request: Request<SetSyncLogStoreAlignedRequest>,
626 ) -> Result<Response<SetSyncLogStoreAlignedResponse>, Status> {
627 let req = request.into_inner();
628 let job_id = req.job_id;
629 let aligned = req.aligned;
630
631 self.metadata_manager
632 .catalog_controller
633 .mutate_fragments_by_job_id(
634 job_id as _,
635 |_mask, stream_node| {
636 let mut visited = false;
637 visit_stream_node_mut(stream_node, |body| {
638 if let NodeBody::SyncLogStore(sync_log_store) = body {
639 sync_log_store.aligned = aligned;
640 visited = true
641 }
642 });
643 Ok(visited)
644 },
645 "no fragments found with synced log store",
646 )
647 .await?;
648
649 Ok(Response::new(SetSyncLogStoreAlignedResponse {}))
650 }
651
652 async fn list_cdc_progress(
653 &self,
654 _request: Request<ListCdcProgressRequest>,
655 ) -> Result<Response<ListCdcProgressResponse>, Status> {
656 let cdc_progress = self
657 .env
658 .cdc_table_backfill_tracker()
659 .list_cdc_progress()
660 .into_iter()
661 .map(|(job_id, p)| {
662 (
663 job_id,
664 PbCdcProgress {
665 split_total_count: p.split_total_count,
666 split_backfilled_count: p.split_backfilled_count,
667 split_completed_count: p.split_completed_count,
668 },
669 )
670 })
671 .collect();
672 Ok(Response::new(ListCdcProgressResponse { cdc_progress }))
673 }
674}
675
676fn fragment_desc_to_distribution(
677 fragment_desc: FragmentDesc,
678 upstreams: Vec<FragmentId>,
679) -> FragmentDistribution {
680 FragmentDistribution {
681 fragment_id: fragment_desc.fragment_id as _,
682 table_id: fragment_desc.job_id as _,
683 distribution_type: PbFragmentDistributionType::from(fragment_desc.distribution_type) as _,
684 state_table_ids: fragment_desc.state_table_ids.into_u32_array(),
685 upstream_fragment_ids: upstreams.iter().map(|id| *id as _).collect(),
686 fragment_type_mask: fragment_desc.fragment_type_mask as _,
687 parallelism: fragment_desc.parallelism as _,
688 vnode_count: fragment_desc.vnode_count as _,
689 node: Some(fragment_desc.stream_node.to_protobuf()),
690 }
691}