risingwave_ctl/cmd_impl/
bench.rs1use std::ops::Bound;
16use std::ops::Bound::Unbounded;
17use std::sync::Arc;
18use std::sync::atomic::AtomicU64;
19use std::time::Instant;
20
21use anyhow::{Result, anyhow};
22use clap::Subcommand;
23use futures::future::try_join_all;
24use futures::{Future, StreamExt, pin_mut};
25use risingwave_common::row::{self, OwnedRow};
26use risingwave_common::util::epoch::EpochPair;
27use risingwave_storage::store::PrefetchOptions;
28use size::Size;
29use tokio::task::JoinHandle;
30
31use super::table::{get_table_catalog, make_state_table};
32use crate::CtlContext;
33use crate::common::HummockServiceOpts;
34
35#[derive(Subcommand)]
36pub enum BenchCommands {
37 Scan {
39 mv_name: String,
41 #[clap(long, default_value_t = 1)]
43 threads: usize,
44 data_dir: Option<String>,
45 #[clap(short, long = "use-new-object-prefix-strategy", default_value = "true")]
46 use_new_object_prefix_strategy: bool,
47 },
48}
49
50fn spawn_okk(fut: impl Future<Output = Result<()>> + Send + 'static) -> JoinHandle<Result<()>> {
53 tokio::spawn(fut)
54}
55
/// A point-in-time snapshot of the counters the benchmark reports on.
///
/// Two snapshots taken at different times are compared to derive per-second rates.
#[derive(Debug, Clone)]
pub struct InterestedMetrics {
    /// Object store bytes-read counter at snapshot time.
    object_store_read: u64,
    /// Object store bytes-written counter at snapshot time.
    object_store_write: u64,
    /// Total number of items pulled from scan streams at snapshot time.
    next_cnt: u64,
    /// Total number of scan iterators opened at snapshot time.
    iter_cnt: u64,
    /// Moment at which the snapshot was taken.
    now: Instant,
}
64
65impl InterestedMetrics {
66 pub fn report(&self, metrics: &InterestedMetrics) {
67 let elapsed = self.now.duration_since(metrics.now).as_secs_f64();
68 let read_rate = (self.object_store_read - metrics.object_store_read) as f64 / elapsed;
69 let write_rate = (self.object_store_write - metrics.object_store_write) as f64 / elapsed;
70 let next_rate = (self.next_cnt - metrics.next_cnt) as f64 / elapsed;
71 let iter_rate = (self.iter_cnt - metrics.iter_cnt) as f64 / elapsed;
72 println!(
73 "read_rate: {}/s\nwrite_rate:{}/s\nnext_rate:{}/s\niter_rate:{}/s\n",
74 Size::from_bytes(read_rate),
75 Size::from_bytes(write_rate),
76 next_rate,
77 iter_rate
78 );
79 }
80}
81
82pub async fn do_bench(context: &CtlContext, cmd: BenchCommands) -> Result<()> {
83 let meta = context.meta_client().await?;
84 let next_cnt = Arc::new(AtomicU64::new(0));
85 let iter_cnt = Arc::new(AtomicU64::new(0));
86 match cmd {
87 BenchCommands::Scan {
88 mv_name,
89 threads,
90 data_dir,
91 use_new_object_prefix_strategy,
92 } => {
93 let (hummock, metrics) = context
94 .hummock_store_with_metrics(HummockServiceOpts::from_env(
95 data_dir,
96 use_new_object_prefix_strategy,
97 )?)
98 .await?;
99 let table = get_table_catalog(meta.clone(), mv_name).await?;
100 let committed_epoch = hummock
101 .inner()
102 .get_pinned_version()
103 .table_committed_epoch(table.id)
104 .ok_or_else(|| anyhow!("table id {} not exist", table.id))?;
105 let mut handlers = vec![];
106 for i in 0..threads {
107 let table = table.clone();
108 let next_cnt = next_cnt.clone();
109 let iter_cnt = iter_cnt.clone();
110 let hummock = hummock.clone();
111 let handler = spawn_okk(async move {
112 tracing::info!(thread = i, "starting scan");
113 let state_table = {
114 let mut tb = make_state_table(hummock, &table).await;
115 tb.init_epoch(EpochPair::new(u64::MAX, committed_epoch))
116 .await?;
117 tb
118 };
119 loop {
120 let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) =
121 &(Unbounded, Unbounded);
122 let stream = state_table
123 .iter_with_prefix(
124 row::empty(),
125 sub_range,
126 PrefetchOptions::prefetch_for_large_range_scan(),
127 )
128 .await?;
129 pin_mut!(stream);
130 iter_cnt.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
131 while let Some(item) = stream.next().await {
132 next_cnt.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
133 item?;
134 }
135 }
136 });
137 handlers.push(handler);
138 }
139 let handler = spawn_okk(async move {
140 tracing::info!("starting report metrics");
141 let mut last_collected_metrics = None;
142 loop {
143 let collected_metrics = InterestedMetrics {
144 object_store_read: metrics.object_store_metrics.read_bytes.get(),
145 object_store_write: metrics.object_store_metrics.write_bytes.get(),
146 next_cnt: next_cnt.load(std::sync::atomic::Ordering::Relaxed),
147 iter_cnt: iter_cnt.load(std::sync::atomic::Ordering::Relaxed),
148 now: Instant::now(),
149 };
150 if let Some(ref last_collected_metrics) = last_collected_metrics {
151 collected_metrics.report(last_collected_metrics);
152 }
153 last_collected_metrics = Some(collected_metrics);
154 tracing::info!("starting report metrics");
155 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
156 }
157 });
158 handlers.push(handler);
159 for result in try_join_all(handlers).await? {
160 result?;
161 }
162 }
163 }
164
165 Ok(())
166}