risingwave_meta_service/
hummock_service.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::time::Duration;
17
18use compact_task::PbTaskStatus;
19use futures::StreamExt;
20use itertools::Itertools;
21use risingwave_common::catalog::SYS_CATALOG_START_ID;
22use risingwave_hummock_sdk::key_range::KeyRange;
23use risingwave_hummock_sdk::version::HummockVersionDelta;
24use risingwave_meta::backup_restore::BackupManagerRef;
25use risingwave_meta::manager::MetadataManager;
26use risingwave_meta::manager::iceberg_compaction::IcebergCompactionManagerRef;
27use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
28use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerService;
29use risingwave_pb::hummock::subscribe_compaction_event_request::Event as RequestEvent;
30use risingwave_pb::hummock::*;
31use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Event as IcebergRequestEvent;
32use risingwave_pb::iceberg_compaction::{
33    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
34};
35use tonic::{Request, Response, Status, Streaming};
36
37use crate::RwReceiverStream;
38use crate::hummock::compaction::selector::ManualCompactionOption;
39use crate::hummock::{HummockManagerRef, ManualCompactionTriggerResult};
40
41pub struct HummockServiceImpl {
42    hummock_manager: HummockManagerRef,
43    metadata_manager: MetadataManager,
44    backup_manager: BackupManagerRef,
45    iceberg_compaction_manager: IcebergCompactionManagerRef,
46}
47
48impl HummockServiceImpl {
49    pub fn new(
50        hummock_manager: HummockManagerRef,
51        metadata_manager: MetadataManager,
52        backup_manager: BackupManagerRef,
53        iceberg_compaction_manager: IcebergCompactionManagerRef,
54    ) -> Self {
55        HummockServiceImpl {
56            hummock_manager,
57            metadata_manager,
58            backup_manager,
59            iceberg_compaction_manager,
60        }
61    }
62}
63
/// Builds a string-keyed, string-valued hash map from the named fields of a struct value.
///
/// `fields_to_kvs!(s, a, b)` evaluates to a map containing `"a" -> s.a.to_string()` and
/// `"b" -> s.b.to_string()`: each key is the field name as written and each value is that
/// field's `to_string()` output. A trailing comma after the last field is accepted.
macro_rules! fields_to_kvs {
    ($struct:ident, $($field:ident),* $(,)?) => {
        {
            // Fully qualified so the expansion does not depend on what the call site imports.
            // `unused_mut` can only fire when the field list is empty.
            #[allow(unused_mut)]
            let mut kvs = ::std::collections::HashMap::default();
            $(
                kvs.insert(stringify!($field).to_string(), $struct.$field.to_string());
            )*
            kvs
        }
    }
}
75
76#[async_trait::async_trait]
77impl HummockManagerService for HummockServiceImpl {
78    type SubscribeCompactionEventStream = RwReceiverStream<SubscribeCompactionEventResponse>;
79    type SubscribeIcebergCompactionEventStream =
80        RwReceiverStream<SubscribeIcebergCompactionEventResponse>;
81
82    async fn unpin_version_before(
83        &self,
84        request: Request<UnpinVersionBeforeRequest>,
85    ) -> Result<Response<UnpinVersionBeforeResponse>, Status> {
86        let req = request.into_inner();
87        self.hummock_manager
88            .unpin_version_before(req.context_id, req.unpin_version_before)
89            .await?;
90        Ok(Response::new(UnpinVersionBeforeResponse { status: None }))
91    }
92
93    async fn get_current_version(
94        &self,
95        _request: Request<GetCurrentVersionRequest>,
96    ) -> Result<Response<GetCurrentVersionResponse>, Status> {
97        let current_version = self
98            .hummock_manager
99            .on_current_version(|version| version.into())
100            .await;
101        Ok(Response::new(GetCurrentVersionResponse {
102            status: None,
103            current_version: Some(current_version),
104        }))
105    }
106
107    async fn replay_version_delta(
108        &self,
109        request: Request<ReplayVersionDeltaRequest>,
110    ) -> Result<Response<ReplayVersionDeltaResponse>, Status> {
111        let req = request.into_inner();
112        let (version, compaction_groups) = self
113            .hummock_manager
114            .replay_version_delta(HummockVersionDelta::from_rpc_protobuf(
115                &req.version_delta.unwrap(),
116            ))
117            .await?;
118        Ok(Response::new(ReplayVersionDeltaResponse {
119            version: Some(version.into()),
120            modified_compaction_groups: compaction_groups,
121        }))
122    }
123
124    async fn trigger_compaction_deterministic(
125        &self,
126        request: Request<TriggerCompactionDeterministicRequest>,
127    ) -> Result<Response<TriggerCompactionDeterministicResponse>, Status> {
128        let req = request.into_inner();
129        self.hummock_manager
130            .trigger_compaction_deterministic(req.version_id, req.compaction_groups)
131            .await?;
132        Ok(Response::new(TriggerCompactionDeterministicResponse {}))
133    }
134
135    async fn disable_commit_epoch(
136        &self,
137        _request: Request<DisableCommitEpochRequest>,
138    ) -> Result<Response<DisableCommitEpochResponse>, Status> {
139        let version = self.hummock_manager.disable_commit_epoch().await;
140        Ok(Response::new(DisableCommitEpochResponse {
141            current_version: Some(version.into()),
142        }))
143    }
144
145    async fn list_version_deltas(
146        &self,
147        request: Request<ListVersionDeltasRequest>,
148    ) -> Result<Response<ListVersionDeltasResponse>, Status> {
149        let req = request.into_inner();
150        let version_deltas = self
151            .hummock_manager
152            .list_version_deltas(req.start_id, req.num_limit)
153            .await?;
154        let resp = ListVersionDeltasResponse {
155            version_deltas: Some(PbHummockVersionDeltas {
156                version_deltas: version_deltas
157                    .into_iter()
158                    .map(HummockVersionDelta::into)
159                    .collect(),
160            }),
161        };
162        Ok(Response::new(resp))
163    }
164
165    async fn get_new_object_ids(
166        &self,
167        request: Request<GetNewObjectIdsRequest>,
168    ) -> Result<Response<GetNewObjectIdsResponse>, Status> {
169        let object_id_range = self
170            .hummock_manager
171            .get_new_object_ids(request.into_inner().number)
172            .await?;
173        Ok(Response::new(GetNewObjectIdsResponse {
174            status: None,
175            start_id: object_id_range.start_id,
176            end_id: object_id_range.end_id,
177        }))
178    }
179
180    async fn trigger_manual_compaction(
181        &self,
182        request: Request<TriggerManualCompactionRequest>,
183    ) -> Result<Response<TriggerManualCompactionResponse>, Status> {
184        let request = request.into_inner();
185        let compaction_group_id = request.compaction_group_id;
186        let mut option = ManualCompactionOption {
187            level: request.level as usize,
188            sst_ids: request.sst_ids,
189            exclusive: request.exclusive.unwrap_or(false),
190            ..Default::default()
191        };
192
193        // rewrite the key_range
194        match request.key_range {
195            Some(pb_key_range) => {
196                option.key_range = KeyRange {
197                    left: pb_key_range.left.into(),
198                    right: pb_key_range.right.into(),
199                    right_exclusive: pb_key_range.right_exclusive,
200                };
201            }
202
203            None => {
204                option.key_range = KeyRange::default();
205            }
206        }
207
208        // get internal_table_id by metadata_manger
209        if request.table_id.as_raw_id() < SYS_CATALOG_START_ID as u32 {
210            // We need to make sure to use the correct table_id to filter sst
211            let job_id = request.table_id;
212            if let Ok(table_fragment) = self.metadata_manager.get_job_fragments_by_id(job_id).await
213            {
214                option.internal_table_id = HashSet::from_iter(table_fragment.all_table_ids());
215            }
216        }
217
218        assert!(
219            option
220                .internal_table_id
221                .iter()
222                .all(|table_id| table_id.as_raw_id() < SYS_CATALOG_START_ID as u32),
223        );
224
225        tracing::info!(
226            "Try trigger_manual_compaction compaction_group_id {} option {:?}",
227            compaction_group_id,
228            &option
229        );
230
231        let should_retry = match self
232            .hummock_manager
233            .trigger_manual_compaction(compaction_group_id, option)
234            .await?
235        {
236            ManualCompactionTriggerResult::Submitted => false,
237            ManualCompactionTriggerResult::Retry => true,
238        };
239
240        Ok(Response::new(TriggerManualCompactionResponse {
241            status: None,
242            should_retry: Some(should_retry),
243        }))
244    }
245
246    async fn trigger_full_gc(
247        &self,
248        request: Request<TriggerFullGcRequest>,
249    ) -> Result<Response<TriggerFullGcResponse>, Status> {
250        let req = request.into_inner();
251        let backup_manager_2 = self.backup_manager.clone();
252        let hummock_manager_2 = self.hummock_manager.clone();
253        tokio::task::spawn(async move {
254            use thiserror_ext::AsReport;
255            let _ = hummock_manager_2
256                .start_full_gc(
257                    Duration::from_secs(req.sst_retention_time_sec),
258                    req.prefix,
259                    Some(backup_manager_2),
260                )
261                .await
262                .inspect_err(|e| tracing::warn!(error = %e.as_report(), "Failed to start GC."));
263        });
264        Ok(Response::new(TriggerFullGcResponse { status: None }))
265    }
266
267    async fn rise_ctl_get_pinned_versions_summary(
268        &self,
269        _request: Request<RiseCtlGetPinnedVersionsSummaryRequest>,
270    ) -> Result<Response<RiseCtlGetPinnedVersionsSummaryResponse>, Status> {
271        let pinned_versions = self.hummock_manager.list_pinned_version().await;
272        let workers = self
273            .hummock_manager
274            .list_workers(&pinned_versions.iter().map(|v| v.context_id).collect_vec())
275            .await?;
276        Ok(Response::new(RiseCtlGetPinnedVersionsSummaryResponse {
277            summary: Some(PinnedVersionsSummary {
278                pinned_versions,
279                workers,
280            }),
281        }))
282    }
283
284    async fn get_assigned_compact_task_num(
285        &self,
286        _request: Request<GetAssignedCompactTaskNumRequest>,
287    ) -> Result<Response<GetAssignedCompactTaskNumResponse>, Status> {
288        let num_tasks = self.hummock_manager.get_assigned_compact_task_num().await;
289        Ok(Response::new(GetAssignedCompactTaskNumResponse {
290            num_tasks: num_tasks as u32,
291        }))
292    }
293
294    async fn rise_ctl_list_compaction_group(
295        &self,
296        _request: Request<RiseCtlListCompactionGroupRequest>,
297    ) -> Result<Response<RiseCtlListCompactionGroupResponse>, Status> {
298        let compaction_groups = self.hummock_manager.list_compaction_group().await;
299        Ok(Response::new(RiseCtlListCompactionGroupResponse {
300            status: None,
301            compaction_groups,
302        }))
303    }
304
305    async fn rise_ctl_update_compaction_config(
306        &self,
307        request: Request<RiseCtlUpdateCompactionConfigRequest>,
308    ) -> Result<Response<RiseCtlUpdateCompactionConfigResponse>, Status> {
309        let RiseCtlUpdateCompactionConfigRequest {
310            compaction_group_ids,
311            configs,
312        } = request.into_inner();
313        self.hummock_manager
314            .update_compaction_config(
315                compaction_group_ids.as_slice(),
316                configs
317                    .into_iter()
318                    .map(|c| c.mutable_config.unwrap())
319                    .collect::<Vec<_>>()
320                    .as_slice(),
321            )
322            .await?;
323        Ok(Response::new(RiseCtlUpdateCompactionConfigResponse {
324            status: None,
325        }))
326    }
327
328    async fn init_metadata_for_replay(
329        &self,
330        request: Request<InitMetadataForReplayRequest>,
331    ) -> Result<Response<InitMetadataForReplayResponse>, Status> {
332        let InitMetadataForReplayRequest {
333            tables,
334            compaction_groups,
335        } = request.into_inner();
336
337        self.hummock_manager
338            .init_metadata_for_version_replay(tables, compaction_groups)?;
339        Ok(Response::new(InitMetadataForReplayResponse {}))
340    }
341
342    async fn pin_version(
343        &self,
344        request: Request<PinVersionRequest>,
345    ) -> Result<Response<PinVersionResponse>, Status> {
346        let req = request.into_inner();
347        let version = self.hummock_manager.pin_version(req.context_id).await?;
348        Ok(Response::new(PinVersionResponse {
349            pinned_version: Some(version.into()),
350        }))
351    }
352
353    async fn split_compaction_group(
354        &self,
355        request: Request<SplitCompactionGroupRequest>,
356    ) -> Result<Response<SplitCompactionGroupResponse>, Status> {
357        let req = request.into_inner();
358        let new_group_id = self
359            .hummock_manager
360            .move_state_tables_to_dedicated_compaction_group(
361                req.group_id,
362                &req.table_ids,
363                if req.partition_vnode_count > 0 {
364                    Some(req.partition_vnode_count)
365                } else {
366                    None
367                },
368            )
369            .await?
370            .0;
371        Ok(Response::new(SplitCompactionGroupResponse { new_group_id }))
372    }
373
374    async fn rise_ctl_pause_version_checkpoint(
375        &self,
376        _request: Request<RiseCtlPauseVersionCheckpointRequest>,
377    ) -> Result<Response<RiseCtlPauseVersionCheckpointResponse>, Status> {
378        self.hummock_manager.pause_version_checkpoint();
379        Ok(Response::new(RiseCtlPauseVersionCheckpointResponse {}))
380    }
381
382    async fn rise_ctl_resume_version_checkpoint(
383        &self,
384        _request: Request<RiseCtlResumeVersionCheckpointRequest>,
385    ) -> Result<Response<RiseCtlResumeVersionCheckpointResponse>, Status> {
386        self.hummock_manager.resume_version_checkpoint();
387        Ok(Response::new(RiseCtlResumeVersionCheckpointResponse {}))
388    }
389
390    async fn rise_ctl_get_checkpoint_version(
391        &self,
392        _request: Request<RiseCtlGetCheckpointVersionRequest>,
393    ) -> Result<Response<RiseCtlGetCheckpointVersionResponse>, Status> {
394        let checkpoint_version = self.hummock_manager.get_checkpoint_version().await;
395        Ok(Response::new(RiseCtlGetCheckpointVersionResponse {
396            checkpoint_version: Some(checkpoint_version.into()),
397        }))
398    }
399
400    async fn rise_ctl_list_compaction_status(
401        &self,
402        _request: Request<RiseCtlListCompactionStatusRequest>,
403    ) -> Result<Response<RiseCtlListCompactionStatusResponse>, Status> {
404        let (compaction_statuses, task_assignment) =
405            self.hummock_manager.list_compaction_status().await;
406        let task_progress = self.hummock_manager.compactor_manager.get_progress();
407        Ok(Response::new(RiseCtlListCompactionStatusResponse {
408            compaction_statuses,
409            task_assignment,
410            task_progress,
411        }))
412    }
413
414    async fn subscribe_compaction_event(
415        &self,
416        request: Request<Streaming<SubscribeCompactionEventRequest>>,
417    ) -> Result<Response<Self::SubscribeCompactionEventStream>, tonic::Status> {
418        let mut request_stream: Streaming<SubscribeCompactionEventRequest> = request.into_inner();
419        let register_req = {
420            let req = request_stream.next().await.ok_or_else(|| {
421                Status::invalid_argument("subscribe_compaction_event request is empty")
422            })??;
423
424            match req.event {
425                Some(RequestEvent::Register(register)) => register,
426                _ => {
427                    return Err(Status::invalid_argument(
428                        "the first message must be `Register`",
429                    ));
430                }
431            }
432        };
433
434        let context_id = register_req.context_id;
435
436        // check_context and add_compactor as a whole is not atomic, but compactor_manager will
437        // remove invalid compactor eventually.
438        if !self.hummock_manager.check_context(context_id).await? {
439            return Err(Status::new(
440                tonic::Code::Internal,
441                format!("invalid hummock context {}", context_id),
442            ));
443        }
444        let compactor_manager = self.hummock_manager.compactor_manager.clone();
445
446        let rx: tokio::sync::mpsc::UnboundedReceiver<
447            Result<SubscribeCompactionEventResponse, crate::MetaError>,
448        > = compactor_manager.add_compactor(context_id);
449
450        // register request stream to hummock
451        self.hummock_manager
452            .add_compactor_stream(context_id, request_stream);
453
454        // Trigger compaction on all compaction groups.
455        for cg_id in self.hummock_manager.compaction_group_ids().await {
456            self.hummock_manager
457                .try_send_compaction_request(cg_id, compact_task::TaskType::Dynamic);
458        }
459
460        Ok(Response::new(RwReceiverStream::new(rx)))
461    }
462
463    async fn subscribe_iceberg_compaction_event(
464        &self,
465        request: Request<Streaming<SubscribeIcebergCompactionEventRequest>>,
466    ) -> Result<Response<Self::SubscribeIcebergCompactionEventStream>, tonic::Status> {
467        let mut request_stream: Streaming<SubscribeIcebergCompactionEventRequest> =
468            request.into_inner();
469        let register_req = {
470            let req = request_stream.next().await.ok_or_else(|| {
471                Status::invalid_argument("subscribe_compaction_event request is empty")
472            })??;
473
474            match req.event {
475                Some(IcebergRequestEvent::Register(register)) => register,
476                _ => {
477                    return Err(Status::invalid_argument(
478                        "the first message must be `Register`",
479                    ));
480                }
481            }
482        };
483
484        let context_id = register_req.context_id;
485
486        // check_context and add_compactor as a whole is not atomic, but compactor_manager will
487        // remove invalid compactor eventually.
488        if !self.hummock_manager.check_context(context_id).await? {
489            return Err(Status::new(
490                tonic::Code::Internal,
491                format!("invalid hummock context {}", context_id),
492            ));
493        }
494
495        let rx: tokio::sync::mpsc::UnboundedReceiver<
496            Result<SubscribeIcebergCompactionEventResponse, crate::MetaError>,
497        > = self
498            .iceberg_compaction_manager
499            .iceberg_compactor_manager
500            .add_compactor(context_id);
501
502        self.iceberg_compaction_manager
503            .add_compactor_stream(context_id, request_stream);
504
505        // TODO: Trigger iceberg compaction
506
507        Ok(Response::new(RwReceiverStream::new(rx)))
508    }
509
510    async fn report_compaction_task(
511        &self,
512        _request: Request<ReportCompactionTaskRequest>,
513    ) -> Result<Response<ReportCompactionTaskResponse>, Status> {
514        unreachable!()
515    }
516
517    async fn list_branched_object(
518        &self,
519        _request: Request<ListBranchedObjectRequest>,
520    ) -> Result<Response<ListBranchedObjectResponse>, Status> {
521        let branched_objects = self
522            .hummock_manager
523            .list_branched_objects()
524            .await
525            .into_iter()
526            .flat_map(|(object_id, v)| {
527                v.into_iter()
528                    .map(move |(compaction_group_id, sst_ids)| BranchedObject {
529                        object_id,
530                        sst_id: sst_ids,
531                        compaction_group_id,
532                    })
533            })
534            .collect();
535        Ok(Response::new(ListBranchedObjectResponse {
536            branched_objects,
537        }))
538    }
539
540    async fn list_active_write_limit(
541        &self,
542        _request: Request<ListActiveWriteLimitRequest>,
543    ) -> Result<Response<ListActiveWriteLimitResponse>, Status> {
544        Ok(Response::new(ListActiveWriteLimitResponse {
545            write_limits: self.hummock_manager.write_limits().await,
546        }))
547    }
548
549    async fn list_hummock_meta_config(
550        &self,
551        _request: Request<ListHummockMetaConfigRequest>,
552    ) -> Result<Response<ListHummockMetaConfigResponse>, Status> {
553        let opt = &self.hummock_manager.env.opts;
554        let configs = fields_to_kvs!(
555            opt,
556            vacuum_interval_sec,
557            vacuum_spin_interval_ms,
558            hummock_version_checkpoint_interval_sec,
559            min_delta_log_num_for_hummock_version_checkpoint,
560            min_sst_retention_time_sec,
561            full_gc_interval_sec,
562            periodic_compaction_interval_sec,
563            periodic_space_reclaim_compaction_interval_sec,
564            periodic_ttl_reclaim_compaction_interval_sec,
565            periodic_tombstone_reclaim_compaction_interval_sec,
566            periodic_scheduling_compaction_group_split_interval_sec,
567            do_not_config_object_storage_lifecycle,
568            partition_vnode_count,
569            table_high_write_throughput_threshold,
570            table_low_write_throughput_threshold,
571            compaction_task_max_heartbeat_interval_secs,
572            periodic_scheduling_compaction_group_merge_interval_sec
573        );
574        Ok(Response::new(ListHummockMetaConfigResponse { configs }))
575    }
576
577    async fn rise_ctl_rebuild_table_stats(
578        &self,
579        _request: Request<RiseCtlRebuildTableStatsRequest>,
580    ) -> Result<Response<RiseCtlRebuildTableStatsResponse>, Status> {
581        self.hummock_manager.rebuild_table_stats().await?;
582        Ok(Response::new(RiseCtlRebuildTableStatsResponse {}))
583    }
584
585    async fn get_compaction_score(
586        &self,
587        request: Request<GetCompactionScoreRequest>,
588    ) -> Result<Response<GetCompactionScoreResponse>, Status> {
589        let compaction_group_id = request.into_inner().compaction_group_id;
590        let scores = self
591            .hummock_manager
592            .get_compaction_scores(compaction_group_id)
593            .await
594            .into_iter()
595            .map(|s| PickerInfo {
596                score: s.score,
597                select_level: s.select_level as _,
598                target_level: s.target_level as _,
599                picker_type: s.picker_type.to_string(),
600            })
601            .collect();
602        Ok(Response::new(GetCompactionScoreResponse {
603            compaction_group_id,
604            scores,
605        }))
606    }
607
608    async fn list_compact_task_assignment(
609        &self,
610        _request: Request<ListCompactTaskAssignmentRequest>,
611    ) -> Result<Response<ListCompactTaskAssignmentResponse>, Status> {
612        let (_compaction_statuses, task_assignment) =
613            self.hummock_manager.list_compaction_status().await;
614        Ok(Response::new(ListCompactTaskAssignmentResponse {
615            task_assignment,
616        }))
617    }
618
619    async fn list_compact_task_progress(
620        &self,
621        _request: Request<ListCompactTaskProgressRequest>,
622    ) -> Result<Response<ListCompactTaskProgressResponse>, Status> {
623        let task_progress = self.hummock_manager.compactor_manager.get_progress();
624
625        Ok(Response::new(ListCompactTaskProgressResponse {
626            task_progress,
627        }))
628    }
629
630    async fn cancel_compact_task(
631        &self,
632        request: Request<CancelCompactTaskRequest>,
633    ) -> Result<Response<CancelCompactTaskResponse>, Status> {
634        let request = request.into_inner();
635        let ret = self
636            .hummock_manager
637            .cancel_compact_task(
638                request.task_id,
639                PbTaskStatus::try_from(request.task_status).unwrap(),
640            )
641            .await?;
642
643        let response = Response::new(CancelCompactTaskResponse { ret });
644        return Ok(response);
645    }
646
647    async fn get_version_by_epoch(
648        &self,
649        request: Request<GetVersionByEpochRequest>,
650    ) -> Result<Response<GetVersionByEpochResponse>, Status> {
651        let GetVersionByEpochRequest { epoch, table_id } = request.into_inner();
652        let version = self
653            .hummock_manager
654            .epoch_to_version(epoch, table_id)
655            .await?;
656        Ok(Response::new(GetVersionByEpochResponse {
657            version: Some(version.to_protobuf()),
658        }))
659    }
660
661    async fn merge_compaction_group(
662        &self,
663        request: Request<MergeCompactionGroupRequest>,
664    ) -> Result<Response<MergeCompactionGroupResponse>, Status> {
665        let req = request.into_inner();
666        self.hummock_manager
667            .merge_compaction_group(req.left_group_id, req.right_group_id)
668            .await?;
669        Ok(Response::new(MergeCompactionGroupResponse {}))
670    }
671}
672
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    /// `fields_to_kvs!` must emit one stringified entry per listed field.
    #[test]
    fn test_fields_to_kvs() {
        struct Sample {
            foo: u64,
            bar: String,
        }

        let sample = Sample {
            foo: 15,
            bar: String::from("foobar"),
        };
        let kvs: HashMap<String, String> = fields_to_kvs!(sample, foo, bar);

        assert_eq!(kvs.len(), 2);
        assert_eq!(kvs.get("foo").map(String::as_str), Some("15"));
        assert_eq!(kvs.get("bar").map(String::as_str), Some("foobar"));
    }
}
691}