1use std::cell::RefCell;
16use std::collections::{HashMap, HashSet, VecDeque};
17use std::pin::pin;
18use std::sync::atomic::AtomicUsize;
19use std::sync::atomic::Ordering::Relaxed;
20use std::sync::{Arc, LazyLock};
21use std::time::Duration;
22
23use arc_swap::ArcSwap;
24use await_tree::{InstrumentAwait, SpanExt};
25use futures::FutureExt;
26use itertools::Itertools;
27use parking_lot::RwLock;
28use prometheus::{Histogram, IntGauge, IntGaugeVec};
29use risingwave_common::catalog::TableId;
30use risingwave_common::metrics::UintGauge;
31use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo;
32use risingwave_hummock_sdk::version::LocalHummockVersionDelta;
33use risingwave_hummock_sdk::{HummockEpoch, SyncResult};
34use tokio::spawn;
35use tokio::sync::mpsc::error::SendError;
36use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
37use tokio::sync::oneshot;
38use tracing::{debug, error, info, trace, warn};
39
40use super::refiller::{CacheRefillConfig, CacheRefiller};
41use super::{LocalInstanceGuard, LocalInstanceId, ReadVersionMappingType};
42use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
43use crate::hummock::compactor::{CompactorContext, await_tree_key, compact};
44use crate::hummock::event_handler::refiller::{CacheRefillerEvent, SpawnRefillTask};
45use crate::hummock::event_handler::uploader::{
46 HummockUploader, SpawnUploadTask, SyncedData, UploadTaskOutput,
47};
48use crate::hummock::event_handler::{
49 HummockEvent, HummockReadVersionRef, HummockVersionUpdate, ReadOnlyReadVersionMapping,
50 ReadOnlyRwLockRef,
51};
52use crate::hummock::local_version::pinned_version::PinnedVersion;
53use crate::hummock::local_version::recent_versions::RecentVersions;
54use crate::hummock::store::version::{HummockReadVersion, StagingSstableInfo, VersionUpdate};
55use crate::hummock::{HummockResult, MemoryLimiter, ObjectIdManager, SstableStoreRef};
56use crate::mem_table::ImmutableMemtable;
57use crate::monitor::HummockStateStoreMetrics;
58use crate::opts::StorageOpts;
59
60#[derive(Clone)]
61pub(crate) struct BufferTracker {
62 flush_threshold: usize,
63 min_batch_flush_size: usize,
64 global_uploading_memory_limiter: Arc<MemoryLimiter>,
65 uploader_imm_size: UintGauge,
66 uploader_uploading_task_size: UintGauge,
67}
68
69impl BufferTracker {
70 pub fn from_storage_opts(config: &StorageOpts, metrics: &HummockStateStoreMetrics) -> Self {
71 let capacity = config.shared_buffer_capacity_mb * (1 << 20);
72 let flush_threshold = (capacity as f32 * config.shared_buffer_flush_ratio) as usize;
73 let min_batch_flush_size = config.shared_buffer_min_batch_flush_size_mb * (1 << 20);
74 assert!(
75 flush_threshold < capacity,
76 "flush_threshold {} should be less or equal to capacity {}",
77 flush_threshold,
78 capacity
79 );
80 Self {
81 flush_threshold,
82 min_batch_flush_size,
83 global_uploading_memory_limiter: Arc::new(MemoryLimiter::new(capacity as u64)),
84 uploader_imm_size: metrics.uploader_imm_size.clone(),
85 uploader_uploading_task_size: metrics.uploader_uploading_task_size.clone(),
86 }
87 }
88
89 #[cfg(test)]
90 fn for_test_with_config(flush_threshold: usize, min_batch_flush_size: usize) -> Self {
91 Self {
92 flush_threshold,
93 min_batch_flush_size,
94 ..Self::for_test()
95 }
96 }
97
98 #[cfg(test)]
99 pub fn for_test() -> Self {
100 Self::from_storage_opts(&StorageOpts::default(), &HummockStateStoreMetrics::unused())
101 }
102
103 pub fn get_memory_limiter(&self) -> &Arc<MemoryLimiter> {
104 &self.global_uploading_memory_limiter
105 }
106
107 pub fn global_upload_task_size(&self) -> &UintGauge {
108 &self.uploader_uploading_task_size
109 }
110
111 pub fn need_flush(&self) -> bool {
114 self.uploader_imm_size.get()
115 > self.flush_threshold as u64 + self.uploader_uploading_task_size.get()
116 }
117
118 pub fn need_more_flush(&self, curr_batch_flush_size: usize) -> bool {
119 curr_batch_flush_size < self.min_batch_flush_size || self.need_flush()
120 }
121
122 #[cfg(test)]
123 pub(crate) fn flush_threshold(&self) -> usize {
124 self.flush_threshold
125 }
126}
127
/// Sending half of the hummock event channel created by `event_channel`.
#[derive(Clone)]
pub struct HummockEventSender {
    /// Underlying unbounded channel sender.
    inner: UnboundedSender<HummockEvent>,
    /// Gauge vector labelled by event name; the matching gauge is incremented after each
    /// successful send.
    event_count: IntGaugeVec,
}
133
134pub fn event_channel(event_count: IntGaugeVec) -> (HummockEventSender, HummockEventReceiver) {
135 let (tx, rx) = unbounded_channel();
136 (
137 HummockEventSender {
138 inner: tx,
139 event_count: event_count.clone(),
140 },
141 HummockEventReceiver {
142 inner: rx,
143 event_count,
144 },
145 )
146}
147
148impl HummockEventSender {
149 pub fn send(&self, event: HummockEvent) -> Result<(), SendError<HummockEvent>> {
150 let event_type = event.event_name();
151 self.inner.send(event)?;
152 get_event_pending_gauge(&self.event_count, event_type).inc();
153 Ok(())
154 }
155}
156
157pub struct HummockEventReceiver {
158 inner: UnboundedReceiver<HummockEvent>,
159 event_count: IntGaugeVec,
160}
161
162impl HummockEventReceiver {
163 async fn recv(&mut self) -> Option<HummockEvent> {
164 let event = self.inner.recv().await?;
165 let event_type = event.event_name();
166 get_event_pending_gauge(&self.event_count, event_type).dec();
167 Some(event)
168 }
169}
170
171thread_local! {
172 static EVENT_PENDING_GAUGE_CACHE: RefCell<HashMap<&'static str, IntGauge>> = RefCell::new(HashMap::new());
173}
174
175fn get_event_pending_gauge(event_count: &IntGaugeVec, event_type: &'static str) -> IntGauge {
176 EVENT_PENDING_GAUGE_CACHE.with(|cache| {
177 cache
178 .borrow_mut()
179 .entry(event_type)
180 .or_insert_with(|| event_count.with_label_values(&[event_type]))
181 .clone()
182 })
183}
184
/// Latency histograms of the event handler, each resolved once from the
/// `event_handler_latency` metric with a fixed label.
struct HummockEventHandlerMetrics {
    /// Label `on_upload_finish`: time spent handling uploaded ssts.
    event_handler_on_upload_finish_latency: Histogram,
    /// Label `apply_version`: time spent applying version updates.
    event_handler_on_apply_version_update: Histogram,
    /// Label `recv_version_update`: time spent handling a received version update.
    event_handler_on_recv_version_update: Histogram,
    /// Label `spiller`: passed to the uploader's `may_flush`.
    event_handler_on_spiller: Histogram,
}
191
/// Worker state driving uploads, version updates and read-version bookkeeping from the
/// hummock event channel.
pub(crate) struct HummockEventHandler {
    /// Sender of the event channel; cloned out to callers and into each `LocalInstanceGuard`.
    hummock_event_tx: HummockEventSender,
    /// Receiver of the event channel, polled by the worker loop.
    hummock_event_rx: HummockEventReceiver,
    /// Stream of version updates, polled by the worker loop.
    version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
    /// Shared mapping `table_id -> instance_id -> read version`, exposed read-only to others.
    read_version_mapping: Arc<RwLock<ReadVersionMappingType>>,
    /// Handler-local mapping from instance id to its table id and read version.
    local_read_version_mapping: HashMap<LocalInstanceId, (TableId, HummockReadVersionRef)>,

    /// Watch channel sender carrying the pinned version written when version updates are
    /// applied.
    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
    /// Recent versions, replaced atomically when version updates are applied.
    recent_versions: Arc<ArcSwap<RecentVersions>>,

    /// Uploader owning the buffer tracker and the upload/sync state.
    uploader: HummockUploader,
    /// Cache refiller whose finished events trigger applying version updates.
    refiller: CacheRefiller,

    /// Last instance id handed out; starts at 0 and is incremented before each use.
    last_instance_id: LocalInstanceId,

    /// Latency histograms of the handler.
    metrics: HummockEventHandlerMetrics,
}
210
211async fn flush_imms(
212 payload: Vec<ImmutableMemtable>,
213 compactor_context: CompactorContext,
214 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
215 object_id_manager: Arc<ObjectIdManager>,
216) -> HummockResult<UploadTaskOutput> {
217 compact(
218 compactor_context,
219 object_id_manager,
220 payload,
221 compaction_catalog_manager_ref,
222 )
223 .instrument_await("shared_buffer_compact".verbose())
224 .await
225}
226
227impl HummockEventHandler {
228 pub fn new(
229 version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
230 pinned_version: PinnedVersion,
231 compactor_context: CompactorContext,
232 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
233 object_id_manager: Arc<ObjectIdManager>,
234 state_store_metrics: Arc<HummockStateStoreMetrics>,
235 ) -> Self {
236 let upload_compactor_context = compactor_context.clone();
237 let upload_task_latency = state_store_metrics.uploader_upload_task_latency.clone();
238 let wait_poll_latency = state_store_metrics.uploader_wait_poll_latency.clone();
239 let recent_versions = RecentVersions::new(
240 pinned_version,
241 compactor_context
242 .storage_opts
243 .max_cached_recent_versions_number,
244 state_store_metrics.clone(),
245 );
246 let buffer_tracker =
247 BufferTracker::from_storage_opts(&compactor_context.storage_opts, &state_store_metrics);
248 Self::new_inner(
249 version_update_rx,
250 compactor_context.sstable_store.clone(),
251 state_store_metrics,
252 CacheRefillConfig::from_storage_opts(&compactor_context.storage_opts),
253 recent_versions,
254 buffer_tracker,
255 Arc::new(move |payload, task_info| {
256 static NEXT_UPLOAD_TASK_ID: LazyLock<AtomicUsize> =
257 LazyLock::new(|| AtomicUsize::new(0));
258 let tree_root = upload_compactor_context.await_tree_reg.as_ref().map(|reg| {
259 let upload_task_id = NEXT_UPLOAD_TASK_ID.fetch_add(1, Relaxed);
260 reg.register(
261 await_tree_key::SpawnUploadTask { id: upload_task_id },
262 format!("Spawn Upload Task: {}", task_info),
263 )
264 });
265 let upload_task_latency = upload_task_latency.clone();
266 let wait_poll_latency = wait_poll_latency.clone();
267 let upload_compactor_context = upload_compactor_context.clone();
268 let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();
269 let object_id_manager = object_id_manager.clone();
270 spawn({
271 let future = async move {
272 let _timer = upload_task_latency.start_timer();
273 let mut output = flush_imms(
274 payload
275 .into_values()
276 .flat_map(|imms| imms.into_iter())
277 .collect(),
278 upload_compactor_context.clone(),
279 compaction_catalog_manager_ref.clone(),
280 object_id_manager.clone(),
281 )
282 .await?;
283 assert!(
284 output
285 .wait_poll_timer
286 .replace(wait_poll_latency.start_timer())
287 .is_none(),
288 "should not set timer before"
289 );
290 Ok(output)
291 };
292 if let Some(tree_root) = tree_root {
293 tree_root.instrument(future).left_future()
294 } else {
295 future.right_future()
296 }
297 })
298 }),
299 CacheRefiller::default_spawn_refill_task(),
300 )
301 }
302
303 fn new_inner(
304 version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
305 sstable_store: SstableStoreRef,
306 state_store_metrics: Arc<HummockStateStoreMetrics>,
307 refill_config: CacheRefillConfig,
308 recent_versions: RecentVersions,
309 buffer_tracker: BufferTracker,
310 spawn_upload_task: SpawnUploadTask,
311 spawn_refill_task: SpawnRefillTask,
312 ) -> Self {
313 let (hummock_event_tx, hummock_event_rx) =
314 event_channel(state_store_metrics.event_handler_pending_event.clone());
315 let (version_update_notifier_tx, _) =
316 tokio::sync::watch::channel(recent_versions.latest_version().clone());
317 let version_update_notifier_tx = Arc::new(version_update_notifier_tx);
318 let read_version_mapping = Arc::new(RwLock::new(HashMap::default()));
319
320 let metrics = HummockEventHandlerMetrics {
321 event_handler_on_upload_finish_latency: state_store_metrics
322 .event_handler_latency
323 .with_label_values(&["on_upload_finish"]),
324 event_handler_on_apply_version_update: state_store_metrics
325 .event_handler_latency
326 .with_label_values(&["apply_version"]),
327 event_handler_on_recv_version_update: state_store_metrics
328 .event_handler_latency
329 .with_label_values(&["recv_version_update"]),
330 event_handler_on_spiller: state_store_metrics
331 .event_handler_latency
332 .with_label_values(&["spiller"]),
333 };
334
335 let uploader = HummockUploader::new(
336 state_store_metrics,
337 recent_versions.latest_version().clone(),
338 spawn_upload_task,
339 buffer_tracker,
340 );
341 let refiller = CacheRefiller::new(refill_config, sstable_store, spawn_refill_task);
342
343 Self {
344 hummock_event_tx,
345 hummock_event_rx,
346 version_update_rx,
347 version_update_notifier_tx,
348 recent_versions: Arc::new(ArcSwap::from_pointee(recent_versions)),
349 read_version_mapping,
350 local_read_version_mapping: Default::default(),
351 uploader,
352 refiller,
353 last_instance_id: 0,
354 metrics,
355 }
356 }
357
358 pub fn version_update_notifier_tx(&self) -> Arc<tokio::sync::watch::Sender<PinnedVersion>> {
359 self.version_update_notifier_tx.clone()
360 }
361
362 pub fn recent_versions(&self) -> Arc<ArcSwap<RecentVersions>> {
363 self.recent_versions.clone()
364 }
365
366 pub fn read_version_mapping(&self) -> ReadOnlyReadVersionMapping {
367 ReadOnlyRwLockRef::new(self.read_version_mapping.clone())
368 }
369
370 pub fn event_sender(&self) -> HummockEventSender {
371 self.hummock_event_tx.clone()
372 }
373
    /// Returns the buffer tracker owned by the uploader.
    pub fn buffer_tracker(&self) -> &BufferTracker {
        self.uploader.buffer_tracker()
    }
377}
378
379impl HummockEventHandler {
381 fn for_each_read_version(
384 &self,
385 instances: impl IntoIterator<Item = LocalInstanceId>,
386 mut f: impl FnMut(LocalInstanceId, &mut HummockReadVersion),
387 ) {
388 let instances = {
389 #[cfg(debug_assertions)]
390 {
391 let mut id_set = std::collections::HashSet::new();
393 for instance in instances {
394 assert!(id_set.insert(instance));
395 }
396 id_set
397 }
398 #[cfg(not(debug_assertions))]
399 {
400 instances
401 }
402 };
403 let mut pending = VecDeque::new();
404 let mut total_count = 0;
405 for instance_id in instances {
406 let Some((_, read_version)) = self.local_read_version_mapping.get(&instance_id) else {
407 continue;
408 };
409 total_count += 1;
410 match read_version.try_write() {
411 Some(mut write_guard) => {
412 f(instance_id, &mut write_guard);
413 }
414 _ => {
415 pending.push_back(instance_id);
416 }
417 }
418 }
419 if !pending.is_empty() {
420 if pending.len() * 10 > total_count {
421 warn!(
423 pending_count = pending.len(),
424 total_count, "cannot acquire lock for all read version"
425 );
426 } else {
427 debug!(
428 pending_count = pending.len(),
429 total_count, "cannot acquire lock for all read version"
430 );
431 }
432 }
433
434 const TRY_LOCK_TIMEOUT: Duration = Duration::from_millis(1);
435
436 while let Some(instance_id) = pending.pop_front() {
437 let (_, read_version) = self
438 .local_read_version_mapping
439 .get(&instance_id)
440 .expect("have checked exist before");
441 match read_version.try_write_for(TRY_LOCK_TIMEOUT) {
442 Some(mut write_guard) => {
443 f(instance_id, &mut write_guard);
444 }
445 _ => {
446 warn!(instance_id, "failed to get lock again for instance");
447 pending.push_back(instance_id);
448 }
449 }
450 }
451 }
452
453 fn handle_uploaded_ssts_inner(&mut self, ssts: Vec<Arc<StagingSstableInfo>>) {
454 match ssts.as_slice() {
455 [] => {
456 if cfg!(debug_assertions) {
457 panic!("empty ssts")
458 }
459 }
460 [staging_sstable_info] => {
461 trace!("data_flushed. SST size {}", staging_sstable_info.imm_size());
462 self.for_each_read_version(
463 staging_sstable_info.imm_ids().keys().cloned(),
464 |_, read_version| {
465 read_version.update(VersionUpdate::Sst(staging_sstable_info.clone()))
466 },
467 )
468 }
469 ssts => {
470 warn!(
471 batch_size = ssts.len(),
472 "handle multiple uploaded ssts in batch"
473 );
474 let affected_instances: HashSet<_> = ssts
475 .iter()
476 .flat_map(|sst| {
477 trace!("data_flushed. SST size {}", sst.imm_size());
478 sst.imm_ids().keys()
479 })
480 .copied()
481 .collect();
482 self.for_each_read_version(affected_instances, |instance_id, read_version| {
483 for sst in ssts {
484 if sst.imm_ids().contains_key(&instance_id) {
485 read_version.update(VersionUpdate::Sst(sst.clone()));
486 }
487 }
488 })
489 }
490 }
491 }
492
493 fn handle_sync_epoch(
494 &mut self,
495 sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
496 sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
497 ) {
498 debug!(?sync_table_epochs, "awaiting for epoch to be synced",);
499 self.uploader
500 .start_sync_epoch(sync_result_sender, sync_table_epochs);
501 }
502
503 fn handle_clear(&mut self, notifier: oneshot::Sender<()>, table_ids: Option<HashSet<TableId>>) {
504 info!(
505 current_version_id = ?self.uploader.hummock_version().id(),
506 ?table_ids,
507 "handle clear event"
508 );
509
510 self.uploader.clear(table_ids.clone());
511
512 if table_ids.is_none() {
513 assert!(
514 self.local_read_version_mapping.is_empty(),
515 "read version mapping not empty when clear. remaining tables: {:?}",
516 self.local_read_version_mapping
517 .values()
518 .map(|(_, read_version)| read_version.read().table_id())
519 .collect_vec()
520 );
521 }
522
523 let _ = notifier.send(()).inspect_err(|e| {
525 error!("failed to notify completion of clear event: {:?}", e);
526 });
527
528 info!("clear finished");
529 }
530
531 fn handle_version_update(&mut self, version_payload: HummockVersionUpdate) {
532 let _timer = self
533 .metrics
534 .event_handler_on_recv_version_update
535 .start_timer();
536 let pinned_version = self
537 .refiller
538 .last_new_pinned_version()
539 .cloned()
540 .unwrap_or_else(|| self.uploader.hummock_version().clone());
541
542 let mut sst_delta_infos = vec![];
543 if let Some(new_pinned_version) = Self::resolve_version_update_info(
544 &pinned_version,
545 version_payload,
546 Some(&mut sst_delta_infos),
547 ) {
548 self.refiller
549 .start_cache_refill(sst_delta_infos, pinned_version, new_pinned_version);
550 }
551 }
552
553 fn resolve_version_update_info(
554 pinned_version: &PinnedVersion,
555 version_payload: HummockVersionUpdate,
556 mut sst_delta_infos: Option<&mut Vec<SstDeltaInfo>>,
557 ) -> Option<PinnedVersion> {
558 match version_payload {
559 HummockVersionUpdate::VersionDeltas(version_deltas) => {
560 let mut version_to_apply = (**pinned_version).clone();
561 {
562 for version_delta in version_deltas {
563 assert_eq!(version_to_apply.id, version_delta.prev_id);
564 let local_hummock_version_delta =
565 LocalHummockVersionDelta::from(version_delta);
566 if let Some(sst_delta_infos) = &mut sst_delta_infos {
567 sst_delta_infos.extend(
568 version_to_apply
569 .build_sst_delta_infos(&local_hummock_version_delta),
570 );
571 }
572
573 version_to_apply.apply_version_delta(&local_hummock_version_delta);
574 }
575 }
576
577 pinned_version.new_with_local_version(version_to_apply)
578 }
579 HummockVersionUpdate::PinnedVersion(version) => {
580 pinned_version.new_pin_version(*version)
581 }
582 }
583 }
584
    /// Applies a batch of finished cache-refill events, in order, to every consumer of the
    /// pinned version: the recent versions, all registered read versions, the version watch
    /// channel and finally the uploader (which only receives the last version).
    ///
    /// An empty batch panics in debug builds and is ignored otherwise.
    fn apply_version_updates(&mut self, events: Vec<CacheRefillerEvent>) {
        // The last event carries the newest pinned version of the batch.
        let Some(CacheRefillerEvent {
            new_pinned_version: latest_pinned_version,
            ..
        }) = events.last()
        else {
            if cfg!(debug_assertions) {
                panic!("empty events")
            }
            return;
        };
        if events.len() > 1 {
            warn!(
                count = events.len(),
                "handle multiple version updates in batch"
            );
        }
        let _timer = self
            .metrics
            .event_handler_on_apply_version_update
            .start_timer();
        // Fold every new version of the batch into the recent versions. The closure derives
        // its result purely from `prev_recent_versions` and `events`.
        // NOTE(review): arc-swap documents that `rcu` may call the closure more than once —
        // confirm it stays free of side effects.
        self.recent_versions.rcu(|prev_recent_versions| {
            let mut recent_versions = None;
            for event in &events {
                let CacheRefillerEvent {
                    new_pinned_version, ..
                } = event;
                // Start from the previous value for the first event, then chain on the
                // intermediate result.
                recent_versions = Some(
                    recent_versions
                        .as_ref()
                        .unwrap_or(prev_recent_versions.as_ref())
                        .with_new_version(new_pinned_version.clone()),
                );
            }
            recent_versions.expect("non-empty events")
        });

        {
            // Every registered read version receives all committed snapshots of the batch.
            self.for_each_read_version(
                self.local_read_version_mapping.keys().cloned(),
                |_, read_version| {
                    for CacheRefillerEvent {
                        new_pinned_version, ..
                    } in &events
                    {
                        read_version
                            .update(VersionUpdate::CommittedSnapshot(new_pinned_version.clone()))
                    }
                },
            );
        }

        // Advance the watch channel state event by event; watchers are only notified when
        // the version id actually changed.
        self.version_update_notifier_tx.send_if_modified(|state| {
            let mut modified = false;
            for CacheRefillerEvent {
                pinned_version,
                new_pinned_version,
            } in &events
            {
                // Each event must be based on the version currently held by the channel.
                assert_eq!(pinned_version.id(), state.id());
                if state.id() == new_pinned_version.id() {
                    continue;
                }
                assert!(new_pinned_version.id() > state.id());
                *state = new_pinned_version.clone();
                modified = true;
            }
            modified
        });

        debug!("update to hummock version: {}", latest_pinned_version.id(),);

        self.uploader
            .update_pinned_version(latest_pinned_version.clone());
    }
660}
661
662impl HummockEventHandler {
    /// Runs the event loop of the handler until shutdown.
    ///
    /// Each iteration waits for whichever comes first: uploaded ssts from the uploader,
    /// finished refill events from the refiller, a hummock event, or a version update. The
    /// loop ends on `HummockEvent::Shutdown`, when the event channel yields `None`, or when
    /// the version update stream ends.
    pub async fn start_hummock_event_handler_worker(mut self) {
        loop {
            tokio::select! {
                ssts = self.uploader.next_uploaded_ssts() => {
                    self.handle_uploaded_ssts(ssts);
                }
                events = self.refiller.next_events() => {
                    self.apply_version_updates(events);
                }
                event = pin!(self.hummock_event_rx.recv()) => {
                    // `None` means the event channel has no more events: leave the loop.
                    let Some(event) = event else { break };
                    match event {
                        // Shutdown is handled here so that `handle_hummock_event` never
                        // sees it.
                        HummockEvent::Shutdown => {
                            info!("event handler shutdown");
                            return;
                        },
                        event => {
                            self.handle_hummock_event(event);
                        }
                    }
                }
                version_update = pin!(self.version_update_rx.recv()) => {
                    let Some(version_update) = version_update else {
                        warn!("version update stream ends. event handle shutdown");
                        return;
                    };
                    self.handle_version_update(version_update);
                }
            }
        }
    }
694
695 fn handle_uploaded_ssts(&mut self, ssts: Vec<Arc<StagingSstableInfo>>) {
696 let _timer = self
697 .metrics
698 .event_handler_on_upload_finish_latency
699 .start_timer();
700 self.handle_uploaded_ssts_inner(ssts);
701 }
702
703 fn handle_hummock_event(&mut self, event: HummockEvent) {
705 match event {
706 HummockEvent::BufferMayFlush => {
707 self.uploader
708 .may_flush(&self.metrics.event_handler_on_spiller);
709 }
710 HummockEvent::SyncEpoch {
711 sync_result_sender,
712 sync_table_epochs,
713 } => {
714 self.handle_sync_epoch(sync_table_epochs, sync_result_sender);
715 }
716 HummockEvent::Clear(notifier, table_ids) => {
717 self.handle_clear(notifier, table_ids);
718 }
719 HummockEvent::Shutdown => {
720 unreachable!("shutdown is handled specially")
721 }
722 HummockEvent::StartEpoch { epoch, table_ids } => {
723 self.uploader.start_epoch(epoch, table_ids);
724 }
725 HummockEvent::InitEpoch {
726 instance_id,
727 init_epoch,
728 } => {
729 let table_id = self
730 .local_read_version_mapping
731 .get(&instance_id)
732 .expect("should exist")
733 .0;
734 self.uploader
735 .init_instance(instance_id, table_id, init_epoch);
736 }
737 HummockEvent::ImmToUploader { instance_id, imms } => {
738 assert!(
739 self.local_read_version_mapping.contains_key(&instance_id),
740 "add imm from non-existing read version instance: instance_id: {}, table_id {:?}",
741 instance_id,
742 imms.first().map(|(imm, _)| imm.table_id),
743 );
744 self.uploader.add_imms(instance_id, imms);
745 self.uploader
746 .may_flush(&self.metrics.event_handler_on_spiller);
747 }
748
749 HummockEvent::LocalSealEpoch {
750 next_epoch,
751 opts,
752 instance_id,
753 } => {
754 self.uploader
755 .local_seal_epoch(instance_id, next_epoch, opts);
756 }
757
758 #[cfg(any(test, feature = "test"))]
759 HummockEvent::FlushEvent(sender) => {
760 let _ = sender.send(()).inspect_err(|e| {
761 error!("unable to send flush result: {:?}", e);
762 });
763 }
764
765 HummockEvent::RegisterReadVersion {
766 table_id,
767 new_read_version_sender,
768 is_replicated,
769 vnodes,
770 } => {
771 let pinned_version = self.recent_versions.load().latest_version().clone();
772 let instance_id = self.generate_instance_id();
773 let basic_read_version = Arc::new(RwLock::new(
774 HummockReadVersion::new_with_replication_option(
775 table_id,
776 instance_id,
777 pinned_version,
778 is_replicated,
779 vnodes,
780 ),
781 ));
782
783 debug!(
784 "new read version registered: table_id: {}, instance_id: {}",
785 table_id, instance_id
786 );
787
788 {
789 self.local_read_version_mapping
790 .insert(instance_id, (table_id, basic_read_version.clone()));
791 let mut read_version_mapping_guard = self.read_version_mapping.write();
792
793 read_version_mapping_guard
794 .entry(table_id)
795 .or_default()
796 .insert(instance_id, basic_read_version.clone());
797 }
798
799 match new_read_version_sender.send((
800 basic_read_version,
801 LocalInstanceGuard {
802 table_id,
803 instance_id,
804 event_sender: Some(self.hummock_event_tx.clone()),
805 },
806 )) {
807 Ok(_) => {}
808 Err((_, mut guard)) => {
809 warn!(
810 "RegisterReadVersion send fail table_id {:?} instance_is {:?}",
811 table_id, instance_id
812 );
813 guard.event_sender.take().expect("sender is just set");
814 self.destroy_read_version(instance_id);
815 }
816 }
817 }
818
819 HummockEvent::DestroyReadVersion { instance_id } => {
820 self.uploader.may_destroy_instance(instance_id);
821 self.destroy_read_version(instance_id);
822 }
823 HummockEvent::GetMinUncommittedObjectId { result_tx } => {
824 let _ = result_tx
825 .send(self.uploader.min_uncommitted_object_id())
826 .inspect_err(|e| {
827 error!("unable to send get_min_uncommitted_sst_id result: {:?}", e);
828 });
829 }
830 HummockEvent::RegisterVectorWriter {
831 table_id,
832 init_epoch,
833 } => self.uploader.register_vector_writer(table_id, init_epoch),
834 HummockEvent::VectorWriterSealEpoch {
835 table_id,
836 next_epoch,
837 add,
838 } => {
839 self.uploader
840 .vector_writer_seal_epoch(table_id, next_epoch, add);
841 }
842 HummockEvent::DropVectorWriter { table_id } => {
843 self.uploader.drop_vector_writer(table_id);
844 }
845 }
846 }
847
848 fn destroy_read_version(&mut self, instance_id: LocalInstanceId) {
849 {
850 {
851 debug!("read version deregister: instance_id: {}", instance_id);
852 let (table_id, _) = self
853 .local_read_version_mapping
854 .remove(&instance_id)
855 .unwrap_or_else(|| {
856 panic!(
857 "DestroyHummockInstance inexist instance instance_id {}",
858 instance_id
859 )
860 });
861 let mut read_version_mapping_guard = self.read_version_mapping.write();
862 let entry = read_version_mapping_guard
863 .get_mut(&table_id)
864 .unwrap_or_else(|| {
865 panic!(
866 "DestroyHummockInstance table_id {} instance_id {} fail",
867 table_id, instance_id
868 )
869 });
870 entry.remove(&instance_id).unwrap_or_else(|| {
871 panic!(
872 "DestroyHummockInstance inexist instance table_id {} instance_id {}",
873 table_id, instance_id
874 )
875 });
876 if entry.is_empty() {
877 read_version_mapping_guard.remove(&table_id);
878 }
879 }
880 }
881 }
882
883 fn generate_instance_id(&mut self) -> LocalInstanceId {
884 self.last_instance_id += 1;
885 self.last_instance_id
886 }
887}
888
889pub(super) fn send_sync_result(
890 sender: oneshot::Sender<HummockResult<SyncedData>>,
891 result: HummockResult<SyncedData>,
892) {
893 let _ = sender.send(result).inspect_err(|e| {
894 error!("unable to send sync result. Err: {:?}", e);
895 });
896}
897
898impl SyncedData {
899 pub fn into_sync_result(self) -> SyncResult {
900 {
901 let SyncedData {
902 uploaded_ssts,
903 table_watermarks,
904 vector_index_adds,
905 } = self;
906 let mut sync_size = 0;
907 let mut uncommitted_ssts = Vec::new();
908 let mut old_value_ssts = Vec::new();
909 for sst in uploaded_ssts {
912 sync_size += sst.imm_size();
913 uncommitted_ssts.extend(sst.sstable_infos().iter().cloned());
914 old_value_ssts.extend(sst.old_value_sstable_infos().iter().cloned());
915 }
916 SyncResult {
917 sync_size,
918 uncommitted_ssts,
919 table_watermarks,
920 old_value_ssts,
921 vector_index_adds,
922 }
923 }
924 }
925}
926
927#[cfg(test)]
928mod tests {
929 use std::collections::{HashMap, HashSet};
930 use std::future::poll_fn;
931 use std::sync::Arc;
932 use std::task::Poll;
933
934 use futures::FutureExt;
935 use parking_lot::Mutex;
936 use risingwave_common::bitmap::Bitmap;
937 use risingwave_common::catalog::TableId;
938 use risingwave_common::hash::VirtualNode;
939 use risingwave_common::util::epoch::{EpochExt, test_epoch};
940 use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
941 use risingwave_hummock_sdk::version::HummockVersion;
942 use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo};
943 use tokio::spawn;
944 use tokio::sync::mpsc::unbounded_channel;
945 use tokio::sync::oneshot;
946
947 use crate::hummock::HummockError;
948 use crate::hummock::event_handler::hummock_event_handler::BufferTracker;
949 use crate::hummock::event_handler::refiller::{CacheRefillConfig, CacheRefiller};
950 use crate::hummock::event_handler::uploader::UploadTaskOutput;
951 use crate::hummock::event_handler::uploader::test_utils::{
952 TEST_TABLE_ID, gen_imm_inner, gen_imm_with_unlimit,
953 prepare_uploader_order_test_spawn_task_fn,
954 };
955 use crate::hummock::event_handler::{
956 HummockEvent, HummockEventHandler, HummockReadVersionRef, LocalInstanceGuard,
957 };
958 use crate::hummock::iterator::test_utils::mock_sstable_store;
959 use crate::hummock::local_version::pinned_version::PinnedVersion;
960 use crate::hummock::local_version::recent_versions::RecentVersions;
961 use crate::hummock::test_utils::default_opts_for_test;
962 use crate::mem_table::ImmutableMemtable;
963 use crate::monitor::HummockStateStoreMetrics;
964 use crate::store::SealCurrentEpochOptions;
965
    /// Syncs two consecutive epochs where the upload task of the first epoch fails, and
    /// checks that both sync requests end with an error.
    #[tokio::test]
    async fn test_old_epoch_sync_fail() {
        let epoch0 = test_epoch(233);

        // Initial version with the test table committed at `epoch0`.
        let initial_version = PinnedVersion::new(
            HummockVersion::from_rpc_protobuf(&PbHummockVersion {
                id: 1.into(),
                state_table_info: HashMap::from_iter([(
                    TEST_TABLE_ID,
                    StateTableInfo {
                        committed_epoch: epoch0,
                        compaction_group_id: StaticCompactionGroupId::StateDefault,
                    },
                )]),
                ..Default::default()
            }),
            unbounded_channel().0,
        );

        let (_version_update_tx, version_update_rx) = unbounded_channel();

        let epoch1 = epoch0.next_epoch();
        let epoch2 = epoch1.next_epoch();
        // `tx` lets the test decide when the upload task of `epoch1` fails.
        let (tx, rx) = oneshot::channel();
        let rx = Arc::new(Mutex::new(Some(rx)));

        let storage_opt = default_opts_for_test();
        let metrics = Arc::new(HummockStateStoreMetrics::unused());

        let event_handler = HummockEventHandler::new_inner(
            version_update_rx,
            mock_sstable_store().await,
            metrics.clone(),
            CacheRefillConfig::from_storage_opts(&storage_opt),
            RecentVersions::new(initial_version.clone(), 10, metrics.clone()),
            BufferTracker::from_storage_opts(&storage_opt, &metrics),
            // Mock upload task: `epoch1` fails once signalled through `tx`, `epoch2`
            // succeeds immediately with an empty output.
            Arc::new(move |_, info| {
                assert_eq!(info.epochs.len(), 1);
                let epoch = info.epochs[0];
                match epoch {
                    epoch if epoch == epoch1 => {
                        let rx = rx.lock().take().unwrap();
                        spawn(async move {
                            rx.await.unwrap();
                            Err(HummockError::other("fail"))
                        })
                    }
                    epoch if epoch == epoch2 => spawn(async move {
                        Ok(UploadTaskOutput {
                            new_value_ssts: vec![],
                            old_value_ssts: vec![],
                            wait_poll_timer: None,
                        })
                    }),
                    _ => unreachable!(),
                }
            }),
            CacheRefiller::default_spawn_refill_task(),
        );

        let event_tx = event_handler.event_sender();

        let send_event = |event| event_tx.send(event).unwrap();

        let join_handle = spawn(event_handler.start_hummock_event_handler_worker());

        // Register a read version for the test table.
        let (read_version, guard) = {
            let (tx, rx) = oneshot::channel();
            send_event(HummockEvent::RegisterReadVersion {
                table_id: TEST_TABLE_ID,
                new_read_version_sender: tx,
                is_replicated: false,
                vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
            });
            rx.await.unwrap()
        };

        send_event(HummockEvent::StartEpoch {
            epoch: epoch1,
            table_ids: HashSet::from_iter([TEST_TABLE_ID]),
        });

        read_version.write().init();

        send_event(HummockEvent::InitEpoch {
            instance_id: guard.instance_id,
            init_epoch: epoch1,
        });

        // Write one imm in `epoch1` and hand it to the uploader.
        let (imm1, tracker1) = gen_imm_with_unlimit(epoch1);
        read_version.write().add_pending_imm(imm1.clone(), tracker1);

        send_event(HummockEvent::ImmToUploader {
            instance_id: guard.instance_id,
            imms: read_version.write().start_upload_pending_imms(),
        });

        send_event(HummockEvent::StartEpoch {
            epoch: epoch2,
            table_ids: HashSet::from_iter([TEST_TABLE_ID]),
        });

        send_event(HummockEvent::LocalSealEpoch {
            instance_id: guard.instance_id,
            next_epoch: epoch2,
            opts: SealCurrentEpochOptions::for_test(),
        });

        // Write one imm in `epoch2` and hand it to the uploader.
        {
            let (imm2, tracker2) = gen_imm_with_unlimit(epoch2);
            let mut read_version = read_version.write();
            read_version.add_pending_imm(imm2, tracker2);

            send_event(HummockEvent::ImmToUploader {
                instance_id: guard.instance_id,
                imms: read_version.start_upload_pending_imms(),
            });
        }

        let epoch3 = epoch2.next_epoch();
        send_event(HummockEvent::StartEpoch {
            epoch: epoch3,
            table_ids: HashSet::from_iter([TEST_TABLE_ID]),
        });
        send_event(HummockEvent::LocalSealEpoch {
            instance_id: guard.instance_id,
            next_epoch: epoch3,
            opts: SealCurrentEpochOptions::for_test(),
        });

        // Request the sync of both epochs; each result is still pending right after its
        // request is sent.
        let (tx1, mut rx1) = oneshot::channel();
        send_event(HummockEvent::SyncEpoch {
            sync_result_sender: tx1,
            sync_table_epochs: vec![(epoch1, HashSet::from_iter([TEST_TABLE_ID]))],
        });
        assert!(poll_fn(|cx| Poll::Ready(rx1.poll_unpin(cx).is_pending())).await);
        let (tx2, mut rx2) = oneshot::channel();
        send_event(HummockEvent::SyncEpoch {
            sync_result_sender: tx2,
            sync_table_epochs: vec![(epoch2, HashSet::from_iter([TEST_TABLE_ID]))],
        });
        assert!(poll_fn(|cx| Poll::Ready(rx2.poll_unpin(cx).is_pending())).await);

        // Let the `epoch1` upload fail: both syncs must then report an error.
        tx.send(()).unwrap();
        rx1.await.unwrap().unwrap_err();
        rx2.await.unwrap().unwrap_err();

        send_event(HummockEvent::Shutdown);
        join_handle.await.unwrap();
    }
1116
1117 #[tokio::test]
1118 async fn test_clear_tables() {
1119 let table_id1 = TableId::new(1);
1120 let table_id2 = TableId::new(2);
1121 let epoch0 = test_epoch(233);
1122
1123 let initial_version = PinnedVersion::new(
1124 HummockVersion::from_rpc_protobuf(&PbHummockVersion {
1125 id: 1.into(),
1126 state_table_info: HashMap::from_iter([
1127 (
1128 table_id1,
1129 StateTableInfo {
1130 committed_epoch: epoch0,
1131 compaction_group_id: StaticCompactionGroupId::StateDefault,
1132 },
1133 ),
1134 (
1135 table_id2,
1136 StateTableInfo {
1137 committed_epoch: epoch0,
1138 compaction_group_id: StaticCompactionGroupId::StateDefault,
1139 },
1140 ),
1141 ]),
1142 ..Default::default()
1143 }),
1144 unbounded_channel().0,
1145 );
1146
1147 let (_version_update_tx, version_update_rx) = unbounded_channel();
1148
1149 let epoch1 = epoch0.next_epoch();
1150 let epoch2 = epoch1.next_epoch();
1151 let epoch3 = epoch2.next_epoch();
1152
1153 let imm_size = gen_imm_inner(TEST_TABLE_ID, epoch1, 0).size();
1154
1155 let buffer_tracker = BufferTracker::for_test_with_config(imm_size * 2 - 1, 1);
1157 let memory_limiter = buffer_tracker.get_memory_limiter().clone();
1158
1159 let gen_imm = |table_id, epoch, spill_offset| {
1160 let imm = gen_imm_inner(table_id, epoch, spill_offset);
1161 assert_eq!(imm.size(), imm_size);
1162 imm
1163 };
1164 let imm1_1 = gen_imm(table_id1, epoch1, 0);
1165 let imm1_2_1 = gen_imm(table_id1, epoch2, 0);
1166
1167 let storage_opt = default_opts_for_test();
1168 let metrics = Arc::new(HummockStateStoreMetrics::unused());
1169
1170 let (spawn_task, new_task_notifier) = prepare_uploader_order_test_spawn_task_fn(false);
1171
1172 let event_handler = HummockEventHandler::new_inner(
1173 version_update_rx,
1174 mock_sstable_store().await,
1175 metrics.clone(),
1176 CacheRefillConfig::from_storage_opts(&storage_opt),
1177 RecentVersions::new(initial_version.clone(), 10, metrics.clone()),
1178 buffer_tracker,
1179 spawn_task,
1180 CacheRefiller::default_spawn_refill_task(),
1181 );
1182
1183 let event_tx = event_handler.event_sender();
1184
1185 let send_event = |event| event_tx.send(event).unwrap();
1186 let flush_event = || async {
1187 let (tx, rx) = oneshot::channel();
1188 send_event(HummockEvent::FlushEvent(tx));
1189 rx.await.unwrap();
1190 };
1191 let start_epoch = |table_id, epoch| {
1192 send_event(HummockEvent::StartEpoch {
1193 epoch,
1194 table_ids: HashSet::from_iter([table_id]),
1195 })
1196 };
1197 let init_epoch = |instance: &LocalInstanceGuard, init_epoch| {
1198 send_event(HummockEvent::InitEpoch {
1199 instance_id: instance.instance_id,
1200 init_epoch,
1201 })
1202 };
1203 let event_tx_clone = event_tx.clone();
1204 let write_imm = {
1205 let memory_limiter = memory_limiter.clone();
1206 move |read_version: &HummockReadVersionRef,
1207 instance: &LocalInstanceGuard,
1208 imm: &ImmutableMemtable| {
1209 let memory_limiter = memory_limiter.clone();
1210 let event_tx = event_tx_clone.clone();
1211 let read_version = read_version.clone();
1212 let imm = imm.clone();
1213 let instance_id = instance.instance_id;
1214 async move {
1215 let tracker = memory_limiter.require_memory(imm.size() as _).await;
1216 let mut read_version = read_version.write();
1217 read_version.add_pending_imm(imm.clone(), tracker);
1218
1219 event_tx
1220 .send(HummockEvent::ImmToUploader {
1221 instance_id,
1222 imms: read_version.start_upload_pending_imms(),
1223 })
1224 .unwrap();
1225 }
1226 }
1227 };
1228 let seal_epoch = |instance: &LocalInstanceGuard, next_epoch| {
1229 send_event(HummockEvent::LocalSealEpoch {
1230 instance_id: instance.instance_id,
1231 next_epoch,
1232 opts: SealCurrentEpochOptions::for_test(),
1233 })
1234 };
1235 let sync_epoch = |table_id, new_sync_epoch| {
1236 let (tx, rx) = oneshot::channel();
1237 send_event(HummockEvent::SyncEpoch {
1238 sync_result_sender: tx,
1239 sync_table_epochs: vec![(new_sync_epoch, HashSet::from_iter([table_id]))],
1240 });
1241 rx
1242 };
1243
1244 let join_handle = spawn(event_handler.start_hummock_event_handler_worker());
1245
1246 let (read_version1, guard1) = {
1247 let (tx, rx) = oneshot::channel();
1248 send_event(HummockEvent::RegisterReadVersion {
1249 table_id: table_id1,
1250 new_read_version_sender: tx,
1251 is_replicated: false,
1252 vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
1253 });
1254 rx.await.unwrap()
1255 };
1256
1257 let (read_version2, guard2) = {
1258 let (tx, rx) = oneshot::channel();
1259 send_event(HummockEvent::RegisterReadVersion {
1260 table_id: table_id2,
1261 new_read_version_sender: tx,
1262 is_replicated: false,
1263 vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
1264 });
1265 rx.await.unwrap()
1266 };
1267
1268 let (task1_1_finish_tx, task1_1_rx) = {
1270 start_epoch(table_id1, epoch1);
1271
1272 read_version1.write().init();
1273 init_epoch(&guard1, epoch1);
1274
1275 write_imm(&read_version1, &guard1, &imm1_1).await;
1276
1277 start_epoch(table_id1, epoch2);
1278
1279 seal_epoch(&guard1, epoch2);
1280
1281 let (wait_task_start, task_finish_tx) = new_task_notifier(HashMap::from_iter([(
1282 guard1.instance_id,
1283 vec![imm1_1.batch_id()],
1284 )]));
1285
1286 let mut rx = sync_epoch(table_id1, epoch1);
1287 wait_task_start.await;
1288 assert!(poll_fn(|cx| Poll::Ready(rx.poll_unpin(cx).is_pending())).await);
1289
1290 write_imm(&read_version1, &guard1, &imm1_2_1).await;
1291 flush_event().await;
1292
1293 (task_finish_tx, rx)
1294 };
1295 let (task1_2_finish_tx, _finish_txs) = {
1300 let mut finish_txs = vec![];
1301 let imm2_1_1 = gen_imm(table_id2, epoch1, 0);
1302 start_epoch(table_id2, epoch1);
1303 read_version2.write().init();
1304 init_epoch(&guard2, epoch1);
1305 let (wait_task_start, task1_2_finish_tx) = new_task_notifier(HashMap::from_iter([(
1306 guard1.instance_id,
1307 vec![imm1_2_1.batch_id()],
1308 )]));
1309 write_imm(&read_version2, &guard2, &imm2_1_1).await;
1310 wait_task_start.await;
1311
1312 let imm2_1_2 = gen_imm(table_id2, epoch1, 1);
1313 let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
1314 guard2.instance_id,
1315 vec![imm2_1_2.batch_id(), imm2_1_1.batch_id()],
1316 )]));
1317 finish_txs.push(finish_tx);
1318 write_imm(&read_version2, &guard2, &imm2_1_2).await;
1319 wait_task_start.await;
1320
1321 let imm2_1_3 = gen_imm(table_id2, epoch1, 2);
1322 write_imm(&read_version2, &guard2, &imm2_1_3).await;
1323 start_epoch(table_id2, epoch2);
1324 seal_epoch(&guard2, epoch2);
1325 let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
1326 guard2.instance_id,
1327 vec![imm2_1_3.batch_id()],
1328 )]));
1329 finish_txs.push(finish_tx);
1330 let _sync_rx = sync_epoch(table_id2, epoch1);
1331 wait_task_start.await;
1332
1333 let imm2_2_1 = gen_imm(table_id2, epoch2, 0);
1334 write_imm(&read_version2, &guard2, &imm2_2_1).await;
1335 flush_event().await;
1336 let imm2_2_2 = gen_imm(table_id2, epoch2, 1);
1337 write_imm(&read_version2, &guard2, &imm2_2_2).await;
1338 let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
1339 guard2.instance_id,
1340 vec![imm2_2_2.batch_id(), imm2_2_1.batch_id()],
1341 )]));
1342 finish_txs.push(finish_tx);
1343 wait_task_start.await;
1344
1345 let imm2_2_3 = gen_imm(table_id2, epoch2, 2);
1346 write_imm(&read_version2, &guard2, &imm2_2_3).await;
1347
1348 drop(guard2);
1356 let (clear_tx, clear_rx) = oneshot::channel();
1357 send_event(HummockEvent::Clear(
1358 clear_tx,
1359 Some(HashSet::from_iter([table_id2])),
1360 ));
1361 clear_rx.await.unwrap();
1362 (task1_2_finish_tx, finish_txs)
1363 };
1364
1365 let imm1_2_2 = gen_imm(table_id1, epoch2, 1);
1366 write_imm(&read_version1, &guard1, &imm1_2_2).await;
1367 start_epoch(table_id1, epoch3);
1368 seal_epoch(&guard1, epoch3);
1369
1370 let (tx2, mut sync_rx2) = oneshot::channel();
1371 let (wait_task_start, task1_2_2_finish_tx) = new_task_notifier(HashMap::from_iter([(
1372 guard1.instance_id,
1373 vec![imm1_2_2.batch_id()],
1374 )]));
1375 send_event(HummockEvent::SyncEpoch {
1376 sync_result_sender: tx2,
1377 sync_table_epochs: vec![(epoch2, HashSet::from_iter([table_id1]))],
1378 });
1379 wait_task_start.await;
1380 assert!(poll_fn(|cx| Poll::Ready(sync_rx2.poll_unpin(cx).is_pending())).await);
1381
1382 task1_1_finish_tx.send(()).unwrap();
1383 let sync_data1 = task1_1_rx.await.unwrap().unwrap();
1384 sync_data1
1385 .uploaded_ssts
1386 .iter()
1387 .all(|sst| sst.epochs() == &vec![epoch1]);
1388 task1_2_finish_tx.send(()).unwrap();
1389 assert!(poll_fn(|cx| Poll::Ready(sync_rx2.poll_unpin(cx).is_pending())).await);
1390 task1_2_2_finish_tx.send(()).unwrap();
1391 let sync_data2 = sync_rx2.await.unwrap().unwrap();
1392 sync_data2
1393 .uploaded_ssts
1394 .iter()
1395 .all(|sst| sst.epochs() == &vec![epoch2]);
1396
1397 send_event(HummockEvent::Shutdown);
1398 join_handle.await.unwrap();
1399 }
1400}