risingwave_meta/hummock/manager/
timer_task.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeSet, HashMap, HashSet};
16use std::sync::Arc;
17use std::time::Duration;
18
19use futures::future::Either;
20use futures::stream::BoxStream;
21use futures::{FutureExt, StreamExt};
22use itertools::Itertools;
23use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ids;
24use risingwave_pb::hummock::compact_task::{self, TaskStatus};
25use risingwave_pb::hummock::level_handler::RunningCompactTask;
26use rw_futures_util::select_all;
27use thiserror_ext::AsReport;
28use tokio::sync::oneshot::Sender;
29use tokio::task::JoinHandle;
30use tokio_stream::wrappers::IntervalStream;
31use tracing::warn;
32
33use crate::backup_restore::BackupManagerRef;
34use crate::hummock::metrics_utils::{trigger_lsm_stat, trigger_mv_stat};
35use crate::hummock::{HummockManager, TASK_NORMAL};
36
37impl HummockManager {
38    pub fn hummock_timer_task(
39        hummock_manager: Arc<Self>,
40        backup_manager: Option<BackupManagerRef>,
41    ) -> (JoinHandle<()>, Sender<()>) {
42        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
43        let join_handle = tokio::spawn(async move {
44            const CHECK_PENDING_TASK_PERIOD_SEC: u64 = 300;
45            const STAT_REPORT_PERIOD_SEC: u64 = 20;
46            const COMPACTION_HEARTBEAT_PERIOD_SEC: u64 = 1;
47
48            pub enum HummockTimerEvent {
49                GroupScheduleSplit,
50                CheckDeadTask,
51                Report,
52                CompactionHeartBeatExpiredCheck,
53
54                DynamicCompactionTrigger,
55                SpaceReclaimCompactionTrigger,
56                TtlCompactionTrigger,
57                TombstoneCompactionTrigger,
58
59                FullGc,
60
61                GroupScheduleMerge,
62            }
63            let mut check_compact_trigger_interval =
64                tokio::time::interval(Duration::from_secs(CHECK_PENDING_TASK_PERIOD_SEC));
65            check_compact_trigger_interval
66                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
67            check_compact_trigger_interval.reset();
68
69            let check_compact_trigger = IntervalStream::new(check_compact_trigger_interval)
70                .map(|_| HummockTimerEvent::CheckDeadTask);
71
72            let mut stat_report_interval =
73                tokio::time::interval(std::time::Duration::from_secs(STAT_REPORT_PERIOD_SEC));
74            stat_report_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
75            stat_report_interval.reset();
76            let stat_report_trigger =
77                IntervalStream::new(stat_report_interval).map(|_| HummockTimerEvent::Report);
78
79            let mut compaction_heartbeat_interval = tokio::time::interval(
80                std::time::Duration::from_secs(COMPACTION_HEARTBEAT_PERIOD_SEC),
81            );
82            compaction_heartbeat_interval
83                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
84            compaction_heartbeat_interval.reset();
85            let compaction_heartbeat_trigger = IntervalStream::new(compaction_heartbeat_interval)
86                .map(|_| HummockTimerEvent::CompactionHeartBeatExpiredCheck);
87
88            let mut min_trigger_interval = tokio::time::interval(Duration::from_secs(
89                hummock_manager.env.opts.periodic_compaction_interval_sec,
90            ));
91            min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
92            min_trigger_interval.reset();
93            let dynamic_tick_trigger = IntervalStream::new(min_trigger_interval)
94                .map(|_| HummockTimerEvent::DynamicCompactionTrigger);
95
96            let mut min_space_reclaim_trigger_interval =
97                tokio::time::interval(Duration::from_secs(
98                    hummock_manager
99                        .env
100                        .opts
101                        .periodic_space_reclaim_compaction_interval_sec,
102                ));
103            min_space_reclaim_trigger_interval
104                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
105            min_space_reclaim_trigger_interval.reset();
106            let space_reclaim_trigger = IntervalStream::new(min_space_reclaim_trigger_interval)
107                .map(|_| HummockTimerEvent::SpaceReclaimCompactionTrigger);
108
109            let mut min_ttl_reclaim_trigger_interval = tokio::time::interval(Duration::from_secs(
110                hummock_manager
111                    .env
112                    .opts
113                    .periodic_ttl_reclaim_compaction_interval_sec,
114            ));
115            min_ttl_reclaim_trigger_interval
116                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
117            min_ttl_reclaim_trigger_interval.reset();
118            let ttl_reclaim_trigger = IntervalStream::new(min_ttl_reclaim_trigger_interval)
119                .map(|_| HummockTimerEvent::TtlCompactionTrigger);
120
121            let mut full_gc_interval = tokio::time::interval(Duration::from_secs(
122                hummock_manager.env.opts.full_gc_interval_sec,
123            ));
124            full_gc_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
125            full_gc_interval.reset();
126            let full_gc_trigger =
127                IntervalStream::new(full_gc_interval).map(|_| HummockTimerEvent::FullGc);
128
129            let mut tombstone_reclaim_trigger_interval =
130                tokio::time::interval(Duration::from_secs(
131                    hummock_manager
132                        .env
133                        .opts
134                        .periodic_tombstone_reclaim_compaction_interval_sec,
135                ));
136            tombstone_reclaim_trigger_interval
137                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
138            tombstone_reclaim_trigger_interval.reset();
139            let tombstone_reclaim_trigger = IntervalStream::new(tombstone_reclaim_trigger_interval)
140                .map(|_| HummockTimerEvent::TombstoneCompactionTrigger);
141
142            let mut triggers: Vec<BoxStream<'static, HummockTimerEvent>> = vec![
143                Box::pin(check_compact_trigger),
144                Box::pin(stat_report_trigger),
145                Box::pin(compaction_heartbeat_trigger),
146                Box::pin(dynamic_tick_trigger),
147                Box::pin(space_reclaim_trigger),
148                Box::pin(ttl_reclaim_trigger),
149                Box::pin(full_gc_trigger),
150                Box::pin(tombstone_reclaim_trigger),
151            ];
152
153            let periodic_scheduling_compaction_group_split_interval_sec = hummock_manager
154                .env
155                .opts
156                .periodic_scheduling_compaction_group_split_interval_sec;
157
158            if periodic_scheduling_compaction_group_split_interval_sec > 0 {
159                let mut scheduling_compaction_group_trigger_interval = tokio::time::interval(
160                    Duration::from_secs(periodic_scheduling_compaction_group_split_interval_sec),
161                );
162                scheduling_compaction_group_trigger_interval
163                    .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
164                scheduling_compaction_group_trigger_interval.reset();
165                let group_scheduling_split_trigger =
166                    IntervalStream::new(scheduling_compaction_group_trigger_interval)
167                        .map(|_| HummockTimerEvent::GroupScheduleSplit);
168                triggers.push(Box::pin(group_scheduling_split_trigger));
169            }
170
171            let periodic_scheduling_compaction_group_merge_interval_sec = hummock_manager
172                .env
173                .opts
174                .periodic_scheduling_compaction_group_merge_interval_sec;
175
176            if periodic_scheduling_compaction_group_merge_interval_sec > 0 {
177                let mut scheduling_compaction_group_merge_trigger_interval = tokio::time::interval(
178                    Duration::from_secs(periodic_scheduling_compaction_group_merge_interval_sec),
179                );
180                scheduling_compaction_group_merge_trigger_interval
181                    .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
182                scheduling_compaction_group_merge_trigger_interval.reset();
183                let group_scheduling_merge_trigger =
184                    IntervalStream::new(scheduling_compaction_group_merge_trigger_interval)
185                        .map(|_| HummockTimerEvent::GroupScheduleMerge);
186                triggers.push(Box::pin(group_scheduling_merge_trigger));
187            }
188
189            let event_stream = select_all(triggers);
190            use futures::pin_mut;
191            pin_mut!(event_stream);
192
193            let shutdown_rx_shared = shutdown_rx.shared();
194
195            tracing::info!(
196                "Hummock timer task [GroupSchedulingSplit interval {} sec] [GroupSchedulingMerge interval {} sec] [CheckDeadTask interval {} sec] [Report interval {} sec] [CompactionHeartBeat interval {} sec]",
197                periodic_scheduling_compaction_group_split_interval_sec,
198                periodic_scheduling_compaction_group_merge_interval_sec,
199                CHECK_PENDING_TASK_PERIOD_SEC,
200                STAT_REPORT_PERIOD_SEC,
201                COMPACTION_HEARTBEAT_PERIOD_SEC
202            );
203
204            loop {
205                let item =
206                    futures::future::select(event_stream.next(), shutdown_rx_shared.clone()).await;
207
208                match item {
209                    Either::Left((event, _)) => {
210                        if let Some(event) = event {
211                            match event {
212                                HummockTimerEvent::CheckDeadTask => {
213                                    if hummock_manager.env.opts.compaction_deterministic_test {
214                                        continue;
215                                    }
216
217                                    hummock_manager.check_dead_task().await;
218                                }
219
220                                HummockTimerEvent::GroupScheduleSplit => {
221                                    if hummock_manager.env.opts.compaction_deterministic_test {
222                                        continue;
223                                    }
224
225                                    hummock_manager.on_handle_schedule_group_split().await;
226                                }
227
228                                HummockTimerEvent::GroupScheduleMerge => {
229                                    if hummock_manager.env.opts.compaction_deterministic_test {
230                                        continue;
231                                    }
232
233                                    hummock_manager.on_handle_schedule_group_merge().await;
234                                }
235
236                                HummockTimerEvent::Report => {
237                                    let (current_version, id_to_config, version_stats) = {
238                                        let versioning_guard =
239                                            hummock_manager.versioning.read().await;
240
241                                        let configs =
242                                            hummock_manager.get_compaction_group_map().await;
243                                        let versioning_deref = versioning_guard;
244                                        (
245                                            versioning_deref.current_version.clone(),
246                                            configs,
247                                            versioning_deref.version_stats.clone(),
248                                        )
249                                    };
250
251                                    if let Some(mv_id_to_all_table_ids) = hummock_manager
252                                        .metadata_manager
253                                        .get_job_id_to_internal_table_ids_mapping()
254                                        .await
255                                    {
256                                        trigger_mv_stat(
257                                            &hummock_manager.metrics,
258                                            &version_stats,
259                                            mv_id_to_all_table_ids,
260                                        );
261                                    }
262
263                                    for compaction_group_id in
264                                        get_compaction_group_ids(&current_version)
265                                    {
266                                        let compaction_group_config =
267                                            &id_to_config[&compaction_group_id];
268
269                                        let group_levels = current_version
270                                            .get_compaction_group_levels(
271                                                compaction_group_config.group_id(),
272                                            );
273
274                                        trigger_lsm_stat(
275                                            &hummock_manager.metrics,
276                                            compaction_group_config.compaction_config(),
277                                            group_levels,
278                                            compaction_group_config.group_id(),
279                                        )
280                                    }
281
282                                    {
283                                        let group_infos = hummock_manager
284                                            .calculate_compaction_group_statistic()
285                                            .await;
286                                        let compaction_group_count = group_infos.len();
287                                        hummock_manager
288                                            .metrics
289                                            .compaction_group_count
290                                            .set(compaction_group_count as i64);
291
292                                        let table_write_throughput_statistic_manager =
293                                            hummock_manager
294                                                .table_write_throughput_statistic_manager
295                                                .read()
296                                                .clone();
297
298                                        let current_version_levels = &hummock_manager
299                                            .versioning
300                                            .read()
301                                            .await
302                                            .current_version
303                                            .levels;
304
305                                        for group_info in group_infos {
306                                            hummock_manager
307                                                .metrics
308                                                .compaction_group_size
309                                                .with_label_values(&[&group_info
310                                                    .group_id
311                                                    .to_string()])
312                                                .set(group_info.group_size as _);
313                                            // accumulate the throughput of all tables in the group
314                                            let mut avg_throuput = 0;
315                                            let max_statistic_expired_time = std::cmp::max(
316                                                hummock_manager
317                                                    .env
318                                                    .opts
319                                                    .table_stat_throuput_window_seconds_for_split,
320                                                hummock_manager
321                                                    .env
322                                                    .opts
323                                                    .table_stat_throuput_window_seconds_for_merge,
324                                            );
325                                            for table_id in group_info.table_statistic.keys() {
326                                                avg_throuput +=
327                                                    table_write_throughput_statistic_manager
328                                                        .avg_write_throughput(
329                                                            *table_id,
330                                                            max_statistic_expired_time as i64,
331                                                        )
332                                                        as u64;
333                                            }
334
335                                            hummock_manager
336                                                .metrics
337                                                .compaction_group_throughput
338                                                .with_label_values(&[&group_info
339                                                    .group_id
340                                                    .to_string()])
341                                                .set(avg_throuput as _);
342
343                                            if let Some(group_levels) =
344                                                current_version_levels.get(&group_info.group_id)
345                                            {
346                                                let file_count = group_levels.count_ssts();
347                                                hummock_manager
348                                                    .metrics
349                                                    .compaction_group_file_count
350                                                    .with_label_values(&[&group_info
351                                                        .group_id
352                                                        .to_string()])
353                                                    .set(file_count as _);
354                                            }
355                                        }
356                                    }
357                                }
358
359                                HummockTimerEvent::CompactionHeartBeatExpiredCheck => {
360                                    let compactor_manager =
361                                        hummock_manager.compactor_manager.clone();
362
363                                    // TODO: add metrics to track expired tasks
364                                    // The cancel task has two paths
365                                    // 1. compactor heartbeat cancels the expired task based on task
366                                    // progress (meta + compactor)
367                                    // 2. meta periodically scans the task and performs a cancel on
368                                    // the meta side for tasks that are not updated by heartbeat
369                                    let expired_tasks: Vec<u64> = compactor_manager
370                                        .get_heartbeat_expired_tasks()
371                                        .into_iter()
372                                        .map(|task| task.task_id)
373                                        .collect();
374                                    if !expired_tasks.is_empty() {
375                                        tracing::info!(
376                                            expired_tasks = ?expired_tasks,
377                                            "Heartbeat expired compaction tasks detected. Attempting to cancel tasks.",
378                                        );
379                                        if let Err(e) = hummock_manager
380                                            .cancel_compact_tasks(
381                                                expired_tasks.clone(),
382                                                TaskStatus::HeartbeatCanceled,
383                                            )
384                                            .await
385                                        {
386                                            tracing::error!(
387                                                expired_tasks = ?expired_tasks,
388                                                error = %e.as_report(),
389                                                "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
390                                                until we can successfully report its status",
391                                            );
392                                        }
393                                    }
394                                }
395
396                                HummockTimerEvent::DynamicCompactionTrigger => {
397                                    // Disable periodic trigger for compaction_deterministic_test.
398                                    if hummock_manager.env.opts.compaction_deterministic_test {
399                                        continue;
400                                    }
401
402                                    hummock_manager
403                                        .on_handle_trigger_multi_group(
404                                            compact_task::TaskType::Dynamic,
405                                        )
406                                        .await;
407                                }
408
409                                HummockTimerEvent::SpaceReclaimCompactionTrigger => {
410                                    // Disable periodic trigger for compaction_deterministic_test.
411                                    if hummock_manager.env.opts.compaction_deterministic_test {
412                                        continue;
413                                    }
414
415                                    hummock_manager
416                                        .on_handle_trigger_multi_group(
417                                            compact_task::TaskType::SpaceReclaim,
418                                        )
419                                        .await;
420
421                                    // share the same trigger with SpaceReclaim
422                                    hummock_manager
423                                        .on_handle_trigger_multi_group(
424                                            compact_task::TaskType::VnodeWatermark,
425                                        )
426                                        .await;
427                                }
428
429                                HummockTimerEvent::TtlCompactionTrigger => {
430                                    // Disable periodic trigger for compaction_deterministic_test.
431                                    if hummock_manager.env.opts.compaction_deterministic_test {
432                                        continue;
433                                    }
434
435                                    hummock_manager
436                                        .on_handle_trigger_multi_group(compact_task::TaskType::Ttl)
437                                        .await;
438                                }
439
440                                HummockTimerEvent::TombstoneCompactionTrigger => {
441                                    // Disable periodic trigger for compaction_deterministic_test.
442                                    if hummock_manager.env.opts.compaction_deterministic_test {
443                                        continue;
444                                    }
445
446                                    hummock_manager
447                                        .on_handle_trigger_multi_group(
448                                            compact_task::TaskType::Tombstone,
449                                        )
450                                        .await;
451                                }
452
453                                HummockTimerEvent::FullGc => {
454                                    let retention_sec =
455                                        hummock_manager.env.opts.min_sst_retention_time_sec;
456                                    let backup_manager_2 = backup_manager.clone();
457                                    let hummock_manager_2 = hummock_manager.clone();
458                                    tokio::task::spawn(async move {
459                                        use thiserror_ext::AsReport;
460                                        let _ = hummock_manager_2
461                                            .start_full_gc(
462                                                Duration::from_secs(retention_sec),
463                                                None,
464                                                backup_manager_2,
465                                            )
466                                            .await
467                                            .inspect_err(|e| {
468                                                warn!(error = %e.as_report(), "Failed to start GC.")
469                                            });
470                                    });
471                                }
472                            }
473                        }
474                    }
475
476                    Either::Right((_, _shutdown)) => {
477                        tracing::info!("Hummock timer loop is stopped");
478                        break;
479                    }
480                }
481            }
482        });
483        (join_handle, shutdown_tx)
484    }
485}
486
487impl HummockManager {
488    async fn check_dead_task(&self) {
489        const MAX_COMPACTION_L0_MULTIPLIER: u64 = 32;
490        const MAX_COMPACTION_DURATION_SEC: u64 = 20 * 60;
491        let (groups, configs) = {
492            let versioning_guard = self.versioning.read().await;
493            let g = versioning_guard
494                .current_version
495                .levels
496                .iter()
497                .map(|(id, group)| {
498                    (
499                        *id,
500                        group
501                            .l0
502                            .sub_levels
503                            .iter()
504                            .map(|level| level.total_file_size)
505                            .sum::<u64>(),
506                    )
507                })
508                .collect_vec();
509            let c = self.get_compaction_group_map().await;
510            (g, c)
511        };
512        let mut slowdown_groups: HashMap<u64, u64> = HashMap::default();
513        {
514            for (group_id, l0_file_size) in groups {
515                let group = &configs[&group_id];
516                if l0_file_size
517                    > MAX_COMPACTION_L0_MULTIPLIER
518                        * group.compaction_config.max_bytes_for_level_base
519                {
520                    slowdown_groups.insert(group_id, l0_file_size);
521                }
522            }
523        }
524        if slowdown_groups.is_empty() {
525            return;
526        }
527        let mut pending_tasks: HashMap<u64, (u64, usize, RunningCompactTask)> = HashMap::default();
528        {
529            let compaction_guard = self.compaction.read().await;
530            for group_id in slowdown_groups.keys() {
531                if let Some(status) = compaction_guard.compaction_statuses.get(group_id) {
532                    for (idx, level_handler) in status.level_handlers.iter().enumerate() {
533                        let tasks = level_handler.pending_tasks().to_vec();
534                        if tasks.is_empty() {
535                            continue;
536                        }
537                        for task in tasks {
538                            pending_tasks.insert(task.task_id, (*group_id, idx, task));
539                        }
540                    }
541                }
542            }
543        }
544        let task_ids = pending_tasks.keys().cloned().collect_vec();
545        let task_infos = self
546            .compactor_manager
547            .check_tasks_status(&task_ids, Duration::from_secs(MAX_COMPACTION_DURATION_SEC));
548        for (task_id, (compact_time, status)) in task_infos {
549            if status == TASK_NORMAL {
550                continue;
551            }
552            if let Some((group_id, level_id, task)) = pending_tasks.get(&task_id) {
553                let group_size = *slowdown_groups.get(group_id).unwrap();
554                warn!(
555                    "COMPACTION SLOW: the task-{} of group-{}(size: {}MB) level-{} has not finished after {:?}, {}, it may cause pending sstable files({:?}) blocking other task.",
556                    task_id,
557                    *group_id,
558                    group_size / 1024 / 1024,
559                    *level_id,
560                    compact_time,
561                    status,
562                    task.ssts
563                );
564            }
565        }
566    }
567
568    /// Try to schedule a compaction `split` for the given compaction groups.
569    /// The `split` will be triggered if the following conditions are met:
570    /// 1. `state table throughput`: If the table is in a high throughput state and it belongs to a multi table group, then an attempt will be made to split the table into separate compaction groups to increase its throughput and reduce the impact on write amplification.
571    /// 2. `group size`: If the group size has exceeded the set upper limit, e.g. `max_group_size` * `split_group_size_ratio`
572    async fn on_handle_schedule_group_split(&self) {
573        let table_write_throughput = self.table_write_throughput_statistic_manager.read().clone();
574        let mut group_infos = self.calculate_compaction_group_statistic().await;
575        group_infos.sort_by_key(|group| group.group_size);
576        group_infos.reverse();
577
578        for group in group_infos {
579            if group.table_statistic.len() == 1 {
580                // no need to handle the separate compaciton group
581                continue;
582            }
583
584            self.try_split_compaction_group(&table_write_throughput, group)
585                .await;
586        }
587    }
588
589    async fn on_handle_trigger_multi_group(&self, task_type: compact_task::TaskType) {
590        for cg_id in self.compaction_group_ids().await {
591            if let Err(e) = self.compaction_state.try_sched_compaction(cg_id, task_type) {
592                tracing::error!(
593                    error = %e.as_report(),
594                    "Failed to schedule {:?} compaction for compaction group {}",
595                    task_type,
596                    cg_id,
597                );
598            }
599        }
600    }
601
602    /// Try to schedule a compaction merge for the given compaction groups.
603    /// The merge will be triggered if the following conditions are met:
604    /// 1. The compaction group is not contains creating table.
605    /// 2. The compaction group is a small group.
606    /// 3. All tables in compaction group is in a low throughput state.
607    async fn on_handle_schedule_group_merge(&self) {
608        let created_tables = match self.metadata_manager.get_created_table_ids().await {
609            Ok(created_tables) => HashSet::from_iter(created_tables),
610            Err(err) => {
611                tracing::warn!(error = %err.as_report(), "failed to fetch created table ids");
612                return;
613            }
614        };
615        let table_write_throughput_statistic_manager =
616            self.table_write_throughput_statistic_manager.read().clone();
617        let mut group_infos = self.calculate_compaction_group_statistic().await;
618        // sort by first table id for deterministic merge order
619        group_infos.sort_by_key(|group| {
620            let table_ids = group
621                .table_statistic
622                .keys()
623                .cloned()
624                .collect::<BTreeSet<_>>();
625            table_ids.iter().next().cloned()
626        });
627
628        let group_count = group_infos.len();
629        if group_count < 2 {
630            return;
631        }
632
633        let mut left = 0;
634        let mut right = left + 1;
635
636        while left < right && right < group_count {
637            let group = &group_infos[left];
638            let next_group = &group_infos[right];
639            match self
640                .try_merge_compaction_group(
641                    &table_write_throughput_statistic_manager,
642                    group,
643                    next_group,
644                    &created_tables,
645                )
646                .await
647            {
648                Ok(_) => right += 1,
649                Err(e) => {
650                    tracing::debug!(
651                        error = %e.as_report(),
652                        "Failed to merge compaction group",
653                    );
654                    left = right;
655                    right = left + 1;
656                }
657            }
658        }
659    }
660}