risingwave_ctl/cmd_impl/bench.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Bound;
16use std::ops::Bound::Unbounded;
17use std::sync::Arc;
18use std::sync::atomic::AtomicU64;
19use std::time::Instant;
20
21use anyhow::{Result, anyhow};
22use clap::Subcommand;
23use futures::future::try_join_all;
24use futures::{Future, StreamExt, pin_mut};
25use risingwave_common::row::{self, OwnedRow};
26use risingwave_common::util::epoch::EpochPair;
27use risingwave_storage::store::PrefetchOptions;
28use size::Size;
29use tokio::task::JoinHandle;
30
31use super::table::{get_table_catalog, make_state_table};
32use crate::CtlContext;
33use crate::common::HummockServiceOpts;
34
// Subcommands of the ctl `bench` command; executed by `do_bench`.
// Plain `//` comments are used below on purpose: `///` doc comments on clap-derived items
// become `--help` text, which would change the CLI's runtime output.
#[derive(Subcommand)]
pub enum BenchCommands {
    /// benchmark scan state table
    Scan {
        /// name of the materialized view to operate on
        mv_name: String,
        /// number of futures doing scan
        #[clap(long, default_value_t = 1)]
        threads: usize,
        // Optional positional argument (no `long`/`short` attribute). It is forwarded
        // unchanged to `HummockServiceOpts::from_env`; presumably it overrides the Hummock
        // data directory — TODO confirm.
        data_dir: Option<String>,
        // Forwarded unchanged to `HummockServiceOpts::from_env`.
        // NOTE(review): for a `bool` field, clap's derive defaults to a "set true" flag action.
        // Combined with `default_value = "true"`, this likely can never be set to false from
        // the command line — confirm against the clap version in use.
        #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
        use_new_object_prefix_strategy: bool,
    },
}
49
50/// Spawn a tokio task with output of `anyhow::Result`, so that we can write dead loop in async
51/// functions.
52fn spawn_okk(fut: impl Future<Output = Result<()>> + Send + 'static) -> JoinHandle<Result<()>> {
53    tokio::spawn(fut)
54}
55
/// A point-in-time snapshot of the counters tracked by the bench reporter.
///
/// Two snapshots are compared by [`InterestedMetrics::report`] to derive per-second rates.
#[derive(Debug, Clone)]
pub struct InterestedMetrics {
    /// Cumulative object-store bytes read when the snapshot was taken.
    object_store_read: u64,
    /// Cumulative object-store bytes written when the snapshot was taken.
    object_store_write: u64,
    /// Total number of scanned items pulled from state-table streams so far.
    next_cnt: u64,
    /// Total number of state-table iterators created so far.
    iter_cnt: u64,
    /// Moment at which the snapshot was captured.
    now: Instant,
}
64
65impl InterestedMetrics {
66    pub fn report(&self, metrics: &InterestedMetrics) {
67        let elapsed = self.now.duration_since(metrics.now).as_secs_f64();
68        let read_rate = (self.object_store_read - metrics.object_store_read) as f64 / elapsed;
69        let write_rate = (self.object_store_write - metrics.object_store_write) as f64 / elapsed;
70        let next_rate = (self.next_cnt - metrics.next_cnt) as f64 / elapsed;
71        let iter_rate = (self.iter_cnt - metrics.iter_cnt) as f64 / elapsed;
72        println!(
73            "read_rate: {}/s\nwrite_rate:{}/s\nnext_rate:{}/s\niter_rate:{}/s\n",
74            Size::from_bytes(read_rate),
75            Size::from_bytes(write_rate),
76            next_rate,
77            iter_rate
78        );
79    }
80}
81
82pub async fn do_bench(context: &CtlContext, cmd: BenchCommands) -> Result<()> {
83    let meta = context.meta_client().await?;
84    let next_cnt = Arc::new(AtomicU64::new(0));
85    let iter_cnt = Arc::new(AtomicU64::new(0));
86    match cmd {
87        BenchCommands::Scan {
88            mv_name,
89            threads,
90            data_dir,
91            use_new_object_prefix_strategy,
92        } => {
93            let (hummock, metrics) = context
94                .hummock_store_with_metrics(HummockServiceOpts::from_env(
95                    data_dir,
96                    use_new_object_prefix_strategy,
97                )?)
98                .await?;
99            let table = get_table_catalog(meta.clone(), mv_name).await?;
100            let committed_epoch = hummock
101                .inner()
102                .get_pinned_version()
103                .table_committed_epoch(table.id)
104                .ok_or_else(|| anyhow!("table id {} not exist", table.id))?;
105            let mut handlers = vec![];
106            for i in 0..threads {
107                let table = table.clone();
108                let next_cnt = next_cnt.clone();
109                let iter_cnt = iter_cnt.clone();
110                let hummock = hummock.clone();
111                let handler = spawn_okk(async move {
112                    tracing::info!(thread = i, "starting scan");
113                    let state_table = {
114                        let mut tb = make_state_table(hummock, &table).await;
115                        tb.init_epoch(EpochPair::new(u64::MAX, committed_epoch))
116                            .await?;
117                        tb
118                    };
119                    loop {
120                        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) =
121                            &(Unbounded, Unbounded);
122                        let stream = state_table
123                            .iter_with_prefix(
124                                row::empty(),
125                                sub_range,
126                                PrefetchOptions::prefetch_for_large_range_scan(),
127                            )
128                            .await?;
129                        pin_mut!(stream);
130                        iter_cnt.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
131                        while let Some(item) = stream.next().await {
132                            next_cnt.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
133                            item?;
134                        }
135                    }
136                });
137                handlers.push(handler);
138            }
139            let handler = spawn_okk(async move {
140                tracing::info!("starting report metrics");
141                let mut last_collected_metrics = None;
142                loop {
143                    let collected_metrics = InterestedMetrics {
144                        object_store_read: metrics.object_store_metrics.read_bytes.get(),
145                        object_store_write: metrics.object_store_metrics.write_bytes.get(),
146                        next_cnt: next_cnt.load(std::sync::atomic::Ordering::Relaxed),
147                        iter_cnt: iter_cnt.load(std::sync::atomic::Ordering::Relaxed),
148                        now: Instant::now(),
149                    };
150                    if let Some(ref last_collected_metrics) = last_collected_metrics {
151                        collected_metrics.report(last_collected_metrics);
152                    }
153                    last_collected_metrics = Some(collected_metrics);
154                    tracing::info!("starting report metrics");
155                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
156                }
157            });
158            handlers.push(handler);
159            for result in try_join_all(handlers).await? {
160                result?;
161            }
162        }
163    }
164
165    Ok(())
166}