1use std::collections::{HashMap, HashSet};
16use std::time::Duration;
17
18use compact_task::PbTaskStatus;
19use futures::StreamExt;
20use itertools::Itertools;
21use risingwave_common::catalog::SYS_CATALOG_START_ID;
22use risingwave_hummock_sdk::key_range::KeyRange;
23use risingwave_hummock_sdk::version::HummockVersionDelta;
24use risingwave_meta::backup_restore::BackupManagerRef;
25use risingwave_meta::manager::MetadataManager;
26use risingwave_meta::manager::iceberg_compaction::IcebergCompactionManagerRef;
27use risingwave_pb::hummock::get_compaction_score_response::PickerInfo;
28use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerService;
29use risingwave_pb::hummock::subscribe_compaction_event_request::Event as RequestEvent;
30use risingwave_pb::hummock::*;
31use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::Event as IcebergRequestEvent;
32use risingwave_pb::iceberg_compaction::{
33 SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
34};
35use tonic::{Request, Response, Status, Streaming};
36
37use crate::RwReceiverStream;
38use crate::hummock::compaction::selector::ManualCompactionOption;
39use crate::hummock::{HummockManagerRef, ManualCompactionTriggerResult};
40
41pub struct HummockServiceImpl {
42 hummock_manager: HummockManagerRef,
43 metadata_manager: MetadataManager,
44 backup_manager: BackupManagerRef,
45 iceberg_compaction_manager: IcebergCompactionManagerRef,
46}
47
48impl HummockServiceImpl {
49 pub fn new(
50 hummock_manager: HummockManagerRef,
51 metadata_manager: MetadataManager,
52 backup_manager: BackupManagerRef,
53 iceberg_compaction_manager: IcebergCompactionManagerRef,
54 ) -> Self {
55 HummockServiceImpl {
56 hummock_manager,
57 metadata_manager,
58 backup_manager,
59 iceberg_compaction_manager,
60 }
61 }
62}
63
/// Builds a map from field name to the field's `to_string()` value.
///
/// `fields_to_kvs!(s, a, b)` evaluates to a map containing `"a" -> s.a.to_string()`
/// and `"b" -> s.b.to_string()`. `HashMap` must be in scope at the call site.
macro_rules! fields_to_kvs {
    ($struct:ident, $($field:ident),*) => {{
        let mut kvs = HashMap::default();
        $(
            let key = String::from(stringify!($field));
            let value = $struct.$field.to_string();
            kvs.insert(key, value);
        )*
        kvs
    }};
}
75
76#[async_trait::async_trait]
77impl HummockManagerService for HummockServiceImpl {
78 type SubscribeCompactionEventStream = RwReceiverStream<SubscribeCompactionEventResponse>;
79 type SubscribeIcebergCompactionEventStream =
80 RwReceiverStream<SubscribeIcebergCompactionEventResponse>;
81
82 async fn unpin_version_before(
83 &self,
84 request: Request<UnpinVersionBeforeRequest>,
85 ) -> Result<Response<UnpinVersionBeforeResponse>, Status> {
86 let req = request.into_inner();
87 self.hummock_manager
88 .unpin_version_before(req.context_id, req.unpin_version_before)
89 .await?;
90 Ok(Response::new(UnpinVersionBeforeResponse { status: None }))
91 }
92
93 async fn get_current_version(
94 &self,
95 _request: Request<GetCurrentVersionRequest>,
96 ) -> Result<Response<GetCurrentVersionResponse>, Status> {
97 let current_version = self
98 .hummock_manager
99 .on_current_version(|version| version.into())
100 .await;
101 Ok(Response::new(GetCurrentVersionResponse {
102 status: None,
103 current_version: Some(current_version),
104 }))
105 }
106
107 async fn replay_version_delta(
108 &self,
109 request: Request<ReplayVersionDeltaRequest>,
110 ) -> Result<Response<ReplayVersionDeltaResponse>, Status> {
111 let req = request.into_inner();
112 let (version, compaction_groups) = self
113 .hummock_manager
114 .replay_version_delta(HummockVersionDelta::from_rpc_protobuf(
115 &req.version_delta.unwrap(),
116 ))
117 .await?;
118 Ok(Response::new(ReplayVersionDeltaResponse {
119 version: Some(version.into()),
120 modified_compaction_groups: compaction_groups,
121 }))
122 }
123
124 async fn trigger_compaction_deterministic(
125 &self,
126 request: Request<TriggerCompactionDeterministicRequest>,
127 ) -> Result<Response<TriggerCompactionDeterministicResponse>, Status> {
128 let req = request.into_inner();
129 self.hummock_manager
130 .trigger_compaction_deterministic(req.version_id, req.compaction_groups)
131 .await?;
132 Ok(Response::new(TriggerCompactionDeterministicResponse {}))
133 }
134
135 async fn disable_commit_epoch(
136 &self,
137 _request: Request<DisableCommitEpochRequest>,
138 ) -> Result<Response<DisableCommitEpochResponse>, Status> {
139 let version = self.hummock_manager.disable_commit_epoch().await;
140 Ok(Response::new(DisableCommitEpochResponse {
141 current_version: Some(version.into()),
142 }))
143 }
144
145 async fn list_version_deltas(
146 &self,
147 request: Request<ListVersionDeltasRequest>,
148 ) -> Result<Response<ListVersionDeltasResponse>, Status> {
149 let req = request.into_inner();
150 let version_deltas = self
151 .hummock_manager
152 .list_version_deltas(req.start_id, req.num_limit)
153 .await?;
154 let resp = ListVersionDeltasResponse {
155 version_deltas: Some(PbHummockVersionDeltas {
156 version_deltas: version_deltas
157 .into_iter()
158 .map(HummockVersionDelta::into)
159 .collect(),
160 }),
161 };
162 Ok(Response::new(resp))
163 }
164
165 async fn get_new_object_ids(
166 &self,
167 request: Request<GetNewObjectIdsRequest>,
168 ) -> Result<Response<GetNewObjectIdsResponse>, Status> {
169 let object_id_range = self
170 .hummock_manager
171 .get_new_object_ids(request.into_inner().number)
172 .await?;
173 Ok(Response::new(GetNewObjectIdsResponse {
174 status: None,
175 start_id: object_id_range.start_id,
176 end_id: object_id_range.end_id,
177 }))
178 }
179
180 async fn trigger_manual_compaction(
181 &self,
182 request: Request<TriggerManualCompactionRequest>,
183 ) -> Result<Response<TriggerManualCompactionResponse>, Status> {
184 let request = request.into_inner();
185 let compaction_group_id = request.compaction_group_id;
186 let mut option = ManualCompactionOption {
187 level: request.level as usize,
188 target_level: request
189 .target_level
190 .map(|target_level| target_level as usize),
191 sst_ids: request.sst_ids,
192 exclusive: request.exclusive.unwrap_or(false),
193 ..Default::default()
194 };
195
196 match request.key_range {
198 Some(pb_key_range) => {
199 option.key_range = KeyRange {
200 left: pb_key_range.left.into(),
201 right: pb_key_range.right.into(),
202 right_exclusive: pb_key_range.right_exclusive,
203 };
204 }
205
206 None => {
207 option.key_range = KeyRange::default();
208 }
209 }
210
211 if request.table_id.as_raw_id() < SYS_CATALOG_START_ID as u32 {
213 let job_id = request.table_id;
215 if let Ok(table_fragment) = self.metadata_manager.get_job_fragments_by_id(job_id).await
216 {
217 option.internal_table_id = HashSet::from_iter(table_fragment.all_table_ids());
218 }
219 }
220
221 assert!(
222 option
223 .internal_table_id
224 .iter()
225 .all(|table_id| table_id.as_raw_id() < SYS_CATALOG_START_ID as u32),
226 );
227
228 tracing::info!(
229 "Try trigger_manual_compaction compaction_group_id {} option {:?}",
230 compaction_group_id,
231 &option
232 );
233
234 let should_retry = match self
235 .hummock_manager
236 .trigger_manual_compaction(compaction_group_id, option)
237 .await?
238 {
239 ManualCompactionTriggerResult::Submitted => false,
240 ManualCompactionTriggerResult::Retry => true,
241 };
242
243 Ok(Response::new(TriggerManualCompactionResponse {
244 status: None,
245 should_retry: Some(should_retry),
246 }))
247 }
248
249 async fn trigger_full_gc(
250 &self,
251 request: Request<TriggerFullGcRequest>,
252 ) -> Result<Response<TriggerFullGcResponse>, Status> {
253 let req = request.into_inner();
254 let backup_manager_2 = self.backup_manager.clone();
255 let hummock_manager_2 = self.hummock_manager.clone();
256 tokio::task::spawn(async move {
257 use thiserror_ext::AsReport;
258 let _ = hummock_manager_2
259 .start_full_gc(
260 Duration::from_secs(req.sst_retention_time_sec),
261 req.prefix,
262 Some(backup_manager_2),
263 )
264 .await
265 .inspect_err(|e| tracing::warn!(error = %e.as_report(), "Failed to start GC."));
266 });
267 Ok(Response::new(TriggerFullGcResponse { status: None }))
268 }
269
270 async fn rise_ctl_get_pinned_versions_summary(
271 &self,
272 _request: Request<RiseCtlGetPinnedVersionsSummaryRequest>,
273 ) -> Result<Response<RiseCtlGetPinnedVersionsSummaryResponse>, Status> {
274 let pinned_versions = self.hummock_manager.list_pinned_version().await;
275 let workers = self
276 .hummock_manager
277 .list_workers(&pinned_versions.iter().map(|v| v.context_id).collect_vec())
278 .await?;
279 Ok(Response::new(RiseCtlGetPinnedVersionsSummaryResponse {
280 summary: Some(PinnedVersionsSummary {
281 pinned_versions,
282 workers,
283 }),
284 }))
285 }
286
287 async fn get_assigned_compact_task_num(
288 &self,
289 _request: Request<GetAssignedCompactTaskNumRequest>,
290 ) -> Result<Response<GetAssignedCompactTaskNumResponse>, Status> {
291 let num_tasks = self.hummock_manager.get_assigned_compact_task_num().await;
292 Ok(Response::new(GetAssignedCompactTaskNumResponse {
293 num_tasks: num_tasks as u32,
294 }))
295 }
296
297 async fn rise_ctl_list_compaction_group(
298 &self,
299 _request: Request<RiseCtlListCompactionGroupRequest>,
300 ) -> Result<Response<RiseCtlListCompactionGroupResponse>, Status> {
301 let compaction_groups = self.hummock_manager.list_compaction_group().await;
302 Ok(Response::new(RiseCtlListCompactionGroupResponse {
303 status: None,
304 compaction_groups,
305 }))
306 }
307
308 async fn rise_ctl_update_compaction_config(
309 &self,
310 request: Request<RiseCtlUpdateCompactionConfigRequest>,
311 ) -> Result<Response<RiseCtlUpdateCompactionConfigResponse>, Status> {
312 let RiseCtlUpdateCompactionConfigRequest {
313 compaction_group_ids,
314 configs,
315 } = request.into_inner();
316 self.hummock_manager
317 .update_compaction_config(
318 compaction_group_ids.as_slice(),
319 configs
320 .into_iter()
321 .map(|c| c.mutable_config.unwrap())
322 .collect::<Vec<_>>()
323 .as_slice(),
324 )
325 .await?;
326 Ok(Response::new(RiseCtlUpdateCompactionConfigResponse {
327 status: None,
328 }))
329 }
330
331 async fn init_metadata_for_replay(
332 &self,
333 request: Request<InitMetadataForReplayRequest>,
334 ) -> Result<Response<InitMetadataForReplayResponse>, Status> {
335 let InitMetadataForReplayRequest {
336 tables,
337 compaction_groups,
338 } = request.into_inner();
339
340 self.hummock_manager
341 .init_metadata_for_version_replay(tables, compaction_groups)?;
342 Ok(Response::new(InitMetadataForReplayResponse {}))
343 }
344
345 async fn pin_version(
346 &self,
347 request: Request<PinVersionRequest>,
348 ) -> Result<Response<PinVersionResponse>, Status> {
349 let req = request.into_inner();
350 let version = self.hummock_manager.pin_version(req.context_id).await?;
351 Ok(Response::new(PinVersionResponse {
352 pinned_version: Some(version.into()),
353 }))
354 }
355
356 async fn split_compaction_group(
357 &self,
358 request: Request<SplitCompactionGroupRequest>,
359 ) -> Result<Response<SplitCompactionGroupResponse>, Status> {
360 let req = request.into_inner();
361 let new_group_id = self
362 .hummock_manager
363 .move_state_tables_to_dedicated_compaction_group(
364 req.group_id,
365 &req.table_ids,
366 if req.partition_vnode_count > 0 {
367 Some(req.partition_vnode_count)
368 } else {
369 None
370 },
371 )
372 .await?
373 .0;
374 Ok(Response::new(SplitCompactionGroupResponse { new_group_id }))
375 }
376
377 async fn rise_ctl_pause_version_checkpoint(
378 &self,
379 _request: Request<RiseCtlPauseVersionCheckpointRequest>,
380 ) -> Result<Response<RiseCtlPauseVersionCheckpointResponse>, Status> {
381 self.hummock_manager.pause_version_checkpoint();
382 Ok(Response::new(RiseCtlPauseVersionCheckpointResponse {}))
383 }
384
385 async fn rise_ctl_resume_version_checkpoint(
386 &self,
387 _request: Request<RiseCtlResumeVersionCheckpointRequest>,
388 ) -> Result<Response<RiseCtlResumeVersionCheckpointResponse>, Status> {
389 self.hummock_manager.resume_version_checkpoint();
390 Ok(Response::new(RiseCtlResumeVersionCheckpointResponse {}))
391 }
392
393 async fn rise_ctl_get_checkpoint_version(
394 &self,
395 _request: Request<RiseCtlGetCheckpointVersionRequest>,
396 ) -> Result<Response<RiseCtlGetCheckpointVersionResponse>, Status> {
397 let checkpoint_version = self.hummock_manager.get_checkpoint_version().await;
398 Ok(Response::new(RiseCtlGetCheckpointVersionResponse {
399 checkpoint_version: Some(checkpoint_version.into()),
400 }))
401 }
402
403 async fn rise_ctl_list_compaction_status(
404 &self,
405 _request: Request<RiseCtlListCompactionStatusRequest>,
406 ) -> Result<Response<RiseCtlListCompactionStatusResponse>, Status> {
407 let (compaction_statuses, task_assignment) =
408 self.hummock_manager.list_compaction_status().await;
409 let task_progress = self.hummock_manager.compactor_manager.get_progress();
410 Ok(Response::new(RiseCtlListCompactionStatusResponse {
411 compaction_statuses,
412 task_assignment,
413 task_progress,
414 }))
415 }
416
417 async fn subscribe_compaction_event(
418 &self,
419 request: Request<Streaming<SubscribeCompactionEventRequest>>,
420 ) -> Result<Response<Self::SubscribeCompactionEventStream>, tonic::Status> {
421 let mut request_stream: Streaming<SubscribeCompactionEventRequest> = request.into_inner();
422 let register_req = {
423 let req = request_stream.next().await.ok_or_else(|| {
424 Status::invalid_argument("subscribe_compaction_event request is empty")
425 })??;
426
427 match req.event {
428 Some(RequestEvent::Register(register)) => register,
429 _ => {
430 return Err(Status::invalid_argument(
431 "the first message must be `Register`",
432 ));
433 }
434 }
435 };
436
437 let context_id = register_req.context_id;
438
439 if !self.hummock_manager.check_context(context_id).await? {
442 return Err(Status::new(
443 tonic::Code::Internal,
444 format!("invalid hummock context {}", context_id),
445 ));
446 }
447 let compactor_manager = self.hummock_manager.compactor_manager.clone();
448
449 let rx: tokio::sync::mpsc::UnboundedReceiver<
450 Result<SubscribeCompactionEventResponse, crate::MetaError>,
451 > = compactor_manager.add_compactor(context_id);
452
453 self.hummock_manager
455 .add_compactor_stream(context_id, request_stream);
456
457 for cg_id in self.hummock_manager.compaction_group_ids().await {
459 self.hummock_manager
460 .try_send_compaction_request(cg_id, compact_task::TaskType::Dynamic);
461 }
462
463 Ok(Response::new(RwReceiverStream::new(rx)))
464 }
465
466 async fn subscribe_iceberg_compaction_event(
467 &self,
468 request: Request<Streaming<SubscribeIcebergCompactionEventRequest>>,
469 ) -> Result<Response<Self::SubscribeIcebergCompactionEventStream>, tonic::Status> {
470 let mut request_stream: Streaming<SubscribeIcebergCompactionEventRequest> =
471 request.into_inner();
472 let register_req = {
473 let req = request_stream.next().await.ok_or_else(|| {
474 Status::invalid_argument("subscribe_compaction_event request is empty")
475 })??;
476
477 match req.event {
478 Some(IcebergRequestEvent::Register(register)) => register,
479 _ => {
480 return Err(Status::invalid_argument(
481 "the first message must be `Register`",
482 ));
483 }
484 }
485 };
486
487 let context_id = register_req.context_id;
488
489 if !self.hummock_manager.check_context(context_id).await? {
492 return Err(Status::new(
493 tonic::Code::Internal,
494 format!("invalid hummock context {}", context_id),
495 ));
496 }
497
498 let rx: tokio::sync::mpsc::UnboundedReceiver<
499 Result<SubscribeIcebergCompactionEventResponse, crate::MetaError>,
500 > = self
501 .iceberg_compaction_manager
502 .iceberg_compactor_manager
503 .add_compactor(context_id);
504
505 self.iceberg_compaction_manager
506 .add_compactor_stream(context_id, request_stream);
507
508 Ok(Response::new(RwReceiverStream::new(rx)))
511 }
512
513 async fn report_compaction_task(
514 &self,
515 _request: Request<ReportCompactionTaskRequest>,
516 ) -> Result<Response<ReportCompactionTaskResponse>, Status> {
517 unreachable!()
518 }
519
520 async fn list_branched_object(
521 &self,
522 _request: Request<ListBranchedObjectRequest>,
523 ) -> Result<Response<ListBranchedObjectResponse>, Status> {
524 let branched_objects = self
525 .hummock_manager
526 .list_branched_objects()
527 .await
528 .into_iter()
529 .flat_map(|(object_id, v)| {
530 v.into_iter()
531 .map(move |(compaction_group_id, sst_ids)| BranchedObject {
532 object_id,
533 sst_id: sst_ids,
534 compaction_group_id,
535 })
536 })
537 .collect();
538 Ok(Response::new(ListBranchedObjectResponse {
539 branched_objects,
540 }))
541 }
542
543 async fn list_active_write_limit(
544 &self,
545 _request: Request<ListActiveWriteLimitRequest>,
546 ) -> Result<Response<ListActiveWriteLimitResponse>, Status> {
547 Ok(Response::new(ListActiveWriteLimitResponse {
548 write_limits: self.hummock_manager.write_limits().await,
549 }))
550 }
551
552 async fn list_hummock_meta_config(
553 &self,
554 _request: Request<ListHummockMetaConfigRequest>,
555 ) -> Result<Response<ListHummockMetaConfigResponse>, Status> {
556 let opt = &self.hummock_manager.env.opts;
557 let configs = fields_to_kvs!(
558 opt,
559 vacuum_interval_sec,
560 vacuum_spin_interval_ms,
561 hummock_version_checkpoint_interval_sec,
562 min_delta_log_num_for_hummock_version_checkpoint,
563 min_sst_retention_time_sec,
564 full_gc_interval_sec,
565 periodic_compaction_interval_sec,
566 periodic_space_reclaim_compaction_interval_sec,
567 periodic_ttl_reclaim_compaction_interval_sec,
568 periodic_tombstone_reclaim_compaction_interval_sec,
569 periodic_scheduling_compaction_group_split_interval_sec,
570 enable_compaction_group_normalize,
571 max_normalize_splits_per_round,
572 do_not_config_object_storage_lifecycle,
573 partition_vnode_count,
574 table_high_write_throughput_threshold,
575 table_low_write_throughput_threshold,
576 compaction_task_max_heartbeat_interval_secs,
577 periodic_scheduling_compaction_group_merge_interval_sec
578 );
579 Ok(Response::new(ListHummockMetaConfigResponse { configs }))
580 }
581
582 async fn rise_ctl_rebuild_table_stats(
583 &self,
584 _request: Request<RiseCtlRebuildTableStatsRequest>,
585 ) -> Result<Response<RiseCtlRebuildTableStatsResponse>, Status> {
586 self.hummock_manager.rebuild_table_stats().await?;
587 Ok(Response::new(RiseCtlRebuildTableStatsResponse {}))
588 }
589
590 async fn get_compaction_score(
591 &self,
592 request: Request<GetCompactionScoreRequest>,
593 ) -> Result<Response<GetCompactionScoreResponse>, Status> {
594 let compaction_group_id = request.into_inner().compaction_group_id;
595 let scores = self
596 .hummock_manager
597 .get_compaction_scores(compaction_group_id)
598 .await
599 .into_iter()
600 .map(|s| PickerInfo {
601 score: s.score,
602 select_level: s.select_level as _,
603 target_level: s.target_level as _,
604 picker_type: s.picker_type.to_string(),
605 })
606 .collect();
607 Ok(Response::new(GetCompactionScoreResponse {
608 compaction_group_id,
609 scores,
610 }))
611 }
612
613 async fn list_compact_task_assignment(
614 &self,
615 _request: Request<ListCompactTaskAssignmentRequest>,
616 ) -> Result<Response<ListCompactTaskAssignmentResponse>, Status> {
617 let (_compaction_statuses, task_assignment) =
618 self.hummock_manager.list_compaction_status().await;
619 Ok(Response::new(ListCompactTaskAssignmentResponse {
620 task_assignment,
621 }))
622 }
623
624 async fn list_compact_task_progress(
625 &self,
626 _request: Request<ListCompactTaskProgressRequest>,
627 ) -> Result<Response<ListCompactTaskProgressResponse>, Status> {
628 let task_progress = self.hummock_manager.compactor_manager.get_progress();
629
630 Ok(Response::new(ListCompactTaskProgressResponse {
631 task_progress,
632 }))
633 }
634
635 async fn cancel_compact_task(
636 &self,
637 request: Request<CancelCompactTaskRequest>,
638 ) -> Result<Response<CancelCompactTaskResponse>, Status> {
639 let request = request.into_inner();
640 let ret = self
641 .hummock_manager
642 .cancel_compact_task(
643 request.task_id,
644 PbTaskStatus::try_from(request.task_status).unwrap(),
645 )
646 .await?;
647
648 let response = Response::new(CancelCompactTaskResponse { ret });
649 return Ok(response);
650 }
651
652 async fn get_version_by_epoch(
653 &self,
654 request: Request<GetVersionByEpochRequest>,
655 ) -> Result<Response<GetVersionByEpochResponse>, Status> {
656 let GetVersionByEpochRequest { epoch, table_id } = request.into_inner();
657 let version = self
658 .hummock_manager
659 .epoch_to_version(epoch, table_id)
660 .await?;
661 Ok(Response::new(GetVersionByEpochResponse {
662 version: Some(version.to_protobuf()),
663 }))
664 }
665
666 async fn merge_compaction_group(
667 &self,
668 request: Request<MergeCompactionGroupRequest>,
669 ) -> Result<Response<MergeCompactionGroupResponse>, Status> {
670 let req = request.into_inner();
671 self.hummock_manager
672 .merge_compaction_group(req.left_group_id, req.right_group_id)
673 .await?;
674 Ok(Response::new(MergeCompactionGroupResponse {}))
675 }
676
677 async fn get_table_change_logs(
678 &self,
679 request: Request<GetTableChangeLogsRequest>,
680 ) -> Result<Response<GetTableChangeLogsResponse>, Status> {
681 let GetTableChangeLogsRequest {
682 epoch_only,
683 start_epoch_inclusive,
684 end_epoch_inclusive,
685 table_ids,
686 exclude_empty,
687 limit,
688 } = request.into_inner();
689 let table_change_logs = self
690 .hummock_manager
691 .get_table_change_logs(
692 epoch_only,
693 start_epoch_inclusive,
694 end_epoch_inclusive,
695 table_ids
696 .map(|t| t.table_ids.into_iter().collect::<HashSet<_>>())
697 .clone(),
698 exclude_empty,
699 limit,
700 )
701 .await
702 .into_iter()
703 .map(|(i, l)| (i.as_raw_id(), l.to_protobuf()))
704 .collect();
705 Ok(Response::new(GetTableChangeLogsResponse {
706 table_change_logs,
707 }))
708 }
709}
710
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    /// `fields_to_kvs!` maps each listed field name to the field's string form.
    #[test]
    fn test_fields_to_kvs() {
        struct Sample {
            foo: u64,
            bar: String,
        }
        let sample = Sample {
            foo: 15,
            bar: String::from("foobar"),
        };
        let kvs: HashMap<String, String> = fields_to_kvs!(sample, foo, bar);
        assert_eq!(kvs.len(), 2);
        assert_eq!(kvs.get("foo").map(String::as_str), Some("15"));
        assert_eq!(kvs.get("bar").map(String::as_str), Some("foobar"));
    }
}