risingwave_ctl/lib.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![warn(clippy::large_futures, clippy::large_stack_frames)]

use anyhow::Result;
use clap::{Args, Parser, Subcommand};
use cmd_impl::bench::BenchCommands;
use cmd_impl::hummock::SstDumpArgs;
use itertools::Itertools;
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId};
use risingwave_meta::backup_restore::RestoreOpts;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm;
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use thiserror_ext::AsReport;

use crate::cmd_impl::hummock::{
    build_compaction_config_vec, list_pinned_versions, migrate_legacy_object,
};
use crate::cmd_impl::profile::ProfileWorkerType;
use crate::cmd_impl::scale::set_cdc_table_backfill_parallelism;
use crate::cmd_impl::throttle::apply_throttle;
use crate::common::CtlContext;

pub mod cmd_impl;
pub mod common;

/// risectl provides internal access to the RisingWave cluster. Generally, you will need
/// to provide the meta address and the state store URL to enable risectl to access the cluster. You
/// must start RisingWave in full cluster mode (e.g. enable MinIO and compactor in risedev.yml)
/// instead of playground mode to use this tool. risectl will read environment variables
/// `RW_META_ADDR` and `RW_HUMMOCK_URL` to configure itself.
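///
/// A minimal usage sketch (assumes `RW_META_ADDR` and `RW_HUMMOCK_URL` point at a running
/// cluster; marked `ignore` because it needs a live cluster and an async context):
///
/// ```ignore
/// use clap::Parser;
/// use risingwave_common::util::tokio_util::sync::CancellationToken;
///
/// // Parse arguments as if the binary was invoked as `risectl meta pause`.
/// let opts = CliOpts::parse_from(["risectl", "meta", "pause"]);
/// // Run the command; the token can be used to cancel it early.
/// start(opts, CancellationToken::new()).await;
/// ```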
#[derive(Parser)]
#[clap(version, about = "The DevOps tool that provides internal access to the RisingWave cluster", long_about = None)]
#[clap(propagate_version = true)]
#[clap(infer_subcommands = true)]
pub struct CliOpts {
    #[clap(subcommand)]
    command: Commands,
}

#[derive(Subcommand)]
#[clap(infer_subcommands = true)]
enum Commands {
    /// Commands for Compute
    #[clap(subcommand)]
    Compute(ComputeCommands),
    /// Commands for Hummock
    #[clap(subcommand)]
    Hummock(HummockCommands),
    /// Commands for Tables
    #[clap(subcommand)]
    Table(TableCommands),
    /// Commands for Meta
    #[clap(subcommand)]
    Meta(MetaCommands),
    /// Commands for Scaling
    #[clap(subcommand)]
    Scale(ScaleCommands),
    /// Commands for Benchmarks
    #[clap(subcommand)]
    Bench(BenchCommands),
    /// Commands for await-tree, such as dumping, analyzing and transcribing
    #[clap(subcommand)]
    #[clap(visible_alias("trace"))]
    AwaitTree(AwaitTreeCommands),
    /// Commands for profiling nodes
    #[clap(subcommand)]
    Profile(ProfileCommands),
    /// Commands for throttling the rate of sources and materialized views
    #[clap(subcommand)]
    Throttle(ThrottleCommands),
    /// Commands for Self-testing
    #[clap(subcommand, hide = true)]
    Test(TestCommands),
}

#[derive(Subcommand)]
enum ComputeCommands {
    /// Show all the configuration parameters on compute node
    ShowConfig { host: String },
}

#[allow(clippy::large_enum_variant)]
#[derive(Subcommand)]
enum HummockCommands {
    /// list latest Hummock version on meta node
    ListVersion {
        #[clap(short, long = "verbose", default_value_t = false)]
        verbose: bool,

        #[clap(long = "verbose_key_range", default_value_t = false)]
        verbose_key_range: bool,
    },

    /// list hummock version deltas in the meta store
    ListVersionDeltas {
        #[clap(short, long = "start-version-delta-id", default_value_t = 0)]
        start_id: u64,

        #[clap(short, long = "num-epochs", default_value_t = 100)]
        num_epochs: u32,
    },
    /// Forbid Hummock from committing new epochs, which is a prerequisite for the deterministic compaction test
    DisableCommitEpoch,
    /// list all Hummock key-value pairs
    ListKv {
        #[clap(short, long = "epoch", default_value_t = HummockEpoch::MAX)]
        epoch: u64,

        #[clap(short, long = "table-id")]
        table_id: u32,

        // data directory for hummock state store. None: use default
        data_dir: Option<String>,

        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    SstDump(SstDumpArgs),
    /// trigger a targeted compaction through `compaction_group_id`
    TriggerManualCompaction {
        #[clap(short, long = "compaction-group-id", default_value_t = 2)]
        compaction_group_id: u64,

        #[clap(short, long = "table-id", default_value_t = 0)]
        table_id: u32,

        #[clap(short, long = "level", default_value_t = 1)]
        level: u32,

        #[clap(short, long = "sst-ids", value_delimiter = ',')]
        sst_ids: Vec<u64>,
    },
    /// Trigger a full GC for SSTs that are not pinned, have a timestamp <= now -
    /// `sst_retention_time_sec`, and have `prefix` in their path.
    TriggerFullGc {
        #[clap(short, long = "sst_retention_time_sec", default_value_t = 259200)]
        sst_retention_time_sec: u64,
        #[clap(short, long = "prefix", required = false)]
        prefix: Option<String>,
    },
    /// List pinned versions of each worker.
    ListPinnedVersions {},
    /// List all compaction groups.
    ListCompactionGroup,
    /// Update compaction config for compaction groups.
    UpdateCompactionConfig {
        #[clap(long, value_delimiter = ',')]
        compaction_group_ids: Vec<u64>,
        #[clap(long)]
        max_bytes_for_level_base: Option<u64>,
        #[clap(long)]
        max_bytes_for_level_multiplier: Option<u64>,
        #[clap(long)]
        max_compaction_bytes: Option<u64>,
        #[clap(long)]
        sub_level_max_compaction_bytes: Option<u64>,
        #[clap(long)]
        level0_tier_compact_file_number: Option<u64>,
        #[clap(long)]
        target_file_size_base: Option<u64>,
        #[clap(long)]
        compaction_filter_mask: Option<u32>,
        #[clap(long)]
        max_sub_compaction: Option<u32>,
        #[clap(long)]
        level0_stop_write_threshold_sub_level_number: Option<u64>,
        #[clap(long)]
        level0_sub_level_compact_level_count: Option<u32>,
        #[clap(long)]
        max_space_reclaim_bytes: Option<u64>,
        #[clap(long)]
        level0_max_compact_file_number: Option<u64>,
        #[clap(long)]
        level0_overlapping_sub_level_compact_level_count: Option<u32>,
        #[clap(long)]
        enable_emergency_picker: Option<bool>,
        #[clap(long)]
        tombstone_reclaim_ratio: Option<u32>,
        #[clap(long)]
        compression_level: Option<u32>,
        #[clap(long)]
        compression_algorithm: Option<String>,
        #[clap(long)]
        max_l0_compact_level: Option<u32>,
        #[clap(long)]
        sst_allowed_trivial_move_min_size: Option<u64>,
        #[clap(long)]
        disable_auto_group_scheduling: Option<bool>,
        #[clap(long)]
        max_overlapping_level_size: Option<u64>,
        #[clap(long)]
        sst_allowed_trivial_move_max_count: Option<u32>,
        #[clap(long)]
        emergency_level0_sst_file_count: Option<u32>,
        #[clap(long)]
        emergency_level0_sub_level_partition: Option<u32>,
        #[clap(long)]
        level0_stop_write_threshold_max_sst_count: Option<u32>,
        #[clap(long)]
        level0_stop_write_threshold_max_size: Option<u64>,
        #[clap(long)]
        enable_optimize_l0_interval_selection: Option<bool>,
        #[clap(long)]
        vnode_aligned_level_size_threshold: Option<u64>,
        #[clap(long)]
        max_kv_count_for_xor16: Option<u64>,
    },
    /// Split the given compaction group into two, moving the given tables to the new group.
    SplitCompactionGroup {
        #[clap(long)]
        compaction_group_id: u64,
        #[clap(long, value_delimiter = ',')]
        table_ids: Vec<u32>,
        #[clap(long, default_value_t = 0)]
        partition_vnode_count: u32,
    },
    /// Pause version checkpoint, which subsequently pauses GC of delta log and SST object.
    PauseVersionCheckpoint,
    /// Resume version checkpoint, which subsequently resumes GC of delta log and SST object.
    ResumeVersionCheckpoint,
    /// Replay version from the checkpoint one to the latest one.
    ReplayVersion,
    /// List compaction status
    ListCompactionStatus {
        #[clap(short, long = "verbose", default_value_t = false)]
        verbose: bool,
    },
    GetCompactionScore {
        #[clap(long)]
        compaction_group_id: u64,
    },
    /// Validate the current `HummockVersion`.
    ValidateVersion,
    /// Rebuild table stats
    RebuildTableStats,
    CancelCompactTask {
        #[clap(short, long)]
        task_id: u64,
    },
    PrintUserKeyInArchive {
        /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
        #[clap(long, value_delimiter = ',')]
        archive_ids: Vec<u64>,
        /// The data directory of Hummock storage, where `SSTable` objects can be found.
        #[clap(long)]
        data_dir: String,
        /// KVs that are matched with the user key are printed.
        #[clap(long)]
        user_key: String,
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    PrintVersionDeltaInArchive {
        /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
        #[clap(long, value_delimiter = ',')]
        archive_ids: Vec<u64>,
        /// The data directory of Hummock storage, where `SSTable` objects can be found.
        #[clap(long)]
        data_dir: String,
        /// Version deltas that are related to the SST id are printed.
        #[clap(long)]
        sst_id: u64,
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    TieredCacheTracing {
        #[clap(long)]
        enable: bool,
        #[clap(long)]
        record_hybrid_insert_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_get_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_obtain_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_remove_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_fetch_threshold_ms: Option<u32>,
    },
    MergeCompactionGroup {
        #[clap(long)]
        left_group_id: u64,
        #[clap(long)]
        right_group_id: u64,
    },
    MigrateLegacyObject {
        url: String,
        source_dir: String,
        target_dir: String,
        #[clap(long, default_value = "100")]
        concurrency: u32,
    },
    ResizeCache {
        #[clap(long)]
        meta_cache_capacity_mb: Option<u64>,
        #[clap(long)]
        data_cache_capacity_mb: Option<u64>,
    },
}

#[derive(Subcommand)]
enum TableCommands {
    /// scan a state table with MV name
    Scan {
        /// name of the materialized view to operate on
        mv_name: String,
        // data directory for hummock state store. None: use default
        data_dir: Option<String>,

        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    /// scan a state table using Id
    ScanById {
        /// id of the state table to operate on
        table_id: u32,
        // data directory for hummock state store. None: use default
        data_dir: Option<String>,
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    /// list all state tables
    List,
}

#[derive(Subcommand, Debug)]
enum ScaleCommands {
    /// Mark a compute node as unschedulable
    #[clap(verbatim_doc_comment)]
    Cordon {
        /// Workers that need to be cordoned; both id and host are supported.
        #[clap(
            long,
            required = true,
            value_delimiter = ',',
            value_name = "id or host,..."
        )]
        workers: Vec<String>,
    },
    /// Mark a compute node as schedulable. Nodes are schedulable unless they are cordoned.
    Uncordon {
        /// Workers that need to be uncordoned; both id and host are supported.
        #[clap(
            long,
            required = true,
            value_delimiter = ',',
            value_name = "id or host,..."
        )]
        workers: Vec<String>,
    },
}

#[derive(Subcommand)]
#[allow(clippy::large_enum_variant)]
enum MetaCommands {
    /// pause the stream graph
    Pause,
    /// resume the stream graph
    Resume,
    /// get cluster info
    ClusterInfo,
    /// get source split info
    SourceSplitInfo {
        #[clap(long)]
        ignore_id: bool,
    },
    /// Reschedule the actors in the stream graph
    ///
    /// The format is `fragment_id-[worker_id:count]+[worker_id:count]`.
    /// You can provide only the decreased `worker_id:count` pairs or only the increased ones, but when
    /// both are provided, the decreased part (`-[..]`) must precede the increased part (`+[..]`).
    ///
    /// For example, for plan `100-[1:1]+[4:1]` the following request will be generated:
    /// ```text
    /// {
    ///     100: WorkerReschedule {
    ///         increased_actor_count: { 4: 1 },
    ///         decreased_actor_count: { 1: 1 },
    ///     }
    /// }
    /// ```
    /// Use `;` to separate multiple fragments.
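    ///
    /// A hypothetical multi-fragment plan (illustration only):
    /// `100-[1:1]+[4:1];101+[3:2]` additionally adds 2 actors of fragment 101 on worker 3.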
    #[clap(verbatim_doc_comment)]
    #[clap(group(clap::ArgGroup::new("input_group").required(true).args(&["plan", "from"])))]
    Reschedule {
        /// Plan of the reschedule; needs to be used together with `revision`
        #[clap(long, requires = "revision")]
        plan: Option<String>,
        /// Revision of the plan
        #[clap(long)]
        revision: Option<u64>,
        /// Reschedule from a specific file
        #[clap(long, conflicts_with = "revision", value_hint = clap::ValueHint::AnyPath)]
        from: Option<String>,
        /// Show the plan only, no actual operation
        #[clap(long, default_value = "false")]
        dry_run: bool,
        /// Resolve `NO_SHUFFLE` upstream
        #[clap(long, default_value = "false")]
        resolve_no_shuffle: bool,
    },
    /// backup meta by taking a meta snapshot
    BackupMeta {
        #[clap(long)]
        remarks: Option<String>,
    },
    /// restore meta by recovering from a meta snapshot
    RestoreMeta {
        #[command(flatten)]
        opts: RestoreOpts,
    },
    /// delete meta snapshots
    DeleteMetaSnapshots {
        #[clap(long, value_delimiter = ',')]
        snapshot_ids: Vec<u64>,
    },

    /// List all existing connections in the catalog
    ListConnections,

    /// List fragment mapping for serving
    ListServingFragmentMapping,

    /// Unregister workers from the cluster
    UnregisterWorkers {
        /// The workers that need to be unregistered; both `worker_id` and `worker_host:worker_port` are supported
        #[clap(
            long,
            required = true,
            value_delimiter = ',',
            value_name = "worker_id or worker_host:worker_port, ..."
        )]
        workers: Vec<String>,

        /// Automatic yes to prompts
        #[clap(short = 'y', long, default_value_t = false)]
        yes: bool,

        /// Workers that are not found will be ignored
        #[clap(long, default_value_t = false)]
        ignore_not_found: bool,

        /// Check whether any fragment is still occupied by the workers
        #[clap(long, default_value_t = false)]
        check_fragment_occupied: bool,
    },

    /// Validate source interface for the cloud team
    ValidateSource {
        /// Source properties in JSON format.
        /// If PrivateLink is used, specify `connection.id` instead of `connection.name`
        #[clap(long)]
        props: String,
    },

    SetCdcTableBackfillParallelism {
        #[clap(long, required = true)]
        table_id: u32,
        #[clap(long, required = true)]
        parallelism: u32,
    },
}

#[derive(Subcommand, Clone, Debug)]
pub enum AwaitTreeCommands {
    /// Dump Await Tree
    Dump {
        /// The format of actor traces in the diagnose file. Allowed values: `json`, `text`. `json` by default.
        #[clap(short, long = "actor-traces-format")]
        actor_traces_format: Option<String>,
    },
    /// Analyze Await Tree
    Analyze {
        /// The path to the diagnose file. If not specified, risectl will first pull one from the cluster.
        /// The actor traces can be in either `json` or `text` format; the analyze command detects
        /// the format automatically.
        #[clap(long = "path")]
        path: Option<String>,
    },
    /// Transcribe Await Tree From JSON to Text format
    Transcribe {
        /// The path to the await tree file to be transcribed
        #[clap(long = "path")]
        path: String,
    },
}

#[derive(Subcommand, Clone, Debug)]
enum TestCommands {
    /// Test if JVM and Java libraries are working
    Jvm,
}

#[derive(Subcommand, Clone, Debug)]
enum ThrottleCommands {
    Source(ThrottleCommandArgs),
    Mv(ThrottleCommandArgs),
}

#[derive(Clone, Debug, Args)]
pub struct ThrottleCommandArgs {
    /// Id of the target source or materialized view.
    id: u32,
    /// The new throttle rate to apply, if any.
    rate: Option<u32>,
}

#[derive(Subcommand, Clone, Debug)]
pub enum ProfileCommands {
    /// CPU profile
    Cpu {
        /// The time to keep profiling active (in seconds)
        #[clap(short, long = "sleep")]
        sleep: u64,
        /// Target worker types. Repeatable. Defaults to frontend, compute-node, and compactor.
        #[clap(long = "worker-type", value_name = "TYPE")]
        worker_types: Vec<ProfileWorkerType>,
    },
    /// Heap profile
    Heap {
        /// The output directory of the dumped file
        #[clap(long = "dir")]
        dir: Option<String>,
        /// Target worker types. Repeatable. Defaults to frontend, compute-node, and compactor.
        #[clap(long = "worker-type", value_name = "TYPE")]
        worker_types: Vec<ProfileWorkerType>,
    },
}

/// Start `risectl` with the given options.
/// Cancel the operation when the given `shutdown` token triggers.
/// Log and abort the process if any error occurs.
///
/// Note: use [`start_fallible`] if you want to invoke the functionality of `risectl`
/// in an embedded manner.
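///
/// A sketch of wiring the token to Ctrl-C (illustrative only; the actual binary may set this
/// up differently):
///
/// ```ignore
/// use clap::Parser;
///
/// let shutdown = CancellationToken::new();
/// // Cancel the token (and thus the running command) on Ctrl-C.
/// tokio::spawn({
///     let shutdown = shutdown.clone();
///     async move {
///         tokio::signal::ctrl_c().await.unwrap();
///         shutdown.cancel();
///     }
/// });
/// start(CliOpts::parse(), shutdown).await;
/// ```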
pub async fn start(opts: CliOpts, shutdown: CancellationToken) {
    let context = CtlContext::default();

    tokio::select! {
        _ = shutdown.cancelled() => {
            // Shutdown requested, clean up the context and return.
            context.try_close().await;
        }

        result = start_fallible(opts, &context) => {
            if let Err(e) = result {
                eprintln!("Error: {:#?}", e.as_report()); // pretty with backtrace
                std::process::exit(1);
            }
        }
    }
}

/// Start `risectl` with the given options.
/// Return `Err` if any error occurs.
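///
/// A minimal embedded-usage sketch (illustrative only; `ignore`d because it needs a live
/// cluster configured via `RW_META_ADDR` / `RW_HUMMOCK_URL`):
///
/// ```ignore
/// use clap::Parser;
///
/// let context = CtlContext::default();
/// let opts = CliOpts::parse_from(["risectl", "hummock", "list-version"]);
/// // Unlike `start`, errors are returned to the caller instead of aborting the process.
/// if let Err(e) = start_fallible(opts, &context).await {
///     eprintln!("risectl failed: {:#}", e);
/// }
/// ```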
pub async fn start_fallible(opts: CliOpts, context: &CtlContext) -> Result<()> {
    let result = start_impl(opts, context).await;
    context.try_close().await;
    result
}

#[expect(
    clippy::large_stack_frames,
    reason = "Pre-opt MIR sums locals across match arms in async dispatch; \
              post-layout generator stores only one arm at a time (~13–16 KiB)."
)]
async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
    match opts.command {
        Commands::Compute(ComputeCommands::ShowConfig { host }) => {
            cmd_impl::compute::show_config(&host).await?
        }
        Commands::Hummock(HummockCommands::DisableCommitEpoch) => {
            cmd_impl::hummock::disable_commit_epoch(context).await?
        }
        Commands::Hummock(HummockCommands::ListVersion {
            verbose,
            verbose_key_range,
        }) => {
            cmd_impl::hummock::list_version(context, verbose, verbose_key_range).await?;
        }
        Commands::Hummock(HummockCommands::ListVersionDeltas {
            start_id,
            num_epochs,
        }) => {
            cmd_impl::hummock::list_version_deltas(
                context,
                HummockVersionId::new(start_id),
                num_epochs,
            )
            .await?;
        }
        Commands::Hummock(HummockCommands::ListKv {
            epoch,
            table_id,
            data_dir,
            use_new_object_prefix_strategy,
        }) => {
            cmd_impl::hummock::list_kv(
                context,
                epoch,
                table_id,
                data_dir,
                use_new_object_prefix_strategy,
            )
            .await?;
        }
        Commands::Hummock(HummockCommands::SstDump(args)) => {
            cmd_impl::hummock::sst_dump(context, args).await.unwrap()
        }
        Commands::Hummock(HummockCommands::TriggerManualCompaction {
            compaction_group_id,
            table_id,
            level,
            sst_ids,
        }) => {
            cmd_impl::hummock::trigger_manual_compaction(
                context,
                compaction_group_id,
                table_id.into(),
                level,
                sst_ids,
            )
            .await?
        }
        Commands::Hummock(HummockCommands::TriggerFullGc {
            sst_retention_time_sec,
            prefix,
        }) => cmd_impl::hummock::trigger_full_gc(context, sst_retention_time_sec, prefix).await?,
        Commands::Hummock(HummockCommands::ListPinnedVersions {}) => {
            list_pinned_versions(context).await?
        }
        Commands::Hummock(HummockCommands::ListCompactionGroup) => {
            cmd_impl::hummock::list_compaction_group(context).await?
        }
        Commands::Hummock(HummockCommands::UpdateCompactionConfig {
            compaction_group_ids,
            max_bytes_for_level_base,
            max_bytes_for_level_multiplier,
            max_compaction_bytes,
            sub_level_max_compaction_bytes,
            level0_tier_compact_file_number,
            target_file_size_base,
            compaction_filter_mask,
            max_sub_compaction,
            level0_stop_write_threshold_sub_level_number,
            level0_sub_level_compact_level_count,
            max_space_reclaim_bytes,
            level0_max_compact_file_number,
            level0_overlapping_sub_level_compact_level_count,
            enable_emergency_picker,
            tombstone_reclaim_ratio,
            compression_level,
            compression_algorithm,
            max_l0_compact_level,
            sst_allowed_trivial_move_min_size,
            disable_auto_group_scheduling,
            max_overlapping_level_size,
            sst_allowed_trivial_move_max_count,
            emergency_level0_sst_file_count,
            emergency_level0_sub_level_partition,
            level0_stop_write_threshold_max_sst_count,
            level0_stop_write_threshold_max_size,
            enable_optimize_l0_interval_selection,
            vnode_aligned_level_size_threshold,
            max_kv_count_for_xor16,
        }) => {
            cmd_impl::hummock::update_compaction_config(
                context,
                compaction_group_ids,
                build_compaction_config_vec(
                    max_bytes_for_level_base,
                    max_bytes_for_level_multiplier,
                    max_compaction_bytes,
                    sub_level_max_compaction_bytes,
                    level0_tier_compact_file_number,
                    target_file_size_base,
                    compaction_filter_mask,
                    max_sub_compaction,
                    level0_stop_write_threshold_sub_level_number,
                    level0_sub_level_compact_level_count,
                    max_space_reclaim_bytes,
                    level0_max_compact_file_number,
                    level0_overlapping_sub_level_compact_level_count,
                    enable_emergency_picker,
                    tombstone_reclaim_ratio,
                    if let Some(level) = compression_level {
                        assert!(compression_algorithm.is_some());
                        Some(CompressionAlgorithm {
                            level,
                            compression_algorithm: compression_algorithm.unwrap(),
                        })
                    } else {
                        None
                    },
                    max_l0_compact_level,
                    sst_allowed_trivial_move_min_size,
                    disable_auto_group_scheduling,
                    max_overlapping_level_size,
                    sst_allowed_trivial_move_max_count,
                    emergency_level0_sst_file_count,
                    emergency_level0_sub_level_partition,
                    level0_stop_write_threshold_max_sst_count,
                    level0_stop_write_threshold_max_size,
                    enable_optimize_l0_interval_selection,
                    vnode_aligned_level_size_threshold,
                    max_kv_count_for_xor16,
                ),
            )
            .await?
        }
        Commands::Hummock(HummockCommands::SplitCompactionGroup {
            compaction_group_id,
            table_ids,
            partition_vnode_count,
        }) => {
            cmd_impl::hummock::split_compaction_group(
                context,
                compaction_group_id,
                &table_ids.into_iter().map_into().collect_vec(),
                partition_vnode_count,
            )
            .await?;
        }
        Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
            cmd_impl::hummock::pause_version_checkpoint(context).await?;
        }
        Commands::Hummock(HummockCommands::ResumeVersionCheckpoint) => {
            cmd_impl::hummock::resume_version_checkpoint(context).await?;
        }
        Commands::Hummock(HummockCommands::ReplayVersion) => {
            cmd_impl::hummock::replay_version(context).await?;
        }
        Commands::Hummock(HummockCommands::ListCompactionStatus { verbose }) => {
            cmd_impl::hummock::list_compaction_status(context, verbose).await?;
        }
        Commands::Hummock(HummockCommands::GetCompactionScore {
            compaction_group_id,
        }) => {
            cmd_impl::hummock::get_compaction_score(context, compaction_group_id).await?;
        }
        Commands::Hummock(HummockCommands::ValidateVersion) => {
            cmd_impl::hummock::validate_version(context).await?;
        }
        Commands::Hummock(HummockCommands::RebuildTableStats) => {
            cmd_impl::hummock::rebuild_table_stats(context).await?;
        }
        Commands::Hummock(HummockCommands::CancelCompactTask { task_id }) => {
            cmd_impl::hummock::cancel_compact_task(context, task_id).await?;
        }
        Commands::Hummock(HummockCommands::PrintVersionDeltaInArchive {
            archive_ids,
            data_dir,
            sst_id,
            use_new_object_prefix_strategy,
        }) => {
            cmd_impl::hummock::print_version_delta_in_archive(
                context,
                archive_ids.into_iter().map(HummockVersionId::new),
                data_dir,
                sst_id,
                use_new_object_prefix_strategy,
            )
            .await?;
        }
        Commands::Hummock(HummockCommands::PrintUserKeyInArchive {
            archive_ids,
            data_dir,
            user_key,
            use_new_object_prefix_strategy,
        }) => {
            cmd_impl::hummock::print_user_key_in_archive(
                context,
                archive_ids.into_iter().map(HummockVersionId::new),
                data_dir,
                user_key,
                use_new_object_prefix_strategy,
            )
            .await?;
        }
        Commands::Hummock(HummockCommands::TieredCacheTracing {
            enable,
            record_hybrid_insert_threshold_ms,
            record_hybrid_get_threshold_ms,
            record_hybrid_obtain_threshold_ms,
            record_hybrid_remove_threshold_ms,
            record_hybrid_fetch_threshold_ms,
        }) => {
            cmd_impl::hummock::tiered_cache_tracing(
                context,
                enable,
                record_hybrid_insert_threshold_ms,
                record_hybrid_get_threshold_ms,
                record_hybrid_obtain_threshold_ms,
                record_hybrid_remove_threshold_ms,
                record_hybrid_fetch_threshold_ms,
            )
            .await?
        }
        Commands::Hummock(HummockCommands::MergeCompactionGroup {
            left_group_id,
            right_group_id,
        }) => {
            cmd_impl::hummock::merge_compaction_group(context, left_group_id, right_group_id)
                .await?
        }

        Commands::Hummock(HummockCommands::MigrateLegacyObject {
            url,
            source_dir,
            target_dir,
            concurrency,
        }) => {
            migrate_legacy_object(url, source_dir, target_dir, concurrency).await?;
        }
        Commands::Hummock(HummockCommands::ResizeCache {
            meta_cache_capacity_mb,
            data_cache_capacity_mb,
        }) => {
            const MIB: u64 = 1024 * 1024;
            cmd_impl::hummock::resize_cache(
                context,
                meta_cache_capacity_mb.map(|v| v * MIB),
                data_cache_capacity_mb.map(|v| v * MIB),
            )
            .await?
        }
        Commands::Table(TableCommands::Scan {
            mv_name,
            data_dir,
            use_new_object_prefix_strategy,
        }) => {
            cmd_impl::table::scan(context, mv_name, data_dir, use_new_object_prefix_strategy)
                .await?
        }
        Commands::Table(TableCommands::ScanById {
            table_id,
            data_dir,
            use_new_object_prefix_strategy,
        }) => {
            cmd_impl::table::scan_id(context, table_id, data_dir, use_new_object_prefix_strategy)
                .await?
        }
        Commands::Table(TableCommands::List) => cmd_impl::table::list(context).await?,
        Commands::Bench(cmd) => cmd_impl::bench::do_bench(context, cmd).await?,
        Commands::Meta(MetaCommands::Pause) => cmd_impl::meta::pause(context).await?,
        Commands::Meta(MetaCommands::Resume) => cmd_impl::meta::resume(context).await?,
        Commands::Meta(MetaCommands::ClusterInfo) => cmd_impl::meta::cluster_info(context).await?,
        Commands::Meta(MetaCommands::SourceSplitInfo { ignore_id }) => {
            cmd_impl::meta::source_split_info(context, ignore_id).await?
        }
        Commands::Meta(MetaCommands::Reschedule {
            from,
            dry_run,
            plan,
            revision,
            resolve_no_shuffle,
        }) => {
            cmd_impl::meta::reschedule(context, plan, revision, from, dry_run, resolve_no_shuffle)
                .await?
        }
        Commands::Meta(MetaCommands::BackupMeta { remarks }) => {
            cmd_impl::meta::backup_meta(context, remarks).await?
        }
        Commands::Meta(MetaCommands::RestoreMeta { opts }) => {
            risingwave_meta::backup_restore::restore(opts).await?
        }
        Commands::Meta(MetaCommands::DeleteMetaSnapshots { snapshot_ids }) => {
            cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
        }
        Commands::Meta(MetaCommands::ListConnections) => {
            cmd_impl::meta::list_connections(context).await?
        }
        Commands::Meta(MetaCommands::ListServingFragmentMapping) => {
            cmd_impl::meta::list_serving_fragment_mappings(context).await?
        }
        Commands::Meta(MetaCommands::UnregisterWorkers {
            workers,
            yes,
            ignore_not_found,
            check_fragment_occupied,
        }) => {
            cmd_impl::meta::unregister_workers(
                context,
                workers,
                yes,
                ignore_not_found,
                check_fragment_occupied,
            )
            .await?
        }
        Commands::Meta(MetaCommands::ValidateSource { props }) => {
            cmd_impl::meta::validate_source(context, props).await?
        }
        Commands::AwaitTree(AwaitTreeCommands::Dump {
            actor_traces_format,
        }) => cmd_impl::await_tree::dump(context, actor_traces_format).await?,
        Commands::AwaitTree(AwaitTreeCommands::Analyze { path }) => {
            cmd_impl::await_tree::bottleneck_detect(context, path).await?
        }
        Commands::AwaitTree(AwaitTreeCommands::Transcribe { path }) => {
            rw_diagnose_tools::await_tree::transcribe(path)?
        }
        Commands::Profile(ProfileCommands::Cpu {
            sleep,
            worker_types,
        }) => cmd_impl::profile::cpu_profile(context, sleep, worker_types).await?,
        Commands::Profile(ProfileCommands::Heap { dir, worker_types }) => {
            cmd_impl::profile::heap_profile(context, dir, worker_types).await?
        }
        Commands::Scale(ScaleCommands::Cordon { workers }) => {
            cmd_impl::scale::update_schedulability(context, workers, Schedulability::Unschedulable)
                .await?
        }
        Commands::Scale(ScaleCommands::Uncordon { workers }) => {
            cmd_impl::scale::update_schedulability(context, workers, Schedulability::Schedulable)
                .await?
        }
        Commands::Throttle(ThrottleCommands::Source(args)) => {
            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
        }
        Commands::Throttle(ThrottleCommands::Mv(args)) => {
            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?;
        }
        Commands::Meta(MetaCommands::SetCdcTableBackfillParallelism {
            table_id,
            parallelism,
        }) => {
            set_cdc_table_backfill_parallelism(context, table_id, parallelism).await?;
        }
        Commands::Test(TestCommands::Jvm) => cmd_impl::test::test_jvm()?,
    }
    Ok(())
}