risingwave_meta_service/
hummock_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::time::Duration;
17
18use compact_task::PbTaskStatus;
19use futures::StreamExt;
20use itertools::Itertools;
21use risingwave_common::catalog::{SYS_CATALOG_START_ID, TableId};
22use risingwave_hummock_sdk::HummockVersionId;
23use risingwave_hummock_sdk::key_range::KeyRange;
24use risingwave_hummock_sdk::version::HummockVersionDelta;
25use risingwave_meta::backup_restore::BackupManagerRef;
26use risingwave_meta::manager::MetadataManager;
27use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
28use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerService;
29use risingwave_pb::hummock::subscribe_compaction_event_request::Event as RequestEvent;
30use risingwave_pb::hummock::*;
31use tonic::{Request, Response, Status, Streaming};
32
33use crate::RwReceiverStream;
34use crate::hummock::HummockManagerRef;
35use crate::hummock::compaction::selector::ManualCompactionOption;
36
/// gRPC service object implementing `HummockManagerService`.
///
/// The handlers mostly delegate to the managers stored here.
pub struct HummockServiceImpl {
    /// Hummock manager that the RPC handlers forward their work to.
    hummock_manager: HummockManagerRef,
    /// Used by manual compaction to look up a job's fragments by table id.
    metadata_manager: MetadataManager,
    /// Handed to the hummock manager when a full GC is started.
    backup_manager: BackupManagerRef,
}
42
43impl HummockServiceImpl {
44    pub fn new(
45        hummock_manager: HummockManagerRef,
46        metadata_manager: MetadataManager,
47        backup_manager: BackupManagerRef,
48    ) -> Self {
49        HummockServiceImpl {
50            hummock_manager,
51            metadata_manager,
52            backup_manager,
53        }
54    }
55}
56
/// Builds a `HashMap<String, String>` from the listed fields of a value.
///
/// Each listed field contributes one entry: the key is the field's name and the value is
/// the result of calling `to_string()` on the field.
///
/// The map type is spelled with its full `::std` path so the macro does not depend on the
/// call site having `HashMap` imported. A trailing comma after the last field is accepted.
macro_rules! fields_to_kvs {
    ($struct:ident, $($field:ident),* $(,)?) => {
        {
            let mut kvs = ::std::collections::HashMap::<String, String>::new();
            $(
                kvs.insert(stringify!($field).to_string(), $struct.$field.to_string());
            )*
            kvs
        }
    }
}
68
69#[async_trait::async_trait]
70impl HummockManagerService for HummockServiceImpl {
71    type SubscribeCompactionEventStream = RwReceiverStream<SubscribeCompactionEventResponse>;
72
73    async fn unpin_version_before(
74        &self,
75        request: Request<UnpinVersionBeforeRequest>,
76    ) -> Result<Response<UnpinVersionBeforeResponse>, Status> {
77        let req = request.into_inner();
78        self.hummock_manager
79            .unpin_version_before(
80                req.context_id,
81                HummockVersionId::new(req.unpin_version_before),
82            )
83            .await?;
84        Ok(Response::new(UnpinVersionBeforeResponse { status: None }))
85    }
86
87    async fn get_current_version(
88        &self,
89        _request: Request<GetCurrentVersionRequest>,
90    ) -> Result<Response<GetCurrentVersionResponse>, Status> {
91        let current_version = self
92            .hummock_manager
93            .on_current_version(|version| version.into())
94            .await;
95        Ok(Response::new(GetCurrentVersionResponse {
96            status: None,
97            current_version: Some(current_version),
98        }))
99    }
100
101    async fn replay_version_delta(
102        &self,
103        request: Request<ReplayVersionDeltaRequest>,
104    ) -> Result<Response<ReplayVersionDeltaResponse>, Status> {
105        let req = request.into_inner();
106        let (version, compaction_groups) = self
107            .hummock_manager
108            .replay_version_delta(HummockVersionDelta::from_rpc_protobuf(
109                &req.version_delta.unwrap(),
110            ))
111            .await?;
112        Ok(Response::new(ReplayVersionDeltaResponse {
113            version: Some(version.into()),
114            modified_compaction_groups: compaction_groups,
115        }))
116    }
117
118    async fn trigger_compaction_deterministic(
119        &self,
120        request: Request<TriggerCompactionDeterministicRequest>,
121    ) -> Result<Response<TriggerCompactionDeterministicResponse>, Status> {
122        let req = request.into_inner();
123        self.hummock_manager
124            .trigger_compaction_deterministic(
125                HummockVersionId::new(req.version_id),
126                req.compaction_groups,
127            )
128            .await?;
129        Ok(Response::new(TriggerCompactionDeterministicResponse {}))
130    }
131
132    async fn disable_commit_epoch(
133        &self,
134        _request: Request<DisableCommitEpochRequest>,
135    ) -> Result<Response<DisableCommitEpochResponse>, Status> {
136        let version = self.hummock_manager.disable_commit_epoch().await;
137        Ok(Response::new(DisableCommitEpochResponse {
138            current_version: Some(version.into()),
139        }))
140    }
141
142    async fn list_version_deltas(
143        &self,
144        request: Request<ListVersionDeltasRequest>,
145    ) -> Result<Response<ListVersionDeltasResponse>, Status> {
146        let req = request.into_inner();
147        let version_deltas = self
148            .hummock_manager
149            .list_version_deltas(HummockVersionId::new(req.start_id), req.num_limit)
150            .await?;
151        let resp = ListVersionDeltasResponse {
152            version_deltas: Some(PbHummockVersionDeltas {
153                version_deltas: version_deltas
154                    .into_iter()
155                    .map(HummockVersionDelta::into)
156                    .collect(),
157            }),
158        };
159        Ok(Response::new(resp))
160    }
161
162    async fn get_new_sst_ids(
163        &self,
164        request: Request<GetNewSstIdsRequest>,
165    ) -> Result<Response<GetNewSstIdsResponse>, Status> {
166        let sst_id_range = self
167            .hummock_manager
168            .get_new_sst_ids(request.into_inner().number)
169            .await?;
170        Ok(Response::new(GetNewSstIdsResponse {
171            status: None,
172            start_id: sst_id_range.start_id,
173            end_id: sst_id_range.end_id,
174        }))
175    }
176
177    async fn trigger_manual_compaction(
178        &self,
179        request: Request<TriggerManualCompactionRequest>,
180    ) -> Result<Response<TriggerManualCompactionResponse>, Status> {
181        let request = request.into_inner();
182        let compaction_group_id = request.compaction_group_id;
183        let mut option = ManualCompactionOption {
184            level: request.level as usize,
185            sst_ids: request.sst_ids,
186            ..Default::default()
187        };
188
189        // rewrite the key_range
190        match request.key_range {
191            Some(pb_key_range) => {
192                option.key_range = KeyRange {
193                    left: pb_key_range.left.into(),
194                    right: pb_key_range.right.into(),
195                    right_exclusive: pb_key_range.right_exclusive,
196                };
197            }
198
199            None => {
200                option.key_range = KeyRange::default();
201            }
202        }
203
204        // get internal_table_id by metadata_manger
205        if request.table_id < SYS_CATALOG_START_ID as u32 {
206            // We need to make sure to use the correct table_id to filter sst
207            let table_id = TableId::new(request.table_id);
208            if let Ok(table_fragment) = self
209                .metadata_manager
210                .get_job_fragments_by_id(&table_id)
211                .await
212            {
213                option.internal_table_id = HashSet::from_iter(table_fragment.all_table_ids());
214            }
215        }
216
217        assert!(
218            option
219                .internal_table_id
220                .iter()
221                .all(|table_id| *table_id < SYS_CATALOG_START_ID as u32),
222        );
223
224        tracing::info!(
225            "Try trigger_manual_compaction compaction_group_id {} option {:?}",
226            compaction_group_id,
227            &option
228        );
229
230        self.hummock_manager
231            .trigger_manual_compaction(compaction_group_id, option)
232            .await?;
233
234        Ok(Response::new(TriggerManualCompactionResponse {
235            status: None,
236        }))
237    }
238
239    async fn trigger_full_gc(
240        &self,
241        request: Request<TriggerFullGcRequest>,
242    ) -> Result<Response<TriggerFullGcResponse>, Status> {
243        let req = request.into_inner();
244        let backup_manager_2 = self.backup_manager.clone();
245        let hummock_manager_2 = self.hummock_manager.clone();
246        tokio::task::spawn(async move {
247            use thiserror_ext::AsReport;
248            let _ = hummock_manager_2
249                .start_full_gc(
250                    Duration::from_secs(req.sst_retention_time_sec),
251                    req.prefix,
252                    Some(backup_manager_2),
253                )
254                .await
255                .inspect_err(|e| tracing::warn!(error = %e.as_report(), "Failed to start GC."));
256        });
257        Ok(Response::new(TriggerFullGcResponse { status: None }))
258    }
259
260    async fn rise_ctl_get_pinned_versions_summary(
261        &self,
262        _request: Request<RiseCtlGetPinnedVersionsSummaryRequest>,
263    ) -> Result<Response<RiseCtlGetPinnedVersionsSummaryResponse>, Status> {
264        let pinned_versions = self.hummock_manager.list_pinned_version().await;
265        let workers = self
266            .hummock_manager
267            .list_workers(&pinned_versions.iter().map(|v| v.context_id).collect_vec())
268            .await?;
269        Ok(Response::new(RiseCtlGetPinnedVersionsSummaryResponse {
270            summary: Some(PinnedVersionsSummary {
271                pinned_versions,
272                workers,
273            }),
274        }))
275    }
276
277    async fn get_assigned_compact_task_num(
278        &self,
279        _request: Request<GetAssignedCompactTaskNumRequest>,
280    ) -> Result<Response<GetAssignedCompactTaskNumResponse>, Status> {
281        let num_tasks = self.hummock_manager.get_assigned_compact_task_num().await;
282        Ok(Response::new(GetAssignedCompactTaskNumResponse {
283            num_tasks: num_tasks as u32,
284        }))
285    }
286
287    async fn rise_ctl_list_compaction_group(
288        &self,
289        _request: Request<RiseCtlListCompactionGroupRequest>,
290    ) -> Result<Response<RiseCtlListCompactionGroupResponse>, Status> {
291        let compaction_groups = self.hummock_manager.list_compaction_group().await;
292        Ok(Response::new(RiseCtlListCompactionGroupResponse {
293            status: None,
294            compaction_groups,
295        }))
296    }
297
298    async fn rise_ctl_update_compaction_config(
299        &self,
300        request: Request<RiseCtlUpdateCompactionConfigRequest>,
301    ) -> Result<Response<RiseCtlUpdateCompactionConfigResponse>, Status> {
302        let RiseCtlUpdateCompactionConfigRequest {
303            compaction_group_ids,
304            configs,
305        } = request.into_inner();
306        self.hummock_manager
307            .update_compaction_config(
308                compaction_group_ids.as_slice(),
309                configs
310                    .into_iter()
311                    .map(|c| c.mutable_config.unwrap())
312                    .collect::<Vec<_>>()
313                    .as_slice(),
314            )
315            .await?;
316        Ok(Response::new(RiseCtlUpdateCompactionConfigResponse {
317            status: None,
318        }))
319    }
320
321    async fn init_metadata_for_replay(
322        &self,
323        request: Request<InitMetadataForReplayRequest>,
324    ) -> Result<Response<InitMetadataForReplayResponse>, Status> {
325        let InitMetadataForReplayRequest {
326            tables,
327            compaction_groups,
328        } = request.into_inner();
329
330        self.hummock_manager
331            .init_metadata_for_version_replay(tables, compaction_groups)?;
332        Ok(Response::new(InitMetadataForReplayResponse {}))
333    }
334
335    async fn pin_version(
336        &self,
337        request: Request<PinVersionRequest>,
338    ) -> Result<Response<PinVersionResponse>, Status> {
339        let req = request.into_inner();
340        let version = self.hummock_manager.pin_version(req.context_id).await?;
341        Ok(Response::new(PinVersionResponse {
342            pinned_version: Some(version.into()),
343        }))
344    }
345
346    async fn split_compaction_group(
347        &self,
348        request: Request<SplitCompactionGroupRequest>,
349    ) -> Result<Response<SplitCompactionGroupResponse>, Status> {
350        let req = request.into_inner();
351        let new_group_id = self
352            .hummock_manager
353            .move_state_tables_to_dedicated_compaction_group(
354                req.group_id,
355                &req.table_ids,
356                if req.partition_vnode_count > 0 {
357                    Some(req.partition_vnode_count)
358                } else {
359                    None
360                },
361            )
362            .await?
363            .0;
364        Ok(Response::new(SplitCompactionGroupResponse { new_group_id }))
365    }
366
367    async fn rise_ctl_pause_version_checkpoint(
368        &self,
369        _request: Request<RiseCtlPauseVersionCheckpointRequest>,
370    ) -> Result<Response<RiseCtlPauseVersionCheckpointResponse>, Status> {
371        self.hummock_manager.pause_version_checkpoint();
372        Ok(Response::new(RiseCtlPauseVersionCheckpointResponse {}))
373    }
374
375    async fn rise_ctl_resume_version_checkpoint(
376        &self,
377        _request: Request<RiseCtlResumeVersionCheckpointRequest>,
378    ) -> Result<Response<RiseCtlResumeVersionCheckpointResponse>, Status> {
379        self.hummock_manager.resume_version_checkpoint();
380        Ok(Response::new(RiseCtlResumeVersionCheckpointResponse {}))
381    }
382
383    async fn rise_ctl_get_checkpoint_version(
384        &self,
385        _request: Request<RiseCtlGetCheckpointVersionRequest>,
386    ) -> Result<Response<RiseCtlGetCheckpointVersionResponse>, Status> {
387        let checkpoint_version = self.hummock_manager.get_checkpoint_version().await;
388        Ok(Response::new(RiseCtlGetCheckpointVersionResponse {
389            checkpoint_version: Some(checkpoint_version.into()),
390        }))
391    }
392
393    async fn rise_ctl_list_compaction_status(
394        &self,
395        _request: Request<RiseCtlListCompactionStatusRequest>,
396    ) -> Result<Response<RiseCtlListCompactionStatusResponse>, Status> {
397        let (compaction_statuses, task_assignment) =
398            self.hummock_manager.list_compaction_status().await;
399        let task_progress = self.hummock_manager.compactor_manager.get_progress();
400        Ok(Response::new(RiseCtlListCompactionStatusResponse {
401            compaction_statuses,
402            task_assignment,
403            task_progress,
404        }))
405    }
406
407    async fn subscribe_compaction_event(
408        &self,
409        request: Request<Streaming<SubscribeCompactionEventRequest>>,
410    ) -> Result<Response<Self::SubscribeCompactionEventStream>, tonic::Status> {
411        let mut request_stream: Streaming<SubscribeCompactionEventRequest> = request.into_inner();
412        let register_req = {
413            let req = request_stream.next().await.ok_or_else(|| {
414                Status::invalid_argument("subscribe_compaction_event request is empty")
415            })??;
416
417            match req.event {
418                Some(RequestEvent::Register(register)) => register,
419                _ => {
420                    return Err(Status::invalid_argument(
421                        "the first message must be `Register`",
422                    ));
423                }
424            }
425        };
426
427        let context_id = register_req.context_id;
428
429        // check_context and add_compactor as a whole is not atomic, but compactor_manager will
430        // remove invalid compactor eventually.
431        if !self.hummock_manager.check_context(context_id).await? {
432            return Err(Status::new(
433                tonic::Code::Internal,
434                format!("invalid hummock context {}", context_id),
435            ));
436        }
437        let compactor_manager = self.hummock_manager.compactor_manager.clone();
438
439        let rx: tokio::sync::mpsc::UnboundedReceiver<
440            Result<SubscribeCompactionEventResponse, crate::MetaError>,
441        > = compactor_manager.add_compactor(context_id);
442
443        // register request stream to hummock
444        self.hummock_manager
445            .add_compactor_stream(context_id, request_stream);
446
447        // Trigger compaction on all compaction groups.
448        for cg_id in self.hummock_manager.compaction_group_ids().await {
449            self.hummock_manager
450                .try_send_compaction_request(cg_id, compact_task::TaskType::Dynamic);
451        }
452
453        Ok(Response::new(RwReceiverStream::new(rx)))
454    }
455
456    async fn report_compaction_task(
457        &self,
458        _request: Request<ReportCompactionTaskRequest>,
459    ) -> Result<Response<ReportCompactionTaskResponse>, Status> {
460        unreachable!()
461    }
462
463    async fn list_branched_object(
464        &self,
465        _request: Request<ListBranchedObjectRequest>,
466    ) -> Result<Response<ListBranchedObjectResponse>, Status> {
467        let branched_objects = self
468            .hummock_manager
469            .list_branched_objects()
470            .await
471            .into_iter()
472            .flat_map(|(object_id, v)| {
473                v.into_iter()
474                    .map(move |(compaction_group_id, sst_id)| BranchedObject {
475                        object_id,
476                        sst_id,
477                        compaction_group_id,
478                    })
479            })
480            .collect();
481        Ok(Response::new(ListBranchedObjectResponse {
482            branched_objects,
483        }))
484    }
485
486    async fn list_active_write_limit(
487        &self,
488        _request: Request<ListActiveWriteLimitRequest>,
489    ) -> Result<Response<ListActiveWriteLimitResponse>, Status> {
490        Ok(Response::new(ListActiveWriteLimitResponse {
491            write_limits: self.hummock_manager.write_limits().await,
492        }))
493    }
494
495    async fn list_hummock_meta_config(
496        &self,
497        _request: Request<ListHummockMetaConfigRequest>,
498    ) -> Result<Response<ListHummockMetaConfigResponse>, Status> {
499        let opt = &self.hummock_manager.env.opts;
500        let configs = fields_to_kvs!(
501            opt,
502            vacuum_interval_sec,
503            vacuum_spin_interval_ms,
504            hummock_version_checkpoint_interval_sec,
505            min_delta_log_num_for_hummock_version_checkpoint,
506            min_sst_retention_time_sec,
507            full_gc_interval_sec,
508            periodic_compaction_interval_sec,
509            periodic_space_reclaim_compaction_interval_sec,
510            periodic_ttl_reclaim_compaction_interval_sec,
511            periodic_tombstone_reclaim_compaction_interval_sec,
512            periodic_scheduling_compaction_group_split_interval_sec,
513            do_not_config_object_storage_lifecycle,
514            partition_vnode_count,
515            table_high_write_throughput_threshold,
516            table_low_write_throughput_threshold,
517            compaction_task_max_heartbeat_interval_secs,
518            periodic_scheduling_compaction_group_merge_interval_sec
519        );
520        Ok(Response::new(ListHummockMetaConfigResponse { configs }))
521    }
522
523    async fn rise_ctl_rebuild_table_stats(
524        &self,
525        _request: Request<RiseCtlRebuildTableStatsRequest>,
526    ) -> Result<Response<RiseCtlRebuildTableStatsResponse>, Status> {
527        self.hummock_manager.rebuild_table_stats().await?;
528        Ok(Response::new(RiseCtlRebuildTableStatsResponse {}))
529    }
530
531    async fn get_compaction_score(
532        &self,
533        request: Request<GetCompactionScoreRequest>,
534    ) -> Result<Response<GetCompactionScoreResponse>, Status> {
535        let compaction_group_id = request.into_inner().compaction_group_id;
536        let scores = self
537            .hummock_manager
538            .get_compaction_scores(compaction_group_id)
539            .await
540            .into_iter()
541            .map(|s| PickerInfo {
542                score: s.score,
543                select_level: s.select_level as _,
544                target_level: s.target_level as _,
545                picker_type: s.picker_type.to_string(),
546            })
547            .collect();
548        Ok(Response::new(GetCompactionScoreResponse {
549            compaction_group_id,
550            scores,
551        }))
552    }
553
554    async fn list_compact_task_assignment(
555        &self,
556        _request: Request<ListCompactTaskAssignmentRequest>,
557    ) -> Result<Response<ListCompactTaskAssignmentResponse>, Status> {
558        let (_compaction_statuses, task_assignment) =
559            self.hummock_manager.list_compaction_status().await;
560        Ok(Response::new(ListCompactTaskAssignmentResponse {
561            task_assignment,
562        }))
563    }
564
565    async fn list_compact_task_progress(
566        &self,
567        _request: Request<ListCompactTaskProgressRequest>,
568    ) -> Result<Response<ListCompactTaskProgressResponse>, Status> {
569        let task_progress = self.hummock_manager.compactor_manager.get_progress();
570
571        Ok(Response::new(ListCompactTaskProgressResponse {
572            task_progress,
573        }))
574    }
575
576    async fn cancel_compact_task(
577        &self,
578        request: Request<CancelCompactTaskRequest>,
579    ) -> Result<Response<CancelCompactTaskResponse>, Status> {
580        let request = request.into_inner();
581        let ret = self
582            .hummock_manager
583            .cancel_compact_task(
584                request.task_id,
585                PbTaskStatus::try_from(request.task_status).unwrap(),
586            )
587            .await?;
588
589        let response = Response::new(CancelCompactTaskResponse { ret });
590        return Ok(response);
591    }
592
593    async fn get_version_by_epoch(
594        &self,
595        request: Request<GetVersionByEpochRequest>,
596    ) -> Result<Response<GetVersionByEpochResponse>, Status> {
597        let GetVersionByEpochRequest { epoch, table_id } = request.into_inner();
598        let version = self
599            .hummock_manager
600            .epoch_to_version(epoch, table_id)
601            .await?;
602        Ok(Response::new(GetVersionByEpochResponse {
603            version: Some(version.to_protobuf()),
604        }))
605    }
606
607    async fn merge_compaction_group(
608        &self,
609        request: Request<MergeCompactionGroupRequest>,
610    ) -> Result<Response<MergeCompactionGroupResponse>, Status> {
611        let req = request.into_inner();
612        self.hummock_manager
613            .merge_compaction_group(req.left_group_id, req.right_group_id)
614            .await?;
615        Ok(Response::new(MergeCompactionGroupResponse {}))
616    }
617}
618
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    /// `fields_to_kvs!` yields one entry per listed field, keyed by the field name and
    /// holding the field's `to_string()` rendering.
    #[test]
    fn test_fields_to_kvs() {
        struct Sample {
            foo: u64,
            bar: String,
        }
        let sample = Sample {
            foo: 15,
            bar: "foobar".to_owned(),
        };

        let kvs: HashMap<String, String> = fields_to_kvs!(sample, foo, bar);

        assert_eq!(kvs.len(), 2);
        assert_eq!(kvs.get("foo").map(String::as_str), Some("15"));
        assert_eq!(kvs.get("bar").map(String::as_str), Some("foobar"));
    }
}