risingwave_meta_service/
hummock_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::time::Duration;
17
18use compact_task::PbTaskStatus;
19use futures::StreamExt;
20use itertools::Itertools;
21use risingwave_common::catalog::SYS_CATALOG_START_ID;
22use risingwave_common::id::JobId;
23use risingwave_hummock_sdk::HummockVersionId;
24use risingwave_hummock_sdk::key_range::KeyRange;
25use risingwave_hummock_sdk::version::HummockVersionDelta;
26use risingwave_meta::backup_restore::BackupManagerRef;
27use risingwave_meta::manager::MetadataManager;
28use risingwave_meta::manager::iceberg_compaction::IcebergCompactionManagerRef;
29use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
30use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerService;
31use risingwave_pb::hummock::subscribe_compaction_event_request::Event as RequestEvent;
32use risingwave_pb::hummock::*;
33use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Event as IcebergRequestEvent;
34use risingwave_pb::iceberg_compaction::{
35    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
36};
37use tonic::{Request, Response, Status, Streaming};
38
39use crate::RwReceiverStream;
40use crate::hummock::HummockManagerRef;
41use crate::hummock::compaction::selector::ManualCompactionOption;
42
43pub struct HummockServiceImpl {
44    hummock_manager: HummockManagerRef,
45    metadata_manager: MetadataManager,
46    backup_manager: BackupManagerRef,
47    iceberg_compaction_manager: IcebergCompactionManagerRef,
48}
49
50impl HummockServiceImpl {
51    pub fn new(
52        hummock_manager: HummockManagerRef,
53        metadata_manager: MetadataManager,
54        backup_manager: BackupManagerRef,
55        iceberg_compaction_manager: IcebergCompactionManagerRef,
56    ) -> Self {
57        HummockServiceImpl {
58            hummock_manager,
59            metadata_manager,
60            backup_manager,
61            iceberg_compaction_manager,
62        }
63    }
64}
65
66macro_rules! fields_to_kvs {
67    ($struct:ident, $($field:ident),*) => {
68        {
69            let mut kvs = HashMap::default();
70            $(
71                kvs.insert(stringify!($field).to_string(), $struct.$field.to_string());
72            )*
73            kvs
74        }
75    }
76}
77
78#[async_trait::async_trait]
79impl HummockManagerService for HummockServiceImpl {
80    type SubscribeCompactionEventStream = RwReceiverStream<SubscribeCompactionEventResponse>;
81    type SubscribeIcebergCompactionEventStream =
82        RwReceiverStream<SubscribeIcebergCompactionEventResponse>;
83
84    async fn unpin_version_before(
85        &self,
86        request: Request<UnpinVersionBeforeRequest>,
87    ) -> Result<Response<UnpinVersionBeforeResponse>, Status> {
88        let req = request.into_inner();
89        self.hummock_manager
90            .unpin_version_before(
91                req.context_id,
92                HummockVersionId::new(req.unpin_version_before),
93            )
94            .await?;
95        Ok(Response::new(UnpinVersionBeforeResponse { status: None }))
96    }
97
98    async fn get_current_version(
99        &self,
100        _request: Request<GetCurrentVersionRequest>,
101    ) -> Result<Response<GetCurrentVersionResponse>, Status> {
102        let current_version = self
103            .hummock_manager
104            .on_current_version(|version| version.into())
105            .await;
106        Ok(Response::new(GetCurrentVersionResponse {
107            status: None,
108            current_version: Some(current_version),
109        }))
110    }
111
112    async fn replay_version_delta(
113        &self,
114        request: Request<ReplayVersionDeltaRequest>,
115    ) -> Result<Response<ReplayVersionDeltaResponse>, Status> {
116        let req = request.into_inner();
117        let (version, compaction_groups) = self
118            .hummock_manager
119            .replay_version_delta(HummockVersionDelta::from_rpc_protobuf(
120                &req.version_delta.unwrap(),
121            ))
122            .await?;
123        Ok(Response::new(ReplayVersionDeltaResponse {
124            version: Some(version.into()),
125            modified_compaction_groups: compaction_groups,
126        }))
127    }
128
129    async fn trigger_compaction_deterministic(
130        &self,
131        request: Request<TriggerCompactionDeterministicRequest>,
132    ) -> Result<Response<TriggerCompactionDeterministicResponse>, Status> {
133        let req = request.into_inner();
134        self.hummock_manager
135            .trigger_compaction_deterministic(
136                HummockVersionId::new(req.version_id),
137                req.compaction_groups,
138            )
139            .await?;
140        Ok(Response::new(TriggerCompactionDeterministicResponse {}))
141    }
142
143    async fn disable_commit_epoch(
144        &self,
145        _request: Request<DisableCommitEpochRequest>,
146    ) -> Result<Response<DisableCommitEpochResponse>, Status> {
147        let version = self.hummock_manager.disable_commit_epoch().await;
148        Ok(Response::new(DisableCommitEpochResponse {
149            current_version: Some(version.into()),
150        }))
151    }
152
153    async fn list_version_deltas(
154        &self,
155        request: Request<ListVersionDeltasRequest>,
156    ) -> Result<Response<ListVersionDeltasResponse>, Status> {
157        let req = request.into_inner();
158        let version_deltas = self
159            .hummock_manager
160            .list_version_deltas(HummockVersionId::new(req.start_id), req.num_limit)
161            .await?;
162        let resp = ListVersionDeltasResponse {
163            version_deltas: Some(PbHummockVersionDeltas {
164                version_deltas: version_deltas
165                    .into_iter()
166                    .map(HummockVersionDelta::into)
167                    .collect(),
168            }),
169        };
170        Ok(Response::new(resp))
171    }
172
173    async fn get_new_object_ids(
174        &self,
175        request: Request<GetNewObjectIdsRequest>,
176    ) -> Result<Response<GetNewObjectIdsResponse>, Status> {
177        let object_id_range = self
178            .hummock_manager
179            .get_new_object_ids(request.into_inner().number)
180            .await?;
181        Ok(Response::new(GetNewObjectIdsResponse {
182            status: None,
183            start_id: object_id_range.start_id.inner(),
184            end_id: object_id_range.end_id.inner(),
185        }))
186    }
187
188    async fn trigger_manual_compaction(
189        &self,
190        request: Request<TriggerManualCompactionRequest>,
191    ) -> Result<Response<TriggerManualCompactionResponse>, Status> {
192        let request = request.into_inner();
193        let compaction_group_id = request.compaction_group_id;
194        let mut option = ManualCompactionOption {
195            level: request.level as usize,
196            sst_ids: request.sst_ids.into_iter().map(|id| id.into()).collect(),
197            ..Default::default()
198        };
199
200        // rewrite the key_range
201        match request.key_range {
202            Some(pb_key_range) => {
203                option.key_range = KeyRange {
204                    left: pb_key_range.left.into(),
205                    right: pb_key_range.right.into(),
206                    right_exclusive: pb_key_range.right_exclusive,
207                };
208            }
209
210            None => {
211                option.key_range = KeyRange::default();
212            }
213        }
214
215        // get internal_table_id by metadata_manger
216        if request.table_id < SYS_CATALOG_START_ID as u32 {
217            // We need to make sure to use the correct table_id to filter sst
218            let job_id = JobId::new(request.table_id);
219            if let Ok(table_fragment) = self.metadata_manager.get_job_fragments_by_id(job_id).await
220            {
221                option.internal_table_id = HashSet::from_iter(table_fragment.all_table_ids());
222            }
223        }
224
225        assert!(
226            option
227                .internal_table_id
228                .iter()
229                .all(|table_id| table_id.as_raw_id() < SYS_CATALOG_START_ID as u32),
230        );
231
232        tracing::info!(
233            "Try trigger_manual_compaction compaction_group_id {} option {:?}",
234            compaction_group_id,
235            &option
236        );
237
238        self.hummock_manager
239            .trigger_manual_compaction(compaction_group_id, option)
240            .await?;
241
242        Ok(Response::new(TriggerManualCompactionResponse {
243            status: None,
244        }))
245    }
246
247    async fn trigger_full_gc(
248        &self,
249        request: Request<TriggerFullGcRequest>,
250    ) -> Result<Response<TriggerFullGcResponse>, Status> {
251        let req = request.into_inner();
252        let backup_manager_2 = self.backup_manager.clone();
253        let hummock_manager_2 = self.hummock_manager.clone();
254        tokio::task::spawn(async move {
255            use thiserror_ext::AsReport;
256            let _ = hummock_manager_2
257                .start_full_gc(
258                    Duration::from_secs(req.sst_retention_time_sec),
259                    req.prefix,
260                    Some(backup_manager_2),
261                )
262                .await
263                .inspect_err(|e| tracing::warn!(error = %e.as_report(), "Failed to start GC."));
264        });
265        Ok(Response::new(TriggerFullGcResponse { status: None }))
266    }
267
268    async fn rise_ctl_get_pinned_versions_summary(
269        &self,
270        _request: Request<RiseCtlGetPinnedVersionsSummaryRequest>,
271    ) -> Result<Response<RiseCtlGetPinnedVersionsSummaryResponse>, Status> {
272        let pinned_versions = self.hummock_manager.list_pinned_version().await;
273        let workers = self
274            .hummock_manager
275            .list_workers(&pinned_versions.iter().map(|v| v.context_id).collect_vec())
276            .await?;
277        Ok(Response::new(RiseCtlGetPinnedVersionsSummaryResponse {
278            summary: Some(PinnedVersionsSummary {
279                pinned_versions,
280                workers,
281            }),
282        }))
283    }
284
285    async fn get_assigned_compact_task_num(
286        &self,
287        _request: Request<GetAssignedCompactTaskNumRequest>,
288    ) -> Result<Response<GetAssignedCompactTaskNumResponse>, Status> {
289        let num_tasks = self.hummock_manager.get_assigned_compact_task_num().await;
290        Ok(Response::new(GetAssignedCompactTaskNumResponse {
291            num_tasks: num_tasks as u32,
292        }))
293    }
294
295    async fn rise_ctl_list_compaction_group(
296        &self,
297        _request: Request<RiseCtlListCompactionGroupRequest>,
298    ) -> Result<Response<RiseCtlListCompactionGroupResponse>, Status> {
299        let compaction_groups = self.hummock_manager.list_compaction_group().await;
300        Ok(Response::new(RiseCtlListCompactionGroupResponse {
301            status: None,
302            compaction_groups,
303        }))
304    }
305
306    async fn rise_ctl_update_compaction_config(
307        &self,
308        request: Request<RiseCtlUpdateCompactionConfigRequest>,
309    ) -> Result<Response<RiseCtlUpdateCompactionConfigResponse>, Status> {
310        let RiseCtlUpdateCompactionConfigRequest {
311            compaction_group_ids,
312            configs,
313        } = request.into_inner();
314        self.hummock_manager
315            .update_compaction_config(
316                compaction_group_ids.as_slice(),
317                configs
318                    .into_iter()
319                    .map(|c| c.mutable_config.unwrap())
320                    .collect::<Vec<_>>()
321                    .as_slice(),
322            )
323            .await?;
324        Ok(Response::new(RiseCtlUpdateCompactionConfigResponse {
325            status: None,
326        }))
327    }
328
329    async fn init_metadata_for_replay(
330        &self,
331        request: Request<InitMetadataForReplayRequest>,
332    ) -> Result<Response<InitMetadataForReplayResponse>, Status> {
333        let InitMetadataForReplayRequest {
334            tables,
335            compaction_groups,
336        } = request.into_inner();
337
338        self.hummock_manager
339            .init_metadata_for_version_replay(tables, compaction_groups)?;
340        Ok(Response::new(InitMetadataForReplayResponse {}))
341    }
342
343    async fn pin_version(
344        &self,
345        request: Request<PinVersionRequest>,
346    ) -> Result<Response<PinVersionResponse>, Status> {
347        let req = request.into_inner();
348        let version = self.hummock_manager.pin_version(req.context_id).await?;
349        Ok(Response::new(PinVersionResponse {
350            pinned_version: Some(version.into()),
351        }))
352    }
353
354    async fn split_compaction_group(
355        &self,
356        request: Request<SplitCompactionGroupRequest>,
357    ) -> Result<Response<SplitCompactionGroupResponse>, Status> {
358        let req = request.into_inner();
359        let new_group_id = self
360            .hummock_manager
361            .move_state_tables_to_dedicated_compaction_group(
362                req.group_id,
363                &req.table_ids.into_iter().map_into().collect_vec(),
364                if req.partition_vnode_count > 0 {
365                    Some(req.partition_vnode_count)
366                } else {
367                    None
368                },
369            )
370            .await?
371            .0;
372        Ok(Response::new(SplitCompactionGroupResponse { new_group_id }))
373    }
374
375    async fn rise_ctl_pause_version_checkpoint(
376        &self,
377        _request: Request<RiseCtlPauseVersionCheckpointRequest>,
378    ) -> Result<Response<RiseCtlPauseVersionCheckpointResponse>, Status> {
379        self.hummock_manager.pause_version_checkpoint();
380        Ok(Response::new(RiseCtlPauseVersionCheckpointResponse {}))
381    }
382
383    async fn rise_ctl_resume_version_checkpoint(
384        &self,
385        _request: Request<RiseCtlResumeVersionCheckpointRequest>,
386    ) -> Result<Response<RiseCtlResumeVersionCheckpointResponse>, Status> {
387        self.hummock_manager.resume_version_checkpoint();
388        Ok(Response::new(RiseCtlResumeVersionCheckpointResponse {}))
389    }
390
391    async fn rise_ctl_get_checkpoint_version(
392        &self,
393        _request: Request<RiseCtlGetCheckpointVersionRequest>,
394    ) -> Result<Response<RiseCtlGetCheckpointVersionResponse>, Status> {
395        let checkpoint_version = self.hummock_manager.get_checkpoint_version().await;
396        Ok(Response::new(RiseCtlGetCheckpointVersionResponse {
397            checkpoint_version: Some(checkpoint_version.into()),
398        }))
399    }
400
401    async fn rise_ctl_list_compaction_status(
402        &self,
403        _request: Request<RiseCtlListCompactionStatusRequest>,
404    ) -> Result<Response<RiseCtlListCompactionStatusResponse>, Status> {
405        let (compaction_statuses, task_assignment) =
406            self.hummock_manager.list_compaction_status().await;
407        let task_progress = self.hummock_manager.compactor_manager.get_progress();
408        Ok(Response::new(RiseCtlListCompactionStatusResponse {
409            compaction_statuses,
410            task_assignment,
411            task_progress,
412        }))
413    }
414
415    async fn subscribe_compaction_event(
416        &self,
417        request: Request<Streaming<SubscribeCompactionEventRequest>>,
418    ) -> Result<Response<Self::SubscribeCompactionEventStream>, tonic::Status> {
419        let mut request_stream: Streaming<SubscribeCompactionEventRequest> = request.into_inner();
420        let register_req = {
421            let req = request_stream.next().await.ok_or_else(|| {
422                Status::invalid_argument("subscribe_compaction_event request is empty")
423            })??;
424
425            match req.event {
426                Some(RequestEvent::Register(register)) => register,
427                _ => {
428                    return Err(Status::invalid_argument(
429                        "the first message must be `Register`",
430                    ));
431                }
432            }
433        };
434
435        let context_id = register_req.context_id;
436
437        // check_context and add_compactor as a whole is not atomic, but compactor_manager will
438        // remove invalid compactor eventually.
439        if !self.hummock_manager.check_context(context_id).await? {
440            return Err(Status::new(
441                tonic::Code::Internal,
442                format!("invalid hummock context {}", context_id),
443            ));
444        }
445        let compactor_manager = self.hummock_manager.compactor_manager.clone();
446
447        let rx: tokio::sync::mpsc::UnboundedReceiver<
448            Result<SubscribeCompactionEventResponse, crate::MetaError>,
449        > = compactor_manager.add_compactor(context_id);
450
451        // register request stream to hummock
452        self.hummock_manager
453            .add_compactor_stream(context_id, request_stream);
454
455        // Trigger compaction on all compaction groups.
456        for cg_id in self.hummock_manager.compaction_group_ids().await {
457            self.hummock_manager
458                .try_send_compaction_request(cg_id, compact_task::TaskType::Dynamic);
459        }
460
461        Ok(Response::new(RwReceiverStream::new(rx)))
462    }
463
464    async fn subscribe_iceberg_compaction_event(
465        &self,
466        request: Request<Streaming<SubscribeIcebergCompactionEventRequest>>,
467    ) -> Result<Response<Self::SubscribeIcebergCompactionEventStream>, tonic::Status> {
468        let mut request_stream: Streaming<SubscribeIcebergCompactionEventRequest> =
469            request.into_inner();
470        let register_req = {
471            let req = request_stream.next().await.ok_or_else(|| {
472                Status::invalid_argument("subscribe_compaction_event request is empty")
473            })??;
474
475            match req.event {
476                Some(IcebergRequestEvent::Register(register)) => register,
477                _ => {
478                    return Err(Status::invalid_argument(
479                        "the first message must be `Register`",
480                    ));
481                }
482            }
483        };
484
485        let context_id = register_req.context_id;
486
487        // check_context and add_compactor as a whole is not atomic, but compactor_manager will
488        // remove invalid compactor eventually.
489        if !self.hummock_manager.check_context(context_id).await? {
490            return Err(Status::new(
491                tonic::Code::Internal,
492                format!("invalid hummock context {}", context_id),
493            ));
494        }
495
496        let rx: tokio::sync::mpsc::UnboundedReceiver<
497            Result<SubscribeIcebergCompactionEventResponse, crate::MetaError>,
498        > = self
499            .iceberg_compaction_manager
500            .iceberg_compactor_manager
501            .add_compactor(context_id);
502
503        self.iceberg_compaction_manager
504            .add_compactor_stream(context_id, request_stream);
505
506        // TODO: Trigger iceberg compaction
507
508        Ok(Response::new(RwReceiverStream::new(rx)))
509    }
510
511    async fn report_compaction_task(
512        &self,
513        _request: Request<ReportCompactionTaskRequest>,
514    ) -> Result<Response<ReportCompactionTaskResponse>, Status> {
515        unreachable!()
516    }
517
518    async fn list_branched_object(
519        &self,
520        _request: Request<ListBranchedObjectRequest>,
521    ) -> Result<Response<ListBranchedObjectResponse>, Status> {
522        let branched_objects = self
523            .hummock_manager
524            .list_branched_objects()
525            .await
526            .into_iter()
527            .flat_map(|(object_id, v)| {
528                v.into_iter()
529                    .map(move |(compaction_group_id, sst_ids)| BranchedObject {
530                        object_id: object_id.inner(),
531                        sst_id: sst_ids.into_iter().map(|id| id.inner()).collect(),
532                        compaction_group_id,
533                    })
534            })
535            .collect();
536        Ok(Response::new(ListBranchedObjectResponse {
537            branched_objects,
538        }))
539    }
540
541    async fn list_active_write_limit(
542        &self,
543        _request: Request<ListActiveWriteLimitRequest>,
544    ) -> Result<Response<ListActiveWriteLimitResponse>, Status> {
545        Ok(Response::new(ListActiveWriteLimitResponse {
546            write_limits: self.hummock_manager.write_limits().await,
547        }))
548    }
549
550    async fn list_hummock_meta_config(
551        &self,
552        _request: Request<ListHummockMetaConfigRequest>,
553    ) -> Result<Response<ListHummockMetaConfigResponse>, Status> {
554        let opt = &self.hummock_manager.env.opts;
555        let configs = fields_to_kvs!(
556            opt,
557            vacuum_interval_sec,
558            vacuum_spin_interval_ms,
559            hummock_version_checkpoint_interval_sec,
560            min_delta_log_num_for_hummock_version_checkpoint,
561            min_sst_retention_time_sec,
562            full_gc_interval_sec,
563            periodic_compaction_interval_sec,
564            periodic_space_reclaim_compaction_interval_sec,
565            periodic_ttl_reclaim_compaction_interval_sec,
566            periodic_tombstone_reclaim_compaction_interval_sec,
567            periodic_scheduling_compaction_group_split_interval_sec,
568            do_not_config_object_storage_lifecycle,
569            partition_vnode_count,
570            table_high_write_throughput_threshold,
571            table_low_write_throughput_threshold,
572            compaction_task_max_heartbeat_interval_secs,
573            periodic_scheduling_compaction_group_merge_interval_sec
574        );
575        Ok(Response::new(ListHummockMetaConfigResponse { configs }))
576    }
577
578    async fn rise_ctl_rebuild_table_stats(
579        &self,
580        _request: Request<RiseCtlRebuildTableStatsRequest>,
581    ) -> Result<Response<RiseCtlRebuildTableStatsResponse>, Status> {
582        self.hummock_manager.rebuild_table_stats().await?;
583        Ok(Response::new(RiseCtlRebuildTableStatsResponse {}))
584    }
585
586    async fn get_compaction_score(
587        &self,
588        request: Request<GetCompactionScoreRequest>,
589    ) -> Result<Response<GetCompactionScoreResponse>, Status> {
590        let compaction_group_id = request.into_inner().compaction_group_id;
591        let scores = self
592            .hummock_manager
593            .get_compaction_scores(compaction_group_id)
594            .await
595            .into_iter()
596            .map(|s| PickerInfo {
597                score: s.score,
598                select_level: s.select_level as _,
599                target_level: s.target_level as _,
600                picker_type: s.picker_type.to_string(),
601            })
602            .collect();
603        Ok(Response::new(GetCompactionScoreResponse {
604            compaction_group_id,
605            scores,
606        }))
607    }
608
609    async fn list_compact_task_assignment(
610        &self,
611        _request: Request<ListCompactTaskAssignmentRequest>,
612    ) -> Result<Response<ListCompactTaskAssignmentResponse>, Status> {
613        let (_compaction_statuses, task_assignment) =
614            self.hummock_manager.list_compaction_status().await;
615        Ok(Response::new(ListCompactTaskAssignmentResponse {
616            task_assignment,
617        }))
618    }
619
620    async fn list_compact_task_progress(
621        &self,
622        _request: Request<ListCompactTaskProgressRequest>,
623    ) -> Result<Response<ListCompactTaskProgressResponse>, Status> {
624        let task_progress = self.hummock_manager.compactor_manager.get_progress();
625
626        Ok(Response::new(ListCompactTaskProgressResponse {
627            task_progress,
628        }))
629    }
630
631    async fn cancel_compact_task(
632        &self,
633        request: Request<CancelCompactTaskRequest>,
634    ) -> Result<Response<CancelCompactTaskResponse>, Status> {
635        let request = request.into_inner();
636        let ret = self
637            .hummock_manager
638            .cancel_compact_task(
639                request.task_id,
640                PbTaskStatus::try_from(request.task_status).unwrap(),
641            )
642            .await?;
643
644        let response = Response::new(CancelCompactTaskResponse { ret });
645        return Ok(response);
646    }
647
648    async fn get_version_by_epoch(
649        &self,
650        request: Request<GetVersionByEpochRequest>,
651    ) -> Result<Response<GetVersionByEpochResponse>, Status> {
652        let GetVersionByEpochRequest { epoch, table_id } = request.into_inner();
653        let version = self
654            .hummock_manager
655            .epoch_to_version(epoch, table_id.into())
656            .await?;
657        Ok(Response::new(GetVersionByEpochResponse {
658            version: Some(version.to_protobuf()),
659        }))
660    }
661
662    async fn merge_compaction_group(
663        &self,
664        request: Request<MergeCompactionGroupRequest>,
665    ) -> Result<Response<MergeCompactionGroupResponse>, Status> {
666        let req = request.into_inner();
667        self.hummock_manager
668            .merge_compaction_group(req.left_group_id, req.right_group_id)
669            .await?;
670        Ok(Response::new(MergeCompactionGroupResponse {}))
671    }
672}
673
674#[cfg(test)]
675mod tests {
676    use std::collections::HashMap;
677    #[test]
678    fn test_fields_to_kvs() {
679        struct S {
680            foo: u64,
681            bar: String,
682        }
683        let s = S {
684            foo: 15,
685            bar: "foobar".to_owned(),
686        };
687        let kvs: HashMap<String, String> = fields_to_kvs!(s, foo, bar);
688        assert_eq!(kvs.len(), 2);
689        assert_eq!(kvs.get("foo").unwrap(), "15");
690        assert_eq!(kvs.get("bar").unwrap(), "foobar");
691    }
692}