1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use bytes::Bytes;
19use itertools::Itertools;
20use risingwave_common::catalog::TableId;
21use risingwave_common::hash::VirtualNode;
22use risingwave_common::id::WorkerId;
23use risingwave_common_service::ObserverManager;
24use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
25use risingwave_hummock_sdk::key::TableKey;
26pub use risingwave_hummock_sdk::key::{gen_key_from_bytes, gen_key_from_str};
27use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
28use risingwave_meta::controller::cluster::ClusterControllerRef;
29use risingwave_meta::hummock::test_utils::{
30 register_table_ids_to_compaction_group, setup_compute_env,
31};
32use risingwave_meta::hummock::{
33 CommitEpochInfo, HummockManagerRef, MockHummockMetaClient, NewTableFragmentInfo,
34};
35use risingwave_meta::manager::MetaSrvEnv;
36use risingwave_pb::catalog::{PbTable, Table};
37use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
38use risingwave_rpc_client::HummockMetaClient;
39use risingwave_storage::compaction_catalog_manager::{
40 CompactionCatalogManager, CompactionCatalogManagerRef,
41};
42use risingwave_storage::error::StorageResult;
43use risingwave_storage::hummock::HummockStorage;
44use risingwave_storage::hummock::backup_reader::BackupReader;
45use risingwave_storage::hummock::event_handler::HummockVersionUpdate;
46use risingwave_storage::hummock::iterator::test_utils::mock_sstable_store;
47use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion;
48use risingwave_storage::hummock::observer_manager::HummockObserverNode;
49use risingwave_storage::hummock::test_utils::*;
50use risingwave_storage::hummock::write_limiter::WriteLimiter;
51use risingwave_storage::storage_value::StorageValue;
52use risingwave_storage::store::*;
53use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
54
55use crate::mock_notification_client::get_notification_client_for_test;
56
57pub async fn prepare_first_valid_version(
58 env: MetaSrvEnv,
59 hummock_manager_ref: HummockManagerRef,
60 cluster_controller_ref: ClusterControllerRef,
61 worker_id: WorkerId,
62) -> (
63 PinnedVersion,
64 UnboundedSender<HummockVersionUpdate>,
65 UnboundedReceiver<HummockVersionUpdate>,
66) {
67 let (tx, mut rx) = unbounded_channel();
68 let notification_client = get_notification_client_for_test(
69 env,
70 hummock_manager_ref.clone(),
71 cluster_controller_ref,
72 worker_id,
73 )
74 .await;
75 let backup_manager = BackupReader::unused().await;
76 let write_limiter = WriteLimiter::unused();
77 let observer_manager = ObserverManager::new(
78 notification_client,
79 HummockObserverNode::new(
80 Arc::new(CompactionCatalogManager::default()),
81 backup_manager,
82 tx.clone(),
83 write_limiter,
84 ),
85 )
86 .await;
87 observer_manager.start().await;
88 let hummock_version = match rx.recv().await {
89 Some(HummockVersionUpdate::PinnedVersion(version)) => version,
90 _ => unreachable!("should be full version"),
91 };
92
93 (
94 PinnedVersion::new(*hummock_version, unbounded_channel().0),
95 tx,
96 rx,
97 )
98}
99
100#[async_trait::async_trait]
101pub trait TestIngestBatch: LocalStateStore {
102 async fn ingest_batch(
103 &mut self,
104 kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
105 ) -> StorageResult<usize>;
106}
107
108#[async_trait::async_trait]
109impl<S: LocalStateStore> TestIngestBatch for S {
110 async fn ingest_batch(
111 &mut self,
112 kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
113 ) -> StorageResult<usize> {
114 for (key, value) in kv_pairs {
115 match value.user_value {
116 None => self.delete(key, Bytes::new())?,
117 Some(value) => self.insert(key, value, None)?,
118 }
119 }
120 self.flush().await
121 }
122}
123
124pub async fn with_hummock_storage(
125 table_id: TableId,
126) -> (HummockStorage, Arc<MockHummockMetaClient>) {
127 let sstable_store = mock_sstable_store().await;
128 let hummock_options = Arc::new(default_opts_for_test());
129 let (env, hummock_manager_ref, cluster_ctl_ref, worker_id) = setup_compute_env(8080).await;
130 let meta_client = Arc::new(MockHummockMetaClient::new(
131 hummock_manager_ref.clone(),
132 worker_id as _,
133 ));
134
135 let hummock_storage = HummockStorage::for_test(
136 hummock_options,
137 sstable_store,
138 meta_client.clone(),
139 get_notification_client_for_test(
140 env,
141 hummock_manager_ref.clone(),
142 cluster_ctl_ref,
143 worker_id,
144 )
145 .await,
146 )
147 .await
148 .unwrap();
149
150 register_tables_with_id_for_test(
151 hummock_storage.compaction_catalog_manager_ref(),
152 &hummock_manager_ref,
153 &[table_id],
154 )
155 .await;
156
157 (hummock_storage, meta_client)
158}
159
160pub fn update_filter_key_extractor_for_table_ids(
161 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
162 table_ids: &[TableId],
163) {
164 for table_id in table_ids {
165 let mock_table = PbTable {
166 id: *table_id,
167 read_prefix_len_hint: 0,
168 maybe_vnode_count: Some(VirtualNode::COUNT_FOR_TEST as u32),
169 ..Default::default()
170 };
171 compaction_catalog_manager_ref.update(*table_id, mock_table);
172 }
173}
174
175pub async fn register_tables_with_id_for_test(
176 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
177 hummock_manager_ref: &HummockManagerRef,
178 table_ids: &[TableId],
179) {
180 update_filter_key_extractor_for_table_ids(compaction_catalog_manager_ref, table_ids);
181 register_table_ids_to_compaction_group(
182 hummock_manager_ref,
183 table_ids,
184 StaticCompactionGroupId::StateDefault.into(),
185 )
186 .await;
187}
188
189pub fn update_filter_key_extractor_for_tables(
190 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
191 tables: &[PbTable],
192) {
193 for table in tables {
194 compaction_catalog_manager_ref.update(table.id, table.clone())
195 }
196}
197pub async fn register_tables_with_catalog_for_test(
198 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
199 hummock_manager_ref: &HummockManagerRef,
200 tables: &[Table],
201) {
202 update_filter_key_extractor_for_tables(compaction_catalog_manager_ref, tables);
203 let table_ids = tables.iter().map(|t| t.id).collect_vec();
204 register_table_ids_to_compaction_group(
205 hummock_manager_ref,
206 &table_ids,
207 StaticCompactionGroupId::StateDefault.into(),
208 )
209 .await;
210}
211
212pub struct HummockTestEnv {
213 pub storage: HummockStorage,
214 pub manager: HummockManagerRef,
215 pub meta_client: Arc<MockHummockMetaClient>,
216}
217
218impl HummockTestEnv {
219 async fn wait_version_sync(&self) {
220 self.storage
221 .wait_version(self.manager.get_current_version().await)
222 .await
223 }
224
225 pub async fn register_table_id(&self, table_id: TableId) {
226 register_tables_with_id_for_test(
227 self.storage.compaction_catalog_manager_ref(),
228 &self.manager,
229 &[table_id],
230 )
231 .await;
232 self.wait_version_sync().await;
233 }
234
235 pub async fn register_vector_index(
236 &self,
237 table_id: TableId,
238 init_epoch: u64,
239 init_config: PbVectorIndexInit,
240 ) {
241 self.manager
242 .commit_epoch(CommitEpochInfo {
243 sstables: vec![],
244 new_table_watermarks: Default::default(),
245 sst_to_context: Default::default(),
246 new_table_fragment_infos: vec![NewTableFragmentInfo {
247 table_ids: HashSet::from_iter([table_id]),
248 }],
249 change_log_delta: Default::default(),
250 vector_index_delta: HashMap::from_iter([(
251 table_id,
252 VectorIndexDelta::Init(init_config),
253 )]),
254 tables_to_commit: HashMap::from_iter([(table_id, init_epoch)]),
255 truncate_tables: HashSet::new(),
256 })
257 .await
258 .unwrap();
259 }
260
261 pub async fn register_table(&self, table: PbTable) {
262 register_tables_with_catalog_for_test(
263 self.storage.compaction_catalog_manager_ref(),
264 &self.manager,
265 &[table],
266 )
267 .await;
268 self.wait_version_sync().await;
269 }
270
271 pub async fn commit_epoch(&self, epoch: u64) {
274 let table_ids = self
275 .manager
276 .get_current_version()
277 .await
278 .state_table_info
279 .info()
280 .keys()
281 .cloned()
282 .collect();
283 let res = self
284 .storage
285 .seal_and_sync_epoch(epoch, table_ids)
286 .await
287 .unwrap();
288 self.meta_client.commit_epoch(epoch, res).await.unwrap();
289
290 self.wait_sync_committed_version().await;
291 }
292
293 pub async fn wait_sync_committed_version(&self) {
294 let version = self.manager.get_current_version().await;
295 self.storage.wait_version(version).await;
296 }
297}
298
299pub async fn prepare_hummock_test_env() -> HummockTestEnv {
300 let sstable_store = mock_sstable_store().await;
301 let hummock_options = Arc::new(default_opts_for_test());
302 let (env, hummock_manager_ref, cluster_ctl_ref, worker_id) = setup_compute_env(8080).await;
303
304 let hummock_meta_client = Arc::new(MockHummockMetaClient::new(
305 hummock_manager_ref.clone(),
306 worker_id as _,
307 ));
308
309 let notification_client = get_notification_client_for_test(
310 env,
311 hummock_manager_ref.clone(),
312 cluster_ctl_ref,
313 worker_id,
314 )
315 .await;
316
317 let storage = HummockStorage::for_test(
318 hummock_options,
319 sstable_store,
320 hummock_meta_client.clone(),
321 notification_client,
322 )
323 .await
324 .unwrap();
325
326 HummockTestEnv {
327 storage,
328 manager: hummock_manager_ref,
329 meta_client: hummock_meta_client,
330 }
331}