risingwave_ctl/
lib.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![warn(clippy::large_futures, clippy::large_stack_frames)]
16
17use anyhow::Result;
18use clap::{ArgGroup, Args, Parser, Subcommand};
19use cmd_impl::bench::BenchCommands;
20use cmd_impl::hummock::SstDumpArgs;
21use itertools::Itertools;
22use risingwave_common::util::tokio_util::sync::CancellationToken;
23use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId};
24use risingwave_meta::backup_restore::RestoreOpts;
25use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::{
26    CompressionAlgorithm, SstableFilterKind, SstableFilterLayout,
27};
28use risingwave_pb::id::{CompactionGroupId, FragmentId, HummockSstableId, JobId, TableId};
29use thiserror_ext::AsReport;
30
31use crate::cmd_impl::hummock::{
32    build_compaction_config_vec, list_pinned_versions, migrate_legacy_object,
33};
34use crate::cmd_impl::profile::ProfileWorkerType;
35use crate::cmd_impl::scale::set_cdc_table_backfill_parallelism;
36use crate::cmd_impl::throttle::apply_throttle;
37use crate::common::CtlContext;
38
39pub mod cmd_impl;
40pub mod common;
41
42/// risectl provides internal access to the RisingWave cluster. Generally, you will need
43/// to provide the meta address and the state store URL to enable risectl to access the cluster. You
44/// must start RisingWave in full cluster mode (e.g. enable MinIO and compactor in risedev.yml)
45/// instead of playground mode to use this tool. risectl will read environment variables
46/// `RW_META_ADDR` and `RW_HUMMOCK_URL` to configure itself.
47#[derive(Parser)]
48#[clap(version, about = "The DevOps tool that provides internal access to the RisingWave cluster", long_about = None)]
49#[clap(propagate_version = true)]
50#[clap(infer_subcommands = true)]
51pub struct CliOpts {
52    #[clap(subcommand)]
53    command: Commands,
54}
55
56#[derive(Subcommand)]
57#[clap(infer_subcommands = true)]
58enum Commands {
59    /// Commands for Compute
60    #[clap(subcommand)]
61    Compute(ComputeCommands),
62    /// Commands for Hummock
63    #[clap(subcommand)]
64    Hummock(HummockCommands),
65    /// Commands for Tables
66    #[clap(subcommand)]
67    Table(TableCommands),
68    /// Commands for Meta
69    #[clap(subcommand)]
70    Meta(MetaCommands),
71    /// Commands for Benchmarks
72    #[clap(subcommand)]
73    Bench(BenchCommands),
74    /// Commands for await-tree, such as dumping, analyzing and transcribing
75    #[clap(subcommand)]
76    #[clap(visible_alias("trace"))]
77    AwaitTree(AwaitTreeCommands),
78    /// Commands for profiling nodes
79    #[clap(subcommand)]
80    Profile(ProfileCommands),
81    #[clap(subcommand)]
82    Throttle(ThrottleCommands),
83    /// Commands for Self-testing
84    #[clap(subcommand, hide = true)]
85    Test(TestCommands),
86}
87
88#[derive(Subcommand)]
89enum ComputeCommands {
90    /// Show all the configuration parameters on compute node
91    ShowConfig { host: String },
92}
93
94#[expect(clippy::large_enum_variant)]
95#[derive(Subcommand)]
96enum HummockCommands {
97    /// list latest Hummock version on meta node
98    ListVersion {
99        #[clap(short, long = "verbose", default_value_t = false)]
100        verbose: bool,
101
102        #[clap(long = "verbose_key_range", default_value_t = false)]
103        verbose_key_range: bool,
104    },
105
106    /// list hummock version deltas in the meta store
107    ListVersionDeltas {
108        #[clap(short, long = "start-version-delta-id", default_value_t = HummockVersionId::new(0))]
109        start_id: HummockVersionId,
110
111        #[clap(short, long = "num-epochs", default_value_t = 100)]
112        num_epochs: u32,
113    },
114    /// Forbid hummock commit new epochs, which is a prerequisite for compaction deterministic test
115    DisableCommitEpoch,
116    /// list all Hummock key-value pairs
117    ListKv {
118        #[clap(short, long = "epoch", default_value_t = HummockEpoch::MAX)]
119        epoch: u64,
120
121        #[clap(short, long = "table-id")]
122        table_id: TableId,
123
124        // data directory for hummock state store. None: use default
125        data_dir: Option<String>,
126
127        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
128        use_new_object_prefix_strategy: bool,
129    },
130    SstDump(SstDumpArgs),
131    /// trigger a targeted compaction through `compaction_group_id`
132    TriggerManualCompaction {
133        #[clap(short, long = "compaction-group-id", default_value_t = CompactionGroupId::new(2))]
134        compaction_group_id: CompactionGroupId,
135
136        #[clap(short, long = "table-id", default_value_t = 0)]
137        table_id: u32,
138
139        #[clap(short, long = "level", value_delimiter = ',', default_values_t = vec![1u32])]
140        levels: Vec<u32>,
141
142        #[clap(long = "target-level")]
143        target_level: Option<u32>,
144
145        #[clap(short, long = "sst-ids", value_delimiter = ',')]
146        sst_ids: Vec<HummockSstableId>,
147
148        #[clap(long = "exclusive", default_value_t = false)]
149        exclusive: bool,
150
151        #[clap(long = "retry-interval-ms", default_value_t = 1000)]
152        retry_interval_ms: u64,
153    },
154    /// Trigger a full GC for SSTs that is not pinned, with timestamp <= now -
155    /// `sst_retention_time_sec`, and with `prefix` in path.
156    TriggerFullGc {
157        #[clap(short, long = "sst_retention_time_sec", default_value_t = 259200)]
158        sst_retention_time_sec: u64,
159        #[clap(short, long = "prefix", required = false)]
160        prefix: Option<String>,
161    },
162    /// List pinned versions of each worker.
163    ListPinnedVersions {},
164    /// List all compaction groups.
165    ListCompactionGroup,
166    /// Update compaction config for compaction groups.
167    UpdateCompactionConfig {
168        #[clap(long, value_delimiter = ',')]
169        compaction_group_ids: Vec<CompactionGroupId>,
170        #[clap(long)]
171        max_bytes_for_level_base: Option<u64>,
172        #[clap(long)]
173        max_bytes_for_level_multiplier: Option<u64>,
174        #[clap(long)]
175        max_compaction_bytes: Option<u64>,
176        #[clap(long)]
177        sub_level_max_compaction_bytes: Option<u64>,
178        #[clap(long)]
179        level0_tier_compact_file_number: Option<u64>,
180        #[clap(long)]
181        target_file_size_base: Option<u64>,
182        #[clap(long)]
183        compaction_filter_mask: Option<u32>,
184        #[clap(long)]
185        max_sub_compaction: Option<u32>,
186        #[clap(long)]
187        level0_stop_write_threshold_sub_level_number: Option<u64>,
188        #[clap(long)]
189        level0_sub_level_compact_level_count: Option<u32>,
190        #[clap(long)]
191        max_space_reclaim_bytes: Option<u64>,
192        #[clap(long)]
193        level0_max_compact_file_number: Option<u64>,
194        #[clap(long)]
195        level0_overlapping_sub_level_compact_level_count: Option<u32>,
196        #[clap(long)]
197        enable_emergency_picker: Option<bool>,
198        #[clap(long)]
199        tombstone_reclaim_ratio: Option<u32>,
200        #[clap(long)]
201        compression_level: Option<u32>,
202        #[clap(long)]
203        compression_algorithm: Option<String>,
204        /// LSM level index to update, e.g. 0 for L0, 6 for L6.
205        #[clap(long, requires = "sstable_filter_kind")]
206        sstable_filter_kind_level: Option<u32>,
207        /// Xor filter family to use for this level. Supported values: "xor16", "xor8".
208        #[clap(long, requires = "sstable_filter_kind_level")]
209        sstable_filter_kind: Option<String>,
210        /// LSM level index to update, e.g. 0 for L0, 6 for L6.
211        #[clap(long, requires = "sstable_filter_layout")]
212        sstable_filter_layout_level: Option<u32>,
213        /// Filter layout for this level.
214        ///
215        /// Supported values:
216        /// - "auto": decide by heuristics (currently by kv-count threshold)
217        /// - "plain" / "normal": always use a single non-blocked filter, ignoring kv-count threshold
218        #[clap(long, requires = "sstable_filter_layout_level")]
219        sstable_filter_layout: Option<String>,
220        #[clap(long)]
221        max_l0_compact_level: Option<u32>,
222        #[clap(long)]
223        sst_allowed_trivial_move_min_size: Option<u64>,
224        #[clap(long)]
225        disable_auto_group_scheduling: Option<bool>,
226        #[clap(long)]
227        max_overlapping_level_size: Option<u64>,
228        #[clap(long)]
229        sst_allowed_trivial_move_max_count: Option<u32>,
230        #[clap(long)]
231        emergency_level0_sst_file_count: Option<u32>,
232        #[clap(long)]
233        emergency_level0_sub_level_partition: Option<u32>,
234        #[clap(long)]
235        level0_stop_write_threshold_max_sst_count: Option<u32>,
236        #[clap(long)]
237        level0_stop_write_threshold_max_size: Option<u64>,
238        #[clap(long)]
239        enable_optimize_l0_interval_selection: Option<bool>,
240        /// KV-count threshold for using blocked xor filters when output layout is "auto".
241        ///
242        /// Note: shared-buffer flush does not read compaction group config, so this setting only
243        /// applies to compaction tasks.
244        #[clap(long)]
245        blocked_xor_filter_kv_count_threshold: Option<u64>,
246        #[clap(long)]
247        max_vnode_key_range_bytes: Option<u64>,
248    },
249    /// Split given compaction group into two. Moves the given tables to the new group.
250    SplitCompactionGroup {
251        #[clap(long)]
252        compaction_group_id: CompactionGroupId,
253        #[clap(long, value_delimiter = ',')]
254        table_ids: Vec<TableId>,
255        #[clap(long, default_value_t = 0)]
256        partition_vnode_count: u32,
257    },
258    /// Pause version checkpoint, which subsequently pauses GC of delta log and SST object.
259    PauseVersionCheckpoint,
260    /// Resume version checkpoint, which subsequently resumes GC of delta log and SST object.
261    ResumeVersionCheckpoint,
262    /// Replay version from the checkpoint one to the latest one.
263    ReplayVersion,
264    /// List compaction status
265    ListCompactionStatus {
266        #[clap(short, long = "verbose", default_value_t = false)]
267        verbose: bool,
268    },
269    GetCompactionScore {
270        #[clap(long)]
271        compaction_group_id: CompactionGroupId,
272    },
273    /// Validate the current `HummockVersion`.
274    ValidateVersion,
275    /// Rebuild table stats
276    RebuildTableStats,
277    CancelCompactTask {
278        #[clap(short, long)]
279        task_id: u64,
280    },
281    PrintUserKeyInArchive {
282        /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
283        #[clap(long, value_delimiter = ',')]
284        archive_ids: Vec<u64>,
285        /// The data directory of Hummock storage, where `SSTable` objects can be found.
286        #[clap(long)]
287        data_dir: String,
288        /// KVs that are matched with the user key are printed.
289        #[clap(long)]
290        user_key: String,
291        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
292        use_new_object_prefix_strategy: bool,
293    },
294    PrintVersionDeltaInArchive {
295        /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
296        #[clap(long, value_delimiter = ',')]
297        archive_ids: Vec<u64>,
298        /// The data directory of Hummock storage, where `SSTable` objects can be found.
299        #[clap(long)]
300        data_dir: String,
301        /// Version deltas that are related to the SST id are printed.
302        #[clap(long)]
303        sst_id: HummockSstableId,
304        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
305        use_new_object_prefix_strategy: bool,
306    },
307    TieredCacheTracing {
308        #[clap(long)]
309        enable: bool,
310        #[clap(long)]
311        record_hybrid_insert_threshold_ms: Option<u32>,
312        #[clap(long)]
313        record_hybrid_get_threshold_ms: Option<u32>,
314        #[clap(long)]
315        record_hybrid_obtain_threshold_ms: Option<u32>,
316        #[clap(long)]
317        record_hybrid_remove_threshold_ms: Option<u32>,
318        #[clap(long)]
319        record_hybrid_fetch_threshold_ms: Option<u32>,
320    },
321    MergeCompactionGroup {
322        #[clap(long)]
323        left_group_id: CompactionGroupId,
324        #[clap(long)]
325        right_group_id: CompactionGroupId,
326    },
327    MigrateLegacyObject {
328        url: String,
329        source_dir: String,
330        target_dir: String,
331        #[clap(long, default_value = "100")]
332        concurrency: u32,
333    },
334    ResizeCache {
335        #[clap(long)]
336        meta_cache_capacity_mb: Option<u64>,
337        #[clap(long)]
338        data_cache_capacity_mb: Option<u64>,
339    },
340}
341
342#[derive(Subcommand)]
343enum TableCommands {
344    /// scan a state table with MV name
345    Scan {
346        /// name of the materialized view to operate on
347        mv_name: String,
348        // data directory for hummock state store. None: use default
349        data_dir: Option<String>,
350
351        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
352        use_new_object_prefix_strategy: bool,
353    },
354    /// scan a state table using Id
355    ScanById {
356        /// id of the state table to operate on
357        table_id: TableId,
358        // data directory for hummock state store. None: use default
359        data_dir: Option<String>,
360        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
361        use_new_object_prefix_strategy: bool,
362    },
363    /// list all state tables
364    List,
365}
366
367#[derive(Subcommand)]
368#[expect(clippy::large_enum_variant)]
369enum MetaCommands {
370    /// pause the stream graph
371    Pause,
372    /// resume the stream graph
373    Resume,
374    /// force resume backfill for troubleshooting
375    #[clap(
376        group(
377            ArgGroup::new("resume_backfill_target")
378                .required(true)
379                .args(&["job_id", "fragment_id"])
380        )
381    )]
382    ResumeBackfill {
383        #[clap(long)]
384        job_id: Option<JobId>,
385        #[clap(long)]
386        fragment_id: Option<FragmentId>,
387    },
388    /// get cluster info
389    ClusterInfo,
390    /// get source split info
391    SourceSplitInfo {
392        #[clap(long)]
393        ignore_id: bool,
394    },
395    /// Reschedule the actors in the stream graph
396    ///
397    /// The format is `fragment_id-[worker_id:count]+[worker_id:count]`
398    /// You can provide either decreased `worker_ids` only or increased only, but decreased should be preceded by
399    /// `added` when both are provided.
400    ///
401    /// For example, for plan `100-[1:1]+[4:1]` the follow request will be generated:
402    /// ```text
403    /// {
404    ///     100: WorkerReschedule {
405    ///         increased_actor_count: { 1: 1 },
406    ///         decreased_actor_count: { 4: 1 },
407    ///     }
408    /// }
409    /// ```
410    /// Use ; to separate multiple fragment
411    #[clap(verbatim_doc_comment)]
412    #[clap(group(clap::ArgGroup::new("input_group").required(true).args(&["plan", "from"])))]
413    Reschedule {
414        /// Plan of reschedule, needs to be used with `revision`
415        #[clap(long, requires = "revision")]
416        plan: Option<String>,
417        /// Revision of the plan
418        #[clap(long)]
419        revision: Option<u64>,
420        /// Reschedule from a specific file
421        #[clap(long, conflicts_with = "revision", value_hint = clap::ValueHint::AnyPath)]
422        from: Option<String>,
423        /// Show the plan only, no actual operation
424        #[clap(long, default_value = "false")]
425        dry_run: bool,
426        /// Resolve `NO_SHUFFLE` upstream
427        #[clap(long, default_value = "false")]
428        resolve_no_shuffle: bool,
429    },
430    /// backup meta by taking a meta snapshot
431    BackupMeta {
432        #[clap(long)]
433        remarks: Option<String>,
434    },
435    /// restore meta by recovering from a meta snapshot
436    RestoreMeta {
437        #[command(flatten)]
438        opts: RestoreOpts,
439    },
440    /// delete meta snapshots
441    DeleteMetaSnapshots {
442        #[clap(long, value_delimiter = ',')]
443        snapshot_ids: Vec<u64>,
444    },
445
446    /// List all existing connections in the catalog
447    ListConnections,
448
449    /// List fragment mapping for serving
450    ListServingFragmentMapping,
451
452    /// Unregister workers from the cluster
453    UnregisterWorkers {
454        /// The workers that needs to be unregistered, `worker_id` and `worker_host:worker_port` are both supported
455        #[clap(
456            long,
457            required = true,
458            value_delimiter = ',',
459            value_name = "worker_id or worker_host:worker_port, ..."
460        )]
461        workers: Vec<String>,
462
463        /// Automatic yes to prompts
464        #[clap(short = 'y', long, default_value_t = false)]
465        yes: bool,
466
467        /// The worker not found will be ignored
468        #[clap(long, default_value_t = false)]
469        ignore_not_found: bool,
470
471        /// Checking whether the fragment is occupied by workers
472        #[clap(long, default_value_t = false)]
473        check_fragment_occupied: bool,
474    },
475
476    /// Validate source interface for the cloud team
477    ValidateSource {
478        /// With properties in json format
479        /// If privatelink is used, specify `connection.id` instead of `connection.name`
480        #[clap(long)]
481        props: String,
482    },
483
484    SetCdcTableBackfillParallelism {
485        #[clap(long, required = true)]
486        table_id: u32,
487        #[clap(long, required = true)]
488        parallelism: u32,
489    },
490
491    /// Alter source connector properties with pause/resume orchestration (UNSAFE)
492    /// This operation pauses the source, updates properties, and resumes.
493    AlterSourcePropertiesSafe {
494        /// Source ID to update
495        #[clap(long)]
496        source_id: u32,
497        /// Properties to change in JSON format, e.g. '{"properties.bootstrap.server": "new-broker:9092"}'
498        #[clap(long)]
499        props: String,
500        /// Reset split assignments after property change (for major upstream changes)
501        #[clap(long, default_value_t = false)]
502        reset_splits: bool,
503    },
504
505    /// Reset source split assignments (UNSAFE - admin only)
506    /// Clears cached split state and triggers re-discovery from upstream.
507    ResetSourceSplits {
508        /// Source ID to reset
509        #[clap(long)]
510        source_id: u32,
511    },
512
513    /// Inject specific offsets into source splits (UNSAFE - admin only)
514    /// WARNING: This can cause data duplication or loss!
515    InjectSourceOffsets {
516        /// Source ID to inject offsets for
517        #[clap(long)]
518        source_id: u32,
519        /// Split offsets in JSON format, e.g. '{"split-0": "100", "split-1": "200"}'
520        #[clap(long)]
521        offsets: String,
522    },
523}
524
525#[derive(Subcommand, Clone, Debug)]
526pub enum AwaitTreeCommands {
527    /// Dump Await Tree
528    Dump {
529        /// The format of actor traces in the diagnose file. Allowed values: `json`, `text`. `json` by default.
530        #[clap(short, long = "actor-traces-format")]
531        actor_traces_format: Option<String>,
532    },
533    /// Analyze Await Tree
534    Analyze {
535        /// The path to the diagnose file, if None, ctl will first pull one from the cluster
536        /// The actor traces format can be either `json` or `text`. The analyze command will
537        /// automatically detect the format.
538        #[clap(long = "path")]
539        path: Option<String>,
540    },
541    /// Transcribe Await Tree From JSON to Text format
542    Transcribe {
543        /// The path to the await tree file to be transcribed
544        #[clap(long = "path")]
545        path: String,
546    },
547}
548
549#[derive(Subcommand, Clone, Debug)]
550enum TestCommands {
551    /// Test if JVM and Java libraries are working
552    Jvm,
553}
554
555#[derive(Subcommand, Clone, Debug)]
556enum ThrottleCommands {
557    Source(ThrottleCommandArgs),
558    Mv(ThrottleCommandArgs),
559    Sink(ThrottleCommandArgs),
560}
561
562#[derive(Clone, Debug, clap::ValueEnum)]
563pub enum ThrottleTypeArg {
564    Dml,
565    Backfill,
566    Source,
567    Sink,
568}
569
570#[derive(Clone, Debug, Args)]
571pub struct ThrottleCommandArgs {
572    /// The ID of the object to throttle
573    #[clap(long, required = true)]
574    id: u32,
575    /// The rate limit to apply
576    #[clap(long)]
577    rate: Option<u32>,
578    /// The type of throttle to apply
579    #[clap(long, value_enum, required = true)]
580    throttle_type: ThrottleTypeArg,
581}
582
583#[derive(Subcommand, Clone, Debug)]
584pub enum ProfileCommands {
585    /// CPU profile
586    Cpu {
587        /// The time to active profiling for (in seconds)
588        #[clap(short, long = "sleep")]
589        sleep: u64,
590        /// Target worker types. Repeatable. Defaults to frontend, compute-node, and compactor.
591        #[clap(long = "worker-type", value_name = "TYPE")]
592        worker_types: Vec<ProfileWorkerType>,
593    },
594    /// Heap profile
595    Heap {
596        /// The output directory of the dumped file
597        #[clap(long = "dir")]
598        dir: Option<String>,
599        /// Target worker types. Repeatable. Defaults to frontend, compute-node, and compactor.
600        #[clap(long = "worker-type", value_name = "TYPE")]
601        worker_types: Vec<ProfileWorkerType>,
602    },
603}
604
605/// Start `risectl` with the given options.
606/// Cancel the operation when the given `shutdown` token triggers.
607/// Log and abort the process if any error occurs.
608///
609/// Note: use [`start_fallible`] if you want to call functionalities of `risectl`
610/// in an embedded manner.
611pub async fn start(opts: CliOpts, shutdown: CancellationToken) {
612    let context = CtlContext::default();
613
614    tokio::select! {
615        _ = shutdown.cancelled() => {
616            // Shutdown requested, clean up the context and return.
617            context.try_close().await;
618        }
619
620        result = start_fallible(opts, &context) => {
621            if let Err(e) = result {
622                eprintln!("Error: {:#?}", e.as_report()); // pretty with backtrace
623                std::process::exit(1);
624            }
625        }
626    }
627}
628
629/// Start `risectl` with the given options.
630/// Return `Err` if any error occurs.
631pub async fn start_fallible(opts: CliOpts, context: &CtlContext) -> Result<()> {
632    let result = start_impl(opts, context).await;
633    context.try_close().await;
634    result
635}
636
637#[expect(
638    clippy::large_stack_frames,
639    reason = "Pre-opt MIR sums locals across match arms in async dispatch; \
640              post-layout generator stores only one arm at a time (~13–16 KiB)."
641)]
642async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
643    match opts.command {
644        Commands::Compute(ComputeCommands::ShowConfig { host }) => {
645            cmd_impl::compute::show_config(&host).await?
646        }
647        Commands::Hummock(HummockCommands::DisableCommitEpoch) => {
648            cmd_impl::hummock::disable_commit_epoch(context).await?
649        }
650        Commands::Hummock(HummockCommands::ListVersion {
651            verbose,
652            verbose_key_range,
653        }) => {
654            cmd_impl::hummock::list_version(context, verbose, verbose_key_range).await?;
655        }
656        Commands::Hummock(HummockCommands::ListVersionDeltas {
657            start_id,
658            num_epochs,
659        }) => {
660            cmd_impl::hummock::list_version_deltas(context, start_id, num_epochs).await?;
661        }
662        Commands::Hummock(HummockCommands::ListKv {
663            epoch,
664            table_id,
665            data_dir,
666            use_new_object_prefix_strategy,
667        }) => {
668            cmd_impl::hummock::list_kv(
669                context,
670                epoch,
671                table_id,
672                data_dir,
673                use_new_object_prefix_strategy,
674            )
675            .await?;
676        }
677        Commands::Hummock(HummockCommands::SstDump(args)) => {
678            cmd_impl::hummock::sst_dump(context, args).await.unwrap()
679        }
680        Commands::Hummock(HummockCommands::TriggerManualCompaction {
681            compaction_group_id,
682            table_id,
683            levels,
684            target_level,
685            sst_ids,
686            exclusive,
687            retry_interval_ms,
688        }) => {
689            cmd_impl::hummock::trigger_manual_compaction(
690                context,
691                compaction_group_id,
692                table_id.into(),
693                levels,
694                target_level,
695                sst_ids,
696                exclusive,
697                retry_interval_ms,
698            )
699            .await?
700        }
701        Commands::Hummock(HummockCommands::TriggerFullGc {
702            sst_retention_time_sec,
703            prefix,
704        }) => cmd_impl::hummock::trigger_full_gc(context, sst_retention_time_sec, prefix).await?,
705        Commands::Hummock(HummockCommands::ListPinnedVersions {}) => {
706            list_pinned_versions(context).await?
707        }
708        Commands::Hummock(HummockCommands::ListCompactionGroup) => {
709            cmd_impl::hummock::list_compaction_group(context).await?
710        }
711        Commands::Hummock(HummockCommands::UpdateCompactionConfig {
712            compaction_group_ids,
713            max_bytes_for_level_base,
714            max_bytes_for_level_multiplier,
715            max_compaction_bytes,
716            sub_level_max_compaction_bytes,
717            level0_tier_compact_file_number,
718            target_file_size_base,
719            compaction_filter_mask,
720            max_sub_compaction,
721            level0_stop_write_threshold_sub_level_number,
722            level0_sub_level_compact_level_count,
723            max_space_reclaim_bytes,
724            level0_max_compact_file_number,
725            level0_overlapping_sub_level_compact_level_count,
726            enable_emergency_picker,
727            tombstone_reclaim_ratio,
728            compression_level,
729            compression_algorithm,
730            sstable_filter_kind_level,
731            sstable_filter_kind,
732            sstable_filter_layout_level,
733            sstable_filter_layout,
734            max_l0_compact_level,
735            sst_allowed_trivial_move_min_size,
736            disable_auto_group_scheduling,
737            max_overlapping_level_size,
738            sst_allowed_trivial_move_max_count,
739            emergency_level0_sst_file_count,
740            emergency_level0_sub_level_partition,
741            level0_stop_write_threshold_max_sst_count,
742            level0_stop_write_threshold_max_size,
743            enable_optimize_l0_interval_selection,
744            blocked_xor_filter_kv_count_threshold,
745            max_vnode_key_range_bytes,
746        }) => {
747            cmd_impl::hummock::update_compaction_config(
748                context,
749                compaction_group_ids,
750                build_compaction_config_vec(
751                    max_bytes_for_level_base,
752                    max_bytes_for_level_multiplier,
753                    max_compaction_bytes,
754                    sub_level_max_compaction_bytes,
755                    level0_tier_compact_file_number,
756                    target_file_size_base,
757                    compaction_filter_mask,
758                    max_sub_compaction,
759                    level0_stop_write_threshold_sub_level_number,
760                    level0_sub_level_compact_level_count,
761                    max_space_reclaim_bytes,
762                    level0_max_compact_file_number,
763                    level0_overlapping_sub_level_compact_level_count,
764                    enable_emergency_picker,
765                    tombstone_reclaim_ratio,
766                    if let Some(level) = compression_level {
767                        assert!(compression_algorithm.is_some());
768                        Some(CompressionAlgorithm {
769                            level,
770                            compression_algorithm: compression_algorithm.unwrap(),
771                        })
772                    } else {
773                        None
774                    },
775                    if let (Some(level), Some(filter_kind)) =
776                        (sstable_filter_kind_level, sstable_filter_kind)
777                    {
778                        Some(SstableFilterKind { level, filter_kind })
779                    } else {
780                        None
781                    },
782                    if let (Some(level), Some(layout)) =
783                        (sstable_filter_layout_level, sstable_filter_layout)
784                    {
785                        Some(SstableFilterLayout { level, layout })
786                    } else {
787                        None
788                    },
789                    max_l0_compact_level,
790                    sst_allowed_trivial_move_min_size,
791                    disable_auto_group_scheduling,
792                    max_overlapping_level_size,
793                    sst_allowed_trivial_move_max_count,
794                    emergency_level0_sst_file_count,
795                    emergency_level0_sub_level_partition,
796                    level0_stop_write_threshold_max_sst_count,
797                    level0_stop_write_threshold_max_size,
798                    enable_optimize_l0_interval_selection,
799                    blocked_xor_filter_kv_count_threshold,
800                    max_vnode_key_range_bytes,
801                ),
802            )
803            .await?
804        }
805        Commands::Hummock(HummockCommands::SplitCompactionGroup {
806            compaction_group_id,
807            table_ids,
808            partition_vnode_count,
809        }) => {
810            cmd_impl::hummock::split_compaction_group(
811                context,
812                compaction_group_id,
813                &table_ids.into_iter().map_into().collect_vec(),
814                partition_vnode_count,
815            )
816            .await?;
817        }
818        Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
819            cmd_impl::hummock::pause_version_checkpoint(context).await?;
820        }
821        Commands::Hummock(HummockCommands::ResumeVersionCheckpoint) => {
822            cmd_impl::hummock::resume_version_checkpoint(context).await?;
823        }
824        Commands::Hummock(HummockCommands::ReplayVersion) => {
825            cmd_impl::hummock::replay_version(context).await?;
826        }
827        Commands::Hummock(HummockCommands::ListCompactionStatus { verbose }) => {
828            cmd_impl::hummock::list_compaction_status(context, verbose).await?;
829        }
830        Commands::Hummock(HummockCommands::GetCompactionScore {
831            compaction_group_id,
832        }) => {
833            cmd_impl::hummock::get_compaction_score(context, compaction_group_id).await?;
834        }
835        Commands::Hummock(HummockCommands::ValidateVersion) => {
836            cmd_impl::hummock::validate_version(context).await?;
837        }
838        Commands::Hummock(HummockCommands::RebuildTableStats) => {
839            cmd_impl::hummock::rebuild_table_stats(context).await?;
840        }
841        Commands::Hummock(HummockCommands::CancelCompactTask { task_id }) => {
842            cmd_impl::hummock::cancel_compact_task(context, task_id).await?;
843        }
844        Commands::Hummock(HummockCommands::PrintVersionDeltaInArchive {
845            archive_ids,
846            data_dir,
847            sst_id,
848            use_new_object_prefix_strategy,
849        }) => {
850            cmd_impl::hummock::print_version_delta_in_archive(
851                context,
852                archive_ids.into_iter().map(HummockVersionId::new),
853                data_dir,
854                sst_id,
855                use_new_object_prefix_strategy,
856            )
857            .await?;
858        }
859        Commands::Hummock(HummockCommands::PrintUserKeyInArchive {
860            archive_ids,
861            data_dir,
862            user_key,
863            use_new_object_prefix_strategy,
864        }) => {
865            cmd_impl::hummock::print_user_key_in_archive(
866                context,
867                archive_ids.into_iter().map(HummockVersionId::new),
868                data_dir,
869                user_key,
870                use_new_object_prefix_strategy,
871            )
872            .await?;
873        }
874        Commands::Hummock(HummockCommands::TieredCacheTracing {
875            enable,
876            record_hybrid_insert_threshold_ms,
877            record_hybrid_get_threshold_ms,
878            record_hybrid_obtain_threshold_ms,
879            record_hybrid_remove_threshold_ms,
880            record_hybrid_fetch_threshold_ms,
881        }) => {
882            cmd_impl::hummock::tiered_cache_tracing(
883                context,
884                enable,
885                record_hybrid_insert_threshold_ms,
886                record_hybrid_get_threshold_ms,
887                record_hybrid_obtain_threshold_ms,
888                record_hybrid_remove_threshold_ms,
889                record_hybrid_fetch_threshold_ms,
890            )
891            .await?
892        }
893        Commands::Hummock(HummockCommands::MergeCompactionGroup {
894            left_group_id,
895            right_group_id,
896        }) => {
897            cmd_impl::hummock::merge_compaction_group(context, left_group_id, right_group_id)
898                .await?
899        }
900
901        Commands::Hummock(HummockCommands::MigrateLegacyObject {
902            url,
903            source_dir,
904            target_dir,
905            concurrency,
906        }) => {
907            migrate_legacy_object(url, source_dir, target_dir, concurrency).await?;
908        }
909        Commands::Hummock(HummockCommands::ResizeCache {
910            meta_cache_capacity_mb,
911            data_cache_capacity_mb,
912        }) => {
913            const MIB: u64 = 1024 * 1024;
914            cmd_impl::hummock::resize_cache(
915                context,
916                meta_cache_capacity_mb.map(|v| v * MIB),
917                data_cache_capacity_mb.map(|v| v * MIB),
918            )
919            .await?
920        }
921        Commands::Table(TableCommands::Scan {
922            mv_name,
923            data_dir,
924            use_new_object_prefix_strategy,
925        }) => {
926            cmd_impl::table::scan(context, mv_name, data_dir, use_new_object_prefix_strategy)
927                .await?
928        }
929        Commands::Table(TableCommands::ScanById {
930            table_id,
931            data_dir,
932            use_new_object_prefix_strategy,
933        }) => {
934            cmd_impl::table::scan_id(context, table_id, data_dir, use_new_object_prefix_strategy)
935                .await?
936        }
937        Commands::Table(TableCommands::List) => cmd_impl::table::list(context).await?,
938        Commands::Bench(cmd) => cmd_impl::bench::do_bench(context, cmd).await?,
939        Commands::Meta(MetaCommands::Pause) => cmd_impl::meta::pause(context).await?,
940        Commands::Meta(MetaCommands::Resume) => cmd_impl::meta::resume(context).await?,
941        Commands::Meta(MetaCommands::ResumeBackfill {
942            job_id,
943            fragment_id,
944        }) => cmd_impl::meta::resume_backfill(context, job_id, fragment_id).await?,
945        Commands::Meta(MetaCommands::ClusterInfo) => cmd_impl::meta::cluster_info(context).await?,
946        Commands::Meta(MetaCommands::SourceSplitInfo { ignore_id }) => {
947            cmd_impl::meta::source_split_info(context, ignore_id).await?
948        }
949        Commands::Meta(MetaCommands::Reschedule {
950            from,
951            dry_run,
952            plan,
953            revision,
954            resolve_no_shuffle,
955        }) => {
956            cmd_impl::meta::reschedule(context, plan, revision, from, dry_run, resolve_no_shuffle)
957                .await?
958        }
959        Commands::Meta(MetaCommands::BackupMeta { remarks }) => {
960            cmd_impl::meta::backup_meta(context, remarks).await?
961        }
962        Commands::Meta(MetaCommands::RestoreMeta { opts }) => {
963            risingwave_meta::backup_restore::restore(opts).await?
964        }
965        Commands::Meta(MetaCommands::DeleteMetaSnapshots { snapshot_ids }) => {
966            cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
967        }
968        Commands::Meta(MetaCommands::ListConnections) => {
969            cmd_impl::meta::list_connections(context).await?
970        }
971        Commands::Meta(MetaCommands::ListServingFragmentMapping) => {
972            cmd_impl::meta::list_serving_fragment_mappings(context).await?
973        }
974        Commands::Meta(MetaCommands::UnregisterWorkers {
975            workers,
976            yes,
977            ignore_not_found,
978            check_fragment_occupied,
979        }) => {
980            cmd_impl::meta::unregister_workers(
981                context,
982                workers,
983                yes,
984                ignore_not_found,
985                check_fragment_occupied,
986            )
987            .await?
988        }
989        Commands::Meta(MetaCommands::ValidateSource { props }) => {
990            cmd_impl::meta::validate_source(context, props).await?
991        }
992        Commands::AwaitTree(AwaitTreeCommands::Dump {
993            actor_traces_format,
994        }) => cmd_impl::await_tree::dump(context, actor_traces_format).await?,
995        Commands::AwaitTree(AwaitTreeCommands::Analyze { path }) => {
996            cmd_impl::await_tree::bottleneck_detect(context, path).await?
997        }
998        Commands::AwaitTree(AwaitTreeCommands::Transcribe { path }) => {
999            rw_diagnose_tools::await_tree::transcribe(path)?
1000        }
1001        Commands::Profile(ProfileCommands::Cpu {
1002            sleep,
1003            worker_types,
1004        }) => cmd_impl::profile::cpu_profile(context, sleep, worker_types).await?,
1005        Commands::Profile(ProfileCommands::Heap { dir, worker_types }) => {
1006            cmd_impl::profile::heap_profile(context, dir, worker_types).await?
1007        }
1008        Commands::Throttle(ThrottleCommands::Source(args)) => {
1009            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
1010        }
1011        Commands::Throttle(ThrottleCommands::Mv(args)) => {
1012            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?;
1013        }
1014        Commands::Throttle(ThrottleCommands::Sink(args)) => {
1015            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Sink, args).await?;
1016        }
1017        Commands::Meta(MetaCommands::SetCdcTableBackfillParallelism {
1018            table_id,
1019            parallelism,
1020        }) => {
1021            set_cdc_table_backfill_parallelism(context, table_id, parallelism).await?;
1022        }
1023        Commands::Meta(MetaCommands::AlterSourcePropertiesSafe {
1024            source_id,
1025            props,
1026            reset_splits,
1027        }) => {
1028            cmd_impl::meta::alter_source_properties_safe(context, source_id, props, reset_splits)
1029                .await?;
1030        }
1031        Commands::Meta(MetaCommands::ResetSourceSplits { source_id }) => {
1032            cmd_impl::meta::reset_source_splits(context, source_id).await?;
1033        }
1034        Commands::Meta(MetaCommands::InjectSourceOffsets { source_id, offsets }) => {
1035            cmd_impl::meta::inject_source_offsets(context, source_id, offsets).await?;
1036        }
1037        Commands::Test(TestCommands::Jvm) => cmd_impl::test::test_jvm()?,
1038    }
1039    Ok(())
1040}