risingwave_ctl/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![warn(clippy::large_futures, clippy::large_stack_frames)]
16
17use anyhow::Result;
18use clap::{Args, Parser, Subcommand};
19use cmd_impl::bench::BenchCommands;
20use cmd_impl::hummock::SstDumpArgs;
21use risingwave_common::util::tokio_util::sync::CancellationToken;
22use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId};
23use risingwave_meta::backup_restore::RestoreOpts;
24use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm;
25use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
26use thiserror_ext::AsReport;
27
28use crate::cmd_impl::hummock::{
29    build_compaction_config_vec, list_pinned_versions, migrate_legacy_object,
30};
31use crate::cmd_impl::scale::set_cdc_table_backfill_parallelism;
32use crate::cmd_impl::throttle::apply_throttle;
33use crate::common::CtlContext;
34
35pub mod cmd_impl;
36pub mod common;
37
38/// risectl provides internal access to the RisingWave cluster. Generally, you will need
39/// to provide the meta address and the state store URL to enable risectl to access the cluster. You
40/// must start RisingWave in full cluster mode (e.g. enable MinIO and compactor in risedev.yml)
41/// instead of playground mode to use this tool. risectl will read environment variables
42/// `RW_META_ADDR` and `RW_HUMMOCK_URL` to configure itself.
43#[derive(Parser)]
44#[clap(version, about = "The DevOps tool that provides internal access to the RisingWave cluster", long_about = None)]
45#[clap(propagate_version = true)]
46#[clap(infer_subcommands = true)]
47pub struct CliOpts {
48    #[clap(subcommand)]
49    command: Commands,
50}
51
52#[derive(Subcommand)]
53#[clap(infer_subcommands = true)]
54enum Commands {
55    /// Commands for Compute
56    #[clap(subcommand)]
57    Compute(ComputeCommands),
58    /// Commands for Hummock
59    #[clap(subcommand)]
60    Hummock(HummockCommands),
61    /// Commands for Tables
62    #[clap(subcommand)]
63    Table(TableCommands),
64    /// Commands for Meta
65    #[clap(subcommand)]
66    Meta(MetaCommands),
67    /// Commands for Scaling
68    #[clap(subcommand)]
69    Scale(ScaleCommands),
70    /// Commands for Benchmarks
71    #[clap(subcommand)]
72    Bench(BenchCommands),
73    /// Commands for await-tree, such as dumping, analyzing and transcribing
74    #[clap(subcommand)]
75    #[clap(visible_alias("trace"))]
76    AwaitTree(AwaitTreeCommands),
77    // TODO(yuhao): profile other nodes
78    /// Commands for profilng the compute nodes
79    #[clap(subcommand)]
80    Profile(ProfileCommands),
81    #[clap(subcommand)]
82    Throttle(ThrottleCommands),
83    /// Commands for Self-testing
84    #[clap(subcommand, hide = true)]
85    Test(TestCommands),
86}
87
88#[derive(Subcommand)]
89enum ComputeCommands {
90    /// Show all the configuration parameters on compute node
91    ShowConfig { host: String },
92}
93
94#[allow(clippy::large_enum_variant)]
95#[derive(Subcommand)]
96enum HummockCommands {
97    /// list latest Hummock version on meta node
98    ListVersion {
99        #[clap(short, long = "verbose", default_value_t = false)]
100        verbose: bool,
101
102        #[clap(long = "verbose_key_range", default_value_t = false)]
103        verbose_key_range: bool,
104    },
105
106    /// list hummock version deltas in the meta store
107    ListVersionDeltas {
108        #[clap(short, long = "start-version-delta-id", default_value_t = 0)]
109        start_id: u64,
110
111        #[clap(short, long = "num-epochs", default_value_t = 100)]
112        num_epochs: u32,
113    },
114    /// Forbid hummock commit new epochs, which is a prerequisite for compaction deterministic test
115    DisableCommitEpoch,
116    /// list all Hummock key-value pairs
117    ListKv {
118        #[clap(short, long = "epoch", default_value_t = HummockEpoch::MAX)]
119        epoch: u64,
120
121        #[clap(short, long = "table-id")]
122        table_id: u32,
123
124        // data directory for hummock state store. None: use default
125        data_dir: Option<String>,
126
127        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
128        use_new_object_prefix_strategy: bool,
129    },
130    SstDump(SstDumpArgs),
131    /// trigger a targeted compaction through `compaction_group_id`
132    TriggerManualCompaction {
133        #[clap(short, long = "compaction-group-id", default_value_t = 2)]
134        compaction_group_id: u64,
135
136        #[clap(short, long = "table-id", default_value_t = 0)]
137        table_id: u32,
138
139        #[clap(short, long = "level", default_value_t = 1)]
140        level: u32,
141
142        #[clap(short, long = "sst-ids", value_delimiter = ',')]
143        sst_ids: Vec<u64>,
144    },
145    /// Trigger a full GC for SSTs that is not pinned, with timestamp <= now -
146    /// `sst_retention_time_sec`, and with `prefix` in path.
147    TriggerFullGc {
148        #[clap(short, long = "sst_retention_time_sec", default_value_t = 259200)]
149        sst_retention_time_sec: u64,
150        #[clap(short, long = "prefix", required = false)]
151        prefix: Option<String>,
152    },
153    /// List pinned versions of each worker.
154    ListPinnedVersions {},
155    /// List all compaction groups.
156    ListCompactionGroup,
157    /// Update compaction config for compaction groups.
158    UpdateCompactionConfig {
159        #[clap(long, value_delimiter = ',')]
160        compaction_group_ids: Vec<u64>,
161        #[clap(long)]
162        max_bytes_for_level_base: Option<u64>,
163        #[clap(long)]
164        max_bytes_for_level_multiplier: Option<u64>,
165        #[clap(long)]
166        max_compaction_bytes: Option<u64>,
167        #[clap(long)]
168        sub_level_max_compaction_bytes: Option<u64>,
169        #[clap(long)]
170        level0_tier_compact_file_number: Option<u64>,
171        #[clap(long)]
172        target_file_size_base: Option<u64>,
173        #[clap(long)]
174        compaction_filter_mask: Option<u32>,
175        #[clap(long)]
176        max_sub_compaction: Option<u32>,
177        #[clap(long)]
178        level0_stop_write_threshold_sub_level_number: Option<u64>,
179        #[clap(long)]
180        level0_sub_level_compact_level_count: Option<u32>,
181        #[clap(long)]
182        max_space_reclaim_bytes: Option<u64>,
183        #[clap(long)]
184        level0_max_compact_file_number: Option<u64>,
185        #[clap(long)]
186        level0_overlapping_sub_level_compact_level_count: Option<u32>,
187        #[clap(long)]
188        enable_emergency_picker: Option<bool>,
189        #[clap(long)]
190        tombstone_reclaim_ratio: Option<u32>,
191        #[clap(long)]
192        compression_level: Option<u32>,
193        #[clap(long)]
194        compression_algorithm: Option<String>,
195        #[clap(long)]
196        max_l0_compact_level: Option<u32>,
197        #[clap(long)]
198        sst_allowed_trivial_move_min_size: Option<u64>,
199        #[clap(long)]
200        disable_auto_group_scheduling: Option<bool>,
201        #[clap(long)]
202        max_overlapping_level_size: Option<u64>,
203        #[clap(long)]
204        sst_allowed_trivial_move_max_count: Option<u32>,
205        #[clap(long)]
206        emergency_level0_sst_file_count: Option<u32>,
207        #[clap(long)]
208        emergency_level0_sub_level_partition: Option<u32>,
209        #[clap(long)]
210        level0_stop_write_threshold_max_sst_count: Option<u32>,
211        #[clap(long)]
212        level0_stop_write_threshold_max_size: Option<u64>,
213        #[clap(long)]
214        enable_optimize_l0_interval_selection: Option<bool>,
215    },
216    /// Split given compaction group into two. Moves the given tables to the new group.
217    SplitCompactionGroup {
218        #[clap(long)]
219        compaction_group_id: u64,
220        #[clap(long, value_delimiter = ',')]
221        table_ids: Vec<u32>,
222        #[clap(long, default_value_t = 0)]
223        partition_vnode_count: u32,
224    },
225    /// Pause version checkpoint, which subsequently pauses GC of delta log and SST object.
226    PauseVersionCheckpoint,
227    /// Resume version checkpoint, which subsequently resumes GC of delta log and SST object.
228    ResumeVersionCheckpoint,
229    /// Replay version from the checkpoint one to the latest one.
230    ReplayVersion,
231    /// List compaction status
232    ListCompactionStatus {
233        #[clap(short, long = "verbose", default_value_t = false)]
234        verbose: bool,
235    },
236    GetCompactionScore {
237        #[clap(long)]
238        compaction_group_id: u64,
239    },
240    /// Validate the current `HummockVersion`.
241    ValidateVersion,
242    /// Rebuild table stats
243    RebuildTableStats,
244    CancelCompactTask {
245        #[clap(short, long)]
246        task_id: u64,
247    },
248    PrintUserKeyInArchive {
249        /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
250        #[clap(long, value_delimiter = ',')]
251        archive_ids: Vec<u64>,
252        /// The data directory of Hummock storage, where `SSTable` objects can be found.
253        #[clap(long)]
254        data_dir: String,
255        /// KVs that are matched with the user key are printed.
256        #[clap(long)]
257        user_key: String,
258        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
259        use_new_object_prefix_strategy: bool,
260    },
261    PrintVersionDeltaInArchive {
262        /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
263        #[clap(long, value_delimiter = ',')]
264        archive_ids: Vec<u64>,
265        /// The data directory of Hummock storage, where `SSTable` objects can be found.
266        #[clap(long)]
267        data_dir: String,
268        /// Version deltas that are related to the SST id are printed.
269        #[clap(long)]
270        sst_id: u64,
271        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
272        use_new_object_prefix_strategy: bool,
273    },
274    TieredCacheTracing {
275        #[clap(long)]
276        enable: bool,
277        #[clap(long)]
278        record_hybrid_insert_threshold_ms: Option<u32>,
279        #[clap(long)]
280        record_hybrid_get_threshold_ms: Option<u32>,
281        #[clap(long)]
282        record_hybrid_obtain_threshold_ms: Option<u32>,
283        #[clap(long)]
284        record_hybrid_remove_threshold_ms: Option<u32>,
285        #[clap(long)]
286        record_hybrid_fetch_threshold_ms: Option<u32>,
287    },
288    MergeCompactionGroup {
289        #[clap(long)]
290        left_group_id: u64,
291        #[clap(long)]
292        right_group_id: u64,
293    },
294    MigrateLegacyObject {
295        url: String,
296        source_dir: String,
297        target_dir: String,
298        #[clap(long, default_value = "100")]
299        concurrency: u32,
300    },
301    ResizeCache {
302        #[clap(long)]
303        meta_cache_capacity_mb: Option<u64>,
304        #[clap(long)]
305        data_cache_capacity_mb: Option<u64>,
306    },
307}
308
309#[derive(Subcommand)]
310enum TableCommands {
311    /// scan a state table with MV name
312    Scan {
313        /// name of the materialized view to operate on
314        mv_name: String,
315        // data directory for hummock state store. None: use default
316        data_dir: Option<String>,
317
318        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
319        use_new_object_prefix_strategy: bool,
320    },
321    /// scan a state table using Id
322    ScanById {
323        /// id of the state table to operate on
324        table_id: u32,
325        // data directory for hummock state store. None: use default
326        data_dir: Option<String>,
327        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
328        use_new_object_prefix_strategy: bool,
329    },
330    /// list all state tables
331    List,
332}
333
334#[derive(Subcommand, Debug)]
335enum ScaleCommands {
336    /// Mark a compute node as unschedulable
337    #[clap(verbatim_doc_comment)]
338    Cordon {
339        /// Workers that need to be cordoned, both id and host are supported.
340        #[clap(
341            long,
342            required = true,
343            value_delimiter = ',',
344            value_name = "id or host,..."
345        )]
346        workers: Vec<String>,
347    },
348    /// mark a compute node as schedulable. Nodes are schedulable unless they are cordoned
349    Uncordon {
350        /// Workers that need to be uncordoned, both id and host are supported.
351        #[clap(
352            long,
353            required = true,
354            value_delimiter = ',',
355            value_name = "id or host,..."
356        )]
357        workers: Vec<String>,
358    },
359}
360
361#[derive(Subcommand)]
362#[allow(clippy::large_enum_variant)]
363enum MetaCommands {
364    /// pause the stream graph
365    Pause,
366    /// resume the stream graph
367    Resume,
368    /// get cluster info
369    ClusterInfo,
370    /// get source split info
371    SourceSplitInfo {
372        #[clap(long)]
373        ignore_id: bool,
374    },
375    /// Reschedule the actors in the stream graph
376    ///
377    /// The format is `fragment_id-[worker_id:count]+[worker_id:count]`
378    /// You can provide either decreased `worker_ids` only or increased only, but decreased should be preceded by
379    /// `added` when both are provided.
380    ///
381    /// For example, for plan `100-[1:1]+[4:1]` the follow request will be generated:
382    /// ```text
383    /// {
384    ///     100: WorkerReschedule {
385    ///         increased_actor_count: { 1: 1 },
386    ///         decreased_actor_count: { 4: 1 },
387    ///     }
388    /// }
389    /// ```
390    /// Use ; to separate multiple fragment
391    #[clap(verbatim_doc_comment)]
392    #[clap(group(clap::ArgGroup::new("input_group").required(true).args(&["plan", "from"])))]
393    Reschedule {
394        /// Plan of reschedule, needs to be used with `revision`
395        #[clap(long, requires = "revision")]
396        plan: Option<String>,
397        /// Revision of the plan
398        #[clap(long)]
399        revision: Option<u64>,
400        /// Reschedule from a specific file
401        #[clap(long, conflicts_with = "revision", value_hint = clap::ValueHint::AnyPath)]
402        from: Option<String>,
403        /// Show the plan only, no actual operation
404        #[clap(long, default_value = "false")]
405        dry_run: bool,
406        /// Resolve `NO_SHUFFLE` upstream
407        #[clap(long, default_value = "false")]
408        resolve_no_shuffle: bool,
409    },
410    /// backup meta by taking a meta snapshot
411    BackupMeta {
412        #[clap(long)]
413        remarks: Option<String>,
414    },
415    /// restore meta by recovering from a meta snapshot
416    RestoreMeta {
417        #[command(flatten)]
418        opts: RestoreOpts,
419    },
420    /// delete meta snapshots
421    DeleteMetaSnapshots {
422        #[clap(long, value_delimiter = ',')]
423        snapshot_ids: Vec<u64>,
424    },
425
426    /// List all existing connections in the catalog
427    ListConnections,
428
429    /// List fragment mapping for serving
430    ListServingFragmentMapping,
431
432    /// Unregister workers from the cluster
433    UnregisterWorkers {
434        /// The workers that needs to be unregistered, `worker_id` and `worker_host:worker_port` are both supported
435        #[clap(
436            long,
437            required = true,
438            value_delimiter = ',',
439            value_name = "worker_id or worker_host:worker_port, ..."
440        )]
441        workers: Vec<String>,
442
443        /// Automatic yes to prompts
444        #[clap(short = 'y', long, default_value_t = false)]
445        yes: bool,
446
447        /// The worker not found will be ignored
448        #[clap(long, default_value_t = false)]
449        ignore_not_found: bool,
450
451        /// Checking whether the fragment is occupied by workers
452        #[clap(long, default_value_t = false)]
453        check_fragment_occupied: bool,
454    },
455
456    /// Validate source interface for the cloud team
457    ValidateSource {
458        /// With properties in json format
459        /// If privatelink is used, specify `connection.id` instead of `connection.name`
460        #[clap(long)]
461        props: String,
462    },
463
464    SetCdcTableBackfillParallelism {
465        #[clap(long, required = true)]
466        table_id: u32,
467        #[clap(long, required = true)]
468        parallelism: u32,
469    },
470}
471
472#[derive(Subcommand, Clone, Debug)]
473pub enum AwaitTreeCommands {
474    /// Dump Await Tree
475    Dump {
476        /// The format of actor traces in the diagnose file. Allowed values: `json`, `text`. `json` by default.
477        #[clap(short, long = "actor-traces-format")]
478        actor_traces_format: Option<String>,
479    },
480    /// Analyze Await Tree
481    Analyze {
482        /// The path to the diagnose file, if None, ctl will first pull one from the cluster
483        /// The actor traces format can be either `json` or `text`. The analyze command will
484        /// automatically detect the format.
485        #[clap(long = "path")]
486        path: Option<String>,
487    },
488    /// Transcribe Await Tree From JSON to Text format
489    Transcribe {
490        /// The path to the await tree file to be transcribed
491        #[clap(long = "path")]
492        path: String,
493    },
494}
495
496#[derive(Subcommand, Clone, Debug)]
497enum TestCommands {
498    /// Test if JVM and Java libraries are working
499    Jvm,
500}
501
502#[derive(Subcommand, Clone, Debug)]
503enum ThrottleCommands {
504    Source(ThrottleCommandArgs),
505    Mv(ThrottleCommandArgs),
506}
507
508#[derive(Clone, Debug, Args)]
509pub struct ThrottleCommandArgs {
510    id: u32,
511    rate: Option<u32>,
512}
513
514#[derive(Subcommand, Clone, Debug)]
515pub enum ProfileCommands {
516    /// CPU profile
517    Cpu {
518        /// The time to active profiling for (in seconds)
519        #[clap(short, long = "sleep")]
520        sleep: u64,
521    },
522    /// Heap profile
523    Heap {
524        /// The output directory of the dumped file
525        #[clap(long = "dir")]
526        dir: Option<String>,
527    },
528}
529
530/// Start `risectl` with the given options.
531/// Cancel the operation when the given `shutdown` token triggers.
532/// Log and abort the process if any error occurs.
533///
534/// Note: use [`start_fallible`] if you want to call functionalities of `risectl`
535/// in an embedded manner.
536pub async fn start(opts: CliOpts, shutdown: CancellationToken) {
537    let context = CtlContext::default();
538
539    tokio::select! {
540        _ = shutdown.cancelled() => {
541            // Shutdown requested, clean up the context and return.
542            context.try_close().await;
543        }
544
545        result = start_fallible(opts, &context) => {
546            if let Err(e) = result {
547                eprintln!("Error: {:#?}", e.as_report()); // pretty with backtrace
548                std::process::exit(1);
549            }
550        }
551    }
552}
553
554/// Start `risectl` with the given options.
555/// Return `Err` if any error occurs.
556pub async fn start_fallible(opts: CliOpts, context: &CtlContext) -> Result<()> {
557    let result = start_impl(opts, context).await;
558    context.try_close().await;
559    result
560}
561
562#[expect(
563    clippy::large_stack_frames,
564    reason = "Pre-opt MIR sums locals across match arms in async dispatch; \
565              post-layout generator stores only one arm at a time (~13–16 KiB)."
566)]
567async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
568    match opts.command {
569        Commands::Compute(ComputeCommands::ShowConfig { host }) => {
570            cmd_impl::compute::show_config(&host).await?
571        }
572        Commands::Hummock(HummockCommands::DisableCommitEpoch) => {
573            cmd_impl::hummock::disable_commit_epoch(context).await?
574        }
575        Commands::Hummock(HummockCommands::ListVersion {
576            verbose,
577            verbose_key_range,
578        }) => {
579            cmd_impl::hummock::list_version(context, verbose, verbose_key_range).await?;
580        }
581        Commands::Hummock(HummockCommands::ListVersionDeltas {
582            start_id,
583            num_epochs,
584        }) => {
585            cmd_impl::hummock::list_version_deltas(
586                context,
587                HummockVersionId::new(start_id),
588                num_epochs,
589            )
590            .await?;
591        }
592        Commands::Hummock(HummockCommands::ListKv {
593            epoch,
594            table_id,
595            data_dir,
596            use_new_object_prefix_strategy,
597        }) => {
598            cmd_impl::hummock::list_kv(
599                context,
600                epoch,
601                table_id,
602                data_dir,
603                use_new_object_prefix_strategy,
604            )
605            .await?;
606        }
607        Commands::Hummock(HummockCommands::SstDump(args)) => {
608            cmd_impl::hummock::sst_dump(context, args).await.unwrap()
609        }
610        Commands::Hummock(HummockCommands::TriggerManualCompaction {
611            compaction_group_id,
612            table_id,
613            level,
614            sst_ids,
615        }) => {
616            cmd_impl::hummock::trigger_manual_compaction(
617                context,
618                compaction_group_id,
619                table_id,
620                level,
621                sst_ids,
622            )
623            .await?
624        }
625        Commands::Hummock(HummockCommands::TriggerFullGc {
626            sst_retention_time_sec,
627            prefix,
628        }) => cmd_impl::hummock::trigger_full_gc(context, sst_retention_time_sec, prefix).await?,
629        Commands::Hummock(HummockCommands::ListPinnedVersions {}) => {
630            list_pinned_versions(context).await?
631        }
632        Commands::Hummock(HummockCommands::ListCompactionGroup) => {
633            cmd_impl::hummock::list_compaction_group(context).await?
634        }
635        Commands::Hummock(HummockCommands::UpdateCompactionConfig {
636            compaction_group_ids,
637            max_bytes_for_level_base,
638            max_bytes_for_level_multiplier,
639            max_compaction_bytes,
640            sub_level_max_compaction_bytes,
641            level0_tier_compact_file_number,
642            target_file_size_base,
643            compaction_filter_mask,
644            max_sub_compaction,
645            level0_stop_write_threshold_sub_level_number,
646            level0_sub_level_compact_level_count,
647            max_space_reclaim_bytes,
648            level0_max_compact_file_number,
649            level0_overlapping_sub_level_compact_level_count,
650            enable_emergency_picker,
651            tombstone_reclaim_ratio,
652            compression_level,
653            compression_algorithm,
654            max_l0_compact_level,
655            sst_allowed_trivial_move_min_size,
656            disable_auto_group_scheduling,
657            max_overlapping_level_size,
658            sst_allowed_trivial_move_max_count,
659            emergency_level0_sst_file_count,
660            emergency_level0_sub_level_partition,
661            level0_stop_write_threshold_max_sst_count,
662            level0_stop_write_threshold_max_size,
663            enable_optimize_l0_interval_selection,
664        }) => {
665            cmd_impl::hummock::update_compaction_config(
666                context,
667                compaction_group_ids,
668                build_compaction_config_vec(
669                    max_bytes_for_level_base,
670                    max_bytes_for_level_multiplier,
671                    max_compaction_bytes,
672                    sub_level_max_compaction_bytes,
673                    level0_tier_compact_file_number,
674                    target_file_size_base,
675                    compaction_filter_mask,
676                    max_sub_compaction,
677                    level0_stop_write_threshold_sub_level_number,
678                    level0_sub_level_compact_level_count,
679                    max_space_reclaim_bytes,
680                    level0_max_compact_file_number,
681                    level0_overlapping_sub_level_compact_level_count,
682                    enable_emergency_picker,
683                    tombstone_reclaim_ratio,
684                    if let Some(level) = compression_level {
685                        assert!(compression_algorithm.is_some());
686                        Some(CompressionAlgorithm {
687                            level,
688                            compression_algorithm: compression_algorithm.unwrap(),
689                        })
690                    } else {
691                        None
692                    },
693                    max_l0_compact_level,
694                    sst_allowed_trivial_move_min_size,
695                    disable_auto_group_scheduling,
696                    max_overlapping_level_size,
697                    sst_allowed_trivial_move_max_count,
698                    emergency_level0_sst_file_count,
699                    emergency_level0_sub_level_partition,
700                    level0_stop_write_threshold_max_sst_count,
701                    level0_stop_write_threshold_max_size,
702                    enable_optimize_l0_interval_selection,
703                ),
704            )
705            .await?
706        }
707        Commands::Hummock(HummockCommands::SplitCompactionGroup {
708            compaction_group_id,
709            table_ids,
710            partition_vnode_count,
711        }) => {
712            cmd_impl::hummock::split_compaction_group(
713                context,
714                compaction_group_id,
715                &table_ids,
716                partition_vnode_count,
717            )
718            .await?;
719        }
720        Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
721            cmd_impl::hummock::pause_version_checkpoint(context).await?;
722        }
723        Commands::Hummock(HummockCommands::ResumeVersionCheckpoint) => {
724            cmd_impl::hummock::resume_version_checkpoint(context).await?;
725        }
726        Commands::Hummock(HummockCommands::ReplayVersion) => {
727            cmd_impl::hummock::replay_version(context).await?;
728        }
729        Commands::Hummock(HummockCommands::ListCompactionStatus { verbose }) => {
730            cmd_impl::hummock::list_compaction_status(context, verbose).await?;
731        }
732        Commands::Hummock(HummockCommands::GetCompactionScore {
733            compaction_group_id,
734        }) => {
735            cmd_impl::hummock::get_compaction_score(context, compaction_group_id).await?;
736        }
737        Commands::Hummock(HummockCommands::ValidateVersion) => {
738            cmd_impl::hummock::validate_version(context).await?;
739        }
740        Commands::Hummock(HummockCommands::RebuildTableStats) => {
741            cmd_impl::hummock::rebuild_table_stats(context).await?;
742        }
743        Commands::Hummock(HummockCommands::CancelCompactTask { task_id }) => {
744            cmd_impl::hummock::cancel_compact_task(context, task_id).await?;
745        }
746        Commands::Hummock(HummockCommands::PrintVersionDeltaInArchive {
747            archive_ids,
748            data_dir,
749            sst_id,
750            use_new_object_prefix_strategy,
751        }) => {
752            cmd_impl::hummock::print_version_delta_in_archive(
753                context,
754                archive_ids.into_iter().map(HummockVersionId::new),
755                data_dir,
756                sst_id,
757                use_new_object_prefix_strategy,
758            )
759            .await?;
760        }
761        Commands::Hummock(HummockCommands::PrintUserKeyInArchive {
762            archive_ids,
763            data_dir,
764            user_key,
765            use_new_object_prefix_strategy,
766        }) => {
767            cmd_impl::hummock::print_user_key_in_archive(
768                context,
769                archive_ids.into_iter().map(HummockVersionId::new),
770                data_dir,
771                user_key,
772                use_new_object_prefix_strategy,
773            )
774            .await?;
775        }
776        Commands::Hummock(HummockCommands::TieredCacheTracing {
777            enable,
778            record_hybrid_insert_threshold_ms,
779            record_hybrid_get_threshold_ms,
780            record_hybrid_obtain_threshold_ms,
781            record_hybrid_remove_threshold_ms,
782            record_hybrid_fetch_threshold_ms,
783        }) => {
784            cmd_impl::hummock::tiered_cache_tracing(
785                context,
786                enable,
787                record_hybrid_insert_threshold_ms,
788                record_hybrid_get_threshold_ms,
789                record_hybrid_obtain_threshold_ms,
790                record_hybrid_remove_threshold_ms,
791                record_hybrid_fetch_threshold_ms,
792            )
793            .await?
794        }
795        Commands::Hummock(HummockCommands::MergeCompactionGroup {
796            left_group_id,
797            right_group_id,
798        }) => {
799            cmd_impl::hummock::merge_compaction_group(context, left_group_id, right_group_id)
800                .await?
801        }
802
803        Commands::Hummock(HummockCommands::MigrateLegacyObject {
804            url,
805            source_dir,
806            target_dir,
807            concurrency,
808        }) => {
809            migrate_legacy_object(url, source_dir, target_dir, concurrency).await?;
810        }
811        Commands::Hummock(HummockCommands::ResizeCache {
812            meta_cache_capacity_mb,
813            data_cache_capacity_mb,
814        }) => {
815            const MIB: u64 = 1024 * 1024;
816            cmd_impl::hummock::resize_cache(
817                context,
818                meta_cache_capacity_mb.map(|v| v * MIB),
819                data_cache_capacity_mb.map(|v| v * MIB),
820            )
821            .await?
822        }
823        Commands::Table(TableCommands::Scan {
824            mv_name,
825            data_dir,
826            use_new_object_prefix_strategy,
827        }) => {
828            cmd_impl::table::scan(context, mv_name, data_dir, use_new_object_prefix_strategy)
829                .await?
830        }
831        Commands::Table(TableCommands::ScanById {
832            table_id,
833            data_dir,
834            use_new_object_prefix_strategy,
835        }) => {
836            cmd_impl::table::scan_id(context, table_id, data_dir, use_new_object_prefix_strategy)
837                .await?
838        }
839        Commands::Table(TableCommands::List) => cmd_impl::table::list(context).await?,
840        Commands::Bench(cmd) => cmd_impl::bench::do_bench(context, cmd).await?,
841        Commands::Meta(MetaCommands::Pause) => cmd_impl::meta::pause(context).await?,
842        Commands::Meta(MetaCommands::Resume) => cmd_impl::meta::resume(context).await?,
843        Commands::Meta(MetaCommands::ClusterInfo) => cmd_impl::meta::cluster_info(context).await?,
844        Commands::Meta(MetaCommands::SourceSplitInfo { ignore_id }) => {
845            cmd_impl::meta::source_split_info(context, ignore_id).await?
846        }
847        Commands::Meta(MetaCommands::Reschedule {
848            from,
849            dry_run,
850            plan,
851            revision,
852            resolve_no_shuffle,
853        }) => {
854            cmd_impl::meta::reschedule(context, plan, revision, from, dry_run, resolve_no_shuffle)
855                .await?
856        }
857        Commands::Meta(MetaCommands::BackupMeta { remarks }) => {
858            cmd_impl::meta::backup_meta(context, remarks).await?
859        }
860        Commands::Meta(MetaCommands::RestoreMeta { opts }) => {
861            risingwave_meta::backup_restore::restore(opts).await?
862        }
863        Commands::Meta(MetaCommands::DeleteMetaSnapshots { snapshot_ids }) => {
864            cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
865        }
866        Commands::Meta(MetaCommands::ListConnections) => {
867            cmd_impl::meta::list_connections(context).await?
868        }
869        Commands::Meta(MetaCommands::ListServingFragmentMapping) => {
870            cmd_impl::meta::list_serving_fragment_mappings(context).await?
871        }
872        Commands::Meta(MetaCommands::UnregisterWorkers {
873            workers,
874            yes,
875            ignore_not_found,
876            check_fragment_occupied,
877        }) => {
878            cmd_impl::meta::unregister_workers(
879                context,
880                workers,
881                yes,
882                ignore_not_found,
883                check_fragment_occupied,
884            )
885            .await?
886        }
887        Commands::Meta(MetaCommands::ValidateSource { props }) => {
888            cmd_impl::meta::validate_source(context, props).await?
889        }
890        Commands::AwaitTree(AwaitTreeCommands::Dump {
891            actor_traces_format,
892        }) => cmd_impl::await_tree::dump(context, actor_traces_format).await?,
893        Commands::AwaitTree(AwaitTreeCommands::Analyze { path }) => {
894            cmd_impl::await_tree::bottleneck_detect(context, path).await?
895        }
896        Commands::AwaitTree(AwaitTreeCommands::Transcribe { path }) => {
897            rw_diagnose_tools::await_tree::transcribe(path)?
898        }
899        Commands::Profile(ProfileCommands::Cpu { sleep }) => {
900            cmd_impl::profile::cpu_profile(context, sleep).await?
901        }
902        Commands::Profile(ProfileCommands::Heap { dir }) => {
903            cmd_impl::profile::heap_profile(context, dir).await?
904        }
905        Commands::Scale(ScaleCommands::Cordon { workers }) => {
906            cmd_impl::scale::update_schedulability(context, workers, Schedulability::Unschedulable)
907                .await?
908        }
909        Commands::Scale(ScaleCommands::Uncordon { workers }) => {
910            cmd_impl::scale::update_schedulability(context, workers, Schedulability::Schedulable)
911                .await?
912        }
913        Commands::Throttle(ThrottleCommands::Source(args)) => {
914            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
915        }
916        Commands::Throttle(ThrottleCommands::Mv(args)) => {
917            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?;
918        }
919        Commands::Meta(MetaCommands::SetCdcTableBackfillParallelism {
920            table_id,
921            parallelism,
922        }) => {
923            set_cdc_table_backfill_parallelism(context, table_id, parallelism).await?;
924        }
925        Commands::Test(TestCommands::Jvm) => cmd_impl::test::test_jvm()?,
926    }
927    Ok(())
928}