1#![warn(clippy::large_futures, clippy::large_stack_frames)]
16
17use anyhow::Result;
18use clap::{ArgGroup, Args, Parser, Subcommand};
19use cmd_impl::bench::BenchCommands;
20use cmd_impl::hummock::SstDumpArgs;
21use itertools::Itertools;
22use risingwave_common::util::tokio_util::sync::CancellationToken;
23use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId};
24use risingwave_meta::backup_restore::RestoreOpts;
25use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm;
26use risingwave_pb::id::{CompactionGroupId, FragmentId, HummockSstableId, JobId, TableId};
27use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
28use thiserror_ext::AsReport;
29
30use crate::cmd_impl::hummock::{
31 build_compaction_config_vec, list_pinned_versions, migrate_legacy_object,
32};
33use crate::cmd_impl::profile::ProfileWorkerType;
34use crate::cmd_impl::scale::set_cdc_table_backfill_parallelism;
35use crate::cmd_impl::throttle::apply_throttle;
36use crate::common::CtlContext;
37
38pub mod cmd_impl;
39pub mod common;
40
41#[derive(Parser)]
47#[clap(version, about = "The DevOps tool that provides internal access to the RisingWave cluster", long_about = None)]
48#[clap(propagate_version = true)]
49#[clap(infer_subcommands = true)]
50pub struct CliOpts {
51 #[clap(subcommand)]
52 command: Commands,
53}
54
55#[derive(Subcommand)]
56#[clap(infer_subcommands = true)]
57enum Commands {
58 #[clap(subcommand)]
60 Compute(ComputeCommands),
61 #[clap(subcommand)]
63 Hummock(HummockCommands),
64 #[clap(subcommand)]
66 Table(TableCommands),
67 #[clap(subcommand)]
69 Meta(MetaCommands),
70 #[clap(subcommand)]
72 Scale(ScaleCommands),
73 #[clap(subcommand)]
75 Bench(BenchCommands),
76 #[clap(subcommand)]
78 #[clap(visible_alias("trace"))]
79 AwaitTree(AwaitTreeCommands),
80 #[clap(subcommand)]
82 Profile(ProfileCommands),
83 #[clap(subcommand)]
84 Throttle(ThrottleCommands),
85 #[clap(subcommand, hide = true)]
87 Test(TestCommands),
88}
89
90#[derive(Subcommand)]
91enum ComputeCommands {
92 ShowConfig { host: String },
94}
95
96#[allow(clippy::large_enum_variant)]
97#[derive(Subcommand)]
98enum HummockCommands {
99 ListVersion {
101 #[clap(short, long = "verbose", default_value_t = false)]
102 verbose: bool,
103
104 #[clap(long = "verbose_key_range", default_value_t = false)]
105 verbose_key_range: bool,
106 },
107
108 ListVersionDeltas {
110 #[clap(short, long = "start-version-delta-id", default_value_t = HummockVersionId::new(0))]
111 start_id: HummockVersionId,
112
113 #[clap(short, long = "num-epochs", default_value_t = 100)]
114 num_epochs: u32,
115 },
116 DisableCommitEpoch,
118 ListKv {
120 #[clap(short, long = "epoch", default_value_t = HummockEpoch::MAX)]
121 epoch: u64,
122
123 #[clap(short, long = "table-id")]
124 table_id: TableId,
125
126 data_dir: Option<String>,
128
129 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
130 use_new_object_prefix_strategy: bool,
131 },
132 SstDump(SstDumpArgs),
133 TriggerManualCompaction {
135 #[clap(short, long = "compaction-group-id", default_value_t = CompactionGroupId::new(2))]
136 compaction_group_id: CompactionGroupId,
137
138 #[clap(short, long = "table-id", default_value_t = 0)]
139 table_id: u32,
140
141 #[clap(short, long = "level", value_delimiter = ',', default_values_t = vec![1u32])]
142 levels: Vec<u32>,
143
144 #[clap(short, long = "sst-ids", value_delimiter = ',')]
145 sst_ids: Vec<HummockSstableId>,
146 },
147 TriggerFullGc {
150 #[clap(short, long = "sst_retention_time_sec", default_value_t = 259200)]
151 sst_retention_time_sec: u64,
152 #[clap(short, long = "prefix", required = false)]
153 prefix: Option<String>,
154 },
155 ListPinnedVersions {},
157 ListCompactionGroup,
159 UpdateCompactionConfig {
161 #[clap(long, value_delimiter = ',')]
162 compaction_group_ids: Vec<CompactionGroupId>,
163 #[clap(long)]
164 max_bytes_for_level_base: Option<u64>,
165 #[clap(long)]
166 max_bytes_for_level_multiplier: Option<u64>,
167 #[clap(long)]
168 max_compaction_bytes: Option<u64>,
169 #[clap(long)]
170 sub_level_max_compaction_bytes: Option<u64>,
171 #[clap(long)]
172 level0_tier_compact_file_number: Option<u64>,
173 #[clap(long)]
174 target_file_size_base: Option<u64>,
175 #[clap(long)]
176 compaction_filter_mask: Option<u32>,
177 #[clap(long)]
178 max_sub_compaction: Option<u32>,
179 #[clap(long)]
180 level0_stop_write_threshold_sub_level_number: Option<u64>,
181 #[clap(long)]
182 level0_sub_level_compact_level_count: Option<u32>,
183 #[clap(long)]
184 max_space_reclaim_bytes: Option<u64>,
185 #[clap(long)]
186 level0_max_compact_file_number: Option<u64>,
187 #[clap(long)]
188 level0_overlapping_sub_level_compact_level_count: Option<u32>,
189 #[clap(long)]
190 enable_emergency_picker: Option<bool>,
191 #[clap(long)]
192 tombstone_reclaim_ratio: Option<u32>,
193 #[clap(long)]
194 compression_level: Option<u32>,
195 #[clap(long)]
196 compression_algorithm: Option<String>,
197 #[clap(long)]
198 max_l0_compact_level: Option<u32>,
199 #[clap(long)]
200 sst_allowed_trivial_move_min_size: Option<u64>,
201 #[clap(long)]
202 disable_auto_group_scheduling: Option<bool>,
203 #[clap(long)]
204 max_overlapping_level_size: Option<u64>,
205 #[clap(long)]
206 sst_allowed_trivial_move_max_count: Option<u32>,
207 #[clap(long)]
208 emergency_level0_sst_file_count: Option<u32>,
209 #[clap(long)]
210 emergency_level0_sub_level_partition: Option<u32>,
211 #[clap(long)]
212 level0_stop_write_threshold_max_sst_count: Option<u32>,
213 #[clap(long)]
214 level0_stop_write_threshold_max_size: Option<u64>,
215 #[clap(long)]
216 enable_optimize_l0_interval_selection: Option<bool>,
217 #[clap(long)]
218 vnode_aligned_level_size_threshold: Option<u64>,
219 #[clap(long)]
220 max_kv_count_for_xor16: Option<u64>,
221 },
222 SplitCompactionGroup {
224 #[clap(long)]
225 compaction_group_id: CompactionGroupId,
226 #[clap(long, value_delimiter = ',')]
227 table_ids: Vec<TableId>,
228 #[clap(long, default_value_t = 0)]
229 partition_vnode_count: u32,
230 },
231 PauseVersionCheckpoint,
233 ResumeVersionCheckpoint,
235 ReplayVersion,
237 ListCompactionStatus {
239 #[clap(short, long = "verbose", default_value_t = false)]
240 verbose: bool,
241 },
242 GetCompactionScore {
243 #[clap(long)]
244 compaction_group_id: CompactionGroupId,
245 },
246 ValidateVersion,
248 RebuildTableStats,
250 CancelCompactTask {
251 #[clap(short, long)]
252 task_id: u64,
253 },
254 PrintUserKeyInArchive {
255 #[clap(long, value_delimiter = ',')]
257 archive_ids: Vec<u64>,
258 #[clap(long)]
260 data_dir: String,
261 #[clap(long)]
263 user_key: String,
264 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
265 use_new_object_prefix_strategy: bool,
266 },
267 PrintVersionDeltaInArchive {
268 #[clap(long, value_delimiter = ',')]
270 archive_ids: Vec<u64>,
271 #[clap(long)]
273 data_dir: String,
274 #[clap(long)]
276 sst_id: HummockSstableId,
277 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
278 use_new_object_prefix_strategy: bool,
279 },
280 TieredCacheTracing {
281 #[clap(long)]
282 enable: bool,
283 #[clap(long)]
284 record_hybrid_insert_threshold_ms: Option<u32>,
285 #[clap(long)]
286 record_hybrid_get_threshold_ms: Option<u32>,
287 #[clap(long)]
288 record_hybrid_obtain_threshold_ms: Option<u32>,
289 #[clap(long)]
290 record_hybrid_remove_threshold_ms: Option<u32>,
291 #[clap(long)]
292 record_hybrid_fetch_threshold_ms: Option<u32>,
293 },
294 MergeCompactionGroup {
295 #[clap(long)]
296 left_group_id: CompactionGroupId,
297 #[clap(long)]
298 right_group_id: CompactionGroupId,
299 },
300 MigrateLegacyObject {
301 url: String,
302 source_dir: String,
303 target_dir: String,
304 #[clap(long, default_value = "100")]
305 concurrency: u32,
306 },
307 ResizeCache {
308 #[clap(long)]
309 meta_cache_capacity_mb: Option<u64>,
310 #[clap(long)]
311 data_cache_capacity_mb: Option<u64>,
312 },
313}
314
315#[derive(Subcommand)]
316enum TableCommands {
317 Scan {
319 mv_name: String,
321 data_dir: Option<String>,
323
324 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
325 use_new_object_prefix_strategy: bool,
326 },
327 ScanById {
329 table_id: TableId,
331 data_dir: Option<String>,
333 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
334 use_new_object_prefix_strategy: bool,
335 },
336 List,
338}
339
340#[derive(Subcommand, Debug)]
341enum ScaleCommands {
342 #[clap(verbatim_doc_comment)]
344 Cordon {
345 #[clap(
347 long,
348 required = true,
349 value_delimiter = ',',
350 value_name = "id or host,..."
351 )]
352 workers: Vec<String>,
353 },
354 Uncordon {
356 #[clap(
358 long,
359 required = true,
360 value_delimiter = ',',
361 value_name = "id or host,..."
362 )]
363 workers: Vec<String>,
364 },
365}
366
367#[derive(Subcommand)]
368#[allow(clippy::large_enum_variant)]
369enum MetaCommands {
370 Pause,
372 Resume,
374 #[clap(
376 group(
377 ArgGroup::new("resume_backfill_target")
378 .required(true)
379 .args(&["job_id", "fragment_id"])
380 )
381 )]
382 ResumeBackfill {
383 #[clap(long)]
384 job_id: Option<JobId>,
385 #[clap(long)]
386 fragment_id: Option<FragmentId>,
387 },
388 ClusterInfo,
390 SourceSplitInfo {
392 #[clap(long)]
393 ignore_id: bool,
394 },
395 #[clap(verbatim_doc_comment)]
412 #[clap(group(clap::ArgGroup::new("input_group").required(true).args(&["plan", "from"])))]
413 Reschedule {
414 #[clap(long, requires = "revision")]
416 plan: Option<String>,
417 #[clap(long)]
419 revision: Option<u64>,
420 #[clap(long, conflicts_with = "revision", value_hint = clap::ValueHint::AnyPath)]
422 from: Option<String>,
423 #[clap(long, default_value = "false")]
425 dry_run: bool,
426 #[clap(long, default_value = "false")]
428 resolve_no_shuffle: bool,
429 },
430 BackupMeta {
432 #[clap(long)]
433 remarks: Option<String>,
434 },
435 RestoreMeta {
437 #[command(flatten)]
438 opts: RestoreOpts,
439 },
440 DeleteMetaSnapshots {
442 #[clap(long, value_delimiter = ',')]
443 snapshot_ids: Vec<u64>,
444 },
445
446 ListConnections,
448
449 ListServingFragmentMapping,
451
452 UnregisterWorkers {
454 #[clap(
456 long,
457 required = true,
458 value_delimiter = ',',
459 value_name = "worker_id or worker_host:worker_port, ..."
460 )]
461 workers: Vec<String>,
462
463 #[clap(short = 'y', long, default_value_t = false)]
465 yes: bool,
466
467 #[clap(long, default_value_t = false)]
469 ignore_not_found: bool,
470
471 #[clap(long, default_value_t = false)]
473 check_fragment_occupied: bool,
474 },
475
476 ValidateSource {
478 #[clap(long)]
481 props: String,
482 },
483
484 SetCdcTableBackfillParallelism {
485 #[clap(long, required = true)]
486 table_id: u32,
487 #[clap(long, required = true)]
488 parallelism: u32,
489 },
490
491 AlterSourcePropertiesSafe {
494 #[clap(long)]
496 source_id: u32,
497 #[clap(long)]
499 props: String,
500 #[clap(long, default_value_t = false)]
502 reset_splits: bool,
503 },
504
505 ResetSourceSplits {
508 #[clap(long)]
510 source_id: u32,
511 },
512
513 InjectSourceOffsets {
516 #[clap(long)]
518 source_id: u32,
519 #[clap(long)]
521 offsets: String,
522 },
523}
524
525#[derive(Subcommand, Clone, Debug)]
526pub enum AwaitTreeCommands {
527 Dump {
529 #[clap(short, long = "actor-traces-format")]
531 actor_traces_format: Option<String>,
532 },
533 Analyze {
535 #[clap(long = "path")]
539 path: Option<String>,
540 },
541 Transcribe {
543 #[clap(long = "path")]
545 path: String,
546 },
547}
548
549#[derive(Subcommand, Clone, Debug)]
550enum TestCommands {
551 Jvm,
553}
554
555#[derive(Subcommand, Clone, Debug)]
556enum ThrottleCommands {
557 Source(ThrottleCommandArgs),
558 Mv(ThrottleCommandArgs),
559 Sink(ThrottleCommandArgs),
560}
561
562#[derive(Clone, Debug, clap::ValueEnum)]
563pub enum ThrottleTypeArg {
564 Dml,
565 Backfill,
566 Source,
567 Sink,
568}
569
570#[derive(Clone, Debug, Args)]
571pub struct ThrottleCommandArgs {
572 #[clap(long, required = true)]
574 id: u32,
575 #[clap(long)]
577 rate: Option<u32>,
578 #[clap(long, value_enum, required = true)]
580 throttle_type: ThrottleTypeArg,
581}
582
583#[derive(Subcommand, Clone, Debug)]
584pub enum ProfileCommands {
585 Cpu {
587 #[clap(short, long = "sleep")]
589 sleep: u64,
590 #[clap(long = "worker-type", value_name = "TYPE")]
592 worker_types: Vec<ProfileWorkerType>,
593 },
594 Heap {
596 #[clap(long = "dir")]
598 dir: Option<String>,
599 #[clap(long = "worker-type", value_name = "TYPE")]
601 worker_types: Vec<ProfileWorkerType>,
602 },
603}
604
605pub async fn start(opts: CliOpts, shutdown: CancellationToken) {
612 let context = CtlContext::default();
613
614 tokio::select! {
615 _ = shutdown.cancelled() => {
616 context.try_close().await;
618 }
619
620 result = start_fallible(opts, &context) => {
621 if let Err(e) = result {
622 eprintln!("Error: {:#?}", e.as_report()); std::process::exit(1);
624 }
625 }
626 }
627}
628
629pub async fn start_fallible(opts: CliOpts, context: &CtlContext) -> Result<()> {
632 let result = start_impl(opts, context).await;
633 context.try_close().await;
634 result
635}
636
637#[expect(
638 clippy::large_stack_frames,
639 reason = "Pre-opt MIR sums locals across match arms in async dispatch; \
640 post-layout generator stores only one arm at a time (~13–16 KiB)."
641)]
642async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
643 match opts.command {
644 Commands::Compute(ComputeCommands::ShowConfig { host }) => {
645 cmd_impl::compute::show_config(&host).await?
646 }
647 Commands::Hummock(HummockCommands::DisableCommitEpoch) => {
648 cmd_impl::hummock::disable_commit_epoch(context).await?
649 }
650 Commands::Hummock(HummockCommands::ListVersion {
651 verbose,
652 verbose_key_range,
653 }) => {
654 cmd_impl::hummock::list_version(context, verbose, verbose_key_range).await?;
655 }
656 Commands::Hummock(HummockCommands::ListVersionDeltas {
657 start_id,
658 num_epochs,
659 }) => {
660 cmd_impl::hummock::list_version_deltas(context, start_id, num_epochs).await?;
661 }
662 Commands::Hummock(HummockCommands::ListKv {
663 epoch,
664 table_id,
665 data_dir,
666 use_new_object_prefix_strategy,
667 }) => {
668 cmd_impl::hummock::list_kv(
669 context,
670 epoch,
671 table_id,
672 data_dir,
673 use_new_object_prefix_strategy,
674 )
675 .await?;
676 }
677 Commands::Hummock(HummockCommands::SstDump(args)) => {
678 cmd_impl::hummock::sst_dump(context, args).await.unwrap()
679 }
680 Commands::Hummock(HummockCommands::TriggerManualCompaction {
681 compaction_group_id,
682 table_id,
683 levels,
684 sst_ids,
685 }) => {
686 cmd_impl::hummock::trigger_manual_compaction(
687 context,
688 compaction_group_id,
689 table_id.into(),
690 levels,
691 sst_ids,
692 )
693 .await?
694 }
695 Commands::Hummock(HummockCommands::TriggerFullGc {
696 sst_retention_time_sec,
697 prefix,
698 }) => cmd_impl::hummock::trigger_full_gc(context, sst_retention_time_sec, prefix).await?,
699 Commands::Hummock(HummockCommands::ListPinnedVersions {}) => {
700 list_pinned_versions(context).await?
701 }
702 Commands::Hummock(HummockCommands::ListCompactionGroup) => {
703 cmd_impl::hummock::list_compaction_group(context).await?
704 }
705 Commands::Hummock(HummockCommands::UpdateCompactionConfig {
706 compaction_group_ids,
707 max_bytes_for_level_base,
708 max_bytes_for_level_multiplier,
709 max_compaction_bytes,
710 sub_level_max_compaction_bytes,
711 level0_tier_compact_file_number,
712 target_file_size_base,
713 compaction_filter_mask,
714 max_sub_compaction,
715 level0_stop_write_threshold_sub_level_number,
716 level0_sub_level_compact_level_count,
717 max_space_reclaim_bytes,
718 level0_max_compact_file_number,
719 level0_overlapping_sub_level_compact_level_count,
720 enable_emergency_picker,
721 tombstone_reclaim_ratio,
722 compression_level,
723 compression_algorithm,
724 max_l0_compact_level,
725 sst_allowed_trivial_move_min_size,
726 disable_auto_group_scheduling,
727 max_overlapping_level_size,
728 sst_allowed_trivial_move_max_count,
729 emergency_level0_sst_file_count,
730 emergency_level0_sub_level_partition,
731 level0_stop_write_threshold_max_sst_count,
732 level0_stop_write_threshold_max_size,
733 enable_optimize_l0_interval_selection,
734 vnode_aligned_level_size_threshold,
735 max_kv_count_for_xor16,
736 }) => {
737 cmd_impl::hummock::update_compaction_config(
738 context,
739 compaction_group_ids,
740 build_compaction_config_vec(
741 max_bytes_for_level_base,
742 max_bytes_for_level_multiplier,
743 max_compaction_bytes,
744 sub_level_max_compaction_bytes,
745 level0_tier_compact_file_number,
746 target_file_size_base,
747 compaction_filter_mask,
748 max_sub_compaction,
749 level0_stop_write_threshold_sub_level_number,
750 level0_sub_level_compact_level_count,
751 max_space_reclaim_bytes,
752 level0_max_compact_file_number,
753 level0_overlapping_sub_level_compact_level_count,
754 enable_emergency_picker,
755 tombstone_reclaim_ratio,
756 if let Some(level) = compression_level {
757 assert!(compression_algorithm.is_some());
758 Some(CompressionAlgorithm {
759 level,
760 compression_algorithm: compression_algorithm.unwrap(),
761 })
762 } else {
763 None
764 },
765 max_l0_compact_level,
766 sst_allowed_trivial_move_min_size,
767 disable_auto_group_scheduling,
768 max_overlapping_level_size,
769 sst_allowed_trivial_move_max_count,
770 emergency_level0_sst_file_count,
771 emergency_level0_sub_level_partition,
772 level0_stop_write_threshold_max_sst_count,
773 level0_stop_write_threshold_max_size,
774 enable_optimize_l0_interval_selection,
775 vnode_aligned_level_size_threshold,
776 max_kv_count_for_xor16,
777 ),
778 )
779 .await?
780 }
781 Commands::Hummock(HummockCommands::SplitCompactionGroup {
782 compaction_group_id,
783 table_ids,
784 partition_vnode_count,
785 }) => {
786 cmd_impl::hummock::split_compaction_group(
787 context,
788 compaction_group_id,
789 &table_ids.into_iter().map_into().collect_vec(),
790 partition_vnode_count,
791 )
792 .await?;
793 }
794 Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
795 cmd_impl::hummock::pause_version_checkpoint(context).await?;
796 }
797 Commands::Hummock(HummockCommands::ResumeVersionCheckpoint) => {
798 cmd_impl::hummock::resume_version_checkpoint(context).await?;
799 }
800 Commands::Hummock(HummockCommands::ReplayVersion) => {
801 cmd_impl::hummock::replay_version(context).await?;
802 }
803 Commands::Hummock(HummockCommands::ListCompactionStatus { verbose }) => {
804 cmd_impl::hummock::list_compaction_status(context, verbose).await?;
805 }
806 Commands::Hummock(HummockCommands::GetCompactionScore {
807 compaction_group_id,
808 }) => {
809 cmd_impl::hummock::get_compaction_score(context, compaction_group_id).await?;
810 }
811 Commands::Hummock(HummockCommands::ValidateVersion) => {
812 cmd_impl::hummock::validate_version(context).await?;
813 }
814 Commands::Hummock(HummockCommands::RebuildTableStats) => {
815 cmd_impl::hummock::rebuild_table_stats(context).await?;
816 }
817 Commands::Hummock(HummockCommands::CancelCompactTask { task_id }) => {
818 cmd_impl::hummock::cancel_compact_task(context, task_id).await?;
819 }
820 Commands::Hummock(HummockCommands::PrintVersionDeltaInArchive {
821 archive_ids,
822 data_dir,
823 sst_id,
824 use_new_object_prefix_strategy,
825 }) => {
826 cmd_impl::hummock::print_version_delta_in_archive(
827 context,
828 archive_ids.into_iter().map(HummockVersionId::new),
829 data_dir,
830 sst_id,
831 use_new_object_prefix_strategy,
832 )
833 .await?;
834 }
835 Commands::Hummock(HummockCommands::PrintUserKeyInArchive {
836 archive_ids,
837 data_dir,
838 user_key,
839 use_new_object_prefix_strategy,
840 }) => {
841 cmd_impl::hummock::print_user_key_in_archive(
842 context,
843 archive_ids.into_iter().map(HummockVersionId::new),
844 data_dir,
845 user_key,
846 use_new_object_prefix_strategy,
847 )
848 .await?;
849 }
850 Commands::Hummock(HummockCommands::TieredCacheTracing {
851 enable,
852 record_hybrid_insert_threshold_ms,
853 record_hybrid_get_threshold_ms,
854 record_hybrid_obtain_threshold_ms,
855 record_hybrid_remove_threshold_ms,
856 record_hybrid_fetch_threshold_ms,
857 }) => {
858 cmd_impl::hummock::tiered_cache_tracing(
859 context,
860 enable,
861 record_hybrid_insert_threshold_ms,
862 record_hybrid_get_threshold_ms,
863 record_hybrid_obtain_threshold_ms,
864 record_hybrid_remove_threshold_ms,
865 record_hybrid_fetch_threshold_ms,
866 )
867 .await?
868 }
869 Commands::Hummock(HummockCommands::MergeCompactionGroup {
870 left_group_id,
871 right_group_id,
872 }) => {
873 cmd_impl::hummock::merge_compaction_group(context, left_group_id, right_group_id)
874 .await?
875 }
876
877 Commands::Hummock(HummockCommands::MigrateLegacyObject {
878 url,
879 source_dir,
880 target_dir,
881 concurrency,
882 }) => {
883 migrate_legacy_object(url, source_dir, target_dir, concurrency).await?;
884 }
885 Commands::Hummock(HummockCommands::ResizeCache {
886 meta_cache_capacity_mb,
887 data_cache_capacity_mb,
888 }) => {
889 const MIB: u64 = 1024 * 1024;
890 cmd_impl::hummock::resize_cache(
891 context,
892 meta_cache_capacity_mb.map(|v| v * MIB),
893 data_cache_capacity_mb.map(|v| v * MIB),
894 )
895 .await?
896 }
897 Commands::Table(TableCommands::Scan {
898 mv_name,
899 data_dir,
900 use_new_object_prefix_strategy,
901 }) => {
902 cmd_impl::table::scan(context, mv_name, data_dir, use_new_object_prefix_strategy)
903 .await?
904 }
905 Commands::Table(TableCommands::ScanById {
906 table_id,
907 data_dir,
908 use_new_object_prefix_strategy,
909 }) => {
910 cmd_impl::table::scan_id(context, table_id, data_dir, use_new_object_prefix_strategy)
911 .await?
912 }
913 Commands::Table(TableCommands::List) => cmd_impl::table::list(context).await?,
914 Commands::Bench(cmd) => cmd_impl::bench::do_bench(context, cmd).await?,
915 Commands::Meta(MetaCommands::Pause) => cmd_impl::meta::pause(context).await?,
916 Commands::Meta(MetaCommands::Resume) => cmd_impl::meta::resume(context).await?,
917 Commands::Meta(MetaCommands::ResumeBackfill {
918 job_id,
919 fragment_id,
920 }) => cmd_impl::meta::resume_backfill(context, job_id, fragment_id).await?,
921 Commands::Meta(MetaCommands::ClusterInfo) => cmd_impl::meta::cluster_info(context).await?,
922 Commands::Meta(MetaCommands::SourceSplitInfo { ignore_id }) => {
923 cmd_impl::meta::source_split_info(context, ignore_id).await?
924 }
925 Commands::Meta(MetaCommands::Reschedule {
926 from,
927 dry_run,
928 plan,
929 revision,
930 resolve_no_shuffle,
931 }) => {
932 cmd_impl::meta::reschedule(context, plan, revision, from, dry_run, resolve_no_shuffle)
933 .await?
934 }
935 Commands::Meta(MetaCommands::BackupMeta { remarks }) => {
936 cmd_impl::meta::backup_meta(context, remarks).await?
937 }
938 Commands::Meta(MetaCommands::RestoreMeta { opts }) => {
939 risingwave_meta::backup_restore::restore(opts).await?
940 }
941 Commands::Meta(MetaCommands::DeleteMetaSnapshots { snapshot_ids }) => {
942 cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
943 }
944 Commands::Meta(MetaCommands::ListConnections) => {
945 cmd_impl::meta::list_connections(context).await?
946 }
947 Commands::Meta(MetaCommands::ListServingFragmentMapping) => {
948 cmd_impl::meta::list_serving_fragment_mappings(context).await?
949 }
950 Commands::Meta(MetaCommands::UnregisterWorkers {
951 workers,
952 yes,
953 ignore_not_found,
954 check_fragment_occupied,
955 }) => {
956 cmd_impl::meta::unregister_workers(
957 context,
958 workers,
959 yes,
960 ignore_not_found,
961 check_fragment_occupied,
962 )
963 .await?
964 }
965 Commands::Meta(MetaCommands::ValidateSource { props }) => {
966 cmd_impl::meta::validate_source(context, props).await?
967 }
968 Commands::AwaitTree(AwaitTreeCommands::Dump {
969 actor_traces_format,
970 }) => cmd_impl::await_tree::dump(context, actor_traces_format).await?,
971 Commands::AwaitTree(AwaitTreeCommands::Analyze { path }) => {
972 cmd_impl::await_tree::bottleneck_detect(context, path).await?
973 }
974 Commands::AwaitTree(AwaitTreeCommands::Transcribe { path }) => {
975 rw_diagnose_tools::await_tree::transcribe(path)?
976 }
977 Commands::Profile(ProfileCommands::Cpu {
978 sleep,
979 worker_types,
980 }) => cmd_impl::profile::cpu_profile(context, sleep, worker_types).await?,
981 Commands::Profile(ProfileCommands::Heap { dir, worker_types }) => {
982 cmd_impl::profile::heap_profile(context, dir, worker_types).await?
983 }
984 Commands::Scale(ScaleCommands::Cordon { workers }) => {
985 cmd_impl::scale::update_schedulability(context, workers, Schedulability::Unschedulable)
986 .await?
987 }
988 Commands::Scale(ScaleCommands::Uncordon { workers }) => {
989 cmd_impl::scale::update_schedulability(context, workers, Schedulability::Schedulable)
990 .await?
991 }
992 Commands::Throttle(ThrottleCommands::Source(args)) => {
993 apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
994 }
995 Commands::Throttle(ThrottleCommands::Mv(args)) => {
996 apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?;
997 }
998 Commands::Throttle(ThrottleCommands::Sink(args)) => {
999 apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Sink, args).await?;
1000 }
1001 Commands::Meta(MetaCommands::SetCdcTableBackfillParallelism {
1002 table_id,
1003 parallelism,
1004 }) => {
1005 set_cdc_table_backfill_parallelism(context, table_id, parallelism).await?;
1006 }
1007 Commands::Meta(MetaCommands::AlterSourcePropertiesSafe {
1008 source_id,
1009 props,
1010 reset_splits,
1011 }) => {
1012 cmd_impl::meta::alter_source_properties_safe(context, source_id, props, reset_splits)
1013 .await?;
1014 }
1015 Commands::Meta(MetaCommands::ResetSourceSplits { source_id }) => {
1016 cmd_impl::meta::reset_source_splits(context, source_id).await?;
1017 }
1018 Commands::Meta(MetaCommands::InjectSourceOffsets { source_id, offsets }) => {
1019 cmd_impl::meta::inject_source_offsets(context, source_id, offsets).await?;
1020 }
1021 Commands::Test(TestCommands::Jvm) => cmd_impl::test::test_jvm()?,
1022 }
1023 Ok(())
1024}