risingwave_storage/hummock/sstable/
multi_builder.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17use std::sync::atomic::AtomicU64;
18use std::sync::atomic::Ordering::SeqCst;
19
20use await_tree::SpanExt;
21use bytes::Bytes;
22use futures::StreamExt;
23use futures::stream::FuturesUnordered;
24use num_integer::Integer;
25use risingwave_common::catalog::TableId;
26use risingwave_common::hash::VirtualNode;
27use risingwave_hummock_sdk::LocalSstableInfo;
28use risingwave_hummock_sdk::key::{FullKey, UserKey};
29use tokio::task::JoinHandle;
30
31use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
32use crate::hummock::compactor::task_progress::TaskProgress;
33use crate::hummock::sstable::filter::FilterBuilder;
34use crate::hummock::sstable_store::SstableStoreRef;
35use crate::hummock::value::HummockValue;
36use crate::hummock::{
37    BatchUploadWriter, BlockMeta, CachePolicy, HummockError, HummockResult, MemoryLimiter,
38    SstableBuilder, SstableBuilderOptions, SstableWriter, SstableWriterOptions, Xor16FilterBuilder,
39};
40use crate::monitor::CompactorMetrics;
41
/// Join handle of a spawned task that resolves to `HummockResult<()>`.
///
/// It is the `Output` required of the [`SstableWriter`]s produced through
/// [`TableBuilderFactory`]; [`CapacitySplitTableBuilder`] awaits these handles and maps a join
/// failure to an SST upload error.
pub type UploadJoinHandle = JoinHandle<HummockResult<()>>;
43
/// Factory that opens a fresh [`SstableBuilder`] whenever the caller needs to start a new SST.
#[async_trait::async_trait]
pub trait TableBuilderFactory {
    /// Writer type of the opened builders; its output must be an [`UploadJoinHandle`].
    type Writer: SstableWriter<Output = UploadJoinHandle>;
    /// Filter builder type of the opened builders.
    type Filter: FilterBuilder;
    /// Opens a new builder. It takes `&mut self`, so an implementation may update its own state
    /// on every call.
    async fn open_builder(&mut self) -> HummockResult<SstableBuilder<Self::Writer, Self::Filter>>;
}
50
/// A wrapper for [`SstableBuilder`] which automatically splits key-value pairs into multiple
/// tables, based on the target capacity set in options and on table / vnode-partition boundaries.
///
/// When building is finished, one may call `finish` to get the results of zero, one or more tables.
pub struct CapacitySplitTableBuilder<F>
where
    F: TableBuilderFactory,
{
    /// Factory used to open a new [`SstableBuilder`] whenever there is no current one.
    builder_factory: F,

    /// Info of every SST sealed so far, in sealing order.
    sst_outputs: Vec<LocalSstableInfo>,

    /// Builder currently receiving data; `None` before the first write and right after a seal.
    current_builder: Option<SstableBuilder<F::Writer, F::Filter>>,

    /// Statistics.
    pub compactor_metrics: Arc<CompactorMetrics>,

    /// Optional progress tracker; notified when a builder is opened and when an SST is sealed.
    task_progress: Option<Arc<TaskProgress>>,

    /// Table id last recorded by `check_switch_builder`.
    last_table_id: u32,

    /// Vnode count used for partition arithmetic; refreshed from the catalog agent in
    /// `check_switch_builder` when the incoming table id differs from `last_table_id`.
    vnode_count: usize,
    /// Maps a table id to the number of vnode partitions its keys should be split into.
    table_vnode_partition: BTreeMap<u32, u32>,
    /// Partition count in effect for the current table; `0` when no partitioning applies.
    split_weight_by_vnode: u32,
    /// When vnode of the coming key is greater than `largest_vnode_in_current_partition`, we will
    /// switch SST.
    largest_vnode_in_current_partition: usize,

    /// Join handles of the uploads of sealed SSTs that have not been awaited yet.
    concurrent_upload_join_handle: FuturesUnordered<UploadJoinHandle>,

    /// If set, `seal_current` awaits one pending upload once the number of pending handles
    /// reaches this value.
    concurrent_uploading_sst_count: Option<usize>,

    /// Catalog agent queried for the vnode count of a table.
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
}
87
88impl<F> CapacitySplitTableBuilder<F>
89where
90    F: TableBuilderFactory,
91{
92    /// Creates a new [`CapacitySplitTableBuilder`] using given configuration generator.
93    #[allow(clippy::too_many_arguments)]
94    pub fn new(
95        builder_factory: F,
96        compactor_metrics: Arc<CompactorMetrics>,
97        task_progress: Option<Arc<TaskProgress>>,
98        table_vnode_partition: BTreeMap<u32, u32>,
99        concurrent_uploading_sst_count: Option<usize>,
100        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
101    ) -> Self {
102        // TODO(var-vnode): should use value from caller
103        let vnode_count = VirtualNode::COUNT_FOR_COMPAT;
104
105        Self {
106            builder_factory,
107            sst_outputs: Vec::new(),
108            current_builder: None,
109            compactor_metrics,
110            task_progress,
111            last_table_id: 0,
112            table_vnode_partition,
113            vnode_count,
114            split_weight_by_vnode: 0,
115            largest_vnode_in_current_partition: vnode_count - 1,
116            concurrent_upload_join_handle: FuturesUnordered::new(),
117            concurrent_uploading_sst_count,
118            compaction_catalog_agent_ref,
119        }
120    }
121
122    pub fn for_test(
123        builder_factory: F,
124        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
125    ) -> Self {
126        Self {
127            builder_factory,
128            sst_outputs: Vec::new(),
129            current_builder: None,
130            compactor_metrics: Arc::new(CompactorMetrics::unused()),
131            task_progress: None,
132            last_table_id: 0,
133            table_vnode_partition: BTreeMap::default(),
134            vnode_count: VirtualNode::COUNT_FOR_TEST,
135            split_weight_by_vnode: 0,
136            largest_vnode_in_current_partition: VirtualNode::MAX_FOR_TEST.to_index(),
137            concurrent_upload_join_handle: FuturesUnordered::new(),
138            concurrent_uploading_sst_count: None,
139            compaction_catalog_agent_ref,
140        }
141    }
142
143    /// Returns the number of [`SstableBuilder`]s.
144    pub fn len(&self) -> usize {
145        self.sst_outputs.len() + self.current_builder.is_some() as usize
146    }
147
148    /// Returns true if no builder is created.
149    pub fn is_empty(&self) -> bool {
150        self.sst_outputs.is_empty() && self.current_builder.is_none()
151    }
152
    /// Test entry point that forwards its arguments unchanged to `add_full_key`.
    pub async fn add_full_key_for_test(
        &mut self,
        full_key: FullKey<&[u8]>,
        value: HummockValue<&[u8]>,
        is_new_user_key: bool,
    ) -> HummockResult<()> {
        self.add_full_key(full_key, value, is_new_user_key).await
    }
161
162    pub async fn add_raw_block(
163        &mut self,
164        buf: Bytes,
165        filter_data: Vec<u8>,
166        smallest_key: FullKey<Vec<u8>>,
167        largest_key: Vec<u8>,
168        block_meta: BlockMeta,
169    ) -> HummockResult<bool> {
170        if self.current_builder.is_none() {
171            if let Some(progress) = &self.task_progress {
172                progress.inc_num_pending_write_io()
173            }
174            let builder = self.builder_factory.open_builder().await?;
175            self.current_builder = Some(builder);
176        }
177
178        let builder = self.current_builder.as_mut().unwrap();
179        builder
180            .add_raw_block(buf, filter_data, smallest_key, largest_key, block_meta)
181            .await
182    }
183
184    /// Adds a key-value pair to the underlying builders.
185    ///
186    /// If `allow_split` and the current builder reaches its capacity, this function will create a
187    /// new one with the configuration generated by the closure provided earlier.
188    ///
189    /// Note that in some cases like compaction of the same user key, automatic splitting is not
190    /// allowed, where `allow_split` should be `false`.
191    pub async fn add_full_key(
192        &mut self,
193        full_key: FullKey<&[u8]>,
194        value: HummockValue<&[u8]>,
195        is_new_user_key: bool,
196    ) -> HummockResult<()> {
197        let switch_builder = self.check_switch_builder(&full_key.user_key);
198
199        // We use this `need_seal_current` flag to store whether we need to call `seal_current` and
200        // then call `seal_current` later outside the `if let` instead of calling
201        // `seal_current` at where we set `need_seal_current = true`. This is because
202        // `seal_current` is an async method, and if we call `seal_current` within the `if let`,
203        // this temporary reference to `current_builder` will be captured in the future generated
204        // from the current method. Since this generated future is usually required to be `Send`,
205        // the captured reference to `current_builder` is also required to be `Send`, and then
206        // `current_builder` itself is required to be `Sync`, which is unnecessary.
207        let mut need_seal_current = false;
208        if let Some(builder) = self.current_builder.as_mut() {
209            if is_new_user_key {
210                need_seal_current = switch_builder || builder.reach_capacity();
211            }
212        }
213
214        if need_seal_current {
215            self.seal_current().await?;
216        }
217
218        if self.current_builder.is_none() {
219            if let Some(progress) = &self.task_progress {
220                progress.inc_num_pending_write_io();
221            }
222            let builder = self.builder_factory.open_builder().await?;
223            self.current_builder = Some(builder);
224        }
225
226        let builder = self.current_builder.as_mut().unwrap();
227        builder.add(full_key, value).await
228    }
229
230    pub fn check_switch_builder(&mut self, user_key: &UserKey<&[u8]>) -> bool {
231        let mut switch_builder = false;
232        if user_key.table_id.table_id != self.last_table_id {
233            let new_vnode_partition_count =
234                self.table_vnode_partition.get(&user_key.table_id.table_id);
235
236            self.vnode_count = self
237                .compaction_catalog_agent_ref
238                .vnode_count(user_key.table_id.table_id);
239            self.largest_vnode_in_current_partition = self.vnode_count - 1;
240
241            if new_vnode_partition_count.is_some()
242                || self.table_vnode_partition.contains_key(&self.last_table_id)
243            {
244                if new_vnode_partition_count.is_some() {
245                    if (*new_vnode_partition_count.unwrap() as usize) > self.vnode_count {
246                        tracing::warn!(
247                            "vnode partition count {} is larger than vnode count {}",
248                            new_vnode_partition_count.unwrap(),
249                            self.vnode_count
250                        );
251
252                        self.split_weight_by_vnode = 0;
253                    } else {
254                        self.split_weight_by_vnode = *new_vnode_partition_count.unwrap()
255                    };
256                } else {
257                    self.split_weight_by_vnode = 0;
258                }
259
260                // table_id change
261                self.last_table_id = user_key.table_id.table_id;
262                switch_builder = true;
263                if self.split_weight_by_vnode > 1 {
264                    self.largest_vnode_in_current_partition =
265                        self.vnode_count / (self.split_weight_by_vnode as usize) - 1;
266                } else {
267                    // default
268                    self.largest_vnode_in_current_partition = self.vnode_count - 1;
269                }
270            }
271        }
272        if self.largest_vnode_in_current_partition != self.vnode_count - 1 {
273            let key_vnode = user_key.get_vnode_id();
274            if key_vnode > self.largest_vnode_in_current_partition {
275                // vnode partition change
276                switch_builder = true;
277
278                // SAFETY: `self.split_weight_by_vnode > 1` here.
279                let (basic, remainder) = self
280                    .vnode_count
281                    .div_rem(&(self.split_weight_by_vnode as usize));
282                let small_segments_area = basic * (self.split_weight_by_vnode as usize - remainder);
283                self.largest_vnode_in_current_partition = (if key_vnode < small_segments_area {
284                    (key_vnode / basic + 1) * basic
285                } else {
286                    ((key_vnode - small_segments_area) / (basic + 1) + 1) * (basic + 1)
287                        + small_segments_area
288                }) - 1;
289                debug_assert!(key_vnode <= self.largest_vnode_in_current_partition);
290            }
291        }
292        switch_builder
293    }
294
295    pub fn need_flush(&self) -> bool {
296        self.current_builder
297            .as_ref()
298            .map(|builder| builder.reach_capacity())
299            .unwrap_or(false)
300    }
301
302    /// Marks the current builder as sealed. Next call of `add` will always create a new table.
303    ///
304    /// If there's no builder created, or current one is already sealed before, then this function
305    /// will be no-op.
306    pub async fn seal_current(&mut self) -> HummockResult<()> {
307        use await_tree::InstrumentAwait;
308        if let Some(builder) = self.current_builder.take() {
309            let builder_output = builder.finish().await?;
310            {
311                // report
312                if let Some(progress) = &self.task_progress {
313                    progress.inc_ssts_sealed();
314                }
315                builder_output.stats.report_stats(&self.compactor_metrics);
316            }
317
318            self.concurrent_upload_join_handle
319                .push(builder_output.writer_output);
320
321            self.sst_outputs.push(builder_output.sst_info);
322
323            if let Some(concurrent_uploading_sst_count) = self.concurrent_uploading_sst_count
324                && self.concurrent_upload_join_handle.len() >= concurrent_uploading_sst_count
325            {
326                self.concurrent_upload_join_handle
327                    .next()
328                    .instrument_await("upload".verbose())
329                    .await
330                    .unwrap()
331                    .map_err(HummockError::sstable_upload_error)??;
332            }
333        }
334        Ok(())
335    }
336
337    /// Finalizes all the tables to be ids, blocks and metadata.
338    pub async fn finish(mut self) -> HummockResult<Vec<LocalSstableInfo>> {
339        use futures::future::try_join_all;
340        self.seal_current().await?;
341        try_join_all(self.concurrent_upload_join_handle.into_iter())
342            .await
343            .map_err(HummockError::sstable_upload_error)?
344            .into_iter()
345            .collect::<HummockResult<Vec<()>>>()?;
346
347        Ok(self.sst_outputs)
348    }
349}
350
/// Used for unit tests and benchmarks.
pub struct LocalTableBuilderFactory {
    /// Id handed to the next opened builder; incremented on every `open_builder` call.
    next_id: AtomicU64,
    /// Store through which the SST writers are created.
    sstable_store: SstableStoreRef,
    /// Builder options cloned into each opened builder; `capacity` is also used as the
    /// writer's capacity hint.
    options: SstableBuilderOptions,
    /// Cache policy passed to each writer.
    policy: CachePolicy,
    /// Memory limiter from which `open_builder` requires memory before creating a writer.
    limiter: MemoryLimiter,
}
359
360impl LocalTableBuilderFactory {
361    pub fn new(
362        next_id: u64,
363        sstable_store: SstableStoreRef,
364        options: SstableBuilderOptions,
365    ) -> Self {
366        Self {
367            next_id: AtomicU64::new(next_id),
368            sstable_store,
369            options,
370            policy: CachePolicy::NotFill,
371            limiter: MemoryLimiter::new(1000000),
372        }
373    }
374}
375
376#[async_trait::async_trait]
377impl TableBuilderFactory for LocalTableBuilderFactory {
378    type Filter = Xor16FilterBuilder;
379    type Writer = BatchUploadWriter;
380
381    async fn open_builder(
382        &mut self,
383    ) -> HummockResult<SstableBuilder<BatchUploadWriter, Xor16FilterBuilder>> {
384        let id = self.next_id.fetch_add(1, SeqCst);
385        let tracker = self.limiter.require_memory(1).await;
386        let writer_options = SstableWriterOptions {
387            capacity_hint: Some(self.options.capacity),
388            tracker: Some(tracker),
389            policy: self.policy,
390        };
391        let writer = self
392            .sstable_store
393            .clone()
394            .create_sst_writer(id, writer_options);
395        let table_id_to_vnode = HashMap::from_iter(vec![(
396            TableId::default().table_id(),
397            VirtualNode::COUNT_FOR_TEST,
398        )]);
399        let table_id_to_watermark_serde =
400            HashMap::from_iter(vec![(TableId::default().table_id(), None)]);
401        let builder = SstableBuilder::for_test(
402            id,
403            writer,
404            self.options.clone(),
405            table_id_to_vnode,
406            table_id_to_watermark_serde,
407        );
408
409        Ok(builder)
410    }
411}
412
413#[cfg(test)]
414mod tests {
415    use risingwave_common::catalog::TableId;
416    use risingwave_common::util::epoch::{EpochExt, test_epoch};
417
418    use super::*;
419    use crate::compaction_catalog_manager::{
420        CompactionCatalogAgent, FilterKeyExtractorImpl, FullKeyFilterKeyExtractor,
421    };
422    use crate::hummock::DEFAULT_RESTART_INTERVAL;
423    use crate::hummock::iterator::test_utils::mock_sstable_store;
424    use crate::hummock::test_utils::{default_builder_opt_for_test, test_key_of, test_user_key_of};
425
426    #[tokio::test]
427    async fn test_empty() {
428        let block_size = 1 << 10;
429        let table_capacity = 4 * block_size;
430        let opts = SstableBuilderOptions {
431            capacity: table_capacity,
432            block_capacity: block_size,
433            restart_interval: DEFAULT_RESTART_INTERVAL,
434            bloom_false_positive: 0.1,
435            ..Default::default()
436        };
437        let builder_factory = LocalTableBuilderFactory::new(1001, mock_sstable_store().await, opts);
438        let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::dummy());
439        let builder =
440            CapacitySplitTableBuilder::for_test(builder_factory, compaction_catalog_agent_ref);
441        let results = builder.finish().await.unwrap();
442        assert!(results.is_empty());
443    }
444
445    #[tokio::test]
446    async fn test_lots_of_tables() {
447        let block_size = 1 << 10;
448        let table_capacity = 4 * block_size;
449        let opts = SstableBuilderOptions {
450            capacity: table_capacity,
451            block_capacity: block_size,
452            restart_interval: DEFAULT_RESTART_INTERVAL,
453            bloom_false_positive: 0.1,
454            ..Default::default()
455        };
456        let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![0]);
457
458        let builder_factory = LocalTableBuilderFactory::new(1001, mock_sstable_store().await, opts);
459        let mut builder =
460            CapacitySplitTableBuilder::for_test(builder_factory, compaction_catalog_agent_ref);
461
462        for i in 0..table_capacity {
463            builder
464                .add_full_key_for_test(
465                    FullKey::from_user_key(
466                        test_user_key_of(i).as_ref(),
467                        test_epoch((table_capacity - i) as u64),
468                    ),
469                    HummockValue::put(b"value"),
470                    true,
471                )
472                .await
473                .unwrap();
474        }
475
476        let results = builder.finish().await.unwrap();
477        assert!(results.len() > 1);
478    }
479
480    #[tokio::test]
481    async fn test_table_seal() {
482        let opts = default_builder_opt_for_test();
483        let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![0]);
484        let mut builder = CapacitySplitTableBuilder::for_test(
485            LocalTableBuilderFactory::new(1001, mock_sstable_store().await, opts),
486            compaction_catalog_agent_ref,
487        );
488        let mut epoch = test_epoch(100);
489
490        macro_rules! add {
491            () => {
492                epoch.dec_epoch();
493                builder
494                    .add_full_key_for_test(
495                        FullKey::from_user_key(test_user_key_of(1).as_ref(), epoch),
496                        HummockValue::put(b"v"),
497                        true,
498                    )
499                    .await
500                    .unwrap();
501            };
502        }
503
504        assert_eq!(builder.len(), 0);
505        builder.seal_current().await.unwrap();
506        assert_eq!(builder.len(), 0);
507        add!();
508        assert_eq!(builder.len(), 1);
509        add!();
510        assert_eq!(builder.len(), 1);
511        builder.seal_current().await.unwrap();
512        assert_eq!(builder.len(), 1);
513        add!();
514        assert_eq!(builder.len(), 2);
515        builder.seal_current().await.unwrap();
516        assert_eq!(builder.len(), 2);
517        builder.seal_current().await.unwrap();
518        assert_eq!(builder.len(), 2);
519
520        let results = builder.finish().await.unwrap();
521        assert_eq!(results.len(), 2);
522    }
523
524    #[tokio::test]
525    async fn test_initial_not_allowed_split() {
526        let opts = default_builder_opt_for_test();
527        let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![0]);
528        let mut builder = CapacitySplitTableBuilder::for_test(
529            LocalTableBuilderFactory::new(1001, mock_sstable_store().await, opts),
530            compaction_catalog_agent_ref,
531        );
532        builder
533            .add_full_key_for_test(test_key_of(0).to_ref(), HummockValue::put(b"v"), false)
534            .await
535            .unwrap();
536    }
537
538    #[tokio::test]
539    async fn test_check_table_and_vnode_change() {
540        let block_size = 256;
541        let table_capacity = 2 * block_size;
542        let opts = SstableBuilderOptions {
543            capacity: table_capacity,
544            block_capacity: block_size,
545            restart_interval: DEFAULT_RESTART_INTERVAL,
546            bloom_false_positive: 0.1,
547            ..Default::default()
548        };
549
550        {
551            let table_partition_vnode =
552                BTreeMap::from([(1_u32, 4_u32), (2_u32, 4_u32), (3_u32, 4_u32)]);
553
554            let compaction_catalog_agent_ref =
555                CompactionCatalogAgent::for_test(vec![0, 1, 2, 3, 4, 5]);
556            let mut builder = CapacitySplitTableBuilder::new(
557                LocalTableBuilderFactory::new(1001, mock_sstable_store().await, opts.clone()),
558                Arc::new(CompactorMetrics::unused()),
559                None,
560                table_partition_vnode,
561                None,
562                compaction_catalog_agent_ref,
563            );
564
565            let mut table_key = VirtualNode::from_index(0).to_be_bytes().to_vec();
566            table_key.extend_from_slice("a".as_bytes());
567
568            let switch_builder =
569                builder.check_switch_builder(&UserKey::for_test(TableId::from(1), &table_key));
570            assert!(switch_builder);
571
572            let mut table_key = VirtualNode::from_index(62).to_be_bytes().to_vec();
573            table_key.extend_from_slice("a".as_bytes());
574            let switch_builder =
575                builder.check_switch_builder(&UserKey::for_test(TableId::from(1), &table_key));
576            assert!(!switch_builder);
577
578            let mut table_key = VirtualNode::from_index(63).to_be_bytes().to_vec();
579            table_key.extend_from_slice("a".as_bytes());
580            let switch_builder =
581                builder.check_switch_builder(&UserKey::for_test(TableId::from(1), &table_key));
582            assert!(!switch_builder);
583
584            let mut table_key = VirtualNode::from_index(64).to_be_bytes().to_vec();
585            table_key.extend_from_slice("a".as_bytes());
586            let switch_builder =
587                builder.check_switch_builder(&UserKey::for_test(TableId::from(1), &table_key));
588            assert!(switch_builder);
589
590            let switch_builder =
591                builder.check_switch_builder(&UserKey::for_test(TableId::from(2), &table_key));
592            assert!(switch_builder);
593            let switch_builder =
594                builder.check_switch_builder(&UserKey::for_test(TableId::from(3), &table_key));
595            assert!(switch_builder);
596            let switch_builder =
597                builder.check_switch_builder(&UserKey::for_test(TableId::from(4), &table_key));
598            assert!(switch_builder);
599            let switch_builder =
600                builder.check_switch_builder(&UserKey::for_test(TableId::from(5), &table_key));
601            assert!(!switch_builder);
602        }
603
604        {
605            // Test different table vnode count
606            let table_partition_vnode =
607                BTreeMap::from([(1_u32, 4_u32), (2_u32, 4_u32), (3_u32, 4_u32)]);
608
609            let table_id_to_vnode = HashMap::from_iter(vec![(1, 64), (2, 128), (3, 256)]);
610            let table_id_to_watermark_serde =
611                HashMap::from_iter(vec![(1, None), (2, None), (3, None)]);
612            let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
613                FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
614                table_id_to_vnode,
615                table_id_to_watermark_serde,
616            ));
617
618            let mut builder = CapacitySplitTableBuilder::new(
619                LocalTableBuilderFactory::new(1001, mock_sstable_store().await, opts),
620                Arc::new(CompactorMetrics::unused()),
621                None,
622                table_partition_vnode,
623                None,
624                compaction_catalog_agent_ref,
625            );
626
627            let mut table_key = VirtualNode::from_index(0).to_be_bytes().to_vec();
628            table_key.extend_from_slice("a".as_bytes());
629
630            let switch_builder =
631                builder.check_switch_builder(&UserKey::for_test(TableId::from(1), &table_key));
632            assert!(switch_builder);
633
634            let mut table_key = VirtualNode::from_index(15).to_be_bytes().to_vec();
635            table_key.extend_from_slice("a".as_bytes());
636            let switch_builder =
637                builder.check_switch_builder(&UserKey::for_test(TableId::from(1), &table_key));
638            assert!(!switch_builder);
639
640            let mut table_key = VirtualNode::from_index(16).to_be_bytes().to_vec();
641            table_key.extend_from_slice("a".as_bytes());
642            let switch_builder =
643                builder.check_switch_builder(&UserKey::for_test(TableId::from(1), &table_key));
644            assert!(switch_builder);
645
646            let mut table_key = VirtualNode::from_index(0).to_be_bytes().to_vec();
647            table_key.extend_from_slice("a".as_bytes());
648            let switch_builder =
649                builder.check_switch_builder(&UserKey::for_test(TableId::from(2), &table_key));
650            assert!(switch_builder);
651
652            let mut table_key = VirtualNode::from_index(16).to_be_bytes().to_vec();
653            table_key.extend_from_slice("a".as_bytes());
654            let switch_builder =
655                builder.check_switch_builder(&UserKey::for_test(TableId::from(2), &table_key));
656            assert!(!switch_builder);
657
658            let mut table_key = VirtualNode::from_index(31).to_be_bytes().to_vec();
659            table_key.extend_from_slice("a".as_bytes());
660            let switch_builder =
661                builder.check_switch_builder(&UserKey::for_test(TableId::from(2), &table_key));
662            assert!(!switch_builder);
663
664            let mut table_key = VirtualNode::from_index(32).to_be_bytes().to_vec();
665            table_key.extend_from_slice("a".as_bytes());
666            let switch_builder =
667                builder.check_switch_builder(&UserKey::for_test(TableId::from(2), &table_key));
668            assert!(switch_builder);
669
670            let mut table_key = VirtualNode::from_index(64).to_be_bytes().to_vec();
671            table_key.extend_from_slice("a".as_bytes());
672            let switch_builder =
673                builder.check_switch_builder(&UserKey::for_test(TableId::from(2), &table_key));
674            assert!(switch_builder);
675
676            let mut table_key = VirtualNode::from_index(0).to_be_bytes().to_vec();
677            table_key.extend_from_slice("a".as_bytes());
678            let switch_builder =
679                builder.check_switch_builder(&UserKey::for_test(TableId::from(3), &table_key));
680            assert!(switch_builder);
681
682            let mut table_key = VirtualNode::from_index(16).to_be_bytes().to_vec();
683            table_key.extend_from_slice("a".as_bytes());
684            let switch_builder =
685                builder.check_switch_builder(&UserKey::for_test(TableId::from(3), &table_key));
686            assert!(!switch_builder);
687
688            let mut table_key = VirtualNode::from_index(32).to_be_bytes().to_vec();
689            table_key.extend_from_slice("a".as_bytes());
690            let switch_builder =
691                builder.check_switch_builder(&UserKey::for_test(TableId::from(3), &table_key));
692            assert!(!switch_builder);
693
694            let mut table_key = VirtualNode::from_index(63).to_be_bytes().to_vec();
695            table_key.extend_from_slice("a".as_bytes());
696            let switch_builder =
697                builder.check_switch_builder(&UserKey::for_test(TableId::from(3), &table_key));
698            assert!(!switch_builder);
699
700            let mut table_key = VirtualNode::from_index(64).to_be_bytes().to_vec();
701            table_key.extend_from_slice("a".as_bytes());
702            let switch_builder =
703                builder.check_switch_builder(&UserKey::for_test(TableId::from(3), &table_key));
704            assert!(switch_builder);
705        }
706    }
707}