risingwave_storage/hummock/event_handler/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use parking_lot::{RwLock, RwLockReadGuard};
19use risingwave_common::bitmap::Bitmap;
20use risingwave_common::catalog::TableId;
21use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId};
22use thiserror_ext::AsReport;
23use tokio::sync::oneshot;
24
25use crate::hummock::HummockResult;
26use crate::hummock::shared_buffer::shared_buffer_batch::{SharedBufferBatch, SharedBufferBatchId};
27use crate::mem_table::ImmutableMemtable;
28use crate::store::SealCurrentEpochOptions;
29
30pub mod hummock_event_handler;
31pub mod refiller;
32pub mod uploader;
33
34pub use hummock_event_handler::HummockEventHandler;
35use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
36
37use super::store::version::HummockReadVersion;
38use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
39use crate::hummock::event_handler::uploader::SyncedData;
40
41#[derive(Debug)]
42pub struct BufferWriteRequest {
43    pub batch: SharedBufferBatch,
44    pub epoch: HummockEpoch,
45    pub grant_sender: oneshot::Sender<()>,
46}
47
48#[derive(Debug)]
49pub enum HummockVersionUpdate {
50    VersionDeltas(Vec<HummockVersionDelta>),
51    PinnedVersion(Box<HummockVersion>),
52}
53
54pub enum HummockEvent {
55    /// Notify that we may flush the shared buffer.
56    BufferMayFlush,
57
58    /// An epoch is going to be synced. Once the event is processed, there will be no more flush
59    /// task on this epoch. Previous concurrent flush task join handle will be returned by the join
60    /// handle sender.
61    SyncEpoch {
62        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
63        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
64    },
65
66    /// Clear shared buffer and reset all states
67    Clear(oneshot::Sender<()>, Option<HashSet<TableId>>),
68
69    Shutdown,
70
71    ImmToUploader {
72        instance_id: SharedBufferBatchId,
73        imm: ImmutableMemtable,
74    },
75
76    StartEpoch {
77        epoch: HummockEpoch,
78        table_ids: HashSet<TableId>,
79    },
80
81    InitEpoch {
82        instance_id: LocalInstanceId,
83        init_epoch: HummockEpoch,
84    },
85
86    LocalSealEpoch {
87        instance_id: LocalInstanceId,
88        next_epoch: HummockEpoch,
89        opts: SealCurrentEpochOptions,
90    },
91
92    #[cfg(any(test, feature = "test"))]
93    /// Flush all previous event. When all previous events has been consumed, the event handler
94    /// will notify
95    FlushEvent(oneshot::Sender<()>),
96
97    RegisterReadVersion {
98        table_id: TableId,
99        new_read_version_sender: oneshot::Sender<(HummockReadVersionRef, LocalInstanceGuard)>,
100        is_replicated: bool,
101        vnodes: Arc<Bitmap>,
102    },
103
104    DestroyReadVersion {
105        instance_id: LocalInstanceId,
106    },
107
108    GetMinUncommittedSstId {
109        result_tx: oneshot::Sender<Option<HummockSstableObjectId>>,
110    },
111}
112
113impl HummockEvent {
114    fn to_debug_string(&self) -> String {
115        match self {
116            HummockEvent::BufferMayFlush => "BufferMayFlush".to_owned(),
117
118            HummockEvent::SyncEpoch {
119                sync_result_sender: _,
120                sync_table_epochs,
121            } => format!("AwaitSyncEpoch epoch {:?}", sync_table_epochs),
122
123            HummockEvent::Clear(_, table_ids) => {
124                format!("Clear {:?}", table_ids)
125            }
126
127            HummockEvent::Shutdown => "Shutdown".to_owned(),
128
129            HummockEvent::StartEpoch { epoch, table_ids } => {
130                format!("StartEpoch {} {:?}", epoch, table_ids)
131            }
132
133            HummockEvent::InitEpoch {
134                instance_id,
135                init_epoch,
136            } => {
137                format!("InitEpoch {} {}", instance_id, init_epoch)
138            }
139
140            HummockEvent::ImmToUploader { instance_id, imm } => {
141                format!("ImmToUploader {} {}", instance_id, imm.batch_id())
142            }
143
144            HummockEvent::LocalSealEpoch {
145                instance_id,
146                next_epoch,
147                opts,
148            } => {
149                format!(
150                    "LocalSealEpoch next_epoch: {}, instance_id: {}, opts: {:?}",
151                    next_epoch, instance_id, opts
152                )
153            }
154
155            HummockEvent::RegisterReadVersion {
156                table_id,
157                new_read_version_sender: _,
158                is_replicated,
159                vnodes: _,
160            } => format!(
161                "RegisterReadVersion table_id {:?}, is_replicated: {:?}",
162                table_id, is_replicated
163            ),
164
165            HummockEvent::DestroyReadVersion { instance_id } => {
166                format!("DestroyReadVersion instance_id {:?}", instance_id)
167            }
168
169            #[cfg(any(test, feature = "test"))]
170            HummockEvent::FlushEvent(_) => "FlushEvent".to_owned(),
171            HummockEvent::GetMinUncommittedSstId { .. } => "GetMinSpilledSstId".to_owned(),
172        }
173    }
174}
175
176impl std::fmt::Debug for HummockEvent {
177    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178        f.debug_struct("HummockEvent")
179            .field("debug_string", &self.to_debug_string())
180            .finish()
181    }
182}
183
184pub type LocalInstanceId = u64;
185pub const TEST_LOCAL_INSTANCE_ID: LocalInstanceId = 233;
186pub type HummockReadVersionRef = Arc<RwLock<HummockReadVersion>>;
187pub type ReadVersionMappingType = HashMap<TableId, HashMap<LocalInstanceId, HummockReadVersionRef>>;
188pub type ReadOnlyReadVersionMapping = ReadOnlyRwLockRef<ReadVersionMappingType>;
189
190pub struct ReadOnlyRwLockRef<T>(Arc<RwLock<T>>);
191
192impl<T> Clone for ReadOnlyRwLockRef<T> {
193    fn clone(&self) -> Self {
194        Self(self.0.clone())
195    }
196}
197
198impl<T> ReadOnlyRwLockRef<T> {
199    pub fn new(inner: Arc<RwLock<T>>) -> Self {
200        Self(inner)
201    }
202
203    pub fn read(&self) -> RwLockReadGuard<'_, T> {
204        self.0.read()
205    }
206}
207
208pub struct LocalInstanceGuard {
209    pub table_id: TableId,
210    pub instance_id: LocalInstanceId,
211    // Only send destroy event when event_sender when is_some
212    event_sender: Option<HummockEventSender>,
213}
214
215impl Drop for LocalInstanceGuard {
216    fn drop(&mut self) {
217        if let Some(sender) = self.event_sender.take() {
218            // If sending fails, it means that event_handler and event_channel have been destroyed, no
219            // need to handle failure
220            sender
221                .send(HummockEvent::DestroyReadVersion {
222                    instance_id: self.instance_id,
223                })
224                .unwrap_or_else(|err| {
225                    tracing::debug!(
226                        error = %err.as_report(),
227                        table_id = %self.table_id,
228                        instance_id = self.instance_id,
229                        "LocalInstanceGuard Drop SendError",
230                    )
231                })
232        }
233    }
234}