1use std::collections::{HashMap, HashSet, VecDeque};
16use std::future::poll_fn;
17use std::ops::Range;
18use std::sync::{Arc, LazyLock};
19use std::task::Poll;
20use std::time::{Duration, Instant};
21
22use foyer::{HybridCacheEntry, RangeBoundsExt};
23use futures::future::{join_all, try_join_all};
24use futures::{Future, FutureExt};
25use itertools::Itertools;
26use prometheus::core::{AtomicU64, GenericCounter, GenericCounterVec};
27use prometheus::{
28 Histogram, HistogramVec, IntGauge, Registry, register_histogram_vec_with_registry,
29 register_int_counter_vec_with_registry, register_int_gauge_with_registry,
30};
31use risingwave_common::license::Feature;
32use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
33use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo;
34use risingwave_hummock_sdk::{HummockSstableObjectId, KeyComparator};
35use thiserror_ext::AsReport;
36use tokio::sync::Semaphore;
37use tokio::task::JoinHandle;
38
39use crate::hummock::local_version::pinned_version::PinnedVersion;
40use crate::hummock::{
41 Block, HummockError, HummockResult, RecentFilterTrait, Sstable, SstableBlockIndex,
42 SstableStoreRef, TableHolder,
43};
44use crate::monitor::StoreLocalStatistic;
45use crate::opts::StorageOpts;
46
/// Process-wide cache refill metrics, lazily registered in the global metrics registry.
pub static GLOBAL_CACHE_REFILL_METRICS: LazyLock<CacheRefillMetrics> =
    LazyLock::new(|| CacheRefillMetrics::new(&GLOBAL_METRICS_REGISTRY));
49
/// Metrics for cache refill, covering both meta (sstable metadata) and data
/// (block) refill. The `*_duration` / `*_total` / `*_bytes` vectors are
/// labeled by `["type", "op"]`; the remaining fields are pre-resolved
/// label combinations used on hot paths.
pub struct CacheRefillMetrics {
    /// Refill durations, labeled by `["type", "op"]`.
    pub refill_duration: HistogramVec,
    /// Refill event counts, labeled by `["type", "op"]`.
    pub refill_total: GenericCounterVec<AtomicU64>,
    /// Refill byte counts, labeled by `["type", "op"]`.
    pub refill_bytes: GenericCounterVec<AtomicU64>,

    // Pre-resolved `refill_duration` children.
    pub data_refill_success_duration: Histogram,
    pub meta_refill_success_duration: Histogram,

    // Pre-resolved `refill_total` children.
    pub data_refill_filtered_total: GenericCounter<AtomicU64>,
    pub data_refill_attempts_total: GenericCounter<AtomicU64>,
    pub data_refill_started_total: GenericCounter<AtomicU64>,
    pub meta_refill_attempts_total: GenericCounter<AtomicU64>,

    // Parent sst meta lookups and unit-inheritance outcomes.
    pub data_refill_parent_meta_lookup_hit_total: GenericCounter<AtomicU64>,
    pub data_refill_parent_meta_lookup_miss_total: GenericCounter<AtomicU64>,
    pub data_refill_unit_inheritance_hit_total: GenericCounter<AtomicU64>,
    pub data_refill_unit_inheritance_miss_total: GenericCounter<AtomicU64>,

    // Per-block refill outcomes.
    pub data_refill_block_unfiltered_total: GenericCounter<AtomicU64>,
    pub data_refill_block_success_total: GenericCounter<AtomicU64>,

    // Pre-resolved `refill_bytes` children.
    pub data_refill_ideal_bytes: GenericCounter<AtomicU64>,
    pub data_refill_success_bytes: GenericCounter<AtomicU64>,

    /// Number of refill tasks currently queued in the [`CacheRefiller`].
    pub refill_queue_total: IntGauge,
}
76
77impl CacheRefillMetrics {
78 pub fn new(registry: &Registry) -> Self {
79 let refill_duration = register_histogram_vec_with_registry!(
80 "refill_duration",
81 "refill duration",
82 &["type", "op"],
83 registry,
84 )
85 .unwrap();
86 let refill_total = register_int_counter_vec_with_registry!(
87 "refill_total",
88 "refill total",
89 &["type", "op"],
90 registry,
91 )
92 .unwrap();
93 let refill_bytes = register_int_counter_vec_with_registry!(
94 "refill_bytes",
95 "refill bytes",
96 &["type", "op"],
97 registry,
98 )
99 .unwrap();
100
101 let data_refill_success_duration = refill_duration
102 .get_metric_with_label_values(&["data", "success"])
103 .unwrap();
104 let meta_refill_success_duration = refill_duration
105 .get_metric_with_label_values(&["meta", "success"])
106 .unwrap();
107
108 let data_refill_filtered_total = refill_total
109 .get_metric_with_label_values(&["data", "filtered"])
110 .unwrap();
111 let data_refill_attempts_total = refill_total
112 .get_metric_with_label_values(&["data", "attempts"])
113 .unwrap();
114 let data_refill_started_total = refill_total
115 .get_metric_with_label_values(&["data", "started"])
116 .unwrap();
117 let meta_refill_attempts_total = refill_total
118 .get_metric_with_label_values(&["meta", "attempts"])
119 .unwrap();
120
121 let data_refill_parent_meta_lookup_hit_total = refill_total
122 .get_metric_with_label_values(&["parent_meta", "hit"])
123 .unwrap();
124 let data_refill_parent_meta_lookup_miss_total = refill_total
125 .get_metric_with_label_values(&["parent_meta", "miss"])
126 .unwrap();
127 let data_refill_unit_inheritance_hit_total = refill_total
128 .get_metric_with_label_values(&["unit_inheritance", "hit"])
129 .unwrap();
130 let data_refill_unit_inheritance_miss_total = refill_total
131 .get_metric_with_label_values(&["unit_inheritance", "miss"])
132 .unwrap();
133
134 let data_refill_block_unfiltered_total = refill_total
135 .get_metric_with_label_values(&["block", "unfiltered"])
136 .unwrap();
137 let data_refill_block_success_total = refill_total
138 .get_metric_with_label_values(&["block", "success"])
139 .unwrap();
140
141 let data_refill_ideal_bytes = refill_bytes
142 .get_metric_with_label_values(&["data", "ideal"])
143 .unwrap();
144 let data_refill_success_bytes = refill_bytes
145 .get_metric_with_label_values(&["data", "success"])
146 .unwrap();
147
148 let refill_queue_total = register_int_gauge_with_registry!(
149 "refill_queue_total",
150 "refill queue total",
151 registry,
152 )
153 .unwrap();
154
155 Self {
156 refill_duration,
157 refill_total,
158 refill_bytes,
159
160 data_refill_success_duration,
161 meta_refill_success_duration,
162 data_refill_filtered_total,
163 data_refill_attempts_total,
164 data_refill_started_total,
165 meta_refill_attempts_total,
166
167 data_refill_parent_meta_lookup_hit_total,
168 data_refill_parent_meta_lookup_miss_total,
169 data_refill_unit_inheritance_hit_total,
170 data_refill_unit_inheritance_miss_total,
171
172 data_refill_block_unfiltered_total,
173 data_refill_block_success_total,
174
175 data_refill_ideal_bytes,
176 data_refill_success_bytes,
177
178 refill_queue_total,
179 }
180 }
181}
182
/// Configuration for cache refill, derived from [`StorageOpts`].
#[derive(Debug)]
pub struct CacheRefillConfig {
    /// Overall timeout for one refill task run; work still pending when it
    /// expires is abandoned.
    pub timeout: Duration,

    /// Levels whose newly inserted ssts are eligible for data refill.
    /// Empty when the `ElasticDiskCache` feature is unavailable.
    pub data_refill_levels: HashSet<u32>,

    /// Max concurrent meta refills; `0` means unlimited.
    pub meta_refill_concurrency: usize,

    /// Max concurrent data refill units.
    pub concurrency: usize,

    /// Number of consecutive blocks grouped into one refill unit.
    pub unit: usize,

    /// Minimum admitted fraction of a unit's blocks required to refill it.
    pub threshold: f64,

    /// When `true`, bypass the recent filter and refill unconditionally.
    pub skip_recent_filter: bool,
}
208
209impl CacheRefillConfig {
210 pub fn from_storage_opts(options: &StorageOpts) -> Self {
211 let data_refill_levels = match Feature::ElasticDiskCache.check_available() {
212 Ok(_) => options
213 .cache_refill_data_refill_levels
214 .iter()
215 .copied()
216 .collect(),
217 Err(e) => {
218 tracing::warn!(error = %e.as_report(), "ElasticDiskCache is not available.");
219 HashSet::new()
220 }
221 };
222
223 Self {
224 timeout: Duration::from_millis(options.cache_refill_timeout_ms),
225 data_refill_levels,
226 concurrency: options.cache_refill_concurrency,
227 meta_refill_concurrency: options.cache_refill_meta_refill_concurrency,
228 unit: options.cache_refill_unit,
229 threshold: options.cache_refill_threshold,
230 skip_recent_filter: options.cache_refill_skip_recent_filter,
231 }
232 }
233}
234
/// A queued refill task: the task's join handle paired with the event to
/// report once it completes.
struct Item {
    handle: JoinHandle<()>,
    event: CacheRefillerEvent,
}
239
/// Spawns a refill task for `(deltas, context, old_pinned_version, new_pinned_version)`
/// and returns its join handle. The production default is
/// [`CacheRefiller::default_spawn_refill_task`]; callers may inject a custom
/// spawner.
pub(crate) type SpawnRefillTask = Arc<
    dyn Fn(Vec<SstDeltaInfo>, CacheRefillContext, PinnedVersion, PinnedVersion) -> JoinHandle<()>
        + Send
        + Sync
        + 'static,
>;
247
/// Queues cache refill tasks and surfaces their completion events in FIFO
/// order via [`CacheRefiller::next_events`].
pub(crate) struct CacheRefiller {
    /// In-flight refill tasks, oldest at the front.
    queue: VecDeque<Item>,

    /// Shared state handed to every spawned refill task.
    context: CacheRefillContext,

    /// Hook used to spawn refill tasks.
    spawn_refill_task: SpawnRefillTask,
}
257
258impl CacheRefiller {
259 pub(crate) fn new(
260 config: CacheRefillConfig,
261 sstable_store: SstableStoreRef,
262 spawn_refill_task: SpawnRefillTask,
263 ) -> Self {
264 let config = Arc::new(config);
265 let concurrency = Arc::new(Semaphore::new(config.concurrency));
266 let meta_refill_concurrency = if config.meta_refill_concurrency == 0 {
267 None
268 } else {
269 Some(Arc::new(Semaphore::new(config.meta_refill_concurrency)))
270 };
271 Self {
272 queue: VecDeque::new(),
273 context: CacheRefillContext {
274 config,
275 meta_refill_concurrency,
276 concurrency,
277 sstable_store,
278 },
279 spawn_refill_task,
280 }
281 }
282
283 pub(crate) fn default_spawn_refill_task() -> SpawnRefillTask {
284 Arc::new(|deltas, context, _, _| {
285 let task = CacheRefillTask { deltas, context };
286 tokio::spawn(task.run())
287 })
288 }
289
290 pub(crate) fn start_cache_refill(
291 &mut self,
292 deltas: Vec<SstDeltaInfo>,
293 pinned_version: PinnedVersion,
294 new_pinned_version: PinnedVersion,
295 ) {
296 let handle = (self.spawn_refill_task)(
297 deltas,
298 self.context.clone(),
299 pinned_version.clone(),
300 new_pinned_version.clone(),
301 );
302 let event = CacheRefillerEvent {
303 pinned_version,
304 new_pinned_version,
305 };
306 let item = Item { handle, event };
307 self.queue.push_back(item);
308 GLOBAL_CACHE_REFILL_METRICS.refill_queue_total.add(1);
309 }
310
311 pub(crate) fn last_new_pinned_version(&self) -> Option<&PinnedVersion> {
312 self.queue.back().map(|item| &item.event.new_pinned_version)
313 }
314}
315
impl CacheRefiller {
    /// Waits until at least one queued refill task finishes and returns the
    /// finished tasks' events in FIFO order, at most `MAX_BATCH_SIZE` per call.
    ///
    /// Only a completed *front* task is popped; tasks behind an unfinished
    /// front task are not reported even if already done, preserving version
    /// ordering.
    pub(crate) fn next_events(&mut self) -> impl Future<Output = Vec<CacheRefillerEvent>> + '_ {
        poll_fn(|cx| {
            const MAX_BATCH_SIZE: usize = 16;
            let mut events = None;
            // Drain consecutive completed tasks from the front of the queue.
            while let Some(item) = self.queue.front_mut()
                && let Poll::Ready(result) = item.handle.poll_unpin(cx)
            {
                // A refill task panicking is a bug; propagate the panic.
                result.unwrap();
                let item = self.queue.pop_front().unwrap();
                GLOBAL_CACHE_REFILL_METRICS.refill_queue_total.sub(1);
                // Allocate the batch lazily, only once something completed.
                let events = events.get_or_insert_with(|| Vec::with_capacity(MAX_BATCH_SIZE));
                events.push(item.event);
                if events.len() >= MAX_BATCH_SIZE {
                    break;
                }
            }
            if let Some(events) = events {
                Poll::Ready(events)
            } else {
                Poll::Pending
            }
        })
    }
}
341
/// Emitted when a refill task completes, carrying the version transition
/// (`pinned_version` -> `new_pinned_version`) that the task covered.
pub struct CacheRefillerEvent {
    pub pinned_version: PinnedVersion,
    pub new_pinned_version: PinnedVersion,
}
346
/// Shared, cheaply-cloneable state handed to every refill task.
#[derive(Clone)]
pub(crate) struct CacheRefillContext {
    config: Arc<CacheRefillConfig>,
    /// `None` means meta refill concurrency is unlimited.
    meta_refill_concurrency: Option<Arc<Semaphore>>,
    /// Limits how many data refill units run concurrently.
    concurrency: Arc<Semaphore>,
    sstable_store: SstableStoreRef,
}
354
/// One refill run over the sst deltas of a single version update.
struct CacheRefillTask {
    deltas: Vec<SstDeltaInfo>,
    context: CacheRefillContext,
}
359
impl CacheRefillTask {
    /// Refills caches for all deltas concurrently, bounded by the configured
    /// timeout. A meta refill failure is logged and skips data refill for
    /// that delta only; on timeout, remaining work is silently abandoned.
    async fn run(self) {
        let tasks = self
            .deltas
            .iter()
            .map(|delta| {
                let context = self.context.clone();
                async move {
                    let holders = match Self::meta_cache_refill(&context, delta).await {
                        Ok(holders) => holders,
                        Err(e) => {
                            tracing::warn!(error = %e.as_report(), "meta cache refill error");
                            return;
                        }
                    };
                    Self::data_cache_refill(&context, delta, holders).await;
                }
            })
            .collect_vec();
        let future = join_all(tasks);

        // Best effort: the timeout result is deliberately ignored — partial
        // refill is acceptable.
        let _ = tokio::time::timeout(self.context.config.timeout, future).await;
    }
383
    /// Loads the metadata of every newly inserted sst into the meta cache and
    /// returns the table holders for the subsequent data refill. Fails fast
    /// on the first error. Concurrency is limited by
    /// `meta_refill_concurrency` when configured.
    async fn meta_cache_refill(
        context: &CacheRefillContext,
        delta: &SstDeltaInfo,
    ) -> HummockResult<Vec<TableHolder>> {
        let tasks = delta
            .insert_sst_infos
            .iter()
            .map(|info| async {
                let mut stats = StoreLocalStatistic::default();
                GLOBAL_CACHE_REFILL_METRICS.meta_refill_attempts_total.inc();

                // `None` when meta refill concurrency is unlimited.
                let permit = if let Some(c) = &context.meta_refill_concurrency {
                    Some(c.acquire().await.unwrap())
                } else {
                    None
                };

                let now = Instant::now();
                let res = context.sstable_store.sstable(info, &mut stats).await;
                // Refill is background work; drop the local statistics instead
                // of reporting them as user-facing activity.
                stats.discard();
                // NOTE(review): the duration is observed even when the fetch
                // fails, despite the metric's `success` label — confirm intent.
                GLOBAL_CACHE_REFILL_METRICS
                    .meta_refill_success_duration
                    .observe(now.elapsed().as_secs_f64());
                drop(permit);

                res
            })
            .collect_vec();
        let holders = try_join_all(tasks).await?;
        Ok(holders)
    }
415
    /// Computes which units of the new ssts (`ssts`) should be refilled, by
    /// "inheriting" hotness from the replaced parent ssts: a unit is selected
    /// when its key range overlaps a parent block that appears in the recent
    /// filter (or unconditionally when `skip_recent_filter` is set).
    fn get_units_to_refill_by_inheritance(
        context: &CacheRefillContext,
        ssts: &[TableHolder],
        parent_ssts: impl IntoIterator<Item = HybridCacheEntry<HummockSstableObjectId, Box<Sstable>>>,
    ) -> HashSet<SstableUnit> {
        let mut res = HashSet::default();

        let recent_filter = context.sstable_store.recent_filter();

        // Split each new sst into fixed-size units of `config.unit` blocks.
        let units = {
            let unit = context.config.unit;
            ssts.iter()
                .flat_map(|sst| {
                    let units = Unit::units(sst, unit);
                    (0..units).map(|uidx| Unit::new(sst, unit, uidx))
                })
                .collect_vec()
        };

        // The `partition_point` calls below rely on `units` being sorted by
        // key range; verify that in debug builds.
        if cfg!(debug_assertions) {
            units.iter().tuple_windows().for_each(|(a, b)| {
                debug_assert_ne!(
                    KeyComparator::compare_encoded_full_key(a.largest_key(), b.smallest_key()),
                    std::cmp::Ordering::Greater
                )
            });
        }

        for psst in parent_ssts {
            for pblk in 0..psst.block_count() {
                // Key range covered by parent block `pblk`: its smallest key up
                // to the next block's smallest key (or the sst's largest key
                // for the final block).
                let pleft = &psst.meta.block_metas[pblk].smallest_key;
                let pright = if pblk + 1 == psst.block_count() {
                    &psst.meta.largest_key
                } else {
                    &psst.meta.block_metas[pblk + 1].smallest_key
                };

                // First unit whose largest key is >= `pleft`.
                let uleft = units.partition_point(|unit| {
                    KeyComparator::compare_encoded_full_key(unit.largest_key(), pleft)
                        == std::cmp::Ordering::Less
                });
                // First unit whose smallest key is > `pright`.
                let uright = units.partition_point(|unit| {
                    KeyComparator::compare_encoded_full_key(unit.smallest_key(), pright)
                        != std::cmp::Ordering::Greater
                });

                // Units [uleft, uright) overlap parent block `pblk`.
                for u in units.iter().take(uright).skip(uleft) {
                    let unit = SstableUnit {
                        sst_obj_id: u.sst.id,
                        blks: u.blks.clone(),
                    };
                    if res.contains(&unit) {
                        continue;
                    }
                    if context.config.skip_recent_filter || recent_filter.contains(&(psst.id, pblk))
                    {
                        res.insert(unit);
                    }
                }
            }
        }

        // Selected units count as inheritance hits; the rest as misses.
        let hit = res.len();
        let miss = units.len() - res.len();
        GLOBAL_CACHE_REFILL_METRICS
            .data_refill_unit_inheritance_hit_total
            .inc_by(hit as u64);
        GLOBAL_CACHE_REFILL_METRICS
            .data_refill_unit_inheritance_miss_total
            .inc_by(miss as u64);

        res
    }
495
    /// Refills the block (data) cache for newly inserted ssts, when eligible:
    /// the block cache must be hybrid, the delta must both insert and delete
    /// ssts, the insert level must be configured for data refill, and (unless
    /// the recent filter is skipped) some deleted sst must be recently used.
    async fn data_cache_refill(
        context: &CacheRefillContext,
        delta: &SstDeltaInfo,
        holders: Vec<TableHolder>,
    ) {
        // Data refill only makes sense with a hybrid (disk-backed) block cache.
        if !context.sstable_store.block_cache().is_hybrid() {
            return;
        }

        // Only handle deltas that both add and replace ssts.
        if delta.insert_sst_infos.is_empty() || delta.delete_sst_object_ids.is_empty() {
            return;
        }

        if !context
            .config
            .data_refill_levels
            .contains(&delta.insert_sst_level)
        {
            return;
        }

        let recent_filter = context.sstable_store.recent_filter();

        // `usize::MAX` is the whole-sst marker used by the recent filter
        // (see `data_file_cache_refill_unit`).
        let targets = delta
            .delete_sst_object_ids
            .iter()
            .map(|id| (*id, usize::MAX))
            .collect_vec();
        if !context.config.skip_recent_filter && !recent_filter.contains_any(targets.iter()) {
            GLOBAL_CACHE_REFILL_METRICS
                .data_refill_filtered_total
                .inc_by(delta.delete_sst_object_ids.len() as _);
            return;
        }

        GLOBAL_CACHE_REFILL_METRICS
            .data_refill_block_unfiltered_total
            .inc_by(
                holders
                    .iter()
                    .map(|sst| sst.block_count() as u64)
                    .sum::<u64>(),
            );

        // Level 0 (or filter-less mode) refills every block; other levels
        // refill only units inherited from the replaced ssts.
        if delta.insert_sst_level == 0 || context.config.skip_recent_filter {
            Self::data_file_cache_refill_full_impl(context, delta, holders).await;
        } else {
            Self::data_file_cache_impl(context, delta, holders).await;
        }
    }
551
    /// Refills every block of every new sst, split into units of
    /// `config.unit` consecutive blocks. Unit errors are ignored (best
    /// effort): `join_all` drops the per-unit `Result`s.
    async fn data_file_cache_refill_full_impl(
        context: &CacheRefillContext,
        _delta: &SstDeltaInfo,
        holders: Vec<TableHolder>,
    ) {
        let unit = context.config.unit;

        let mut futures = vec![];

        for sst in &holders {
            for blk_start in (0..sst.block_count()).step_by(unit) {
                let blk_end = std::cmp::min(sst.block_count(), blk_start + unit);
                // NB: shadows the `unit` size above with the unit descriptor.
                let unit = SstableUnit {
                    sst_obj_id: sst.id,
                    blks: blk_start..blk_end,
                };
                futures.push(
                    async move { Self::data_file_cache_refill_unit(context, sst, unit).await },
                );
            }
        }
        join_all(futures).await;
    }
575
    /// Inheritance-based refill: looks up the replaced (parent) ssts in the
    /// meta cache, derives the units of the new ssts that overlap hot parent
    /// blocks, and refills only those units.
    async fn data_file_cache_impl(
        context: &CacheRefillContext,
        delta: &SstDeltaInfo,
        holders: Vec<TableHolder>,
    ) {
        let sstable_store = context.sstable_store.clone();
        let futures = delta.delete_sst_object_ids.iter().map(|sst_obj_id| {
            let store = &sstable_store;
            async move {
                // Cache-only lookup: a miss (`Ok(None)`) means that parent
                // contributes no inheritance information.
                let res = store.sstable_cached(*sst_obj_id).await;
                match res {
                    Ok(Some(_)) => GLOBAL_CACHE_REFILL_METRICS
                        .data_refill_parent_meta_lookup_hit_total
                        .inc(),
                    Ok(None) => GLOBAL_CACHE_REFILL_METRICS
                        .data_refill_parent_meta_lookup_miss_total
                        .inc(),
                    _ => {}
                }
                res
            }
        });
        let parent_ssts = match try_join_all(futures).await {
            Ok(parent_ssts) => parent_ssts.into_iter().flatten(),
            Err(e) => {
                return tracing::error!(error = %e.as_report(), "get old meta from cache error");
            }
        };
        let units = Self::get_units_to_refill_by_inheritance(context, &holders, parent_ssts);

        // Index the holders by object id so each unit can find its sstable.
        let ssts: HashMap<HummockSstableObjectId, TableHolder> =
            holders.into_iter().map(|meta| (meta.id, meta)).collect();
        let futures = units.into_iter().map(|unit| {
            let ssts = &ssts;
            async move {
                // Every unit was derived from `holders`, so the lookup succeeds.
                let sst = ssts.get(&unit.sst_obj_id).unwrap();
                if let Err(e) = Self::data_file_cache_refill_unit(context, sst, unit).await {
                    tracing::error!(error = %e.as_report(), "data file cache unit refill error");
                }
            }
        });
        join_all(futures).await;
    }
619
    /// Refills one unit (a contiguous block range) of `sst` into the hybrid
    /// block cache. The unit's bytes are fetched with a single object-store
    /// read and written per block, but only when the admitted fraction of the
    /// unit's blocks reaches `config.threshold`.
    async fn data_file_cache_refill_unit(
        context: &CacheRefillContext,
        sst: &Sstable,
        unit: SstableUnit,
    ) -> HummockResult<()> {
        let sstable_store = &context.sstable_store;
        let threshold = context.config.threshold;
        let recent_filter = sstable_store.recent_filter();

        // Mark the whole sst as recently used (`usize::MAX` = whole-sst entry).
        recent_filter.insert((sst.id, usize::MAX));

        let blocks = unit.blks.size().unwrap();

        let mut tasks = vec![];
        let mut contexts = Vec::with_capacity(blocks);
        let mut admits = 0;

        // Byte range of the whole unit: first block's start .. last block's end.
        let (range_first, _) = sst.calculate_block_info(unit.blks.start);
        let (range_last, _) = sst.calculate_block_info(unit.blks.end - 1);
        let range = range_first.start..range_last.end;

        let size = range.size().unwrap();

        GLOBAL_CACHE_REFILL_METRICS
            .data_refill_ideal_bytes
            .inc_by(size as _);

        // Probe the cache's admission filter for each block before any I/O.
        for blk in unit.blks {
            let (range, uncompressed_capacity) = sst.calculate_block_info(blk);
            let key = SstableBlockIndex {
                sst_id: sst.id,
                block_idx: blk as u64,
            };

            let mut writer = sstable_store.block_cache().storage_writer(key);

            // NOTE(review): the filter is probed with the whole unit's `size`,
            // not the individual block's size — confirm this is intentional.
            if writer.filter(size).is_admitted() {
                admits += 1;
            }

            contexts.push((writer, range, uncompressed_capacity))
        }

        // Fetch and insert only if enough blocks were admitted.
        if admits as f64 / contexts.len() as f64 >= threshold {
            let task = async move {
                GLOBAL_CACHE_REFILL_METRICS.data_refill_attempts_total.inc();

                // Bound the number of concurrently running unit refills.
                let permit = context.concurrency.acquire().await.unwrap();

                GLOBAL_CACHE_REFILL_METRICS.data_refill_started_total.inc();

                let timer = GLOBAL_CACHE_REFILL_METRICS
                    .data_refill_success_duration
                    .start_timer();

                // One ranged read for the whole unit; per-block slices below.
                let data = sstable_store
                    .store()
                    .read(&sstable_store.get_sst_data_path(sst.id), range.clone())
                    .await?;
                let mut futures = vec![];
                for (w, r, uc) in contexts {
                    let offset = r.start - range.start;
                    let len = r.end - r.start;
                    let bytes = data.slice(offset..offset + len);
                    let future = async move {
                        let value = Box::new(Block::decode(bytes, uc)?);
                        // NOTE(review): `force()` presumably bypasses the
                        // writer's admission decision — confirm with foyer docs.
                        if let Some(_entry) = w.force().insert(value) {
                            GLOBAL_CACHE_REFILL_METRICS
                                .data_refill_success_bytes
                                .inc_by(len as u64);
                            GLOBAL_CACHE_REFILL_METRICS
                                .data_refill_block_success_total
                                .inc();
                        }
                        Ok::<_, HummockError>(())
                    };
                    futures.push(future);
                }
                try_join_all(futures)
                    .await
                    .map_err(HummockError::file_cache)?;

                drop(permit);
                drop(timer);

                Ok::<_, HummockError>(())
            };
            tasks.push(task);
        }

        try_join_all(tasks).await?;

        Ok(())
    }
}
717
/// Identifies a single block of an sstable object.
#[derive(Debug)]
pub struct SstableBlock {
    pub sst_obj_id: HummockSstableObjectId,
    pub blk_idx: usize,
}
723
/// A contiguous range of blocks of one sstable object — the granularity at
/// which data refill is scheduled.
#[derive(Debug, Hash, PartialEq, Eq)]
pub struct SstableUnit {
    pub sst_obj_id: HummockSstableObjectId,
    pub blks: Range<usize>,
}
729
730impl Ord for SstableUnit {
731 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
732 match self.sst_obj_id.cmp(&other.sst_obj_id) {
733 std::cmp::Ordering::Equal => {}
734 ord => return ord,
735 }
736 match self.blks.start.cmp(&other.blks.start) {
737 std::cmp::Ordering::Equal => {}
738 ord => return ord,
739 }
740 self.blks.end.cmp(&other.blks.end)
741 }
742}
743
impl PartialOrd for SstableUnit {
    // Delegates to `Ord` so the two orderings can never disagree.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
749
/// A refill unit bound to its (borrowed) sstable metadata, used while
/// computing inheritance; converted into an owned [`SstableUnit`] for the
/// actual refill.
#[derive(Debug)]
struct Unit<'a> {
    sst: &'a Sstable,
    blks: Range<usize>,
}
755
756impl<'a> Unit<'a> {
757 fn new(sst: &'a Sstable, unit: usize, uidx: usize) -> Self {
758 let blks = unit * uidx..std::cmp::min(unit * (uidx + 1), sst.block_count());
759 Self { sst, blks }
760 }
761
762 fn smallest_key(&self) -> &Vec<u8> {
763 &self.sst.meta.block_metas[self.blks.start].smallest_key
764 }
765
766 fn largest_key(&self) -> &Vec<u8> {
768 if self.blks.end == self.sst.block_count() {
769 &self.sst.meta.largest_key
770 } else {
771 &self.sst.meta.block_metas[self.blks.end].smallest_key
772 }
773 }
774
775 fn units(sst: &Sstable, unit: usize) -> usize {
776 sst.block_count() / unit
777 + if sst.block_count().is_multiple_of(unit) {
778 0
779 } else {
780 1
781 }
782 }
783}