1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use bytes::Bytes;
19use itertools::Itertools;
20use risingwave_common::catalog::TableId;
21use risingwave_common::hash::VirtualNode;
22use risingwave_common_service::ObserverManager;
23use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
24use risingwave_hummock_sdk::key::TableKey;
25pub use risingwave_hummock_sdk::key::{gen_key_from_bytes, gen_key_from_str};
26use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
27use risingwave_meta::controller::cluster::ClusterControllerRef;
28use risingwave_meta::hummock::test_utils::{
29 register_table_ids_to_compaction_group, setup_compute_env,
30};
31use risingwave_meta::hummock::{
32 CommitEpochInfo, HummockManagerRef, MockHummockMetaClient, NewTableFragmentInfo,
33};
34use risingwave_meta::manager::MetaSrvEnv;
35use risingwave_pb::catalog::{PbTable, Table};
36use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
37use risingwave_rpc_client::HummockMetaClient;
38use risingwave_storage::compaction_catalog_manager::{
39 CompactionCatalogManager, CompactionCatalogManagerRef,
40};
41use risingwave_storage::error::StorageResult;
42use risingwave_storage::hummock::HummockStorage;
43use risingwave_storage::hummock::backup_reader::BackupReader;
44use risingwave_storage::hummock::event_handler::HummockVersionUpdate;
45use risingwave_storage::hummock::iterator::test_utils::mock_sstable_store;
46use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion;
47use risingwave_storage::hummock::observer_manager::HummockObserverNode;
48use risingwave_storage::hummock::test_utils::*;
49use risingwave_storage::hummock::write_limiter::WriteLimiter;
50use risingwave_storage::storage_value::StorageValue;
51use risingwave_storage::store::*;
52use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
53
54use crate::mock_notification_client::get_notification_client_for_test;
55
56pub async fn prepare_first_valid_version(
57 env: MetaSrvEnv,
58 hummock_manager_ref: HummockManagerRef,
59 cluster_controller_ref: ClusterControllerRef,
60 worker_id: i32,
61) -> (
62 PinnedVersion,
63 UnboundedSender<HummockVersionUpdate>,
64 UnboundedReceiver<HummockVersionUpdate>,
65) {
66 let (tx, mut rx) = unbounded_channel();
67 let notification_client = get_notification_client_for_test(
68 env,
69 hummock_manager_ref.clone(),
70 cluster_controller_ref,
71 worker_id,
72 )
73 .await;
74 let backup_manager = BackupReader::unused().await;
75 let write_limiter = WriteLimiter::unused();
76 let observer_manager = ObserverManager::new(
77 notification_client,
78 HummockObserverNode::new(
79 Arc::new(CompactionCatalogManager::default()),
80 backup_manager,
81 tx.clone(),
82 write_limiter,
83 ),
84 )
85 .await;
86 observer_manager.start().await;
87 let hummock_version = match rx.recv().await {
88 Some(HummockVersionUpdate::PinnedVersion(version)) => version,
89 _ => unreachable!("should be full version"),
90 };
91
92 (
93 PinnedVersion::new(*hummock_version, unbounded_channel().0),
94 tx,
95 rx,
96 )
97}
98
99#[async_trait::async_trait]
100pub trait TestIngestBatch: LocalStateStore {
101 async fn ingest_batch(
102 &mut self,
103 kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
104 ) -> StorageResult<usize>;
105}
106
107#[async_trait::async_trait]
108impl<S: LocalStateStore> TestIngestBatch for S {
109 async fn ingest_batch(
110 &mut self,
111 kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
112 ) -> StorageResult<usize> {
113 for (key, value) in kv_pairs {
114 match value.user_value {
115 None => self.delete(key, Bytes::new())?,
116 Some(value) => self.insert(key, value, None)?,
117 }
118 }
119 self.flush().await
120 }
121}
122
123pub async fn with_hummock_storage(
124 table_id: TableId,
125) -> (HummockStorage, Arc<MockHummockMetaClient>) {
126 let sstable_store = mock_sstable_store().await;
127 let hummock_options = Arc::new(default_opts_for_test());
128 let (env, hummock_manager_ref, cluster_ctl_ref, worker_id) = setup_compute_env(8080).await;
129 let meta_client = Arc::new(MockHummockMetaClient::new(
130 hummock_manager_ref.clone(),
131 worker_id as _,
132 ));
133
134 let hummock_storage = HummockStorage::for_test(
135 hummock_options,
136 sstable_store,
137 meta_client.clone(),
138 get_notification_client_for_test(
139 env,
140 hummock_manager_ref.clone(),
141 cluster_ctl_ref,
142 worker_id as _,
143 )
144 .await,
145 )
146 .await
147 .unwrap();
148
149 register_tables_with_id_for_test(
150 hummock_storage.compaction_catalog_manager_ref(),
151 &hummock_manager_ref,
152 &[table_id.table_id()],
153 )
154 .await;
155
156 (hummock_storage, meta_client)
157}
158
159pub fn update_filter_key_extractor_for_table_ids(
160 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
161 table_ids: &[u32],
162) {
163 for table_id in table_ids {
164 let mock_table = PbTable {
165 id: *table_id,
166 read_prefix_len_hint: 0,
167 maybe_vnode_count: Some(VirtualNode::COUNT_FOR_TEST as u32),
168 ..Default::default()
169 };
170 compaction_catalog_manager_ref.update(*table_id, mock_table);
171 }
172}
173
174pub async fn register_tables_with_id_for_test(
175 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
176 hummock_manager_ref: &HummockManagerRef,
177 table_ids: &[u32],
178) {
179 update_filter_key_extractor_for_table_ids(compaction_catalog_manager_ref, table_ids);
180 register_table_ids_to_compaction_group(
181 hummock_manager_ref,
182 table_ids,
183 StaticCompactionGroupId::StateDefault.into(),
184 )
185 .await;
186}
187
188pub fn update_filter_key_extractor_for_tables(
189 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
190 tables: &[PbTable],
191) {
192 for table in tables {
193 compaction_catalog_manager_ref.update(table.id, table.clone())
194 }
195}
196pub async fn register_tables_with_catalog_for_test(
197 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
198 hummock_manager_ref: &HummockManagerRef,
199 tables: &[Table],
200) {
201 update_filter_key_extractor_for_tables(compaction_catalog_manager_ref, tables);
202 let table_ids = tables.iter().map(|t| t.id).collect_vec();
203 register_table_ids_to_compaction_group(
204 hummock_manager_ref,
205 &table_ids,
206 StaticCompactionGroupId::StateDefault.into(),
207 )
208 .await;
209}
210
211pub struct HummockTestEnv {
212 pub storage: HummockStorage,
213 pub manager: HummockManagerRef,
214 pub meta_client: Arc<MockHummockMetaClient>,
215}
216
217impl HummockTestEnv {
218 async fn wait_version_sync(&self) {
219 self.storage
220 .wait_version(self.manager.get_current_version().await)
221 .await
222 }
223
224 pub async fn register_table_id(&self, table_id: TableId) {
225 register_tables_with_id_for_test(
226 self.storage.compaction_catalog_manager_ref(),
227 &self.manager,
228 &[table_id.table_id()],
229 )
230 .await;
231 self.wait_version_sync().await;
232 }
233
234 pub async fn register_vector_index(
235 &self,
236 table_id: TableId,
237 init_epoch: u64,
238 init_config: PbVectorIndexInit,
239 ) {
240 self.manager
241 .commit_epoch(CommitEpochInfo {
242 sstables: vec![],
243 new_table_watermarks: Default::default(),
244 sst_to_context: Default::default(),
245 new_table_fragment_infos: vec![NewTableFragmentInfo {
246 table_ids: HashSet::from_iter([table_id]),
247 }],
248 change_log_delta: Default::default(),
249 vector_index_delta: HashMap::from_iter([(
250 table_id,
251 VectorIndexDelta::Init(init_config),
252 )]),
253 tables_to_commit: HashMap::from_iter([(table_id, init_epoch)]),
254 })
255 .await
256 .unwrap();
257 }
258
259 pub async fn register_table(&self, table: PbTable) {
260 register_tables_with_catalog_for_test(
261 self.storage.compaction_catalog_manager_ref(),
262 &self.manager,
263 &[table],
264 )
265 .await;
266 self.wait_version_sync().await;
267 }
268
269 pub async fn commit_epoch(&self, epoch: u64) {
272 let table_ids = self
273 .manager
274 .get_current_version()
275 .await
276 .state_table_info
277 .info()
278 .keys()
279 .cloned()
280 .collect();
281 let res = self
282 .storage
283 .seal_and_sync_epoch(epoch, table_ids)
284 .await
285 .unwrap();
286 self.meta_client.commit_epoch(epoch, res).await.unwrap();
287
288 self.wait_sync_committed_version().await;
289 }
290
291 pub async fn wait_sync_committed_version(&self) {
292 let version = self.manager.get_current_version().await;
293 self.storage.wait_version(version).await;
294 }
295}
296
297pub async fn prepare_hummock_test_env() -> HummockTestEnv {
298 let sstable_store = mock_sstable_store().await;
299 let hummock_options = Arc::new(default_opts_for_test());
300 let (env, hummock_manager_ref, cluster_ctl_ref, worker_id) = setup_compute_env(8080).await;
301
302 let hummock_meta_client = Arc::new(MockHummockMetaClient::new(
303 hummock_manager_ref.clone(),
304 worker_id as _,
305 ));
306
307 let notification_client = get_notification_client_for_test(
308 env,
309 hummock_manager_ref.clone(),
310 cluster_ctl_ref,
311 worker_id as _,
312 )
313 .await;
314
315 let storage = HummockStorage::for_test(
316 hummock_options,
317 sstable_store,
318 hummock_meta_client.clone(),
319 notification_client,
320 )
321 .await
322 .unwrap();
323
324 HummockTestEnv {
325 storage,
326 manager: hummock_manager_ref,
327 meta_client: hummock_meta_client,
328 }
329}