1#![feature(coroutines)]
16#![feature(stmt_expr_attributes)]
17#![feature(proc_macro_hygiene)]
18#![feature(register_tool)]
19#![register_tool(rw)]
20#![allow(rw::format_error)] #[macro_use]
23mod replay_impl;
24
25use std::fs::File;
26use std::io::BufReader;
27use std::path::Path;
28use std::sync::Arc;
29
30use clap::Parser;
31use foyer::{CacheBuilder, HybridCacheBuilder};
32use replay_impl::{GlobalReplayImpl, get_replay_notification_client};
33use risingwave_common::config::{
34 NoOverride, ObjectStoreConfig, extract_storage_memory_config, load_config,
35};
36use risingwave_common::system_param::reader::SystemParamsReader;
37use risingwave_hummock_trace::{
38 GlobalReplay, HummockReplay, Operation, Record, Result, TraceReader, TraceReaderImpl, USE_TRACE,
39};
40use risingwave_meta::hummock::MockHummockMetaClient;
41use risingwave_meta::hummock::test_utils::setup_compute_env;
42use risingwave_object_store::object::build_remote_object_store;
43use risingwave_storage::compaction_catalog_manager::{
44 CompactionCatalogManager, FakeRemoteTableAccessor,
45};
46use risingwave_storage::hummock::none::NoneRecentFilter;
47use risingwave_storage::hummock::{HummockStorage, SstableStore, SstableStoreConfig};
48use risingwave_storage::monitor::{CompactorMetrics, HummockStateStoreMetrics, ObjectStoreMetrics};
49use risingwave_storage::opts::StorageOpts;
50
// Offset handed to `MockHummockMetaClient::with_sst_offset` in `create_replay_hummock`.
// NOTE(review): presumably keeps SST ids allocated during replay from colliding with
// the ids referenced by the recorded trace — confirm against the mock meta client.
const SST_OFFSET: u64 = 2_147_383_647_000;
53
54#[derive(Parser, Debug)]
55struct Args {
56 #[arg(short, long)]
57 path: String,
58
59 #[arg(short, long, default_value = "src/config/hummock-trace.toml")]
61 config: String,
62
63 #[arg(short, long)]
64 object_storage: String,
65
66 #[arg(short, long)]
67 use_new_object_prefix_strategy: bool,
68}
69
70#[tokio::main(flavor = "multi_thread")]
71async fn main() {
72 let args = Args::parse();
73 unsafe { std::env::set_var(USE_TRACE, "false") };
75 run_replay(args).await.unwrap();
76}
77
78async fn run_replay(args: Args) -> Result<()> {
79 let path = Path::new(&args.path);
80 let f = BufReader::new(File::open(path)?);
81 let mut reader = TraceReaderImpl::new_bincode(f)?;
82 let r: Record = reader.read().unwrap();
84 let replay_interface = create_replay_hummock(r, &args).await.unwrap();
85 let mut replayer = HummockReplay::new(reader, replay_interface);
86 replayer.run().await.unwrap();
87
88 Ok(())
89}
90
91async fn create_replay_hummock(r: Record, args: &Args) -> Result<impl GlobalReplay + use<>> {
92 let config = load_config(&args.config, NoOverride);
93 let storage_memory_config = extract_storage_memory_config(&config);
94 let system_params_reader =
95 SystemParamsReader::from(config.system.clone().into_init_system_params());
96
97 let storage_opts = Arc::new(StorageOpts::from((
98 &config,
99 &system_params_reader,
100 &storage_memory_config,
101 )));
102
103 let state_store_metrics = Arc::new(HummockStateStoreMetrics::unused());
104 let object_store_metrics = Arc::new(ObjectStoreMetrics::unused());
105
106 let compactor_metrics = Arc::new(CompactorMetrics::unused());
107
108 let object_store = build_remote_object_store(
109 &args.object_storage,
110 object_store_metrics,
111 "Hummock",
112 Arc::new(ObjectStoreConfig::default()),
113 )
114 .await;
115
116 let meta_cache = HybridCacheBuilder::new()
117 .memory(storage_opts.meta_cache_capacity_mb * (1 << 20))
118 .with_shards(storage_opts.meta_cache_shard_num)
119 .storage()
120 .build()
121 .await
122 .unwrap();
123 let block_cache = HybridCacheBuilder::new()
124 .memory(storage_opts.block_cache_capacity_mb * (1 << 20))
125 .with_shards(storage_opts.block_cache_shard_num)
126 .storage()
127 .build()
128 .await
129 .unwrap();
130
131 let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
132 store: Arc::new(object_store),
133 path: storage_opts.data_directory.clone(),
134 prefetch_buffer_capacity: storage_opts.prefetch_buffer_capacity_mb * (1 << 20),
135 max_prefetch_block_number: storage_opts.max_prefetch_block_number,
136 recent_filter: Arc::new(NoneRecentFilter::default().into()),
137 state_store_metrics: state_store_metrics.clone(),
138 use_new_object_prefix_strategy: args.use_new_object_prefix_strategy,
139 meta_cache,
140 block_cache,
141 vector_meta_cache: CacheBuilder::new(1 << 10).build(),
142 vector_block_cache: CacheBuilder::new(1 << 10).build(),
143 }));
144
145 let (hummock_meta_client, notification_client, notifier) = {
146 let (env, hummock_manager_ref, cluster_controller_ref, worker_id) =
147 setup_compute_env(8080).await;
148 let notifier = env.notification_manager_ref();
149
150 let worker_node = cluster_controller_ref
151 .get_worker_by_id(worker_id)
152 .await
153 .unwrap()
154 .unwrap();
155
156 let notification_client = match r.operation {
157 Operation::MetaMessage(resp) => get_replay_notification_client(env, worker_node, resp),
158 _ => panic!("unexpected operation, found {:?}", r.operation),
159 };
160
161 (
162 Arc::new(MockHummockMetaClient::with_sst_offset(
163 hummock_manager_ref,
164 worker_id as _,
165 SST_OFFSET,
166 )),
167 notification_client,
168 notifier,
169 )
170 };
171
172 let storage = HummockStorage::new(
173 storage_opts,
174 sstable_store,
175 hummock_meta_client.clone(),
176 notification_client,
177 Arc::new(CompactionCatalogManager::new(Box::new(
178 FakeRemoteTableAccessor {},
179 ))),
180 state_store_metrics,
181 compactor_metrics,
182 None,
183 )
184 .await
185 .expect("fail to create a HummockStorage object");
186 let replay_interface = GlobalReplayImpl::new(storage, notifier);
187
188 Ok(replay_interface)
189}