risingwave_ctl/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![warn(clippy::large_futures, clippy::large_stack_frames)]
16
17use anyhow::Result;
18use clap::{Args, Parser, Subcommand};
19use cmd_impl::bench::BenchCommands;
20use cmd_impl::hummock::SstDumpArgs;
21use itertools::Itertools;
22use risingwave_common::util::tokio_util::sync::CancellationToken;
23use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId};
24use risingwave_meta::backup_restore::RestoreOpts;
25use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm;
26use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
27use thiserror_ext::AsReport;
28
29use crate::cmd_impl::hummock::{
30    build_compaction_config_vec, list_pinned_versions, migrate_legacy_object,
31};
32use crate::cmd_impl::scale::set_cdc_table_backfill_parallelism;
33use crate::cmd_impl::throttle::apply_throttle;
34use crate::common::CtlContext;
35
36pub mod cmd_impl;
37pub mod common;
38
39/// risectl provides internal access to the RisingWave cluster. Generally, you will need
40/// to provide the meta address and the state store URL to enable risectl to access the cluster. You
41/// must start RisingWave in full cluster mode (e.g. enable MinIO and compactor in risedev.yml)
42/// instead of playground mode to use this tool. risectl will read environment variables
43/// `RW_META_ADDR` and `RW_HUMMOCK_URL` to configure itself.
44#[derive(Parser)]
45#[clap(version, about = "The DevOps tool that provides internal access to the RisingWave cluster", long_about = None)]
46#[clap(propagate_version = true)]
47#[clap(infer_subcommands = true)]
48pub struct CliOpts {
49    #[clap(subcommand)]
50    command: Commands,
51}
52
53#[derive(Subcommand)]
54#[clap(infer_subcommands = true)]
55enum Commands {
56    /// Commands for Compute
57    #[clap(subcommand)]
58    Compute(ComputeCommands),
59    /// Commands for Hummock
60    #[clap(subcommand)]
61    Hummock(HummockCommands),
62    /// Commands for Tables
63    #[clap(subcommand)]
64    Table(TableCommands),
65    /// Commands for Meta
66    #[clap(subcommand)]
67    Meta(MetaCommands),
68    /// Commands for Scaling
69    #[clap(subcommand)]
70    Scale(ScaleCommands),
71    /// Commands for Benchmarks
72    #[clap(subcommand)]
73    Bench(BenchCommands),
74    /// Commands for await-tree, such as dumping, analyzing and transcribing
75    #[clap(subcommand)]
76    #[clap(visible_alias("trace"))]
77    AwaitTree(AwaitTreeCommands),
78    // TODO(yuhao): profile other nodes
79    /// Commands for profilng the compute nodes
80    #[clap(subcommand)]
81    Profile(ProfileCommands),
82    #[clap(subcommand)]
83    Throttle(ThrottleCommands),
84    /// Commands for Self-testing
85    #[clap(subcommand, hide = true)]
86    Test(TestCommands),
87}
88
89#[derive(Subcommand)]
90enum ComputeCommands {
91    /// Show all the configuration parameters on compute node
92    ShowConfig { host: String },
93}
94
95#[allow(clippy::large_enum_variant)]
96#[derive(Subcommand)]
97enum HummockCommands {
98    /// list latest Hummock version on meta node
99    ListVersion {
100        #[clap(short, long = "verbose", default_value_t = false)]
101        verbose: bool,
102
103        #[clap(long = "verbose_key_range", default_value_t = false)]
104        verbose_key_range: bool,
105    },
106
107    /// list hummock version deltas in the meta store
108    ListVersionDeltas {
109        #[clap(short, long = "start-version-delta-id", default_value_t = 0)]
110        start_id: u64,
111
112        #[clap(short, long = "num-epochs", default_value_t = 100)]
113        num_epochs: u32,
114    },
115    /// Forbid hummock commit new epochs, which is a prerequisite for compaction deterministic test
116    DisableCommitEpoch,
117    /// list all Hummock key-value pairs
118    ListKv {
119        #[clap(short, long = "epoch", default_value_t = HummockEpoch::MAX)]
120        epoch: u64,
121
122        #[clap(short, long = "table-id")]
123        table_id: u32,
124
125        // data directory for hummock state store. None: use default
126        data_dir: Option<String>,
127
128        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
129        use_new_object_prefix_strategy: bool,
130    },
131    SstDump(SstDumpArgs),
132    /// trigger a targeted compaction through `compaction_group_id`
133    TriggerManualCompaction {
134        #[clap(short, long = "compaction-group-id", default_value_t = 2)]
135        compaction_group_id: u64,
136
137        #[clap(short, long = "table-id", default_value_t = 0)]
138        table_id: u32,
139
140        #[clap(short, long = "level", default_value_t = 1)]
141        level: u32,
142
143        #[clap(short, long = "sst-ids", value_delimiter = ',')]
144        sst_ids: Vec<u64>,
145    },
146    /// Trigger a full GC for SSTs that is not pinned, with timestamp <= now -
147    /// `sst_retention_time_sec`, and with `prefix` in path.
148    TriggerFullGc {
149        #[clap(short, long = "sst_retention_time_sec", default_value_t = 259200)]
150        sst_retention_time_sec: u64,
151        #[clap(short, long = "prefix", required = false)]
152        prefix: Option<String>,
153    },
154    /// List pinned versions of each worker.
155    ListPinnedVersions {},
156    /// List all compaction groups.
157    ListCompactionGroup,
158    /// Update compaction config for compaction groups.
159    UpdateCompactionConfig {
160        #[clap(long, value_delimiter = ',')]
161        compaction_group_ids: Vec<u64>,
162        #[clap(long)]
163        max_bytes_for_level_base: Option<u64>,
164        #[clap(long)]
165        max_bytes_for_level_multiplier: Option<u64>,
166        #[clap(long)]
167        max_compaction_bytes: Option<u64>,
168        #[clap(long)]
169        sub_level_max_compaction_bytes: Option<u64>,
170        #[clap(long)]
171        level0_tier_compact_file_number: Option<u64>,
172        #[clap(long)]
173        target_file_size_base: Option<u64>,
174        #[clap(long)]
175        compaction_filter_mask: Option<u32>,
176        #[clap(long)]
177        max_sub_compaction: Option<u32>,
178        #[clap(long)]
179        level0_stop_write_threshold_sub_level_number: Option<u64>,
180        #[clap(long)]
181        level0_sub_level_compact_level_count: Option<u32>,
182        #[clap(long)]
183        max_space_reclaim_bytes: Option<u64>,
184        #[clap(long)]
185        level0_max_compact_file_number: Option<u64>,
186        #[clap(long)]
187        level0_overlapping_sub_level_compact_level_count: Option<u32>,
188        #[clap(long)]
189        enable_emergency_picker: Option<bool>,
190        #[clap(long)]
191        tombstone_reclaim_ratio: Option<u32>,
192        #[clap(long)]
193        compression_level: Option<u32>,
194        #[clap(long)]
195        compression_algorithm: Option<String>,
196        #[clap(long)]
197        max_l0_compact_level: Option<u32>,
198        #[clap(long)]
199        sst_allowed_trivial_move_min_size: Option<u64>,
200        #[clap(long)]
201        disable_auto_group_scheduling: Option<bool>,
202        #[clap(long)]
203        max_overlapping_level_size: Option<u64>,
204        #[clap(long)]
205        sst_allowed_trivial_move_max_count: Option<u32>,
206        #[clap(long)]
207        emergency_level0_sst_file_count: Option<u32>,
208        #[clap(long)]
209        emergency_level0_sub_level_partition: Option<u32>,
210        #[clap(long)]
211        level0_stop_write_threshold_max_sst_count: Option<u32>,
212        #[clap(long)]
213        level0_stop_write_threshold_max_size: Option<u64>,
214        #[clap(long)]
215        enable_optimize_l0_interval_selection: Option<bool>,
216        #[clap(long)]
217        vnode_aligned_level_size_threshold: Option<u64>,
218        #[clap(long)]
219        max_kv_count_for_xor16: Option<u64>,
220    },
221    /// Split given compaction group into two. Moves the given tables to the new group.
222    SplitCompactionGroup {
223        #[clap(long)]
224        compaction_group_id: u64,
225        #[clap(long, value_delimiter = ',')]
226        table_ids: Vec<u32>,
227        #[clap(long, default_value_t = 0)]
228        partition_vnode_count: u32,
229    },
230    /// Pause version checkpoint, which subsequently pauses GC of delta log and SST object.
231    PauseVersionCheckpoint,
232    /// Resume version checkpoint, which subsequently resumes GC of delta log and SST object.
233    ResumeVersionCheckpoint,
234    /// Replay version from the checkpoint one to the latest one.
235    ReplayVersion,
236    /// List compaction status
237    ListCompactionStatus {
238        #[clap(short, long = "verbose", default_value_t = false)]
239        verbose: bool,
240    },
241    GetCompactionScore {
242        #[clap(long)]
243        compaction_group_id: u64,
244    },
245    /// Validate the current `HummockVersion`.
246    ValidateVersion,
247    /// Rebuild table stats
248    RebuildTableStats,
249    CancelCompactTask {
250        #[clap(short, long)]
251        task_id: u64,
252    },
253    PrintUserKeyInArchive {
254        /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
255        #[clap(long, value_delimiter = ',')]
256        archive_ids: Vec<u64>,
257        /// The data directory of Hummock storage, where `SSTable` objects can be found.
258        #[clap(long)]
259        data_dir: String,
260        /// KVs that are matched with the user key are printed.
261        #[clap(long)]
262        user_key: String,
263        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
264        use_new_object_prefix_strategy: bool,
265    },
266    PrintVersionDeltaInArchive {
267        /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
268        #[clap(long, value_delimiter = ',')]
269        archive_ids: Vec<u64>,
270        /// The data directory of Hummock storage, where `SSTable` objects can be found.
271        #[clap(long)]
272        data_dir: String,
273        /// Version deltas that are related to the SST id are printed.
274        #[clap(long)]
275        sst_id: u64,
276        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
277        use_new_object_prefix_strategy: bool,
278    },
279    TieredCacheTracing {
280        #[clap(long)]
281        enable: bool,
282        #[clap(long)]
283        record_hybrid_insert_threshold_ms: Option<u32>,
284        #[clap(long)]
285        record_hybrid_get_threshold_ms: Option<u32>,
286        #[clap(long)]
287        record_hybrid_obtain_threshold_ms: Option<u32>,
288        #[clap(long)]
289        record_hybrid_remove_threshold_ms: Option<u32>,
290        #[clap(long)]
291        record_hybrid_fetch_threshold_ms: Option<u32>,
292    },
293    MergeCompactionGroup {
294        #[clap(long)]
295        left_group_id: u64,
296        #[clap(long)]
297        right_group_id: u64,
298    },
299    MigrateLegacyObject {
300        url: String,
301        source_dir: String,
302        target_dir: String,
303        #[clap(long, default_value = "100")]
304        concurrency: u32,
305    },
306    ResizeCache {
307        #[clap(long)]
308        meta_cache_capacity_mb: Option<u64>,
309        #[clap(long)]
310        data_cache_capacity_mb: Option<u64>,
311    },
312}
313
314#[derive(Subcommand)]
315enum TableCommands {
316    /// scan a state table with MV name
317    Scan {
318        /// name of the materialized view to operate on
319        mv_name: String,
320        // data directory for hummock state store. None: use default
321        data_dir: Option<String>,
322
323        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
324        use_new_object_prefix_strategy: bool,
325    },
326    /// scan a state table using Id
327    ScanById {
328        /// id of the state table to operate on
329        table_id: u32,
330        // data directory for hummock state store. None: use default
331        data_dir: Option<String>,
332        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
333        use_new_object_prefix_strategy: bool,
334    },
335    /// list all state tables
336    List,
337}
338
339#[derive(Subcommand, Debug)]
340enum ScaleCommands {
341    /// Mark a compute node as unschedulable
342    #[clap(verbatim_doc_comment)]
343    Cordon {
344        /// Workers that need to be cordoned, both id and host are supported.
345        #[clap(
346            long,
347            required = true,
348            value_delimiter = ',',
349            value_name = "id or host,..."
350        )]
351        workers: Vec<String>,
352    },
353    /// mark a compute node as schedulable. Nodes are schedulable unless they are cordoned
354    Uncordon {
355        /// Workers that need to be uncordoned, both id and host are supported.
356        #[clap(
357            long,
358            required = true,
359            value_delimiter = ',',
360            value_name = "id or host,..."
361        )]
362        workers: Vec<String>,
363    },
364}
365
366#[derive(Subcommand)]
367#[allow(clippy::large_enum_variant)]
368enum MetaCommands {
369    /// pause the stream graph
370    Pause,
371    /// resume the stream graph
372    Resume,
373    /// get cluster info
374    ClusterInfo,
375    /// get source split info
376    SourceSplitInfo {
377        #[clap(long)]
378        ignore_id: bool,
379    },
380    /// Reschedule the actors in the stream graph
381    ///
382    /// The format is `fragment_id-[worker_id:count]+[worker_id:count]`
383    /// You can provide either decreased `worker_ids` only or increased only, but decreased should be preceded by
384    /// `added` when both are provided.
385    ///
386    /// For example, for plan `100-[1:1]+[4:1]` the follow request will be generated:
387    /// ```text
388    /// {
389    ///     100: WorkerReschedule {
390    ///         increased_actor_count: { 1: 1 },
391    ///         decreased_actor_count: { 4: 1 },
392    ///     }
393    /// }
394    /// ```
395    /// Use ; to separate multiple fragment
396    #[clap(verbatim_doc_comment)]
397    #[clap(group(clap::ArgGroup::new("input_group").required(true).args(&["plan", "from"])))]
398    Reschedule {
399        /// Plan of reschedule, needs to be used with `revision`
400        #[clap(long, requires = "revision")]
401        plan: Option<String>,
402        /// Revision of the plan
403        #[clap(long)]
404        revision: Option<u64>,
405        /// Reschedule from a specific file
406        #[clap(long, conflicts_with = "revision", value_hint = clap::ValueHint::AnyPath)]
407        from: Option<String>,
408        /// Show the plan only, no actual operation
409        #[clap(long, default_value = "false")]
410        dry_run: bool,
411        /// Resolve `NO_SHUFFLE` upstream
412        #[clap(long, default_value = "false")]
413        resolve_no_shuffle: bool,
414    },
415    /// backup meta by taking a meta snapshot
416    BackupMeta {
417        #[clap(long)]
418        remarks: Option<String>,
419    },
420    /// restore meta by recovering from a meta snapshot
421    RestoreMeta {
422        #[command(flatten)]
423        opts: RestoreOpts,
424    },
425    /// delete meta snapshots
426    DeleteMetaSnapshots {
427        #[clap(long, value_delimiter = ',')]
428        snapshot_ids: Vec<u64>,
429    },
430
431    /// List all existing connections in the catalog
432    ListConnections,
433
434    /// List fragment mapping for serving
435    ListServingFragmentMapping,
436
437    /// Unregister workers from the cluster
438    UnregisterWorkers {
439        /// The workers that needs to be unregistered, `worker_id` and `worker_host:worker_port` are both supported
440        #[clap(
441            long,
442            required = true,
443            value_delimiter = ',',
444            value_name = "worker_id or worker_host:worker_port, ..."
445        )]
446        workers: Vec<String>,
447
448        /// Automatic yes to prompts
449        #[clap(short = 'y', long, default_value_t = false)]
450        yes: bool,
451
452        /// The worker not found will be ignored
453        #[clap(long, default_value_t = false)]
454        ignore_not_found: bool,
455
456        /// Checking whether the fragment is occupied by workers
457        #[clap(long, default_value_t = false)]
458        check_fragment_occupied: bool,
459    },
460
461    /// Validate source interface for the cloud team
462    ValidateSource {
463        /// With properties in json format
464        /// If privatelink is used, specify `connection.id` instead of `connection.name`
465        #[clap(long)]
466        props: String,
467    },
468
469    SetCdcTableBackfillParallelism {
470        #[clap(long, required = true)]
471        table_id: u32,
472        #[clap(long, required = true)]
473        parallelism: u32,
474    },
475}
476
477#[derive(Subcommand, Clone, Debug)]
478pub enum AwaitTreeCommands {
479    /// Dump Await Tree
480    Dump {
481        /// The format of actor traces in the diagnose file. Allowed values: `json`, `text`. `json` by default.
482        #[clap(short, long = "actor-traces-format")]
483        actor_traces_format: Option<String>,
484    },
485    /// Analyze Await Tree
486    Analyze {
487        /// The path to the diagnose file, if None, ctl will first pull one from the cluster
488        /// The actor traces format can be either `json` or `text`. The analyze command will
489        /// automatically detect the format.
490        #[clap(long = "path")]
491        path: Option<String>,
492    },
493    /// Transcribe Await Tree From JSON to Text format
494    Transcribe {
495        /// The path to the await tree file to be transcribed
496        #[clap(long = "path")]
497        path: String,
498    },
499}
500
501#[derive(Subcommand, Clone, Debug)]
502enum TestCommands {
503    /// Test if JVM and Java libraries are working
504    Jvm,
505}
506
507#[derive(Subcommand, Clone, Debug)]
508enum ThrottleCommands {
509    Source(ThrottleCommandArgs),
510    Mv(ThrottleCommandArgs),
511}
512
513#[derive(Clone, Debug, Args)]
514pub struct ThrottleCommandArgs {
515    id: u32,
516    rate: Option<u32>,
517}
518
519#[derive(Subcommand, Clone, Debug)]
520pub enum ProfileCommands {
521    /// CPU profile
522    Cpu {
523        /// The time to active profiling for (in seconds)
524        #[clap(short, long = "sleep")]
525        sleep: u64,
526    },
527    /// Heap profile
528    Heap {
529        /// The output directory of the dumped file
530        #[clap(long = "dir")]
531        dir: Option<String>,
532    },
533}
534
535/// Start `risectl` with the given options.
536/// Cancel the operation when the given `shutdown` token triggers.
537/// Log and abort the process if any error occurs.
538///
539/// Note: use [`start_fallible`] if you want to call functionalities of `risectl`
540/// in an embedded manner.
541pub async fn start(opts: CliOpts, shutdown: CancellationToken) {
542    let context = CtlContext::default();
543
544    tokio::select! {
545        _ = shutdown.cancelled() => {
546            // Shutdown requested, clean up the context and return.
547            context.try_close().await;
548        }
549
550        result = start_fallible(opts, &context) => {
551            if let Err(e) = result {
552                eprintln!("Error: {:#?}", e.as_report()); // pretty with backtrace
553                std::process::exit(1);
554            }
555        }
556    }
557}
558
559/// Start `risectl` with the given options.
560/// Return `Err` if any error occurs.
561pub async fn start_fallible(opts: CliOpts, context: &CtlContext) -> Result<()> {
562    let result = start_impl(opts, context).await;
563    context.try_close().await;
564    result
565}
566
567#[expect(
568    clippy::large_stack_frames,
569    reason = "Pre-opt MIR sums locals across match arms in async dispatch; \
570              post-layout generator stores only one arm at a time (~13–16 KiB)."
571)]
572async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
573    match opts.command {
574        Commands::Compute(ComputeCommands::ShowConfig { host }) => {
575            cmd_impl::compute::show_config(&host).await?
576        }
577        Commands::Hummock(HummockCommands::DisableCommitEpoch) => {
578            cmd_impl::hummock::disable_commit_epoch(context).await?
579        }
580        Commands::Hummock(HummockCommands::ListVersion {
581            verbose,
582            verbose_key_range,
583        }) => {
584            cmd_impl::hummock::list_version(context, verbose, verbose_key_range).await?;
585        }
586        Commands::Hummock(HummockCommands::ListVersionDeltas {
587            start_id,
588            num_epochs,
589        }) => {
590            cmd_impl::hummock::list_version_deltas(
591                context,
592                HummockVersionId::new(start_id),
593                num_epochs,
594            )
595            .await?;
596        }
597        Commands::Hummock(HummockCommands::ListKv {
598            epoch,
599            table_id,
600            data_dir,
601            use_new_object_prefix_strategy,
602        }) => {
603            cmd_impl::hummock::list_kv(
604                context,
605                epoch,
606                table_id,
607                data_dir,
608                use_new_object_prefix_strategy,
609            )
610            .await?;
611        }
612        Commands::Hummock(HummockCommands::SstDump(args)) => {
613            cmd_impl::hummock::sst_dump(context, args).await.unwrap()
614        }
615        Commands::Hummock(HummockCommands::TriggerManualCompaction {
616            compaction_group_id,
617            table_id,
618            level,
619            sst_ids,
620        }) => {
621            cmd_impl::hummock::trigger_manual_compaction(
622                context,
623                compaction_group_id,
624                table_id.into(),
625                level,
626                sst_ids,
627            )
628            .await?
629        }
630        Commands::Hummock(HummockCommands::TriggerFullGc {
631            sst_retention_time_sec,
632            prefix,
633        }) => cmd_impl::hummock::trigger_full_gc(context, sst_retention_time_sec, prefix).await?,
634        Commands::Hummock(HummockCommands::ListPinnedVersions {}) => {
635            list_pinned_versions(context).await?
636        }
637        Commands::Hummock(HummockCommands::ListCompactionGroup) => {
638            cmd_impl::hummock::list_compaction_group(context).await?
639        }
640        Commands::Hummock(HummockCommands::UpdateCompactionConfig {
641            compaction_group_ids,
642            max_bytes_for_level_base,
643            max_bytes_for_level_multiplier,
644            max_compaction_bytes,
645            sub_level_max_compaction_bytes,
646            level0_tier_compact_file_number,
647            target_file_size_base,
648            compaction_filter_mask,
649            max_sub_compaction,
650            level0_stop_write_threshold_sub_level_number,
651            level0_sub_level_compact_level_count,
652            max_space_reclaim_bytes,
653            level0_max_compact_file_number,
654            level0_overlapping_sub_level_compact_level_count,
655            enable_emergency_picker,
656            tombstone_reclaim_ratio,
657            compression_level,
658            compression_algorithm,
659            max_l0_compact_level,
660            sst_allowed_trivial_move_min_size,
661            disable_auto_group_scheduling,
662            max_overlapping_level_size,
663            sst_allowed_trivial_move_max_count,
664            emergency_level0_sst_file_count,
665            emergency_level0_sub_level_partition,
666            level0_stop_write_threshold_max_sst_count,
667            level0_stop_write_threshold_max_size,
668            enable_optimize_l0_interval_selection,
669            vnode_aligned_level_size_threshold,
670            max_kv_count_for_xor16,
671        }) => {
672            cmd_impl::hummock::update_compaction_config(
673                context,
674                compaction_group_ids,
675                build_compaction_config_vec(
676                    max_bytes_for_level_base,
677                    max_bytes_for_level_multiplier,
678                    max_compaction_bytes,
679                    sub_level_max_compaction_bytes,
680                    level0_tier_compact_file_number,
681                    target_file_size_base,
682                    compaction_filter_mask,
683                    max_sub_compaction,
684                    level0_stop_write_threshold_sub_level_number,
685                    level0_sub_level_compact_level_count,
686                    max_space_reclaim_bytes,
687                    level0_max_compact_file_number,
688                    level0_overlapping_sub_level_compact_level_count,
689                    enable_emergency_picker,
690                    tombstone_reclaim_ratio,
691                    if let Some(level) = compression_level {
692                        assert!(compression_algorithm.is_some());
693                        Some(CompressionAlgorithm {
694                            level,
695                            compression_algorithm: compression_algorithm.unwrap(),
696                        })
697                    } else {
698                        None
699                    },
700                    max_l0_compact_level,
701                    sst_allowed_trivial_move_min_size,
702                    disable_auto_group_scheduling,
703                    max_overlapping_level_size,
704                    sst_allowed_trivial_move_max_count,
705                    emergency_level0_sst_file_count,
706                    emergency_level0_sub_level_partition,
707                    level0_stop_write_threshold_max_sst_count,
708                    level0_stop_write_threshold_max_size,
709                    enable_optimize_l0_interval_selection,
710                    vnode_aligned_level_size_threshold,
711                    max_kv_count_for_xor16,
712                ),
713            )
714            .await?
715        }
716        Commands::Hummock(HummockCommands::SplitCompactionGroup {
717            compaction_group_id,
718            table_ids,
719            partition_vnode_count,
720        }) => {
721            cmd_impl::hummock::split_compaction_group(
722                context,
723                compaction_group_id,
724                &table_ids.into_iter().map_into().collect_vec(),
725                partition_vnode_count,
726            )
727            .await?;
728        }
729        Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
730            cmd_impl::hummock::pause_version_checkpoint(context).await?;
731        }
732        Commands::Hummock(HummockCommands::ResumeVersionCheckpoint) => {
733            cmd_impl::hummock::resume_version_checkpoint(context).await?;
734        }
735        Commands::Hummock(HummockCommands::ReplayVersion) => {
736            cmd_impl::hummock::replay_version(context).await?;
737        }
738        Commands::Hummock(HummockCommands::ListCompactionStatus { verbose }) => {
739            cmd_impl::hummock::list_compaction_status(context, verbose).await?;
740        }
741        Commands::Hummock(HummockCommands::GetCompactionScore {
742            compaction_group_id,
743        }) => {
744            cmd_impl::hummock::get_compaction_score(context, compaction_group_id).await?;
745        }
746        Commands::Hummock(HummockCommands::ValidateVersion) => {
747            cmd_impl::hummock::validate_version(context).await?;
748        }
749        Commands::Hummock(HummockCommands::RebuildTableStats) => {
750            cmd_impl::hummock::rebuild_table_stats(context).await?;
751        }
752        Commands::Hummock(HummockCommands::CancelCompactTask { task_id }) => {
753            cmd_impl::hummock::cancel_compact_task(context, task_id).await?;
754        }
755        Commands::Hummock(HummockCommands::PrintVersionDeltaInArchive {
756            archive_ids,
757            data_dir,
758            sst_id,
759            use_new_object_prefix_strategy,
760        }) => {
761            cmd_impl::hummock::print_version_delta_in_archive(
762                context,
763                archive_ids.into_iter().map(HummockVersionId::new),
764                data_dir,
765                sst_id,
766                use_new_object_prefix_strategy,
767            )
768            .await?;
769        }
770        Commands::Hummock(HummockCommands::PrintUserKeyInArchive {
771            archive_ids,
772            data_dir,
773            user_key,
774            use_new_object_prefix_strategy,
775        }) => {
776            cmd_impl::hummock::print_user_key_in_archive(
777                context,
778                archive_ids.into_iter().map(HummockVersionId::new),
779                data_dir,
780                user_key,
781                use_new_object_prefix_strategy,
782            )
783            .await?;
784        }
785        Commands::Hummock(HummockCommands::TieredCacheTracing {
786            enable,
787            record_hybrid_insert_threshold_ms,
788            record_hybrid_get_threshold_ms,
789            record_hybrid_obtain_threshold_ms,
790            record_hybrid_remove_threshold_ms,
791            record_hybrid_fetch_threshold_ms,
792        }) => {
793            cmd_impl::hummock::tiered_cache_tracing(
794                context,
795                enable,
796                record_hybrid_insert_threshold_ms,
797                record_hybrid_get_threshold_ms,
798                record_hybrid_obtain_threshold_ms,
799                record_hybrid_remove_threshold_ms,
800                record_hybrid_fetch_threshold_ms,
801            )
802            .await?
803        }
804        Commands::Hummock(HummockCommands::MergeCompactionGroup {
805            left_group_id,
806            right_group_id,
807        }) => {
808            cmd_impl::hummock::merge_compaction_group(context, left_group_id, right_group_id)
809                .await?
810        }
811
812        Commands::Hummock(HummockCommands::MigrateLegacyObject {
813            url,
814            source_dir,
815            target_dir,
816            concurrency,
817        }) => {
818            migrate_legacy_object(url, source_dir, target_dir, concurrency).await?;
819        }
820        Commands::Hummock(HummockCommands::ResizeCache {
821            meta_cache_capacity_mb,
822            data_cache_capacity_mb,
823        }) => {
824            const MIB: u64 = 1024 * 1024;
825            cmd_impl::hummock::resize_cache(
826                context,
827                meta_cache_capacity_mb.map(|v| v * MIB),
828                data_cache_capacity_mb.map(|v| v * MIB),
829            )
830            .await?
831        }
832        Commands::Table(TableCommands::Scan {
833            mv_name,
834            data_dir,
835            use_new_object_prefix_strategy,
836        }) => {
837            cmd_impl::table::scan(context, mv_name, data_dir, use_new_object_prefix_strategy)
838                .await?
839        }
840        Commands::Table(TableCommands::ScanById {
841            table_id,
842            data_dir,
843            use_new_object_prefix_strategy,
844        }) => {
845            cmd_impl::table::scan_id(context, table_id, data_dir, use_new_object_prefix_strategy)
846                .await?
847        }
848        Commands::Table(TableCommands::List) => cmd_impl::table::list(context).await?,
849        Commands::Bench(cmd) => cmd_impl::bench::do_bench(context, cmd).await?,
850        Commands::Meta(MetaCommands::Pause) => cmd_impl::meta::pause(context).await?,
851        Commands::Meta(MetaCommands::Resume) => cmd_impl::meta::resume(context).await?,
852        Commands::Meta(MetaCommands::ClusterInfo) => cmd_impl::meta::cluster_info(context).await?,
853        Commands::Meta(MetaCommands::SourceSplitInfo { ignore_id }) => {
854            cmd_impl::meta::source_split_info(context, ignore_id).await?
855        }
856        Commands::Meta(MetaCommands::Reschedule {
857            from,
858            dry_run,
859            plan,
860            revision,
861            resolve_no_shuffle,
862        }) => {
863            cmd_impl::meta::reschedule(context, plan, revision, from, dry_run, resolve_no_shuffle)
864                .await?
865        }
866        Commands::Meta(MetaCommands::BackupMeta { remarks }) => {
867            cmd_impl::meta::backup_meta(context, remarks).await?
868        }
869        Commands::Meta(MetaCommands::RestoreMeta { opts }) => {
870            risingwave_meta::backup_restore::restore(opts).await?
871        }
872        Commands::Meta(MetaCommands::DeleteMetaSnapshots { snapshot_ids }) => {
873            cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
874        }
875        Commands::Meta(MetaCommands::ListConnections) => {
876            cmd_impl::meta::list_connections(context).await?
877        }
878        Commands::Meta(MetaCommands::ListServingFragmentMapping) => {
879            cmd_impl::meta::list_serving_fragment_mappings(context).await?
880        }
881        Commands::Meta(MetaCommands::UnregisterWorkers {
882            workers,
883            yes,
884            ignore_not_found,
885            check_fragment_occupied,
886        }) => {
887            cmd_impl::meta::unregister_workers(
888                context,
889                workers,
890                yes,
891                ignore_not_found,
892                check_fragment_occupied,
893            )
894            .await?
895        }
896        Commands::Meta(MetaCommands::ValidateSource { props }) => {
897            cmd_impl::meta::validate_source(context, props).await?
898        }
899        Commands::AwaitTree(AwaitTreeCommands::Dump {
900            actor_traces_format,
901        }) => cmd_impl::await_tree::dump(context, actor_traces_format).await?,
902        Commands::AwaitTree(AwaitTreeCommands::Analyze { path }) => {
903            cmd_impl::await_tree::bottleneck_detect(context, path).await?
904        }
905        Commands::AwaitTree(AwaitTreeCommands::Transcribe { path }) => {
906            rw_diagnose_tools::await_tree::transcribe(path)?
907        }
908        Commands::Profile(ProfileCommands::Cpu { sleep }) => {
909            cmd_impl::profile::cpu_profile(context, sleep).await?
910        }
911        Commands::Profile(ProfileCommands::Heap { dir }) => {
912            cmd_impl::profile::heap_profile(context, dir).await?
913        }
914        Commands::Scale(ScaleCommands::Cordon { workers }) => {
915            cmd_impl::scale::update_schedulability(context, workers, Schedulability::Unschedulable)
916                .await?
917        }
918        Commands::Scale(ScaleCommands::Uncordon { workers }) => {
919            cmd_impl::scale::update_schedulability(context, workers, Schedulability::Schedulable)
920                .await?
921        }
922        Commands::Throttle(ThrottleCommands::Source(args)) => {
923            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
924        }
925        Commands::Throttle(ThrottleCommands::Mv(args)) => {
926            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?;
927        }
928        Commands::Meta(MetaCommands::SetCdcTableBackfillParallelism {
929            table_id,
930            parallelism,
931        }) => {
932            set_cdc_table_backfill_parallelism(context, table_id, parallelism).await?;
933        }
934        Commands::Test(TestCommands::Jvm) => cmd_impl::test::test_jvm()?,
935    }
936    Ok(())
937}