Skip to main content

risingwave_meta_service/
hummock_service.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::time::Duration;
17
18use compact_task::PbTaskStatus;
19use futures::StreamExt;
20use itertools::Itertools;
21use risingwave_common::catalog::SYS_CATALOG_START_ID;
22use risingwave_hummock_sdk::key_range::KeyRange;
23use risingwave_hummock_sdk::version::HummockVersionDelta;
24use risingwave_meta::backup_restore::BackupManagerRef;
25use risingwave_meta::manager::MetadataManager;
26use risingwave_meta::manager::iceberg_compaction::IcebergCompactionManagerRef;
27use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
28use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerService;
29use risingwave_pb::hummock::subscribe_compaction_event_request::Event as RequestEvent;
30use risingwave_pb::hummock::*;
31use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Event as IcebergRequestEvent;
32use risingwave_pb::iceberg_compaction::{
33    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
34};
35use tonic::{Request, Response, Status, Streaming};
36
37use crate::RwReceiverStream;
38use crate::hummock::compaction::selector::ManualCompactionOption;
39use crate::hummock::{HummockManagerRef, ManualCompactionTriggerResult};
40
41pub struct HummockServiceImpl {
42    hummock_manager: HummockManagerRef,
43    metadata_manager: MetadataManager,
44    backup_manager: BackupManagerRef,
45    iceberg_compaction_manager: IcebergCompactionManagerRef,
46}
47
48impl HummockServiceImpl {
49    pub fn new(
50        hummock_manager: HummockManagerRef,
51        metadata_manager: MetadataManager,
52        backup_manager: BackupManagerRef,
53        iceberg_compaction_manager: IcebergCompactionManagerRef,
54    ) -> Self {
55        HummockServiceImpl {
56            hummock_manager,
57            metadata_manager,
58            backup_manager,
59            iceberg_compaction_manager,
60        }
61    }
62}
63
/// Builds a `HashMap<String, String>` that maps each listed field name to the
/// `to_string()` rendering of that field on `$struct`.
///
/// The map type is spelled with its full path and concrete key/value types, so
/// call sites need neither a `HashMap` import nor a type annotation. A trailing
/// comma after the last field is accepted.
macro_rules! fields_to_kvs {
    ($struct:ident, $($field:ident),* $(,)?) => {
        {
            let mut kvs = ::std::collections::HashMap::<String, String>::new();
            $(
                kvs.insert(stringify!($field).to_string(), $struct.$field.to_string());
            )*
            kvs
        }
    }
}
75
76#[async_trait::async_trait]
77impl HummockManagerService for HummockServiceImpl {
78    type SubscribeCompactionEventStream = RwReceiverStream<SubscribeCompactionEventResponse>;
79    type SubscribeIcebergCompactionEventStream =
80        RwReceiverStream<SubscribeIcebergCompactionEventResponse>;
81
82    async fn unpin_version_before(
83        &self,
84        request: Request<UnpinVersionBeforeRequest>,
85    ) -> Result<Response<UnpinVersionBeforeResponse>, Status> {
86        let req = request.into_inner();
87        self.hummock_manager
88            .unpin_version_before(req.context_id, req.unpin_version_before)
89            .await?;
90        Ok(Response::new(UnpinVersionBeforeResponse { status: None }))
91    }
92
93    async fn get_current_version(
94        &self,
95        _request: Request<GetCurrentVersionRequest>,
96    ) -> Result<Response<GetCurrentVersionResponse>, Status> {
97        let current_version = self
98            .hummock_manager
99            .on_current_version(|version| version.into())
100            .await;
101        Ok(Response::new(GetCurrentVersionResponse {
102            status: None,
103            current_version: Some(current_version),
104        }))
105    }
106
107    async fn replay_version_delta(
108        &self,
109        request: Request<ReplayVersionDeltaRequest>,
110    ) -> Result<Response<ReplayVersionDeltaResponse>, Status> {
111        let req = request.into_inner();
112        let (version, compaction_groups) = self
113            .hummock_manager
114            .replay_version_delta(HummockVersionDelta::from_rpc_protobuf(
115                &req.version_delta.unwrap(),
116            ))
117            .await?;
118        Ok(Response::new(ReplayVersionDeltaResponse {
119            version: Some(version.into()),
120            modified_compaction_groups: compaction_groups,
121        }))
122    }
123
124    async fn trigger_compaction_deterministic(
125        &self,
126        request: Request<TriggerCompactionDeterministicRequest>,
127    ) -> Result<Response<TriggerCompactionDeterministicResponse>, Status> {
128        let req = request.into_inner();
129        self.hummock_manager
130            .trigger_compaction_deterministic(req.version_id, req.compaction_groups)
131            .await?;
132        Ok(Response::new(TriggerCompactionDeterministicResponse {}))
133    }
134
135    async fn disable_commit_epoch(
136        &self,
137        _request: Request<DisableCommitEpochRequest>,
138    ) -> Result<Response<DisableCommitEpochResponse>, Status> {
139        let version = self.hummock_manager.disable_commit_epoch().await;
140        Ok(Response::new(DisableCommitEpochResponse {
141            current_version: Some(version.into()),
142        }))
143    }
144
145    async fn list_version_deltas(
146        &self,
147        request: Request<ListVersionDeltasRequest>,
148    ) -> Result<Response<ListVersionDeltasResponse>, Status> {
149        let req = request.into_inner();
150        let version_deltas = self
151            .hummock_manager
152            .list_version_deltas(req.start_id, req.num_limit)
153            .await?;
154        let resp = ListVersionDeltasResponse {
155            version_deltas: Some(PbHummockVersionDeltas {
156                version_deltas: version_deltas
157                    .into_iter()
158                    .map(HummockVersionDelta::into)
159                    .collect(),
160            }),
161        };
162        Ok(Response::new(resp))
163    }
164
165    async fn get_new_object_ids(
166        &self,
167        request: Request<GetNewObjectIdsRequest>,
168    ) -> Result<Response<GetNewObjectIdsResponse>, Status> {
169        let object_id_range = self
170            .hummock_manager
171            .get_new_object_ids(request.into_inner().number)
172            .await?;
173        Ok(Response::new(GetNewObjectIdsResponse {
174            status: None,
175            start_id: object_id_range.start_id,
176            end_id: object_id_range.end_id,
177        }))
178    }
179
180    async fn trigger_manual_compaction(
181        &self,
182        request: Request<TriggerManualCompactionRequest>,
183    ) -> Result<Response<TriggerManualCompactionResponse>, Status> {
184        let request = request.into_inner();
185        let compaction_group_id = request.compaction_group_id;
186        let mut option = ManualCompactionOption {
187            level: request.level as usize,
188            target_level: request
189                .target_level
190                .map(|target_level| target_level as usize),
191            sst_ids: request.sst_ids,
192            exclusive: request.exclusive.unwrap_or(false),
193            ..Default::default()
194        };
195
196        // rewrite the key_range
197        match request.key_range {
198            Some(pb_key_range) => {
199                option.key_range = KeyRange {
200                    left: pb_key_range.left.into(),
201                    right: pb_key_range.right.into(),
202                    right_exclusive: pb_key_range.right_exclusive,
203                };
204            }
205
206            None => {
207                option.key_range = KeyRange::default();
208            }
209        }
210
211        // get internal_table_id by metadata_manger
212        if request.table_id.as_raw_id() < SYS_CATALOG_START_ID as u32 {
213            // We need to make sure to use the correct table_id to filter sst
214            let job_id = request.table_id;
215            if let Ok(table_fragment) = self.metadata_manager.get_job_fragments_by_id(job_id).await
216            {
217                option.internal_table_id = HashSet::from_iter(table_fragment.all_table_ids());
218            }
219        }
220
221        assert!(
222            option
223                .internal_table_id
224                .iter()
225                .all(|table_id| table_id.as_raw_id() < SYS_CATALOG_START_ID as u32),
226        );
227
228        tracing::info!(
229            "Try trigger_manual_compaction compaction_group_id {} option {:?}",
230            compaction_group_id,
231            &option
232        );
233
234        let should_retry = match self
235            .hummock_manager
236            .trigger_manual_compaction(compaction_group_id, option)
237            .await?
238        {
239            ManualCompactionTriggerResult::Submitted => false,
240            ManualCompactionTriggerResult::Retry => true,
241        };
242
243        Ok(Response::new(TriggerManualCompactionResponse {
244            status: None,
245            should_retry: Some(should_retry),
246        }))
247    }
248
249    async fn trigger_full_gc(
250        &self,
251        request: Request<TriggerFullGcRequest>,
252    ) -> Result<Response<TriggerFullGcResponse>, Status> {
253        let req = request.into_inner();
254        let backup_manager_2 = self.backup_manager.clone();
255        let hummock_manager_2 = self.hummock_manager.clone();
256        tokio::task::spawn(async move {
257            use thiserror_ext::AsReport;
258            let _ = hummock_manager_2
259                .start_full_gc(
260                    Duration::from_secs(req.sst_retention_time_sec),
261                    req.prefix,
262                    Some(backup_manager_2),
263                )
264                .await
265                .inspect_err(|e| tracing::warn!(error = %e.as_report(), "Failed to start GC."));
266        });
267        Ok(Response::new(TriggerFullGcResponse { status: None }))
268    }
269
270    async fn rise_ctl_get_pinned_versions_summary(
271        &self,
272        _request: Request<RiseCtlGetPinnedVersionsSummaryRequest>,
273    ) -> Result<Response<RiseCtlGetPinnedVersionsSummaryResponse>, Status> {
274        let pinned_versions = self.hummock_manager.list_pinned_version().await;
275        let workers = self
276            .hummock_manager
277            .list_workers(&pinned_versions.iter().map(|v| v.context_id).collect_vec())
278            .await?;
279        Ok(Response::new(RiseCtlGetPinnedVersionsSummaryResponse {
280            summary: Some(PinnedVersionsSummary {
281                pinned_versions,
282                workers,
283            }),
284        }))
285    }
286
287    async fn get_assigned_compact_task_num(
288        &self,
289        _request: Request<GetAssignedCompactTaskNumRequest>,
290    ) -> Result<Response<GetAssignedCompactTaskNumResponse>, Status> {
291        let num_tasks = self.hummock_manager.get_assigned_compact_task_num().await;
292        Ok(Response::new(GetAssignedCompactTaskNumResponse {
293            num_tasks: num_tasks as u32,
294        }))
295    }
296
297    async fn rise_ctl_list_compaction_group(
298        &self,
299        _request: Request<RiseCtlListCompactionGroupRequest>,
300    ) -> Result<Response<RiseCtlListCompactionGroupResponse>, Status> {
301        let compaction_groups = self.hummock_manager.list_compaction_group().await;
302        Ok(Response::new(RiseCtlListCompactionGroupResponse {
303            status: None,
304            compaction_groups,
305        }))
306    }
307
308    async fn rise_ctl_update_compaction_config(
309        &self,
310        request: Request<RiseCtlUpdateCompactionConfigRequest>,
311    ) -> Result<Response<RiseCtlUpdateCompactionConfigResponse>, Status> {
312        let RiseCtlUpdateCompactionConfigRequest {
313            compaction_group_ids,
314            configs,
315        } = request.into_inner();
316        self.hummock_manager
317            .update_compaction_config(
318                compaction_group_ids.as_slice(),
319                configs
320                    .into_iter()
321                    .map(|c| c.mutable_config.unwrap())
322                    .collect::<Vec<_>>()
323                    .as_slice(),
324            )
325            .await?;
326        Ok(Response::new(RiseCtlUpdateCompactionConfigResponse {
327            status: None,
328        }))
329    }
330
331    async fn init_metadata_for_replay(
332        &self,
333        request: Request<InitMetadataForReplayRequest>,
334    ) -> Result<Response<InitMetadataForReplayResponse>, Status> {
335        let InitMetadataForReplayRequest {
336            tables,
337            compaction_groups,
338        } = request.into_inner();
339
340        self.hummock_manager
341            .init_metadata_for_version_replay(tables, compaction_groups)?;
342        Ok(Response::new(InitMetadataForReplayResponse {}))
343    }
344
345    async fn pin_version(
346        &self,
347        request: Request<PinVersionRequest>,
348    ) -> Result<Response<PinVersionResponse>, Status> {
349        let req = request.into_inner();
350        let version = self.hummock_manager.pin_version(req.context_id).await?;
351        Ok(Response::new(PinVersionResponse {
352            pinned_version: Some(version.into()),
353        }))
354    }
355
356    async fn split_compaction_group(
357        &self,
358        request: Request<SplitCompactionGroupRequest>,
359    ) -> Result<Response<SplitCompactionGroupResponse>, Status> {
360        let req = request.into_inner();
361        let new_group_id = self
362            .hummock_manager
363            .move_state_tables_to_dedicated_compaction_group(
364                req.group_id,
365                &req.table_ids,
366                if req.partition_vnode_count > 0 {
367                    Some(req.partition_vnode_count)
368                } else {
369                    None
370                },
371            )
372            .await?
373            .0;
374        Ok(Response::new(SplitCompactionGroupResponse { new_group_id }))
375    }
376
377    async fn rise_ctl_pause_version_checkpoint(
378        &self,
379        _request: Request<RiseCtlPauseVersionCheckpointRequest>,
380    ) -> Result<Response<RiseCtlPauseVersionCheckpointResponse>, Status> {
381        self.hummock_manager.pause_version_checkpoint();
382        Ok(Response::new(RiseCtlPauseVersionCheckpointResponse {}))
383    }
384
385    async fn rise_ctl_resume_version_checkpoint(
386        &self,
387        _request: Request<RiseCtlResumeVersionCheckpointRequest>,
388    ) -> Result<Response<RiseCtlResumeVersionCheckpointResponse>, Status> {
389        self.hummock_manager.resume_version_checkpoint();
390        Ok(Response::new(RiseCtlResumeVersionCheckpointResponse {}))
391    }
392
393    async fn rise_ctl_get_checkpoint_version(
394        &self,
395        _request: Request<RiseCtlGetCheckpointVersionRequest>,
396    ) -> Result<Response<RiseCtlGetCheckpointVersionResponse>, Status> {
397        let checkpoint_version = self.hummock_manager.get_checkpoint_version().await;
398        Ok(Response::new(RiseCtlGetCheckpointVersionResponse {
399            checkpoint_version: Some(checkpoint_version.into()),
400        }))
401    }
402
403    async fn rise_ctl_list_compaction_status(
404        &self,
405        _request: Request<RiseCtlListCompactionStatusRequest>,
406    ) -> Result<Response<RiseCtlListCompactionStatusResponse>, Status> {
407        let (compaction_statuses, task_assignment) =
408            self.hummock_manager.list_compaction_status().await;
409        let task_progress = self.hummock_manager.compactor_manager.get_progress();
410        Ok(Response::new(RiseCtlListCompactionStatusResponse {
411            compaction_statuses,
412            task_assignment,
413            task_progress,
414        }))
415    }
416
417    async fn subscribe_compaction_event(
418        &self,
419        request: Request<Streaming<SubscribeCompactionEventRequest>>,
420    ) -> Result<Response<Self::SubscribeCompactionEventStream>, tonic::Status> {
421        let mut request_stream: Streaming<SubscribeCompactionEventRequest> = request.into_inner();
422        let register_req = {
423            let req = request_stream.next().await.ok_or_else(|| {
424                Status::invalid_argument("subscribe_compaction_event request is empty")
425            })??;
426
427            match req.event {
428                Some(RequestEvent::Register(register)) => register,
429                _ => {
430                    return Err(Status::invalid_argument(
431                        "the first message must be `Register`",
432                    ));
433                }
434            }
435        };
436
437        let context_id = register_req.context_id;
438
439        // check_context and add_compactor as a whole is not atomic, but compactor_manager will
440        // remove invalid compactor eventually.
441        if !self.hummock_manager.check_context(context_id).await? {
442            return Err(Status::new(
443                tonic::Code::Internal,
444                format!("invalid hummock context {}", context_id),
445            ));
446        }
447        let compactor_manager = self.hummock_manager.compactor_manager.clone();
448
449        let rx: tokio::sync::mpsc::UnboundedReceiver<
450            Result<SubscribeCompactionEventResponse, crate::MetaError>,
451        > = compactor_manager.add_compactor(context_id);
452
453        // register request stream to hummock
454        self.hummock_manager
455            .add_compactor_stream(context_id, request_stream);
456
457        // Trigger compaction on all compaction groups.
458        for cg_id in self.hummock_manager.compaction_group_ids().await {
459            self.hummock_manager
460                .try_send_compaction_request(cg_id, compact_task::TaskType::Dynamic);
461        }
462
463        Ok(Response::new(RwReceiverStream::new(rx)))
464    }
465
466    async fn subscribe_iceberg_compaction_event(
467        &self,
468        request: Request<Streaming<SubscribeIcebergCompactionEventRequest>>,
469    ) -> Result<Response<Self::SubscribeIcebergCompactionEventStream>, tonic::Status> {
470        let mut request_stream: Streaming<SubscribeIcebergCompactionEventRequest> =
471            request.into_inner();
472        let register_req = {
473            let req = request_stream.next().await.ok_or_else(|| {
474                Status::invalid_argument("subscribe_compaction_event request is empty")
475            })??;
476
477            match req.event {
478                Some(IcebergRequestEvent::Register(register)) => register,
479                _ => {
480                    return Err(Status::invalid_argument(
481                        "the first message must be `Register`",
482                    ));
483                }
484            }
485        };
486
487        let context_id = register_req.context_id;
488
489        // check_context and add_compactor as a whole is not atomic, but compactor_manager will
490        // remove invalid compactor eventually.
491        if !self.hummock_manager.check_context(context_id).await? {
492            return Err(Status::new(
493                tonic::Code::Internal,
494                format!("invalid hummock context {}", context_id),
495            ));
496        }
497
498        let rx: tokio::sync::mpsc::UnboundedReceiver<
499            Result<SubscribeIcebergCompactionEventResponse, crate::MetaError>,
500        > = self
501            .iceberg_compaction_manager
502            .iceberg_compactor_manager
503            .add_compactor(context_id);
504
505        self.iceberg_compaction_manager
506            .add_compactor_stream(context_id, request_stream);
507
508        // TODO: Trigger iceberg compaction
509
510        Ok(Response::new(RwReceiverStream::new(rx)))
511    }
512
513    async fn report_compaction_task(
514        &self,
515        _request: Request<ReportCompactionTaskRequest>,
516    ) -> Result<Response<ReportCompactionTaskResponse>, Status> {
517        unreachable!()
518    }
519
520    async fn list_branched_object(
521        &self,
522        _request: Request<ListBranchedObjectRequest>,
523    ) -> Result<Response<ListBranchedObjectResponse>, Status> {
524        let branched_objects = self
525            .hummock_manager
526            .list_branched_objects()
527            .await
528            .into_iter()
529            .flat_map(|(object_id, v)| {
530                v.into_iter()
531                    .map(move |(compaction_group_id, sst_ids)| BranchedObject {
532                        object_id,
533                        sst_id: sst_ids,
534                        compaction_group_id,
535                    })
536            })
537            .collect();
538        Ok(Response::new(ListBranchedObjectResponse {
539            branched_objects,
540        }))
541    }
542
543    async fn list_active_write_limit(
544        &self,
545        _request: Request<ListActiveWriteLimitRequest>,
546    ) -> Result<Response<ListActiveWriteLimitResponse>, Status> {
547        Ok(Response::new(ListActiveWriteLimitResponse {
548            write_limits: self.hummock_manager.write_limits().await,
549        }))
550    }
551
552    async fn list_hummock_meta_config(
553        &self,
554        _request: Request<ListHummockMetaConfigRequest>,
555    ) -> Result<Response<ListHummockMetaConfigResponse>, Status> {
556        let opt = &self.hummock_manager.env.opts;
557        let configs = fields_to_kvs!(
558            opt,
559            vacuum_interval_sec,
560            vacuum_spin_interval_ms,
561            hummock_version_checkpoint_interval_sec,
562            min_delta_log_num_for_hummock_version_checkpoint,
563            min_sst_retention_time_sec,
564            full_gc_interval_sec,
565            periodic_compaction_interval_sec,
566            periodic_space_reclaim_compaction_interval_sec,
567            periodic_ttl_reclaim_compaction_interval_sec,
568            periodic_tombstone_reclaim_compaction_interval_sec,
569            periodic_scheduling_compaction_group_split_interval_sec,
570            enable_compaction_group_normalize,
571            max_normalize_splits_per_round,
572            do_not_config_object_storage_lifecycle,
573            partition_vnode_count,
574            table_high_write_throughput_threshold,
575            table_low_write_throughput_threshold,
576            compaction_task_max_heartbeat_interval_secs,
577            periodic_scheduling_compaction_group_merge_interval_sec
578        );
579        Ok(Response::new(ListHummockMetaConfigResponse { configs }))
580    }
581
582    async fn rise_ctl_rebuild_table_stats(
583        &self,
584        _request: Request<RiseCtlRebuildTableStatsRequest>,
585    ) -> Result<Response<RiseCtlRebuildTableStatsResponse>, Status> {
586        self.hummock_manager.rebuild_table_stats().await?;
587        Ok(Response::new(RiseCtlRebuildTableStatsResponse {}))
588    }
589
590    async fn get_compaction_score(
591        &self,
592        request: Request<GetCompactionScoreRequest>,
593    ) -> Result<Response<GetCompactionScoreResponse>, Status> {
594        let compaction_group_id = request.into_inner().compaction_group_id;
595        let scores = self
596            .hummock_manager
597            .get_compaction_scores(compaction_group_id)
598            .await
599            .into_iter()
600            .map(|s| PickerInfo {
601                score: s.score,
602                select_level: s.select_level as _,
603                target_level: s.target_level as _,
604                picker_type: s.picker_type.to_string(),
605            })
606            .collect();
607        Ok(Response::new(GetCompactionScoreResponse {
608            compaction_group_id,
609            scores,
610        }))
611    }
612
613    async fn list_compact_task_assignment(
614        &self,
615        _request: Request<ListCompactTaskAssignmentRequest>,
616    ) -> Result<Response<ListCompactTaskAssignmentResponse>, Status> {
617        let (_compaction_statuses, task_assignment) =
618            self.hummock_manager.list_compaction_status().await;
619        Ok(Response::new(ListCompactTaskAssignmentResponse {
620            task_assignment,
621        }))
622    }
623
624    async fn list_compact_task_progress(
625        &self,
626        _request: Request<ListCompactTaskProgressRequest>,
627    ) -> Result<Response<ListCompactTaskProgressResponse>, Status> {
628        let task_progress = self.hummock_manager.compactor_manager.get_progress();
629
630        Ok(Response::new(ListCompactTaskProgressResponse {
631            task_progress,
632        }))
633    }
634
635    async fn cancel_compact_task(
636        &self,
637        request: Request<CancelCompactTaskRequest>,
638    ) -> Result<Response<CancelCompactTaskResponse>, Status> {
639        let request = request.into_inner();
640        let ret = self
641            .hummock_manager
642            .cancel_compact_task(
643                request.task_id,
644                PbTaskStatus::try_from(request.task_status).unwrap(),
645            )
646            .await?;
647
648        let response = Response::new(CancelCompactTaskResponse { ret });
649        return Ok(response);
650    }
651
652    async fn get_version_by_epoch(
653        &self,
654        request: Request<GetVersionByEpochRequest>,
655    ) -> Result<Response<GetVersionByEpochResponse>, Status> {
656        let GetVersionByEpochRequest { epoch, table_id } = request.into_inner();
657        let version = self
658            .hummock_manager
659            .epoch_to_version(epoch, table_id)
660            .await?;
661        Ok(Response::new(GetVersionByEpochResponse {
662            version: Some(version.to_protobuf()),
663        }))
664    }
665
666    async fn merge_compaction_group(
667        &self,
668        request: Request<MergeCompactionGroupRequest>,
669    ) -> Result<Response<MergeCompactionGroupResponse>, Status> {
670        let req = request.into_inner();
671        self.hummock_manager
672            .merge_compaction_group(req.left_group_id, req.right_group_id)
673            .await?;
674        Ok(Response::new(MergeCompactionGroupResponse {}))
675    }
676
677    async fn get_table_change_logs(
678        &self,
679        request: Request<GetTableChangeLogsRequest>,
680    ) -> Result<Response<GetTableChangeLogsResponse>, Status> {
681        let GetTableChangeLogsRequest {
682            epoch_only,
683            start_epoch_inclusive,
684            end_epoch_inclusive,
685            table_ids,
686            exclude_empty,
687            limit,
688        } = request.into_inner();
689        let table_change_logs = self
690            .hummock_manager
691            .get_table_change_logs(
692                epoch_only,
693                start_epoch_inclusive,
694                end_epoch_inclusive,
695                table_ids
696                    .map(|t| t.table_ids.into_iter().collect::<HashSet<_>>())
697                    .clone(),
698                exclude_empty,
699                limit,
700            )
701            .await
702            .into_iter()
703            .map(|(i, l)| (i.as_raw_id(), l.to_protobuf()))
704            .collect();
705        Ok(Response::new(GetTableChangeLogsResponse {
706            table_change_logs,
707        }))
708    }
709}
710
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    /// `fields_to_kvs!` keys the map by field name and stores each field's
    /// `to_string()` rendering.
    #[test]
    fn test_fields_to_kvs() {
        struct Sample {
            foo: u64,
            bar: String,
        }
        let sample = Sample {
            foo: 15,
            bar: String::from("foobar"),
        };
        let rendered: HashMap<String, String> = fields_to_kvs!(sample, foo, bar);
        assert_eq!(rendered.len(), 2);
        assert_eq!(rendered["foo"], "15");
        assert_eq!(rendered["bar"], "foobar");
    }
}
729}