risingwave_compaction_test/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![warn(clippy::dbg_macro)]
16#![warn(clippy::disallowed_methods)]
17#![warn(clippy::doc_markdown)]
18#![warn(clippy::explicit_into_iter_loop)]
19#![warn(clippy::explicit_iter_loop)]
20#![warn(clippy::inconsistent_struct_constructor)]
21#![warn(clippy::map_flatten)]
22#![warn(clippy::await_holding_lock)]
23#![deny(rustdoc::broken_intra_doc_links)]
24#![feature(register_tool)]
25#![register_tool(rw)]
26#![allow(rw::format_error)] // test code
27
28mod compaction_test_runner;
29
30use clap::Parser;
31
32use crate::compaction_test_runner::compaction_test_main;
33
34/// Command-line arguments for compute-node.
35#[derive(Parser, Debug)]
36pub struct CompactionTestOpts {
37    #[clap(long, default_value = "127.0.0.1:6660")]
38    pub host: String,
39
40    // Optional, we will use listen_addr if not specified.
41    #[clap(long)]
42    pub client_address: Option<String>,
43
44    /// The state store string e.g. `hummock+s3://test-bucket`
45    #[clap(short, long)]
46    pub state_store: String,
47
48    #[clap(long, default_value = "http://127.0.0.1:5790")]
49    pub meta_address: String,
50
51    /// The data of this table will be checked after compaction
52    #[clap(short, long, default_value = "0")]
53    pub table_id: u32,
54
55    /// Whether runs in the CI environment
56    #[clap(long, default_value = "false")]
57    pub ci_mode: bool,
58
59    /// The number of version deltas needed to be replayed before triggering a compaction
60    #[clap(long, default_value = "10")]
61    pub num_trigger_frequency: u64,
62
63    /// The number of rounds to trigger compactions
64    #[clap(long, default_value = "5")]
65    pub num_trigger_rounds: u32,
66
67    /// The path of `risingwave.toml` configuration file.
68    ///
69    /// If empty, default configuration values will be used.
70    ///
71    /// Note that internal system parameters should be defined in the configuration file at
72    /// [`risingwave_common::config`] instead of command line arguments.
73    #[clap(long, default_value = "")]
74    pub config_path: String,
75
76    /// The path to the configuration file used for the embedded meta node.
77    #[clap(long, default_value = "src/config/ci-compaction-test-meta.toml")]
78    pub config_path_for_meta: String,
79}
80
81use std::future::Future;
82use std::pin::Pin;
83
84pub fn start(opts: CompactionTestOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
85    // WARNING: don't change the function signature. Making it `async fn` will cause
86    // slow compile in release mode.
87    Box::pin(async move {
88        tracing::info!("Compaction test start with options {:?}", opts);
89        let prefix = opts.state_store.strip_prefix("hummock+");
90        match prefix {
91            Some(s) => {
92                assert!(
93                    s.starts_with("s3://") || s.starts_with("minio://"),
94                    "Only support S3 and MinIO object store"
95                );
96            }
97            None => {
98                panic!("Invalid state store");
99            }
100        }
101        let listen_addr = opts.host.parse().unwrap();
102        tracing::info!("Server Listening at {}", listen_addr);
103
104        let client_address = opts
105            .client_address
106            .as_ref()
107            .unwrap_or_else(|| {
108                tracing::warn!("Client address is not specified, defaulting to host address");
109                &opts.host
110            })
111            .parse()
112            .unwrap();
113
114        let ret = compaction_test_main(listen_addr, client_address, opts).await;
115        match ret {
116            Ok(_) => {
117                tracing::info!("Success");
118            }
119            Err(e) => {
120                tracing::error!("Failure {}", e);
121            }
122        }
123    })
124}