Skip to main content

risingwave_ctl/
lib.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![warn(clippy::large_futures, clippy::large_stack_frames)]
16#![allow(unfulfilled_lint_expectations)]
17
18use anyhow::Result;
19use clap::{ArgGroup, Args, Parser, Subcommand};
20use cmd_impl::bench::BenchCommands;
21use cmd_impl::hummock::SstDumpArgs;
22use itertools::Itertools;
23use risingwave_common::util::tokio_util::sync::CancellationToken;
24use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId};
25use risingwave_meta::backup_restore::RestoreOpts;
26use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::{
27    CompressionAlgorithm, SstableFilterLayout, SstableFilterType,
28};
29use risingwave_pb::id::{CompactionGroupId, FragmentId, HummockSstableId, JobId, TableId};
30use thiserror_ext::AsReport;
31
32use crate::cmd_impl::hummock::{
33    build_compaction_config_vec, list_pinned_versions, migrate_legacy_object,
34};
35use crate::cmd_impl::profile::ProfileWorkerType;
36use crate::cmd_impl::scale::set_cdc_table_backfill_parallelism;
37use crate::cmd_impl::throttle::apply_throttle;
38use crate::common::CtlContext;
39
40pub mod cmd_impl;
41pub mod common;
42
43/// risectl provides internal access to the RisingWave cluster. Generally, you will need
44/// to provide the meta address and the state store URL to enable risectl to access the cluster. You
45/// must start RisingWave in full cluster mode (e.g. enable MinIO and compactor in risedev.yml)
46/// instead of playground mode to use this tool. risectl will read environment variables
47/// `RW_META_ADDR` and `RW_HUMMOCK_URL` to configure itself.
48#[derive(Parser)]
49#[clap(version, about = "The DevOps tool that provides internal access to the RisingWave cluster", long_about = None)]
50#[clap(propagate_version = true)]
51#[clap(infer_subcommands = true)]
52pub struct CliOpts {
53    #[clap(subcommand)]
54    command: Commands,
55}
56
57#[derive(Subcommand)]
58#[clap(infer_subcommands = true)]
59enum Commands {
60    /// Commands for Compute
61    #[clap(subcommand)]
62    Compute(ComputeCommands),
63    /// Commands for Hummock
64    #[clap(subcommand)]
65    Hummock(HummockCommands),
66    /// Commands for Tables
67    #[clap(subcommand)]
68    Table(TableCommands),
69    /// Commands for Meta
70    #[clap(subcommand)]
71    Meta(MetaCommands),
72    /// Commands for Benchmarks
73    #[clap(subcommand)]
74    Bench(BenchCommands),
75    /// Commands for await-tree, such as dumping, analyzing and transcribing
76    #[clap(subcommand)]
77    #[clap(visible_alias("trace"))]
78    AwaitTree(AwaitTreeCommands),
79    /// Commands for profiling nodes
80    #[clap(subcommand)]
81    Profile(ProfileCommands),
82    #[clap(subcommand)]
83    Throttle(ThrottleCommands),
84    /// Commands for Self-testing
85    #[clap(subcommand, hide = true)]
86    Test(TestCommands),
87}
88
89#[derive(Subcommand)]
90enum ComputeCommands {
91    /// Show all the configuration parameters on compute node
92    ShowConfig { host: String },
93}
94
95#[expect(clippy::large_enum_variant)]
96#[derive(Subcommand)]
97enum HummockCommands {
98    /// list latest Hummock version on meta node
99    ListVersion {
100        #[clap(short, long = "verbose", default_value_t = false)]
101        verbose: bool,
102
103        #[clap(long = "verbose_key_range", default_value_t = false)]
104        verbose_key_range: bool,
105    },
106
107    /// list hummock version deltas in the meta store
108    ListVersionDeltas {
109        #[clap(short, long = "start-version-delta-id", default_value_t = HummockVersionId::new(0))]
110        start_id: HummockVersionId,
111
112        #[clap(short, long = "num-epochs", default_value_t = 100)]
113        num_epochs: u32,
114    },
115    /// Forbid hummock commit new epochs, which is a prerequisite for compaction deterministic test
116    DisableCommitEpoch,
117    /// list all Hummock key-value pairs
118    ListKv {
119        #[clap(short, long = "epoch", default_value_t = HummockEpoch::MAX)]
120        epoch: u64,
121
122        #[clap(short, long = "table-id")]
123        table_id: TableId,
124
125        // data directory for hummock state store. None: use default
126        data_dir: Option<String>,
127
128        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
129        use_new_object_prefix_strategy: bool,
130    },
131    SstDump(SstDumpArgs),
132    /// trigger a targeted compaction through `compaction_group_id`
133    TriggerManualCompaction {
134        #[clap(short, long = "compaction-group-id", default_value_t = CompactionGroupId::new(2))]
135        compaction_group_id: CompactionGroupId,
136
137        #[clap(short, long = "table-id", default_value_t = 0)]
138        table_id: u32,
139
140        #[clap(short, long = "level", value_delimiter = ',', default_values_t = vec![1u32])]
141        levels: Vec<u32>,
142
143        #[clap(long = "target-level")]
144        target_level: Option<u32>,
145
146        #[clap(short, long = "sst-ids", value_delimiter = ',')]
147        sst_ids: Vec<HummockSstableId>,
148
149        #[clap(long = "exclusive", default_value_t = false)]
150        exclusive: bool,
151
152        #[clap(long = "retry-interval-ms", default_value_t = 1000)]
153        retry_interval_ms: u64,
154    },
155    /// Trigger a full GC for SSTs that is not pinned, with timestamp <= now -
156    /// `sst_retention_time_sec`, and with `prefix` in path.
157    TriggerFullGc {
158        #[clap(short, long = "sst_retention_time_sec", default_value_t = 259200)]
159        sst_retention_time_sec: u64,
160        #[clap(short, long = "prefix", required = false)]
161        prefix: Option<String>,
162    },
163    /// List pinned versions of each worker.
164    ListPinnedVersions {},
165    /// List all compaction groups.
166    ListCompactionGroup,
167    /// Update compaction config for compaction groups.
168    UpdateCompactionConfig {
169        #[clap(long, value_delimiter = ',')]
170        compaction_group_ids: Vec<CompactionGroupId>,
171        #[clap(long)]
172        max_bytes_for_level_base: Option<u64>,
173        #[clap(long)]
174        max_bytes_for_level_multiplier: Option<u64>,
175        #[clap(long)]
176        max_compaction_bytes: Option<u64>,
177        #[clap(long)]
178        sub_level_max_compaction_bytes: Option<u64>,
179        #[clap(long)]
180        level0_tier_compact_file_number: Option<u64>,
181        #[clap(long)]
182        target_file_size_base: Option<u64>,
183        #[clap(long)]
184        compaction_filter_mask: Option<u32>,
185        #[clap(long)]
186        max_sub_compaction: Option<u32>,
187        #[clap(long)]
188        level0_stop_write_threshold_sub_level_number: Option<u64>,
189        #[clap(long)]
190        level0_sub_level_compact_level_count: Option<u32>,
191        #[clap(long)]
192        max_space_reclaim_bytes: Option<u64>,
193        #[clap(long)]
194        level0_max_compact_file_number: Option<u64>,
195        #[clap(long)]
196        level0_overlapping_sub_level_compact_level_count: Option<u32>,
197        #[clap(long)]
198        enable_emergency_picker: Option<bool>,
199        #[clap(long)]
200        tombstone_reclaim_ratio: Option<u32>,
201        #[clap(long)]
202        compression_level: Option<u32>,
203        #[clap(long)]
204        compression_algorithm: Option<String>,
205        /// LSM level index to update, e.g. 0 for L0, 6 for L6.
206        #[clap(long, requires = "sstable_filter_type")]
207        sstable_filter_type_level: Option<u32>,
208        /// SST filter type to use for this level. Supported values: "none", "xor16", "xor8".
209        #[clap(long, requires = "sstable_filter_type_level")]
210        sstable_filter_type: Option<String>,
211        /// LSM level index to update, e.g. 0 for L0, 6 for L6.
212        #[clap(long, requires = "sstable_filter_layout")]
213        sstable_filter_layout_level: Option<u32>,
214        /// Filter layout for this level.
215        ///
216        /// Supported values:
217        /// - "auto": decide by heuristics (currently by kv-count threshold)
218        /// - "plain": always use a single non-blocked filter, ignoring kv-count threshold
219        /// - "blocked": always use block-based filters, ignoring kv-count threshold
220        #[clap(long, requires = "sstable_filter_layout_level")]
221        sstable_filter_layout: Option<String>,
222        #[clap(long)]
223        max_l0_compact_level: Option<u32>,
224        #[clap(long)]
225        sst_allowed_trivial_move_min_size: Option<u64>,
226        #[clap(long)]
227        disable_auto_group_scheduling: Option<bool>,
228        #[clap(long)]
229        max_overlapping_level_size: Option<u64>,
230        #[clap(long)]
231        sst_allowed_trivial_move_max_count: Option<u32>,
232        #[clap(long)]
233        emergency_level0_sst_file_count: Option<u32>,
234        #[clap(long)]
235        emergency_level0_sub_level_partition: Option<u32>,
236        #[clap(long)]
237        level0_stop_write_threshold_max_sst_count: Option<u32>,
238        #[clap(long)]
239        level0_stop_write_threshold_max_size: Option<u64>,
240        #[clap(long)]
241        enable_optimize_l0_interval_selection: Option<bool>,
242        /// KV-count threshold for using blocked xor filters when output layout is "auto".
243        ///
244        /// Note: shared-buffer flush does not read compaction group config, so this setting only
245        /// applies to compaction tasks.
246        #[clap(long)]
247        blocked_xor_filter_kv_count_threshold: Option<u64>,
248        #[clap(long)]
249        max_vnode_key_range_bytes: Option<u64>,
250    },
251    /// Split given compaction group into two. Moves the given tables to the new group.
252    SplitCompactionGroup {
253        #[clap(long)]
254        compaction_group_id: CompactionGroupId,
255        #[clap(long, value_delimiter = ',')]
256        table_ids: Vec<TableId>,
257        #[clap(long, default_value_t = 0)]
258        partition_vnode_count: u32,
259    },
260    /// Pause version checkpoint, which subsequently pauses GC of delta log and SST object.
261    PauseVersionCheckpoint,
262    /// Resume version checkpoint, which subsequently resumes GC of delta log and SST object.
263    ResumeVersionCheckpoint,
264    /// Replay version from the checkpoint one to the latest one.
265    ReplayVersion,
266    /// List compaction status
267    ListCompactionStatus {
268        #[clap(short, long = "verbose", default_value_t = false)]
269        verbose: bool,
270    },
271    GetCompactionScore {
272        #[clap(long)]
273        compaction_group_id: CompactionGroupId,
274    },
275    /// Validate the current `HummockVersion`.
276    ValidateVersion,
277    /// Rebuild table stats
278    RebuildTableStats,
279    CancelCompactTask {
280        #[clap(short, long)]
281        task_id: u64,
282    },
283    PrintUserKeyInArchive {
284        /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
285        #[clap(long, value_delimiter = ',')]
286        archive_ids: Vec<u64>,
287        /// The data directory of Hummock storage, where `SSTable` objects can be found.
288        #[clap(long)]
289        data_dir: String,
290        /// KVs that are matched with the user key are printed.
291        #[clap(long)]
292        user_key: String,
293        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
294        use_new_object_prefix_strategy: bool,
295    },
296    PrintVersionDeltaInArchive {
297        /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
298        #[clap(long, value_delimiter = ',')]
299        archive_ids: Vec<u64>,
300        /// The data directory of Hummock storage, where `SSTable` objects can be found.
301        #[clap(long)]
302        data_dir: String,
303        /// Version deltas that are related to the SST id are printed.
304        #[clap(long)]
305        sst_id: HummockSstableId,
306        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
307        use_new_object_prefix_strategy: bool,
308    },
309    TieredCacheTracing {
310        #[clap(long)]
311        enable: bool,
312        #[clap(long)]
313        record_hybrid_insert_threshold_ms: Option<u32>,
314        #[clap(long)]
315        record_hybrid_get_threshold_ms: Option<u32>,
316        #[clap(long)]
317        record_hybrid_obtain_threshold_ms: Option<u32>,
318        #[clap(long)]
319        record_hybrid_remove_threshold_ms: Option<u32>,
320        #[clap(long)]
321        record_hybrid_fetch_threshold_ms: Option<u32>,
322    },
323    MergeCompactionGroup {
324        #[clap(long)]
325        left_group_id: CompactionGroupId,
326        #[clap(long)]
327        right_group_id: CompactionGroupId,
328    },
329    MigrateLegacyObject {
330        url: String,
331        source_dir: String,
332        target_dir: String,
333        #[clap(long, default_value = "100")]
334        concurrency: u32,
335    },
336    ResizeCache {
337        #[clap(long)]
338        meta_cache_capacity_mb: Option<u64>,
339        #[clap(long)]
340        data_cache_capacity_mb: Option<u64>,
341    },
342}
343
344#[derive(Subcommand)]
345enum TableCommands {
346    /// scan a state table with MV name
347    Scan {
348        /// name of the materialized view to operate on
349        mv_name: String,
350        // data directory for hummock state store. None: use default
351        data_dir: Option<String>,
352
353        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
354        use_new_object_prefix_strategy: bool,
355    },
356    /// scan a state table using Id
357    ScanById {
358        /// id of the state table to operate on
359        table_id: TableId,
360        // data directory for hummock state store. None: use default
361        data_dir: Option<String>,
362        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
363        use_new_object_prefix_strategy: bool,
364    },
365    /// list all state tables
366    List,
367}
368
369#[derive(Subcommand)]
370#[expect(clippy::large_enum_variant)]
371enum MetaCommands {
372    /// pause the stream graph
373    Pause,
374    /// resume the stream graph
375    Resume,
376    /// force resume backfill for troubleshooting
377    #[clap(
378        group(
379            ArgGroup::new("resume_backfill_target")
380                .required(true)
381                .args(&["job_id", "fragment_id"])
382        )
383    )]
384    ResumeBackfill {
385        #[clap(long)]
386        job_id: Option<JobId>,
387        #[clap(long)]
388        fragment_id: Option<FragmentId>,
389    },
390    /// get cluster info
391    ClusterInfo,
392    /// get source split info
393    SourceSplitInfo {
394        #[clap(long)]
395        ignore_id: bool,
396    },
397    /// Reschedule the actors in the stream graph
398    ///
399    /// The format is `fragment_id-[worker_id:count]+[worker_id:count]`
400    /// You can provide either decreased `worker_ids` only or increased only, but decreased should be preceded by
401    /// `added` when both are provided.
402    ///
403    /// For example, for plan `100-[1:1]+[4:1]` the follow request will be generated:
404    /// ```text
405    /// {
406    ///     100: WorkerReschedule {
407    ///         increased_actor_count: { 1: 1 },
408    ///         decreased_actor_count: { 4: 1 },
409    ///     }
410    /// }
411    /// ```
412    /// Use ; to separate multiple fragment
413    #[clap(verbatim_doc_comment)]
414    #[clap(group(clap::ArgGroup::new("input_group").required(true).args(&["plan", "from"])))]
415    Reschedule {
416        /// Plan of reschedule, needs to be used with `revision`
417        #[clap(long, requires = "revision")]
418        plan: Option<String>,
419        /// Revision of the plan
420        #[clap(long)]
421        revision: Option<u64>,
422        /// Reschedule from a specific file
423        #[clap(long, conflicts_with = "revision", value_hint = clap::ValueHint::AnyPath)]
424        from: Option<String>,
425        /// Show the plan only, no actual operation
426        #[clap(long, default_value = "false")]
427        dry_run: bool,
428        /// Resolve `NO_SHUFFLE` upstream
429        #[clap(long, default_value = "false")]
430        resolve_no_shuffle: bool,
431    },
432    /// backup meta by taking a meta snapshot
433    BackupMeta {
434        #[clap(long)]
435        remarks: Option<String>,
436    },
437    /// restore meta by recovering from a meta snapshot
438    RestoreMeta {
439        #[command(flatten)]
440        opts: RestoreOpts,
441    },
442    /// delete meta snapshots
443    DeleteMetaSnapshots {
444        #[clap(long, value_delimiter = ',')]
445        snapshot_ids: Vec<u64>,
446    },
447
448    /// List all existing connections in the catalog
449    ListConnections,
450
451    /// List fragment mapping for serving
452    ListServingFragmentMapping,
453
454    /// Unregister workers from the cluster
455    UnregisterWorkers {
456        /// The workers that needs to be unregistered, `worker_id` and `worker_host:worker_port` are both supported
457        #[clap(
458            long,
459            required = true,
460            value_delimiter = ',',
461            value_name = "worker_id or worker_host:worker_port, ..."
462        )]
463        workers: Vec<String>,
464
465        /// Automatic yes to prompts
466        #[clap(short = 'y', long, default_value_t = false)]
467        yes: bool,
468
469        /// The worker not found will be ignored
470        #[clap(long, default_value_t = false)]
471        ignore_not_found: bool,
472
473        /// Checking whether the fragment is occupied by workers
474        #[clap(long, default_value_t = false)]
475        check_fragment_occupied: bool,
476    },
477
478    /// Validate source interface for the cloud team
479    ValidateSource {
480        /// With properties in json format
481        /// If privatelink is used, specify `connection.id` instead of `connection.name`
482        #[clap(long)]
483        props: String,
484    },
485
486    SetCdcTableBackfillParallelism {
487        #[clap(long, required = true)]
488        table_id: u32,
489        #[clap(long, required = true)]
490        parallelism: u32,
491    },
492
493    /// Alter source connector properties with pause/resume orchestration (UNSAFE)
494    /// This operation pauses the source, updates properties, and resumes.
495    AlterSourcePropertiesSafe {
496        /// Source ID to update
497        #[clap(long)]
498        source_id: u32,
499        /// Properties to change in JSON format, e.g. '{"properties.bootstrap.server": "new-broker:9092"}'
500        #[clap(long)]
501        props: String,
502        /// Reset split assignments after property change (for major upstream changes)
503        #[clap(long, default_value_t = false)]
504        reset_splits: bool,
505    },
506
507    /// Reset source split assignments (UNSAFE - admin only)
508    /// Clears cached split state and triggers re-discovery from upstream.
509    ResetSourceSplits {
510        /// Source ID to reset
511        #[clap(long)]
512        source_id: u32,
513    },
514
515    /// Inject specific offsets into source splits (UNSAFE - admin only)
516    /// WARNING: This can cause data duplication or loss!
517    InjectSourceOffsets {
518        /// Source ID to inject offsets for
519        #[clap(long)]
520        source_id: u32,
521        /// Split offsets in JSON format, e.g. '{"split-0": "100", "split-1": "200"}'
522        #[clap(long)]
523        offsets: String,
524    },
525
526    /// Apply all schema changes under `src/meta/model/migration` to the meta
527    /// store without starting a meta node. Mirrors `SqlMetaStore::up`.
528    CreateMetaStoreSchema {
529        #[command(flatten)]
530        opts: cmd_impl::meta::CreateMetaStoreSchemaOpts,
531    },
532}
533
534#[derive(Subcommand, Clone, Debug)]
535pub enum AwaitTreeCommands {
536    /// Dump Await Tree
537    Dump {
538        /// The format of actor traces in the diagnose file. Allowed values: `json`, `text`. `json` by default.
539        #[clap(short, long = "actor-traces-format")]
540        actor_traces_format: Option<String>,
541    },
542    /// Analyze Await Tree
543    Analyze {
544        /// The path to the diagnose file, if None, ctl will first pull one from the cluster
545        /// The actor traces format can be either `json` or `text`. The analyze command will
546        /// automatically detect the format.
547        #[clap(long = "path")]
548        path: Option<String>,
549    },
550    /// Transcribe Await Tree From JSON to Text format
551    Transcribe {
552        /// The path to the await tree file to be transcribed
553        #[clap(long = "path")]
554        path: String,
555    },
556}
557
558#[derive(Subcommand, Clone, Debug)]
559enum TestCommands {
560    /// Test if JVM and Java libraries are working
561    Jvm,
562}
563
564#[derive(Subcommand, Clone, Debug)]
565enum ThrottleCommands {
566    Source(ThrottleCommandArgs),
567    Mv(ThrottleCommandArgs),
568    Sink(ThrottleCommandArgs),
569}
570
571#[derive(Clone, Debug, clap::ValueEnum)]
572pub enum ThrottleTypeArg {
573    Dml,
574    Backfill,
575    Source,
576    Sink,
577}
578
579#[derive(Clone, Debug, Args)]
580pub struct ThrottleCommandArgs {
581    /// The ID of the object to throttle
582    #[clap(long, required = true)]
583    id: u32,
584    /// The rate limit to apply
585    #[clap(long)]
586    rate: Option<u32>,
587    /// The type of throttle to apply
588    #[clap(long, value_enum, required = true)]
589    throttle_type: ThrottleTypeArg,
590}
591
592#[derive(Subcommand, Clone, Debug)]
593pub enum ProfileCommands {
594    /// CPU profile
595    Cpu {
596        /// The time to active profiling for (in seconds)
597        #[clap(short, long = "sleep")]
598        sleep: u64,
599        /// Target worker types. Repeatable. Defaults to frontend, compute-node, and compactor.
600        #[clap(long = "worker-type", value_name = "TYPE")]
601        worker_types: Vec<ProfileWorkerType>,
602    },
603    /// Heap profile
604    Heap {
605        /// The output directory of the dumped file
606        #[clap(long = "dir")]
607        dir: Option<String>,
608        /// Target worker types. Repeatable. Defaults to frontend, compute-node, and compactor.
609        #[clap(long = "worker-type", value_name = "TYPE")]
610        worker_types: Vec<ProfileWorkerType>,
611    },
612}
613
614/// Start `risectl` with the given options.
615/// Cancel the operation when the given `shutdown` token triggers.
616/// Log and abort the process if any error occurs.
617///
618/// Note: use [`start_fallible`] if you want to call functionalities of `risectl`
619/// in an embedded manner.
620pub async fn start(opts: CliOpts, shutdown: CancellationToken) {
621    let context = CtlContext::default();
622
623    tokio::select! {
624        _ = shutdown.cancelled() => {
625            // Shutdown requested, clean up the context and return.
626            context.try_close().await;
627        }
628
629        result = start_fallible(opts, &context) => {
630            if let Err(e) = result {
631                eprintln!("Error: {:#?}", e.as_report()); // pretty with backtrace
632                std::process::exit(1);
633            }
634        }
635    }
636}
637
638/// Start `risectl` with the given options.
639/// Return `Err` if any error occurs.
640pub async fn start_fallible(opts: CliOpts, context: &CtlContext) -> Result<()> {
641    let result = start_impl(opts, context).await;
642    context.try_close().await;
643    result
644}
645
646async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
647    match opts.command {
648        Commands::Compute(ComputeCommands::ShowConfig { host }) => {
649            cmd_impl::compute::show_config(&host).await?
650        }
651        Commands::Hummock(HummockCommands::DisableCommitEpoch) => {
652            cmd_impl::hummock::disable_commit_epoch(context).await?
653        }
654        Commands::Hummock(HummockCommands::ListVersion {
655            verbose,
656            verbose_key_range,
657        }) => {
658            cmd_impl::hummock::list_version(context, verbose, verbose_key_range).await?;
659        }
660        Commands::Hummock(HummockCommands::ListVersionDeltas {
661            start_id,
662            num_epochs,
663        }) => {
664            cmd_impl::hummock::list_version_deltas(context, start_id, num_epochs).await?;
665        }
666        Commands::Hummock(HummockCommands::ListKv {
667            epoch,
668            table_id,
669            data_dir,
670            use_new_object_prefix_strategy,
671        }) => {
672            cmd_impl::hummock::list_kv(
673                context,
674                epoch,
675                table_id,
676                data_dir,
677                use_new_object_prefix_strategy,
678            )
679            .await?;
680        }
681        Commands::Hummock(HummockCommands::SstDump(args)) => {
682            cmd_impl::hummock::sst_dump(context, args).await.unwrap()
683        }
684        Commands::Hummock(HummockCommands::TriggerManualCompaction {
685            compaction_group_id,
686            table_id,
687            levels,
688            target_level,
689            sst_ids,
690            exclusive,
691            retry_interval_ms,
692        }) => {
693            cmd_impl::hummock::trigger_manual_compaction(
694                context,
695                compaction_group_id,
696                table_id.into(),
697                levels,
698                target_level,
699                sst_ids,
700                exclusive,
701                retry_interval_ms,
702            )
703            .await?
704        }
705        Commands::Hummock(HummockCommands::TriggerFullGc {
706            sst_retention_time_sec,
707            prefix,
708        }) => cmd_impl::hummock::trigger_full_gc(context, sst_retention_time_sec, prefix).await?,
709        Commands::Hummock(HummockCommands::ListPinnedVersions {}) => {
710            list_pinned_versions(context).await?
711        }
712        Commands::Hummock(HummockCommands::ListCompactionGroup) => {
713            cmd_impl::hummock::list_compaction_group(context).await?
714        }
715        Commands::Hummock(HummockCommands::UpdateCompactionConfig {
716            compaction_group_ids,
717            max_bytes_for_level_base,
718            max_bytes_for_level_multiplier,
719            max_compaction_bytes,
720            sub_level_max_compaction_bytes,
721            level0_tier_compact_file_number,
722            target_file_size_base,
723            compaction_filter_mask,
724            max_sub_compaction,
725            level0_stop_write_threshold_sub_level_number,
726            level0_sub_level_compact_level_count,
727            max_space_reclaim_bytes,
728            level0_max_compact_file_number,
729            level0_overlapping_sub_level_compact_level_count,
730            enable_emergency_picker,
731            tombstone_reclaim_ratio,
732            compression_level,
733            compression_algorithm,
734            sstable_filter_type_level,
735            sstable_filter_type,
736            sstable_filter_layout_level,
737            sstable_filter_layout,
738            max_l0_compact_level,
739            sst_allowed_trivial_move_min_size,
740            disable_auto_group_scheduling,
741            max_overlapping_level_size,
742            sst_allowed_trivial_move_max_count,
743            emergency_level0_sst_file_count,
744            emergency_level0_sub_level_partition,
745            level0_stop_write_threshold_max_sst_count,
746            level0_stop_write_threshold_max_size,
747            enable_optimize_l0_interval_selection,
748            blocked_xor_filter_kv_count_threshold,
749            max_vnode_key_range_bytes,
750        }) => {
751            cmd_impl::hummock::update_compaction_config(
752                context,
753                compaction_group_ids,
754                build_compaction_config_vec(
755                    max_bytes_for_level_base,
756                    max_bytes_for_level_multiplier,
757                    max_compaction_bytes,
758                    sub_level_max_compaction_bytes,
759                    level0_tier_compact_file_number,
760                    target_file_size_base,
761                    compaction_filter_mask,
762                    max_sub_compaction,
763                    level0_stop_write_threshold_sub_level_number,
764                    level0_sub_level_compact_level_count,
765                    max_space_reclaim_bytes,
766                    level0_max_compact_file_number,
767                    level0_overlapping_sub_level_compact_level_count,
768                    enable_emergency_picker,
769                    tombstone_reclaim_ratio,
770                    if let Some(level) = compression_level {
771                        assert!(compression_algorithm.is_some());
772                        Some(CompressionAlgorithm {
773                            level,
774                            compression_algorithm: compression_algorithm.unwrap(),
775                        })
776                    } else {
777                        None
778                    },
779                    if let (Some(level), Some(filter_type)) =
780                        (sstable_filter_type_level, sstable_filter_type)
781                    {
782                        Some(SstableFilterType { level, filter_type })
783                    } else {
784                        None
785                    },
786                    if let (Some(level), Some(layout)) =
787                        (sstable_filter_layout_level, sstable_filter_layout)
788                    {
789                        Some(SstableFilterLayout { level, layout })
790                    } else {
791                        None
792                    },
793                    max_l0_compact_level,
794                    sst_allowed_trivial_move_min_size,
795                    disable_auto_group_scheduling,
796                    max_overlapping_level_size,
797                    sst_allowed_trivial_move_max_count,
798                    emergency_level0_sst_file_count,
799                    emergency_level0_sub_level_partition,
800                    level0_stop_write_threshold_max_sst_count,
801                    level0_stop_write_threshold_max_size,
802                    enable_optimize_l0_interval_selection,
803                    blocked_xor_filter_kv_count_threshold,
804                    max_vnode_key_range_bytes,
805                ),
806            )
807            .await?
808        }
809        Commands::Hummock(HummockCommands::SplitCompactionGroup {
810            compaction_group_id,
811            table_ids,
812            partition_vnode_count,
813        }) => {
814            cmd_impl::hummock::split_compaction_group(
815                context,
816                compaction_group_id,
817                &table_ids.into_iter().map_into().collect_vec(),
818                partition_vnode_count,
819            )
820            .await?;
821        }
822        Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
823            cmd_impl::hummock::pause_version_checkpoint(context).await?;
824        }
825        Commands::Hummock(HummockCommands::ResumeVersionCheckpoint) => {
826            cmd_impl::hummock::resume_version_checkpoint(context).await?;
827        }
828        Commands::Hummock(HummockCommands::ReplayVersion) => {
829            cmd_impl::hummock::replay_version(context).await?;
830        }
831        Commands::Hummock(HummockCommands::ListCompactionStatus { verbose }) => {
832            cmd_impl::hummock::list_compaction_status(context, verbose).await?;
833        }
834        Commands::Hummock(HummockCommands::GetCompactionScore {
835            compaction_group_id,
836        }) => {
837            cmd_impl::hummock::get_compaction_score(context, compaction_group_id).await?;
838        }
839        Commands::Hummock(HummockCommands::ValidateVersion) => {
840            cmd_impl::hummock::validate_version(context).await?;
841        }
842        Commands::Hummock(HummockCommands::RebuildTableStats) => {
843            cmd_impl::hummock::rebuild_table_stats(context).await?;
844        }
845        Commands::Hummock(HummockCommands::CancelCompactTask { task_id }) => {
846            cmd_impl::hummock::cancel_compact_task(context, task_id).await?;
847        }
848        Commands::Hummock(HummockCommands::PrintVersionDeltaInArchive {
849            archive_ids,
850            data_dir,
851            sst_id,
852            use_new_object_prefix_strategy,
853        }) => {
854            cmd_impl::hummock::print_version_delta_in_archive(
855                context,
856                archive_ids.into_iter().map(HummockVersionId::new),
857                data_dir,
858                sst_id,
859                use_new_object_prefix_strategy,
860            )
861            .await?;
862        }
863        Commands::Hummock(HummockCommands::PrintUserKeyInArchive {
864            archive_ids,
865            data_dir,
866            user_key,
867            use_new_object_prefix_strategy,
868        }) => {
869            cmd_impl::hummock::print_user_key_in_archive(
870                context,
871                archive_ids.into_iter().map(HummockVersionId::new),
872                data_dir,
873                user_key,
874                use_new_object_prefix_strategy,
875            )
876            .await?;
877        }
878        Commands::Hummock(HummockCommands::TieredCacheTracing {
879            enable,
880            record_hybrid_insert_threshold_ms,
881            record_hybrid_get_threshold_ms,
882            record_hybrid_obtain_threshold_ms,
883            record_hybrid_remove_threshold_ms,
884            record_hybrid_fetch_threshold_ms,
885        }) => {
886            cmd_impl::hummock::tiered_cache_tracing(
887                context,
888                enable,
889                record_hybrid_insert_threshold_ms,
890                record_hybrid_get_threshold_ms,
891                record_hybrid_obtain_threshold_ms,
892                record_hybrid_remove_threshold_ms,
893                record_hybrid_fetch_threshold_ms,
894            )
895            .await?
896        }
897        Commands::Hummock(HummockCommands::MergeCompactionGroup {
898            left_group_id,
899            right_group_id,
900        }) => {
901            cmd_impl::hummock::merge_compaction_group(context, left_group_id, right_group_id)
902                .await?
903        }
904
905        Commands::Hummock(HummockCommands::MigrateLegacyObject {
906            url,
907            source_dir,
908            target_dir,
909            concurrency,
910        }) => {
911            migrate_legacy_object(url, source_dir, target_dir, concurrency).await?;
912        }
913        Commands::Hummock(HummockCommands::ResizeCache {
914            meta_cache_capacity_mb,
915            data_cache_capacity_mb,
916        }) => {
917            const MIB: u64 = 1024 * 1024;
918            cmd_impl::hummock::resize_cache(
919                context,
920                meta_cache_capacity_mb.map(|v| v * MIB),
921                data_cache_capacity_mb.map(|v| v * MIB),
922            )
923            .await?
924        }
925        Commands::Table(TableCommands::Scan {
926            mv_name,
927            data_dir,
928            use_new_object_prefix_strategy,
929        }) => {
930            cmd_impl::table::scan(context, mv_name, data_dir, use_new_object_prefix_strategy)
931                .await?
932        }
933        Commands::Table(TableCommands::ScanById {
934            table_id,
935            data_dir,
936            use_new_object_prefix_strategy,
937        }) => {
938            cmd_impl::table::scan_id(context, table_id, data_dir, use_new_object_prefix_strategy)
939                .await?
940        }
941        Commands::Table(TableCommands::List) => cmd_impl::table::list(context).await?,
942        Commands::Bench(cmd) => cmd_impl::bench::do_bench(context, cmd).await?,
943        Commands::Meta(MetaCommands::Pause) => cmd_impl::meta::pause(context).await?,
944        Commands::Meta(MetaCommands::Resume) => cmd_impl::meta::resume(context).await?,
945        Commands::Meta(MetaCommands::ResumeBackfill {
946            job_id,
947            fragment_id,
948        }) => cmd_impl::meta::resume_backfill(context, job_id, fragment_id).await?,
949        Commands::Meta(MetaCommands::ClusterInfo) => cmd_impl::meta::cluster_info(context).await?,
950        Commands::Meta(MetaCommands::SourceSplitInfo { ignore_id }) => {
951            cmd_impl::meta::source_split_info(context, ignore_id).await?
952        }
953        Commands::Meta(MetaCommands::Reschedule {
954            from,
955            dry_run,
956            plan,
957            revision,
958            resolve_no_shuffle,
959        }) => {
960            cmd_impl::meta::reschedule(context, plan, revision, from, dry_run, resolve_no_shuffle)
961                .await?
962        }
963        Commands::Meta(MetaCommands::BackupMeta { remarks }) => {
964            cmd_impl::meta::backup_meta(context, remarks).await?
965        }
966        Commands::Meta(MetaCommands::RestoreMeta { opts }) => {
967            risingwave_meta::backup_restore::restore(opts).await?
968        }
969        Commands::Meta(MetaCommands::DeleteMetaSnapshots { snapshot_ids }) => {
970            cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
971        }
972        Commands::Meta(MetaCommands::ListConnections) => {
973            cmd_impl::meta::list_connections(context).await?
974        }
975        Commands::Meta(MetaCommands::ListServingFragmentMapping) => {
976            cmd_impl::meta::list_serving_fragment_mappings(context).await?
977        }
978        Commands::Meta(MetaCommands::UnregisterWorkers {
979            workers,
980            yes,
981            ignore_not_found,
982            check_fragment_occupied,
983        }) => {
984            cmd_impl::meta::unregister_workers(
985                context,
986                workers,
987                yes,
988                ignore_not_found,
989                check_fragment_occupied,
990            )
991            .await?
992        }
993        Commands::Meta(MetaCommands::ValidateSource { props }) => {
994            cmd_impl::meta::validate_source(context, props).await?
995        }
996        Commands::AwaitTree(AwaitTreeCommands::Dump {
997            actor_traces_format,
998        }) => cmd_impl::await_tree::dump(context, actor_traces_format).await?,
999        Commands::AwaitTree(AwaitTreeCommands::Analyze { path }) => {
1000            cmd_impl::await_tree::bottleneck_detect(context, path).await?
1001        }
1002        Commands::AwaitTree(AwaitTreeCommands::Transcribe { path }) => {
1003            rw_diagnose_tools::await_tree::transcribe(path)?
1004        }
1005        Commands::Profile(ProfileCommands::Cpu {
1006            sleep,
1007            worker_types,
1008        }) => cmd_impl::profile::cpu_profile(context, sleep, worker_types).await?,
1009        Commands::Profile(ProfileCommands::Heap { dir, worker_types }) => {
1010            cmd_impl::profile::heap_profile(context, dir, worker_types).await?
1011        }
1012        Commands::Throttle(ThrottleCommands::Source(args)) => {
1013            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
1014        }
1015        Commands::Throttle(ThrottleCommands::Mv(args)) => {
1016            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?;
1017        }
1018        Commands::Throttle(ThrottleCommands::Sink(args)) => {
1019            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Sink, args).await?;
1020        }
1021        Commands::Meta(MetaCommands::SetCdcTableBackfillParallelism {
1022            table_id,
1023            parallelism,
1024        }) => {
1025            set_cdc_table_backfill_parallelism(context, table_id, parallelism).await?;
1026        }
1027        Commands::Meta(MetaCommands::AlterSourcePropertiesSafe {
1028            source_id,
1029            props,
1030            reset_splits,
1031        }) => {
1032            cmd_impl::meta::alter_source_properties_safe(context, source_id, props, reset_splits)
1033                .await?;
1034        }
1035        Commands::Meta(MetaCommands::ResetSourceSplits { source_id }) => {
1036            cmd_impl::meta::reset_source_splits(context, source_id).await?;
1037        }
1038        Commands::Meta(MetaCommands::InjectSourceOffsets { source_id, offsets }) => {
1039            cmd_impl::meta::inject_source_offsets(context, source_id, offsets).await?;
1040        }
1041        Commands::Meta(MetaCommands::CreateMetaStoreSchema { opts }) => {
1042            cmd_impl::meta::create_meta_store_schema(opts).await?;
1043        }
1044        Commands::Test(TestCommands::Jvm) => cmd_impl::test::test_jvm()?,
1045    }
1046    Ok(())
1047}