// risingwave_ctl/lib.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![warn(clippy::large_futures, clippy::large_stack_frames)]
16
17use anyhow::Result;
18use clap::{ArgGroup, Args, Parser, Subcommand};
19use cmd_impl::bench::BenchCommands;
20use cmd_impl::hummock::SstDumpArgs;
21use itertools::Itertools;
22use risingwave_common::util::tokio_util::sync::CancellationToken;
23use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId};
24use risingwave_meta::backup_restore::RestoreOpts;
25use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm;
26use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
27use thiserror_ext::AsReport;
28
29use crate::cmd_impl::hummock::{
30    build_compaction_config_vec, list_pinned_versions, migrate_legacy_object,
31};
32use crate::cmd_impl::profile::ProfileWorkerType;
33use crate::cmd_impl::scale::set_cdc_table_backfill_parallelism;
34use crate::cmd_impl::throttle::apply_throttle;
35use crate::common::CtlContext;
36
37pub mod cmd_impl;
38pub mod common;
39
/// risectl provides internal access to the RisingWave cluster. Generally, you will need
/// to provide the meta address and the state store URL to enable risectl to access the cluster. You
/// must start RisingWave in full cluster mode (e.g. enable MinIO and compactor in risedev.yml)
/// instead of playground mode to use this tool. risectl will read environment variables
/// `RW_META_ADDR` and `RW_HUMMOCK_URL` to configure itself.
#[derive(Parser)]
#[clap(version, about = "The DevOps tool that provides internal access to the RisingWave cluster", long_about = None)]
#[clap(propagate_version = true)]
#[clap(infer_subcommands = true)]
pub struct CliOpts {
    // Top-level subcommand to dispatch on; see `Commands` for the available groups.
    // `infer_subcommands` lets users type unambiguous prefixes (e.g. `hum` for `hummock`).
    #[clap(subcommand)]
    command: Commands,
}
53
54#[derive(Subcommand)]
55#[clap(infer_subcommands = true)]
56enum Commands {
57    /// Commands for Compute
58    #[clap(subcommand)]
59    Compute(ComputeCommands),
60    /// Commands for Hummock
61    #[clap(subcommand)]
62    Hummock(HummockCommands),
63    /// Commands for Tables
64    #[clap(subcommand)]
65    Table(TableCommands),
66    /// Commands for Meta
67    #[clap(subcommand)]
68    Meta(MetaCommands),
69    /// Commands for Scaling
70    #[clap(subcommand)]
71    Scale(ScaleCommands),
72    /// Commands for Benchmarks
73    #[clap(subcommand)]
74    Bench(BenchCommands),
75    /// Commands for await-tree, such as dumping, analyzing and transcribing
76    #[clap(subcommand)]
77    #[clap(visible_alias("trace"))]
78    AwaitTree(AwaitTreeCommands),
79    /// Commands for profiling nodes
80    #[clap(subcommand)]
81    Profile(ProfileCommands),
82    #[clap(subcommand)]
83    Throttle(ThrottleCommands),
84    /// Commands for Self-testing
85    #[clap(subcommand, hide = true)]
86    Test(TestCommands),
87}
88
// Subcommands targeting a single compute node, addressed by host.
#[derive(Subcommand)]
enum ComputeCommands {
    /// Show all the configuration parameters on compute node
    ShowConfig {
        // Address of the compute node; positional argument (no clap attribute).
        host: String,
    },
}
94
// Subcommands for inspecting and operating the Hummock state store
// (versions, KV data, compaction, GC, archives, caches).
#[allow(clippy::large_enum_variant)]
#[derive(Subcommand)]
enum HummockCommands {
    /// list latest Hummock version on meta node
    ListVersion {
        #[clap(short, long = "verbose", default_value_t = false)]
        verbose: bool,

        #[clap(long = "verbose_key_range", default_value_t = false)]
        verbose_key_range: bool,
    },

    /// list hummock version deltas in the meta store
    ListVersionDeltas {
        #[clap(short, long = "start-version-delta-id", default_value_t = 0)]
        start_id: u64,

        #[clap(short, long = "num-epochs", default_value_t = 100)]
        num_epochs: u32,
    },
    /// Forbid hummock commit new epochs, which is a prerequisite for compaction deterministic test
    DisableCommitEpoch,
    /// list all Hummock key-value pairs
    ListKv {
        // Defaults to HummockEpoch::MAX, i.e. read the latest epoch.
        #[clap(short, long = "epoch", default_value_t = HummockEpoch::MAX)]
        epoch: u64,

        #[clap(short, long = "table-id")]
        table_id: u32,

        // data directory for hummock state store. None: use default
        data_dir: Option<String>,

        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    // Dump SST contents; all options live in `SstDumpArgs` (see cmd_impl::hummock).
    SstDump(SstDumpArgs),
    /// trigger a targeted compaction through `compaction_group_id`
    TriggerManualCompaction {
        #[clap(short, long = "compaction-group-id", default_value_t = 2)]
        compaction_group_id: u64,

        #[clap(short, long = "table-id", default_value_t = 0)]
        table_id: u32,

        #[clap(short, long = "level", default_value_t = 1)]
        level: u32,

        #[clap(short, long = "sst-ids", value_delimiter = ',')]
        sst_ids: Vec<u64>,
    },
    /// Trigger a full GC for SSTs that is not pinned, with timestamp <= now -
    /// `sst_retention_time_sec`, and with `prefix` in path.
    TriggerFullGc {
        // NOTE(review): long name uses underscores, unlike the hyphenated flags
        // elsewhere — kept as-is since changing it would break existing usage.
        // 259200 seconds = 3 days.
        #[clap(short, long = "sst_retention_time_sec", default_value_t = 259200)]
        sst_retention_time_sec: u64,
        #[clap(short, long = "prefix", required = false)]
        prefix: Option<String>,
    },
    /// List pinned versions of each worker.
    ListPinnedVersions {},
    /// List all compaction groups.
    ListCompactionGroup,
    /// Update compaction config for compaction groups.
    UpdateCompactionConfig {
        // Target group ids; every `Some` field below is applied to each of them
        // (see `build_compaction_config_vec` in cmd_impl::hummock).
        #[clap(long, value_delimiter = ',')]
        compaction_group_ids: Vec<u64>,
        #[clap(long)]
        max_bytes_for_level_base: Option<u64>,
        #[clap(long)]
        max_bytes_for_level_multiplier: Option<u64>,
        #[clap(long)]
        max_compaction_bytes: Option<u64>,
        #[clap(long)]
        sub_level_max_compaction_bytes: Option<u64>,
        #[clap(long)]
        level0_tier_compact_file_number: Option<u64>,
        #[clap(long)]
        target_file_size_base: Option<u64>,
        #[clap(long)]
        compaction_filter_mask: Option<u32>,
        #[clap(long)]
        max_sub_compaction: Option<u32>,
        #[clap(long)]
        level0_stop_write_threshold_sub_level_number: Option<u64>,
        #[clap(long)]
        level0_sub_level_compact_level_count: Option<u32>,
        #[clap(long)]
        max_space_reclaim_bytes: Option<u64>,
        #[clap(long)]
        level0_max_compact_file_number: Option<u64>,
        #[clap(long)]
        level0_overlapping_sub_level_compact_level_count: Option<u32>,
        #[clap(long)]
        enable_emergency_picker: Option<bool>,
        #[clap(long)]
        tombstone_reclaim_ratio: Option<u32>,
        // `compression_level` and `compression_algorithm` must be passed
        // together: the dispatcher asserts the algorithm is present whenever a
        // level is given (see the `CompressionAlgorithm` construction in
        // `start_impl`).
        #[clap(long)]
        compression_level: Option<u32>,
        #[clap(long)]
        compression_algorithm: Option<String>,
        #[clap(long)]
        max_l0_compact_level: Option<u32>,
        #[clap(long)]
        sst_allowed_trivial_move_min_size: Option<u64>,
        #[clap(long)]
        disable_auto_group_scheduling: Option<bool>,
        #[clap(long)]
        max_overlapping_level_size: Option<u64>,
        #[clap(long)]
        sst_allowed_trivial_move_max_count: Option<u32>,
        #[clap(long)]
        emergency_level0_sst_file_count: Option<u32>,
        #[clap(long)]
        emergency_level0_sub_level_partition: Option<u32>,
        #[clap(long)]
        level0_stop_write_threshold_max_sst_count: Option<u32>,
        #[clap(long)]
        level0_stop_write_threshold_max_size: Option<u64>,
        #[clap(long)]
        enable_optimize_l0_interval_selection: Option<bool>,
        #[clap(long)]
        vnode_aligned_level_size_threshold: Option<u64>,
        #[clap(long)]
        max_kv_count_for_xor16: Option<u64>,
    },
    /// Split given compaction group into two. Moves the given tables to the new group.
    SplitCompactionGroup {
        #[clap(long)]
        compaction_group_id: u64,
        #[clap(long, value_delimiter = ',')]
        table_ids: Vec<u32>,
        #[clap(long, default_value_t = 0)]
        partition_vnode_count: u32,
    },
    /// Pause version checkpoint, which subsequently pauses GC of delta log and SST object.
    PauseVersionCheckpoint,
    /// Resume version checkpoint, which subsequently resumes GC of delta log and SST object.
    ResumeVersionCheckpoint,
    /// Replay version from the checkpoint one to the latest one.
    ReplayVersion,
    /// List compaction status
    ListCompactionStatus {
        #[clap(short, long = "verbose", default_value_t = false)]
        verbose: bool,
    },
    // Print the compaction score of the given compaction group.
    GetCompactionScore {
        #[clap(long)]
        compaction_group_id: u64,
    },
    /// Validate the current `HummockVersion`.
    ValidateVersion,
    /// Rebuild table stats
    RebuildTableStats,
    // Cancel a running compaction task by its task id.
    CancelCompactTask {
        #[clap(short, long)]
        task_id: u64,
    },
    // Search archived Hummock data for entries matching a user key.
    PrintUserKeyInArchive {
        /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
        #[clap(long, value_delimiter = ',')]
        archive_ids: Vec<u64>,
        /// The data directory of Hummock storage, where `SSTable` objects can be found.
        #[clap(long)]
        data_dir: String,
        /// KVs that are matched with the user key are printed.
        #[clap(long)]
        user_key: String,
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    // Search archived version deltas for entries related to a given SST id.
    PrintVersionDeltaInArchive {
        /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
        #[clap(long, value_delimiter = ',')]
        archive_ids: Vec<u64>,
        /// The data directory of Hummock storage, where `SSTable` objects can be found.
        #[clap(long)]
        data_dir: String,
        /// Version deltas that are related to the SST id are printed.
        #[clap(long)]
        sst_id: u64,
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    // Toggle tiered-cache tracing; the per-operation thresholds (in ms) are
    // forwarded as-is to `cmd_impl::hummock::tiered_cache_tracing`.
    TieredCacheTracing {
        #[clap(long)]
        enable: bool,
        #[clap(long)]
        record_hybrid_insert_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_get_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_obtain_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_remove_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_fetch_threshold_ms: Option<u32>,
    },
    // Merge two compaction groups into one.
    MergeCompactionGroup {
        #[clap(long)]
        left_group_id: u64,
        #[clap(long)]
        right_group_id: u64,
    },
    // Migrate legacy objects between directories; positional args: url,
    // source_dir, target_dir (see `migrate_legacy_object`).
    MigrateLegacyObject {
        url: String,
        source_dir: String,
        target_dir: String,
        #[clap(long, default_value = "100")]
        concurrency: u32,
    },
    // Resize meta/data cache capacities; values are given in MiB and converted
    // to bytes by the dispatcher in `start_impl`.
    ResizeCache {
        #[clap(long)]
        meta_cache_capacity_mb: Option<u64>,
        #[clap(long)]
        data_cache_capacity_mb: Option<u64>,
    },
}
313
// Subcommands for inspecting state tables stored in Hummock.
#[derive(Subcommand)]
enum TableCommands {
    /// scan a state table with MV name
    Scan {
        /// name of the materialized view to operate on
        mv_name: String,
        // data directory for hummock state store. None: use default
        data_dir: Option<String>,

        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    /// scan a state table using Id
    ScanById {
        /// id of the state table to operate on
        table_id: u32,
        // data directory for hummock state store. None: use default
        data_dir: Option<String>,
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    /// list all state tables
    List,
}
338
// Subcommands for toggling worker schedulability (cordon/uncordon), mirroring
// the `Schedulability` request imported from risingwave_pb.
#[derive(Subcommand, Debug)]
enum ScaleCommands {
    /// Mark a compute node as unschedulable
    #[clap(verbatim_doc_comment)]
    Cordon {
        /// Workers that need to be cordoned, both id and host are supported.
        #[clap(
            long,
            required = true,
            value_delimiter = ',',
            value_name = "id or host,..."
        )]
        workers: Vec<String>,
    },
    /// mark a compute node as schedulable. Nodes are schedulable unless they are cordoned
    Uncordon {
        /// Workers that need to be uncordoned, both id and host are supported.
        #[clap(
            long,
            required = true,
            value_delimiter = ',',
            value_name = "id or host,..."
        )]
        workers: Vec<String>,
    },
}
365
// Subcommands talking to the meta node: stream-graph control, cluster info,
// rescheduling, backup/restore, and worker management.
#[derive(Subcommand)]
#[allow(clippy::large_enum_variant)]
enum MetaCommands {
    /// pause the stream graph
    Pause,
    /// resume the stream graph
    Resume,
    /// force resume backfill for troubleshooting
    // Exactly one of --job-id / --fragment-id must be given (enforced by the ArgGroup).
    #[clap(
        group(
            ArgGroup::new("resume_backfill_target")
                .required(true)
                .args(&["job_id", "fragment_id"])
        )
    )]
    ResumeBackfill {
        #[clap(long)]
        job_id: Option<u32>,
        #[clap(long)]
        fragment_id: Option<u32>,
    },
    /// get cluster info
    ClusterInfo,
    /// get source split info
    SourceSplitInfo {
        #[clap(long)]
        ignore_id: bool,
    },
    /// Reschedule the actors in the stream graph
    ///
    /// The format is `fragment_id-[worker_id:count]+[worker_id:count]`
    /// You can provide either decreased `worker_ids` only or increased only, but decreased should be preceded by
    /// `added` when both are provided.
    ///
    /// For example, for plan `100-[1:1]+[4:1]` the follow request will be generated:
    /// ```text
    /// {
    ///     100: WorkerReschedule {
    ///         increased_actor_count: { 1: 1 },
    ///         decreased_actor_count: { 4: 1 },
    ///     }
    /// }
    /// ```
    /// Use ; to separate multiple fragment
    #[clap(verbatim_doc_comment)]
    // Exactly one of --plan / --from must be given (enforced by the ArgGroup).
    #[clap(group(clap::ArgGroup::new("input_group").required(true).args(&["plan", "from"])))]
    Reschedule {
        /// Plan of reschedule, needs to be used with `revision`
        #[clap(long, requires = "revision")]
        plan: Option<String>,
        /// Revision of the plan
        #[clap(long)]
        revision: Option<u64>,
        /// Reschedule from a specific file
        #[clap(long, conflicts_with = "revision", value_hint = clap::ValueHint::AnyPath)]
        from: Option<String>,
        /// Show the plan only, no actual operation
        #[clap(long, default_value = "false")]
        dry_run: bool,
        /// Resolve `NO_SHUFFLE` upstream
        #[clap(long, default_value = "false")]
        resolve_no_shuffle: bool,
    },
    /// backup meta by taking a meta snapshot
    BackupMeta {
        #[clap(long)]
        remarks: Option<String>,
    },
    /// restore meta by recovering from a meta snapshot
    RestoreMeta {
        // Flattened so `RestoreOpts` contributes its own flags directly.
        #[command(flatten)]
        opts: RestoreOpts,
    },
    /// delete meta snapshots
    DeleteMetaSnapshots {
        #[clap(long, value_delimiter = ',')]
        snapshot_ids: Vec<u64>,
    },

    /// List all existing connections in the catalog
    ListConnections,

    /// List fragment mapping for serving
    ListServingFragmentMapping,

    /// Unregister workers from the cluster
    UnregisterWorkers {
        /// The workers that needs to be unregistered, `worker_id` and `worker_host:worker_port` are both supported
        #[clap(
            long,
            required = true,
            value_delimiter = ',',
            value_name = "worker_id or worker_host:worker_port, ..."
        )]
        workers: Vec<String>,

        /// Automatic yes to prompts
        #[clap(short = 'y', long, default_value_t = false)]
        yes: bool,

        /// The worker not found will be ignored
        #[clap(long, default_value_t = false)]
        ignore_not_found: bool,

        /// Checking whether the fragment is occupied by workers
        #[clap(long, default_value_t = false)]
        check_fragment_occupied: bool,
    },

    /// Validate source interface for the cloud team
    ValidateSource {
        /// With properties in json format
        /// If privatelink is used, specify `connection.id` instead of `connection.name`
        #[clap(long)]
        props: String,
    },

    // Set the backfill parallelism of a CDC table; dispatched to
    // `set_cdc_table_backfill_parallelism` (see the import at the top of the file).
    SetCdcTableBackfillParallelism {
        #[clap(long, required = true)]
        table_id: u32,
        #[clap(long, required = true)]
        parallelism: u32,
    },
}
490
// Subcommands for working with await-tree traces (alias: `trace`).
#[derive(Subcommand, Clone, Debug)]
pub enum AwaitTreeCommands {
    /// Dump Await Tree
    Dump {
        /// The format of actor traces in the diagnose file. Allowed values: `json`, `text`. `json` by default.
        #[clap(short, long = "actor-traces-format")]
        actor_traces_format: Option<String>,
    },
    /// Analyze Await Tree
    Analyze {
        /// The path to the diagnose file, if None, ctl will first pull one from the cluster
        /// The actor traces format can be either `json` or `text`. The analyze command will
        /// automatically detect the format.
        #[clap(long = "path")]
        path: Option<String>,
    },
    /// Transcribe Await Tree From JSON to Text format
    Transcribe {
        /// The path to the await tree file to be transcribed
        #[clap(long = "path")]
        path: String,
    },
}
514
// Hidden self-test subcommands (the `Test` variant is registered with `hide = true`).
#[derive(Subcommand, Clone, Debug)]
enum TestCommands {
    /// Test if JVM and Java libraries are working
    Jvm,
}
520
// Throttle targets; each variant carries the same `ThrottleCommandArgs`
// (id, optional rate, throttle type) and is handled by `apply_throttle`.
#[derive(Subcommand, Clone, Debug)]
enum ThrottleCommands {
    // Throttle a source.
    Source(ThrottleCommandArgs),
    // Throttle a materialized view.
    Mv(ThrottleCommandArgs),
    // Throttle a sink.
    Sink(ThrottleCommandArgs),
}
527
// Kinds of rate limit that can be throttled; parsed by clap as a value enum
// (i.e. users pass `dml`, `backfill`, `source`, or `sink` on the command line).
#[derive(Clone, Debug, clap::ValueEnum)]
pub enum ThrottleTypeArg {
    Dml,
    Backfill,
    Source,
    Sink,
}
535
// Shared argument bundle for all `ThrottleCommands` variants.
#[derive(Clone, Debug, Args)]
pub struct ThrottleCommandArgs {
    /// The ID of the object to throttle
    #[clap(long, required = true)]
    id: u32,
    /// The rate limit to apply
    // `None` when the flag is omitted; the interpretation of omission is
    // decided by `apply_throttle` — TODO(review) confirm whether it clears the limit.
    #[clap(long)]
    rate: Option<u32>,
    /// The type of throttle to apply
    #[clap(long, value_enum, required = true)]
    throttle_type: ThrottleTypeArg,
}
548
// Subcommands for profiling cluster nodes (CPU and heap).
#[derive(Subcommand, Clone, Debug)]
pub enum ProfileCommands {
    /// CPU profile
    Cpu {
        /// The time to active profiling for (in seconds)
        #[clap(short, long = "sleep")]
        sleep: u64,
        /// Target worker types. Repeatable. Defaults to frontend, compute-node, and compactor.
        #[clap(long = "worker-type", value_name = "TYPE")]
        worker_types: Vec<ProfileWorkerType>,
    },
    /// Heap profile
    Heap {
        /// The output directory of the dumped file
        #[clap(long = "dir")]
        dir: Option<String>,
        /// Target worker types. Repeatable. Defaults to frontend, compute-node, and compactor.
        #[clap(long = "worker-type", value_name = "TYPE")]
        worker_types: Vec<ProfileWorkerType>,
    },
}
570
571/// Start `risectl` with the given options.
572/// Cancel the operation when the given `shutdown` token triggers.
573/// Log and abort the process if any error occurs.
574///
575/// Note: use [`start_fallible`] if you want to call functionalities of `risectl`
576/// in an embedded manner.
577pub async fn start(opts: CliOpts, shutdown: CancellationToken) {
578    let context = CtlContext::default();
579
580    tokio::select! {
581        _ = shutdown.cancelled() => {
582            // Shutdown requested, clean up the context and return.
583            context.try_close().await;
584        }
585
586        result = start_fallible(opts, &context) => {
587            if let Err(e) = result {
588                eprintln!("Error: {:#?}", e.as_report()); // pretty with backtrace
589                std::process::exit(1);
590            }
591        }
592    }
593}
594
595/// Start `risectl` with the given options.
596/// Return `Err` if any error occurs.
597pub async fn start_fallible(opts: CliOpts, context: &CtlContext) -> Result<()> {
598    let result = start_impl(opts, context).await;
599    context.try_close().await;
600    result
601}
602
603#[expect(
604    clippy::large_stack_frames,
605    reason = "Pre-opt MIR sums locals across match arms in async dispatch; \
606              post-layout generator stores only one arm at a time (~13–16 KiB)."
607)]
608async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
609    match opts.command {
610        Commands::Compute(ComputeCommands::ShowConfig { host }) => {
611            cmd_impl::compute::show_config(&host).await?
612        }
613        Commands::Hummock(HummockCommands::DisableCommitEpoch) => {
614            cmd_impl::hummock::disable_commit_epoch(context).await?
615        }
616        Commands::Hummock(HummockCommands::ListVersion {
617            verbose,
618            verbose_key_range,
619        }) => {
620            cmd_impl::hummock::list_version(context, verbose, verbose_key_range).await?;
621        }
622        Commands::Hummock(HummockCommands::ListVersionDeltas {
623            start_id,
624            num_epochs,
625        }) => {
626            cmd_impl::hummock::list_version_deltas(
627                context,
628                HummockVersionId::new(start_id),
629                num_epochs,
630            )
631            .await?;
632        }
633        Commands::Hummock(HummockCommands::ListKv {
634            epoch,
635            table_id,
636            data_dir,
637            use_new_object_prefix_strategy,
638        }) => {
639            cmd_impl::hummock::list_kv(
640                context,
641                epoch,
642                table_id,
643                data_dir,
644                use_new_object_prefix_strategy,
645            )
646            .await?;
647        }
648        Commands::Hummock(HummockCommands::SstDump(args)) => {
649            cmd_impl::hummock::sst_dump(context, args).await.unwrap()
650        }
651        Commands::Hummock(HummockCommands::TriggerManualCompaction {
652            compaction_group_id,
653            table_id,
654            level,
655            sst_ids,
656        }) => {
657            cmd_impl::hummock::trigger_manual_compaction(
658                context,
659                compaction_group_id,
660                table_id.into(),
661                level,
662                sst_ids,
663            )
664            .await?
665        }
666        Commands::Hummock(HummockCommands::TriggerFullGc {
667            sst_retention_time_sec,
668            prefix,
669        }) => cmd_impl::hummock::trigger_full_gc(context, sst_retention_time_sec, prefix).await?,
670        Commands::Hummock(HummockCommands::ListPinnedVersions {}) => {
671            list_pinned_versions(context).await?
672        }
673        Commands::Hummock(HummockCommands::ListCompactionGroup) => {
674            cmd_impl::hummock::list_compaction_group(context).await?
675        }
676        Commands::Hummock(HummockCommands::UpdateCompactionConfig {
677            compaction_group_ids,
678            max_bytes_for_level_base,
679            max_bytes_for_level_multiplier,
680            max_compaction_bytes,
681            sub_level_max_compaction_bytes,
682            level0_tier_compact_file_number,
683            target_file_size_base,
684            compaction_filter_mask,
685            max_sub_compaction,
686            level0_stop_write_threshold_sub_level_number,
687            level0_sub_level_compact_level_count,
688            max_space_reclaim_bytes,
689            level0_max_compact_file_number,
690            level0_overlapping_sub_level_compact_level_count,
691            enable_emergency_picker,
692            tombstone_reclaim_ratio,
693            compression_level,
694            compression_algorithm,
695            max_l0_compact_level,
696            sst_allowed_trivial_move_min_size,
697            disable_auto_group_scheduling,
698            max_overlapping_level_size,
699            sst_allowed_trivial_move_max_count,
700            emergency_level0_sst_file_count,
701            emergency_level0_sub_level_partition,
702            level0_stop_write_threshold_max_sst_count,
703            level0_stop_write_threshold_max_size,
704            enable_optimize_l0_interval_selection,
705            vnode_aligned_level_size_threshold,
706            max_kv_count_for_xor16,
707        }) => {
708            cmd_impl::hummock::update_compaction_config(
709                context,
710                compaction_group_ids,
711                build_compaction_config_vec(
712                    max_bytes_for_level_base,
713                    max_bytes_for_level_multiplier,
714                    max_compaction_bytes,
715                    sub_level_max_compaction_bytes,
716                    level0_tier_compact_file_number,
717                    target_file_size_base,
718                    compaction_filter_mask,
719                    max_sub_compaction,
720                    level0_stop_write_threshold_sub_level_number,
721                    level0_sub_level_compact_level_count,
722                    max_space_reclaim_bytes,
723                    level0_max_compact_file_number,
724                    level0_overlapping_sub_level_compact_level_count,
725                    enable_emergency_picker,
726                    tombstone_reclaim_ratio,
727                    if let Some(level) = compression_level {
728                        assert!(compression_algorithm.is_some());
729                        Some(CompressionAlgorithm {
730                            level,
731                            compression_algorithm: compression_algorithm.unwrap(),
732                        })
733                    } else {
734                        None
735                    },
736                    max_l0_compact_level,
737                    sst_allowed_trivial_move_min_size,
738                    disable_auto_group_scheduling,
739                    max_overlapping_level_size,
740                    sst_allowed_trivial_move_max_count,
741                    emergency_level0_sst_file_count,
742                    emergency_level0_sub_level_partition,
743                    level0_stop_write_threshold_max_sst_count,
744                    level0_stop_write_threshold_max_size,
745                    enable_optimize_l0_interval_selection,
746                    vnode_aligned_level_size_threshold,
747                    max_kv_count_for_xor16,
748                ),
749            )
750            .await?
751        }
752        Commands::Hummock(HummockCommands::SplitCompactionGroup {
753            compaction_group_id,
754            table_ids,
755            partition_vnode_count,
756        }) => {
757            cmd_impl::hummock::split_compaction_group(
758                context,
759                compaction_group_id,
760                &table_ids.into_iter().map_into().collect_vec(),
761                partition_vnode_count,
762            )
763            .await?;
764        }
765        Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
766            cmd_impl::hummock::pause_version_checkpoint(context).await?;
767        }
768        Commands::Hummock(HummockCommands::ResumeVersionCheckpoint) => {
769            cmd_impl::hummock::resume_version_checkpoint(context).await?;
770        }
771        Commands::Hummock(HummockCommands::ReplayVersion) => {
772            cmd_impl::hummock::replay_version(context).await?;
773        }
774        Commands::Hummock(HummockCommands::ListCompactionStatus { verbose }) => {
775            cmd_impl::hummock::list_compaction_status(context, verbose).await?;
776        }
777        Commands::Hummock(HummockCommands::GetCompactionScore {
778            compaction_group_id,
779        }) => {
780            cmd_impl::hummock::get_compaction_score(context, compaction_group_id).await?;
781        }
782        Commands::Hummock(HummockCommands::ValidateVersion) => {
783            cmd_impl::hummock::validate_version(context).await?;
784        }
785        Commands::Hummock(HummockCommands::RebuildTableStats) => {
786            cmd_impl::hummock::rebuild_table_stats(context).await?;
787        }
788        Commands::Hummock(HummockCommands::CancelCompactTask { task_id }) => {
789            cmd_impl::hummock::cancel_compact_task(context, task_id).await?;
790        }
        // --- Hummock: meta-backup archive inspection -------------------------
        // Print version deltas recorded in backup archives; raw u64 ids from the
        // CLI are wrapped into typed `HummockVersionId`s before the call.
        Commands::Hummock(HummockCommands::PrintVersionDeltaInArchive {
            archive_ids,
            data_dir,
            sst_id,
            use_new_object_prefix_strategy,
        }) => {
            cmd_impl::hummock::print_version_delta_in_archive(
                context,
                archive_ids.into_iter().map(HummockVersionId::new),
                data_dir,
                sst_id,
                use_new_object_prefix_strategy,
            )
            .await?;
        }
        // Same archive lookup, but filtered by a specific user key instead of
        // an SST id.
        Commands::Hummock(HummockCommands::PrintUserKeyInArchive {
            archive_ids,
            data_dir,
            user_key,
            use_new_object_prefix_strategy,
        }) => {
            cmd_impl::hummock::print_user_key_in_archive(
                context,
                archive_ids.into_iter().map(HummockVersionId::new),
                data_dir,
                user_key,
                use_new_object_prefix_strategy,
            )
            .await?;
        }
        // --- Hummock: runtime tuning -----------------------------------------
        // Toggle tiered (hybrid) cache tracing; per-operation thresholds are
        // forwarded verbatim in milliseconds.
        Commands::Hummock(HummockCommands::TieredCacheTracing {
            enable,
            record_hybrid_insert_threshold_ms,
            record_hybrid_get_threshold_ms,
            record_hybrid_obtain_threshold_ms,
            record_hybrid_remove_threshold_ms,
            record_hybrid_fetch_threshold_ms,
        }) => {
            cmd_impl::hummock::tiered_cache_tracing(
                context,
                enable,
                record_hybrid_insert_threshold_ms,
                record_hybrid_get_threshold_ms,
                record_hybrid_obtain_threshold_ms,
                record_hybrid_remove_threshold_ms,
                record_hybrid_fetch_threshold_ms,
            )
            .await?
        }
        // Merge the right compaction group into the left (argument order is
        // left, then right — see the impl for the exact merge semantics).
        Commands::Hummock(HummockCommands::MergeCompactionGroup {
            left_group_id,
            right_group_id,
        }) => {
            cmd_impl::hummock::merge_compaction_group(context, left_group_id, right_group_id)
                .await?
        }

        // Migrate legacy object-store layout to the new prefix layout. Note:
        // unlike the other arms, this does not take `context` — it connects via
        // the given url directly.
        Commands::Hummock(HummockCommands::MigrateLegacyObject {
            url,
            source_dir,
            target_dir,
            concurrency,
        }) => {
            migrate_legacy_object(url, source_dir, target_dir, concurrency).await?;
        }
        // Resize meta/data block caches at runtime. CLI takes MiB; the impl
        // expects bytes, hence the `* MIB` conversion on each Some value.
        Commands::Hummock(HummockCommands::ResizeCache {
            meta_cache_capacity_mb,
            data_cache_capacity_mb,
        }) => {
            const MIB: u64 = 1024 * 1024;
            cmd_impl::hummock::resize_cache(
                context,
                meta_cache_capacity_mb.map(|v| v * MIB),
                data_cache_capacity_mb.map(|v| v * MIB),
            )
            .await?
        }
        // --- Table: state-store scans ----------------------------------------
        // Scan a materialized view's state by name.
        Commands::Table(TableCommands::Scan {
            mv_name,
            data_dir,
            use_new_object_prefix_strategy,
        }) => {
            cmd_impl::table::scan(context, mv_name, data_dir, use_new_object_prefix_strategy)
                .await?
        }
        // Same scan, addressed by numeric table id instead of name.
        Commands::Table(TableCommands::ScanById {
            table_id,
            data_dir,
            use_new_object_prefix_strategy,
        }) => {
            cmd_impl::table::scan_id(context, table_id, data_dir, use_new_object_prefix_strategy)
                .await?
        }
        Commands::Table(TableCommands::List) => cmd_impl::table::list(context).await?,
        Commands::Bench(cmd) => cmd_impl::bench::do_bench(context, cmd).await?,
        // --- Meta: cluster control -------------------------------------------
        Commands::Meta(MetaCommands::Pause) => cmd_impl::meta::pause(context).await?,
        Commands::Meta(MetaCommands::Resume) => cmd_impl::meta::resume(context).await?,
        Commands::Meta(MetaCommands::ResumeBackfill {
            job_id,
            fragment_id,
        }) => cmd_impl::meta::resume_backfill(context, job_id, fragment_id).await?,
        Commands::Meta(MetaCommands::ClusterInfo) => cmd_impl::meta::cluster_info(context).await?,
        Commands::Meta(MetaCommands::SourceSplitInfo { ignore_id }) => {
            cmd_impl::meta::source_split_info(context, ignore_id).await?
        }
        // NOTE(review): argument order here (plan, revision, from, dry_run, ...)
        // intentionally differs from the field order in the pattern — keep the
        // call site in sync with `cmd_impl::meta::reschedule`'s signature.
        Commands::Meta(MetaCommands::Reschedule {
            from,
            dry_run,
            plan,
            revision,
            resolve_no_shuffle,
        }) => {
            cmd_impl::meta::reschedule(context, plan, revision, from, dry_run, resolve_no_shuffle)
                .await?
        }
        Commands::Meta(MetaCommands::BackupMeta { remarks }) => {
            cmd_impl::meta::backup_meta(context, remarks).await?
        }
        // Restore runs against backup storage via `opts` only; it does not go
        // through `context`/the meta client like the other Meta arms.
        Commands::Meta(MetaCommands::RestoreMeta { opts }) => {
            risingwave_meta::backup_restore::restore(opts).await?
        }
        Commands::Meta(MetaCommands::DeleteMetaSnapshots { snapshot_ids }) => {
            cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
        }
        Commands::Meta(MetaCommands::ListConnections) => {
            cmd_impl::meta::list_connections(context).await?
        }
        Commands::Meta(MetaCommands::ListServingFragmentMapping) => {
            cmd_impl::meta::list_serving_fragment_mappings(context).await?
        }
        // Forcefully unregister worker nodes; `yes` skips the interactive
        // confirmation, the remaining flags soften/strengthen validation.
        Commands::Meta(MetaCommands::UnregisterWorkers {
            workers,
            yes,
            ignore_not_found,
            check_fragment_occupied,
        }) => {
            cmd_impl::meta::unregister_workers(
                context,
                workers,
                yes,
                ignore_not_found,
                check_fragment_occupied,
            )
            .await?
        }
        Commands::Meta(MetaCommands::ValidateSource { props }) => {
            cmd_impl::meta::validate_source(context, props).await?
        }
        // --- Await-tree diagnostics ------------------------------------------
        Commands::AwaitTree(AwaitTreeCommands::Dump {
            actor_traces_format,
        }) => cmd_impl::await_tree::dump(context, actor_traces_format).await?,
        Commands::AwaitTree(AwaitTreeCommands::Analyze { path }) => {
            cmd_impl::await_tree::bottleneck_detect(context, path).await?
        }
        // Purely local file transcription — synchronous, no cluster access.
        Commands::AwaitTree(AwaitTreeCommands::Transcribe { path }) => {
            rw_diagnose_tools::await_tree::transcribe(path)?
        }
        // --- Profiling --------------------------------------------------------
        Commands::Profile(ProfileCommands::Cpu {
            sleep,
            worker_types,
        }) => cmd_impl::profile::cpu_profile(context, sleep, worker_types).await?,
        Commands::Profile(ProfileCommands::Heap { dir, worker_types }) => {
            cmd_impl::profile::heap_profile(context, dir, worker_types).await?
        }
        // --- Scaling: worker schedulability ----------------------------------
        Commands::Scale(ScaleCommands::Cordon { workers }) => {
            cmd_impl::scale::update_schedulability(context, workers, Schedulability::Unschedulable)
                .await?
        }
        Commands::Scale(ScaleCommands::Uncordon { workers }) => {
            cmd_impl::scale::update_schedulability(context, workers, Schedulability::Schedulable)
                .await?
        }
        // --- Throttling: one arm per throttle target -------------------------
        Commands::Throttle(ThrottleCommands::Source(args)) => {
            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
        }
        Commands::Throttle(ThrottleCommands::Mv(args)) => {
            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?;
        }
        Commands::Throttle(ThrottleCommands::Sink(args)) => {
            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Sink, args).await?;
        }
        Commands::Meta(MetaCommands::SetCdcTableBackfillParallelism {
            table_id,
            parallelism,
        }) => {
            set_cdc_table_backfill_parallelism(context, table_id, parallelism).await?;
        }
        // Smoke-test the embedded JVM (synchronous).
        Commands::Test(TestCommands::Jvm) => cmd_impl::test::test_jvm()?,
979    }
980    Ok(())
981}