1use std::collections::{HashMap, HashSet};
16use std::time::Duration;
17
18use compact_task::PbTaskStatus;
19use futures::StreamExt;
20use itertools::Itertools;
21use risingwave_common::catalog::SYS_CATALOG_START_ID;
22use risingwave_common::id::JobId;
23use risingwave_hummock_sdk::HummockVersionId;
24use risingwave_hummock_sdk::key_range::KeyRange;
25use risingwave_hummock_sdk::version::HummockVersionDelta;
26use risingwave_meta::backup_restore::BackupManagerRef;
27use risingwave_meta::manager::MetadataManager;
28use risingwave_meta::manager::iceberg_compaction::IcebergCompactionManagerRef;
29use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
30use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerService;
31use risingwave_pb::hummock::subscribe_compaction_event_request::Event as RequestEvent;
32use risingwave_pb::hummock::*;
33use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Event as IcebergRequestEvent;
34use risingwave_pb::iceberg_compaction::{
35 SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
36};
37use tonic::{Request, Response, Status, Streaming};
38
39use crate::RwReceiverStream;
40use crate::hummock::HummockManagerRef;
41use crate::hummock::compaction::selector::ManualCompactionOption;
42
/// gRPC service implementation backing [`HummockManagerService`].
///
/// It only stores handles to the managers that the RPC handlers delegate to.
pub struct HummockServiceImpl {
    /// Manager that most handlers of this service forward their work to.
    hummock_manager: HummockManagerRef,
    /// Used by `trigger_manual_compaction` to look up the fragments of a job.
    metadata_manager: MetadataManager,
    /// Cloned and handed to `start_full_gc` by `trigger_full_gc`.
    backup_manager: BackupManagerRef,
    /// Used by `subscribe_iceberg_compaction_event` to register compactors
    /// and their request streams.
    iceberg_compaction_manager: IcebergCompactionManagerRef,
}
49
50impl HummockServiceImpl {
51 pub fn new(
52 hummock_manager: HummockManagerRef,
53 metadata_manager: MetadataManager,
54 backup_manager: BackupManagerRef,
55 iceberg_compaction_manager: IcebergCompactionManagerRef,
56 ) -> Self {
57 HummockServiceImpl {
58 hummock_manager,
59 metadata_manager,
60 backup_manager,
61 iceberg_compaction_manager,
62 }
63 }
64}
65
/// Collects the listed fields of a value into a `HashMap<String, String>`.
///
/// For every `$field`, the key is the field name (via `stringify!`) and the
/// value is the result of calling `to_string()` on `$struct.$field`.
///
/// `HashMap` is resolved at the call site, so it must be in scope there.
/// A trailing comma after the last field is accepted.
macro_rules! fields_to_kvs {
    ($struct:ident, $($field:ident),* $(,)?) => {
        {
            let mut kvs = HashMap::default();
            $(
                kvs.insert(stringify!($field).to_string(), $struct.$field.to_string());
            )*
            kvs
        }
    }
}
77
78#[async_trait::async_trait]
79impl HummockManagerService for HummockServiceImpl {
80 type SubscribeCompactionEventStream = RwReceiverStream<SubscribeCompactionEventResponse>;
81 type SubscribeIcebergCompactionEventStream =
82 RwReceiverStream<SubscribeIcebergCompactionEventResponse>;
83
84 async fn unpin_version_before(
85 &self,
86 request: Request<UnpinVersionBeforeRequest>,
87 ) -> Result<Response<UnpinVersionBeforeResponse>, Status> {
88 let req = request.into_inner();
89 self.hummock_manager
90 .unpin_version_before(
91 req.context_id,
92 HummockVersionId::new(req.unpin_version_before),
93 )
94 .await?;
95 Ok(Response::new(UnpinVersionBeforeResponse { status: None }))
96 }
97
98 async fn get_current_version(
99 &self,
100 _request: Request<GetCurrentVersionRequest>,
101 ) -> Result<Response<GetCurrentVersionResponse>, Status> {
102 let current_version = self
103 .hummock_manager
104 .on_current_version(|version| version.into())
105 .await;
106 Ok(Response::new(GetCurrentVersionResponse {
107 status: None,
108 current_version: Some(current_version),
109 }))
110 }
111
112 async fn replay_version_delta(
113 &self,
114 request: Request<ReplayVersionDeltaRequest>,
115 ) -> Result<Response<ReplayVersionDeltaResponse>, Status> {
116 let req = request.into_inner();
117 let (version, compaction_groups) = self
118 .hummock_manager
119 .replay_version_delta(HummockVersionDelta::from_rpc_protobuf(
120 &req.version_delta.unwrap(),
121 ))
122 .await?;
123 Ok(Response::new(ReplayVersionDeltaResponse {
124 version: Some(version.into()),
125 modified_compaction_groups: compaction_groups,
126 }))
127 }
128
129 async fn trigger_compaction_deterministic(
130 &self,
131 request: Request<TriggerCompactionDeterministicRequest>,
132 ) -> Result<Response<TriggerCompactionDeterministicResponse>, Status> {
133 let req = request.into_inner();
134 self.hummock_manager
135 .trigger_compaction_deterministic(
136 HummockVersionId::new(req.version_id),
137 req.compaction_groups,
138 )
139 .await?;
140 Ok(Response::new(TriggerCompactionDeterministicResponse {}))
141 }
142
143 async fn disable_commit_epoch(
144 &self,
145 _request: Request<DisableCommitEpochRequest>,
146 ) -> Result<Response<DisableCommitEpochResponse>, Status> {
147 let version = self.hummock_manager.disable_commit_epoch().await;
148 Ok(Response::new(DisableCommitEpochResponse {
149 current_version: Some(version.into()),
150 }))
151 }
152
153 async fn list_version_deltas(
154 &self,
155 request: Request<ListVersionDeltasRequest>,
156 ) -> Result<Response<ListVersionDeltasResponse>, Status> {
157 let req = request.into_inner();
158 let version_deltas = self
159 .hummock_manager
160 .list_version_deltas(HummockVersionId::new(req.start_id), req.num_limit)
161 .await?;
162 let resp = ListVersionDeltasResponse {
163 version_deltas: Some(PbHummockVersionDeltas {
164 version_deltas: version_deltas
165 .into_iter()
166 .map(HummockVersionDelta::into)
167 .collect(),
168 }),
169 };
170 Ok(Response::new(resp))
171 }
172
173 async fn get_new_object_ids(
174 &self,
175 request: Request<GetNewObjectIdsRequest>,
176 ) -> Result<Response<GetNewObjectIdsResponse>, Status> {
177 let object_id_range = self
178 .hummock_manager
179 .get_new_object_ids(request.into_inner().number)
180 .await?;
181 Ok(Response::new(GetNewObjectIdsResponse {
182 status: None,
183 start_id: object_id_range.start_id.inner(),
184 end_id: object_id_range.end_id.inner(),
185 }))
186 }
187
188 async fn trigger_manual_compaction(
189 &self,
190 request: Request<TriggerManualCompactionRequest>,
191 ) -> Result<Response<TriggerManualCompactionResponse>, Status> {
192 let request = request.into_inner();
193 let compaction_group_id = request.compaction_group_id;
194 let mut option = ManualCompactionOption {
195 level: request.level as usize,
196 sst_ids: request.sst_ids.into_iter().map(|id| id.into()).collect(),
197 ..Default::default()
198 };
199
200 match request.key_range {
202 Some(pb_key_range) => {
203 option.key_range = KeyRange {
204 left: pb_key_range.left.into(),
205 right: pb_key_range.right.into(),
206 right_exclusive: pb_key_range.right_exclusive,
207 };
208 }
209
210 None => {
211 option.key_range = KeyRange::default();
212 }
213 }
214
215 if request.table_id < SYS_CATALOG_START_ID as u32 {
217 let job_id = JobId::new(request.table_id);
219 if let Ok(table_fragment) = self.metadata_manager.get_job_fragments_by_id(job_id).await
220 {
221 option.internal_table_id = HashSet::from_iter(table_fragment.all_table_ids());
222 }
223 }
224
225 assert!(
226 option
227 .internal_table_id
228 .iter()
229 .all(|table_id| table_id.as_raw_id() < SYS_CATALOG_START_ID as u32),
230 );
231
232 tracing::info!(
233 "Try trigger_manual_compaction compaction_group_id {} option {:?}",
234 compaction_group_id,
235 &option
236 );
237
238 self.hummock_manager
239 .trigger_manual_compaction(compaction_group_id, option)
240 .await?;
241
242 Ok(Response::new(TriggerManualCompactionResponse {
243 status: None,
244 }))
245 }
246
247 async fn trigger_full_gc(
248 &self,
249 request: Request<TriggerFullGcRequest>,
250 ) -> Result<Response<TriggerFullGcResponse>, Status> {
251 let req = request.into_inner();
252 let backup_manager_2 = self.backup_manager.clone();
253 let hummock_manager_2 = self.hummock_manager.clone();
254 tokio::task::spawn(async move {
255 use thiserror_ext::AsReport;
256 let _ = hummock_manager_2
257 .start_full_gc(
258 Duration::from_secs(req.sst_retention_time_sec),
259 req.prefix,
260 Some(backup_manager_2),
261 )
262 .await
263 .inspect_err(|e| tracing::warn!(error = %e.as_report(), "Failed to start GC."));
264 });
265 Ok(Response::new(TriggerFullGcResponse { status: None }))
266 }
267
268 async fn rise_ctl_get_pinned_versions_summary(
269 &self,
270 _request: Request<RiseCtlGetPinnedVersionsSummaryRequest>,
271 ) -> Result<Response<RiseCtlGetPinnedVersionsSummaryResponse>, Status> {
272 let pinned_versions = self.hummock_manager.list_pinned_version().await;
273 let workers = self
274 .hummock_manager
275 .list_workers(&pinned_versions.iter().map(|v| v.context_id).collect_vec())
276 .await?;
277 Ok(Response::new(RiseCtlGetPinnedVersionsSummaryResponse {
278 summary: Some(PinnedVersionsSummary {
279 pinned_versions,
280 workers,
281 }),
282 }))
283 }
284
285 async fn get_assigned_compact_task_num(
286 &self,
287 _request: Request<GetAssignedCompactTaskNumRequest>,
288 ) -> Result<Response<GetAssignedCompactTaskNumResponse>, Status> {
289 let num_tasks = self.hummock_manager.get_assigned_compact_task_num().await;
290 Ok(Response::new(GetAssignedCompactTaskNumResponse {
291 num_tasks: num_tasks as u32,
292 }))
293 }
294
295 async fn rise_ctl_list_compaction_group(
296 &self,
297 _request: Request<RiseCtlListCompactionGroupRequest>,
298 ) -> Result<Response<RiseCtlListCompactionGroupResponse>, Status> {
299 let compaction_groups = self.hummock_manager.list_compaction_group().await;
300 Ok(Response::new(RiseCtlListCompactionGroupResponse {
301 status: None,
302 compaction_groups,
303 }))
304 }
305
306 async fn rise_ctl_update_compaction_config(
307 &self,
308 request: Request<RiseCtlUpdateCompactionConfigRequest>,
309 ) -> Result<Response<RiseCtlUpdateCompactionConfigResponse>, Status> {
310 let RiseCtlUpdateCompactionConfigRequest {
311 compaction_group_ids,
312 configs,
313 } = request.into_inner();
314 self.hummock_manager
315 .update_compaction_config(
316 compaction_group_ids.as_slice(),
317 configs
318 .into_iter()
319 .map(|c| c.mutable_config.unwrap())
320 .collect::<Vec<_>>()
321 .as_slice(),
322 )
323 .await?;
324 Ok(Response::new(RiseCtlUpdateCompactionConfigResponse {
325 status: None,
326 }))
327 }
328
329 async fn init_metadata_for_replay(
330 &self,
331 request: Request<InitMetadataForReplayRequest>,
332 ) -> Result<Response<InitMetadataForReplayResponse>, Status> {
333 let InitMetadataForReplayRequest {
334 tables,
335 compaction_groups,
336 } = request.into_inner();
337
338 self.hummock_manager
339 .init_metadata_for_version_replay(tables, compaction_groups)?;
340 Ok(Response::new(InitMetadataForReplayResponse {}))
341 }
342
343 async fn pin_version(
344 &self,
345 request: Request<PinVersionRequest>,
346 ) -> Result<Response<PinVersionResponse>, Status> {
347 let req = request.into_inner();
348 let version = self.hummock_manager.pin_version(req.context_id).await?;
349 Ok(Response::new(PinVersionResponse {
350 pinned_version: Some(version.into()),
351 }))
352 }
353
354 async fn split_compaction_group(
355 &self,
356 request: Request<SplitCompactionGroupRequest>,
357 ) -> Result<Response<SplitCompactionGroupResponse>, Status> {
358 let req = request.into_inner();
359 let new_group_id = self
360 .hummock_manager
361 .move_state_tables_to_dedicated_compaction_group(
362 req.group_id,
363 &req.table_ids.into_iter().map_into().collect_vec(),
364 if req.partition_vnode_count > 0 {
365 Some(req.partition_vnode_count)
366 } else {
367 None
368 },
369 )
370 .await?
371 .0;
372 Ok(Response::new(SplitCompactionGroupResponse { new_group_id }))
373 }
374
375 async fn rise_ctl_pause_version_checkpoint(
376 &self,
377 _request: Request<RiseCtlPauseVersionCheckpointRequest>,
378 ) -> Result<Response<RiseCtlPauseVersionCheckpointResponse>, Status> {
379 self.hummock_manager.pause_version_checkpoint();
380 Ok(Response::new(RiseCtlPauseVersionCheckpointResponse {}))
381 }
382
383 async fn rise_ctl_resume_version_checkpoint(
384 &self,
385 _request: Request<RiseCtlResumeVersionCheckpointRequest>,
386 ) -> Result<Response<RiseCtlResumeVersionCheckpointResponse>, Status> {
387 self.hummock_manager.resume_version_checkpoint();
388 Ok(Response::new(RiseCtlResumeVersionCheckpointResponse {}))
389 }
390
391 async fn rise_ctl_get_checkpoint_version(
392 &self,
393 _request: Request<RiseCtlGetCheckpointVersionRequest>,
394 ) -> Result<Response<RiseCtlGetCheckpointVersionResponse>, Status> {
395 let checkpoint_version = self.hummock_manager.get_checkpoint_version().await;
396 Ok(Response::new(RiseCtlGetCheckpointVersionResponse {
397 checkpoint_version: Some(checkpoint_version.into()),
398 }))
399 }
400
401 async fn rise_ctl_list_compaction_status(
402 &self,
403 _request: Request<RiseCtlListCompactionStatusRequest>,
404 ) -> Result<Response<RiseCtlListCompactionStatusResponse>, Status> {
405 let (compaction_statuses, task_assignment) =
406 self.hummock_manager.list_compaction_status().await;
407 let task_progress = self.hummock_manager.compactor_manager.get_progress();
408 Ok(Response::new(RiseCtlListCompactionStatusResponse {
409 compaction_statuses,
410 task_assignment,
411 task_progress,
412 }))
413 }
414
415 async fn subscribe_compaction_event(
416 &self,
417 request: Request<Streaming<SubscribeCompactionEventRequest>>,
418 ) -> Result<Response<Self::SubscribeCompactionEventStream>, tonic::Status> {
419 let mut request_stream: Streaming<SubscribeCompactionEventRequest> = request.into_inner();
420 let register_req = {
421 let req = request_stream.next().await.ok_or_else(|| {
422 Status::invalid_argument("subscribe_compaction_event request is empty")
423 })??;
424
425 match req.event {
426 Some(RequestEvent::Register(register)) => register,
427 _ => {
428 return Err(Status::invalid_argument(
429 "the first message must be `Register`",
430 ));
431 }
432 }
433 };
434
435 let context_id = register_req.context_id;
436
437 if !self.hummock_manager.check_context(context_id).await? {
440 return Err(Status::new(
441 tonic::Code::Internal,
442 format!("invalid hummock context {}", context_id),
443 ));
444 }
445 let compactor_manager = self.hummock_manager.compactor_manager.clone();
446
447 let rx: tokio::sync::mpsc::UnboundedReceiver<
448 Result<SubscribeCompactionEventResponse, crate::MetaError>,
449 > = compactor_manager.add_compactor(context_id);
450
451 self.hummock_manager
453 .add_compactor_stream(context_id, request_stream);
454
455 for cg_id in self.hummock_manager.compaction_group_ids().await {
457 self.hummock_manager
458 .try_send_compaction_request(cg_id, compact_task::TaskType::Dynamic);
459 }
460
461 Ok(Response::new(RwReceiverStream::new(rx)))
462 }
463
464 async fn subscribe_iceberg_compaction_event(
465 &self,
466 request: Request<Streaming<SubscribeIcebergCompactionEventRequest>>,
467 ) -> Result<Response<Self::SubscribeIcebergCompactionEventStream>, tonic::Status> {
468 let mut request_stream: Streaming<SubscribeIcebergCompactionEventRequest> =
469 request.into_inner();
470 let register_req = {
471 let req = request_stream.next().await.ok_or_else(|| {
472 Status::invalid_argument("subscribe_compaction_event request is empty")
473 })??;
474
475 match req.event {
476 Some(IcebergRequestEvent::Register(register)) => register,
477 _ => {
478 return Err(Status::invalid_argument(
479 "the first message must be `Register`",
480 ));
481 }
482 }
483 };
484
485 let context_id = register_req.context_id;
486
487 if !self.hummock_manager.check_context(context_id).await? {
490 return Err(Status::new(
491 tonic::Code::Internal,
492 format!("invalid hummock context {}", context_id),
493 ));
494 }
495
496 let rx: tokio::sync::mpsc::UnboundedReceiver<
497 Result<SubscribeIcebergCompactionEventResponse, crate::MetaError>,
498 > = self
499 .iceberg_compaction_manager
500 .iceberg_compactor_manager
501 .add_compactor(context_id);
502
503 self.iceberg_compaction_manager
504 .add_compactor_stream(context_id, request_stream);
505
506 Ok(Response::new(RwReceiverStream::new(rx)))
509 }
510
511 async fn report_compaction_task(
512 &self,
513 _request: Request<ReportCompactionTaskRequest>,
514 ) -> Result<Response<ReportCompactionTaskResponse>, Status> {
515 unreachable!()
516 }
517
518 async fn list_branched_object(
519 &self,
520 _request: Request<ListBranchedObjectRequest>,
521 ) -> Result<Response<ListBranchedObjectResponse>, Status> {
522 let branched_objects = self
523 .hummock_manager
524 .list_branched_objects()
525 .await
526 .into_iter()
527 .flat_map(|(object_id, v)| {
528 v.into_iter()
529 .map(move |(compaction_group_id, sst_ids)| BranchedObject {
530 object_id: object_id.inner(),
531 sst_id: sst_ids.into_iter().map(|id| id.inner()).collect(),
532 compaction_group_id,
533 })
534 })
535 .collect();
536 Ok(Response::new(ListBranchedObjectResponse {
537 branched_objects,
538 }))
539 }
540
541 async fn list_active_write_limit(
542 &self,
543 _request: Request<ListActiveWriteLimitRequest>,
544 ) -> Result<Response<ListActiveWriteLimitResponse>, Status> {
545 Ok(Response::new(ListActiveWriteLimitResponse {
546 write_limits: self.hummock_manager.write_limits().await,
547 }))
548 }
549
550 async fn list_hummock_meta_config(
551 &self,
552 _request: Request<ListHummockMetaConfigRequest>,
553 ) -> Result<Response<ListHummockMetaConfigResponse>, Status> {
554 let opt = &self.hummock_manager.env.opts;
555 let configs = fields_to_kvs!(
556 opt,
557 vacuum_interval_sec,
558 vacuum_spin_interval_ms,
559 hummock_version_checkpoint_interval_sec,
560 min_delta_log_num_for_hummock_version_checkpoint,
561 min_sst_retention_time_sec,
562 full_gc_interval_sec,
563 periodic_compaction_interval_sec,
564 periodic_space_reclaim_compaction_interval_sec,
565 periodic_ttl_reclaim_compaction_interval_sec,
566 periodic_tombstone_reclaim_compaction_interval_sec,
567 periodic_scheduling_compaction_group_split_interval_sec,
568 do_not_config_object_storage_lifecycle,
569 partition_vnode_count,
570 table_high_write_throughput_threshold,
571 table_low_write_throughput_threshold,
572 compaction_task_max_heartbeat_interval_secs,
573 periodic_scheduling_compaction_group_merge_interval_sec
574 );
575 Ok(Response::new(ListHummockMetaConfigResponse { configs }))
576 }
577
578 async fn rise_ctl_rebuild_table_stats(
579 &self,
580 _request: Request<RiseCtlRebuildTableStatsRequest>,
581 ) -> Result<Response<RiseCtlRebuildTableStatsResponse>, Status> {
582 self.hummock_manager.rebuild_table_stats().await?;
583 Ok(Response::new(RiseCtlRebuildTableStatsResponse {}))
584 }
585
586 async fn get_compaction_score(
587 &self,
588 request: Request<GetCompactionScoreRequest>,
589 ) -> Result<Response<GetCompactionScoreResponse>, Status> {
590 let compaction_group_id = request.into_inner().compaction_group_id;
591 let scores = self
592 .hummock_manager
593 .get_compaction_scores(compaction_group_id)
594 .await
595 .into_iter()
596 .map(|s| PickerInfo {
597 score: s.score,
598 select_level: s.select_level as _,
599 target_level: s.target_level as _,
600 picker_type: s.picker_type.to_string(),
601 })
602 .collect();
603 Ok(Response::new(GetCompactionScoreResponse {
604 compaction_group_id,
605 scores,
606 }))
607 }
608
609 async fn list_compact_task_assignment(
610 &self,
611 _request: Request<ListCompactTaskAssignmentRequest>,
612 ) -> Result<Response<ListCompactTaskAssignmentResponse>, Status> {
613 let (_compaction_statuses, task_assignment) =
614 self.hummock_manager.list_compaction_status().await;
615 Ok(Response::new(ListCompactTaskAssignmentResponse {
616 task_assignment,
617 }))
618 }
619
620 async fn list_compact_task_progress(
621 &self,
622 _request: Request<ListCompactTaskProgressRequest>,
623 ) -> Result<Response<ListCompactTaskProgressResponse>, Status> {
624 let task_progress = self.hummock_manager.compactor_manager.get_progress();
625
626 Ok(Response::new(ListCompactTaskProgressResponse {
627 task_progress,
628 }))
629 }
630
631 async fn cancel_compact_task(
632 &self,
633 request: Request<CancelCompactTaskRequest>,
634 ) -> Result<Response<CancelCompactTaskResponse>, Status> {
635 let request = request.into_inner();
636 let ret = self
637 .hummock_manager
638 .cancel_compact_task(
639 request.task_id,
640 PbTaskStatus::try_from(request.task_status).unwrap(),
641 )
642 .await?;
643
644 let response = Response::new(CancelCompactTaskResponse { ret });
645 return Ok(response);
646 }
647
648 async fn get_version_by_epoch(
649 &self,
650 request: Request<GetVersionByEpochRequest>,
651 ) -> Result<Response<GetVersionByEpochResponse>, Status> {
652 let GetVersionByEpochRequest { epoch, table_id } = request.into_inner();
653 let version = self
654 .hummock_manager
655 .epoch_to_version(epoch, table_id.into())
656 .await?;
657 Ok(Response::new(GetVersionByEpochResponse {
658 version: Some(version.to_protobuf()),
659 }))
660 }
661
662 async fn merge_compaction_group(
663 &self,
664 request: Request<MergeCompactionGroupRequest>,
665 ) -> Result<Response<MergeCompactionGroupResponse>, Status> {
666 let req = request.into_inner();
667 self.hummock_manager
668 .merge_compaction_group(req.left_group_id, req.right_group_id)
669 .await?;
670 Ok(Response::new(MergeCompactionGroupResponse {}))
671 }
672}
673
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    /// `fields_to_kvs!` maps each listed field name to the field's
    /// `to_string()` value.
    #[test]
    fn test_fields_to_kvs() {
        struct S {
            foo: u64,
            bar: String,
        }
        let sample = S {
            foo: 15,
            bar: "foobar".to_owned(),
        };
        let actual: HashMap<String, String> = fields_to_kvs!(sample, foo, bar);
        let expected: HashMap<String, String> = HashMap::from([
            ("foo".to_owned(), "15".to_owned()),
            ("bar".to_owned(), "foobar".to_owned()),
        ]);
        assert_eq!(actual, expected);
    }
}