risingwave_compaction_test/
lib.rs1#![warn(clippy::dbg_macro)]
16#![warn(clippy::disallowed_methods)]
17#![warn(clippy::doc_markdown)]
18#![warn(clippy::explicit_into_iter_loop)]
19#![warn(clippy::explicit_iter_loop)]
20#![warn(clippy::inconsistent_struct_constructor)]
21#![warn(clippy::map_flatten)]
22#![warn(clippy::await_holding_lock)]
23#![deny(rustdoc::broken_intra_doc_links)]
24#![feature(register_tool)]
25#![register_tool(rw)]
26#![allow(rw::format_error)] mod compaction_test_runner;
29
30use clap::Parser;
31
32use crate::compaction_test_runner::compaction_test_main;
33
34#[derive(Parser, Debug)]
36pub struct CompactionTestOpts {
37 #[clap(long, default_value = "127.0.0.1:6660")]
38 pub host: String,
39
40 #[clap(long)]
42 pub client_address: Option<String>,
43
44 #[clap(short, long)]
46 pub state_store: String,
47
48 #[clap(long, default_value = "http://127.0.0.1:5790")]
49 pub meta_address: String,
50
51 #[clap(short, long, default_value = "0")]
53 pub table_id: u32,
54
55 #[clap(long, default_value = "false")]
57 pub ci_mode: bool,
58
59 #[clap(long, default_value = "10")]
61 pub num_trigger_frequency: u64,
62
63 #[clap(long, default_value = "5")]
65 pub num_trigger_rounds: u32,
66
67 #[clap(long, default_value = "")]
74 pub config_path: String,
75
76 #[clap(long, default_value = "src/config/ci-compaction-test-meta.toml")]
78 pub config_path_for_meta: String,
79}
80
81use std::future::Future;
82use std::pin::Pin;
83
84pub fn start(opts: CompactionTestOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
85 Box::pin(async move {
88 tracing::info!("Compaction test start with options {:?}", opts);
89 let prefix = opts.state_store.strip_prefix("hummock+");
90 match prefix {
91 Some(s) => {
92 assert!(
93 s.starts_with("s3://") || s.starts_with("minio://"),
94 "Only support S3 and MinIO object store"
95 );
96 }
97 None => {
98 panic!("Invalid state store");
99 }
100 }
101 let listen_addr = opts.host.parse().unwrap();
102 tracing::info!("Server Listening at {}", listen_addr);
103
104 let client_address = opts
105 .client_address
106 .as_ref()
107 .unwrap_or_else(|| {
108 tracing::warn!("Client address is not specified, defaulting to host address");
109 &opts.host
110 })
111 .parse()
112 .unwrap();
113
114 let ret = compaction_test_main(listen_addr, client_address, opts).await;
115 match ret {
116 Ok(_) => {
117 tracing::info!("Success");
118 }
119 Err(e) => {
120 tracing::error!("Failure {}", e);
121 }
122 }
123 })
124}