// risingwave_ctl/lib.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![warn(clippy::large_futures, clippy::large_stack_frames)]
16
17use anyhow::Result;
18use clap::{ArgGroup, Args, Parser, Subcommand};
19use cmd_impl::bench::BenchCommands;
20use cmd_impl::hummock::SstDumpArgs;
21use itertools::Itertools;
22use risingwave_common::util::tokio_util::sync::CancellationToken;
23use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId};
24use risingwave_meta::backup_restore::RestoreOpts;
25use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm;
26use risingwave_pb::id::{CompactionGroupId, FragmentId, HummockSstableId, JobId, TableId};
27use thiserror_ext::AsReport;
28
29use crate::cmd_impl::hummock::{
30    build_compaction_config_vec, list_pinned_versions, migrate_legacy_object,
31};
32use crate::cmd_impl::profile::ProfileWorkerType;
33use crate::cmd_impl::scale::set_cdc_table_backfill_parallelism;
34use crate::cmd_impl::throttle::apply_throttle;
35use crate::common::CtlContext;
36
37pub mod cmd_impl;
38pub mod common;
39
/// risectl provides internal access to the RisingWave cluster. Generally, you will need
/// to provide the meta address and the state store URL to enable risectl to access the cluster. You
/// must start RisingWave in full cluster mode (e.g. enable MinIO and compactor in risedev.yml)
/// instead of playground mode to use this tool. risectl will read environment variables
/// `RW_META_ADDR` and `RW_HUMMOCK_URL` to configure itself.
#[derive(Parser)]
#[clap(version, about = "The DevOps tool that provides internal access to the RisingWave cluster", long_about = None)]
#[clap(propagate_version = true)]
#[clap(infer_subcommands = true)]
pub struct CliOpts {
    // The top-level subcommand group to run; dispatched in `start_impl`.
    #[clap(subcommand)]
    command: Commands,
}
53
54#[derive(Subcommand)]
55#[clap(infer_subcommands = true)]
56enum Commands {
57    /// Commands for Compute
58    #[clap(subcommand)]
59    Compute(ComputeCommands),
60    /// Commands for Hummock
61    #[clap(subcommand)]
62    Hummock(HummockCommands),
63    /// Commands for Tables
64    #[clap(subcommand)]
65    Table(TableCommands),
66    /// Commands for Meta
67    #[clap(subcommand)]
68    Meta(MetaCommands),
69    /// Commands for Benchmarks
70    #[clap(subcommand)]
71    Bench(BenchCommands),
72    /// Commands for await-tree, such as dumping, analyzing and transcribing
73    #[clap(subcommand)]
74    #[clap(visible_alias("trace"))]
75    AwaitTree(AwaitTreeCommands),
76    /// Commands for profiling nodes
77    #[clap(subcommand)]
78    Profile(ProfileCommands),
79    #[clap(subcommand)]
80    Throttle(ThrottleCommands),
81    /// Commands for Self-testing
82    #[clap(subcommand, hide = true)]
83    Test(TestCommands),
84}
85
// Subcommands that target a single compute node directly (by host address).
#[derive(Subcommand)]
enum ComputeCommands {
    /// Show all the configuration parameters on compute node
    ShowConfig { host: String },
}
91
// Administrative operations on the Hummock state store: versions, KV
// inspection, compaction control, GC, archives, and caches. Each variant is
// dispatched to a matching entry point in `cmd_impl::hummock` by `start_impl`.
#[allow(clippy::large_enum_variant)]
#[derive(Subcommand)]
enum HummockCommands {
    /// list latest Hummock version on meta node
    ListVersion {
        #[clap(short, long = "verbose", default_value_t = false)]
        verbose: bool,

        #[clap(long = "verbose_key_range", default_value_t = false)]
        verbose_key_range: bool,
    },

    /// list hummock version deltas in the meta store
    ListVersionDeltas {
        #[clap(short, long = "start-version-delta-id", default_value_t = HummockVersionId::new(0))]
        start_id: HummockVersionId,

        #[clap(short, long = "num-epochs", default_value_t = 100)]
        num_epochs: u32,
    },
    /// Forbid hummock commit new epochs, which is a prerequisite for compaction deterministic test
    DisableCommitEpoch,
    /// list all Hummock key-value pairs
    ListKv {
        #[clap(short, long = "epoch", default_value_t = HummockEpoch::MAX)]
        epoch: u64,

        #[clap(short, long = "table-id")]
        table_id: TableId,

        // data directory for hummock state store. None: use default
        data_dir: Option<String>,

        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    // Dump SST metadata/content; all options are defined in `SstDumpArgs`.
    SstDump(SstDumpArgs),
    /// trigger a targeted compaction through `compaction_group_id`
    TriggerManualCompaction {
        #[clap(short, long = "compaction-group-id", default_value_t = CompactionGroupId::new(2))]
        compaction_group_id: CompactionGroupId,

        #[clap(short, long = "table-id", default_value_t = 0)]
        table_id: u32,

        #[clap(short, long = "level", value_delimiter = ',', default_values_t = vec![1u32])]
        levels: Vec<u32>,

        #[clap(short, long = "sst-ids", value_delimiter = ',')]
        sst_ids: Vec<HummockSstableId>,

        #[clap(long = "exclusive", default_value_t = false)]
        exclusive: bool,

        #[clap(long = "retry-interval-ms", default_value_t = 1000)]
        retry_interval_ms: u64,
    },
    /// Trigger a full GC for SSTs that is not pinned, with timestamp <= now -
    /// `sst_retention_time_sec`, and with `prefix` in path.
    TriggerFullGc {
        // NOTE(review): this long flag uses underscores while sibling commands
        // use dashes; renaming would break existing scripts, so it is kept.
        #[clap(short, long = "sst_retention_time_sec", default_value_t = 259200)]
        sst_retention_time_sec: u64,
        #[clap(short, long = "prefix", required = false)]
        prefix: Option<String>,
    },
    /// List pinned versions of each worker.
    ListPinnedVersions {},
    /// List all compaction groups.
    ListCompactionGroup,
    /// Update compaction config for compaction groups.
    UpdateCompactionConfig {
        // Target groups; every `Some` field below is applied to each of them.
        #[clap(long, value_delimiter = ',')]
        compaction_group_ids: Vec<CompactionGroupId>,
        #[clap(long)]
        max_bytes_for_level_base: Option<u64>,
        #[clap(long)]
        max_bytes_for_level_multiplier: Option<u64>,
        #[clap(long)]
        max_compaction_bytes: Option<u64>,
        #[clap(long)]
        sub_level_max_compaction_bytes: Option<u64>,
        #[clap(long)]
        level0_tier_compact_file_number: Option<u64>,
        #[clap(long)]
        target_file_size_base: Option<u64>,
        #[clap(long)]
        compaction_filter_mask: Option<u32>,
        #[clap(long)]
        max_sub_compaction: Option<u32>,
        #[clap(long)]
        level0_stop_write_threshold_sub_level_number: Option<u64>,
        #[clap(long)]
        level0_sub_level_compact_level_count: Option<u32>,
        #[clap(long)]
        max_space_reclaim_bytes: Option<u64>,
        #[clap(long)]
        level0_max_compact_file_number: Option<u64>,
        #[clap(long)]
        level0_overlapping_sub_level_compact_level_count: Option<u32>,
        #[clap(long)]
        enable_emergency_picker: Option<bool>,
        #[clap(long)]
        tombstone_reclaim_ratio: Option<u32>,
        // NOTE(review): `compression_level` requires `compression_algorithm` —
        // the dispatcher in `start_impl` enforces this with a runtime assert
        // rather than a clap `requires(...)`; consider declaring it here so the
        // error surfaces at argument parsing instead of as a panic.
        #[clap(long)]
        compression_level: Option<u32>,
        #[clap(long)]
        compression_algorithm: Option<String>,
        #[clap(long)]
        max_l0_compact_level: Option<u32>,
        #[clap(long)]
        sst_allowed_trivial_move_min_size: Option<u64>,
        #[clap(long)]
        disable_auto_group_scheduling: Option<bool>,
        #[clap(long)]
        max_overlapping_level_size: Option<u64>,
        #[clap(long)]
        sst_allowed_trivial_move_max_count: Option<u32>,
        #[clap(long)]
        emergency_level0_sst_file_count: Option<u32>,
        #[clap(long)]
        emergency_level0_sub_level_partition: Option<u32>,
        #[clap(long)]
        level0_stop_write_threshold_max_sst_count: Option<u32>,
        #[clap(long)]
        level0_stop_write_threshold_max_size: Option<u64>,
        #[clap(long)]
        enable_optimize_l0_interval_selection: Option<bool>,
        #[clap(long)]
        max_kv_count_for_xor16: Option<u64>,
        #[clap(long)]
        max_vnode_key_range_bytes: Option<u64>,
    },
    /// Split given compaction group into two. Moves the given tables to the new group.
    SplitCompactionGroup {
        #[clap(long)]
        compaction_group_id: CompactionGroupId,
        #[clap(long, value_delimiter = ',')]
        table_ids: Vec<TableId>,
        #[clap(long, default_value_t = 0)]
        partition_vnode_count: u32,
    },
    /// Pause version checkpoint, which subsequently pauses GC of delta log and SST object.
    PauseVersionCheckpoint,
    /// Resume version checkpoint, which subsequently resumes GC of delta log and SST object.
    ResumeVersionCheckpoint,
    /// Replay version from the checkpoint one to the latest one.
    ReplayVersion,
    /// List compaction status
    ListCompactionStatus {
        #[clap(short, long = "verbose", default_value_t = false)]
        verbose: bool,
    },
    // Print the compaction score of the given compaction group.
    GetCompactionScore {
        #[clap(long)]
        compaction_group_id: CompactionGroupId,
    },
    /// Validate the current `HummockVersion`.
    ValidateVersion,
    /// Rebuild table stats
    RebuildTableStats,
    // Cancel an in-flight compaction task by its task id.
    CancelCompactTask {
        #[clap(short, long)]
        task_id: u64,
    },
    // Print KV entries matching `user_key` from Hummock version archives.
    PrintUserKeyInArchive {
        /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
        #[clap(long, value_delimiter = ',')]
        archive_ids: Vec<u64>,
        /// The data directory of Hummock storage, where `SSTable` objects can be found.
        #[clap(long)]
        data_dir: String,
        /// KVs that are matched with the user key are printed.
        #[clap(long)]
        user_key: String,
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    // Print version deltas in archives that reference the given SST id.
    PrintVersionDeltaInArchive {
        /// The ident of the archive file in object store. It's also the first Hummock version id of this archive.
        #[clap(long, value_delimiter = ',')]
        archive_ids: Vec<u64>,
        /// The data directory of Hummock storage, where `SSTable` objects can be found.
        #[clap(long)]
        data_dir: String,
        /// Version deltas that are related to the SST id are printed.
        #[clap(long)]
        sst_id: HummockSstableId,
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    // Enable/disable tiered cache tracing and set the per-operation record
    // thresholds (all in milliseconds).
    TieredCacheTracing {
        #[clap(long)]
        enable: bool,
        #[clap(long)]
        record_hybrid_insert_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_get_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_obtain_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_remove_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_fetch_threshold_ms: Option<u32>,
    },
    // Merge two compaction groups into one.
    MergeCompactionGroup {
        #[clap(long)]
        left_group_id: CompactionGroupId,
        #[clap(long)]
        right_group_id: CompactionGroupId,
    },
    // Migrate legacy objects from `source_dir` to `target_dir` via
    // `migrate_legacy_object`; `concurrency` bounds parallel transfers.
    MigrateLegacyObject {
        url: String,
        source_dir: String,
        target_dir: String,
        #[clap(long, default_value = "100")]
        concurrency: u32,
    },
    // Resize cache capacities; values are in MiB (the dispatcher converts
    // them to bytes before sending).
    ResizeCache {
        #[clap(long)]
        meta_cache_capacity_mb: Option<u64>,
        #[clap(long)]
        data_cache_capacity_mb: Option<u64>,
    },
}
316
// Inspection of state tables, either by materialized-view name or by table id.
#[derive(Subcommand)]
enum TableCommands {
    /// scan a state table with MV name
    Scan {
        /// name of the materialized view to operate on
        mv_name: String,
        // data directory for hummock state store. None: use default
        data_dir: Option<String>,

        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    /// scan a state table using Id
    ScanById {
        /// id of the state table to operate on
        table_id: TableId,
        // data directory for hummock state store. None: use default
        data_dir: Option<String>,
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    /// list all state tables
    List,
}
341
// Cluster-control operations against the meta node: pause/resume, rescheduling,
// backup/restore, worker management, and source administration.
#[derive(Subcommand)]
#[allow(clippy::large_enum_variant)]
enum MetaCommands {
    /// pause the stream graph
    Pause,
    /// resume the stream graph
    Resume,
    /// force resume backfill for troubleshooting
    // Exactly one of `--job-id` / `--fragment-id` must be provided (clap group).
    #[clap(
        group(
            ArgGroup::new("resume_backfill_target")
                .required(true)
                .args(&["job_id", "fragment_id"])
        )
    )]
    ResumeBackfill {
        #[clap(long)]
        job_id: Option<JobId>,
        #[clap(long)]
        fragment_id: Option<FragmentId>,
    },
    /// get cluster info
    ClusterInfo,
    /// get source split info
    SourceSplitInfo {
        #[clap(long)]
        ignore_id: bool,
    },
    /// Reschedule the actors in the stream graph
    ///
    /// The format is `fragment_id-[worker_id:count]+[worker_id:count]`
    /// You can provide either decreased `worker_ids` only or increased only, but decreased should be preceded by
    /// `added` when both are provided.
    ///
    /// For example, for plan `100-[1:1]+[4:1]` the follow request will be generated:
    /// ```text
    /// {
    ///     100: WorkerReschedule {
    ///         increased_actor_count: { 1: 1 },
    ///         decreased_actor_count: { 4: 1 },
    ///     }
    /// }
    /// ```
    /// Use ; to separate multiple fragment
    // NOTE(review): in the example above, `-[1:1]` looks like a decrease for
    // worker 1 and `+[4:1]` an increase for worker 4, yet the generated maps
    // show the opposite. Confirm against the plan parser and fix whichever
    // side is wrong (this doc comment is user-visible help text).
    #[clap(verbatim_doc_comment)]
    #[clap(group(clap::ArgGroup::new("input_group").required(true).args(&["plan", "from"])))]
    Reschedule {
        /// Plan of reschedule, needs to be used with `revision`
        #[clap(long, requires = "revision")]
        plan: Option<String>,
        /// Revision of the plan
        #[clap(long)]
        revision: Option<u64>,
        /// Reschedule from a specific file
        #[clap(long, conflicts_with = "revision", value_hint = clap::ValueHint::AnyPath)]
        from: Option<String>,
        /// Show the plan only, no actual operation
        #[clap(long, default_value = "false")]
        dry_run: bool,
        /// Resolve `NO_SHUFFLE` upstream
        #[clap(long, default_value = "false")]
        resolve_no_shuffle: bool,
    },
    /// backup meta by taking a meta snapshot
    BackupMeta {
        #[clap(long)]
        remarks: Option<String>,
    },
    /// restore meta by recovering from a meta snapshot
    RestoreMeta {
        #[command(flatten)]
        opts: RestoreOpts,
    },
    /// delete meta snapshots
    DeleteMetaSnapshots {
        #[clap(long, value_delimiter = ',')]
        snapshot_ids: Vec<u64>,
    },

    /// List all existing connections in the catalog
    ListConnections,

    /// List fragment mapping for serving
    ListServingFragmentMapping,

    /// Unregister workers from the cluster
    UnregisterWorkers {
        /// The workers that needs to be unregistered, `worker_id` and `worker_host:worker_port` are both supported
        #[clap(
            long,
            required = true,
            value_delimiter = ',',
            value_name = "worker_id or worker_host:worker_port, ..."
        )]
        workers: Vec<String>,

        /// Automatic yes to prompts
        #[clap(short = 'y', long, default_value_t = false)]
        yes: bool,

        /// The worker not found will be ignored
        #[clap(long, default_value_t = false)]
        ignore_not_found: bool,

        /// Checking whether the fragment is occupied by workers
        #[clap(long, default_value_t = false)]
        check_fragment_occupied: bool,
    },

    /// Validate source interface for the cloud team
    ValidateSource {
        /// With properties in json format
        /// If privatelink is used, specify `connection.id` instead of `connection.name`
        #[clap(long)]
        props: String,
    },

    // Set the backfill parallelism of a CDC table (see
    // `set_cdc_table_backfill_parallelism` imported above).
    SetCdcTableBackfillParallelism {
        #[clap(long, required = true)]
        table_id: u32,
        #[clap(long, required = true)]
        parallelism: u32,
    },

    /// Alter source connector properties with pause/resume orchestration (UNSAFE)
    /// This operation pauses the source, updates properties, and resumes.
    // NOTE(review): the variant name says "Safe" while the help text says
    // "(UNSAFE)". Presumably "Safe" refers to the pause/update/resume
    // orchestration being safer than a raw property swap — confirm and make
    // the name and help text agree.
    AlterSourcePropertiesSafe {
        /// Source ID to update
        #[clap(long)]
        source_id: u32,
        /// Properties to change in JSON format, e.g. '{"properties.bootstrap.server": "new-broker:9092"}'
        #[clap(long)]
        props: String,
        /// Reset split assignments after property change (for major upstream changes)
        #[clap(long, default_value_t = false)]
        reset_splits: bool,
    },

    /// Reset source split assignments (UNSAFE - admin only)
    /// Clears cached split state and triggers re-discovery from upstream.
    ResetSourceSplits {
        /// Source ID to reset
        #[clap(long)]
        source_id: u32,
    },

    /// Inject specific offsets into source splits (UNSAFE - admin only)
    /// WARNING: This can cause data duplication or loss!
    InjectSourceOffsets {
        /// Source ID to inject offsets for
        #[clap(long)]
        source_id: u32,
        /// Split offsets in JSON format, e.g. '{"split-0": "100", "split-1": "200"}'
        #[clap(long)]
        offsets: String,
    },
}
499
// Await-tree (async stack trace) tooling: dump from the cluster, analyze a
// diagnose file, or convert between JSON and text formats.
#[derive(Subcommand, Clone, Debug)]
pub enum AwaitTreeCommands {
    /// Dump Await Tree
    Dump {
        /// The format of actor traces in the diagnose file. Allowed values: `json`, `text`. `json` by default.
        #[clap(short, long = "actor-traces-format")]
        actor_traces_format: Option<String>,
    },
    /// Analyze Await Tree
    Analyze {
        /// The path to the diagnose file, if None, ctl will first pull one from the cluster
        /// The actor traces format can be either `json` or `text`. The analyze command will
        /// automatically detect the format.
        #[clap(long = "path")]
        path: Option<String>,
    },
    /// Transcribe Await Tree From JSON to Text format
    Transcribe {
        /// The path to the await tree file to be transcribed
        #[clap(long = "path")]
        path: String,
    },
}
523
// Self-test commands; the whole group is hidden from `--help` (see the
// `hide = true` on the `Test` variant of `Commands`).
#[derive(Subcommand, Clone, Debug)]
enum TestCommands {
    /// Test if JVM and Java libraries are working
    Jvm,
}
529
// Throttle targets; each carries the same `ThrottleCommandArgs`.
// Presumably dispatched through `apply_throttle` (imported above) — confirm
// in `start_impl`.
#[derive(Subcommand, Clone, Debug)]
enum ThrottleCommands {
    // Throttle a source.
    Source(ThrottleCommandArgs),
    // Throttle a materialized view.
    Mv(ThrottleCommandArgs),
    // Throttle a sink.
    Sink(ThrottleCommandArgs),
}
536
// The kind of rate limit to apply, selected via `--throttle-type` on
// `ThrottleCommandArgs`.
#[derive(Clone, Debug, clap::ValueEnum)]
pub enum ThrottleTypeArg {
    Dml,
    Backfill,
    Source,
    Sink,
}
544
// Arguments shared by every `ThrottleCommands` variant.
#[derive(Clone, Debug, Args)]
pub struct ThrottleCommandArgs {
    /// The ID of the object to throttle
    #[clap(long, required = true)]
    id: u32,
    /// The rate limit to apply
    // NOTE(review): semantics of omitting `--rate` (None) are decided by the
    // handler — presumably "remove the limit"; confirm against `apply_throttle`.
    #[clap(long)]
    rate: Option<u32>,
    /// The type of throttle to apply
    #[clap(long, value_enum, required = true)]
    throttle_type: ThrottleTypeArg,
}
557
// Profiling of cluster nodes; worker types default to frontend, compute-node,
// and compactor when `--worker-type` is not given.
#[derive(Subcommand, Clone, Debug)]
pub enum ProfileCommands {
    /// CPU profile
    Cpu {
        /// The time to active profiling for (in seconds)
        #[clap(short, long = "sleep")]
        sleep: u64,
        /// Target worker types. Repeatable. Defaults to frontend, compute-node, and compactor.
        #[clap(long = "worker-type", value_name = "TYPE")]
        worker_types: Vec<ProfileWorkerType>,
    },
    /// Heap profile
    Heap {
        /// The output directory of the dumped file
        #[clap(long = "dir")]
        dir: Option<String>,
        /// Target worker types. Repeatable. Defaults to frontend, compute-node, and compactor.
        #[clap(long = "worker-type", value_name = "TYPE")]
        worker_types: Vec<ProfileWorkerType>,
    },
}
579
580/// Start `risectl` with the given options.
581/// Cancel the operation when the given `shutdown` token triggers.
582/// Log and abort the process if any error occurs.
583///
584/// Note: use [`start_fallible`] if you want to call functionalities of `risectl`
585/// in an embedded manner.
586pub async fn start(opts: CliOpts, shutdown: CancellationToken) {
587    let context = CtlContext::default();
588
589    tokio::select! {
590        _ = shutdown.cancelled() => {
591            // Shutdown requested, clean up the context and return.
592            context.try_close().await;
593        }
594
595        result = start_fallible(opts, &context) => {
596            if let Err(e) = result {
597                eprintln!("Error: {:#?}", e.as_report()); // pretty with backtrace
598                std::process::exit(1);
599            }
600        }
601    }
602}
603
604/// Start `risectl` with the given options.
605/// Return `Err` if any error occurs.
606pub async fn start_fallible(opts: CliOpts, context: &CtlContext) -> Result<()> {
607    let result = start_impl(opts, context).await;
608    context.try_close().await;
609    result
610}
611
612#[expect(
613    clippy::large_stack_frames,
614    reason = "Pre-opt MIR sums locals across match arms in async dispatch; \
615              post-layout generator stores only one arm at a time (~13–16 KiB)."
616)]
617async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
618    match opts.command {
619        Commands::Compute(ComputeCommands::ShowConfig { host }) => {
620            cmd_impl::compute::show_config(&host).await?
621        }
622        Commands::Hummock(HummockCommands::DisableCommitEpoch) => {
623            cmd_impl::hummock::disable_commit_epoch(context).await?
624        }
625        Commands::Hummock(HummockCommands::ListVersion {
626            verbose,
627            verbose_key_range,
628        }) => {
629            cmd_impl::hummock::list_version(context, verbose, verbose_key_range).await?;
630        }
631        Commands::Hummock(HummockCommands::ListVersionDeltas {
632            start_id,
633            num_epochs,
634        }) => {
635            cmd_impl::hummock::list_version_deltas(context, start_id, num_epochs).await?;
636        }
637        Commands::Hummock(HummockCommands::ListKv {
638            epoch,
639            table_id,
640            data_dir,
641            use_new_object_prefix_strategy,
642        }) => {
643            cmd_impl::hummock::list_kv(
644                context,
645                epoch,
646                table_id,
647                data_dir,
648                use_new_object_prefix_strategy,
649            )
650            .await?;
651        }
652        Commands::Hummock(HummockCommands::SstDump(args)) => {
653            cmd_impl::hummock::sst_dump(context, args).await.unwrap()
654        }
655        Commands::Hummock(HummockCommands::TriggerManualCompaction {
656            compaction_group_id,
657            table_id,
658            levels,
659            sst_ids,
660            exclusive,
661            retry_interval_ms,
662        }) => {
663            cmd_impl::hummock::trigger_manual_compaction(
664                context,
665                compaction_group_id,
666                table_id.into(),
667                levels,
668                sst_ids,
669                exclusive,
670                retry_interval_ms,
671            )
672            .await?
673        }
674        Commands::Hummock(HummockCommands::TriggerFullGc {
675            sst_retention_time_sec,
676            prefix,
677        }) => cmd_impl::hummock::trigger_full_gc(context, sst_retention_time_sec, prefix).await?,
678        Commands::Hummock(HummockCommands::ListPinnedVersions {}) => {
679            list_pinned_versions(context).await?
680        }
681        Commands::Hummock(HummockCommands::ListCompactionGroup) => {
682            cmd_impl::hummock::list_compaction_group(context).await?
683        }
684        Commands::Hummock(HummockCommands::UpdateCompactionConfig {
685            compaction_group_ids,
686            max_bytes_for_level_base,
687            max_bytes_for_level_multiplier,
688            max_compaction_bytes,
689            sub_level_max_compaction_bytes,
690            level0_tier_compact_file_number,
691            target_file_size_base,
692            compaction_filter_mask,
693            max_sub_compaction,
694            level0_stop_write_threshold_sub_level_number,
695            level0_sub_level_compact_level_count,
696            max_space_reclaim_bytes,
697            level0_max_compact_file_number,
698            level0_overlapping_sub_level_compact_level_count,
699            enable_emergency_picker,
700            tombstone_reclaim_ratio,
701            compression_level,
702            compression_algorithm,
703            max_l0_compact_level,
704            sst_allowed_trivial_move_min_size,
705            disable_auto_group_scheduling,
706            max_overlapping_level_size,
707            sst_allowed_trivial_move_max_count,
708            emergency_level0_sst_file_count,
709            emergency_level0_sub_level_partition,
710            level0_stop_write_threshold_max_sst_count,
711            level0_stop_write_threshold_max_size,
712            enable_optimize_l0_interval_selection,
713            max_kv_count_for_xor16,
714            max_vnode_key_range_bytes,
715        }) => {
716            cmd_impl::hummock::update_compaction_config(
717                context,
718                compaction_group_ids,
719                build_compaction_config_vec(
720                    max_bytes_for_level_base,
721                    max_bytes_for_level_multiplier,
722                    max_compaction_bytes,
723                    sub_level_max_compaction_bytes,
724                    level0_tier_compact_file_number,
725                    target_file_size_base,
726                    compaction_filter_mask,
727                    max_sub_compaction,
728                    level0_stop_write_threshold_sub_level_number,
729                    level0_sub_level_compact_level_count,
730                    max_space_reclaim_bytes,
731                    level0_max_compact_file_number,
732                    level0_overlapping_sub_level_compact_level_count,
733                    enable_emergency_picker,
734                    tombstone_reclaim_ratio,
735                    if let Some(level) = compression_level {
736                        assert!(compression_algorithm.is_some());
737                        Some(CompressionAlgorithm {
738                            level,
739                            compression_algorithm: compression_algorithm.unwrap(),
740                        })
741                    } else {
742                        None
743                    },
744                    max_l0_compact_level,
745                    sst_allowed_trivial_move_min_size,
746                    disable_auto_group_scheduling,
747                    max_overlapping_level_size,
748                    sst_allowed_trivial_move_max_count,
749                    emergency_level0_sst_file_count,
750                    emergency_level0_sub_level_partition,
751                    level0_stop_write_threshold_max_sst_count,
752                    level0_stop_write_threshold_max_size,
753                    enable_optimize_l0_interval_selection,
754                    max_kv_count_for_xor16,
755                    max_vnode_key_range_bytes,
756                ),
757            )
758            .await?
759        }
760        Commands::Hummock(HummockCommands::SplitCompactionGroup {
761            compaction_group_id,
762            table_ids,
763            partition_vnode_count,
764        }) => {
765            cmd_impl::hummock::split_compaction_group(
766                context,
767                compaction_group_id,
768                &table_ids.into_iter().map_into().collect_vec(),
769                partition_vnode_count,
770            )
771            .await?;
772        }
773        Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
774            cmd_impl::hummock::pause_version_checkpoint(context).await?;
775        }
776        Commands::Hummock(HummockCommands::ResumeVersionCheckpoint) => {
777            cmd_impl::hummock::resume_version_checkpoint(context).await?;
778        }
779        Commands::Hummock(HummockCommands::ReplayVersion) => {
780            cmd_impl::hummock::replay_version(context).await?;
781        }
782        Commands::Hummock(HummockCommands::ListCompactionStatus { verbose }) => {
783            cmd_impl::hummock::list_compaction_status(context, verbose).await?;
784        }
785        Commands::Hummock(HummockCommands::GetCompactionScore {
786            compaction_group_id,
787        }) => {
788            cmd_impl::hummock::get_compaction_score(context, compaction_group_id).await?;
789        }
790        Commands::Hummock(HummockCommands::ValidateVersion) => {
791            cmd_impl::hummock::validate_version(context).await?;
792        }
793        Commands::Hummock(HummockCommands::RebuildTableStats) => {
794            cmd_impl::hummock::rebuild_table_stats(context).await?;
795        }
796        Commands::Hummock(HummockCommands::CancelCompactTask { task_id }) => {
797            cmd_impl::hummock::cancel_compact_task(context, task_id).await?;
798        }
799        Commands::Hummock(HummockCommands::PrintVersionDeltaInArchive {
800            archive_ids,
801            data_dir,
802            sst_id,
803            use_new_object_prefix_strategy,
804        }) => {
805            cmd_impl::hummock::print_version_delta_in_archive(
806                context,
807                archive_ids.into_iter().map(HummockVersionId::new),
808                data_dir,
809                sst_id,
810                use_new_object_prefix_strategy,
811            )
812            .await?;
813        }
814        Commands::Hummock(HummockCommands::PrintUserKeyInArchive {
815            archive_ids,
816            data_dir,
817            user_key,
818            use_new_object_prefix_strategy,
819        }) => {
820            cmd_impl::hummock::print_user_key_in_archive(
821                context,
822                archive_ids.into_iter().map(HummockVersionId::new),
823                data_dir,
824                user_key,
825                use_new_object_prefix_strategy,
826            )
827            .await?;
828        }
829        Commands::Hummock(HummockCommands::TieredCacheTracing {
830            enable,
831            record_hybrid_insert_threshold_ms,
832            record_hybrid_get_threshold_ms,
833            record_hybrid_obtain_threshold_ms,
834            record_hybrid_remove_threshold_ms,
835            record_hybrid_fetch_threshold_ms,
836        }) => {
837            cmd_impl::hummock::tiered_cache_tracing(
838                context,
839                enable,
840                record_hybrid_insert_threshold_ms,
841                record_hybrid_get_threshold_ms,
842                record_hybrid_obtain_threshold_ms,
843                record_hybrid_remove_threshold_ms,
844                record_hybrid_fetch_threshold_ms,
845            )
846            .await?
847        }
848        Commands::Hummock(HummockCommands::MergeCompactionGroup {
849            left_group_id,
850            right_group_id,
851        }) => {
852            cmd_impl::hummock::merge_compaction_group(context, left_group_id, right_group_id)
853                .await?
854        }
855
856        Commands::Hummock(HummockCommands::MigrateLegacyObject {
857            url,
858            source_dir,
859            target_dir,
860            concurrency,
861        }) => {
862            migrate_legacy_object(url, source_dir, target_dir, concurrency).await?;
863        }
864        Commands::Hummock(HummockCommands::ResizeCache {
865            meta_cache_capacity_mb,
866            data_cache_capacity_mb,
867        }) => {
868            const MIB: u64 = 1024 * 1024;
869            cmd_impl::hummock::resize_cache(
870                context,
871                meta_cache_capacity_mb.map(|v| v * MIB),
872                data_cache_capacity_mb.map(|v| v * MIB),
873            )
874            .await?
875        }
876        Commands::Table(TableCommands::Scan {
877            mv_name,
878            data_dir,
879            use_new_object_prefix_strategy,
880        }) => {
881            cmd_impl::table::scan(context, mv_name, data_dir, use_new_object_prefix_strategy)
882                .await?
883        }
884        Commands::Table(TableCommands::ScanById {
885            table_id,
886            data_dir,
887            use_new_object_prefix_strategy,
888        }) => {
889            cmd_impl::table::scan_id(context, table_id, data_dir, use_new_object_prefix_strategy)
890                .await?
891        }
892        Commands::Table(TableCommands::List) => cmd_impl::table::list(context).await?,
893        Commands::Bench(cmd) => cmd_impl::bench::do_bench(context, cmd).await?,
894        Commands::Meta(MetaCommands::Pause) => cmd_impl::meta::pause(context).await?,
895        Commands::Meta(MetaCommands::Resume) => cmd_impl::meta::resume(context).await?,
896        Commands::Meta(MetaCommands::ResumeBackfill {
897            job_id,
898            fragment_id,
899        }) => cmd_impl::meta::resume_backfill(context, job_id, fragment_id).await?,
900        Commands::Meta(MetaCommands::ClusterInfo) => cmd_impl::meta::cluster_info(context).await?,
901        Commands::Meta(MetaCommands::SourceSplitInfo { ignore_id }) => {
902            cmd_impl::meta::source_split_info(context, ignore_id).await?
903        }
904        Commands::Meta(MetaCommands::Reschedule {
905            from,
906            dry_run,
907            plan,
908            revision,
909            resolve_no_shuffle,
910        }) => {
911            cmd_impl::meta::reschedule(context, plan, revision, from, dry_run, resolve_no_shuffle)
912                .await?
913        }
914        Commands::Meta(MetaCommands::BackupMeta { remarks }) => {
915            cmd_impl::meta::backup_meta(context, remarks).await?
916        }
917        Commands::Meta(MetaCommands::RestoreMeta { opts }) => {
918            risingwave_meta::backup_restore::restore(opts).await?
919        }
920        Commands::Meta(MetaCommands::DeleteMetaSnapshots { snapshot_ids }) => {
921            cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
922        }
923        Commands::Meta(MetaCommands::ListConnections) => {
924            cmd_impl::meta::list_connections(context).await?
925        }
926        Commands::Meta(MetaCommands::ListServingFragmentMapping) => {
927            cmd_impl::meta::list_serving_fragment_mappings(context).await?
928        }
929        Commands::Meta(MetaCommands::UnregisterWorkers {
930            workers,
931            yes,
932            ignore_not_found,
933            check_fragment_occupied,
934        }) => {
935            cmd_impl::meta::unregister_workers(
936                context,
937                workers,
938                yes,
939                ignore_not_found,
940                check_fragment_occupied,
941            )
942            .await?
943        }
944        Commands::Meta(MetaCommands::ValidateSource { props }) => {
945            cmd_impl::meta::validate_source(context, props).await?
946        }
947        Commands::AwaitTree(AwaitTreeCommands::Dump {
948            actor_traces_format,
949        }) => cmd_impl::await_tree::dump(context, actor_traces_format).await?,
950        Commands::AwaitTree(AwaitTreeCommands::Analyze { path }) => {
951            cmd_impl::await_tree::bottleneck_detect(context, path).await?
952        }
953        Commands::AwaitTree(AwaitTreeCommands::Transcribe { path }) => {
954            rw_diagnose_tools::await_tree::transcribe(path)?
955        }
956        Commands::Profile(ProfileCommands::Cpu {
957            sleep,
958            worker_types,
959        }) => cmd_impl::profile::cpu_profile(context, sleep, worker_types).await?,
960        Commands::Profile(ProfileCommands::Heap { dir, worker_types }) => {
961            cmd_impl::profile::heap_profile(context, dir, worker_types).await?
962        }
963        Commands::Throttle(ThrottleCommands::Source(args)) => {
964            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
965        }
966        Commands::Throttle(ThrottleCommands::Mv(args)) => {
967            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?;
968        }
969        Commands::Throttle(ThrottleCommands::Sink(args)) => {
970            apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Sink, args).await?;
971        }
972        Commands::Meta(MetaCommands::SetCdcTableBackfillParallelism {
973            table_id,
974            parallelism,
975        }) => {
976            set_cdc_table_backfill_parallelism(context, table_id, parallelism).await?;
977        }
978        Commands::Meta(MetaCommands::AlterSourcePropertiesSafe {
979            source_id,
980            props,
981            reset_splits,
982        }) => {
983            cmd_impl::meta::alter_source_properties_safe(context, source_id, props, reset_splits)
984                .await?;
985        }
986        Commands::Meta(MetaCommands::ResetSourceSplits { source_id }) => {
987            cmd_impl::meta::reset_source_splits(context, source_id).await?;
988        }
989        Commands::Meta(MetaCommands::InjectSourceOffsets { source_id, offsets }) => {
990            cmd_impl::meta::inject_source_offsets(context, source_id, offsets).await?;
991        }
992        Commands::Test(TestCommands::Jvm) => cmd_impl::test::test_jvm()?,
993    }
994    Ok(())
995}