risingwave_ctl/
lib.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(let_chains)]
#![warn(clippy::large_futures, clippy::large_stack_frames)]

use anyhow::Result;
use clap::{Args, Parser, Subcommand};
use cmd_impl::bench::BenchCommands;
use cmd_impl::hummock::SstDumpArgs;
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId};
use risingwave_meta::backup_restore::RestoreOpts;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm;
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use thiserror_ext::AsReport;

use crate::cmd_impl::hummock::{
    build_compaction_config_vec, list_pinned_versions, migrate_legacy_object,
};
use crate::cmd_impl::scale::set_cdc_table_backfill_parallelism;
use crate::cmd_impl::throttle::apply_throttle;
use crate::common::CtlContext;

pub mod cmd_impl;
pub mod common;

/// risectl provides internal access to the RisingWave cluster. Generally, you will need
/// to provide the meta address and the state store URL to enable risectl to access the cluster. You
/// must start RisingWave in full cluster mode (e.g. enable MinIO and compactor in risedev.yml)
/// instead of playground mode to use this tool. risectl will read environment variables
/// `RW_META_ADDR` and `RW_HUMMOCK_URL` to configure itself.
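///
/// The snippet below is an illustrative invocation against a local risedev deployment; the
/// addresses are assumptions and must be adapted to your own cluster.
///
/// ```text
/// export RW_META_ADDR=http://127.0.0.1:5690
/// export RW_HUMMOCK_URL=hummock+minio://hummockadmin:hummockadmin@127.0.0.1:9301/hummock001
/// risectl hummock list-version
/// ```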
#[derive(Parser)]
#[clap(version, about = "The DevOps tool that provides internal access to the RisingWave cluster", long_about = None)]
#[clap(propagate_version = true)]
#[clap(infer_subcommands = true)]
pub struct CliOpts {
    #[clap(subcommand)]
    command: Commands,
}

#[derive(Subcommand)]
#[clap(infer_subcommands = true)]
enum Commands {
    /// Commands for Compute
    #[clap(subcommand)]
    Compute(ComputeCommands),
    /// Commands for Hummock
    #[clap(subcommand)]
    Hummock(HummockCommands),
    /// Commands for Tables
    #[clap(subcommand)]
    Table(TableCommands),
    /// Commands for Meta
    #[clap(subcommand)]
    Meta(MetaCommands),
    /// Commands for Scaling
    #[clap(subcommand)]
    Scale(ScaleCommands),
    /// Commands for Benchmarks
    #[clap(subcommand)]
    Bench(BenchCommands),
    /// Commands for await-tree, such as dumping, analyzing and transcribing
    #[clap(subcommand)]
    #[clap(visible_alias("trace"))]
    AwaitTree(AwaitTreeCommands),
    // TODO(yuhao): profile other nodes
    /// Commands for profiling the compute nodes
    #[clap(subcommand)]
    Profile(ProfileCommands),
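    /// Commands for throttling (sources and materialized views)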
    #[clap(subcommand)]
    Throttle(ThrottleCommands),
    /// Commands for Self-testing
    #[clap(subcommand, hide = true)]
    Test(TestCommands),
}

#[derive(Subcommand)]
enum ComputeCommands {
    /// Show all the configuration parameters on compute node
    ShowConfig { host: String },
}

#[allow(clippy::large_enum_variant)]
#[derive(Subcommand)]
enum HummockCommands {
    /// list latest Hummock version on meta node
    ListVersion {
        #[clap(short, long = "verbose", default_value_t = false)]
        verbose: bool,

        #[clap(long = "verbose_key_range", default_value_t = false)]
        verbose_key_range: bool,
    },

    /// list hummock version deltas in the meta store
    ListVersionDeltas {
        #[clap(short, long = "start-version-delta-id", default_value_t = 0)]
        start_id: u64,

        #[clap(short, long = "num-epochs", default_value_t = 100)]
        num_epochs: u32,
    },
    /// Forbid hummock from committing new epochs, which is a prerequisite for the compaction deterministic test
    DisableCommitEpoch,
    /// list all Hummock key-value pairs
    ListKv {
        #[clap(short, long = "epoch", default_value_t = HummockEpoch::MAX)]
        epoch: u64,

        #[clap(short, long = "table-id")]
        table_id: u32,

        // data directory for hummock state store. None: use default
        data_dir: Option<String>,

        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
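    /// Dump information of SST objects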
    SstDump(SstDumpArgs),
    /// trigger a targeted compaction through `compaction_group_id`
    TriggerManualCompaction {
        #[clap(short, long = "compaction-group-id", default_value_t = 2)]
        compaction_group_id: u64,

        #[clap(short, long = "table-id", default_value_t = 0)]
        table_id: u32,

        #[clap(short, long = "level", default_value_t = 1)]
        level: u32,

        #[clap(short, long = "sst-ids", value_delimiter = ',')]
        sst_ids: Vec<u64>,
    },
    /// Trigger a full GC for SSTs that are not pinned, with timestamp <= now -
    /// `sst_retention_time_sec`, and with `prefix` in path.
    TriggerFullGc {
        #[clap(short, long = "sst_retention_time_sec", default_value_t = 259200)]
        sst_retention_time_sec: u64,
        #[clap(short, long = "prefix", required = false)]
        prefix: Option<String>,
    },
    /// List pinned versions of each worker.
    ListPinnedVersions {},
    /// List all compaction groups.
    ListCompactionGroup,
    /// Update compaction config for compaction groups.
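    ///
    /// An illustrative invocation (the group id and value are assumptions) updates a single
    /// knob for one group:
    /// `risectl hummock update-compaction-config --compaction-group-ids 2 --max-bytes-for-level-base 536870912`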
    UpdateCompactionConfig {
        #[clap(long, value_delimiter = ',')]
        compaction_group_ids: Vec<u64>,
        #[clap(long)]
        max_bytes_for_level_base: Option<u64>,
        #[clap(long)]
        max_bytes_for_level_multiplier: Option<u64>,
        #[clap(long)]
        max_compaction_bytes: Option<u64>,
        #[clap(long)]
        sub_level_max_compaction_bytes: Option<u64>,
        #[clap(long)]
        level0_tier_compact_file_number: Option<u64>,
        #[clap(long)]
        target_file_size_base: Option<u64>,
        #[clap(long)]
        compaction_filter_mask: Option<u32>,
        #[clap(long)]
        max_sub_compaction: Option<u32>,
        #[clap(long)]
        level0_stop_write_threshold_sub_level_number: Option<u64>,
        #[clap(long)]
        level0_sub_level_compact_level_count: Option<u32>,
        #[clap(long)]
        max_space_reclaim_bytes: Option<u64>,
        #[clap(long)]
        level0_max_compact_file_number: Option<u64>,
        #[clap(long)]
        level0_overlapping_sub_level_compact_level_count: Option<u32>,
        #[clap(long)]
        enable_emergency_picker: Option<bool>,
        #[clap(long)]
        tombstone_reclaim_ratio: Option<u32>,
        #[clap(long)]
        compression_level: Option<u32>,
        #[clap(long)]
        compression_algorithm: Option<String>,
        #[clap(long)]
        max_l0_compact_level: Option<u32>,
        #[clap(long)]
        sst_allowed_trivial_move_min_size: Option<u64>,
        #[clap(long)]
        disable_auto_group_scheduling: Option<bool>,
        #[clap(long)]
        max_overlapping_level_size: Option<u64>,
        #[clap(long)]
        sst_allowed_trivial_move_max_count: Option<u32>,
        #[clap(long)]
        emergency_level0_sst_file_count: Option<u32>,
        #[clap(long)]
        emergency_level0_sub_level_partition: Option<u32>,
        #[clap(long)]
        level0_stop_write_threshold_max_sst_count: Option<u32>,
        #[clap(long)]
        level0_stop_write_threshold_max_size: Option<u64>,
        #[clap(long)]
        enable_optimize_l0_interval_selection: Option<bool>,
    },
    /// Split given compaction group into two. Moves the given tables to the new group.
    SplitCompactionGroup {
        #[clap(long)]
        compaction_group_id: u64,
        #[clap(long, value_delimiter = ',')]
        table_ids: Vec<u32>,
        #[clap(long, default_value_t = 0)]
        partition_vnode_count: u32,
    },
    /// Pause version checkpoint, which subsequently pauses GC of delta log and SST object.
    PauseVersionCheckpoint,
    /// Resume version checkpoint, which subsequently resumes GC of delta log and SST object.
    ResumeVersionCheckpoint,
    /// Replay version from the checkpoint one to the latest one.
    ReplayVersion,
    /// List compaction status
    ListCompactionStatus {
        #[clap(short, long = "verbose", default_value_t = false)]
        verbose: bool,
    },
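    /// Get the compaction score of a compaction group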
    GetCompactionScore {
        #[clap(long)]
        compaction_group_id: u64,
    },
    /// Validate the current `HummockVersion`.
    ValidateVersion,
    /// Rebuild table stats
    RebuildTableStats,
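    /// Cancel a compaction task by its task id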
    CancelCompactTask {
        #[clap(short, long)]
        task_id: u64,
    },
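    /// Print key-value pairs that match a user key, scanned from Hummock version archives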
    PrintUserKeyInArchive {
        /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
        #[clap(long, value_delimiter = ',')]
        archive_ids: Vec<u64>,
        /// The data directory of Hummock storage, where `SSTable` objects can be found.
        #[clap(long)]
        data_dir: String,
        /// KVs that are matched with the user key are printed.
        #[clap(long)]
        user_key: String,
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
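    /// Print version deltas related to an SST id, scanned from Hummock version archives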
    PrintVersionDeltaInArchive {
        /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
        #[clap(long, value_delimiter = ',')]
        archive_ids: Vec<u64>,
        /// The data directory of Hummock storage, where `SSTable` objects can be found.
        #[clap(long)]
        data_dir: String,
        /// Version deltas that are related to the SST id are printed.
        #[clap(long)]
        sst_id: u64,
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
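    /// Enable or disable tiered cache tracing and configure its record thresholds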
    TieredCacheTracing {
        #[clap(long)]
        enable: bool,
        #[clap(long)]
        record_hybrid_insert_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_get_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_obtain_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_remove_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_fetch_threshold_ms: Option<u32>,
    },
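    /// Merge two compaction groups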
    MergeCompactionGroup {
        #[clap(long)]
        left_group_id: u64,
        #[clap(long)]
        right_group_id: u64,
    },
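    /// Migrate legacy objects from `source_dir` to `target_dir` in the object store at `url`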
    MigrateLegacyObject {
        url: String,
        source_dir: String,
        target_dir: String,
        #[clap(long, default_value = "100")]
        concurrency: u32,
    },
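    /// Resize the meta cache and/or data cache, capacities given in MiB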
    ResizeCache {
        #[clap(long)]
        meta_cache_capacity_mb: Option<u64>,
        #[clap(long)]
        data_cache_capacity_mb: Option<u64>,
    },
}

#[derive(Subcommand)]
enum TableCommands {
    /// scan a state table with MV name
    Scan {
        /// name of the materialized view to operate on
        mv_name: String,
        // data directory for hummock state store. None: use default
        data_dir: Option<String>,

        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    /// scan a state table using Id
    ScanById {
        /// id of the state table to operate on
        table_id: u32,
        // data directory for hummock state store. None: use default
        data_dir: Option<String>,
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    /// list all state tables
    List,
}

#[derive(Subcommand, Debug)]
enum ScaleCommands {
    /// Mark a compute node as unschedulable
    #[clap(verbatim_doc_comment)]
    Cordon {
        /// Workers that need to be cordoned, both id and host are supported.
        #[clap(
            long,
            required = true,
            value_delimiter = ',',
            value_name = "id or host,..."
        )]
        workers: Vec<String>,
    },
    /// Mark a compute node as schedulable. Nodes are schedulable unless they are cordoned
    Uncordon {
        /// Workers that need to be uncordoned, both id and host are supported.
        #[clap(
            long,
            required = true,
            value_delimiter = ',',
            value_name = "id or host,..."
        )]
        workers: Vec<String>,
    },
}

#[derive(Subcommand)]
#[allow(clippy::large_enum_variant)]
enum MetaCommands {
    /// pause the stream graph
    Pause,
    /// resume the stream graph
    Resume,
    /// get cluster info
    ClusterInfo,
    /// get source split info
    SourceSplitInfo {
        #[clap(long)]
        ignore_id: bool,
    },
    /// Reschedule the actors in the stream graph
    ///
    /// The format is `fragment_id-[worker_id:count]+[worker_id:count]`.
    /// You can provide only the decreased `worker_id:count` pairs, only the increased ones, or
    /// both; when both are provided, the decreased part must precede the increased part.
    ///
    /// For example, for plan `100-[1:1]+[4:1]` the following request will be generated:
    /// ```text
    /// {
    ///     100: WorkerReschedule {
    ///         decreased_actor_count: { 1: 1 },
    ///         increased_actor_count: { 4: 1 },
    ///     }
    /// }
    /// ```
    /// Use `;` to separate multiple fragments.
    #[clap(verbatim_doc_comment)]
    #[clap(group(clap::ArgGroup::new("input_group").required(true).args(&["plan", "from"])))]
    Reschedule {
        /// Plan of reschedule, needs to be used with `revision`
        #[clap(long, requires = "revision")]
        plan: Option<String>,
        /// Revision of the plan
        #[clap(long)]
        revision: Option<u64>,
        /// Reschedule from a specific file
        #[clap(long, conflicts_with = "revision", value_hint = clap::ValueHint::AnyPath)]
        from: Option<String>,
        /// Show the plan only, no actual operation
        #[clap(long, default_value = "false")]
        dry_run: bool,
        /// Resolve `NO_SHUFFLE` upstream
        #[clap(long, default_value = "false")]
        resolve_no_shuffle: bool,
    },
    /// backup meta by taking a meta snapshot
    BackupMeta {
        #[clap(long)]
        remarks: Option<String>,
    },
    /// restore meta by recovering from a meta snapshot
    RestoreMeta {
        #[command(flatten)]
        opts: RestoreOpts,
    },
    /// delete meta snapshots
    DeleteMetaSnapshots {
        #[clap(long, value_delimiter = ',')]
        snapshot_ids: Vec<u64>,
    },

    /// List all existing connections in the catalog
    ListConnections,

    /// List fragment mapping for serving
    ListServingFragmentMapping,

    /// Unregister workers from the cluster
    UnregisterWorkers {
        /// The workers that need to be unregistered, `worker_id` and `worker_host:worker_port` are both supported
        #[clap(
            long,
            required = true,
            value_delimiter = ',',
            value_name = "worker_id or worker_host:worker_port, ..."
        )]
        workers: Vec<String>,

        /// Automatic yes to prompts
        #[clap(short = 'y', long, default_value_t = false)]
        yes: bool,

        /// Workers that are not found will be ignored
        #[clap(long, default_value_t = false)]
        ignore_not_found: bool,

        /// Check whether the fragments are occupied by the workers
        #[clap(long, default_value_t = false)]
        check_fragment_occupied: bool,
    },

    /// Validate source interface for the cloud team
    ValidateSource {
        /// With properties in JSON format.
        /// If privatelink is used, specify `connection.id` instead of `connection.name`
        #[clap(long)]
        props: String,
    },

    /// Perform a graph check for scaling.
    #[clap(verbatim_doc_comment)]
    GraphCheck {
        /// SQL endpoint
        #[clap(long, required = true)]
        endpoint: String,
    },

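    /// Set the backfill parallelism of a CDC table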
    SetCdcTableBackfillParallelism {
        #[clap(long, required = true)]
        table_id: u32,
        #[clap(long, required = true)]
        parallelism: u32,
    },
}

#[derive(Subcommand, Clone, Debug)]
pub enum AwaitTreeCommands {
    /// Dump Await Tree
    Dump {
        /// The format of actor traces in the diagnose file. Allowed values: `json`, `text`. `json` by default.
        #[clap(short, long = "actor-traces-format")]
        actor_traces_format: Option<String>,
    },
    /// Analyze Await Tree
    Analyze {
        /// The path to the diagnose file. If not specified, ctl will first pull one from the cluster.
        /// The actor traces format can be either `json` or `text`; the analyze command will
        /// automatically detect the format.
        #[clap(long = "path")]
        path: Option<String>,
    },
    /// Transcribe Await Tree from JSON to text format
    Transcribe {
        /// The path to the await tree file to be transcribed
        #[clap(long = "path")]
        path: String,
    },
}

#[derive(Subcommand, Clone, Debug)]
enum TestCommands {
    /// Test if JVM and Java libraries are working
    Jvm,
}

#[derive(Subcommand, Clone, Debug)]
enum ThrottleCommands {
    Source(ThrottleCommandArgs),
    Mv(ThrottleCommandArgs),
}

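/// Arguments shared by the throttle subcommands: the target object id and an optional
/// throttle rate (how an absent rate is interpreted is up to `apply_throttle`).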
#[derive(Clone, Debug, Args)]
pub struct ThrottleCommandArgs {
    id: u32,
    rate: Option<u32>,
}

#[derive(Subcommand, Clone, Debug)]
pub enum ProfileCommands {
    /// CPU profile
    Cpu {
        /// The time to keep profiling active (in seconds)
        #[clap(short, long = "sleep")]
        sleep: u64,
    },
    /// Heap profile
    Heap {
        /// The output directory of the dumped file
        #[clap(long = "dir")]
        dir: Option<String>,
    },
}

/// Start `risectl` with the given options.
/// Cancel the operation when the given `shutdown` token triggers.
/// Log and abort the process if any error occurs.
///
/// Note: use [`start_fallible`] if you want to call `risectl` functionality
/// in an embedded manner.
pub async fn start(opts: CliOpts, shutdown: CancellationToken) {
    let context = CtlContext::default();

    tokio::select! {
        _ = shutdown.cancelled() => {
            // Shutdown requested, clean up the context and return.
            context.try_close().await;
        }

        result = start_fallible(opts, &context) => {
            if let Err(e) = result {
                eprintln!("Error: {:#?}", e.as_report()); // pretty with backtrace
                std::process::exit(1);
            }
        }
    }
}

/// Start `risectl` with the given options.
/// Return `Err` if any error occurs.
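///
/// A minimal embedded-usage sketch (the subcommand shown is illustrative):
///
/// ```ignore
/// use clap::Parser;
///
/// let opts = CliOpts::parse_from(["risectl", "meta", "cluster-info"]);
/// let context = CtlContext::default();
/// start_fallible(opts, &context).await?;
/// ```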
pub async fn start_fallible(opts: CliOpts, context: &CtlContext) -> Result<()> {
    let result = start_impl(opts, context).await;
    context.try_close().await;
    result
}

#[expect(
    clippy::large_stack_frames,
    reason = "Pre-opt MIR sums locals across match arms in async dispatch; \
              post-layout generator stores only one arm at a time (~13–16 KiB)."
)]
async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
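    // Dispatch each subcommand to its implementation (mostly under `cmd_impl`).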
    match opts.command {
        Commands::Compute(ComputeCommands::ShowConfig { host }) => {
            cmd_impl::compute::show_config(&host).await?
        }
        Commands::Hummock(HummockCommands::DisableCommitEpoch) => {
            cmd_impl::hummock::disable_commit_epoch(context).await?
        }
        Commands::Hummock(HummockCommands::ListVersion {
            verbose,
            verbose_key_range,
        }) => {
            cmd_impl::hummock::list_version(context, verbose, verbose_key_range).await?;
        }
        Commands::Hummock(HummockCommands::ListVersionDeltas {
            start_id,
            num_epochs,
        }) => {
            cmd_impl::hummock::list_version_deltas(
                context,
                HummockVersionId::new(start_id),
                num_epochs,
            )
            .await?;
        }
        Commands::Hummock(HummockCommands::ListKv {
            epoch,
            table_id,
            data_dir,
            use_new_object_prefix_strategy,
        }) => {
            cmd_impl::hummock::list_kv(
                context,
                epoch,
                table_id,
                data_dir,
                use_new_object_prefix_strategy,
            )
            .await?;
        }
        Commands::Hummock(HummockCommands::SstDump(args)) => {
            cmd_impl::hummock::sst_dump(context, args).await.unwrap()
        }
        Commands::Hummock(HummockCommands::TriggerManualCompaction {
            compaction_group_id,
            table_id,
            level,
            sst_ids,
        }) => {
            cmd_impl::hummock::trigger_manual_compaction(
                context,
                compaction_group_id,
                table_id,
                level,
                sst_ids,
            )
            .await?
        }
        Commands::Hummock(HummockCommands::TriggerFullGc {
            sst_retention_time_sec,
            prefix,
        }) => cmd_impl::hummock::trigger_full_gc(context, sst_retention_time_sec, prefix).await?,
        Commands::Hummock(HummockCommands::ListPinnedVersions {}) => {
            list_pinned_versions(context).await?
        }
        Commands::Hummock(HummockCommands::ListCompactionGroup) => {
            cmd_impl::hummock::list_compaction_group(context).await?
        }
        Commands::Hummock(HummockCommands::UpdateCompactionConfig {
            compaction_group_ids,
            max_bytes_for_level_base,
            max_bytes_for_level_multiplier,
            max_compaction_bytes,
            sub_level_max_compaction_bytes,
            level0_tier_compact_file_number,
            target_file_size_base,
            compaction_filter_mask,
            max_sub_compaction,
            level0_stop_write_threshold_sub_level_number,
            level0_sub_level_compact_level_count,
            max_space_reclaim_bytes,
            level0_max_compact_file_number,
            level0_overlapping_sub_level_compact_level_count,
            enable_emergency_picker,
            tombstone_reclaim_ratio,
            compression_level,
            compression_algorithm,
            max_l0_compact_level,
            sst_allowed_trivial_move_min_size,
            disable_auto_group_scheduling,
            max_overlapping_level_size,
            sst_allowed_trivial_move_max_count,
            emergency_level0_sst_file_count,
            emergency_level0_sub_level_partition,
            level0_stop_write_threshold_max_sst_count,
            level0_stop_write_threshold_max_size,
            enable_optimize_l0_interval_selection,
        }) => {
            cmd_impl::hummock::update_compaction_config(
                context,
                compaction_group_ids,
                build_compaction_config_vec(
                    max_bytes_for_level_base,
                    max_bytes_for_level_multiplier,
                    max_compaction_bytes,
                    sub_level_max_compaction_bytes,
                    level0_tier_compact_file_number,
                    target_file_size_base,
                    compaction_filter_mask,
                    max_sub_compaction,
                    level0_stop_write_threshold_sub_level_number,
                    level0_sub_level_compact_level_count,
                    max_space_reclaim_bytes,
                    level0_max_compact_file_number,
                    level0_overlapping_sub_level_compact_level_count,
                    enable_emergency_picker,
                    tombstone_reclaim_ratio,
                    if let Some(level) = compression_level {
                        assert!(compression_algorithm.is_some());
                        Some(CompressionAlgorithm {
                            level,
                            compression_algorithm: compression_algorithm.unwrap(),
                        })
                    } else {
                        None
                    },
                    max_l0_compact_level,
                    sst_allowed_trivial_move_min_size,
                    disable_auto_group_scheduling,
                    max_overlapping_level_size,
                    sst_allowed_trivial_move_max_count,
                    emergency_level0_sst_file_count,
                    emergency_level0_sub_level_partition,
                    level0_stop_write_threshold_max_sst_count,
                    level0_stop_write_threshold_max_size,
                    enable_optimize_l0_interval_selection,
                ),
            )
            .await?
        }
        Commands::Hummock(HummockCommands::SplitCompactionGroup {
            compaction_group_id,
            table_ids,
            partition_vnode_count,
        }) => {
            cmd_impl::hummock::split_compaction_group(
                context,
                compaction_group_id,
                &table_ids,
                partition_vnode_count,
            )
            .await?;
        }
        Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
            cmd_impl::hummock::pause_version_checkpoint(context).await?;
        }
        Commands::Hummock(HummockCommands::ResumeVersionCheckpoint) => {
            cmd_impl::hummock::resume_version_checkpoint(context).await?;
        }
        Commands::Hummock(HummockCommands::ReplayVersion) => {
            cmd_impl::hummock::replay_version(context).await?;
        }
        Commands::Hummock(HummockCommands::ListCompactionStatus { verbose }) => {
            cmd_impl::hummock::list_compaction_status(context, verbose).await?;
        }
        Commands::Hummock(HummockCommands::GetCompactionScore {
            compaction_group_id,
        }) => {
            cmd_impl::hummock::get_compaction_score(context, compaction_group_id).await?;
        }
        Commands::Hummock(HummockCommands::ValidateVersion) => {
            cmd_impl::hummock::validate_version(context).await?;
        }
        Commands::Hummock(HummockCommands::RebuildTableStats) => {
            cmd_impl::hummock::rebuild_table_stats(context).await?;
        }
        Commands::Hummock(HummockCommands::CancelCompactTask { task_id }) => {
            cmd_impl::hummock::cancel_compact_task(context, task_id).await?;
        }
        Commands::Hummock(HummockCommands::PrintVersionDeltaInArchive {
            archive_ids,
            data_dir,
            sst_id,
            use_new_object_prefix_strategy,
        }) => {
            cmd_impl::hummock::print_version_delta_in_archive(
                context,
                archive_ids.into_iter().map(HummockVersionId::new),
                data_dir,
                sst_id,
                use_new_object_prefix_strategy,
            )
            .await?;
        }
        Commands::Hummock(HummockCommands::PrintUserKeyInArchive {
            archive_ids,
            data_dir,
            user_key,
            use_new_object_prefix_strategy,
        }) => {
            cmd_impl::hummock::print_user_key_in_archive(
                context,
                archive_ids.into_iter().map(HummockVersionId::new),
                data_dir,
                user_key,
                use_new_object_prefix_strategy,
            )
            .await?;
        }
        Commands::Hummock(HummockCommands::TieredCacheTracing {
            enable,
            record_hybrid_insert_threshold_ms,
            record_hybrid_get_threshold_ms,
            record_hybrid_obtain_threshold_ms,
            record_hybrid_remove_threshold_ms,
            record_hybrid_fetch_threshold_ms,
        }) => {
            cmd_impl::hummock::tiered_cache_tracing(
                context,
                enable,
                record_hybrid_insert_threshold_ms,
                record_hybrid_get_threshold_ms,
                record_hybrid_obtain_threshold_ms,
                record_hybrid_remove_threshold_ms,
                record_hybrid_fetch_threshold_ms,
            )
            .await?
        }
        Commands::Hummock(HummockCommands::MergeCompactionGroup {
            left_group_id,
            right_group_id,
        }) => {
            cmd_impl::hummock::merge_compaction_group(context, left_group_id, right_group_id)
                .await?
        }

        Commands::Hummock(HummockCommands::MigrateLegacyObject {
            url,
            source_dir,
            target_dir,
            concurrency,
        }) => {
            migrate_legacy_object(url, source_dir, target_dir, concurrency).await?;
        }
        Commands::Hummock(HummockCommands::ResizeCache {
            meta_cache_capacity_mb,
            data_cache_capacity_mb,
        }) => {
            const MIB: u64 = 1024 * 1024;
            cmd_impl::hummock::resize_cache(
                context,
                meta_cache_capacity_mb.map(|v| v * MIB),
                data_cache_capacity_mb.map(|v| v * MIB),
            )
            .await?
        }
        Commands::Table(TableCommands::Scan {
            mv_name,
            data_dir,
            use_new_object_prefix_strategy,
        }) => {
            cmd_impl::table::scan(context, mv_name, data_dir, use_new_object_prefix_strategy)
                .await?
        }
        Commands::Table(TableCommands::ScanById {
            table_id,
            data_dir,
            use_new_object_prefix_strategy,
        }) => {
            cmd_impl::table::scan_id(context, table_id, data_dir, use_new_object_prefix_strategy)
                .await?
        }
        Commands::Table(TableCommands::List) => cmd_impl::table::list(context).await?,
        Commands::Bench(cmd) => cmd_impl::bench::do_bench(context, cmd).await?,
        Commands::Meta(MetaCommands::Pause) => cmd_impl::meta::pause(context).await?,
        Commands::Meta(MetaCommands::Resume) => cmd_impl::meta::resume(context).await?,
        Commands::Meta(MetaCommands::ClusterInfo) => cmd_impl::meta::cluster_info(context).await?,
        Commands::Meta(MetaCommands::SourceSplitInfo { ignore_id }) => {
            cmd_impl::meta::source_split_info(context, ignore_id).await?
        }
        Commands::Meta(MetaCommands::Reschedule {
            from,
            dry_run,
            plan,
            revision,
            resolve_no_shuffle,
        }) => {
            cmd_impl::meta::reschedule(context, plan, revision, from, dry_run, resolve_no_shuffle)
                .await?
        }
        Commands::Meta(MetaCommands::BackupMeta { remarks }) => {
            cmd_impl::meta::backup_meta(context, remarks).await?
        }
        Commands::Meta(MetaCommands::RestoreMeta { opts }) => {
            risingwave_meta::backup_restore::restore(opts).await?
        }
        Commands::Meta(MetaCommands::DeleteMetaSnapshots { snapshot_ids }) => {
            cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
        }
        Commands::Meta(MetaCommands::ListConnections) => {
            cmd_impl::meta::list_connections(context).await?
        }
        Commands::Meta(MetaCommands::ListServingFragmentMapping) => {
            cmd_impl::meta::list_serving_fragment_mappings(context).await?
        }
        Commands::Meta(MetaCommands::UnregisterWorkers {
            workers,
            yes,
            ignore_not_found,
            check_fragment_occupied,
        }) => {
            cmd_impl::meta::unregister_workers(
                context,
                workers,
                yes,
                ignore_not_found,
                check_fragment_occupied,
            )
            .await?
        }
        Commands::Meta(MetaCommands::ValidateSource { props }) => {
            cmd_impl::meta::validate_source(context, props).await?
        }
        Commands::Meta(MetaCommands::GraphCheck { endpoint }) => {
            cmd_impl::meta::graph_check(endpoint).await?
        }
        Commands::AwaitTree(AwaitTreeCommands::Dump {
            actor_traces_format,
        }) => cmd_impl::await_tree::dump(context, actor_traces_format).await?,
        Commands::AwaitTree(AwaitTreeCommands::Analyze { path }) => {
            cmd_impl::await_tree::bottleneck_detect(context, path).await?
        }
        Commands::AwaitTree(AwaitTreeCommands::Transcribe { path }) => {
            rw_diagnose_tools::await_tree::transcribe(path)?
        }
        Commands::Profile(ProfileCommands::Cpu { sleep }) => {
            cmd_impl::profile::cpu_profile(context, sleep).await?
        }
        Commands::Profile(ProfileCommands::Heap { dir }) => {
            cmd_impl::profile::heap_profile(context, dir).await?
        }
        Commands::Scale(ScaleCommands::Cordon { workers }) => {
            cmd_impl::scale::update_schedulability(context, workers, Schedulability::Unschedulable)
                .await?
        }
        Commands::Scale(ScaleCommands::Uncordon { workers }) => {
            cmd_impl::scale::update_schedulability(context, workers, Schedulability::Schedulable)
                .await?
        }
        Commands::Throttle(ThrottleCommands::Source(args)) => {
            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
        }
        Commands::Throttle(ThrottleCommands::Mv(args)) => {
            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?;
        }
        Commands::Meta(MetaCommands::SetCdcTableBackfillParallelism {
            table_id,
            parallelism,
        }) => {
            set_cdc_table_backfill_parallelism(context, table_id, parallelism).await?;
        }
        Commands::Test(TestCommands::Jvm) => cmd_impl::test::test_jvm()?,
    }
    Ok(())
}