risingwave_meta_service/
hummock_service.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::time::Duration;
17
18use compact_task::PbTaskStatus;
19use futures::StreamExt;
20use itertools::Itertools;
21use risingwave_common::catalog::SYS_CATALOG_START_ID;
22use risingwave_hummock_sdk::key_range::KeyRange;
23use risingwave_hummock_sdk::version::HummockVersionDelta;
24use risingwave_meta::backup_restore::BackupManagerRef;
25use risingwave_meta::manager::MetadataManager;
26use risingwave_meta::manager::iceberg_compaction::IcebergCompactionManagerRef;
27use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
28use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerService;
29use risingwave_pb::hummock::subscribe_compaction_event_request::Event as RequestEvent;
30use risingwave_pb::hummock::*;
31use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Event as IcebergRequestEvent;
32use risingwave_pb::iceberg_compaction::{
33    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
34};
35use tonic::{Request, Response, Status, Streaming};
36
37use crate::RwReceiverStream;
38use crate::hummock::compaction::selector::ManualCompactionOption;
39use crate::hummock::{HummockManagerRef, ManualCompactionTriggerResult};
40
41pub struct HummockServiceImpl {
42    hummock_manager: HummockManagerRef,
43    metadata_manager: MetadataManager,
44    backup_manager: BackupManagerRef,
45    iceberg_compaction_manager: IcebergCompactionManagerRef,
46}
47
48impl HummockServiceImpl {
49    pub fn new(
50        hummock_manager: HummockManagerRef,
51        metadata_manager: MetadataManager,
52        backup_manager: BackupManagerRef,
53        iceberg_compaction_manager: IcebergCompactionManagerRef,
54    ) -> Self {
55        HummockServiceImpl {
56            hummock_manager,
57            metadata_manager,
58            backup_manager,
59            iceberg_compaction_manager,
60        }
61    }
62}
63
/// Builds a `HashMap` that maps each listed field name to the `to_string()`
/// rendering of that field on `$struct`.
///
/// `$struct` must be an identifier bound to a value (or reference) whose listed
/// fields all provide `to_string()`.
macro_rules! fields_to_kvs {
    ($struct:ident, $($field:ident),*) => {{
        let mut entries = HashMap::default();
        $(
            // The key is the field's identifier as written at the call site.
            let key = String::from(stringify!($field));
            let value = $struct.$field.to_string();
            entries.insert(key, value);
        )*
        entries
    }};
}
75
76#[async_trait::async_trait]
77impl HummockManagerService for HummockServiceImpl {
78    type SubscribeCompactionEventStream = RwReceiverStream<SubscribeCompactionEventResponse>;
79    type SubscribeIcebergCompactionEventStream =
80        RwReceiverStream<SubscribeIcebergCompactionEventResponse>;
81
82    async fn unpin_version_before(
83        &self,
84        request: Request<UnpinVersionBeforeRequest>,
85    ) -> Result<Response<UnpinVersionBeforeResponse>, Status> {
86        let req = request.into_inner();
87        self.hummock_manager
88            .unpin_version_before(req.context_id, req.unpin_version_before)
89            .await?;
90        Ok(Response::new(UnpinVersionBeforeResponse { status: None }))
91    }
92
93    async fn get_current_version(
94        &self,
95        _request: Request<GetCurrentVersionRequest>,
96    ) -> Result<Response<GetCurrentVersionResponse>, Status> {
97        let current_version = self
98            .hummock_manager
99            .on_current_version(|version| version.into())
100            .await;
101        Ok(Response::new(GetCurrentVersionResponse {
102            status: None,
103            current_version: Some(current_version),
104        }))
105    }
106
107    async fn replay_version_delta(
108        &self,
109        request: Request<ReplayVersionDeltaRequest>,
110    ) -> Result<Response<ReplayVersionDeltaResponse>, Status> {
111        let req = request.into_inner();
112        let (version, compaction_groups) = self
113            .hummock_manager
114            .replay_version_delta(HummockVersionDelta::from_rpc_protobuf(
115                &req.version_delta.unwrap(),
116            ))
117            .await?;
118        Ok(Response::new(ReplayVersionDeltaResponse {
119            version: Some(version.into()),
120            modified_compaction_groups: compaction_groups,
121        }))
122    }
123
124    async fn trigger_compaction_deterministic(
125        &self,
126        request: Request<TriggerCompactionDeterministicRequest>,
127    ) -> Result<Response<TriggerCompactionDeterministicResponse>, Status> {
128        let req = request.into_inner();
129        self.hummock_manager
130            .trigger_compaction_deterministic(req.version_id, req.compaction_groups)
131            .await?;
132        Ok(Response::new(TriggerCompactionDeterministicResponse {}))
133    }
134
135    async fn disable_commit_epoch(
136        &self,
137        _request: Request<DisableCommitEpochRequest>,
138    ) -> Result<Response<DisableCommitEpochResponse>, Status> {
139        let version = self.hummock_manager.disable_commit_epoch().await;
140        Ok(Response::new(DisableCommitEpochResponse {
141            current_version: Some(version.into()),
142        }))
143    }
144
145    async fn list_version_deltas(
146        &self,
147        request: Request<ListVersionDeltasRequest>,
148    ) -> Result<Response<ListVersionDeltasResponse>, Status> {
149        let req = request.into_inner();
150        let version_deltas = self
151            .hummock_manager
152            .list_version_deltas(req.start_id, req.num_limit)
153            .await?;
154        let resp = ListVersionDeltasResponse {
155            version_deltas: Some(PbHummockVersionDeltas {
156                version_deltas: version_deltas
157                    .into_iter()
158                    .map(HummockVersionDelta::into)
159                    .collect(),
160            }),
161        };
162        Ok(Response::new(resp))
163    }
164
165    async fn get_new_object_ids(
166        &self,
167        request: Request<GetNewObjectIdsRequest>,
168    ) -> Result<Response<GetNewObjectIdsResponse>, Status> {
169        let object_id_range = self
170            .hummock_manager
171            .get_new_object_ids(request.into_inner().number)
172            .await?;
173        Ok(Response::new(GetNewObjectIdsResponse {
174            status: None,
175            start_id: object_id_range.start_id,
176            end_id: object_id_range.end_id,
177        }))
178    }
179
180    async fn trigger_manual_compaction(
181        &self,
182        request: Request<TriggerManualCompactionRequest>,
183    ) -> Result<Response<TriggerManualCompactionResponse>, Status> {
184        let request = request.into_inner();
185        let compaction_group_id = request.compaction_group_id;
186        let mut option = ManualCompactionOption {
187            level: request.level as usize,
188            sst_ids: request.sst_ids,
189            exclusive: request.exclusive.unwrap_or(false),
190            ..Default::default()
191        };
192
193        // rewrite the key_range
194        match request.key_range {
195            Some(pb_key_range) => {
196                option.key_range = KeyRange {
197                    left: pb_key_range.left.into(),
198                    right: pb_key_range.right.into(),
199                    right_exclusive: pb_key_range.right_exclusive,
200                };
201            }
202
203            None => {
204                option.key_range = KeyRange::default();
205            }
206        }
207
208        // get internal_table_id by metadata_manger
209        if request.table_id.as_raw_id() < SYS_CATALOG_START_ID as u32 {
210            // We need to make sure to use the correct table_id to filter sst
211            let job_id = request.table_id;
212            if let Ok(table_fragment) = self.metadata_manager.get_job_fragments_by_id(job_id).await
213            {
214                option.internal_table_id = HashSet::from_iter(table_fragment.all_table_ids());
215            }
216        }
217
218        assert!(
219            option
220                .internal_table_id
221                .iter()
222                .all(|table_id| table_id.as_raw_id() < SYS_CATALOG_START_ID as u32),
223        );
224
225        tracing::info!(
226            "Try trigger_manual_compaction compaction_group_id {} option {:?}",
227            compaction_group_id,
228            &option
229        );
230
231        let should_retry = match self
232            .hummock_manager
233            .trigger_manual_compaction(compaction_group_id, option)
234            .await?
235        {
236            ManualCompactionTriggerResult::Submitted => false,
237            ManualCompactionTriggerResult::Retry => true,
238        };
239
240        Ok(Response::new(TriggerManualCompactionResponse {
241            status: None,
242            should_retry: Some(should_retry),
243        }))
244    }
245
246    async fn trigger_full_gc(
247        &self,
248        request: Request<TriggerFullGcRequest>,
249    ) -> Result<Response<TriggerFullGcResponse>, Status> {
250        let req = request.into_inner();
251        let backup_manager_2 = self.backup_manager.clone();
252        let hummock_manager_2 = self.hummock_manager.clone();
253        tokio::task::spawn(async move {
254            use thiserror_ext::AsReport;
255            let _ = hummock_manager_2
256                .start_full_gc(
257                    Duration::from_secs(req.sst_retention_time_sec),
258                    req.prefix,
259                    Some(backup_manager_2),
260                )
261                .await
262                .inspect_err(|e| tracing::warn!(error = %e.as_report(), "Failed to start GC."));
263        });
264        Ok(Response::new(TriggerFullGcResponse { status: None }))
265    }
266
267    async fn rise_ctl_get_pinned_versions_summary(
268        &self,
269        _request: Request<RiseCtlGetPinnedVersionsSummaryRequest>,
270    ) -> Result<Response<RiseCtlGetPinnedVersionsSummaryResponse>, Status> {
271        let pinned_versions = self.hummock_manager.list_pinned_version().await;
272        let workers = self
273            .hummock_manager
274            .list_workers(&pinned_versions.iter().map(|v| v.context_id).collect_vec())
275            .await?;
276        Ok(Response::new(RiseCtlGetPinnedVersionsSummaryResponse {
277            summary: Some(PinnedVersionsSummary {
278                pinned_versions,
279                workers,
280            }),
281        }))
282    }
283
284    async fn get_assigned_compact_task_num(
285        &self,
286        _request: Request<GetAssignedCompactTaskNumRequest>,
287    ) -> Result<Response<GetAssignedCompactTaskNumResponse>, Status> {
288        let num_tasks = self.hummock_manager.get_assigned_compact_task_num().await;
289        Ok(Response::new(GetAssignedCompactTaskNumResponse {
290            num_tasks: num_tasks as u32,
291        }))
292    }
293
294    async fn rise_ctl_list_compaction_group(
295        &self,
296        _request: Request<RiseCtlListCompactionGroupRequest>,
297    ) -> Result<Response<RiseCtlListCompactionGroupResponse>, Status> {
298        let compaction_groups = self.hummock_manager.list_compaction_group().await;
299        Ok(Response::new(RiseCtlListCompactionGroupResponse {
300            status: None,
301            compaction_groups,
302        }))
303    }
304
305    async fn rise_ctl_update_compaction_config(
306        &self,
307        request: Request<RiseCtlUpdateCompactionConfigRequest>,
308    ) -> Result<Response<RiseCtlUpdateCompactionConfigResponse>, Status> {
309        let RiseCtlUpdateCompactionConfigRequest {
310            compaction_group_ids,
311            configs,
312        } = request.into_inner();
313        self.hummock_manager
314            .update_compaction_config(
315                compaction_group_ids.as_slice(),
316                configs
317                    .into_iter()
318                    .map(|c| c.mutable_config.unwrap())
319                    .collect::<Vec<_>>()
320                    .as_slice(),
321            )
322            .await?;
323        Ok(Response::new(RiseCtlUpdateCompactionConfigResponse {
324            status: None,
325        }))
326    }
327
328    async fn init_metadata_for_replay(
329        &self,
330        request: Request<InitMetadataForReplayRequest>,
331    ) -> Result<Response<InitMetadataForReplayResponse>, Status> {
332        let InitMetadataForReplayRequest {
333            tables,
334            compaction_groups,
335        } = request.into_inner();
336
337        self.hummock_manager
338            .init_metadata_for_version_replay(tables, compaction_groups)?;
339        Ok(Response::new(InitMetadataForReplayResponse {}))
340    }
341
342    async fn pin_version(
343        &self,
344        request: Request<PinVersionRequest>,
345    ) -> Result<Response<PinVersionResponse>, Status> {
346        let req = request.into_inner();
347        let version = self.hummock_manager.pin_version(req.context_id).await?;
348        Ok(Response::new(PinVersionResponse {
349            pinned_version: Some(version.into()),
350        }))
351    }
352
353    async fn split_compaction_group(
354        &self,
355        request: Request<SplitCompactionGroupRequest>,
356    ) -> Result<Response<SplitCompactionGroupResponse>, Status> {
357        let req = request.into_inner();
358        let new_group_id = self
359            .hummock_manager
360            .move_state_tables_to_dedicated_compaction_group(
361                req.group_id,
362                &req.table_ids,
363                if req.partition_vnode_count > 0 {
364                    Some(req.partition_vnode_count)
365                } else {
366                    None
367                },
368            )
369            .await?
370            .0;
371        Ok(Response::new(SplitCompactionGroupResponse { new_group_id }))
372    }
373
374    async fn rise_ctl_pause_version_checkpoint(
375        &self,
376        _request: Request<RiseCtlPauseVersionCheckpointRequest>,
377    ) -> Result<Response<RiseCtlPauseVersionCheckpointResponse>, Status> {
378        self.hummock_manager.pause_version_checkpoint();
379        Ok(Response::new(RiseCtlPauseVersionCheckpointResponse {}))
380    }
381
382    async fn rise_ctl_resume_version_checkpoint(
383        &self,
384        _request: Request<RiseCtlResumeVersionCheckpointRequest>,
385    ) -> Result<Response<RiseCtlResumeVersionCheckpointResponse>, Status> {
386        self.hummock_manager.resume_version_checkpoint();
387        Ok(Response::new(RiseCtlResumeVersionCheckpointResponse {}))
388    }
389
390    async fn rise_ctl_get_checkpoint_version(
391        &self,
392        _request: Request<RiseCtlGetCheckpointVersionRequest>,
393    ) -> Result<Response<RiseCtlGetCheckpointVersionResponse>, Status> {
394        let checkpoint_version = self.hummock_manager.get_checkpoint_version().await;
395        Ok(Response::new(RiseCtlGetCheckpointVersionResponse {
396            checkpoint_version: Some(checkpoint_version.into()),
397        }))
398    }
399
400    async fn rise_ctl_list_compaction_status(
401        &self,
402        _request: Request<RiseCtlListCompactionStatusRequest>,
403    ) -> Result<Response<RiseCtlListCompactionStatusResponse>, Status> {
404        let (compaction_statuses, task_assignment) =
405            self.hummock_manager.list_compaction_status().await;
406        let task_progress = self.hummock_manager.compactor_manager.get_progress();
407        Ok(Response::new(RiseCtlListCompactionStatusResponse {
408            compaction_statuses,
409            task_assignment,
410            task_progress,
411        }))
412    }
413
414    async fn subscribe_compaction_event(
415        &self,
416        request: Request<Streaming<SubscribeCompactionEventRequest>>,
417    ) -> Result<Response<Self::SubscribeCompactionEventStream>, tonic::Status> {
418        let mut request_stream: Streaming<SubscribeCompactionEventRequest> = request.into_inner();
419        let register_req = {
420            let req = request_stream.next().await.ok_or_else(|| {
421                Status::invalid_argument("subscribe_compaction_event request is empty")
422            })??;
423
424            match req.event {
425                Some(RequestEvent::Register(register)) => register,
426                _ => {
427                    return Err(Status::invalid_argument(
428                        "the first message must be `Register`",
429                    ));
430                }
431            }
432        };
433
434        let context_id = register_req.context_id;
435
436        // check_context and add_compactor as a whole is not atomic, but compactor_manager will
437        // remove invalid compactor eventually.
438        if !self.hummock_manager.check_context(context_id).await? {
439            return Err(Status::new(
440                tonic::Code::Internal,
441                format!("invalid hummock context {}", context_id),
442            ));
443        }
444        let compactor_manager = self.hummock_manager.compactor_manager.clone();
445
446        let rx: tokio::sync::mpsc::UnboundedReceiver<
447            Result<SubscribeCompactionEventResponse, crate::MetaError>,
448        > = compactor_manager.add_compactor(context_id);
449
450        // register request stream to hummock
451        self.hummock_manager
452            .add_compactor_stream(context_id, request_stream);
453
454        // Trigger compaction on all compaction groups.
455        for cg_id in self.hummock_manager.compaction_group_ids().await {
456            self.hummock_manager
457                .try_send_compaction_request(cg_id, compact_task::TaskType::Dynamic);
458        }
459
460        Ok(Response::new(RwReceiverStream::new(rx)))
461    }
462
463    async fn subscribe_iceberg_compaction_event(
464        &self,
465        request: Request<Streaming<SubscribeIcebergCompactionEventRequest>>,
466    ) -> Result<Response<Self::SubscribeIcebergCompactionEventStream>, tonic::Status> {
467        let mut request_stream: Streaming<SubscribeIcebergCompactionEventRequest> =
468            request.into_inner();
469        let register_req = {
470            let req = request_stream.next().await.ok_or_else(|| {
471                Status::invalid_argument("subscribe_compaction_event request is empty")
472            })??;
473
474            match req.event {
475                Some(IcebergRequestEvent::Register(register)) => register,
476                _ => {
477                    return Err(Status::invalid_argument(
478                        "the first message must be `Register`",
479                    ));
480                }
481            }
482        };
483
484        let context_id = register_req.context_id;
485
486        // check_context and add_compactor as a whole is not atomic, but compactor_manager will
487        // remove invalid compactor eventually.
488        if !self.hummock_manager.check_context(context_id).await? {
489            return Err(Status::new(
490                tonic::Code::Internal,
491                format!("invalid hummock context {}", context_id),
492            ));
493        }
494
495        let rx: tokio::sync::mpsc::UnboundedReceiver<
496            Result<SubscribeIcebergCompactionEventResponse, crate::MetaError>,
497        > = self
498            .iceberg_compaction_manager
499            .iceberg_compactor_manager
500            .add_compactor(context_id);
501
502        self.iceberg_compaction_manager
503            .add_compactor_stream(context_id, request_stream);
504
505        // TODO: Trigger iceberg compaction
506
507        Ok(Response::new(RwReceiverStream::new(rx)))
508    }
509
510    async fn report_compaction_task(
511        &self,
512        _request: Request<ReportCompactionTaskRequest>,
513    ) -> Result<Response<ReportCompactionTaskResponse>, Status> {
514        unreachable!()
515    }
516
517    async fn list_branched_object(
518        &self,
519        _request: Request<ListBranchedObjectRequest>,
520    ) -> Result<Response<ListBranchedObjectResponse>, Status> {
521        let branched_objects = self
522            .hummock_manager
523            .list_branched_objects()
524            .await
525            .into_iter()
526            .flat_map(|(object_id, v)| {
527                v.into_iter()
528                    .map(move |(compaction_group_id, sst_ids)| BranchedObject {
529                        object_id,
530                        sst_id: sst_ids,
531                        compaction_group_id,
532                    })
533            })
534            .collect();
535        Ok(Response::new(ListBranchedObjectResponse {
536            branched_objects,
537        }))
538    }
539
540    async fn list_active_write_limit(
541        &self,
542        _request: Request<ListActiveWriteLimitRequest>,
543    ) -> Result<Response<ListActiveWriteLimitResponse>, Status> {
544        Ok(Response::new(ListActiveWriteLimitResponse {
545            write_limits: self.hummock_manager.write_limits().await,
546        }))
547    }
548
549    async fn list_hummock_meta_config(
550        &self,
551        _request: Request<ListHummockMetaConfigRequest>,
552    ) -> Result<Response<ListHummockMetaConfigResponse>, Status> {
553        let opt = &self.hummock_manager.env.opts;
554        let configs = fields_to_kvs!(
555            opt,
556            vacuum_interval_sec,
557            vacuum_spin_interval_ms,
558            hummock_version_checkpoint_interval_sec,
559            min_delta_log_num_for_hummock_version_checkpoint,
560            min_sst_retention_time_sec,
561            full_gc_interval_sec,
562            periodic_compaction_interval_sec,
563            periodic_space_reclaim_compaction_interval_sec,
564            periodic_ttl_reclaim_compaction_interval_sec,
565            periodic_tombstone_reclaim_compaction_interval_sec,
566            periodic_scheduling_compaction_group_split_interval_sec,
567            enable_compaction_group_normalize,
568            max_normalize_splits_per_round,
569            do_not_config_object_storage_lifecycle,
570            partition_vnode_count,
571            table_high_write_throughput_threshold,
572            table_low_write_throughput_threshold,
573            compaction_task_max_heartbeat_interval_secs,
574            periodic_scheduling_compaction_group_merge_interval_sec
575        );
576        Ok(Response::new(ListHummockMetaConfigResponse { configs }))
577    }
578
579    async fn rise_ctl_rebuild_table_stats(
580        &self,
581        _request: Request<RiseCtlRebuildTableStatsRequest>,
582    ) -> Result<Response<RiseCtlRebuildTableStatsResponse>, Status> {
583        self.hummock_manager.rebuild_table_stats().await?;
584        Ok(Response::new(RiseCtlRebuildTableStatsResponse {}))
585    }
586
587    async fn get_compaction_score(
588        &self,
589        request: Request<GetCompactionScoreRequest>,
590    ) -> Result<Response<GetCompactionScoreResponse>, Status> {
591        let compaction_group_id = request.into_inner().compaction_group_id;
592        let scores = self
593            .hummock_manager
594            .get_compaction_scores(compaction_group_id)
595            .await
596            .into_iter()
597            .map(|s| PickerInfo {
598                score: s.score,
599                select_level: s.select_level as _,
600                target_level: s.target_level as _,
601                picker_type: s.picker_type.to_string(),
602            })
603            .collect();
604        Ok(Response::new(GetCompactionScoreResponse {
605            compaction_group_id,
606            scores,
607        }))
608    }
609
610    async fn list_compact_task_assignment(
611        &self,
612        _request: Request<ListCompactTaskAssignmentRequest>,
613    ) -> Result<Response<ListCompactTaskAssignmentResponse>, Status> {
614        let (_compaction_statuses, task_assignment) =
615            self.hummock_manager.list_compaction_status().await;
616        Ok(Response::new(ListCompactTaskAssignmentResponse {
617            task_assignment,
618        }))
619    }
620
621    async fn list_compact_task_progress(
622        &self,
623        _request: Request<ListCompactTaskProgressRequest>,
624    ) -> Result<Response<ListCompactTaskProgressResponse>, Status> {
625        let task_progress = self.hummock_manager.compactor_manager.get_progress();
626
627        Ok(Response::new(ListCompactTaskProgressResponse {
628            task_progress,
629        }))
630    }
631
632    async fn cancel_compact_task(
633        &self,
634        request: Request<CancelCompactTaskRequest>,
635    ) -> Result<Response<CancelCompactTaskResponse>, Status> {
636        let request = request.into_inner();
637        let ret = self
638            .hummock_manager
639            .cancel_compact_task(
640                request.task_id,
641                PbTaskStatus::try_from(request.task_status).unwrap(),
642            )
643            .await?;
644
645        let response = Response::new(CancelCompactTaskResponse { ret });
646        return Ok(response);
647    }
648
649    async fn get_version_by_epoch(
650        &self,
651        request: Request<GetVersionByEpochRequest>,
652    ) -> Result<Response<GetVersionByEpochResponse>, Status> {
653        let GetVersionByEpochRequest { epoch, table_id } = request.into_inner();
654        let version = self
655            .hummock_manager
656            .epoch_to_version(epoch, table_id)
657            .await?;
658        Ok(Response::new(GetVersionByEpochResponse {
659            version: Some(version.to_protobuf()),
660        }))
661    }
662
663    async fn merge_compaction_group(
664        &self,
665        request: Request<MergeCompactionGroupRequest>,
666    ) -> Result<Response<MergeCompactionGroupResponse>, Status> {
667        let req = request.into_inner();
668        self.hummock_manager
669            .merge_compaction_group(req.left_group_id, req.right_group_id)
670            .await?;
671        Ok(Response::new(MergeCompactionGroupResponse {}))
672    }
673
674    async fn get_table_change_logs(
675        &self,
676        request: Request<GetTableChangeLogsRequest>,
677    ) -> Result<Response<GetTableChangeLogsResponse>, Status> {
678        let GetTableChangeLogsRequest {
679            epoch_only,
680            start_epoch_inclusive,
681            end_epoch_inclusive,
682            table_ids,
683            exclude_empty,
684            limit,
685        } = request.into_inner();
686        let table_change_logs = self
687            .hummock_manager
688            .get_table_change_logs(
689                epoch_only,
690                start_epoch_inclusive,
691                end_epoch_inclusive,
692                table_ids
693                    .map(|t| t.table_ids.into_iter().collect::<HashSet<_>>())
694                    .clone(),
695                exclude_empty,
696                limit,
697            )
698            .await
699            .into_iter()
700            .map(|(i, l)| (i.as_raw_id(), l.to_protobuf()))
701            .collect();
702        Ok(Response::new(GetTableChangeLogsResponse {
703            table_change_logs,
704        }))
705    }
706}
707
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    /// `fields_to_kvs!` maps each listed field name to its `to_string()` value.
    #[test]
    fn test_fields_to_kvs() {
        struct Sample {
            foo: u64,
            bar: String,
        }
        let sample = Sample {
            foo: 15,
            bar: String::from("foobar"),
        };
        let kvs: HashMap<String, String> = fields_to_kvs!(sample, foo, bar);
        assert_eq!(kvs.len(), 2);
        assert_eq!(kvs["foo"], "15");
        assert_eq!(kvs["bar"], "foobar");
    }
}
726}