risingwave_ctl/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![warn(clippy::large_futures, clippy::large_stack_frames)]
16
17use anyhow::Result;
18use clap::{Args, Parser, Subcommand};
19use cmd_impl::bench::BenchCommands;
20use cmd_impl::hummock::SstDumpArgs;
21use itertools::Itertools;
22use risingwave_common::util::tokio_util::sync::CancellationToken;
23use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId};
24use risingwave_meta::backup_restore::RestoreOpts;
25use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm;
26use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
27use thiserror_ext::AsReport;
28
29use crate::cmd_impl::hummock::{
30    build_compaction_config_vec, list_pinned_versions, migrate_legacy_object,
31};
32use crate::cmd_impl::scale::set_cdc_table_backfill_parallelism;
33use crate::cmd_impl::throttle::apply_throttle;
34use crate::common::CtlContext;
35
36pub mod cmd_impl;
37pub mod common;
38
39/// risectl provides internal access to the RisingWave cluster. Generally, you will need
40/// to provide the meta address and the state store URL to enable risectl to access the cluster. You
41/// must start RisingWave in full cluster mode (e.g. enable MinIO and compactor in risedev.yml)
42/// instead of playground mode to use this tool. risectl will read environment variables
43/// `RW_META_ADDR` and `RW_HUMMOCK_URL` to configure itself.
44#[derive(Parser)]
45#[clap(version, about = "The DevOps tool that provides internal access to the RisingWave cluster", long_about = None)]
46#[clap(propagate_version = true)]
47#[clap(infer_subcommands = true)]
48pub struct CliOpts {
49    #[clap(subcommand)]
50    command: Commands,
51}
52
53#[derive(Subcommand)]
54#[clap(infer_subcommands = true)]
55enum Commands {
56    /// Commands for Compute
57    #[clap(subcommand)]
58    Compute(ComputeCommands),
59    /// Commands for Hummock
60    #[clap(subcommand)]
61    Hummock(HummockCommands),
62    /// Commands for Tables
63    #[clap(subcommand)]
64    Table(TableCommands),
65    /// Commands for Meta
66    #[clap(subcommand)]
67    Meta(MetaCommands),
68    /// Commands for Scaling
69    #[clap(subcommand)]
70    Scale(ScaleCommands),
71    /// Commands for Benchmarks
72    #[clap(subcommand)]
73    Bench(BenchCommands),
74    /// Commands for await-tree, such as dumping, analyzing and transcribing
75    #[clap(subcommand)]
76    #[clap(visible_alias("trace"))]
77    AwaitTree(AwaitTreeCommands),
78    // TODO(yuhao): profile other nodes
79    /// Commands for profilng the compute nodes
80    #[clap(subcommand)]
81    Profile(ProfileCommands),
82    #[clap(subcommand)]
83    Throttle(ThrottleCommands),
84    /// Commands for Self-testing
85    #[clap(subcommand, hide = true)]
86    Test(TestCommands),
87}
88
89#[derive(Subcommand)]
90enum ComputeCommands {
91    /// Show all the configuration parameters on compute node
92    ShowConfig { host: String },
93}
94
95#[allow(clippy::large_enum_variant)]
96#[derive(Subcommand)]
97enum HummockCommands {
98    /// list latest Hummock version on meta node
99    ListVersion {
100        #[clap(short, long = "verbose", default_value_t = false)]
101        verbose: bool,
102
103        #[clap(long = "verbose_key_range", default_value_t = false)]
104        verbose_key_range: bool,
105    },
106
107    /// list hummock version deltas in the meta store
108    ListVersionDeltas {
109        #[clap(short, long = "start-version-delta-id", default_value_t = 0)]
110        start_id: u64,
111
112        #[clap(short, long = "num-epochs", default_value_t = 100)]
113        num_epochs: u32,
114    },
115    /// Forbid hummock commit new epochs, which is a prerequisite for compaction deterministic test
116    DisableCommitEpoch,
117    /// list all Hummock key-value pairs
118    ListKv {
119        #[clap(short, long = "epoch", default_value_t = HummockEpoch::MAX)]
120        epoch: u64,
121
122        #[clap(short, long = "table-id")]
123        table_id: u32,
124
125        // data directory for hummock state store. None: use default
126        data_dir: Option<String>,
127
128        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
129        use_new_object_prefix_strategy: bool,
130    },
131    SstDump(SstDumpArgs),
132    /// trigger a targeted compaction through `compaction_group_id`
133    TriggerManualCompaction {
134        #[clap(short, long = "compaction-group-id", default_value_t = 2)]
135        compaction_group_id: u64,
136
137        #[clap(short, long = "table-id", default_value_t = 0)]
138        table_id: u32,
139
140        #[clap(short, long = "level", default_value_t = 1)]
141        level: u32,
142
143        #[clap(short, long = "sst-ids", value_delimiter = ',')]
144        sst_ids: Vec<u64>,
145    },
146    /// Trigger a full GC for SSTs that is not pinned, with timestamp <= now -
147    /// `sst_retention_time_sec`, and with `prefix` in path.
148    TriggerFullGc {
149        #[clap(short, long = "sst_retention_time_sec", default_value_t = 259200)]
150        sst_retention_time_sec: u64,
151        #[clap(short, long = "prefix", required = false)]
152        prefix: Option<String>,
153    },
154    /// List pinned versions of each worker.
155    ListPinnedVersions {},
156    /// List all compaction groups.
157    ListCompactionGroup,
158    /// Update compaction config for compaction groups.
159    UpdateCompactionConfig {
160        #[clap(long, value_delimiter = ',')]
161        compaction_group_ids: Vec<u64>,
162        #[clap(long)]
163        max_bytes_for_level_base: Option<u64>,
164        #[clap(long)]
165        max_bytes_for_level_multiplier: Option<u64>,
166        #[clap(long)]
167        max_compaction_bytes: Option<u64>,
168        #[clap(long)]
169        sub_level_max_compaction_bytes: Option<u64>,
170        #[clap(long)]
171        level0_tier_compact_file_number: Option<u64>,
172        #[clap(long)]
173        target_file_size_base: Option<u64>,
174        #[clap(long)]
175        compaction_filter_mask: Option<u32>,
176        #[clap(long)]
177        max_sub_compaction: Option<u32>,
178        #[clap(long)]
179        level0_stop_write_threshold_sub_level_number: Option<u64>,
180        #[clap(long)]
181        level0_sub_level_compact_level_count: Option<u32>,
182        #[clap(long)]
183        max_space_reclaim_bytes: Option<u64>,
184        #[clap(long)]
185        level0_max_compact_file_number: Option<u64>,
186        #[clap(long)]
187        level0_overlapping_sub_level_compact_level_count: Option<u32>,
188        #[clap(long)]
189        enable_emergency_picker: Option<bool>,
190        #[clap(long)]
191        tombstone_reclaim_ratio: Option<u32>,
192        #[clap(long)]
193        compression_level: Option<u32>,
194        #[clap(long)]
195        compression_algorithm: Option<String>,
196        #[clap(long)]
197        max_l0_compact_level: Option<u32>,
198        #[clap(long)]
199        sst_allowed_trivial_move_min_size: Option<u64>,
200        #[clap(long)]
201        disable_auto_group_scheduling: Option<bool>,
202        #[clap(long)]
203        max_overlapping_level_size: Option<u64>,
204        #[clap(long)]
205        sst_allowed_trivial_move_max_count: Option<u32>,
206        #[clap(long)]
207        emergency_level0_sst_file_count: Option<u32>,
208        #[clap(long)]
209        emergency_level0_sub_level_partition: Option<u32>,
210        #[clap(long)]
211        level0_stop_write_threshold_max_sst_count: Option<u32>,
212        #[clap(long)]
213        level0_stop_write_threshold_max_size: Option<u64>,
214        #[clap(long)]
215        enable_optimize_l0_interval_selection: Option<bool>,
216        #[clap(long)]
217        vnode_aligned_level_size_threshold: Option<u64>,
218    },
219    /// Split given compaction group into two. Moves the given tables to the new group.
220    SplitCompactionGroup {
221        #[clap(long)]
222        compaction_group_id: u64,
223        #[clap(long, value_delimiter = ',')]
224        table_ids: Vec<u32>,
225        #[clap(long, default_value_t = 0)]
226        partition_vnode_count: u32,
227    },
228    /// Pause version checkpoint, which subsequently pauses GC of delta log and SST object.
229    PauseVersionCheckpoint,
230    /// Resume version checkpoint, which subsequently resumes GC of delta log and SST object.
231    ResumeVersionCheckpoint,
232    /// Replay version from the checkpoint one to the latest one.
233    ReplayVersion,
234    /// List compaction status
235    ListCompactionStatus {
236        #[clap(short, long = "verbose", default_value_t = false)]
237        verbose: bool,
238    },
239    GetCompactionScore {
240        #[clap(long)]
241        compaction_group_id: u64,
242    },
243    /// Validate the current `HummockVersion`.
244    ValidateVersion,
245    /// Rebuild table stats
246    RebuildTableStats,
247    CancelCompactTask {
248        #[clap(short, long)]
249        task_id: u64,
250    },
251    PrintUserKeyInArchive {
252        /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
253        #[clap(long, value_delimiter = ',')]
254        archive_ids: Vec<u64>,
255        /// The data directory of Hummock storage, where `SSTable` objects can be found.
256        #[clap(long)]
257        data_dir: String,
258        /// KVs that are matched with the user key are printed.
259        #[clap(long)]
260        user_key: String,
261        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
262        use_new_object_prefix_strategy: bool,
263    },
264    PrintVersionDeltaInArchive {
265        /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
266        #[clap(long, value_delimiter = ',')]
267        archive_ids: Vec<u64>,
268        /// The data directory of Hummock storage, where `SSTable` objects can be found.
269        #[clap(long)]
270        data_dir: String,
271        /// Version deltas that are related to the SST id are printed.
272        #[clap(long)]
273        sst_id: u64,
274        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
275        use_new_object_prefix_strategy: bool,
276    },
277    TieredCacheTracing {
278        #[clap(long)]
279        enable: bool,
280        #[clap(long)]
281        record_hybrid_insert_threshold_ms: Option<u32>,
282        #[clap(long)]
283        record_hybrid_get_threshold_ms: Option<u32>,
284        #[clap(long)]
285        record_hybrid_obtain_threshold_ms: Option<u32>,
286        #[clap(long)]
287        record_hybrid_remove_threshold_ms: Option<u32>,
288        #[clap(long)]
289        record_hybrid_fetch_threshold_ms: Option<u32>,
290    },
291    MergeCompactionGroup {
292        #[clap(long)]
293        left_group_id: u64,
294        #[clap(long)]
295        right_group_id: u64,
296    },
297    MigrateLegacyObject {
298        url: String,
299        source_dir: String,
300        target_dir: String,
301        #[clap(long, default_value = "100")]
302        concurrency: u32,
303    },
304    ResizeCache {
305        #[clap(long)]
306        meta_cache_capacity_mb: Option<u64>,
307        #[clap(long)]
308        data_cache_capacity_mb: Option<u64>,
309    },
310}
311
312#[derive(Subcommand)]
313enum TableCommands {
314    /// scan a state table with MV name
315    Scan {
316        /// name of the materialized view to operate on
317        mv_name: String,
318        // data directory for hummock state store. None: use default
319        data_dir: Option<String>,
320
321        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
322        use_new_object_prefix_strategy: bool,
323    },
324    /// scan a state table using Id
325    ScanById {
326        /// id of the state table to operate on
327        table_id: u32,
328        // data directory for hummock state store. None: use default
329        data_dir: Option<String>,
330        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
331        use_new_object_prefix_strategy: bool,
332    },
333    /// list all state tables
334    List,
335}
336
337#[derive(Subcommand, Debug)]
338enum ScaleCommands {
339    /// Mark a compute node as unschedulable
340    #[clap(verbatim_doc_comment)]
341    Cordon {
342        /// Workers that need to be cordoned, both id and host are supported.
343        #[clap(
344            long,
345            required = true,
346            value_delimiter = ',',
347            value_name = "id or host,..."
348        )]
349        workers: Vec<String>,
350    },
351    /// mark a compute node as schedulable. Nodes are schedulable unless they are cordoned
352    Uncordon {
353        /// Workers that need to be uncordoned, both id and host are supported.
354        #[clap(
355            long,
356            required = true,
357            value_delimiter = ',',
358            value_name = "id or host,..."
359        )]
360        workers: Vec<String>,
361    },
362}
363
364#[derive(Subcommand)]
365#[allow(clippy::large_enum_variant)]
366enum MetaCommands {
367    /// pause the stream graph
368    Pause,
369    /// resume the stream graph
370    Resume,
371    /// get cluster info
372    ClusterInfo,
373    /// get source split info
374    SourceSplitInfo {
375        #[clap(long)]
376        ignore_id: bool,
377    },
378    /// Reschedule the actors in the stream graph
379    ///
380    /// The format is `fragment_id-[worker_id:count]+[worker_id:count]`
381    /// You can provide either decreased `worker_ids` only or increased only, but decreased should be preceded by
382    /// `added` when both are provided.
383    ///
384    /// For example, for plan `100-[1:1]+[4:1]` the follow request will be generated:
385    /// ```text
386    /// {
387    ///     100: WorkerReschedule {
388    ///         increased_actor_count: { 1: 1 },
389    ///         decreased_actor_count: { 4: 1 },
390    ///     }
391    /// }
392    /// ```
393    /// Use ; to separate multiple fragment
394    #[clap(verbatim_doc_comment)]
395    #[clap(group(clap::ArgGroup::new("input_group").required(true).args(&["plan", "from"])))]
396    Reschedule {
397        /// Plan of reschedule, needs to be used with `revision`
398        #[clap(long, requires = "revision")]
399        plan: Option<String>,
400        /// Revision of the plan
401        #[clap(long)]
402        revision: Option<u64>,
403        /// Reschedule from a specific file
404        #[clap(long, conflicts_with = "revision", value_hint = clap::ValueHint::AnyPath)]
405        from: Option<String>,
406        /// Show the plan only, no actual operation
407        #[clap(long, default_value = "false")]
408        dry_run: bool,
409        /// Resolve `NO_SHUFFLE` upstream
410        #[clap(long, default_value = "false")]
411        resolve_no_shuffle: bool,
412    },
413    /// backup meta by taking a meta snapshot
414    BackupMeta {
415        #[clap(long)]
416        remarks: Option<String>,
417    },
418    /// restore meta by recovering from a meta snapshot
419    RestoreMeta {
420        #[command(flatten)]
421        opts: RestoreOpts,
422    },
423    /// delete meta snapshots
424    DeleteMetaSnapshots {
425        #[clap(long, value_delimiter = ',')]
426        snapshot_ids: Vec<u64>,
427    },
428
429    /// List all existing connections in the catalog
430    ListConnections,
431
432    /// List fragment mapping for serving
433    ListServingFragmentMapping,
434
435    /// Unregister workers from the cluster
436    UnregisterWorkers {
437        /// The workers that needs to be unregistered, `worker_id` and `worker_host:worker_port` are both supported
438        #[clap(
439            long,
440            required = true,
441            value_delimiter = ',',
442            value_name = "worker_id or worker_host:worker_port, ..."
443        )]
444        workers: Vec<String>,
445
446        /// Automatic yes to prompts
447        #[clap(short = 'y', long, default_value_t = false)]
448        yes: bool,
449
450        /// The worker not found will be ignored
451        #[clap(long, default_value_t = false)]
452        ignore_not_found: bool,
453
454        /// Checking whether the fragment is occupied by workers
455        #[clap(long, default_value_t = false)]
456        check_fragment_occupied: bool,
457    },
458
459    /// Validate source interface for the cloud team
460    ValidateSource {
461        /// With properties in json format
462        /// If privatelink is used, specify `connection.id` instead of `connection.name`
463        #[clap(long)]
464        props: String,
465    },
466
467    SetCdcTableBackfillParallelism {
468        #[clap(long, required = true)]
469        table_id: u32,
470        #[clap(long, required = true)]
471        parallelism: u32,
472    },
473}
474
475#[derive(Subcommand, Clone, Debug)]
476pub enum AwaitTreeCommands {
477    /// Dump Await Tree
478    Dump {
479        /// The format of actor traces in the diagnose file. Allowed values: `json`, `text`. `json` by default.
480        #[clap(short, long = "actor-traces-format")]
481        actor_traces_format: Option<String>,
482    },
483    /// Analyze Await Tree
484    Analyze {
485        /// The path to the diagnose file, if None, ctl will first pull one from the cluster
486        /// The actor traces format can be either `json` or `text`. The analyze command will
487        /// automatically detect the format.
488        #[clap(long = "path")]
489        path: Option<String>,
490    },
491    /// Transcribe Await Tree From JSON to Text format
492    Transcribe {
493        /// The path to the await tree file to be transcribed
494        #[clap(long = "path")]
495        path: String,
496    },
497}
498
499#[derive(Subcommand, Clone, Debug)]
500enum TestCommands {
501    /// Test if JVM and Java libraries are working
502    Jvm,
503}
504
505#[derive(Subcommand, Clone, Debug)]
506enum ThrottleCommands {
507    Source(ThrottleCommandArgs),
508    Mv(ThrottleCommandArgs),
509}
510
511#[derive(Clone, Debug, Args)]
512pub struct ThrottleCommandArgs {
513    id: u32,
514    rate: Option<u32>,
515}
516
517#[derive(Subcommand, Clone, Debug)]
518pub enum ProfileCommands {
519    /// CPU profile
520    Cpu {
521        /// The time to active profiling for (in seconds)
522        #[clap(short, long = "sleep")]
523        sleep: u64,
524    },
525    /// Heap profile
526    Heap {
527        /// The output directory of the dumped file
528        #[clap(long = "dir")]
529        dir: Option<String>,
530    },
531}
532
533/// Start `risectl` with the given options.
534/// Cancel the operation when the given `shutdown` token triggers.
535/// Log and abort the process if any error occurs.
536///
537/// Note: use [`start_fallible`] if you want to call functionalities of `risectl`
538/// in an embedded manner.
539pub async fn start(opts: CliOpts, shutdown: CancellationToken) {
540    let context = CtlContext::default();
541
542    tokio::select! {
543        _ = shutdown.cancelled() => {
544            // Shutdown requested, clean up the context and return.
545            context.try_close().await;
546        }
547
548        result = start_fallible(opts, &context) => {
549            if let Err(e) = result {
550                eprintln!("Error: {:#?}", e.as_report()); // pretty with backtrace
551                std::process::exit(1);
552            }
553        }
554    }
555}
556
557/// Start `risectl` with the given options.
558/// Return `Err` if any error occurs.
559pub async fn start_fallible(opts: CliOpts, context: &CtlContext) -> Result<()> {
560    let result = start_impl(opts, context).await;
561    context.try_close().await;
562    result
563}
564
565#[expect(
566    clippy::large_stack_frames,
567    reason = "Pre-opt MIR sums locals across match arms in async dispatch; \
568              post-layout generator stores only one arm at a time (~13–16 KiB)."
569)]
570async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
571    match opts.command {
572        Commands::Compute(ComputeCommands::ShowConfig { host }) => {
573            cmd_impl::compute::show_config(&host).await?
574        }
575        Commands::Hummock(HummockCommands::DisableCommitEpoch) => {
576            cmd_impl::hummock::disable_commit_epoch(context).await?
577        }
578        Commands::Hummock(HummockCommands::ListVersion {
579            verbose,
580            verbose_key_range,
581        }) => {
582            cmd_impl::hummock::list_version(context, verbose, verbose_key_range).await?;
583        }
584        Commands::Hummock(HummockCommands::ListVersionDeltas {
585            start_id,
586            num_epochs,
587        }) => {
588            cmd_impl::hummock::list_version_deltas(
589                context,
590                HummockVersionId::new(start_id),
591                num_epochs,
592            )
593            .await?;
594        }
595        Commands::Hummock(HummockCommands::ListKv {
596            epoch,
597            table_id,
598            data_dir,
599            use_new_object_prefix_strategy,
600        }) => {
601            cmd_impl::hummock::list_kv(
602                context,
603                epoch,
604                table_id,
605                data_dir,
606                use_new_object_prefix_strategy,
607            )
608            .await?;
609        }
610        Commands::Hummock(HummockCommands::SstDump(args)) => {
611            cmd_impl::hummock::sst_dump(context, args).await.unwrap()
612        }
613        Commands::Hummock(HummockCommands::TriggerManualCompaction {
614            compaction_group_id,
615            table_id,
616            level,
617            sst_ids,
618        }) => {
619            cmd_impl::hummock::trigger_manual_compaction(
620                context,
621                compaction_group_id,
622                table_id.into(),
623                level,
624                sst_ids,
625            )
626            .await?
627        }
628        Commands::Hummock(HummockCommands::TriggerFullGc {
629            sst_retention_time_sec,
630            prefix,
631        }) => cmd_impl::hummock::trigger_full_gc(context, sst_retention_time_sec, prefix).await?,
632        Commands::Hummock(HummockCommands::ListPinnedVersions {}) => {
633            list_pinned_versions(context).await?
634        }
635        Commands::Hummock(HummockCommands::ListCompactionGroup) => {
636            cmd_impl::hummock::list_compaction_group(context).await?
637        }
638        Commands::Hummock(HummockCommands::UpdateCompactionConfig {
639            compaction_group_ids,
640            max_bytes_for_level_base,
641            max_bytes_for_level_multiplier,
642            max_compaction_bytes,
643            sub_level_max_compaction_bytes,
644            level0_tier_compact_file_number,
645            target_file_size_base,
646            compaction_filter_mask,
647            max_sub_compaction,
648            level0_stop_write_threshold_sub_level_number,
649            level0_sub_level_compact_level_count,
650            max_space_reclaim_bytes,
651            level0_max_compact_file_number,
652            level0_overlapping_sub_level_compact_level_count,
653            enable_emergency_picker,
654            tombstone_reclaim_ratio,
655            compression_level,
656            compression_algorithm,
657            max_l0_compact_level,
658            sst_allowed_trivial_move_min_size,
659            disable_auto_group_scheduling,
660            max_overlapping_level_size,
661            sst_allowed_trivial_move_max_count,
662            emergency_level0_sst_file_count,
663            emergency_level0_sub_level_partition,
664            level0_stop_write_threshold_max_sst_count,
665            level0_stop_write_threshold_max_size,
666            enable_optimize_l0_interval_selection,
667            vnode_aligned_level_size_threshold,
668        }) => {
669            cmd_impl::hummock::update_compaction_config(
670                context,
671                compaction_group_ids,
672                build_compaction_config_vec(
673                    max_bytes_for_level_base,
674                    max_bytes_for_level_multiplier,
675                    max_compaction_bytes,
676                    sub_level_max_compaction_bytes,
677                    level0_tier_compact_file_number,
678                    target_file_size_base,
679                    compaction_filter_mask,
680                    max_sub_compaction,
681                    level0_stop_write_threshold_sub_level_number,
682                    level0_sub_level_compact_level_count,
683                    max_space_reclaim_bytes,
684                    level0_max_compact_file_number,
685                    level0_overlapping_sub_level_compact_level_count,
686                    enable_emergency_picker,
687                    tombstone_reclaim_ratio,
688                    if let Some(level) = compression_level {
689                        assert!(compression_algorithm.is_some());
690                        Some(CompressionAlgorithm {
691                            level,
692                            compression_algorithm: compression_algorithm.unwrap(),
693                        })
694                    } else {
695                        None
696                    },
697                    max_l0_compact_level,
698                    sst_allowed_trivial_move_min_size,
699                    disable_auto_group_scheduling,
700                    max_overlapping_level_size,
701                    sst_allowed_trivial_move_max_count,
702                    emergency_level0_sst_file_count,
703                    emergency_level0_sub_level_partition,
704                    level0_stop_write_threshold_max_sst_count,
705                    level0_stop_write_threshold_max_size,
706                    enable_optimize_l0_interval_selection,
707                    vnode_aligned_level_size_threshold,
708                ),
709            )
710            .await?
711        }
712        Commands::Hummock(HummockCommands::SplitCompactionGroup {
713            compaction_group_id,
714            table_ids,
715            partition_vnode_count,
716        }) => {
717            cmd_impl::hummock::split_compaction_group(
718                context,
719                compaction_group_id,
720                &table_ids.into_iter().map_into().collect_vec(),
721                partition_vnode_count,
722            )
723            .await?;
724        }
725        Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
726            cmd_impl::hummock::pause_version_checkpoint(context).await?;
727        }
728        Commands::Hummock(HummockCommands::ResumeVersionCheckpoint) => {
729            cmd_impl::hummock::resume_version_checkpoint(context).await?;
730        }
731        Commands::Hummock(HummockCommands::ReplayVersion) => {
732            cmd_impl::hummock::replay_version(context).await?;
733        }
734        Commands::Hummock(HummockCommands::ListCompactionStatus { verbose }) => {
735            cmd_impl::hummock::list_compaction_status(context, verbose).await?;
736        }
737        Commands::Hummock(HummockCommands::GetCompactionScore {
738            compaction_group_id,
739        }) => {
740            cmd_impl::hummock::get_compaction_score(context, compaction_group_id).await?;
741        }
742        Commands::Hummock(HummockCommands::ValidateVersion) => {
743            cmd_impl::hummock::validate_version(context).await?;
744        }
745        Commands::Hummock(HummockCommands::RebuildTableStats) => {
746            cmd_impl::hummock::rebuild_table_stats(context).await?;
747        }
748        Commands::Hummock(HummockCommands::CancelCompactTask { task_id }) => {
749            cmd_impl::hummock::cancel_compact_task(context, task_id).await?;
750        }
751        Commands::Hummock(HummockCommands::PrintVersionDeltaInArchive {
752            archive_ids,
753            data_dir,
754            sst_id,
755            use_new_object_prefix_strategy,
756        }) => {
757            cmd_impl::hummock::print_version_delta_in_archive(
758                context,
759                archive_ids.into_iter().map(HummockVersionId::new),
760                data_dir,
761                sst_id,
762                use_new_object_prefix_strategy,
763            )
764            .await?;
765        }
766        Commands::Hummock(HummockCommands::PrintUserKeyInArchive {
767            archive_ids,
768            data_dir,
769            user_key,
770            use_new_object_prefix_strategy,
771        }) => {
772            cmd_impl::hummock::print_user_key_in_archive(
773                context,
774                archive_ids.into_iter().map(HummockVersionId::new),
775                data_dir,
776                user_key,
777                use_new_object_prefix_strategy,
778            )
779            .await?;
780        }
781        Commands::Hummock(HummockCommands::TieredCacheTracing {
782            enable,
783            record_hybrid_insert_threshold_ms,
784            record_hybrid_get_threshold_ms,
785            record_hybrid_obtain_threshold_ms,
786            record_hybrid_remove_threshold_ms,
787            record_hybrid_fetch_threshold_ms,
788        }) => {
789            cmd_impl::hummock::tiered_cache_tracing(
790                context,
791                enable,
792                record_hybrid_insert_threshold_ms,
793                record_hybrid_get_threshold_ms,
794                record_hybrid_obtain_threshold_ms,
795                record_hybrid_remove_threshold_ms,
796                record_hybrid_fetch_threshold_ms,
797            )
798            .await?
799        }
800        Commands::Hummock(HummockCommands::MergeCompactionGroup {
801            left_group_id,
802            right_group_id,
803        }) => {
804            cmd_impl::hummock::merge_compaction_group(context, left_group_id, right_group_id)
805                .await?
806        }
807
808        Commands::Hummock(HummockCommands::MigrateLegacyObject {
809            url,
810            source_dir,
811            target_dir,
812            concurrency,
813        }) => {
814            migrate_legacy_object(url, source_dir, target_dir, concurrency).await?;
815        }
816        Commands::Hummock(HummockCommands::ResizeCache {
817            meta_cache_capacity_mb,
818            data_cache_capacity_mb,
819        }) => {
820            const MIB: u64 = 1024 * 1024;
821            cmd_impl::hummock::resize_cache(
822                context,
823                meta_cache_capacity_mb.map(|v| v * MIB),
824                data_cache_capacity_mb.map(|v| v * MIB),
825            )
826            .await?
827        }
828        Commands::Table(TableCommands::Scan {
829            mv_name,
830            data_dir,
831            use_new_object_prefix_strategy,
832        }) => {
833            cmd_impl::table::scan(context, mv_name, data_dir, use_new_object_prefix_strategy)
834                .await?
835        }
836        Commands::Table(TableCommands::ScanById {
837            table_id,
838            data_dir,
839            use_new_object_prefix_strategy,
840        }) => {
841            cmd_impl::table::scan_id(context, table_id, data_dir, use_new_object_prefix_strategy)
842                .await?
843        }
844        Commands::Table(TableCommands::List) => cmd_impl::table::list(context).await?,
845        Commands::Bench(cmd) => cmd_impl::bench::do_bench(context, cmd).await?,
846        Commands::Meta(MetaCommands::Pause) => cmd_impl::meta::pause(context).await?,
847        Commands::Meta(MetaCommands::Resume) => cmd_impl::meta::resume(context).await?,
848        Commands::Meta(MetaCommands::ClusterInfo) => cmd_impl::meta::cluster_info(context).await?,
849        Commands::Meta(MetaCommands::SourceSplitInfo { ignore_id }) => {
850            cmd_impl::meta::source_split_info(context, ignore_id).await?
851        }
852        Commands::Meta(MetaCommands::Reschedule {
853            from,
854            dry_run,
855            plan,
856            revision,
857            resolve_no_shuffle,
858        }) => {
859            cmd_impl::meta::reschedule(context, plan, revision, from, dry_run, resolve_no_shuffle)
860                .await?
861        }
862        Commands::Meta(MetaCommands::BackupMeta { remarks }) => {
863            cmd_impl::meta::backup_meta(context, remarks).await?
864        }
865        Commands::Meta(MetaCommands::RestoreMeta { opts }) => {
866            risingwave_meta::backup_restore::restore(opts).await?
867        }
868        Commands::Meta(MetaCommands::DeleteMetaSnapshots { snapshot_ids }) => {
869            cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
870        }
871        Commands::Meta(MetaCommands::ListConnections) => {
872            cmd_impl::meta::list_connections(context).await?
873        }
874        Commands::Meta(MetaCommands::ListServingFragmentMapping) => {
875            cmd_impl::meta::list_serving_fragment_mappings(context).await?
876        }
877        Commands::Meta(MetaCommands::UnregisterWorkers {
878            workers,
879            yes,
880            ignore_not_found,
881            check_fragment_occupied,
882        }) => {
883            cmd_impl::meta::unregister_workers(
884                context,
885                workers,
886                yes,
887                ignore_not_found,
888                check_fragment_occupied,
889            )
890            .await?
891        }
892        Commands::Meta(MetaCommands::ValidateSource { props }) => {
893            cmd_impl::meta::validate_source(context, props).await?
894        }
895        Commands::AwaitTree(AwaitTreeCommands::Dump {
896            actor_traces_format,
897        }) => cmd_impl::await_tree::dump(context, actor_traces_format).await?,
898        Commands::AwaitTree(AwaitTreeCommands::Analyze { path }) => {
899            cmd_impl::await_tree::bottleneck_detect(context, path).await?
900        }
901        Commands::AwaitTree(AwaitTreeCommands::Transcribe { path }) => {
902            rw_diagnose_tools::await_tree::transcribe(path)?
903        }
904        Commands::Profile(ProfileCommands::Cpu { sleep }) => {
905            cmd_impl::profile::cpu_profile(context, sleep).await?
906        }
907        Commands::Profile(ProfileCommands::Heap { dir }) => {
908            cmd_impl::profile::heap_profile(context, dir).await?
909        }
910        Commands::Scale(ScaleCommands::Cordon { workers }) => {
911            cmd_impl::scale::update_schedulability(context, workers, Schedulability::Unschedulable)
912                .await?
913        }
914        Commands::Scale(ScaleCommands::Uncordon { workers }) => {
915            cmd_impl::scale::update_schedulability(context, workers, Schedulability::Schedulable)
916                .await?
917        }
918        Commands::Throttle(ThrottleCommands::Source(args)) => {
919            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
920        }
921        Commands::Throttle(ThrottleCommands::Mv(args)) => {
922            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?;
923        }
924        Commands::Meta(MetaCommands::SetCdcTableBackfillParallelism {
925            table_id,
926            parallelism,
927        }) => {
928            set_cdc_table_backfill_parallelism(context, table_id, parallelism).await?;
929        }
930        Commands::Test(TestCommands::Jvm) => cmd_impl::test::test_jvm()?,
931    }
932    Ok(())
933}