1#![warn(clippy::large_futures, clippy::large_stack_frames)]
16
17use anyhow::Result;
18use clap::{ArgGroup, Args, Parser, Subcommand};
19use cmd_impl::bench::BenchCommands;
20use cmd_impl::hummock::SstDumpArgs;
21use itertools::Itertools;
22use risingwave_common::util::tokio_util::sync::CancellationToken;
23use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId};
24use risingwave_meta::backup_restore::RestoreOpts;
25use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm;
26use risingwave_pb::id::{CompactionGroupId, FragmentId, HummockSstableId, JobId, TableId};
27use thiserror_ext::AsReport;
28
29use crate::cmd_impl::hummock::{
30 build_compaction_config_vec, list_pinned_versions, migrate_legacy_object,
31};
32use crate::cmd_impl::profile::ProfileWorkerType;
33use crate::cmd_impl::scale::set_cdc_table_backfill_parallelism;
34use crate::cmd_impl::throttle::apply_throttle;
35use crate::common::CtlContext;
36
37pub mod cmd_impl;
38pub mod common;
39
// Top-level command-line options of the ctl tool, parsed by clap.
//
// Plain `//` comments are used on purpose: clap's derive would turn `///`
// doc comments into user-visible help text.
#[derive(Parser)]
#[clap(version, about = "The DevOps tool that provides internal access to the RisingWave cluster", long_about = None)]
#[clap(propagate_version = true)]
#[clap(infer_subcommands = true)]
pub struct CliOpts {
    // The selected top-level subcommand; `start_impl` matches on it.
    #[clap(subcommand)]
    command: Commands,
}
53
// Top-level subcommand groups. Each variant wraps a nested subcommand enum
// that `start_impl` dispatches on.
//
// Plain `//` comments are used on purpose: clap's derive would turn `///`
// doc comments into user-visible help text.
#[derive(Subcommand)]
#[clap(infer_subcommands = true)]
enum Commands {
    // Handled by `cmd_impl::compute`.
    #[clap(subcommand)]
    Compute(ComputeCommands),
    // Handled by `cmd_impl::hummock`.
    #[clap(subcommand)]
    Hummock(HummockCommands),
    // Handled by `cmd_impl::table`.
    #[clap(subcommand)]
    Table(TableCommands),
    // Handled mostly by `cmd_impl::meta`.
    #[clap(subcommand)]
    Meta(MetaCommands),
    // Handled by `cmd_impl::bench::do_bench`.
    #[clap(subcommand)]
    Bench(BenchCommands),
    // Handled by `cmd_impl::await_tree`; also reachable through the visible alias `trace`.
    #[clap(subcommand)]
    #[clap(visible_alias("trace"))]
    AwaitTree(AwaitTreeCommands),
    // Handled by `cmd_impl::profile`.
    #[clap(subcommand)]
    Profile(ProfileCommands),
    // Handled by `apply_throttle`.
    #[clap(subcommand)]
    Throttle(ThrottleCommands),
    // Hidden from the generated help output (`hide = true`).
    #[clap(subcommand, hide = true)]
    Test(TestCommands),
}
85
// Subcommands under `compute`.
// (Plain `//` comments: clap's derive would turn `///` into help text.)
#[derive(Subcommand)]
enum ComputeCommands {
    // Positional `host` is passed to `cmd_impl::compute::show_config`.
    ShowConfig { host: String },
}
91
// Subcommands under `hummock`. Unless noted otherwise, each variant is routed by
// `start_impl` to the similarly named function in `cmd_impl::hummock`.
//
// Plain `//` comments are used on purpose: clap's derive would turn `///`
// doc comments into user-visible help text.
#[allow(clippy::large_enum_variant)]
#[derive(Subcommand)]
enum HummockCommands {
    // Routed to `list_version`.
    ListVersion {
        #[clap(short, long = "verbose", default_value_t = false)]
        verbose: bool,

        // NOTE(review): this long flag uses underscores while most others use dashes.
        #[clap(long = "verbose_key_range", default_value_t = false)]
        verbose_key_range: bool,
    },

    // Routed to `list_version_deltas`.
    ListVersionDeltas {
        // Defaults to version id 0.
        #[clap(short, long = "start-version-delta-id", default_value_t = HummockVersionId::new(0))]
        start_id: HummockVersionId,

        #[clap(short, long = "num-epochs", default_value_t = 100)]
        num_epochs: u32,
    },
    // Routed to `disable_commit_epoch`.
    DisableCommitEpoch,
    // Routed to `list_kv`.
    ListKv {
        // Defaults to `HummockEpoch::MAX`.
        #[clap(short, long = "epoch", default_value_t = HummockEpoch::MAX)]
        epoch: u64,

        #[clap(short, long = "table-id")]
        table_id: TableId,

        // Optional positional argument (no `long`/`short` attribute).
        data_dir: Option<String>,

        // NOTE(review): a `bool` flag with `default_value = "true"` may be impossible
        // to switch off from the command line — confirm against clap's bool handling.
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    // Routed to `sst_dump`; its arguments are declared in `SstDumpArgs`.
    SstDump(SstDumpArgs),
    // Routed to `trigger_manual_compaction`.
    TriggerManualCompaction {
        // Defaults to compaction group 2.
        #[clap(short, long = "compaction-group-id", default_value_t = CompactionGroupId::new(2))]
        compaction_group_id: CompactionGroupId,

        // Converted with `.into()` before being handed to the handler.
        #[clap(short, long = "table-id", default_value_t = 0)]
        table_id: u32,

        // Comma-separated list; defaults to `[1]`.
        #[clap(short, long = "level", value_delimiter = ',', default_values_t = vec![1u32])]
        levels: Vec<u32>,

        // Comma-separated list.
        #[clap(short, long = "sst-ids", value_delimiter = ',')]
        sst_ids: Vec<HummockSstableId>,

        #[clap(long = "exclusive", default_value_t = false)]
        exclusive: bool,

        #[clap(long = "retry-interval-ms", default_value_t = 1000)]
        retry_interval_ms: u64,
    },
    // Routed to `trigger_full_gc`.
    TriggerFullGc {
        // Default of 259200 seconds equals 3 days.
        #[clap(short, long = "sst_retention_time_sec", default_value_t = 259200)]
        sst_retention_time_sec: u64,
        #[clap(short, long = "prefix", required = false)]
        prefix: Option<String>,
    },
    // Routed to `list_pinned_versions`.
    ListPinnedVersions {},
    // Routed to `list_compaction_group`.
    ListCompactionGroup,
    // Routed to `update_compaction_config`. Every option except the group ids is
    // forwarded to `build_compaction_config_vec`.
    UpdateCompactionConfig {
        // Comma-separated list of target compaction groups.
        #[clap(long, value_delimiter = ',')]
        compaction_group_ids: Vec<CompactionGroupId>,
        #[clap(long)]
        max_bytes_for_level_base: Option<u64>,
        #[clap(long)]
        max_bytes_for_level_multiplier: Option<u64>,
        #[clap(long)]
        max_compaction_bytes: Option<u64>,
        #[clap(long)]
        sub_level_max_compaction_bytes: Option<u64>,
        #[clap(long)]
        level0_tier_compact_file_number: Option<u64>,
        #[clap(long)]
        target_file_size_base: Option<u64>,
        #[clap(long)]
        compaction_filter_mask: Option<u32>,
        #[clap(long)]
        max_sub_compaction: Option<u32>,
        #[clap(long)]
        level0_stop_write_threshold_sub_level_number: Option<u64>,
        #[clap(long)]
        level0_sub_level_compact_level_count: Option<u32>,
        #[clap(long)]
        max_space_reclaim_bytes: Option<u64>,
        #[clap(long)]
        level0_max_compact_file_number: Option<u64>,
        #[clap(long)]
        level0_overlapping_sub_level_compact_level_count: Option<u32>,
        #[clap(long)]
        enable_emergency_picker: Option<bool>,
        #[clap(long)]
        tombstone_reclaim_ratio: Option<u32>,
        // Combined with `compression_algorithm` into one `CompressionAlgorithm` value
        // by `start_impl`; the pair is only used when the level is given.
        #[clap(long)]
        compression_level: Option<u32>,
        #[clap(long)]
        compression_algorithm: Option<String>,
        #[clap(long)]
        max_l0_compact_level: Option<u32>,
        #[clap(long)]
        sst_allowed_trivial_move_min_size: Option<u64>,
        #[clap(long)]
        disable_auto_group_scheduling: Option<bool>,
        #[clap(long)]
        max_overlapping_level_size: Option<u64>,
        #[clap(long)]
        sst_allowed_trivial_move_max_count: Option<u32>,
        #[clap(long)]
        emergency_level0_sst_file_count: Option<u32>,
        #[clap(long)]
        emergency_level0_sub_level_partition: Option<u32>,
        #[clap(long)]
        level0_stop_write_threshold_max_sst_count: Option<u32>,
        #[clap(long)]
        level0_stop_write_threshold_max_size: Option<u64>,
        #[clap(long)]
        enable_optimize_l0_interval_selection: Option<bool>,
        #[clap(long)]
        max_kv_count_for_xor16: Option<u64>,
        #[clap(long)]
        max_vnode_key_range_bytes: Option<u64>,
    },
    // Routed to `split_compaction_group`.
    SplitCompactionGroup {
        #[clap(long)]
        compaction_group_id: CompactionGroupId,
        // Comma-separated list.
        #[clap(long, value_delimiter = ',')]
        table_ids: Vec<TableId>,
        #[clap(long, default_value_t = 0)]
        partition_vnode_count: u32,
    },
    // Routed to `pause_version_checkpoint`.
    PauseVersionCheckpoint,
    // Routed to `resume_version_checkpoint`.
    ResumeVersionCheckpoint,
    // Routed to `replay_version`.
    ReplayVersion,
    // Routed to `list_compaction_status`.
    ListCompactionStatus {
        #[clap(short, long = "verbose", default_value_t = false)]
        verbose: bool,
    },
    // Routed to `get_compaction_score`.
    GetCompactionScore {
        #[clap(long)]
        compaction_group_id: CompactionGroupId,
    },
    // Routed to `validate_version`.
    ValidateVersion,
    // Routed to `rebuild_table_stats`.
    RebuildTableStats,
    // Routed to `cancel_compact_task`.
    CancelCompactTask {
        #[clap(short, long)]
        task_id: u64,
    },
    // Routed to `print_user_key_in_archive`.
    PrintUserKeyInArchive {
        // Comma-separated list; each id is wrapped in `HummockVersionId::new` by `start_impl`.
        #[clap(long, value_delimiter = ',')]
        archive_ids: Vec<u64>,
        #[clap(long)]
        data_dir: String,
        #[clap(long)]
        user_key: String,
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    // Routed to `print_version_delta_in_archive`.
    PrintVersionDeltaInArchive {
        // Comma-separated list; each id is wrapped in `HummockVersionId::new` by `start_impl`.
        #[clap(long, value_delimiter = ',')]
        archive_ids: Vec<u64>,
        #[clap(long)]
        data_dir: String,
        #[clap(long)]
        sst_id: HummockSstableId,
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    // Routed to `tiered_cache_tracing`.
    TieredCacheTracing {
        #[clap(long)]
        enable: bool,
        #[clap(long)]
        record_hybrid_insert_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_get_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_obtain_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_remove_threshold_ms: Option<u32>,
        #[clap(long)]
        record_hybrid_fetch_threshold_ms: Option<u32>,
    },
    // Routed to `merge_compaction_group`.
    MergeCompactionGroup {
        #[clap(long)]
        left_group_id: CompactionGroupId,
        #[clap(long)]
        right_group_id: CompactionGroupId,
    },
    // Routed to `migrate_legacy_object`; this handler does not receive the `CtlContext`.
    MigrateLegacyObject {
        // `url`, `source_dir` and `target_dir` are positional arguments.
        url: String,
        source_dir: String,
        target_dir: String,
        #[clap(long, default_value = "100")]
        concurrency: u32,
    },
    // Routed to `resize_cache`; `start_impl` multiplies both values by 1024 * 1024
    // before passing them on.
    ResizeCache {
        #[clap(long)]
        meta_cache_capacity_mb: Option<u64>,
        #[clap(long)]
        data_cache_capacity_mb: Option<u64>,
    },
}
316
// Subcommands under `table`, handled by `cmd_impl::table`.
// (Plain `//` comments: clap's derive would turn `///` into help text.)
#[derive(Subcommand)]
enum TableCommands {
    // Routed to `cmd_impl::table::scan`.
    Scan {
        // Positional arguments: `mv_name`, then an optional `data_dir`.
        mv_name: String,
        data_dir: Option<String>,

        // NOTE(review): a `bool` flag with `default_value = "true"` may be impossible
        // to switch off from the command line — confirm against clap's bool handling.
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    // Routed to `cmd_impl::table::scan_id`.
    ScanById {
        // Positional arguments: `table_id`, then an optional `data_dir`.
        table_id: TableId,
        data_dir: Option<String>,
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
    // Routed to `cmd_impl::table::list`.
    List,
}
341
// Subcommands under `meta`. Unless noted otherwise, each variant is routed by
// `start_impl` to a function in `cmd_impl::meta`.
//
// Plain `//` comments are used on purpose: clap's derive would turn `///`
// doc comments into user-visible help text.
#[derive(Subcommand)]
#[allow(clippy::large_enum_variant)]
enum MetaCommands {
    // Routed to `pause`.
    Pause,
    // Routed to `resume`.
    Resume,
    // Routed to `resume_backfill`. The required argument group forces the caller
    // to name a target through `job_id` or `fragment_id`.
    #[clap(
        group(
            ArgGroup::new("resume_backfill_target")
                .required(true)
                .args(&["job_id", "fragment_id"])
        )
    )]
    ResumeBackfill {
        #[clap(long)]
        job_id: Option<JobId>,
        #[clap(long)]
        fragment_id: Option<FragmentId>,
    },
    // Routed to `cluster_info`.
    ClusterInfo,
    // Routed to `source_split_info`.
    SourceSplitInfo {
        #[clap(long)]
        ignore_id: bool,
    },
    // Routed to `reschedule`. The required group `input_group` demands `plan` or `from`.
    #[clap(verbatim_doc_comment)]
    #[clap(group(clap::ArgGroup::new("input_group").required(true).args(&["plan", "from"])))]
    Reschedule {
        // Must be accompanied by `revision`.
        #[clap(long, requires = "revision")]
        plan: Option<String>,
        #[clap(long)]
        revision: Option<u64>,
        // Cannot be combined with `revision`; hinted to shells as a filesystem path.
        #[clap(long, conflicts_with = "revision", value_hint = clap::ValueHint::AnyPath)]
        from: Option<String>,
        #[clap(long, default_value = "false")]
        dry_run: bool,
        #[clap(long, default_value = "false")]
        resolve_no_shuffle: bool,
    },
    // Routed to `backup_meta`.
    BackupMeta {
        #[clap(long)]
        remarks: Option<String>,
    },
    // Routed to `risingwave_meta::backup_restore::restore`; this handler does not
    // receive the `CtlContext`.
    RestoreMeta {
        // The fields of `RestoreOpts` are flattened into this subcommand's arguments.
        #[command(flatten)]
        opts: RestoreOpts,
    },
    // Routed to `delete_meta_snapshots`.
    DeleteMetaSnapshots {
        // Comma-separated list.
        #[clap(long, value_delimiter = ',')]
        snapshot_ids: Vec<u64>,
    },

    // Routed to `list_connections`.
    ListConnections,

    // Routed to `list_serving_fragment_mappings`.
    ListServingFragmentMapping,

    // Routed to `unregister_workers`.
    UnregisterWorkers {
        // Required, comma-separated list; the value name suggests either a worker id
        // or a `host:port` pair per entry.
        #[clap(
            long,
            required = true,
            value_delimiter = ',',
            value_name = "worker_id or worker_host:worker_port, ..."
        )]
        workers: Vec<String>,

        // Also available as `-y`.
        #[clap(short = 'y', long, default_value_t = false)]
        yes: bool,

        #[clap(long, default_value_t = false)]
        ignore_not_found: bool,

        #[clap(long, default_value_t = false)]
        check_fragment_occupied: bool,
    },

    // Routed to `validate_source`.
    ValidateSource {
        #[clap(long)]
        props: String,
    },

    // Routed to `set_cdc_table_backfill_parallelism` from `cmd_impl::scale`.
    SetCdcTableBackfillParallelism {
        #[clap(long, required = true)]
        table_id: u32,
        #[clap(long, required = true)]
        parallelism: u32,
    },

    // Routed to `alter_source_properties_safe`.
    AlterSourcePropertiesSafe {
        #[clap(long)]
        source_id: u32,
        #[clap(long)]
        props: String,
        #[clap(long, default_value_t = false)]
        reset_splits: bool,
    },

    // Routed to `reset_source_splits`.
    ResetSourceSplits {
        #[clap(long)]
        source_id: u32,
    },

    // Routed to `inject_source_offsets`.
    InjectSourceOffsets {
        #[clap(long)]
        source_id: u32,
        #[clap(long)]
        offsets: String,
    },
}
499
// Subcommands under `await-tree` (alias `trace`).
// (Plain `//` comments: clap's derive would turn `///` into help text.)
#[derive(Subcommand, Clone, Debug)]
pub enum AwaitTreeCommands {
    // Routed to `cmd_impl::await_tree::dump`.
    Dump {
        #[clap(short, long = "actor-traces-format")]
        actor_traces_format: Option<String>,
    },
    // Routed to `cmd_impl::await_tree::bottleneck_detect`; the path is optional here.
    Analyze {
        #[clap(long = "path")]
        path: Option<String>,
    },
    // Routed to `rw_diagnose_tools::await_tree::transcribe`; the path is mandatory here.
    Transcribe {
        #[clap(long = "path")]
        path: String,
    },
}
523
// Subcommands under the hidden `test` command.
// (Plain `//` comments: clap's derive would turn `///` into help text.)
#[derive(Subcommand, Clone, Debug)]
enum TestCommands {
    // Routed to `cmd_impl::test::test_jvm`.
    Jvm,
}
529
// Subcommands under `throttle`. All three take the same `ThrottleCommandArgs` and
// are passed to `apply_throttle` with the `PbThrottleTarget` of the same name.
// (Plain `//` comments: clap's derive would turn `///` into help text.)
#[derive(Subcommand, Clone, Debug)]
enum ThrottleCommands {
    Source(ThrottleCommandArgs),
    Mv(ThrottleCommandArgs),
    Sink(ThrottleCommandArgs),
}
536
// Accepted values of the `--throttle-type` option in `ThrottleCommandArgs`.
// (Plain `//` comments: clap's derive would turn `///` into help text.)
#[derive(Clone, Debug, clap::ValueEnum)]
pub enum ThrottleTypeArg {
    Dml,
    Backfill,
    Source,
    Sink,
}
544
// Arguments shared by every `throttle` subcommand.
// (Plain `//` comments: clap's derive would turn `///` into help text.)
#[derive(Clone, Debug, Args)]
pub struct ThrottleCommandArgs {
    // Required numeric id of the object to throttle.
    #[clap(long, required = true)]
    id: u32,
    // Optional rate value.
    // NOTE(review): presumably omitting it removes the limit — confirm in `apply_throttle`.
    #[clap(long)]
    rate: Option<u32>,
    // Required; must be one of the `ThrottleTypeArg` values.
    #[clap(long, value_enum, required = true)]
    throttle_type: ThrottleTypeArg,
}
557
// Subcommands under `profile`, handled by `cmd_impl::profile`.
// (Plain `//` comments: clap's derive would turn `///` into help text.)
#[derive(Subcommand, Clone, Debug)]
pub enum ProfileCommands {
    // Routed to `cmd_impl::profile::cpu_profile`.
    Cpu {
        // NOTE(review): presumably the profiling duration in seconds — confirm in `cpu_profile`.
        #[clap(short, long = "sleep")]
        sleep: u64,
        // Filled from `--worker-type TYPE` occurrences.
        #[clap(long = "worker-type", value_name = "TYPE")]
        worker_types: Vec<ProfileWorkerType>,
    },
    // Routed to `cmd_impl::profile::heap_profile`.
    Heap {
        #[clap(long = "dir")]
        dir: Option<String>,
        // Filled from `--worker-type TYPE` occurrences.
        #[clap(long = "worker-type", value_name = "TYPE")]
        worker_types: Vec<ProfileWorkerType>,
    },
}
579
580pub async fn start(opts: CliOpts, shutdown: CancellationToken) {
587 let context = CtlContext::default();
588
589 tokio::select! {
590 _ = shutdown.cancelled() => {
591 context.try_close().await;
593 }
594
595 result = start_fallible(opts, &context) => {
596 if let Err(e) = result {
597 eprintln!("Error: {:#?}", e.as_report()); std::process::exit(1);
599 }
600 }
601 }
602}
603
604pub async fn start_fallible(opts: CliOpts, context: &CtlContext) -> Result<()> {
607 let result = start_impl(opts, context).await;
608 context.try_close().await;
609 result
610}
611
612#[expect(
613 clippy::large_stack_frames,
614 reason = "Pre-opt MIR sums locals across match arms in async dispatch; \
615 post-layout generator stores only one arm at a time (~13–16 KiB)."
616)]
617async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
618 match opts.command {
619 Commands::Compute(ComputeCommands::ShowConfig { host }) => {
620 cmd_impl::compute::show_config(&host).await?
621 }
622 Commands::Hummock(HummockCommands::DisableCommitEpoch) => {
623 cmd_impl::hummock::disable_commit_epoch(context).await?
624 }
625 Commands::Hummock(HummockCommands::ListVersion {
626 verbose,
627 verbose_key_range,
628 }) => {
629 cmd_impl::hummock::list_version(context, verbose, verbose_key_range).await?;
630 }
631 Commands::Hummock(HummockCommands::ListVersionDeltas {
632 start_id,
633 num_epochs,
634 }) => {
635 cmd_impl::hummock::list_version_deltas(context, start_id, num_epochs).await?;
636 }
637 Commands::Hummock(HummockCommands::ListKv {
638 epoch,
639 table_id,
640 data_dir,
641 use_new_object_prefix_strategy,
642 }) => {
643 cmd_impl::hummock::list_kv(
644 context,
645 epoch,
646 table_id,
647 data_dir,
648 use_new_object_prefix_strategy,
649 )
650 .await?;
651 }
652 Commands::Hummock(HummockCommands::SstDump(args)) => {
653 cmd_impl::hummock::sst_dump(context, args).await.unwrap()
654 }
655 Commands::Hummock(HummockCommands::TriggerManualCompaction {
656 compaction_group_id,
657 table_id,
658 levels,
659 sst_ids,
660 exclusive,
661 retry_interval_ms,
662 }) => {
663 cmd_impl::hummock::trigger_manual_compaction(
664 context,
665 compaction_group_id,
666 table_id.into(),
667 levels,
668 sst_ids,
669 exclusive,
670 retry_interval_ms,
671 )
672 .await?
673 }
674 Commands::Hummock(HummockCommands::TriggerFullGc {
675 sst_retention_time_sec,
676 prefix,
677 }) => cmd_impl::hummock::trigger_full_gc(context, sst_retention_time_sec, prefix).await?,
678 Commands::Hummock(HummockCommands::ListPinnedVersions {}) => {
679 list_pinned_versions(context).await?
680 }
681 Commands::Hummock(HummockCommands::ListCompactionGroup) => {
682 cmd_impl::hummock::list_compaction_group(context).await?
683 }
684 Commands::Hummock(HummockCommands::UpdateCompactionConfig {
685 compaction_group_ids,
686 max_bytes_for_level_base,
687 max_bytes_for_level_multiplier,
688 max_compaction_bytes,
689 sub_level_max_compaction_bytes,
690 level0_tier_compact_file_number,
691 target_file_size_base,
692 compaction_filter_mask,
693 max_sub_compaction,
694 level0_stop_write_threshold_sub_level_number,
695 level0_sub_level_compact_level_count,
696 max_space_reclaim_bytes,
697 level0_max_compact_file_number,
698 level0_overlapping_sub_level_compact_level_count,
699 enable_emergency_picker,
700 tombstone_reclaim_ratio,
701 compression_level,
702 compression_algorithm,
703 max_l0_compact_level,
704 sst_allowed_trivial_move_min_size,
705 disable_auto_group_scheduling,
706 max_overlapping_level_size,
707 sst_allowed_trivial_move_max_count,
708 emergency_level0_sst_file_count,
709 emergency_level0_sub_level_partition,
710 level0_stop_write_threshold_max_sst_count,
711 level0_stop_write_threshold_max_size,
712 enable_optimize_l0_interval_selection,
713 max_kv_count_for_xor16,
714 max_vnode_key_range_bytes,
715 }) => {
716 cmd_impl::hummock::update_compaction_config(
717 context,
718 compaction_group_ids,
719 build_compaction_config_vec(
720 max_bytes_for_level_base,
721 max_bytes_for_level_multiplier,
722 max_compaction_bytes,
723 sub_level_max_compaction_bytes,
724 level0_tier_compact_file_number,
725 target_file_size_base,
726 compaction_filter_mask,
727 max_sub_compaction,
728 level0_stop_write_threshold_sub_level_number,
729 level0_sub_level_compact_level_count,
730 max_space_reclaim_bytes,
731 level0_max_compact_file_number,
732 level0_overlapping_sub_level_compact_level_count,
733 enable_emergency_picker,
734 tombstone_reclaim_ratio,
735 if let Some(level) = compression_level {
736 assert!(compression_algorithm.is_some());
737 Some(CompressionAlgorithm {
738 level,
739 compression_algorithm: compression_algorithm.unwrap(),
740 })
741 } else {
742 None
743 },
744 max_l0_compact_level,
745 sst_allowed_trivial_move_min_size,
746 disable_auto_group_scheduling,
747 max_overlapping_level_size,
748 sst_allowed_trivial_move_max_count,
749 emergency_level0_sst_file_count,
750 emergency_level0_sub_level_partition,
751 level0_stop_write_threshold_max_sst_count,
752 level0_stop_write_threshold_max_size,
753 enable_optimize_l0_interval_selection,
754 max_kv_count_for_xor16,
755 max_vnode_key_range_bytes,
756 ),
757 )
758 .await?
759 }
760 Commands::Hummock(HummockCommands::SplitCompactionGroup {
761 compaction_group_id,
762 table_ids,
763 partition_vnode_count,
764 }) => {
765 cmd_impl::hummock::split_compaction_group(
766 context,
767 compaction_group_id,
768 &table_ids.into_iter().map_into().collect_vec(),
769 partition_vnode_count,
770 )
771 .await?;
772 }
773 Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
774 cmd_impl::hummock::pause_version_checkpoint(context).await?;
775 }
776 Commands::Hummock(HummockCommands::ResumeVersionCheckpoint) => {
777 cmd_impl::hummock::resume_version_checkpoint(context).await?;
778 }
779 Commands::Hummock(HummockCommands::ReplayVersion) => {
780 cmd_impl::hummock::replay_version(context).await?;
781 }
782 Commands::Hummock(HummockCommands::ListCompactionStatus { verbose }) => {
783 cmd_impl::hummock::list_compaction_status(context, verbose).await?;
784 }
785 Commands::Hummock(HummockCommands::GetCompactionScore {
786 compaction_group_id,
787 }) => {
788 cmd_impl::hummock::get_compaction_score(context, compaction_group_id).await?;
789 }
790 Commands::Hummock(HummockCommands::ValidateVersion) => {
791 cmd_impl::hummock::validate_version(context).await?;
792 }
793 Commands::Hummock(HummockCommands::RebuildTableStats) => {
794 cmd_impl::hummock::rebuild_table_stats(context).await?;
795 }
796 Commands::Hummock(HummockCommands::CancelCompactTask { task_id }) => {
797 cmd_impl::hummock::cancel_compact_task(context, task_id).await?;
798 }
799 Commands::Hummock(HummockCommands::PrintVersionDeltaInArchive {
800 archive_ids,
801 data_dir,
802 sst_id,
803 use_new_object_prefix_strategy,
804 }) => {
805 cmd_impl::hummock::print_version_delta_in_archive(
806 context,
807 archive_ids.into_iter().map(HummockVersionId::new),
808 data_dir,
809 sst_id,
810 use_new_object_prefix_strategy,
811 )
812 .await?;
813 }
814 Commands::Hummock(HummockCommands::PrintUserKeyInArchive {
815 archive_ids,
816 data_dir,
817 user_key,
818 use_new_object_prefix_strategy,
819 }) => {
820 cmd_impl::hummock::print_user_key_in_archive(
821 context,
822 archive_ids.into_iter().map(HummockVersionId::new),
823 data_dir,
824 user_key,
825 use_new_object_prefix_strategy,
826 )
827 .await?;
828 }
829 Commands::Hummock(HummockCommands::TieredCacheTracing {
830 enable,
831 record_hybrid_insert_threshold_ms,
832 record_hybrid_get_threshold_ms,
833 record_hybrid_obtain_threshold_ms,
834 record_hybrid_remove_threshold_ms,
835 record_hybrid_fetch_threshold_ms,
836 }) => {
837 cmd_impl::hummock::tiered_cache_tracing(
838 context,
839 enable,
840 record_hybrid_insert_threshold_ms,
841 record_hybrid_get_threshold_ms,
842 record_hybrid_obtain_threshold_ms,
843 record_hybrid_remove_threshold_ms,
844 record_hybrid_fetch_threshold_ms,
845 )
846 .await?
847 }
848 Commands::Hummock(HummockCommands::MergeCompactionGroup {
849 left_group_id,
850 right_group_id,
851 }) => {
852 cmd_impl::hummock::merge_compaction_group(context, left_group_id, right_group_id)
853 .await?
854 }
855
856 Commands::Hummock(HummockCommands::MigrateLegacyObject {
857 url,
858 source_dir,
859 target_dir,
860 concurrency,
861 }) => {
862 migrate_legacy_object(url, source_dir, target_dir, concurrency).await?;
863 }
864 Commands::Hummock(HummockCommands::ResizeCache {
865 meta_cache_capacity_mb,
866 data_cache_capacity_mb,
867 }) => {
868 const MIB: u64 = 1024 * 1024;
869 cmd_impl::hummock::resize_cache(
870 context,
871 meta_cache_capacity_mb.map(|v| v * MIB),
872 data_cache_capacity_mb.map(|v| v * MIB),
873 )
874 .await?
875 }
876 Commands::Table(TableCommands::Scan {
877 mv_name,
878 data_dir,
879 use_new_object_prefix_strategy,
880 }) => {
881 cmd_impl::table::scan(context, mv_name, data_dir, use_new_object_prefix_strategy)
882 .await?
883 }
884 Commands::Table(TableCommands::ScanById {
885 table_id,
886 data_dir,
887 use_new_object_prefix_strategy,
888 }) => {
889 cmd_impl::table::scan_id(context, table_id, data_dir, use_new_object_prefix_strategy)
890 .await?
891 }
892 Commands::Table(TableCommands::List) => cmd_impl::table::list(context).await?,
893 Commands::Bench(cmd) => cmd_impl::bench::do_bench(context, cmd).await?,
894 Commands::Meta(MetaCommands::Pause) => cmd_impl::meta::pause(context).await?,
895 Commands::Meta(MetaCommands::Resume) => cmd_impl::meta::resume(context).await?,
896 Commands::Meta(MetaCommands::ResumeBackfill {
897 job_id,
898 fragment_id,
899 }) => cmd_impl::meta::resume_backfill(context, job_id, fragment_id).await?,
900 Commands::Meta(MetaCommands::ClusterInfo) => cmd_impl::meta::cluster_info(context).await?,
901 Commands::Meta(MetaCommands::SourceSplitInfo { ignore_id }) => {
902 cmd_impl::meta::source_split_info(context, ignore_id).await?
903 }
904 Commands::Meta(MetaCommands::Reschedule {
905 from,
906 dry_run,
907 plan,
908 revision,
909 resolve_no_shuffle,
910 }) => {
911 cmd_impl::meta::reschedule(context, plan, revision, from, dry_run, resolve_no_shuffle)
912 .await?
913 }
914 Commands::Meta(MetaCommands::BackupMeta { remarks }) => {
915 cmd_impl::meta::backup_meta(context, remarks).await?
916 }
917 Commands::Meta(MetaCommands::RestoreMeta { opts }) => {
918 risingwave_meta::backup_restore::restore(opts).await?
919 }
920 Commands::Meta(MetaCommands::DeleteMetaSnapshots { snapshot_ids }) => {
921 cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
922 }
923 Commands::Meta(MetaCommands::ListConnections) => {
924 cmd_impl::meta::list_connections(context).await?
925 }
926 Commands::Meta(MetaCommands::ListServingFragmentMapping) => {
927 cmd_impl::meta::list_serving_fragment_mappings(context).await?
928 }
929 Commands::Meta(MetaCommands::UnregisterWorkers {
930 workers,
931 yes,
932 ignore_not_found,
933 check_fragment_occupied,
934 }) => {
935 cmd_impl::meta::unregister_workers(
936 context,
937 workers,
938 yes,
939 ignore_not_found,
940 check_fragment_occupied,
941 )
942 .await?
943 }
944 Commands::Meta(MetaCommands::ValidateSource { props }) => {
945 cmd_impl::meta::validate_source(context, props).await?
946 }
947 Commands::AwaitTree(AwaitTreeCommands::Dump {
948 actor_traces_format,
949 }) => cmd_impl::await_tree::dump(context, actor_traces_format).await?,
950 Commands::AwaitTree(AwaitTreeCommands::Analyze { path }) => {
951 cmd_impl::await_tree::bottleneck_detect(context, path).await?
952 }
953 Commands::AwaitTree(AwaitTreeCommands::Transcribe { path }) => {
954 rw_diagnose_tools::await_tree::transcribe(path)?
955 }
956 Commands::Profile(ProfileCommands::Cpu {
957 sleep,
958 worker_types,
959 }) => cmd_impl::profile::cpu_profile(context, sleep, worker_types).await?,
960 Commands::Profile(ProfileCommands::Heap { dir, worker_types }) => {
961 cmd_impl::profile::heap_profile(context, dir, worker_types).await?
962 }
963 Commands::Throttle(ThrottleCommands::Source(args)) => {
964 apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
965 }
966 Commands::Throttle(ThrottleCommands::Mv(args)) => {
967 apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?;
968 }
969 Commands::Throttle(ThrottleCommands::Sink(args)) => {
970 apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Sink, args).await?;
971 }
972 Commands::Meta(MetaCommands::SetCdcTableBackfillParallelism {
973 table_id,
974 parallelism,
975 }) => {
976 set_cdc_table_backfill_parallelism(context, table_id, parallelism).await?;
977 }
978 Commands::Meta(MetaCommands::AlterSourcePropertiesSafe {
979 source_id,
980 props,
981 reset_splits,
982 }) => {
983 cmd_impl::meta::alter_source_properties_safe(context, source_id, props, reset_splits)
984 .await?;
985 }
986 Commands::Meta(MetaCommands::ResetSourceSplits { source_id }) => {
987 cmd_impl::meta::reset_source_splits(context, source_id).await?;
988 }
989 Commands::Meta(MetaCommands::InjectSourceOffsets { source_id, offsets }) => {
990 cmd_impl::meta::inject_source_offsets(context, source_id, offsets).await?;
991 }
992 Commands::Test(TestCommands::Jvm) => cmd_impl::test::test_jvm()?,
993 }
994 Ok(())
995}