replay/
main.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![feature(coroutines)]
16#![feature(stmt_expr_attributes)]
17#![feature(proc_macro_hygiene)]
18#![feature(register_tool)]
19#![register_tool(rw)]
20#![allow(rw::format_error)] // test code
21
22#[macro_use]
23mod replay_impl;
24
25use std::fs::File;
26use std::io::BufReader;
27use std::path::Path;
28use std::sync::Arc;
29
30use clap::Parser;
31use foyer::{Engine, HybridCacheBuilder};
32use replay_impl::{GlobalReplayImpl, get_replay_notification_client};
33use risingwave_common::config::{
34    NoOverride, ObjectStoreConfig, extract_storage_memory_config, load_config,
35};
36use risingwave_common::system_param::reader::SystemParamsReader;
37use risingwave_hummock_trace::{
38    GlobalReplay, HummockReplay, Operation, Record, Result, TraceReader, TraceReaderImpl, USE_TRACE,
39};
40use risingwave_meta::hummock::MockHummockMetaClient;
41use risingwave_meta::hummock::test_utils::setup_compute_env;
42use risingwave_object_store::object::build_remote_object_store;
43use risingwave_storage::compaction_catalog_manager::{
44    CompactionCatalogManager, FakeRemoteTableAccessor,
45};
46use risingwave_storage::hummock::{HummockStorage, SstableStore, SstableStoreConfig};
47use risingwave_storage::monitor::{CompactorMetrics, HummockStateStoreMetrics, ObjectStoreMetrics};
48use risingwave_storage::opts::StorageOpts;
49
/// Offset handed to `MockHummockMetaClient::with_sst_offset`.
///
/// Deliberately large to avoid collisions with real sstables.
const SST_OFFSET: u64 = 2_147_383_647_000;
52
53#[derive(Parser, Debug)]
54struct Args {
55    #[arg(short, long)]
56    path: String,
57
58    // path to config file
59    #[arg(short, long, default_value = "src/config/hummock-trace.toml")]
60    config: String,
61
62    #[arg(short, long)]
63    object_storage: String,
64
65    #[arg(short, long)]
66    use_new_object_prefix_strategy: bool,
67}
68
69#[tokio::main(flavor = "multi_thread")]
70async fn main() {
71    let args = Args::parse();
72    // disable runtime tracing when replaying
73    unsafe { std::env::set_var(USE_TRACE, "false") };
74    run_replay(args).await.unwrap();
75}
76
77async fn run_replay(args: Args) -> Result<()> {
78    let path = Path::new(&args.path);
79    let f = BufReader::new(File::open(path)?);
80    let mut reader = TraceReaderImpl::new_bincode(f)?;
81    // first record is the snapshot
82    let r: Record = reader.read().unwrap();
83    let replay_interface = create_replay_hummock(r, &args).await.unwrap();
84    let mut replayer = HummockReplay::new(reader, replay_interface);
85    replayer.run().await.unwrap();
86
87    Ok(())
88}
89
90async fn create_replay_hummock(r: Record, args: &Args) -> Result<impl GlobalReplay + use<>> {
91    let config = load_config(&args.config, NoOverride);
92    let storage_memory_config = extract_storage_memory_config(&config);
93    let system_params_reader =
94        SystemParamsReader::from(config.system.clone().into_init_system_params());
95
96    let storage_opts = Arc::new(StorageOpts::from((
97        &config,
98        &system_params_reader,
99        &storage_memory_config,
100    )));
101
102    let state_store_metrics = Arc::new(HummockStateStoreMetrics::unused());
103    let object_store_metrics = Arc::new(ObjectStoreMetrics::unused());
104
105    let compactor_metrics = Arc::new(CompactorMetrics::unused());
106
107    let object_store = build_remote_object_store(
108        &args.object_storage,
109        object_store_metrics,
110        "Hummock",
111        Arc::new(ObjectStoreConfig::default()),
112    )
113    .await;
114
115    let meta_cache = HybridCacheBuilder::new()
116        .memory(storage_opts.meta_cache_capacity_mb * (1 << 20))
117        .with_shards(storage_opts.meta_cache_shard_num)
118        .storage(Engine::Large)
119        .build()
120        .await
121        .unwrap();
122    let block_cache = HybridCacheBuilder::new()
123        .memory(storage_opts.block_cache_capacity_mb * (1 << 20))
124        .with_shards(storage_opts.block_cache_shard_num)
125        .storage(Engine::Large)
126        .build()
127        .await
128        .unwrap();
129
130    let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
131        store: Arc::new(object_store),
132        path: storage_opts.data_directory.clone(),
133        prefetch_buffer_capacity: storage_opts.prefetch_buffer_capacity_mb * (1 << 20),
134        max_prefetch_block_number: storage_opts.max_prefetch_block_number,
135        recent_filter: None,
136        state_store_metrics: state_store_metrics.clone(),
137        use_new_object_prefix_strategy: args.use_new_object_prefix_strategy,
138        meta_cache,
139        block_cache,
140    }));
141
142    let (hummock_meta_client, notification_client, notifier) = {
143        let (env, hummock_manager_ref, cluster_controller_ref, worker_id) =
144            setup_compute_env(8080).await;
145        let notifier = env.notification_manager_ref();
146
147        let worker_node = cluster_controller_ref
148            .get_worker_by_id(worker_id)
149            .await
150            .unwrap()
151            .unwrap();
152
153        let notification_client = match r.operation {
154            Operation::MetaMessage(resp) => get_replay_notification_client(env, worker_node, resp),
155            _ => panic!("unexpected operation, found {:?}", r.operation),
156        };
157
158        (
159            Arc::new(MockHummockMetaClient::with_sst_offset(
160                hummock_manager_ref,
161                worker_id as _,
162                SST_OFFSET,
163            )),
164            notification_client,
165            notifier,
166        )
167    };
168
169    let storage = HummockStorage::new(
170        storage_opts,
171        sstable_store,
172        hummock_meta_client.clone(),
173        notification_client,
174        Arc::new(CompactionCatalogManager::new(Box::new(
175            FakeRemoteTableAccessor {},
176        ))),
177        state_store_metrics,
178        compactor_metrics,
179        None,
180    )
181    .await
182    .expect("fail to create a HummockStorage object");
183    let replay_interface = GlobalReplayImpl::new(storage, notifier);
184
185    Ok(replay_interface)
186}