1use std::collections::{HashMap, HashSet, VecDeque};
16use std::pin::pin;
17use std::sync::atomic::AtomicUsize;
18use std::sync::atomic::Ordering::Relaxed;
19use std::sync::{Arc, LazyLock};
20use std::time::Duration;
21
22use arc_swap::ArcSwap;
23use await_tree::{InstrumentAwait, SpanExt};
24use futures::FutureExt;
25use itertools::Itertools;
26use parking_lot::RwLock;
27use prometheus::{Histogram, IntGauge};
28use risingwave_common::catalog::TableId;
29use risingwave_common::metrics::UintGauge;
30use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo;
31use risingwave_hummock_sdk::sstable_info::SstableInfo;
32use risingwave_hummock_sdk::version::{HummockVersionCommon, LocalHummockVersionDelta};
33use risingwave_hummock_sdk::{HummockEpoch, SyncResult};
34use tokio::spawn;
35use tokio::sync::mpsc::error::SendError;
36use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
37use tokio::sync::oneshot;
38use tracing::{debug, error, info, trace, warn};
39
40use super::refiller::{CacheRefillConfig, CacheRefiller};
41use super::{LocalInstanceGuard, LocalInstanceId, ReadVersionMappingType};
42use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
43use crate::hummock::compactor::{CompactorContext, await_tree_key, compact};
44use crate::hummock::event_handler::refiller::{CacheRefillerEvent, SpawnRefillTask};
45use crate::hummock::event_handler::uploader::{
46 HummockUploader, SpawnUploadTask, SyncedData, UploadTaskOutput,
47};
48use crate::hummock::event_handler::{
49 HummockEvent, HummockReadVersionRef, HummockVersionUpdate, ReadOnlyReadVersionMapping,
50 ReadOnlyRwLockRef,
51};
52use crate::hummock::local_version::pinned_version::PinnedVersion;
53use crate::hummock::local_version::recent_versions::RecentVersions;
54use crate::hummock::store::version::{
55 HummockReadVersion, StagingData, StagingSstableInfo, VersionUpdate,
56};
57use crate::hummock::{HummockResult, MemoryLimiter, SstableObjectIdManager, SstableStoreRef};
58use crate::mem_table::ImmutableMemtable;
59use crate::monitor::HummockStateStoreMetrics;
60use crate::opts::StorageOpts;
61
/// Tracks shared-buffer memory usage and decides when imms should be flushed.
#[derive(Clone)]
pub struct BufferTracker {
    // Buffer size (bytes) beyond which `need_flush` reports true.
    flush_threshold: usize,
    // Minimum total size (bytes) of a flush batch; `need_more_flush` keeps
    // requesting flushes until the current batch reaches this size.
    min_batch_flush_size: usize,
    // Global memory limiter shared by all write buffers; its usage is the
    // buffer size reported by `get_buffer_size`.
    global_buffer: Arc<MemoryLimiter>,
    // Gauge of the total size of data in ongoing upload tasks.
    global_upload_task_size: UintGauge,
}
69
70impl BufferTracker {
71 pub fn from_storage_opts(config: &StorageOpts, global_upload_task_size: UintGauge) -> Self {
72 let capacity = config.shared_buffer_capacity_mb * (1 << 20);
73 let flush_threshold = (capacity as f32 * config.shared_buffer_flush_ratio) as usize;
74 let shared_buffer_min_batch_flush_size =
75 config.shared_buffer_min_batch_flush_size_mb * (1 << 20);
76 assert!(
77 flush_threshold < capacity,
78 "flush_threshold {} should be less or equal to capacity {}",
79 flush_threshold,
80 capacity
81 );
82 Self::new(
83 capacity,
84 flush_threshold,
85 global_upload_task_size,
86 shared_buffer_min_batch_flush_size,
87 )
88 }
89
90 #[cfg(test)]
91 fn for_test_with_config(flush_threshold: usize, min_batch_flush_size: usize) -> Self {
92 Self::new(
93 usize::MAX,
94 flush_threshold,
95 UintGauge::new("test", "test").unwrap(),
96 min_batch_flush_size,
97 )
98 }
99
100 fn new(
101 capacity: usize,
102 flush_threshold: usize,
103 global_upload_task_size: UintGauge,
104 min_batch_flush_size: usize,
105 ) -> Self {
106 assert!(capacity >= flush_threshold);
107 Self {
108 flush_threshold,
109 global_buffer: Arc::new(MemoryLimiter::new(capacity as u64)),
110 global_upload_task_size,
111 min_batch_flush_size,
112 }
113 }
114
115 pub fn for_test() -> Self {
116 Self::from_storage_opts(
117 &StorageOpts::default(),
118 UintGauge::new("test", "test").unwrap(),
119 )
120 }
121
122 pub fn get_buffer_size(&self) -> usize {
123 self.global_buffer.get_memory_usage() as usize
124 }
125
126 pub fn get_memory_limiter(&self) -> &Arc<MemoryLimiter> {
127 &self.global_buffer
128 }
129
130 pub fn global_upload_task_size(&self) -> &UintGauge {
131 &self.global_upload_task_size
132 }
133
134 pub fn need_flush(&self) -> bool {
137 self.get_buffer_size() > self.flush_threshold + self.global_upload_task_size.get() as usize
138 }
139
140 pub fn need_more_flush(&self, curr_batch_flush_size: usize) -> bool {
141 curr_batch_flush_size < self.min_batch_flush_size || self.need_flush()
142 }
143
144 #[cfg(test)]
145 pub(crate) fn flush_threshold(&self) -> usize {
146 self.flush_threshold
147 }
148}
149
/// Sender half of the hummock event channel; increments the shared
/// pending-event gauge on every successful send.
#[derive(Clone)]
pub struct HummockEventSender {
    inner: UnboundedSender<HummockEvent>,
    // Gauge of events sent but not yet received; shared with the receiver.
    event_count: IntGauge,
}
155
156pub fn event_channel(event_count: IntGauge) -> (HummockEventSender, HummockEventReceiver) {
157 let (tx, rx) = unbounded_channel();
158 (
159 HummockEventSender {
160 inner: tx,
161 event_count: event_count.clone(),
162 },
163 HummockEventReceiver {
164 inner: rx,
165 event_count,
166 },
167 )
168}
169
170impl HummockEventSender {
171 pub fn send(&self, event: HummockEvent) -> Result<(), SendError<HummockEvent>> {
172 self.inner.send(event)?;
173 self.event_count.inc();
174 Ok(())
175 }
176}
177
/// Receiver half of the hummock event channel; decrements the shared
/// pending-event gauge on every received event.
pub struct HummockEventReceiver {
    inner: UnboundedReceiver<HummockEvent>,
    // Gauge of events sent but not yet received; shared with the sender.
    event_count: IntGauge,
}
182
183impl HummockEventReceiver {
184 async fn recv(&mut self) -> Option<HummockEvent> {
185 let event = self.inner.recv().await?;
186 self.event_count.dec();
187 Some(event)
188 }
189}
190
/// Latency histograms for the different phases of event handling, split out of
/// `HummockStateStoreMetrics` by label value at construction time.
struct HummockEventHandlerMetrics {
    event_handler_on_upload_finish_latency: Histogram,
    event_handler_on_apply_version_update: Histogram,
    event_handler_on_recv_version_update: Histogram,
}
196
pub struct HummockEventHandler {
    hummock_event_tx: HummockEventSender,
    hummock_event_rx: HummockEventReceiver,
    version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
    // Table id -> (instance id -> read version); shared read-only with readers.
    read_version_mapping: Arc<RwLock<ReadVersionMappingType>>,
    // Instance id -> (table id, read version), owned by the event handler loop.
    local_read_version_mapping: HashMap<LocalInstanceId, (TableId, HummockReadVersionRef)>,

    // Watch channel used to notify waiters when a new version is pinned.
    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
    recent_versions: Arc<ArcSwap<RecentVersions>>,

    uploader: HummockUploader,
    refiller: CacheRefiller,

    // Monotonic counter used by `generate_instance_id`.
    last_instance_id: LocalInstanceId,

    metrics: HummockEventHandlerMetrics,
}
215
216async fn flush_imms(
217 payload: Vec<ImmutableMemtable>,
218 compactor_context: CompactorContext,
219 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
220 sstable_object_id_manager: Arc<SstableObjectIdManager>,
221) -> HummockResult<UploadTaskOutput> {
222 compact(
223 compactor_context,
224 sstable_object_id_manager,
225 payload,
226 compaction_catalog_manager_ref,
227 )
228 .instrument_await("shared_buffer_compact".verbose())
229 .await
230}
231
impl HummockEventHandler {
    /// Creates an event handler with the default upload task (shared-buffer
    /// compaction via `flush_imms`) and the default cache-refill task.
    pub fn new(
        version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
        pinned_version: PinnedVersion,
        compactor_context: CompactorContext,
        compaction_catalog_manager_ref: CompactionCatalogManagerRef,
        sstable_object_id_manager: Arc<SstableObjectIdManager>,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
    ) -> Self {
        let upload_compactor_context = compactor_context.clone();
        let upload_task_latency = state_store_metrics.uploader_upload_task_latency.clone();
        let wait_poll_latency = state_store_metrics.uploader_wait_poll_latency.clone();
        let recent_versions = RecentVersions::new(
            pinned_version,
            compactor_context
                .storage_opts
                .max_cached_recent_versions_number,
            state_store_metrics.clone(),
        );
        let buffer_tracker = BufferTracker::from_storage_opts(
            &compactor_context.storage_opts,
            state_store_metrics.uploader_uploading_task_size.clone(),
        );
        Self::new_inner(
            version_update_rx,
            compactor_context.sstable_store.clone(),
            state_store_metrics,
            CacheRefillConfig::from_storage_opts(&compactor_context.storage_opts),
            recent_versions,
            buffer_tracker,
            // The spawn-upload-task closure: flushes the imm payload and
            // reports upload/wait-poll latency.
            Arc::new(move |payload, task_info| {
                // Process-wide counter used only to key await-tree registrations.
                static NEXT_UPLOAD_TASK_ID: LazyLock<AtomicUsize> =
                    LazyLock::new(|| AtomicUsize::new(0));
                // Register an await-tree root for this upload task when the
                // registry is enabled.
                let tree_root = upload_compactor_context.await_tree_reg.as_ref().map(|reg| {
                    let upload_task_id = NEXT_UPLOAD_TASK_ID.fetch_add(1, Relaxed);
                    reg.register(
                        await_tree_key::SpawnUploadTask { id: upload_task_id },
                        format!("Spawn Upload Task: {}", task_info),
                    )
                });
                let upload_task_latency = upload_task_latency.clone();
                let wait_poll_latency = wait_poll_latency.clone();
                let upload_compactor_context = upload_compactor_context.clone();
                let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();
                let sstable_object_id_manager = sstable_object_id_manager.clone();
                spawn({
                    let future = async move {
                        let _timer = upload_task_latency.start_timer();
                        // Flatten the per-instance imm lists into a single payload.
                        let mut output = flush_imms(
                            payload
                                .into_values()
                                .flat_map(|imms| imms.into_iter())
                                .collect(),
                            upload_compactor_context.clone(),
                            compaction_catalog_manager_ref.clone(),
                            sstable_object_id_manager.clone(),
                        )
                        .await?;
                        // Start the wait-poll timer, which measures the time
                        // between task completion and the uploader polling it.
                        assert!(
                            output
                                .wait_poll_timer
                                .replace(wait_poll_latency.start_timer())
                                .is_none(),
                            "should not set timer before"
                        );
                        Ok(output)
                    };
                    // Instrument with the await-tree root only when registered;
                    // left/right futures keep a single concrete return type.
                    if let Some(tree_root) = tree_root {
                        tree_root.instrument(future).left_future()
                    } else {
                        future.right_future()
                    }
                })
            }),
            CacheRefiller::default_spawn_refill_task(),
        )
    }

    /// Shared constructor that wires up the event channel, metrics, uploader
    /// and refiller. Tests use this to inject custom spawn tasks.
    fn new_inner(
        version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
        sstable_store: SstableStoreRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        refill_config: CacheRefillConfig,
        recent_versions: RecentVersions,
        buffer_tracker: BufferTracker,
        spawn_upload_task: SpawnUploadTask,
        spawn_refill_task: SpawnRefillTask,
    ) -> Self {
        let (hummock_event_tx, hummock_event_rx) =
            event_channel(state_store_metrics.event_handler_pending_event.clone());
        let (version_update_notifier_tx, _) =
            tokio::sync::watch::channel(recent_versions.latest_version().clone());
        let version_update_notifier_tx = Arc::new(version_update_notifier_tx);
        let read_version_mapping = Arc::new(RwLock::new(HashMap::default()));

        let metrics = HummockEventHandlerMetrics {
            event_handler_on_upload_finish_latency: state_store_metrics
                .event_handler_latency
                .with_label_values(&["on_upload_finish"]),
            event_handler_on_apply_version_update: state_store_metrics
                .event_handler_latency
                .with_label_values(&["apply_version"]),
            event_handler_on_recv_version_update: state_store_metrics
                .event_handler_latency
                .with_label_values(&["recv_version_update"]),
        };

        let uploader = HummockUploader::new(
            state_store_metrics.clone(),
            recent_versions.latest_version().clone(),
            spawn_upload_task,
            buffer_tracker,
        );
        let refiller = CacheRefiller::new(refill_config, sstable_store, spawn_refill_task);

        Self {
            hummock_event_tx,
            hummock_event_rx,
            version_update_rx,
            version_update_notifier_tx,
            recent_versions: Arc::new(ArcSwap::from_pointee(recent_versions)),
            read_version_mapping,
            local_read_version_mapping: Default::default(),
            uploader,
            refiller,
            last_instance_id: 0,
            metrics,
        }
    }

    pub fn version_update_notifier_tx(&self) -> Arc<tokio::sync::watch::Sender<PinnedVersion>> {
        self.version_update_notifier_tx.clone()
    }

    pub fn recent_versions(&self) -> Arc<ArcSwap<RecentVersions>> {
        self.recent_versions.clone()
    }

    pub fn read_version_mapping(&self) -> ReadOnlyReadVersionMapping {
        ReadOnlyRwLockRef::new(self.read_version_mapping.clone())
    }

    pub fn event_sender(&self) -> HummockEventSender {
        self.hummock_event_tx.clone()
    }

    pub fn buffer_tracker(&self) -> &BufferTracker {
        self.uploader.buffer_tracker()
    }
}
382
impl HummockEventHandler {
    /// Applies `f` to the read version of each given instance under its write
    /// lock. Instances whose lock is contended are deferred and retried with a
    /// short timeout, so one busy read version does not stall the rest.
    fn for_each_read_version(
        &self,
        instances: impl IntoIterator<Item = LocalInstanceId>,
        mut f: impl FnMut(LocalInstanceId, &mut HummockReadVersion),
    ) {
        let instances = {
            #[cfg(debug_assertions)]
            {
                // In debug builds, assert the caller passes no duplicate
                // instance ids.
                let mut id_set = std::collections::HashSet::new();
                for instance in instances {
                    assert!(id_set.insert(instance));
                }
                id_set
            }
            #[cfg(not(debug_assertions))]
            {
                instances
            }
        };
        let mut pending = VecDeque::new();
        let mut total_count = 0;
        for instance_id in instances {
            // An instance may have been destroyed meanwhile; skip missing ones.
            let Some((_, read_version)) = self.local_read_version_mapping.get(&instance_id) else {
                continue;
            };
            total_count += 1;
            match read_version.try_write() {
                Some(mut write_guard) => {
                    f(instance_id, &mut write_guard);
                }
                _ => {
                    // Lock contended; defer and retry below.
                    pending.push_back(instance_id);
                }
            }
        }
        if !pending.is_empty() {
            // Warn when more than 10% of the instances were contended,
            // otherwise just debug-log.
            if pending.len() * 10 > total_count {
                warn!(
                    pending_count = pending.len(),
                    total_count, "cannot acquire lock for all read version"
                );
            } else {
                debug!(
                    pending_count = pending.len(),
                    total_count, "cannot acquire lock for all read version"
                );
            }
        }

        const TRY_LOCK_TIMEOUT: Duration = Duration::from_millis(1);

        // Retry the contended instances with a bounded wait until all have
        // been processed.
        while let Some(instance_id) = pending.pop_front() {
            let (_, read_version) = self
                .local_read_version_mapping
                .get(&instance_id)
                .expect("have checked exist before");
            match read_version.try_write_for(TRY_LOCK_TIMEOUT) {
                Some(mut write_guard) => {
                    f(instance_id, &mut write_guard);
                }
                _ => {
                    warn!(instance_id, "failed to get lock again for instance");
                    pending.push_back(instance_id);
                }
            }
        }
    }

    /// Propagates a finished upload to every read version that contributed an
    /// imm to the uploaded SST.
    fn handle_uploaded_sst_inner(&mut self, staging_sstable_info: Arc<StagingSstableInfo>) {
        trace!("data_flushed. SST size {}", staging_sstable_info.imm_size());
        self.for_each_read_version(
            staging_sstable_info.imm_ids().keys().cloned(),
            |_, read_version| {
                read_version.update(VersionUpdate::Staging(StagingData::Sst(
                    staging_sstable_info.clone(),
                )))
            },
        )
    }

    /// Starts syncing the given (epoch, table set) pairs; the result is sent
    /// through `sync_result_sender` once the sync completes.
    fn handle_sync_epoch(
        &mut self,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
    ) {
        debug!(?sync_table_epochs, "awaiting for epoch to be synced",);
        self.uploader
            .start_sync_epoch(sync_result_sender, sync_table_epochs);
    }

    /// Clears uploader state for the given tables (or all tables when `None`)
    /// and notifies the requester on completion.
    fn handle_clear(&mut self, notifier: oneshot::Sender<()>, table_ids: Option<HashSet<TableId>>) {
        info!(
            current_version_id = ?self.uploader.hummock_version().id(),
            "handle clear event"
        );

        self.uploader.clear(table_ids.clone());

        if table_ids.is_none() {
            // A full clear expects all read version instances to have been
            // destroyed beforehand.
            assert!(
                self.local_read_version_mapping.is_empty(),
                "read version mapping not empty when clear. remaining tables: {:?}",
                self.local_read_version_mapping
                    .values()
                    .map(|(_, read_version)| read_version.read().table_id())
                    .collect_vec()
            );
        }

        // The requester may have given up waiting; only log the failure.
        let _ = notifier.send(()).inspect_err(|e| {
            error!("failed to notify completion of clear event: {:?}", e);
        });

        info!("clear finished");
    }

    /// Resolves an incoming version update against the latest known pinned
    /// version and, if it yields a newer version, kicks off cache refill
    /// (which later triggers `apply_version_update`).
    fn handle_version_update(&mut self, version_payload: HummockVersionUpdate) {
        let _timer = self
            .metrics
            .event_handler_on_recv_version_update
            .start_timer();
        // Base on the refiller's last new version when a refill is in flight,
        // otherwise on the uploader's current version.
        let pinned_version = self
            .refiller
            .last_new_pinned_version()
            .cloned()
            .unwrap_or_else(|| self.uploader.hummock_version().clone());

        let mut sst_delta_infos = vec![];
        if let Some(new_pinned_version) = Self::resolve_version_update_info(
            &pinned_version,
            version_payload,
            Some(&mut sst_delta_infos),
        ) {
            self.refiller
                .start_cache_refill(sst_delta_infos, pinned_version, new_pinned_version);
        }
    }

    /// Applies a version payload on top of `pinned_version`, optionally
    /// collecting SST delta infos for cache refill. Returns the new pinned
    /// version, or `None` when the payload does not produce one.
    fn resolve_version_update_info(
        pinned_version: &PinnedVersion,
        version_payload: HummockVersionUpdate,
        mut sst_delta_infos: Option<&mut Vec<SstDeltaInfo>>,
    ) -> Option<PinnedVersion> {
        match version_payload {
            HummockVersionUpdate::VersionDeltas(version_deltas) => {
                let mut version_to_apply = (**pinned_version).clone();
                {
                    // Hold the table change log write lock across all deltas so
                    // the change log is updated atomically with the version.
                    let mut table_change_log_to_apply_guard =
                        pinned_version.table_change_log_write_lock();
                    for version_delta in version_deltas {
                        // Deltas must be applied in order.
                        assert_eq!(version_to_apply.id, version_delta.prev_id);

                        {
                            let mut state_table_info = version_to_apply.state_table_info.clone();
                            let (changed_table_info, _is_commit_epoch) = state_table_info
                                .apply_delta(
                                    &version_delta.state_table_info_delta,
                                    &version_delta.removed_table_ids,
                                );

                            HummockVersionCommon::<SstableInfo>::apply_change_log_delta(
                                &mut *table_change_log_to_apply_guard,
                                &version_delta.change_log_delta,
                                &version_delta.removed_table_ids,
                                &version_delta.state_table_info_delta,
                                &changed_table_info,
                            );
                        }

                        let local_hummock_version_delta =
                            LocalHummockVersionDelta::from(version_delta);
                        // Collect SST delta infos before applying the delta,
                        // since they are derived from the pre-apply version.
                        if let Some(sst_delta_infos) = &mut sst_delta_infos {
                            sst_delta_infos.extend(
                                version_to_apply
                                    .build_sst_delta_infos(&local_hummock_version_delta)
                                    .into_iter(),
                            );
                        }

                        version_to_apply.apply_version_delta(&local_hummock_version_delta);
                    }
                }

                pinned_version.new_with_local_version(version_to_apply)
            }
            HummockVersionUpdate::PinnedVersion(version) => {
                pinned_version.new_pin_version(*version)
            }
        }
    }

    /// Applies a resolved new pinned version: updates recent versions, every
    /// registered read version, the watch channel, and the uploader.
    fn apply_version_update(
        &mut self,
        pinned_version: PinnedVersion,
        new_pinned_version: PinnedVersion,
    ) {
        let _timer = self
            .metrics
            .event_handler_on_apply_version_update
            .start_timer();
        self.recent_versions.rcu(|prev_recent_versions| {
            prev_recent_versions.with_new_version(new_pinned_version.clone())
        });

        {
            self.for_each_read_version(
                self.local_read_version_mapping.keys().cloned(),
                |_, read_version| {
                    read_version
                        .update(VersionUpdate::CommittedSnapshot(new_pinned_version.clone()))
                },
            );
        }

        // Notify watchers only when the version id actually advanced.
        self.version_update_notifier_tx.send_if_modified(|state| {
            assert_eq!(pinned_version.id(), state.id());
            if state.id() == new_pinned_version.id() {
                return false;
            }
            assert!(new_pinned_version.id() > state.id());
            *state = new_pinned_version.clone();
            true
        });

        debug!("update to hummock version: {}", new_pinned_version.id(),);

        self.uploader.update_pinned_version(new_pinned_version);
    }
}
620
impl HummockEventHandler {
    /// Main worker loop: concurrently drives uploaded-SST completion, cache
    /// refill events, hummock events, and version updates. Exits on a
    /// `Shutdown` event or when either input channel closes.
    pub async fn start_hummock_event_handler_worker(mut self) {
        loop {
            tokio::select! {
                sst = self.uploader.next_uploaded_sst() => {
                    self.handle_uploaded_sst(sst);
                }
                event = self.refiller.next_event() => {
                    let CacheRefillerEvent {pinned_version, new_pinned_version } = event;
                    self.apply_version_update(pinned_version, new_pinned_version);
                }
                event = pin!(self.hummock_event_rx.recv()) => {
                    let Some(event) = event else { break };
                    match event {
                        HummockEvent::Shutdown => {
                            info!("event handler shutdown");
                            return;
                        },
                        event => {
                            self.handle_hummock_event(event);
                        }
                    }
                }
                version_update = pin!(self.version_update_rx.recv()) => {
                    let Some(version_update) = version_update else {
                        warn!("version update stream ends. event handle shutdown");
                        return;
                    };
                    self.handle_version_update(version_update);
                }
            }
        }
    }

    /// Timed wrapper around `handle_uploaded_sst_inner`.
    fn handle_uploaded_sst(&mut self, sst: Arc<StagingSstableInfo>) {
        let _timer = self
            .metrics
            .event_handler_on_upload_finish_latency
            .start_timer();
        self.handle_uploaded_sst_inner(sst);
    }

    /// Dispatches a hummock event to the corresponding handler.
    /// `Shutdown` is handled by the worker loop and must not reach here.
    fn handle_hummock_event(&mut self, event: HummockEvent) {
        match event {
            HummockEvent::BufferMayFlush => {
                self.uploader.may_flush();
            }
            HummockEvent::SyncEpoch {
                sync_result_sender,
                sync_table_epochs,
            } => {
                self.handle_sync_epoch(sync_table_epochs, sync_result_sender);
            }
            HummockEvent::Clear(notifier, table_ids) => {
                self.handle_clear(notifier, table_ids);
            }
            HummockEvent::Shutdown => {
                unreachable!("shutdown is handled specially")
            }
            HummockEvent::StartEpoch { epoch, table_ids } => {
                self.uploader.start_epoch(epoch, table_ids);
            }
            HummockEvent::InitEpoch {
                instance_id,
                init_epoch,
            } => {
                // The instance must have been registered via RegisterReadVersion.
                let table_id = self
                    .local_read_version_mapping
                    .get(&instance_id)
                    .expect("should exist")
                    .0;
                self.uploader
                    .init_instance(instance_id, table_id, init_epoch);
            }
            HummockEvent::ImmToUploader { instance_id, imm } => {
                assert!(
                    self.local_read_version_mapping.contains_key(&instance_id),
                    "add imm from non-existing read version instance: instance_id: {}, table_id {}",
                    instance_id,
                    imm.table_id,
                );
                self.uploader.add_imm(instance_id, imm);
                self.uploader.may_flush();
            }

            HummockEvent::LocalSealEpoch {
                next_epoch,
                opts,
                instance_id,
            } => {
                self.uploader
                    .local_seal_epoch(instance_id, next_epoch, opts);
            }

            #[cfg(any(test, feature = "test"))]
            HummockEvent::FlushEvent(sender) => {
                // Test-only: acknowledge that all previously sent events have
                // been processed.
                let _ = sender.send(()).inspect_err(|e| {
                    error!("unable to send flush result: {:?}", e);
                });
            }

            HummockEvent::RegisterReadVersion {
                table_id,
                new_read_version_sender,
                is_replicated,
                vnodes,
            } => {
                let pinned_version = self.recent_versions.load().latest_version().clone();
                let instance_id = self.generate_instance_id();
                let basic_read_version = Arc::new(RwLock::new(
                    HummockReadVersion::new_with_replication_option(
                        table_id,
                        instance_id,
                        pinned_version,
                        is_replicated,
                        vnodes,
                    ),
                ));

                debug!(
                    "new read version registered: table_id: {}, instance_id: {}",
                    table_id, instance_id
                );

                {
                    // Register in both the handler-local and the shared mapping.
                    self.local_read_version_mapping
                        .insert(instance_id, (table_id, basic_read_version.clone()));
                    let mut read_version_mapping_guard = self.read_version_mapping.write();

                    read_version_mapping_guard
                        .entry(table_id)
                        .or_default()
                        .insert(instance_id, basic_read_version.clone());
                }

                match new_read_version_sender.send((
                    basic_read_version,
                    LocalInstanceGuard {
                        table_id,
                        instance_id,
                        event_sender: Some(self.hummock_event_tx.clone()),
                    },
                )) {
                    Ok(_) => {}
                    Err((_, mut guard)) => {
                        // Receiver dropped: roll back the registration so the
                        // mappings do not leak the instance.
                        warn!(
                            "RegisterReadVersion send fail table_id {:?} instance_is {:?}",
                            table_id, instance_id
                        );
                        guard.event_sender.take().expect("sender is just set");
                        self.destroy_read_version(instance_id);
                    }
                }
            }

            HummockEvent::DestroyReadVersion { instance_id } => {
                self.uploader.may_destroy_instance(instance_id);
                self.destroy_read_version(instance_id);
            }
            HummockEvent::GetMinUncommittedSstId { result_tx } => {
                let _ = result_tx
                    .send(self.uploader.min_uncommitted_sst_id())
                    .inspect_err(|e| {
                        error!("unable to send get_min_uncommitted_sst_id result: {:?}", e);
                    });
            }
        }
    }

    /// Removes an instance from both read-version mappings, panicking if the
    /// instance (or its table entry) does not exist.
    fn destroy_read_version(&mut self, instance_id: LocalInstanceId) {
        {
            {
                debug!("read version deregister: instance_id: {}", instance_id);
                let (table_id, _) = self
                    .local_read_version_mapping
                    .remove(&instance_id)
                    .unwrap_or_else(|| {
                        panic!(
                            "DestroyHummockInstance inexist instance instance_id {}",
                            instance_id
                        )
                    });
                let mut read_version_mapping_guard = self.read_version_mapping.write();
                let entry = read_version_mapping_guard
                    .get_mut(&table_id)
                    .unwrap_or_else(|| {
                        panic!(
                            "DestroyHummockInstance table_id {} instance_id {} fail",
                            table_id, instance_id
                        )
                    });
                entry.remove(&instance_id).unwrap_or_else(|| {
                    panic!(
                        "DestroyHummockInstance inexist instance table_id {} instance_id {}",
                        table_id, instance_id
                    )
                });
                // Drop the table entry once its last instance is gone.
                if entry.is_empty() {
                    read_version_mapping_guard.remove(&table_id);
                }
            }
        }
    }

    /// Returns the next monotonically increasing local instance id.
    fn generate_instance_id(&mut self) -> LocalInstanceId {
        self.last_instance_id += 1;
        self.last_instance_id
    }
}
831
832pub(super) fn send_sync_result(
833 sender: oneshot::Sender<HummockResult<SyncedData>>,
834 result: HummockResult<SyncedData>,
835) {
836 let _ = sender.send(result).inspect_err(|e| {
837 error!("unable to send sync result. Err: {:?}", e);
838 });
839}
840
841impl SyncedData {
842 pub fn into_sync_result(self) -> SyncResult {
843 {
844 let SyncedData {
845 uploaded_ssts,
846 table_watermarks,
847 } = self;
848 let mut sync_size = 0;
849 let mut uncommitted_ssts = Vec::new();
850 let mut old_value_ssts = Vec::new();
851 for sst in uploaded_ssts {
854 sync_size += sst.imm_size();
855 uncommitted_ssts.extend(sst.sstable_infos().iter().cloned());
856 old_value_ssts.extend(sst.old_value_sstable_infos().iter().cloned());
857 }
858 SyncResult {
859 sync_size,
860 uncommitted_ssts,
861 table_watermarks: table_watermarks.clone(),
862 old_value_ssts,
863 }
864 }
865 }
866}
867
868#[cfg(test)]
869mod tests {
870 use std::collections::{HashMap, HashSet};
871 use std::future::poll_fn;
872 use std::sync::Arc;
873 use std::task::Poll;
874
875 use futures::FutureExt;
876 use parking_lot::Mutex;
877 use risingwave_common::bitmap::Bitmap;
878 use risingwave_common::catalog::TableId;
879 use risingwave_common::hash::VirtualNode;
880 use risingwave_common::util::epoch::{EpochExt, test_epoch};
881 use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
882 use risingwave_hummock_sdk::version::HummockVersion;
883 use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo};
884 use tokio::spawn;
885 use tokio::sync::mpsc::unbounded_channel;
886 use tokio::sync::oneshot;
887
888 use crate::hummock::HummockError;
889 use crate::hummock::event_handler::hummock_event_handler::BufferTracker;
890 use crate::hummock::event_handler::refiller::{CacheRefillConfig, CacheRefiller};
891 use crate::hummock::event_handler::uploader::UploadTaskOutput;
892 use crate::hummock::event_handler::uploader::test_utils::{
893 TEST_TABLE_ID, gen_imm, gen_imm_inner, prepare_uploader_order_test_spawn_task_fn,
894 };
895 use crate::hummock::event_handler::{
896 HummockEvent, HummockEventHandler, HummockReadVersionRef, LocalInstanceGuard,
897 };
898 use crate::hummock::iterator::test_utils::mock_sstable_store;
899 use crate::hummock::local_version::pinned_version::PinnedVersion;
900 use crate::hummock::local_version::recent_versions::RecentVersions;
901 use crate::hummock::store::version::{StagingData, VersionUpdate};
902 use crate::hummock::test_utils::default_opts_for_test;
903 use crate::mem_table::ImmutableMemtable;
904 use crate::monitor::HummockStateStoreMetrics;
905 use crate::store::SealCurrentEpochOptions;
906
907 #[tokio::test]
908 async fn test_old_epoch_sync_fail() {
909 let epoch0 = test_epoch(233);
910
911 let initial_version = PinnedVersion::new(
912 HummockVersion::from_rpc_protobuf(&PbHummockVersion {
913 id: 1,
914 state_table_info: HashMap::from_iter([(
915 TEST_TABLE_ID.table_id,
916 StateTableInfo {
917 committed_epoch: epoch0,
918 compaction_group_id: StaticCompactionGroupId::StateDefault as _,
919 },
920 )]),
921 ..Default::default()
922 }),
923 unbounded_channel().0,
924 );
925
926 let (_version_update_tx, version_update_rx) = unbounded_channel();
927
928 let epoch1 = epoch0.next_epoch();
929 let epoch2 = epoch1.next_epoch();
930 let (tx, rx) = oneshot::channel();
931 let rx = Arc::new(Mutex::new(Some(rx)));
932
933 let storage_opt = default_opts_for_test();
934 let metrics = Arc::new(HummockStateStoreMetrics::unused());
935
936 let event_handler = HummockEventHandler::new_inner(
937 version_update_rx,
938 mock_sstable_store().await,
939 metrics.clone(),
940 CacheRefillConfig::from_storage_opts(&storage_opt),
941 RecentVersions::new(initial_version.clone(), 10, metrics.clone()),
942 BufferTracker::from_storage_opts(
943 &storage_opt,
944 metrics.uploader_uploading_task_size.clone(),
945 ),
946 Arc::new(move |_, info| {
947 assert_eq!(info.epochs.len(), 1);
948 let epoch = info.epochs[0];
949 match epoch {
950 epoch if epoch == epoch1 => {
951 let rx = rx.lock().take().unwrap();
952 spawn(async move {
953 rx.await.unwrap();
954 Err(HummockError::other("fail"))
955 })
956 }
957 epoch if epoch == epoch2 => spawn(async move {
958 Ok(UploadTaskOutput {
959 new_value_ssts: vec![],
960 old_value_ssts: vec![],
961 wait_poll_timer: None,
962 })
963 }),
964 _ => unreachable!(),
965 }
966 }),
967 CacheRefiller::default_spawn_refill_task(),
968 );
969
970 let event_tx = event_handler.event_sender();
971
972 let send_event = |event| event_tx.send(event).unwrap();
973
974 let join_handle = spawn(event_handler.start_hummock_event_handler_worker());
975
976 let (read_version, guard) = {
977 let (tx, rx) = oneshot::channel();
978 send_event(HummockEvent::RegisterReadVersion {
979 table_id: TEST_TABLE_ID,
980 new_read_version_sender: tx,
981 is_replicated: false,
982 vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
983 });
984 rx.await.unwrap()
985 };
986
987 send_event(HummockEvent::StartEpoch {
988 epoch: epoch1,
989 table_ids: HashSet::from_iter([TEST_TABLE_ID]),
990 });
991
992 send_event(HummockEvent::InitEpoch {
993 instance_id: guard.instance_id,
994 init_epoch: epoch1,
995 });
996
997 let imm1 = gen_imm(epoch1).await;
998 read_version
999 .write()
1000 .update(VersionUpdate::Staging(StagingData::ImmMem(imm1.clone())));
1001
1002 send_event(HummockEvent::ImmToUploader {
1003 instance_id: guard.instance_id,
1004 imm: imm1,
1005 });
1006
1007 send_event(HummockEvent::StartEpoch {
1008 epoch: epoch2,
1009 table_ids: HashSet::from_iter([TEST_TABLE_ID]),
1010 });
1011
1012 send_event(HummockEvent::LocalSealEpoch {
1013 instance_id: guard.instance_id,
1014 next_epoch: epoch2,
1015 opts: SealCurrentEpochOptions::for_test(),
1016 });
1017
1018 let imm2 = gen_imm(epoch2).await;
1019 read_version
1020 .write()
1021 .update(VersionUpdate::Staging(StagingData::ImmMem(imm2.clone())));
1022
1023 send_event(HummockEvent::ImmToUploader {
1024 instance_id: guard.instance_id,
1025 imm: imm2,
1026 });
1027
1028 let epoch3 = epoch2.next_epoch();
1029 send_event(HummockEvent::StartEpoch {
1030 epoch: epoch3,
1031 table_ids: HashSet::from_iter([TEST_TABLE_ID]),
1032 });
1033 send_event(HummockEvent::LocalSealEpoch {
1034 instance_id: guard.instance_id,
1035 next_epoch: epoch3,
1036 opts: SealCurrentEpochOptions::for_test(),
1037 });
1038
1039 let (tx1, mut rx1) = oneshot::channel();
1040 send_event(HummockEvent::SyncEpoch {
1041 sync_result_sender: tx1,
1042 sync_table_epochs: vec![(epoch1, HashSet::from_iter([TEST_TABLE_ID]))],
1043 });
1044 assert!(poll_fn(|cx| Poll::Ready(rx1.poll_unpin(cx).is_pending())).await);
1045 let (tx2, mut rx2) = oneshot::channel();
1046 send_event(HummockEvent::SyncEpoch {
1047 sync_result_sender: tx2,
1048 sync_table_epochs: vec![(epoch2, HashSet::from_iter([TEST_TABLE_ID]))],
1049 });
1050 assert!(poll_fn(|cx| Poll::Ready(rx2.poll_unpin(cx).is_pending())).await);
1051
1052 tx.send(()).unwrap();
1053 rx1.await.unwrap().unwrap_err();
1054 rx2.await.unwrap().unwrap_err();
1055
1056 send_event(HummockEvent::Shutdown);
1057 join_handle.await.unwrap();
1058 }
1059
1060 #[tokio::test]
1061 async fn test_clear_tables() {
1062 let table_id1 = TableId::new(1);
1063 let table_id2 = TableId::new(2);
1064 let epoch0 = test_epoch(233);
1065
1066 let initial_version = PinnedVersion::new(
1067 HummockVersion::from_rpc_protobuf(&PbHummockVersion {
1068 id: 1,
1069 state_table_info: HashMap::from_iter([
1070 (
1071 table_id1.table_id,
1072 StateTableInfo {
1073 committed_epoch: epoch0,
1074 compaction_group_id: StaticCompactionGroupId::StateDefault as _,
1075 },
1076 ),
1077 (
1078 table_id2.table_id,
1079 StateTableInfo {
1080 committed_epoch: epoch0,
1081 compaction_group_id: StaticCompactionGroupId::StateDefault as _,
1082 },
1083 ),
1084 ]),
1085 ..Default::default()
1086 }),
1087 unbounded_channel().0,
1088 );
1089
1090 let (_version_update_tx, version_update_rx) = unbounded_channel();
1091
1092 let epoch1 = epoch0.next_epoch();
1093 let epoch2 = epoch1.next_epoch();
1094 let epoch3 = epoch2.next_epoch();
1095
1096 let imm_size = gen_imm_inner(TEST_TABLE_ID, epoch1, 0, None).await.size();
1097
1098 let buffer_tracker = BufferTracker::for_test_with_config(imm_size * 2 - 1, 1);
1100 let memory_limiter = buffer_tracker.get_memory_limiter().clone();
1101
1102 let gen_imm = |table_id, epoch, spill_offset| {
1103 let imm = gen_imm_inner(table_id, epoch, spill_offset, Some(&*memory_limiter))
1104 .now_or_never()
1105 .unwrap();
1106 assert_eq!(imm.size(), imm_size);
1107 imm
1108 };
1109 let imm1_1 = gen_imm(table_id1, epoch1, 0);
1110 let imm1_2_1 = gen_imm(table_id1, epoch2, 0);
1111
1112 let storage_opt = default_opts_for_test();
1113 let metrics = Arc::new(HummockStateStoreMetrics::unused());
1114
1115 let (spawn_task, new_task_notifier) = prepare_uploader_order_test_spawn_task_fn(false);
1116
1117 let event_handler = HummockEventHandler::new_inner(
1118 version_update_rx,
1119 mock_sstable_store().await,
1120 metrics.clone(),
1121 CacheRefillConfig::from_storage_opts(&storage_opt),
1122 RecentVersions::new(initial_version.clone(), 10, metrics.clone()),
1123 buffer_tracker,
1124 spawn_task,
1125 CacheRefiller::default_spawn_refill_task(),
1126 );
1127
1128 let event_tx = event_handler.event_sender();
1129
1130 let send_event = |event| event_tx.send(event).unwrap();
1131 let flush_event = || async {
1132 let (tx, rx) = oneshot::channel();
1133 send_event(HummockEvent::FlushEvent(tx));
1134 rx.await.unwrap();
1135 };
1136 let start_epoch = |table_id, epoch| {
1137 send_event(HummockEvent::StartEpoch {
1138 epoch,
1139 table_ids: HashSet::from_iter([table_id]),
1140 })
1141 };
1142 let init_epoch = |instance: &LocalInstanceGuard, init_epoch| {
1143 send_event(HummockEvent::InitEpoch {
1144 instance_id: instance.instance_id,
1145 init_epoch,
1146 })
1147 };
1148 let write_imm = |read_version: &HummockReadVersionRef,
1149 instance: &LocalInstanceGuard,
1150 imm: &ImmutableMemtable| {
1151 read_version
1152 .write()
1153 .update(VersionUpdate::Staging(StagingData::ImmMem(imm.clone())));
1154
1155 send_event(HummockEvent::ImmToUploader {
1156 instance_id: instance.instance_id,
1157 imm: imm.clone(),
1158 });
1159 };
1160 let seal_epoch = |instance: &LocalInstanceGuard, next_epoch| {
1161 send_event(HummockEvent::LocalSealEpoch {
1162 instance_id: instance.instance_id,
1163 next_epoch,
1164 opts: SealCurrentEpochOptions::for_test(),
1165 })
1166 };
1167 let sync_epoch = |table_id, new_sync_epoch| {
1168 let (tx, rx) = oneshot::channel();
1169 send_event(HummockEvent::SyncEpoch {
1170 sync_result_sender: tx,
1171 sync_table_epochs: vec![(new_sync_epoch, HashSet::from_iter([table_id]))],
1172 });
1173 rx
1174 };
1175
1176 let join_handle = spawn(event_handler.start_hummock_event_handler_worker());
1177
1178 let (read_version1, guard1) = {
1179 let (tx, rx) = oneshot::channel();
1180 send_event(HummockEvent::RegisterReadVersion {
1181 table_id: table_id1,
1182 new_read_version_sender: tx,
1183 is_replicated: false,
1184 vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
1185 });
1186 rx.await.unwrap()
1187 };
1188
1189 let (read_version2, guard2) = {
1190 let (tx, rx) = oneshot::channel();
1191 send_event(HummockEvent::RegisterReadVersion {
1192 table_id: table_id2,
1193 new_read_version_sender: tx,
1194 is_replicated: false,
1195 vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
1196 });
1197 rx.await.unwrap()
1198 };
1199
1200 let (task1_1_finish_tx, task1_1_rx) = {
1202 start_epoch(table_id1, epoch1);
1203
1204 init_epoch(&guard1, epoch1);
1205
1206 write_imm(&read_version1, &guard1, &imm1_1);
1207
1208 start_epoch(table_id1, epoch2);
1209
1210 seal_epoch(&guard1, epoch2);
1211
1212 let (wait_task_start, task_finish_tx) = new_task_notifier(HashMap::from_iter([(
1213 guard1.instance_id,
1214 vec![imm1_1.batch_id()],
1215 )]));
1216
1217 let mut rx = sync_epoch(table_id1, epoch1);
1218 wait_task_start.await;
1219 assert!(poll_fn(|cx| Poll::Ready(rx.poll_unpin(cx).is_pending())).await);
1220
1221 write_imm(&read_version1, &guard1, &imm1_2_1);
1222 flush_event().await;
1223
1224 (task_finish_tx, rx)
1225 };
1226 let (task1_2_finish_tx, _finish_txs) = {
1231 let mut finish_txs = vec![];
1232 let imm2_1_1 = gen_imm(table_id2, epoch1, 0);
1233 start_epoch(table_id2, epoch1);
1234 init_epoch(&guard2, epoch1);
1235 let (wait_task_start, task1_2_finish_tx) = new_task_notifier(HashMap::from_iter([(
1236 guard1.instance_id,
1237 vec![imm1_2_1.batch_id()],
1238 )]));
1239 write_imm(&read_version2, &guard2, &imm2_1_1);
1240 wait_task_start.await;
1241
1242 let imm2_1_2 = gen_imm(table_id2, epoch1, 1);
1243 let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
1244 guard2.instance_id,
1245 vec![imm2_1_2.batch_id(), imm2_1_1.batch_id()],
1246 )]));
1247 finish_txs.push(finish_tx);
1248 write_imm(&read_version2, &guard2, &imm2_1_2);
1249 wait_task_start.await;
1250
1251 let imm2_1_3 = gen_imm(table_id2, epoch1, 2);
1252 write_imm(&read_version2, &guard2, &imm2_1_3);
1253 start_epoch(table_id2, epoch2);
1254 seal_epoch(&guard2, epoch2);
1255 let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
1256 guard2.instance_id,
1257 vec![imm2_1_3.batch_id()],
1258 )]));
1259 finish_txs.push(finish_tx);
1260 let _sync_rx = sync_epoch(table_id2, epoch1);
1261 wait_task_start.await;
1262
1263 let imm2_2_1 = gen_imm(table_id2, epoch2, 0);
1264 write_imm(&read_version2, &guard2, &imm2_2_1);
1265 flush_event().await;
1266 let imm2_2_2 = gen_imm(table_id2, epoch2, 1);
1267 write_imm(&read_version2, &guard2, &imm2_2_2);
1268 let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
1269 guard2.instance_id,
1270 vec![imm2_2_2.batch_id(), imm2_2_1.batch_id()],
1271 )]));
1272 finish_txs.push(finish_tx);
1273 wait_task_start.await;
1274
1275 let imm2_2_3 = gen_imm(table_id2, epoch2, 2);
1276 write_imm(&read_version2, &guard2, &imm2_2_3);
1277
1278 drop(guard2);
1286 let (clear_tx, clear_rx) = oneshot::channel();
1287 send_event(HummockEvent::Clear(
1288 clear_tx,
1289 Some(HashSet::from_iter([table_id2])),
1290 ));
1291 clear_rx.await.unwrap();
1292 (task1_2_finish_tx, finish_txs)
1293 };
1294
1295 let imm1_2_2 = gen_imm(table_id1, epoch2, 1);
1296 write_imm(&read_version1, &guard1, &imm1_2_2);
1297 start_epoch(table_id1, epoch3);
1298 seal_epoch(&guard1, epoch3);
1299
1300 let (tx2, mut sync_rx2) = oneshot::channel();
1301 let (wait_task_start, task1_2_2_finish_tx) = new_task_notifier(HashMap::from_iter([(
1302 guard1.instance_id,
1303 vec![imm1_2_2.batch_id()],
1304 )]));
1305 send_event(HummockEvent::SyncEpoch {
1306 sync_result_sender: tx2,
1307 sync_table_epochs: vec![(epoch2, HashSet::from_iter([table_id1]))],
1308 });
1309 wait_task_start.await;
1310 assert!(poll_fn(|cx| Poll::Ready(sync_rx2.poll_unpin(cx).is_pending())).await);
1311
1312 task1_1_finish_tx.send(()).unwrap();
1313 let sync_data1 = task1_1_rx.await.unwrap().unwrap();
1314 sync_data1
1315 .uploaded_ssts
1316 .iter()
1317 .all(|sst| sst.epochs() == &vec![epoch1]);
1318 task1_2_finish_tx.send(()).unwrap();
1319 assert!(poll_fn(|cx| Poll::Ready(sync_rx2.poll_unpin(cx).is_pending())).await);
1320 task1_2_2_finish_tx.send(()).unwrap();
1321 let sync_data2 = sync_rx2.await.unwrap().unwrap();
1322 sync_data2
1323 .uploaded_ssts
1324 .iter()
1325 .all(|sst| sst.epochs() == &vec![epoch2]);
1326
1327 send_event(HummockEvent::Shutdown);
1328 join_handle.await.unwrap();
1329 }
1330}