use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering::SeqCst;

use await_tree::SpanExt;
use bytes::Bytes;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use num_integer::Integer;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_hummock_sdk::LocalSstableInfo;
use risingwave_hummock_sdk::key::{FullKey, UserKey};
use tokio::task::JoinHandle;

use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
use crate::hummock::compactor::task_progress::TaskProgress;
use crate::hummock::sstable::filter::FilterBuilder;
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::value::HummockValue;
use crate::hummock::{
    BatchUploadWriter, BlockMeta, CachePolicy, HummockError, HummockResult, MemoryLimiter,
    SstableBuilder, SstableBuilderOptions, SstableWriter, SstableWriterOptions, Xor16FilterBuilder,
};
use crate::monitor::CompactorMetrics;

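/// Handle of the background upload task spawned for each finished SST; it resolves once
/// the write has completed.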
pub type UploadJoinHandle = JoinHandle<HummockResult<()>>;

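/// Opens a fresh [`SstableBuilder`] whenever [`CapacitySplitTableBuilder`] needs to start
/// a new SST.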
#[async_trait::async_trait]
pub trait TableBuilderFactory {
    type Writer: SstableWriter<Output = UploadJoinHandle>;
    type Filter: FilterBuilder;
    async fn open_builder(&mut self) -> HummockResult<SstableBuilder<Self::Writer, Self::Filter>>;
}

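/// A wrapper around [`SstableBuilder`] that splits the incoming key-value stream across
/// multiple SSTs: a new builder is opened whenever the current one reaches its target
/// capacity, or when a key crosses a table-id / vnode partition boundary.
///
/// A minimal usage sketch (`factory` and `agent` are hypothetical caller-provided values):
///
/// ```ignore
/// let mut builder = CapacitySplitTableBuilder::for_test(factory, agent);
/// builder.add_full_key(full_key, value, true).await?;
/// let ssts: Vec<LocalSstableInfo> = builder.finish().await?;
/// ```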
pub struct CapacitySplitTableBuilder<F>
where
    F: TableBuilderFactory,
{
    builder_factory: F,

    sst_outputs: Vec<LocalSstableInfo>,

    current_builder: Option<SstableBuilder<F::Writer, F::Filter>>,

    pub compactor_metrics: Arc<CompactorMetrics>,

    task_progress: Option<Arc<TaskProgress>>,

    last_table_id: TableId,

    vnode_count: usize,
    table_vnode_partition: BTreeMap<TableId, u32>,
    split_weight_by_vnode: u32,
    largest_vnode_in_current_partition: usize,

    concurrent_upload_join_handle: FuturesUnordered<UploadJoinHandle>,

    concurrent_uploading_sst_count: Option<usize>,

    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
}

impl<F> CapacitySplitTableBuilder<F>
where
    F: TableBuilderFactory,
{
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        builder_factory: F,
        compactor_metrics: Arc<CompactorMetrics>,
        task_progress: Option<Arc<TaskProgress>>,
        table_vnode_partition: BTreeMap<TableId, u32>,
        concurrent_uploading_sst_count: Option<usize>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> Self {
        let vnode_count = VirtualNode::COUNT_FOR_COMPAT;

        Self {
            builder_factory,
            sst_outputs: Vec::new(),
            current_builder: None,
            compactor_metrics,
            task_progress,
            last_table_id: 0.into(),
            table_vnode_partition,
            vnode_count,
            split_weight_by_vnode: 0,
            largest_vnode_in_current_partition: vnode_count - 1,
            concurrent_upload_join_handle: FuturesUnordered::new(),
            concurrent_uploading_sst_count,
            compaction_catalog_agent_ref,
        }
    }

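    /// Creates a builder with test defaults: unused metrics, no task progress, no vnode
    /// partitioning, and the test vnode count.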
    pub fn for_test(
        builder_factory: F,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> Self {
        Self {
            builder_factory,
            sst_outputs: Vec::new(),
            current_builder: None,
            compactor_metrics: Arc::new(CompactorMetrics::unused()),
            task_progress: None,
            last_table_id: 0.into(),
            table_vnode_partition: BTreeMap::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST,
            split_weight_by_vnode: 0,
            largest_vnode_in_current_partition: VirtualNode::MAX_FOR_TEST.to_index(),
            concurrent_upload_join_handle: FuturesUnordered::new(),
            concurrent_uploading_sst_count: None,
            compaction_catalog_agent_ref,
        }
    }

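    /// Returns the number of SSTs produced so far, counting the currently open builder
    /// (if any) as one.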
    pub fn len(&self) -> usize {
        self.sst_outputs.len() + self.current_builder.is_some() as usize
    }

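    /// Returns `true` if no SST has been sealed and no builder is currently open.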
    pub fn is_empty(&self) -> bool {
        self.sst_outputs.is_empty() && self.current_builder.is_none()
    }

    pub async fn add_full_key_for_test(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
        is_new_user_key: bool,
    ) -> HummockResult<()> {
        self.add_full_key(full_key, value, is_new_user_key).await
    }

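    /// Appends a pre-built block, along with its filter data and block metadata, to the
    /// current SST, opening a new builder first if none is active. The returned flag is
    /// forwarded from the underlying builder and indicates whether the raw block could be
    /// appended as-is.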
    pub async fn add_raw_block(
        &mut self,
        buf: Bytes,
        filter_data: Vec<u8>,
        smallest_key: FullKey<Vec<u8>>,
        largest_key: Vec<u8>,
        block_meta: BlockMeta,
    ) -> HummockResult<bool> {
        if self.current_builder.is_none() {
            if let Some(progress) = &self.task_progress {
                progress.inc_num_pending_write_io()
            }
            let builder = self.builder_factory.open_builder().await?;
            self.current_builder = Some(builder);
        }

        let builder = self.current_builder.as_mut().unwrap();
        builder
            .add_raw_block(buf, filter_data, smallest_key, largest_key, block_meta)
            .await
    }

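    /// Adds a key-value pair, sealing the current SST first when the key crosses a
    /// table-id / vnode partition boundary or the builder has reached its capacity.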
    pub async fn add_full_key(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
        is_new_user_key: bool,
    ) -> HummockResult<()> {
        let switch_builder = self.check_switch_builder(&full_key.user_key);

        let mut need_seal_current = false;
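        // Only consider sealing on a new user key: all versions (epochs) of one user key
        // must stay within a single SST, so a split may only happen at a user-key boundary.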
        if let Some(builder) = self.current_builder.as_mut()
            && is_new_user_key
        {
            need_seal_current = switch_builder || builder.reach_capacity();
        }

        if need_seal_current {
            self.seal_current().await?;
        }

        if self.current_builder.is_none() {
            if let Some(progress) = &self.task_progress {
                progress.inc_num_pending_write_io();
            }
            let builder = self.builder_factory.open_builder().await?;
            self.current_builder = Some(builder);
        }

        let builder = self.current_builder.as_mut().unwrap();
        builder.add(full_key, value).await
    }

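    /// Decides whether the incoming key must start a new SST and updates the vnode-partition
    /// bookkeeping.
    ///
    /// A switch is required when the key belongs to a different table than the previous key
    /// (and either table participates in vnode partitioning), or when the key's vnode falls
    /// beyond the largest vnode of the current partition. With `split_weight_by_vnode`
    /// partitions over `vnode_count` vnodes, the first `split_weight_by_vnode - remainder`
    /// partitions span `basic = vnode_count / split_weight_by_vnode` vnodes each and the
    /// remaining ones span `basic + 1`.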
    pub fn check_switch_builder(&mut self, user_key: &UserKey<&[u8]>) -> bool {
        let mut switch_builder = false;
        if user_key.table_id != self.last_table_id {
            let new_vnode_partition_count = self.table_vnode_partition.get(&user_key.table_id);

            self.vnode_count = self
                .compaction_catalog_agent_ref
                .vnode_count(user_key.table_id);
            self.largest_vnode_in_current_partition = self.vnode_count - 1;

            if new_vnode_partition_count.is_some()
                || self.table_vnode_partition.contains_key(&self.last_table_id)
            {
                if let Some(new_vnode_partition_count) = new_vnode_partition_count {
                    if (*new_vnode_partition_count as usize) > self.vnode_count {
                        tracing::warn!(
                            "vnode partition count {} is larger than vnode count {}",
                            new_vnode_partition_count,
                            self.vnode_count
                        );

                        self.split_weight_by_vnode = 0;
                    } else {
                        self.split_weight_by_vnode = *new_vnode_partition_count;
                    };
                } else {
                    self.split_weight_by_vnode = 0;
                }

                self.last_table_id = user_key.table_id;
                switch_builder = true;
                if self.split_weight_by_vnode > 1 {
                    self.largest_vnode_in_current_partition =
                        self.vnode_count / (self.split_weight_by_vnode as usize) - 1;
                } else {
                    self.largest_vnode_in_current_partition = self.vnode_count - 1;
                }
            }
        }
        if self.largest_vnode_in_current_partition != self.vnode_count - 1 {
            let key_vnode = user_key.get_vnode_id();
            if key_vnode > self.largest_vnode_in_current_partition {
                switch_builder = true;

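                // Advance to the partition containing `key_vnode`: the first
                // `split_weight_by_vnode - remainder` partitions span `basic` vnodes each,
                // the remaining ones span `basic + 1`.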
                let (basic, remainder) = self
                    .vnode_count
                    .div_rem(&(self.split_weight_by_vnode as usize));
                let small_segments_area = basic * (self.split_weight_by_vnode as usize - remainder);
                self.largest_vnode_in_current_partition = (if key_vnode < small_segments_area {
                    (key_vnode / basic + 1) * basic
                } else {
                    ((key_vnode - small_segments_area) / (basic + 1) + 1) * (basic + 1)
                        + small_segments_area
                }) - 1;
                debug_assert!(key_vnode <= self.largest_vnode_in_current_partition);
            }
        }
        switch_builder
    }

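    /// Returns `true` if the currently open builder has reached its capacity and should be
    /// flushed.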
    pub fn need_flush(&self) -> bool {
        self.current_builder
            .as_ref()
            .map(|builder| builder.reach_capacity())
            .unwrap_or(false)
    }

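    /// Seals the currently open builder, if any: finishes it, records its stats and SST
    /// info, and queues its upload handle. When `concurrent_uploading_sst_count` in-flight
    /// uploads are reached, one of them is awaited before returning. A no-op when no
    /// builder is open.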
    pub async fn seal_current(&mut self) -> HummockResult<()> {
        use await_tree::InstrumentAwait;
        if let Some(builder) = self.current_builder.take() {
            let builder_output = builder.finish().await?;
            {
                if let Some(progress) = &self.task_progress {
                    progress.inc_ssts_sealed();
                }
                builder_output.stats.report_stats(&self.compactor_metrics);
            }

            self.concurrent_upload_join_handle
                .push(builder_output.writer_output);

            self.sst_outputs.push(builder_output.sst_info);

            if let Some(concurrent_uploading_sst_count) = self.concurrent_uploading_sst_count
                && self.concurrent_upload_join_handle.len() >= concurrent_uploading_sst_count
            {
                self.concurrent_upload_join_handle
                    .next()
                    .instrument_await("upload".verbose())
                    .await
                    .unwrap()
                    .map_err(HummockError::sstable_upload_error)??;
            }
        }
        Ok(())
    }

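    /// Seals the current builder and waits for all outstanding uploads to complete,
    /// returning the metadata of every SST produced.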
    pub async fn finish(mut self) -> HummockResult<Vec<LocalSstableInfo>> {
        use futures::future::try_join_all;
        self.seal_current().await?;
        try_join_all(self.concurrent_upload_join_handle.into_iter())
            .await
            .map_err(HummockError::sstable_upload_error)?
            .into_iter()
            .collect::<HummockResult<Vec<()>>>()?;

        Ok(self.sst_outputs)
    }
}

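/// A [`TableBuilderFactory`] with fixed, test-oriented defaults (batch upload writer,
/// xor16 filter, `NotFill` cache policy) used by tests and benchmarks.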
pub struct LocalTableBuilderFactory {
    next_id: AtomicU64,
    sstable_store: SstableStoreRef,
    options: SstableBuilderOptions,
    policy: CachePolicy,
    limiter: MemoryLimiter,
}

impl LocalTableBuilderFactory {
    pub fn new(
        next_id: u64,
        sstable_store: SstableStoreRef,
        options: SstableBuilderOptions,
    ) -> Self {
        Self {
            next_id: AtomicU64::new(next_id),
            sstable_store,
            options,
            policy: CachePolicy::NotFill,
            limiter: MemoryLimiter::new(1000000),
        }
    }
}

#[async_trait::async_trait]
impl TableBuilderFactory for LocalTableBuilderFactory {
    type Filter = Xor16FilterBuilder;
    type Writer = BatchUploadWriter;

    async fn open_builder(
        &mut self,
    ) -> HummockResult<SstableBuilder<BatchUploadWriter, Xor16FilterBuilder>> {
        let id = self.next_id.fetch_add(1, SeqCst);
        let tracker = self.limiter.require_memory(1).await;
        let writer_options = SstableWriterOptions {
            capacity_hint: Some(self.options.capacity),
            tracker: Some(tracker),
            policy: self.policy,
        };
        let writer = self
            .sstable_store
            .clone()
            .create_sst_writer(id, writer_options);
        let table_id_to_vnode = HashMap::from_iter(vec![(
            TableId::default().as_raw_id(),
            VirtualNode::COUNT_FOR_TEST,
        )]);
        let table_id_to_watermark_serde =
            HashMap::from_iter(vec![(TableId::default().as_raw_id(), None)]);
        let builder = SstableBuilder::for_test(
            id,
            writer,
            self.options.clone(),
            table_id_to_vnode,
            table_id_to_watermark_serde,
        );

        Ok(builder)
    }
}

#[cfg(test)]
mod tests {
    use risingwave_common::catalog::TableId;
    use risingwave_common::util::epoch::{EpochExt, test_epoch};

    use super::*;
    use crate::compaction_catalog_manager::{
        CompactionCatalogAgent, FilterKeyExtractorImpl, FullKeyFilterKeyExtractor,
    };
    use crate::hummock::DEFAULT_RESTART_INTERVAL;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{default_builder_opt_for_test, test_key_of, test_user_key_of};

    #[tokio::test]
    async fn test_empty() {
        let block_size = 1 << 10;
        let table_capacity = 4 * block_size;
        let opts = SstableBuilderOptions {
            capacity: table_capacity,
            block_capacity: block_size,
            restart_interval: DEFAULT_RESTART_INTERVAL,
            bloom_false_positive: 0.1,
            ..Default::default()
        };
        let builder_factory = LocalTableBuilderFactory::new(1001, mock_sstable_store().await, opts);
        let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::dummy());
        let builder =
            CapacitySplitTableBuilder::for_test(builder_factory, compaction_catalog_agent_ref);
        let results = builder.finish().await.unwrap();
        assert!(results.is_empty());
    }

    #[tokio::test]
    async fn test_lots_of_tables() {
        let block_size = 1 << 10;
        let table_capacity = 4 * block_size;
        let opts = SstableBuilderOptions {
            capacity: table_capacity,
            block_capacity: block_size,
            restart_interval: DEFAULT_RESTART_INTERVAL,
            bloom_false_positive: 0.1,
            ..Default::default()
        };
        let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![0]);

        let builder_factory = LocalTableBuilderFactory::new(1001, mock_sstable_store().await, opts);
        let mut builder =
            CapacitySplitTableBuilder::for_test(builder_factory, compaction_catalog_agent_ref);

        for i in 0..table_capacity {
            builder
                .add_full_key_for_test(
                    FullKey::from_user_key(
                        test_user_key_of(i).as_ref(),
                        test_epoch((table_capacity - i) as u64),
                    ),
                    HummockValue::put(b"value"),
                    true,
                )
                .await
                .unwrap();
        }

        let results = builder.finish().await.unwrap();
        assert!(results.len() > 1);
    }

    #[tokio::test]
    async fn test_table_seal() {
        let opts = default_builder_opt_for_test();
        let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![0]);
        let mut builder = CapacitySplitTableBuilder::for_test(
            LocalTableBuilderFactory::new(1001, mock_sstable_store().await, opts),
            compaction_catalog_agent_ref,
        );
        let mut epoch = test_epoch(100);

        macro_rules! add {
            () => {
                epoch.dec_epoch();
                builder
                    .add_full_key_for_test(
                        FullKey::from_user_key(test_user_key_of(1).as_ref(), epoch),
                        HummockValue::put(b"v"),
                        true,
                    )
                    .await
                    .unwrap();
            };
        }

        assert_eq!(builder.len(), 0);
        builder.seal_current().await.unwrap();
        assert_eq!(builder.len(), 0);
        add!();
        assert_eq!(builder.len(), 1);
        add!();
        assert_eq!(builder.len(), 1);
        builder.seal_current().await.unwrap();
        assert_eq!(builder.len(), 1);
        add!();
        assert_eq!(builder.len(), 2);
        builder.seal_current().await.unwrap();
        assert_eq!(builder.len(), 2);
        builder.seal_current().await.unwrap();
        assert_eq!(builder.len(), 2);

        let results = builder.finish().await.unwrap();
        assert_eq!(results.len(), 2);
    }

    #[tokio::test]
    async fn test_initial_not_allowed_split() {
        let opts = default_builder_opt_for_test();
        let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![0]);
        let mut builder = CapacitySplitTableBuilder::for_test(
            LocalTableBuilderFactory::new(1001, mock_sstable_store().await, opts),
            compaction_catalog_agent_ref,
        );
        builder
            .add_full_key_for_test(test_key_of(0).to_ref(), HummockValue::put(b"v"), false)
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_check_table_and_vnode_change() {
        let block_size = 256;
        let table_capacity = 2 * block_size;
        let opts = SstableBuilderOptions {
            capacity: table_capacity,
            block_capacity: block_size,
            restart_interval: DEFAULT_RESTART_INTERVAL,
            bloom_false_positive: 0.1,
            ..Default::default()
        };

        {
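            // All tables here use the agent's test vnode count; with a partition count of
            // 4, vnodes 0..=63 fall in the first partition, so vnode 64 must start a new
            // one (as the assertions below check).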
            let table_partition_vnode = BTreeMap::from([
                (1_u32.into(), 4_u32),
                (2_u32.into(), 4_u32),
                (3_u32.into(), 4_u32),
            ]);

            let compaction_catalog_agent_ref =
                CompactionCatalogAgent::for_test(vec![0, 1, 2, 3, 4, 5]);
            let mut builder = CapacitySplitTableBuilder::new(
                LocalTableBuilderFactory::new(1001, mock_sstable_store().await, opts.clone()),
                Arc::new(CompactorMetrics::unused()),
                None,
                table_partition_vnode,
                None,
                compaction_catalog_agent_ref,
            );

            let mut table_key = VirtualNode::from_index(0).to_be_bytes().to_vec();
            table_key.extend_from_slice("a".as_bytes());

            let switch_builder =
                builder.check_switch_builder(&UserKey::for_test(TableId::from(1), &table_key));
            assert!(switch_builder);

            let mut table_key = VirtualNode::from_index(62).to_be_bytes().to_vec();
            table_key.extend_from_slice("a".as_bytes());
            let switch_builder =
                builder.check_switch_builder(&UserKey::for_test(TableId::from(1), &table_key));
            assert!(!switch_builder);

            let mut table_key = VirtualNode::from_index(63).to_be_bytes().to_vec();
            table_key.extend_from_slice("a".as_bytes());
            let switch_builder =
                builder.check_switch_builder(&UserKey::for_test(TableId::from(1), &table_key));
            assert!(!switch_builder);

            let mut table_key = VirtualNode::from_index(64).to_be_bytes().to_vec();
            table_key.extend_from_slice("a".as_bytes());
            let switch_builder =
                builder.check_switch_builder(&UserKey::for_test(TableId::from(1), &table_key));
            assert!(switch_builder);

            let switch_builder =
                builder.check_switch_builder(&UserKey::for_test(TableId::from(2), &table_key));
            assert!(switch_builder);
            let switch_builder =
                builder.check_switch_builder(&UserKey::for_test(TableId::from(3), &table_key));
            assert!(switch_builder);
            let switch_builder =
                builder.check_switch_builder(&UserKey::for_test(TableId::from(4), &table_key));
            assert!(switch_builder);
            let switch_builder =
                builder.check_switch_builder(&UserKey::for_test(TableId::from(5), &table_key));
            assert!(!switch_builder);
        }

        {
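            // Here each table declares its own vnode count (64, 128, 256); with a partition
            // count of 4, the per-table partition widths are 16, 32, and 64 vnodes.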
            let table_partition_vnode = BTreeMap::from([
                (1_u32.into(), 4_u32),
                (2_u32.into(), 4_u32),
                (3_u32.into(), 4_u32),
            ]);

            let table_id_to_vnode =
                HashMap::from_iter(vec![(1.into(), 64), (2.into(), 128), (3.into(), 256)]);
            let table_id_to_watermark_serde =
                HashMap::from_iter(vec![(1.into(), None), (2.into(), None), (3.into(), None)]);
            let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
                FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
                table_id_to_vnode,
                table_id_to_watermark_serde,
            ));

            let mut builder = CapacitySplitTableBuilder::new(
                LocalTableBuilderFactory::new(1001, mock_sstable_store().await, opts),
                Arc::new(CompactorMetrics::unused()),
                None,
                table_partition_vnode,
                None,
                compaction_catalog_agent_ref,
            );

            let mut table_key = VirtualNode::from_index(0).to_be_bytes().to_vec();
            table_key.extend_from_slice("a".as_bytes());

            let switch_builder =
                builder.check_switch_builder(&UserKey::for_test(TableId::from(1), &table_key));
            assert!(switch_builder);

            let mut table_key = VirtualNode::from_index(15).to_be_bytes().to_vec();
            table_key.extend_from_slice("a".as_bytes());
            let switch_builder =
                builder.check_switch_builder(&UserKey::for_test(TableId::from(1), &table_key));
            assert!(!switch_builder);

            let mut table_key = VirtualNode::from_index(16).to_be_bytes().to_vec();
            table_key.extend_from_slice("a".as_bytes());
            let switch_builder =
                builder.check_switch_builder(&UserKey::for_test(TableId::from(1), &table_key));
            assert!(switch_builder);

            let mut table_key = VirtualNode::from_index(0).to_be_bytes().to_vec();
            table_key.extend_from_slice("a".as_bytes());
            let switch_builder =
                builder.check_switch_builder(&UserKey::for_test(TableId::from(2), &table_key));
            assert!(switch_builder);

            let mut table_key = VirtualNode::from_index(16).to_be_bytes().to_vec();
            table_key.extend_from_slice("a".as_bytes());
            let switch_builder =
                builder.check_switch_builder(&UserKey::for_test(TableId::from(2), &table_key));
            assert!(!switch_builder);

            let mut table_key = VirtualNode::from_index(31).to_be_bytes().to_vec();
            table_key.extend_from_slice("a".as_bytes());
            let switch_builder =
                builder.check_switch_builder(&UserKey::for_test(TableId::from(2), &table_key));
            assert!(!switch_builder);

            let mut table_key = VirtualNode::from_index(32).to_be_bytes().to_vec();
            table_key.extend_from_slice("a".as_bytes());
            let switch_builder =
                builder.check_switch_builder(&UserKey::for_test(TableId::from(2), &table_key));
            assert!(switch_builder);

            let mut table_key = VirtualNode::from_index(64).to_be_bytes().to_vec();
            table_key.extend_from_slice("a".as_bytes());
            let switch_builder =
                builder.check_switch_builder(&UserKey::for_test(TableId::from(2), &table_key));
            assert!(switch_builder);

            let mut table_key = VirtualNode::from_index(0).to_be_bytes().to_vec();
            table_key.extend_from_slice("a".as_bytes());
            let switch_builder =
                builder.check_switch_builder(&UserKey::for_test(TableId::from(3), &table_key));
            assert!(switch_builder);

            let mut table_key = VirtualNode::from_index(16).to_be_bytes().to_vec();
            table_key.extend_from_slice("a".as_bytes());
            let switch_builder =
                builder.check_switch_builder(&UserKey::for_test(TableId::from(3), &table_key));
            assert!(!switch_builder);

            let mut table_key = VirtualNode::from_index(32).to_be_bytes().to_vec();
            table_key.extend_from_slice("a".as_bytes());
            let switch_builder =
                builder.check_switch_builder(&UserKey::for_test(TableId::from(3), &table_key));
            assert!(!switch_builder);

            let mut table_key = VirtualNode::from_index(63).to_be_bytes().to_vec();
            table_key.extend_from_slice("a".as_bytes());
            let switch_builder =
                builder.check_switch_builder(&UserKey::for_test(TableId::from(3), &table_key));
            assert!(!switch_builder);

            let mut table_key = VirtualNode::from_index(64).to_be_bytes().to_vec();
            table_key.extend_from_slice("a".as_bytes());
            let switch_builder =
                builder.check_switch_builder(&UserKey::for_test(TableId::from(3), &table_key));
            assert!(switch_builder);
        }
    }
}