risingwave_storage/hummock/event_handler/
hummock_event_handler.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet, VecDeque};
16use std::pin::pin;
17use std::sync::atomic::AtomicUsize;
18use std::sync::atomic::Ordering::Relaxed;
19use std::sync::{Arc, LazyLock};
20use std::time::Duration;
21
22use arc_swap::ArcSwap;
23use await_tree::{InstrumentAwait, SpanExt};
24use futures::FutureExt;
25use itertools::Itertools;
26use parking_lot::RwLock;
27use prometheus::{Histogram, IntGauge};
28use risingwave_common::catalog::TableId;
29use risingwave_common::metrics::UintGauge;
30use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo;
31use risingwave_hummock_sdk::sstable_info::SstableInfo;
32use risingwave_hummock_sdk::version::{HummockVersionCommon, LocalHummockVersionDelta};
33use risingwave_hummock_sdk::{HummockEpoch, SyncResult};
34use tokio::spawn;
35use tokio::sync::mpsc::error::SendError;
36use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
37use tokio::sync::oneshot;
38use tracing::{debug, error, info, trace, warn};
39
40use super::refiller::{CacheRefillConfig, CacheRefiller};
41use super::{LocalInstanceGuard, LocalInstanceId, ReadVersionMappingType};
42use crate::compaction_catalog_manager::CompactionCatalogManagerRef;
43use crate::hummock::compactor::{CompactorContext, await_tree_key, compact};
44use crate::hummock::event_handler::refiller::{CacheRefillerEvent, SpawnRefillTask};
45use crate::hummock::event_handler::uploader::{
46    HummockUploader, SpawnUploadTask, SyncedData, UploadTaskOutput,
47};
48use crate::hummock::event_handler::{
49    HummockEvent, HummockReadVersionRef, HummockVersionUpdate, ReadOnlyReadVersionMapping,
50    ReadOnlyRwLockRef,
51};
52use crate::hummock::local_version::pinned_version::PinnedVersion;
53use crate::hummock::local_version::recent_versions::RecentVersions;
54use crate::hummock::store::version::{
55    HummockReadVersion, StagingData, StagingSstableInfo, VersionUpdate,
56};
57use crate::hummock::{HummockResult, MemoryLimiter, SstableObjectIdManager, SstableStoreRef};
58use crate::mem_table::ImmutableMemtable;
59use crate::monitor::HummockStateStoreMetrics;
60use crate::opts::StorageOpts;
61
/// Tracks global shared-buffer memory usage and decides when buffered
/// in-memory data should be flushed to upload tasks.
#[derive(Clone)]
pub struct BufferTracker {
    // Flushing is triggered once buffered-but-not-yet-uploading bytes exceed this threshold.
    flush_threshold: usize,
    // Minimum total size of one flush batch; `need_more_flush` keeps returning
    // true until the current batch reaches this size.
    min_batch_flush_size: usize,
    // Shared limiter whose current usage is the global shared-buffer size.
    global_buffer: Arc<MemoryLimiter>,
    // Gauge of bytes currently held by in-flight upload tasks.
    global_upload_task_size: UintGauge,
}
69
70impl BufferTracker {
71    pub fn from_storage_opts(config: &StorageOpts, global_upload_task_size: UintGauge) -> Self {
72        let capacity = config.shared_buffer_capacity_mb * (1 << 20);
73        let flush_threshold = (capacity as f32 * config.shared_buffer_flush_ratio) as usize;
74        let shared_buffer_min_batch_flush_size =
75            config.shared_buffer_min_batch_flush_size_mb * (1 << 20);
76        assert!(
77            flush_threshold < capacity,
78            "flush_threshold {} should be less or equal to capacity {}",
79            flush_threshold,
80            capacity
81        );
82        Self::new(
83            capacity,
84            flush_threshold,
85            global_upload_task_size,
86            shared_buffer_min_batch_flush_size,
87        )
88    }
89
90    #[cfg(test)]
91    fn for_test_with_config(flush_threshold: usize, min_batch_flush_size: usize) -> Self {
92        Self::new(
93            usize::MAX,
94            flush_threshold,
95            UintGauge::new("test", "test").unwrap(),
96            min_batch_flush_size,
97        )
98    }
99
100    fn new(
101        capacity: usize,
102        flush_threshold: usize,
103        global_upload_task_size: UintGauge,
104        min_batch_flush_size: usize,
105    ) -> Self {
106        assert!(capacity >= flush_threshold);
107        Self {
108            flush_threshold,
109            global_buffer: Arc::new(MemoryLimiter::new(capacity as u64)),
110            global_upload_task_size,
111            min_batch_flush_size,
112        }
113    }
114
115    pub fn for_test() -> Self {
116        Self::from_storage_opts(
117            &StorageOpts::default(),
118            UintGauge::new("test", "test").unwrap(),
119        )
120    }
121
122    pub fn get_buffer_size(&self) -> usize {
123        self.global_buffer.get_memory_usage() as usize
124    }
125
126    pub fn get_memory_limiter(&self) -> &Arc<MemoryLimiter> {
127        &self.global_buffer
128    }
129
130    pub fn global_upload_task_size(&self) -> &UintGauge {
131        &self.global_upload_task_size
132    }
133
134    /// Return true when the buffer size minus current upload task size is still greater than the
135    /// flush threshold.
136    pub fn need_flush(&self) -> bool {
137        self.get_buffer_size() > self.flush_threshold + self.global_upload_task_size.get() as usize
138    }
139
140    pub fn need_more_flush(&self, curr_batch_flush_size: usize) -> bool {
141        curr_batch_flush_size < self.min_batch_flush_size || self.need_flush()
142    }
143
144    #[cfg(test)]
145    pub(crate) fn flush_threshold(&self) -> usize {
146        self.flush_threshold
147    }
148}
149
/// Sender half of the hummock event channel.
#[derive(Clone)]
pub struct HummockEventSender {
    inner: UnboundedSender<HummockEvent>,
    // Gauge of events currently queued; incremented on successful send,
    // decremented by the receiver on recv.
    event_count: IntGauge,
}
155
156pub fn event_channel(event_count: IntGauge) -> (HummockEventSender, HummockEventReceiver) {
157    let (tx, rx) = unbounded_channel();
158    (
159        HummockEventSender {
160            inner: tx,
161            event_count: event_count.clone(),
162        },
163        HummockEventReceiver {
164            inner: rx,
165            event_count,
166        },
167    )
168}
169
170impl HummockEventSender {
171    pub fn send(&self, event: HummockEvent) -> Result<(), SendError<HummockEvent>> {
172        self.inner.send(event)?;
173        self.event_count.inc();
174        Ok(())
175    }
176}
177
/// Receiver half of the hummock event channel.
pub struct HummockEventReceiver {
    inner: UnboundedReceiver<HummockEvent>,
    // Gauge of events currently queued; decremented for each event received.
    event_count: IntGauge,
}
182
183impl HummockEventReceiver {
184    async fn recv(&mut self) -> Option<HummockEvent> {
185        let event = self.inner.recv().await?;
186        self.event_count.dec();
187        Some(event)
188    }
189}
190
/// Latency histograms for the event handler, one per handling stage
/// (labels are assigned in `HummockEventHandler::new_inner`).
struct HummockEventHandlerMetrics {
    event_handler_on_upload_finish_latency: Histogram,
    event_handler_on_apply_version_update: Histogram,
    event_handler_on_recv_version_update: Histogram,
}
196
/// Single-owner handler that drives the hummock storage event loop: it owns
/// the uploader and cache refiller, routes [`HummockEvent`]s, and applies
/// incoming hummock version updates.
pub struct HummockEventHandler {
    hummock_event_tx: HummockEventSender,
    hummock_event_rx: HummockEventReceiver,
    // Incoming stream of version updates to resolve and apply.
    version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
    // Shared mapping table_id -> (instance_id -> read version), exposed read-only
    // via `read_version_mapping()`.
    read_version_mapping: Arc<RwLock<ReadVersionMappingType>>,
    /// A copy of `read_version_mapping` but owned by event handler
    local_read_version_mapping: HashMap<LocalInstanceId, (TableId, HummockReadVersionRef)>,

    // Watch channel notified (via `send_if_modified`) whenever a strictly newer
    // version is applied.
    version_update_notifier_tx: Arc<tokio::sync::watch::Sender<PinnedVersion>>,
    // Recently pinned versions, atomically swapped (`rcu`) on each update.
    recent_versions: Arc<ArcSwap<RecentVersions>>,

    uploader: HummockUploader,
    refiller: CacheRefiller,

    // Monotonic counter backing `generate_instance_id`.
    last_instance_id: LocalInstanceId,

    metrics: HummockEventHandlerMetrics,
}
215
216async fn flush_imms(
217    payload: Vec<ImmutableMemtable>,
218    compactor_context: CompactorContext,
219    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
220    sstable_object_id_manager: Arc<SstableObjectIdManager>,
221) -> HummockResult<UploadTaskOutput> {
222    compact(
223        compactor_context,
224        sstable_object_id_manager,
225        payload,
226        compaction_catalog_manager_ref,
227    )
228    .instrument_await("shared_buffer_compact".verbose())
229    .await
230}
231
impl HummockEventHandler {
    /// Creates an event handler wired to the real compactor-backed upload path.
    ///
    /// Builds the recent-version cache and buffer tracker from
    /// `compactor_context.storage_opts`, then installs a spawn-upload closure
    /// that flushes imms via [`flush_imms`] on a tokio task, instrumented with
    /// upload-latency metrics and (when registered) an await-tree node.
    pub fn new(
        version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
        pinned_version: PinnedVersion,
        compactor_context: CompactorContext,
        compaction_catalog_manager_ref: CompactionCatalogManagerRef,
        sstable_object_id_manager: Arc<SstableObjectIdManager>,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
    ) -> Self {
        let upload_compactor_context = compactor_context.clone();
        let upload_task_latency = state_store_metrics.uploader_upload_task_latency.clone();
        let wait_poll_latency = state_store_metrics.uploader_wait_poll_latency.clone();
        let recent_versions = RecentVersions::new(
            pinned_version,
            compactor_context
                .storage_opts
                .max_cached_recent_versions_number,
            state_store_metrics.clone(),
        );
        let buffer_tracker = BufferTracker::from_storage_opts(
            &compactor_context.storage_opts,
            state_store_metrics.uploader_uploading_task_size.clone(),
        );
        Self::new_inner(
            version_update_rx,
            compactor_context.sstable_store.clone(),
            state_store_metrics,
            CacheRefillConfig::from_storage_opts(&compactor_context.storage_opts),
            recent_versions,
            buffer_tracker,
            Arc::new(move |payload, task_info| {
                // Process-wide counter used only to give each upload task a
                // distinct await-tree key.
                static NEXT_UPLOAD_TASK_ID: LazyLock<AtomicUsize> =
                    LazyLock::new(|| AtomicUsize::new(0));
                let tree_root = upload_compactor_context.await_tree_reg.as_ref().map(|reg| {
                    let upload_task_id = NEXT_UPLOAD_TASK_ID.fetch_add(1, Relaxed);
                    reg.register(
                        await_tree_key::SpawnUploadTask { id: upload_task_id },
                        format!("Spawn Upload Task: {}", task_info),
                    )
                });
                // Clone the captured handles so each spawned task owns its own.
                let upload_task_latency = upload_task_latency.clone();
                let wait_poll_latency = wait_poll_latency.clone();
                let upload_compactor_context = upload_compactor_context.clone();
                let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();
                let sstable_object_id_manager = sstable_object_id_manager.clone();
                spawn({
                    let future = async move {
                        // Timer covers the whole flush; dropped when the task ends.
                        let _timer = upload_task_latency.start_timer();
                        let mut output = flush_imms(
                            payload
                                .into_values()
                                .flat_map(|imms| imms.into_iter())
                                .collect(),
                            upload_compactor_context.clone(),
                            compaction_catalog_manager_ref.clone(),
                            sstable_object_id_manager.clone(),
                        )
                        .await?;
                        // Start the wait-poll timer on completion; it must not
                        // have been set by anyone else before.
                        assert!(
                            output
                                .wait_poll_timer
                                .replace(wait_poll_latency.start_timer())
                                .is_none(),
                            "should not set timer before"
                        );
                        Ok(output)
                    };
                    if let Some(tree_root) = tree_root {
                        tree_root.instrument(future).left_future()
                    } else {
                        future.right_future()
                    }
                })
            }),
            CacheRefiller::default_spawn_refill_task(),
        )
    }

    /// Shared constructor used by [`Self::new`] and tests: wires up the event
    /// channel, version-update watch channel, metrics, uploader, and refiller.
    fn new_inner(
        version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
        sstable_store: SstableStoreRef,
        state_store_metrics: Arc<HummockStateStoreMetrics>,
        refill_config: CacheRefillConfig,
        recent_versions: RecentVersions,
        buffer_tracker: BufferTracker,
        spawn_upload_task: SpawnUploadTask,
        spawn_refill_task: SpawnRefillTask,
    ) -> Self {
        let (hummock_event_tx, hummock_event_rx) =
            event_channel(state_store_metrics.event_handler_pending_event.clone());
        let (version_update_notifier_tx, _) =
            tokio::sync::watch::channel(recent_versions.latest_version().clone());
        let version_update_notifier_tx = Arc::new(version_update_notifier_tx);
        let read_version_mapping = Arc::new(RwLock::new(HashMap::default()));

        let metrics = HummockEventHandlerMetrics {
            event_handler_on_upload_finish_latency: state_store_metrics
                .event_handler_latency
                .with_label_values(&["on_upload_finish"]),
            event_handler_on_apply_version_update: state_store_metrics
                .event_handler_latency
                .with_label_values(&["apply_version"]),
            event_handler_on_recv_version_update: state_store_metrics
                .event_handler_latency
                .with_label_values(&["recv_version_update"]),
        };

        let uploader = HummockUploader::new(
            state_store_metrics.clone(),
            recent_versions.latest_version().clone(),
            spawn_upload_task,
            buffer_tracker,
        );
        let refiller = CacheRefiller::new(refill_config, sstable_store, spawn_refill_task);

        Self {
            hummock_event_tx,
            hummock_event_rx,
            version_update_rx,
            version_update_notifier_tx,
            recent_versions: Arc::new(ArcSwap::from_pointee(recent_versions)),
            read_version_mapping,
            local_read_version_mapping: Default::default(),
            uploader,
            refiller,
            last_instance_id: 0,
            metrics,
        }
    }

    /// Watch channel notified whenever a strictly newer version is applied.
    pub fn version_update_notifier_tx(&self) -> Arc<tokio::sync::watch::Sender<PinnedVersion>> {
        self.version_update_notifier_tx.clone()
    }

    /// Shared handle to the recently pinned versions cache.
    pub fn recent_versions(&self) -> Arc<ArcSwap<RecentVersions>> {
        self.recent_versions.clone()
    }

    /// Read-only view of the shared read-version mapping.
    pub fn read_version_mapping(&self) -> ReadOnlyReadVersionMapping {
        ReadOnlyRwLockRef::new(self.read_version_mapping.clone())
    }

    /// A new sender for submitting events to this handler.
    pub fn event_sender(&self) -> HummockEventSender {
        self.hummock_event_tx.clone()
    }

    pub fn buffer_tracker(&self) -> &BufferTracker {
        self.uploader.buffer_tracker()
    }
}
382
// Handler for different events
impl HummockEventHandler {
    /// This function will be performed under the protection of the `read_version_mapping` read
    /// lock, and add write lock on each `read_version` operation
    fn for_each_read_version(
        &self,
        instances: impl IntoIterator<Item = LocalInstanceId>,
        mut f: impl FnMut(LocalInstanceId, &mut HummockReadVersion),
    ) {
        let instances = {
            #[cfg(debug_assertions)]
            {
                // check duplication on debug_mode
                let mut id_set = std::collections::HashSet::new();
                for instance in instances {
                    assert!(id_set.insert(instance));
                }
                id_set
            }
            #[cfg(not(debug_assertions))]
            {
                instances
            }
        };
        let mut pending = VecDeque::new();
        let mut total_count = 0;
        for instance_id in instances {
            // Instances may have been destroyed in the meantime; skip missing ones.
            let Some((_, read_version)) = self.local_read_version_mapping.get(&instance_id) else {
                continue;
            };
            total_count += 1;
            // First pass: non-blocking try_write; contended instances are retried below.
            match read_version.try_write() {
                Some(mut write_guard) => {
                    f(instance_id, &mut write_guard);
                }
                _ => {
                    pending.push_back(instance_id);
                }
            }
        }
        if !pending.is_empty() {
            if pending.len() * 10 > total_count {
                // Only print warn log when failed to acquire more than 10%
                warn!(
                    pending_count = pending.len(),
                    total_count, "cannot acquire lock for all read version"
                );
            } else {
                debug!(
                    pending_count = pending.len(),
                    total_count, "cannot acquire lock for all read version"
                );
            }
        }

        const TRY_LOCK_TIMEOUT: Duration = Duration::from_millis(1);

        // Second pass: keep retrying with a short timed wait until every
        // pending instance has been visited.
        while let Some(instance_id) = pending.pop_front() {
            let (_, read_version) = self
                .local_read_version_mapping
                .get(&instance_id)
                .expect("have checked exist before");
            match read_version.try_write_for(TRY_LOCK_TIMEOUT) {
                Some(mut write_guard) => {
                    f(instance_id, &mut write_guard);
                }
                _ => {
                    warn!(instance_id, "failed to get lock again for instance");
                    pending.push_back(instance_id);
                }
            }
        }
    }

    /// Pushes a freshly uploaded SST into the staging data of every read
    /// version instance whose imms were included in the upload.
    fn handle_uploaded_sst_inner(&mut self, staging_sstable_info: Arc<StagingSstableInfo>) {
        trace!("data_flushed. SST size {}", staging_sstable_info.imm_size());
        self.for_each_read_version(
            staging_sstable_info.imm_ids().keys().cloned(),
            |_, read_version| {
                read_version.update(VersionUpdate::Staging(StagingData::Sst(
                    staging_sstable_info.clone(),
                )))
            },
        )
    }

    /// Asks the uploader to sync the given table/epoch pairs; the result is
    /// delivered asynchronously through `sync_result_sender`.
    fn handle_sync_epoch(
        &mut self,
        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
    ) {
        debug!(?sync_table_epochs, "awaiting for epoch to be synced",);
        self.uploader
            .start_sync_epoch(sync_result_sender, sync_table_epochs);
    }

    /// Clears uploader state for `table_ids` (or all state when `None`) and
    /// notifies the requester when done.
    fn handle_clear(&mut self, notifier: oneshot::Sender<()>, table_ids: Option<HashSet<TableId>>) {
        info!(
            current_version_id = ?self.uploader.hummock_version().id(),
            "handle clear event"
        );

        self.uploader.clear(table_ids.clone());

        if table_ids.is_none() {
            // A full clear requires that every read version instance has
            // already been destroyed.
            assert!(
                self.local_read_version_mapping.is_empty(),
                "read version mapping not empty when clear. remaining tables: {:?}",
                self.local_read_version_mapping
                    .values()
                    .map(|(_, read_version)| read_version.read().table_id())
                    .collect_vec()
            );
        }

        // Notify completion of the Clear event.
        let _ = notifier.send(()).inspect_err(|e| {
            error!("failed to notify completion of clear event: {:?}", e);
        });

        info!("clear finished");
    }

    /// Resolves an incoming version payload against the latest known pinned
    /// version and, if it produces a newer version, starts cache refill.
    fn handle_version_update(&mut self, version_payload: HummockVersionUpdate) {
        let _timer = self
            .metrics
            .event_handler_on_recv_version_update
            .start_timer();
        // Prefer the refiller's last in-flight version so consecutive deltas
        // stack on top of each other instead of the uploader's older view.
        let pinned_version = self
            .refiller
            .last_new_pinned_version()
            .cloned()
            .unwrap_or_else(|| self.uploader.hummock_version().clone());

        let mut sst_delta_infos = vec![];
        if let Some(new_pinned_version) = Self::resolve_version_update_info(
            &pinned_version,
            version_payload,
            Some(&mut sst_delta_infos),
        ) {
            self.refiller
                .start_cache_refill(sst_delta_infos, pinned_version, new_pinned_version);
        }
    }

    /// Applies `version_payload` on top of `pinned_version`, optionally
    /// collecting per-delta SST change info into `sst_delta_infos`.
    /// Returns `None` when the payload does not yield a new pinned version.
    fn resolve_version_update_info(
        pinned_version: &PinnedVersion,
        version_payload: HummockVersionUpdate,
        mut sst_delta_infos: Option<&mut Vec<SstDeltaInfo>>,
    ) -> Option<PinnedVersion> {
        match version_payload {
            HummockVersionUpdate::VersionDeltas(version_deltas) => {
                let mut version_to_apply = (**pinned_version).clone();
                {
                    // Hold the table-change-log write lock across all deltas.
                    let mut table_change_log_to_apply_guard =
                        pinned_version.table_change_log_write_lock();
                    for version_delta in version_deltas {
                        // Deltas must form a contiguous chain of version ids.
                        assert_eq!(version_to_apply.id, version_delta.prev_id);

                        // apply change-log-delta
                        {
                            let mut state_table_info = version_to_apply.state_table_info.clone();
                            let (changed_table_info, _is_commit_epoch) = state_table_info
                                .apply_delta(
                                    &version_delta.state_table_info_delta,
                                    &version_delta.removed_table_ids,
                                );

                            HummockVersionCommon::<SstableInfo>::apply_change_log_delta(
                                &mut *table_change_log_to_apply_guard,
                                &version_delta.change_log_delta,
                                &version_delta.removed_table_ids,
                                &version_delta.state_table_info_delta,
                                &changed_table_info,
                            );
                        }

                        let local_hummock_version_delta =
                            LocalHummockVersionDelta::from(version_delta);
                        // SST delta infos must be built BEFORE the delta is
                        // applied to the version.
                        if let Some(sst_delta_infos) = &mut sst_delta_infos {
                            sst_delta_infos.extend(
                                version_to_apply
                                    .build_sst_delta_infos(&local_hummock_version_delta)
                                    .into_iter(),
                            );
                        }

                        version_to_apply.apply_version_delta(&local_hummock_version_delta);
                    }
                }

                pinned_version.new_with_local_version(version_to_apply)
            }
            HummockVersionUpdate::PinnedVersion(version) => {
                pinned_version.new_pin_version(*version)
            }
        }
    }

    /// Installs a refilled version: updates the recent-versions cache, every
    /// read version instance, the watch channel, and finally the uploader.
    fn apply_version_update(
        &mut self,
        pinned_version: PinnedVersion,
        new_pinned_version: PinnedVersion,
    ) {
        let _timer = self
            .metrics
            .event_handler_on_apply_version_update
            .start_timer();
        self.recent_versions.rcu(|prev_recent_versions| {
            prev_recent_versions.with_new_version(new_pinned_version.clone())
        });

        {
            self.for_each_read_version(
                self.local_read_version_mapping.keys().cloned(),
                |_, read_version| {
                    read_version
                        .update(VersionUpdate::CommittedSnapshot(new_pinned_version.clone()))
                },
            );
        }

        self.version_update_notifier_tx.send_if_modified(|state| {
            assert_eq!(pinned_version.id(), state.id());
            // Only notify watchers when the version id strictly advances.
            if state.id() == new_pinned_version.id() {
                return false;
            }
            assert!(new_pinned_version.id() > state.id());
            *state = new_pinned_version.clone();
            true
        });

        debug!("update to hummock version: {}", new_pinned_version.id(),);

        self.uploader.update_pinned_version(new_pinned_version);
    }
}
620
impl HummockEventHandler {
    /// Main event loop: multiplexes uploaded SSTs, cache-refill completions,
    /// hummock events, and version updates. Returns on `Shutdown`, when the
    /// event channel closes, or when the version-update stream ends.
    pub async fn start_hummock_event_handler_worker(mut self) {
        loop {
            tokio::select! {
                sst = self.uploader.next_uploaded_sst() => {
                    self.handle_uploaded_sst(sst);
                }
                event = self.refiller.next_event() => {
                    let CacheRefillerEvent {pinned_version, new_pinned_version } = event;
                    self.apply_version_update(pinned_version, new_pinned_version);
                }
                event = pin!(self.hummock_event_rx.recv()) => {
                    let Some(event) = event else { break };
                    match event {
                        HummockEvent::Shutdown => {
                            info!("event handler shutdown");
                            return;
                        },
                        event => {
                            self.handle_hummock_event(event);
                        }
                    }
                }
                version_update = pin!(self.version_update_rx.recv()) => {
                    let Some(version_update) = version_update else {
                        warn!("version update stream ends. event handle shutdown");
                        return;
                    };
                    self.handle_version_update(version_update);
                }
            }
        }
    }

    /// Records on-upload-finish latency around [`Self::handle_uploaded_sst_inner`].
    fn handle_uploaded_sst(&mut self, sst: Arc<StagingSstableInfo>) {
        let _timer = self
            .metrics
            .event_handler_on_upload_finish_latency
            .start_timer();
        self.handle_uploaded_sst_inner(sst);
    }

    /// Dispatches a single hummock event. `Shutdown` is handled by the worker
    /// loop and must never reach this function.
    fn handle_hummock_event(&mut self, event: HummockEvent) {
        match event {
            HummockEvent::BufferMayFlush => {
                self.uploader.may_flush();
            }
            HummockEvent::SyncEpoch {
                sync_result_sender,
                sync_table_epochs,
            } => {
                self.handle_sync_epoch(sync_table_epochs, sync_result_sender);
            }
            HummockEvent::Clear(notifier, table_ids) => {
                self.handle_clear(notifier, table_ids);
            }
            HummockEvent::Shutdown => {
                unreachable!("shutdown is handled specially")
            }
            HummockEvent::StartEpoch { epoch, table_ids } => {
                self.uploader.start_epoch(epoch, table_ids);
            }
            HummockEvent::InitEpoch {
                instance_id,
                init_epoch,
            } => {
                // The instance must have been registered via RegisterReadVersion.
                let table_id = self
                    .local_read_version_mapping
                    .get(&instance_id)
                    .expect("should exist")
                    .0;
                self.uploader
                    .init_instance(instance_id, table_id, init_epoch);
            }
            HummockEvent::ImmToUploader { instance_id, imm } => {
                assert!(
                    self.local_read_version_mapping.contains_key(&instance_id),
                    "add imm from non-existing read version instance: instance_id: {}, table_id {}",
                    instance_id,
                    imm.table_id,
                );
                self.uploader.add_imm(instance_id, imm);
                // Adding an imm may push buffered size over the flush threshold.
                self.uploader.may_flush();
            }

            HummockEvent::LocalSealEpoch {
                next_epoch,
                opts,
                instance_id,
            } => {
                self.uploader
                    .local_seal_epoch(instance_id, next_epoch, opts);
            }

            #[cfg(any(test, feature = "test"))]
            HummockEvent::FlushEvent(sender) => {
                let _ = sender.send(()).inspect_err(|e| {
                    error!("unable to send flush result: {:?}", e);
                });
            }

            HummockEvent::RegisterReadVersion {
                table_id,
                new_read_version_sender,
                is_replicated,
                vnodes,
            } => {
                let pinned_version = self.recent_versions.load().latest_version().clone();
                let instance_id = self.generate_instance_id();
                let basic_read_version = Arc::new(RwLock::new(
                    HummockReadVersion::new_with_replication_option(
                        table_id,
                        instance_id,
                        pinned_version,
                        is_replicated,
                        vnodes,
                    ),
                ));

                debug!(
                    "new read version registered: table_id: {}, instance_id: {}",
                    table_id, instance_id
                );

                {
                    // Record in both the handler-owned map and the shared map.
                    self.local_read_version_mapping
                        .insert(instance_id, (table_id, basic_read_version.clone()));
                    let mut read_version_mapping_guard = self.read_version_mapping.write();

                    read_version_mapping_guard
                        .entry(table_id)
                        .or_default()
                        .insert(instance_id, basic_read_version.clone());
                }

                match new_read_version_sender.send((
                    basic_read_version,
                    LocalInstanceGuard {
                        table_id,
                        instance_id,
                        event_sender: Some(self.hummock_event_tx.clone()),
                    },
                )) {
                    Ok(_) => {}
                    Err((_, mut guard)) => {
                        warn!(
                            "RegisterReadVersion send fail table_id {:?} instance_is {:?}",
                            table_id, instance_id
                        );
                        // Requester is gone: strip the guard's sender so its drop
                        // does not emit another event, then roll back the registration.
                        guard.event_sender.take().expect("sender is just set");
                        self.destroy_read_version(instance_id);
                    }
                }
            }

            HummockEvent::DestroyReadVersion { instance_id } => {
                self.uploader.may_destroy_instance(instance_id);
                self.destroy_read_version(instance_id);
            }
            HummockEvent::GetMinUncommittedSstId { result_tx } => {
                let _ = result_tx
                    .send(self.uploader.min_uncommitted_sst_id())
                    .inspect_err(|e| {
                        error!("unable to send get_min_uncommitted_sst_id result: {:?}", e);
                    });
            }
        }
    }

    /// Removes an instance from both read-version maps, dropping the table's
    /// entry in the shared map when it becomes empty. Panics if the instance
    /// is not registered.
    fn destroy_read_version(&mut self, instance_id: LocalInstanceId) {
        {
            {
                debug!("read version deregister: instance_id: {}", instance_id);
                let (table_id, _) = self
                    .local_read_version_mapping
                    .remove(&instance_id)
                    .unwrap_or_else(|| {
                        panic!(
                            "DestroyHummockInstance inexist instance instance_id {}",
                            instance_id
                        )
                    });
                let mut read_version_mapping_guard = self.read_version_mapping.write();
                let entry = read_version_mapping_guard
                    .get_mut(&table_id)
                    .unwrap_or_else(|| {
                        panic!(
                            "DestroyHummockInstance table_id {} instance_id {} fail",
                            table_id, instance_id
                        )
                    });
                entry.remove(&instance_id).unwrap_or_else(|| {
                    panic!(
                        "DestroyHummockInstance inexist instance table_id {} instance_id {}",
                        table_id, instance_id
                    )
                });
                if entry.is_empty() {
                    read_version_mapping_guard.remove(&table_id);
                }
            }
        }
    }

    /// Allocates the next instance id; ids start at 1 (0 is never issued).
    fn generate_instance_id(&mut self) -> LocalInstanceId {
        self.last_instance_id += 1;
        self.last_instance_id
    }
}
831
832pub(super) fn send_sync_result(
833    sender: oneshot::Sender<HummockResult<SyncedData>>,
834    result: HummockResult<SyncedData>,
835) {
836    let _ = sender.send(result).inspect_err(|e| {
837        error!("unable to send sync result. Err: {:?}", e);
838    });
839}
840
841impl SyncedData {
842    pub fn into_sync_result(self) -> SyncResult {
843        {
844            let SyncedData {
845                uploaded_ssts,
846                table_watermarks,
847            } = self;
848            let mut sync_size = 0;
849            let mut uncommitted_ssts = Vec::new();
850            let mut old_value_ssts = Vec::new();
851            // The newly uploaded `sstable_infos` contains newer data. Therefore,
852            // `newly_upload_ssts` at the front
853            for sst in uploaded_ssts {
854                sync_size += sst.imm_size();
855                uncommitted_ssts.extend(sst.sstable_infos().iter().cloned());
856                old_value_ssts.extend(sst.old_value_sstable_infos().iter().cloned());
857            }
858            SyncResult {
859                sync_size,
860                uncommitted_ssts,
861                table_watermarks: table_watermarks.clone(),
862                old_value_ssts,
863            }
864        }
865    }
866}
867
868#[cfg(test)]
869mod tests {
870    use std::collections::{HashMap, HashSet};
871    use std::future::poll_fn;
872    use std::sync::Arc;
873    use std::task::Poll;
874
875    use futures::FutureExt;
876    use parking_lot::Mutex;
877    use risingwave_common::bitmap::Bitmap;
878    use risingwave_common::catalog::TableId;
879    use risingwave_common::hash::VirtualNode;
880    use risingwave_common::util::epoch::{EpochExt, test_epoch};
881    use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
882    use risingwave_hummock_sdk::version::HummockVersion;
883    use risingwave_pb::hummock::{PbHummockVersion, StateTableInfo};
884    use tokio::spawn;
885    use tokio::sync::mpsc::unbounded_channel;
886    use tokio::sync::oneshot;
887
888    use crate::hummock::HummockError;
889    use crate::hummock::event_handler::hummock_event_handler::BufferTracker;
890    use crate::hummock::event_handler::refiller::{CacheRefillConfig, CacheRefiller};
891    use crate::hummock::event_handler::uploader::UploadTaskOutput;
892    use crate::hummock::event_handler::uploader::test_utils::{
893        TEST_TABLE_ID, gen_imm, gen_imm_inner, prepare_uploader_order_test_spawn_task_fn,
894    };
895    use crate::hummock::event_handler::{
896        HummockEvent, HummockEventHandler, HummockReadVersionRef, LocalInstanceGuard,
897    };
898    use crate::hummock::iterator::test_utils::mock_sstable_store;
899    use crate::hummock::local_version::pinned_version::PinnedVersion;
900    use crate::hummock::local_version::recent_versions::RecentVersions;
901    use crate::hummock::store::version::{StagingData, VersionUpdate};
902    use crate::hummock::test_utils::default_opts_for_test;
903    use crate::mem_table::ImmutableMemtable;
904    use crate::monitor::HummockStateStoreMetrics;
905    use crate::store::SealCurrentEpochOptions;
906
    /// Verifies that when the upload task of an older epoch fails, syncing
    /// that epoch *and* syncing a newer epoch both surface an error.
    ///
    /// The spawn-task closure passed to `new_inner` makes the epoch1 upload
    /// block on a oneshot `rx` and then return `Err`, while the epoch2 upload
    /// completes successfully right away.
    #[tokio::test]
    async fn test_old_epoch_sync_fail() {
        let epoch0 = test_epoch(233);

        // Initial pinned version: one table (TEST_TABLE_ID) committed at epoch0.
        let initial_version = PinnedVersion::new(
            HummockVersion::from_rpc_protobuf(&PbHummockVersion {
                id: 1,
                state_table_info: HashMap::from_iter([(
                    TEST_TABLE_ID.table_id,
                    StateTableInfo {
                        committed_epoch: epoch0,
                        compaction_group_id: StaticCompactionGroupId::StateDefault as _,
                    },
                )]),
                ..Default::default()
            }),
            unbounded_channel().0,
        );

        let (_version_update_tx, version_update_rx) = unbounded_channel();

        let epoch1 = epoch0.next_epoch();
        let epoch2 = epoch1.next_epoch();
        // `tx`/`rx`: gate that holds the epoch1 upload task open until the test
        // decides to let it fail. Wrapped in Mutex<Option<..>> so the Fn closure
        // below can take it out exactly once.
        let (tx, rx) = oneshot::channel();
        let rx = Arc::new(Mutex::new(Some(rx)));

        let storage_opt = default_opts_for_test();
        let metrics = Arc::new(HummockStateStoreMetrics::unused());

        let event_handler = HummockEventHandler::new_inner(
            version_update_rx,
            mock_sstable_store().await,
            metrics.clone(),
            CacheRefillConfig::from_storage_opts(&storage_opt),
            RecentVersions::new(initial_version.clone(), 10, metrics.clone()),
            BufferTracker::from_storage_opts(
                &storage_opt,
                metrics.uploader_uploading_task_size.clone(),
            ),
            // Fake upload-task spawner: epoch1 waits on `rx` then fails,
            // epoch2 succeeds with an empty output. Any other epoch is a bug.
            Arc::new(move |_, info| {
                assert_eq!(info.epochs.len(), 1);
                let epoch = info.epochs[0];
                match epoch {
                    epoch if epoch == epoch1 => {
                        let rx = rx.lock().take().unwrap();
                        spawn(async move {
                            rx.await.unwrap();
                            Err(HummockError::other("fail"))
                        })
                    }
                    epoch if epoch == epoch2 => spawn(async move {
                        Ok(UploadTaskOutput {
                            new_value_ssts: vec![],
                            old_value_ssts: vec![],
                            wait_poll_timer: None,
                        })
                    }),
                    _ => unreachable!(),
                }
            }),
            CacheRefiller::default_spawn_refill_task(),
        );

        let event_tx = event_handler.event_sender();

        let send_event = |event| event_tx.send(event).unwrap();

        let join_handle = spawn(event_handler.start_hummock_event_handler_worker());

        // Register a local read version instance for TEST_TABLE_ID.
        let (read_version, guard) = {
            let (tx, rx) = oneshot::channel();
            send_event(HummockEvent::RegisterReadVersion {
                table_id: TEST_TABLE_ID,
                new_read_version_sender: tx,
                is_replicated: false,
                vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
            });
            rx.await.unwrap()
        };

        // epoch1: start, init the instance, and feed one imm to the uploader.
        send_event(HummockEvent::StartEpoch {
            epoch: epoch1,
            table_ids: HashSet::from_iter([TEST_TABLE_ID]),
        });

        send_event(HummockEvent::InitEpoch {
            instance_id: guard.instance_id,
            init_epoch: epoch1,
        });

        let imm1 = gen_imm(epoch1).await;
        read_version
            .write()
            .update(VersionUpdate::Staging(StagingData::ImmMem(imm1.clone())));

        send_event(HummockEvent::ImmToUploader {
            instance_id: guard.instance_id,
            imm: imm1,
        });

        // epoch2: start, seal epoch1 into it, and feed a second imm.
        send_event(HummockEvent::StartEpoch {
            epoch: epoch2,
            table_ids: HashSet::from_iter([TEST_TABLE_ID]),
        });

        send_event(HummockEvent::LocalSealEpoch {
            instance_id: guard.instance_id,
            next_epoch: epoch2,
            opts: SealCurrentEpochOptions::for_test(),
        });

        let imm2 = gen_imm(epoch2).await;
        read_version
            .write()
            .update(VersionUpdate::Staging(StagingData::ImmMem(imm2.clone())));

        send_event(HummockEvent::ImmToUploader {
            instance_id: guard.instance_id,
            imm: imm2,
        });

        // epoch3 only exists so that epoch2 can be sealed.
        let epoch3 = epoch2.next_epoch();
        send_event(HummockEvent::StartEpoch {
            epoch: epoch3,
            table_ids: HashSet::from_iter([TEST_TABLE_ID]),
        });
        send_event(HummockEvent::LocalSealEpoch {
            instance_id: guard.instance_id,
            next_epoch: epoch3,
            opts: SealCurrentEpochOptions::for_test(),
        });

        // Issue syncs for epoch1 and epoch2; both must stay pending while the
        // epoch1 upload task is still blocked on the gate.
        let (tx1, mut rx1) = oneshot::channel();
        send_event(HummockEvent::SyncEpoch {
            sync_result_sender: tx1,
            sync_table_epochs: vec![(epoch1, HashSet::from_iter([TEST_TABLE_ID]))],
        });
        assert!(poll_fn(|cx| Poll::Ready(rx1.poll_unpin(cx).is_pending())).await);
        let (tx2, mut rx2) = oneshot::channel();
        send_event(HummockEvent::SyncEpoch {
            sync_result_sender: tx2,
            sync_table_epochs: vec![(epoch2, HashSet::from_iter([TEST_TABLE_ID]))],
        });
        assert!(poll_fn(|cx| Poll::Ready(rx2.poll_unpin(cx).is_pending())).await);

        // Release the gate: the epoch1 task now fails, and both pending syncs
        // (the failed epoch and the newer one behind it) must resolve to Err.
        tx.send(()).unwrap();
        rx1.await.unwrap().unwrap_err();
        rx2.await.unwrap().unwrap_err();

        send_event(HummockEvent::Shutdown);
        join_handle.await.unwrap();
    }
1059
    /// Verifies that `HummockEvent::Clear` scoped to a set of tables cancels
    /// only that table's in-flight state: table2's uploads are cleared while
    /// table1's syncing/unsync data survives and its syncs still complete.
    #[tokio::test]
    async fn test_clear_tables() {
        let table_id1 = TableId::new(1);
        let table_id2 = TableId::new(2);
        let epoch0 = test_epoch(233);

        // Initial pinned version: both tables committed at epoch0.
        let initial_version = PinnedVersion::new(
            HummockVersion::from_rpc_protobuf(&PbHummockVersion {
                id: 1,
                state_table_info: HashMap::from_iter([
                    (
                        table_id1.table_id,
                        StateTableInfo {
                            committed_epoch: epoch0,
                            compaction_group_id: StaticCompactionGroupId::StateDefault as _,
                        },
                    ),
                    (
                        table_id2.table_id,
                        StateTableInfo {
                            committed_epoch: epoch0,
                            compaction_group_id: StaticCompactionGroupId::StateDefault as _,
                        },
                    ),
                ]),
                ..Default::default()
            }),
            unbounded_channel().0,
        );

        let (_version_update_tx, version_update_rx) = unbounded_channel();

        let epoch1 = epoch0.next_epoch();
        let epoch2 = epoch1.next_epoch();
        let epoch3 = epoch2.next_epoch();

        let imm_size = gen_imm_inner(TEST_TABLE_ID, epoch1, 0, None).await.size();

        // The buffer can hold at most 1 imm. When a new imm is added, the previous one will be spilled, and the newly added one will be retained.
        let buffer_tracker = BufferTracker::for_test_with_config(imm_size * 2 - 1, 1);
        let memory_limiter = buffer_tracker.get_memory_limiter().clone();

        // Helper producing imms of a fixed size accounted against the limiter;
        // `now_or_never` asserts the memory acquisition never has to wait here.
        let gen_imm = |table_id, epoch, spill_offset| {
            let imm = gen_imm_inner(table_id, epoch, spill_offset, Some(&*memory_limiter))
                .now_or_never()
                .unwrap();
            assert_eq!(imm.size(), imm_size);
            imm
        };
        let imm1_1 = gen_imm(table_id1, epoch1, 0);
        let imm1_2_1 = gen_imm(table_id1, epoch2, 0);

        let storage_opt = default_opts_for_test();
        let metrics = Arc::new(HummockStateStoreMetrics::unused());

        // `new_task_notifier` lets the test observe when each upload task
        // starts and decide when it finishes.
        let (spawn_task, new_task_notifier) = prepare_uploader_order_test_spawn_task_fn(false);

        let event_handler = HummockEventHandler::new_inner(
            version_update_rx,
            mock_sstable_store().await,
            metrics.clone(),
            CacheRefillConfig::from_storage_opts(&storage_opt),
            RecentVersions::new(initial_version.clone(), 10, metrics.clone()),
            buffer_tracker,
            spawn_task,
            CacheRefiller::default_spawn_refill_task(),
        );

        let event_tx = event_handler.event_sender();

        // Small closures wrapping the event protocol to keep the test body terse.
        let send_event = |event| event_tx.send(event).unwrap();
        let flush_event = || async {
            let (tx, rx) = oneshot::channel();
            send_event(HummockEvent::FlushEvent(tx));
            rx.await.unwrap();
        };
        let start_epoch = |table_id, epoch| {
            send_event(HummockEvent::StartEpoch {
                epoch,
                table_ids: HashSet::from_iter([table_id]),
            })
        };
        let init_epoch = |instance: &LocalInstanceGuard, init_epoch| {
            send_event(HummockEvent::InitEpoch {
                instance_id: instance.instance_id,
                init_epoch,
            })
        };
        let write_imm = |read_version: &HummockReadVersionRef,
                         instance: &LocalInstanceGuard,
                         imm: &ImmutableMemtable| {
            read_version
                .write()
                .update(VersionUpdate::Staging(StagingData::ImmMem(imm.clone())));

            send_event(HummockEvent::ImmToUploader {
                instance_id: instance.instance_id,
                imm: imm.clone(),
            });
        };
        let seal_epoch = |instance: &LocalInstanceGuard, next_epoch| {
            send_event(HummockEvent::LocalSealEpoch {
                instance_id: instance.instance_id,
                next_epoch,
                opts: SealCurrentEpochOptions::for_test(),
            })
        };
        let sync_epoch = |table_id, new_sync_epoch| {
            let (tx, rx) = oneshot::channel();
            send_event(HummockEvent::SyncEpoch {
                sync_result_sender: tx,
                sync_table_epochs: vec![(new_sync_epoch, HashSet::from_iter([table_id]))],
            });
            rx
        };

        let join_handle = spawn(event_handler.start_hummock_event_handler_worker());

        // One read-version instance per table.
        let (read_version1, guard1) = {
            let (tx, rx) = oneshot::channel();
            send_event(HummockEvent::RegisterReadVersion {
                table_id: table_id1,
                new_read_version_sender: tx,
                is_replicated: false,
                vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
            });
            rx.await.unwrap()
        };

        let (read_version2, guard2) = {
            let (tx, rx) = oneshot::channel();
            send_event(HummockEvent::RegisterReadVersion {
                table_id: table_id2,
                new_read_version_sender: tx,
                is_replicated: false,
                vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
            });
            rx.await.unwrap()
        };

        // prepare data of table1
        let (task1_1_finish_tx, task1_1_rx) = {
            start_epoch(table_id1, epoch1);

            init_epoch(&guard1, epoch1);

            write_imm(&read_version1, &guard1, &imm1_1);

            start_epoch(table_id1, epoch2);

            seal_epoch(&guard1, epoch2);

            // Expect a sync-triggered upload of imm1_1 for table1's instance.
            let (wait_task_start, task_finish_tx) = new_task_notifier(HashMap::from_iter([(
                guard1.instance_id,
                vec![imm1_1.batch_id()],
            )]));

            let mut rx = sync_epoch(table_id1, epoch1);
            wait_task_start.await;
            // Sync must stay pending until the task is allowed to finish.
            assert!(poll_fn(|cx| Poll::Ready(rx.poll_unpin(cx).is_pending())).await);

            write_imm(&read_version1, &guard1, &imm1_2_1);
            flush_event().await;

            (task_finish_tx, rx)
        };
        // by now, the state in uploader of table_id1
        // unsync:  epoch2 -> [imm1_2]
        // syncing: epoch1 -> [imm1_1]

        // Build up table2's state, then clear table2 while its tasks are in flight.
        let (task1_2_finish_tx, _finish_txs) = {
            let mut finish_txs = vec![];
            let imm2_1_1 = gen_imm(table_id2, epoch1, 0);
            start_epoch(table_id2, epoch1);
            init_epoch(&guard2, epoch1);
            // Writing table2's first imm exceeds the buffer, spilling table1's
            // imm1_2_1 (the notifier expects that spill task to start).
            let (wait_task_start, task1_2_finish_tx) = new_task_notifier(HashMap::from_iter([(
                guard1.instance_id,
                vec![imm1_2_1.batch_id()],
            )]));
            write_imm(&read_version2, &guard2, &imm2_1_1);
            wait_task_start.await;

            let imm2_1_2 = gen_imm(table_id2, epoch1, 1);
            let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
                guard2.instance_id,
                vec![imm2_1_2.batch_id(), imm2_1_1.batch_id()],
            )]));
            finish_txs.push(finish_tx);
            write_imm(&read_version2, &guard2, &imm2_1_2);
            wait_task_start.await;

            let imm2_1_3 = gen_imm(table_id2, epoch1, 2);
            write_imm(&read_version2, &guard2, &imm2_1_3);
            start_epoch(table_id2, epoch2);
            seal_epoch(&guard2, epoch2);
            let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
                guard2.instance_id,
                vec![imm2_1_3.batch_id()],
            )]));
            finish_txs.push(finish_tx);
            let _sync_rx = sync_epoch(table_id2, epoch1);
            wait_task_start.await;

            let imm2_2_1 = gen_imm(table_id2, epoch2, 0);
            write_imm(&read_version2, &guard2, &imm2_2_1);
            flush_event().await;
            let imm2_2_2 = gen_imm(table_id2, epoch2, 1);
            write_imm(&read_version2, &guard2, &imm2_2_2);
            let (wait_task_start, finish_tx) = new_task_notifier(HashMap::from_iter([(
                guard2.instance_id,
                vec![imm2_2_2.batch_id(), imm2_2_1.batch_id()],
            )]));
            finish_txs.push(finish_tx);
            wait_task_start.await;

            let imm2_2_3 = gen_imm(table_id2, epoch2, 2);
            write_imm(&read_version2, &guard2, &imm2_2_3);

            // by now, the state in uploader of table_id2
            // syncing: epoch1 -> spill: [imm2_1_2, imm2_1_1], sync: [imm2_1_3]
            // unsync: epoch2 -> spilling: [imm2_2_2, imm2_2_1], imm: [imm2_2_3]
            // the state in uploader of table_id1
            // unsync:  epoch2 -> spilling [imm1_2]
            // syncing: epoch1 -> [imm1_1]

            // Clear only table2; its unfinished tasks (finish_txs never sent)
            // must not block the clear from completing.
            drop(guard2);
            let (clear_tx, clear_rx) = oneshot::channel();
            send_event(HummockEvent::Clear(
                clear_tx,
                Some(HashSet::from_iter([table_id2])),
            ));
            clear_rx.await.unwrap();
            (task1_2_finish_tx, finish_txs)
        };

        // table1 must be unaffected by the clear: keep writing and syncing it.
        let imm1_2_2 = gen_imm(table_id1, epoch2, 1);
        write_imm(&read_version1, &guard1, &imm1_2_2);
        start_epoch(table_id1, epoch3);
        seal_epoch(&guard1, epoch3);

        let (tx2, mut sync_rx2) = oneshot::channel();
        let (wait_task_start, task1_2_2_finish_tx) = new_task_notifier(HashMap::from_iter([(
            guard1.instance_id,
            vec![imm1_2_2.batch_id()],
        )]));
        send_event(HummockEvent::SyncEpoch {
            sync_result_sender: tx2,
            sync_table_epochs: vec![(epoch2, HashSet::from_iter([table_id1]))],
        });
        wait_task_start.await;
        assert!(poll_fn(|cx| Poll::Ready(sync_rx2.poll_unpin(cx).is_pending())).await);

        // Finish the epoch1 task: the pre-clear sync of table1/epoch1 completes.
        task1_1_finish_tx.send(()).unwrap();
        let sync_data1 = task1_1_rx.await.unwrap().unwrap();
        sync_data1
            .uploaded_ssts
            .iter()
            .all(|sst| sst.epochs() == &vec![epoch1]);
        // epoch2's sync needs both of its tasks (the earlier spill and the
        // sync-triggered upload) to finish before it resolves.
        task1_2_finish_tx.send(()).unwrap();
        assert!(poll_fn(|cx| Poll::Ready(sync_rx2.poll_unpin(cx).is_pending())).await);
        task1_2_2_finish_tx.send(()).unwrap();
        let sync_data2 = sync_rx2.await.unwrap().unwrap();
        sync_data2
            .uploaded_ssts
            .iter()
            .all(|sst| sst.epochs() == &vec![epoch2]);

        send_event(HummockEvent::Shutdown);
        join_handle.await.unwrap();
    }
1330}