1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use bytes::Bytes;
19use itertools::Itertools;
20use risingwave_common::catalog::TableId;
21use risingwave_common::hash::VirtualNode;
22use risingwave_common::id::WorkerId;
23use risingwave_common_service::ObserverManager;
24use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
25use risingwave_hummock_sdk::key::TableKey;
26pub use risingwave_hummock_sdk::key::{gen_key_from_bytes, gen_key_from_str};
27use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
28use risingwave_meta::controller::cluster::ClusterControllerRef;
29use risingwave_meta::hummock::test_utils::{
30 register_table_ids_to_compaction_group, setup_compute_env,
31};
32use risingwave_meta::hummock::{
33 CommitEpochInfo, HummockManagerRef, MockHummockMetaClient, NewTableFragmentInfo,
34};
35use risingwave_meta::manager::MetaSrvEnv;
36use risingwave_pb::catalog::{PbTable, Table};
37use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
38use risingwave_rpc_client::HummockMetaClient;
39use risingwave_storage::compaction_catalog_manager::{
40 CompactionCatalogManager, CompactionCatalogManagerRef,
41};
42use risingwave_storage::error::StorageResult;
43use risingwave_storage::hummock::HummockStorage;
44use risingwave_storage::hummock::backup_reader::BackupReader;
45use risingwave_storage::hummock::event_handler::HummockVersionUpdate;
46use risingwave_storage::hummock::iterator::test_utils::mock_sstable_store;
47use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion;
48use risingwave_storage::hummock::observer_manager::HummockObserverNode;
49use risingwave_storage::hummock::test_utils::*;
50use risingwave_storage::hummock::write_limiter::WriteLimiter;
51use risingwave_storage::storage_value::StorageValue;
52use risingwave_storage::store::*;
53use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
54
55use crate::mock_notification_client::get_notification_client_for_test;
56
57pub async fn prepare_first_valid_version(
58 env: MetaSrvEnv,
59 hummock_manager_ref: HummockManagerRef,
60 cluster_controller_ref: ClusterControllerRef,
61 worker_id: WorkerId,
62) -> (
63 PinnedVersion,
64 UnboundedSender<HummockVersionUpdate>,
65 UnboundedReceiver<HummockVersionUpdate>,
66) {
67 let (tx, mut rx) = unbounded_channel();
68 let notification_client = get_notification_client_for_test(
69 env,
70 hummock_manager_ref.clone(),
71 cluster_controller_ref,
72 worker_id,
73 )
74 .await;
75 let backup_manager = BackupReader::unused().await;
76 let write_limiter = WriteLimiter::unused();
77 let observer_manager = ObserverManager::new(
78 notification_client,
79 HummockObserverNode::new(
80 Arc::new(CompactionCatalogManager::default()),
81 backup_manager,
82 tx.clone(),
83 write_limiter,
84 ),
85 )
86 .await;
87 observer_manager.start().await;
88 let hummock_version = match rx.recv().await {
89 Some(HummockVersionUpdate::PinnedVersion(version)) => version,
90 _ => unreachable!("should be full version"),
91 };
92
93 (
94 PinnedVersion::new(*hummock_version, unbounded_channel().0),
95 tx,
96 rx,
97 )
98}
99
100#[async_trait::async_trait]
101pub trait TestIngestBatch: LocalStateStore {
102 async fn ingest_batch(
103 &mut self,
104 kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
105 ) -> StorageResult<usize>;
106}
107
108#[async_trait::async_trait]
109impl<S: LocalStateStore> TestIngestBatch for S {
110 async fn ingest_batch(
111 &mut self,
112 kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
113 ) -> StorageResult<usize> {
114 for (key, value) in kv_pairs {
115 match value.user_value {
116 None => self.delete(key, Bytes::new())?,
117 Some(value) => self.insert(key, value, None)?,
118 }
119 }
120 self.flush().await
121 }
122}
123
124pub async fn with_hummock_storage(
125 table_id: TableId,
126) -> (HummockStorage, Arc<MockHummockMetaClient>) {
127 let sstable_store = mock_sstable_store().await;
128 let hummock_options = Arc::new(default_opts_for_test());
129 let (env, hummock_manager_ref, cluster_ctl_ref, worker_id) = setup_compute_env(8080).await;
130 let meta_client = Arc::new(MockHummockMetaClient::new(
131 hummock_manager_ref.clone(),
132 worker_id as _,
133 ));
134
135 let hummock_storage = HummockStorage::for_test(
136 hummock_options,
137 sstable_store,
138 meta_client.clone(),
139 get_notification_client_for_test(
140 env,
141 hummock_manager_ref.clone(),
142 cluster_ctl_ref,
143 worker_id,
144 )
145 .await,
146 )
147 .await
148 .unwrap();
149
150 register_tables_with_id_for_test(
151 hummock_storage.compaction_catalog_manager_ref(),
152 &hummock_manager_ref,
153 &[table_id],
154 )
155 .await;
156
157 (hummock_storage, meta_client)
158}
159
160pub fn update_filter_key_extractor_for_table_ids(
161 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
162 table_ids: &[TableId],
163) {
164 for table_id in table_ids {
165 let mock_table = PbTable {
166 id: *table_id,
167 read_prefix_len_hint: 1,
171 maybe_vnode_count: Some(VirtualNode::COUNT_FOR_TEST as u32),
172 ..Default::default()
173 };
174 compaction_catalog_manager_ref.update(*table_id, mock_table);
175 }
176}
177
178pub async fn register_tables_with_id_for_test(
179 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
180 hummock_manager_ref: &HummockManagerRef,
181 table_ids: &[TableId],
182) {
183 update_filter_key_extractor_for_table_ids(compaction_catalog_manager_ref, table_ids);
184 register_table_ids_to_compaction_group(
185 hummock_manager_ref,
186 table_ids,
187 StaticCompactionGroupId::StateDefault,
188 )
189 .await;
190}
191
192pub fn update_filter_key_extractor_for_tables(
193 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
194 tables: &[PbTable],
195) {
196 for table in tables {
197 compaction_catalog_manager_ref.update(table.id, table.clone())
198 }
199}
200pub async fn register_tables_with_catalog_for_test(
201 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
202 hummock_manager_ref: &HummockManagerRef,
203 tables: &[Table],
204) {
205 update_filter_key_extractor_for_tables(compaction_catalog_manager_ref, tables);
206 let table_ids = tables.iter().map(|t| t.id).collect_vec();
207 register_table_ids_to_compaction_group(
208 hummock_manager_ref,
209 &table_ids,
210 StaticCompactionGroupId::StateDefault,
211 )
212 .await;
213}
214
215pub struct HummockTestEnv {
216 pub storage: HummockStorage,
217 pub manager: HummockManagerRef,
218 pub meta_client: Arc<MockHummockMetaClient>,
219}
220
221impl HummockTestEnv {
222 async fn wait_version_sync(&self) {
223 self.storage
224 .wait_version(self.manager.get_current_version().await)
225 .await
226 }
227
228 pub async fn register_table_id(&self, table_id: TableId) {
229 register_tables_with_id_for_test(
230 self.storage.compaction_catalog_manager_ref(),
231 &self.manager,
232 &[table_id],
233 )
234 .await;
235 self.wait_version_sync().await;
236 }
237
238 pub async fn register_vector_index(
239 &self,
240 table_id: TableId,
241 init_epoch: u64,
242 init_config: PbVectorIndexInit,
243 ) {
244 self.manager
245 .commit_epoch(CommitEpochInfo {
246 sstables: vec![],
247 new_table_watermarks: Default::default(),
248 sst_to_context: Default::default(),
249 new_table_fragment_infos: vec![NewTableFragmentInfo {
250 table_ids: HashSet::from_iter([table_id]),
251 }],
252 change_log_delta: Default::default(),
253 vector_index_delta: HashMap::from_iter([(
254 table_id,
255 VectorIndexDelta::Init(init_config),
256 )]),
257 tables_to_commit: HashMap::from_iter([(table_id, init_epoch)]),
258 truncate_tables: HashSet::new(),
259 })
260 .await
261 .unwrap();
262 }
263
264 pub async fn register_table(&self, table: PbTable) {
265 register_tables_with_catalog_for_test(
266 self.storage.compaction_catalog_manager_ref(),
267 &self.manager,
268 &[table],
269 )
270 .await;
271 self.wait_version_sync().await;
272 }
273
274 pub async fn commit_epoch(&self, epoch: u64) {
277 let table_ids = self
278 .manager
279 .get_current_version()
280 .await
281 .state_table_info
282 .info()
283 .keys()
284 .cloned()
285 .collect();
286 let res = self
287 .storage
288 .seal_and_sync_epoch(epoch, table_ids)
289 .await
290 .unwrap();
291 self.meta_client.commit_epoch(epoch, res).await.unwrap();
292
293 self.wait_sync_committed_version().await;
294 }
295
296 pub async fn wait_sync_committed_version(&self) {
297 let version = self.manager.get_current_version().await;
298 self.storage.wait_version(version).await;
299 }
300}
301
302pub async fn prepare_hummock_test_env() -> HummockTestEnv {
303 let sstable_store = mock_sstable_store().await;
304 let hummock_options = Arc::new(default_opts_for_test());
305 let (env, hummock_manager_ref, cluster_ctl_ref, worker_id) = setup_compute_env(8080).await;
306
307 let hummock_meta_client = Arc::new(MockHummockMetaClient::new(
308 hummock_manager_ref.clone(),
309 worker_id as _,
310 ));
311
312 let notification_client = get_notification_client_for_test(
313 env,
314 hummock_manager_ref.clone(),
315 cluster_ctl_ref,
316 worker_id,
317 )
318 .await;
319
320 let storage = HummockStorage::for_test(
321 hummock_options,
322 sstable_store,
323 hummock_meta_client.clone(),
324 notification_client,
325 )
326 .await
327 .unwrap();
328
329 HummockTestEnv {
330 storage,
331 manager: hummock_manager_ref,
332 meta_client: hummock_meta_client,
333 }
334}