risingwave_meta/hummock/manager/compaction/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Copyright 2025 RisingWave Labs
16//
17// Licensed under the Apache License, Version 2.0 (the "License");
18// you may not use this file except in compliance with the License.
19// You may obtain a copy of the License at
20//
21//     http://www.apache.org/licenses/LICENSE-2.0
22//
23// Unless required by applicable law or agreed to in writing, software
24// distributed under the License is distributed on an "AS IS" BASIS,
25// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
26// See the License for the specific language governing permissions and
27// limitations under the License.
28
29use std::collections::{BTreeMap, HashMap, HashSet};
30use std::sync::{Arc, LazyLock};
31use std::time::{Instant, SystemTime};
32
33use anyhow::Context;
34use fail::fail_point;
35use futures::future::Shared;
36use futures::stream::FuturesUnordered;
37use futures::{FutureExt, StreamExt};
38use itertools::Itertools;
39use parking_lot::Mutex;
40use rand::rng as thread_rng;
41use rand::seq::SliceRandom;
42use risingwave_common::config::default::compaction_config;
43use risingwave_common::util::epoch::Epoch;
44use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask};
45use risingwave_hummock_sdk::compaction_group::StateTableId;
46use risingwave_hummock_sdk::key_range::KeyRange;
47use risingwave_hummock_sdk::level::Levels;
48use risingwave_hummock_sdk::sstable_info::SstableInfo;
49use risingwave_hummock_sdk::table_stats::{
50    PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats,
51};
52use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
53use risingwave_hummock_sdk::version::{GroupDelta, IntraLevelDelta};
54use risingwave_hummock_sdk::{
55    CompactionGroupId, HummockCompactionTaskId, HummockSstableObjectId, HummockVersionId,
56    compact_task_to_string, statistics_compact_task,
57};
58use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
59use risingwave_pb::hummock::subscribe_compaction_event_request::{
60    self, Event as RequestEvent, HeartBeat, PullTask,
61};
62use risingwave_pb::hummock::subscribe_compaction_event_response::{
63    Event as ResponseEvent, PullTaskAck,
64};
65use risingwave_pb::hummock::{
66    CompactTaskAssignment, CompactionConfig, PbCompactStatus, PbCompactTaskAssignment,
67    SubscribeCompactionEventRequest, TableOption, TableSchema, compact_task,
68};
69use rw_futures_util::pending_on_none;
70use thiserror_ext::AsReport;
71use tokio::sync::RwLockWriteGuard;
72use tokio::sync::mpsc::error::SendError;
73use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
74use tokio::sync::oneshot::{Receiver as OneShotReceiver, Sender};
75use tokio::task::JoinHandle;
76use tonic::Streaming;
77use tracing::warn;
78
79use crate::hummock::compaction::selector::level_selector::PickerInfo;
80use crate::hummock::compaction::selector::{
81    DynamicLevelSelector, DynamicLevelSelectorCore, LocalSelectorStatistic, ManualCompactionOption,
82    ManualCompactionSelector, SpaceReclaimCompactionSelector, TombstoneCompactionSelector,
83    TtlCompactionSelector, VnodeWatermarkCompactionSelector,
84};
85use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig, CompactionSelector};
86use crate::hummock::error::{Error, Result};
87use crate::hummock::manager::transaction::{
88    HummockVersionStatsTransaction, HummockVersionTransaction,
89};
90use crate::hummock::manager::versioning::Versioning;
91use crate::hummock::metrics_utils::{
92    build_compact_task_level_type_metrics_label, trigger_compact_tasks_stat,
93    trigger_local_table_stat,
94};
95use crate::hummock::model::CompactionGroup;
96use crate::hummock::sequence::next_compaction_task_id;
97use crate::hummock::{HummockManager, commit_multi_var, start_measure_real_process_timer};
98use crate::manager::META_NODE_ID;
99use crate::model::BTreeMapTransaction;
100
101pub mod compaction_group_manager;
102pub mod compaction_group_schedule;
103
104const MAX_SKIP_TIMES: usize = 8;
105const MAX_REPORT_COUNT: usize = 16;
106
107static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
108    [
109        TaskStatus::ManualCanceled,
110        TaskStatus::SendFailCanceled,
111        TaskStatus::AssignFailCanceled,
112        TaskStatus::HeartbeatCanceled,
113        TaskStatus::InvalidGroupCanceled,
114        TaskStatus::NoAvailMemoryResourceCanceled,
115        TaskStatus::NoAvailCpuResourceCanceled,
116        TaskStatus::HeartbeatProgressCanceled,
117    ]
118    .into_iter()
119    .collect()
120});
121
122type CompactionRequestChannelItem = (CompactionGroupId, compact_task::TaskType);
123
124fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> {
125    let mut compaction_selectors: HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> =
126        HashMap::default();
127    compaction_selectors.insert(
128        compact_task::TaskType::Dynamic,
129        Box::<DynamicLevelSelector>::default(),
130    );
131    compaction_selectors.insert(
132        compact_task::TaskType::SpaceReclaim,
133        Box::<SpaceReclaimCompactionSelector>::default(),
134    );
135    compaction_selectors.insert(
136        compact_task::TaskType::Ttl,
137        Box::<TtlCompactionSelector>::default(),
138    );
139    compaction_selectors.insert(
140        compact_task::TaskType::Tombstone,
141        Box::<TombstoneCompactionSelector>::default(),
142    );
143    compaction_selectors.insert(
144        compact_task::TaskType::VnodeWatermark,
145        Box::<VnodeWatermarkCompactionSelector>::default(),
146    );
147    compaction_selectors
148}
149
150impl HummockVersionTransaction<'_> {
151    fn apply_compact_task(&mut self, compact_task: &CompactTask) {
152        let mut version_delta = self.new_delta();
153        let trivial_move = compact_task.is_trivial_move_task();
154        version_delta.trivial_move = trivial_move;
155
156        let group_deltas = &mut version_delta
157            .group_deltas
158            .entry(compact_task.compaction_group_id)
159            .or_default()
160            .group_deltas;
161        let mut removed_table_ids_map: BTreeMap<u32, HashSet<u64>> = BTreeMap::default();
162
163        for level in &compact_task.input_ssts {
164            let level_idx = level.level_idx;
165
166            removed_table_ids_map
167                .entry(level_idx)
168                .or_default()
169                .extend(level.table_infos.iter().map(|sst| sst.sst_id));
170        }
171
172        for (level_idx, removed_table_ids) in removed_table_ids_map {
173            let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
174                level_idx,
175                0, // default
176                removed_table_ids,
177                vec![], // default
178                0,      // default
179                compact_task.compaction_group_version_id,
180            ));
181
182            group_deltas.push(group_delta);
183        }
184
185        let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
186            compact_task.target_level,
187            compact_task.target_sub_level_id,
188            HashSet::new(), // default
189            compact_task.sorted_output_ssts.clone(),
190            compact_task.split_weight_by_vnode,
191            compact_task.compaction_group_version_id,
192        ));
193
194        group_deltas.push(group_delta);
195        version_delta.pre_apply();
196    }
197}
198
199#[derive(Default)]
200pub struct Compaction {
201    /// Compaction task that is already assigned to a compactor
202    pub compact_task_assignment: BTreeMap<HummockCompactionTaskId, PbCompactTaskAssignment>,
203    /// `CompactStatus` of each compaction group
204    pub compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus>,
205
206    pub _deterministic_mode: bool,
207}
208
209impl HummockManager {
210    pub async fn get_assigned_compact_task_num(&self) -> u64 {
211        self.compaction.read().await.compact_task_assignment.len() as u64
212    }
213
214    pub async fn list_compaction_status(
215        &self,
216    ) -> (Vec<PbCompactStatus>, Vec<CompactTaskAssignment>) {
217        let compaction = self.compaction.read().await;
218        (
219            compaction.compaction_statuses.values().map_into().collect(),
220            compaction
221                .compact_task_assignment
222                .values()
223                .cloned()
224                .collect(),
225        )
226    }
227
228    pub async fn get_compaction_scores(
229        &self,
230        compaction_group_id: CompactionGroupId,
231    ) -> Vec<PickerInfo> {
232        let (status, levels, group) = {
233            let compaction = self.compaction.read().await;
234            let versioning = self.versioning.read().await;
235            let config_manager = self.compaction_group_manager.read().await;
236            match (
237                compaction.compaction_statuses.get(&compaction_group_id),
238                versioning.current_version.levels.get(&compaction_group_id),
239                config_manager.try_get_compaction_group_config(compaction_group_id),
240            ) {
241                (Some(cs), Some(v), Some(cf)) => (cs.to_owned(), v.to_owned(), cf),
242                _ => {
243                    return vec![];
244                }
245            }
246        };
247        let dynamic_level_core = DynamicLevelSelectorCore::new(
248            group.compaction_config,
249            Arc::new(CompactionDeveloperConfig::default()),
250        );
251        let ctx = dynamic_level_core.get_priority_levels(&levels, &status.level_handlers);
252        ctx.score_levels
253    }
254}
255
256impl HummockManager {
257    async fn handle_pull_task_event(
258        &self,
259        context_id: u32,
260        pull_task_count: usize,
261        compaction_selectors: &mut HashMap<TaskType, Box<dyn CompactionSelector>>,
262        max_get_task_probe_times: usize,
263    ) {
264        assert_ne!(0, pull_task_count);
265        if let Some(compactor) = self.compactor_manager.get_compactor(context_id) {
266            let (groups, task_type) = self.auto_pick_compaction_groups_and_type().await;
267            if let TaskType::Ttl = task_type {
268                match self
269                    .metadata_manager
270                    .get_all_table_options()
271                    .await
272                    .map_err(|err| Error::MetaStore(err.into()))
273                {
274                    Ok(table_options) => {
275                        self.update_table_id_to_table_option(table_options);
276                    }
277                    Err(err) => {
278                        warn!(error = %err.as_report(), "Failed to get table options");
279                    }
280                }
281            }
282
283            if !groups.is_empty() {
284                let selector: &mut Box<dyn CompactionSelector> =
285                    compaction_selectors.get_mut(&task_type).unwrap();
286
287                let mut generated_task_count = 0;
288                let mut existed_groups = groups.clone();
289                let mut no_task_groups: HashSet<CompactionGroupId> = HashSet::default();
290                let mut failed_tasks = vec![];
291                let mut loop_times = 0;
292
293                while generated_task_count < pull_task_count
294                    && failed_tasks.is_empty()
295                    && loop_times < max_get_task_probe_times
296                {
297                    loop_times += 1;
298                    let compact_ret = self
299                        .get_compact_tasks(
300                            existed_groups.clone(),
301                            pull_task_count - generated_task_count,
302                            selector,
303                        )
304                        .await;
305
306                    match compact_ret {
307                        Ok((compact_tasks, unschedule_groups)) => {
308                            no_task_groups.extend(unschedule_groups);
309                            if compact_tasks.is_empty() {
310                                break;
311                            }
312                            generated_task_count += compact_tasks.len();
313                            for task in compact_tasks {
314                                let task_id = task.task_id;
315                                if let Err(e) =
316                                    compactor.send_event(ResponseEvent::CompactTask(task.into()))
317                                {
318                                    tracing::warn!(
319                                        error = %e.as_report(),
320                                        "Failed to send task {} to {}",
321                                        task_id,
322                                        compactor.context_id(),
323                                    );
324                                    failed_tasks.push(task_id);
325                                }
326                            }
327                            if !failed_tasks.is_empty() {
328                                self.compactor_manager.remove_compactor(context_id);
329                            }
330                            existed_groups.retain(|group_id| !no_task_groups.contains(group_id));
331                        }
332                        Err(err) => {
333                            tracing::warn!(error = %err.as_report(), "Failed to get compaction task");
334                            break;
335                        }
336                    };
337                }
338                for group in no_task_groups {
339                    self.compaction_state.unschedule(group, task_type);
340                }
341                if let Err(err) = self
342                    .cancel_compact_tasks(failed_tasks, TaskStatus::SendFailCanceled)
343                    .await
344                {
345                    tracing::warn!(error = %err.as_report(), "Failed to cancel compaction task");
346                }
347            }
348
349            // ack to compactor
350            if let Err(e) = compactor.send_event(ResponseEvent::PullTaskAck(PullTaskAck {})) {
351                tracing::warn!(
352                    error = %e.as_report(),
353                    "Failed to send ask to {}",
354                    context_id,
355                );
356                self.compactor_manager.remove_compactor(context_id);
357            }
358        }
359    }
360
361    /// dedicated event runtime for CPU/IO bound event
362    pub fn compaction_event_loop(
363        hummock_manager: Arc<Self>,
364        mut compactor_streams_change_rx: UnboundedReceiver<(
365            u32,
366            Streaming<SubscribeCompactionEventRequest>,
367        )>,
368    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
369        let mut compactor_request_streams = FuturesUnordered::new();
370        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
371        let (shutdown_tx_dedicated, shutdown_rx_dedicated) = tokio::sync::oneshot::channel();
372        let shutdown_rx_shared = shutdown_rx.shared();
373        let shutdown_rx_dedicated_shared = shutdown_rx_dedicated.shared();
374
375        let (tx, rx) = unbounded_channel();
376
377        let mut join_handle_vec = Vec::default();
378
379        let hummock_manager_dedicated = hummock_manager.clone();
380        let compact_task_event_handler_join_handle = tokio::spawn(async move {
381            Self::compact_task_dedicated_event_handler(
382                hummock_manager_dedicated,
383                rx,
384                shutdown_rx_dedicated_shared,
385            )
386            .await;
387        });
388
389        join_handle_vec.push((
390            compact_task_event_handler_join_handle,
391            shutdown_tx_dedicated,
392        ));
393
394        let join_handle = tokio::spawn(async move {
395            let push_stream =
396                |context_id: u32,
397                 stream: Streaming<SubscribeCompactionEventRequest>,
398                 compactor_request_streams: &mut FuturesUnordered<_>| {
399                    let future = StreamExt::into_future(stream)
400                        .map(move |stream_future| (context_id, stream_future));
401
402                    compactor_request_streams.push(future);
403                };
404
405            let mut event_loop_iteration_now = Instant::now();
406
407            loop {
408                let shutdown_rx_shared = shutdown_rx_shared.clone();
409                let hummock_manager = hummock_manager.clone();
410                hummock_manager
411                    .metrics
412                    .compaction_event_loop_iteration_latency
413                    .observe(event_loop_iteration_now.elapsed().as_millis() as _);
414                event_loop_iteration_now = Instant::now();
415
416                tokio::select! {
417                    _ = shutdown_rx_shared => { return; },
418
419                    compactor_stream = compactor_streams_change_rx.recv() => {
420                        if let Some((context_id, stream)) = compactor_stream {
421                            tracing::info!("compactor {} enters the cluster", context_id);
422                            push_stream(context_id, stream, &mut compactor_request_streams);
423                        }
424                    },
425
426                    result = pending_on_none(compactor_request_streams.next()) => {
427                        let mut compactor_alive = true;
428
429                        let (context_id, compactor_stream_req): (_, (std::option::Option<std::result::Result<SubscribeCompactionEventRequest, _>>, _)) = result;
430                        let (event, create_at, stream) = match compactor_stream_req {
431                            (Some(Ok(req)), stream) => {
432                                (req.event.unwrap(), req.create_at, stream)
433                            }
434
435                            (Some(Err(err)), _stream) => {
436                                tracing::warn!(error = %err.as_report(), "compactor stream {} poll with err, recv stream may be destroyed", context_id);
437                                continue
438                            }
439
440                            _ => {
441                                tracing::warn!("compactor stream {} poll err, recv stream may be destroyed", context_id);
442                                continue
443                            },
444                        };
445
446                        {
447                            let consumed_latency_ms = SystemTime::now()
448                                .duration_since(std::time::UNIX_EPOCH)
449                                .expect("Clock may have gone backwards")
450                                .as_millis()
451                                as u64
452                            - create_at;
453                            hummock_manager.metrics
454                                .compaction_event_consumed_latency
455                                .observe(consumed_latency_ms as _);
456                        }
457
458                        match event {
459                            RequestEvent::HeartBeat(HeartBeat {
460                                progress,
461                            }) => {
462                                let compactor_manager = hummock_manager.compactor_manager.clone();
463                                let cancel_tasks = compactor_manager.update_task_heartbeats(&progress).into_iter().map(|task|task.task_id).collect::<Vec<_>>();
464                                if !cancel_tasks.is_empty() {
465                                    tracing::info!(
466                                        ?cancel_tasks,
467                                        context_id,
468                                        "Tasks cancel has expired due to lack of visible progress",
469                                    );
470
471                                    if let Err(e) = hummock_manager
472                                        .cancel_compact_tasks(cancel_tasks.clone(), TaskStatus::HeartbeatProgressCanceled)
473                                        .await
474                                    {
475                                        tracing::error!(
476                                            error = %e.as_report(),
477                                            "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
478                                            until we can successfully report its status."
479                                        );
480                                    }
481                                }
482
483                                match compactor_manager.get_compactor(context_id) { Some(compactor) => {
484                                    // Forcefully cancel the task so that it terminates
485                                    // early on the compactor
486                                    // node.
487                                    if !cancel_tasks.is_empty() {
488                                        let _ = compactor.cancel_tasks(&cancel_tasks);
489                                        tracing::info!(
490                                            ?cancel_tasks,
491                                            context_id,
492                                            "CancelTask operation has been sent to compactor node",
493                                        );
494                                    }
495                                } _ => {
496                                    // Determine the validity of the compactor streaming rpc. When the compactor no longer exists in the manager, the stream will be removed.
497                                    // Tip: Connectivity to the compactor will be determined through the `send_event` operation. When send fails, it will be removed from the manager
498                                    compactor_alive = false;
499                                }}
500                            },
501
502                            RequestEvent::Register(_) => {
503                                unreachable!()
504                            }
505
506                            e @ (RequestEvent::PullTask(_) | RequestEvent::ReportTask(_)) => {
507                                let _ = tx.send((context_id, e));
508                            }
509                        }
510
511                        if compactor_alive {
512                            push_stream(context_id, stream, &mut compactor_request_streams);
513                        } else {
514                            tracing::warn!(context_id, "compactor stream error, send stream may be destroyed");
515                        }
516                    },
517                }
518            }
519        });
520
521        join_handle_vec.push((join_handle, shutdown_tx));
522
523        join_handle_vec
524    }
525
526    pub fn add_compactor_stream(
527        &self,
528        context_id: u32,
529        req_stream: Streaming<SubscribeCompactionEventRequest>,
530    ) {
531        self.compactor_streams_change_tx
532            .send((context_id, req_stream))
533            .unwrap();
534    }
535
536    pub async fn auto_pick_compaction_group_and_type(
537        &self,
538    ) -> Option<(CompactionGroupId, compact_task::TaskType)> {
539        let mut compaction_group_ids = self.compaction_group_ids().await;
540        compaction_group_ids.shuffle(&mut thread_rng());
541
542        for cg_id in compaction_group_ids {
543            if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) {
544                return Some((cg_id, pick_type));
545            }
546        }
547
548        None
549    }
550
551    /// This method will return all compaction group id in a random order and task type. If there are any group block by `write_limit`, it will return a single array with `TaskType::Emergency`.
552    /// If these groups get different task-type, it will return all group id with `TaskType::Dynamic` if the first group get `TaskType::Dynamic`, otherwise it will return the single group with other task type.
553    async fn auto_pick_compaction_groups_and_type(
554        &self,
555    ) -> (Vec<CompactionGroupId>, compact_task::TaskType) {
556        let mut compaction_group_ids = self.compaction_group_ids().await;
557        compaction_group_ids.shuffle(&mut thread_rng());
558
559        let mut normal_groups = vec![];
560        for cg_id in compaction_group_ids {
561            if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) {
562                if pick_type == TaskType::Dynamic {
563                    normal_groups.push(cg_id);
564                } else if normal_groups.is_empty() {
565                    return (vec![cg_id], pick_type);
566                }
567            }
568        }
569        (normal_groups, TaskType::Dynamic)
570    }
571
572    /// dedicated event runtime for CPU/IO bound event
573    async fn compact_task_dedicated_event_handler(
574        hummock_manager: Arc<HummockManager>,
575        mut rx: UnboundedReceiver<(u32, subscribe_compaction_event_request::Event)>,
576        shutdown_rx_shared: Shared<OneShotReceiver<()>>,
577    ) {
578        let mut compaction_selectors = init_selectors();
579
580        tokio::select! {
581            _ = shutdown_rx_shared => {}
582
583            _ = async {
584                while let Some((context_id, event)) = rx.recv().await {
585                    let mut report_events = vec![];
586                    let mut skip_times = 0;
587                    match event {
588                        RequestEvent::PullTask(PullTask { pull_task_count }) => {
589                            hummock_manager.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors, hummock_manager.env.opts.max_get_task_probe_times).await;
590                        }
591
592                        RequestEvent::ReportTask(task) => {
593                           report_events.push(task.into());
594                        }
595
596                        _ => unreachable!(),
597                    }
598                    while let Ok((context_id, event)) = rx.try_recv() {
599                        match event {
600                            RequestEvent::PullTask(PullTask { pull_task_count }) => {
601                                hummock_manager.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors, hummock_manager.env.opts.max_get_task_probe_times).await;
602                                if !report_events.is_empty() {
603                                    if skip_times > MAX_SKIP_TIMES {
604                                        break;
605                                    }
606                                    skip_times += 1;
607                                }
608                            }
609
610                            RequestEvent::ReportTask(task) => {
611                                report_events.push(task.into());
612                                if report_events.len() >= MAX_REPORT_COUNT {
613                                    break;
614                                }
615                            }
616                        _ => unreachable!(),
617                        }
618                    }
619                    if !report_events.is_empty() {
620                        if let Err(e) = hummock_manager.report_compact_tasks(report_events).await
621                        {
622                            tracing::error!(error = %e.as_report(), "report compact_tack fail")
623                        }
624                    }
625                }
626            } => {}
627        }
628    }
629}
630
631impl HummockManager {
632    pub async fn get_compact_tasks_impl(
633        &self,
634        compaction_groups: Vec<CompactionGroupId>,
635        max_select_count: usize,
636        selector: &mut Box<dyn CompactionSelector>,
637    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
638        let deterministic_mode = self.env.opts.compaction_deterministic_test;
639
640        let mut compaction_guard = self.compaction.write().await;
641        let compaction: &mut Compaction = &mut compaction_guard;
642        let mut versioning_guard = self.versioning.write().await;
643        let versioning: &mut Versioning = &mut versioning_guard;
644
645        let _timer = start_measure_real_process_timer!(self, "get_compact_tasks_impl");
646
647        let start_time = Instant::now();
648        let mut compaction_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);
649
650        let mut compact_task_assignment =
651            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);
652
653        let mut version = HummockVersionTransaction::new(
654            &mut versioning.current_version,
655            &mut versioning.hummock_version_deltas,
656            self.env.notification_manager(),
657            None,
658            &self.metrics,
659        );
660        // Apply stats changes.
661        let mut version_stats = HummockVersionStatsTransaction::new(
662            &mut versioning.version_stats,
663            self.env.notification_manager(),
664        );
665
666        if deterministic_mode {
667            version.disable_apply_to_txn();
668        }
669        let all_versioned_table_schemas = if self.env.opts.enable_dropped_column_reclaim {
670            self.metadata_manager
671                .catalog_controller
672                .get_versioned_table_schemas()
673                .await
674                .map_err(|e| Error::Internal(e.into()))?
675        } else {
676            HashMap::default()
677        };
678        let mut unschedule_groups = vec![];
679        let mut trivial_tasks = vec![];
680        let mut pick_tasks = vec![];
681        let developer_config = Arc::new(CompactionDeveloperConfig::new_from_meta_opts(
682            &self.env.opts,
683        ));
684        'outside: for compaction_group_id in compaction_groups {
685            if pick_tasks.len() >= max_select_count {
686                break;
687            }
688
689            if !version
690                .latest_version()
691                .levels
692                .contains_key(&compaction_group_id)
693            {
694                continue;
695            }
696
697            // When the last table of a compaction group is deleted, the compaction group (and its
698            // config) is destroyed as well. Then a compaction task for this group may come later and
699            // cannot find its config.
700            let group_config = {
701                let config_manager = self.compaction_group_manager.read().await;
702
703                match config_manager.try_get_compaction_group_config(compaction_group_id) {
704                    Some(config) => config,
705                    None => continue,
706                }
707            };
708
709            // StoredIdGenerator already implements ids pre-allocation by ID_PREALLOCATE_INTERVAL.
710            let task_id = next_compaction_task_id(&self.env).await?;
711
712            if !compaction_statuses.contains_key(&compaction_group_id) {
713                // lazy initialize.
714                compaction_statuses.insert(
715                    compaction_group_id,
716                    CompactStatus::new(
717                        compaction_group_id,
718                        group_config.compaction_config.max_level,
719                    ),
720                );
721            }
722            let mut compact_status = compaction_statuses.get_mut(compaction_group_id).unwrap();
723
724            let can_trivial_move = matches!(selector.task_type(), TaskType::Dynamic)
725                || matches!(selector.task_type(), TaskType::Emergency);
726
727            let mut stats = LocalSelectorStatistic::default();
728            let member_table_ids: Vec<_> = version
729                .latest_version()
730                .state_table_info
731                .compaction_group_member_table_ids(compaction_group_id)
732                .iter()
733                .map(|table_id| table_id.table_id)
734                .collect();
735
736            let mut table_id_to_option: HashMap<u32, _> = HashMap::default();
737
738            {
739                let guard = self.table_id_to_table_option.read();
740                for table_id in &member_table_ids {
741                    if let Some(opts) = guard.get(table_id) {
742                        table_id_to_option.insert(*table_id, *opts);
743                    }
744                }
745            }
746
747            while let Some(compact_task) = compact_status.get_compact_task(
748                version
749                    .latest_version()
750                    .get_compaction_group_levels(compaction_group_id),
751                version
752                    .latest_version()
753                    .state_table_info
754                    .compaction_group_member_table_ids(compaction_group_id),
755                task_id as HummockCompactionTaskId,
756                &group_config,
757                &mut stats,
758                selector,
759                &table_id_to_option,
760                developer_config.clone(),
761                &version.latest_version().table_watermarks,
762                &version.latest_version().state_table_info,
763            ) {
764                let target_level_id = compact_task.input.target_level as u32;
765                let compaction_group_version_id = version
766                    .latest_version()
767                    .get_compaction_group_levels(compaction_group_id)
768                    .compaction_group_version_id;
769                let compression_algorithm = match compact_task.compression_algorithm.as_str() {
770                    "Lz4" => 1,
771                    "Zstd" => 2,
772                    _ => 0,
773                };
774                let vnode_partition_count = compact_task.input.vnode_partition_count;
775                let mut compact_task = CompactTask {
776                    input_ssts: compact_task.input.input_levels,
777                    splits: vec![KeyRange::inf()],
778                    sorted_output_ssts: vec![],
779                    task_id,
780                    target_level: target_level_id,
781                    // only gc delete keys in last level because there may be older version in more bottom
782                    // level.
783                    gc_delete_keys: version
784                        .latest_version()
785                        .get_compaction_group_levels(compaction_group_id)
786                        .is_last_level(target_level_id),
787                    base_level: compact_task.base_level as u32,
788                    task_status: TaskStatus::Pending,
789                    compaction_group_id: group_config.group_id,
790                    compaction_group_version_id,
791                    existing_table_ids: member_table_ids.clone(),
792                    compression_algorithm,
793                    target_file_size: compact_task.target_file_size,
794                    table_options: table_id_to_option
795                        .iter()
796                        .map(|(table_id, table_option)| {
797                            (*table_id, TableOption::from(table_option))
798                        })
799                        .collect(),
800                    current_epoch_time: Epoch::now().0,
801                    compaction_filter_mask: group_config.compaction_config.compaction_filter_mask,
802                    target_sub_level_id: compact_task.input.target_sub_level_id,
803                    task_type: compact_task.compaction_task_type,
804                    split_weight_by_vnode: vnode_partition_count,
805                    max_sub_compaction: group_config.compaction_config.max_sub_compaction,
806                    ..Default::default()
807                };
808
809                let is_trivial_reclaim = compact_task.is_trivial_reclaim();
810                let is_trivial_move = compact_task.is_trivial_move_task();
811                if is_trivial_reclaim || (is_trivial_move && can_trivial_move) {
812                    let log_label = if is_trivial_reclaim {
813                        "TrivialReclaim"
814                    } else {
815                        "TrivialMove"
816                    };
817                    let label = if is_trivial_reclaim {
818                        "trivial-space-reclaim"
819                    } else {
820                        "trivial-move"
821                    };
822
823                    tracing::debug!(
824                        "{} for compaction group {}: input: {:?}, cost time: {:?}",
825                        log_label,
826                        compact_task.compaction_group_id,
827                        compact_task.input_ssts,
828                        start_time.elapsed()
829                    );
830                    compact_task.task_status = TaskStatus::Success;
831                    compact_status.report_compact_task(&compact_task);
832                    if !is_trivial_reclaim {
833                        compact_task
834                            .sorted_output_ssts
835                            .clone_from(&compact_task.input_ssts[0].table_infos);
836                    }
837                    update_table_stats_for_vnode_watermark_trivial_reclaim(
838                        &mut version_stats.table_stats,
839                        &compact_task,
840                    );
841                    self.metrics
842                        .compact_frequency
843                        .with_label_values(&[
844                            label,
845                            &compact_task.compaction_group_id.to_string(),
846                            selector.task_type().as_str_name(),
847                            "SUCCESS",
848                        ])
849                        .inc();
850
851                    version.apply_compact_task(&compact_task);
852                    trivial_tasks.push(compact_task);
853                    if trivial_tasks.len() >= self.env.opts.max_trivial_move_task_count_per_loop {
854                        break 'outside;
855                    }
856                } else {
857                    self.calculate_vnode_partition(
858                        &mut compact_task,
859                        group_config.compaction_config.as_ref(),
860                    );
861                    let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks) = version
862                        .latest_version()
863                        .safe_epoch_table_watermarks(&compact_task.existing_table_ids)
864                        .into_iter()
865                        .partition(|(_table_id, table_watermarke)| {
866                            matches!(
867                                table_watermarke.watermark_type,
868                                WatermarkSerdeType::PkPrefix
869                            )
870                        });
871
872                    compact_task.pk_prefix_table_watermarks = pk_prefix_table_watermarks;
873                    compact_task.non_pk_prefix_table_watermarks = non_pk_prefix_table_watermarks;
874
875                    compact_task.table_schemas = compact_task
876                        .existing_table_ids
877                        .iter()
878                        .filter_map(|table_id| {
879                            let id = (*table_id).try_into().unwrap();
880                            all_versioned_table_schemas.get(&id).map(|column_ids| {
881                                (
882                                    *table_id,
883                                    TableSchema {
884                                        column_ids: column_ids.clone(),
885                                    },
886                                )
887                            })
888                        })
889                        .collect();
890
891                    compact_task_assignment.insert(
892                        compact_task.task_id,
893                        CompactTaskAssignment {
894                            compact_task: Some(compact_task.clone().into()),
895                            context_id: META_NODE_ID, // deprecated
896                        },
897                    );
898
899                    pick_tasks.push(compact_task);
900                    break;
901                }
902
903                stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
904                stats = LocalSelectorStatistic::default();
905            }
906            if pick_tasks
907                .last()
908                .map(|task| task.compaction_group_id != compaction_group_id)
909                .unwrap_or(true)
910            {
911                unschedule_groups.push(compaction_group_id);
912            }
913            stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
914        }
915
916        if !trivial_tasks.is_empty() {
917            commit_multi_var!(
918                self.meta_store_ref(),
919                compaction_statuses,
920                compact_task_assignment,
921                version,
922                version_stats
923            )?;
924            self.metrics
925                .compact_task_batch_count
926                .with_label_values(&["batch_trivial_move"])
927                .observe(trivial_tasks.len() as f64);
928
929            for trivial_task in &trivial_tasks {
930                self.metrics
931                    .compact_task_trivial_move_sst_count
932                    .with_label_values(&[&trivial_task.compaction_group_id.to_string()])
933                    .observe(trivial_task.input_ssts[0].table_infos.len() as _);
934            }
935
936            drop(versioning_guard);
937        } else {
938            // We are using a single transaction to ensure that each task has progress when it is
939            // created.
940            drop(versioning_guard);
941            commit_multi_var!(
942                self.meta_store_ref(),
943                compaction_statuses,
944                compact_task_assignment
945            )?;
946        }
947        drop(compaction_guard);
948        if !pick_tasks.is_empty() {
949            self.metrics
950                .compact_task_batch_count
951                .with_label_values(&["batch_get_compact_task"])
952                .observe(pick_tasks.len() as f64);
953        }
954
955        for compact_task in &mut pick_tasks {
956            let compaction_group_id = compact_task.compaction_group_id;
957
958            // Initiate heartbeat for the task to track its progress.
959            self.compactor_manager
960                .initiate_task_heartbeat(compact_task.clone());
961
962            // this task has been finished.
963            compact_task.task_status = TaskStatus::Pending;
964            let compact_task_statistics = statistics_compact_task(compact_task);
965
966            let level_type_label = build_compact_task_level_type_metrics_label(
967                compact_task.input_ssts[0].level_idx as usize,
968                compact_task.input_ssts.last().unwrap().level_idx as usize,
969            );
970
971            let level_count = compact_task.input_ssts.len();
972            if compact_task.input_ssts[0].level_idx == 0 {
973                self.metrics
974                    .l0_compact_level_count
975                    .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
976                    .observe(level_count as _);
977            }
978
979            self.metrics
980                .compact_task_size
981                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
982                .observe(compact_task_statistics.total_file_size as _);
983
984            self.metrics
985                .compact_task_size
986                .with_label_values(&[
987                    &compaction_group_id.to_string(),
988                    &format!("{} uncompressed", level_type_label),
989                ])
990                .observe(compact_task_statistics.total_uncompressed_file_size as _);
991
992            self.metrics
993                .compact_task_file_count
994                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
995                .observe(compact_task_statistics.total_file_count as _);
996
997            tracing::trace!(
998                "For compaction group {}: pick up {} {} sub_level in level {} to compact to target {}. cost time: {:?} compact_task_statistics {:?}",
999                compaction_group_id,
1000                level_count,
1001                compact_task.input_ssts[0].level_type.as_str_name(),
1002                compact_task.input_ssts[0].level_idx,
1003                compact_task.target_level,
1004                start_time.elapsed(),
1005                compact_task_statistics
1006            );
1007        }
1008
1009        #[cfg(test)]
1010        {
1011            self.check_state_consistency().await;
1012        }
1013        pick_tasks.extend(trivial_tasks);
1014        Ok((pick_tasks, unschedule_groups))
1015    }
1016
1017    /// Cancels a compaction task no matter it's assigned or unassigned.
1018    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
1019        fail_point!("fp_cancel_compact_task", |_| Err(Error::MetaStore(
1020            anyhow::anyhow!("failpoint metastore err")
1021        )));
1022        let ret = self
1023            .cancel_compact_task_impl(vec![task_id], task_status)
1024            .await?;
1025        Ok(ret[0])
1026    }
1027
1028    pub async fn cancel_compact_tasks(
1029        &self,
1030        tasks: Vec<u64>,
1031        task_status: TaskStatus,
1032    ) -> Result<Vec<bool>> {
1033        self.cancel_compact_task_impl(tasks, task_status).await
1034    }
1035
1036    async fn cancel_compact_task_impl(
1037        &self,
1038        task_ids: Vec<u64>,
1039        task_status: TaskStatus,
1040    ) -> Result<Vec<bool>> {
1041        assert!(CANCEL_STATUS_SET.contains(&task_status));
1042        let tasks = task_ids
1043            .into_iter()
1044            .map(|task_id| ReportTask {
1045                task_id,
1046                task_status,
1047                sorted_output_ssts: vec![],
1048                table_stats_change: HashMap::default(),
1049                object_timestamps: HashMap::default(),
1050            })
1051            .collect_vec();
1052        let rets = self.report_compact_tasks(tasks).await?;
1053        #[cfg(test)]
1054        {
1055            self.check_state_consistency().await;
1056        }
1057        Ok(rets)
1058    }
1059
1060    async fn get_compact_tasks(
1061        &self,
1062        mut compaction_groups: Vec<CompactionGroupId>,
1063        max_select_count: usize,
1064        selector: &mut Box<dyn CompactionSelector>,
1065    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
1066        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
1067            anyhow::anyhow!("failpoint metastore error")
1068        )));
1069        compaction_groups.shuffle(&mut thread_rng());
1070        let (mut tasks, groups) = self
1071            .get_compact_tasks_impl(compaction_groups, max_select_count, selector)
1072            .await?;
1073        tasks.retain(|task| {
1074            if task.task_status == TaskStatus::Success {
1075                debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
1076                false
1077            } else {
1078                true
1079            }
1080        });
1081        Ok((tasks, groups))
1082    }
1083
1084    pub async fn get_compact_task(
1085        &self,
1086        compaction_group_id: CompactionGroupId,
1087        selector: &mut Box<dyn CompactionSelector>,
1088    ) -> Result<Option<CompactTask>> {
1089        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
1090            anyhow::anyhow!("failpoint metastore error")
1091        )));
1092
1093        let (normal_tasks, _) = self
1094            .get_compact_tasks_impl(vec![compaction_group_id], 1, selector)
1095            .await?;
1096        for task in normal_tasks {
1097            if task.task_status != TaskStatus::Success {
1098                return Ok(Some(task));
1099            }
1100            debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
1101        }
1102        Ok(None)
1103    }
1104
1105    pub async fn manual_get_compact_task(
1106        &self,
1107        compaction_group_id: CompactionGroupId,
1108        manual_compaction_option: ManualCompactionOption,
1109    ) -> Result<Option<CompactTask>> {
1110        let mut selector: Box<dyn CompactionSelector> =
1111            Box::new(ManualCompactionSelector::new(manual_compaction_option));
1112        self.get_compact_task(compaction_group_id, &mut selector)
1113            .await
1114    }
1115
1116    pub async fn report_compact_task(
1117        &self,
1118        task_id: u64,
1119        task_status: TaskStatus,
1120        sorted_output_ssts: Vec<SstableInfo>,
1121        table_stats_change: Option<PbTableStatsMap>,
1122        object_timestamps: HashMap<HummockSstableObjectId, u64>,
1123    ) -> Result<bool> {
1124        let rets = self
1125            .report_compact_tasks(vec![ReportTask {
1126                task_id,
1127                task_status,
1128                sorted_output_ssts,
1129                table_stats_change: table_stats_change.unwrap_or_default(),
1130                object_timestamps,
1131            }])
1132            .await?;
1133        Ok(rets[0])
1134    }
1135
1136    pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
1137        let compaction_guard = self.compaction.write().await;
1138        let versioning_guard = self.versioning.write().await;
1139
1140        self.report_compact_tasks_impl(report_tasks, compaction_guard, versioning_guard)
1141            .await
1142    }
1143
1144    /// Finishes or cancels a compaction task, according to `task_status`.
1145    ///
1146    /// If `context_id` is not None, its validity will be checked when writing meta store.
1147    /// Its ownership of the task is checked as well.
1148    ///
1149    /// Return Ok(false) indicates either the task is not found,
1150    /// or the task is not owned by `context_id` when `context_id` is not None.
1151    pub async fn report_compact_tasks_impl(
1152        &self,
1153        report_tasks: Vec<ReportTask>,
1154        mut compaction_guard: RwLockWriteGuard<'_, Compaction>,
1155        mut versioning_guard: RwLockWriteGuard<'_, Versioning>,
1156    ) -> Result<Vec<bool>> {
1157        let deterministic_mode = self.env.opts.compaction_deterministic_test;
1158        let compaction: &mut Compaction = &mut compaction_guard;
1159        let start_time = Instant::now();
1160        let original_keys = compaction.compaction_statuses.keys().cloned().collect_vec();
1161        let mut compact_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);
1162        let mut rets = vec![false; report_tasks.len()];
1163        let mut compact_task_assignment =
1164            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);
1165        // The compaction task is finished.
1166        let versioning: &mut Versioning = &mut versioning_guard;
1167        let _timer = start_measure_real_process_timer!(self, "report_compact_tasks");
1168
1169        // purge stale compact_status
1170        for group_id in original_keys {
1171            if !versioning.current_version.levels.contains_key(&group_id) {
1172                compact_statuses.remove(group_id);
1173            }
1174        }
1175        let mut tasks = vec![];
1176
1177        let mut version = HummockVersionTransaction::new(
1178            &mut versioning.current_version,
1179            &mut versioning.hummock_version_deltas,
1180            self.env.notification_manager(),
1181            None,
1182            &self.metrics,
1183        );
1184
1185        if deterministic_mode {
1186            version.disable_apply_to_txn();
1187        }
1188
1189        let mut version_stats = HummockVersionStatsTransaction::new(
1190            &mut versioning.version_stats,
1191            self.env.notification_manager(),
1192        );
1193        let mut success_count = 0;
1194        for (idx, task) in report_tasks.into_iter().enumerate() {
1195            rets[idx] = true;
1196            let mut compact_task = match compact_task_assignment.remove(task.task_id) {
1197                Some(compact_task) => CompactTask::from(compact_task.compact_task.unwrap()),
1198                None => {
1199                    tracing::warn!("{}", format!("compact task {} not found", task.task_id));
1200                    rets[idx] = false;
1201                    continue;
1202                }
1203            };
1204
1205            {
1206                // apply result
1207                compact_task.task_status = task.task_status;
1208                compact_task.sorted_output_ssts = task.sorted_output_ssts;
1209            }
1210
1211            match compact_statuses.get_mut(compact_task.compaction_group_id) {
1212                Some(mut compact_status) => {
1213                    compact_status.report_compact_task(&compact_task);
1214                }
1215                None => {
1216                    // When the group_id is not found in the compaction_statuses, it means the group has been removed.
1217                    // The task is invalid and should be canceled.
1218                    // e.g.
1219                    // 1. The group is removed by the user unregistering the tables
1220                    // 2. The group is removed by the group scheduling algorithm
1221                    compact_task.task_status = TaskStatus::InvalidGroupCanceled;
1222                }
1223            }
1224
1225            let is_success = if let TaskStatus::Success = compact_task.task_status {
1226                match self
1227                    .report_compaction_sanity_check(&task.object_timestamps)
1228                    .await
1229                {
1230                    Err(e) => {
1231                        warn!(
1232                            "failed to commit compaction task {} {}",
1233                            compact_task.task_id,
1234                            e.as_report()
1235                        );
1236                        compact_task.task_status = TaskStatus::RetentionTimeRejected;
1237                        false
1238                    }
1239                    _ => {
1240                        let group = version
1241                            .latest_version()
1242                            .levels
1243                            .get(&compact_task.compaction_group_id)
1244                            .unwrap();
1245                        let is_expired = compact_task.is_expired(group.compaction_group_version_id);
1246                        if is_expired {
1247                            compact_task.task_status = TaskStatus::InputOutdatedCanceled;
1248                            warn!(
1249                                "The task may be expired because of group split, task:\n {:?}",
1250                                compact_task_to_string(&compact_task)
1251                            );
1252                        }
1253                        !is_expired
1254                    }
1255                }
1256            } else {
1257                false
1258            };
1259            if is_success {
1260                success_count += 1;
1261                version.apply_compact_task(&compact_task);
1262                if purge_prost_table_stats(&mut version_stats.table_stats, version.latest_version())
1263                {
1264                    self.metrics.version_stats.reset();
1265                    versioning.local_metrics.clear();
1266                }
1267                add_prost_table_stats_map(&mut version_stats.table_stats, &task.table_stats_change);
1268                trigger_local_table_stat(
1269                    &self.metrics,
1270                    &mut versioning.local_metrics,
1271                    &version_stats,
1272                    &task.table_stats_change,
1273                );
1274            }
1275            tasks.push(compact_task);
1276        }
1277        if success_count > 0 {
1278            commit_multi_var!(
1279                self.meta_store_ref(),
1280                compact_statuses,
1281                compact_task_assignment,
1282                version,
1283                version_stats
1284            )?;
1285
1286            self.metrics
1287                .compact_task_batch_count
1288                .with_label_values(&["batch_report_task"])
1289                .observe(success_count as f64);
1290        } else {
1291            // The compaction task is cancelled or failed.
1292            commit_multi_var!(
1293                self.meta_store_ref(),
1294                compact_statuses,
1295                compact_task_assignment
1296            )?;
1297        }
1298
1299        let mut success_groups = vec![];
1300        for compact_task in &tasks {
1301            self.compactor_manager
1302                .remove_task_heartbeat(compact_task.task_id);
1303            tracing::trace!(
1304                "Reported compaction task. {}. cost time: {:?}",
1305                compact_task_to_string(compact_task),
1306                start_time.elapsed(),
1307            );
1308
1309            if !deterministic_mode
1310                && (matches!(compact_task.task_type, compact_task::TaskType::Dynamic)
1311                    || matches!(compact_task.task_type, compact_task::TaskType::Emergency))
1312            {
1313                // only try send Dynamic compaction
1314                self.try_send_compaction_request(
1315                    compact_task.compaction_group_id,
1316                    compact_task::TaskType::Dynamic,
1317                );
1318            }
1319
1320            if compact_task.task_status == TaskStatus::Success {
1321                success_groups.push(compact_task.compaction_group_id);
1322            }
1323        }
1324
1325        trigger_compact_tasks_stat(
1326            &self.metrics,
1327            &tasks,
1328            &compaction.compaction_statuses,
1329            &versioning_guard.current_version,
1330        );
1331        drop(versioning_guard);
1332        if !success_groups.is_empty() {
1333            self.try_update_write_limits(&success_groups).await;
1334        }
1335        Ok(rets)
1336    }
1337
1338    /// Triggers compacitons to specified compaction groups.
1339    /// Don't wait for compaction finish
1340    pub async fn trigger_compaction_deterministic(
1341        &self,
1342        _base_version_id: HummockVersionId,
1343        compaction_groups: Vec<CompactionGroupId>,
1344    ) -> Result<()> {
1345        self.on_current_version(|old_version| {
1346            tracing::info!(
1347                "Trigger compaction for version {}, groups {:?}",
1348                old_version.id,
1349                compaction_groups
1350            );
1351        })
1352        .await;
1353
1354        if compaction_groups.is_empty() {
1355            return Ok(());
1356        }
1357        for compaction_group in compaction_groups {
1358            self.try_send_compaction_request(compaction_group, compact_task::TaskType::Dynamic);
1359        }
1360        Ok(())
1361    }
1362
1363    pub async fn trigger_manual_compaction(
1364        &self,
1365        compaction_group: CompactionGroupId,
1366        manual_compaction_option: ManualCompactionOption,
1367    ) -> Result<()> {
1368        let start_time = Instant::now();
1369
1370        // 1. Get idle compactor.
1371        let compactor = match self.compactor_manager.next_compactor() {
1372            Some(compactor) => compactor,
1373            None => {
1374                tracing::warn!("trigger_manual_compaction No compactor is available.");
1375                return Err(anyhow::anyhow!(
1376                    "trigger_manual_compaction No compactor is available. compaction_group {}",
1377                    compaction_group
1378                )
1379                .into());
1380            }
1381        };
1382
1383        // 2. Get manual compaction task.
1384        let compact_task = self
1385            .manual_get_compact_task(compaction_group, manual_compaction_option)
1386            .await;
1387        let compact_task = match compact_task {
1388            Ok(Some(compact_task)) => compact_task,
1389            Ok(None) => {
1390                // No compaction task available.
1391                return Err(anyhow::anyhow!(
1392                    "trigger_manual_compaction No compaction_task is available. compaction_group {}",
1393                    compaction_group
1394                )
1395                    .into());
1396            }
1397            Err(err) => {
1398                tracing::warn!(error = %err.as_report(), "Failed to get compaction task");
1399
1400                return Err(anyhow::anyhow!(err)
1401                    .context(format!(
1402                        "Failed to get compaction task for compaction_group {}",
1403                        compaction_group,
1404                    ))
1405                    .into());
1406            }
1407        };
1408
1409        // 3. send task to compactor
1410        let compact_task_string = compact_task_to_string(&compact_task);
1411        // TODO: shall we need to cancel on meta ?
1412        compactor
1413            .send_event(ResponseEvent::CompactTask(compact_task.into()))
1414            .with_context(|| {
1415                format!(
1416                    "Failed to trigger compaction task for compaction_group {}",
1417                    compaction_group,
1418                )
1419            })?;
1420
1421        tracing::info!(
1422            "Trigger manual compaction task. {}. cost time: {:?}",
1423            &compact_task_string,
1424            start_time.elapsed(),
1425        );
1426
1427        Ok(())
1428    }
1429
1430    /// Sends a compaction request.
1431    pub fn try_send_compaction_request(
1432        &self,
1433        compaction_group: CompactionGroupId,
1434        task_type: compact_task::TaskType,
1435    ) -> bool {
1436        match self
1437            .compaction_state
1438            .try_sched_compaction(compaction_group, task_type)
1439        {
1440            Ok(_) => true,
1441            Err(e) => {
1442                tracing::error!(
1443                    error = %e.as_report(),
1444                    "failed to send compaction request for compaction group {}",
1445                    compaction_group,
1446                );
1447                false
1448            }
1449        }
1450    }
1451
1452    pub(crate) fn calculate_vnode_partition(
1453        &self,
1454        compact_task: &mut CompactTask,
1455        compaction_config: &CompactionConfig,
1456    ) {
1457        // do not split sst by vnode partition when target_level > base_level
1458        // The purpose of data alignment is mainly to improve the parallelism of base level compaction and reduce write amplification.
1459        // However, at high level, the size of the sst file is often larger and only contains the data of a single table_id, so there is no need to cut it.
1460        if compact_task.target_level > compact_task.base_level {
1461            return;
1462        }
1463        if compaction_config.split_weight_by_vnode > 0 {
1464            for table_id in &compact_task.existing_table_ids {
1465                compact_task
1466                    .table_vnode_partition
1467                    .insert(*table_id, compact_task.split_weight_by_vnode);
1468            }
1469        } else {
1470            let mut table_size_info: HashMap<u32, u64> = HashMap::default();
1471            let mut existing_table_ids: HashSet<u32> = HashSet::default();
1472            for input_ssts in &compact_task.input_ssts {
1473                for sst in &input_ssts.table_infos {
1474                    existing_table_ids.extend(sst.table_ids.iter());
1475                    for table_id in &sst.table_ids {
1476                        *table_size_info.entry(*table_id).or_default() +=
1477                            sst.sst_size / (sst.table_ids.len() as u64);
1478                    }
1479                }
1480            }
1481            compact_task
1482                .existing_table_ids
1483                .retain(|table_id| existing_table_ids.contains(table_id));
1484
1485            let hybrid_vnode_count = self.env.opts.hybrid_partition_node_count;
1486            let default_partition_count = self.env.opts.partition_vnode_count;
1487            // We must ensure the partition threshold large enough to avoid too many small files.
1488            let compact_task_table_size_partition_threshold_low = self
1489                .env
1490                .opts
1491                .compact_task_table_size_partition_threshold_low;
1492            let compact_task_table_size_partition_threshold_high = self
1493                .env
1494                .opts
1495                .compact_task_table_size_partition_threshold_high;
1496            // check latest write throughput
1497            let table_write_throughput_statistic_manager =
1498                self.table_write_throughput_statistic_manager.read();
1499            let timestamp = chrono::Utc::now().timestamp();
1500            for (table_id, compact_table_size) in table_size_info {
1501                let write_throughput = table_write_throughput_statistic_manager
1502                    .get_table_throughput_descending(table_id, timestamp)
1503                    .peekable()
1504                    .peek()
1505                    .map(|item| item.throughput)
1506                    .unwrap_or(0);
1507                if compact_table_size > compact_task_table_size_partition_threshold_high
1508                    && default_partition_count > 0
1509                {
1510                    compact_task
1511                        .table_vnode_partition
1512                        .insert(table_id, default_partition_count);
1513                } else if (compact_table_size > compact_task_table_size_partition_threshold_low
1514                    || (write_throughput > self.env.opts.table_high_write_throughput_threshold
1515                        && compact_table_size > compaction_config.target_file_size_base))
1516                    && hybrid_vnode_count > 0
1517                {
1518                    // partition for large write throughput table. But we also need to make sure that it can not be too small.
1519                    compact_task
1520                        .table_vnode_partition
1521                        .insert(table_id, hybrid_vnode_count);
1522                } else if compact_table_size > compaction_config.target_file_size_base {
1523                    // partition for small table
1524                    compact_task.table_vnode_partition.insert(table_id, 1);
1525                }
1526            }
1527            compact_task
1528                .table_vnode_partition
1529                .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id));
1530        }
1531    }
1532}
1533
#[cfg(any(test, feature = "test"))]
impl HummockManager {
    /// Returns a clone of the compactor manager handle (test-only accessor).
    pub fn compactor_manager_ref_for_test(&self) -> crate::hummock::CompactorManagerRef {
        self.compactor_manager.clone()
    }

    /// Returns a clone of the assignment recorded in `compact_task_assignment` for `task_id`,
    /// or `None` if there is none.
    pub async fn compaction_task_from_assignment_for_test(
        &self,
        task_id: u64,
    ) -> Option<CompactTaskAssignment> {
        let compaction_guard = self.compaction.read().await;
        let assignment_ref = &compaction_guard.compact_task_assignment;
        assignment_ref.get(&task_id).cloned()
    }

    /// Reports the result of a single compact task in tests.
    ///
    /// When `compact_task` is `Some`, it first overwrites the assignment for `task_id` (with
    /// `context_id: 0`), then reports a `ReportTask` built from the remaining arguments
    /// (`table_stats_change` defaults to empty, `object_timestamps` is empty).
    pub async fn report_compact_task_for_test(
        &self,
        task_id: u64,
        compact_task: Option<CompactTask>,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
    ) -> Result<()> {
        if let Some(task) = compact_task {
            // The write guard is dropped at the end of this block, before reporting.
            let mut guard = self.compaction.write().await;
            guard.compact_task_assignment.insert(
                task_id,
                CompactTaskAssignment {
                    compact_task: Some(task.into()),
                    context_id: 0,
                },
            );
        }

        // In tests the compact task may have been modified directly while
        // `compact_task_assignment` still holds the old copy, so the (possibly modified) task was
        // written into the assignment above before reporting.
        // NOTE(review): this assumes `report_compact_tasks` picks the task up from
        // `compact_task_assignment` — confirm against its implementation.
        self.report_compact_tasks(vec![ReportTask {
            task_id,
            task_status,
            sorted_output_ssts,
            table_stats_change: table_stats_change.unwrap_or_default(),
            object_timestamps: HashMap::default(),
        }])
        .await?;
        Ok(())
    }
}
1581
1582#[derive(Debug, Default)]
1583pub struct CompactionState {
1584    scheduled: Mutex<HashSet<(CompactionGroupId, compact_task::TaskType)>>,
1585}
1586
1587impl CompactionState {
1588    pub fn new() -> Self {
1589        Self {
1590            scheduled: Default::default(),
1591        }
1592    }
1593
1594    /// Enqueues only if the target is not yet in queue.
1595    pub fn try_sched_compaction(
1596        &self,
1597        compaction_group: CompactionGroupId,
1598        task_type: TaskType,
1599    ) -> std::result::Result<bool, SendError<CompactionRequestChannelItem>> {
1600        let mut guard = self.scheduled.lock();
1601        let key = (compaction_group, task_type);
1602        if guard.contains(&key) {
1603            return Ok(false);
1604        }
1605        guard.insert(key);
1606        Ok(true)
1607    }
1608
1609    pub fn unschedule(
1610        &self,
1611        compaction_group: CompactionGroupId,
1612        task_type: compact_task::TaskType,
1613    ) {
1614        self.scheduled.lock().remove(&(compaction_group, task_type));
1615    }
1616
1617    pub fn auto_pick_type(&self, group: CompactionGroupId) -> Option<TaskType> {
1618        let guard = self.scheduled.lock();
1619        if guard.contains(&(group, compact_task::TaskType::Dynamic)) {
1620            Some(compact_task::TaskType::Dynamic)
1621        } else if guard.contains(&(group, compact_task::TaskType::SpaceReclaim)) {
1622            Some(compact_task::TaskType::SpaceReclaim)
1623        } else if guard.contains(&(group, compact_task::TaskType::Ttl)) {
1624            Some(compact_task::TaskType::Ttl)
1625        } else if guard.contains(&(group, compact_task::TaskType::Tombstone)) {
1626            Some(compact_task::TaskType::Tombstone)
1627        } else if guard.contains(&(group, compact_task::TaskType::VnodeWatermark)) {
1628            Some(compact_task::TaskType::VnodeWatermark)
1629        } else {
1630            None
1631        }
1632    }
1633}
1634
1635impl Compaction {
1636    pub fn get_compact_task_assignments_by_group_id(
1637        &self,
1638        compaction_group_id: CompactionGroupId,
1639    ) -> Vec<CompactTaskAssignment> {
1640        self.compact_task_assignment
1641            .iter()
1642            .filter_map(|(_, assignment)| {
1643                if assignment
1644                    .compact_task
1645                    .as_ref()
1646                    .is_some_and(|task| task.compaction_group_id == compaction_group_id)
1647                {
1648                    Some(CompactTaskAssignment {
1649                        compact_task: assignment.compact_task.clone(),
1650                        context_id: assignment.context_id,
1651                    })
1652                } else {
1653                    None
1654                }
1655            })
1656            .collect()
1657    }
1658}
1659
1660#[derive(Clone, Default)]
1661pub struct CompactionGroupStatistic {
1662    pub group_id: CompactionGroupId,
1663    pub group_size: u64,
1664    pub table_statistic: BTreeMap<StateTableId, u64>,
1665    pub compaction_group_config: CompactionGroup,
1666}
1667
1668/// Updates table stats caused by vnode watermark trivial reclaim compaction.
1669fn update_table_stats_for_vnode_watermark_trivial_reclaim(
1670    table_stats: &mut PbTableStatsMap,
1671    task: &CompactTask,
1672) {
1673    if task.task_type != TaskType::VnodeWatermark {
1674        return;
1675    }
1676    let mut deleted_table_keys: HashMap<u32, u64> = HashMap::default();
1677    for s in task.input_ssts.iter().flat_map(|l| l.table_infos.iter()) {
1678        assert_eq!(s.table_ids.len(), 1);
1679        let e = deleted_table_keys.entry(s.table_ids[0]).or_insert(0);
1680        *e += s.total_key_count;
1681    }
1682    for (table_id, delete_count) in deleted_table_keys {
1683        let Some(stats) = table_stats.get_mut(&table_id) else {
1684            continue;
1685        };
1686        if stats.total_key_count == 0 {
1687            continue;
1688        }
1689        let new_total_key_count = stats.total_key_count.saturating_sub(delete_count as i64);
1690        let ratio = new_total_key_count as f64 / stats.total_key_count as f64;
1691        // total_key_count is updated accurately.
1692        stats.total_key_count = new_total_key_count;
1693        // others are updated approximately.
1694        stats.total_key_size = (stats.total_key_size as f64 * ratio).ceil() as i64;
1695        stats.total_value_size = (stats.total_value_size as f64 * ratio).ceil() as i64;
1696    }
1697}
1698
/// Write-health classification of a compaction group.
#[derive(Debug, Clone)]
pub enum GroupState {
    /// The compaction group is not in emergency state.
    Normal,

    /// The compaction group is in emergency state.
    Emergency(String), // reason

    /// The compaction group is in write stop state.
    WriteStop(String), // reason
}

impl GroupState {
    /// Returns `true` if this is [`GroupState::WriteStop`].
    pub fn is_write_stop(&self) -> bool {
        matches!(self, Self::WriteStop(_))
    }

    /// Returns `true` if this is [`GroupState::Emergency`].
    pub fn is_emergency(&self) -> bool {
        matches!(self, Self::Emergency(_))
    }

    /// Returns the reason carried by `Emergency` / `WriteStop`, or `None` for `Normal`.
    pub fn reason(&self) -> Option<&str> {
        // Exhaustive on purpose: a newly added variant must decide whether it carries a reason.
        match self {
            Self::Emergency(reason) | Self::WriteStop(reason) => Some(reason.as_str()),
            Self::Normal => None,
        }
    }
}
1726}
1727
1728#[derive(Clone, Default)]
1729pub struct GroupStateValidator;
1730
1731impl GroupStateValidator {
1732    pub fn write_stop_sub_level_count(
1733        level_count: usize,
1734        compaction_config: &CompactionConfig,
1735    ) -> bool {
1736        let threshold = compaction_config.level0_stop_write_threshold_sub_level_number as usize;
1737        level_count > threshold
1738    }
1739
1740    pub fn write_stop_l0_size(l0_size: u64, compaction_config: &CompactionConfig) -> bool {
1741        l0_size
1742            > compaction_config
1743                .level0_stop_write_threshold_max_size
1744                .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
1745    }
1746
1747    pub fn write_stop_l0_file_count(
1748        l0_file_count: usize,
1749        compaction_config: &CompactionConfig,
1750    ) -> bool {
1751        l0_file_count
1752            > compaction_config
1753                .level0_stop_write_threshold_max_sst_count
1754                .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
1755                as usize
1756    }
1757
1758    pub fn emergency_l0_file_count(
1759        l0_file_count: usize,
1760        compaction_config: &CompactionConfig,
1761    ) -> bool {
1762        l0_file_count
1763            > compaction_config
1764                .emergency_level0_sst_file_count
1765                .unwrap_or(compaction_config::emergency_level0_sst_file_count())
1766                as usize
1767    }
1768
1769    pub fn emergency_l0_partition_count(
1770        last_l0_sub_level_partition_count: usize,
1771        compaction_config: &CompactionConfig,
1772    ) -> bool {
1773        last_l0_sub_level_partition_count
1774            > compaction_config
1775                .emergency_level0_sub_level_partition
1776                .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
1777                as usize
1778    }
1779
1780    pub fn check_single_group_write_stop(
1781        levels: &Levels,
1782        compaction_config: &CompactionConfig,
1783    ) -> GroupState {
1784        if Self::write_stop_sub_level_count(levels.l0.sub_levels.len(), compaction_config) {
1785            return GroupState::WriteStop(format!(
1786                "WriteStop(l0_level_count: {}, threshold: {}) too many L0 sub levels",
1787                levels.l0.sub_levels.len(),
1788                compaction_config.level0_stop_write_threshold_sub_level_number
1789            ));
1790        }
1791
1792        if Self::write_stop_l0_file_count(
1793            levels
1794                .l0
1795                .sub_levels
1796                .iter()
1797                .map(|l| l.table_infos.len())
1798                .sum(),
1799            compaction_config,
1800        ) {
1801            return GroupState::WriteStop(format!(
1802                "WriteStop(l0_sst_count: {}, threshold: {}) too many L0 sst files",
1803                levels
1804                    .l0
1805                    .sub_levels
1806                    .iter()
1807                    .map(|l| l.table_infos.len())
1808                    .sum::<usize>(),
1809                compaction_config
1810                    .level0_stop_write_threshold_max_sst_count
1811                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
1812            ));
1813        }
1814
1815        if Self::write_stop_l0_size(levels.l0.total_file_size, compaction_config) {
1816            return GroupState::WriteStop(format!(
1817                "WriteStop(l0_size: {}, threshold: {}) too large L0 size",
1818                levels.l0.total_file_size,
1819                compaction_config
1820                    .level0_stop_write_threshold_max_size
1821                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
1822            ));
1823        }
1824
1825        GroupState::Normal
1826    }
1827
1828    pub fn check_single_group_emergency(
1829        levels: &Levels,
1830        compaction_config: &CompactionConfig,
1831    ) -> GroupState {
1832        if Self::emergency_l0_file_count(
1833            levels
1834                .l0
1835                .sub_levels
1836                .iter()
1837                .map(|l| l.table_infos.len())
1838                .sum(),
1839            compaction_config,
1840        ) {
1841            return GroupState::Emergency(format!(
1842                "Emergency(l0_sst_count: {}, threshold: {}) too many L0 sst files",
1843                levels
1844                    .l0
1845                    .sub_levels
1846                    .iter()
1847                    .map(|l| l.table_infos.len())
1848                    .sum::<usize>(),
1849                compaction_config
1850                    .emergency_level0_sst_file_count
1851                    .unwrap_or(compaction_config::emergency_level0_sst_file_count())
1852            ));
1853        }
1854
1855        if Self::emergency_l0_partition_count(
1856            levels
1857                .l0
1858                .sub_levels
1859                .first()
1860                .map(|l| l.table_infos.len())
1861                .unwrap_or(0),
1862            compaction_config,
1863        ) {
1864            return GroupState::Emergency(format!(
1865                "Emergency(l0_partition_count: {}, threshold: {}) too many L0 partitions",
1866                levels
1867                    .l0
1868                    .sub_levels
1869                    .first()
1870                    .map(|l| l.table_infos.len())
1871                    .unwrap_or(0),
1872                compaction_config
1873                    .emergency_level0_sub_level_partition
1874                    .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
1875            ));
1876        }
1877
1878        GroupState::Normal
1879    }
1880
1881    pub fn group_state(levels: &Levels, compaction_config: &CompactionConfig) -> GroupState {
1882        let state = Self::check_single_group_write_stop(levels, compaction_config);
1883        if state.is_write_stop() {
1884            return state;
1885        }
1886
1887        Self::check_single_group_emergency(levels, compaction_config)
1888    }
1889}