1use std::collections::{HashMap, HashSet};
16use std::time::Duration;
17
18use compact_task::PbTaskStatus;
19use futures::StreamExt;
20use itertools::Itertools;
21use risingwave_common::catalog::SYS_CATALOG_START_ID;
22use risingwave_hummock_sdk::key_range::KeyRange;
23use risingwave_hummock_sdk::version::HummockVersionDelta;
24use risingwave_meta::backup_restore::BackupManagerRef;
25use risingwave_meta::manager::MetadataManager;
26use risingwave_meta::manager::iceberg_compaction::IcebergCompactionManagerRef;
27use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
28use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerService;
29use risingwave_pb::hummock::subscribe_compaction_event_request::Event as RequestEvent;
30use risingwave_pb::hummock::*;
31use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Event as IcebergRequestEvent;
32use risingwave_pb::iceberg_compaction::{
33 SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
34};
35use tonic::{Request, Response, Status, Streaming};
36
37use crate::RwReceiverStream;
38use crate::hummock::compaction::selector::ManualCompactionOption;
39use crate::hummock::{HummockManagerRef, ManualCompactionTriggerResult};
40
41pub struct HummockServiceImpl {
42 hummock_manager: HummockManagerRef,
43 metadata_manager: MetadataManager,
44 backup_manager: BackupManagerRef,
45 iceberg_compaction_manager: IcebergCompactionManagerRef,
46}
47
48impl HummockServiceImpl {
49 pub fn new(
50 hummock_manager: HummockManagerRef,
51 metadata_manager: MetadataManager,
52 backup_manager: BackupManagerRef,
53 iceberg_compaction_manager: IcebergCompactionManagerRef,
54 ) -> Self {
55 HummockServiceImpl {
56 hummock_manager,
57 metadata_manager,
58 backup_manager,
59 iceberg_compaction_manager,
60 }
61 }
62}
63
/// Builds a `HashMap` that maps each listed field name to the `to_string()` rendering of that
/// field on `$struct`.
///
/// `$struct` must be an identifier bound to a value (or reference) exposing every listed field,
/// and `HashMap` must be in scope at the call site. When a field is listed more than once, the
/// last occurrence wins.
macro_rules! fields_to_kvs {
    ($struct:ident, $($field:ident),*) => {{
        vec![
            $(
                (stringify!($field).to_string(), $struct.$field.to_string()),
            )*
        ]
        .into_iter()
        .collect::<HashMap<_, _, _>>()
    }};
}
75
76#[async_trait::async_trait]
77impl HummockManagerService for HummockServiceImpl {
78 type SubscribeCompactionEventStream = RwReceiverStream<SubscribeCompactionEventResponse>;
79 type SubscribeIcebergCompactionEventStream =
80 RwReceiverStream<SubscribeIcebergCompactionEventResponse>;
81
82 async fn unpin_version_before(
83 &self,
84 request: Request<UnpinVersionBeforeRequest>,
85 ) -> Result<Response<UnpinVersionBeforeResponse>, Status> {
86 let req = request.into_inner();
87 self.hummock_manager
88 .unpin_version_before(req.context_id, req.unpin_version_before)
89 .await?;
90 Ok(Response::new(UnpinVersionBeforeResponse { status: None }))
91 }
92
93 async fn get_current_version(
94 &self,
95 _request: Request<GetCurrentVersionRequest>,
96 ) -> Result<Response<GetCurrentVersionResponse>, Status> {
97 let current_version = self
98 .hummock_manager
99 .on_current_version(|version| version.into())
100 .await;
101 Ok(Response::new(GetCurrentVersionResponse {
102 status: None,
103 current_version: Some(current_version),
104 }))
105 }
106
107 async fn replay_version_delta(
108 &self,
109 request: Request<ReplayVersionDeltaRequest>,
110 ) -> Result<Response<ReplayVersionDeltaResponse>, Status> {
111 let req = request.into_inner();
112 let (version, compaction_groups) = self
113 .hummock_manager
114 .replay_version_delta(HummockVersionDelta::from_rpc_protobuf(
115 &req.version_delta.unwrap(),
116 ))
117 .await?;
118 Ok(Response::new(ReplayVersionDeltaResponse {
119 version: Some(version.into()),
120 modified_compaction_groups: compaction_groups,
121 }))
122 }
123
124 async fn trigger_compaction_deterministic(
125 &self,
126 request: Request<TriggerCompactionDeterministicRequest>,
127 ) -> Result<Response<TriggerCompactionDeterministicResponse>, Status> {
128 let req = request.into_inner();
129 self.hummock_manager
130 .trigger_compaction_deterministic(req.version_id, req.compaction_groups)
131 .await?;
132 Ok(Response::new(TriggerCompactionDeterministicResponse {}))
133 }
134
135 async fn disable_commit_epoch(
136 &self,
137 _request: Request<DisableCommitEpochRequest>,
138 ) -> Result<Response<DisableCommitEpochResponse>, Status> {
139 let version = self.hummock_manager.disable_commit_epoch().await;
140 Ok(Response::new(DisableCommitEpochResponse {
141 current_version: Some(version.into()),
142 }))
143 }
144
145 async fn list_version_deltas(
146 &self,
147 request: Request<ListVersionDeltasRequest>,
148 ) -> Result<Response<ListVersionDeltasResponse>, Status> {
149 let req = request.into_inner();
150 let version_deltas = self
151 .hummock_manager
152 .list_version_deltas(req.start_id, req.num_limit)
153 .await?;
154 let resp = ListVersionDeltasResponse {
155 version_deltas: Some(PbHummockVersionDeltas {
156 version_deltas: version_deltas
157 .into_iter()
158 .map(HummockVersionDelta::into)
159 .collect(),
160 }),
161 };
162 Ok(Response::new(resp))
163 }
164
165 async fn get_new_object_ids(
166 &self,
167 request: Request<GetNewObjectIdsRequest>,
168 ) -> Result<Response<GetNewObjectIdsResponse>, Status> {
169 let object_id_range = self
170 .hummock_manager
171 .get_new_object_ids(request.into_inner().number)
172 .await?;
173 Ok(Response::new(GetNewObjectIdsResponse {
174 status: None,
175 start_id: object_id_range.start_id,
176 end_id: object_id_range.end_id,
177 }))
178 }
179
180 async fn trigger_manual_compaction(
181 &self,
182 request: Request<TriggerManualCompactionRequest>,
183 ) -> Result<Response<TriggerManualCompactionResponse>, Status> {
184 let request = request.into_inner();
185 let compaction_group_id = request.compaction_group_id;
186 let mut option = ManualCompactionOption {
187 level: request.level as usize,
188 sst_ids: request.sst_ids,
189 exclusive: request.exclusive.unwrap_or(false),
190 ..Default::default()
191 };
192
193 match request.key_range {
195 Some(pb_key_range) => {
196 option.key_range = KeyRange {
197 left: pb_key_range.left.into(),
198 right: pb_key_range.right.into(),
199 right_exclusive: pb_key_range.right_exclusive,
200 };
201 }
202
203 None => {
204 option.key_range = KeyRange::default();
205 }
206 }
207
208 if request.table_id.as_raw_id() < SYS_CATALOG_START_ID as u32 {
210 let job_id = request.table_id;
212 if let Ok(table_fragment) = self.metadata_manager.get_job_fragments_by_id(job_id).await
213 {
214 option.internal_table_id = HashSet::from_iter(table_fragment.all_table_ids());
215 }
216 }
217
218 assert!(
219 option
220 .internal_table_id
221 .iter()
222 .all(|table_id| table_id.as_raw_id() < SYS_CATALOG_START_ID as u32),
223 );
224
225 tracing::info!(
226 "Try trigger_manual_compaction compaction_group_id {} option {:?}",
227 compaction_group_id,
228 &option
229 );
230
231 let should_retry = match self
232 .hummock_manager
233 .trigger_manual_compaction(compaction_group_id, option)
234 .await?
235 {
236 ManualCompactionTriggerResult::Submitted => false,
237 ManualCompactionTriggerResult::Retry => true,
238 };
239
240 Ok(Response::new(TriggerManualCompactionResponse {
241 status: None,
242 should_retry: Some(should_retry),
243 }))
244 }
245
246 async fn trigger_full_gc(
247 &self,
248 request: Request<TriggerFullGcRequest>,
249 ) -> Result<Response<TriggerFullGcResponse>, Status> {
250 let req = request.into_inner();
251 let backup_manager_2 = self.backup_manager.clone();
252 let hummock_manager_2 = self.hummock_manager.clone();
253 tokio::task::spawn(async move {
254 use thiserror_ext::AsReport;
255 let _ = hummock_manager_2
256 .start_full_gc(
257 Duration::from_secs(req.sst_retention_time_sec),
258 req.prefix,
259 Some(backup_manager_2),
260 )
261 .await
262 .inspect_err(|e| tracing::warn!(error = %e.as_report(), "Failed to start GC."));
263 });
264 Ok(Response::new(TriggerFullGcResponse { status: None }))
265 }
266
267 async fn rise_ctl_get_pinned_versions_summary(
268 &self,
269 _request: Request<RiseCtlGetPinnedVersionsSummaryRequest>,
270 ) -> Result<Response<RiseCtlGetPinnedVersionsSummaryResponse>, Status> {
271 let pinned_versions = self.hummock_manager.list_pinned_version().await;
272 let workers = self
273 .hummock_manager
274 .list_workers(&pinned_versions.iter().map(|v| v.context_id).collect_vec())
275 .await?;
276 Ok(Response::new(RiseCtlGetPinnedVersionsSummaryResponse {
277 summary: Some(PinnedVersionsSummary {
278 pinned_versions,
279 workers,
280 }),
281 }))
282 }
283
284 async fn get_assigned_compact_task_num(
285 &self,
286 _request: Request<GetAssignedCompactTaskNumRequest>,
287 ) -> Result<Response<GetAssignedCompactTaskNumResponse>, Status> {
288 let num_tasks = self.hummock_manager.get_assigned_compact_task_num().await;
289 Ok(Response::new(GetAssignedCompactTaskNumResponse {
290 num_tasks: num_tasks as u32,
291 }))
292 }
293
294 async fn rise_ctl_list_compaction_group(
295 &self,
296 _request: Request<RiseCtlListCompactionGroupRequest>,
297 ) -> Result<Response<RiseCtlListCompactionGroupResponse>, Status> {
298 let compaction_groups = self.hummock_manager.list_compaction_group().await;
299 Ok(Response::new(RiseCtlListCompactionGroupResponse {
300 status: None,
301 compaction_groups,
302 }))
303 }
304
305 async fn rise_ctl_update_compaction_config(
306 &self,
307 request: Request<RiseCtlUpdateCompactionConfigRequest>,
308 ) -> Result<Response<RiseCtlUpdateCompactionConfigResponse>, Status> {
309 let RiseCtlUpdateCompactionConfigRequest {
310 compaction_group_ids,
311 configs,
312 } = request.into_inner();
313 self.hummock_manager
314 .update_compaction_config(
315 compaction_group_ids.as_slice(),
316 configs
317 .into_iter()
318 .map(|c| c.mutable_config.unwrap())
319 .collect::<Vec<_>>()
320 .as_slice(),
321 )
322 .await?;
323 Ok(Response::new(RiseCtlUpdateCompactionConfigResponse {
324 status: None,
325 }))
326 }
327
328 async fn init_metadata_for_replay(
329 &self,
330 request: Request<InitMetadataForReplayRequest>,
331 ) -> Result<Response<InitMetadataForReplayResponse>, Status> {
332 let InitMetadataForReplayRequest {
333 tables,
334 compaction_groups,
335 } = request.into_inner();
336
337 self.hummock_manager
338 .init_metadata_for_version_replay(tables, compaction_groups)?;
339 Ok(Response::new(InitMetadataForReplayResponse {}))
340 }
341
342 async fn pin_version(
343 &self,
344 request: Request<PinVersionRequest>,
345 ) -> Result<Response<PinVersionResponse>, Status> {
346 let req = request.into_inner();
347 let version = self.hummock_manager.pin_version(req.context_id).await?;
348 Ok(Response::new(PinVersionResponse {
349 pinned_version: Some(version.into()),
350 }))
351 }
352
353 async fn split_compaction_group(
354 &self,
355 request: Request<SplitCompactionGroupRequest>,
356 ) -> Result<Response<SplitCompactionGroupResponse>, Status> {
357 let req = request.into_inner();
358 let new_group_id = self
359 .hummock_manager
360 .move_state_tables_to_dedicated_compaction_group(
361 req.group_id,
362 &req.table_ids,
363 if req.partition_vnode_count > 0 {
364 Some(req.partition_vnode_count)
365 } else {
366 None
367 },
368 )
369 .await?
370 .0;
371 Ok(Response::new(SplitCompactionGroupResponse { new_group_id }))
372 }
373
374 async fn rise_ctl_pause_version_checkpoint(
375 &self,
376 _request: Request<RiseCtlPauseVersionCheckpointRequest>,
377 ) -> Result<Response<RiseCtlPauseVersionCheckpointResponse>, Status> {
378 self.hummock_manager.pause_version_checkpoint();
379 Ok(Response::new(RiseCtlPauseVersionCheckpointResponse {}))
380 }
381
382 async fn rise_ctl_resume_version_checkpoint(
383 &self,
384 _request: Request<RiseCtlResumeVersionCheckpointRequest>,
385 ) -> Result<Response<RiseCtlResumeVersionCheckpointResponse>, Status> {
386 self.hummock_manager.resume_version_checkpoint();
387 Ok(Response::new(RiseCtlResumeVersionCheckpointResponse {}))
388 }
389
390 async fn rise_ctl_get_checkpoint_version(
391 &self,
392 _request: Request<RiseCtlGetCheckpointVersionRequest>,
393 ) -> Result<Response<RiseCtlGetCheckpointVersionResponse>, Status> {
394 let checkpoint_version = self.hummock_manager.get_checkpoint_version().await;
395 Ok(Response::new(RiseCtlGetCheckpointVersionResponse {
396 checkpoint_version: Some(checkpoint_version.into()),
397 }))
398 }
399
400 async fn rise_ctl_list_compaction_status(
401 &self,
402 _request: Request<RiseCtlListCompactionStatusRequest>,
403 ) -> Result<Response<RiseCtlListCompactionStatusResponse>, Status> {
404 let (compaction_statuses, task_assignment) =
405 self.hummock_manager.list_compaction_status().await;
406 let task_progress = self.hummock_manager.compactor_manager.get_progress();
407 Ok(Response::new(RiseCtlListCompactionStatusResponse {
408 compaction_statuses,
409 task_assignment,
410 task_progress,
411 }))
412 }
413
414 async fn subscribe_compaction_event(
415 &self,
416 request: Request<Streaming<SubscribeCompactionEventRequest>>,
417 ) -> Result<Response<Self::SubscribeCompactionEventStream>, tonic::Status> {
418 let mut request_stream: Streaming<SubscribeCompactionEventRequest> = request.into_inner();
419 let register_req = {
420 let req = request_stream.next().await.ok_or_else(|| {
421 Status::invalid_argument("subscribe_compaction_event request is empty")
422 })??;
423
424 match req.event {
425 Some(RequestEvent::Register(register)) => register,
426 _ => {
427 return Err(Status::invalid_argument(
428 "the first message must be `Register`",
429 ));
430 }
431 }
432 };
433
434 let context_id = register_req.context_id;
435
436 if !self.hummock_manager.check_context(context_id).await? {
439 return Err(Status::new(
440 tonic::Code::Internal,
441 format!("invalid hummock context {}", context_id),
442 ));
443 }
444 let compactor_manager = self.hummock_manager.compactor_manager.clone();
445
446 let rx: tokio::sync::mpsc::UnboundedReceiver<
447 Result<SubscribeCompactionEventResponse, crate::MetaError>,
448 > = compactor_manager.add_compactor(context_id);
449
450 self.hummock_manager
452 .add_compactor_stream(context_id, request_stream);
453
454 for cg_id in self.hummock_manager.compaction_group_ids().await {
456 self.hummock_manager
457 .try_send_compaction_request(cg_id, compact_task::TaskType::Dynamic);
458 }
459
460 Ok(Response::new(RwReceiverStream::new(rx)))
461 }
462
463 async fn subscribe_iceberg_compaction_event(
464 &self,
465 request: Request<Streaming<SubscribeIcebergCompactionEventRequest>>,
466 ) -> Result<Response<Self::SubscribeIcebergCompactionEventStream>, tonic::Status> {
467 let mut request_stream: Streaming<SubscribeIcebergCompactionEventRequest> =
468 request.into_inner();
469 let register_req = {
470 let req = request_stream.next().await.ok_or_else(|| {
471 Status::invalid_argument("subscribe_compaction_event request is empty")
472 })??;
473
474 match req.event {
475 Some(IcebergRequestEvent::Register(register)) => register,
476 _ => {
477 return Err(Status::invalid_argument(
478 "the first message must be `Register`",
479 ));
480 }
481 }
482 };
483
484 let context_id = register_req.context_id;
485
486 if !self.hummock_manager.check_context(context_id).await? {
489 return Err(Status::new(
490 tonic::Code::Internal,
491 format!("invalid hummock context {}", context_id),
492 ));
493 }
494
495 let rx: tokio::sync::mpsc::UnboundedReceiver<
496 Result<SubscribeIcebergCompactionEventResponse, crate::MetaError>,
497 > = self
498 .iceberg_compaction_manager
499 .iceberg_compactor_manager
500 .add_compactor(context_id);
501
502 self.iceberg_compaction_manager
503 .add_compactor_stream(context_id, request_stream);
504
505 Ok(Response::new(RwReceiverStream::new(rx)))
508 }
509
510 async fn report_compaction_task(
511 &self,
512 _request: Request<ReportCompactionTaskRequest>,
513 ) -> Result<Response<ReportCompactionTaskResponse>, Status> {
514 unreachable!()
515 }
516
517 async fn list_branched_object(
518 &self,
519 _request: Request<ListBranchedObjectRequest>,
520 ) -> Result<Response<ListBranchedObjectResponse>, Status> {
521 let branched_objects = self
522 .hummock_manager
523 .list_branched_objects()
524 .await
525 .into_iter()
526 .flat_map(|(object_id, v)| {
527 v.into_iter()
528 .map(move |(compaction_group_id, sst_ids)| BranchedObject {
529 object_id,
530 sst_id: sst_ids,
531 compaction_group_id,
532 })
533 })
534 .collect();
535 Ok(Response::new(ListBranchedObjectResponse {
536 branched_objects,
537 }))
538 }
539
540 async fn list_active_write_limit(
541 &self,
542 _request: Request<ListActiveWriteLimitRequest>,
543 ) -> Result<Response<ListActiveWriteLimitResponse>, Status> {
544 Ok(Response::new(ListActiveWriteLimitResponse {
545 write_limits: self.hummock_manager.write_limits().await,
546 }))
547 }
548
549 async fn list_hummock_meta_config(
550 &self,
551 _request: Request<ListHummockMetaConfigRequest>,
552 ) -> Result<Response<ListHummockMetaConfigResponse>, Status> {
553 let opt = &self.hummock_manager.env.opts;
554 let configs = fields_to_kvs!(
555 opt,
556 vacuum_interval_sec,
557 vacuum_spin_interval_ms,
558 hummock_version_checkpoint_interval_sec,
559 min_delta_log_num_for_hummock_version_checkpoint,
560 min_sst_retention_time_sec,
561 full_gc_interval_sec,
562 periodic_compaction_interval_sec,
563 periodic_space_reclaim_compaction_interval_sec,
564 periodic_ttl_reclaim_compaction_interval_sec,
565 periodic_tombstone_reclaim_compaction_interval_sec,
566 periodic_scheduling_compaction_group_split_interval_sec,
567 do_not_config_object_storage_lifecycle,
568 partition_vnode_count,
569 table_high_write_throughput_threshold,
570 table_low_write_throughput_threshold,
571 compaction_task_max_heartbeat_interval_secs,
572 periodic_scheduling_compaction_group_merge_interval_sec
573 );
574 Ok(Response::new(ListHummockMetaConfigResponse { configs }))
575 }
576
577 async fn rise_ctl_rebuild_table_stats(
578 &self,
579 _request: Request<RiseCtlRebuildTableStatsRequest>,
580 ) -> Result<Response<RiseCtlRebuildTableStatsResponse>, Status> {
581 self.hummock_manager.rebuild_table_stats().await?;
582 Ok(Response::new(RiseCtlRebuildTableStatsResponse {}))
583 }
584
585 async fn get_compaction_score(
586 &self,
587 request: Request<GetCompactionScoreRequest>,
588 ) -> Result<Response<GetCompactionScoreResponse>, Status> {
589 let compaction_group_id = request.into_inner().compaction_group_id;
590 let scores = self
591 .hummock_manager
592 .get_compaction_scores(compaction_group_id)
593 .await
594 .into_iter()
595 .map(|s| PickerInfo {
596 score: s.score,
597 select_level: s.select_level as _,
598 target_level: s.target_level as _,
599 picker_type: s.picker_type.to_string(),
600 })
601 .collect();
602 Ok(Response::new(GetCompactionScoreResponse {
603 compaction_group_id,
604 scores,
605 }))
606 }
607
608 async fn list_compact_task_assignment(
609 &self,
610 _request: Request<ListCompactTaskAssignmentRequest>,
611 ) -> Result<Response<ListCompactTaskAssignmentResponse>, Status> {
612 let (_compaction_statuses, task_assignment) =
613 self.hummock_manager.list_compaction_status().await;
614 Ok(Response::new(ListCompactTaskAssignmentResponse {
615 task_assignment,
616 }))
617 }
618
619 async fn list_compact_task_progress(
620 &self,
621 _request: Request<ListCompactTaskProgressRequest>,
622 ) -> Result<Response<ListCompactTaskProgressResponse>, Status> {
623 let task_progress = self.hummock_manager.compactor_manager.get_progress();
624
625 Ok(Response::new(ListCompactTaskProgressResponse {
626 task_progress,
627 }))
628 }
629
630 async fn cancel_compact_task(
631 &self,
632 request: Request<CancelCompactTaskRequest>,
633 ) -> Result<Response<CancelCompactTaskResponse>, Status> {
634 let request = request.into_inner();
635 let ret = self
636 .hummock_manager
637 .cancel_compact_task(
638 request.task_id,
639 PbTaskStatus::try_from(request.task_status).unwrap(),
640 )
641 .await?;
642
643 let response = Response::new(CancelCompactTaskResponse { ret });
644 return Ok(response);
645 }
646
647 async fn get_version_by_epoch(
648 &self,
649 request: Request<GetVersionByEpochRequest>,
650 ) -> Result<Response<GetVersionByEpochResponse>, Status> {
651 let GetVersionByEpochRequest { epoch, table_id } = request.into_inner();
652 let version = self
653 .hummock_manager
654 .epoch_to_version(epoch, table_id)
655 .await?;
656 Ok(Response::new(GetVersionByEpochResponse {
657 version: Some(version.to_protobuf()),
658 }))
659 }
660
661 async fn merge_compaction_group(
662 &self,
663 request: Request<MergeCompactionGroupRequest>,
664 ) -> Result<Response<MergeCompactionGroupResponse>, Status> {
665 let req = request.into_inner();
666 self.hummock_manager
667 .merge_compaction_group(req.left_group_id, req.right_group_id)
668 .await?;
669 Ok(Response::new(MergeCompactionGroupResponse {}))
670 }
671}
672
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    /// `fields_to_kvs!` must emit exactly one entry per listed field, keyed by the field name
    /// and holding the field's `to_string()` rendering.
    #[test]
    fn test_fields_to_kvs() {
        struct Sample {
            foo: u64,
            bar: String,
        }

        let sample = Sample {
            foo: 15,
            bar: String::from("foobar"),
        };
        let rendered: HashMap<String, String> = fields_to_kvs!(sample, foo, bar);

        assert_eq!(rendered.len(), 2);
        // Indexing panics on a missing key, so these also check that the keys are present.
        assert_eq!(rendered["foo"], "15");
        assert_eq!(rendered["bar"], "foobar");
    }
}