1use std::collections::{HashMap, HashSet, VecDeque};
16use std::pin::pin;
17use std::sync::atomic::AtomicUsize;
18use std::sync::atomic::Ordering::Relaxed;
19use std::sync::{Arc, LazyLock};
20use std::time::Duration;
21
22use arc_swap::ArcSwap;
23use await_tree::{InstrumentAwait, SpanExt};
24use futures::FutureExt;
25use itertools::Itertools;
26use parking_lot::RwLock;
27use prometheus::{Histogram, IntGauge};
28use risingwave_common::catalog::TableId;
29use risingwave_common::metrics::UintGauge;
30use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo;
31use risingwave_hummock_sdk::sstable_info::SstableInfo;
32use risingwave_hummock_sdk::version::{HummockVersionCommon, LocalHummockVersionDelta};
33use risingwave_hummock_sdk::{HummockEpoch, SyncResult};
34use tokio::spawn;
35use tokio::sync::mpsc::error::SendError;
36use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
37use tokio::sync::oneshot;
38use tracing::{debug, error, info, trace, warn};
39
40use super::refiller::{CacheRefillConfig, CacheRefiller};
41use super::{LocalInstanceGuard, LocalInstanceId, ReadVersionMappingType};
42use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
43use crate::hummock::compactor::{CompactorContext, await_tree_key, compact};
44use crate::hummock::event_handler::refiller::{CacheRefillerEvent, SpawnRefillTask};
45use crate::hummock::event_handler::uploader::{
46 HummockUploader, SpawnUploadTask, SyncedData, UploadTaskOutput,
47};
48use crate::hummock::event_handler::{
49 HummockEvent, HummockReadVersionRef, HummockVersionUpdate, ReadOnlyReadVersionMapping,
50 ReadOnlyRwLockRef,
51};
52use crate::hummock::local_version::pinned_version::PinnedVersion;
53use crate::hummock::local_version::recent_versions::RecentVersions;
54use crate::hummock::store::version::{HummockReadVersion, StagingSstableInfo, VersionUpdate};
55use crate::hummock::{HummockResult, MemoryLimiter, ObjectIdManager, SstableStoreRef};
56use crate::mem_table::ImmutableMemtable;
57use crate::monitor::HummockStateStoreMetrics;
58use crate::opts::StorageOpts;
59
/// Tracks shared buffer memory usage and decides when in-memory data should be
/// flushed to object storage.
#[derive(Clone)]
pub struct BufferTracker {
    // Buffer usage (bytes) above this triggers flushing; see `need_flush`.
    flush_threshold: usize,
    // Minimum total size (bytes) of one flush batch; see `need_more_flush`.
    min_batch_flush_size: usize,
    // Global memory quota shared by all writers; also reports current usage.
    global_buffer: Arc<MemoryLimiter>,
    // Gauge of the total size of upload tasks currently in flight.
    global_upload_task_size: UintGauge,
}
67
68impl BufferTracker {
69 pub fn from_storage_opts(config: &StorageOpts, global_upload_task_size: UintGauge) -> Self {
70 let capacity = config.shared_buffer_capacity_mb * (1 << 20);
71 let flush_threshold = (capacity as f32 * config.shared_buffer_flush_ratio) as usize;
72 let shared_buffer_min_batch_flush_size =
73 config.shared_buffer_min_batch_flush_size_mb * (1 << 20);
74 assert!(
75 flush_threshold < capacity,
76 "flush_threshold {} should be less or equal to capacity {}",
77 flush_threshold,
78 capacity
79 );
80 Self::new(
81 capacity,
82 flush_threshold,
83 global_upload_task_size,
84 shared_buffer_min_batch_flush_size,
85 )
86 }
87
88 #[cfg(test)]
89 fn for_test_with_config(flush_threshold: usize, min_batch_flush_size: usize) -> Self {
90 Self::new(
91 usize::MAX,
92 flush_threshold,
93 UintGauge::new("test", "test").unwrap(),
94 min_batch_flush_size,
95 )
96 }
97
98 fn new(
99 capacity: usize,
100 flush_threshold: usize,
101 global_upload_task_size: UintGauge,
102 min_batch_flush_size: usize,
103 ) -> Self {
104 assert!(capacity >= flush_threshold);
105 Self {
106 flush_threshold,
107 global_buffer: Arc::new(MemoryLimiter::new(capacity as u64)),
108 global_upload_task_size,
109 min_batch_flush_size,
110 }
111 }
112
113 pub fn for_test() -> Self {
114 Self::from_storage_opts(
115 &StorageOpts::default(),
116 UintGauge::new("test", "test").unwrap(),
117 )
118 }
119
120 pub fn get_buffer_size(&self) -> usize {
121 self.global_buffer.get_memory_usage() as usize
122 }
123
124 pub fn get_memory_limiter(&self) -> &Arc<MemoryLimiter> {
125 &self.global_buffer
126 }
127
128 pub fn global_upload_task_size(&self) -> &UintGauge {
129 &self.global_upload_task_size
130 }
131
132 pub fn need_flush(&self) -> bool {
135 self.get_buffer_size() > self.flush_threshold + self.global_upload_task_size.get() as usize
136 }
137
138 pub fn need_more_flush(&self, curr_batch_flush_size: usize) -> bool {
139 curr_batch_flush_size < self.min_batch_flush_size || self.need_flush()
140 }
141
142 #[cfg(test)]
143 pub(crate) fn flush_threshold(&self) -> usize {
144 self.flush_threshold
145 }
146}
147
/// Sender half of the hummock event channel; increments the shared
/// pending-event gauge on every successful send.
#[derive(Clone)]
pub struct HummockEventSender {
    inner: UnboundedSender<HummockEvent>,
    // Gauge of events queued but not yet received; paired with the receiver.
    event_count: IntGauge,
}
153
154pub fn event_channel(event_count: IntGauge) -> (HummockEventSender, HummockEventReceiver) {
155 let (tx, rx) = unbounded_channel();
156 (
157 HummockEventSender {
158 inner: tx,
159 event_count: event_count.clone(),
160 },
161 HummockEventReceiver {
162 inner: rx,
163 event_count,
164 },
165 )
166}
167
168impl HummockEventSender {
169 pub fn send(&self, event: HummockEvent) -> Result<(), SendError<HummockEvent>> {
170 self.inner.send(event)?;
171 self.event_count.inc();
172 Ok(())
173 }
174}
175
/// Receiver half of the hummock event channel; decrements the shared
/// pending-event gauge on every received event.
pub struct HummockEventReceiver {
    inner: UnboundedReceiver<HummockEvent>,
    // Gauge of events queued but not yet received; paired with the sender.
    event_count: IntGauge,
}
180
181impl HummockEventReceiver {
182 async fn recv(&mut self) -> Option<HummockEvent> {
183 let event = self.inner.recv().await?;
184 self.event_count.dec();
185 Some(event)
186 }
187}
188
/// Pre-resolved latency histograms for the event handler's hot paths
/// (label lookup is done once in `new_inner` rather than per event).
struct HummockEventHandlerMetrics {
    // Time spent handling a finished upload (`handle_uploaded_sst`).
    event_handler_on_upload_finish_latency: Histogram,
    // Time spent applying a resolved version update (`apply_version_update`).
    event_handler_on_apply_version_update: Histogram,
    // Time spent processing a received version update (`handle_version_update`).
    event_handler_on_recv_version_update: Histogram,
}
194
/// Central event loop state for the hummock state store: dispatches storage
/// events, drives the uploader and cache refiller, and applies version updates.
pub struct HummockEventHandler {
    // Kept so new senders can be handed out via `event_sender()`.
    hummock_event_tx: HummockEventSender,
    hummock_event_rx: HummockEventReceiver,
    // Stream of hummock version updates consumed by the worker loop.
    version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
    // table_id -> (instance_id -> read version); shared with readers.
    read_version_mapping: Arc<RwLock<ReadVersionMappingType>>,
    // instance_id -> (table_id, read version); owned by the event loop.
    local_read_version_mapping: HashMap<LocalInstanceId, (TableId, HummockReadVersionRef)>,

    // Watch channel broadcasting the latest applied pinned version.
    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
    // Lock-free snapshot of recently pinned versions for readers.
    recent_versions: Arc<ArcSwap<RecentVersions>>,

    uploader: HummockUploader,
    refiller: CacheRefiller,

    // Monotonic counter backing `generate_instance_id`.
    last_instance_id: LocalInstanceId,

    metrics: HummockEventHandlerMetrics,
}
213
214async fn flush_imms(
215 payload: Vec<ImmutableMemtable>,
216 compactor_context: CompactorContext,
217 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
218 object_id_manager: Arc<ObjectIdManager>,
219) -> HummockResult<UploadTaskOutput> {
220 compact(
221 compactor_context,
222 object_id_manager,
223 payload,
224 compaction_catalog_manager_ref,
225 )
226 .instrument_await("shared_buffer_compact".verbose())
227 .await
228}
229
impl HummockEventHandler {
    /// Creates an event handler wired to the real upload path: flush tasks are
    /// spawned onto tokio and run shared buffer compaction via [`flush_imms`].
    ///
    /// `version_update_rx` delivers hummock version updates, and
    /// `pinned_version` is the initial version to start from.
    pub fn new(
        version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
        pinned_version: PinnedVersion,
        compactor_context: CompactorContext,
        compaction_catalog_manager_ref: CompactionCatalogManagerRef,
        object_id_manager: Arc<ObjectIdManager>,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
    ) -> Self {
        let upload_compactor_context = compactor_context.clone();
        let upload_task_latency = state_store_metrics.uploader_upload_task_latency.clone();
        let wait_poll_latency = state_store_metrics.uploader_wait_poll_latency.clone();
        let recent_versions = RecentVersions::new(
            pinned_version,
            compactor_context
                .storage_opts
                .max_cached_recent_versions_number,
            state_store_metrics.clone(),
        );
        let buffer_tracker = BufferTracker::from_storage_opts(
            &compactor_context.storage_opts,
            state_store_metrics.uploader_uploading_task_size.clone(),
        );
        Self::new_inner(
            version_update_rx,
            compactor_context.sstable_store.clone(),
            state_store_metrics,
            CacheRefillConfig::from_storage_opts(&compactor_context.storage_opts),
            recent_versions,
            buffer_tracker,
            // Spawner for upload tasks: flattens the per-instance imm payload,
            // compacts it to SSTs, and optionally instruments the task into the
            // await-tree registry.
            Arc::new(move |payload, task_info| {
                // Process-wide counter used only to give each upload task a
                // unique await-tree registry key.
                static NEXT_UPLOAD_TASK_ID: LazyLock<AtomicUsize> =
                    LazyLock::new(|| AtomicUsize::new(0));
                let tree_root = upload_compactor_context.await_tree_reg.as_ref().map(|reg| {
                    let upload_task_id = NEXT_UPLOAD_TASK_ID.fetch_add(1, Relaxed);
                    reg.register(
                        await_tree_key::SpawnUploadTask { id: upload_task_id },
                        format!("Spawn Upload Task: {}", task_info),
                    )
                });
                // Clone the captured handles so the spawned future owns them.
                let upload_task_latency = upload_task_latency.clone();
                let wait_poll_latency = wait_poll_latency.clone();
                let upload_compactor_context = upload_compactor_context.clone();
                let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();
                let object_id_manager = object_id_manager.clone();
                spawn({
                    let future = async move {
                        // Timer covers the whole flush; stopped when dropped at
                        // the end of this future.
                        let _timer = upload_task_latency.start_timer();
                        let mut output = flush_imms(
                            payload
                                .into_values()
                                .flat_map(|imms| imms.into_iter())
                                .collect(),
                            upload_compactor_context.clone(),
                            compaction_catalog_manager_ref.clone(),
                            object_id_manager.clone(),
                        )
                        .await?;
                        // Attach the wait-poll timer to the finished output; it
                        // must not have been set earlier in the pipeline.
                        assert!(
                            output
                                .wait_poll_timer
                                .replace(wait_poll_latency.start_timer())
                                .is_none(),
                            "should not set timer before"
                        );
                        Ok(output)
                    };
                    // `left_future`/`right_future` unify the instrumented and
                    // plain future types for `spawn`.
                    if let Some(tree_root) = tree_root {
                        tree_root.instrument(future).left_future()
                    } else {
                        future.right_future()
                    }
                })
            }),
            CacheRefiller::default_spawn_refill_task(),
        )
    }

    /// Shared constructor that also lets tests inject custom upload/refill task
    /// spawners instead of the real ones used by [`Self::new`].
    fn new_inner(
        version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
        sstable_store: SstableStoreRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        refill_config: CacheRefillConfig,
        recent_versions: RecentVersions,
        buffer_tracker: BufferTracker,
        spawn_upload_task: SpawnUploadTask,
        spawn_refill_task: SpawnRefillTask,
    ) -> Self {
        let (hummock_event_tx, hummock_event_rx) =
            event_channel(state_store_metrics.event_handler_pending_event.clone());
        // Watch channel seeded with the latest version; receivers are created
        // on demand from the sender handle.
        let (version_update_notifier_tx, _) =
            tokio::sync::watch::channel(recent_versions.latest_version().clone());
        let version_update_notifier_tx = Arc::new(version_update_notifier_tx);
        let read_version_mapping = Arc::new(RwLock::new(HashMap::default()));

        // Resolve per-path histogram labels once, up front.
        let metrics = HummockEventHandlerMetrics {
            event_handler_on_upload_finish_latency: state_store_metrics
                .event_handler_latency
                .with_label_values(&["on_upload_finish"]),
            event_handler_on_apply_version_update: state_store_metrics
                .event_handler_latency
                .with_label_values(&["apply_version"]),
            event_handler_on_recv_version_update: state_store_metrics
                .event_handler_latency
                .with_label_values(&["recv_version_update"]),
        };

        let uploader = HummockUploader::new(
            state_store_metrics.clone(),
            recent_versions.latest_version().clone(),
            spawn_upload_task,
            buffer_tracker,
        );
        let refiller = CacheRefiller::new(refill_config, sstable_store, spawn_refill_task);

        Self {
            hummock_event_tx,
            hummock_event_rx,
            version_update_rx,
            version_update_notifier_tx,
            recent_versions: Arc::new(ArcSwap::from_pointee(recent_versions)),
            read_version_mapping,
            local_read_version_mapping: Default::default(),
            uploader,
            refiller,
            last_instance_id: 0,
            metrics,
        }
    }

    /// Handle to the watch channel that broadcasts newly applied versions.
    pub fn version_update_notifier_tx(&self) -> Arc<tokio::sync::watch::Sender<PinnedVersion>> {
        self.version_update_notifier_tx.clone()
    }

    /// Shared snapshot of recently pinned versions for readers.
    pub fn recent_versions(&self) -> Arc<ArcSwap<RecentVersions>> {
        self.recent_versions.clone()
    }

    /// Read-only view of the table -> instance -> read-version mapping.
    pub fn read_version_mapping(&self) -> ReadOnlyReadVersionMapping {
        ReadOnlyRwLockRef::new(self.read_version_mapping.clone())
    }

    /// Returns a new sender feeding this handler's event loop.
    pub fn event_sender(&self) -> HummockEventSender {
        self.hummock_event_tx.clone()
    }

    pub fn buffer_tracker(&self) -> &BufferTracker {
        self.uploader.buffer_tracker()
    }
}
380
impl HummockEventHandler {
    /// Applies `f` to the read version of each instance in `instances`.
    ///
    /// Instances whose write lock is contended are deferred to a retry queue and
    /// re-attempted with a short timeout until all of them have been visited, so
    /// `f` eventually runs exactly once per existing instance. Unknown instance
    /// ids are skipped silently.
    fn for_each_read_version(
        &self,
        instances: impl IntoIterator<Item = LocalInstanceId>,
        mut f: impl FnMut(LocalInstanceId, &mut HummockReadVersion),
    ) {
        let instances = {
            // In debug builds, assert the caller passed no duplicate instance ids.
            #[cfg(debug_assertions)]
            {
                let mut id_set = std::collections::HashSet::new();
                for instance in instances {
                    assert!(id_set.insert(instance));
                }
                id_set
            }
            #[cfg(not(debug_assertions))]
            {
                instances
            }
        };
        // First pass: non-blocking try-lock; defer contended instances.
        let mut pending = VecDeque::new();
        let mut total_count = 0;
        for instance_id in instances {
            let Some((_, read_version)) = self.local_read_version_mapping.get(&instance_id) else {
                continue;
            };
            total_count += 1;
            match read_version.try_write() {
                Some(mut write_guard) => {
                    f(instance_id, &mut write_guard);
                }
                _ => {
                    pending.push_back(instance_id);
                }
            }
        }
        if !pending.is_empty() {
            // Log at warn level only when more than 10% of instances were contended.
            if pending.len() * 10 > total_count {
                warn!(
                    pending_count = pending.len(),
                    total_count, "cannot acquire lock for all read version"
                );
            } else {
                debug!(
                    pending_count = pending.len(),
                    total_count, "cannot acquire lock for all read version"
                );
            }
        }

        const TRY_LOCK_TIMEOUT: Duration = Duration::from_millis(1);

        // Second pass: retry each deferred instance with a short timeout,
        // requeueing on failure until every one has been handled.
        while let Some(instance_id) = pending.pop_front() {
            let (_, read_version) = self
                .local_read_version_mapping
                .get(&instance_id)
                .expect("have checked exist before");
            match read_version.try_write_for(TRY_LOCK_TIMEOUT) {
                Some(mut write_guard) => {
                    f(instance_id, &mut write_guard);
                }
                _ => {
                    warn!(instance_id, "failed to get lock again for instance");
                    pending.push_back(instance_id);
                }
            }
        }
    }

    /// Propagates a freshly uploaded SST to the read versions of every instance
    /// that contributed imms to it.
    fn handle_uploaded_sst_inner(&mut self, staging_sstable_info: Arc<StagingSstableInfo>) {
        trace!("data_flushed. SST size {}", staging_sstable_info.imm_size());
        self.for_each_read_version(
            staging_sstable_info.imm_ids().keys().cloned(),
            |_, read_version| read_version.update(VersionUpdate::Sst(staging_sstable_info.clone())),
        )
    }

    /// Forwards a sync request to the uploader; the result is delivered through
    /// `sync_result_sender` when the sync completes or fails.
    fn handle_sync_epoch(
        &mut self,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
    ) {
        debug!(?sync_table_epochs, "awaiting for epoch to be synced",);
        self.uploader
            .start_sync_epoch(sync_result_sender, sync_table_epochs);
    }

    /// Clears uploader state for the given tables (all tables when `table_ids`
    /// is `None`) and acknowledges completion through `notifier`.
    fn handle_clear(&mut self, notifier: oneshot::Sender<()>, table_ids: Option<HashSet<TableId>>) {
        info!(
            current_version_id = ?self.uploader.hummock_version().id(),
            "handle clear event"
        );

        self.uploader.clear(table_ids.clone());

        if table_ids.is_none() {
            // A full clear expects every read-version instance to have been
            // destroyed beforehand.
            assert!(
                self.local_read_version_mapping.is_empty(),
                "read version mapping not empty when clear. remaining tables: {:?}",
                self.local_read_version_mapping
                    .values()
                    .map(|(_, read_version)| read_version.read().table_id())
                    .collect_vec()
            );
        }

        // Best-effort ack: the requester may have given up waiting.
        let _ = notifier.send(()).inspect_err(|e| {
            error!("failed to notify completion of clear event: {:?}", e);
        });

        info!("clear finished");
    }

    /// Resolves an incoming version update against the most recent known version
    /// and, if it yields a newer one, kicks off a cache refill. The version is
    /// applied to handler state later, when the refiller reports back (see the
    /// worker loop).
    fn handle_version_update(&mut self, version_payload: HummockVersionUpdate) {
        let _timer = self
            .metrics
            .event_handler_on_recv_version_update
            .start_timer();
        // Base the update on the refiller's pending version if one exists, so
        // consecutive updates chain correctly while a refill is in flight.
        let pinned_version = self
            .refiller
            .last_new_pinned_version()
            .cloned()
            .unwrap_or_else(|| self.uploader.hummock_version().clone());

        let mut sst_delta_infos = vec![];
        if let Some(new_pinned_version) = Self::resolve_version_update_info(
            &pinned_version,
            version_payload,
            Some(&mut sst_delta_infos),
        ) {
            self.refiller
                .start_cache_refill(sst_delta_infos, pinned_version, new_pinned_version);
        }
    }

    /// Turns a version update payload into a new `PinnedVersion` based on
    /// `pinned_version`, returning `None` when the update does not advance it.
    ///
    /// For delta updates, each delta is applied in order (asserting contiguity
    /// via `prev_id`), the table change log is updated under its write lock, and
    /// SST delta infos are collected into `sst_delta_infos` when requested.
    fn resolve_version_update_info(
        pinned_version: &PinnedVersion,
        version_payload: HummockVersionUpdate,
        mut sst_delta_infos: Option<&mut Vec<SstDeltaInfo>>,
    ) -> Option<PinnedVersion> {
        match version_payload {
            HummockVersionUpdate::VersionDeltas(version_deltas) => {
                let mut version_to_apply = (**pinned_version).clone();
                {
                    // Hold the change-log write lock across all deltas so the log
                    // is updated atomically with respect to readers.
                    let mut table_change_log_to_apply_guard =
                        pinned_version.table_change_log_write_lock();
                    for version_delta in version_deltas {
                        // Deltas must be contiguous with the version being built.
                        assert_eq!(version_to_apply.id, version_delta.prev_id);

                        {
                            // Apply the state-table-info delta on a clone to learn
                            // which tables changed, then update the change log.
                            let mut state_table_info = version_to_apply.state_table_info.clone();
                            let (changed_table_info, _is_commit_epoch) = state_table_info
                                .apply_delta(
                                    &version_delta.state_table_info_delta,
                                    &version_delta.removed_table_ids,
                                );

                            HummockVersionCommon::<SstableInfo>::apply_change_log_delta(
                                &mut *table_change_log_to_apply_guard,
                                &version_delta.change_log_delta,
                                &version_delta.removed_table_ids,
                                &version_delta.state_table_info_delta,
                                &changed_table_info,
                            );
                        }

                        let local_hummock_version_delta =
                            LocalHummockVersionDelta::from(version_delta);
                        // Collect SST delta infos before the delta is applied,
                        // since they are derived from the pre-apply version.
                        if let Some(sst_delta_infos) = &mut sst_delta_infos {
                            sst_delta_infos.extend(
                                version_to_apply
                                    .build_sst_delta_infos(&local_hummock_version_delta)
                                    .into_iter(),
                            );
                        }

                        version_to_apply.apply_version_delta(&local_hummock_version_delta);
                    }
                }

                pinned_version.new_with_local_version(version_to_apply)
            }
            HummockVersionUpdate::PinnedVersion(version) => {
                pinned_version.new_pin_version(*version)
            }
        }
    }

    /// Applies a resolved version update to all handler state: the recent-version
    /// cache, every registered read version, the watch channel, and the uploader.
    fn apply_version_update(
        &mut self,
        pinned_version: PinnedVersion,
        new_pinned_version: PinnedVersion,
    ) {
        let _timer = self
            .metrics
            .event_handler_on_apply_version_update
            .start_timer();
        self.recent_versions.rcu(|prev_recent_versions| {
            prev_recent_versions.with_new_version(new_pinned_version.clone())
        });

        {
            self.for_each_read_version(
                self.local_read_version_mapping.keys().cloned(),
                |_, read_version| {
                    read_version
                        .update(VersionUpdate::CommittedSnapshot(new_pinned_version.clone()))
                },
            );
        }

        // Notify watchers only when the version id actually advanced; the old
        // watch state must match the version this update was based on.
        self.version_update_notifier_tx.send_if_modified(|state| {
            assert_eq!(pinned_version.id(), state.id());
            if state.id() == new_pinned_version.id() {
                return false;
            }
            assert!(new_pinned_version.id() > state.id());
            *state = new_pinned_version.clone();
            true
        });

        debug!("update to hummock version: {}", new_pinned_version.id(),);

        self.uploader.update_pinned_version(new_pinned_version);
    }
}
614
impl HummockEventHandler {
    /// Runs the event loop until shutdown, multiplexing four inputs: finished
    /// uploads, completed cache refills (which apply version updates), storage
    /// events, and incoming version updates.
    ///
    /// Exits on `HummockEvent::Shutdown`, when the event channel closes, or when
    /// the version update stream ends.
    pub async fn start_hummock_event_handler_worker(mut self) {
        loop {
            tokio::select! {
                sst = self.uploader.next_uploaded_sst() => {
                    self.handle_uploaded_sst(sst);
                }
                event = self.refiller.next_event() => {
                    // A refill finished: now apply the version it was started for.
                    let CacheRefillerEvent {pinned_version, new_pinned_version } = event;
                    self.apply_version_update(pinned_version, new_pinned_version);
                }
                event = pin!(self.hummock_event_rx.recv()) => {
                    let Some(event) = event else { break };
                    match event {
                        HummockEvent::Shutdown => {
                            info!("event handler shutdown");
                            return;
                        },
                        event => {
                            self.handle_hummock_event(event);
                        }
                    }
                }
                version_update = pin!(self.version_update_rx.recv()) => {
                    let Some(version_update) = version_update else {
                        warn!("version update stream ends. event handle shutdown");
                        return;
                    };
                    self.handle_version_update(version_update);
                }
            }
        }
    }

    /// Timed wrapper around [`Self::handle_uploaded_sst_inner`].
    fn handle_uploaded_sst(&mut self, sst: Arc<StagingSstableInfo>) {
        let _timer = self
            .metrics
            .event_handler_on_upload_finish_latency
            .start_timer();
        self.handle_uploaded_sst_inner(sst);
    }

    /// Dispatches a single storage event. `Shutdown` is handled by the worker
    /// loop before reaching here and is therefore unreachable.
    fn handle_hummock_event(&mut self, event: HummockEvent) {
        match event {
            HummockEvent::BufferMayFlush => {
                self.uploader.may_flush();
            }
            HummockEvent::SyncEpoch {
                sync_result_sender,
                sync_table_epochs,
            } => {
                self.handle_sync_epoch(sync_table_epochs, sync_result_sender);
            }
            HummockEvent::Clear(notifier, table_ids) => {
                self.handle_clear(notifier, table_ids);
            }
            HummockEvent::Shutdown => {
                unreachable!("shutdown is handled specially")
            }
            HummockEvent::StartEpoch { epoch, table_ids } => {
                self.uploader.start_epoch(epoch, table_ids);
            }
            HummockEvent::InitEpoch {
                instance_id,
                init_epoch,
            } => {
                // The instance must have been registered via RegisterReadVersion.
                let table_id = self
                    .local_read_version_mapping
                    .get(&instance_id)
                    .expect("should exist")
                    .0;
                self.uploader
                    .init_instance(instance_id, table_id, init_epoch);
            }
            HummockEvent::ImmToUploader { instance_id, imms } => {
                assert!(
                    self.local_read_version_mapping.contains_key(&instance_id),
                    "add imm from non-existing read version instance: instance_id: {}, table_id {:?}",
                    instance_id,
                    imms.first().map(|imm| imm.table_id),
                );
                self.uploader.add_imms(instance_id, imms);
                // New imms may push the buffer over the flush threshold.
                self.uploader.may_flush();
            }

            HummockEvent::LocalSealEpoch {
                next_epoch,
                opts,
                instance_id,
            } => {
                self.uploader
                    .local_seal_epoch(instance_id, next_epoch, opts);
            }

            // Test-only: lets tests wait until all previously sent events have
            // been processed by the loop.
            #[cfg(any(test, feature = "test"))]
            HummockEvent::FlushEvent(sender) => {
                let _ = sender.send(()).inspect_err(|e| {
                    error!("unable to send flush result: {:?}", e);
                });
            }

            HummockEvent::RegisterReadVersion {
                table_id,
                new_read_version_sender,
                is_replicated,
                vnodes,
            } => {
                // New read versions start from the latest cached version.
                let pinned_version = self.recent_versions.load().latest_version().clone();
                let instance_id = self.generate_instance_id();
                let basic_read_version = Arc::new(RwLock::new(
                    HummockReadVersion::new_with_replication_option(
                        table_id,
                        instance_id,
                        pinned_version,
                        is_replicated,
                        vnodes,
                    ),
                ));

                debug!(
                    "new read version registered: table_id: {}, instance_id: {}",
                    table_id, instance_id
                );

                {
                    // Register in both the loop-local map and the shared mapping.
                    self.local_read_version_mapping
                        .insert(instance_id, (table_id, basic_read_version.clone()));
                    let mut read_version_mapping_guard = self.read_version_mapping.write();

                    read_version_mapping_guard
                        .entry(table_id)
                        .or_default()
                        .insert(instance_id, basic_read_version.clone());
                }

                match new_read_version_sender.send((
                    basic_read_version,
                    LocalInstanceGuard {
                        table_id,
                        instance_id,
                        event_sender: Some(self.hummock_event_tx.clone()),
                    },
                )) {
                    Ok(_) => {}
                    Err((_, mut guard)) => {
                        // The requester went away: roll back the registration.
                        // Taking the sender prevents the dropped guard from
                        // emitting a redundant DestroyReadVersion event.
                        warn!(
                            "RegisterReadVersion send fail table_id {:?} instance_is {:?}",
                            table_id, instance_id
                        );
                        guard.event_sender.take().expect("sender is just set");
                        self.destroy_read_version(instance_id);
                    }
                }
            }

            HummockEvent::DestroyReadVersion { instance_id } => {
                self.uploader.may_destroy_instance(instance_id);
                self.destroy_read_version(instance_id);
            }
            HummockEvent::GetMinUncommittedObjectId { result_tx } => {
                let _ = result_tx
                    .send(self.uploader.min_uncommitted_object_id())
                    .inspect_err(|e| {
                        error!("unable to send get_min_uncommitted_sst_id result: {:?}", e);
                    });
            }
            HummockEvent::RegisterVectorWriter {
                table_id,
                init_epoch,
            } => self.uploader.register_vector_writer(table_id, init_epoch),
            HummockEvent::VectorWriterSealEpoch {
                table_id,
                next_epoch,
                add,
            } => {
                self.uploader
                    .vector_writer_seal_epoch(table_id, next_epoch, add);
            }
            HummockEvent::DropVectorWriter { table_id } => {
                self.uploader.drop_vector_writer(table_id);
            }
        }
    }

    /// Removes an instance from both read-version maps, dropping the table's
    /// entry in the shared mapping once its last instance is gone. Panics if the
    /// instance is unknown.
    fn destroy_read_version(&mut self, instance_id: LocalInstanceId) {
        {
            {
                debug!("read version deregister: instance_id: {}", instance_id);
                let (table_id, _) = self
                    .local_read_version_mapping
                    .remove(&instance_id)
                    .unwrap_or_else(|| {
                        panic!(
                            "DestroyHummockInstance inexist instance instance_id {}",
                            instance_id
                        )
                    });
                let mut read_version_mapping_guard = self.read_version_mapping.write();
                let entry = read_version_mapping_guard
                    .get_mut(&table_id)
                    .unwrap_or_else(|| {
                        panic!(
                            "DestroyHummockInstance table_id {} instance_id {} fail",
                            table_id, instance_id
                        )
                    });
                entry.remove(&instance_id).unwrap_or_else(|| {
                    panic!(
                        "DestroyHummockInstance inexist instance table_id {} instance_id {}",
                        table_id, instance_id
                    )
                });
                if entry.is_empty() {
                    read_version_mapping_guard.remove(&table_id);
                }
            }
        }
    }

    /// Returns the next unique instance id (simple monotonic counter; ids are
    /// never reused within a handler).
    fn generate_instance_id(&mut self) -> LocalInstanceId {
        self.last_instance_id += 1;
        self.last_instance_id
    }
}
840
841pub(super) fn send_sync_result(
842 sender: oneshot::Sender<HummockResult<SyncedData>>,
843 result: HummockResult<SyncedData>,
844) {
845 let _ = sender.send(result).inspect_err(|e| {
846 error!("unable to send sync result. Err: {:?}", e);
847 });
848}
849
850impl SyncedData {
851 pub fn into_sync_result(self) -> SyncResult {
852 {
853 let SyncedData {
854 uploaded_ssts,
855 table_watermarks,
856 vector_index_adds,
857 } = self;
858 let mut sync_size = 0;
859 let mut uncommitted_ssts = Vec::new();
860 let mut old_value_ssts = Vec::new();
861 for sst in uploaded_ssts {
864 sync_size += sst.imm_size();
865 uncommitted_ssts.extend(sst.sstable_infos().iter().cloned());
866 old_value_ssts.extend(sst.old_value_sstable_infos().iter().cloned());
867 }
868 SyncResult {
869 sync_size,
870 uncommitted_ssts,
871 table_watermarks: table_watermarks.clone(),
872 old_value_ssts,
873 vector_index_adds,
874 }
875 }
876 }
877}
878
879#[cfg(test)]
880mod tests {
881 use std::collections::{HashMap, HashSet};
882 use std::future::poll_fn;
883 use std::sync::Arc;
884 use std::task::Poll;
885
886 use futures::FutureExt;
887 use parking_lot::Mutex;
888 use risingwave_common::bitmap::Bitmap;
889 use risingwave_common::catalog::TableId;
890 use risingwave_common::hash::VirtualNode;
891 use risingwave_common::util::epoch::{EpochExt, test_epoch};
892 use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
893 use risingwave_hummock_sdk::version::HummockVersion;
894 use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo};
895 use tokio::spawn;
896 use tokio::sync::mpsc::unbounded_channel;
897 use tokio::sync::oneshot;
898
899 use crate::hummock::HummockError;
900 use crate::hummock::event_handler::hummock_event_handler::BufferTracker;
901 use crate::hummock::event_handler::refiller::{CacheRefillConfig, CacheRefiller};
902 use crate::hummock::event_handler::uploader::UploadTaskOutput;
903 use crate::hummock::event_handler::uploader::test_utils::{
904 TEST_TABLE_ID, gen_imm, gen_imm_inner, prepare_uploader_order_test_spawn_task_fn,
905 };
906 use crate::hummock::event_handler::{
907 HummockEvent, HummockEventHandler, HummockReadVersionRef, LocalInstanceGuard,
908 };
909 use crate::hummock::iterator::test_utils::mock_sstable_store;
910 use crate::hummock::local_version::pinned_version::PinnedVersion;
911 use crate::hummock::local_version::recent_versions::RecentVersions;
912 use crate::hummock::test_utils::default_opts_for_test;
913 use crate::mem_table::ImmutableMemtable;
914 use crate::monitor::HummockStateStoreMetrics;
915 use crate::store::SealCurrentEpochOptions;
916
917 #[tokio::test]
918 async fn test_old_epoch_sync_fail() {
919 let epoch0 = test_epoch(233);
920
921 let initial_version = PinnedVersion::new(
922 HummockVersion::from_rpc_protobuf(&PbHummockVersion {
923 id: 1,
924 state_table_info: HashMap::from_iter([(
925 TEST_TABLE_ID.table_id,
926 StateTableInfo {
927 committed_epoch: epoch0,
928 compaction_group_id: StaticCompactionGroupId::StateDefault as _,
929 },
930 )]),
931 ..Default::default()
932 }),
933 unbounded_channel().0,
934 );
935
936 let (_version_update_tx, version_update_rx) = unbounded_channel();
937
938 let epoch1 = epoch0.next_epoch();
939 let epoch2 = epoch1.next_epoch();
940 let (tx, rx) = oneshot::channel();
941 let rx = Arc::new(Mutex::new(Some(rx)));
942
943 let storage_opt = default_opts_for_test();
944 let metrics = Arc::new(HummockStateStoreMetrics::unused());
945
946 let event_handler = HummockEventHandler::new_inner(
947 version_update_rx,
948 mock_sstable_store().await,
949 metrics.clone(),
950 CacheRefillConfig::from_storage_opts(&storage_opt),
951 RecentVersions::new(initial_version.clone(), 10, metrics.clone()),
952 BufferTracker::from_storage_opts(
953 &storage_opt,
954 metrics.uploader_uploading_task_size.clone(),
955 ),
956 Arc::new(move |_, info| {
957 assert_eq!(info.epochs.len(), 1);
958 let epoch = info.epochs[0];
959 match epoch {
960 epoch if epoch == epoch1 => {
961 let rx = rx.lock().take().unwrap();
962 spawn(async move {
963 rx.await.unwrap();
964 Err(HummockError::other("fail"))
965 })
966 }
967 epoch if epoch == epoch2 => spawn(async move {
968 Ok(UploadTaskOutput {
969 new_value_ssts: vec![],
970 old_value_ssts: vec![],
971 wait_poll_timer: None,
972 })
973 }),
974 _ => unreachable!(),
975 }
976 }),
977 CacheRefiller::default_spawn_refill_task(),
978 );
979
980 let event_tx = event_handler.event_sender();
981
982 let send_event = |event| event_tx.send(event).unwrap();
983
984 let join_handle = spawn(event_handler.start_hummock_event_handler_worker());
985
986 let (read_version, guard) = {
987 let (tx, rx) = oneshot::channel();
988 send_event(HummockEvent::RegisterReadVersion {
989 table_id: TEST_TABLE_ID,
990 new_read_version_sender: tx,
991 is_replicated: false,
992 vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
993 });
994 rx.await.unwrap()
995 };
996
997 send_event(HummockEvent::StartEpoch {
998 epoch: epoch1,
999 table_ids: HashSet::from_iter([TEST_TABLE_ID]),
1000 });
1001
1002 send_event(HummockEvent::InitEpoch {
1003 instance_id: guard.instance_id,
1004 init_epoch: epoch1,
1005 });
1006
1007 let imm1 = gen_imm(epoch1).await;
1008 read_version.write().add_imm(imm1.clone());
1009
1010 send_event(HummockEvent::ImmToUploader {
1011 instance_id: guard.instance_id,
1012 imms: read_version.write().start_upload_pending_imms(),
1013 });
1014
1015 send_event(HummockEvent::StartEpoch {
1016 epoch: epoch2,
1017 table_ids: HashSet::from_iter([TEST_TABLE_ID]),
1018 });
1019
1020 send_event(HummockEvent::LocalSealEpoch {
1021 instance_id: guard.instance_id,
1022 next_epoch: epoch2,
1023 opts: SealCurrentEpochOptions::for_test(),
1024 });
1025
1026 {
1027 let imm2 = gen_imm(epoch2).await;
1028 let mut read_version = read_version.write();
1029 read_version.add_imm(imm2);
1030
1031 send_event(HummockEvent::ImmToUploader {
1032 instance_id: guard.instance_id,
1033 imms: read_version.start_upload_pending_imms(),
1034 });
1035 }
1036
1037 let epoch3 = epoch2.next_epoch();
1038 send_event(HummockEvent::StartEpoch {
1039 epoch: epoch3,
1040 table_ids: HashSet::from_iter([TEST_TABLE_ID]),
1041 });
1042 send_event(HummockEvent::LocalSealEpoch {
1043 instance_id: guard.instance_id,
1044 next_epoch: epoch3,
1045 opts: SealCurrentEpochOptions::for_test(),
1046 });
1047
1048 let (tx1, mut rx1) = oneshot::channel();
1049 send_event(HummockEvent::SyncEpoch {
1050 sync_result_sender: tx1,
1051 sync_table_epochs: vec![(epoch1, HashSet::from_iter([TEST_TABLE_ID]))],
1052 });
1053 assert!(poll_fn(|cx| Poll::Ready(rx1.poll_unpin(cx).is_pending())).await);
1054 let (tx2, mut rx2) = oneshot::channel();
1055 send_event(HummockEvent::SyncEpoch {
1056 sync_result_sender: tx2,
1057 sync_table_epochs: vec![(epoch2, HashSet::from_iter([TEST_TABLE_ID]))],
1058 });
1059 assert!(poll_fn(|cx| Poll::Ready(rx2.poll_unpin(cx).is_pending())).await);
1060
1061 tx.send(()).unwrap();
1062 rx1.await.unwrap().unwrap_err();
1063 rx2.await.unwrap().unwrap_err();
1064
1065 send_event(HummockEvent::Shutdown);
1066 join_handle.await.unwrap();
1067 }
1068
1069 #[tokio::test]
1070 async fn test_clear_tables() {
1071 let table_id1 = TableId::new(1);
1072 let table_id2 = TableId::new(2);
1073 let epoch0 = test_epoch(233);
1074
1075 let initial_version = PinnedVersion::new(
1076 HummockVersion::from_rpc_protobuf(&PbHummockVersion {
1077 id: 1,
1078 state_table_info: HashMap::from_iter([
1079 (
1080 table_id1.table_id,
1081 StateTableInfo {
1082 committed_epoch: epoch0,
1083 compaction_group_id: StaticCompactionGroupId::StateDefault as _,
1084 },
1085 ),
1086 (
1087 table_id2.table_id,
1088 StateTableInfo {
1089 committed_epoch: epoch0,
1090 compaction_group_id: StaticCompactionGroupId::StateDefault as _,
1091 },
1092 ),
1093 ]),
1094 ..Default::default()
1095 }),
1096 unbounded_channel().0,
1097 );
1098
1099 let (_version_update_tx, version_update_rx) = unbounded_channel();
1100
1101 let epoch1 = epoch0.next_epoch();
1102 let epoch2 = epoch1.next_epoch();
1103 let epoch3 = epoch2.next_epoch();
1104
1105 let imm_size = gen_imm_inner(TEST_TABLE_ID, epoch1, 0, None).await.size();
1106
1107 let buffer_tracker = BufferTracker::for_test_with_config(imm_size * 2 - 1, 1);
1109 let memory_limiter = buffer_tracker.get_memory_limiter().clone();
1110
1111 let gen_imm = |table_id, epoch, spill_offset| {
1112 let imm = gen_imm_inner(table_id, epoch, spill_offset, Some(&*memory_limiter))
1113 .now_or_never()
1114 .unwrap();
1115 assert_eq!(imm.size(), imm_size);
1116 imm
1117 };
1118 let imm1_1 = gen_imm(table_id1, epoch1, 0);
1119 let imm1_2_1 = gen_imm(table_id1, epoch2, 0);
1120
1121 let storage_opt = default_opts_for_test();
1122 let metrics = Arc::new(HummockStateStoreMetrics::unused());
1123
1124 let (spawn_task, new_task_notifier) = prepare_uploader_order_test_spawn_task_fn(false);
1125
1126 let event_handler = HummockEventHandler::new_inner(
1127 version_update_rx,
1128 mock_sstable_store().await,
1129 metrics.clone(),
1130 CacheRefillConfig::from_storage_opts(&storage_opt),
1131 RecentVersions::new(initial_version.clone(), 10, metrics.clone()),
1132 buffer_tracker,
1133 spawn_task,
1134 CacheRefiller::default_spawn_refill_task(),
1135 );
1136
1137 let event_tx = event_handler.event_sender();
1138
1139 let send_event = |event| event_tx.send(event).unwrap();
1140 let flush_event = || async {
1141 let (tx, rx) = oneshot::channel();
1142 send_event(HummockEvent::FlushEvent(tx));
1143 rx.await.unwrap();
1144 };
1145 let start_epoch = |table_id, epoch| {
1146 send_event(HummockEvent::StartEpoch {
1147 epoch,
1148 table_ids: HashSet::from_iter([table_id]),
1149 })
1150 };
1151 let init_epoch = |instance: &LocalInstanceGuard, init_epoch| {
1152 send_event(HummockEvent::InitEpoch {
1153 instance_id: instance.instance_id,
1154 init_epoch,
1155 })
1156 };
1157 let write_imm = |read_version: &HummockReadVersionRef,
1158 instance: &LocalInstanceGuard,
1159 imm: &ImmutableMemtable| {
1160 let mut read_version = read_version.write();
1161 read_version.add_imm(imm.clone());
1162
1163 send_event(HummockEvent::ImmToUploader {
1164 instance_id: instance.instance_id,
1165 imms: read_version.start_upload_pending_imms(),
1166 });
1167 };
1168 let seal_epoch = |instance: &LocalInstanceGuard, next_epoch| {
1169 send_event(HummockEvent::LocalSealEpoch {
1170 instance_id: instance.instance_id,
1171 next_epoch,
1172 opts: SealCurrentEpochOptions::for_test(),
1173 })
1174 };
1175 let sync_epoch = |table_id, new_sync_epoch| {
1176 let (tx, rx) = oneshot::channel();
1177 send_event(HummockEvent::SyncEpoch {
1178 sync_result_sender: tx,
1179 sync_table_epochs: vec![(new_sync_epoch, HashSet::from_iter([table_id]))],
1180 });
1181 rx
1182 };
1183
1184 let join_handle = spawn(event_handler.start_hummock_event_handler_worker());
1185
1186 let (read_version1, guard1) = {
1187 let (tx, rx) = oneshot::channel();
1188 send_event(HummockEvent::RegisterReadVersion {
1189 table_id: table_id1,
1190 new_read_version_sender: tx,
1191 is_replicated: false,
1192 vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
1193 });
1194 rx.await.unwrap()
1195 };
1196
1197 let (read_version2, guard2) = {
1198 let (tx, rx) = oneshot::channel();
1199 send_event(HummockEvent::RegisterReadVersion {
1200 table_id: table_id2,
1201 new_read_version_sender: tx,
1202 is_replicated: false,
1203 vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
1204 });
1205 rx.await.unwrap()
1206 };
1207
1208 let (task1_1_finish_tx, task1_1_rx) = {
1210 start_epoch(table_id1, epoch1);
1211
1212 init_epoch(&guard1, epoch1);
1213
1214 write_imm(&read_version1, &guard1, &imm1_1);
1215
1216 start_epoch(table_id1, epoch2);
1217
1218 seal_epoch(&guard1, epoch2);
1219
1220 let (wait_task_start, task_finish_tx) = new_task_notifier(HashMap::from_iter([(
1221 guard1.instance_id,
1222 vec![imm1_1.batch_id()],
1223 )]));
1224
1225 let mut rx = sync_epoch(table_id1, epoch1);
1226 wait_task_start.await;
1227 assert!(poll_fn(|cx| Poll::Ready(rx.poll_unpin(cx).is_pending())).await);
1228
1229 write_imm(&read_version1, &guard1, &imm1_2_1);
1230 flush_event().await;
1231
1232 (task_finish_tx, rx)
1233 };
1234 let (task1_2_finish_tx, _finish_txs) = {
1239 let mut finish_txs = vec![];
1240 let imm2_1_1 = gen_imm(table_id2, epoch1, 0);
1241 start_epoch(table_id2, epoch1);
1242 init_epoch(&guard2, epoch1);
1243 let (wait_task_start, task1_2_finish_tx) = new_task_notifier(HashMap::from_iter([(
1244 guard1.instance_id,
1245 vec![imm1_2_1.batch_id()],
1246 )]));
1247 write_imm(&read_version2, &guard2, &imm2_1_1);
1248 wait_task_start.await;
1249
1250 let imm2_1_2 = gen_imm(table_id2, epoch1, 1);
1251 let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
1252 guard2.instance_id,
1253 vec![imm2_1_2.batch_id(), imm2_1_1.batch_id()],
1254 )]));
1255 finish_txs.push(finish_tx);
1256 write_imm(&read_version2, &guard2, &imm2_1_2);
1257 wait_task_start.await;
1258
1259 let imm2_1_3 = gen_imm(table_id2, epoch1, 2);
1260 write_imm(&read_version2, &guard2, &imm2_1_3);
1261 start_epoch(table_id2, epoch2);
1262 seal_epoch(&guard2, epoch2);
1263 let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
1264 guard2.instance_id,
1265 vec![imm2_1_3.batch_id()],
1266 )]));
1267 finish_txs.push(finish_tx);
1268 let _sync_rx = sync_epoch(table_id2, epoch1);
1269 wait_task_start.await;
1270
1271 let imm2_2_1 = gen_imm(table_id2, epoch2, 0);
1272 write_imm(&read_version2, &guard2, &imm2_2_1);
1273 flush_event().await;
1274 let imm2_2_2 = gen_imm(table_id2, epoch2, 1);
1275 write_imm(&read_version2, &guard2, &imm2_2_2);
1276 let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
1277 guard2.instance_id,
1278 vec![imm2_2_2.batch_id(), imm2_2_1.batch_id()],
1279 )]));
1280 finish_txs.push(finish_tx);
1281 wait_task_start.await;
1282
1283 let imm2_2_3 = gen_imm(table_id2, epoch2, 2);
1284 write_imm(&read_version2, &guard2, &imm2_2_3);
1285
1286 drop(guard2);
1294 let (clear_tx, clear_rx) = oneshot::channel();
1295 send_event(HummockEvent::Clear(
1296 clear_tx,
1297 Some(HashSet::from_iter([table_id2])),
1298 ));
1299 clear_rx.await.unwrap();
1300 (task1_2_finish_tx, finish_txs)
1301 };
1302
1303 let imm1_2_2 = gen_imm(table_id1, epoch2, 1);
1304 write_imm(&read_version1, &guard1, &imm1_2_2);
1305 start_epoch(table_id1, epoch3);
1306 seal_epoch(&guard1, epoch3);
1307
1308 let (tx2, mut sync_rx2) = oneshot::channel();
1309 let (wait_task_start, task1_2_2_finish_tx) = new_task_notifier(HashMap::from_iter([(
1310 guard1.instance_id,
1311 vec![imm1_2_2.batch_id()],
1312 )]));
1313 send_event(HummockEvent::SyncEpoch {
1314 sync_result_sender: tx2,
1315 sync_table_epochs: vec![(epoch2, HashSet::from_iter([table_id1]))],
1316 });
1317 wait_task_start.await;
1318 assert!(poll_fn(|cx| Poll::Ready(sync_rx2.poll_unpin(cx).is_pending())).await);
1319
1320 task1_1_finish_tx.send(()).unwrap();
1321 let sync_data1 = task1_1_rx.await.unwrap().unwrap();
1322 sync_data1
1323 .uploaded_ssts
1324 .iter()
1325 .all(|sst| sst.epochs() == &vec![epoch1]);
1326 task1_2_finish_tx.send(()).unwrap();
1327 assert!(poll_fn(|cx| Poll::Ready(sync_rx2.poll_unpin(cx).is_pending())).await);
1328 task1_2_2_finish_tx.send(()).unwrap();
1329 let sync_data2 = sync_rx2.await.unwrap().unwrap();
1330 sync_data2
1331 .uploaded_ssts
1332 .iter()
1333 .all(|sst| sst.epochs() == &vec![epoch2]);
1334
1335 send_event(HummockEvent::Shutdown);
1336 join_handle.await.unwrap();
1337 }
1338}