1#![feature(coroutines)]
16#![feature(stmt_expr_attributes)]
17#![feature(proc_macro_hygiene)]
18#![feature(register_tool)]
19#![register_tool(rw)]
20#![allow(rw::format_error)] #[macro_use]
23mod replay_impl;
24
25use std::fs::File;
26use std::io::BufReader;
27use std::path::Path;
28use std::sync::Arc;
29
30use clap::Parser;
31use foyer::{Engine, HybridCacheBuilder};
32use replay_impl::{GlobalReplayImpl, get_replay_notification_client};
33use risingwave_common::config::{
34 NoOverride, ObjectStoreConfig, extract_storage_memory_config, load_config,
35};
36use risingwave_common::system_param::reader::SystemParamsReader;
37use risingwave_hummock_trace::{
38 GlobalReplay, HummockReplay, Operation, Record, Result, TraceReader, TraceReaderImpl, USE_TRACE,
39};
40use risingwave_meta::hummock::MockHummockMetaClient;
41use risingwave_meta::hummock::test_utils::setup_compute_env;
42use risingwave_object_store::object::build_remote_object_store;
43use risingwave_storage::compaction_catalog_manager::{
44 CompactionCatalogManager, FakeRemoteTableAccessor,
45};
46use risingwave_storage::hummock::{HummockStorage, SstableStore, SstableStoreConfig};
47use risingwave_storage::monitor::{CompactorMetrics, HummockStateStoreMetrics, ObjectStoreMetrics};
48use risingwave_storage::opts::StorageOpts;
49
/// SST offset handed to `MockHummockMetaClient::with_sst_offset` when the replay meta client
/// is constructed.
// NOTE(review): the value is 2_147_383_647 * 1000, which differs by one digit from
// i32::MAX (2_147_483_647) * 1000 — confirm this is intentional.
const SST_OFFSET: u64 = 2_147_383_647_000;
52
53#[derive(Parser, Debug)]
54struct Args {
55 #[arg(short, long)]
56 path: String,
57
58 #[arg(short, long, default_value = "src/config/hummock-trace.toml")]
60 config: String,
61
62 #[arg(short, long)]
63 object_storage: String,
64
65 #[arg(short, long)]
66 use_new_object_prefix_strategy: bool,
67}
68
69#[tokio::main(flavor = "multi_thread")]
70async fn main() {
71 let args = Args::parse();
72 unsafe { std::env::set_var(USE_TRACE, "false") };
74 run_replay(args).await.unwrap();
75}
76
77async fn run_replay(args: Args) -> Result<()> {
78 let path = Path::new(&args.path);
79 let f = BufReader::new(File::open(path)?);
80 let mut reader = TraceReaderImpl::new_bincode(f)?;
81 let r: Record = reader.read().unwrap();
83 let replay_interface = create_replay_hummock(r, &args).await.unwrap();
84 let mut replayer = HummockReplay::new(reader, replay_interface);
85 replayer.run().await.unwrap();
86
87 Ok(())
88}
89
90async fn create_replay_hummock(r: Record, args: &Args) -> Result<impl GlobalReplay + use<>> {
91 let config = load_config(&args.config, NoOverride);
92 let storage_memory_config = extract_storage_memory_config(&config);
93 let system_params_reader =
94 SystemParamsReader::from(config.system.clone().into_init_system_params());
95
96 let storage_opts = Arc::new(StorageOpts::from((
97 &config,
98 &system_params_reader,
99 &storage_memory_config,
100 )));
101
102 let state_store_metrics = Arc::new(HummockStateStoreMetrics::unused());
103 let object_store_metrics = Arc::new(ObjectStoreMetrics::unused());
104
105 let compactor_metrics = Arc::new(CompactorMetrics::unused());
106
107 let object_store = build_remote_object_store(
108 &args.object_storage,
109 object_store_metrics,
110 "Hummock",
111 Arc::new(ObjectStoreConfig::default()),
112 )
113 .await;
114
115 let meta_cache = HybridCacheBuilder::new()
116 .memory(storage_opts.meta_cache_capacity_mb * (1 << 20))
117 .with_shards(storage_opts.meta_cache_shard_num)
118 .storage(Engine::Large)
119 .build()
120 .await
121 .unwrap();
122 let block_cache = HybridCacheBuilder::new()
123 .memory(storage_opts.block_cache_capacity_mb * (1 << 20))
124 .with_shards(storage_opts.block_cache_shard_num)
125 .storage(Engine::Large)
126 .build()
127 .await
128 .unwrap();
129
130 let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
131 store: Arc::new(object_store),
132 path: storage_opts.data_directory.clone(),
133 prefetch_buffer_capacity: storage_opts.prefetch_buffer_capacity_mb * (1 << 20),
134 max_prefetch_block_number: storage_opts.max_prefetch_block_number,
135 recent_filter: None,
136 state_store_metrics: state_store_metrics.clone(),
137 use_new_object_prefix_strategy: args.use_new_object_prefix_strategy,
138 meta_cache,
139 block_cache,
140 }));
141
142 let (hummock_meta_client, notification_client, notifier) = {
143 let (env, hummock_manager_ref, cluster_controller_ref, worker_id) =
144 setup_compute_env(8080).await;
145 let notifier = env.notification_manager_ref();
146
147 let worker_node = cluster_controller_ref
148 .get_worker_by_id(worker_id)
149 .await
150 .unwrap()
151 .unwrap();
152
153 let notification_client = match r.operation {
154 Operation::MetaMessage(resp) => get_replay_notification_client(env, worker_node, resp),
155 _ => panic!("unexpected operation, found {:?}", r.operation),
156 };
157
158 (
159 Arc::new(MockHummockMetaClient::with_sst_offset(
160 hummock_manager_ref,
161 worker_id as _,
162 SST_OFFSET,
163 )),
164 notification_client,
165 notifier,
166 )
167 };
168
169 let storage = HummockStorage::new(
170 storage_opts,
171 sstable_store,
172 hummock_meta_client.clone(),
173 notification_client,
174 Arc::new(CompactionCatalogManager::new(Box::new(
175 FakeRemoteTableAccessor {},
176 ))),
177 state_store_metrics,
178 compactor_metrics,
179 None,
180 )
181 .await
182 .expect("fail to create a HummockStorage object");
183 let replay_interface = GlobalReplayImpl::new(storage, notifier);
184
185 Ok(replay_interface)
186}