risingwave_ctl/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![feature(let_chains)]
16
17use anyhow::Result;
18use clap::{Args, Parser, Subcommand};
19use cmd_impl::bench::BenchCommands;
20use cmd_impl::hummock::SstDumpArgs;
21use risingwave_common::util::tokio_util::sync::CancellationToken;
22use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId};
23use risingwave_meta::backup_restore::RestoreOpts;
24use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm;
25use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
26use thiserror_ext::AsReport;
27
28use crate::cmd_impl::hummock::{
29    build_compaction_config_vec, list_pinned_versions, migrate_legacy_object,
30};
31use crate::cmd_impl::throttle::apply_throttle;
32use crate::common::CtlContext;
33
34pub mod cmd_impl;
35pub mod common;
36
37/// risectl provides internal access to the RisingWave cluster. Generally, you will need
38/// to provide the meta address and the state store URL to enable risectl to access the cluster. You
39/// must start RisingWave in full cluster mode (e.g. enable MinIO and compactor in risedev.yml)
40/// instead of playground mode to use this tool. risectl will read environment variables
41/// `RW_META_ADDR` and `RW_HUMMOCK_URL` to configure itself.
42#[derive(Parser)]
43#[clap(version, about = "The DevOps tool that provides internal access to the RisingWave cluster", long_about = None)]
44#[clap(propagate_version = true)]
45#[clap(infer_subcommands = true)]
46pub struct CliOpts {
47    #[clap(subcommand)]
48    command: Commands,
49}
50
51#[derive(Subcommand)]
52#[clap(infer_subcommands = true)]
53enum Commands {
54    /// Commands for Compute
55    #[clap(subcommand)]
56    Compute(ComputeCommands),
57    /// Commands for Hummock
58    #[clap(subcommand)]
59    Hummock(HummockCommands),
60    /// Commands for Tables
61    #[clap(subcommand)]
62    Table(TableCommands),
63    /// Commands for Meta
64    #[clap(subcommand)]
65    Meta(MetaCommands),
66    /// Commands for Scaling
67    #[clap(subcommand)]
68    Scale(ScaleCommands),
69    /// Commands for Benchmarks
70    #[clap(subcommand)]
71    Bench(BenchCommands),
72    /// Commands for await-tree, such as dumping, analyzing and transcribing
73    #[clap(subcommand)]
74    #[clap(visible_alias("trace"))]
75    AwaitTree(AwaitTreeCommands),
76    // TODO(yuhao): profile other nodes
77    /// Commands for profilng the compute nodes
78    #[clap(subcommand)]
79    Profile(ProfileCommands),
80    #[clap(subcommand)]
81    Throttle(ThrottleCommands),
82}
83
84#[derive(Subcommand)]
85enum ComputeCommands {
86    /// Show all the configuration parameters on compute node
87    ShowConfig { host: String },
88}
89
90#[allow(clippy::large_enum_variant)]
91#[derive(Subcommand)]
92enum HummockCommands {
93    /// list latest Hummock version on meta node
94    ListVersion {
95        #[clap(short, long = "verbose", default_value_t = false)]
96        verbose: bool,
97
98        #[clap(long = "verbose_key_range", default_value_t = false)]
99        verbose_key_range: bool,
100    },
101
102    /// list hummock version deltas in the meta store
103    ListVersionDeltas {
104        #[clap(short, long = "start-version-delta-id", default_value_t = 0)]
105        start_id: u64,
106
107        #[clap(short, long = "num-epochs", default_value_t = 100)]
108        num_epochs: u32,
109    },
110    /// Forbid hummock commit new epochs, which is a prerequisite for compaction deterministic test
111    DisableCommitEpoch,
112    /// list all Hummock key-value pairs
113    ListKv {
114        #[clap(short, long = "epoch", default_value_t = HummockEpoch::MAX)]
115        epoch: u64,
116
117        #[clap(short, long = "table-id")]
118        table_id: u32,
119
120        // data directory for hummock state store. None: use default
121        data_dir: Option<String>,
122
123        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
124        use_new_object_prefix_strategy: bool,
125    },
126    SstDump(SstDumpArgs),
127    /// trigger a targeted compaction through `compaction_group_id`
128    TriggerManualCompaction {
129        #[clap(short, long = "compaction-group-id", default_value_t = 2)]
130        compaction_group_id: u64,
131
132        #[clap(short, long = "table-id", default_value_t = 0)]
133        table_id: u32,
134
135        #[clap(short, long = "level", default_value_t = 1)]
136        level: u32,
137
138        #[clap(short, long = "sst-ids", value_delimiter = ',')]
139        sst_ids: Vec<u64>,
140    },
141    /// Trigger a full GC for SSTs that is not pinned, with timestamp <= now -
142    /// `sst_retention_time_sec`, and with `prefix` in path.
143    TriggerFullGc {
144        #[clap(short, long = "sst_retention_time_sec", default_value_t = 259200)]
145        sst_retention_time_sec: u64,
146        #[clap(short, long = "prefix", required = false)]
147        prefix: Option<String>,
148    },
149    /// List pinned versions of each worker.
150    ListPinnedVersions {},
151    /// List all compaction groups.
152    ListCompactionGroup,
153    /// Update compaction config for compaction groups.
154    UpdateCompactionConfig {
155        #[clap(long, value_delimiter = ',')]
156        compaction_group_ids: Vec<u64>,
157        #[clap(long)]
158        max_bytes_for_level_base: Option<u64>,
159        #[clap(long)]
160        max_bytes_for_level_multiplier: Option<u64>,
161        #[clap(long)]
162        max_compaction_bytes: Option<u64>,
163        #[clap(long)]
164        sub_level_max_compaction_bytes: Option<u64>,
165        #[clap(long)]
166        level0_tier_compact_file_number: Option<u64>,
167        #[clap(long)]
168        target_file_size_base: Option<u64>,
169        #[clap(long)]
170        compaction_filter_mask: Option<u32>,
171        #[clap(long)]
172        max_sub_compaction: Option<u32>,
173        #[clap(long)]
174        level0_stop_write_threshold_sub_level_number: Option<u64>,
175        #[clap(long)]
176        level0_sub_level_compact_level_count: Option<u32>,
177        #[clap(long)]
178        max_space_reclaim_bytes: Option<u64>,
179        #[clap(long)]
180        level0_max_compact_file_number: Option<u64>,
181        #[clap(long)]
182        level0_overlapping_sub_level_compact_level_count: Option<u32>,
183        #[clap(long)]
184        enable_emergency_picker: Option<bool>,
185        #[clap(long)]
186        tombstone_reclaim_ratio: Option<u32>,
187        #[clap(long)]
188        compression_level: Option<u32>,
189        #[clap(long)]
190        compression_algorithm: Option<String>,
191        #[clap(long)]
192        max_l0_compact_level: Option<u32>,
193        #[clap(long)]
194        sst_allowed_trivial_move_min_size: Option<u64>,
195        #[clap(long)]
196        disable_auto_group_scheduling: Option<bool>,
197        #[clap(long)]
198        max_overlapping_level_size: Option<u64>,
199        #[clap(long)]
200        sst_allowed_trivial_move_max_count: Option<u32>,
201        #[clap(long)]
202        emergency_level0_sst_file_count: Option<u32>,
203        #[clap(long)]
204        emergency_level0_sub_level_partition: Option<u32>,
205        #[clap(long)]
206        level0_stop_write_threshold_max_sst_count: Option<u32>,
207        #[clap(long)]
208        level0_stop_write_threshold_max_size: Option<u64>,
209        #[clap(long)]
210        enable_optimize_l0_interval_selection: Option<bool>,
211    },
212    /// Split given compaction group into two. Moves the given tables to the new group.
213    SplitCompactionGroup {
214        #[clap(long)]
215        compaction_group_id: u64,
216        #[clap(long, value_delimiter = ',')]
217        table_ids: Vec<u32>,
218        #[clap(long, default_value_t = 0)]
219        partition_vnode_count: u32,
220    },
221    /// Pause version checkpoint, which subsequently pauses GC of delta log and SST object.
222    PauseVersionCheckpoint,
223    /// Resume version checkpoint, which subsequently resumes GC of delta log and SST object.
224    ResumeVersionCheckpoint,
225    /// Replay version from the checkpoint one to the latest one.
226    ReplayVersion,
227    /// List compaction status
228    ListCompactionStatus {
229        #[clap(short, long = "verbose", default_value_t = false)]
230        verbose: bool,
231    },
232    GetCompactionScore {
233        #[clap(long)]
234        compaction_group_id: u64,
235    },
236    /// Validate the current `HummockVersion`.
237    ValidateVersion,
238    /// Rebuild table stats
239    RebuildTableStats,
240    CancelCompactTask {
241        #[clap(short, long)]
242        task_id: u64,
243    },
244    PrintUserKeyInArchive {
245        /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
246        #[clap(long, value_delimiter = ',')]
247        archive_ids: Vec<u64>,
248        /// The data directory of Hummock storage, where `SSTable` objects can be found.
249        #[clap(long)]
250        data_dir: String,
251        /// KVs that are matched with the user key are printed.
252        #[clap(long)]
253        user_key: String,
254        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
255        use_new_object_prefix_strategy: bool,
256    },
257    PrintVersionDeltaInArchive {
258        /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
259        #[clap(long, value_delimiter = ',')]
260        archive_ids: Vec<u64>,
261        /// The data directory of Hummock storage, where `SSTable` objects can be found.
262        #[clap(long)]
263        data_dir: String,
264        /// Version deltas that are related to the SST id are printed.
265        #[clap(long)]
266        sst_id: u64,
267        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
268        use_new_object_prefix_strategy: bool,
269    },
270    TieredCacheTracing {
271        #[clap(long)]
272        enable: bool,
273        #[clap(long)]
274        record_hybrid_insert_threshold_ms: Option<u32>,
275        #[clap(long)]
276        record_hybrid_get_threshold_ms: Option<u32>,
277        #[clap(long)]
278        record_hybrid_obtain_threshold_ms: Option<u32>,
279        #[clap(long)]
280        record_hybrid_remove_threshold_ms: Option<u32>,
281        #[clap(long)]
282        record_hybrid_fetch_threshold_ms: Option<u32>,
283    },
284    MergeCompactionGroup {
285        #[clap(long)]
286        left_group_id: u64,
287        #[clap(long)]
288        right_group_id: u64,
289    },
290    MigrateLegacyObject {
291        url: String,
292        source_dir: String,
293        target_dir: String,
294        #[clap(long, default_value = "100")]
295        concurrency: u32,
296    },
297    ResizeCache {
298        #[clap(long)]
299        meta_cache_capacity_mb: Option<u64>,
300        #[clap(long)]
301        data_cache_capacity_mb: Option<u64>,
302    },
303}
304
305#[derive(Subcommand)]
306enum TableCommands {
307    /// scan a state table with MV name
308    Scan {
309        /// name of the materialized view to operate on
310        mv_name: String,
311        // data directory for hummock state store. None: use default
312        data_dir: Option<String>,
313
314        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
315        use_new_object_prefix_strategy: bool,
316    },
317    /// scan a state table using Id
318    ScanById {
319        /// id of the state table to operate on
320        table_id: u32,
321        // data directory for hummock state store. None: use default
322        data_dir: Option<String>,
323        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
324        use_new_object_prefix_strategy: bool,
325    },
326    /// list all state tables
327    List,
328}
329
330#[derive(Subcommand, Debug)]
331enum ScaleCommands {
332    /// Mark a compute node as unschedulable
333    #[clap(verbatim_doc_comment)]
334    Cordon {
335        /// Workers that need to be cordoned, both id and host are supported.
336        #[clap(
337            long,
338            required = true,
339            value_delimiter = ',',
340            value_name = "id or host,..."
341        )]
342        workers: Vec<String>,
343    },
344    /// mark a compute node as schedulable. Nodes are schedulable unless they are cordoned
345    Uncordon {
346        /// Workers that need to be uncordoned, both id and host are supported.
347        #[clap(
348            long,
349            required = true,
350            value_delimiter = ',',
351            value_name = "id or host,..."
352        )]
353        workers: Vec<String>,
354    },
355}
356
357#[derive(Subcommand)]
358#[allow(clippy::large_enum_variant)]
359enum MetaCommands {
360    /// pause the stream graph
361    Pause,
362    /// resume the stream graph
363    Resume,
364    /// get cluster info
365    ClusterInfo,
366    /// get source split info
367    SourceSplitInfo {
368        #[clap(long)]
369        ignore_id: bool,
370    },
371    /// Reschedule the actors in the stream graph
372    ///
373    /// The format is `fragment_id-[worker_id:count]+[worker_id:count]`
374    /// You can provide either decreased `worker_ids` only or increased only, but decreased should be preceded by
375    /// `added` when both are provided.
376    ///
377    /// For example, for plan `100-[1:1]+[4:1]` the follow request will be generated:
378    /// ```text
379    /// {
380    ///     100: WorkerReschedule {
381    ///         increased_actor_count: { 1: 1 },
382    ///         decreased_actor_count: { 4: 1 },
383    ///     }
384    /// }
385    /// ```
386    /// Use ; to separate multiple fragment
387    #[clap(verbatim_doc_comment)]
388    #[clap(group(clap::ArgGroup::new("input_group").required(true).args(&["plan", "from"])))]
389    Reschedule {
390        /// Plan of reschedule, needs to be used with `revision`
391        #[clap(long, requires = "revision")]
392        plan: Option<String>,
393        /// Revision of the plan
394        #[clap(long)]
395        revision: Option<u64>,
396        /// Reschedule from a specific file
397        #[clap(long, conflicts_with = "revision", value_hint = clap::ValueHint::AnyPath)]
398        from: Option<String>,
399        /// Show the plan only, no actual operation
400        #[clap(long, default_value = "false")]
401        dry_run: bool,
402        /// Resolve `NO_SHUFFLE` upstream
403        #[clap(long, default_value = "false")]
404        resolve_no_shuffle: bool,
405    },
406    /// backup meta by taking a meta snapshot
407    BackupMeta {
408        #[clap(long)]
409        remarks: Option<String>,
410    },
411    /// restore meta by recovering from a meta snapshot
412    RestoreMeta {
413        #[command(flatten)]
414        opts: RestoreOpts,
415    },
416    /// delete meta snapshots
417    DeleteMetaSnapshots {
418        #[clap(long, value_delimiter = ',')]
419        snapshot_ids: Vec<u64>,
420    },
421
422    /// List all existing connections in the catalog
423    ListConnections,
424
425    /// List fragment mapping for serving
426    ListServingFragmentMapping,
427
428    /// Unregister workers from the cluster
429    UnregisterWorkers {
430        /// The workers that needs to be unregistered, `worker_id` and `worker_host:worker_port` are both supported
431        #[clap(
432            long,
433            required = true,
434            value_delimiter = ',',
435            value_name = "worker_id or worker_host:worker_port, ..."
436        )]
437        workers: Vec<String>,
438
439        /// Automatic yes to prompts
440        #[clap(short = 'y', long, default_value_t = false)]
441        yes: bool,
442
443        /// The worker not found will be ignored
444        #[clap(long, default_value_t = false)]
445        ignore_not_found: bool,
446
447        /// Checking whether the fragment is occupied by workers
448        #[clap(long, default_value_t = false)]
449        check_fragment_occupied: bool,
450    },
451
452    /// Validate source interface for the cloud team
453    ValidateSource {
454        /// With properties in json format
455        /// If privatelink is used, specify `connection.id` instead of `connection.name`
456        #[clap(long)]
457        props: String,
458    },
459
460    /// Performing graph check for scaling.
461    #[clap(verbatim_doc_comment)]
462    GraphCheck {
463        /// SQL endpoint
464        #[clap(long, required = true)]
465        endpoint: String,
466    },
467}
468
469#[derive(Subcommand, Clone, Debug)]
470pub enum AwaitTreeCommands {
471    /// Dump Await Tree
472    Dump {
473        /// The format of actor traces in the diagnose file. Allowed values: `json`, `text`. `json` by default.
474        #[clap(short, long = "actor-traces-format")]
475        actor_traces_format: Option<String>,
476    },
477    /// Analyze Await Tree
478    Analyze {
479        /// The path to the diagnose file, if None, ctl will first pull one from the cluster
480        /// The actor traces format can be either `json` or `text`. The analyze command will
481        /// automatically detect the format.
482        #[clap(long = "path")]
483        path: Option<String>,
484    },
485    /// Transcribe Await Tree From JSON to Text format
486    Transcribe {
487        /// The path to the await tree file to be transcribed
488        #[clap(long = "path")]
489        path: String,
490    },
491}
492
493#[derive(Subcommand, Clone, Debug)]
494enum ThrottleCommands {
495    Source(ThrottleCommandArgs),
496    Mv(ThrottleCommandArgs),
497}
498
499#[derive(Clone, Debug, Args)]
500pub struct ThrottleCommandArgs {
501    id: u32,
502    rate: Option<u32>,
503}
504
505#[derive(Subcommand, Clone, Debug)]
506pub enum ProfileCommands {
507    /// CPU profile
508    Cpu {
509        /// The time to active profiling for (in seconds)
510        #[clap(short, long = "sleep")]
511        sleep: u64,
512    },
513    /// Heap profile
514    Heap {
515        /// The output directory of the dumped file
516        #[clap(long = "dir")]
517        dir: Option<String>,
518    },
519}
520
521/// Start `risectl` with the given options.
522/// Cancel the operation when the given `shutdown` token triggers.
523/// Log and abort the process if any error occurs.
524///
525/// Note: use [`start_fallible`] if you want to call functionalities of `risectl`
526/// in an embedded manner.
527pub async fn start(opts: CliOpts, shutdown: CancellationToken) {
528    let context = CtlContext::default();
529
530    tokio::select! {
531        _ = shutdown.cancelled() => {
532            // Shutdown requested, clean up the context and return.
533            context.try_close().await;
534        }
535
536        result = start_fallible(opts, &context) => {
537            if let Err(e) = result {
538                eprintln!("Error: {:#?}", e.as_report()); // pretty with backtrace
539                std::process::exit(1);
540            }
541        }
542    }
543}
544
545/// Start `risectl` with the given options.
546/// Return `Err` if any error occurs.
547pub async fn start_fallible(opts: CliOpts, context: &CtlContext) -> Result<()> {
548    let result = start_impl(opts, context).await;
549    context.try_close().await;
550    result
551}
552
553async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
554    match opts.command {
555        Commands::Compute(ComputeCommands::ShowConfig { host }) => {
556            cmd_impl::compute::show_config(&host).await?
557        }
558        Commands::Hummock(HummockCommands::DisableCommitEpoch) => {
559            cmd_impl::hummock::disable_commit_epoch(context).await?
560        }
561        Commands::Hummock(HummockCommands::ListVersion {
562            verbose,
563            verbose_key_range,
564        }) => {
565            cmd_impl::hummock::list_version(context, verbose, verbose_key_range).await?;
566        }
567        Commands::Hummock(HummockCommands::ListVersionDeltas {
568            start_id,
569            num_epochs,
570        }) => {
571            cmd_impl::hummock::list_version_deltas(
572                context,
573                HummockVersionId::new(start_id),
574                num_epochs,
575            )
576            .await?;
577        }
578        Commands::Hummock(HummockCommands::ListKv {
579            epoch,
580            table_id,
581            data_dir,
582            use_new_object_prefix_strategy,
583        }) => {
584            cmd_impl::hummock::list_kv(
585                context,
586                epoch,
587                table_id,
588                data_dir,
589                use_new_object_prefix_strategy,
590            )
591            .await?;
592        }
593        Commands::Hummock(HummockCommands::SstDump(args)) => {
594            cmd_impl::hummock::sst_dump(context, args).await.unwrap()
595        }
596        Commands::Hummock(HummockCommands::TriggerManualCompaction {
597            compaction_group_id,
598            table_id,
599            level,
600            sst_ids,
601        }) => {
602            cmd_impl::hummock::trigger_manual_compaction(
603                context,
604                compaction_group_id,
605                table_id,
606                level,
607                sst_ids,
608            )
609            .await?
610        }
611        Commands::Hummock(HummockCommands::TriggerFullGc {
612            sst_retention_time_sec,
613            prefix,
614        }) => cmd_impl::hummock::trigger_full_gc(context, sst_retention_time_sec, prefix).await?,
615        Commands::Hummock(HummockCommands::ListPinnedVersions {}) => {
616            list_pinned_versions(context).await?
617        }
618        Commands::Hummock(HummockCommands::ListCompactionGroup) => {
619            cmd_impl::hummock::list_compaction_group(context).await?
620        }
621        Commands::Hummock(HummockCommands::UpdateCompactionConfig {
622            compaction_group_ids,
623            max_bytes_for_level_base,
624            max_bytes_for_level_multiplier,
625            max_compaction_bytes,
626            sub_level_max_compaction_bytes,
627            level0_tier_compact_file_number,
628            target_file_size_base,
629            compaction_filter_mask,
630            max_sub_compaction,
631            level0_stop_write_threshold_sub_level_number,
632            level0_sub_level_compact_level_count,
633            max_space_reclaim_bytes,
634            level0_max_compact_file_number,
635            level0_overlapping_sub_level_compact_level_count,
636            enable_emergency_picker,
637            tombstone_reclaim_ratio,
638            compression_level,
639            compression_algorithm,
640            max_l0_compact_level,
641            sst_allowed_trivial_move_min_size,
642            disable_auto_group_scheduling,
643            max_overlapping_level_size,
644            sst_allowed_trivial_move_max_count,
645            emergency_level0_sst_file_count,
646            emergency_level0_sub_level_partition,
647            level0_stop_write_threshold_max_sst_count,
648            level0_stop_write_threshold_max_size,
649            enable_optimize_l0_interval_selection,
650        }) => {
651            cmd_impl::hummock::update_compaction_config(
652                context,
653                compaction_group_ids,
654                build_compaction_config_vec(
655                    max_bytes_for_level_base,
656                    max_bytes_for_level_multiplier,
657                    max_compaction_bytes,
658                    sub_level_max_compaction_bytes,
659                    level0_tier_compact_file_number,
660                    target_file_size_base,
661                    compaction_filter_mask,
662                    max_sub_compaction,
663                    level0_stop_write_threshold_sub_level_number,
664                    level0_sub_level_compact_level_count,
665                    max_space_reclaim_bytes,
666                    level0_max_compact_file_number,
667                    level0_overlapping_sub_level_compact_level_count,
668                    enable_emergency_picker,
669                    tombstone_reclaim_ratio,
670                    if let Some(level) = compression_level {
671                        assert!(compression_algorithm.is_some());
672                        Some(CompressionAlgorithm {
673                            level,
674                            compression_algorithm: compression_algorithm.unwrap(),
675                        })
676                    } else {
677                        None
678                    },
679                    max_l0_compact_level,
680                    sst_allowed_trivial_move_min_size,
681                    disable_auto_group_scheduling,
682                    max_overlapping_level_size,
683                    sst_allowed_trivial_move_max_count,
684                    emergency_level0_sst_file_count,
685                    emergency_level0_sub_level_partition,
686                    level0_stop_write_threshold_max_sst_count,
687                    level0_stop_write_threshold_max_size,
688                    enable_optimize_l0_interval_selection,
689                ),
690            )
691            .await?
692        }
693        Commands::Hummock(HummockCommands::SplitCompactionGroup {
694            compaction_group_id,
695            table_ids,
696            partition_vnode_count,
697        }) => {
698            cmd_impl::hummock::split_compaction_group(
699                context,
700                compaction_group_id,
701                &table_ids,
702                partition_vnode_count,
703            )
704            .await?;
705        }
706        Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
707            cmd_impl::hummock::pause_version_checkpoint(context).await?;
708        }
709        Commands::Hummock(HummockCommands::ResumeVersionCheckpoint) => {
710            cmd_impl::hummock::resume_version_checkpoint(context).await?;
711        }
712        Commands::Hummock(HummockCommands::ReplayVersion) => {
713            cmd_impl::hummock::replay_version(context).await?;
714        }
715        Commands::Hummock(HummockCommands::ListCompactionStatus { verbose }) => {
716            cmd_impl::hummock::list_compaction_status(context, verbose).await?;
717        }
718        Commands::Hummock(HummockCommands::GetCompactionScore {
719            compaction_group_id,
720        }) => {
721            cmd_impl::hummock::get_compaction_score(context, compaction_group_id).await?;
722        }
723        Commands::Hummock(HummockCommands::ValidateVersion) => {
724            cmd_impl::hummock::validate_version(context).await?;
725        }
726        Commands::Hummock(HummockCommands::RebuildTableStats) => {
727            cmd_impl::hummock::rebuild_table_stats(context).await?;
728        }
729        Commands::Hummock(HummockCommands::CancelCompactTask { task_id }) => {
730            cmd_impl::hummock::cancel_compact_task(context, task_id).await?;
731        }
732        Commands::Hummock(HummockCommands::PrintVersionDeltaInArchive {
733            archive_ids,
734            data_dir,
735            sst_id,
736            use_new_object_prefix_strategy,
737        }) => {
738            cmd_impl::hummock::print_version_delta_in_archive(
739                context,
740                archive_ids.into_iter().map(HummockVersionId::new),
741                data_dir,
742                sst_id,
743                use_new_object_prefix_strategy,
744            )
745            .await?;
746        }
747        Commands::Hummock(HummockCommands::PrintUserKeyInArchive {
748            archive_ids,
749            data_dir,
750            user_key,
751            use_new_object_prefix_strategy,
752        }) => {
753            cmd_impl::hummock::print_user_key_in_archive(
754                context,
755                archive_ids.into_iter().map(HummockVersionId::new),
756                data_dir,
757                user_key,
758                use_new_object_prefix_strategy,
759            )
760            .await?;
761        }
762        Commands::Hummock(HummockCommands::TieredCacheTracing {
763            enable,
764            record_hybrid_insert_threshold_ms,
765            record_hybrid_get_threshold_ms,
766            record_hybrid_obtain_threshold_ms,
767            record_hybrid_remove_threshold_ms,
768            record_hybrid_fetch_threshold_ms,
769        }) => {
770            cmd_impl::hummock::tiered_cache_tracing(
771                context,
772                enable,
773                record_hybrid_insert_threshold_ms,
774                record_hybrid_get_threshold_ms,
775                record_hybrid_obtain_threshold_ms,
776                record_hybrid_remove_threshold_ms,
777                record_hybrid_fetch_threshold_ms,
778            )
779            .await?
780        }
781        Commands::Hummock(HummockCommands::MergeCompactionGroup {
782            left_group_id,
783            right_group_id,
784        }) => {
785            cmd_impl::hummock::merge_compaction_group(context, left_group_id, right_group_id)
786                .await?
787        }
788
789        Commands::Hummock(HummockCommands::MigrateLegacyObject {
790            url,
791            source_dir,
792            target_dir,
793            concurrency,
794        }) => {
795            migrate_legacy_object(url, source_dir, target_dir, concurrency).await?;
796        }
797        Commands::Hummock(HummockCommands::ResizeCache {
798            meta_cache_capacity_mb,
799            data_cache_capacity_mb,
800        }) => {
801            const MIB: u64 = 1024 * 1024;
802            cmd_impl::hummock::resize_cache(
803                context,
804                meta_cache_capacity_mb.map(|v| v * MIB),
805                data_cache_capacity_mb.map(|v| v * MIB),
806            )
807            .await?
808        }
809        Commands::Table(TableCommands::Scan {
810            mv_name,
811            data_dir,
812            use_new_object_prefix_strategy,
813        }) => {
814            cmd_impl::table::scan(context, mv_name, data_dir, use_new_object_prefix_strategy)
815                .await?
816        }
817        Commands::Table(TableCommands::ScanById {
818            table_id,
819            data_dir,
820            use_new_object_prefix_strategy,
821        }) => {
822            cmd_impl::table::scan_id(context, table_id, data_dir, use_new_object_prefix_strategy)
823                .await?
824        }
825        Commands::Table(TableCommands::List) => cmd_impl::table::list(context).await?,
826        Commands::Bench(cmd) => cmd_impl::bench::do_bench(context, cmd).await?,
827        Commands::Meta(MetaCommands::Pause) => cmd_impl::meta::pause(context).await?,
828        Commands::Meta(MetaCommands::Resume) => cmd_impl::meta::resume(context).await?,
829        Commands::Meta(MetaCommands::ClusterInfo) => cmd_impl::meta::cluster_info(context).await?,
830        Commands::Meta(MetaCommands::SourceSplitInfo { ignore_id }) => {
831            cmd_impl::meta::source_split_info(context, ignore_id).await?
832        }
833        Commands::Meta(MetaCommands::Reschedule {
834            from,
835            dry_run,
836            plan,
837            revision,
838            resolve_no_shuffle,
839        }) => {
840            cmd_impl::meta::reschedule(context, plan, revision, from, dry_run, resolve_no_shuffle)
841                .await?
842        }
843        Commands::Meta(MetaCommands::BackupMeta { remarks }) => {
844            cmd_impl::meta::backup_meta(context, remarks).await?
845        }
846        Commands::Meta(MetaCommands::RestoreMeta { opts }) => {
847            risingwave_meta::backup_restore::restore(opts).await?
848        }
849        Commands::Meta(MetaCommands::DeleteMetaSnapshots { snapshot_ids }) => {
850            cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
851        }
852        Commands::Meta(MetaCommands::ListConnections) => {
853            cmd_impl::meta::list_connections(context).await?
854        }
855        Commands::Meta(MetaCommands::ListServingFragmentMapping) => {
856            cmd_impl::meta::list_serving_fragment_mappings(context).await?
857        }
858        Commands::Meta(MetaCommands::UnregisterWorkers {
859            workers,
860            yes,
861            ignore_not_found,
862            check_fragment_occupied,
863        }) => {
864            cmd_impl::meta::unregister_workers(
865                context,
866                workers,
867                yes,
868                ignore_not_found,
869                check_fragment_occupied,
870            )
871            .await?
872        }
873        Commands::Meta(MetaCommands::ValidateSource { props }) => {
874            cmd_impl::meta::validate_source(context, props).await?
875        }
876        Commands::Meta(MetaCommands::GraphCheck { endpoint }) => {
877            cmd_impl::meta::graph_check(endpoint).await?
878        }
879        Commands::AwaitTree(AwaitTreeCommands::Dump {
880            actor_traces_format,
881        }) => cmd_impl::await_tree::dump(context, actor_traces_format).await?,
882        Commands::AwaitTree(AwaitTreeCommands::Analyze { path }) => {
883            cmd_impl::await_tree::bottleneck_detect(context, path).await?
884        }
885        Commands::AwaitTree(AwaitTreeCommands::Transcribe { path }) => {
886            rw_diagnose_tools::await_tree::transcribe(path)?
887        }
888        Commands::Profile(ProfileCommands::Cpu { sleep }) => {
889            cmd_impl::profile::cpu_profile(context, sleep).await?
890        }
891        Commands::Profile(ProfileCommands::Heap { dir }) => {
892            cmd_impl::profile::heap_profile(context, dir).await?
893        }
894        Commands::Scale(ScaleCommands::Cordon { workers }) => {
895            cmd_impl::scale::update_schedulability(context, workers, Schedulability::Unschedulable)
896                .await?
897        }
898        Commands::Scale(ScaleCommands::Uncordon { workers }) => {
899            cmd_impl::scale::update_schedulability(context, workers, Schedulability::Schedulable)
900                .await?
901        }
902        Commands::Throttle(ThrottleCommands::Source(args)) => {
903            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
904        }
905        Commands::Throttle(ThrottleCommands::Mv(args)) => {
906            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?;
907        }
908    }
909    Ok(())
910}