1#![feature(coroutines)]
16#![feature(stmt_expr_attributes)]
17#![feature(proc_macro_hygiene)]
18#![feature(register_tool)]
19#![register_tool(rw)]
20#![allow(rw::format_error)] #[macro_use]
23mod replay_impl;
24
25use std::fs::File;
26use std::io::BufReader;
27use std::path::Path;
28use std::sync::Arc;
29
30use clap::Parser;
31use foyer::{CacheBuilder, HybridCacheBuilder};
32use replay_impl::{GlobalReplayImpl, get_replay_notification_client};
33use risingwave_common::config::{
34 NoOverride, ObjectStoreConfig, extract_storage_memory_config, load_config,
35};
36use risingwave_common::system_param::reader::SystemParamsReader;
37use risingwave_hummock_trace::{
38 GlobalReplay, HummockReplay, Operation, Record, Result, TraceReader, TraceReaderImpl, USE_TRACE,
39};
40use risingwave_meta::hummock::MockHummockMetaClient;
41use risingwave_meta::hummock::test_utils::setup_compute_env;
42use risingwave_object_store::object::build_remote_object_store;
43use risingwave_storage::compaction_catalog_manager::{
44 CompactionCatalogManager, FakeRemoteTableAccessor,
45};
46use risingwave_storage::hummock::none::NoneRecentFilter;
47use risingwave_storage::hummock::{HummockStorage, SstableStore, SstableStoreConfig};
48use risingwave_storage::monitor::{CompactorMetrics, HummockStateStoreMetrics, ObjectStoreMetrics};
49use risingwave_storage::opts::StorageOpts;
50
51const SST_OFFSET: u64 = 2147383647000;
53
54#[derive(Parser, Debug)]
55struct Args {
56 #[arg(short, long)]
57 path: String,
58
59 #[arg(short, long, default_value = "src/config/hummock-trace.toml")]
61 config: String,
62
63 #[arg(short, long)]
64 object_storage: String,
65
66 #[arg(short, long)]
67 use_new_object_prefix_strategy: bool,
68}
69
70#[tokio::main(flavor = "multi_thread")]
71async fn main() {
72 let args = Args::parse();
73 unsafe { std::env::set_var(USE_TRACE, "false") };
75 run_replay(args).await.unwrap();
76}
77
78async fn run_replay(args: Args) -> Result<()> {
79 let path = Path::new(&args.path);
80 let f = BufReader::new(File::open(path)?);
81 let mut reader = TraceReaderImpl::new_bincode(f)?;
82 let r: Record = reader.read().unwrap();
84 let replay_interface = create_replay_hummock(r, &args).await.unwrap();
85 let mut replayer = HummockReplay::new(reader, replay_interface);
86 replayer.run().await.unwrap();
87
88 Ok(())
89}
90
91async fn create_replay_hummock(r: Record, args: &Args) -> Result<impl GlobalReplay + use<>> {
92 let config = load_config(&args.config, NoOverride);
93 let storage_memory_config = extract_storage_memory_config(&config);
94 let system_params_reader =
95 SystemParamsReader::from(config.system.clone().into_init_system_params());
96
97 let storage_opts = Arc::new(StorageOpts::from((
98 &config,
99 &system_params_reader,
100 &storage_memory_config,
101 )));
102
103 let state_store_metrics = Arc::new(HummockStateStoreMetrics::unused());
104 let object_store_metrics = Arc::new(ObjectStoreMetrics::unused());
105
106 let compactor_metrics = Arc::new(CompactorMetrics::unused());
107
108 let object_store = build_remote_object_store(
109 &args.object_storage,
110 object_store_metrics,
111 "Hummock",
112 Arc::new(ObjectStoreConfig::default()),
113 )
114 .await;
115
116 let meta_cache = HybridCacheBuilder::new()
117 .memory(storage_opts.meta_cache_capacity_mb * (1 << 20))
118 .with_shards(storage_opts.meta_cache_shard_num)
119 .storage()
120 .build()
121 .await
122 .unwrap();
123 let block_cache = HybridCacheBuilder::new()
124 .memory(storage_opts.block_cache_capacity_mb * (1 << 20))
125 .with_shards(storage_opts.block_cache_shard_num)
126 .storage()
127 .build()
128 .await
129 .unwrap();
130
131 let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
132 store: Arc::new(object_store),
133 path: storage_opts.data_directory.clone(),
134 prefetch_buffer_capacity: storage_opts.prefetch_buffer_capacity_mb * (1 << 20),
135 max_prefetch_block_number: storage_opts.max_prefetch_block_number,
136 recent_filter: Arc::new(NoneRecentFilter::default().into()),
137 state_store_metrics: state_store_metrics.clone(),
138 use_new_object_prefix_strategy: args.use_new_object_prefix_strategy,
139 skip_bloom_filter_in_serde: storage_opts.sst_skip_bloom_filter_in_serde,
140 meta_cache,
141 block_cache,
142 vector_meta_cache: CacheBuilder::new(1 << 10).build(),
143 vector_block_cache: CacheBuilder::new(1 << 10).build(),
144 }));
145
146 let (hummock_meta_client, notification_client, notifier) = {
147 let (env, hummock_manager_ref, cluster_controller_ref, worker_id) =
148 setup_compute_env(8080).await;
149 let notifier = env.notification_manager_ref();
150
151 let worker_node = cluster_controller_ref
152 .get_worker_by_id(worker_id)
153 .await
154 .unwrap()
155 .unwrap();
156
157 let notification_client = match r.operation {
158 Operation::MetaMessage(resp) => get_replay_notification_client(env, worker_node, resp),
159 _ => panic!("unexpected operation, found {:?}", r.operation),
160 };
161
162 (
163 Arc::new(MockHummockMetaClient::with_sst_offset(
164 hummock_manager_ref,
165 worker_id as _,
166 SST_OFFSET,
167 )),
168 notification_client,
169 notifier,
170 )
171 };
172
173 let storage = HummockStorage::new(
174 storage_opts,
175 sstable_store,
176 hummock_meta_client.clone(),
177 notification_client,
178 Arc::new(CompactionCatalogManager::new(Box::new(
179 FakeRemoteTableAccessor {},
180 ))),
181 state_store_metrics,
182 compactor_metrics,
183 None,
184 )
185 .await
186 .expect("fail to create a HummockStorage object");
187 let replay_interface = GlobalReplayImpl::new(storage, notifier);
188
189 Ok(replay_interface)
190}