1use std::sync::Arc;
16
17use bytes::Bytes;
18use itertools::Itertools;
19use risingwave_common::catalog::TableId;
20use risingwave_common::hash::VirtualNode;
21use risingwave_common_service::ObserverManager;
22use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
23use risingwave_hummock_sdk::key::TableKey;
24pub use risingwave_hummock_sdk::key::{gen_key_from_bytes, gen_key_from_str};
25use risingwave_meta::controller::cluster::ClusterControllerRef;
26use risingwave_meta::hummock::test_utils::{
27 register_table_ids_to_compaction_group, setup_compute_env,
28};
29use risingwave_meta::hummock::{HummockManagerRef, MockHummockMetaClient};
30use risingwave_meta::manager::MetaSrvEnv;
31use risingwave_pb::catalog::{PbTable, Table};
32use risingwave_rpc_client::HummockMetaClient;
33use risingwave_storage::compaction_catalog_manager::{
34 CompactionCatalogManager, CompactionCatalogManagerRef,
35};
36use risingwave_storage::error::StorageResult;
37use risingwave_storage::hummock::HummockStorage;
38use risingwave_storage::hummock::backup_reader::BackupReader;
39use risingwave_storage::hummock::event_handler::HummockVersionUpdate;
40use risingwave_storage::hummock::iterator::test_utils::mock_sstable_store;
41use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion;
42use risingwave_storage::hummock::observer_manager::HummockObserverNode;
43use risingwave_storage::hummock::test_utils::*;
44use risingwave_storage::hummock::write_limiter::WriteLimiter;
45use risingwave_storage::storage_value::StorageValue;
46use risingwave_storage::store::*;
47use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
48
49use crate::mock_notification_client::get_notification_client_for_test;
50
51pub async fn prepare_first_valid_version(
52 env: MetaSrvEnv,
53 hummock_manager_ref: HummockManagerRef,
54 cluster_controller_ref: ClusterControllerRef,
55 worker_id: i32,
56) -> (
57 PinnedVersion,
58 UnboundedSender<HummockVersionUpdate>,
59 UnboundedReceiver<HummockVersionUpdate>,
60) {
61 let (tx, mut rx) = unbounded_channel();
62 let notification_client = get_notification_client_for_test(
63 env,
64 hummock_manager_ref.clone(),
65 cluster_controller_ref,
66 worker_id,
67 )
68 .await;
69 let backup_manager = BackupReader::unused().await;
70 let write_limiter = WriteLimiter::unused();
71 let observer_manager = ObserverManager::new(
72 notification_client,
73 HummockObserverNode::new(
74 Arc::new(CompactionCatalogManager::default()),
75 backup_manager,
76 tx.clone(),
77 write_limiter,
78 ),
79 )
80 .await;
81 observer_manager.start().await;
82 let hummock_version = match rx.recv().await {
83 Some(HummockVersionUpdate::PinnedVersion(version)) => version,
84 _ => unreachable!("should be full version"),
85 };
86
87 (
88 PinnedVersion::new(*hummock_version, unbounded_channel().0),
89 tx,
90 rx,
91 )
92}
93
94#[async_trait::async_trait]
95pub trait TestIngestBatch: LocalStateStore {
96 async fn ingest_batch(
97 &mut self,
98 kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
99 write_options: WriteOptions,
100 ) -> StorageResult<usize>;
101}
102
103#[async_trait::async_trait]
104impl<S: LocalStateStore> TestIngestBatch for S {
105 async fn ingest_batch(
106 &mut self,
107 kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
108 write_options: WriteOptions,
109 ) -> StorageResult<usize> {
110 assert_eq!(self.epoch(), write_options.epoch);
111 for (key, value) in kv_pairs {
112 match value.user_value {
113 None => self.delete(key, Bytes::new())?,
114 Some(value) => self.insert(key, value, None)?,
115 }
116 }
117 self.flush().await
118 }
119}
120
121pub async fn with_hummock_storage(
122 table_id: TableId,
123) -> (HummockStorage, Arc<MockHummockMetaClient>) {
124 let sstable_store = mock_sstable_store().await;
125 let hummock_options = Arc::new(default_opts_for_test());
126 let (env, hummock_manager_ref, cluster_ctl_ref, worker_id) = setup_compute_env(8080).await;
127 let meta_client = Arc::new(MockHummockMetaClient::new(
128 hummock_manager_ref.clone(),
129 worker_id as _,
130 ));
131
132 let hummock_storage = HummockStorage::for_test(
133 hummock_options,
134 sstable_store,
135 meta_client.clone(),
136 get_notification_client_for_test(
137 env,
138 hummock_manager_ref.clone(),
139 cluster_ctl_ref,
140 worker_id as _,
141 )
142 .await,
143 )
144 .await
145 .unwrap();
146
147 register_tables_with_id_for_test(
148 hummock_storage.compaction_catalog_manager_ref(),
149 &hummock_manager_ref,
150 &[table_id.table_id()],
151 )
152 .await;
153
154 (hummock_storage, meta_client)
155}
156
157pub fn update_filter_key_extractor_for_table_ids(
158 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
159 table_ids: &[u32],
160) {
161 for table_id in table_ids {
162 let mock_table = PbTable {
163 id: *table_id,
164 read_prefix_len_hint: 0,
165 maybe_vnode_count: Some(VirtualNode::COUNT_FOR_TEST as u32),
166 ..Default::default()
167 };
168 compaction_catalog_manager_ref.update(*table_id, mock_table);
169 }
170}
171
172pub async fn register_tables_with_id_for_test(
173 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
174 hummock_manager_ref: &HummockManagerRef,
175 table_ids: &[u32],
176) {
177 update_filter_key_extractor_for_table_ids(compaction_catalog_manager_ref, table_ids);
178 register_table_ids_to_compaction_group(
179 hummock_manager_ref,
180 table_ids,
181 StaticCompactionGroupId::StateDefault.into(),
182 )
183 .await;
184}
185
186pub fn update_filter_key_extractor_for_tables(
187 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
188 tables: &[PbTable],
189) {
190 for table in tables {
191 compaction_catalog_manager_ref.update(table.id, table.clone())
192 }
193}
194pub async fn register_tables_with_catalog_for_test(
195 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
196 hummock_manager_ref: &HummockManagerRef,
197 tables: &[Table],
198) {
199 update_filter_key_extractor_for_tables(compaction_catalog_manager_ref, tables);
200 let table_ids = tables.iter().map(|t| t.id).collect_vec();
201 register_table_ids_to_compaction_group(
202 hummock_manager_ref,
203 &table_ids,
204 StaticCompactionGroupId::StateDefault.into(),
205 )
206 .await;
207}
208
209pub struct HummockTestEnv {
210 pub storage: HummockStorage,
211 pub manager: HummockManagerRef,
212 pub meta_client: Arc<MockHummockMetaClient>,
213}
214
215impl HummockTestEnv {
216 async fn wait_version_sync(&self) {
217 self.storage
218 .wait_version(self.manager.get_current_version().await)
219 .await
220 }
221
222 pub async fn register_table_id(&self, table_id: TableId) {
223 register_tables_with_id_for_test(
224 self.storage.compaction_catalog_manager_ref(),
225 &self.manager,
226 &[table_id.table_id()],
227 )
228 .await;
229 self.wait_version_sync().await;
230 }
231
232 pub async fn register_table(&self, table: PbTable) {
233 register_tables_with_catalog_for_test(
234 self.storage.compaction_catalog_manager_ref(),
235 &self.manager,
236 &[table],
237 )
238 .await;
239 self.wait_version_sync().await;
240 }
241
242 pub async fn commit_epoch(&self, epoch: u64) {
245 let table_ids = self
246 .manager
247 .get_current_version()
248 .await
249 .state_table_info
250 .info()
251 .keys()
252 .cloned()
253 .collect();
254 let res = self
255 .storage
256 .seal_and_sync_epoch(epoch, table_ids)
257 .await
258 .unwrap();
259 self.meta_client.commit_epoch(epoch, res).await.unwrap();
260
261 self.wait_sync_committed_version().await;
262 }
263
264 pub async fn wait_sync_committed_version(&self) {
265 let version = self.manager.get_current_version().await;
266 self.storage.wait_version(version).await;
267 }
268}
269
270pub async fn prepare_hummock_test_env() -> HummockTestEnv {
271 let sstable_store = mock_sstable_store().await;
272 let hummock_options = Arc::new(default_opts_for_test());
273 let (env, hummock_manager_ref, cluster_ctl_ref, worker_id) = setup_compute_env(8080).await;
274
275 let hummock_meta_client = Arc::new(MockHummockMetaClient::new(
276 hummock_manager_ref.clone(),
277 worker_id as _,
278 ));
279
280 let notification_client = get_notification_client_for_test(
281 env,
282 hummock_manager_ref.clone(),
283 cluster_ctl_ref,
284 worker_id as _,
285 )
286 .await;
287
288 let storage = HummockStorage::for_test(
289 hummock_options,
290 sstable_store,
291 hummock_meta_client.clone(),
292 notification_client,
293 )
294 .await
295 .unwrap();
296
297 HummockTestEnv {
298 storage,
299 manager: hummock_manager_ref,
300 meta_client: hummock_meta_client,
301 }
302}