risingwave_ctl/
lib.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(let_chains)]

use anyhow::Result;
use clap::{Args, Parser, Subcommand};
use cmd_impl::bench::BenchCommands;
use cmd_impl::hummock::SstDumpArgs;
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId};
use risingwave_meta::backup_restore::RestoreOpts;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm;
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use thiserror_ext::AsReport;

use crate::cmd_impl::hummock::{
    build_compaction_config_vec, list_pinned_versions, migrate_legacy_object,
};
use crate::cmd_impl::throttle::apply_throttle;
use crate::common::CtlContext;

pub mod cmd_impl;
pub mod common;

/// risectl provides internal access to the RisingWave cluster. Generally, you will need
/// to provide the meta address and the state store URL to enable risectl to access the cluster. You
/// must start RisingWave in full cluster mode (e.g. enable MinIO and compactor in risedev.yml)
/// instead of playground mode to use this tool. risectl will read environment variables
/// `RW_META_ADDR` and `RW_HUMMOCK_URL` to configure itself.
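///
/// # Example
///
/// A minimal sketch (not from the original source) of parsing arguments
/// programmatically, e.g. from an embedded caller or a test; the subcommand
/// shown is only illustrative.
///
/// ```ignore
/// use clap::Parser;
///
/// // Equivalent to `RW_META_ADDR=... RW_HUMMOCK_URL=... risectl hummock list-version`.
/// let opts = CliOpts::parse_from(["risectl", "hummock", "list-version"]);
/// ```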
#[derive(Parser)]
#[clap(version, about = "The DevOps tool that provides internal access to the RisingWave cluster", long_about = None)]
#[clap(propagate_version = true)]
#[clap(infer_subcommands = true)]
pub struct CliOpts {
    #[clap(subcommand)]
    command: Commands,
}

#[derive(Subcommand)]
#[clap(infer_subcommands = true)]
enum Commands {
    /// Commands for Compute
    #[clap(subcommand)]
    Compute(ComputeCommands),
    /// Commands for Hummock
    #[clap(subcommand)]
    Hummock(HummockCommands),
    /// Commands for Tables
    #[clap(subcommand)]
    Table(TableCommands),
    /// Commands for Meta
    #[clap(subcommand)]
    Meta(MetaCommands),
    /// Commands for Scaling
    #[clap(subcommand)]
    Scale(ScaleCommands),
    /// Commands for Benchmarks
    #[clap(subcommand)]
    Bench(BenchCommands),
    /// Dump the await-tree of compute nodes and compactors
    #[clap(visible_alias("trace"))]
    AwaitTree,
    // TODO(yuhao): profile other nodes
    /// Commands for profiling the compute nodes
    #[clap(subcommand)]
    Profile(ProfileCommands),
    #[clap(subcommand)]
    Throttle(ThrottleCommands),
}

#[derive(Subcommand)]
enum ComputeCommands {
    /// Show all the configuration parameters on a compute node
    ShowConfig { host: String },
}

#[allow(clippy::large_enum_variant)]
#[derive(Subcommand)]
enum HummockCommands {
    /// list latest Hummock version on meta node
    ListVersion {
        #[clap(short, long = "verbose", default_value_t = false)]
        verbose: bool,

        #[clap(long = "verbose_key_range", default_value_t = false)]
        verbose_key_range: bool,
    },

    /// list hummock version deltas in the meta store
    ListVersionDeltas {
        #[clap(short, long = "start-version-delta-id", default_value_t = 0)]
        start_id: u64,

        #[clap(short, long = "num-epochs", default_value_t = 100)]
        num_epochs: u32,
    },
    /// Forbid Hummock from committing new epochs, which is a prerequisite for the deterministic compaction test
    DisableCommitEpoch,
    /// list all Hummock key-value pairs
    ListKv {
        #[clap(short, long = "epoch", default_value_t = HummockEpoch::MAX)]
        epoch: u64,

        #[clap(short, long = "table-id")]
        table_id: u32,

        // data directory for hummock state store. None: use default
        data_dir: Option<String>,

        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    SstDump(SstDumpArgs),
    /// trigger a targeted compaction through `compaction_group_id`
    TriggerManualCompaction {
        #[clap(short, long = "compaction-group-id", default_value_t = 2)]
        compaction_group_id: u64,

        #[clap(short, long = "table-id", default_value_t = 0)]
        table_id: u32,

        #[clap(short, long = "level", default_value_t = 1)]
        level: u32,

        #[clap(short, long = "sst-ids", value_delimiter = ',')]
        sst_ids: Vec<u64>,
    },
    /// Trigger a full GC for SSTs that are not pinned, with timestamp <= now -
    /// `sst_retention_time_sec`, and with `prefix` in their path.
    TriggerFullGc {
        #[clap(short, long = "sst_retention_time_sec", default_value_t = 259200)]
        sst_retention_time_sec: u64,
        #[clap(short, long = "prefix", required = false)]
        prefix: Option<String>,
    },
    /// List pinned versions of each worker.
    ListPinnedVersions {},
    /// List all compaction groups.
    ListCompactionGroup,
    /// Update compaction config for compaction groups.
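    ///
    /// Each flag below is optional. An assumed invocation (values are illustrative
    /// only), using the flag names declared here:
    /// ```text
    /// risectl hummock update-compaction-config \
    ///     --compaction-group-ids 2,3 \
    ///     --max-compaction-bytes 2147483648 \
    ///     --target-file-size-base 33554432
    /// ```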
    UpdateCompactionConfig {
        #[clap(long, value_delimiter = ',')]
        compaction_group_ids: Vec<u64>,
        #[clap(long)]
        max_bytes_for_level_base: Option<u64>,
        #[clap(long)]
        max_bytes_for_level_multiplier: Option<u64>,
        #[clap(long)]
        max_compaction_bytes: Option<u64>,
        #[clap(long)]
        sub_level_max_compaction_bytes: Option<u64>,
        #[clap(long)]
        level0_tier_compact_file_number: Option<u64>,
        #[clap(long)]
        target_file_size_base: Option<u64>,
        #[clap(long)]
        compaction_filter_mask: Option<u32>,
        #[clap(long)]
        max_sub_compaction: Option<u32>,
        #[clap(long)]
        level0_stop_write_threshold_sub_level_number: Option<u64>,
        #[clap(long)]
        level0_sub_level_compact_level_count: Option<u32>,
        #[clap(long)]
        max_space_reclaim_bytes: Option<u64>,
        #[clap(long)]
        level0_max_compact_file_number: Option<u64>,
        #[clap(long)]
        level0_overlapping_sub_level_compact_level_count: Option<u32>,
        #[clap(long)]
        enable_emergency_picker: Option<bool>,
        #[clap(long)]
        tombstone_reclaim_ratio: Option<u32>,
        #[clap(long)]
        compression_level: Option<u32>,
        #[clap(long)]
        compression_algorithm: Option<String>,
        #[clap(long)]
        max_l0_compact_level: Option<u32>,
        #[clap(long)]
        sst_allowed_trivial_move_min_size: Option<u64>,
        #[clap(long)]
        disable_auto_group_scheduling: Option<bool>,
        #[clap(long)]
        max_overlapping_level_size: Option<u64>,
        #[clap(long)]
        sst_allowed_trivial_move_max_count: Option<u32>,
        #[clap(long)]
        emergency_level0_sst_file_count: Option<u32>,
        #[clap(long)]
        emergency_level0_sub_level_partition: Option<u32>,
        #[clap(long)]
        level0_stop_write_threshold_max_sst_count: Option<u32>,
        #[clap(long)]
        level0_stop_write_threshold_max_size: Option<u64>,
    },
    /// Split the given compaction group into two, moving the given tables to the new group.
    SplitCompactionGroup {
        #[clap(long)]
        compaction_group_id: u64,
        #[clap(long, value_delimiter = ',')]
        table_ids: Vec<u32>,
        #[clap(long, default_value_t = 0)]
        partition_vnode_count: u32,
    },
    /// Pause version checkpoint, which subsequently pauses GC of delta logs and SST objects.
    PauseVersionCheckpoint,
    /// Resume version checkpoint, which subsequently resumes GC of delta logs and SST objects.
    ResumeVersionCheckpoint,
    /// Replay version from the checkpoint one to the latest one.
    ReplayVersion,
    /// List compaction status
    ListCompactionStatus {
        #[clap(short, long = "verbose", default_value_t = false)]
        verbose: bool,
    },
    GetCompactionScore {
        #[clap(long)]
        compaction_group_id: u64,
    },
    /// Validate the current `HummockVersion`.
    ValidateVersion,
    /// Rebuild table stats
    RebuildTableStats,
    CancelCompactTask {
        #[clap(short, long)]
        task_id: u64,
    },
    PrintUserKeyInArchive {
        /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
        #[clap(long, value_delimiter = ',')]
        archive_ids: Vec<u64>,
        /// The data directory of Hummock storage, where `SSTable` objects can be found.
        #[clap(long)]
        data_dir: String,
        /// KVs that match the user key are printed.
        #[clap(long)]
        user_key: String,
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    PrintVersionDeltaInArchive {
        /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
        #[clap(long, value_delimiter = ',')]
        archive_ids: Vec<u64>,
        /// The data directory of Hummock storage, where `SSTable` objects can be found.
        #[clap(long)]
        data_dir: String,
        /// Version deltas that are related to the SST id are printed.
        #[clap(long)]
        sst_id: u64,
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    TieredCacheTracing {
        #[clap(long)]
        enable: bool,
        #[clap(long)]
        record_hybrid_insert_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_get_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_obtain_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_remove_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_fetch_threshold_ms: Option<u32>,
    },
    MergeCompactionGroup {
        #[clap(long)]
        left_group_id: u64,
        #[clap(long)]
        right_group_id: u64,
    },
    MigrateLegacyObject {
        url: String,
        source_dir: String,
        target_dir: String,
        #[clap(long, default_value = "100")]
        concurrency: u32,
    },
    ResizeCache {
        #[clap(long)]
        meta_cache_capacity_mb: Option<u64>,
        #[clap(long)]
        data_cache_capacity_mb: Option<u64>,
    },
}

#[derive(Subcommand)]
enum TableCommands {
    /// scan a state table by MV name
    Scan {
        /// name of the materialized view to operate on
        mv_name: String,
        // data directory for hummock state store. None: use default
        data_dir: Option<String>,

        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    /// scan a state table by its table id
    ScanById {
        /// id of the state table to operate on
        table_id: u32,
        // data directory for hummock state store. None: use default
        data_dir: Option<String>,
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    /// list all state tables
    List,
}

#[derive(Subcommand, Debug)]
enum ScaleCommands {
    /// Mark a compute node as unschedulable
    #[clap(verbatim_doc_comment)]
    Cordon {
        /// Workers that need to be cordoned; both id and host are supported.
        #[clap(
            long,
            required = true,
            value_delimiter = ',',
            value_name = "id or host,..."
        )]
        workers: Vec<String>,
    },
    /// Mark a compute node as schedulable. Nodes are schedulable unless they are cordoned
    Uncordon {
        /// Workers that need to be uncordoned; both id and host are supported.
        #[clap(
            long,
            required = true,
            value_delimiter = ',',
            value_name = "id or host,..."
        )]
        workers: Vec<String>,
    },
}

#[derive(Subcommand)]
#[allow(clippy::large_enum_variant)]
enum MetaCommands {
    /// pause the stream graph
    Pause,
    /// resume the stream graph
    Resume,
    /// get cluster info
    ClusterInfo,
    /// get source split info
    SourceSplitInfo {
        #[clap(long)]
        ignore_id: bool,
    },
    /// Reschedule the actors in the stream graph
    ///
    /// The format is `fragment_id-[worker_id:count]+[worker_id:count]`
    /// You can provide either only decreased `worker_ids` or only increased ones; when both are
    /// provided, the decreased part should be preceded by the added part.
    ///
    /// For example, for plan `100-[1:1]+[4:1]` the following request will be generated:
    /// ```text
    /// {
    ///     100: WorkerReschedule {
    ///         increased_actor_count: { 1: 1 },
    ///         decreased_actor_count: { 4: 1 },
    ///     }
    /// }
    /// ```
    /// Use `;` to separate multiple fragments
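    ///
    /// As an illustrative invocation only (the plan and revision values are made up),
    /// a two-fragment plan could be passed like:
    /// ```text
    /// risectl meta reschedule --plan '100-[1:1]+[4:1];101-[2:1]' --revision 5
    /// ```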
    #[clap(verbatim_doc_comment)]
    #[clap(group(clap::ArgGroup::new("input_group").required(true).args(&["plan", "from"])))]
    Reschedule {
        /// Plan of reschedule, needs to be used with `revision`
        #[clap(long, requires = "revision")]
        plan: Option<String>,
        /// Revision of the plan
        #[clap(long)]
        revision: Option<u64>,
        /// Reschedule from a specific file
        #[clap(long, conflicts_with = "revision", value_hint = clap::ValueHint::AnyPath)]
        from: Option<String>,
        /// Show the plan only, no actual operation
        #[clap(long, default_value = "false")]
        dry_run: bool,
        /// Resolve `NO_SHUFFLE` upstream
        #[clap(long, default_value = "false")]
        resolve_no_shuffle: bool,
    },
    /// backup meta by taking a meta snapshot
    BackupMeta {
        #[clap(long)]
        remarks: Option<String>,
    },
    /// restore meta by recovering from a meta snapshot
    RestoreMeta {
        #[command(flatten)]
        opts: RestoreOpts,
    },
    /// delete meta snapshots
    DeleteMetaSnapshots {
        #[clap(long, value_delimiter = ',')]
        snapshot_ids: Vec<u64>,
    },

    /// List all existing connections in the catalog
    ListConnections,

    /// List fragment mapping for serving
    ListServingFragmentMapping,

    /// Unregister workers from the cluster
    UnregisterWorkers {
        /// The workers that need to be unregistered; both `worker_id` and `worker_host:worker_port` are supported
        #[clap(
            long,
            required = true,
            value_delimiter = ',',
            value_name = "worker_id or worker_host:worker_port, ..."
        )]
        workers: Vec<String>,

        /// Automatic yes to prompts
        #[clap(short = 'y', long, default_value_t = false)]
        yes: bool,

        /// Workers that are not found will be ignored
        #[clap(long, default_value_t = false)]
        ignore_not_found: bool,

        /// Check whether the fragments are occupied by the workers
        #[clap(long, default_value_t = false)]
        check_fragment_occupied: bool,
    },

    /// Validate source interface for the cloud team
    ValidateSource {
        /// With properties in JSON format.
        /// If PrivateLink is used, specify `connection.id` instead of `connection.name`.
        #[clap(long)]
        props: String,
    },

    /// Perform a graph check for scaling.
    #[clap(verbatim_doc_comment)]
    GraphCheck {
        /// SQL endpoint
        #[clap(long, required = true)]
        endpoint: String,
    },
}

#[derive(Subcommand, Clone, Debug)]
enum ThrottleCommands {
    Source(ThrottleCommandArgs),
    Mv(ThrottleCommandArgs),
}

#[derive(Clone, Debug, Args)]
pub struct ThrottleCommandArgs {
    id: u32,
    rate: Option<u32>,
}

#[derive(Subcommand, Clone, Debug)]
pub enum ProfileCommands {
    /// CPU profile
    Cpu {
        /// The time to keep profiling active (in seconds)
        #[clap(short, long = "sleep")]
        sleep: u64,
    },
    /// Heap profile
    Heap {
        /// The output directory of the dumped file
        #[clap(long = "dir")]
        dir: Option<String>,
    },
}

/// Start `risectl` with the given options.
/// Cancel the operation when the given `shutdown` token triggers.
/// Log and abort the process if any error occurs.
///
/// Note: use [`start_fallible`] if you want to call functionalities of `risectl`
/// in an embedded manner.
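///
/// # Example
///
/// A hedged sketch of a typical caller (the surrounding Tokio runtime and signal
/// handling are assumed, not shown here):
///
/// ```ignore
/// let shutdown = CancellationToken::new();
/// // Cancelling `shutdown` (e.g. on Ctrl-C) makes `start` clean up and return.
/// start(CliOpts::parse(), shutdown.clone()).await;
/// ```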
pub async fn start(opts: CliOpts, shutdown: CancellationToken) {
    let context = CtlContext::default();

    tokio::select! {
        _ = shutdown.cancelled() => {
            // Shutdown requested, clean up the context and return.
            context.try_close().await;
        }

        result = start_fallible(opts, &context) => {
            if let Err(e) = result {
                eprintln!("Error: {:#?}", e.as_report()); // pretty with backtrace
                std::process::exit(1);
            }
        }
    }
}

/// Start `risectl` with the given options.
/// Return `Err` if any error occurs.
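///
/// # Example
///
/// A minimal sketch of embedded usage (the chosen subcommand and error handling
/// are illustrative only):
///
/// ```ignore
/// let opts = CliOpts::parse_from(["risectl", "meta", "pause"]);
/// let context = CtlContext::default();
/// start_fallible(opts, &context).await?;
/// ```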
pub async fn start_fallible(opts: CliOpts, context: &CtlContext) -> Result<()> {
    let result = start_impl(opts, context).await;
    context.try_close().await;
    result
}

async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
    match opts.command {
        Commands::Compute(ComputeCommands::ShowConfig { host }) => {
            cmd_impl::compute::show_config(&host).await?
        }
        Commands::Hummock(HummockCommands::DisableCommitEpoch) => {
            cmd_impl::hummock::disable_commit_epoch(context).await?
        }
        Commands::Hummock(HummockCommands::ListVersion {
            verbose,
            verbose_key_range,
        }) => {
            cmd_impl::hummock::list_version(context, verbose, verbose_key_range).await?;
        }
        Commands::Hummock(HummockCommands::ListVersionDeltas {
            start_id,
            num_epochs,
        }) => {
            cmd_impl::hummock::list_version_deltas(
                context,
                HummockVersionId::new(start_id),
                num_epochs,
            )
            .await?;
        }
        Commands::Hummock(HummockCommands::ListKv {
            epoch,
            table_id,
            data_dir,
            use_new_object_prefix_strategy,
        }) => {
            cmd_impl::hummock::list_kv(
                context,
                epoch,
                table_id,
                data_dir,
                use_new_object_prefix_strategy,
            )
            .await?;
        }
        Commands::Hummock(HummockCommands::SstDump(args)) => {
            cmd_impl::hummock::sst_dump(context, args).await.unwrap()
        }
        Commands::Hummock(HummockCommands::TriggerManualCompaction {
            compaction_group_id,
            table_id,
            level,
            sst_ids,
        }) => {
            cmd_impl::hummock::trigger_manual_compaction(
                context,
                compaction_group_id,
                table_id,
                level,
                sst_ids,
            )
            .await?
        }
        Commands::Hummock(HummockCommands::TriggerFullGc {
            sst_retention_time_sec,
            prefix,
        }) => cmd_impl::hummock::trigger_full_gc(context, sst_retention_time_sec, prefix).await?,
        Commands::Hummock(HummockCommands::ListPinnedVersions {}) => {
            list_pinned_versions(context).await?
        }
        Commands::Hummock(HummockCommands::ListCompactionGroup) => {
            cmd_impl::hummock::list_compaction_group(context).await?
        }
        Commands::Hummock(HummockCommands::UpdateCompactionConfig {
            compaction_group_ids,
            max_bytes_for_level_base,
            max_bytes_for_level_multiplier,
            max_compaction_bytes,
            sub_level_max_compaction_bytes,
            level0_tier_compact_file_number,
            target_file_size_base,
            compaction_filter_mask,
            max_sub_compaction,
            level0_stop_write_threshold_sub_level_number,
            level0_sub_level_compact_level_count,
            max_space_reclaim_bytes,
            level0_max_compact_file_number,
            level0_overlapping_sub_level_compact_level_count,
            enable_emergency_picker,
            tombstone_reclaim_ratio,
            compression_level,
            compression_algorithm,
            max_l0_compact_level,
            sst_allowed_trivial_move_min_size,
            disable_auto_group_scheduling,
            max_overlapping_level_size,
            sst_allowed_trivial_move_max_count,
            emergency_level0_sst_file_count,
            emergency_level0_sub_level_partition,
            level0_stop_write_threshold_max_sst_count,
            level0_stop_write_threshold_max_size,
        }) => {
            cmd_impl::hummock::update_compaction_config(
                context,
                compaction_group_ids,
                build_compaction_config_vec(
                    max_bytes_for_level_base,
                    max_bytes_for_level_multiplier,
                    max_compaction_bytes,
                    sub_level_max_compaction_bytes,
                    level0_tier_compact_file_number,
                    target_file_size_base,
                    compaction_filter_mask,
                    max_sub_compaction,
                    level0_stop_write_threshold_sub_level_number,
                    level0_sub_level_compact_level_count,
                    max_space_reclaim_bytes,
                    level0_max_compact_file_number,
                    level0_overlapping_sub_level_compact_level_count,
                    enable_emergency_picker,
                    tombstone_reclaim_ratio,
                    if let Some(level) = compression_level {
                        assert!(compression_algorithm.is_some());
                        Some(CompressionAlgorithm {
                            level,
                            compression_algorithm: compression_algorithm.unwrap(),
                        })
                    } else {
                        None
                    },
                    max_l0_compact_level,
                    sst_allowed_trivial_move_min_size,
                    disable_auto_group_scheduling,
                    max_overlapping_level_size,
                    sst_allowed_trivial_move_max_count,
                    emergency_level0_sst_file_count,
                    emergency_level0_sub_level_partition,
                    level0_stop_write_threshold_max_sst_count,
                    level0_stop_write_threshold_max_size,
                ),
            )
            .await?
        }
        Commands::Hummock(HummockCommands::SplitCompactionGroup {
            compaction_group_id,
            table_ids,
            partition_vnode_count,
        }) => {
            cmd_impl::hummock::split_compaction_group(
                context,
                compaction_group_id,
                &table_ids,
                partition_vnode_count,
            )
            .await?;
        }
        Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
            cmd_impl::hummock::pause_version_checkpoint(context).await?;
        }
        Commands::Hummock(HummockCommands::ResumeVersionCheckpoint) => {
            cmd_impl::hummock::resume_version_checkpoint(context).await?;
        }
        Commands::Hummock(HummockCommands::ReplayVersion) => {
            cmd_impl::hummock::replay_version(context).await?;
        }
        Commands::Hummock(HummockCommands::ListCompactionStatus { verbose }) => {
            cmd_impl::hummock::list_compaction_status(context, verbose).await?;
        }
        Commands::Hummock(HummockCommands::GetCompactionScore {
            compaction_group_id,
        }) => {
            cmd_impl::hummock::get_compaction_score(context, compaction_group_id).await?;
        }
        Commands::Hummock(HummockCommands::ValidateVersion) => {
            cmd_impl::hummock::validate_version(context).await?;
        }
        Commands::Hummock(HummockCommands::RebuildTableStats) => {
            cmd_impl::hummock::rebuild_table_stats(context).await?;
        }
        Commands::Hummock(HummockCommands::CancelCompactTask { task_id }) => {
            cmd_impl::hummock::cancel_compact_task(context, task_id).await?;
        }
        Commands::Hummock(HummockCommands::PrintVersionDeltaInArchive {
            archive_ids,
            data_dir,
            sst_id,
            use_new_object_prefix_strategy,
        }) => {
            cmd_impl::hummock::print_version_delta_in_archive(
                context,
                archive_ids.into_iter().map(HummockVersionId::new),
                data_dir,
                sst_id,
                use_new_object_prefix_strategy,
            )
            .await?;
        }
        Commands::Hummock(HummockCommands::PrintUserKeyInArchive {
            archive_ids,
            data_dir,
            user_key,
            use_new_object_prefix_strategy,
        }) => {
            cmd_impl::hummock::print_user_key_in_archive(
                context,
                archive_ids.into_iter().map(HummockVersionId::new),
                data_dir,
                user_key,
                use_new_object_prefix_strategy,
            )
            .await?;
        }
        Commands::Hummock(HummockCommands::TieredCacheTracing {
            enable,
            record_hybrid_insert_threshold_ms,
            record_hybrid_get_threshold_ms,
            record_hybrid_obtain_threshold_ms,
            record_hybrid_remove_threshold_ms,
            record_hybrid_fetch_threshold_ms,
        }) => {
            cmd_impl::hummock::tiered_cache_tracing(
                context,
                enable,
                record_hybrid_insert_threshold_ms,
                record_hybrid_get_threshold_ms,
                record_hybrid_obtain_threshold_ms,
                record_hybrid_remove_threshold_ms,
                record_hybrid_fetch_threshold_ms,
            )
            .await?
        }
        Commands::Hummock(HummockCommands::MergeCompactionGroup {
            left_group_id,
            right_group_id,
        }) => {
            cmd_impl::hummock::merge_compaction_group(context, left_group_id, right_group_id)
                .await?
        }

        Commands::Hummock(HummockCommands::MigrateLegacyObject {
            url,
            source_dir,
            target_dir,
            concurrency,
        }) => {
            migrate_legacy_object(url, source_dir, target_dir, concurrency).await?;
        }
        Commands::Hummock(HummockCommands::ResizeCache {
            meta_cache_capacity_mb,
            data_cache_capacity_mb,
        }) => {
            const MIB: u64 = 1024 * 1024;
            cmd_impl::hummock::resize_cache(
                context,
                meta_cache_capacity_mb.map(|v| v * MIB),
                data_cache_capacity_mb.map(|v| v * MIB),
            )
            .await?
        }
        Commands::Table(TableCommands::Scan {
            mv_name,
            data_dir,
            use_new_object_prefix_strategy,
        }) => {
            cmd_impl::table::scan(context, mv_name, data_dir, use_new_object_prefix_strategy)
                .await?
        }
        Commands::Table(TableCommands::ScanById {
            table_id,
            data_dir,
            use_new_object_prefix_strategy,
        }) => {
            cmd_impl::table::scan_id(context, table_id, data_dir, use_new_object_prefix_strategy)
                .await?
        }
        Commands::Table(TableCommands::List) => cmd_impl::table::list(context).await?,
        Commands::Bench(cmd) => cmd_impl::bench::do_bench(context, cmd).await?,
        Commands::Meta(MetaCommands::Pause) => cmd_impl::meta::pause(context).await?,
        Commands::Meta(MetaCommands::Resume) => cmd_impl::meta::resume(context).await?,
        Commands::Meta(MetaCommands::ClusterInfo) => cmd_impl::meta::cluster_info(context).await?,
        Commands::Meta(MetaCommands::SourceSplitInfo { ignore_id }) => {
            cmd_impl::meta::source_split_info(context, ignore_id).await?
        }
        Commands::Meta(MetaCommands::Reschedule {
            from,
            dry_run,
            plan,
            revision,
            resolve_no_shuffle,
        }) => {
            cmd_impl::meta::reschedule(context, plan, revision, from, dry_run, resolve_no_shuffle)
                .await?
        }
        Commands::Meta(MetaCommands::BackupMeta { remarks }) => {
            cmd_impl::meta::backup_meta(context, remarks).await?
        }
        Commands::Meta(MetaCommands::RestoreMeta { opts }) => {
            risingwave_meta::backup_restore::restore(opts).await?
        }
        Commands::Meta(MetaCommands::DeleteMetaSnapshots { snapshot_ids }) => {
            cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
        }
        Commands::Meta(MetaCommands::ListConnections) => {
            cmd_impl::meta::list_connections(context).await?
        }
        Commands::Meta(MetaCommands::ListServingFragmentMapping) => {
            cmd_impl::meta::list_serving_fragment_mappings(context).await?
        }
        Commands::Meta(MetaCommands::UnregisterWorkers {
            workers,
            yes,
            ignore_not_found,
            check_fragment_occupied,
        }) => {
            cmd_impl::meta::unregister_workers(
                context,
                workers,
                yes,
                ignore_not_found,
                check_fragment_occupied,
            )
            .await?
        }
        Commands::Meta(MetaCommands::ValidateSource { props }) => {
            cmd_impl::meta::validate_source(context, props).await?
        }
        Commands::Meta(MetaCommands::GraphCheck { endpoint }) => {
            cmd_impl::meta::graph_check(endpoint).await?
        }
        Commands::AwaitTree => cmd_impl::await_tree::dump(context).await?,
        Commands::Profile(ProfileCommands::Cpu { sleep }) => {
            cmd_impl::profile::cpu_profile(context, sleep).await?
        }
        Commands::Profile(ProfileCommands::Heap { dir }) => {
            cmd_impl::profile::heap_profile(context, dir).await?
        }
        Commands::Scale(ScaleCommands::Cordon { workers }) => {
            cmd_impl::scale::update_schedulability(context, workers, Schedulability::Unschedulable)
                .await?
        }
        Commands::Scale(ScaleCommands::Uncordon { workers }) => {
            cmd_impl::scale::update_schedulability(context, workers, Schedulability::Schedulable)
                .await?
        }
        Commands::Throttle(ThrottleCommands::Source(args)) => {
            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
        }
        Commands::Throttle(ThrottleCommands::Mv(args)) => {
            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?;
        }
    }
    Ok(())
}