risingwave_meta_service/
hummock_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::time::Duration;
17
18use compact_task::PbTaskStatus;
19use futures::StreamExt;
20use itertools::Itertools;
21use risingwave_common::catalog::{SYS_CATALOG_START_ID, TableId};
22use risingwave_hummock_sdk::HummockVersionId;
23use risingwave_hummock_sdk::key_range::KeyRange;
24use risingwave_hummock_sdk::version::HummockVersionDelta;
25use risingwave_meta::backup_restore::BackupManagerRef;
26use risingwave_meta::manager::MetadataManager;
27use risingwave_meta::manager::iceberg_compaction::IcebergCompactionManagerRef;
28use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
29use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerService;
30use risingwave_pb::hummock::subscribe_compaction_event_request::Event as RequestEvent;
31use risingwave_pb::hummock::*;
32use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Event as IcebergRequestEvent;
33use risingwave_pb::iceberg_compaction::{
34    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
35};
36use tonic::{Request, Response, Status, Streaming};
37
38use crate::RwReceiverStream;
39use crate::hummock::HummockManagerRef;
40use crate::hummock::compaction::selector::ManualCompactionOption;
41
42pub struct HummockServiceImpl {
43    hummock_manager: HummockManagerRef,
44    metadata_manager: MetadataManager,
45    backup_manager: BackupManagerRef,
46    iceberg_compaction_manager: IcebergCompactionManagerRef,
47}
48
49impl HummockServiceImpl {
50    pub fn new(
51        hummock_manager: HummockManagerRef,
52        metadata_manager: MetadataManager,
53        backup_manager: BackupManagerRef,
54        iceberg_compaction_manager: IcebergCompactionManagerRef,
55    ) -> Self {
56        HummockServiceImpl {
57            hummock_manager,
58            metadata_manager,
59            backup_manager,
60            iceberg_compaction_manager,
61        }
62    }
63}
64
65macro_rules! fields_to_kvs {
66    ($struct:ident, $($field:ident),*) => {
67        {
68            let mut kvs = HashMap::default();
69            $(
70                kvs.insert(stringify!($field).to_string(), $struct.$field.to_string());
71            )*
72            kvs
73        }
74    }
75}
76
77#[async_trait::async_trait]
78impl HummockManagerService for HummockServiceImpl {
79    type SubscribeCompactionEventStream = RwReceiverStream<SubscribeCompactionEventResponse>;
80    type SubscribeIcebergCompactionEventStream =
81        RwReceiverStream<SubscribeIcebergCompactionEventResponse>;
82
83    async fn unpin_version_before(
84        &self,
85        request: Request<UnpinVersionBeforeRequest>,
86    ) -> Result<Response<UnpinVersionBeforeResponse>, Status> {
87        let req = request.into_inner();
88        self.hummock_manager
89            .unpin_version_before(
90                req.context_id,
91                HummockVersionId::new(req.unpin_version_before),
92            )
93            .await?;
94        Ok(Response::new(UnpinVersionBeforeResponse { status: None }))
95    }
96
97    async fn get_current_version(
98        &self,
99        _request: Request<GetCurrentVersionRequest>,
100    ) -> Result<Response<GetCurrentVersionResponse>, Status> {
101        let current_version = self
102            .hummock_manager
103            .on_current_version(|version| version.into())
104            .await;
105        Ok(Response::new(GetCurrentVersionResponse {
106            status: None,
107            current_version: Some(current_version),
108        }))
109    }
110
111    async fn replay_version_delta(
112        &self,
113        request: Request<ReplayVersionDeltaRequest>,
114    ) -> Result<Response<ReplayVersionDeltaResponse>, Status> {
115        let req = request.into_inner();
116        let (version, compaction_groups) = self
117            .hummock_manager
118            .replay_version_delta(HummockVersionDelta::from_rpc_protobuf(
119                &req.version_delta.unwrap(),
120            ))
121            .await?;
122        Ok(Response::new(ReplayVersionDeltaResponse {
123            version: Some(version.into()),
124            modified_compaction_groups: compaction_groups,
125        }))
126    }
127
128    async fn trigger_compaction_deterministic(
129        &self,
130        request: Request<TriggerCompactionDeterministicRequest>,
131    ) -> Result<Response<TriggerCompactionDeterministicResponse>, Status> {
132        let req = request.into_inner();
133        self.hummock_manager
134            .trigger_compaction_deterministic(
135                HummockVersionId::new(req.version_id),
136                req.compaction_groups,
137            )
138            .await?;
139        Ok(Response::new(TriggerCompactionDeterministicResponse {}))
140    }
141
142    async fn disable_commit_epoch(
143        &self,
144        _request: Request<DisableCommitEpochRequest>,
145    ) -> Result<Response<DisableCommitEpochResponse>, Status> {
146        let version = self.hummock_manager.disable_commit_epoch().await;
147        Ok(Response::new(DisableCommitEpochResponse {
148            current_version: Some(version.into()),
149        }))
150    }
151
152    async fn list_version_deltas(
153        &self,
154        request: Request<ListVersionDeltasRequest>,
155    ) -> Result<Response<ListVersionDeltasResponse>, Status> {
156        let req = request.into_inner();
157        let version_deltas = self
158            .hummock_manager
159            .list_version_deltas(HummockVersionId::new(req.start_id), req.num_limit)
160            .await?;
161        let resp = ListVersionDeltasResponse {
162            version_deltas: Some(PbHummockVersionDeltas {
163                version_deltas: version_deltas
164                    .into_iter()
165                    .map(HummockVersionDelta::into)
166                    .collect(),
167            }),
168        };
169        Ok(Response::new(resp))
170    }
171
172    async fn get_new_object_ids(
173        &self,
174        request: Request<GetNewObjectIdsRequest>,
175    ) -> Result<Response<GetNewObjectIdsResponse>, Status> {
176        let object_id_range = self
177            .hummock_manager
178            .get_new_object_ids(request.into_inner().number)
179            .await?;
180        Ok(Response::new(GetNewObjectIdsResponse {
181            status: None,
182            start_id: object_id_range.start_id.inner(),
183            end_id: object_id_range.end_id.inner(),
184        }))
185    }
186
187    async fn trigger_manual_compaction(
188        &self,
189        request: Request<TriggerManualCompactionRequest>,
190    ) -> Result<Response<TriggerManualCompactionResponse>, Status> {
191        let request = request.into_inner();
192        let compaction_group_id = request.compaction_group_id;
193        let mut option = ManualCompactionOption {
194            level: request.level as usize,
195            sst_ids: request.sst_ids.into_iter().map(|id| id.into()).collect(),
196            ..Default::default()
197        };
198
199        // rewrite the key_range
200        match request.key_range {
201            Some(pb_key_range) => {
202                option.key_range = KeyRange {
203                    left: pb_key_range.left.into(),
204                    right: pb_key_range.right.into(),
205                    right_exclusive: pb_key_range.right_exclusive,
206                };
207            }
208
209            None => {
210                option.key_range = KeyRange::default();
211            }
212        }
213
214        // get internal_table_id by metadata_manger
215        if request.table_id < SYS_CATALOG_START_ID as u32 {
216            // We need to make sure to use the correct table_id to filter sst
217            let table_id = TableId::new(request.table_id);
218            if let Ok(table_fragment) = self
219                .metadata_manager
220                .get_job_fragments_by_id(&table_id)
221                .await
222            {
223                option.internal_table_id = HashSet::from_iter(table_fragment.all_table_ids());
224            }
225        }
226
227        assert!(
228            option
229                .internal_table_id
230                .iter()
231                .all(|table_id| *table_id < SYS_CATALOG_START_ID as u32),
232        );
233
234        tracing::info!(
235            "Try trigger_manual_compaction compaction_group_id {} option {:?}",
236            compaction_group_id,
237            &option
238        );
239
240        self.hummock_manager
241            .trigger_manual_compaction(compaction_group_id, option)
242            .await?;
243
244        Ok(Response::new(TriggerManualCompactionResponse {
245            status: None,
246        }))
247    }
248
249    async fn trigger_full_gc(
250        &self,
251        request: Request<TriggerFullGcRequest>,
252    ) -> Result<Response<TriggerFullGcResponse>, Status> {
253        let req = request.into_inner();
254        let backup_manager_2 = self.backup_manager.clone();
255        let hummock_manager_2 = self.hummock_manager.clone();
256        tokio::task::spawn(async move {
257            use thiserror_ext::AsReport;
258            let _ = hummock_manager_2
259                .start_full_gc(
260                    Duration::from_secs(req.sst_retention_time_sec),
261                    req.prefix,
262                    Some(backup_manager_2),
263                )
264                .await
265                .inspect_err(|e| tracing::warn!(error = %e.as_report(), "Failed to start GC."));
266        });
267        Ok(Response::new(TriggerFullGcResponse { status: None }))
268    }
269
270    async fn rise_ctl_get_pinned_versions_summary(
271        &self,
272        _request: Request<RiseCtlGetPinnedVersionsSummaryRequest>,
273    ) -> Result<Response<RiseCtlGetPinnedVersionsSummaryResponse>, Status> {
274        let pinned_versions = self.hummock_manager.list_pinned_version().await;
275        let workers = self
276            .hummock_manager
277            .list_workers(&pinned_versions.iter().map(|v| v.context_id).collect_vec())
278            .await?;
279        Ok(Response::new(RiseCtlGetPinnedVersionsSummaryResponse {
280            summary: Some(PinnedVersionsSummary {
281                pinned_versions,
282                workers,
283            }),
284        }))
285    }
286
287    async fn get_assigned_compact_task_num(
288        &self,
289        _request: Request<GetAssignedCompactTaskNumRequest>,
290    ) -> Result<Response<GetAssignedCompactTaskNumResponse>, Status> {
291        let num_tasks = self.hummock_manager.get_assigned_compact_task_num().await;
292        Ok(Response::new(GetAssignedCompactTaskNumResponse {
293            num_tasks: num_tasks as u32,
294        }))
295    }
296
297    async fn rise_ctl_list_compaction_group(
298        &self,
299        _request: Request<RiseCtlListCompactionGroupRequest>,
300    ) -> Result<Response<RiseCtlListCompactionGroupResponse>, Status> {
301        let compaction_groups = self.hummock_manager.list_compaction_group().await;
302        Ok(Response::new(RiseCtlListCompactionGroupResponse {
303            status: None,
304            compaction_groups,
305        }))
306    }
307
308    async fn rise_ctl_update_compaction_config(
309        &self,
310        request: Request<RiseCtlUpdateCompactionConfigRequest>,
311    ) -> Result<Response<RiseCtlUpdateCompactionConfigResponse>, Status> {
312        let RiseCtlUpdateCompactionConfigRequest {
313            compaction_group_ids,
314            configs,
315        } = request.into_inner();
316        self.hummock_manager
317            .update_compaction_config(
318                compaction_group_ids.as_slice(),
319                configs
320                    .into_iter()
321                    .map(|c| c.mutable_config.unwrap())
322                    .collect::<Vec<_>>()
323                    .as_slice(),
324            )
325            .await?;
326        Ok(Response::new(RiseCtlUpdateCompactionConfigResponse {
327            status: None,
328        }))
329    }
330
331    async fn init_metadata_for_replay(
332        &self,
333        request: Request<InitMetadataForReplayRequest>,
334    ) -> Result<Response<InitMetadataForReplayResponse>, Status> {
335        let InitMetadataForReplayRequest {
336            tables,
337            compaction_groups,
338        } = request.into_inner();
339
340        self.hummock_manager
341            .init_metadata_for_version_replay(tables, compaction_groups)?;
342        Ok(Response::new(InitMetadataForReplayResponse {}))
343    }
344
345    async fn pin_version(
346        &self,
347        request: Request<PinVersionRequest>,
348    ) -> Result<Response<PinVersionResponse>, Status> {
349        let req = request.into_inner();
350        let version = self.hummock_manager.pin_version(req.context_id).await?;
351        Ok(Response::new(PinVersionResponse {
352            pinned_version: Some(version.into()),
353        }))
354    }
355
356    async fn split_compaction_group(
357        &self,
358        request: Request<SplitCompactionGroupRequest>,
359    ) -> Result<Response<SplitCompactionGroupResponse>, Status> {
360        let req = request.into_inner();
361        let new_group_id = self
362            .hummock_manager
363            .move_state_tables_to_dedicated_compaction_group(
364                req.group_id,
365                &req.table_ids,
366                if req.partition_vnode_count > 0 {
367                    Some(req.partition_vnode_count)
368                } else {
369                    None
370                },
371            )
372            .await?
373            .0;
374        Ok(Response::new(SplitCompactionGroupResponse { new_group_id }))
375    }
376
377    async fn rise_ctl_pause_version_checkpoint(
378        &self,
379        _request: Request<RiseCtlPauseVersionCheckpointRequest>,
380    ) -> Result<Response<RiseCtlPauseVersionCheckpointResponse>, Status> {
381        self.hummock_manager.pause_version_checkpoint();
382        Ok(Response::new(RiseCtlPauseVersionCheckpointResponse {}))
383    }
384
385    async fn rise_ctl_resume_version_checkpoint(
386        &self,
387        _request: Request<RiseCtlResumeVersionCheckpointRequest>,
388    ) -> Result<Response<RiseCtlResumeVersionCheckpointResponse>, Status> {
389        self.hummock_manager.resume_version_checkpoint();
390        Ok(Response::new(RiseCtlResumeVersionCheckpointResponse {}))
391    }
392
393    async fn rise_ctl_get_checkpoint_version(
394        &self,
395        _request: Request<RiseCtlGetCheckpointVersionRequest>,
396    ) -> Result<Response<RiseCtlGetCheckpointVersionResponse>, Status> {
397        let checkpoint_version = self.hummock_manager.get_checkpoint_version().await;
398        Ok(Response::new(RiseCtlGetCheckpointVersionResponse {
399            checkpoint_version: Some(checkpoint_version.into()),
400        }))
401    }
402
403    async fn rise_ctl_list_compaction_status(
404        &self,
405        _request: Request<RiseCtlListCompactionStatusRequest>,
406    ) -> Result<Response<RiseCtlListCompactionStatusResponse>, Status> {
407        let (compaction_statuses, task_assignment) =
408            self.hummock_manager.list_compaction_status().await;
409        let task_progress = self.hummock_manager.compactor_manager.get_progress();
410        Ok(Response::new(RiseCtlListCompactionStatusResponse {
411            compaction_statuses,
412            task_assignment,
413            task_progress,
414        }))
415    }
416
417    async fn subscribe_compaction_event(
418        &self,
419        request: Request<Streaming<SubscribeCompactionEventRequest>>,
420    ) -> Result<Response<Self::SubscribeCompactionEventStream>, tonic::Status> {
421        let mut request_stream: Streaming<SubscribeCompactionEventRequest> = request.into_inner();
422        let register_req = {
423            let req = request_stream.next().await.ok_or_else(|| {
424                Status::invalid_argument("subscribe_compaction_event request is empty")
425            })??;
426
427            match req.event {
428                Some(RequestEvent::Register(register)) => register,
429                _ => {
430                    return Err(Status::invalid_argument(
431                        "the first message must be `Register`",
432                    ));
433                }
434            }
435        };
436
437        let context_id = register_req.context_id;
438
439        // check_context and add_compactor as a whole is not atomic, but compactor_manager will
440        // remove invalid compactor eventually.
441        if !self.hummock_manager.check_context(context_id).await? {
442            return Err(Status::new(
443                tonic::Code::Internal,
444                format!("invalid hummock context {}", context_id),
445            ));
446        }
447        let compactor_manager = self.hummock_manager.compactor_manager.clone();
448
449        let rx: tokio::sync::mpsc::UnboundedReceiver<
450            Result<SubscribeCompactionEventResponse, crate::MetaError>,
451        > = compactor_manager.add_compactor(context_id);
452
453        // register request stream to hummock
454        self.hummock_manager
455            .add_compactor_stream(context_id, request_stream);
456
457        // Trigger compaction on all compaction groups.
458        for cg_id in self.hummock_manager.compaction_group_ids().await {
459            self.hummock_manager
460                .try_send_compaction_request(cg_id, compact_task::TaskType::Dynamic);
461        }
462
463        Ok(Response::new(RwReceiverStream::new(rx)))
464    }
465
466    async fn subscribe_iceberg_compaction_event(
467        &self,
468        request: Request<Streaming<SubscribeIcebergCompactionEventRequest>>,
469    ) -> Result<Response<Self::SubscribeIcebergCompactionEventStream>, tonic::Status> {
470        let mut request_stream: Streaming<SubscribeIcebergCompactionEventRequest> =
471            request.into_inner();
472        let register_req = {
473            let req = request_stream.next().await.ok_or_else(|| {
474                Status::invalid_argument("subscribe_compaction_event request is empty")
475            })??;
476
477            match req.event {
478                Some(IcebergRequestEvent::Register(register)) => register,
479                _ => {
480                    return Err(Status::invalid_argument(
481                        "the first message must be `Register`",
482                    ));
483                }
484            }
485        };
486
487        let context_id = register_req.context_id;
488
489        // check_context and add_compactor as a whole is not atomic, but compactor_manager will
490        // remove invalid compactor eventually.
491        if !self.hummock_manager.check_context(context_id).await? {
492            return Err(Status::new(
493                tonic::Code::Internal,
494                format!("invalid hummock context {}", context_id),
495            ));
496        }
497
498        let rx: tokio::sync::mpsc::UnboundedReceiver<
499            Result<SubscribeIcebergCompactionEventResponse, crate::MetaError>,
500        > = self
501            .iceberg_compaction_manager
502            .iceberg_compactor_manager
503            .add_compactor(context_id);
504
505        self.iceberg_compaction_manager
506            .add_compactor_stream(context_id, request_stream);
507
508        // TODO: Trigger iceberg compaction
509
510        Ok(Response::new(RwReceiverStream::new(rx)))
511    }
512
513    async fn report_compaction_task(
514        &self,
515        _request: Request<ReportCompactionTaskRequest>,
516    ) -> Result<Response<ReportCompactionTaskResponse>, Status> {
517        unreachable!()
518    }
519
520    async fn list_branched_object(
521        &self,
522        _request: Request<ListBranchedObjectRequest>,
523    ) -> Result<Response<ListBranchedObjectResponse>, Status> {
524        let branched_objects = self
525            .hummock_manager
526            .list_branched_objects()
527            .await
528            .into_iter()
529            .flat_map(|(object_id, v)| {
530                v.into_iter()
531                    .map(move |(compaction_group_id, sst_ids)| BranchedObject {
532                        object_id: object_id.inner(),
533                        sst_id: sst_ids.into_iter().map(|id| id.inner()).collect(),
534                        compaction_group_id,
535                    })
536            })
537            .collect();
538        Ok(Response::new(ListBranchedObjectResponse {
539            branched_objects,
540        }))
541    }
542
543    async fn list_active_write_limit(
544        &self,
545        _request: Request<ListActiveWriteLimitRequest>,
546    ) -> Result<Response<ListActiveWriteLimitResponse>, Status> {
547        Ok(Response::new(ListActiveWriteLimitResponse {
548            write_limits: self.hummock_manager.write_limits().await,
549        }))
550    }
551
552    async fn list_hummock_meta_config(
553        &self,
554        _request: Request<ListHummockMetaConfigRequest>,
555    ) -> Result<Response<ListHummockMetaConfigResponse>, Status> {
556        let opt = &self.hummock_manager.env.opts;
557        let configs = fields_to_kvs!(
558            opt,
559            vacuum_interval_sec,
560            vacuum_spin_interval_ms,
561            hummock_version_checkpoint_interval_sec,
562            min_delta_log_num_for_hummock_version_checkpoint,
563            min_sst_retention_time_sec,
564            full_gc_interval_sec,
565            periodic_compaction_interval_sec,
566            periodic_space_reclaim_compaction_interval_sec,
567            periodic_ttl_reclaim_compaction_interval_sec,
568            periodic_tombstone_reclaim_compaction_interval_sec,
569            periodic_scheduling_compaction_group_split_interval_sec,
570            do_not_config_object_storage_lifecycle,
571            partition_vnode_count,
572            table_high_write_throughput_threshold,
573            table_low_write_throughput_threshold,
574            compaction_task_max_heartbeat_interval_secs,
575            periodic_scheduling_compaction_group_merge_interval_sec
576        );
577        Ok(Response::new(ListHummockMetaConfigResponse { configs }))
578    }
579
580    async fn rise_ctl_rebuild_table_stats(
581        &self,
582        _request: Request<RiseCtlRebuildTableStatsRequest>,
583    ) -> Result<Response<RiseCtlRebuildTableStatsResponse>, Status> {
584        self.hummock_manager.rebuild_table_stats().await?;
585        Ok(Response::new(RiseCtlRebuildTableStatsResponse {}))
586    }
587
588    async fn get_compaction_score(
589        &self,
590        request: Request<GetCompactionScoreRequest>,
591    ) -> Result<Response<GetCompactionScoreResponse>, Status> {
592        let compaction_group_id = request.into_inner().compaction_group_id;
593        let scores = self
594            .hummock_manager
595            .get_compaction_scores(compaction_group_id)
596            .await
597            .into_iter()
598            .map(|s| PickerInfo {
599                score: s.score,
600                select_level: s.select_level as _,
601                target_level: s.target_level as _,
602                picker_type: s.picker_type.to_string(),
603            })
604            .collect();
605        Ok(Response::new(GetCompactionScoreResponse {
606            compaction_group_id,
607            scores,
608        }))
609    }
610
611    async fn list_compact_task_assignment(
612        &self,
613        _request: Request<ListCompactTaskAssignmentRequest>,
614    ) -> Result<Response<ListCompactTaskAssignmentResponse>, Status> {
615        let (_compaction_statuses, task_assignment) =
616            self.hummock_manager.list_compaction_status().await;
617        Ok(Response::new(ListCompactTaskAssignmentResponse {
618            task_assignment,
619        }))
620    }
621
622    async fn list_compact_task_progress(
623        &self,
624        _request: Request<ListCompactTaskProgressRequest>,
625    ) -> Result<Response<ListCompactTaskProgressResponse>, Status> {
626        let task_progress = self.hummock_manager.compactor_manager.get_progress();
627
628        Ok(Response::new(ListCompactTaskProgressResponse {
629            task_progress,
630        }))
631    }
632
633    async fn cancel_compact_task(
634        &self,
635        request: Request<CancelCompactTaskRequest>,
636    ) -> Result<Response<CancelCompactTaskResponse>, Status> {
637        let request = request.into_inner();
638        let ret = self
639            .hummock_manager
640            .cancel_compact_task(
641                request.task_id,
642                PbTaskStatus::try_from(request.task_status).unwrap(),
643            )
644            .await?;
645
646        let response = Response::new(CancelCompactTaskResponse { ret });
647        return Ok(response);
648    }
649
650    async fn get_version_by_epoch(
651        &self,
652        request: Request<GetVersionByEpochRequest>,
653    ) -> Result<Response<GetVersionByEpochResponse>, Status> {
654        let GetVersionByEpochRequest { epoch, table_id } = request.into_inner();
655        let version = self
656            .hummock_manager
657            .epoch_to_version(epoch, table_id)
658            .await?;
659        Ok(Response::new(GetVersionByEpochResponse {
660            version: Some(version.to_protobuf()),
661        }))
662    }
663
664    async fn merge_compaction_group(
665        &self,
666        request: Request<MergeCompactionGroupRequest>,
667    ) -> Result<Response<MergeCompactionGroupResponse>, Status> {
668        let req = request.into_inner();
669        self.hummock_manager
670            .merge_compaction_group(req.left_group_id, req.right_group_id)
671            .await?;
672        Ok(Response::new(MergeCompactionGroupResponse {}))
673    }
674}
675
676#[cfg(test)]
677mod tests {
678    use std::collections::HashMap;
679    #[test]
680    fn test_fields_to_kvs() {
681        struct S {
682            foo: u64,
683            bar: String,
684        }
685        let s = S {
686            foo: 15,
687            bar: "foobar".to_owned(),
688        };
689        let kvs: HashMap<String, String> = fields_to_kvs!(s, foo, bar);
690        assert_eq!(kvs.len(), 2);
691        assert_eq!(kvs.get("foo").unwrap(), "15");
692        assert_eq!(kvs.get("bar").unwrap(), "foobar");
693    }
694}