risingwave_ctl/
lib.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![warn(clippy::large_futures, clippy::large_stack_frames)]
16
17use anyhow::Result;
18use clap::{ArgGroup, Args, Parser, Subcommand};
19use cmd_impl::bench::BenchCommands;
20use cmd_impl::hummock::SstDumpArgs;
21use itertools::Itertools;
22use risingwave_common::util::tokio_util::sync::CancellationToken;
23use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId};
24use risingwave_meta::backup_restore::RestoreOpts;
25use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm;
26use risingwave_pb::id::{CompactionGroupId, FragmentId, HummockSstableId, JobId, TableId};
27use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
28use thiserror_ext::AsReport;
29
30use crate::cmd_impl::hummock::{
31    build_compaction_config_vec, list_pinned_versions, migrate_legacy_object,
32};
33use crate::cmd_impl::profile::ProfileWorkerType;
34use crate::cmd_impl::scale::set_cdc_table_backfill_parallelism;
35use crate::cmd_impl::throttle::apply_throttle;
36use crate::common::CtlContext;
37
38pub mod cmd_impl;
39pub mod common;
40
/// risectl provides internal access to the RisingWave cluster. Generally, you will need
/// to provide the meta address and the state store URL to enable risectl to access the cluster. You
/// must start RisingWave in full cluster mode (e.g. enable MinIO and compactor in risedev.yml)
/// instead of playground mode to use this tool. risectl will read environment variables
/// `RW_META_ADDR` and `RW_HUMMOCK_URL` to configure itself.
#[derive(Parser)]
#[clap(version, about = "The DevOps tool that provides internal access to the RisingWave cluster", long_about = None)]
#[clap(propagate_version = true)]
#[clap(infer_subcommands = true)]
pub struct CliOpts {
    // Top-level subcommand to dispatch on; see `Commands` for the full list.
    #[clap(subcommand)]
    command: Commands,
}
54
55#[derive(Subcommand)]
56#[clap(infer_subcommands = true)]
57enum Commands {
58    /// Commands for Compute
59    #[clap(subcommand)]
60    Compute(ComputeCommands),
61    /// Commands for Hummock
62    #[clap(subcommand)]
63    Hummock(HummockCommands),
64    /// Commands for Tables
65    #[clap(subcommand)]
66    Table(TableCommands),
67    /// Commands for Meta
68    #[clap(subcommand)]
69    Meta(MetaCommands),
70    /// Commands for Scaling
71    #[clap(subcommand)]
72    Scale(ScaleCommands),
73    /// Commands for Benchmarks
74    #[clap(subcommand)]
75    Bench(BenchCommands),
76    /// Commands for await-tree, such as dumping, analyzing and transcribing
77    #[clap(subcommand)]
78    #[clap(visible_alias("trace"))]
79    AwaitTree(AwaitTreeCommands),
80    /// Commands for profiling nodes
81    #[clap(subcommand)]
82    Profile(ProfileCommands),
83    #[clap(subcommand)]
84    Throttle(ThrottleCommands),
85    /// Commands for Self-testing
86    #[clap(subcommand, hide = true)]
87    Test(TestCommands),
88}
89
// Subcommands that target a single compute node directly (by host address).
#[derive(Subcommand)]
enum ComputeCommands {
    /// Show all the configuration parameters on compute node
    ShowConfig { host: String },
}
95
// Subcommands operating on Hummock (the state store): version/delta inspection,
// KV listing, manual compaction and GC triggers, compaction-group management,
// archive debugging, and cache controls.
#[allow(clippy::large_enum_variant)]
#[derive(Subcommand)]
enum HummockCommands {
    /// list latest Hummock version on meta node
    ListVersion {
        #[clap(short, long = "verbose", default_value_t = false)]
        verbose: bool,

        #[clap(long = "verbose_key_range", default_value_t = false)]
        verbose_key_range: bool,
    },

    /// list hummock version deltas in the meta store
    ListVersionDeltas {
        #[clap(short, long = "start-version-delta-id", default_value_t = HummockVersionId::new(0))]
        start_id: HummockVersionId,

        #[clap(short, long = "num-epochs", default_value_t = 100)]
        num_epochs: u32,
    },
    /// Forbid hummock commit new epochs, which is a prerequisite for compaction deterministic test
    DisableCommitEpoch,
    /// list all Hummock key-value pairs
    ListKv {
        // Defaults to `HummockEpoch::MAX`, i.e. read the latest snapshot.
        #[clap(short, long = "epoch", default_value_t = HummockEpoch::MAX)]
        epoch: u64,

        #[clap(short, long = "table-id")]
        table_id: TableId,

        // data directory for hummock state store. None: use default
        data_dir: Option<String>,

        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    // Dump SST file contents; flags are defined on `SstDumpArgs`.
    SstDump(SstDumpArgs),
    /// trigger a targeted compaction through `compaction_group_id`
    TriggerManualCompaction {
        #[clap(short, long = "compaction-group-id", default_value_t = CompactionGroupId::new(2))]
        compaction_group_id: CompactionGroupId,

        #[clap(short, long = "table-id", default_value_t = 0)]
        table_id: u32,

        #[clap(short, long = "level", value_delimiter = ',', default_values_t = vec![1u32])]
        levels: Vec<u32>,

        #[clap(short, long = "sst-ids", value_delimiter = ',')]
        sst_ids: Vec<HummockSstableId>,
    },
    /// Trigger a full GC for SSTs that is not pinned, with timestamp <= now -
    /// `sst_retention_time_sec`, and with `prefix` in path.
    TriggerFullGc {
        // Default retention: 259200 s = 72 hours.
        #[clap(short, long = "sst_retention_time_sec", default_value_t = 259200)]
        sst_retention_time_sec: u64,
        #[clap(short, long = "prefix", required = false)]
        prefix: Option<String>,
    },
    /// List pinned versions of each worker.
    ListPinnedVersions {},
    /// List all compaction groups.
    ListCompactionGroup,
    /// Update compaction config for compaction groups.
    UpdateCompactionConfig {
        // Target groups; each optional knob below is applied only when provided.
        #[clap(long, value_delimiter = ',')]
        compaction_group_ids: Vec<CompactionGroupId>,
        #[clap(long)]
        max_bytes_for_level_base: Option<u64>,
        #[clap(long)]
        max_bytes_for_level_multiplier: Option<u64>,
        #[clap(long)]
        max_compaction_bytes: Option<u64>,
        #[clap(long)]
        sub_level_max_compaction_bytes: Option<u64>,
        #[clap(long)]
        level0_tier_compact_file_number: Option<u64>,
        #[clap(long)]
        target_file_size_base: Option<u64>,
        #[clap(long)]
        compaction_filter_mask: Option<u32>,
        #[clap(long)]
        max_sub_compaction: Option<u32>,
        #[clap(long)]
        level0_stop_write_threshold_sub_level_number: Option<u64>,
        #[clap(long)]
        level0_sub_level_compact_level_count: Option<u32>,
        #[clap(long)]
        max_space_reclaim_bytes: Option<u64>,
        #[clap(long)]
        level0_max_compact_file_number: Option<u64>,
        #[clap(long)]
        level0_overlapping_sub_level_compact_level_count: Option<u32>,
        #[clap(long)]
        enable_emergency_picker: Option<bool>,
        #[clap(long)]
        tombstone_reclaim_ratio: Option<u32>,
        // `compression_level` and `compression_algorithm` must be set together;
        // the dispatcher asserts this when building the config.
        #[clap(long)]
        compression_level: Option<u32>,
        #[clap(long)]
        compression_algorithm: Option<String>,
        #[clap(long)]
        max_l0_compact_level: Option<u32>,
        #[clap(long)]
        sst_allowed_trivial_move_min_size: Option<u64>,
        #[clap(long)]
        disable_auto_group_scheduling: Option<bool>,
        #[clap(long)]
        max_overlapping_level_size: Option<u64>,
        #[clap(long)]
        sst_allowed_trivial_move_max_count: Option<u32>,
        #[clap(long)]
        emergency_level0_sst_file_count: Option<u32>,
        #[clap(long)]
        emergency_level0_sub_level_partition: Option<u32>,
        #[clap(long)]
        level0_stop_write_threshold_max_sst_count: Option<u32>,
        #[clap(long)]
        level0_stop_write_threshold_max_size: Option<u64>,
        #[clap(long)]
        enable_optimize_l0_interval_selection: Option<bool>,
        #[clap(long)]
        vnode_aligned_level_size_threshold: Option<u64>,
        #[clap(long)]
        max_kv_count_for_xor16: Option<u64>,
    },
    /// Split given compaction group into two. Moves the given tables to the new group.
    SplitCompactionGroup {
        #[clap(long)]
        compaction_group_id: CompactionGroupId,
        #[clap(long, value_delimiter = ',')]
        table_ids: Vec<TableId>,
        #[clap(long, default_value_t = 0)]
        partition_vnode_count: u32,
    },
    /// Pause version checkpoint, which subsequently pauses GC of delta log and SST object.
    PauseVersionCheckpoint,
    /// Resume version checkpoint, which subsequently resumes GC of delta log and SST object.
    ResumeVersionCheckpoint,
    /// Replay version from the checkpoint one to the latest one.
    ReplayVersion,
    /// List compaction status
    ListCompactionStatus {
        #[clap(short, long = "verbose", default_value_t = false)]
        verbose: bool,
    },
    // Fetch the compaction score(s) of the given compaction group.
    GetCompactionScore {
        #[clap(long)]
        compaction_group_id: CompactionGroupId,
    },
    /// Validate the current `HummockVersion`.
    ValidateVersion,
    /// Rebuild table stats
    RebuildTableStats,
    // Cancel a compaction task by its task id.
    CancelCompactTask {
        #[clap(short, long)]
        task_id: u64,
    },
    // Debug utility: print archived KVs matching a user key.
    PrintUserKeyInArchive {
        /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
        #[clap(long, value_delimiter = ',')]
        archive_ids: Vec<u64>,
        /// The data directory of Hummock storage, where `SSTable` objects can be found.
        #[clap(long)]
        data_dir: String,
        /// KVs that are matched with the user key are printed.
        #[clap(long)]
        user_key: String,
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    // Debug utility: print archived version deltas related to an SST id.
    PrintVersionDeltaInArchive {
        /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
        #[clap(long, value_delimiter = ',')]
        archive_ids: Vec<u64>,
        /// The data directory of Hummock storage, where `SSTable` objects can be found.
        #[clap(long)]
        data_dir: String,
        /// Version deltas that are related to the SST id are printed.
        #[clap(long)]
        sst_id: HummockSstableId,
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    // Toggle tiered-cache tracing; the optional fields set per-operation
    // record thresholds in milliseconds.
    TieredCacheTracing {
        #[clap(long)]
        enable: bool,
        #[clap(long)]
        record_hybrid_insert_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_get_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_obtain_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_remove_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_fetch_threshold_ms: Option<u32>,
    },
    // Merge two compaction groups into one.
    MergeCompactionGroup {
        #[clap(long)]
        left_group_id: CompactionGroupId,
        #[clap(long)]
        right_group_id: CompactionGroupId,
    },
    // Migrate objects from a legacy path layout; see `migrate_legacy_object`.
    MigrateLegacyObject {
        url: String,
        source_dir: String,
        target_dir: String,
        #[clap(long, default_value = "100")]
        concurrency: u32,
    },
    // Resize meta/data cache capacities (values are in MiB).
    ResizeCache {
        #[clap(long)]
        meta_cache_capacity_mb: Option<u64>,
        #[clap(long)]
        data_cache_capacity_mb: Option<u64>,
    },
}
314
// Subcommands for inspecting state tables, either by MV name or by table id.
#[derive(Subcommand)]
enum TableCommands {
    /// scan a state table with MV name
    Scan {
        /// name of the materialized view to operate on
        mv_name: String,
        // data directory for hummock state store. None: use default
        data_dir: Option<String>,

        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    /// scan a state table using Id
    ScanById {
        /// id of the state table to operate on
        table_id: TableId,
        // data directory for hummock state store. None: use default
        data_dir: Option<String>,
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    /// list all state tables
    List,
}
339
// Subcommands for toggling compute-node schedulability (cordon/uncordon).
#[derive(Subcommand, Debug)]
enum ScaleCommands {
    /// Mark a compute node as unschedulable
    #[clap(verbatim_doc_comment)]
    Cordon {
        /// Workers that need to be cordoned, both id and host are supported.
        #[clap(
            long,
            required = true,
            value_delimiter = ',',
            value_name = "id or host,..."
        )]
        workers: Vec<String>,
    },
    /// mark a compute node as schedulable. Nodes are schedulable unless they are cordoned
    Uncordon {
        /// Workers that need to be uncordoned, both id and host are supported.
        #[clap(
            long,
            required = true,
            value_delimiter = ',',
            value_name = "id or host,..."
        )]
        workers: Vec<String>,
    },
}
366
// Subcommands talking to the meta node: stream-graph pause/resume, cluster
// info, rescheduling, backup/restore, worker management, and (unsafe) source
// administration.
#[derive(Subcommand)]
#[allow(clippy::large_enum_variant)]
enum MetaCommands {
    /// pause the stream graph
    Pause,
    /// resume the stream graph
    Resume,
    /// force resume backfill for troubleshooting
    // Exactly one of `job_id` / `fragment_id` must be given (enforced by the
    // arg group below).
    #[clap(
        group(
            ArgGroup::new("resume_backfill_target")
                .required(true)
                .args(&["job_id", "fragment_id"])
        )
    )]
    ResumeBackfill {
        #[clap(long)]
        job_id: Option<JobId>,
        #[clap(long)]
        fragment_id: Option<FragmentId>,
    },
    /// get cluster info
    ClusterInfo,
    /// get source split info
    SourceSplitInfo {
        #[clap(long)]
        ignore_id: bool,
    },
    /// Reschedule the actors in the stream graph
    ///
    /// The format is `fragment_id-[worker_id:count]+[worker_id:count]`
    /// You can provide either decreased `worker_ids` only or increased only, but decreased should be preceded by
    /// `added` when both are provided.
    ///
    /// For example, for plan `100-[1:1]+[4:1]` the follow request will be generated:
    /// ```text
    /// {
    ///     100: WorkerReschedule {
    ///         increased_actor_count: { 1: 1 },
    ///         decreased_actor_count: { 4: 1 },
    ///     }
    /// }
    /// ```
    /// Use ; to separate multiple fragment
    #[clap(verbatim_doc_comment)]
    // Exactly one of `plan` / `from` must be given (enforced by the arg group).
    #[clap(group(clap::ArgGroup::new("input_group").required(true).args(&["plan", "from"])))]
    Reschedule {
        /// Plan of reschedule, needs to be used with `revision`
        #[clap(long, requires = "revision")]
        plan: Option<String>,
        /// Revision of the plan
        #[clap(long)]
        revision: Option<u64>,
        /// Reschedule from a specific file
        #[clap(long, conflicts_with = "revision", value_hint = clap::ValueHint::AnyPath)]
        from: Option<String>,
        /// Show the plan only, no actual operation
        #[clap(long, default_value = "false")]
        dry_run: bool,
        /// Resolve `NO_SHUFFLE` upstream
        #[clap(long, default_value = "false")]
        resolve_no_shuffle: bool,
    },
    /// backup meta by taking a meta snapshot
    BackupMeta {
        #[clap(long)]
        remarks: Option<String>,
    },
    /// restore meta by recovering from a meta snapshot
    RestoreMeta {
        #[command(flatten)]
        opts: RestoreOpts,
    },
    /// delete meta snapshots
    DeleteMetaSnapshots {
        #[clap(long, value_delimiter = ',')]
        snapshot_ids: Vec<u64>,
    },

    /// List all existing connections in the catalog
    ListConnections,

    /// List fragment mapping for serving
    ListServingFragmentMapping,

    /// Unregister workers from the cluster
    UnregisterWorkers {
        /// The workers that needs to be unregistered, `worker_id` and `worker_host:worker_port` are both supported
        #[clap(
            long,
            required = true,
            value_delimiter = ',',
            value_name = "worker_id or worker_host:worker_port, ..."
        )]
        workers: Vec<String>,

        /// Automatic yes to prompts
        #[clap(short = 'y', long, default_value_t = false)]
        yes: bool,

        /// The worker not found will be ignored
        #[clap(long, default_value_t = false)]
        ignore_not_found: bool,

        /// Checking whether the fragment is occupied by workers
        #[clap(long, default_value_t = false)]
        check_fragment_occupied: bool,
    },

    /// Validate source interface for the cloud team
    ValidateSource {
        /// With properties in json format
        /// If privatelink is used, specify `connection.id` instead of `connection.name`
        #[clap(long)]
        props: String,
    },

    // Set the backfill parallelism for a CDC table; dispatched to
    // `set_cdc_table_backfill_parallelism`.
    SetCdcTableBackfillParallelism {
        #[clap(long, required = true)]
        table_id: u32,
        #[clap(long, required = true)]
        parallelism: u32,
    },

    /// Alter source connector properties with pause/resume orchestration (UNSAFE)
    /// This operation pauses the source, updates properties, and resumes.
    // NOTE(review): despite the `Safe` suffix, the help text marks this
    // UNSAFE — the "safe" appears to refer to the pause/update/resume
    // orchestration; confirm the intended naming.
    AlterSourcePropertiesSafe {
        /// Source ID to update
        #[clap(long)]
        source_id: u32,
        /// Properties to change in JSON format, e.g. '{"properties.bootstrap.server": "new-broker:9092"}'
        #[clap(long)]
        props: String,
        /// Reset split assignments after property change (for major upstream changes)
        #[clap(long, default_value_t = false)]
        reset_splits: bool,
    },

    /// Reset source split assignments (UNSAFE - admin only)
    /// Clears cached split state and triggers re-discovery from upstream.
    ResetSourceSplits {
        /// Source ID to reset
        #[clap(long)]
        source_id: u32,
    },

    /// Inject specific offsets into source splits (UNSAFE - admin only)
    /// WARNING: This can cause data duplication or loss!
    InjectSourceOffsets {
        /// Source ID to inject offsets for
        #[clap(long)]
        source_id: u32,
        /// Split offsets in JSON format, e.g. '{"split-0": "100", "split-1": "200"}'
        #[clap(long)]
        offsets: String,
    },
}
524
// Subcommands for await-tree diagnostics: dump from the cluster, analyze a
// diagnose file, or convert JSON traces to text.
#[derive(Subcommand, Clone, Debug)]
pub enum AwaitTreeCommands {
    /// Dump Await Tree
    Dump {
        /// The format of actor traces in the diagnose file. Allowed values: `json`, `text`. `json` by default.
        #[clap(short, long = "actor-traces-format")]
        actor_traces_format: Option<String>,
    },
    /// Analyze Await Tree
    Analyze {
        /// The path to the diagnose file, if None, ctl will first pull one from the cluster
        /// The actor traces format can be either `json` or `text`. The analyze command will
        /// automatically detect the format.
        #[clap(long = "path")]
        path: Option<String>,
    },
    /// Transcribe Await Tree From JSON to Text format
    Transcribe {
        /// The path to the await tree file to be transcribed
        #[clap(long = "path")]
        path: String,
    },
}
548
// Hidden self-test subcommands (see the `hide = true` on `Commands::Test`).
#[derive(Subcommand, Clone, Debug)]
enum TestCommands {
    /// Test if JVM and Java libraries are working
    Jvm,
}
554
// Throttle targets; every variant carries the same `ThrottleCommandArgs`.
#[derive(Subcommand, Clone, Debug)]
enum ThrottleCommands {
    // Throttle a source.
    Source(ThrottleCommandArgs),
    // Throttle a materialized view.
    Mv(ThrottleCommandArgs),
    // Throttle a sink.
    Sink(ThrottleCommandArgs),
}
561
// Kind of rate limit to apply, selected via `--throttle-type`.
#[derive(Clone, Debug, clap::ValueEnum)]
pub enum ThrottleTypeArg {
    Dml,
    Backfill,
    Source,
    Sink,
}
569
// Shared arguments for all `ThrottleCommands` variants.
#[derive(Clone, Debug, Args)]
pub struct ThrottleCommandArgs {
    /// The ID of the object to throttle
    #[clap(long, required = true)]
    id: u32,
    /// The rate limit to apply
    // `None` (flag omitted) vs. an explicit value; semantics of omission are
    // decided by the dispatcher (`apply_throttle`).
    #[clap(long)]
    rate: Option<u32>,
    /// The type of throttle to apply
    #[clap(long, value_enum, required = true)]
    throttle_type: ThrottleTypeArg,
}
582
// Profiling subcommands; worker types default as described in the field docs.
#[derive(Subcommand, Clone, Debug)]
pub enum ProfileCommands {
    /// CPU profile
    Cpu {
        /// The time to active profiling for (in seconds)
        #[clap(short, long = "sleep")]
        sleep: u64,
        /// Target worker types. Repeatable. Defaults to frontend, compute-node, and compactor.
        #[clap(long = "worker-type", value_name = "TYPE")]
        worker_types: Vec<ProfileWorkerType>,
    },
    /// Heap profile
    Heap {
        /// The output directory of the dumped file
        #[clap(long = "dir")]
        dir: Option<String>,
        /// Target worker types. Repeatable. Defaults to frontend, compute-node, and compactor.
        #[clap(long = "worker-type", value_name = "TYPE")]
        worker_types: Vec<ProfileWorkerType>,
    },
}
604
605/// Start `risectl` with the given options.
606/// Cancel the operation when the given `shutdown` token triggers.
607/// Log and abort the process if any error occurs.
608///
609/// Note: use [`start_fallible`] if you want to call functionalities of `risectl`
610/// in an embedded manner.
611pub async fn start(opts: CliOpts, shutdown: CancellationToken) {
612    let context = CtlContext::default();
613
614    tokio::select! {
615        _ = shutdown.cancelled() => {
616            // Shutdown requested, clean up the context and return.
617            context.try_close().await;
618        }
619
620        result = start_fallible(opts, &context) => {
621            if let Err(e) = result {
622                eprintln!("Error: {:#?}", e.as_report()); // pretty with backtrace
623                std::process::exit(1);
624            }
625        }
626    }
627}
628
629/// Start `risectl` with the given options.
630/// Return `Err` if any error occurs.
631pub async fn start_fallible(opts: CliOpts, context: &CtlContext) -> Result<()> {
632    let result = start_impl(opts, context).await;
633    context.try_close().await;
634    result
635}
636
637#[expect(
638    clippy::large_stack_frames,
639    reason = "Pre-opt MIR sums locals across match arms in async dispatch; \
640              post-layout generator stores only one arm at a time (~13–16 KiB)."
641)]
642async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
643    match opts.command {
644        Commands::Compute(ComputeCommands::ShowConfig { host }) => {
645            cmd_impl::compute::show_config(&host).await?
646        }
647        Commands::Hummock(HummockCommands::DisableCommitEpoch) => {
648            cmd_impl::hummock::disable_commit_epoch(context).await?
649        }
650        Commands::Hummock(HummockCommands::ListVersion {
651            verbose,
652            verbose_key_range,
653        }) => {
654            cmd_impl::hummock::list_version(context, verbose, verbose_key_range).await?;
655        }
656        Commands::Hummock(HummockCommands::ListVersionDeltas {
657            start_id,
658            num_epochs,
659        }) => {
660            cmd_impl::hummock::list_version_deltas(context, start_id, num_epochs).await?;
661        }
662        Commands::Hummock(HummockCommands::ListKv {
663            epoch,
664            table_id,
665            data_dir,
666            use_new_object_prefix_strategy,
667        }) => {
668            cmd_impl::hummock::list_kv(
669                context,
670                epoch,
671                table_id,
672                data_dir,
673                use_new_object_prefix_strategy,
674            )
675            .await?;
676        }
677        Commands::Hummock(HummockCommands::SstDump(args)) => {
678            cmd_impl::hummock::sst_dump(context, args).await.unwrap()
679        }
680        Commands::Hummock(HummockCommands::TriggerManualCompaction {
681            compaction_group_id,
682            table_id,
683            levels,
684            sst_ids,
685        }) => {
686            cmd_impl::hummock::trigger_manual_compaction(
687                context,
688                compaction_group_id,
689                table_id.into(),
690                levels,
691                sst_ids,
692            )
693            .await?
694        }
695        Commands::Hummock(HummockCommands::TriggerFullGc {
696            sst_retention_time_sec,
697            prefix,
698        }) => cmd_impl::hummock::trigger_full_gc(context, sst_retention_time_sec, prefix).await?,
699        Commands::Hummock(HummockCommands::ListPinnedVersions {}) => {
700            list_pinned_versions(context).await?
701        }
702        Commands::Hummock(HummockCommands::ListCompactionGroup) => {
703            cmd_impl::hummock::list_compaction_group(context).await?
704        }
705        Commands::Hummock(HummockCommands::UpdateCompactionConfig {
706            compaction_group_ids,
707            max_bytes_for_level_base,
708            max_bytes_for_level_multiplier,
709            max_compaction_bytes,
710            sub_level_max_compaction_bytes,
711            level0_tier_compact_file_number,
712            target_file_size_base,
713            compaction_filter_mask,
714            max_sub_compaction,
715            level0_stop_write_threshold_sub_level_number,
716            level0_sub_level_compact_level_count,
717            max_space_reclaim_bytes,
718            level0_max_compact_file_number,
719            level0_overlapping_sub_level_compact_level_count,
720            enable_emergency_picker,
721            tombstone_reclaim_ratio,
722            compression_level,
723            compression_algorithm,
724            max_l0_compact_level,
725            sst_allowed_trivial_move_min_size,
726            disable_auto_group_scheduling,
727            max_overlapping_level_size,
728            sst_allowed_trivial_move_max_count,
729            emergency_level0_sst_file_count,
730            emergency_level0_sub_level_partition,
731            level0_stop_write_threshold_max_sst_count,
732            level0_stop_write_threshold_max_size,
733            enable_optimize_l0_interval_selection,
734            vnode_aligned_level_size_threshold,
735            max_kv_count_for_xor16,
736        }) => {
737            cmd_impl::hummock::update_compaction_config(
738                context,
739                compaction_group_ids,
740                build_compaction_config_vec(
741                    max_bytes_for_level_base,
742                    max_bytes_for_level_multiplier,
743                    max_compaction_bytes,
744                    sub_level_max_compaction_bytes,
745                    level0_tier_compact_file_number,
746                    target_file_size_base,
747                    compaction_filter_mask,
748                    max_sub_compaction,
749                    level0_stop_write_threshold_sub_level_number,
750                    level0_sub_level_compact_level_count,
751                    max_space_reclaim_bytes,
752                    level0_max_compact_file_number,
753                    level0_overlapping_sub_level_compact_level_count,
754                    enable_emergency_picker,
755                    tombstone_reclaim_ratio,
756                    if let Some(level) = compression_level {
757                        assert!(compression_algorithm.is_some());
758                        Some(CompressionAlgorithm {
759                            level,
760                            compression_algorithm: compression_algorithm.unwrap(),
761                        })
762                    } else {
763                        None
764                    },
765                    max_l0_compact_level,
766                    sst_allowed_trivial_move_min_size,
767                    disable_auto_group_scheduling,
768                    max_overlapping_level_size,
769                    sst_allowed_trivial_move_max_count,
770                    emergency_level0_sst_file_count,
771                    emergency_level0_sub_level_partition,
772                    level0_stop_write_threshold_max_sst_count,
773                    level0_stop_write_threshold_max_size,
774                    enable_optimize_l0_interval_selection,
775                    vnode_aligned_level_size_threshold,
776                    max_kv_count_for_xor16,
777                ),
778            )
779            .await?
780        }
781        Commands::Hummock(HummockCommands::SplitCompactionGroup {
782            compaction_group_id,
783            table_ids,
784            partition_vnode_count,
785        }) => {
            // (arm continued from above) Delegate to the split-compaction-group
            // implementation with the ids collected from the CLI.
            cmd_impl::hummock::split_compaction_group(
                context,
                compaction_group_id,
                // `map_into` converts the raw CLI ids into the id type the helper
                // expects; `collect_vec` materializes them for the `&[_]` argument.
                &table_ids.into_iter().map_into().collect_vec(),
                partition_vnode_count,
            )
            .await?;
        }
        // --- Hummock version-checkpoint and maintenance commands ---
        Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
            cmd_impl::hummock::pause_version_checkpoint(context).await?;
        }
        Commands::Hummock(HummockCommands::ResumeVersionCheckpoint) => {
            cmd_impl::hummock::resume_version_checkpoint(context).await?;
        }
        Commands::Hummock(HummockCommands::ReplayVersion) => {
            cmd_impl::hummock::replay_version(context).await?;
        }
        Commands::Hummock(HummockCommands::ListCompactionStatus { verbose }) => {
            cmd_impl::hummock::list_compaction_status(context, verbose).await?;
        }
        Commands::Hummock(HummockCommands::GetCompactionScore {
            compaction_group_id,
        }) => {
            cmd_impl::hummock::get_compaction_score(context, compaction_group_id).await?;
        }
        Commands::Hummock(HummockCommands::ValidateVersion) => {
            cmd_impl::hummock::validate_version(context).await?;
        }
        Commands::Hummock(HummockCommands::RebuildTableStats) => {
            cmd_impl::hummock::rebuild_table_stats(context).await?;
        }
        Commands::Hummock(HummockCommands::CancelCompactTask { task_id }) => {
            cmd_impl::hummock::cancel_compact_task(context, task_id).await?;
        }
        // --- Hummock meta-backup archive inspection ---
        Commands::Hummock(HummockCommands::PrintVersionDeltaInArchive {
            archive_ids,
            data_dir,
            sst_id,
            use_new_object_prefix_strategy,
        }) => {
            cmd_impl::hummock::print_version_delta_in_archive(
                context,
                // Wrap the raw u64 ids from the CLI into the `HummockVersionId` newtype.
                archive_ids.into_iter().map(HummockVersionId::new),
                data_dir,
                sst_id,
                use_new_object_prefix_strategy,
            )
            .await?;
        }
        Commands::Hummock(HummockCommands::PrintUserKeyInArchive {
            archive_ids,
            data_dir,
            user_key,
            use_new_object_prefix_strategy,
        }) => {
            cmd_impl::hummock::print_user_key_in_archive(
                context,
                // Same newtype wrapping as PrintVersionDeltaInArchive above.
                archive_ids.into_iter().map(HummockVersionId::new),
                data_dir,
                user_key,
                use_new_object_prefix_strategy,
            )
            .await?;
        }
        // Toggle tiered-cache tracing; the per-operation thresholds (in ms) gate
        // which hybrid-cache operations get recorded.
        Commands::Hummock(HummockCommands::TieredCacheTracing {
            enable,
            record_hybrid_insert_threshold_ms,
            record_hybrid_get_threshold_ms,
            record_hybrid_obtain_threshold_ms,
            record_hybrid_remove_threshold_ms,
            record_hybrid_fetch_threshold_ms,
        }) => {
            cmd_impl::hummock::tiered_cache_tracing(
                context,
                enable,
                record_hybrid_insert_threshold_ms,
                record_hybrid_get_threshold_ms,
                record_hybrid_obtain_threshold_ms,
                record_hybrid_remove_threshold_ms,
                record_hybrid_fetch_threshold_ms,
            )
            .await?
        }
        Commands::Hummock(HummockCommands::MergeCompactionGroup {
            left_group_id,
            right_group_id,
        }) => {
            cmd_impl::hummock::merge_compaction_group(context, left_group_id, right_group_id)
                .await?
        }

        // NOTE: unlike the other Hummock subcommands, this one does not take
        // `context` — it talks to object storage via the given `url` directly.
        Commands::Hummock(HummockCommands::MigrateLegacyObject {
            url,
            source_dir,
            target_dir,
            concurrency,
        }) => {
            migrate_legacy_object(url, source_dir, target_dir, concurrency).await?;
        }
        Commands::Hummock(HummockCommands::ResizeCache {
            meta_cache_capacity_mb,
            data_cache_capacity_mb,
        }) => {
            // CLI accepts capacities in MiB; convert to bytes before sending.
            const MIB: u64 = 1024 * 1024;
            cmd_impl::hummock::resize_cache(
                context,
                meta_cache_capacity_mb.map(|v| v * MIB),
                data_cache_capacity_mb.map(|v| v * MIB),
            )
            .await?
        }
        // --- Table inspection commands ---
        Commands::Table(TableCommands::Scan {
            mv_name,
            data_dir,
            use_new_object_prefix_strategy,
        }) => {
            cmd_impl::table::scan(context, mv_name, data_dir, use_new_object_prefix_strategy)
                .await?
        }
        Commands::Table(TableCommands::ScanById {
            table_id,
            data_dir,
            use_new_object_prefix_strategy,
        }) => {
            cmd_impl::table::scan_id(context, table_id, data_dir, use_new_object_prefix_strategy)
                .await?
        }
        Commands::Table(TableCommands::List) => cmd_impl::table::list(context).await?,
        Commands::Bench(cmd) => cmd_impl::bench::do_bench(context, cmd).await?,
        // --- Meta-service administration commands ---
        Commands::Meta(MetaCommands::Pause) => cmd_impl::meta::pause(context).await?,
        Commands::Meta(MetaCommands::Resume) => cmd_impl::meta::resume(context).await?,
        Commands::Meta(MetaCommands::ResumeBackfill {
            job_id,
            fragment_id,
        }) => cmd_impl::meta::resume_backfill(context, job_id, fragment_id).await?,
        Commands::Meta(MetaCommands::ClusterInfo) => cmd_impl::meta::cluster_info(context).await?,
        Commands::Meta(MetaCommands::SourceSplitInfo { ignore_id }) => {
            cmd_impl::meta::source_split_info(context, ignore_id).await?
        }
        Commands::Meta(MetaCommands::Reschedule {
            from,
            dry_run,
            plan,
            revision,
            resolve_no_shuffle,
        }) => {
            cmd_impl::meta::reschedule(context, plan, revision, from, dry_run, resolve_no_shuffle)
                .await?
        }
        Commands::Meta(MetaCommands::BackupMeta { remarks }) => {
            cmd_impl::meta::backup_meta(context, remarks).await?
        }
        // NOTE(review): restore is invoked directly on `risingwave_meta` with its
        // own `RestoreOpts`, not through `context` — presumably because it must run
        // against backup storage while the meta service is down; confirm with the
        // backup_restore module docs.
        Commands::Meta(MetaCommands::RestoreMeta { opts }) => {
            risingwave_meta::backup_restore::restore(opts).await?
        }
        Commands::Meta(MetaCommands::DeleteMetaSnapshots { snapshot_ids }) => {
            cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
        }
        Commands::Meta(MetaCommands::ListConnections) => {
            cmd_impl::meta::list_connections(context).await?
        }
        Commands::Meta(MetaCommands::ListServingFragmentMapping) => {
            cmd_impl::meta::list_serving_fragment_mappings(context).await?
        }
        Commands::Meta(MetaCommands::UnregisterWorkers {
            workers,
            yes,
            ignore_not_found,
            check_fragment_occupied,
        }) => {
            cmd_impl::meta::unregister_workers(
                context,
                workers,
                yes,
                ignore_not_found,
                check_fragment_occupied,
            )
            .await?
        }
        Commands::Meta(MetaCommands::ValidateSource { props }) => {
            cmd_impl::meta::validate_source(context, props).await?
        }
        // --- Await-tree (async stack trace) commands ---
        Commands::AwaitTree(AwaitTreeCommands::Dump {
            actor_traces_format,
        }) => cmd_impl::await_tree::dump(context, actor_traces_format).await?,
        Commands::AwaitTree(AwaitTreeCommands::Analyze { path }) => {
            cmd_impl::await_tree::bottleneck_detect(context, path).await?
        }
        // Transcribe works on a local file only, so no cluster context is needed.
        Commands::AwaitTree(AwaitTreeCommands::Transcribe { path }) => {
            rw_diagnose_tools::await_tree::transcribe(path)?
        }
        // --- Profiling commands ---
        Commands::Profile(ProfileCommands::Cpu {
            sleep,
            worker_types,
        }) => cmd_impl::profile::cpu_profile(context, sleep, worker_types).await?,
        Commands::Profile(ProfileCommands::Heap { dir, worker_types }) => {
            cmd_impl::profile::heap_profile(context, dir, worker_types).await?
        }
        // --- Scaling commands: cordon marks workers unschedulable, uncordon
        // restores schedulability; both share the same implementation. ---
        Commands::Scale(ScaleCommands::Cordon { workers }) => {
            cmd_impl::scale::update_schedulability(context, workers, Schedulability::Unschedulable)
                .await?
        }
        Commands::Scale(ScaleCommands::Uncordon { workers }) => {
            cmd_impl::scale::update_schedulability(context, workers, Schedulability::Schedulable)
                .await?
        }
        // --- Throttle commands: each subcommand differs only in the
        // `PbThrottleTarget` it passes to the shared `apply_throttle` helper. ---
        Commands::Throttle(ThrottleCommands::Source(args)) => {
            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
        }
        Commands::Throttle(ThrottleCommands::Mv(args)) => {
            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?;
        }
        Commands::Throttle(ThrottleCommands::Sink(args)) => {
            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Sink, args).await?;
        }
        Commands::Meta(MetaCommands::SetCdcTableBackfillParallelism {
            table_id,
            parallelism,
        }) => {
            set_cdc_table_backfill_parallelism(context, table_id, parallelism).await?;
        }
        Commands::Meta(MetaCommands::AlterSourcePropertiesSafe {
            source_id,
            props,
            reset_splits,
        }) => {
            cmd_impl::meta::alter_source_properties_safe(context, source_id, props, reset_splits)
                .await?;
        }
        Commands::Meta(MetaCommands::ResetSourceSplits { source_id }) => {
            cmd_impl::meta::reset_source_splits(context, source_id).await?;
        }
        Commands::Meta(MetaCommands::InjectSourceOffsets { source_id, offsets }) => {
            cmd_impl::meta::inject_source_offsets(context, source_id, offsets).await?;
        }
        // --- Self-test commands (synchronous; no cluster context required) ---
        Commands::Test(TestCommands::Jvm) => cmd_impl::test::test_jvm()?,
    }
    Ok(())
}