replay/
main.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![feature(coroutines)]
16#![feature(stmt_expr_attributes)]
17#![feature(proc_macro_hygiene)]
18#![feature(register_tool)]
19#![register_tool(rw)]
20#![allow(rw::format_error)] // test code
21
22#[macro_use]
23mod replay_impl;
24
25use std::fs::File;
26use std::io::BufReader;
27use std::path::Path;
28use std::sync::Arc;
29
30use clap::Parser;
31use foyer::{CacheBuilder, HybridCacheBuilder};
32use replay_impl::{GlobalReplayImpl, get_replay_notification_client};
33use risingwave_common::config::{
34    NoOverride, ObjectStoreConfig, extract_storage_memory_config, load_config,
35};
36use risingwave_common::system_param::reader::SystemParamsReader;
37use risingwave_hummock_trace::{
38    GlobalReplay, HummockReplay, Operation, Record, Result, TraceReader, TraceReaderImpl, USE_TRACE,
39};
40use risingwave_meta::hummock::MockHummockMetaClient;
41use risingwave_meta::hummock::test_utils::setup_compute_env;
42use risingwave_object_store::object::build_remote_object_store;
43use risingwave_storage::compaction_catalog_manager::{
44    CompactionCatalogManager, FakeRemoteTableAccessor,
45};
46use risingwave_storage::hummock::none::NoneRecentFilter;
47use risingwave_storage::hummock::{HummockStorage, SstableStore, SstableStoreConfig};
48use risingwave_storage::monitor::{CompactorMetrics, HummockStateStoreMetrics, ObjectStoreMetrics};
49use risingwave_storage::opts::StorageOpts;
50
51// use a large offset to avoid collision with real sstables
52const SST_OFFSET: u64 = 2147383647000;
53
54#[derive(Parser, Debug)]
55struct Args {
56    #[arg(short, long)]
57    path: String,
58
59    // path to config file
60    #[arg(short, long, default_value = "src/config/hummock-trace.toml")]
61    config: String,
62
63    #[arg(short, long)]
64    object_storage: String,
65
66    #[arg(short, long)]
67    use_new_object_prefix_strategy: bool,
68}
69
70#[tokio::main(flavor = "multi_thread")]
71async fn main() {
72    let args = Args::parse();
73    // disable runtime tracing when replaying
74    unsafe { std::env::set_var(USE_TRACE, "false") };
75    run_replay(args).await.unwrap();
76}
77
78async fn run_replay(args: Args) -> Result<()> {
79    let path = Path::new(&args.path);
80    let f = BufReader::new(File::open(path)?);
81    let mut reader = TraceReaderImpl::new_bincode(f)?;
82    // first record is the snapshot
83    let r: Record = reader.read().unwrap();
84    let replay_interface = create_replay_hummock(r, &args).await.unwrap();
85    let mut replayer = HummockReplay::new(reader, replay_interface);
86    replayer.run().await.unwrap();
87
88    Ok(())
89}
90
91async fn create_replay_hummock(r: Record, args: &Args) -> Result<impl GlobalReplay + use<>> {
92    let config = load_config(&args.config, NoOverride);
93    let storage_memory_config = extract_storage_memory_config(&config);
94    let system_params_reader =
95        SystemParamsReader::from(config.system.clone().into_init_system_params());
96
97    let storage_opts = Arc::new(StorageOpts::from((
98        &config,
99        &system_params_reader,
100        &storage_memory_config,
101    )));
102
103    let state_store_metrics = Arc::new(HummockStateStoreMetrics::unused());
104    let object_store_metrics = Arc::new(ObjectStoreMetrics::unused());
105
106    let compactor_metrics = Arc::new(CompactorMetrics::unused());
107
108    let object_store = build_remote_object_store(
109        &args.object_storage,
110        object_store_metrics,
111        "Hummock",
112        Arc::new(ObjectStoreConfig::default()),
113    )
114    .await;
115
116    let meta_cache = HybridCacheBuilder::new()
117        .memory(storage_opts.meta_cache_capacity_mb * (1 << 20))
118        .with_shards(storage_opts.meta_cache_shard_num)
119        .storage()
120        .build()
121        .await
122        .unwrap();
123    let block_cache = HybridCacheBuilder::new()
124        .memory(storage_opts.block_cache_capacity_mb * (1 << 20))
125        .with_shards(storage_opts.block_cache_shard_num)
126        .storage()
127        .build()
128        .await
129        .unwrap();
130
131    let sstable_store = Arc::new(SstableStore::new(SstableStoreConfig {
132        store: Arc::new(object_store),
133        path: storage_opts.data_directory.clone(),
134        prefetch_buffer_capacity: storage_opts.prefetch_buffer_capacity_mb * (1 << 20),
135        max_prefetch_block_number: storage_opts.max_prefetch_block_number,
136        recent_filter: Arc::new(NoneRecentFilter::default().into()),
137        state_store_metrics: state_store_metrics.clone(),
138        use_new_object_prefix_strategy: args.use_new_object_prefix_strategy,
139        meta_cache,
140        block_cache,
141        vector_meta_cache: CacheBuilder::new(1 << 10).build(),
142        vector_block_cache: CacheBuilder::new(1 << 10).build(),
143    }));
144
145    let (hummock_meta_client, notification_client, notifier) = {
146        let (env, hummock_manager_ref, cluster_controller_ref, worker_id) =
147            setup_compute_env(8080).await;
148        let notifier = env.notification_manager_ref();
149
150        let worker_node = cluster_controller_ref
151            .get_worker_by_id(worker_id)
152            .await
153            .unwrap()
154            .unwrap();
155
156        let notification_client = match r.operation {
157            Operation::MetaMessage(resp) => get_replay_notification_client(env, worker_node, resp),
158            _ => panic!("unexpected operation, found {:?}", r.operation),
159        };
160
161        (
162            Arc::new(MockHummockMetaClient::with_sst_offset(
163                hummock_manager_ref,
164                worker_id as _,
165                SST_OFFSET,
166            )),
167            notification_client,
168            notifier,
169        )
170    };
171
172    let storage = HummockStorage::new(
173        storage_opts,
174        sstable_store,
175        hummock_meta_client.clone(),
176        notification_client,
177        Arc::new(CompactionCatalogManager::new(Box::new(
178            FakeRemoteTableAccessor {},
179        ))),
180        state_store_metrics,
181        compactor_metrics,
182        None,
183    )
184    .await
185    .expect("fail to create a HummockStorage object");
186    let replay_interface = GlobalReplayImpl::new(storage, notifier);
187
188    Ok(replay_interface)
189}