1use std::collections::{HashMap, HashSet};
16
17use itertools::Itertools;
18use risingwave_common::catalog::{DatabaseId, TableId};
19use risingwave_common::secret::LocalSecretManager;
20use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
21use risingwave_connector::source::SplitMetaData;
22use risingwave_meta::barrier::BarrierManagerRef;
23use risingwave_meta::controller::fragment::StreamingJobInfo;
24use risingwave_meta::controller::utils::FragmentDesc;
25use risingwave_meta::manager::MetadataManager;
26use risingwave_meta::model::ActorId;
27use risingwave_meta::stream::{SourceManagerRunningInfo, ThrottleConfig};
28use risingwave_meta::{MetaError, model};
29use risingwave_meta_model::{FragmentId, ObjectId, SinkId, SourceId, StreamingParallelism};
30use risingwave_pb::meta::alter_connector_props_request::AlterConnectorPropsObject;
31use risingwave_pb::meta::cancel_creating_jobs_request::Jobs;
32use risingwave_pb::meta::list_actor_splits_response::FragmentType;
33use risingwave_pb::meta::list_table_fragments_response::{
34 ActorInfo, FragmentInfo, TableFragmentInfo,
35};
36use risingwave_pb::meta::stream_manager_service_server::StreamManagerService;
37use risingwave_pb::meta::table_fragments::PbState;
38use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
39use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
40use risingwave_pb::meta::*;
41use risingwave_pb::stream_plan::stream_node::NodeBody;
42use tonic::{Request, Response, Status};
43
44use crate::barrier::{BarrierScheduler, Command};
45use crate::manager::MetaSrvEnv;
46use crate::stream::GlobalStreamManagerRef;
47
/// Shorthand for the `Result` type returned by tonic RPC handlers in this service.
pub type TonicResponse<T> = Result<Response<T>, Status>;
49
/// gRPC implementation of the meta node's `StreamManagerService`.
///
/// A thin handler layer: each RPC delegates to one of the managers held below.
#[derive(Clone)]
pub struct StreamServiceImpl {
    // Shared meta-server environment; used here to record activity on the idle manager.
    env: MetaSrvEnv,
    // Schedules barrier commands (flush / pause / resume / throttle / props change).
    barrier_scheduler: BarrierScheduler,
    // Used to trigger ad-hoc recovery.
    barrier_manager: BarrierManagerRef,
    // Drives streaming-job lifecycle (job cancellation, source manager access).
    stream_manager: GlobalStreamManagerRef,
    // Catalog / fragment metadata access (via `catalog_controller` and helpers).
    metadata_manager: MetadataManager,
}
58
59impl StreamServiceImpl {
60 pub fn new(
61 env: MetaSrvEnv,
62 barrier_scheduler: BarrierScheduler,
63 barrier_manager: BarrierManagerRef,
64 stream_manager: GlobalStreamManagerRef,
65 metadata_manager: MetadataManager,
66 ) -> Self {
67 StreamServiceImpl {
68 env,
69 barrier_scheduler,
70 barrier_manager,
71 stream_manager,
72 metadata_manager,
73 }
74 }
75}
76
77#[async_trait::async_trait]
78impl StreamManagerService for StreamServiceImpl {
79 async fn flush(&self, request: Request<FlushRequest>) -> TonicResponse<FlushResponse> {
80 self.env.idle_manager().record_activity();
81 let req = request.into_inner();
82
83 let version_id = self.barrier_scheduler.flush(req.database_id.into()).await?;
84 Ok(Response::new(FlushResponse {
85 status: None,
86 hummock_version_id: version_id.to_u64(),
87 }))
88 }
89
90 async fn pause(&self, _: Request<PauseRequest>) -> Result<Response<PauseResponse>, Status> {
91 for database_id in self.metadata_manager.list_active_database_ids().await? {
92 self.barrier_scheduler
93 .run_command(database_id, Command::pause())
94 .await?;
95 }
96 Ok(Response::new(PauseResponse {}))
97 }
98
99 async fn resume(&self, _: Request<ResumeRequest>) -> Result<Response<ResumeResponse>, Status> {
100 for database_id in self.metadata_manager.list_active_database_ids().await? {
101 self.barrier_scheduler
102 .run_command(database_id, Command::resume())
103 .await?;
104 }
105 Ok(Response::new(ResumeResponse {}))
106 }
107
108 async fn apply_throttle(
109 &self,
110 request: Request<ApplyThrottleRequest>,
111 ) -> Result<Response<ApplyThrottleResponse>, Status> {
112 let request = request.into_inner();
113
114 let actor_to_apply = match request.kind() {
115 ThrottleTarget::Source | ThrottleTarget::TableWithSource => {
116 self.metadata_manager
117 .update_source_rate_limit_by_source_id(request.id as SourceId, request.rate)
118 .await?
119 }
120 ThrottleTarget::Mv => {
121 self.metadata_manager
122 .update_backfill_rate_limit_by_table_id(TableId::from(request.id), request.rate)
123 .await?
124 }
125 ThrottleTarget::CdcTable => {
126 self.metadata_manager
127 .update_backfill_rate_limit_by_table_id(TableId::from(request.id), request.rate)
128 .await?
129 }
130 ThrottleTarget::TableDml => {
131 self.metadata_manager
132 .update_dml_rate_limit_by_table_id(TableId::from(request.id), request.rate)
133 .await?
134 }
135 ThrottleTarget::Sink => {
136 self.metadata_manager
137 .update_sink_rate_limit_by_sink_id(request.id as SinkId, request.rate)
138 .await?
139 }
140 ThrottleTarget::Fragment => {
141 self.metadata_manager
142 .update_fragment_rate_limit_by_fragment_id(request.id as _, request.rate)
143 .await?
144 }
145 ThrottleTarget::Unspecified => {
146 return Err(Status::invalid_argument("unspecified throttle target"));
147 }
148 };
149
150 let request_id = if request.kind() == ThrottleTarget::Fragment {
151 self.metadata_manager
152 .catalog_controller
153 .get_fragment_streaming_job_id(request.id as _)
154 .await?
155 } else {
156 request.id as _
157 };
158
159 let database_id = self
160 .metadata_manager
161 .catalog_controller
162 .get_object_database_id(request_id as ObjectId)
163 .await?;
164 let database_id = DatabaseId::new(database_id as _);
165 let mutation: ThrottleConfig = actor_to_apply
167 .iter()
168 .map(|(fragment_id, actors)| {
169 (
170 *fragment_id,
171 actors
172 .iter()
173 .map(|actor_id| (*actor_id, request.rate))
174 .collect::<HashMap<ActorId, Option<u32>>>(),
175 )
176 })
177 .collect();
178 let _i = self
179 .barrier_scheduler
180 .run_command(database_id, Command::Throttle(mutation))
181 .await?;
182
183 Ok(Response::new(ApplyThrottleResponse { status: None }))
184 }
185
186 async fn cancel_creating_jobs(
187 &self,
188 request: Request<CancelCreatingJobsRequest>,
189 ) -> TonicResponse<CancelCreatingJobsResponse> {
190 let req = request.into_inner();
191 let table_ids = match req.jobs.unwrap() {
192 Jobs::Infos(infos) => self
193 .metadata_manager
194 .catalog_controller
195 .find_creating_streaming_job_ids(infos.infos)
196 .await?
197 .into_iter()
198 .map(|id| id as _)
199 .collect(),
200 Jobs::Ids(jobs) => jobs.job_ids,
201 };
202
203 let canceled_jobs = self
204 .stream_manager
205 .cancel_streaming_jobs(table_ids.into_iter().map(TableId::from).collect_vec())
206 .await
207 .into_iter()
208 .map(|id| id.table_id)
209 .collect_vec();
210 Ok(Response::new(CancelCreatingJobsResponse {
211 status: None,
212 canceled_jobs,
213 }))
214 }
215
216 async fn list_table_fragments(
217 &self,
218 request: Request<ListTableFragmentsRequest>,
219 ) -> Result<Response<ListTableFragmentsResponse>, Status> {
220 let req = request.into_inner();
221 let table_ids = HashSet::<u32>::from_iter(req.table_ids);
222
223 let mut info = HashMap::new();
224 for job_id in table_ids {
225 let table_fragments = self
226 .metadata_manager
227 .catalog_controller
228 .get_job_fragments_by_id(job_id as _)
229 .await?;
230 let mut dispatchers = self
231 .metadata_manager
232 .catalog_controller
233 .get_fragment_actor_dispatchers(
234 table_fragments.fragment_ids().map(|id| id as _).collect(),
235 )
236 .await?;
237 let ctx = table_fragments.ctx.to_protobuf();
238 info.insert(
239 table_fragments.stream_job_id().table_id,
240 TableFragmentInfo {
241 fragments: table_fragments
242 .fragments
243 .into_iter()
244 .map(|(id, fragment)| FragmentInfo {
245 id,
246 actors: fragment
247 .actors
248 .into_iter()
249 .map(|actor| ActorInfo {
250 id: actor.actor_id,
251 node: Some(fragment.nodes.clone()),
252 dispatcher: dispatchers
253 .get_mut(&(fragment.fragment_id as _))
254 .and_then(|dispatchers| {
255 dispatchers.remove(&(actor.actor_id as _))
256 })
257 .unwrap_or_default(),
258 })
259 .collect_vec(),
260 })
261 .collect_vec(),
262 ctx: Some(ctx),
263 },
264 );
265 }
266
267 Ok(Response::new(ListTableFragmentsResponse {
268 table_fragments: info,
269 }))
270 }
271
272 async fn list_streaming_job_states(
273 &self,
274 _request: Request<ListStreamingJobStatesRequest>,
275 ) -> Result<Response<ListStreamingJobStatesResponse>, Status> {
276 let job_infos = self
277 .metadata_manager
278 .catalog_controller
279 .list_streaming_job_infos()
280 .await?;
281 let states = job_infos
282 .into_iter()
283 .map(
284 |StreamingJobInfo {
285 job_id,
286 job_status,
287 name,
288 parallelism,
289 max_parallelism,
290 resource_group,
291 database_id,
292 schema_id,
293 ..
294 }| {
295 let parallelism = match parallelism {
296 StreamingParallelism::Adaptive => model::TableParallelism::Adaptive,
297 StreamingParallelism::Custom => model::TableParallelism::Custom,
298 StreamingParallelism::Fixed(n) => model::TableParallelism::Fixed(n as _),
299 };
300
301 list_streaming_job_states_response::StreamingJobState {
302 table_id: job_id as _,
303 name,
304 state: PbState::from(job_status) as _,
305 parallelism: Some(parallelism.into()),
306 max_parallelism: max_parallelism as _,
307 resource_group,
308 database_id: database_id as _,
309 schema_id: schema_id as _,
310 }
311 },
312 )
313 .collect_vec();
314
315 Ok(Response::new(ListStreamingJobStatesResponse { states }))
316 }
317
318 async fn list_fragment_distribution(
319 &self,
320 _request: Request<ListFragmentDistributionRequest>,
321 ) -> Result<Response<ListFragmentDistributionResponse>, Status> {
322 let fragment_descs = self
323 .metadata_manager
324 .catalog_controller
325 .list_fragment_descs()
326 .await?;
327 let distributions = fragment_descs
328 .into_iter()
329 .map(|(fragment_desc, upstreams)| {
330 fragment_desc_to_distribution(fragment_desc, upstreams)
331 })
332 .collect_vec();
333
334 Ok(Response::new(ListFragmentDistributionResponse {
335 distributions,
336 }))
337 }
338
339 async fn list_creating_fragment_distribution(
340 &self,
341 _request: Request<ListCreatingFragmentDistributionRequest>,
342 ) -> Result<Response<ListCreatingFragmentDistributionResponse>, Status> {
343 let fragment_descs = self
344 .metadata_manager
345 .catalog_controller
346 .list_creating_fragment_descs()
347 .await?;
348 let distributions = fragment_descs
349 .into_iter()
350 .map(|(fragment_desc, upstreams)| {
351 fragment_desc_to_distribution(fragment_desc, upstreams)
352 })
353 .collect_vec();
354
355 Ok(Response::new(ListCreatingFragmentDistributionResponse {
356 distributions,
357 }))
358 }
359
360 async fn get_fragment_by_id(
361 &self,
362 request: Request<GetFragmentByIdRequest>,
363 ) -> Result<Response<GetFragmentByIdResponse>, Status> {
364 let req = request.into_inner();
365 let fragment_desc = self
366 .metadata_manager
367 .catalog_controller
368 .get_fragment_desc_by_id(req.fragment_id as i32)
369 .await?;
370 let distribution =
371 fragment_desc.map(|(desc, upstreams)| fragment_desc_to_distribution(desc, upstreams));
372 Ok(Response::new(GetFragmentByIdResponse { distribution }))
373 }
374
375 async fn list_actor_states(
376 &self,
377 _request: Request<ListActorStatesRequest>,
378 ) -> Result<Response<ListActorStatesResponse>, Status> {
379 let actor_locations = self
380 .metadata_manager
381 .catalog_controller
382 .list_actor_locations()
383 .await?;
384 let states = actor_locations
385 .into_iter()
386 .map(|actor_location| list_actor_states_response::ActorState {
387 actor_id: actor_location.actor_id as _,
388 fragment_id: actor_location.fragment_id as _,
389 state: PbActorState::from(actor_location.status) as _,
390 worker_id: actor_location.worker_id as _,
391 })
392 .collect_vec();
393
394 Ok(Response::new(ListActorStatesResponse { states }))
395 }
396
397 async fn list_object_dependencies(
398 &self,
399 _request: Request<ListObjectDependenciesRequest>,
400 ) -> Result<Response<ListObjectDependenciesResponse>, Status> {
401 let dependencies = self
402 .metadata_manager
403 .catalog_controller
404 .list_created_object_dependencies()
405 .await?;
406
407 Ok(Response::new(ListObjectDependenciesResponse {
408 dependencies,
409 }))
410 }
411
412 async fn recover(
413 &self,
414 _request: Request<RecoverRequest>,
415 ) -> Result<Response<RecoverResponse>, Status> {
416 self.barrier_manager.adhoc_recovery().await?;
417 Ok(Response::new(RecoverResponse {}))
418 }
419
420 async fn list_actor_splits(
421 &self,
422 _request: Request<ListActorSplitsRequest>,
423 ) -> Result<Response<ListActorSplitsResponse>, Status> {
424 let SourceManagerRunningInfo {
425 source_fragments,
426 backfill_fragments,
427 mut actor_splits,
428 } = self.stream_manager.source_manager.get_running_info().await;
429
430 let source_actors = self
431 .metadata_manager
432 .catalog_controller
433 .list_source_actors()
434 .await?;
435
436 let is_shared_source = self
437 .metadata_manager
438 .catalog_controller
439 .list_source_id_with_shared_types()
440 .await?;
441
442 let fragment_to_source: HashMap<_, _> = source_fragments
443 .into_iter()
444 .flat_map(|(source_id, fragment_ids)| {
445 let source_type = if is_shared_source
446 .get(&(source_id as _))
447 .copied()
448 .unwrap_or(false)
449 {
450 FragmentType::SharedSource
451 } else {
452 FragmentType::NonSharedSource
453 };
454
455 fragment_ids
456 .into_iter()
457 .map(move |fragment_id| (fragment_id, (source_id, source_type)))
458 })
459 .chain(
460 backfill_fragments
461 .into_iter()
462 .flat_map(|(source_id, fragment_ids)| {
463 fragment_ids.into_iter().flat_map(
464 move |(fragment_id, upstream_fragment_id)| {
465 [
466 (fragment_id, (source_id, FragmentType::SharedSourceBackfill)),
467 (
468 upstream_fragment_id,
469 (source_id, FragmentType::SharedSource),
470 ),
471 ]
472 },
473 )
474 }),
475 )
476 .collect();
477
478 let actor_splits = source_actors
479 .into_iter()
480 .flat_map(|(actor_id, fragment_id)| {
481 let (source_id, fragment_type) = fragment_to_source
482 .get(&(fragment_id as _))
483 .copied()
484 .unwrap_or_default();
485
486 actor_splits
487 .remove(&(actor_id as _))
488 .unwrap_or_default()
489 .into_iter()
490 .map(move |split| list_actor_splits_response::ActorSplit {
491 actor_id: actor_id as _,
492 source_id: source_id as _,
493 fragment_id: fragment_id as _,
494 split_id: split.id().to_string(),
495 fragment_type: fragment_type.into(),
496 })
497 })
498 .collect_vec();
499
500 Ok(Response::new(ListActorSplitsResponse { actor_splits }))
501 }
502
503 async fn list_rate_limits(
504 &self,
505 _request: Request<ListRateLimitsRequest>,
506 ) -> Result<Response<ListRateLimitsResponse>, Status> {
507 let rate_limits = self
508 .metadata_manager
509 .catalog_controller
510 .list_rate_limits()
511 .await?;
512 Ok(Response::new(ListRateLimitsResponse { rate_limits }))
513 }
514
515 async fn alter_connector_props(
516 &self,
517 request: Request<AlterConnectorPropsRequest>,
518 ) -> Result<Response<AlterConnectorPropsResponse>, Status> {
519 let request = request.into_inner();
520
521 let database_id = self
522 .metadata_manager
523 .catalog_controller
524 .get_object_database_id(request.object_id as ObjectId)
525 .await?;
526 let database_id = DatabaseId::new(database_id as _);
527
528 let secret_manager = LocalSecretManager::global();
529 let new_props_plaintext = match request.object_type() {
530 AlterConnectorPropsObject::Sink => {
531 self.metadata_manager
532 .update_sink_props_by_sink_id(
533 request.object_id as i32,
534 request.changed_props.clone().into_iter().collect(),
535 )
536 .await?
537 }
538 AlterConnectorPropsObject::Source => {
539 if request.connector_conn_ref.is_some() {
541 return Err(Status::invalid_argument(
542 "alter connector_conn_ref is not supported",
543 ));
544 }
545 let options_with_secret = self
546 .metadata_manager
547 .catalog_controller
548 .update_source_props_by_source_id(
549 request.object_id as SourceId,
550 request.changed_props.clone().into_iter().collect(),
551 request.changed_secret_refs.clone().into_iter().collect(),
552 )
553 .await?;
554
555 self.stream_manager
556 .source_manager
557 .validate_source_once(request.object_id, options_with_secret.clone())
558 .await?;
559
560 let (options, secret_refs) = options_with_secret.into_parts();
561 secret_manager
562 .fill_secrets(options, secret_refs)
563 .map_err(MetaError::from)?
564 .into_iter()
565 .collect()
566 }
567 AlterConnectorPropsObject::Connection => {
568 todo!()
569 }
570 AlterConnectorPropsObject::Unspecified => unreachable!(),
571 };
572
573 let mut mutation = HashMap::default();
574 mutation.insert(request.object_id, new_props_plaintext);
575
576 let _i = self
577 .barrier_scheduler
578 .run_command(database_id, Command::ConnectorPropsChange(mutation))
579 .await?;
580
581 Ok(Response::new(AlterConnectorPropsResponse {}))
582 }
583
584 async fn set_sync_log_store_aligned(
585 &self,
586 request: Request<SetSyncLogStoreAlignedRequest>,
587 ) -> Result<Response<SetSyncLogStoreAlignedResponse>, Status> {
588 let req = request.into_inner();
589 let job_id = req.job_id;
590 let aligned = req.aligned;
591
592 self.metadata_manager
593 .catalog_controller
594 .mutate_fragments_by_job_id(
595 job_id as _,
596 |_mask, stream_node| {
597 let mut visited = false;
598 visit_stream_node_mut(stream_node, |body| {
599 if let NodeBody::SyncLogStore(sync_log_store) = body {
600 sync_log_store.aligned = aligned;
601 visited = true
602 }
603 });
604 visited
605 },
606 "no fragments found with synced log store",
607 )
608 .await?;
609
610 Ok(Response::new(SetSyncLogStoreAlignedResponse {}))
611 }
612}
613
614fn fragment_desc_to_distribution(
615 fragment_desc: FragmentDesc,
616 upstreams: Vec<FragmentId>,
617) -> FragmentDistribution {
618 FragmentDistribution {
619 fragment_id: fragment_desc.fragment_id as _,
620 table_id: fragment_desc.job_id as _,
621 distribution_type: PbFragmentDistributionType::from(fragment_desc.distribution_type) as _,
622 state_table_ids: fragment_desc.state_table_ids.into_u32_array(),
623 upstream_fragment_ids: upstreams.iter().map(|id| *id as _).collect(),
624 fragment_type_mask: fragment_desc.fragment_type_mask as _,
625 parallelism: fragment_desc.parallelism as _,
626 vnode_count: fragment_desc.vnode_count as _,
627 node: Some(fragment_desc.stream_node.to_protobuf()),
628 }
629}