risingwave_meta/hummock/manager/
timer_task.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeSet, HashMap, HashSet};
16use std::sync::Arc;
17use std::time::Duration;
18
19use futures::future::Either;
20use futures::stream::BoxStream;
21use futures::{FutureExt, StreamExt};
22use itertools::Itertools;
23use risingwave_hummock_sdk::CompactionGroupId;
24use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ids;
25use risingwave_pb::hummock::compact_task::{self, TaskStatus};
26use risingwave_pb::hummock::level_handler::RunningCompactTask;
27use rw_futures_util::select_all;
28use thiserror_ext::AsReport;
29use tokio::sync::oneshot::Sender;
30use tokio::task::JoinHandle;
31use tokio_stream::wrappers::IntervalStream;
32use tracing::warn;
33
34use crate::backup_restore::BackupManagerRef;
35use crate::hummock::metrics_utils::{trigger_lsm_stat, trigger_mv_stat};
36use crate::hummock::{HummockManager, TASK_NORMAL};
37
38impl HummockManager {
39    pub fn hummock_timer_task(
40        hummock_manager: Arc<Self>,
41        backup_manager: Option<BackupManagerRef>,
42    ) -> (JoinHandle<()>, Sender<()>) {
43        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
44        let join_handle = tokio::spawn(async move {
45            const CHECK_PENDING_TASK_PERIOD_SEC: u64 = 300;
46            const STAT_REPORT_PERIOD_SEC: u64 = 20;
47            const COMPACTION_HEARTBEAT_PERIOD_SEC: u64 = 1;
48
49            pub enum HummockTimerEvent {
50                GroupScheduleSplit,
51                CheckDeadTask,
52                Report,
53                CompactionHeartBeatExpiredCheck,
54
55                DynamicCompactionTrigger,
56                SpaceReclaimCompactionTrigger,
57                TtlCompactionTrigger,
58                TombstoneCompactionTrigger,
59
60                FullGc,
61
62                GroupScheduleMerge,
63            }
64            let mut check_compact_trigger_interval =
65                tokio::time::interval(Duration::from_secs(CHECK_PENDING_TASK_PERIOD_SEC));
66            check_compact_trigger_interval
67                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
68            check_compact_trigger_interval.reset();
69
70            let check_compact_trigger = IntervalStream::new(check_compact_trigger_interval)
71                .map(|_| HummockTimerEvent::CheckDeadTask);
72
73            let mut stat_report_interval =
74                tokio::time::interval(std::time::Duration::from_secs(STAT_REPORT_PERIOD_SEC));
75            stat_report_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
76            stat_report_interval.reset();
77            let stat_report_trigger =
78                IntervalStream::new(stat_report_interval).map(|_| HummockTimerEvent::Report);
79
80            let mut compaction_heartbeat_interval = tokio::time::interval(
81                std::time::Duration::from_secs(COMPACTION_HEARTBEAT_PERIOD_SEC),
82            );
83            compaction_heartbeat_interval
84                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
85            compaction_heartbeat_interval.reset();
86            let compaction_heartbeat_trigger = IntervalStream::new(compaction_heartbeat_interval)
87                .map(|_| HummockTimerEvent::CompactionHeartBeatExpiredCheck);
88
89            let mut min_trigger_interval = tokio::time::interval(Duration::from_secs(
90                hummock_manager.env.opts.periodic_compaction_interval_sec,
91            ));
92            min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
93            min_trigger_interval.reset();
94            let dynamic_tick_trigger = IntervalStream::new(min_trigger_interval)
95                .map(|_| HummockTimerEvent::DynamicCompactionTrigger);
96
97            let mut min_space_reclaim_trigger_interval =
98                tokio::time::interval(Duration::from_secs(
99                    hummock_manager
100                        .env
101                        .opts
102                        .periodic_space_reclaim_compaction_interval_sec,
103                ));
104            min_space_reclaim_trigger_interval
105                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
106            min_space_reclaim_trigger_interval.reset();
107            let space_reclaim_trigger = IntervalStream::new(min_space_reclaim_trigger_interval)
108                .map(|_| HummockTimerEvent::SpaceReclaimCompactionTrigger);
109
110            let mut min_ttl_reclaim_trigger_interval = tokio::time::interval(Duration::from_secs(
111                hummock_manager
112                    .env
113                    .opts
114                    .periodic_ttl_reclaim_compaction_interval_sec,
115            ));
116            min_ttl_reclaim_trigger_interval
117                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
118            min_ttl_reclaim_trigger_interval.reset();
119            let ttl_reclaim_trigger = IntervalStream::new(min_ttl_reclaim_trigger_interval)
120                .map(|_| HummockTimerEvent::TtlCompactionTrigger);
121
122            let mut full_gc_interval = tokio::time::interval(Duration::from_secs(
123                hummock_manager.env.opts.full_gc_interval_sec,
124            ));
125            full_gc_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
126            full_gc_interval.reset();
127            let full_gc_trigger =
128                IntervalStream::new(full_gc_interval).map(|_| HummockTimerEvent::FullGc);
129
130            let mut tombstone_reclaim_trigger_interval =
131                tokio::time::interval(Duration::from_secs(
132                    hummock_manager
133                        .env
134                        .opts
135                        .periodic_tombstone_reclaim_compaction_interval_sec,
136                ));
137            tombstone_reclaim_trigger_interval
138                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
139            tombstone_reclaim_trigger_interval.reset();
140            let tombstone_reclaim_trigger = IntervalStream::new(tombstone_reclaim_trigger_interval)
141                .map(|_| HummockTimerEvent::TombstoneCompactionTrigger);
142
143            let mut triggers: Vec<BoxStream<'static, HummockTimerEvent>> = vec![
144                Box::pin(check_compact_trigger),
145                Box::pin(stat_report_trigger),
146                Box::pin(compaction_heartbeat_trigger),
147                Box::pin(dynamic_tick_trigger),
148                Box::pin(space_reclaim_trigger),
149                Box::pin(ttl_reclaim_trigger),
150                Box::pin(full_gc_trigger),
151                Box::pin(tombstone_reclaim_trigger),
152            ];
153
154            let periodic_scheduling_compaction_group_split_interval_sec = hummock_manager
155                .env
156                .opts
157                .periodic_scheduling_compaction_group_split_interval_sec;
158
159            if periodic_scheduling_compaction_group_split_interval_sec > 0 {
160                let mut scheduling_compaction_group_trigger_interval = tokio::time::interval(
161                    Duration::from_secs(periodic_scheduling_compaction_group_split_interval_sec),
162                );
163                scheduling_compaction_group_trigger_interval
164                    .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
165                scheduling_compaction_group_trigger_interval.reset();
166                let group_scheduling_split_trigger =
167                    IntervalStream::new(scheduling_compaction_group_trigger_interval)
168                        .map(|_| HummockTimerEvent::GroupScheduleSplit);
169                triggers.push(Box::pin(group_scheduling_split_trigger));
170            }
171
172            let periodic_scheduling_compaction_group_merge_interval_sec = hummock_manager
173                .env
174                .opts
175                .periodic_scheduling_compaction_group_merge_interval_sec;
176
177            if periodic_scheduling_compaction_group_merge_interval_sec > 0 {
178                let mut scheduling_compaction_group_merge_trigger_interval = tokio::time::interval(
179                    Duration::from_secs(periodic_scheduling_compaction_group_merge_interval_sec),
180                );
181                scheduling_compaction_group_merge_trigger_interval
182                    .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
183                scheduling_compaction_group_merge_trigger_interval.reset();
184                let group_scheduling_merge_trigger =
185                    IntervalStream::new(scheduling_compaction_group_merge_trigger_interval)
186                        .map(|_| HummockTimerEvent::GroupScheduleMerge);
187                triggers.push(Box::pin(group_scheduling_merge_trigger));
188            }
189
190            let event_stream = select_all(triggers);
191            use futures::pin_mut;
192            pin_mut!(event_stream);
193
194            let shutdown_rx_shared = shutdown_rx.shared();
195
196            tracing::info!(
197                "Hummock timer task [GroupSchedulingSplit interval {} sec] [GroupSchedulingMerge interval {} sec] [CheckDeadTask interval {} sec] [Report interval {} sec] [CompactionHeartBeat interval {} sec]",
198                periodic_scheduling_compaction_group_split_interval_sec,
199                periodic_scheduling_compaction_group_merge_interval_sec,
200                CHECK_PENDING_TASK_PERIOD_SEC,
201                STAT_REPORT_PERIOD_SEC,
202                COMPACTION_HEARTBEAT_PERIOD_SEC
203            );
204
205            loop {
206                let item =
207                    futures::future::select(event_stream.next(), shutdown_rx_shared.clone()).await;
208
209                match item {
210                    Either::Left((event, _)) => {
211                        if let Some(event) = event {
212                            match event {
213                                HummockTimerEvent::CheckDeadTask => {
214                                    if hummock_manager.env.opts.compaction_deterministic_test {
215                                        continue;
216                                    }
217
218                                    hummock_manager.check_dead_task().await;
219                                }
220
221                                HummockTimerEvent::GroupScheduleSplit => {
222                                    if hummock_manager.env.opts.compaction_deterministic_test {
223                                        continue;
224                                    }
225
226                                    hummock_manager.on_handle_schedule_group_split().await;
227                                }
228
229                                HummockTimerEvent::GroupScheduleMerge => {
230                                    if hummock_manager.env.opts.compaction_deterministic_test {
231                                        continue;
232                                    }
233
234                                    hummock_manager.on_handle_schedule_group_merge().await;
235                                }
236
237                                HummockTimerEvent::Report => {
238                                    let (current_version, id_to_config, version_stats) = {
239                                        let versioning_guard =
240                                            hummock_manager.versioning.read().await;
241
242                                        let configs =
243                                            hummock_manager.get_compaction_group_map().await;
244                                        let versioning_deref = versioning_guard;
245                                        (
246                                            versioning_deref.current_version.clone(),
247                                            configs,
248                                            versioning_deref.version_stats.clone(),
249                                        )
250                                    };
251
252                                    if let Some(mv_id_to_all_table_ids) = hummock_manager
253                                        .metadata_manager
254                                        .get_job_id_to_internal_table_ids_mapping()
255                                        .await
256                                    {
257                                        trigger_mv_stat(
258                                            &hummock_manager.metrics,
259                                            &version_stats,
260                                            mv_id_to_all_table_ids,
261                                        );
262                                    }
263
264                                    for compaction_group_id in
265                                        get_compaction_group_ids(&current_version)
266                                    {
267                                        let compaction_group_config =
268                                            &id_to_config[&compaction_group_id];
269
270                                        let group_levels = current_version
271                                            .get_compaction_group_levels(
272                                                compaction_group_config.group_id(),
273                                            );
274
275                                        trigger_lsm_stat(
276                                            &hummock_manager.metrics,
277                                            compaction_group_config.compaction_config(),
278                                            group_levels,
279                                            compaction_group_config.group_id(),
280                                        )
281                                    }
282
283                                    {
284                                        let group_infos = hummock_manager
285                                            .calculate_compaction_group_statistic()
286                                            .await;
287                                        let compaction_group_count = group_infos.len();
288                                        hummock_manager
289                                            .metrics
290                                            .compaction_group_count
291                                            .set(compaction_group_count as i64);
292
293                                        let table_write_throughput_statistic_manager =
294                                            hummock_manager
295                                                .table_write_throughput_statistic_manager
296                                                .read()
297                                                .clone();
298
299                                        let current_version_levels = &hummock_manager
300                                            .versioning
301                                            .read()
302                                            .await
303                                            .current_version
304                                            .levels;
305
306                                        for group_info in group_infos {
307                                            hummock_manager
308                                                .metrics
309                                                .compaction_group_size
310                                                .with_label_values(&[&group_info
311                                                    .group_id
312                                                    .to_string()])
313                                                .set(group_info.group_size as _);
314                                            // accumulate the throughput of all tables in the group
315                                            let mut avg_throuput = 0;
316                                            let max_statistic_expired_time = std::cmp::max(
317                                                hummock_manager
318                                                    .env
319                                                    .opts
320                                                    .table_stat_throuput_window_seconds_for_split,
321                                                hummock_manager
322                                                    .env
323                                                    .opts
324                                                    .table_stat_throuput_window_seconds_for_merge,
325                                            );
326                                            for table_id in group_info.table_statistic.keys() {
327                                                avg_throuput +=
328                                                    table_write_throughput_statistic_manager
329                                                        .avg_write_throughput(
330                                                            *table_id,
331                                                            max_statistic_expired_time as i64,
332                                                        )
333                                                        as u64;
334                                            }
335
336                                            hummock_manager
337                                                .metrics
338                                                .compaction_group_throughput
339                                                .with_label_values(&[&group_info
340                                                    .group_id
341                                                    .to_string()])
342                                                .set(avg_throuput as _);
343
344                                            if let Some(group_levels) =
345                                                current_version_levels.get(&group_info.group_id)
346                                            {
347                                                let file_count = group_levels.count_ssts();
348                                                hummock_manager
349                                                    .metrics
350                                                    .compaction_group_file_count
351                                                    .with_label_values(&[&group_info
352                                                        .group_id
353                                                        .to_string()])
354                                                    .set(file_count as _);
355                                            }
356                                        }
357                                    }
358                                }
359
360                                HummockTimerEvent::CompactionHeartBeatExpiredCheck => {
361                                    let compactor_manager =
362                                        hummock_manager.compactor_manager.clone();
363
364                                    // TODO: add metrics to track expired tasks
365                                    // The cancel task has two paths
366                                    // 1. compactor heartbeat cancels the expired task based on task
367                                    // progress (meta + compactor)
368                                    // 2. meta periodically scans the task and performs a cancel on
369                                    // the meta side for tasks that are not updated by heartbeat
370                                    let expired_tasks: Vec<u64> = compactor_manager
371                                        .get_heartbeat_expired_tasks()
372                                        .into_iter()
373                                        .map(|task| task.task_id)
374                                        .collect();
375                                    if !expired_tasks.is_empty() {
376                                        tracing::info!(
377                                            expired_tasks = ?expired_tasks,
378                                            "Heartbeat expired compaction tasks detected. Attempting to cancel tasks.",
379                                        );
380                                        if let Err(e) = hummock_manager
381                                            .cancel_compact_tasks(
382                                                expired_tasks.clone(),
383                                                TaskStatus::HeartbeatCanceled,
384                                            )
385                                            .await
386                                        {
387                                            tracing::error!(
388                                                expired_tasks = ?expired_tasks,
389                                                error = %e.as_report(),
390                                                "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
391                                                until we can successfully report its status",
392                                            );
393                                        }
394                                    }
395                                }
396
397                                HummockTimerEvent::DynamicCompactionTrigger => {
398                                    // Disable periodic trigger for compaction_deterministic_test.
399                                    if hummock_manager.env.opts.compaction_deterministic_test {
400                                        continue;
401                                    }
402
403                                    hummock_manager
404                                        .on_handle_trigger_multi_group(
405                                            compact_task::TaskType::Dynamic,
406                                        )
407                                        .await;
408                                }
409
410                                HummockTimerEvent::SpaceReclaimCompactionTrigger => {
411                                    // Disable periodic trigger for compaction_deterministic_test.
412                                    if hummock_manager.env.opts.compaction_deterministic_test {
413                                        continue;
414                                    }
415
416                                    hummock_manager
417                                        .on_handle_trigger_multi_group(
418                                            compact_task::TaskType::SpaceReclaim,
419                                        )
420                                        .await;
421
422                                    // share the same trigger with SpaceReclaim
423                                    hummock_manager
424                                        .on_handle_trigger_multi_group(
425                                            compact_task::TaskType::VnodeWatermark,
426                                        )
427                                        .await;
428                                }
429
430                                HummockTimerEvent::TtlCompactionTrigger => {
431                                    // Disable periodic trigger for compaction_deterministic_test.
432                                    if hummock_manager.env.opts.compaction_deterministic_test {
433                                        continue;
434                                    }
435
436                                    hummock_manager
437                                        .on_handle_trigger_multi_group(compact_task::TaskType::Ttl)
438                                        .await;
439                                }
440
441                                HummockTimerEvent::TombstoneCompactionTrigger => {
442                                    // Disable periodic trigger for compaction_deterministic_test.
443                                    if hummock_manager.env.opts.compaction_deterministic_test {
444                                        continue;
445                                    }
446
447                                    hummock_manager
448                                        .on_handle_trigger_multi_group(
449                                            compact_task::TaskType::Tombstone,
450                                        )
451                                        .await;
452                                }
453
454                                HummockTimerEvent::FullGc => {
455                                    let retention_sec =
456                                        hummock_manager.env.opts.min_sst_retention_time_sec;
457                                    let backup_manager_2 = backup_manager.clone();
458                                    let hummock_manager_2 = hummock_manager.clone();
459                                    tokio::task::spawn(async move {
460                                        use thiserror_ext::AsReport;
461                                        let _ = hummock_manager_2
462                                            .start_full_gc(
463                                                Duration::from_secs(retention_sec),
464                                                None,
465                                                backup_manager_2,
466                                            )
467                                            .await
468                                            .inspect_err(|e| {
469                                                warn!(error = %e.as_report(), "Failed to start GC.")
470                                            });
471                                    });
472                                }
473                            }
474                        }
475                    }
476
477                    Either::Right((_, _shutdown)) => {
478                        tracing::info!("Hummock timer loop is stopped");
479                        break;
480                    }
481                }
482            }
483        });
484        (join_handle, shutdown_tx)
485    }
486}
487
488impl HummockManager {
489    async fn check_dead_task(&self) {
490        const MAX_COMPACTION_L0_MULTIPLIER: u64 = 32;
491        const MAX_COMPACTION_DURATION_SEC: u64 = 20 * 60;
492        let (groups, configs) = {
493            let versioning_guard = self.versioning.read().await;
494            let g = versioning_guard
495                .current_version
496                .levels
497                .iter()
498                .map(|(id, group)| {
499                    (
500                        *id,
501                        group
502                            .l0
503                            .sub_levels
504                            .iter()
505                            .map(|level| level.total_file_size)
506                            .sum::<u64>(),
507                    )
508                })
509                .collect_vec();
510            let c = self.get_compaction_group_map().await;
511            (g, c)
512        };
513        let mut slowdown_groups: HashMap<CompactionGroupId, u64> = HashMap::default();
514        {
515            for (group_id, l0_file_size) in groups {
516                let group = &configs[&group_id];
517                if l0_file_size
518                    > MAX_COMPACTION_L0_MULTIPLIER
519                        * group.compaction_config.max_bytes_for_level_base
520                {
521                    slowdown_groups.insert(group_id, l0_file_size);
522                }
523            }
524        }
525        if slowdown_groups.is_empty() {
526            return;
527        }
528        let mut pending_tasks: HashMap<u64, (CompactionGroupId, usize, RunningCompactTask)> =
529            HashMap::default();
530        {
531            let compaction_guard = self.compaction.read().await;
532            for group_id in slowdown_groups.keys() {
533                if let Some(status) = compaction_guard.compaction_statuses.get(group_id) {
534                    for (idx, level_handler) in status.level_handlers.iter().enumerate() {
535                        let tasks = level_handler.pending_tasks().to_vec();
536                        if tasks.is_empty() {
537                            continue;
538                        }
539                        for task in tasks {
540                            pending_tasks.insert(task.task_id, (*group_id, idx, task));
541                        }
542                    }
543                }
544            }
545        }
546        let task_ids = pending_tasks.keys().cloned().collect_vec();
547        let task_infos = self
548            .compactor_manager
549            .check_tasks_status(&task_ids, Duration::from_secs(MAX_COMPACTION_DURATION_SEC));
550        for (task_id, (compact_time, status)) in task_infos {
551            if status == TASK_NORMAL {
552                continue;
553            }
554            if let Some((group_id, level_id, task)) = pending_tasks.get(&task_id) {
555                let group_size = *slowdown_groups.get(group_id).unwrap();
556                warn!(
557                    "COMPACTION SLOW: the task-{} of group-{}(size: {}MB) level-{} has not finished after {:?}, {}, it may cause pending sstable files({:?}) blocking other task.",
558                    task_id,
559                    group_id,
560                    group_size / 1024 / 1024,
561                    *level_id,
562                    compact_time,
563                    status,
564                    task.ssts
565                );
566            }
567        }
568    }
569
570    /// Try to schedule a compaction `split` for the given compaction groups.
571    /// The `split` will be triggered if the following conditions are met:
572    /// 1. `state table throughput`: If the table is in a high throughput state and it belongs to a multi table group, then an attempt will be made to split the table into separate compaction groups to increase its throughput and reduce the impact on write amplification.
573    /// 2. `group size`: If the group size has exceeded the set upper limit, e.g. `max_group_size` * `split_group_size_ratio`
574    async fn on_handle_schedule_group_split(&self) {
575        let table_write_throughput = self.table_write_throughput_statistic_manager.read().clone();
576        let mut group_infos = self.calculate_compaction_group_statistic().await;
577        group_infos.sort_by_key(|group| group.group_size);
578        group_infos.reverse();
579
580        for group in group_infos {
581            if group.table_statistic.len() == 1 {
582                // no need to handle the separate compaciton group
583                continue;
584            }
585
586            self.try_split_compaction_group(&table_write_throughput, group)
587                .await;
588        }
589    }
590
591    async fn on_handle_trigger_multi_group(&self, task_type: compact_task::TaskType) {
592        for cg_id in self.compaction_group_ids().await {
593            self.compaction_state.try_sched_compaction(
594                cg_id,
595                task_type,
596                super::compaction::ScheduleTrigger::Periodic,
597            );
598        }
599    }
600
601    /// Try to schedule a compaction merge for the given compaction groups.
602    /// The merge will be triggered if the following conditions are met:
603    /// 1. The compaction group is not contains creating table.
604    /// 2. The compaction group is a small group.
605    /// 3. All tables in compaction group is in a low throughput state.
606    async fn on_handle_schedule_group_merge(&self) {
607        let created_tables = match self.metadata_manager.get_created_table_ids().await {
608            Ok(created_tables) => HashSet::from_iter(created_tables),
609            Err(err) => {
610                tracing::warn!(error = %err.as_report(), "failed to fetch created table ids");
611                return;
612            }
613        };
614        let table_write_throughput_statistic_manager =
615            self.table_write_throughput_statistic_manager.read().clone();
616        let mut group_infos = self.calculate_compaction_group_statistic().await;
617        // sort by first table id for deterministic merge order
618        group_infos.sort_by_key(|group| {
619            let table_ids = group
620                .table_statistic
621                .keys()
622                .cloned()
623                .collect::<BTreeSet<_>>();
624            table_ids.iter().next().cloned()
625        });
626
627        let group_count = group_infos.len();
628        if group_count < 2 {
629            return;
630        }
631
632        let mut base = 0;
633        let mut candidate = 1;
634
635        while candidate < group_count {
636            let group = &group_infos[base];
637            let next_group = &group_infos[candidate];
638            match self
639                .try_merge_compaction_group(
640                    &table_write_throughput_statistic_manager,
641                    group,
642                    next_group,
643                    &created_tables,
644                )
645                .await
646            {
647                Ok(_) => candidate += 1,
648                Err(e) => {
649                    tracing::debug!(
650                        error = %e.as_report(),
651                        "Failed to merge compaction group",
652                    );
653                    base = candidate;
654                    candidate = base + 1;
655                }
656            }
657        }
658    }
659}