1use std::collections::{HashMap, HashSet};
16use std::time::Duration;
17
18use compact_task::PbTaskStatus;
19use futures::StreamExt;
20use itertools::Itertools;
21use risingwave_common::catalog::{SYS_CATALOG_START_ID, TableId};
22use risingwave_hummock_sdk::HummockVersionId;
23use risingwave_hummock_sdk::key_range::KeyRange;
24use risingwave_hummock_sdk::version::HummockVersionDelta;
25use risingwave_meta::backup_restore::BackupManagerRef;
26use risingwave_meta::manager::MetadataManager;
27use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
28use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerService;
29use risingwave_pb::hummock::subscribe_compaction_event_request::Event as RequestEvent;
30use risingwave_pb::hummock::*;
31use tonic::{Request, Response, Status, Streaming};
32
33use crate::RwReceiverStream;
34use crate::hummock::HummockManagerRef;
35use crate::hummock::compaction::selector::ManualCompactionOption;
36
37pub struct HummockServiceImpl {
38 hummock_manager: HummockManagerRef,
39 metadata_manager: MetadataManager,
40 backup_manager: BackupManagerRef,
41}
42
43impl HummockServiceImpl {
44 pub fn new(
45 hummock_manager: HummockManagerRef,
46 metadata_manager: MetadataManager,
47 backup_manager: BackupManagerRef,
48 ) -> Self {
49 HummockServiceImpl {
50 hummock_manager,
51 metadata_manager,
52 backup_manager,
53 }
54 }
55}
56
57macro_rules! fields_to_kvs {
58 ($struct:ident, $($field:ident),*) => {
59 {
60 let mut kvs = HashMap::default();
61 $(
62 kvs.insert(stringify!($field).to_string(), $struct.$field.to_string());
63 )*
64 kvs
65 }
66 }
67}
68
69#[async_trait::async_trait]
70impl HummockManagerService for HummockServiceImpl {
71 type SubscribeCompactionEventStream = RwReceiverStream<SubscribeCompactionEventResponse>;
72
73 async fn unpin_version_before(
74 &self,
75 request: Request<UnpinVersionBeforeRequest>,
76 ) -> Result<Response<UnpinVersionBeforeResponse>, Status> {
77 let req = request.into_inner();
78 self.hummock_manager
79 .unpin_version_before(
80 req.context_id,
81 HummockVersionId::new(req.unpin_version_before),
82 )
83 .await?;
84 Ok(Response::new(UnpinVersionBeforeResponse { status: None }))
85 }
86
87 async fn get_current_version(
88 &self,
89 _request: Request<GetCurrentVersionRequest>,
90 ) -> Result<Response<GetCurrentVersionResponse>, Status> {
91 let current_version = self
92 .hummock_manager
93 .on_current_version(|version| version.into())
94 .await;
95 Ok(Response::new(GetCurrentVersionResponse {
96 status: None,
97 current_version: Some(current_version),
98 }))
99 }
100
101 async fn replay_version_delta(
102 &self,
103 request: Request<ReplayVersionDeltaRequest>,
104 ) -> Result<Response<ReplayVersionDeltaResponse>, Status> {
105 let req = request.into_inner();
106 let (version, compaction_groups) = self
107 .hummock_manager
108 .replay_version_delta(HummockVersionDelta::from_rpc_protobuf(
109 &req.version_delta.unwrap(),
110 ))
111 .await?;
112 Ok(Response::new(ReplayVersionDeltaResponse {
113 version: Some(version.into()),
114 modified_compaction_groups: compaction_groups,
115 }))
116 }
117
118 async fn trigger_compaction_deterministic(
119 &self,
120 request: Request<TriggerCompactionDeterministicRequest>,
121 ) -> Result<Response<TriggerCompactionDeterministicResponse>, Status> {
122 let req = request.into_inner();
123 self.hummock_manager
124 .trigger_compaction_deterministic(
125 HummockVersionId::new(req.version_id),
126 req.compaction_groups,
127 )
128 .await?;
129 Ok(Response::new(TriggerCompactionDeterministicResponse {}))
130 }
131
132 async fn disable_commit_epoch(
133 &self,
134 _request: Request<DisableCommitEpochRequest>,
135 ) -> Result<Response<DisableCommitEpochResponse>, Status> {
136 let version = self.hummock_manager.disable_commit_epoch().await;
137 Ok(Response::new(DisableCommitEpochResponse {
138 current_version: Some(version.into()),
139 }))
140 }
141
142 async fn list_version_deltas(
143 &self,
144 request: Request<ListVersionDeltasRequest>,
145 ) -> Result<Response<ListVersionDeltasResponse>, Status> {
146 let req = request.into_inner();
147 let version_deltas = self
148 .hummock_manager
149 .list_version_deltas(HummockVersionId::new(req.start_id), req.num_limit)
150 .await?;
151 let resp = ListVersionDeltasResponse {
152 version_deltas: Some(PbHummockVersionDeltas {
153 version_deltas: version_deltas
154 .into_iter()
155 .map(HummockVersionDelta::into)
156 .collect(),
157 }),
158 };
159 Ok(Response::new(resp))
160 }
161
162 async fn get_new_sst_ids(
163 &self,
164 request: Request<GetNewSstIdsRequest>,
165 ) -> Result<Response<GetNewSstIdsResponse>, Status> {
166 let sst_id_range = self
167 .hummock_manager
168 .get_new_sst_ids(request.into_inner().number)
169 .await?;
170 Ok(Response::new(GetNewSstIdsResponse {
171 status: None,
172 start_id: sst_id_range.start_id,
173 end_id: sst_id_range.end_id,
174 }))
175 }
176
177 async fn trigger_manual_compaction(
178 &self,
179 request: Request<TriggerManualCompactionRequest>,
180 ) -> Result<Response<TriggerManualCompactionResponse>, Status> {
181 let request = request.into_inner();
182 let compaction_group_id = request.compaction_group_id;
183 let mut option = ManualCompactionOption {
184 level: request.level as usize,
185 sst_ids: request.sst_ids,
186 ..Default::default()
187 };
188
189 match request.key_range {
191 Some(pb_key_range) => {
192 option.key_range = KeyRange {
193 left: pb_key_range.left.into(),
194 right: pb_key_range.right.into(),
195 right_exclusive: pb_key_range.right_exclusive,
196 };
197 }
198
199 None => {
200 option.key_range = KeyRange::default();
201 }
202 }
203
204 if request.table_id < SYS_CATALOG_START_ID as u32 {
206 let table_id = TableId::new(request.table_id);
208 if let Ok(table_fragment) = self
209 .metadata_manager
210 .get_job_fragments_by_id(&table_id)
211 .await
212 {
213 option.internal_table_id = HashSet::from_iter(table_fragment.all_table_ids());
214 }
215 }
216
217 assert!(
218 option
219 .internal_table_id
220 .iter()
221 .all(|table_id| *table_id < SYS_CATALOG_START_ID as u32),
222 );
223
224 tracing::info!(
225 "Try trigger_manual_compaction compaction_group_id {} option {:?}",
226 compaction_group_id,
227 &option
228 );
229
230 self.hummock_manager
231 .trigger_manual_compaction(compaction_group_id, option)
232 .await?;
233
234 Ok(Response::new(TriggerManualCompactionResponse {
235 status: None,
236 }))
237 }
238
239 async fn trigger_full_gc(
240 &self,
241 request: Request<TriggerFullGcRequest>,
242 ) -> Result<Response<TriggerFullGcResponse>, Status> {
243 let req = request.into_inner();
244 let backup_manager_2 = self.backup_manager.clone();
245 let hummock_manager_2 = self.hummock_manager.clone();
246 tokio::task::spawn(async move {
247 use thiserror_ext::AsReport;
248 let _ = hummock_manager_2
249 .start_full_gc(
250 Duration::from_secs(req.sst_retention_time_sec),
251 req.prefix,
252 Some(backup_manager_2),
253 )
254 .await
255 .inspect_err(|e| tracing::warn!(error = %e.as_report(), "Failed to start GC."));
256 });
257 Ok(Response::new(TriggerFullGcResponse { status: None }))
258 }
259
260 async fn rise_ctl_get_pinned_versions_summary(
261 &self,
262 _request: Request<RiseCtlGetPinnedVersionsSummaryRequest>,
263 ) -> Result<Response<RiseCtlGetPinnedVersionsSummaryResponse>, Status> {
264 let pinned_versions = self.hummock_manager.list_pinned_version().await;
265 let workers = self
266 .hummock_manager
267 .list_workers(&pinned_versions.iter().map(|v| v.context_id).collect_vec())
268 .await?;
269 Ok(Response::new(RiseCtlGetPinnedVersionsSummaryResponse {
270 summary: Some(PinnedVersionsSummary {
271 pinned_versions,
272 workers,
273 }),
274 }))
275 }
276
277 async fn get_assigned_compact_task_num(
278 &self,
279 _request: Request<GetAssignedCompactTaskNumRequest>,
280 ) -> Result<Response<GetAssignedCompactTaskNumResponse>, Status> {
281 let num_tasks = self.hummock_manager.get_assigned_compact_task_num().await;
282 Ok(Response::new(GetAssignedCompactTaskNumResponse {
283 num_tasks: num_tasks as u32,
284 }))
285 }
286
287 async fn rise_ctl_list_compaction_group(
288 &self,
289 _request: Request<RiseCtlListCompactionGroupRequest>,
290 ) -> Result<Response<RiseCtlListCompactionGroupResponse>, Status> {
291 let compaction_groups = self.hummock_manager.list_compaction_group().await;
292 Ok(Response::new(RiseCtlListCompactionGroupResponse {
293 status: None,
294 compaction_groups,
295 }))
296 }
297
298 async fn rise_ctl_update_compaction_config(
299 &self,
300 request: Request<RiseCtlUpdateCompactionConfigRequest>,
301 ) -> Result<Response<RiseCtlUpdateCompactionConfigResponse>, Status> {
302 let RiseCtlUpdateCompactionConfigRequest {
303 compaction_group_ids,
304 configs,
305 } = request.into_inner();
306 self.hummock_manager
307 .update_compaction_config(
308 compaction_group_ids.as_slice(),
309 configs
310 .into_iter()
311 .map(|c| c.mutable_config.unwrap())
312 .collect::<Vec<_>>()
313 .as_slice(),
314 )
315 .await?;
316 Ok(Response::new(RiseCtlUpdateCompactionConfigResponse {
317 status: None,
318 }))
319 }
320
321 async fn init_metadata_for_replay(
322 &self,
323 request: Request<InitMetadataForReplayRequest>,
324 ) -> Result<Response<InitMetadataForReplayResponse>, Status> {
325 let InitMetadataForReplayRequest {
326 tables,
327 compaction_groups,
328 } = request.into_inner();
329
330 self.hummock_manager
331 .init_metadata_for_version_replay(tables, compaction_groups)?;
332 Ok(Response::new(InitMetadataForReplayResponse {}))
333 }
334
335 async fn pin_version(
336 &self,
337 request: Request<PinVersionRequest>,
338 ) -> Result<Response<PinVersionResponse>, Status> {
339 let req = request.into_inner();
340 let version = self.hummock_manager.pin_version(req.context_id).await?;
341 Ok(Response::new(PinVersionResponse {
342 pinned_version: Some(version.into()),
343 }))
344 }
345
346 async fn split_compaction_group(
347 &self,
348 request: Request<SplitCompactionGroupRequest>,
349 ) -> Result<Response<SplitCompactionGroupResponse>, Status> {
350 let req = request.into_inner();
351 let new_group_id = self
352 .hummock_manager
353 .move_state_tables_to_dedicated_compaction_group(
354 req.group_id,
355 &req.table_ids,
356 if req.partition_vnode_count > 0 {
357 Some(req.partition_vnode_count)
358 } else {
359 None
360 },
361 )
362 .await?
363 .0;
364 Ok(Response::new(SplitCompactionGroupResponse { new_group_id }))
365 }
366
367 async fn rise_ctl_pause_version_checkpoint(
368 &self,
369 _request: Request<RiseCtlPauseVersionCheckpointRequest>,
370 ) -> Result<Response<RiseCtlPauseVersionCheckpointResponse>, Status> {
371 self.hummock_manager.pause_version_checkpoint();
372 Ok(Response::new(RiseCtlPauseVersionCheckpointResponse {}))
373 }
374
375 async fn rise_ctl_resume_version_checkpoint(
376 &self,
377 _request: Request<RiseCtlResumeVersionCheckpointRequest>,
378 ) -> Result<Response<RiseCtlResumeVersionCheckpointResponse>, Status> {
379 self.hummock_manager.resume_version_checkpoint();
380 Ok(Response::new(RiseCtlResumeVersionCheckpointResponse {}))
381 }
382
383 async fn rise_ctl_get_checkpoint_version(
384 &self,
385 _request: Request<RiseCtlGetCheckpointVersionRequest>,
386 ) -> Result<Response<RiseCtlGetCheckpointVersionResponse>, Status> {
387 let checkpoint_version = self.hummock_manager.get_checkpoint_version().await;
388 Ok(Response::new(RiseCtlGetCheckpointVersionResponse {
389 checkpoint_version: Some(checkpoint_version.into()),
390 }))
391 }
392
393 async fn rise_ctl_list_compaction_status(
394 &self,
395 _request: Request<RiseCtlListCompactionStatusRequest>,
396 ) -> Result<Response<RiseCtlListCompactionStatusResponse>, Status> {
397 let (compaction_statuses, task_assignment) =
398 self.hummock_manager.list_compaction_status().await;
399 let task_progress = self.hummock_manager.compactor_manager.get_progress();
400 Ok(Response::new(RiseCtlListCompactionStatusResponse {
401 compaction_statuses,
402 task_assignment,
403 task_progress,
404 }))
405 }
406
407 async fn subscribe_compaction_event(
408 &self,
409 request: Request<Streaming<SubscribeCompactionEventRequest>>,
410 ) -> Result<Response<Self::SubscribeCompactionEventStream>, tonic::Status> {
411 let mut request_stream: Streaming<SubscribeCompactionEventRequest> = request.into_inner();
412 let register_req = {
413 let req = request_stream.next().await.ok_or_else(|| {
414 Status::invalid_argument("subscribe_compaction_event request is empty")
415 })??;
416
417 match req.event {
418 Some(RequestEvent::Register(register)) => register,
419 _ => {
420 return Err(Status::invalid_argument(
421 "the first message must be `Register`",
422 ));
423 }
424 }
425 };
426
427 let context_id = register_req.context_id;
428
429 if !self.hummock_manager.check_context(context_id).await? {
432 return Err(Status::new(
433 tonic::Code::Internal,
434 format!("invalid hummock context {}", context_id),
435 ));
436 }
437 let compactor_manager = self.hummock_manager.compactor_manager.clone();
438
439 let rx: tokio::sync::mpsc::UnboundedReceiver<
440 Result<SubscribeCompactionEventResponse, crate::MetaError>,
441 > = compactor_manager.add_compactor(context_id);
442
443 self.hummock_manager
445 .add_compactor_stream(context_id, request_stream);
446
447 for cg_id in self.hummock_manager.compaction_group_ids().await {
449 self.hummock_manager
450 .try_send_compaction_request(cg_id, compact_task::TaskType::Dynamic);
451 }
452
453 Ok(Response::new(RwReceiverStream::new(rx)))
454 }
455
456 async fn report_compaction_task(
457 &self,
458 _request: Request<ReportCompactionTaskRequest>,
459 ) -> Result<Response<ReportCompactionTaskResponse>, Status> {
460 unreachable!()
461 }
462
463 async fn list_branched_object(
464 &self,
465 _request: Request<ListBranchedObjectRequest>,
466 ) -> Result<Response<ListBranchedObjectResponse>, Status> {
467 let branched_objects = self
468 .hummock_manager
469 .list_branched_objects()
470 .await
471 .into_iter()
472 .flat_map(|(object_id, v)| {
473 v.into_iter()
474 .map(move |(compaction_group_id, sst_id)| BranchedObject {
475 object_id,
476 sst_id,
477 compaction_group_id,
478 })
479 })
480 .collect();
481 Ok(Response::new(ListBranchedObjectResponse {
482 branched_objects,
483 }))
484 }
485
486 async fn list_active_write_limit(
487 &self,
488 _request: Request<ListActiveWriteLimitRequest>,
489 ) -> Result<Response<ListActiveWriteLimitResponse>, Status> {
490 Ok(Response::new(ListActiveWriteLimitResponse {
491 write_limits: self.hummock_manager.write_limits().await,
492 }))
493 }
494
495 async fn list_hummock_meta_config(
496 &self,
497 _request: Request<ListHummockMetaConfigRequest>,
498 ) -> Result<Response<ListHummockMetaConfigResponse>, Status> {
499 let opt = &self.hummock_manager.env.opts;
500 let configs = fields_to_kvs!(
501 opt,
502 vacuum_interval_sec,
503 vacuum_spin_interval_ms,
504 hummock_version_checkpoint_interval_sec,
505 min_delta_log_num_for_hummock_version_checkpoint,
506 min_sst_retention_time_sec,
507 full_gc_interval_sec,
508 periodic_compaction_interval_sec,
509 periodic_space_reclaim_compaction_interval_sec,
510 periodic_ttl_reclaim_compaction_interval_sec,
511 periodic_tombstone_reclaim_compaction_interval_sec,
512 periodic_scheduling_compaction_group_split_interval_sec,
513 do_not_config_object_storage_lifecycle,
514 partition_vnode_count,
515 table_high_write_throughput_threshold,
516 table_low_write_throughput_threshold,
517 compaction_task_max_heartbeat_interval_secs,
518 periodic_scheduling_compaction_group_merge_interval_sec
519 );
520 Ok(Response::new(ListHummockMetaConfigResponse { configs }))
521 }
522
523 async fn rise_ctl_rebuild_table_stats(
524 &self,
525 _request: Request<RiseCtlRebuildTableStatsRequest>,
526 ) -> Result<Response<RiseCtlRebuildTableStatsResponse>, Status> {
527 self.hummock_manager.rebuild_table_stats().await?;
528 Ok(Response::new(RiseCtlRebuildTableStatsResponse {}))
529 }
530
531 async fn get_compaction_score(
532 &self,
533 request: Request<GetCompactionScoreRequest>,
534 ) -> Result<Response<GetCompactionScoreResponse>, Status> {
535 let compaction_group_id = request.into_inner().compaction_group_id;
536 let scores = self
537 .hummock_manager
538 .get_compaction_scores(compaction_group_id)
539 .await
540 .into_iter()
541 .map(|s| PickerInfo {
542 score: s.score,
543 select_level: s.select_level as _,
544 target_level: s.target_level as _,
545 picker_type: s.picker_type.to_string(),
546 })
547 .collect();
548 Ok(Response::new(GetCompactionScoreResponse {
549 compaction_group_id,
550 scores,
551 }))
552 }
553
554 async fn list_compact_task_assignment(
555 &self,
556 _request: Request<ListCompactTaskAssignmentRequest>,
557 ) -> Result<Response<ListCompactTaskAssignmentResponse>, Status> {
558 let (_compaction_statuses, task_assignment) =
559 self.hummock_manager.list_compaction_status().await;
560 Ok(Response::new(ListCompactTaskAssignmentResponse {
561 task_assignment,
562 }))
563 }
564
565 async fn list_compact_task_progress(
566 &self,
567 _request: Request<ListCompactTaskProgressRequest>,
568 ) -> Result<Response<ListCompactTaskProgressResponse>, Status> {
569 let task_progress = self.hummock_manager.compactor_manager.get_progress();
570
571 Ok(Response::new(ListCompactTaskProgressResponse {
572 task_progress,
573 }))
574 }
575
576 async fn cancel_compact_task(
577 &self,
578 request: Request<CancelCompactTaskRequest>,
579 ) -> Result<Response<CancelCompactTaskResponse>, Status> {
580 let request = request.into_inner();
581 let ret = self
582 .hummock_manager
583 .cancel_compact_task(
584 request.task_id,
585 PbTaskStatus::try_from(request.task_status).unwrap(),
586 )
587 .await?;
588
589 let response = Response::new(CancelCompactTaskResponse { ret });
590 return Ok(response);
591 }
592
593 async fn get_version_by_epoch(
594 &self,
595 request: Request<GetVersionByEpochRequest>,
596 ) -> Result<Response<GetVersionByEpochResponse>, Status> {
597 let GetVersionByEpochRequest { epoch, table_id } = request.into_inner();
598 let version = self
599 .hummock_manager
600 .epoch_to_version(epoch, table_id)
601 .await?;
602 Ok(Response::new(GetVersionByEpochResponse {
603 version: Some(version.to_protobuf()),
604 }))
605 }
606
607 async fn merge_compaction_group(
608 &self,
609 request: Request<MergeCompactionGroupRequest>,
610 ) -> Result<Response<MergeCompactionGroupResponse>, Status> {
611 let req = request.into_inner();
612 self.hummock_manager
613 .merge_compaction_group(req.left_group_id, req.right_group_id)
614 .await?;
615 Ok(Response::new(MergeCompactionGroupResponse {}))
616 }
617}
618
619#[cfg(test)]
620mod tests {
621 use std::collections::HashMap;
622 #[test]
623 fn test_fields_to_kvs() {
624 struct S {
625 foo: u64,
626 bar: String,
627 }
628 let s = S {
629 foo: 15,
630 bar: "foobar".to_owned(),
631 };
632 let kvs: HashMap<String, String> = fields_to_kvs!(s, foo, bar);
633 assert_eq!(kvs.len(), 2);
634 assert_eq!(kvs.get("foo").unwrap(), "15");
635 assert_eq!(kvs.get("bar").unwrap(), "foobar");
636 }
637}