use std::collections::{BTreeMap, HashSet};
use std::net::SocketAddr;
use std::ops::Bound;
use std::pin::Pin;
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::Duration;

use anyhow::anyhow;
use bytes::{BufMut, Bytes, BytesMut};
use clap::Parser;
use foyer::Hint;
use risingwave_common::catalog::TableId;
use risingwave_common::config::{
    MetaConfig, NoOverride, extract_storage_memory_config, load_config,
};
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_hummock_sdk::key::TableKey;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, FIRST_VERSION_ID, HummockEpoch, HummockReadEpoch, HummockVersionId,
};
use risingwave_pb::common::WorkerType;
use risingwave_rpc_client::{HummockMetaClient, MetaClient};
use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
use risingwave_storage::hummock::{CachePolicy, HummockStorage};
use risingwave_storage::monitor::{
    CompactorMetrics, HummockMetrics, HummockStateStoreMetrics, MonitoredStateStore,
    MonitoredStorageMetrics, ObjectStoreMetrics,
};
use risingwave_storage::opts::StorageOpts;
use risingwave_storage::store::{NewReadSnapshotOptions, ReadOptions, StateStoreRead};
use risingwave_storage::{StateStore, StateStoreImpl, StateStoreIter};

use crate::CompactionTestOpts;

const SST_ID_SHIFT_COUNT: u32 = 1_000_000;
const CHECKPOINT_FREQ_FOR_REPLAY: u64 = 99_999_999;

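/// Counters for the replay: `num_expect_check` tracks how many compaction
/// checks were expected, `num_uncheck` how many were skipped because
/// compaction produced no new version.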
struct CompactionTestMetrics {
    num_expect_check: u64,
    num_uncheck: u64,
}

impl CompactionTestMetrics {
    fn new() -> Self {
        Self {
            num_expect_check: 0,
            num_uncheck: 0,
        }
    }
}

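/// Entry point of the compaction test.
///
/// Boots an embedded meta node and a compactor thread, copies metadata and
/// version deltas from the original cluster at 127.0.0.1:5690, then replays
/// the deltas and checks compaction results on a dedicated thread.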
pub async fn compaction_test_main(
    _listen_addr: SocketAddr,
    advertise_addr: HostAddr,
    opts: CompactionTestOpts,
) -> anyhow::Result<()> {
    let meta_listen_addr = opts
        .meta_address
        .strip_prefix("http://")
        .unwrap()
        .to_owned();

    let _meta_handle = tokio::spawn(start_meta_node(
        meta_listen_addr.clone(),
        opts.state_store.clone(),
        opts.config_path_for_meta.clone(),
    ));

    // Wait for the embedded meta node to come up before other nodes register.
    tokio::time::sleep(Duration::from_secs(1)).await;
    tracing::info!("Started embedded Meta");

    let (compactor_thrd, compactor_shutdown_tx) = start_compactor_thread(
        opts.meta_address.clone(),
        advertise_addr.to_string(),
        opts.config_path.clone(),
    );

    let original_meta_endpoint = "http://127.0.0.1:5690";
    let mut table_id: u32 = opts.table_id;

    init_metadata_for_replay(
        original_meta_endpoint,
        &opts.meta_address,
        &advertise_addr,
        opts.ci_mode,
        &mut table_id,
    )
    .await?;

    assert_ne!(0, table_id, "Invalid table_id for correctness checking");

    let version_deltas = pull_version_deltas(original_meta_endpoint, &advertise_addr).await?;

    tracing::info!(
        "Pulled delta logs from Meta: len(logs): {}",
        version_deltas.len()
    );

    let replay_thrd = start_replay_thread(opts, table_id, version_deltas);
    replay_thrd.join().unwrap();
    compactor_shutdown_tx.send(()).unwrap();
    compactor_thrd.join().unwrap();
    Ok(())
}

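/// Starts an embedded meta node with an in-memory backend.
///
/// The loaded config must set the checkpoint frequency to
/// `CHECKPOINT_FREQ_FOR_REPLAY` and enable deterministic compaction, because
/// the replay drives all version changes itself.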
pub async fn start_meta_node(listen_addr: String, state_store: String, config_path: String) {
    let meta_opts = risingwave_meta_node::MetaNodeOpts::parse_from([
        "meta-node",
        "--listen-addr",
        &listen_addr,
        "--advertise-addr",
        &listen_addr,
        "--backend",
        "mem",
        "--state-store",
        &state_store,
        "--config-path",
        &config_path,
    ]);
    let config = load_config(&meta_opts.config_path, &meta_opts);
    // The replay must not be interleaved with periodic checkpoints, so the
    // config is expected to push the checkpoint frequency out of reach and
    // to enable deterministic compaction.
    assert_eq!(
        CHECKPOINT_FREQ_FOR_REPLAY,
        config.system.checkpoint_frequency.unwrap()
    );
    assert!(
        config.meta.enable_compaction_deterministic,
        "enable_compaction_deterministic should be set"
    );

    risingwave_meta_node::start(meta_opts, CancellationToken::new()).await
}

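/// Starts a compactor node on 127.0.0.1:5550, registered against the embedded
/// meta node.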
async fn start_compactor_node(
    meta_rpc_endpoint: String,
    advertise_addr: String,
    config_path: String,
) {
    let opts = risingwave_compactor::CompactorOpts::parse_from([
        "compactor-node",
        "--listen-addr",
        "127.0.0.1:5550",
        "--advertise-addr",
        &advertise_addr,
        "--meta-address",
        &meta_rpc_endpoint,
        "--config-path",
        &config_path,
    ]);
    risingwave_compactor::start(opts, CancellationToken::new()).await
}

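/// Runs the compactor node on a dedicated OS thread with its own multi-thread
/// runtime. Sending `()` through the returned channel shuts the thread down.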
pub fn start_compactor_thread(
    meta_endpoint: String,
    advertise_addr: String,
    config_path: String,
) -> (JoinHandle<()>, std::sync::mpsc::Sender<()>) {
    let (tx, rx) = std::sync::mpsc::channel();
    let compact_func = move || {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime.block_on(async {
            tokio::spawn(async {
                tracing::info!("Starting compactor node");
                start_compactor_node(meta_endpoint, advertise_addr, config_path).await
            });
            // Keep the runtime alive until a shutdown signal arrives.
            rx.recv().unwrap();
        });
    };

    (std::thread::spawn(compact_func), tx)
}

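/// Runs [`start_replay`] on a dedicated OS thread with a current-thread
/// runtime, keeping it isolated from the embedded meta node's runtime.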
fn start_replay_thread(
    opts: CompactionTestOpts,
    table_id: u32,
    version_deltas: Vec<HummockVersionDelta>,
) -> JoinHandle<()> {
    let replay_func = move || {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime
            .block_on(start_replay(opts, table_id, version_deltas))
            .expect("replay error occurred");
    };

    std::thread::spawn(replay_func)
}

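/// Copies the table catalog from the original cluster into the embedded meta
/// node and pre-allocates an object id range for replayed SSTs. In CI mode the
/// table to check is fixed to `nexmark_q7`.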
async fn init_metadata_for_replay(
    cluster_meta_endpoint: &str,
    new_meta_endpoint: &str,
    advertise_addr: &HostAddr,
    ci_mode: bool,
    table_id: &mut u32,
) -> anyhow::Result<()> {
    // Wait for the original cluster's meta service to be ready before registering.
    tokio::time::sleep(Duration::from_secs(2)).await;

    let meta_config = MetaConfig::default();
    let meta_client: MetaClient;
    tokio::select! {
        _ = tokio::signal::ctrl_c() => {
            tracing::info!("Ctrl+C received, now exiting");
            std::process::exit(0);
        },
        ret = MetaClient::register_new(cluster_meta_endpoint.parse()?, WorkerType::RiseCtl, advertise_addr, Default::default(), Arc::new(meta_config.clone())) => {
            (meta_client, _) = ret;
        },
    }
    let worker_id = meta_client.worker_id();
    tracing::info!("Assigned init worker id {}", worker_id);
    meta_client.activate(advertise_addr).await.unwrap();

    let tables = meta_client.risectl_list_state_tables().await?;

    let (new_meta_client, _) = MetaClient::register_new(
        new_meta_endpoint.parse()?,
        WorkerType::RiseCtl,
        advertise_addr,
        Default::default(),
        Arc::new(meta_config.clone()),
    )
    .await;
    new_meta_client.activate(advertise_addr).await.unwrap();
    if ci_mode {
        let table_to_check = tables.iter().find(|t| t.name == "nexmark_q7").unwrap();
        *table_id = table_to_check.id;
    }

    new_meta_client
        .init_metadata_for_replay(tables, vec![])
        .await?;

    // Shift the object id range so replayed SSTs cannot clash with freshly minted ids.
    let _ = new_meta_client
        .get_new_object_ids(SST_ID_SHIFT_COUNT)
        .await?;

    tracing::info!("Finished initializing the new Meta");
    Ok(())
}

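/// Registers a temporary worker against the original cluster and pulls its
/// full hummock version delta log, keeping a heartbeat loop alive while the
/// request is in flight.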
async fn pull_version_deltas(
    cluster_meta_endpoint: &str,
    advertise_addr: &HostAddr,
) -> anyhow::Result<Vec<HummockVersionDelta>> {
    let (meta_client, _) = MetaClient::register_new(
        cluster_meta_endpoint.parse()?,
        WorkerType::RiseCtl,
        advertise_addr,
        Default::default(),
        Arc::new(MetaConfig::default()),
    )
    .await;
    let worker_id = meta_client.worker_id();
    tracing::info!("Assigned pull worker id {}", worker_id);
    meta_client.activate(advertise_addr).await.unwrap();

    let (handle, shutdown_tx) =
        MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_millis(1000));
    let res = meta_client
        .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
        .await
        .unwrap();

    if let Err(err) = shutdown_tx.send(()) {
        tracing::warn!("Failed to send shutdown to heartbeat task: {:?}", err);
    }
    handle.await?;
    tracing::info!("Shutdown the pull worker");
    Ok(res)
}

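/// Replays the pulled version deltas against the embedded meta node.
///
/// Every `num_trigger_frequency` replayed deltas it opens iterators over the
/// table under check, triggers deterministic compaction for the modified
/// compaction groups, and spawns a task comparing the data before and after
/// compaction.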
async fn start_replay(
    opts: CompactionTestOpts,
    table_to_check: u32,
    version_delta_logs: Vec<HummockVersionDelta>,
) -> anyhow::Result<()> {
    let advertise_addr = "127.0.0.1:7770".parse().unwrap();
    tracing::info!(
        "Start to replay. Advertise address is {}, Table id {}",
        advertise_addr,
        table_to_check
    );

    let mut metric = CompactionTestMetrics::new();
    let config = load_config(&opts.config_path_for_meta, NoOverride);
    tracing::info!(
        "Starting replay with config {:?} and opts {:?}",
        config,
        opts
    );

    let (meta_client, system_params) = MetaClient::register_new(
        opts.meta_address.parse()?,
        WorkerType::RiseCtl,
        &advertise_addr,
        Default::default(),
        Arc::new(config.meta.clone()),
    )
    .await;
    let worker_id = meta_client.worker_id();
    tracing::info!("Assigned replay worker id {}", worker_id);
    meta_client.activate(&advertise_addr).await.unwrap();

    let sub_tasks = vec![MetaClient::start_heartbeat_loop(
        meta_client.clone(),
        Duration::from_millis(1000),
    )];

    // Stop the embedded meta from committing epochs on its own; the replay
    // drives all version changes. The fresh version must be empty.
    let latest_version = meta_client.disable_commit_epoch().await?;
    assert_eq!(FIRST_VERSION_ID, latest_version.id);
    for level in latest_version.levels.values() {
        level.levels.iter().for_each(|lvl| {
            assert!(lvl.table_infos.is_empty());
            assert_eq!(0, lvl.total_file_size);
        });
    }

    let storage_memory_config = extract_storage_memory_config(&config);
    let storage_opts = Arc::new(StorageOpts::from((
        &config,
        &system_params,
        &storage_memory_config,
    )));
    let hummock = create_hummock_store_with_metrics(&meta_client, storage_opts, &opts).await?;

    let mut modified_compaction_groups = HashSet::<CompactionGroupId>::new();
    let mut replay_count: u64 = 0;
    let mut replayed_epochs = vec![];
    let mut check_result_task: Option<tokio::task::JoinHandle<_>> = None;

    for delta in version_delta_logs {
        let (current_version, compaction_groups) = meta_client.replay_version_delta(delta).await?;
        let (version_id, committed_epoch) = (
            current_version.id,
            current_version
                .table_committed_epoch(table_to_check.into())
                .unwrap_or_default(),
        );
        tracing::info!(
            "Replayed version delta version_id: {}, committed_epoch: {}, compaction_groups: {:?}",
            version_id,
            committed_epoch,
            compaction_groups
        );

        hummock
            .inner()
            .update_version_and_wait(current_version.clone())
            .await;

        replay_count += 1;
        replayed_epochs.push(committed_epoch);
        modified_compaction_groups.extend(compaction_groups);

        if replay_count.is_multiple_of(opts.num_trigger_frequency)
            && !modified_compaction_groups.is_empty()
        {
            // Wait for the previous check task before starting a new round.
            if let Some(handle) = check_result_task {
                handle.await??;
            }

            metric.num_expect_check += 1;

            // Drop the just-pushed epoch; it is already included via `committed_epoch`.
            replayed_epochs.pop();
            let mut epochs = vec![committed_epoch];
            epochs.extend(pin_old_snapshots(&meta_client, &replayed_epochs, 1));
            tracing::info!("===== Prepare to check snapshots: {:?}", epochs);

            let old_version_iters = open_hummock_iters(&hummock, &epochs, table_to_check).await?;

            tracing::info!(
                "Trigger compaction for version {}, epoch {} compaction_groups: {:?}",
                version_id,
                committed_epoch,
                modified_compaction_groups,
            );
            // Trigger deterministic compaction for all modified compaction groups.
            let is_multi_round = opts.num_trigger_rounds > 1;
            for _ in 0..opts.num_trigger_rounds {
                meta_client
                    .trigger_compaction_deterministic(
                        version_id,
                        Vec::from_iter(modified_compaction_groups.iter().copied()),
                    )
                    .await?;
                if is_multi_round {
                    tokio::time::sleep(Duration::from_millis(50)).await;
                }
            }

            let old_task_num = meta_client.get_assigned_compact_task_num().await?;
            let (schedule_ok, version_diff) =
                poll_compaction_schedule_status(&meta_client, old_task_num).await;

            tracing::info!(
                "Compaction schedule_ok {}, version_diff {}",
                schedule_ok,
                version_diff,
            );
            let (compaction_ok, new_version) = poll_compaction_tasks_status(
                &meta_client,
                schedule_ok,
                version_diff as u32,
                &current_version,
            )
            .await;

            tracing::info!(
                "Compaction schedule_ok {}, version_diff {} compaction_ok {}",
                schedule_ok,
                version_diff,
                compaction_ok,
            );

            let new_version_id = new_version.id;
            assert!(
                new_version_id >= version_id,
                "new_version_id: {}",
                new_version_id,
            );

            if new_version_id != version_id {
                hummock.inner().update_version_and_wait(new_version).await;

                let new_version_iters =
                    open_hummock_iters(&hummock, &epochs, table_to_check).await?;

                // Spawn a task to compare the data before and after compaction.
                check_result_task = Some(tokio::spawn(check_compaction_results(
                    new_version_id,
                    old_version_iters,
                    new_version_iters,
                )));
            } else {
                // Compaction produced no new version; nothing to compare.
                check_result_task = None;
                metric.num_uncheck += 1;
            }
            modified_compaction_groups.clear();
            replayed_epochs.clear();
        }
    }

    // Await the last check task, if any.
    if let Some(handle) = check_result_task {
        handle.await??;
    }
    tracing::info!(
        "Replay finished. Expect check count: {}, actual check count: {}",
        metric.num_expect_check,
        metric.num_expect_check - metric.num_uncheck
    );

    // At least one compaction check must have actually run.
    assert_ne!(0, metric.num_expect_check - metric.num_uncheck);

    for (join_handle, shutdown_sender) in sub_tasks {
        if let Err(err) = shutdown_sender.send(()) {
            tracing::warn!("Failed to send shutdown: {:?}", err);
            continue;
        }
        if let Err(err) = join_handle.await {
            tracing::warn!("Failed to join shutdown: {:?}", err);
        }
    }

    Ok(())
}

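/// Returns the `num` most recently replayed epochs, newest first. The meta
/// client parameter is currently unused.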
fn pin_old_snapshots(
    _meta_client: &MetaClient,
    replayed_epochs: &[HummockEpoch],
    num: usize,
) -> Vec<HummockEpoch> {
    replayed_epochs.iter().rev().take(num).copied().collect()
}

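/// Polls the number of compact tasks assigned to compactors until it exceeds
/// `old_task_num` or a 2s timeout elapses. Returns whether scheduling was
/// observed and the absolute change in the task count.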
async fn poll_compaction_schedule_status(
    meta_client: &MetaClient,
    old_task_num: usize,
) -> (bool, i32) {
    let poll_timeout = Duration::from_secs(2);
    let poll_interval = Duration::from_millis(20);
    let mut poll_duration_cnt = Duration::from_millis(0);
    let mut new_task_num = meta_client.get_assigned_compact_task_num().await.unwrap();
    let mut schedule_ok = false;
    loop {
        // A growing task count means the scheduler has picked up our trigger.
        if new_task_num > old_task_num {
            schedule_ok = true;
            break;
        }

        if poll_duration_cnt >= poll_timeout {
            break;
        }
        tokio::time::sleep(poll_interval).await;
        poll_duration_cnt += poll_interval;
        new_task_num = meta_client.get_assigned_compact_task_num().await.unwrap();
    }
    (
        schedule_ok,
        (new_task_num as i32 - old_task_num as i32).abs(),
    )
}

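/// Polls the current hummock version until its id has advanced past
/// `base_version` by at least `version_diff` (i.e. all triggered compactions
/// were committed), or until the timeout elapses.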
async fn poll_compaction_tasks_status(
    meta_client: &MetaClient,
    schedule_ok: bool,
    version_diff: u32,
    base_version: &HummockVersion,
) -> (bool, HummockVersion) {
    // Allow more time when scheduling succeeded, since real compaction work
    // has to finish; otherwise fail fast.
    let poll_timeout = if schedule_ok {
        Duration::from_secs(120)
    } else {
        Duration::from_secs(5)
    };
    let poll_interval = Duration::from_millis(50);
    let mut duration_cnt = Duration::from_millis(0);
    let mut compaction_ok = false;

    let mut cur_version = meta_client.get_current_version().await.unwrap();
    loop {
        if (cur_version.id > base_version.id)
            && (cur_version.id - base_version.id >= version_diff as u64)
        {
            tracing::info!(
                "Collected all of compact tasks. Actual version diff {}",
                cur_version.id - base_version.id
            );
            compaction_ok = true;
            break;
        }
        if duration_cnt >= poll_timeout {
            break;
        }
        tokio::time::sleep(poll_interval).await;
        duration_cnt += poll_interval;
        cur_version = meta_client.get_current_version().await.unwrap();
    }
    (compaction_ok, cur_version)
}

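/// A pinned, boxed iterator over a read snapshot of the monitored hummock
/// state store.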
type StateStoreIterType = Pin<
    Box<
        <<MonitoredStateStore<HummockStorage> as StateStore>::ReadSnapshot as StateStoreRead>::Iter,
    >,
>;

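/// Opens one iterator per epoch over the full key range of `table_id`,
/// keyed by epoch.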
async fn open_hummock_iters(
    hummock: &MonitoredStateStore<HummockStorage>,
    snapshots: &[HummockEpoch],
    table_id: u32,
) -> anyhow::Result<BTreeMap<HummockEpoch, StateStoreIterType>> {
    let mut results = BTreeMap::new();

    // Scan `[table_id, next_key(table_id))`, i.e. every key of this table.
    let mut buf = BytesMut::with_capacity(5);
    buf.put_u32(table_id);
    let b = buf.freeze();
    let range = (
        Bound::Included(b.clone()).map(TableKey),
        Bound::Excluded(Bytes::from(risingwave_hummock_sdk::key::next_key(
            b.as_ref(),
        )))
        .map(TableKey),
    );

    for &epoch in snapshots {
        let snapshot = hummock
            .new_read_snapshot(
                HummockReadEpoch::NoWait(epoch),
                NewReadSnapshotOptions {
                    table_id: TableId { table_id },
                },
            )
            .await?;
        let iter = snapshot
            .iter(
                range.clone(),
                ReadOptions {
                    cache_policy: CachePolicy::Fill(Hint::Normal),
                    ..Default::default()
                },
            )
            .await?;
        results.insert(epoch, Box::pin(iter));
    }
    Ok(results)
}

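/// Asserts that, for each epoch, the iterator opened before compaction and
/// the iterator opened after compaction yield identical key-value sequences.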
pub async fn check_compaction_results(
    version_id: HummockVersionId,
    mut expect_results: BTreeMap<HummockEpoch, StateStoreIterType>,
    mut actual_results: BTreeMap<HummockEpoch, StateStoreIterType>,
) -> anyhow::Result<()> {
    let combined = expect_results
        .iter_mut()
        .zip_eq_fast(actual_results.iter_mut());
    for ((e1, expect_iter), (e2, actual_iter)) in combined {
        assert_eq!(e1, e2);
        tracing::info!(
            "Check results for version: id: {}, epoch: {}",
            version_id,
            e1,
        );
        let mut expect_cnt = 0;
        let mut actual_cnt = 0;

        // Walk both iterators in lockstep; any divergence in keys, values, or
        // lengths fails the check.
        while let Some(kv_expect) = expect_iter.try_next().await? {
            expect_cnt += 1;
            let ret = actual_iter.try_next().await?;
            match ret {
                None => {
                    break;
                }
                Some(kv_actual) => {
                    actual_cnt += 1;
                    assert_eq!(kv_expect.0, kv_actual.0, "Key mismatch");
                    assert_eq!(kv_expect.1, kv_actual.1, "Value mismatch");
                }
            }
        }
        // If the actual iterator ran out early, the counts differ here.
        assert_eq!(expect_cnt, actual_cnt);
    }
    Ok(())
}

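/// Bundles the unused (test-only) metric registries needed to construct a
/// monitored hummock state store.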
struct StorageMetrics {
    pub hummock_metrics: Arc<HummockMetrics>,
    pub state_store_metrics: Arc<HummockStateStoreMetrics>,
    pub object_store_metrics: Arc<ObjectStoreMetrics>,
    pub storage_metrics: Arc<MonitoredStorageMetrics>,
    pub compactor_metrics: Arc<CompactorMetrics>,
}

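/// Builds a monitored `HummockStorage` from the test options with dummy
/// metrics. Fails if the configured state store is not hummock.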
pub async fn create_hummock_store_with_metrics(
    meta_client: &MetaClient,
    storage_opts: Arc<StorageOpts>,
    opts: &CompactionTestOpts,
) -> anyhow::Result<MonitoredStateStore<HummockStorage>> {
    let metrics = StorageMetrics {
        hummock_metrics: Arc::new(HummockMetrics::unused()),
        state_store_metrics: Arc::new(HummockStateStoreMetrics::unused()),
        object_store_metrics: Arc::new(ObjectStoreMetrics::unused()),
        storage_metrics: Arc::new(MonitoredStorageMetrics::unused()),
        compactor_metrics: Arc::new(CompactorMetrics::unused()),
    };

    let state_store_impl = StateStoreImpl::new(
        &opts.state_store,
        storage_opts,
        Arc::new(MonitoredHummockMetaClient::new(
            meta_client.clone(),
            metrics.hummock_metrics.clone(),
        )),
        metrics.state_store_metrics.clone(),
        metrics.object_store_metrics.clone(),
        metrics.storage_metrics.clone(),
        metrics.compactor_metrics.clone(),
        None,
        true,
    )
    .await?;

    if let Some(hummock_state_store) = state_store_impl.as_hummock() {
        Ok(hummock_state_store
            .clone()
            .monitored(metrics.storage_metrics))
    } else {
        Err(anyhow!("only Hummock state store is supported!"))
    }
}