1use std::collections::{BTreeMap, HashSet};
16use std::net::SocketAddr;
17use std::ops::Bound;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::thread::JoinHandle;
21use std::time::Duration;
22
23use anyhow::anyhow;
24use bytes::{BufMut, Bytes, BytesMut};
25use clap::Parser;
26use foyer::Hint;
27use risingwave_common::catalog::TableOption;
28use risingwave_common::config::{
29 MetaConfig, NoOverride, extract_storage_memory_config, load_config,
30};
31use risingwave_common::util::addr::HostAddr;
32use risingwave_common::util::iter_util::ZipEqFast;
33use risingwave_common::util::tokio_util::sync::CancellationToken;
34use risingwave_hummock_sdk::key::TableKey;
35use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
36use risingwave_hummock_sdk::{
37 CompactionGroupId, FIRST_VERSION_ID, HummockEpoch, HummockReadEpoch, HummockVersionId,
38};
39use risingwave_pb::common::WorkerType;
40use risingwave_pb::id::TableId;
41use risingwave_rpc_client::{HummockMetaClient, MetaClient};
42use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
43use risingwave_storage::hummock::{CachePolicy, HummockStorage};
44use risingwave_storage::monitor::{
45 CompactorMetrics, HummockMetrics, HummockStateStoreMetrics, MonitoredStateStore,
46 MonitoredStorageMetrics, ObjectStoreMetrics,
47};
48use risingwave_storage::opts::StorageOpts;
49use risingwave_storage::store::{NewReadSnapshotOptions, ReadOptions, StateStoreRead};
50use risingwave_storage::{StateStore, StateStoreImpl, StateStoreIter};
51
// Number of object ids reserved up-front from the new meta node during replay
// (see `init_metadata_for_replay`) — presumably so ids minted during replay do
// not collide with SST ids copied from the original cluster.
const SST_ID_SHIFT_COUNT: u32 = 1000000;
// Checkpoint frequency the meta config must be set to for replay; a very large
// value so periodic checkpoints effectively never fire and the test controls
// epoch progression deterministically (asserted in `start_meta_node`).
const CHECKPOINT_FREQ_FOR_REPLAY: u64 = 99999999;
54
55use crate::CompactionTestOpts;
56
/// Counters summarizing how many compaction-result checks ran during replay.
struct CompactionTestMetrics {
    // Incremented each time a result check is scheduled.
    num_expect_check: u64,
    // Incremented when a scheduled check is skipped because compaction
    // produced no new version.
    num_uncheck: u64,
}
61
62impl CompactionTestMetrics {
63 fn new() -> CompactionTestMetrics {
64 Self {
65 num_expect_check: 0,
66 num_uncheck: 0,
67 }
68 }
69}
70
71pub async fn compaction_test_main(
80 _listen_addr: SocketAddr,
81 advertise_addr: HostAddr,
82 opts: CompactionTestOpts,
83) -> anyhow::Result<()> {
84 let meta_listen_addr = opts
85 .meta_address
86 .strip_prefix("http://")
87 .unwrap()
88 .to_owned();
89
90 let _meta_handle = tokio::spawn(start_meta_node(
91 meta_listen_addr.clone(),
92 opts.state_store.clone(),
93 opts.config_path_for_meta.clone(),
94 ));
95
96 tokio::time::sleep(Duration::from_secs(1)).await;
98 tracing::info!("Started embedded Meta");
99
100 let (compactor_thrd, compactor_shutdown_tx) = start_compactor_thread(
101 opts.meta_address.clone(),
102 advertise_addr.to_string(),
103 opts.config_path.clone(),
104 );
105
106 let original_meta_endpoint = "http://127.0.0.1:5690";
107 let mut table_id = opts.table_id.into();
108
109 init_metadata_for_replay(
110 original_meta_endpoint,
111 &opts.meta_address,
112 &advertise_addr,
113 opts.ci_mode,
114 &mut table_id,
115 )
116 .await?;
117
118 assert_ne!(table_id, 0, "Invalid table_id for correctness checking");
119
120 let version_deltas = pull_version_deltas(original_meta_endpoint, &advertise_addr).await?;
121
122 tracing::info!(
123 "Pulled delta logs from Meta: len(logs): {}",
124 version_deltas.len()
125 );
126
127 let replay_thrd = start_replay_thread(opts, table_id, version_deltas);
128 replay_thrd.join().unwrap();
129 compactor_shutdown_tx.send(()).unwrap();
130 compactor_thrd.join().unwrap();
131 Ok(())
132}
133
134pub async fn start_meta_node(listen_addr: String, state_store: String, config_path: String) {
135 let meta_opts = risingwave_meta_node::MetaNodeOpts::parse_from([
136 "meta-node",
137 "--listen-addr",
138 &listen_addr,
139 "--advertise-addr",
140 &listen_addr,
141 "--backend",
142 "mem",
143 "--state-store",
144 &state_store,
145 "--config-path",
146 &config_path,
147 ]);
148 let config = load_config(&meta_opts.config_path, &meta_opts);
149 assert_eq!(
152 CHECKPOINT_FREQ_FOR_REPLAY,
153 config.system.checkpoint_frequency.unwrap()
154 );
155 assert!(
156 config.meta.enable_compaction_deterministic,
157 "enable_compaction_deterministic should be set"
158 );
159
160 risingwave_meta_node::start(meta_opts, CancellationToken::new() ).await
161}
162
163async fn start_compactor_node(
164 meta_rpc_endpoint: String,
165 advertise_addr: String,
166 config_path: String,
167) {
168 let opts = risingwave_compactor::CompactorOpts::parse_from([
169 "compactor-node",
170 "--listen-addr",
171 "127.0.0.1:5550",
172 "--advertise-addr",
173 &advertise_addr,
174 "--meta-address",
175 &meta_rpc_endpoint,
176 "--config-path",
177 &config_path,
178 ]);
179 risingwave_compactor::start(opts, CancellationToken::new() ).await
180}
181
182pub fn start_compactor_thread(
183 meta_endpoint: String,
184 advertise_addr: String,
185 config_path: String,
186) -> (JoinHandle<()>, std::sync::mpsc::Sender<()>) {
187 let (tx, rx) = std::sync::mpsc::channel();
188 let compact_func = move || {
189 let runtime = tokio::runtime::Builder::new_multi_thread()
190 .enable_all()
191 .build()
192 .unwrap();
193 runtime.block_on(async {
194 tokio::spawn(async {
195 tracing::info!("Starting compactor node");
196 start_compactor_node(meta_endpoint, advertise_addr, config_path).await
197 });
198 rx.recv().unwrap();
199 });
200 };
201
202 (std::thread::spawn(compact_func), tx)
203}
204
205fn start_replay_thread(
206 opts: CompactionTestOpts,
207 table_id: TableId,
208 version_deltas: Vec<HummockVersionDelta>,
209) -> JoinHandle<()> {
210 let replay_func = move || {
211 let runtime = tokio::runtime::Builder::new_current_thread()
212 .enable_all()
213 .build()
214 .unwrap();
215 runtime
216 .block_on(start_replay(opts, table_id, version_deltas))
217 .expect("repaly error occurred");
218 };
219
220 std::thread::spawn(replay_func)
221}
222
223async fn init_metadata_for_replay(
224 cluster_meta_endpoint: &str,
225 new_meta_endpoint: &str,
226 advertise_addr: &HostAddr,
227 ci_mode: bool,
228 table_id: &mut TableId,
229) -> anyhow::Result<()> {
230 tokio::time::sleep(Duration::from_secs(2)).await;
235
236 let meta_config = MetaConfig::default();
237 let meta_client: MetaClient;
238 tokio::select! {
239 _ = tokio::signal::ctrl_c() => {
240 tracing::info!("Ctrl+C received, now exiting");
241 std::process::exit(0);
242 },
243 ret = MetaClient::register_new(cluster_meta_endpoint.parse()?, WorkerType::RiseCtl, advertise_addr, Default::default(), Arc::new(meta_config.clone())) => {
244 (meta_client, _) = ret;
245 },
246 }
247 let worker_id = meta_client.worker_id();
248 tracing::info!("Assigned init worker id {}", worker_id);
249 meta_client.activate(advertise_addr).await.unwrap();
250
251 let tables = meta_client.risectl_list_state_tables().await?;
252
253 let (new_meta_client, _) = MetaClient::register_new(
254 new_meta_endpoint.parse()?,
255 WorkerType::RiseCtl,
256 advertise_addr,
257 Default::default(),
258 Arc::new(meta_config.clone()),
259 )
260 .await;
261 new_meta_client.activate(advertise_addr).await.unwrap();
262 if ci_mode {
263 let table_to_check = tables.iter().find(|t| t.name == "nexmark_q7").unwrap();
264 *table_id = table_to_check.id;
265 }
266
267 new_meta_client
269 .init_metadata_for_replay(tables, vec![])
270 .await?;
271
272 let _ = new_meta_client
274 .get_new_object_ids(SST_ID_SHIFT_COUNT)
275 .await?;
276
277 tracing::info!("Finished initializing the new Meta");
278 Ok(())
279}
280
281async fn pull_version_deltas(
282 cluster_meta_endpoint: &str,
283 advertise_addr: &HostAddr,
284) -> anyhow::Result<Vec<HummockVersionDelta>> {
285 let (meta_client, _) = MetaClient::register_new(
288 cluster_meta_endpoint.parse()?,
289 WorkerType::RiseCtl,
290 advertise_addr,
291 Default::default(),
292 Arc::new(MetaConfig::default()),
293 )
294 .await;
295 let worker_id = meta_client.worker_id();
296 tracing::info!("Assigned pull worker id {}", worker_id);
297 meta_client.activate(advertise_addr).await.unwrap();
298
299 let (handle, shutdown_tx) =
300 MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_millis(1000));
301 let res = meta_client
302 .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
303 .await
304 .unwrap();
305
306 if let Err(err) = shutdown_tx.send(()) {
307 tracing::warn!("Failed to send shutdown to heartbeat task: {:?}", err);
308 }
309 handle.await?;
310 tracing::info!("Shutdown the pull worker");
311 Ok(res)
312}
313
314async fn start_replay(
315 opts: CompactionTestOpts,
316 table_to_check: TableId,
317 version_delta_logs: Vec<HummockVersionDelta>,
318) -> anyhow::Result<()> {
319 let advertise_addr = "127.0.0.1:7770".parse().unwrap();
320 tracing::info!(
321 "Start to replay. Advertise address is {}, Table id {}",
322 advertise_addr,
323 table_to_check
324 );
325
326 let mut metric = CompactionTestMetrics::new();
327 let config = load_config(&opts.config_path_for_meta, NoOverride);
328 tracing::info!(
329 "Starting replay with config {:?} and opts {:?}",
330 config,
331 opts
332 );
333
334 let (meta_client, system_params) = MetaClient::register_new(
337 opts.meta_address.parse()?,
338 WorkerType::RiseCtl,
339 &advertise_addr,
340 Default::default(),
341 Arc::new(config.meta.clone()),
342 )
343 .await;
344 let worker_id = meta_client.worker_id();
345 tracing::info!("Assigned replay worker id {}", worker_id);
346 meta_client.activate(&advertise_addr).await.unwrap();
347
348 let sub_tasks = vec![MetaClient::start_heartbeat_loop(
349 meta_client.clone(),
350 Duration::from_millis(1000),
351 )];
352
353 let latest_version = meta_client.disable_commit_epoch().await?;
355 assert_eq!(FIRST_VERSION_ID, latest_version.id);
356 for level in latest_version.levels.values() {
358 level.levels.iter().for_each(|lvl| {
359 assert!(lvl.table_infos.is_empty());
360 assert_eq!(0, lvl.total_file_size);
361 });
362 }
363
364 let storage_memory_config = extract_storage_memory_config(&config);
366 let storage_opts = Arc::new(StorageOpts::from((
367 &config,
368 &system_params,
369 &storage_memory_config,
370 )));
371 let hummock = create_hummock_store_with_metrics(&meta_client, storage_opts, &opts).await?;
372
373 let mut modified_compaction_groups = HashSet::<CompactionGroupId>::new();
375 let mut replay_count: u64 = 0;
376 let mut replayed_epochs = vec![];
377 let mut check_result_task: Option<tokio::task::JoinHandle<_>> = None;
378
379 for delta in version_delta_logs {
380 let (current_version, compaction_groups) = meta_client.replay_version_delta(delta).await?;
381 let (version_id, committed_epoch) = (
382 current_version.id,
383 current_version
384 .table_committed_epoch(table_to_check)
385 .unwrap_or_default(),
386 );
387 tracing::info!(
388 "Replayed version delta version_id: {}, committed_epoch: {}, compaction_groups: {:?}",
389 version_id,
390 committed_epoch,
391 compaction_groups
392 );
393
394 hummock
395 .inner()
396 .update_version_and_wait(current_version.clone())
397 .await;
398
399 replay_count += 1;
400 replayed_epochs.push(committed_epoch);
401 compaction_groups
402 .into_iter()
403 .map(|c| modified_compaction_groups.insert(c))
404 .count();
405
406 if replay_count.is_multiple_of(opts.num_trigger_frequency)
409 && !modified_compaction_groups.is_empty()
410 {
411 if let Some(handle) = check_result_task {
413 handle.await??;
414 }
415
416 metric.num_expect_check += 1;
417
418 replayed_epochs.pop();
420 let mut epochs = vec![committed_epoch];
421 epochs.extend(pin_old_snapshots(&meta_client, &replayed_epochs, 1).into_iter());
422 tracing::info!("===== Prepare to check snapshots: {:?}", epochs);
423
424 let old_version_iters = open_hummock_iters(&hummock, &epochs, table_to_check).await?;
425
426 tracing::info!(
427 "Trigger compaction for version {}, epoch {} compaction_groups: {:?}",
428 version_id,
429 committed_epoch,
430 modified_compaction_groups,
431 );
432 let is_multi_round = opts.num_trigger_rounds > 1;
434 for _ in 0..opts.num_trigger_rounds {
435 meta_client
436 .trigger_compaction_deterministic(
437 version_id,
438 Vec::from_iter(modified_compaction_groups.iter().copied()),
439 )
440 .await?;
441 if is_multi_round {
442 tokio::time::sleep(Duration::from_millis(50)).await;
443 }
444 }
445
446 let old_task_num = meta_client.get_assigned_compact_task_num().await?;
447 let (schedule_ok, version_diff) =
449 poll_compaction_schedule_status(&meta_client, old_task_num).await;
450
451 tracing::info!(
452 "Compaction schedule_ok {}, version_diff {}",
453 schedule_ok,
454 version_diff,
455 );
456 let (compaction_ok, new_version) = poll_compaction_tasks_status(
457 &meta_client,
458 schedule_ok,
459 version_diff as u32,
460 ¤t_version,
461 )
462 .await;
463
464 tracing::info!(
465 "Compaction schedule_ok {}, version_diff {} compaction_ok {}",
466 schedule_ok,
467 version_diff,
468 compaction_ok,
469 );
470
471 let new_version_id = new_version.id;
472 assert!(
473 new_version_id >= version_id,
474 "new_version_id: {}",
475 new_version_id,
476 );
477
478 if new_version_id != version_id {
479 hummock.inner().update_version_and_wait(new_version).await;
480
481 let new_version_iters =
482 open_hummock_iters(&hummock, &epochs, table_to_check).await?;
483
484 check_result_task = Some(tokio::spawn(check_compaction_results(
486 new_version_id,
487 old_version_iters,
488 new_version_iters,
489 )));
490 } else {
491 check_result_task = None;
492 metric.num_uncheck += 1;
493 }
494 modified_compaction_groups.clear();
495 replayed_epochs.clear();
496 }
497 }
498
499 if let Some(handle) = check_result_task {
501 handle.await??;
502 }
503 tracing::info!(
504 "Replay finished. Expect check count: {}, actual check count: {}",
505 metric.num_expect_check,
506 metric.num_expect_check - metric.num_uncheck
507 );
508
509 assert_ne!(0, metric.num_expect_check - metric.num_uncheck);
510
511 for (join_handle, shutdown_sender) in sub_tasks {
512 if let Err(err) = shutdown_sender.send(()) {
513 tracing::warn!("Failed to send shutdown: {:?}", err);
514 continue;
515 }
516 if let Err(err) = join_handle.await {
517 tracing::warn!("Failed to join shutdown: {:?}", err);
518 }
519 }
520
521 Ok(())
522}
523
524fn pin_old_snapshots(
525 _meta_client: &MetaClient,
526 replayed_epochs: &[HummockEpoch],
527 num: usize,
528) -> Vec<HummockEpoch> {
529 let mut old_epochs = vec![];
530 for &epoch in replayed_epochs.iter().rev().take(num) {
531 old_epochs.push(epoch);
532 }
533 old_epochs
534}
535
/// Polls the meta node until the assigned-compact-task count grows past
/// `old_task_num`, or a 2s timeout elapses. Returns whether scheduling was
/// observed and the absolute change in the task count.
async fn poll_compaction_schedule_status(
    meta_client: &MetaClient,
    old_task_num: usize,
) -> (bool, i32) {
    let poll_timeout = Duration::from_secs(2);
    let poll_interval = Duration::from_millis(20);
    let mut poll_duration_cnt = Duration::from_millis(0);
    let mut new_task_num = meta_client.get_assigned_compact_task_num().await.unwrap();
    let mut schedule_ok = false;
    loop {
        // A growing task count means the deterministic trigger took effect.
        if new_task_num > old_task_num {
            schedule_ok = true;
            break;
        }

        if poll_duration_cnt >= poll_timeout {
            break;
        }
        tokio::time::sleep(poll_interval).await;
        poll_duration_cnt += poll_interval;
        new_task_num = meta_client.get_assigned_compact_task_num().await.unwrap();
    }
    (
        schedule_ok,
        // NOTE(review): `abs()` also reports a *shrinking* count as a positive
        // diff — presumably the count is monotonic during the poll; confirm.
        (new_task_num as i32 - old_task_num as i32).abs(),
    )
}
566
/// Polls the current Hummock version until its id has advanced beyond
/// `base_version.id` by at least `version_diff`, or until a timeout elapses.
/// Returns whether the expected advance was observed plus the last version
/// fetched.
async fn poll_compaction_tasks_status(
    meta_client: &MetaClient,
    schedule_ok: bool,
    version_diff: u32,
    base_version: &HummockVersion,
) -> (bool, HummockVersion) {
    // Be generous (120s) when tasks were scheduled; fail fast (5s) otherwise.
    let poll_timeout = if schedule_ok {
        Duration::from_secs(120)
    } else {
        Duration::from_secs(5)
    };
    let poll_interval = Duration::from_millis(50);
    let mut duration_cnt = Duration::from_millis(0);
    let mut compaction_ok = false;

    let mut cur_version = meta_client.get_current_version().await.unwrap();
    loop {
        // NOTE(review): assumes each collected compact task advances the
        // version id, so `version_diff` new versions imply all tasks landed —
        // confirm against meta's deterministic-compaction semantics.
        if (cur_version.id > base_version.id)
            && (cur_version.id - base_version.id >= version_diff as u64)
        {
            tracing::info!(
                "Collected all of compact tasks. Actual version diff {}",
                cur_version.id - base_version.id
            );
            compaction_ok = true;
            break;
        }
        if duration_cnt >= poll_timeout {
            break;
        }
        tokio::time::sleep(poll_interval).await;
        duration_cnt += poll_interval;
        cur_version = meta_client.get_current_version().await.unwrap();
    }
    (compaction_ok, cur_version)
}
606
/// Pinned, boxed iterator as produced by reading a snapshot of the monitored
/// Hummock state store; keyed by epoch in the maps below.
type StateStoreIterType = Pin<
    Box<
        <<MonitoredStateStore<HummockStorage> as StateStore>::ReadSnapshot as StateStoreRead>::Iter,
    >,
>;
612
/// Opens one iterator per requested snapshot epoch over the full key range of
/// `table_id`, returning them keyed by epoch.
async fn open_hummock_iters(
    hummock: &MonitoredStateStore<HummockStorage>,
    snapshots: &[HummockEpoch],
    table_id: TableId,
) -> anyhow::Result<BTreeMap<HummockEpoch, StateStoreIterType>> {
    let mut results = BTreeMap::new();

    // Key prefix = the raw u32 table id (big-endian, per `BufMut::put_u32`).
    // NOTE(review): only 4 bytes are ever written; capacity 5 looks like a
    // leftover — harmless, it's just a hint.
    let mut buf = BytesMut::with_capacity(5);
    buf.put_u32(table_id.as_raw_id());
    let b = buf.freeze();
    // Scan [prefix, next_key(prefix)), i.e. presumably the table's whole
    // keyspace.
    let range = (
        Bound::Included(b.clone()).map(TableKey),
        Bound::Excluded(Bytes::from(risingwave_hummock_sdk::key::next_key(
            b.as_ref(),
        )))
        .map(TableKey),
    );

    for &epoch in snapshots {
        let snapshot = hummock
            .new_read_snapshot(
                HummockReadEpoch::NoWait(epoch),
                NewReadSnapshotOptions {
                    table_id,
                    table_option: TableOption::default(),
                },
            )
            .await?;
        let iter = snapshot
            .iter(
                range.clone(),
                ReadOptions {
                    cache_policy: CachePolicy::Fill(Hint::Normal),
                    ..Default::default()
                },
            )
            .await?;
        results.insert(epoch, Box::pin(iter));
    }
    Ok(results)
}
656
/// Compares the key-value streams of pre-compaction (`expect_results`) and
/// post-compaction (`actual_results`) iterators epoch by epoch, panicking on
/// any mismatch of epochs, keys, values, or row counts.
pub async fn check_compaction_results(
    version_id: HummockVersionId,
    mut expect_results: BTreeMap<HummockEpoch, StateStoreIterType>,
    mut actual_results: BTreeMap<HummockEpoch, StateStoreIterType>,
) -> anyhow::Result<()> {
    // Both maps are keyed by epoch; pair them up positionally.
    let combined = expect_results
        .iter_mut()
        .zip_eq_fast(actual_results.iter_mut());
    for ((e1, expect_iter), (e2, actual_iter)) in combined {
        assert_eq!(e1, e2);
        tracing::info!(
            "Check results for version: id: {}, epoch: {}",
            version_id,
            e1,
        );
        let mut expect_cnt = 0;
        let mut actual_cnt = 0;

        while let Some(kv_expect) = expect_iter.try_next().await? {
            expect_cnt += 1;
            let ret = actual_iter.try_next().await?;
            match ret {
                None => {
                    // Actual stream ended early; the count assert below fails.
                    break;
                }
                Some(kv_actual) => {
                    actual_cnt += 1;
                    assert_eq!(kv_expect.0, kv_actual.0, "Key mismatch");
                    assert_eq!(kv_expect.1, kv_actual.1, "Value mismatch");
                }
            }
        }
        assert_eq!(expect_cnt, actual_cnt);
    }
    Ok(())
}
693
/// Bundle of every metrics registry needed to construct a monitored Hummock
/// state store (see `create_hummock_store_with_metrics`).
struct StorageMetrics {
    pub hummock_metrics: Arc<HummockMetrics>,
    pub state_store_metrics: Arc<HummockStateStoreMetrics>,
    pub object_store_metrics: Arc<ObjectStoreMetrics>,
    pub storage_metrics: Arc<MonitoredStorageMetrics>,
    pub compactor_metrics: Arc<CompactorMetrics>,
}
701
702pub async fn create_hummock_store_with_metrics(
703 meta_client: &MetaClient,
704 storage_opts: Arc<StorageOpts>,
705 opts: &CompactionTestOpts,
706) -> anyhow::Result<MonitoredStateStore<HummockStorage>> {
707 let metrics = StorageMetrics {
708 hummock_metrics: Arc::new(HummockMetrics::unused()),
709 state_store_metrics: Arc::new(HummockStateStoreMetrics::unused()),
710 object_store_metrics: Arc::new(ObjectStoreMetrics::unused()),
711 storage_metrics: Arc::new(MonitoredStorageMetrics::unused()),
712 compactor_metrics: Arc::new(CompactorMetrics::unused()),
713 };
714
715 let state_store_impl = StateStoreImpl::new(
716 &opts.state_store,
717 storage_opts,
718 Arc::new(MonitoredHummockMetaClient::new(
719 meta_client.clone(),
720 metrics.hummock_metrics.clone(),
721 )),
722 metrics.state_store_metrics.clone(),
723 metrics.object_store_metrics.clone(),
724 metrics.storage_metrics.clone(),
725 metrics.compactor_metrics.clone(),
726 None,
727 true,
728 )
729 .await?;
730
731 if let Some(hummock_state_store) = state_store_impl.as_hummock() {
732 Ok(hummock_state_store
733 .clone()
734 .monitored(metrics.storage_metrics))
735 } else {
736 Err(anyhow!("only Hummock state store is supported!"))
737 }
738}