1#![warn(clippy::large_futures, clippy::large_stack_frames)]
16
17use anyhow::Result;
18use clap::{ArgGroup, Args, Parser, Subcommand};
19use cmd_impl::bench::BenchCommands;
20use cmd_impl::hummock::SstDumpArgs;
21use itertools::Itertools;
22use risingwave_common::util::tokio_util::sync::CancellationToken;
23use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId};
24use risingwave_meta::backup_restore::RestoreOpts;
25use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::{
26 CompressionAlgorithm, SstableFilterKind, SstableFilterLayout,
27};
28use risingwave_pb::id::{CompactionGroupId, FragmentId, HummockSstableId, JobId, TableId};
29use thiserror_ext::AsReport;
30
31use crate::cmd_impl::hummock::{
32 build_compaction_config_vec, list_pinned_versions, migrate_legacy_object,
33};
34use crate::cmd_impl::profile::ProfileWorkerType;
35use crate::cmd_impl::scale::set_cdc_table_backfill_parallelism;
36use crate::cmd_impl::throttle::apply_throttle;
37use crate::common::CtlContext;
38
39pub mod cmd_impl;
40pub mod common;
41
42#[derive(Parser)]
48#[clap(version, about = "The DevOps tool that provides internal access to the RisingWave cluster", long_about = None)]
49#[clap(propagate_version = true)]
50#[clap(infer_subcommands = true)]
51pub struct CliOpts {
52 #[clap(subcommand)]
53 command: Commands,
54}
55
56#[derive(Subcommand)]
57#[clap(infer_subcommands = true)]
58enum Commands {
59 #[clap(subcommand)]
61 Compute(ComputeCommands),
62 #[clap(subcommand)]
64 Hummock(HummockCommands),
65 #[clap(subcommand)]
67 Table(TableCommands),
68 #[clap(subcommand)]
70 Meta(MetaCommands),
71 #[clap(subcommand)]
73 Bench(BenchCommands),
74 #[clap(subcommand)]
76 #[clap(visible_alias("trace"))]
77 AwaitTree(AwaitTreeCommands),
78 #[clap(subcommand)]
80 Profile(ProfileCommands),
81 #[clap(subcommand)]
82 Throttle(ThrottleCommands),
83 #[clap(subcommand, hide = true)]
85 Test(TestCommands),
86}
87
88#[derive(Subcommand)]
89enum ComputeCommands {
90 ShowConfig { host: String },
92}
93
94#[expect(clippy::large_enum_variant)]
95#[derive(Subcommand)]
96enum HummockCommands {
97 ListVersion {
99 #[clap(short, long = "verbose", default_value_t = false)]
100 verbose: bool,
101
102 #[clap(long = "verbose_key_range", default_value_t = false)]
103 verbose_key_range: bool,
104 },
105
106 ListVersionDeltas {
108 #[clap(short, long = "start-version-delta-id", default_value_t = HummockVersionId::new(0))]
109 start_id: HummockVersionId,
110
111 #[clap(short, long = "num-epochs", default_value_t = 100)]
112 num_epochs: u32,
113 },
114 DisableCommitEpoch,
116 ListKv {
118 #[clap(short, long = "epoch", default_value_t = HummockEpoch::MAX)]
119 epoch: u64,
120
121 #[clap(short, long = "table-id")]
122 table_id: TableId,
123
124 data_dir: Option<String>,
126
127 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
128 use_new_object_prefix_strategy: bool,
129 },
130 SstDump(SstDumpArgs),
131 TriggerManualCompaction {
133 #[clap(short, long = "compaction-group-id", default_value_t = CompactionGroupId::new(2))]
134 compaction_group_id: CompactionGroupId,
135
136 #[clap(short, long = "table-id", default_value_t = 0)]
137 table_id: u32,
138
139 #[clap(short, long = "level", value_delimiter = ',', default_values_t = vec![1u32])]
140 levels: Vec<u32>,
141
142 #[clap(long = "target-level")]
143 target_level: Option<u32>,
144
145 #[clap(short, long = "sst-ids", value_delimiter = ',')]
146 sst_ids: Vec<HummockSstableId>,
147
148 #[clap(long = "exclusive", default_value_t = false)]
149 exclusive: bool,
150
151 #[clap(long = "retry-interval-ms", default_value_t = 1000)]
152 retry_interval_ms: u64,
153 },
154 TriggerFullGc {
157 #[clap(short, long = "sst_retention_time_sec", default_value_t = 259200)]
158 sst_retention_time_sec: u64,
159 #[clap(short, long = "prefix", required = false)]
160 prefix: Option<String>,
161 },
162 ListPinnedVersions {},
164 ListCompactionGroup,
166 UpdateCompactionConfig {
168 #[clap(long, value_delimiter = ',')]
169 compaction_group_ids: Vec<CompactionGroupId>,
170 #[clap(long)]
171 max_bytes_for_level_base: Option<u64>,
172 #[clap(long)]
173 max_bytes_for_level_multiplier: Option<u64>,
174 #[clap(long)]
175 max_compaction_bytes: Option<u64>,
176 #[clap(long)]
177 sub_level_max_compaction_bytes: Option<u64>,
178 #[clap(long)]
179 level0_tier_compact_file_number: Option<u64>,
180 #[clap(long)]
181 target_file_size_base: Option<u64>,
182 #[clap(long)]
183 compaction_filter_mask: Option<u32>,
184 #[clap(long)]
185 max_sub_compaction: Option<u32>,
186 #[clap(long)]
187 level0_stop_write_threshold_sub_level_number: Option<u64>,
188 #[clap(long)]
189 level0_sub_level_compact_level_count: Option<u32>,
190 #[clap(long)]
191 max_space_reclaim_bytes: Option<u64>,
192 #[clap(long)]
193 level0_max_compact_file_number: Option<u64>,
194 #[clap(long)]
195 level0_overlapping_sub_level_compact_level_count: Option<u32>,
196 #[clap(long)]
197 enable_emergency_picker: Option<bool>,
198 #[clap(long)]
199 tombstone_reclaim_ratio: Option<u32>,
200 #[clap(long)]
201 compression_level: Option<u32>,
202 #[clap(long)]
203 compression_algorithm: Option<String>,
204 #[clap(long, requires = "sstable_filter_kind")]
206 sstable_filter_kind_level: Option<u32>,
207 #[clap(long, requires = "sstable_filter_kind_level")]
209 sstable_filter_kind: Option<String>,
210 #[clap(long, requires = "sstable_filter_layout")]
212 sstable_filter_layout_level: Option<u32>,
213 #[clap(long, requires = "sstable_filter_layout_level")]
219 sstable_filter_layout: Option<String>,
220 #[clap(long)]
221 max_l0_compact_level: Option<u32>,
222 #[clap(long)]
223 sst_allowed_trivial_move_min_size: Option<u64>,
224 #[clap(long)]
225 disable_auto_group_scheduling: Option<bool>,
226 #[clap(long)]
227 max_overlapping_level_size: Option<u64>,
228 #[clap(long)]
229 sst_allowed_trivial_move_max_count: Option<u32>,
230 #[clap(long)]
231 emergency_level0_sst_file_count: Option<u32>,
232 #[clap(long)]
233 emergency_level0_sub_level_partition: Option<u32>,
234 #[clap(long)]
235 level0_stop_write_threshold_max_sst_count: Option<u32>,
236 #[clap(long)]
237 level0_stop_write_threshold_max_size: Option<u64>,
238 #[clap(long)]
239 enable_optimize_l0_interval_selection: Option<bool>,
240 #[clap(long)]
245 blocked_xor_filter_kv_count_threshold: Option<u64>,
246 #[clap(long)]
247 max_vnode_key_range_bytes: Option<u64>,
248 },
249 SplitCompactionGroup {
251 #[clap(long)]
252 compaction_group_id: CompactionGroupId,
253 #[clap(long, value_delimiter = ',')]
254 table_ids: Vec<TableId>,
255 #[clap(long, default_value_t = 0)]
256 partition_vnode_count: u32,
257 },
258 PauseVersionCheckpoint,
260 ResumeVersionCheckpoint,
262 ReplayVersion,
264 ListCompactionStatus {
266 #[clap(short, long = "verbose", default_value_t = false)]
267 verbose: bool,
268 },
269 GetCompactionScore {
270 #[clap(long)]
271 compaction_group_id: CompactionGroupId,
272 },
273 ValidateVersion,
275 RebuildTableStats,
277 CancelCompactTask {
278 #[clap(short, long)]
279 task_id: u64,
280 },
281 PrintUserKeyInArchive {
282 #[clap(long, value_delimiter = ',')]
284 archive_ids: Vec<u64>,
285 #[clap(long)]
287 data_dir: String,
288 #[clap(long)]
290 user_key: String,
291 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
292 use_new_object_prefix_strategy: bool,
293 },
294 PrintVersionDeltaInArchive {
295 #[clap(long, value_delimiter = ',')]
297 archive_ids: Vec<u64>,
298 #[clap(long)]
300 data_dir: String,
301 #[clap(long)]
303 sst_id: HummockSstableId,
304 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
305 use_new_object_prefix_strategy: bool,
306 },
307 TieredCacheTracing {
308 #[clap(long)]
309 enable: bool,
310 #[clap(long)]
311 record_hybrid_insert_threshold_ms: Option<u32>,
312 #[clap(long)]
313 record_hybrid_get_threshold_ms: Option<u32>,
314 #[clap(long)]
315 record_hybrid_obtain_threshold_ms: Option<u32>,
316 #[clap(long)]
317 record_hybrid_remove_threshold_ms: Option<u32>,
318 #[clap(long)]
319 record_hybrid_fetch_threshold_ms: Option<u32>,
320 },
321 MergeCompactionGroup {
322 #[clap(long)]
323 left_group_id: CompactionGroupId,
324 #[clap(long)]
325 right_group_id: CompactionGroupId,
326 },
327 MigrateLegacyObject {
328 url: String,
329 source_dir: String,
330 target_dir: String,
331 #[clap(long, default_value = "100")]
332 concurrency: u32,
333 },
334 ResizeCache {
335 #[clap(long)]
336 meta_cache_capacity_mb: Option<u64>,
337 #[clap(long)]
338 data_cache_capacity_mb: Option<u64>,
339 },
340}
341
342#[derive(Subcommand)]
343enum TableCommands {
344 Scan {
346 mv_name: String,
348 data_dir: Option<String>,
350
351 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
352 use_new_object_prefix_strategy: bool,
353 },
354 ScanById {
356 table_id: TableId,
358 data_dir: Option<String>,
360 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
361 use_new_object_prefix_strategy: bool,
362 },
363 List,
365}
366
367#[derive(Subcommand)]
368#[expect(clippy::large_enum_variant)]
369enum MetaCommands {
370 Pause,
372 Resume,
374 #[clap(
376 group(
377 ArgGroup::new("resume_backfill_target")
378 .required(true)
379 .args(&["job_id", "fragment_id"])
380 )
381 )]
382 ResumeBackfill {
383 #[clap(long)]
384 job_id: Option<JobId>,
385 #[clap(long)]
386 fragment_id: Option<FragmentId>,
387 },
388 ClusterInfo,
390 SourceSplitInfo {
392 #[clap(long)]
393 ignore_id: bool,
394 },
395 #[clap(verbatim_doc_comment)]
412 #[clap(group(clap::ArgGroup::new("input_group").required(true).args(&["plan", "from"])))]
413 Reschedule {
414 #[clap(long, requires = "revision")]
416 plan: Option<String>,
417 #[clap(long)]
419 revision: Option<u64>,
420 #[clap(long, conflicts_with = "revision", value_hint = clap::ValueHint::AnyPath)]
422 from: Option<String>,
423 #[clap(long, default_value = "false")]
425 dry_run: bool,
426 #[clap(long, default_value = "false")]
428 resolve_no_shuffle: bool,
429 },
430 BackupMeta {
432 #[clap(long)]
433 remarks: Option<String>,
434 },
435 RestoreMeta {
437 #[command(flatten)]
438 opts: RestoreOpts,
439 },
440 DeleteMetaSnapshots {
442 #[clap(long, value_delimiter = ',')]
443 snapshot_ids: Vec<u64>,
444 },
445
446 ListConnections,
448
449 ListServingFragmentMapping,
451
452 UnregisterWorkers {
454 #[clap(
456 long,
457 required = true,
458 value_delimiter = ',',
459 value_name = "worker_id or worker_host:worker_port, ..."
460 )]
461 workers: Vec<String>,
462
463 #[clap(short = 'y', long, default_value_t = false)]
465 yes: bool,
466
467 #[clap(long, default_value_t = false)]
469 ignore_not_found: bool,
470
471 #[clap(long, default_value_t = false)]
473 check_fragment_occupied: bool,
474 },
475
476 ValidateSource {
478 #[clap(long)]
481 props: String,
482 },
483
484 SetCdcTableBackfillParallelism {
485 #[clap(long, required = true)]
486 table_id: u32,
487 #[clap(long, required = true)]
488 parallelism: u32,
489 },
490
491 AlterSourcePropertiesSafe {
494 #[clap(long)]
496 source_id: u32,
497 #[clap(long)]
499 props: String,
500 #[clap(long, default_value_t = false)]
502 reset_splits: bool,
503 },
504
505 ResetSourceSplits {
508 #[clap(long)]
510 source_id: u32,
511 },
512
513 InjectSourceOffsets {
516 #[clap(long)]
518 source_id: u32,
519 #[clap(long)]
521 offsets: String,
522 },
523}
524
525#[derive(Subcommand, Clone, Debug)]
526pub enum AwaitTreeCommands {
527 Dump {
529 #[clap(short, long = "actor-traces-format")]
531 actor_traces_format: Option<String>,
532 },
533 Analyze {
535 #[clap(long = "path")]
539 path: Option<String>,
540 },
541 Transcribe {
543 #[clap(long = "path")]
545 path: String,
546 },
547}
548
// Subcommands under `test`, which `Commands` hides from the help output.
// Plain `//` comments are used because clap turns `///` into help text.
#[derive(Subcommand, Clone, Debug)]
enum TestCommands {
    // Dispatched in `start_impl` to `cmd_impl::test::test_jvm`.
    Jvm,
}
554
// Subcommands under `throttle`. All variants carry the same arguments and are
// dispatched in `start_impl` to `apply_throttle` with the matching
// `PbThrottleTarget`.
// Plain `//` comments are used because clap turns `///` into help text.
#[derive(Subcommand, Clone, Debug)]
enum ThrottleCommands {
    // Uses `PbThrottleTarget::Source`.
    Source(ThrottleCommandArgs),
    // Uses `PbThrottleTarget::Mv`.
    Mv(ThrottleCommandArgs),
    // Uses `PbThrottleTarget::Sink`.
    Sink(ThrottleCommandArgs),
}
561
// Accepted values of `ThrottleCommandArgs::throttle_type`, parsed through
// clap's `ValueEnum` derive.
// Plain `//` comments are used because clap turns `///` into help text.
#[derive(Clone, Debug, clap::ValueEnum)]
pub enum ThrottleTypeArg {
    Dml,
    Backfill,
    Source,
    Sink,
}
569
570#[derive(Clone, Debug, Args)]
571pub struct ThrottleCommandArgs {
572 #[clap(long, required = true)]
574 id: u32,
575 #[clap(long)]
577 rate: Option<u32>,
578 #[clap(long, value_enum, required = true)]
580 throttle_type: ThrottleTypeArg,
581}
582
583#[derive(Subcommand, Clone, Debug)]
584pub enum ProfileCommands {
585 Cpu {
587 #[clap(short, long = "sleep")]
589 sleep: u64,
590 #[clap(long = "worker-type", value_name = "TYPE")]
592 worker_types: Vec<ProfileWorkerType>,
593 },
594 Heap {
596 #[clap(long = "dir")]
598 dir: Option<String>,
599 #[clap(long = "worker-type", value_name = "TYPE")]
601 worker_types: Vec<ProfileWorkerType>,
602 },
603}
604
605pub async fn start(opts: CliOpts, shutdown: CancellationToken) {
612 let context = CtlContext::default();
613
614 tokio::select! {
615 _ = shutdown.cancelled() => {
616 context.try_close().await;
618 }
619
620 result = start_fallible(opts, &context) => {
621 if let Err(e) = result {
622 eprintln!("Error: {:#?}", e.as_report()); std::process::exit(1);
624 }
625 }
626 }
627}
628
629pub async fn start_fallible(opts: CliOpts, context: &CtlContext) -> Result<()> {
632 let result = start_impl(opts, context).await;
633 context.try_close().await;
634 result
635}
636
637#[expect(
638 clippy::large_stack_frames,
639 reason = "Pre-opt MIR sums locals across match arms in async dispatch; \
640 post-layout generator stores only one arm at a time (~13–16 KiB)."
641)]
642async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
643 match opts.command {
644 Commands::Compute(ComputeCommands::ShowConfig { host }) => {
645 cmd_impl::compute::show_config(&host).await?
646 }
647 Commands::Hummock(HummockCommands::DisableCommitEpoch) => {
648 cmd_impl::hummock::disable_commit_epoch(context).await?
649 }
650 Commands::Hummock(HummockCommands::ListVersion {
651 verbose,
652 verbose_key_range,
653 }) => {
654 cmd_impl::hummock::list_version(context, verbose, verbose_key_range).await?;
655 }
656 Commands::Hummock(HummockCommands::ListVersionDeltas {
657 start_id,
658 num_epochs,
659 }) => {
660 cmd_impl::hummock::list_version_deltas(context, start_id, num_epochs).await?;
661 }
662 Commands::Hummock(HummockCommands::ListKv {
663 epoch,
664 table_id,
665 data_dir,
666 use_new_object_prefix_strategy,
667 }) => {
668 cmd_impl::hummock::list_kv(
669 context,
670 epoch,
671 table_id,
672 data_dir,
673 use_new_object_prefix_strategy,
674 )
675 .await?;
676 }
677 Commands::Hummock(HummockCommands::SstDump(args)) => {
678 cmd_impl::hummock::sst_dump(context, args).await.unwrap()
679 }
680 Commands::Hummock(HummockCommands::TriggerManualCompaction {
681 compaction_group_id,
682 table_id,
683 levels,
684 target_level,
685 sst_ids,
686 exclusive,
687 retry_interval_ms,
688 }) => {
689 cmd_impl::hummock::trigger_manual_compaction(
690 context,
691 compaction_group_id,
692 table_id.into(),
693 levels,
694 target_level,
695 sst_ids,
696 exclusive,
697 retry_interval_ms,
698 )
699 .await?
700 }
701 Commands::Hummock(HummockCommands::TriggerFullGc {
702 sst_retention_time_sec,
703 prefix,
704 }) => cmd_impl::hummock::trigger_full_gc(context, sst_retention_time_sec, prefix).await?,
705 Commands::Hummock(HummockCommands::ListPinnedVersions {}) => {
706 list_pinned_versions(context).await?
707 }
708 Commands::Hummock(HummockCommands::ListCompactionGroup) => {
709 cmd_impl::hummock::list_compaction_group(context).await?
710 }
711 Commands::Hummock(HummockCommands::UpdateCompactionConfig {
712 compaction_group_ids,
713 max_bytes_for_level_base,
714 max_bytes_for_level_multiplier,
715 max_compaction_bytes,
716 sub_level_max_compaction_bytes,
717 level0_tier_compact_file_number,
718 target_file_size_base,
719 compaction_filter_mask,
720 max_sub_compaction,
721 level0_stop_write_threshold_sub_level_number,
722 level0_sub_level_compact_level_count,
723 max_space_reclaim_bytes,
724 level0_max_compact_file_number,
725 level0_overlapping_sub_level_compact_level_count,
726 enable_emergency_picker,
727 tombstone_reclaim_ratio,
728 compression_level,
729 compression_algorithm,
730 sstable_filter_kind_level,
731 sstable_filter_kind,
732 sstable_filter_layout_level,
733 sstable_filter_layout,
734 max_l0_compact_level,
735 sst_allowed_trivial_move_min_size,
736 disable_auto_group_scheduling,
737 max_overlapping_level_size,
738 sst_allowed_trivial_move_max_count,
739 emergency_level0_sst_file_count,
740 emergency_level0_sub_level_partition,
741 level0_stop_write_threshold_max_sst_count,
742 level0_stop_write_threshold_max_size,
743 enable_optimize_l0_interval_selection,
744 blocked_xor_filter_kv_count_threshold,
745 max_vnode_key_range_bytes,
746 }) => {
747 cmd_impl::hummock::update_compaction_config(
748 context,
749 compaction_group_ids,
750 build_compaction_config_vec(
751 max_bytes_for_level_base,
752 max_bytes_for_level_multiplier,
753 max_compaction_bytes,
754 sub_level_max_compaction_bytes,
755 level0_tier_compact_file_number,
756 target_file_size_base,
757 compaction_filter_mask,
758 max_sub_compaction,
759 level0_stop_write_threshold_sub_level_number,
760 level0_sub_level_compact_level_count,
761 max_space_reclaim_bytes,
762 level0_max_compact_file_number,
763 level0_overlapping_sub_level_compact_level_count,
764 enable_emergency_picker,
765 tombstone_reclaim_ratio,
766 if let Some(level) = compression_level {
767 assert!(compression_algorithm.is_some());
768 Some(CompressionAlgorithm {
769 level,
770 compression_algorithm: compression_algorithm.unwrap(),
771 })
772 } else {
773 None
774 },
775 if let (Some(level), Some(filter_kind)) =
776 (sstable_filter_kind_level, sstable_filter_kind)
777 {
778 Some(SstableFilterKind { level, filter_kind })
779 } else {
780 None
781 },
782 if let (Some(level), Some(layout)) =
783 (sstable_filter_layout_level, sstable_filter_layout)
784 {
785 Some(SstableFilterLayout { level, layout })
786 } else {
787 None
788 },
789 max_l0_compact_level,
790 sst_allowed_trivial_move_min_size,
791 disable_auto_group_scheduling,
792 max_overlapping_level_size,
793 sst_allowed_trivial_move_max_count,
794 emergency_level0_sst_file_count,
795 emergency_level0_sub_level_partition,
796 level0_stop_write_threshold_max_sst_count,
797 level0_stop_write_threshold_max_size,
798 enable_optimize_l0_interval_selection,
799 blocked_xor_filter_kv_count_threshold,
800 max_vnode_key_range_bytes,
801 ),
802 )
803 .await?
804 }
805 Commands::Hummock(HummockCommands::SplitCompactionGroup {
806 compaction_group_id,
807 table_ids,
808 partition_vnode_count,
809 }) => {
810 cmd_impl::hummock::split_compaction_group(
811 context,
812 compaction_group_id,
813 &table_ids.into_iter().map_into().collect_vec(),
814 partition_vnode_count,
815 )
816 .await?;
817 }
818 Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
819 cmd_impl::hummock::pause_version_checkpoint(context).await?;
820 }
821 Commands::Hummock(HummockCommands::ResumeVersionCheckpoint) => {
822 cmd_impl::hummock::resume_version_checkpoint(context).await?;
823 }
824 Commands::Hummock(HummockCommands::ReplayVersion) => {
825 cmd_impl::hummock::replay_version(context).await?;
826 }
827 Commands::Hummock(HummockCommands::ListCompactionStatus { verbose }) => {
828 cmd_impl::hummock::list_compaction_status(context, verbose).await?;
829 }
830 Commands::Hummock(HummockCommands::GetCompactionScore {
831 compaction_group_id,
832 }) => {
833 cmd_impl::hummock::get_compaction_score(context, compaction_group_id).await?;
834 }
835 Commands::Hummock(HummockCommands::ValidateVersion) => {
836 cmd_impl::hummock::validate_version(context).await?;
837 }
838 Commands::Hummock(HummockCommands::RebuildTableStats) => {
839 cmd_impl::hummock::rebuild_table_stats(context).await?;
840 }
841 Commands::Hummock(HummockCommands::CancelCompactTask { task_id }) => {
842 cmd_impl::hummock::cancel_compact_task(context, task_id).await?;
843 }
844 Commands::Hummock(HummockCommands::PrintVersionDeltaInArchive {
845 archive_ids,
846 data_dir,
847 sst_id,
848 use_new_object_prefix_strategy,
849 }) => {
850 cmd_impl::hummock::print_version_delta_in_archive(
851 context,
852 archive_ids.into_iter().map(HummockVersionId::new),
853 data_dir,
854 sst_id,
855 use_new_object_prefix_strategy,
856 )
857 .await?;
858 }
859 Commands::Hummock(HummockCommands::PrintUserKeyInArchive {
860 archive_ids,
861 data_dir,
862 user_key,
863 use_new_object_prefix_strategy,
864 }) => {
865 cmd_impl::hummock::print_user_key_in_archive(
866 context,
867 archive_ids.into_iter().map(HummockVersionId::new),
868 data_dir,
869 user_key,
870 use_new_object_prefix_strategy,
871 )
872 .await?;
873 }
874 Commands::Hummock(HummockCommands::TieredCacheTracing {
875 enable,
876 record_hybrid_insert_threshold_ms,
877 record_hybrid_get_threshold_ms,
878 record_hybrid_obtain_threshold_ms,
879 record_hybrid_remove_threshold_ms,
880 record_hybrid_fetch_threshold_ms,
881 }) => {
882 cmd_impl::hummock::tiered_cache_tracing(
883 context,
884 enable,
885 record_hybrid_insert_threshold_ms,
886 record_hybrid_get_threshold_ms,
887 record_hybrid_obtain_threshold_ms,
888 record_hybrid_remove_threshold_ms,
889 record_hybrid_fetch_threshold_ms,
890 )
891 .await?
892 }
893 Commands::Hummock(HummockCommands::MergeCompactionGroup {
894 left_group_id,
895 right_group_id,
896 }) => {
897 cmd_impl::hummock::merge_compaction_group(context, left_group_id, right_group_id)
898 .await?
899 }
900
901 Commands::Hummock(HummockCommands::MigrateLegacyObject {
902 url,
903 source_dir,
904 target_dir,
905 concurrency,
906 }) => {
907 migrate_legacy_object(url, source_dir, target_dir, concurrency).await?;
908 }
909 Commands::Hummock(HummockCommands::ResizeCache {
910 meta_cache_capacity_mb,
911 data_cache_capacity_mb,
912 }) => {
913 const MIB: u64 = 1024 * 1024;
914 cmd_impl::hummock::resize_cache(
915 context,
916 meta_cache_capacity_mb.map(|v| v * MIB),
917 data_cache_capacity_mb.map(|v| v * MIB),
918 )
919 .await?
920 }
921 Commands::Table(TableCommands::Scan {
922 mv_name,
923 data_dir,
924 use_new_object_prefix_strategy,
925 }) => {
926 cmd_impl::table::scan(context, mv_name, data_dir, use_new_object_prefix_strategy)
927 .await?
928 }
929 Commands::Table(TableCommands::ScanById {
930 table_id,
931 data_dir,
932 use_new_object_prefix_strategy,
933 }) => {
934 cmd_impl::table::scan_id(context, table_id, data_dir, use_new_object_prefix_strategy)
935 .await?
936 }
937 Commands::Table(TableCommands::List) => cmd_impl::table::list(context).await?,
938 Commands::Bench(cmd) => cmd_impl::bench::do_bench(context, cmd).await?,
939 Commands::Meta(MetaCommands::Pause) => cmd_impl::meta::pause(context).await?,
940 Commands::Meta(MetaCommands::Resume) => cmd_impl::meta::resume(context).await?,
941 Commands::Meta(MetaCommands::ResumeBackfill {
942 job_id,
943 fragment_id,
944 }) => cmd_impl::meta::resume_backfill(context, job_id, fragment_id).await?,
945 Commands::Meta(MetaCommands::ClusterInfo) => cmd_impl::meta::cluster_info(context).await?,
946 Commands::Meta(MetaCommands::SourceSplitInfo { ignore_id }) => {
947 cmd_impl::meta::source_split_info(context, ignore_id).await?
948 }
949 Commands::Meta(MetaCommands::Reschedule {
950 from,
951 dry_run,
952 plan,
953 revision,
954 resolve_no_shuffle,
955 }) => {
956 cmd_impl::meta::reschedule(context, plan, revision, from, dry_run, resolve_no_shuffle)
957 .await?
958 }
959 Commands::Meta(MetaCommands::BackupMeta { remarks }) => {
960 cmd_impl::meta::backup_meta(context, remarks).await?
961 }
962 Commands::Meta(MetaCommands::RestoreMeta { opts }) => {
963 risingwave_meta::backup_restore::restore(opts).await?
964 }
965 Commands::Meta(MetaCommands::DeleteMetaSnapshots { snapshot_ids }) => {
966 cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
967 }
968 Commands::Meta(MetaCommands::ListConnections) => {
969 cmd_impl::meta::list_connections(context).await?
970 }
971 Commands::Meta(MetaCommands::ListServingFragmentMapping) => {
972 cmd_impl::meta::list_serving_fragment_mappings(context).await?
973 }
974 Commands::Meta(MetaCommands::UnregisterWorkers {
975 workers,
976 yes,
977 ignore_not_found,
978 check_fragment_occupied,
979 }) => {
980 cmd_impl::meta::unregister_workers(
981 context,
982 workers,
983 yes,
984 ignore_not_found,
985 check_fragment_occupied,
986 )
987 .await?
988 }
989 Commands::Meta(MetaCommands::ValidateSource { props }) => {
990 cmd_impl::meta::validate_source(context, props).await?
991 }
992 Commands::AwaitTree(AwaitTreeCommands::Dump {
993 actor_traces_format,
994 }) => cmd_impl::await_tree::dump(context, actor_traces_format).await?,
995 Commands::AwaitTree(AwaitTreeCommands::Analyze { path }) => {
996 cmd_impl::await_tree::bottleneck_detect(context, path).await?
997 }
998 Commands::AwaitTree(AwaitTreeCommands::Transcribe { path }) => {
999 rw_diagnose_tools::await_tree::transcribe(path)?
1000 }
1001 Commands::Profile(ProfileCommands::Cpu {
1002 sleep,
1003 worker_types,
1004 }) => cmd_impl::profile::cpu_profile(context, sleep, worker_types).await?,
1005 Commands::Profile(ProfileCommands::Heap { dir, worker_types }) => {
1006 cmd_impl::profile::heap_profile(context, dir, worker_types).await?
1007 }
1008 Commands::Throttle(ThrottleCommands::Source(args)) => {
1009 apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
1010 }
1011 Commands::Throttle(ThrottleCommands::Mv(args)) => {
1012 apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?;
1013 }
1014 Commands::Throttle(ThrottleCommands::Sink(args)) => {
1015 apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Sink, args).await?;
1016 }
1017 Commands::Meta(MetaCommands::SetCdcTableBackfillParallelism {
1018 table_id,
1019 parallelism,
1020 }) => {
1021 set_cdc_table_backfill_parallelism(context, table_id, parallelism).await?;
1022 }
1023 Commands::Meta(MetaCommands::AlterSourcePropertiesSafe {
1024 source_id,
1025 props,
1026 reset_splits,
1027 }) => {
1028 cmd_impl::meta::alter_source_properties_safe(context, source_id, props, reset_splits)
1029 .await?;
1030 }
1031 Commands::Meta(MetaCommands::ResetSourceSplits { source_id }) => {
1032 cmd_impl::meta::reset_source_splits(context, source_id).await?;
1033 }
1034 Commands::Meta(MetaCommands::InjectSourceOffsets { source_id, offsets }) => {
1035 cmd_impl::meta::inject_source_offsets(context, source_id, offsets).await?;
1036 }
1037 Commands::Test(TestCommands::Jvm) => cmd_impl::test::test_jvm()?,
1038 }
1039 Ok(())
1040}