1use std::cell::RefCell;
16use std::collections::{HashMap, HashSet, VecDeque};
17use std::pin::pin;
18use std::sync::atomic::AtomicUsize;
19use std::sync::atomic::Ordering::Relaxed;
20use std::sync::{Arc, LazyLock};
21use std::time::Duration;
22
23use arc_swap::ArcSwap;
24use await_tree::{InstrumentAwait, SpanExt};
25use futures::FutureExt;
26use itertools::Itertools;
27use parking_lot::RwLock;
28use prometheus::{Histogram, IntGauge, IntGaugeVec};
29use risingwave_common::catalog::TableId;
30use risingwave_common::metrics::UintGauge;
31use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo;
32use risingwave_hummock_sdk::version::LocalHummockVersionDelta;
33use risingwave_hummock_sdk::{HummockEpoch, SyncResult};
34use tokio::spawn;
35use tokio::sync::mpsc::error::SendError;
36use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
37use tokio::sync::oneshot;
38use tracing::{debug, error, info, trace, warn};
39
40use super::refiller::{CacheRefillConfig, CacheRefiller};
41use super::{LocalInstanceGuard, LocalInstanceId, ReadVersionMappingType};
42use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
43use crate::hummock::compactor::{CompactorContext, await_tree_key, compact};
44use crate::hummock::event_handler::refiller::{CacheRefillerEvent, SpawnRefillTask};
45use crate::hummock::event_handler::uploader::{
46 HummockUploader, SpawnUploadTask, SyncedData, UploadTaskOutput,
47};
48use crate::hummock::event_handler::{
49 HummockEvent, HummockReadVersionRef, HummockVersionUpdate, ReadOnlyReadVersionMapping,
50 ReadOnlyRwLockRef,
51};
52use crate::hummock::local_version::pinned_version::PinnedVersion;
53use crate::hummock::local_version::recent_versions::RecentVersions;
54use crate::hummock::store::version::{HummockReadVersion, StagingSstableInfo, VersionUpdate};
55use crate::hummock::{HummockResult, MemoryLimiter, ObjectIdManager, SstableStoreRef};
56use crate::mem_table::ImmutableMemtable;
57use crate::monitor::HummockStateStoreMetrics;
58use crate::opts::StorageOpts;
59
/// Tracks shared-buffer memory usage and decides when in-memory tables (imms)
/// should be flushed/spilled to object storage.
#[derive(Clone)]
pub(crate) struct BufferTracker {
    // Imm bytes above which a flush is triggered; asserted strictly below capacity
    // in `from_storage_opts`.
    flush_threshold: usize,
    // Keep spilling until a flush batch has accumulated at least this many bytes
    // (see `need_more_flush`).
    min_batch_flush_size: usize,
    // Global limiter bounding the memory consumed by uploading, sized to the
    // shared buffer capacity.
    global_uploading_memory_limiter: Arc<MemoryLimiter>,
    // Gauge of total imm bytes tracked by the uploader.
    uploader_imm_size: UintGauge,
    // Gauge of bytes currently held by in-flight upload tasks.
    uploader_uploading_task_size: UintGauge,
}
68
69impl BufferTracker {
70 pub fn from_storage_opts(config: &StorageOpts, metrics: &HummockStateStoreMetrics) -> Self {
71 let capacity = config.shared_buffer_capacity_mb * (1 << 20);
72 let flush_threshold = (capacity as f32 * config.shared_buffer_flush_ratio) as usize;
73 let min_batch_flush_size = config.shared_buffer_min_batch_flush_size_mb * (1 << 20);
74 assert!(
75 flush_threshold < capacity,
76 "flush_threshold {} should be less or equal to capacity {}",
77 flush_threshold,
78 capacity
79 );
80 Self {
81 flush_threshold,
82 min_batch_flush_size,
83 global_uploading_memory_limiter: Arc::new(MemoryLimiter::new(capacity as u64)),
84 uploader_imm_size: metrics.uploader_imm_size.clone(),
85 uploader_uploading_task_size: metrics.uploader_uploading_task_size.clone(),
86 }
87 }
88
89 #[cfg(test)]
90 fn for_test_with_config(flush_threshold: usize, min_batch_flush_size: usize) -> Self {
91 Self {
92 flush_threshold,
93 min_batch_flush_size,
94 ..Self::for_test()
95 }
96 }
97
98 #[cfg(test)]
99 pub fn for_test() -> Self {
100 Self::from_storage_opts(&StorageOpts::default(), &HummockStateStoreMetrics::unused())
101 }
102
103 pub fn get_memory_limiter(&self) -> &Arc<MemoryLimiter> {
104 &self.global_uploading_memory_limiter
105 }
106
107 pub fn global_upload_task_size(&self) -> &UintGauge {
108 &self.uploader_uploading_task_size
109 }
110
111 pub fn need_flush(&self) -> bool {
114 self.uploader_imm_size.get()
115 > self.flush_threshold as u64 + self.uploader_uploading_task_size.get()
116 }
117
118 pub fn need_more_flush(&self, curr_batch_flush_size: usize) -> bool {
119 curr_batch_flush_size < self.min_batch_flush_size || self.need_flush()
120 }
121
122 #[cfg(test)]
123 pub(crate) fn flush_threshold(&self) -> usize {
124 self.flush_threshold
125 }
126}
127
/// Sender half of the hummock event channel; on every successful send it bumps a
/// per-event-type "pending" gauge (decremented by the matching receiver).
#[derive(Clone)]
pub struct HummockEventSender {
    inner: UnboundedSender<HummockEvent>,
    // Labelled gauge family tracking events sent but not yet received.
    event_count: IntGaugeVec,
}
133
134pub fn event_channel(event_count: IntGaugeVec) -> (HummockEventSender, HummockEventReceiver) {
135 let (tx, rx) = unbounded_channel();
136 (
137 HummockEventSender {
138 inner: tx,
139 event_count: event_count.clone(),
140 },
141 HummockEventReceiver {
142 inner: rx,
143 event_count,
144 },
145 )
146}
147
148impl HummockEventSender {
149 pub fn send(&self, event: HummockEvent) -> Result<(), SendError<HummockEvent>> {
150 let event_type = event.event_name();
151 self.inner.send(event)?;
152 get_event_pending_gauge(&self.event_count, event_type).inc();
153 Ok(())
154 }
155}
156
/// Receiver half of the hummock event channel; decrements the per-event-type
/// pending gauge for every event it yields.
pub struct HummockEventReceiver {
    inner: UnboundedReceiver<HummockEvent>,
    // Labelled gauge family shared with the sender side.
    event_count: IntGaugeVec,
}
161
162impl HummockEventReceiver {
163 async fn recv(&mut self) -> Option<HummockEvent> {
164 let event = self.inner.recv().await?;
165 let event_type = event.event_name();
166 get_event_pending_gauge(&self.event_count, event_type).dec();
167 Some(event)
168 }
169}
170
thread_local! {
    // Per-thread cache of event-type label -> gauge handle, so the hot send/recv
    // paths avoid the label lookup in `IntGaugeVec::with_label_values` each time.
    static EVENT_PENDING_GAUGE_CACHE: RefCell<HashMap<&'static str, IntGauge>> = RefCell::new(HashMap::new());
}
174
175fn get_event_pending_gauge(event_count: &IntGaugeVec, event_type: &'static str) -> IntGauge {
176 EVENT_PENDING_GAUGE_CACHE.with(|cache| {
177 cache
178 .borrow_mut()
179 .entry(event_type)
180 .or_insert_with(|| event_count.with_label_values(&[event_type]))
181 .clone()
182 })
183}
184
/// Latency histograms for the main stages of the event handler loop; each field is
/// a pre-resolved label of the shared `event_handler_latency` histogram family.
struct HummockEventHandlerMetrics {
    // Time spent handling a batch of uploaded SSTs.
    event_handler_on_upload_finish_latency: Histogram,
    // Time spent applying refilled version updates.
    event_handler_on_apply_version_update: Histogram,
    // Time spent resolving a newly received version update.
    event_handler_on_recv_version_update: Histogram,
    // Time spent in uploader spill ("may_flush") handling.
    event_handler_on_spiller: Histogram,
}
191
/// Central event loop of the hummock state store: consumes storage events and
/// version updates, drives the uploader and cache refiller, and fans version
/// changes out to registered read versions.
pub(crate) struct HummockEventHandler {
    // Kept so new `LocalInstanceGuard`s can be handed a sender.
    hummock_event_tx: HummockEventSender,
    hummock_event_rx: HummockEventReceiver,
    // Version updates pushed from the meta-observer side.
    version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
    // Shared table_id -> (instance_id -> read version) mapping, readable elsewhere.
    read_version_mapping: Arc<RwLock<ReadVersionMappingType>>,
    // Event-loop-local view: instance_id -> (table_id, read version).
    local_read_version_mapping: HashMap<LocalInstanceId, (TableId, HummockReadVersionRef)>,

    // Watch channel broadcasting the latest applied pinned version.
    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
    // Lock-free snapshot of recently pinned versions for readers.
    recent_versions: Arc<ArcSwap<RecentVersions>>,

    uploader: HummockUploader,
    refiller: CacheRefiller,

    // Monotonic counter backing `generate_instance_id`.
    last_instance_id: LocalInstanceId,

    metrics: HummockEventHandlerMetrics,
}
210
/// Compacts the given imms into SSTs via shared-buffer compaction and returns the
/// resulting upload output; instrumented for await-tree under
/// "shared_buffer_compact".
async fn flush_imms(
    payload: Vec<ImmutableMemtable>,
    compactor_context: CompactorContext,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
    object_id_manager: Arc<ObjectIdManager>,
) -> HummockResult<UploadTaskOutput> {
    compact(
        compactor_context,
        object_id_manager,
        payload,
        compaction_catalog_manager_ref,
    )
    .instrument_await("shared_buffer_compact".verbose())
    .await
}
226
impl HummockEventHandler {
    /// Creates the production event handler: wires up recent-version caching, the
    /// buffer tracker, and an upload-task spawn function that flushes imms through
    /// shared-buffer compaction (`flush_imms`).
    pub fn new(
        version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
        pinned_version: PinnedVersion,
        compactor_context: CompactorContext,
        compaction_catalog_manager_ref: CompactionCatalogManagerRef,
        object_id_manager: Arc<ObjectIdManager>,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
    ) -> Self {
        let upload_compactor_context = compactor_context.clone();
        let upload_task_latency = state_store_metrics.uploader_upload_task_latency.clone();
        let wait_poll_latency = state_store_metrics.uploader_wait_poll_latency.clone();
        let recent_versions = RecentVersions::new(
            pinned_version,
            compactor_context
                .storage_opts
                .max_cached_recent_versions_number,
            state_store_metrics.clone(),
        );
        let buffer_tracker =
            BufferTracker::from_storage_opts(&compactor_context.storage_opts, &state_store_metrics);
        Self::new_inner(
            version_update_rx,
            compactor_context.sstable_store.clone(),
            state_store_metrics,
            CacheRefillConfig::from_storage_opts(&compactor_context.storage_opts),
            recent_versions,
            buffer_tracker,
            // Spawn function for upload tasks: flattens the per-instance imm lists,
            // compacts them, and records upload/wait-poll latency. Optionally wraps
            // the task in an await-tree node when a registry is configured.
            Arc::new(move |payload, task_info| {
                // Monotonic id used only to key the await-tree registration.
                static NEXT_UPLOAD_TASK_ID: LazyLock<AtomicUsize> =
                    LazyLock::new(|| AtomicUsize::new(0));
                let tree_root = upload_compactor_context.await_tree_reg.as_ref().map(|reg| {
                    let upload_task_id = NEXT_UPLOAD_TASK_ID.fetch_add(1, Relaxed);
                    reg.register(
                        await_tree_key::SpawnUploadTask { id: upload_task_id },
                        format!("Spawn Upload Task: {}", task_info),
                    )
                });
                let upload_task_latency = upload_task_latency.clone();
                let wait_poll_latency = wait_poll_latency.clone();
                let upload_compactor_context = upload_compactor_context.clone();
                let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();
                let object_id_manager = object_id_manager.clone();
                spawn({
                    let future = async move {
                        let _timer = upload_task_latency.start_timer();
                        // Flatten per-instance imm lists into one payload to compact.
                        let mut output = flush_imms(
                            payload
                                .into_values()
                                .flat_map(|imms| imms.into_iter())
                                .collect(),
                            upload_compactor_context.clone(),
                            compaction_catalog_manager_ref.clone(),
                            object_id_manager.clone(),
                        )
                        .await?;
                        // Arm the wait-poll timer exactly once, right after the upload
                        // finishes; it measures the gap until the output is polled.
                        assert!(
                            output
                                .wait_poll_timer
                                .replace(wait_poll_latency.start_timer())
                                .is_none(),
                            "should not set timer before"
                        );
                        Ok(output)
                    };
                    // left/right_future keeps a single concrete future type for spawn.
                    if let Some(tree_root) = tree_root {
                        tree_root.instrument(future).left_future()
                    } else {
                        future.right_future()
                    }
                })
            }),
            CacheRefiller::default_spawn_refill_task(),
        )
    }

    /// Shared constructor used by `new` and by tests: builds the event channel,
    /// watch/notifier plumbing, metrics, uploader, and cache refiller.
    fn new_inner(
        version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
        sstable_store: SstableStoreRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        refill_config: CacheRefillConfig,
        recent_versions: RecentVersions,
        buffer_tracker: BufferTracker,
        spawn_upload_task: SpawnUploadTask,
        spawn_refill_task: SpawnRefillTask,
    ) -> Self {
        let (hummock_event_tx, hummock_event_rx) =
            event_channel(state_store_metrics.event_handler_pending_event.clone());
        let (version_update_notifier_tx, _) =
            tokio::sync::watch::channel(recent_versions.latest_version().clone());
        let version_update_notifier_tx = Arc::new(version_update_notifier_tx);
        let read_version_mapping = Arc::new(RwLock::new(HashMap::default()));

        let metrics = HummockEventHandlerMetrics {
            event_handler_on_upload_finish_latency: state_store_metrics
                .event_handler_latency
                .with_label_values(&["on_upload_finish"]),
            event_handler_on_apply_version_update: state_store_metrics
                .event_handler_latency
                .with_label_values(&["apply_version"]),
            event_handler_on_recv_version_update: state_store_metrics
                .event_handler_latency
                .with_label_values(&["recv_version_update"]),
            event_handler_on_spiller: state_store_metrics
                .event_handler_latency
                .with_label_values(&["spiller"]),
        };

        let uploader = HummockUploader::new(
            state_store_metrics,
            recent_versions.latest_version().clone(),
            spawn_upload_task,
            buffer_tracker,
        );
        let refiller = CacheRefiller::new(refill_config, sstable_store, spawn_refill_task);

        Self {
            hummock_event_tx,
            hummock_event_rx,
            version_update_rx,
            version_update_notifier_tx,
            recent_versions: Arc::new(ArcSwap::from_pointee(recent_versions)),
            read_version_mapping,
            local_read_version_mapping: Default::default(),
            uploader,
            refiller,
            last_instance_id: 0,
            metrics,
        }
    }

    /// Watch sender broadcasting the latest applied pinned version.
    pub fn version_update_notifier_tx(&self) -> Arc<tokio::sync::watch::Sender<PinnedVersion>> {
        self.version_update_notifier_tx.clone()
    }

    /// Lock-free handle to the recently pinned versions snapshot.
    pub fn recent_versions(&self) -> Arc<ArcSwap<RecentVersions>> {
        self.recent_versions.clone()
    }

    /// Read-only view of the shared table -> read-version mapping.
    pub fn read_version_mapping(&self) -> ReadOnlyReadVersionMapping {
        ReadOnlyRwLockRef::new(self.read_version_mapping.clone())
    }

    /// Clone of the event sender feeding this handler's loop.
    pub fn event_sender(&self) -> HummockEventSender {
        self.hummock_event_tx.clone()
    }

    pub fn buffer_tracker(&self) -> &BufferTracker {
        self.uploader.buffer_tracker()
    }
}
378
impl HummockEventHandler {
    /// Applies `f` to the read version of each given instance id, skipping ids that
    /// are no longer registered. Tries non-blocking `try_write` first; contended
    /// instances are queued and retried afterwards with a short bounded wait until
    /// every one has been visited.
    ///
    /// In debug builds the input is asserted to contain no duplicate instance ids.
    fn for_each_read_version(
        &self,
        instances: impl IntoIterator<Item = LocalInstanceId>,
        mut f: impl FnMut(LocalInstanceId, &mut HummockReadVersion),
    ) {
        let instances = {
            #[cfg(debug_assertions)]
            {
                // Deduplication check: each instance must appear at most once.
                let mut id_set = std::collections::HashSet::new();
                for instance in instances {
                    assert!(id_set.insert(instance));
                }
                id_set
            }
            #[cfg(not(debug_assertions))]
            {
                instances
            }
        };
        let mut pending = VecDeque::new();
        let mut total_count = 0;
        for instance_id in instances {
            let Some((_, read_version)) = self.local_read_version_mapping.get(&instance_id) else {
                continue;
            };
            total_count += 1;
            match read_version.try_write() {
                Some(mut write_guard) => {
                    f(instance_id, &mut write_guard);
                }
                _ => {
                    // Lock contended: defer to the retry pass below.
                    pending.push_back(instance_id);
                }
            }
        }
        if !pending.is_empty() {
            // Warn only when more than 10% of the instances were contended;
            // otherwise log at debug level.
            if pending.len() * 10 > total_count {
                warn!(
                    pending_count = pending.len(),
                    total_count, "cannot acquire lock for all read version"
                );
            } else {
                debug!(
                    pending_count = pending.len(),
                    total_count, "cannot acquire lock for all read version"
                );
            }
        }

        const TRY_LOCK_TIMEOUT: Duration = Duration::from_millis(1);

        // Retry pass: a still-contended instance is requeued, so this loop runs
        // until every pending instance has been processed.
        while let Some(instance_id) = pending.pop_front() {
            let (_, read_version) = self
                .local_read_version_mapping
                .get(&instance_id)
                .expect("have checked exist before");
            match read_version.try_write_for(TRY_LOCK_TIMEOUT) {
                Some(mut write_guard) => {
                    f(instance_id, &mut write_guard);
                }
                _ => {
                    warn!(instance_id, "failed to get lock again for instance");
                    pending.push_back(instance_id);
                }
            }
        }
    }

    /// Propagates freshly uploaded staging SSTs to the read versions of every
    /// instance whose imms were included. The single-SST case is the common path;
    /// batches fan each SST out only to the instances it actually covers.
    fn handle_uploaded_ssts_inner(&mut self, ssts: Vec<Arc<StagingSstableInfo>>) {
        match ssts.as_slice() {
            [] => {
                // An empty batch indicates a caller bug; only enforced in debug.
                if cfg!(debug_assertions) {
                    panic!("empty ssts")
                }
            }
            [staging_sstable_info] => {
                trace!("data_flushed. SST size {}", staging_sstable_info.imm_size());
                self.for_each_read_version(
                    staging_sstable_info.imm_ids().keys().cloned(),
                    |_, read_version| {
                        read_version.update(VersionUpdate::Sst(staging_sstable_info.clone()))
                    },
                )
            }
            ssts => {
                warn!(
                    batch_size = ssts.len(),
                    "handle multiple uploaded ssts in batch"
                );
                // Union of all instances touched by any SST in the batch.
                let affected_instances: HashSet<_> = ssts
                    .iter()
                    .flat_map(|sst| {
                        trace!("data_flushed. SST size {}", sst.imm_size());
                        sst.imm_ids().keys()
                    })
                    .copied()
                    .collect();
                self.for_each_read_version(affected_instances, |instance_id, read_version| {
                    for sst in ssts {
                        // Only apply SSTs that contain imms of this instance.
                        if sst.imm_ids().contains_key(&instance_id) {
                            read_version.update(VersionUpdate::Sst(sst.clone()));
                        }
                    }
                })
            }
        }
    }

    /// Kicks off an epoch sync in the uploader; the result is delivered through
    /// `sync_result_sender` when the sync completes.
    fn handle_sync_epoch(
        &mut self,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
    ) {
        debug!(?sync_table_epochs, "awaiting for epoch to be synced",);
        self.uploader
            .start_sync_epoch(sync_result_sender, sync_table_epochs);
    }

    /// Clears uploader state for the given tables (all tables when `table_ids` is
    /// `None`) and acks via `notifier`. A full clear requires that every read
    /// version instance has already been destroyed.
    fn handle_clear(&mut self, notifier: oneshot::Sender<()>, table_ids: Option<HashSet<TableId>>) {
        info!(
            current_version_id = ?self.uploader.hummock_version().id(),
            ?table_ids,
            "handle clear event"
        );

        self.uploader.clear(table_ids.clone());

        if table_ids.is_none() {
            assert!(
                self.local_read_version_mapping.is_empty(),
                "read version mapping not empty when clear. remaining tables: {:?}",
                self.local_read_version_mapping
                    .values()
                    .map(|(_, read_version)| read_version.read().table_id())
                    .collect_vec()
            );
        }

        // The requester may have been dropped; losing the ack is tolerable.
        let _ = notifier.send(()).inspect_err(|e| {
            error!("failed to notify completion of clear event: {:?}", e);
        });

        info!("clear finished");
    }

    /// Resolves a received version payload against the latest known pinned version
    /// (the refiller's last one if present, else the uploader's) and, when it
    /// yields a newer version, hands it to the cache refiller together with the
    /// SST delta infos collected along the way.
    fn handle_version_update(&mut self, version_payload: HummockVersionUpdate) {
        let _timer = self
            .metrics
            .event_handler_on_recv_version_update
            .start_timer();
        let pinned_version = self
            .refiller
            .last_new_pinned_version()
            .cloned()
            .unwrap_or_else(|| self.uploader.hummock_version().clone());

        let mut sst_delta_infos = vec![];
        if let Some(new_pinned_version) = Self::resolve_version_update_info(
            &pinned_version,
            version_payload,
            Some(&mut sst_delta_infos),
        ) {
            self.refiller
                .start_cache_refill(sst_delta_infos, pinned_version, new_pinned_version);
        }
    }

    /// Applies a version payload on top of `pinned_version`. For deltas, each one
    /// is applied in order (asserting id continuity) while optionally collecting
    /// SST delta infos; for a full version, it is pinned directly. Returns the new
    /// pinned version, or `None` when the payload does not advance the version.
    fn resolve_version_update_info(
        pinned_version: &PinnedVersion,
        version_payload: HummockVersionUpdate,
        mut sst_delta_infos: Option<&mut Vec<SstDeltaInfo>>,
    ) -> Option<PinnedVersion> {
        match version_payload {
            HummockVersionUpdate::VersionDeltas(version_deltas) => {
                let mut version_to_apply = (**pinned_version).clone();
                {
                    for version_delta in version_deltas {
                        // Deltas must chain: each one extends the version it follows.
                        assert_eq!(version_to_apply.id, version_delta.prev_id);
                        let local_hummock_version_delta =
                            LocalHummockVersionDelta::from(version_delta);
                        if let Some(sst_delta_infos) = &mut sst_delta_infos {
                            sst_delta_infos.extend(
                                version_to_apply
                                    .build_sst_delta_infos(&local_hummock_version_delta)
                                    .into_iter(),
                            );
                        }

                        version_to_apply.apply_version_delta(&local_hummock_version_delta);
                    }
                }

                pinned_version.new_with_local_version(version_to_apply)
            }
            HummockVersionUpdate::PinnedVersion(version) => {
                pinned_version.new_pin_version(*version)
            }
        }
    }

    /// Applies a batch of refilled version updates: advances the recent-versions
    /// snapshot (RCU), pushes committed snapshots into every read version, notifies
    /// watchers, and finally updates the uploader to the latest version.
    fn apply_version_updates(&mut self, events: Vec<CacheRefillerEvent>) {
        let Some(CacheRefillerEvent {
            new_pinned_version: latest_pinned_version,
            ..
        }) = events.last()
        else {
            // An empty batch indicates a caller bug; only enforced in debug.
            if cfg!(debug_assertions) {
                panic!("empty events")
            }
            return;
        };
        if events.len() > 1 {
            warn!(
                count = events.len(),
                "handle multiple version updates in batch"
            );
        }
        let _timer = self
            .metrics
            .event_handler_on_apply_version_update
            .start_timer();
        // Fold all new versions into the recent-versions snapshot in one RCU pass.
        self.recent_versions.rcu(|prev_recent_versions| {
            let mut recent_versions = None;
            for event in &events {
                let CacheRefillerEvent {
                    new_pinned_version, ..
                } = event;
                recent_versions = Some(
                    recent_versions
                        .as_ref()
                        .unwrap_or(prev_recent_versions.as_ref())
                        .with_new_version(new_pinned_version.clone()),
                );
            }
            recent_versions.expect("non-empty events")
        });

        {
            // Push each new version as a committed snapshot into every read version.
            self.for_each_read_version(
                self.local_read_version_mapping.keys().cloned(),
                |_, read_version| {
                    for CacheRefillerEvent {
                        new_pinned_version, ..
                    } in &events
                    {
                        read_version
                            .update(VersionUpdate::CommittedSnapshot(new_pinned_version.clone()))
                    }
                },
            );
        }

        // Advance the watch state through each event; notify only when the id
        // actually moved forward.
        self.version_update_notifier_tx.send_if_modified(|state| {
            let mut modified = false;
            for CacheRefillerEvent {
                pinned_version,
                new_pinned_version,
            } in &events
            {
                assert_eq!(pinned_version.id(), state.id());
                if state.id() == new_pinned_version.id() {
                    continue;
                }
                assert!(new_pinned_version.id() > state.id());
                *state = new_pinned_version.clone();
                modified = true;
            }
            modified
        });

        debug!("update to hummock version: {}", latest_pinned_version.id(),);

        self.uploader
            .update_pinned_version(latest_pinned_version.clone());
    }
}
662
impl HummockEventHandler {
    /// Main loop: multiplexes uploaded-SST completions, refiller events, storage
    /// events, and version updates. Exits on `HummockEvent::Shutdown` or when
    /// either input channel closes.
    pub async fn start_hummock_event_handler_worker(mut self) {
        loop {
            tokio::select! {
                ssts = self.uploader.next_uploaded_ssts() => {
                    self.handle_uploaded_ssts(ssts);
                }
                events = self.refiller.next_events() => {
                    self.apply_version_updates(events);
                }
                event = pin!(self.hummock_event_rx.recv()) => {
                    let Some(event) = event else { break };
                    match event {
                        HummockEvent::Shutdown => {
                            info!("event handler shutdown");
                            return;
                        },
                        event => {
                            self.handle_hummock_event(event);
                        }
                    }
                }
                version_update = pin!(self.version_update_rx.recv()) => {
                    let Some(version_update) = version_update else {
                        warn!("version update stream ends. event handle shutdown");
                        return;
                    };
                    self.handle_version_update(version_update);
                }
            }
        }
    }

    /// Timed wrapper around `handle_uploaded_ssts_inner`.
    fn handle_uploaded_ssts(&mut self, ssts: Vec<Arc<StagingSstableInfo>>) {
        let _timer = self
            .metrics
            .event_handler_on_upload_finish_latency
            .start_timer();
        self.handle_uploaded_ssts_inner(ssts);
    }

    /// Dispatches a single storage event to the uploader / read-version registry.
    /// `Shutdown` is handled by the worker loop and must not reach here.
    fn handle_hummock_event(&mut self, event: HummockEvent) {
        match event {
            HummockEvent::BufferMayFlush => {
                self.uploader
                    .may_flush(&self.metrics.event_handler_on_spiller);
            }
            HummockEvent::SyncEpoch {
                sync_result_sender,
                sync_table_epochs,
            } => {
                self.handle_sync_epoch(sync_table_epochs, sync_result_sender);
            }
            HummockEvent::Clear(notifier, table_ids) => {
                self.handle_clear(notifier, table_ids);
            }
            HummockEvent::Shutdown => {
                unreachable!("shutdown is handled specially")
            }
            HummockEvent::StartEpoch { epoch, table_ids } => {
                self.uploader.start_epoch(epoch, table_ids);
            }
            HummockEvent::InitEpoch {
                instance_id,
                init_epoch,
            } => {
                // The instance must have been registered via RegisterReadVersion.
                let table_id = self
                    .local_read_version_mapping
                    .get(&instance_id)
                    .expect("should exist")
                    .0;
                self.uploader
                    .init_instance(instance_id, table_id, init_epoch);
            }
            HummockEvent::ImmToUploader { instance_id, imms } => {
                assert!(
                    self.local_read_version_mapping.contains_key(&instance_id),
                    "add imm from non-existing read version instance: instance_id: {}, table_id {:?}",
                    instance_id,
                    imms.first().map(|(imm, _)| imm.table_id),
                );
                self.uploader.add_imms(instance_id, imms);
                // New imms may push the buffer over the flush threshold.
                self.uploader
                    .may_flush(&self.metrics.event_handler_on_spiller);
            }

            HummockEvent::LocalSealEpoch {
                next_epoch,
                opts,
                instance_id,
            } => {
                self.uploader
                    .local_seal_epoch(instance_id, next_epoch, opts);
            }

            #[cfg(any(test, feature = "test"))]
            HummockEvent::FlushEvent(sender) => {
                let _ = sender.send(()).inspect_err(|e| {
                    error!("unable to send flush result: {:?}", e);
                });
            }

            HummockEvent::RegisterReadVersion {
                table_id,
                new_read_version_sender,
                is_replicated,
                vnodes,
            } => {
                let pinned_version = self.recent_versions.load().latest_version().clone();
                let instance_id = self.generate_instance_id();
                let basic_read_version = Arc::new(RwLock::new(
                    HummockReadVersion::new_with_replication_option(
                        table_id,
                        instance_id,
                        pinned_version,
                        is_replicated,
                        vnodes,
                    ),
                ));

                debug!(
                    "new read version registered: table_id: {}, instance_id: {}",
                    table_id, instance_id
                );

                {
                    // Record in both the loop-local and the shared mapping.
                    self.local_read_version_mapping
                        .insert(instance_id, (table_id, basic_read_version.clone()));
                    let mut read_version_mapping_guard = self.read_version_mapping.write();

                    read_version_mapping_guard
                        .entry(table_id)
                        .or_default()
                        .insert(instance_id, basic_read_version.clone());
                }

                match new_read_version_sender.send((
                    basic_read_version,
                    LocalInstanceGuard {
                        table_id,
                        instance_id,
                        event_sender: Some(self.hummock_event_tx.clone()),
                    },
                )) {
                    Ok(_) => {}
                    Err((_, mut guard)) => {
                        // Requester vanished: roll back the registration. Taking the
                        // sender prevents the dropped guard from re-sending a
                        // DestroyReadVersion event; we destroy directly instead.
                        warn!(
                            "RegisterReadVersion send fail table_id {:?} instance_is {:?}",
                            table_id, instance_id
                        );
                        guard.event_sender.take().expect("sender is just set");
                        self.destroy_read_version(instance_id);
                    }
                }
            }

            HummockEvent::DestroyReadVersion { instance_id } => {
                self.uploader.may_destroy_instance(instance_id);
                self.destroy_read_version(instance_id);
            }
            HummockEvent::GetMinUncommittedObjectId { result_tx } => {
                let _ = result_tx
                    .send(self.uploader.min_uncommitted_object_id())
                    .inspect_err(|e| {
                        error!("unable to send get_min_uncommitted_sst_id result: {:?}", e);
                    });
            }
            HummockEvent::RegisterVectorWriter {
                table_id,
                init_epoch,
            } => self.uploader.register_vector_writer(table_id, init_epoch),
            HummockEvent::VectorWriterSealEpoch {
                table_id,
                next_epoch,
                add,
            } => {
                self.uploader
                    .vector_writer_seal_epoch(table_id, next_epoch, add);
            }
            HummockEvent::DropVectorWriter { table_id } => {
                self.uploader.drop_vector_writer(table_id);
            }
        }
    }

    /// Removes an instance from both the loop-local and the shared mapping,
    /// dropping the table's entry entirely once its last instance is gone.
    /// Panics if the instance (or its table entry) does not exist.
    fn destroy_read_version(&mut self, instance_id: LocalInstanceId) {
        {
            {
                debug!("read version deregister: instance_id: {}", instance_id);
                let (table_id, _) = self
                    .local_read_version_mapping
                    .remove(&instance_id)
                    .unwrap_or_else(|| {
                        panic!(
                            "DestroyHummockInstance inexist instance instance_id {}",
                            instance_id
                        )
                    });
                let mut read_version_mapping_guard = self.read_version_mapping.write();
                let entry = read_version_mapping_guard
                    .get_mut(&table_id)
                    .unwrap_or_else(|| {
                        panic!(
                            "DestroyHummockInstance table_id {} instance_id {} fail",
                            table_id, instance_id
                        )
                    });
                entry.remove(&instance_id).unwrap_or_else(|| {
                    panic!(
                        "DestroyHummockInstance inexist instance table_id {} instance_id {}",
                        table_id, instance_id
                    )
                });
                if entry.is_empty() {
                    read_version_mapping_guard.remove(&table_id);
                }
            }
        }
    }

    /// Hands out strictly increasing instance ids, starting at 1.
    fn generate_instance_id(&mut self) -> LocalInstanceId {
        self.last_instance_id += 1;
        self.last_instance_id
    }
}
889
890pub(super) fn send_sync_result(
891 sender: oneshot::Sender<HummockResult<SyncedData>>,
892 result: HummockResult<SyncedData>,
893) {
894 let _ = sender.send(result).inspect_err(|e| {
895 error!("unable to send sync result. Err: {:?}", e);
896 });
897}
898
899impl SyncedData {
900 pub fn into_sync_result(self) -> SyncResult {
901 {
902 let SyncedData {
903 uploaded_ssts,
904 table_watermarks,
905 vector_index_adds,
906 } = self;
907 let mut sync_size = 0;
908 let mut uncommitted_ssts = Vec::new();
909 let mut old_value_ssts = Vec::new();
910 for sst in uploaded_ssts {
913 sync_size += sst.imm_size();
914 uncommitted_ssts.extend(sst.sstable_infos().iter().cloned());
915 old_value_ssts.extend(sst.old_value_sstable_infos().iter().cloned());
916 }
917 SyncResult {
918 sync_size,
919 uncommitted_ssts,
920 table_watermarks,
921 old_value_ssts,
922 vector_index_adds,
923 }
924 }
925 }
926}
927
928#[cfg(test)]
929mod tests {
930 use std::collections::{HashMap, HashSet};
931 use std::future::poll_fn;
932 use std::sync::Arc;
933 use std::task::Poll;
934
935 use futures::FutureExt;
936 use parking_lot::Mutex;
937 use risingwave_common::bitmap::Bitmap;
938 use risingwave_common::catalog::TableId;
939 use risingwave_common::hash::VirtualNode;
940 use risingwave_common::util::epoch::{EpochExt, test_epoch};
941 use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
942 use risingwave_hummock_sdk::version::HummockVersion;
943 use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo};
944 use tokio::spawn;
945 use tokio::sync::mpsc::unbounded_channel;
946 use tokio::sync::oneshot;
947
948 use crate::hummock::HummockError;
949 use crate::hummock::event_handler::hummock_event_handler::BufferTracker;
950 use crate::hummock::event_handler::refiller::{CacheRefillConfig, CacheRefiller};
951 use crate::hummock::event_handler::uploader::UploadTaskOutput;
952 use crate::hummock::event_handler::uploader::test_utils::{
953 TEST_TABLE_ID, gen_imm_inner, gen_imm_with_unlimit,
954 prepare_uploader_order_test_spawn_task_fn,
955 };
956 use crate::hummock::event_handler::{
957 HummockEvent, HummockEventHandler, HummockReadVersionRef, LocalInstanceGuard,
958 };
959 use crate::hummock::iterator::test_utils::mock_sstable_store;
960 use crate::hummock::local_version::pinned_version::PinnedVersion;
961 use crate::hummock::local_version::recent_versions::RecentVersions;
962 use crate::hummock::test_utils::default_opts_for_test;
963 use crate::mem_table::ImmutableMemtable;
964 use crate::monitor::HummockStateStoreMetrics;
965 use crate::store::SealCurrentEpochOptions;
966
    /// Scenario: epoch1's upload task is held open via a oneshot and then fails,
    /// while epoch2's task succeeds immediately. Verifies that once epoch1 fails,
    /// the sync of the *newer* epoch2 also reports an error, because syncing an
    /// older epoch failed first.
    #[tokio::test]
    async fn test_old_epoch_sync_fail() {
        let epoch0 = test_epoch(233);

        let initial_version = PinnedVersion::new(
            HummockVersion::from_rpc_protobuf(&PbHummockVersion {
                id: 1.into(),
                state_table_info: HashMap::from_iter([(
                    TEST_TABLE_ID,
                    StateTableInfo {
                        committed_epoch: epoch0,
                        compaction_group_id: StaticCompactionGroupId::StateDefault,
                    },
                )]),
                ..Default::default()
            }),
            unbounded_channel().0,
        );

        let (_version_update_tx, version_update_rx) = unbounded_channel();

        let epoch1 = epoch0.next_epoch();
        let epoch2 = epoch1.next_epoch();
        // `rx` gates epoch1's upload task so the test controls when it fails.
        let (tx, rx) = oneshot::channel();
        let rx = Arc::new(Mutex::new(Some(rx)));

        let storage_opt = default_opts_for_test();
        let metrics = Arc::new(HummockStateStoreMetrics::unused());

        let event_handler = HummockEventHandler::new_inner(
            version_update_rx,
            mock_sstable_store().await,
            metrics.clone(),
            CacheRefillConfig::from_storage_opts(&storage_opt),
            RecentVersions::new(initial_version.clone(), 10, metrics.clone()),
            BufferTracker::from_storage_opts(&storage_opt, &metrics),
            // Stub spawn function: epoch1 blocks on `rx` then fails; epoch2 succeeds.
            Arc::new(move |_, info| {
                assert_eq!(info.epochs.len(), 1);
                let epoch = info.epochs[0];
                match epoch {
                    epoch if epoch == epoch1 => {
                        let rx = rx.lock().take().unwrap();
                        spawn(async move {
                            rx.await.unwrap();
                            Err(HummockError::other("fail"))
                        })
                    }
                    epoch if epoch == epoch2 => spawn(async move {
                        Ok(UploadTaskOutput {
                            new_value_ssts: vec![],
                            old_value_ssts: vec![],
                            wait_poll_timer: None,
                        })
                    }),
                    _ => unreachable!(),
                }
            }),
            CacheRefiller::default_spawn_refill_task(),
        );

        let event_tx = event_handler.event_sender();

        let send_event = |event| event_tx.send(event).unwrap();

        let join_handle = spawn(event_handler.start_hummock_event_handler_worker());

        // Register one read version instance for the test table.
        let (read_version, guard) = {
            let (tx, rx) = oneshot::channel();
            send_event(HummockEvent::RegisterReadVersion {
                table_id: TEST_TABLE_ID,
                new_read_version_sender: tx,
                is_replicated: false,
                vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
            });
            rx.await.unwrap()
        };

        send_event(HummockEvent::StartEpoch {
            epoch: epoch1,
            table_ids: HashSet::from_iter([TEST_TABLE_ID]),
        });

        read_version.write().init();

        send_event(HummockEvent::InitEpoch {
            instance_id: guard.instance_id,
            init_epoch: epoch1,
        });

        // Write an imm into epoch1 and hand it to the uploader.
        let (imm1, tracker1) = gen_imm_with_unlimit(epoch1);
        read_version.write().add_pending_imm(imm1.clone(), tracker1);

        send_event(HummockEvent::ImmToUploader {
            instance_id: guard.instance_id,
            imms: read_version.write().start_upload_pending_imms(),
        });

        send_event(HummockEvent::StartEpoch {
            epoch: epoch2,
            table_ids: HashSet::from_iter([TEST_TABLE_ID]),
        });

        send_event(HummockEvent::LocalSealEpoch {
            instance_id: guard.instance_id,
            next_epoch: epoch2,
            opts: SealCurrentEpochOptions::for_test(),
        });

        // Write an imm into epoch2 as well.
        {
            let (imm2, tracker2) = gen_imm_with_unlimit(epoch2);
            let mut read_version = read_version.write();
            read_version.add_pending_imm(imm2, tracker2);

            send_event(HummockEvent::ImmToUploader {
                instance_id: guard.instance_id,
                imms: read_version.start_upload_pending_imms(),
            });
        }

        // Seal epoch2 by advancing to epoch3 so both epochs can be synced.
        let epoch3 = epoch2.next_epoch();
        send_event(HummockEvent::StartEpoch {
            epoch: epoch3,
            table_ids: HashSet::from_iter([TEST_TABLE_ID]),
        });
        send_event(HummockEvent::LocalSealEpoch {
            instance_id: guard.instance_id,
            next_epoch: epoch3,
            opts: SealCurrentEpochOptions::for_test(),
        });

        // Both syncs stay pending while epoch1's upload task is blocked on `rx`.
        let (tx1, mut rx1) = oneshot::channel();
        send_event(HummockEvent::SyncEpoch {
            sync_result_sender: tx1,
            sync_table_epochs: vec![(epoch1, HashSet::from_iter([TEST_TABLE_ID]))],
        });
        assert!(poll_fn(|cx| Poll::Ready(rx1.poll_unpin(cx).is_pending())).await);
        let (tx2, mut rx2) = oneshot::channel();
        send_event(HummockEvent::SyncEpoch {
            sync_result_sender: tx2,
            sync_table_epochs: vec![(epoch2, HashSet::from_iter([TEST_TABLE_ID]))],
        });
        assert!(poll_fn(|cx| Poll::Ready(rx2.poll_unpin(cx).is_pending())).await);

        // Unblock epoch1's task, making it fail: both syncs must report errors.
        tx.send(()).unwrap();
        rx1.await.unwrap().unwrap_err();
        rx2.await.unwrap().unwrap_err();

        send_event(HummockEvent::Shutdown);
        join_handle.await.unwrap();
    }
1117
1118 #[tokio::test]
1119 async fn test_clear_tables() {
1120 let table_id1 = TableId::new(1);
1121 let table_id2 = TableId::new(2);
1122 let epoch0 = test_epoch(233);
1123
1124 let initial_version = PinnedVersion::new(
1125 HummockVersion::from_rpc_protobuf(&PbHummockVersion {
1126 id: 1.into(),
1127 state_table_info: HashMap::from_iter([
1128 (
1129 table_id1,
1130 StateTableInfo {
1131 committed_epoch: epoch0,
1132 compaction_group_id: StaticCompactionGroupId::StateDefault,
1133 },
1134 ),
1135 (
1136 table_id2,
1137 StateTableInfo {
1138 committed_epoch: epoch0,
1139 compaction_group_id: StaticCompactionGroupId::StateDefault,
1140 },
1141 ),
1142 ]),
1143 ..Default::default()
1144 }),
1145 unbounded_channel().0,
1146 );
1147
1148 let (_version_update_tx, version_update_rx) = unbounded_channel();
1149
1150 let epoch1 = epoch0.next_epoch();
1151 let epoch2 = epoch1.next_epoch();
1152 let epoch3 = epoch2.next_epoch();
1153
1154 let imm_size = gen_imm_inner(TEST_TABLE_ID, epoch1, 0).size();
1155
1156 let buffer_tracker = BufferTracker::for_test_with_config(imm_size * 2 - 1, 1);
1158 let memory_limiter = buffer_tracker.get_memory_limiter().clone();
1159
1160 let gen_imm = |table_id, epoch, spill_offset| {
1161 let imm = gen_imm_inner(table_id, epoch, spill_offset);
1162 assert_eq!(imm.size(), imm_size);
1163 imm
1164 };
1165 let imm1_1 = gen_imm(table_id1, epoch1, 0);
1166 let imm1_2_1 = gen_imm(table_id1, epoch2, 0);
1167
1168 let storage_opt = default_opts_for_test();
1169 let metrics = Arc::new(HummockStateStoreMetrics::unused());
1170
1171 let (spawn_task, new_task_notifier) = prepare_uploader_order_test_spawn_task_fn(false);
1172
1173 let event_handler = HummockEventHandler::new_inner(
1174 version_update_rx,
1175 mock_sstable_store().await,
1176 metrics.clone(),
1177 CacheRefillConfig::from_storage_opts(&storage_opt),
1178 RecentVersions::new(initial_version.clone(), 10, metrics.clone()),
1179 buffer_tracker,
1180 spawn_task,
1181 CacheRefiller::default_spawn_refill_task(),
1182 );
1183
1184 let event_tx = event_handler.event_sender();
1185
1186 let send_event = |event| event_tx.send(event).unwrap();
1187 let flush_event = || async {
1188 let (tx, rx) = oneshot::channel();
1189 send_event(HummockEvent::FlushEvent(tx));
1190 rx.await.unwrap();
1191 };
1192 let start_epoch = |table_id, epoch| {
1193 send_event(HummockEvent::StartEpoch {
1194 epoch,
1195 table_ids: HashSet::from_iter([table_id]),
1196 })
1197 };
1198 let init_epoch = |instance: &LocalInstanceGuard, init_epoch| {
1199 send_event(HummockEvent::InitEpoch {
1200 instance_id: instance.instance_id,
1201 init_epoch,
1202 })
1203 };
1204 let event_tx_clone = event_tx.clone();
1205 let write_imm = {
1206 let memory_limiter = memory_limiter.clone();
1207 move |read_version: &HummockReadVersionRef,
1208 instance: &LocalInstanceGuard,
1209 imm: &ImmutableMemtable| {
1210 let memory_limiter = memory_limiter.clone();
1211 let event_tx = event_tx_clone.clone();
1212 let read_version = read_version.clone();
1213 let imm = imm.clone();
1214 let instance_id = instance.instance_id;
1215 async move {
1216 let tracker = memory_limiter.require_memory(imm.size() as _).await;
1217 let mut read_version = read_version.write();
1218 read_version.add_pending_imm(imm.clone(), tracker);
1219
1220 event_tx
1221 .send(HummockEvent::ImmToUploader {
1222 instance_id,
1223 imms: read_version.start_upload_pending_imms(),
1224 })
1225 .unwrap();
1226 }
1227 }
1228 };
1229 let seal_epoch = |instance: &LocalInstanceGuard, next_epoch| {
1230 send_event(HummockEvent::LocalSealEpoch {
1231 instance_id: instance.instance_id,
1232 next_epoch,
1233 opts: SealCurrentEpochOptions::for_test(),
1234 })
1235 };
1236 let sync_epoch = |table_id, new_sync_epoch| {
1237 let (tx, rx) = oneshot::channel();
1238 send_event(HummockEvent::SyncEpoch {
1239 sync_result_sender: tx,
1240 sync_table_epochs: vec![(new_sync_epoch, HashSet::from_iter([table_id]))],
1241 });
1242 rx
1243 };
1244
1245 let join_handle = spawn(event_handler.start_hummock_event_handler_worker());
1246
1247 let (read_version1, guard1) = {
1248 let (tx, rx) = oneshot::channel();
1249 send_event(HummockEvent::RegisterReadVersion {
1250 table_id: table_id1,
1251 new_read_version_sender: tx,
1252 is_replicated: false,
1253 vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
1254 });
1255 rx.await.unwrap()
1256 };
1257
1258 let (read_version2, guard2) = {
1259 let (tx, rx) = oneshot::channel();
1260 send_event(HummockEvent::RegisterReadVersion {
1261 table_id: table_id2,
1262 new_read_version_sender: tx,
1263 is_replicated: false,
1264 vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
1265 });
1266 rx.await.unwrap()
1267 };
1268
1269 let (task1_1_finish_tx, task1_1_rx) = {
1271 start_epoch(table_id1, epoch1);
1272
1273 read_version1.write().init();
1274 init_epoch(&guard1, epoch1);
1275
1276 write_imm(&read_version1, &guard1, &imm1_1).await;
1277
1278 start_epoch(table_id1, epoch2);
1279
1280 seal_epoch(&guard1, epoch2);
1281
1282 let (wait_task_start, task_finish_tx) = new_task_notifier(HashMap::from_iter([(
1283 guard1.instance_id,
1284 vec![imm1_1.batch_id()],
1285 )]));
1286
1287 let mut rx = sync_epoch(table_id1, epoch1);
1288 wait_task_start.await;
1289 assert!(poll_fn(|cx| Poll::Ready(rx.poll_unpin(cx).is_pending())).await);
1290
1291 write_imm(&read_version1, &guard1, &imm1_2_1).await;
1292 flush_event().await;
1293
1294 (task_finish_tx, rx)
1295 };
1296 let (task1_2_finish_tx, _finish_txs) = {
1301 let mut finish_txs = vec![];
1302 let imm2_1_1 = gen_imm(table_id2, epoch1, 0);
1303 start_epoch(table_id2, epoch1);
1304 read_version2.write().init();
1305 init_epoch(&guard2, epoch1);
1306 let (wait_task_start, task1_2_finish_tx) = new_task_notifier(HashMap::from_iter([(
1307 guard1.instance_id,
1308 vec![imm1_2_1.batch_id()],
1309 )]));
1310 write_imm(&read_version2, &guard2, &imm2_1_1).await;
1311 wait_task_start.await;
1312
1313 let imm2_1_2 = gen_imm(table_id2, epoch1, 1);
1314 let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
1315 guard2.instance_id,
1316 vec![imm2_1_2.batch_id(), imm2_1_1.batch_id()],
1317 )]));
1318 finish_txs.push(finish_tx);
1319 write_imm(&read_version2, &guard2, &imm2_1_2).await;
1320 wait_task_start.await;
1321
1322 let imm2_1_3 = gen_imm(table_id2, epoch1, 2);
1323 write_imm(&read_version2, &guard2, &imm2_1_3).await;
1324 start_epoch(table_id2, epoch2);
1325 seal_epoch(&guard2, epoch2);
1326 let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
1327 guard2.instance_id,
1328 vec![imm2_1_3.batch_id()],
1329 )]));
1330 finish_txs.push(finish_tx);
1331 let _sync_rx = sync_epoch(table_id2, epoch1);
1332 wait_task_start.await;
1333
1334 let imm2_2_1 = gen_imm(table_id2, epoch2, 0);
1335 write_imm(&read_version2, &guard2, &imm2_2_1).await;
1336 flush_event().await;
1337 let imm2_2_2 = gen_imm(table_id2, epoch2, 1);
1338 write_imm(&read_version2, &guard2, &imm2_2_2).await;
1339 let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
1340 guard2.instance_id,
1341 vec![imm2_2_2.batch_id(), imm2_2_1.batch_id()],
1342 )]));
1343 finish_txs.push(finish_tx);
1344 wait_task_start.await;
1345
1346 let imm2_2_3 = gen_imm(table_id2, epoch2, 2);
1347 write_imm(&read_version2, &guard2, &imm2_2_3).await;
1348
1349 drop(guard2);
1357 let (clear_tx, clear_rx) = oneshot::channel();
1358 send_event(HummockEvent::Clear(
1359 clear_tx,
1360 Some(HashSet::from_iter([table_id2])),
1361 ));
1362 clear_rx.await.unwrap();
1363 (task1_2_finish_tx, finish_txs)
1364 };
1365
1366 let imm1_2_2 = gen_imm(table_id1, epoch2, 1);
1367 write_imm(&read_version1, &guard1, &imm1_2_2).await;
1368 start_epoch(table_id1, epoch3);
1369 seal_epoch(&guard1, epoch3);
1370
1371 let (tx2, mut sync_rx2) = oneshot::channel();
1372 let (wait_task_start, task1_2_2_finish_tx) = new_task_notifier(HashMap::from_iter([(
1373 guard1.instance_id,
1374 vec![imm1_2_2.batch_id()],
1375 )]));
1376 send_event(HummockEvent::SyncEpoch {
1377 sync_result_sender: tx2,
1378 sync_table_epochs: vec![(epoch2, HashSet::from_iter([table_id1]))],
1379 });
1380 wait_task_start.await;
1381 assert!(poll_fn(|cx| Poll::Ready(sync_rx2.poll_unpin(cx).is_pending())).await);
1382
1383 task1_1_finish_tx.send(()).unwrap();
1384 let sync_data1 = task1_1_rx.await.unwrap().unwrap();
1385 sync_data1
1386 .uploaded_ssts
1387 .iter()
1388 .all(|sst| sst.epochs() == &vec![epoch1]);
1389 task1_2_finish_tx.send(()).unwrap();
1390 assert!(poll_fn(|cx| Poll::Ready(sync_rx2.poll_unpin(cx).is_pending())).await);
1391 task1_2_2_finish_tx.send(()).unwrap();
1392 let sync_data2 = sync_rx2.await.unwrap().unwrap();
1393 sync_data2
1394 .uploaded_ssts
1395 .iter()
1396 .all(|sst| sst.epochs() == &vec![epoch2]);
1397
1398 send_event(HummockEvent::Shutdown);
1399 join_handle.await.unwrap();
1400 }
1401}