risingwave_storage/hummock/sstable/
multi_builder.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::sync::Arc;
17use std::sync::atomic::AtomicU64;
18use std::sync::atomic::Ordering::SeqCst;
19
20use await_tree::SpanExt;
21use bytes::Bytes;
22use futures::StreamExt;
23use futures::stream::FuturesUnordered;
24use num_integer::Integer;
25use risingwave_common::catalog::TableId;
26use risingwave_common::hash::VirtualNode;
27use risingwave_hummock_sdk::LocalSstableInfo;
28use risingwave_hummock_sdk::key::{FullKey, UserKey};
29use tokio::task::JoinHandle;
30
31use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
32use crate::hummock::compactor::task_progress::TaskProgress;
33use crate::hummock::sstable::filter::FilterBuilder;
34use crate::hummock::sstable_store::SstableStoreRef;
35use crate::hummock::value::HummockValue;
36use crate::hummock::{
37    BatchUploadWriter, BlockMeta, CachePolicy, HummockError, HummockResult, MemoryLimiter,
38    SstableBuilder, SstableBuilderOptions, SstableWriter, SstableWriterOptions, Xor16FilterBuilder,
39};
40use crate::monitor::CompactorMetrics;
41
/// Handle of a spawned Tokio task that resolves to a `HummockResult<()>`.
///
/// This is the `Output` type required from the writers of a [`TableBuilderFactory`].
pub type UploadJoinHandle = JoinHandle<HummockResult<()>>;
43
44#[async_trait::async_trait]
45pub trait TableBuilderFactory {
46    type Writer: SstableWriter<Output = UploadJoinHandle>;
47    type Filter: FilterBuilder;
48    async fn open_builder(&mut self) -> HummockResult<SstableBuilder<Self::Writer, Self::Filter>>;
49}
50
51/// A wrapper for [`SstableBuilder`] which automatically split key-value pairs into multiple tables,
52/// based on their target capacity set in options.
53///
54/// When building is finished, one may call `finish` to get the results of zero, one or more tables.
55pub struct CapacitySplitTableBuilder<F>
56where
57    F: TableBuilderFactory,
58{
59    /// When creating a new [`SstableBuilder`], caller use this factory to generate it.
60    builder_factory: F,
61
62    sst_outputs: Vec<LocalSstableInfo>,
63
64    current_builder: Option<SstableBuilder<F::Writer, F::Filter>>,
65
66    /// Statistics.
67    pub compactor_metrics: Arc<CompactorMetrics>,
68
69    /// Update the number of sealed Sstables.
70    task_progress: Option<Arc<TaskProgress>>,
71
72    last_table_id: TableId,
73
74    vnode_count: usize,
75    table_vnode_partition: BTreeMap<TableId, u32>,
76    split_weight_by_vnode: u32,
77    /// When vnode of the coming key is greater than `largest_vnode_in_current_partition`, we will
78    /// switch SST.
79    largest_vnode_in_current_partition: usize,
80
81    concurrent_upload_join_handle: FuturesUnordered<UploadJoinHandle>,
82
83    concurrent_uploading_sst_count: Option<usize>,
84
85    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
86}
87
88impl<F> CapacitySplitTableBuilder<F>
89where
90    F: TableBuilderFactory,
91{
92    /// Creates a new [`CapacitySplitTableBuilder`] using given configuration generator.
93    #[allow(clippy::too_many_arguments)]
94    pub fn new(
95        builder_factory: F,
96        compactor_metrics: Arc<CompactorMetrics>,
97        task_progress: Option<Arc<TaskProgress>>,
98        table_vnode_partition: BTreeMap<TableId, u32>,
99        concurrent_uploading_sst_count: Option<usize>,
100        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
101    ) -> Self {
102        // TODO(var-vnode): should use value from caller
103        let vnode_count = VirtualNode::COUNT_FOR_COMPAT;
104
105        Self {
106            builder_factory,
107            sst_outputs: Vec::new(),
108            current_builder: None,
109            compactor_metrics,
110            task_progress,
111            last_table_id: 0.into(),
112            table_vnode_partition,
113            vnode_count,
114            split_weight_by_vnode: 0,
115            largest_vnode_in_current_partition: vnode_count - 1,
116            concurrent_upload_join_handle: FuturesUnordered::new(),
117            concurrent_uploading_sst_count,
118            compaction_catalog_agent_ref,
119        }
120    }
121
122    pub fn for_test(
123        builder_factory: F,
124        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
125    ) -> Self {
126        Self {
127            builder_factory,
128            sst_outputs: Vec::new(),
129            current_builder: None,
130            compactor_metrics: Arc::new(CompactorMetrics::unused()),
131            task_progress: None,
132            last_table_id: 0.into(),
133            table_vnode_partition: BTreeMap::default(),
134            vnode_count: VirtualNode::COUNT_FOR_TEST,
135            split_weight_by_vnode: 0,
136            largest_vnode_in_current_partition: VirtualNode::MAX_FOR_TEST.to_index(),
137            concurrent_upload_join_handle: FuturesUnordered::new(),
138            concurrent_uploading_sst_count: None,
139            compaction_catalog_agent_ref,
140        }
141    }
142
143    /// Returns the number of [`SstableBuilder`]s.
144    pub fn len(&self) -> usize {
145        self.sst_outputs.len() + self.current_builder.is_some() as usize
146    }
147
148    /// Returns true if no builder is created.
149    pub fn is_empty(&self) -> bool {
150        self.sst_outputs.is_empty() && self.current_builder.is_none()
151    }
152
153    pub async fn add_full_key_for_test(
154        &mut self,
155        full_key: FullKey<&[u8]>,
156        value: HummockValue<&[u8]>,
157        is_new_user_key: bool,
158    ) -> HummockResult<()> {
159        self.add_full_key(full_key, value, is_new_user_key).await
160    }
161
162    pub async fn add_raw_block(
163        &mut self,
164        buf: Bytes,
165        filter_data: Vec<u8>,
166        smallest_key: FullKey<Vec<u8>>,
167        largest_key: Vec<u8>,
168        block_meta: BlockMeta,
169    ) -> HummockResult<bool> {
170        if self.current_builder.is_none() {
171            if let Some(progress) = &self.task_progress {
172                progress.inc_num_pending_write_io()
173            }
174            let builder = self.builder_factory.open_builder().await?;
175            self.current_builder = Some(builder);
176        }
177
178        let builder = self.current_builder.as_mut().unwrap();
179        builder
180            .add_raw_block(buf, filter_data, smallest_key, largest_key, block_meta)
181            .await
182    }
183
184    /// Adds a key-value pair to the underlying builders.
185    ///
186    /// If `allow_split` and the current builder reaches its capacity, this function will create a
187    /// new one with the configuration generated by the closure provided earlier.
188    ///
189    /// Note that in some cases like compaction of the same user key, automatic splitting is not
190    /// allowed, where `allow_split` should be `false`.
191    pub async fn add_full_key(
192        &mut self,
193        full_key: FullKey<&[u8]>,
194        value: HummockValue<&[u8]>,
195        is_new_user_key: bool,
196    ) -> HummockResult<()> {
197        let switch_builder = self.check_switch_builder(&full_key.user_key);
198
199        // We use this `need_seal_current` flag to store whether we need to call `seal_current` and
200        // then call `seal_current` later outside the `if let` instead of calling
201        // `seal_current` at where we set `need_seal_current = true`. This is because
202        // `seal_current` is an async method, and if we call `seal_current` within the `if let`,
203        // this temporary reference to `current_builder` will be captured in the future generated
204        // from the current method. Since this generated future is usually required to be `Send`,
205        // the captured reference to `current_builder` is also required to be `Send`, and then
206        // `current_builder` itself is required to be `Sync`, which is unnecessary.
207        let mut need_seal_current = false;
208        if let Some(builder) = self.current_builder.as_mut()
209            && is_new_user_key
210        {
211            need_seal_current = switch_builder || builder.reach_capacity();
212        }
213
214        if need_seal_current {
215            self.seal_current().await?;
216        }
217
218        if self.current_builder.is_none() {
219            if let Some(progress) = &self.task_progress {
220                progress.inc_num_pending_write_io();
221            }
222            let builder = self.builder_factory.open_builder().await?;
223            self.current_builder = Some(builder);
224        }
225
226        let builder = self.current_builder.as_mut().unwrap();
227        builder.add(full_key, value).await
228    }
229
230    pub fn check_switch_builder(&mut self, user_key: &UserKey<&[u8]>) -> bool {
231        let mut switch_builder = false;
232        if user_key.table_id != self.last_table_id {
233            let new_vnode_partition_count = self.table_vnode_partition.get(&user_key.table_id);
234
235            self.vnode_count = self
236                .compaction_catalog_agent_ref
237                .vnode_count(user_key.table_id);
238            self.largest_vnode_in_current_partition = self.vnode_count - 1;
239
240            if new_vnode_partition_count.is_some()
241                || self.table_vnode_partition.contains_key(&self.last_table_id)
242            {
243                if let Some(new_vnode_partition_count) = new_vnode_partition_count {
244                    if (*new_vnode_partition_count as usize) > self.vnode_count {
245                        tracing::warn!(
246                            "vnode partition count {} is larger than vnode count {}",
247                            new_vnode_partition_count,
248                            self.vnode_count
249                        );
250
251                        self.split_weight_by_vnode = 0;
252                    } else {
253                        self.split_weight_by_vnode = *new_vnode_partition_count;
254                    };
255                } else {
256                    self.split_weight_by_vnode = 0;
257                }
258
259                // table_id change
260                self.last_table_id = user_key.table_id;
261                switch_builder = true;
262                if self.split_weight_by_vnode > 1 {
263                    self.largest_vnode_in_current_partition =
264                        self.vnode_count / (self.split_weight_by_vnode as usize) - 1;
265                } else {
266                    // default
267                    self.largest_vnode_in_current_partition = self.vnode_count - 1;
268                }
269            }
270        }
271        if self.largest_vnode_in_current_partition != self.vnode_count - 1 {
272            let key_vnode = user_key.get_vnode_id();
273            if key_vnode > self.largest_vnode_in_current_partition {
274                // vnode partition change
275                switch_builder = true;
276
277                // SAFETY: `self.split_weight_by_vnode > 1` here.
278                let (basic, remainder) = self
279                    .vnode_count
280                    .div_rem(&(self.split_weight_by_vnode as usize));
281                let small_segments_area = basic * (self.split_weight_by_vnode as usize - remainder);
282                self.largest_vnode_in_current_partition = (if key_vnode < small_segments_area {
283                    (key_vnode / basic + 1) * basic
284                } else {
285                    ((key_vnode - small_segments_area) / (basic + 1) + 1) * (basic + 1)
286                        + small_segments_area
287                }) - 1;
288                debug_assert!(key_vnode <= self.largest_vnode_in_current_partition);
289            }
290        }
291        switch_builder
292    }
293
294    pub fn need_flush(&self) -> bool {
295        self.current_builder
296            .as_ref()
297            .map(|builder| builder.reach_capacity())
298            .unwrap_or(false)
299    }
300
301    /// Marks the current builder as sealed. Next call of `add` will always create a new table.
302    ///
303    /// If there's no builder created, or current one is already sealed before, then this function
304    /// will be no-op.
305    pub async fn seal_current(&mut self) -> HummockResult<()> {
306        use await_tree::InstrumentAwait;
307        if let Some(builder) = self.current_builder.take() {
308            let builder_output = builder.finish().await?;
309            {
310                // report
311                if let Some(progress) = &self.task_progress {
312                    progress.inc_ssts_sealed();
313                }
314                builder_output.stats.report_stats(&self.compactor_metrics);
315            }
316
317            self.concurrent_upload_join_handle
318                .push(builder_output.writer_output);
319
320            self.sst_outputs.push(builder_output.sst_info);
321
322            if let Some(concurrent_uploading_sst_count) = self.concurrent_uploading_sst_count
323                && self.concurrent_upload_join_handle.len() >= concurrent_uploading_sst_count
324            {
325                self.concurrent_upload_join_handle
326                    .next()
327                    .instrument_await("upload".verbose())
328                    .await
329                    .unwrap()
330                    .map_err(HummockError::sstable_upload_error)??;
331            }
332        }
333        Ok(())
334    }
335
336    /// Finalizes all the tables to be ids, blocks and metadata.
337    pub async fn finish(mut self) -> HummockResult<Vec<LocalSstableInfo>> {
338        use futures::future::try_join_all;
339        self.seal_current().await?;
340        try_join_all(self.concurrent_upload_join_handle.into_iter())
341            .await
342            .map_err(HummockError::sstable_upload_error)?
343            .into_iter()
344            .collect::<HummockResult<Vec<()>>>()?;
345
346        Ok(self.sst_outputs)
347    }
348}
349
/// Used for unit tests and benchmarks.
///
/// Opens [`SstableBuilder`]s backed by a [`BatchUploadWriter`] and an [`Xor16FilterBuilder`].
pub struct LocalTableBuilderFactory {
    /// Id handed to the next opened builder; incremented by one per builder.
    next_id: AtomicU64,
    /// Store used to create the writer of each builder.
    sstable_store: SstableStoreRef,
    /// Options cloned into each builder; `capacity` is also passed as the writer's capacity hint.
    options: SstableBuilderOptions,
    /// Cache policy passed to each writer.
    policy: CachePolicy,
    /// Limiter from which a memory tracker is requested for each writer.
    limiter: MemoryLimiter,
}
358
359impl LocalTableBuilderFactory {
360    pub fn new(
361        next_id: u64,
362        sstable_store: SstableStoreRef,
363        options: SstableBuilderOptions,
364    ) -> Self {
365        Self {
366            next_id: AtomicU64::new(next_id),
367            sstable_store,
368            options,
369            policy: CachePolicy::NotFill,
370            limiter: MemoryLimiter::new(1000000),
371        }
372    }
373}
374
375#[async_trait::async_trait]
376impl TableBuilderFactory for LocalTableBuilderFactory {
377    type Filter = Xor16FilterBuilder;
378    type Writer = BatchUploadWriter;
379
380    async fn open_builder(
381        &mut self,
382    ) -> HummockResult<SstableBuilder<BatchUploadWriter, Xor16FilterBuilder>> {
383        let id = self.next_id.fetch_add(1, SeqCst);
384        let tracker = self.limiter.require_memory(1).await;
385        let writer_options = SstableWriterOptions {
386            capacity_hint: Some(self.options.capacity),
387            tracker: Some(tracker),
388            policy: self.policy,
389        };
390        let writer = self
391            .sstable_store
392            .clone()
393            .create_sst_writer(id, writer_options);
394        let table_id_to_vnode = HashMap::from_iter(vec![(
395            TableId::default().as_raw_id(),
396            VirtualNode::COUNT_FOR_TEST,
397        )]);
398        let table_id_to_watermark_serde =
399            HashMap::from_iter(vec![(TableId::default().as_raw_id(), None)]);
400        let builder = SstableBuilder::for_test(
401            id,
402            writer,
403            self.options.clone(),
404            table_id_to_vnode,
405            table_id_to_watermark_serde,
406        );
407
408        Ok(builder)
409    }
410}
411
412#[cfg(test)]
413mod tests {
414    use risingwave_common::catalog::TableId;
415    use risingwave_common::util::epoch::{EpochExt, test_epoch};
416
417    use super::*;
418    use crate::compaction_catalog_manager::{
419        CompactionCatalogAgent, FilterKeyExtractorImpl, FullKeyFilterKeyExtractor,
420    };
421    use crate::hummock::DEFAULT_RESTART_INTERVAL;
422    use crate::hummock::iterator::test_utils::mock_sstable_store;
423    use crate::hummock::test_utils::{default_builder_opt_for_test, test_key_of, test_user_key_of};
424
425    #[tokio::test]
426    async fn test_empty() {
427        let block_size = 1 << 10;
428        let table_capacity = 4 * block_size;
429        let opts = SstableBuilderOptions {
430            capacity: table_capacity,
431            block_capacity: block_size,
432            restart_interval: DEFAULT_RESTART_INTERVAL,
433            bloom_false_positive: 0.1,
434            ..Default::default()
435        };
436        let builder_factory = LocalTableBuilderFactory::new(1001, mock_sstable_store().await, opts);
437        let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::dummy());
438        let builder =
439            CapacitySplitTableBuilder::for_test(builder_factory, compaction_catalog_agent_ref);
440        let results = builder.finish().await.unwrap();
441        assert!(results.is_empty());
442    }
443
444    #[tokio::test]
445    async fn test_lots_of_tables() {
446        let block_size = 1 << 10;
447        let table_capacity = 4 * block_size;
448        let opts = SstableBuilderOptions {
449            capacity: table_capacity,
450            block_capacity: block_size,
451            restart_interval: DEFAULT_RESTART_INTERVAL,
452            bloom_false_positive: 0.1,
453            ..Default::default()
454        };
455        let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![0]);
456
457        let builder_factory = LocalTableBuilderFactory::new(1001, mock_sstable_store().await, opts);
458        let mut builder =
459            CapacitySplitTableBuilder::for_test(builder_factory, compaction_catalog_agent_ref);
460
461        for i in 0..table_capacity {
462            builder
463                .add_full_key_for_test(
464                    FullKey::from_user_key(
465                        test_user_key_of(i).as_ref(),
466                        test_epoch((table_capacity - i) as u64),
467                    ),
468                    HummockValue::put(b"value"),
469                    true,
470                )
471                .await
472                .unwrap();
473        }
474
475        let results = builder.finish().await.unwrap();
476        assert!(results.len() > 1);
477    }
478
479    #[tokio::test]
480    async fn test_table_seal() {
481        let opts = default_builder_opt_for_test();
482        let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![0]);
483        let mut builder = CapacitySplitTableBuilder::for_test(
484            LocalTableBuilderFactory::new(1001, mock_sstable_store().await, opts),
485            compaction_catalog_agent_ref,
486        );
487        let mut epoch = test_epoch(100);
488
489        macro_rules! add {
490            () => {
491                epoch.dec_epoch();
492                builder
493                    .add_full_key_for_test(
494                        FullKey::from_user_key(test_user_key_of(1).as_ref(), epoch),
495                        HummockValue::put(b"v"),
496                        true,
497                    )
498                    .await
499                    .unwrap();
500            };
501        }
502
503        assert_eq!(builder.len(), 0);
504        builder.seal_current().await.unwrap();
505        assert_eq!(builder.len(), 0);
506        add!();
507        assert_eq!(builder.len(), 1);
508        add!();
509        assert_eq!(builder.len(), 1);
510        builder.seal_current().await.unwrap();
511        assert_eq!(builder.len(), 1);
512        add!();
513        assert_eq!(builder.len(), 2);
514        builder.seal_current().await.unwrap();
515        assert_eq!(builder.len(), 2);
516        builder.seal_current().await.unwrap();
517        assert_eq!(builder.len(), 2);
518
519        let results = builder.finish().await.unwrap();
520        assert_eq!(results.len(), 2);
521    }
522
523    #[tokio::test]
524    async fn test_initial_not_allowed_split() {
525        let opts = default_builder_opt_for_test();
526        let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![0]);
527        let mut builder = CapacitySplitTableBuilder::for_test(
528            LocalTableBuilderFactory::new(1001, mock_sstable_store().await, opts),
529            compaction_catalog_agent_ref,
530        );
531        builder
532            .add_full_key_for_test(test_key_of(0).to_ref(), HummockValue::put(b"v"), false)
533            .await
534            .unwrap();
535    }
536
537    #[tokio::test]
538    async fn test_check_table_and_vnode_change() {
539        let block_size = 256;
540        let table_capacity = 2 * block_size;
541        let opts = SstableBuilderOptions {
542            capacity: table_capacity,
543            block_capacity: block_size,
544            restart_interval: DEFAULT_RESTART_INTERVAL,
545            bloom_false_positive: 0.1,
546            ..Default::default()
547        };
548
549        {
550            let table_partition_vnode = BTreeMap::from([
551                (1_u32.into(), 4_u32),
552                (2_u32.into(), 4_u32),
553                (3_u32.into(), 4_u32),
554            ]);
555
556            let compaction_catalog_agent_ref =
557                CompactionCatalogAgent::for_test(vec![0, 1, 2, 3, 4, 5]);
558            let mut builder = CapacitySplitTableBuilder::new(
559                LocalTableBuilderFactory::new(1001, mock_sstable_store().await, opts.clone()),
560                Arc::new(CompactorMetrics::unused()),
561                None,
562                table_partition_vnode,
563                None,
564                compaction_catalog_agent_ref,
565            );
566
567            let mut table_key = VirtualNode::from_index(0).to_be_bytes().to_vec();
568            table_key.extend_from_slice("a".as_bytes());
569
570            let switch_builder =
571                builder.check_switch_builder(&UserKey::for_test(TableId::from(1), &table_key));
572            assert!(switch_builder);
573
574            let mut table_key = VirtualNode::from_index(62).to_be_bytes().to_vec();
575            table_key.extend_from_slice("a".as_bytes());
576            let switch_builder =
577                builder.check_switch_builder(&UserKey::for_test(TableId::from(1), &table_key));
578            assert!(!switch_builder);
579
580            let mut table_key = VirtualNode::from_index(63).to_be_bytes().to_vec();
581            table_key.extend_from_slice("a".as_bytes());
582            let switch_builder =
583                builder.check_switch_builder(&UserKey::for_test(TableId::from(1), &table_key));
584            assert!(!switch_builder);
585
586            let mut table_key = VirtualNode::from_index(64).to_be_bytes().to_vec();
587            table_key.extend_from_slice("a".as_bytes());
588            let switch_builder =
589                builder.check_switch_builder(&UserKey::for_test(TableId::from(1), &table_key));
590            assert!(switch_builder);
591
592            let switch_builder =
593                builder.check_switch_builder(&UserKey::for_test(TableId::from(2), &table_key));
594            assert!(switch_builder);
595            let switch_builder =
596                builder.check_switch_builder(&UserKey::for_test(TableId::from(3), &table_key));
597            assert!(switch_builder);
598            let switch_builder =
599                builder.check_switch_builder(&UserKey::for_test(TableId::from(4), &table_key));
600            assert!(switch_builder);
601            let switch_builder =
602                builder.check_switch_builder(&UserKey::for_test(TableId::from(5), &table_key));
603            assert!(!switch_builder);
604        }
605
606        {
607            // Test different table vnode count
608            let table_partition_vnode = BTreeMap::from([
609                (1_u32.into(), 4_u32),
610                (2_u32.into(), 4_u32),
611                (3_u32.into(), 4_u32),
612            ]);
613
614            let table_id_to_vnode =
615                HashMap::from_iter(vec![(1.into(), 64), (2.into(), 128), (3.into(), 256)]);
616            let table_id_to_watermark_serde =
617                HashMap::from_iter(vec![(1.into(), None), (2.into(), None), (3.into(), None)]);
618            let compaction_catalog_agent_ref = Arc::new(CompactionCatalogAgent::new(
619                FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor),
620                table_id_to_vnode,
621                table_id_to_watermark_serde,
622            ));
623
624            let mut builder = CapacitySplitTableBuilder::new(
625                LocalTableBuilderFactory::new(1001, mock_sstable_store().await, opts),
626                Arc::new(CompactorMetrics::unused()),
627                None,
628                table_partition_vnode,
629                None,
630                compaction_catalog_agent_ref,
631            );
632
633            let mut table_key = VirtualNode::from_index(0).to_be_bytes().to_vec();
634            table_key.extend_from_slice("a".as_bytes());
635
636            let switch_builder =
637                builder.check_switch_builder(&UserKey::for_test(TableId::from(1), &table_key));
638            assert!(switch_builder);
639
640            let mut table_key = VirtualNode::from_index(15).to_be_bytes().to_vec();
641            table_key.extend_from_slice("a".as_bytes());
642            let switch_builder =
643                builder.check_switch_builder(&UserKey::for_test(TableId::from(1), &table_key));
644            assert!(!switch_builder);
645
646            let mut table_key = VirtualNode::from_index(16).to_be_bytes().to_vec();
647            table_key.extend_from_slice("a".as_bytes());
648            let switch_builder =
649                builder.check_switch_builder(&UserKey::for_test(TableId::from(1), &table_key));
650            assert!(switch_builder);
651
652            let mut table_key = VirtualNode::from_index(0).to_be_bytes().to_vec();
653            table_key.extend_from_slice("a".as_bytes());
654            let switch_builder =
655                builder.check_switch_builder(&UserKey::for_test(TableId::from(2), &table_key));
656            assert!(switch_builder);
657
658            let mut table_key = VirtualNode::from_index(16).to_be_bytes().to_vec();
659            table_key.extend_from_slice("a".as_bytes());
660            let switch_builder =
661                builder.check_switch_builder(&UserKey::for_test(TableId::from(2), &table_key));
662            assert!(!switch_builder);
663
664            let mut table_key = VirtualNode::from_index(31).to_be_bytes().to_vec();
665            table_key.extend_from_slice("a".as_bytes());
666            let switch_builder =
667                builder.check_switch_builder(&UserKey::for_test(TableId::from(2), &table_key));
668            assert!(!switch_builder);
669
670            let mut table_key = VirtualNode::from_index(32).to_be_bytes().to_vec();
671            table_key.extend_from_slice("a".as_bytes());
672            let switch_builder =
673                builder.check_switch_builder(&UserKey::for_test(TableId::from(2), &table_key));
674            assert!(switch_builder);
675
676            let mut table_key = VirtualNode::from_index(64).to_be_bytes().to_vec();
677            table_key.extend_from_slice("a".as_bytes());
678            let switch_builder =
679                builder.check_switch_builder(&UserKey::for_test(TableId::from(2), &table_key));
680            assert!(switch_builder);
681
682            let mut table_key = VirtualNode::from_index(0).to_be_bytes().to_vec();
683            table_key.extend_from_slice("a".as_bytes());
684            let switch_builder =
685                builder.check_switch_builder(&UserKey::for_test(TableId::from(3), &table_key));
686            assert!(switch_builder);
687
688            let mut table_key = VirtualNode::from_index(16).to_be_bytes().to_vec();
689            table_key.extend_from_slice("a".as_bytes());
690            let switch_builder =
691                builder.check_switch_builder(&UserKey::for_test(TableId::from(3), &table_key));
692            assert!(!switch_builder);
693
694            let mut table_key = VirtualNode::from_index(32).to_be_bytes().to_vec();
695            table_key.extend_from_slice("a".as_bytes());
696            let switch_builder =
697                builder.check_switch_builder(&UserKey::for_test(TableId::from(3), &table_key));
698            assert!(!switch_builder);
699
700            let mut table_key = VirtualNode::from_index(63).to_be_bytes().to_vec();
701            table_key.extend_from_slice("a".as_bytes());
702            let switch_builder =
703                builder.check_switch_builder(&UserKey::for_test(TableId::from(3), &table_key));
704            assert!(!switch_builder);
705
706            let mut table_key = VirtualNode::from_index(64).to_be_bytes().to_vec();
707            table_key.extend_from_slice("a".as_bytes());
708            let switch_builder =
709                builder.check_switch_builder(&UserKey::for_test(TableId::from(3), &table_key));
710            assert!(switch_builder);
711        }
712    }
713}