1use std::collections::{HashMap, HashSet};
16use std::time::Duration;
17
18use compact_task::PbTaskStatus;
19use futures::StreamExt;
20use itertools::Itertools;
21use risingwave_common::catalog::{SYS_CATALOG_START_ID, TableId};
22use risingwave_hummock_sdk::HummockVersionId;
23use risingwave_hummock_sdk::key_range::KeyRange;
24use risingwave_hummock_sdk::version::HummockVersionDelta;
25use risingwave_meta::backup_restore::BackupManagerRef;
26use risingwave_meta::manager::MetadataManager;
27use risingwave_meta::manager::iceberg_compaction::IcebergCompactionManagerRef;
28use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
29use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerService;
30use risingwave_pb::hummock::subscribe_compaction_event_request::Event as RequestEvent;
31use risingwave_pb::hummock::*;
32use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Event as IcebergRequestEvent;
33use risingwave_pb::iceberg_compaction::{
34 SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
35};
36use tonic::{Request, Response, Status, Streaming};
37
38use crate::RwReceiverStream;
39use crate::hummock::HummockManagerRef;
40use crate::hummock::compaction::selector::ManualCompactionOption;
41
/// gRPC service object that implements `HummockManagerService` in this file.
///
/// It only holds handles to the managers the RPC handlers delegate to.
pub struct HummockServiceImpl {
    /// Manager that nearly every handler in this file forwards its request to.
    hummock_manager: HummockManagerRef,
    /// Used by `trigger_manual_compaction` to look up the fragments of a job by table id.
    metadata_manager: MetadataManager,
    /// Cloned and handed to `start_full_gc` by `trigger_full_gc`.
    backup_manager: BackupManagerRef,
    /// Used by `subscribe_iceberg_compaction_event` to register iceberg compactors.
    iceberg_compaction_manager: IcebergCompactionManagerRef,
}
48
49impl HummockServiceImpl {
50 pub fn new(
51 hummock_manager: HummockManagerRef,
52 metadata_manager: MetadataManager,
53 backup_manager: BackupManagerRef,
54 iceberg_compaction_manager: IcebergCompactionManagerRef,
55 ) -> Self {
56 HummockServiceImpl {
57 hummock_manager,
58 metadata_manager,
59 backup_manager,
60 iceberg_compaction_manager,
61 }
62 }
63}
64
/// Builds a `HashMap` from the listed fields of `$struct`.
///
/// Each key is the field name as written (via `stringify!`) and each value is the
/// field rendered with `to_string()`. `HashMap` must be in scope at the call site.
macro_rules! fields_to_kvs {
    ($struct:ident, $($field:ident),*) => {
        HashMap::from([
            $(
                (stringify!($field).to_string(), $struct.$field.to_string()),
            )*
        ])
    };
}
76
77#[async_trait::async_trait]
78impl HummockManagerService for HummockServiceImpl {
79 type SubscribeCompactionEventStream = RwReceiverStream<SubscribeCompactionEventResponse>;
80 type SubscribeIcebergCompactionEventStream =
81 RwReceiverStream<SubscribeIcebergCompactionEventResponse>;
82
83 async fn unpin_version_before(
84 &self,
85 request: Request<UnpinVersionBeforeRequest>,
86 ) -> Result<Response<UnpinVersionBeforeResponse>, Status> {
87 let req = request.into_inner();
88 self.hummock_manager
89 .unpin_version_before(
90 req.context_id,
91 HummockVersionId::new(req.unpin_version_before),
92 )
93 .await?;
94 Ok(Response::new(UnpinVersionBeforeResponse { status: None }))
95 }
96
97 async fn get_current_version(
98 &self,
99 _request: Request<GetCurrentVersionRequest>,
100 ) -> Result<Response<GetCurrentVersionResponse>, Status> {
101 let current_version = self
102 .hummock_manager
103 .on_current_version(|version| version.into())
104 .await;
105 Ok(Response::new(GetCurrentVersionResponse {
106 status: None,
107 current_version: Some(current_version),
108 }))
109 }
110
111 async fn replay_version_delta(
112 &self,
113 request: Request<ReplayVersionDeltaRequest>,
114 ) -> Result<Response<ReplayVersionDeltaResponse>, Status> {
115 let req = request.into_inner();
116 let (version, compaction_groups) = self
117 .hummock_manager
118 .replay_version_delta(HummockVersionDelta::from_rpc_protobuf(
119 &req.version_delta.unwrap(),
120 ))
121 .await?;
122 Ok(Response::new(ReplayVersionDeltaResponse {
123 version: Some(version.into()),
124 modified_compaction_groups: compaction_groups,
125 }))
126 }
127
128 async fn trigger_compaction_deterministic(
129 &self,
130 request: Request<TriggerCompactionDeterministicRequest>,
131 ) -> Result<Response<TriggerCompactionDeterministicResponse>, Status> {
132 let req = request.into_inner();
133 self.hummock_manager
134 .trigger_compaction_deterministic(
135 HummockVersionId::new(req.version_id),
136 req.compaction_groups,
137 )
138 .await?;
139 Ok(Response::new(TriggerCompactionDeterministicResponse {}))
140 }
141
142 async fn disable_commit_epoch(
143 &self,
144 _request: Request<DisableCommitEpochRequest>,
145 ) -> Result<Response<DisableCommitEpochResponse>, Status> {
146 let version = self.hummock_manager.disable_commit_epoch().await;
147 Ok(Response::new(DisableCommitEpochResponse {
148 current_version: Some(version.into()),
149 }))
150 }
151
152 async fn list_version_deltas(
153 &self,
154 request: Request<ListVersionDeltasRequest>,
155 ) -> Result<Response<ListVersionDeltasResponse>, Status> {
156 let req = request.into_inner();
157 let version_deltas = self
158 .hummock_manager
159 .list_version_deltas(HummockVersionId::new(req.start_id), req.num_limit)
160 .await?;
161 let resp = ListVersionDeltasResponse {
162 version_deltas: Some(PbHummockVersionDeltas {
163 version_deltas: version_deltas
164 .into_iter()
165 .map(HummockVersionDelta::into)
166 .collect(),
167 }),
168 };
169 Ok(Response::new(resp))
170 }
171
172 async fn get_new_object_ids(
173 &self,
174 request: Request<GetNewObjectIdsRequest>,
175 ) -> Result<Response<GetNewObjectIdsResponse>, Status> {
176 let object_id_range = self
177 .hummock_manager
178 .get_new_object_ids(request.into_inner().number)
179 .await?;
180 Ok(Response::new(GetNewObjectIdsResponse {
181 status: None,
182 start_id: object_id_range.start_id.inner(),
183 end_id: object_id_range.end_id.inner(),
184 }))
185 }
186
187 async fn trigger_manual_compaction(
188 &self,
189 request: Request<TriggerManualCompactionRequest>,
190 ) -> Result<Response<TriggerManualCompactionResponse>, Status> {
191 let request = request.into_inner();
192 let compaction_group_id = request.compaction_group_id;
193 let mut option = ManualCompactionOption {
194 level: request.level as usize,
195 sst_ids: request.sst_ids.into_iter().map(|id| id.into()).collect(),
196 ..Default::default()
197 };
198
199 match request.key_range {
201 Some(pb_key_range) => {
202 option.key_range = KeyRange {
203 left: pb_key_range.left.into(),
204 right: pb_key_range.right.into(),
205 right_exclusive: pb_key_range.right_exclusive,
206 };
207 }
208
209 None => {
210 option.key_range = KeyRange::default();
211 }
212 }
213
214 if request.table_id < SYS_CATALOG_START_ID as u32 {
216 let table_id = TableId::new(request.table_id);
218 if let Ok(table_fragment) = self
219 .metadata_manager
220 .get_job_fragments_by_id(&table_id)
221 .await
222 {
223 option.internal_table_id = HashSet::from_iter(table_fragment.all_table_ids());
224 }
225 }
226
227 assert!(
228 option
229 .internal_table_id
230 .iter()
231 .all(|table_id| *table_id < SYS_CATALOG_START_ID as u32),
232 );
233
234 tracing::info!(
235 "Try trigger_manual_compaction compaction_group_id {} option {:?}",
236 compaction_group_id,
237 &option
238 );
239
240 self.hummock_manager
241 .trigger_manual_compaction(compaction_group_id, option)
242 .await?;
243
244 Ok(Response::new(TriggerManualCompactionResponse {
245 status: None,
246 }))
247 }
248
249 async fn trigger_full_gc(
250 &self,
251 request: Request<TriggerFullGcRequest>,
252 ) -> Result<Response<TriggerFullGcResponse>, Status> {
253 let req = request.into_inner();
254 let backup_manager_2 = self.backup_manager.clone();
255 let hummock_manager_2 = self.hummock_manager.clone();
256 tokio::task::spawn(async move {
257 use thiserror_ext::AsReport;
258 let _ = hummock_manager_2
259 .start_full_gc(
260 Duration::from_secs(req.sst_retention_time_sec),
261 req.prefix,
262 Some(backup_manager_2),
263 )
264 .await
265 .inspect_err(|e| tracing::warn!(error = %e.as_report(), "Failed to start GC."));
266 });
267 Ok(Response::new(TriggerFullGcResponse { status: None }))
268 }
269
270 async fn rise_ctl_get_pinned_versions_summary(
271 &self,
272 _request: Request<RiseCtlGetPinnedVersionsSummaryRequest>,
273 ) -> Result<Response<RiseCtlGetPinnedVersionsSummaryResponse>, Status> {
274 let pinned_versions = self.hummock_manager.list_pinned_version().await;
275 let workers = self
276 .hummock_manager
277 .list_workers(&pinned_versions.iter().map(|v| v.context_id).collect_vec())
278 .await?;
279 Ok(Response::new(RiseCtlGetPinnedVersionsSummaryResponse {
280 summary: Some(PinnedVersionsSummary {
281 pinned_versions,
282 workers,
283 }),
284 }))
285 }
286
287 async fn get_assigned_compact_task_num(
288 &self,
289 _request: Request<GetAssignedCompactTaskNumRequest>,
290 ) -> Result<Response<GetAssignedCompactTaskNumResponse>, Status> {
291 let num_tasks = self.hummock_manager.get_assigned_compact_task_num().await;
292 Ok(Response::new(GetAssignedCompactTaskNumResponse {
293 num_tasks: num_tasks as u32,
294 }))
295 }
296
297 async fn rise_ctl_list_compaction_group(
298 &self,
299 _request: Request<RiseCtlListCompactionGroupRequest>,
300 ) -> Result<Response<RiseCtlListCompactionGroupResponse>, Status> {
301 let compaction_groups = self.hummock_manager.list_compaction_group().await;
302 Ok(Response::new(RiseCtlListCompactionGroupResponse {
303 status: None,
304 compaction_groups,
305 }))
306 }
307
308 async fn rise_ctl_update_compaction_config(
309 &self,
310 request: Request<RiseCtlUpdateCompactionConfigRequest>,
311 ) -> Result<Response<RiseCtlUpdateCompactionConfigResponse>, Status> {
312 let RiseCtlUpdateCompactionConfigRequest {
313 compaction_group_ids,
314 configs,
315 } = request.into_inner();
316 self.hummock_manager
317 .update_compaction_config(
318 compaction_group_ids.as_slice(),
319 configs
320 .into_iter()
321 .map(|c| c.mutable_config.unwrap())
322 .collect::<Vec<_>>()
323 .as_slice(),
324 )
325 .await?;
326 Ok(Response::new(RiseCtlUpdateCompactionConfigResponse {
327 status: None,
328 }))
329 }
330
331 async fn init_metadata_for_replay(
332 &self,
333 request: Request<InitMetadataForReplayRequest>,
334 ) -> Result<Response<InitMetadataForReplayResponse>, Status> {
335 let InitMetadataForReplayRequest {
336 tables,
337 compaction_groups,
338 } = request.into_inner();
339
340 self.hummock_manager
341 .init_metadata_for_version_replay(tables, compaction_groups)?;
342 Ok(Response::new(InitMetadataForReplayResponse {}))
343 }
344
345 async fn pin_version(
346 &self,
347 request: Request<PinVersionRequest>,
348 ) -> Result<Response<PinVersionResponse>, Status> {
349 let req = request.into_inner();
350 let version = self.hummock_manager.pin_version(req.context_id).await?;
351 Ok(Response::new(PinVersionResponse {
352 pinned_version: Some(version.into()),
353 }))
354 }
355
356 async fn split_compaction_group(
357 &self,
358 request: Request<SplitCompactionGroupRequest>,
359 ) -> Result<Response<SplitCompactionGroupResponse>, Status> {
360 let req = request.into_inner();
361 let new_group_id = self
362 .hummock_manager
363 .move_state_tables_to_dedicated_compaction_group(
364 req.group_id,
365 &req.table_ids,
366 if req.partition_vnode_count > 0 {
367 Some(req.partition_vnode_count)
368 } else {
369 None
370 },
371 )
372 .await?
373 .0;
374 Ok(Response::new(SplitCompactionGroupResponse { new_group_id }))
375 }
376
377 async fn rise_ctl_pause_version_checkpoint(
378 &self,
379 _request: Request<RiseCtlPauseVersionCheckpointRequest>,
380 ) -> Result<Response<RiseCtlPauseVersionCheckpointResponse>, Status> {
381 self.hummock_manager.pause_version_checkpoint();
382 Ok(Response::new(RiseCtlPauseVersionCheckpointResponse {}))
383 }
384
385 async fn rise_ctl_resume_version_checkpoint(
386 &self,
387 _request: Request<RiseCtlResumeVersionCheckpointRequest>,
388 ) -> Result<Response<RiseCtlResumeVersionCheckpointResponse>, Status> {
389 self.hummock_manager.resume_version_checkpoint();
390 Ok(Response::new(RiseCtlResumeVersionCheckpointResponse {}))
391 }
392
393 async fn rise_ctl_get_checkpoint_version(
394 &self,
395 _request: Request<RiseCtlGetCheckpointVersionRequest>,
396 ) -> Result<Response<RiseCtlGetCheckpointVersionResponse>, Status> {
397 let checkpoint_version = self.hummock_manager.get_checkpoint_version().await;
398 Ok(Response::new(RiseCtlGetCheckpointVersionResponse {
399 checkpoint_version: Some(checkpoint_version.into()),
400 }))
401 }
402
403 async fn rise_ctl_list_compaction_status(
404 &self,
405 _request: Request<RiseCtlListCompactionStatusRequest>,
406 ) -> Result<Response<RiseCtlListCompactionStatusResponse>, Status> {
407 let (compaction_statuses, task_assignment) =
408 self.hummock_manager.list_compaction_status().await;
409 let task_progress = self.hummock_manager.compactor_manager.get_progress();
410 Ok(Response::new(RiseCtlListCompactionStatusResponse {
411 compaction_statuses,
412 task_assignment,
413 task_progress,
414 }))
415 }
416
417 async fn subscribe_compaction_event(
418 &self,
419 request: Request<Streaming<SubscribeCompactionEventRequest>>,
420 ) -> Result<Response<Self::SubscribeCompactionEventStream>, tonic::Status> {
421 let mut request_stream: Streaming<SubscribeCompactionEventRequest> = request.into_inner();
422 let register_req = {
423 let req = request_stream.next().await.ok_or_else(|| {
424 Status::invalid_argument("subscribe_compaction_event request is empty")
425 })??;
426
427 match req.event {
428 Some(RequestEvent::Register(register)) => register,
429 _ => {
430 return Err(Status::invalid_argument(
431 "the first message must be `Register`",
432 ));
433 }
434 }
435 };
436
437 let context_id = register_req.context_id;
438
439 if !self.hummock_manager.check_context(context_id).await? {
442 return Err(Status::new(
443 tonic::Code::Internal,
444 format!("invalid hummock context {}", context_id),
445 ));
446 }
447 let compactor_manager = self.hummock_manager.compactor_manager.clone();
448
449 let rx: tokio::sync::mpsc::UnboundedReceiver<
450 Result<SubscribeCompactionEventResponse, crate::MetaError>,
451 > = compactor_manager.add_compactor(context_id);
452
453 self.hummock_manager
455 .add_compactor_stream(context_id, request_stream);
456
457 for cg_id in self.hummock_manager.compaction_group_ids().await {
459 self.hummock_manager
460 .try_send_compaction_request(cg_id, compact_task::TaskType::Dynamic);
461 }
462
463 Ok(Response::new(RwReceiverStream::new(rx)))
464 }
465
466 async fn subscribe_iceberg_compaction_event(
467 &self,
468 request: Request<Streaming<SubscribeIcebergCompactionEventRequest>>,
469 ) -> Result<Response<Self::SubscribeIcebergCompactionEventStream>, tonic::Status> {
470 let mut request_stream: Streaming<SubscribeIcebergCompactionEventRequest> =
471 request.into_inner();
472 let register_req = {
473 let req = request_stream.next().await.ok_or_else(|| {
474 Status::invalid_argument("subscribe_compaction_event request is empty")
475 })??;
476
477 match req.event {
478 Some(IcebergRequestEvent::Register(register)) => register,
479 _ => {
480 return Err(Status::invalid_argument(
481 "the first message must be `Register`",
482 ));
483 }
484 }
485 };
486
487 let context_id = register_req.context_id;
488
489 if !self.hummock_manager.check_context(context_id).await? {
492 return Err(Status::new(
493 tonic::Code::Internal,
494 format!("invalid hummock context {}", context_id),
495 ));
496 }
497
498 let rx: tokio::sync::mpsc::UnboundedReceiver<
499 Result<SubscribeIcebergCompactionEventResponse, crate::MetaError>,
500 > = self
501 .iceberg_compaction_manager
502 .iceberg_compactor_manager
503 .add_compactor(context_id);
504
505 self.iceberg_compaction_manager
506 .add_compactor_stream(context_id, request_stream);
507
508 Ok(Response::new(RwReceiverStream::new(rx)))
511 }
512
513 async fn report_compaction_task(
514 &self,
515 _request: Request<ReportCompactionTaskRequest>,
516 ) -> Result<Response<ReportCompactionTaskResponse>, Status> {
517 unreachable!()
518 }
519
520 async fn list_branched_object(
521 &self,
522 _request: Request<ListBranchedObjectRequest>,
523 ) -> Result<Response<ListBranchedObjectResponse>, Status> {
524 let branched_objects = self
525 .hummock_manager
526 .list_branched_objects()
527 .await
528 .into_iter()
529 .flat_map(|(object_id, v)| {
530 v.into_iter()
531 .map(move |(compaction_group_id, sst_ids)| BranchedObject {
532 object_id: object_id.inner(),
533 sst_id: sst_ids.into_iter().map(|id| id.inner()).collect(),
534 compaction_group_id,
535 })
536 })
537 .collect();
538 Ok(Response::new(ListBranchedObjectResponse {
539 branched_objects,
540 }))
541 }
542
543 async fn list_active_write_limit(
544 &self,
545 _request: Request<ListActiveWriteLimitRequest>,
546 ) -> Result<Response<ListActiveWriteLimitResponse>, Status> {
547 Ok(Response::new(ListActiveWriteLimitResponse {
548 write_limits: self.hummock_manager.write_limits().await,
549 }))
550 }
551
552 async fn list_hummock_meta_config(
553 &self,
554 _request: Request<ListHummockMetaConfigRequest>,
555 ) -> Result<Response<ListHummockMetaConfigResponse>, Status> {
556 let opt = &self.hummock_manager.env.opts;
557 let configs = fields_to_kvs!(
558 opt,
559 vacuum_interval_sec,
560 vacuum_spin_interval_ms,
561 hummock_version_checkpoint_interval_sec,
562 min_delta_log_num_for_hummock_version_checkpoint,
563 min_sst_retention_time_sec,
564 full_gc_interval_sec,
565 periodic_compaction_interval_sec,
566 periodic_space_reclaim_compaction_interval_sec,
567 periodic_ttl_reclaim_compaction_interval_sec,
568 periodic_tombstone_reclaim_compaction_interval_sec,
569 periodic_scheduling_compaction_group_split_interval_sec,
570 do_not_config_object_storage_lifecycle,
571 partition_vnode_count,
572 table_high_write_throughput_threshold,
573 table_low_write_throughput_threshold,
574 compaction_task_max_heartbeat_interval_secs,
575 periodic_scheduling_compaction_group_merge_interval_sec
576 );
577 Ok(Response::new(ListHummockMetaConfigResponse { configs }))
578 }
579
580 async fn rise_ctl_rebuild_table_stats(
581 &self,
582 _request: Request<RiseCtlRebuildTableStatsRequest>,
583 ) -> Result<Response<RiseCtlRebuildTableStatsResponse>, Status> {
584 self.hummock_manager.rebuild_table_stats().await?;
585 Ok(Response::new(RiseCtlRebuildTableStatsResponse {}))
586 }
587
588 async fn get_compaction_score(
589 &self,
590 request: Request<GetCompactionScoreRequest>,
591 ) -> Result<Response<GetCompactionScoreResponse>, Status> {
592 let compaction_group_id = request.into_inner().compaction_group_id;
593 let scores = self
594 .hummock_manager
595 .get_compaction_scores(compaction_group_id)
596 .await
597 .into_iter()
598 .map(|s| PickerInfo {
599 score: s.score,
600 select_level: s.select_level as _,
601 target_level: s.target_level as _,
602 picker_type: s.picker_type.to_string(),
603 })
604 .collect();
605 Ok(Response::new(GetCompactionScoreResponse {
606 compaction_group_id,
607 scores,
608 }))
609 }
610
611 async fn list_compact_task_assignment(
612 &self,
613 _request: Request<ListCompactTaskAssignmentRequest>,
614 ) -> Result<Response<ListCompactTaskAssignmentResponse>, Status> {
615 let (_compaction_statuses, task_assignment) =
616 self.hummock_manager.list_compaction_status().await;
617 Ok(Response::new(ListCompactTaskAssignmentResponse {
618 task_assignment,
619 }))
620 }
621
622 async fn list_compact_task_progress(
623 &self,
624 _request: Request<ListCompactTaskProgressRequest>,
625 ) -> Result<Response<ListCompactTaskProgressResponse>, Status> {
626 let task_progress = self.hummock_manager.compactor_manager.get_progress();
627
628 Ok(Response::new(ListCompactTaskProgressResponse {
629 task_progress,
630 }))
631 }
632
633 async fn cancel_compact_task(
634 &self,
635 request: Request<CancelCompactTaskRequest>,
636 ) -> Result<Response<CancelCompactTaskResponse>, Status> {
637 let request = request.into_inner();
638 let ret = self
639 .hummock_manager
640 .cancel_compact_task(
641 request.task_id,
642 PbTaskStatus::try_from(request.task_status).unwrap(),
643 )
644 .await?;
645
646 let response = Response::new(CancelCompactTaskResponse { ret });
647 return Ok(response);
648 }
649
650 async fn get_version_by_epoch(
651 &self,
652 request: Request<GetVersionByEpochRequest>,
653 ) -> Result<Response<GetVersionByEpochResponse>, Status> {
654 let GetVersionByEpochRequest { epoch, table_id } = request.into_inner();
655 let version = self
656 .hummock_manager
657 .epoch_to_version(epoch, table_id)
658 .await?;
659 Ok(Response::new(GetVersionByEpochResponse {
660 version: Some(version.to_protobuf()),
661 }))
662 }
663
664 async fn merge_compaction_group(
665 &self,
666 request: Request<MergeCompactionGroupRequest>,
667 ) -> Result<Response<MergeCompactionGroupResponse>, Status> {
668 let req = request.into_inner();
669 self.hummock_manager
670 .merge_compaction_group(req.left_group_id, req.right_group_id)
671 .await?;
672 Ok(Response::new(MergeCompactionGroupResponse {}))
673 }
674}
675
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    /// `fields_to_kvs!` yields one entry per listed field, keyed by the field name and
    /// holding the field's `to_string()` rendering.
    #[test]
    fn test_fields_to_kvs() {
        struct Sample {
            foo: u64,
            bar: String,
        }
        let sample = Sample {
            foo: 15,
            bar: String::from("foobar"),
        };
        let rendered: HashMap<String, String> = fields_to_kvs!(sample, foo, bar);
        assert_eq!(rendered.len(), 2);
        assert_eq!(rendered.get("foo").map(String::as_str), Some("15"));
        assert_eq!(rendered.get("bar").map(String::as_str), Some("foobar"));
    }
}