1#![warn(clippy::large_futures, clippy::large_stack_frames)]
16#![allow(unfulfilled_lint_expectations)]
17
18use anyhow::Result;
19use clap::{ArgGroup, Args, Parser, Subcommand};
20use cmd_impl::bench::BenchCommands;
21use cmd_impl::hummock::SstDumpArgs;
22use itertools::Itertools;
23use risingwave_common::util::tokio_util::sync::CancellationToken;
24use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId};
25use risingwave_meta::backup_restore::RestoreOpts;
26use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::{
27 CompressionAlgorithm, SstableFilterLayout, SstableFilterType,
28};
29use risingwave_pb::id::{CompactionGroupId, FragmentId, HummockSstableId, JobId, TableId};
30use thiserror_ext::AsReport;
31
32use crate::cmd_impl::hummock::{
33 build_compaction_config_vec, list_pinned_versions, migrate_legacy_object,
34};
35use crate::cmd_impl::profile::ProfileWorkerType;
36use crate::cmd_impl::scale::set_cdc_table_backfill_parallelism;
37use crate::cmd_impl::throttle::apply_throttle;
38use crate::common::CtlContext;
39
40pub mod cmd_impl;
41pub mod common;
42
43#[derive(Parser)]
49#[clap(version, about = "The DevOps tool that provides internal access to the RisingWave cluster", long_about = None)]
50#[clap(propagate_version = true)]
51#[clap(infer_subcommands = true)]
52pub struct CliOpts {
53 #[clap(subcommand)]
54 command: Commands,
55}
56
57#[derive(Subcommand)]
58#[clap(infer_subcommands = true)]
59enum Commands {
60 #[clap(subcommand)]
62 Compute(ComputeCommands),
63 #[clap(subcommand)]
65 Hummock(HummockCommands),
66 #[clap(subcommand)]
68 Table(TableCommands),
69 #[clap(subcommand)]
71 Meta(MetaCommands),
72 #[clap(subcommand)]
74 Bench(BenchCommands),
75 #[clap(subcommand)]
77 #[clap(visible_alias("trace"))]
78 AwaitTree(AwaitTreeCommands),
79 #[clap(subcommand)]
81 Profile(ProfileCommands),
82 #[clap(subcommand)]
83 Throttle(ThrottleCommands),
84 #[clap(subcommand, hide = true)]
86 Test(TestCommands),
87}
88
89#[derive(Subcommand)]
90enum ComputeCommands {
91 ShowConfig { host: String },
93}
94
95#[expect(clippy::large_enum_variant)]
96#[derive(Subcommand)]
97enum HummockCommands {
98 ListVersion {
100 #[clap(short, long = "verbose", default_value_t = false)]
101 verbose: bool,
102
103 #[clap(long = "verbose_key_range", default_value_t = false)]
104 verbose_key_range: bool,
105 },
106
107 ListVersionDeltas {
109 #[clap(short, long = "start-version-delta-id", default_value_t = HummockVersionId::new(0))]
110 start_id: HummockVersionId,
111
112 #[clap(short, long = "num-epochs", default_value_t = 100)]
113 num_epochs: u32,
114 },
115 DisableCommitEpoch,
117 ListKv {
119 #[clap(short, long = "epoch", default_value_t = HummockEpoch::MAX)]
120 epoch: u64,
121
122 #[clap(short, long = "table-id")]
123 table_id: TableId,
124
125 data_dir: Option<String>,
127
128 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
129 use_new_object_prefix_strategy: bool,
130 },
131 SstDump(SstDumpArgs),
132 TriggerManualCompaction {
134 #[clap(short, long = "compaction-group-id", default_value_t = CompactionGroupId::new(2))]
135 compaction_group_id: CompactionGroupId,
136
137 #[clap(short, long = "table-id", default_value_t = 0)]
138 table_id: u32,
139
140 #[clap(short, long = "level", value_delimiter = ',', default_values_t = vec![1u32])]
141 levels: Vec<u32>,
142
143 #[clap(long = "target-level")]
144 target_level: Option<u32>,
145
146 #[clap(short, long = "sst-ids", value_delimiter = ',')]
147 sst_ids: Vec<HummockSstableId>,
148
149 #[clap(long = "exclusive", default_value_t = false)]
150 exclusive: bool,
151
152 #[clap(long = "retry-interval-ms", default_value_t = 1000)]
153 retry_interval_ms: u64,
154 },
155 TriggerFullGc {
158 #[clap(short, long = "sst_retention_time_sec", default_value_t = 259200)]
159 sst_retention_time_sec: u64,
160 #[clap(short, long = "prefix", required = false)]
161 prefix: Option<String>,
162 },
163 ListPinnedVersions {},
165 ListCompactionGroup,
167 UpdateCompactionConfig {
169 #[clap(long, value_delimiter = ',')]
170 compaction_group_ids: Vec<CompactionGroupId>,
171 #[clap(long)]
172 max_bytes_for_level_base: Option<u64>,
173 #[clap(long)]
174 max_bytes_for_level_multiplier: Option<u64>,
175 #[clap(long)]
176 max_compaction_bytes: Option<u64>,
177 #[clap(long)]
178 sub_level_max_compaction_bytes: Option<u64>,
179 #[clap(long)]
180 level0_tier_compact_file_number: Option<u64>,
181 #[clap(long)]
182 target_file_size_base: Option<u64>,
183 #[clap(long)]
184 compaction_filter_mask: Option<u32>,
185 #[clap(long)]
186 max_sub_compaction: Option<u32>,
187 #[clap(long)]
188 level0_stop_write_threshold_sub_level_number: Option<u64>,
189 #[clap(long)]
190 level0_sub_level_compact_level_count: Option<u32>,
191 #[clap(long)]
192 max_space_reclaim_bytes: Option<u64>,
193 #[clap(long)]
194 level0_max_compact_file_number: Option<u64>,
195 #[clap(long)]
196 level0_overlapping_sub_level_compact_level_count: Option<u32>,
197 #[clap(long)]
198 enable_emergency_picker: Option<bool>,
199 #[clap(long)]
200 tombstone_reclaim_ratio: Option<u32>,
201 #[clap(long)]
202 compression_level: Option<u32>,
203 #[clap(long)]
204 compression_algorithm: Option<String>,
205 #[clap(long, requires = "sstable_filter_type")]
207 sstable_filter_type_level: Option<u32>,
208 #[clap(long, requires = "sstable_filter_type_level")]
210 sstable_filter_type: Option<String>,
211 #[clap(long, requires = "sstable_filter_layout")]
213 sstable_filter_layout_level: Option<u32>,
214 #[clap(long, requires = "sstable_filter_layout_level")]
221 sstable_filter_layout: Option<String>,
222 #[clap(long)]
223 max_l0_compact_level: Option<u32>,
224 #[clap(long)]
225 sst_allowed_trivial_move_min_size: Option<u64>,
226 #[clap(long)]
227 disable_auto_group_scheduling: Option<bool>,
228 #[clap(long)]
229 max_overlapping_level_size: Option<u64>,
230 #[clap(long)]
231 sst_allowed_trivial_move_max_count: Option<u32>,
232 #[clap(long)]
233 emergency_level0_sst_file_count: Option<u32>,
234 #[clap(long)]
235 emergency_level0_sub_level_partition: Option<u32>,
236 #[clap(long)]
237 level0_stop_write_threshold_max_sst_count: Option<u32>,
238 #[clap(long)]
239 level0_stop_write_threshold_max_size: Option<u64>,
240 #[clap(long)]
241 enable_optimize_l0_interval_selection: Option<bool>,
242 #[clap(long)]
247 blocked_xor_filter_kv_count_threshold: Option<u64>,
248 #[clap(long)]
249 max_vnode_key_range_bytes: Option<u64>,
250 },
251 SplitCompactionGroup {
253 #[clap(long)]
254 compaction_group_id: CompactionGroupId,
255 #[clap(long, value_delimiter = ',')]
256 table_ids: Vec<TableId>,
257 #[clap(long, default_value_t = 0)]
258 partition_vnode_count: u32,
259 },
260 PauseVersionCheckpoint,
262 ResumeVersionCheckpoint,
264 ReplayVersion,
266 ListCompactionStatus {
268 #[clap(short, long = "verbose", default_value_t = false)]
269 verbose: bool,
270 },
271 GetCompactionScore {
272 #[clap(long)]
273 compaction_group_id: CompactionGroupId,
274 },
275 ValidateVersion,
277 RebuildTableStats,
279 CancelCompactTask {
280 #[clap(short, long)]
281 task_id: u64,
282 },
283 PrintUserKeyInArchive {
284 #[clap(long, value_delimiter = ',')]
286 archive_ids: Vec<u64>,
287 #[clap(long)]
289 data_dir: String,
290 #[clap(long)]
292 user_key: String,
293 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
294 use_new_object_prefix_strategy: bool,
295 },
296 PrintVersionDeltaInArchive {
297 #[clap(long, value_delimiter = ',')]
299 archive_ids: Vec<u64>,
300 #[clap(long)]
302 data_dir: String,
303 #[clap(long)]
305 sst_id: HummockSstableId,
306 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
307 use_new_object_prefix_strategy: bool,
308 },
309 TieredCacheTracing {
310 #[clap(long)]
311 enable: bool,
312 #[clap(long)]
313 record_hybrid_insert_threshold_ms: Option<u32>,
314 #[clap(long)]
315 record_hybrid_get_threshold_ms: Option<u32>,
316 #[clap(long)]
317 record_hybrid_obtain_threshold_ms: Option<u32>,
318 #[clap(long)]
319 record_hybrid_remove_threshold_ms: Option<u32>,
320 #[clap(long)]
321 record_hybrid_fetch_threshold_ms: Option<u32>,
322 },
323 MergeCompactionGroup {
324 #[clap(long)]
325 left_group_id: CompactionGroupId,
326 #[clap(long)]
327 right_group_id: CompactionGroupId,
328 },
329 MigrateLegacyObject {
330 url: String,
331 source_dir: String,
332 target_dir: String,
333 #[clap(long, default_value = "100")]
334 concurrency: u32,
335 },
336 ResizeCache {
337 #[clap(long)]
338 meta_cache_capacity_mb: Option<u64>,
339 #[clap(long)]
340 data_cache_capacity_mb: Option<u64>,
341 },
342}
343
344#[derive(Subcommand)]
345enum TableCommands {
346 Scan {
348 mv_name: String,
350 data_dir: Option<String>,
352
353 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
354 use_new_object_prefix_strategy: bool,
355 },
356 ScanById {
358 table_id: TableId,
360 data_dir: Option<String>,
362 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
363 use_new_object_prefix_strategy: bool,
364 },
365 List,
367}
368
369#[derive(Subcommand)]
370#[expect(clippy::large_enum_variant)]
371enum MetaCommands {
372 Pause,
374 Resume,
376 #[clap(
378 group(
379 ArgGroup::new("resume_backfill_target")
380 .required(true)
381 .args(&["job_id", "fragment_id"])
382 )
383 )]
384 ResumeBackfill {
385 #[clap(long)]
386 job_id: Option<JobId>,
387 #[clap(long)]
388 fragment_id: Option<FragmentId>,
389 },
390 ClusterInfo,
392 SourceSplitInfo {
394 #[clap(long)]
395 ignore_id: bool,
396 },
397 #[clap(verbatim_doc_comment)]
414 #[clap(group(clap::ArgGroup::new("input_group").required(true).args(&["plan", "from"])))]
415 Reschedule {
416 #[clap(long, requires = "revision")]
418 plan: Option<String>,
419 #[clap(long)]
421 revision: Option<u64>,
422 #[clap(long, conflicts_with = "revision", value_hint = clap::ValueHint::AnyPath)]
424 from: Option<String>,
425 #[clap(long, default_value = "false")]
427 dry_run: bool,
428 #[clap(long, default_value = "false")]
430 resolve_no_shuffle: bool,
431 },
432 BackupMeta {
434 #[clap(long)]
435 remarks: Option<String>,
436 },
437 RestoreMeta {
439 #[command(flatten)]
440 opts: RestoreOpts,
441 },
442 DeleteMetaSnapshots {
444 #[clap(long, value_delimiter = ',')]
445 snapshot_ids: Vec<u64>,
446 },
447
448 ListConnections,
450
451 ListServingFragmentMapping,
453
454 UnregisterWorkers {
456 #[clap(
458 long,
459 required = true,
460 value_delimiter = ',',
461 value_name = "worker_id or worker_host:worker_port, ..."
462 )]
463 workers: Vec<String>,
464
465 #[clap(short = 'y', long, default_value_t = false)]
467 yes: bool,
468
469 #[clap(long, default_value_t = false)]
471 ignore_not_found: bool,
472
473 #[clap(long, default_value_t = false)]
475 check_fragment_occupied: bool,
476 },
477
478 ValidateSource {
480 #[clap(long)]
483 props: String,
484 },
485
486 SetCdcTableBackfillParallelism {
487 #[clap(long, required = true)]
488 table_id: u32,
489 #[clap(long, required = true)]
490 parallelism: u32,
491 },
492
493 AlterSourcePropertiesSafe {
496 #[clap(long)]
498 source_id: u32,
499 #[clap(long)]
501 props: String,
502 #[clap(long, default_value_t = false)]
504 reset_splits: bool,
505 },
506
507 ResetSourceSplits {
510 #[clap(long)]
512 source_id: u32,
513 },
514
515 InjectSourceOffsets {
518 #[clap(long)]
520 source_id: u32,
521 #[clap(long)]
523 offsets: String,
524 },
525
526 CreateMetaStoreSchema {
529 #[command(flatten)]
530 opts: cmd_impl::meta::CreateMetaStoreSchemaOpts,
531 },
532}
533
534#[derive(Subcommand, Clone, Debug)]
535pub enum AwaitTreeCommands {
536 Dump {
538 #[clap(short, long = "actor-traces-format")]
540 actor_traces_format: Option<String>,
541 },
542 Analyze {
544 #[clap(long = "path")]
548 path: Option<String>,
549 },
550 Transcribe {
552 #[clap(long = "path")]
554 path: String,
555 },
556}
557
558#[derive(Subcommand, Clone, Debug)]
559enum TestCommands {
560 Jvm,
562}
563
564#[derive(Subcommand, Clone, Debug)]
565enum ThrottleCommands {
566 Source(ThrottleCommandArgs),
567 Mv(ThrottleCommandArgs),
568 Sink(ThrottleCommandArgs),
569}
570
571#[derive(Clone, Debug, clap::ValueEnum)]
572pub enum ThrottleTypeArg {
573 Dml,
574 Backfill,
575 Source,
576 Sink,
577}
578
579#[derive(Clone, Debug, Args)]
580pub struct ThrottleCommandArgs {
581 #[clap(long, required = true)]
583 id: u32,
584 #[clap(long)]
586 rate: Option<u32>,
587 #[clap(long, value_enum, required = true)]
589 throttle_type: ThrottleTypeArg,
590}
591
592#[derive(Subcommand, Clone, Debug)]
593pub enum ProfileCommands {
594 Cpu {
596 #[clap(short, long = "sleep")]
598 sleep: u64,
599 #[clap(long = "worker-type", value_name = "TYPE")]
601 worker_types: Vec<ProfileWorkerType>,
602 },
603 Heap {
605 #[clap(long = "dir")]
607 dir: Option<String>,
608 #[clap(long = "worker-type", value_name = "TYPE")]
610 worker_types: Vec<ProfileWorkerType>,
611 },
612}
613
614pub async fn start(opts: CliOpts, shutdown: CancellationToken) {
621 let context = CtlContext::default();
622
623 tokio::select! {
624 _ = shutdown.cancelled() => {
625 context.try_close().await;
627 }
628
629 result = start_fallible(opts, &context) => {
630 if let Err(e) = result {
631 eprintln!("Error: {:#?}", e.as_report()); std::process::exit(1);
633 }
634 }
635 }
636}
637
638pub async fn start_fallible(opts: CliOpts, context: &CtlContext) -> Result<()> {
641 let result = start_impl(opts, context).await;
642 context.try_close().await;
643 result
644}
645
646async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
647 match opts.command {
648 Commands::Compute(ComputeCommands::ShowConfig { host }) => {
649 cmd_impl::compute::show_config(&host).await?
650 }
651 Commands::Hummock(HummockCommands::DisableCommitEpoch) => {
652 cmd_impl::hummock::disable_commit_epoch(context).await?
653 }
654 Commands::Hummock(HummockCommands::ListVersion {
655 verbose,
656 verbose_key_range,
657 }) => {
658 cmd_impl::hummock::list_version(context, verbose, verbose_key_range).await?;
659 }
660 Commands::Hummock(HummockCommands::ListVersionDeltas {
661 start_id,
662 num_epochs,
663 }) => {
664 cmd_impl::hummock::list_version_deltas(context, start_id, num_epochs).await?;
665 }
666 Commands::Hummock(HummockCommands::ListKv {
667 epoch,
668 table_id,
669 data_dir,
670 use_new_object_prefix_strategy,
671 }) => {
672 cmd_impl::hummock::list_kv(
673 context,
674 epoch,
675 table_id,
676 data_dir,
677 use_new_object_prefix_strategy,
678 )
679 .await?;
680 }
681 Commands::Hummock(HummockCommands::SstDump(args)) => {
682 cmd_impl::hummock::sst_dump(context, args).await.unwrap()
683 }
684 Commands::Hummock(HummockCommands::TriggerManualCompaction {
685 compaction_group_id,
686 table_id,
687 levels,
688 target_level,
689 sst_ids,
690 exclusive,
691 retry_interval_ms,
692 }) => {
693 cmd_impl::hummock::trigger_manual_compaction(
694 context,
695 compaction_group_id,
696 table_id.into(),
697 levels,
698 target_level,
699 sst_ids,
700 exclusive,
701 retry_interval_ms,
702 )
703 .await?
704 }
705 Commands::Hummock(HummockCommands::TriggerFullGc {
706 sst_retention_time_sec,
707 prefix,
708 }) => cmd_impl::hummock::trigger_full_gc(context, sst_retention_time_sec, prefix).await?,
709 Commands::Hummock(HummockCommands::ListPinnedVersions {}) => {
710 list_pinned_versions(context).await?
711 }
712 Commands::Hummock(HummockCommands::ListCompactionGroup) => {
713 cmd_impl::hummock::list_compaction_group(context).await?
714 }
715 Commands::Hummock(HummockCommands::UpdateCompactionConfig {
716 compaction_group_ids,
717 max_bytes_for_level_base,
718 max_bytes_for_level_multiplier,
719 max_compaction_bytes,
720 sub_level_max_compaction_bytes,
721 level0_tier_compact_file_number,
722 target_file_size_base,
723 compaction_filter_mask,
724 max_sub_compaction,
725 level0_stop_write_threshold_sub_level_number,
726 level0_sub_level_compact_level_count,
727 max_space_reclaim_bytes,
728 level0_max_compact_file_number,
729 level0_overlapping_sub_level_compact_level_count,
730 enable_emergency_picker,
731 tombstone_reclaim_ratio,
732 compression_level,
733 compression_algorithm,
734 sstable_filter_type_level,
735 sstable_filter_type,
736 sstable_filter_layout_level,
737 sstable_filter_layout,
738 max_l0_compact_level,
739 sst_allowed_trivial_move_min_size,
740 disable_auto_group_scheduling,
741 max_overlapping_level_size,
742 sst_allowed_trivial_move_max_count,
743 emergency_level0_sst_file_count,
744 emergency_level0_sub_level_partition,
745 level0_stop_write_threshold_max_sst_count,
746 level0_stop_write_threshold_max_size,
747 enable_optimize_l0_interval_selection,
748 blocked_xor_filter_kv_count_threshold,
749 max_vnode_key_range_bytes,
750 }) => {
751 cmd_impl::hummock::update_compaction_config(
752 context,
753 compaction_group_ids,
754 build_compaction_config_vec(
755 max_bytes_for_level_base,
756 max_bytes_for_level_multiplier,
757 max_compaction_bytes,
758 sub_level_max_compaction_bytes,
759 level0_tier_compact_file_number,
760 target_file_size_base,
761 compaction_filter_mask,
762 max_sub_compaction,
763 level0_stop_write_threshold_sub_level_number,
764 level0_sub_level_compact_level_count,
765 max_space_reclaim_bytes,
766 level0_max_compact_file_number,
767 level0_overlapping_sub_level_compact_level_count,
768 enable_emergency_picker,
769 tombstone_reclaim_ratio,
770 if let Some(level) = compression_level {
771 assert!(compression_algorithm.is_some());
772 Some(CompressionAlgorithm {
773 level,
774 compression_algorithm: compression_algorithm.unwrap(),
775 })
776 } else {
777 None
778 },
779 if let (Some(level), Some(filter_type)) =
780 (sstable_filter_type_level, sstable_filter_type)
781 {
782 Some(SstableFilterType { level, filter_type })
783 } else {
784 None
785 },
786 if let (Some(level), Some(layout)) =
787 (sstable_filter_layout_level, sstable_filter_layout)
788 {
789 Some(SstableFilterLayout { level, layout })
790 } else {
791 None
792 },
793 max_l0_compact_level,
794 sst_allowed_trivial_move_min_size,
795 disable_auto_group_scheduling,
796 max_overlapping_level_size,
797 sst_allowed_trivial_move_max_count,
798 emergency_level0_sst_file_count,
799 emergency_level0_sub_level_partition,
800 level0_stop_write_threshold_max_sst_count,
801 level0_stop_write_threshold_max_size,
802 enable_optimize_l0_interval_selection,
803 blocked_xor_filter_kv_count_threshold,
804 max_vnode_key_range_bytes,
805 ),
806 )
807 .await?
808 }
809 Commands::Hummock(HummockCommands::SplitCompactionGroup {
810 compaction_group_id,
811 table_ids,
812 partition_vnode_count,
813 }) => {
814 cmd_impl::hummock::split_compaction_group(
815 context,
816 compaction_group_id,
817 &table_ids.into_iter().map_into().collect_vec(),
818 partition_vnode_count,
819 )
820 .await?;
821 }
822 Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
823 cmd_impl::hummock::pause_version_checkpoint(context).await?;
824 }
825 Commands::Hummock(HummockCommands::ResumeVersionCheckpoint) => {
826 cmd_impl::hummock::resume_version_checkpoint(context).await?;
827 }
828 Commands::Hummock(HummockCommands::ReplayVersion) => {
829 cmd_impl::hummock::replay_version(context).await?;
830 }
831 Commands::Hummock(HummockCommands::ListCompactionStatus { verbose }) => {
832 cmd_impl::hummock::list_compaction_status(context, verbose).await?;
833 }
834 Commands::Hummock(HummockCommands::GetCompactionScore {
835 compaction_group_id,
836 }) => {
837 cmd_impl::hummock::get_compaction_score(context, compaction_group_id).await?;
838 }
839 Commands::Hummock(HummockCommands::ValidateVersion) => {
840 cmd_impl::hummock::validate_version(context).await?;
841 }
842 Commands::Hummock(HummockCommands::RebuildTableStats) => {
843 cmd_impl::hummock::rebuild_table_stats(context).await?;
844 }
845 Commands::Hummock(HummockCommands::CancelCompactTask { task_id }) => {
846 cmd_impl::hummock::cancel_compact_task(context, task_id).await?;
847 }
848 Commands::Hummock(HummockCommands::PrintVersionDeltaInArchive {
849 archive_ids,
850 data_dir,
851 sst_id,
852 use_new_object_prefix_strategy,
853 }) => {
854 cmd_impl::hummock::print_version_delta_in_archive(
855 context,
856 archive_ids.into_iter().map(HummockVersionId::new),
857 data_dir,
858 sst_id,
859 use_new_object_prefix_strategy,
860 )
861 .await?;
862 }
863 Commands::Hummock(HummockCommands::PrintUserKeyInArchive {
864 archive_ids,
865 data_dir,
866 user_key,
867 use_new_object_prefix_strategy,
868 }) => {
869 cmd_impl::hummock::print_user_key_in_archive(
870 context,
871 archive_ids.into_iter().map(HummockVersionId::new),
872 data_dir,
873 user_key,
874 use_new_object_prefix_strategy,
875 )
876 .await?;
877 }
878 Commands::Hummock(HummockCommands::TieredCacheTracing {
879 enable,
880 record_hybrid_insert_threshold_ms,
881 record_hybrid_get_threshold_ms,
882 record_hybrid_obtain_threshold_ms,
883 record_hybrid_remove_threshold_ms,
884 record_hybrid_fetch_threshold_ms,
885 }) => {
886 cmd_impl::hummock::tiered_cache_tracing(
887 context,
888 enable,
889 record_hybrid_insert_threshold_ms,
890 record_hybrid_get_threshold_ms,
891 record_hybrid_obtain_threshold_ms,
892 record_hybrid_remove_threshold_ms,
893 record_hybrid_fetch_threshold_ms,
894 )
895 .await?
896 }
897 Commands::Hummock(HummockCommands::MergeCompactionGroup {
898 left_group_id,
899 right_group_id,
900 }) => {
901 cmd_impl::hummock::merge_compaction_group(context, left_group_id, right_group_id)
902 .await?
903 }
904
905 Commands::Hummock(HummockCommands::MigrateLegacyObject {
906 url,
907 source_dir,
908 target_dir,
909 concurrency,
910 }) => {
911 migrate_legacy_object(url, source_dir, target_dir, concurrency).await?;
912 }
913 Commands::Hummock(HummockCommands::ResizeCache {
914 meta_cache_capacity_mb,
915 data_cache_capacity_mb,
916 }) => {
917 const MIB: u64 = 1024 * 1024;
918 cmd_impl::hummock::resize_cache(
919 context,
920 meta_cache_capacity_mb.map(|v| v * MIB),
921 data_cache_capacity_mb.map(|v| v * MIB),
922 )
923 .await?
924 }
925 Commands::Table(TableCommands::Scan {
926 mv_name,
927 data_dir,
928 use_new_object_prefix_strategy,
929 }) => {
930 cmd_impl::table::scan(context, mv_name, data_dir, use_new_object_prefix_strategy)
931 .await?
932 }
933 Commands::Table(TableCommands::ScanById {
934 table_id,
935 data_dir,
936 use_new_object_prefix_strategy,
937 }) => {
938 cmd_impl::table::scan_id(context, table_id, data_dir, use_new_object_prefix_strategy)
939 .await?
940 }
941 Commands::Table(TableCommands::List) => cmd_impl::table::list(context).await?,
942 Commands::Bench(cmd) => cmd_impl::bench::do_bench(context, cmd).await?,
943 Commands::Meta(MetaCommands::Pause) => cmd_impl::meta::pause(context).await?,
944 Commands::Meta(MetaCommands::Resume) => cmd_impl::meta::resume(context).await?,
945 Commands::Meta(MetaCommands::ResumeBackfill {
946 job_id,
947 fragment_id,
948 }) => cmd_impl::meta::resume_backfill(context, job_id, fragment_id).await?,
949 Commands::Meta(MetaCommands::ClusterInfo) => cmd_impl::meta::cluster_info(context).await?,
950 Commands::Meta(MetaCommands::SourceSplitInfo { ignore_id }) => {
951 cmd_impl::meta::source_split_info(context, ignore_id).await?
952 }
953 Commands::Meta(MetaCommands::Reschedule {
954 from,
955 dry_run,
956 plan,
957 revision,
958 resolve_no_shuffle,
959 }) => {
960 cmd_impl::meta::reschedule(context, plan, revision, from, dry_run, resolve_no_shuffle)
961 .await?
962 }
963 Commands::Meta(MetaCommands::BackupMeta { remarks }) => {
964 cmd_impl::meta::backup_meta(context, remarks).await?
965 }
966 Commands::Meta(MetaCommands::RestoreMeta { opts }) => {
967 risingwave_meta::backup_restore::restore(opts).await?
968 }
969 Commands::Meta(MetaCommands::DeleteMetaSnapshots { snapshot_ids }) => {
970 cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
971 }
972 Commands::Meta(MetaCommands::ListConnections) => {
973 cmd_impl::meta::list_connections(context).await?
974 }
975 Commands::Meta(MetaCommands::ListServingFragmentMapping) => {
976 cmd_impl::meta::list_serving_fragment_mappings(context).await?
977 }
978 Commands::Meta(MetaCommands::UnregisterWorkers {
979 workers,
980 yes,
981 ignore_not_found,
982 check_fragment_occupied,
983 }) => {
984 cmd_impl::meta::unregister_workers(
985 context,
986 workers,
987 yes,
988 ignore_not_found,
989 check_fragment_occupied,
990 )
991 .await?
992 }
993 Commands::Meta(MetaCommands::ValidateSource { props }) => {
994 cmd_impl::meta::validate_source(context, props).await?
995 }
996 Commands::AwaitTree(AwaitTreeCommands::Dump {
997 actor_traces_format,
998 }) => cmd_impl::await_tree::dump(context, actor_traces_format).await?,
999 Commands::AwaitTree(AwaitTreeCommands::Analyze { path }) => {
1000 cmd_impl::await_tree::bottleneck_detect(context, path).await?
1001 }
1002 Commands::AwaitTree(AwaitTreeCommands::Transcribe { path }) => {
1003 rw_diagnose_tools::await_tree::transcribe(path)?
1004 }
1005 Commands::Profile(ProfileCommands::Cpu {
1006 sleep,
1007 worker_types,
1008 }) => cmd_impl::profile::cpu_profile(context, sleep, worker_types).await?,
1009 Commands::Profile(ProfileCommands::Heap { dir, worker_types }) => {
1010 cmd_impl::profile::heap_profile(context, dir, worker_types).await?
1011 }
1012 Commands::Throttle(ThrottleCommands::Source(args)) => {
1013 apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
1014 }
1015 Commands::Throttle(ThrottleCommands::Mv(args)) => {
1016 apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?;
1017 }
1018 Commands::Throttle(ThrottleCommands::Sink(args)) => {
1019 apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Sink, args).await?;
1020 }
1021 Commands::Meta(MetaCommands::SetCdcTableBackfillParallelism {
1022 table_id,
1023 parallelism,
1024 }) => {
1025 set_cdc_table_backfill_parallelism(context, table_id, parallelism).await?;
1026 }
1027 Commands::Meta(MetaCommands::AlterSourcePropertiesSafe {
1028 source_id,
1029 props,
1030 reset_splits,
1031 }) => {
1032 cmd_impl::meta::alter_source_properties_safe(context, source_id, props, reset_splits)
1033 .await?;
1034 }
1035 Commands::Meta(MetaCommands::ResetSourceSplits { source_id }) => {
1036 cmd_impl::meta::reset_source_splits(context, source_id).await?;
1037 }
1038 Commands::Meta(MetaCommands::InjectSourceOffsets { source_id, offsets }) => {
1039 cmd_impl::meta::inject_source_offsets(context, source_id, offsets).await?;
1040 }
1041 Commands::Meta(MetaCommands::CreateMetaStoreSchema { opts }) => {
1042 cmd_impl::meta::create_meta_store_schema(opts).await?;
1043 }
1044 Commands::Test(TestCommands::Jvm) => cmd_impl::test::test_jvm()?,
1045 }
1046 Ok(())
1047}