risingwave_ctl/lib.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![warn(clippy::large_futures, clippy::large_stack_frames)]

use anyhow::Result;
use clap::{Args, Parser, Subcommand};
use cmd_impl::bench::BenchCommands;
use cmd_impl::hummock::SstDumpArgs;
use itertools::Itertools;
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId};
use risingwave_meta::backup_restore::RestoreOpts;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm;
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use thiserror_ext::AsReport;

use crate::cmd_impl::hummock::{
    build_compaction_config_vec, list_pinned_versions, migrate_legacy_object,
};
use crate::cmd_impl::scale::set_cdc_table_backfill_parallelism;
use crate::cmd_impl::throttle::apply_throttle;
use crate::common::CtlContext;

pub mod cmd_impl;
pub mod common;

/// risectl provides internal access to the RisingWave cluster. Generally, you will need to
/// provide the meta address and the state store URL for risectl to access the cluster. You
/// must start RisingWave in full cluster mode (e.g. with MinIO and the compactor enabled in
/// risedev.yml) rather than playground mode to use this tool. risectl reads the environment
/// variables `RW_META_ADDR` and `RW_HUMMOCK_URL` to configure itself.
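///
/// For example, a typical invocation against a local risedev cluster might look like the
/// following (illustrative addresses and credentials; adjust them to your deployment):
///
/// ```text
/// RW_META_ADDR=http://127.0.0.1:5690 \
/// RW_HUMMOCK_URL=hummock+minio://<user>:<password>@127.0.0.1:9301/<bucket> \
/// risectl hummock list-version
/// ```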
#[derive(Parser)]
#[clap(version, about = "The DevOps tool that provides internal access to the RisingWave cluster", long_about = None)]
#[clap(propagate_version = true)]
#[clap(infer_subcommands = true)]
pub struct CliOpts {
    #[clap(subcommand)]
    command: Commands,
}

#[derive(Subcommand)]
#[clap(infer_subcommands = true)]
enum Commands {
    /// Commands for Compute
    #[clap(subcommand)]
    Compute(ComputeCommands),
    /// Commands for Hummock
    #[clap(subcommand)]
    Hummock(HummockCommands),
    /// Commands for Tables
    #[clap(subcommand)]
    Table(TableCommands),
    /// Commands for Meta
    #[clap(subcommand)]
    Meta(MetaCommands),
    /// Commands for Scaling
    #[clap(subcommand)]
    Scale(ScaleCommands),
    /// Commands for Benchmarks
    #[clap(subcommand)]
    Bench(BenchCommands),
    /// Commands for await-tree, such as dumping, analyzing and transcribing
    #[clap(subcommand)]
    #[clap(visible_alias("trace"))]
    AwaitTree(AwaitTreeCommands),
    // TODO(yuhao): profile other nodes
    /// Commands for profiling the compute nodes
    #[clap(subcommand)]
    Profile(ProfileCommands),
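    /// Commands for throttling the rate limits of sources and materialized views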
    #[clap(subcommand)]
    Throttle(ThrottleCommands),
    /// Commands for Self-testing
    #[clap(subcommand, hide = true)]
    Test(TestCommands),
}

#[derive(Subcommand)]
enum ComputeCommands {
    /// Show all the configuration parameters on the compute node
    ShowConfig { host: String },
}

#[allow(clippy::large_enum_variant)]
#[derive(Subcommand)]
enum HummockCommands {
    /// list latest Hummock version on meta node
    ListVersion {
        #[clap(short, long = "verbose", default_value_t = false)]
        verbose: bool,

        #[clap(long = "verbose_key_range", default_value_t = false)]
        verbose_key_range: bool,
    },

    /// list hummock version deltas in the meta store
    ListVersionDeltas {
        #[clap(short, long = "start-version-delta-id", default_value_t = 0)]
        start_id: u64,

        #[clap(short, long = "num-epochs", default_value_t = 100)]
        num_epochs: u32,
    },
    /// Forbid Hummock from committing new epochs, which is a prerequisite for the compaction deterministic test
    DisableCommitEpoch,
    /// list all Hummock key-value pairs
    ListKv {
        #[clap(short, long = "epoch", default_value_t = HummockEpoch::MAX)]
        epoch: u64,

        #[clap(short, long = "table-id")]
        table_id: u32,

        // data directory for hummock state store. None: use default
        data_dir: Option<String>,

        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    SstDump(SstDumpArgs),
    /// trigger a targeted compaction through `compaction_group_id`
    TriggerManualCompaction {
        #[clap(short, long = "compaction-group-id", default_value_t = 2)]
        compaction_group_id: u64,

        #[clap(short, long = "table-id", default_value_t = 0)]
        table_id: u32,

        #[clap(short, long = "level", default_value_t = 1)]
        level: u32,

        #[clap(short, long = "sst-ids", value_delimiter = ',')]
        sst_ids: Vec<u64>,
    },
    /// Trigger a full GC for SSTs that are not pinned, whose timestamp <= now -
    /// `sst_retention_time_sec`, and whose path contains `prefix`.
    TriggerFullGc {
        #[clap(short, long = "sst_retention_time_sec", default_value_t = 259200)]
        sst_retention_time_sec: u64,
        #[clap(short, long = "prefix", required = false)]
        prefix: Option<String>,
    },
    /// List pinned versions of each worker.
    ListPinnedVersions {},
    /// List all compaction groups.
    ListCompactionGroup,
    /// Update compaction config for compaction groups.
    UpdateCompactionConfig {
        #[clap(long, value_delimiter = ',')]
        compaction_group_ids: Vec<u64>,
        #[clap(long)]
        max_bytes_for_level_base: Option<u64>,
        #[clap(long)]
        max_bytes_for_level_multiplier: Option<u64>,
        #[clap(long)]
        max_compaction_bytes: Option<u64>,
        #[clap(long)]
        sub_level_max_compaction_bytes: Option<u64>,
        #[clap(long)]
        level0_tier_compact_file_number: Option<u64>,
        #[clap(long)]
        target_file_size_base: Option<u64>,
        #[clap(long)]
        compaction_filter_mask: Option<u32>,
        #[clap(long)]
        max_sub_compaction: Option<u32>,
        #[clap(long)]
        level0_stop_write_threshold_sub_level_number: Option<u64>,
        #[clap(long)]
        level0_sub_level_compact_level_count: Option<u32>,
        #[clap(long)]
        max_space_reclaim_bytes: Option<u64>,
        #[clap(long)]
        level0_max_compact_file_number: Option<u64>,
        #[clap(long)]
        level0_overlapping_sub_level_compact_level_count: Option<u32>,
        #[clap(long)]
        enable_emergency_picker: Option<bool>,
        #[clap(long)]
        tombstone_reclaim_ratio: Option<u32>,
        #[clap(long)]
        compression_level: Option<u32>,
        #[clap(long)]
        compression_algorithm: Option<String>,
        #[clap(long)]
        max_l0_compact_level: Option<u32>,
        #[clap(long)]
        sst_allowed_trivial_move_min_size: Option<u64>,
        #[clap(long)]
        disable_auto_group_scheduling: Option<bool>,
        #[clap(long)]
        max_overlapping_level_size: Option<u64>,
        #[clap(long)]
        sst_allowed_trivial_move_max_count: Option<u32>,
        #[clap(long)]
        emergency_level0_sst_file_count: Option<u32>,
        #[clap(long)]
        emergency_level0_sub_level_partition: Option<u32>,
        #[clap(long)]
        level0_stop_write_threshold_max_sst_count: Option<u32>,
        #[clap(long)]
        level0_stop_write_threshold_max_size: Option<u64>,
        #[clap(long)]
        enable_optimize_l0_interval_selection: Option<bool>,
    },
    /// Split the given compaction group into two, moving the given tables to the new group.
    SplitCompactionGroup {
        #[clap(long)]
        compaction_group_id: u64,
        #[clap(long, value_delimiter = ',')]
        table_ids: Vec<u32>,
        #[clap(long, default_value_t = 0)]
        partition_vnode_count: u32,
    },
    /// Pause version checkpointing, which subsequently pauses GC of delta logs and SST objects.
    PauseVersionCheckpoint,
    /// Resume version checkpointing, which subsequently resumes GC of delta logs and SST objects.
    ResumeVersionCheckpoint,
    /// Replay the version from the checkpointed one to the latest one.
    ReplayVersion,
    /// List compaction status
    ListCompactionStatus {
        #[clap(short, long = "verbose", default_value_t = false)]
        verbose: bool,
    },
    GetCompactionScore {
        #[clap(long)]
        compaction_group_id: u64,
    },
    /// Validate the current `HummockVersion`.
    ValidateVersion,
    /// Rebuild table stats
    RebuildTableStats,
    CancelCompactTask {
        #[clap(short, long)]
        task_id: u64,
    },
    PrintUserKeyInArchive {
        /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
        #[clap(long, value_delimiter = ',')]
        archive_ids: Vec<u64>,
        /// The data directory of Hummock storage, where `SSTable` objects can be found.
        #[clap(long)]
        data_dir: String,
        /// KVs that match the user key are printed.
        #[clap(long)]
        user_key: String,
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    PrintVersionDeltaInArchive {
        /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
        #[clap(long, value_delimiter = ',')]
        archive_ids: Vec<u64>,
        /// The data directory of Hummock storage, where `SSTable` objects can be found.
        #[clap(long)]
        data_dir: String,
        /// Version deltas that are related to the SST id are printed.
        #[clap(long)]
        sst_id: u64,
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    TieredCacheTracing {
        #[clap(long)]
        enable: bool,
        #[clap(long)]
        record_hybrid_insert_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_get_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_obtain_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_remove_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_fetch_threshold_ms: Option<u32>,
    },
    MergeCompactionGroup {
        #[clap(long)]
        left_group_id: u64,
        #[clap(long)]
        right_group_id: u64,
    },
    MigrateLegacyObject {
        url: String,
        source_dir: String,
        target_dir: String,
        #[clap(long, default_value = "100")]
        concurrency: u32,
    },
    ResizeCache {
        #[clap(long)]
        meta_cache_capacity_mb: Option<u64>,
        #[clap(long)]
        data_cache_capacity_mb: Option<u64>,
    },
}

#[derive(Subcommand)]
enum TableCommands {
    /// scan a state table with MV name
    Scan {
        /// name of the materialized view to operate on
        mv_name: String,
        // data directory for hummock state store. None: use default
        data_dir: Option<String>,

        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    /// scan a state table using Id
    ScanById {
        /// id of the state table to operate on
        table_id: u32,
        // data directory for hummock state store. None: use default
        data_dir: Option<String>,
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    /// list all state tables
    List,
}

#[derive(Subcommand, Debug)]
enum ScaleCommands {
    /// Mark a compute node as unschedulable
    #[clap(verbatim_doc_comment)]
    Cordon {
        /// Workers that need to be cordoned; both id and host are supported.
        #[clap(
            long,
            required = true,
            value_delimiter = ',',
            value_name = "id or host,..."
        )]
        workers: Vec<String>,
    },
    /// Mark a compute node as schedulable. Nodes are schedulable unless they are cordoned.
    Uncordon {
        /// Workers that need to be uncordoned; both id and host are supported.
        #[clap(
            long,
            required = true,
            value_delimiter = ',',
            value_name = "id or host,..."
        )]
        workers: Vec<String>,
    },
}

#[derive(Subcommand)]
#[allow(clippy::large_enum_variant)]
enum MetaCommands {
    /// pause the stream graph
    Pause,
    /// resume the stream graph
    Resume,
    /// get cluster info
    ClusterInfo,
    /// get source split info
    SourceSplitInfo {
        #[clap(long)]
        ignore_id: bool,
    },
    /// Reschedule the actors in the stream graph
    ///
    /// The format is `fragment_id-[worker_id:count]+[worker_id:count]`.
    /// You can provide only the decreased `worker_id:count` part, only the increased part, or both;
    /// when both are provided, the decreased part must come before the increased part.
    ///
    /// For example, for the plan `100-[1:1]+[4:1]` the following request will be generated:
    /// ```text
    /// {
    ///     100: WorkerReschedule {
    ///         decreased_actor_count: { 1: 1 },
    ///         increased_actor_count: { 4: 1 },
    ///     }
    /// }
    /// ```
    /// Use `;` to separate multiple fragments.
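    /// For instance, `100-[1:1]+[4:1];101-[2:1]+[3:1]` reschedules fragments 100 and 101 in one
    /// request (illustrative worker ids and counts).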
    #[clap(verbatim_doc_comment)]
    #[clap(group(clap::ArgGroup::new("input_group").required(true).args(&["plan", "from"])))]
    Reschedule {
        /// Plan of the reschedule; must be used together with `revision`
396        #[clap(long, requires = "revision")]
397        plan: Option<String>,
398        /// Revision of the plan
399        #[clap(long)]
400        revision: Option<u64>,
401        /// Reschedule from a specific file
402        #[clap(long, conflicts_with = "revision", value_hint = clap::ValueHint::AnyPath)]
403        from: Option<String>,
404        /// Show the plan only, no actual operation
405        #[clap(long, default_value = "false")]
406        dry_run: bool,
407        /// Resolve `NO_SHUFFLE` upstream
408        #[clap(long, default_value = "false")]
409        resolve_no_shuffle: bool,
410    },
411    /// backup meta by taking a meta snapshot
412    BackupMeta {
413        #[clap(long)]
414        remarks: Option<String>,
415    },
416    /// restore meta by recovering from a meta snapshot
417    RestoreMeta {
418        #[command(flatten)]
419        opts: RestoreOpts,
420    },
421    /// delete meta snapshots
422    DeleteMetaSnapshots {
423        #[clap(long, value_delimiter = ',')]
424        snapshot_ids: Vec<u64>,
425    },
426
427    /// List all existing connections in the catalog
428    ListConnections,
429
430    /// List fragment mapping for serving
431    ListServingFragmentMapping,
432
433    /// Unregister workers from the cluster
434    UnregisterWorkers {
        /// The workers that need to be unregistered; both `worker_id` and `worker_host:worker_port` are supported
        #[clap(
            long,
            required = true,
            value_delimiter = ',',
            value_name = "worker_id or worker_host:worker_port, ..."
        )]
        workers: Vec<String>,

        /// Automatic yes to prompts
        #[clap(short = 'y', long, default_value_t = false)]
        yes: bool,

        /// Workers that are not found will be ignored
        #[clap(long, default_value_t = false)]
        ignore_not_found: bool,

        /// Check whether any fragments are still occupied by these workers before unregistering
        #[clap(long, default_value_t = false)]
        check_fragment_occupied: bool,
    },

    /// Validate source interface for the cloud team
    ValidateSource {
        /// Source properties in JSON format.
        /// If PrivateLink is used, specify `connection.id` instead of `connection.name`.
        #[clap(long)]
        props: String,
    },

    SetCdcTableBackfillParallelism {
        #[clap(long, required = true)]
        table_id: u32,
        #[clap(long, required = true)]
        parallelism: u32,
    },
}

#[derive(Subcommand, Clone, Debug)]
pub enum AwaitTreeCommands {
    /// Dump Await Tree
    Dump {
        /// The format of actor traces in the diagnose file. Allowed values: `json`, `text`. `json` by default.
        #[clap(short, long = "actor-traces-format")]
        actor_traces_format: Option<String>,
    },
    /// Analyze Await Tree
    Analyze {
        /// The path to the diagnose file. If not specified, ctl will first pull one from the cluster.
        /// The actor traces format can be either `json` or `text`; the analyze command will
        /// detect the format automatically.
486        #[clap(long = "path")]
487        path: Option<String>,
488    },
    /// Transcribe an Await Tree from JSON to text format
    Transcribe {
        /// The path to the await tree file to be transcribed
        #[clap(long = "path")]
        path: String,
    },
}

#[derive(Subcommand, Clone, Debug)]
enum TestCommands {
    /// Test if JVM and Java libraries are working
    Jvm,
}

#[derive(Subcommand, Clone, Debug)]
enum ThrottleCommands {
    Source(ThrottleCommandArgs),
    Mv(ThrottleCommandArgs),
}

#[derive(Clone, Debug, Args)]
pub struct ThrottleCommandArgs {
    id: u32,
    rate: Option<u32>,
}

#[derive(Subcommand, Clone, Debug)]
pub enum ProfileCommands {
    /// CPU profile
    Cpu {
        /// The time to run profiling for (in seconds)
520        #[clap(short, long = "sleep")]
521        sleep: u64,
522    },
523    /// Heap profile
524    Heap {
525        /// The output directory of the dumped file
526        #[clap(long = "dir")]
527        dir: Option<String>,
528    },
529}
530
531/// Start `risectl` with the given options.
532/// Cancel the operation when the given `shutdown` token triggers.
533/// Log and abort the process if any error occurs.
534///
535/// Note: use [`start_fallible`] if you want to call functionalities of `risectl`
536/// in an embedded manner.
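///
/// # Example
///
/// A minimal sketch of how a binary might drive [`start`]; it assumes a `tokio` runtime
/// and uses `clap::Parser` for argument parsing (not compiled as a doctest):
///
/// ```ignore
/// use clap::Parser;
/// use risingwave_common::util::tokio_util::sync::CancellationToken;
///
/// #[tokio::main]
/// async fn main() {
///     let opts = risingwave_ctl::CliOpts::parse();
///     // A token that is never cancelled here; a real binary would trigger it on SIGINT.
///     let shutdown = CancellationToken::new();
///     risingwave_ctl::start(opts, shutdown).await;
/// }
/// ```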
pub async fn start(opts: CliOpts, shutdown: CancellationToken) {
    let context = CtlContext::default();

    tokio::select! {
        _ = shutdown.cancelled() => {
            // Shutdown requested, clean up the context and return.
            context.try_close().await;
        }

        result = start_fallible(opts, &context) => {
            if let Err(e) = result {
                eprintln!("Error: {:#?}", e.as_report()); // pretty with backtrace
                std::process::exit(1);
            }
        }
    }
}

/// Start `risectl` with the given options.
/// Return `Err` if any error occurs.
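///
/// # Example
///
/// A minimal sketch of embedded use, assuming the caller constructs the [`CtlContext`]
/// itself and handles the returned error (illustrative arguments; not compiled as a doctest):
///
/// ```ignore
/// use clap::Parser;
/// use risingwave_ctl::common::CtlContext;
/// use risingwave_ctl::{CliOpts, start_fallible};
///
/// async fn run_embedded() -> anyhow::Result<()> {
///     let opts = CliOpts::parse_from(["risectl", "meta", "cluster-info"]);
///     let context = CtlContext::default();
///     start_fallible(opts, &context).await
/// }
/// ```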
pub async fn start_fallible(opts: CliOpts, context: &CtlContext) -> Result<()> {
    let result = start_impl(opts, context).await;
    context.try_close().await;
    result
}

#[expect(
    clippy::large_stack_frames,
    reason = "Pre-opt MIR sums locals across match arms in async dispatch; \
              post-layout generator stores only one arm at a time (~13–16 KiB)."
)]
async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
    match opts.command {
        Commands::Compute(ComputeCommands::ShowConfig { host }) => {
            cmd_impl::compute::show_config(&host).await?
        }
        Commands::Hummock(HummockCommands::DisableCommitEpoch) => {
            cmd_impl::hummock::disable_commit_epoch(context).await?
        }
        Commands::Hummock(HummockCommands::ListVersion {
            verbose,
            verbose_key_range,
        }) => {
            cmd_impl::hummock::list_version(context, verbose, verbose_key_range).await?;
        }
        Commands::Hummock(HummockCommands::ListVersionDeltas {
            start_id,
            num_epochs,
        }) => {
            cmd_impl::hummock::list_version_deltas(
                context,
                HummockVersionId::new(start_id),
                num_epochs,
            )
            .await?;
        }
        Commands::Hummock(HummockCommands::ListKv {
            epoch,
            table_id,
            data_dir,
            use_new_object_prefix_strategy,
        }) => {
            cmd_impl::hummock::list_kv(
                context,
                epoch,
                table_id,
                data_dir,
                use_new_object_prefix_strategy,
            )
            .await?;
        }
        Commands::Hummock(HummockCommands::SstDump(args)) => {
            cmd_impl::hummock::sst_dump(context, args).await.unwrap()
        }
        Commands::Hummock(HummockCommands::TriggerManualCompaction {
            compaction_group_id,
            table_id,
            level,
            sst_ids,
        }) => {
            cmd_impl::hummock::trigger_manual_compaction(
                context,
                compaction_group_id,
                table_id.into(),
                level,
                sst_ids,
            )
            .await?
        }
        Commands::Hummock(HummockCommands::TriggerFullGc {
            sst_retention_time_sec,
            prefix,
        }) => cmd_impl::hummock::trigger_full_gc(context, sst_retention_time_sec, prefix).await?,
        Commands::Hummock(HummockCommands::ListPinnedVersions {}) => {
            list_pinned_versions(context).await?
        }
        Commands::Hummock(HummockCommands::ListCompactionGroup) => {
            cmd_impl::hummock::list_compaction_group(context).await?
        }
        Commands::Hummock(HummockCommands::UpdateCompactionConfig {
            compaction_group_ids,
            max_bytes_for_level_base,
            max_bytes_for_level_multiplier,
            max_compaction_bytes,
            sub_level_max_compaction_bytes,
            level0_tier_compact_file_number,
            target_file_size_base,
            compaction_filter_mask,
            max_sub_compaction,
            level0_stop_write_threshold_sub_level_number,
            level0_sub_level_compact_level_count,
            max_space_reclaim_bytes,
            level0_max_compact_file_number,
            level0_overlapping_sub_level_compact_level_count,
            enable_emergency_picker,
            tombstone_reclaim_ratio,
            compression_level,
            compression_algorithm,
            max_l0_compact_level,
            sst_allowed_trivial_move_min_size,
            disable_auto_group_scheduling,
            max_overlapping_level_size,
            sst_allowed_trivial_move_max_count,
            emergency_level0_sst_file_count,
            emergency_level0_sub_level_partition,
            level0_stop_write_threshold_max_sst_count,
            level0_stop_write_threshold_max_size,
            enable_optimize_l0_interval_selection,
        }) => {
            cmd_impl::hummock::update_compaction_config(
                context,
                compaction_group_ids,
                build_compaction_config_vec(
                    max_bytes_for_level_base,
                    max_bytes_for_level_multiplier,
                    max_compaction_bytes,
                    sub_level_max_compaction_bytes,
                    level0_tier_compact_file_number,
                    target_file_size_base,
                    compaction_filter_mask,
                    max_sub_compaction,
                    level0_stop_write_threshold_sub_level_number,
                    level0_sub_level_compact_level_count,
                    max_space_reclaim_bytes,
                    level0_max_compact_file_number,
                    level0_overlapping_sub_level_compact_level_count,
                    enable_emergency_picker,
                    tombstone_reclaim_ratio,
                    if let Some(level) = compression_level {
                        assert!(compression_algorithm.is_some());
                        Some(CompressionAlgorithm {
                            level,
                            compression_algorithm: compression_algorithm.unwrap(),
                        })
                    } else {
                        None
                    },
                    max_l0_compact_level,
                    sst_allowed_trivial_move_min_size,
                    disable_auto_group_scheduling,
                    max_overlapping_level_size,
                    sst_allowed_trivial_move_max_count,
                    emergency_level0_sst_file_count,
                    emergency_level0_sub_level_partition,
                    level0_stop_write_threshold_max_sst_count,
                    level0_stop_write_threshold_max_size,
                    enable_optimize_l0_interval_selection,
                ),
            )
            .await?
        }
        Commands::Hummock(HummockCommands::SplitCompactionGroup {
            compaction_group_id,
            table_ids,
            partition_vnode_count,
        }) => {
            cmd_impl::hummock::split_compaction_group(
                context,
                compaction_group_id,
                &table_ids.into_iter().map_into().collect_vec(),
                partition_vnode_count,
            )
            .await?;
        }
        Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
            cmd_impl::hummock::pause_version_checkpoint(context).await?;
        }
        Commands::Hummock(HummockCommands::ResumeVersionCheckpoint) => {
            cmd_impl::hummock::resume_version_checkpoint(context).await?;
        }
        Commands::Hummock(HummockCommands::ReplayVersion) => {
            cmd_impl::hummock::replay_version(context).await?;
        }
        Commands::Hummock(HummockCommands::ListCompactionStatus { verbose }) => {
            cmd_impl::hummock::list_compaction_status(context, verbose).await?;
        }
        Commands::Hummock(HummockCommands::GetCompactionScore {
            compaction_group_id,
        }) => {
            cmd_impl::hummock::get_compaction_score(context, compaction_group_id).await?;
        }
        Commands::Hummock(HummockCommands::ValidateVersion) => {
            cmd_impl::hummock::validate_version(context).await?;
        }
        Commands::Hummock(HummockCommands::RebuildTableStats) => {
            cmd_impl::hummock::rebuild_table_stats(context).await?;
        }
        Commands::Hummock(HummockCommands::CancelCompactTask { task_id }) => {
            cmd_impl::hummock::cancel_compact_task(context, task_id).await?;
        }
        Commands::Hummock(HummockCommands::PrintVersionDeltaInArchive {
            archive_ids,
            data_dir,
            sst_id,
            use_new_object_prefix_strategy,
        }) => {
            cmd_impl::hummock::print_version_delta_in_archive(
                context,
                archive_ids.into_iter().map(HummockVersionId::new),
                data_dir,
                sst_id,
                use_new_object_prefix_strategy,
            )
            .await?;
        }
        Commands::Hummock(HummockCommands::PrintUserKeyInArchive {
            archive_ids,
            data_dir,
            user_key,
            use_new_object_prefix_strategy,
        }) => {
            cmd_impl::hummock::print_user_key_in_archive(
                context,
                archive_ids.into_iter().map(HummockVersionId::new),
                data_dir,
                user_key,
                use_new_object_prefix_strategy,
            )
            .await?;
        }
        Commands::Hummock(HummockCommands::TieredCacheTracing {
            enable,
            record_hybrid_insert_threshold_ms,
            record_hybrid_get_threshold_ms,
            record_hybrid_obtain_threshold_ms,
            record_hybrid_remove_threshold_ms,
            record_hybrid_fetch_threshold_ms,
        }) => {
            cmd_impl::hummock::tiered_cache_tracing(
                context,
                enable,
                record_hybrid_insert_threshold_ms,
                record_hybrid_get_threshold_ms,
                record_hybrid_obtain_threshold_ms,
                record_hybrid_remove_threshold_ms,
                record_hybrid_fetch_threshold_ms,
            )
            .await?
        }
        Commands::Hummock(HummockCommands::MergeCompactionGroup {
            left_group_id,
            right_group_id,
        }) => {
            cmd_impl::hummock::merge_compaction_group(context, left_group_id, right_group_id)
                .await?
        }

        Commands::Hummock(HummockCommands::MigrateLegacyObject {
            url,
            source_dir,
            target_dir,
            concurrency,
        }) => {
            migrate_legacy_object(url, source_dir, target_dir, concurrency).await?;
        }
        Commands::Hummock(HummockCommands::ResizeCache {
            meta_cache_capacity_mb,
            data_cache_capacity_mb,
        }) => {
            const MIB: u64 = 1024 * 1024;
            cmd_impl::hummock::resize_cache(
                context,
                meta_cache_capacity_mb.map(|v| v * MIB),
                data_cache_capacity_mb.map(|v| v * MIB),
            )
            .await?
        }
        Commands::Table(TableCommands::Scan {
            mv_name,
            data_dir,
            use_new_object_prefix_strategy,
        }) => {
            cmd_impl::table::scan(context, mv_name, data_dir, use_new_object_prefix_strategy)
                .await?
        }
        Commands::Table(TableCommands::ScanById {
            table_id,
            data_dir,
            use_new_object_prefix_strategy,
        }) => {
            cmd_impl::table::scan_id(context, table_id, data_dir, use_new_object_prefix_strategy)
                .await?
        }
        Commands::Table(TableCommands::List) => cmd_impl::table::list(context).await?,
        Commands::Bench(cmd) => cmd_impl::bench::do_bench(context, cmd).await?,
        Commands::Meta(MetaCommands::Pause) => cmd_impl::meta::pause(context).await?,
        Commands::Meta(MetaCommands::Resume) => cmd_impl::meta::resume(context).await?,
        Commands::Meta(MetaCommands::ClusterInfo) => cmd_impl::meta::cluster_info(context).await?,
        Commands::Meta(MetaCommands::SourceSplitInfo { ignore_id }) => {
            cmd_impl::meta::source_split_info(context, ignore_id).await?
        }
        Commands::Meta(MetaCommands::Reschedule {
            from,
            dry_run,
            plan,
            revision,
            resolve_no_shuffle,
        }) => {
            cmd_impl::meta::reschedule(context, plan, revision, from, dry_run, resolve_no_shuffle)
                .await?
        }
        Commands::Meta(MetaCommands::BackupMeta { remarks }) => {
            cmd_impl::meta::backup_meta(context, remarks).await?
        }
        Commands::Meta(MetaCommands::RestoreMeta { opts }) => {
            risingwave_meta::backup_restore::restore(opts).await?
        }
        Commands::Meta(MetaCommands::DeleteMetaSnapshots { snapshot_ids }) => {
            cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
        }
        Commands::Meta(MetaCommands::ListConnections) => {
            cmd_impl::meta::list_connections(context).await?
        }
        Commands::Meta(MetaCommands::ListServingFragmentMapping) => {
            cmd_impl::meta::list_serving_fragment_mappings(context).await?
        }
        Commands::Meta(MetaCommands::UnregisterWorkers {
            workers,
            yes,
            ignore_not_found,
            check_fragment_occupied,
        }) => {
            cmd_impl::meta::unregister_workers(
                context,
                workers,
                yes,
                ignore_not_found,
                check_fragment_occupied,
            )
            .await?
        }
        Commands::Meta(MetaCommands::ValidateSource { props }) => {
            cmd_impl::meta::validate_source(context, props).await?
        }
        Commands::AwaitTree(AwaitTreeCommands::Dump {
            actor_traces_format,
        }) => cmd_impl::await_tree::dump(context, actor_traces_format).await?,
        Commands::AwaitTree(AwaitTreeCommands::Analyze { path }) => {
            cmd_impl::await_tree::bottleneck_detect(context, path).await?
        }
        Commands::AwaitTree(AwaitTreeCommands::Transcribe { path }) => {
            rw_diagnose_tools::await_tree::transcribe(path)?
        }
        Commands::Profile(ProfileCommands::Cpu { sleep }) => {
            cmd_impl::profile::cpu_profile(context, sleep).await?
        }
        Commands::Profile(ProfileCommands::Heap { dir }) => {
            cmd_impl::profile::heap_profile(context, dir).await?
        }
        Commands::Scale(ScaleCommands::Cordon { workers }) => {
            cmd_impl::scale::update_schedulability(context, workers, Schedulability::Unschedulable)
                .await?
        }
        Commands::Scale(ScaleCommands::Uncordon { workers }) => {
            cmd_impl::scale::update_schedulability(context, workers, Schedulability::Schedulable)
                .await?
        }
        Commands::Throttle(ThrottleCommands::Source(args)) => {
            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
        }
        Commands::Throttle(ThrottleCommands::Mv(args)) => {
            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?;
        }
        Commands::Meta(MetaCommands::SetCdcTableBackfillParallelism {
            table_id,
            parallelism,
        }) => {
            set_cdc_table_backfill_parallelism(context, table_id, parallelism).await?;
        }
        Commands::Test(TestCommands::Jvm) => cmd_impl::test::test_jvm()?,
    }
    Ok(())
}