risingwave_storage/hummock/compactor/compaction_utils.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::marker::PhantomData;
use std::ops::Bound;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::constants::hummock::CompactionFilterFlag;
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::TableStatsMap;
use risingwave_hummock_sdk::{EpochWithGap, KeyComparator, can_concat};
use risingwave_pb::hummock::compact_task::PbTaskType;
use risingwave_pb::hummock::{BloomFilterType, PbLevelType, PbTableSchema};
use tokio::time::Instant;

pub use super::context::CompactorContext;
use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
use crate::hummock::compactor::{
    ConcatSstableIterator, MultiCompactionFilter, StateCleanUpCompactionFilter, TaskProgress,
    TtlCompactionFilter,
};
use crate::hummock::iterator::{
    Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
    UserIterator,
};
use crate::hummock::multi_builder::TableBuilderFactory;
use crate::hummock::sstable::DEFAULT_ENTRY_SIZE;
use crate::hummock::{
    CachePolicy, FilterBuilder, GetObjectId, HummockResult, MemoryLimiter, SstableBuilder,
    SstableBuilderOptions, SstableWriterFactory, SstableWriterOptions,
};
use crate::monitor::StoreLocalStatistic;

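/// Builds SSTables on the compactor side: each `open_builder` call fetches a new SST object id
/// through `object_id_getter` (recording the RPC latency in `remote_rpc_cost`, in microseconds)
/// and creates an `SstableBuilder` backed by a writer from `sstable_writer_factory`.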
pub struct RemoteBuilderFactory<W: SstableWriterFactory, F: FilterBuilder> {
    pub object_id_getter: Arc<dyn GetObjectId>,
    pub limiter: Arc<MemoryLimiter>,
    pub options: SstableBuilderOptions,
    pub policy: CachePolicy,
    pub remote_rpc_cost: Arc<AtomicU64>,
    pub compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    pub sstable_writer_factory: W,
    pub _phantom: PhantomData<F>,
}

#[async_trait::async_trait]
impl<W: SstableWriterFactory, F: FilterBuilder> TableBuilderFactory for RemoteBuilderFactory<W, F> {
    type Filter = F;
    type Writer = W::Writer;

    async fn open_builder(&mut self) -> HummockResult<SstableBuilder<Self::Writer, Self::Filter>> {
        let timer = Instant::now();
        let table_id = self.object_id_getter.get_new_sst_object_id().await?;
        let cost = (timer.elapsed().as_secs_f64() * 1000000.0).round() as u64;
        self.remote_rpc_cost.fetch_add(cost, Ordering::Relaxed);
        let writer_options = SstableWriterOptions {
            capacity_hint: Some(self.options.capacity + self.options.block_capacity),
            tracker: None,
            policy: self.policy,
        };
        let writer = self
            .sstable_writer_factory
            .create_sst_writer(table_id, writer_options)
            .await?;
        let builder = SstableBuilder::new(
            table_id,
            writer,
            Self::Filter::create(
                self.options.bloom_false_positive,
                self.options.capacity / DEFAULT_ENTRY_SIZE + 1,
            ),
            self.options.clone(),
            self.compaction_catalog_agent_ref.clone(),
            Some(self.limiter.clone()),
        );
        Ok(builder)
    }
}

/// `CompactionStatistics` counts the results of each compaction split.
#[derive(Default, Debug)]
pub struct CompactionStatistics {
    // to report per-table metrics
    pub delta_drop_stat: TableStatsMap,

    // to calculate delete ratio
    pub iter_total_key_counts: u64,
    pub iter_drop_key_counts: u64,
}

impl CompactionStatistics {
    #[allow(dead_code)]
    fn delete_ratio(&self) -> Option<u64> {
        if self.iter_total_key_counts == 0 {
            return None;
        }

        Some(self.iter_drop_key_counts / self.iter_total_key_counts)
    }
}

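/// Options that control how a compaction task is executed: the key range to compact, the cache
/// policy, whether deleted keys can be GCed, per-table vnode partitioning, and the table schemas
/// used for schema-based column pruning.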
#[derive(Clone, Default)]
pub struct TaskConfig {
    pub key_range: KeyRange,
    pub cache_policy: CachePolicy,
    pub gc_delete_keys: bool,
    pub retain_multiple_version: bool,
    /// `stats_target_table_ids` decides whether a dropped key should be counted as a table stats
    /// change. For a divided SST as input, a dropped key shouldn't be counted if its table id
    /// doesn't belong to this divided SST. See `Compactor::compact_and_build_sst`.
    pub stats_target_table_ids: Option<HashSet<TableId>>,
    pub task_type: PbTaskType,
    pub use_block_based_filter: bool,

    pub table_vnode_partition: BTreeMap<TableId, u32>,
    /// `TableId` -> `TableSchema`
    /// Schemas in `table_schemas` are at least as new as the ones used to create `input_ssts`.
    /// For a table whose schema exists in `table_schemas`, its columns that appear in `input_ssts`
    /// but not in `table_schemas` can be safely dropped.
    pub table_schemas: HashMap<TableId, PbTableSchema>,
    /// `disable_drop_column_optimization` should only be set in benchmarks.
    pub disable_drop_column_optimization: bool,
}

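/// Assembles the compaction filter chain from the task's `compaction_filter_mask`:
/// - `STATE_CLEAN`: drop keys whose table id is no longer in `existing_table_ids`.
/// - `TTL`: drop keys that have outlived their table's `retention_seconds`, judged against
///   `current_epoch_time`.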
pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompactionFilter {
    let mut multi_filter = MultiCompactionFilter::default();
    let compaction_filter_flag =
        CompactionFilterFlag::from_bits(compact_task.compaction_filter_mask).unwrap_or_default();
    if compaction_filter_flag.contains(CompactionFilterFlag::STATE_CLEAN) {
        let state_clean_up_filter = Box::new(StateCleanUpCompactionFilter::new(
            HashSet::from_iter(compact_task.existing_table_ids.clone()),
        ));

        multi_filter.register(state_clean_up_filter);
    }

    if compaction_filter_flag.contains(CompactionFilterFlag::TTL) {
        let id_to_ttl = compact_task
            .table_options
            .iter()
            .filter_map(|(id, option)| {
                option
                    .retention_seconds
                    .and_then(|ttl| if ttl > 0 { Some((*id, ttl)) } else { None })
            })
            .collect();

        let ttl_filter = Box::new(TtlCompactionFilter::new(
            id_to_ttl,
            compact_task.current_epoch_time,
        ));
        multi_filter.register(ttl_filter);
    }

    multi_filter
}

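/// Fast-path split generation that only looks at SST boundary keys instead of loading block
/// metas. `generate_splits` falls back to this when the task contains more files than
/// `compactor_max_preload_meta_file_count`. Returns an empty `Vec` when there are not enough
/// distinct boundary keys to exceed the target parallelism.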
fn generate_splits_fast(
    sstable_infos: &Vec<SstableInfo>,
    compaction_size: u64,
    context: &CompactorContext,
    max_sub_compaction: u32,
) -> Vec<KeyRange> {
    let worker_num = context.compaction_executor.worker_num();
    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;

    let parallelism = calculate_task_parallelism_impl(
        worker_num,
        parallel_compact_size,
        compaction_size,
        max_sub_compaction,
    );
    let mut indexes = vec![];
    for sst in sstable_infos {
        let key_range = &sst.key_range;
        indexes.push(
            FullKey {
                user_key: FullKey::decode(&key_range.left).user_key,
                epoch_with_gap: EpochWithGap::new_max_epoch(),
            }
            .encode(),
        );
        indexes.push(
            FullKey {
                user_key: FullKey::decode(&key_range.right).user_key,
                epoch_with_gap: EpochWithGap::new_max_epoch(),
            }
            .encode(),
        );
    }
    indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.as_ref(), b.as_ref()));
    indexes.dedup();
    if indexes.len() <= parallelism {
        return vec![];
    }

    let mut splits = vec![];
    splits.push(KeyRange::default());
    let parallel_key_count = indexes.len() / parallelism;
    let mut last_split_key_count = 0;
    for key in indexes {
        if last_split_key_count >= parallel_key_count {
            splits.last_mut().unwrap().right = Bytes::from(key.clone());
            splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
            last_split_key_count = 0;
        }
        last_split_key_count += 1;
    }

    splits
}

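/// Splits a large compaction task into disjoint key ranges so that sub-compactions can run in
/// parallel. When the task is larger than `parallel_compact_size`, the SST metas are preloaded
/// and split boundaries are cut at block keys so that each split covers roughly
/// `max(compaction_size / parallelism, parallel_compact_size)` bytes. Returns an empty `Vec`
/// when the task should run as a single split.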
pub async fn generate_splits(
    sstable_infos: &Vec<SstableInfo>,
    compaction_size: u64,
    context: &CompactorContext,
    max_sub_compaction: u32,
) -> HummockResult<Vec<KeyRange>> {
    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
    if compaction_size > parallel_compact_size {
        if sstable_infos.len() > context.storage_opts.compactor_max_preload_meta_file_count {
            return Ok(generate_splits_fast(
                sstable_infos,
                compaction_size,
                context,
                max_sub_compaction,
            ));
        }
        let mut indexes = vec![];
        // Preload the metas and collect the smallest key of each block to decide sub-compaction splits.
        for sstable_info in sstable_infos {
            indexes.extend(
                context
                    .sstable_store
                    .sstable(sstable_info, &mut StoreLocalStatistic::default())
                    .await?
                    .meta
                    .block_metas
                    .iter()
                    .map(|block| {
                        let data_size = block.len;
                        let full_key = FullKey {
                            user_key: FullKey::decode(&block.smallest_key).user_key,
                            epoch_with_gap: EpochWithGap::new_max_epoch(),
                        }
                        .encode();
                        (data_size as u64, full_key)
                    })
                    .collect_vec(),
            );
        }
        // Sort by key only, since every data block has the same size.
        indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref()));
        let mut splits = vec![];
        splits.push(KeyRange::default());

        let parallelism = calculate_task_parallelism_impl(
            context.compaction_executor.worker_num(),
            parallel_compact_size,
            compaction_size,
            max_sub_compaction,
        );

        let sub_compaction_data_size =
            std::cmp::max(compaction_size / parallelism as u64, parallel_compact_size);

        if parallelism > 1 {
            let mut last_buffer_size = 0;
            let mut last_key: Vec<u8> = vec![];
            let mut remaining_size = indexes.iter().map(|block| block.0).sum::<u64>();
            for (data_size, key) in indexes {
                if last_buffer_size >= sub_compaction_data_size
                    && !last_key.eq(&key)
                    && remaining_size > parallel_compact_size
                {
                    splits.last_mut().unwrap().right = Bytes::from(key.clone());
                    splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
                    last_buffer_size = data_size;
                } else {
                    last_buffer_size += data_size;
                }
                remaining_size -= data_size;
                last_key = key;
            }
            return Ok(splits);
        }
    }

    Ok(vec![])
}

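/// Estimates the capacity of each output SST: the smallest of the configured `sstable_size_mb`,
/// the task's `target_file_size`, and the total uncompressed size of the input SSTs.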
pub fn estimate_task_output_capacity(context: CompactorContext, task: &CompactTask) -> usize {
    let max_target_file_size = context.storage_opts.sstable_size_mb as usize * (1 << 20);
    let total_input_uncompressed_file_size = task
        .input_ssts
        .iter()
        .flat_map(|level| level.table_infos.iter())
        .map(|table| table.uncompressed_file_size)
        .sum::<u64>();

    let capacity = std::cmp::min(task.target_file_size as usize, max_target_file_size);
    std::cmp::min(capacity, total_input_uncompressed_file_size as usize)
}

/// Compares the output of a compaction task against its input. The data seen by users must not
/// change after applying the compaction result.
pub async fn check_compaction_result(
    compact_task: &CompactTask,
    context: CompactorContext,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
) -> HummockResult<bool> {
    let mut table_ids_from_input_ssts = compact_task.get_table_ids_from_input_ssts();
    let need_clean_state_table = table_ids_from_input_ssts
        .any(|table_id| !compact_task.existing_table_ids.contains(&table_id));
    // This check does not consider keys dropped by the compaction filter.
    if compact_task.contains_ttl() || need_clean_state_table {
        return Ok(true);
    }

    let mut table_iters = Vec::new();
    for level in &compact_task.input_ssts {
        if level.table_infos.is_empty() {
            continue;
        }

        // No need to filter the tables here because the manager has already done it.
        if level.level_type == PbLevelType::Nonoverlapping {
            debug_assert!(can_concat(&level.table_infos));

            table_iters.push(ConcatSstableIterator::new(
                compact_task.existing_table_ids.clone(),
                level.table_infos.clone(),
                KeyRange::inf(),
                context.sstable_store.clone(),
                Arc::new(TaskProgress::default()),
                context.storage_opts.compactor_iter_max_io_retry_times,
            ));
        } else {
            for table_info in &level.table_infos {
                table_iters.push(ConcatSstableIterator::new(
                    compact_task.existing_table_ids.clone(),
                    vec![table_info.clone()],
                    KeyRange::inf(),
                    context.sstable_store.clone(),
                    Arc::new(TaskProgress::default()),
                    context.storage_opts.compactor_iter_max_io_retry_times,
                ));
            }
        }
    }

    let iter = MergeIterator::for_compactor(table_iters);
    let left_iter = {
        let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
            iter,
            PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.pk_prefix_table_watermarks.clone(),
            ),
        );

        let combine_iter = NonPkPrefixSkipWatermarkIterator::new(
            skip_watermark_iter,
            NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.non_pk_prefix_table_watermarks.clone(),
                compaction_catalog_agent_ref.clone(),
            ),
        );

        UserIterator::new(
            combine_iter,
            (Bound::Unbounded, Bound::Unbounded),
            u64::MAX,
            0,
            None,
        )
    };
    let iter = ConcatSstableIterator::new(
        compact_task.existing_table_ids.clone(),
        compact_task.sorted_output_ssts.clone(),
        KeyRange::inf(),
        context.sstable_store.clone(),
        Arc::new(TaskProgress::default()),
        context.storage_opts.compactor_iter_max_io_retry_times,
    );
    let right_iter = {
        let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
            iter,
            PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.pk_prefix_table_watermarks.clone(),
            ),
        );

        let combine_iter = NonPkPrefixSkipWatermarkIterator::new(
            skip_watermark_iter,
            NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.non_pk_prefix_table_watermarks.clone(),
                compaction_catalog_agent_ref,
            ),
        );

        UserIterator::new(
            combine_iter,
            (Bound::Unbounded, Bound::Unbounded),
            u64::MAX,
            0,
            None,
        )
    };

    check_result(left_iter, right_iter).await
}

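/// Compares the result of a flush: `left_iter` iterates the in-memory input, while the right side
/// iterates the SSTs produced by the flush (`sort_ssts`).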
pub async fn check_flush_result<I: HummockIterator<Direction = Forward>>(
    left_iter: UserIterator<I>,
    existing_table_ids: Vec<StateTableId>,
    sort_ssts: Vec<SstableInfo>,
    context: CompactorContext,
) -> HummockResult<bool> {
    let iter = ConcatSstableIterator::new(
        existing_table_ids.clone(),
        sort_ssts.clone(),
        KeyRange::inf(),
        context.sstable_store.clone(),
        Arc::new(TaskProgress::default()),
        0,
    );
    let right_iter = UserIterator::new(
        iter,
        (Bound::Unbounded, Bound::Unbounded),
        u64::MAX,
        0,
        None,
    );
    check_result(left_iter, right_iter).await
}

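/// Walks both user iterators in lockstep and returns `Ok(false)` (with an error log) as soon as a
/// key, a value, or the total key count differs; returns `Ok(true)` when both sides match.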
async fn check_result<
    I1: HummockIterator<Direction = Forward>,
    I2: HummockIterator<Direction = Forward>,
>(
    mut left_iter: UserIterator<I1>,
    mut right_iter: UserIterator<I2>,
) -> HummockResult<bool> {
    left_iter.rewind().await?;
    right_iter.rewind().await?;
    let mut right_count = 0;
    let mut left_count = 0;
    while left_iter.is_valid() && right_iter.is_valid() {
        if left_iter.key() != right_iter.key() {
            tracing::error!(
                "The key of input and output not equal. key: {:?} vs {:?}",
                left_iter.key(),
                right_iter.key()
            );
            return Ok(false);
        }
        if left_iter.value() != right_iter.value() {
            tracing::error!(
                "The value of input and output not equal. key: {:?}, value: {:?} vs {:?}",
                left_iter.key(),
                left_iter.value(),
                right_iter.value()
            );
            return Ok(false);
        }
        left_iter.next().await?;
        right_iter.next().await?;
        left_count += 1;
        right_count += 1;
    }
    while left_iter.is_valid() {
        left_count += 1;
        left_iter.next().await?;
    }
    while right_iter.is_valid() {
        right_count += 1;
        right_iter.next().await?;
    }
    if left_count != right_count {
        tracing::error!(
            "The key count of input and output not equal: {} vs {}",
            left_count,
            right_count
        );
        return Ok(false);
    }
    Ok(true)
}

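/// Decides whether the task can take the fast "copy block" compaction path. This requires, among
/// other things, that fast compaction is enabled, all input SSTs use block-based bloom filters,
/// the task touches a single table, has no TTL / range tombstones / split SSTs, spans exactly two
/// input levels, stays under `compactor_fast_max_compact_task_size`, and has a delete ratio below
/// `compactor_fast_max_compact_delete_ratio`.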
pub fn optimize_by_copy_block(compact_task: &CompactTask, context: &CompactorContext) -> bool {
    let sstable_infos = compact_task
        .input_ssts
        .iter()
        .flat_map(|level| level.table_infos.iter())
        .filter(|table_info| {
            let table_ids = &table_info.table_ids;
            table_ids
                .iter()
                .any(|table_id| compact_task.existing_table_ids.contains(table_id))
        })
        .cloned()
        .collect_vec();
    let compaction_size = sstable_infos
        .iter()
        .map(|table_info| table_info.sst_size)
        .sum::<u64>();

    let all_ssts_are_blocked_filter = sstable_infos
        .iter()
        .all(|table_info| table_info.bloom_filter_kind == BloomFilterType::Blocked);

    let delete_key_count = sstable_infos
        .iter()
        .map(|table_info| table_info.stale_key_count + table_info.range_tombstone_count)
        .sum::<u64>();
    let total_key_count = sstable_infos
        .iter()
        .map(|table_info| table_info.total_key_count)
        .sum::<u64>();

    let single_table = compact_task.build_compact_table_ids().len() == 1;
    context.storage_opts.enable_fast_compaction
        && all_ssts_are_blocked_filter
        && !compact_task.contains_range_tombstone()
        && !compact_task.contains_ttl()
        && !compact_task.contains_split_sst()
        && single_table
        && compact_task.target_level > 0
        && compact_task.input_ssts.len() == 2
        && compaction_size < context.storage_opts.compactor_fast_max_compact_task_size
        && delete_key_count * 100
            < context.storage_opts.compactor_fast_max_compact_delete_ratio as u64 * total_key_count
        && compact_task.task_type == PbTaskType::Dynamic
}

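/// Fills `compact_task.splits` via `generate_splits`. The fast copy-block path skips split
/// generation entirely, since it always runs as a single sub-compaction.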
pub async fn generate_splits_for_task(
    compact_task: &mut CompactTask,
    context: &CompactorContext,
    optimize_by_copy_block: bool,
) -> HummockResult<()> {
    let sstable_infos = compact_task
        .input_ssts
        .iter()
        .flat_map(|level| level.table_infos.iter())
        .filter(|table_info| {
            let table_ids = &table_info.table_ids;
            table_ids
                .iter()
                .any(|table_id| compact_task.existing_table_ids.contains(table_id))
        })
        .cloned()
        .collect_vec();
    let compaction_size = sstable_infos
        .iter()
        .map(|table_info| table_info.sst_size)
        .sum::<u64>();

    if !optimize_by_copy_block {
        let splits = generate_splits(
            &sstable_infos,
            compaction_size,
            context,
            compact_task.max_sub_compaction,
        )
        .await?;
        if !splits.is_empty() {
            compact_task.splits = splits;
        }
        return Ok(());
    }

    Ok(())
}

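/// Reports the read bytes and SST counts of the task's current level and target level to the
/// compactor metrics, labelled by compaction group and level index.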
pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorContext) {
    let group_label = compact_task.compaction_group_id.to_string();
    let cur_level_label = compact_task.input_ssts[0].level_idx.to_string();
    let select_table_infos = compact_task
        .input_ssts
        .iter()
        .filter(|level| level.level_idx != compact_task.target_level)
        .flat_map(|level| level.table_infos.iter())
        .collect_vec();
    let target_table_infos = compact_task
        .input_ssts
        .iter()
        .filter(|level| level.level_idx == compact_task.target_level)
        .flat_map(|level| level.table_infos.iter())
        .collect_vec();
    let select_size = select_table_infos
        .iter()
        .map(|table| table.sst_size)
        .sum::<u64>();
    context
        .compactor_metrics
        .compact_read_current_level
        .with_label_values(&[&group_label, &cur_level_label])
        .inc_by(select_size);
    context
        .compactor_metrics
        .compact_read_sstn_current_level
        .with_label_values(&[&group_label, &cur_level_label])
        .inc_by(select_table_infos.len() as u64);

    let target_level_read_bytes = target_table_infos.iter().map(|t| t.sst_size).sum::<u64>();
    let next_level_label = compact_task.target_level.to_string();
    context
        .compactor_metrics
        .compact_read_next_level
        .with_label_values(&[&group_label, &next_level_label])
        .inc_by(target_level_read_bytes);
    context
        .compactor_metrics
        .compact_read_sstn_next_level
        .with_label_values(&[&group_label, &next_level_label])
        .inc_by(target_table_infos.len() as u64);
}

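/// Computes the parallelism for a compaction task from the size of its relevant input SSTs. The
/// copy-block fast path always runs with parallelism 1.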
pub fn calculate_task_parallelism(compact_task: &CompactTask, context: &CompactorContext) -> usize {
    let optimize_by_copy_block = optimize_by_copy_block(compact_task, context);

    if optimize_by_copy_block {
        return 1;
    }

    let sstable_infos = compact_task
        .input_ssts
        .iter()
        .flat_map(|level| level.table_infos.iter())
        .filter(|table_info| {
            let table_ids = &table_info.table_ids;
            table_ids
                .iter()
                .any(|table_id| compact_task.existing_table_ids.contains(table_id))
        })
        .cloned()
        .collect_vec();
    let compaction_size = sstable_infos
        .iter()
        .map(|table_info| table_info.sst_size)
        .sum::<u64>();
    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
    calculate_task_parallelism_impl(
        context.compaction_executor.worker_num(),
        parallel_compact_size,
        compaction_size,
        compact_task.max_sub_compaction,
    )
}

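/// `parallelism = min(worker_num, min(ceil(compaction_size / parallel_compact_size), max_sub_compaction))`.
/// For example, a 1 GiB task with a 128 MiB `parallel_compact_size`, 16 workers, and
/// `max_sub_compaction = 4` yields `min(16, min(8, 4)) = 4`.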
pub fn calculate_task_parallelism_impl(
    worker_num: usize,
    parallel_compact_size: u64,
    compaction_size: u64,
    max_sub_compaction: u32,
) -> usize {
    let parallelism = compaction_size.div_ceil(parallel_compact_size);
    worker_num.min(parallelism.min(max_sub_compaction as u64) as usize)
}