1#![feature(let_chains)]
16
17use anyhow::Result;
18use clap::{Args, Parser, Subcommand};
19use cmd_impl::bench::BenchCommands;
20use cmd_impl::hummock::SstDumpArgs;
21use risingwave_common::util::tokio_util::sync::CancellationToken;
22use risingwave_hummock_sdk::{HummockEpoch, HummockVersionId};
23use risingwave_meta::backup_restore::RestoreOpts;
24use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm;
25use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
26use thiserror_ext::AsReport;
27
28use crate::cmd_impl::hummock::{
29 build_compaction_config_vec, list_pinned_versions, migrate_legacy_object,
30};
31use crate::cmd_impl::throttle::apply_throttle;
32use crate::common::CtlContext;
33
34pub mod cmd_impl;
35pub mod common;
36
37#[derive(Parser)]
43#[clap(version, about = "The DevOps tool that provides internal access to the RisingWave cluster", long_about = None)]
44#[clap(propagate_version = true)]
45#[clap(infer_subcommands = true)]
46pub struct CliOpts {
47 #[clap(subcommand)]
48 command: Commands,
49}
50
51#[derive(Subcommand)]
52#[clap(infer_subcommands = true)]
53enum Commands {
54 #[clap(subcommand)]
56 Compute(ComputeCommands),
57 #[clap(subcommand)]
59 Hummock(HummockCommands),
60 #[clap(subcommand)]
62 Table(TableCommands),
63 #[clap(subcommand)]
65 Meta(MetaCommands),
66 #[clap(subcommand)]
68 Scale(ScaleCommands),
69 #[clap(subcommand)]
71 Bench(BenchCommands),
72 #[clap(visible_alias("trace"))]
74 AwaitTree,
75 #[clap(subcommand)]
78 Profile(ProfileCommands),
79 #[clap(subcommand)]
80 Throttle(ThrottleCommands),
81}
82
83#[derive(Subcommand)]
84enum ComputeCommands {
85 ShowConfig { host: String },
87}
88
89#[allow(clippy::large_enum_variant)]
90#[derive(Subcommand)]
91enum HummockCommands {
92 ListVersion {
94 #[clap(short, long = "verbose", default_value_t = false)]
95 verbose: bool,
96
97 #[clap(long = "verbose_key_range", default_value_t = false)]
98 verbose_key_range: bool,
99 },
100
101 ListVersionDeltas {
103 #[clap(short, long = "start-version-delta-id", default_value_t = 0)]
104 start_id: u64,
105
106 #[clap(short, long = "num-epochs", default_value_t = 100)]
107 num_epochs: u32,
108 },
109 DisableCommitEpoch,
111 ListKv {
113 #[clap(short, long = "epoch", default_value_t = HummockEpoch::MAX)]
114 epoch: u64,
115
116 #[clap(short, long = "table-id")]
117 table_id: u32,
118
119 data_dir: Option<String>,
121
122 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
123 use_new_object_prefix_strategy: bool,
124 },
125 SstDump(SstDumpArgs),
126 TriggerManualCompaction {
128 #[clap(short, long = "compaction-group-id", default_value_t = 2)]
129 compaction_group_id: u64,
130
131 #[clap(short, long = "table-id", default_value_t = 0)]
132 table_id: u32,
133
134 #[clap(short, long = "level", default_value_t = 1)]
135 level: u32,
136
137 #[clap(short, long = "sst-ids", value_delimiter = ',')]
138 sst_ids: Vec<u64>,
139 },
140 TriggerFullGc {
143 #[clap(short, long = "sst_retention_time_sec", default_value_t = 259200)]
144 sst_retention_time_sec: u64,
145 #[clap(short, long = "prefix", required = false)]
146 prefix: Option<String>,
147 },
148 ListPinnedVersions {},
150 ListCompactionGroup,
152 UpdateCompactionConfig {
154 #[clap(long, value_delimiter = ',')]
155 compaction_group_ids: Vec<u64>,
156 #[clap(long)]
157 max_bytes_for_level_base: Option<u64>,
158 #[clap(long)]
159 max_bytes_for_level_multiplier: Option<u64>,
160 #[clap(long)]
161 max_compaction_bytes: Option<u64>,
162 #[clap(long)]
163 sub_level_max_compaction_bytes: Option<u64>,
164 #[clap(long)]
165 level0_tier_compact_file_number: Option<u64>,
166 #[clap(long)]
167 target_file_size_base: Option<u64>,
168 #[clap(long)]
169 compaction_filter_mask: Option<u32>,
170 #[clap(long)]
171 max_sub_compaction: Option<u32>,
172 #[clap(long)]
173 level0_stop_write_threshold_sub_level_number: Option<u64>,
174 #[clap(long)]
175 level0_sub_level_compact_level_count: Option<u32>,
176 #[clap(long)]
177 max_space_reclaim_bytes: Option<u64>,
178 #[clap(long)]
179 level0_max_compact_file_number: Option<u64>,
180 #[clap(long)]
181 level0_overlapping_sub_level_compact_level_count: Option<u32>,
182 #[clap(long)]
183 enable_emergency_picker: Option<bool>,
184 #[clap(long)]
185 tombstone_reclaim_ratio: Option<u32>,
186 #[clap(long)]
187 compression_level: Option<u32>,
188 #[clap(long)]
189 compression_algorithm: Option<String>,
190 #[clap(long)]
191 max_l0_compact_level: Option<u32>,
192 #[clap(long)]
193 sst_allowed_trivial_move_min_size: Option<u64>,
194 #[clap(long)]
195 disable_auto_group_scheduling: Option<bool>,
196 #[clap(long)]
197 max_overlapping_level_size: Option<u64>,
198 #[clap(long)]
199 sst_allowed_trivial_move_max_count: Option<u32>,
200 #[clap(long)]
201 emergency_level0_sst_file_count: Option<u32>,
202 #[clap(long)]
203 emergency_level0_sub_level_partition: Option<u32>,
204 #[clap(long)]
205 level0_stop_write_threshold_max_sst_count: Option<u32>,
206 #[clap(long)]
207 level0_stop_write_threshold_max_size: Option<u64>,
208 },
209 SplitCompactionGroup {
211 #[clap(long)]
212 compaction_group_id: u64,
213 #[clap(long, value_delimiter = ',')]
214 table_ids: Vec<u32>,
215 #[clap(long, default_value_t = 0)]
216 partition_vnode_count: u32,
217 },
218 PauseVersionCheckpoint,
220 ResumeVersionCheckpoint,
222 ReplayVersion,
224 ListCompactionStatus {
226 #[clap(short, long = "verbose", default_value_t = false)]
227 verbose: bool,
228 },
229 GetCompactionScore {
230 #[clap(long)]
231 compaction_group_id: u64,
232 },
233 ValidateVersion,
235 RebuildTableStats,
237 CancelCompactTask {
238 #[clap(short, long)]
239 task_id: u64,
240 },
241 PrintUserKeyInArchive {
242 #[clap(long, value_delimiter = ',')]
244 archive_ids: Vec<u64>,
245 #[clap(long)]
247 data_dir: String,
248 #[clap(long)]
250 user_key: String,
251 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
252 use_new_object_prefix_strategy: bool,
253 },
254 PrintVersionDeltaInArchive {
255 #[clap(long, value_delimiter = ',')]
257 archive_ids: Vec<u64>,
258 #[clap(long)]
260 data_dir: String,
261 #[clap(long)]
263 sst_id: u64,
264 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
265 use_new_object_prefix_strategy: bool,
266 },
267 TieredCacheTracing {
268 #[clap(long)]
269 enable: bool,
270 #[clap(long)]
271 record_hybrid_insert_threshold_ms: Option<u32>,
272 #[clap(long)]
273 record_hybrid_get_threshold_ms: Option<u32>,
274 #[clap(long)]
275 record_hybrid_obtain_threshold_ms: Option<u32>,
276 #[clap(long)]
277 record_hybrid_remove_threshold_ms: Option<u32>,
278 #[clap(long)]
279 record_hybrid_fetch_threshold_ms: Option<u32>,
280 },
281 MergeCompactionGroup {
282 #[clap(long)]
283 left_group_id: u64,
284 #[clap(long)]
285 right_group_id: u64,
286 },
287 MigrateLegacyObject {
288 url: String,
289 source_dir: String,
290 target_dir: String,
291 #[clap(long, default_value = "100")]
292 concurrency: u32,
293 },
294 ResizeCache {
295 #[clap(long)]
296 meta_cache_capacity_mb: Option<u64>,
297 #[clap(long)]
298 data_cache_capacity_mb: Option<u64>,
299 },
300}
301
302#[derive(Subcommand)]
303enum TableCommands {
304 Scan {
306 mv_name: String,
308 data_dir: Option<String>,
310
311 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
312 use_new_object_prefix_strategy: bool,
313 },
314 ScanById {
316 table_id: u32,
318 data_dir: Option<String>,
320 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
321 use_new_object_prefix_strategy: bool,
322 },
323 List,
325}
326
327#[derive(Subcommand, Debug)]
328enum ScaleCommands {
329 #[clap(verbatim_doc_comment)]
331 Cordon {
332 #[clap(
334 long,
335 required = true,
336 value_delimiter = ',',
337 value_name = "id or host,..."
338 )]
339 workers: Vec<String>,
340 },
341 Uncordon {
343 #[clap(
345 long,
346 required = true,
347 value_delimiter = ',',
348 value_name = "id or host,..."
349 )]
350 workers: Vec<String>,
351 },
352}
353
354#[derive(Subcommand)]
355#[allow(clippy::large_enum_variant)]
356enum MetaCommands {
357 Pause,
359 Resume,
361 ClusterInfo,
363 SourceSplitInfo {
365 #[clap(long)]
366 ignore_id: bool,
367 },
368 #[clap(verbatim_doc_comment)]
385 #[clap(group(clap::ArgGroup::new("input_group").required(true).args(&["plan", "from"])))]
386 Reschedule {
387 #[clap(long, requires = "revision")]
389 plan: Option<String>,
390 #[clap(long)]
392 revision: Option<u64>,
393 #[clap(long, conflicts_with = "revision", value_hint = clap::ValueHint::AnyPath)]
395 from: Option<String>,
396 #[clap(long, default_value = "false")]
398 dry_run: bool,
399 #[clap(long, default_value = "false")]
401 resolve_no_shuffle: bool,
402 },
403 BackupMeta {
405 #[clap(long)]
406 remarks: Option<String>,
407 },
408 RestoreMeta {
410 #[command(flatten)]
411 opts: RestoreOpts,
412 },
413 DeleteMetaSnapshots {
415 #[clap(long, value_delimiter = ',')]
416 snapshot_ids: Vec<u64>,
417 },
418
419 ListConnections,
421
422 ListServingFragmentMapping,
424
425 UnregisterWorkers {
427 #[clap(
429 long,
430 required = true,
431 value_delimiter = ',',
432 value_name = "worker_id or worker_host:worker_port, ..."
433 )]
434 workers: Vec<String>,
435
436 #[clap(short = 'y', long, default_value_t = false)]
438 yes: bool,
439
440 #[clap(long, default_value_t = false)]
442 ignore_not_found: bool,
443
444 #[clap(long, default_value_t = false)]
446 check_fragment_occupied: bool,
447 },
448
449 ValidateSource {
451 #[clap(long)]
454 props: String,
455 },
456
457 #[clap(verbatim_doc_comment)]
459 GraphCheck {
460 #[clap(long, required = true)]
462 endpoint: String,
463 },
464}
465
466#[derive(Subcommand, Clone, Debug)]
467enum ThrottleCommands {
468 Source(ThrottleCommandArgs),
469 Mv(ThrottleCommandArgs),
470}
471
472#[derive(Clone, Debug, Args)]
473pub struct ThrottleCommandArgs {
474 id: u32,
475 rate: Option<u32>,
476}
477
478#[derive(Subcommand, Clone, Debug)]
479pub enum ProfileCommands {
480 Cpu {
482 #[clap(short, long = "sleep")]
484 sleep: u64,
485 },
486 Heap {
488 #[clap(long = "dir")]
490 dir: Option<String>,
491 },
492}
493
494pub async fn start(opts: CliOpts, shutdown: CancellationToken) {
501 let context = CtlContext::default();
502
503 tokio::select! {
504 _ = shutdown.cancelled() => {
505 context.try_close().await;
507 }
508
509 result = start_fallible(opts, &context) => {
510 if let Err(e) = result {
511 eprintln!("Error: {:#?}", e.as_report()); std::process::exit(1);
513 }
514 }
515 }
516}
517
518pub async fn start_fallible(opts: CliOpts, context: &CtlContext) -> Result<()> {
521 let result = start_impl(opts, context).await;
522 context.try_close().await;
523 result
524}
525
526async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
527 match opts.command {
528 Commands::Compute(ComputeCommands::ShowConfig { host }) => {
529 cmd_impl::compute::show_config(&host).await?
530 }
531 Commands::Hummock(HummockCommands::DisableCommitEpoch) => {
532 cmd_impl::hummock::disable_commit_epoch(context).await?
533 }
534 Commands::Hummock(HummockCommands::ListVersion {
535 verbose,
536 verbose_key_range,
537 }) => {
538 cmd_impl::hummock::list_version(context, verbose, verbose_key_range).await?;
539 }
540 Commands::Hummock(HummockCommands::ListVersionDeltas {
541 start_id,
542 num_epochs,
543 }) => {
544 cmd_impl::hummock::list_version_deltas(
545 context,
546 HummockVersionId::new(start_id),
547 num_epochs,
548 )
549 .await?;
550 }
551 Commands::Hummock(HummockCommands::ListKv {
552 epoch,
553 table_id,
554 data_dir,
555 use_new_object_prefix_strategy,
556 }) => {
557 cmd_impl::hummock::list_kv(
558 context,
559 epoch,
560 table_id,
561 data_dir,
562 use_new_object_prefix_strategy,
563 )
564 .await?;
565 }
566 Commands::Hummock(HummockCommands::SstDump(args)) => {
567 cmd_impl::hummock::sst_dump(context, args).await.unwrap()
568 }
569 Commands::Hummock(HummockCommands::TriggerManualCompaction {
570 compaction_group_id,
571 table_id,
572 level,
573 sst_ids,
574 }) => {
575 cmd_impl::hummock::trigger_manual_compaction(
576 context,
577 compaction_group_id,
578 table_id,
579 level,
580 sst_ids,
581 )
582 .await?
583 }
584 Commands::Hummock(HummockCommands::TriggerFullGc {
585 sst_retention_time_sec,
586 prefix,
587 }) => cmd_impl::hummock::trigger_full_gc(context, sst_retention_time_sec, prefix).await?,
588 Commands::Hummock(HummockCommands::ListPinnedVersions {}) => {
589 list_pinned_versions(context).await?
590 }
591 Commands::Hummock(HummockCommands::ListCompactionGroup) => {
592 cmd_impl::hummock::list_compaction_group(context).await?
593 }
594 Commands::Hummock(HummockCommands::UpdateCompactionConfig {
595 compaction_group_ids,
596 max_bytes_for_level_base,
597 max_bytes_for_level_multiplier,
598 max_compaction_bytes,
599 sub_level_max_compaction_bytes,
600 level0_tier_compact_file_number,
601 target_file_size_base,
602 compaction_filter_mask,
603 max_sub_compaction,
604 level0_stop_write_threshold_sub_level_number,
605 level0_sub_level_compact_level_count,
606 max_space_reclaim_bytes,
607 level0_max_compact_file_number,
608 level0_overlapping_sub_level_compact_level_count,
609 enable_emergency_picker,
610 tombstone_reclaim_ratio,
611 compression_level,
612 compression_algorithm,
613 max_l0_compact_level,
614 sst_allowed_trivial_move_min_size,
615 disable_auto_group_scheduling,
616 max_overlapping_level_size,
617 sst_allowed_trivial_move_max_count,
618 emergency_level0_sst_file_count,
619 emergency_level0_sub_level_partition,
620 level0_stop_write_threshold_max_sst_count,
621 level0_stop_write_threshold_max_size,
622 }) => {
623 cmd_impl::hummock::update_compaction_config(
624 context,
625 compaction_group_ids,
626 build_compaction_config_vec(
627 max_bytes_for_level_base,
628 max_bytes_for_level_multiplier,
629 max_compaction_bytes,
630 sub_level_max_compaction_bytes,
631 level0_tier_compact_file_number,
632 target_file_size_base,
633 compaction_filter_mask,
634 max_sub_compaction,
635 level0_stop_write_threshold_sub_level_number,
636 level0_sub_level_compact_level_count,
637 max_space_reclaim_bytes,
638 level0_max_compact_file_number,
639 level0_overlapping_sub_level_compact_level_count,
640 enable_emergency_picker,
641 tombstone_reclaim_ratio,
642 if let Some(level) = compression_level {
643 assert!(compression_algorithm.is_some());
644 Some(CompressionAlgorithm {
645 level,
646 compression_algorithm: compression_algorithm.unwrap(),
647 })
648 } else {
649 None
650 },
651 max_l0_compact_level,
652 sst_allowed_trivial_move_min_size,
653 disable_auto_group_scheduling,
654 max_overlapping_level_size,
655 sst_allowed_trivial_move_max_count,
656 emergency_level0_sst_file_count,
657 emergency_level0_sub_level_partition,
658 level0_stop_write_threshold_max_sst_count,
659 level0_stop_write_threshold_max_size,
660 ),
661 )
662 .await?
663 }
664 Commands::Hummock(HummockCommands::SplitCompactionGroup {
665 compaction_group_id,
666 table_ids,
667 partition_vnode_count,
668 }) => {
669 cmd_impl::hummock::split_compaction_group(
670 context,
671 compaction_group_id,
672 &table_ids,
673 partition_vnode_count,
674 )
675 .await?;
676 }
677 Commands::Hummock(HummockCommands::PauseVersionCheckpoint) => {
678 cmd_impl::hummock::pause_version_checkpoint(context).await?;
679 }
680 Commands::Hummock(HummockCommands::ResumeVersionCheckpoint) => {
681 cmd_impl::hummock::resume_version_checkpoint(context).await?;
682 }
683 Commands::Hummock(HummockCommands::ReplayVersion) => {
684 cmd_impl::hummock::replay_version(context).await?;
685 }
686 Commands::Hummock(HummockCommands::ListCompactionStatus { verbose }) => {
687 cmd_impl::hummock::list_compaction_status(context, verbose).await?;
688 }
689 Commands::Hummock(HummockCommands::GetCompactionScore {
690 compaction_group_id,
691 }) => {
692 cmd_impl::hummock::get_compaction_score(context, compaction_group_id).await?;
693 }
694 Commands::Hummock(HummockCommands::ValidateVersion) => {
695 cmd_impl::hummock::validate_version(context).await?;
696 }
697 Commands::Hummock(HummockCommands::RebuildTableStats) => {
698 cmd_impl::hummock::rebuild_table_stats(context).await?;
699 }
700 Commands::Hummock(HummockCommands::CancelCompactTask { task_id }) => {
701 cmd_impl::hummock::cancel_compact_task(context, task_id).await?;
702 }
703 Commands::Hummock(HummockCommands::PrintVersionDeltaInArchive {
704 archive_ids,
705 data_dir,
706 sst_id,
707 use_new_object_prefix_strategy,
708 }) => {
709 cmd_impl::hummock::print_version_delta_in_archive(
710 context,
711 archive_ids.into_iter().map(HummockVersionId::new),
712 data_dir,
713 sst_id,
714 use_new_object_prefix_strategy,
715 )
716 .await?;
717 }
718 Commands::Hummock(HummockCommands::PrintUserKeyInArchive {
719 archive_ids,
720 data_dir,
721 user_key,
722 use_new_object_prefix_strategy,
723 }) => {
724 cmd_impl::hummock::print_user_key_in_archive(
725 context,
726 archive_ids.into_iter().map(HummockVersionId::new),
727 data_dir,
728 user_key,
729 use_new_object_prefix_strategy,
730 )
731 .await?;
732 }
733 Commands::Hummock(HummockCommands::TieredCacheTracing {
734 enable,
735 record_hybrid_insert_threshold_ms,
736 record_hybrid_get_threshold_ms,
737 record_hybrid_obtain_threshold_ms,
738 record_hybrid_remove_threshold_ms,
739 record_hybrid_fetch_threshold_ms,
740 }) => {
741 cmd_impl::hummock::tiered_cache_tracing(
742 context,
743 enable,
744 record_hybrid_insert_threshold_ms,
745 record_hybrid_get_threshold_ms,
746 record_hybrid_obtain_threshold_ms,
747 record_hybrid_remove_threshold_ms,
748 record_hybrid_fetch_threshold_ms,
749 )
750 .await?
751 }
752 Commands::Hummock(HummockCommands::MergeCompactionGroup {
753 left_group_id,
754 right_group_id,
755 }) => {
756 cmd_impl::hummock::merge_compaction_group(context, left_group_id, right_group_id)
757 .await?
758 }
759
760 Commands::Hummock(HummockCommands::MigrateLegacyObject {
761 url,
762 source_dir,
763 target_dir,
764 concurrency,
765 }) => {
766 migrate_legacy_object(url, source_dir, target_dir, concurrency).await?;
767 }
768 Commands::Hummock(HummockCommands::ResizeCache {
769 meta_cache_capacity_mb,
770 data_cache_capacity_mb,
771 }) => {
772 const MIB: u64 = 1024 * 1024;
773 cmd_impl::hummock::resize_cache(
774 context,
775 meta_cache_capacity_mb.map(|v| v * MIB),
776 data_cache_capacity_mb.map(|v| v * MIB),
777 )
778 .await?
779 }
780 Commands::Table(TableCommands::Scan {
781 mv_name,
782 data_dir,
783 use_new_object_prefix_strategy,
784 }) => {
785 cmd_impl::table::scan(context, mv_name, data_dir, use_new_object_prefix_strategy)
786 .await?
787 }
788 Commands::Table(TableCommands::ScanById {
789 table_id,
790 data_dir,
791 use_new_object_prefix_strategy,
792 }) => {
793 cmd_impl::table::scan_id(context, table_id, data_dir, use_new_object_prefix_strategy)
794 .await?
795 }
796 Commands::Table(TableCommands::List) => cmd_impl::table::list(context).await?,
797 Commands::Bench(cmd) => cmd_impl::bench::do_bench(context, cmd).await?,
798 Commands::Meta(MetaCommands::Pause) => cmd_impl::meta::pause(context).await?,
799 Commands::Meta(MetaCommands::Resume) => cmd_impl::meta::resume(context).await?,
800 Commands::Meta(MetaCommands::ClusterInfo) => cmd_impl::meta::cluster_info(context).await?,
801 Commands::Meta(MetaCommands::SourceSplitInfo { ignore_id }) => {
802 cmd_impl::meta::source_split_info(context, ignore_id).await?
803 }
804 Commands::Meta(MetaCommands::Reschedule {
805 from,
806 dry_run,
807 plan,
808 revision,
809 resolve_no_shuffle,
810 }) => {
811 cmd_impl::meta::reschedule(context, plan, revision, from, dry_run, resolve_no_shuffle)
812 .await?
813 }
814 Commands::Meta(MetaCommands::BackupMeta { remarks }) => {
815 cmd_impl::meta::backup_meta(context, remarks).await?
816 }
817 Commands::Meta(MetaCommands::RestoreMeta { opts }) => {
818 risingwave_meta::backup_restore::restore(opts).await?
819 }
820 Commands::Meta(MetaCommands::DeleteMetaSnapshots { snapshot_ids }) => {
821 cmd_impl::meta::delete_meta_snapshots(context, &snapshot_ids).await?
822 }
823 Commands::Meta(MetaCommands::ListConnections) => {
824 cmd_impl::meta::list_connections(context).await?
825 }
826 Commands::Meta(MetaCommands::ListServingFragmentMapping) => {
827 cmd_impl::meta::list_serving_fragment_mappings(context).await?
828 }
829 Commands::Meta(MetaCommands::UnregisterWorkers {
830 workers,
831 yes,
832 ignore_not_found,
833 check_fragment_occupied,
834 }) => {
835 cmd_impl::meta::unregister_workers(
836 context,
837 workers,
838 yes,
839 ignore_not_found,
840 check_fragment_occupied,
841 )
842 .await?
843 }
844 Commands::Meta(MetaCommands::ValidateSource { props }) => {
845 cmd_impl::meta::validate_source(context, props).await?
846 }
847 Commands::Meta(MetaCommands::GraphCheck { endpoint }) => {
848 cmd_impl::meta::graph_check(endpoint).await?
849 }
850 Commands::AwaitTree => cmd_impl::await_tree::dump(context).await?,
851 Commands::Profile(ProfileCommands::Cpu { sleep }) => {
852 cmd_impl::profile::cpu_profile(context, sleep).await?
853 }
854 Commands::Profile(ProfileCommands::Heap { dir }) => {
855 cmd_impl::profile::heap_profile(context, dir).await?
856 }
857 Commands::Scale(ScaleCommands::Cordon { workers }) => {
858 cmd_impl::scale::update_schedulability(context, workers, Schedulability::Unschedulable)
859 .await?
860 }
861 Commands::Scale(ScaleCommands::Uncordon { workers }) => {
862 cmd_impl::scale::update_schedulability(context, workers, Schedulability::Schedulable)
863 .await?
864 }
865 Commands::Throttle(ThrottleCommands::Source(args)) => {
866 apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
867 }
868 Commands::Throttle(ThrottleCommands::Mv(args)) => {
869 apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?;
870 }
871 }
872 Ok(())
873}