risingwave_storage/hummock/event_handler/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use parking_lot::{RwLock, RwLockReadGuard};
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::TableId;
22use risingwave_hummock_sdk::{HummockEpoch, HummockRawObjectId};
23use thiserror_ext::AsReport;
24use tokio::sync::oneshot;
25
26use crate::hummock::HummockResult;
27use crate::hummock::shared_buffer::shared_buffer_batch::{SharedBufferBatch, SharedBufferBatchId};
28use crate::mem_table::ImmutableMemtable;
29use crate::store::SealCurrentEpochOptions;
30
31pub mod hummock_event_handler;
32pub mod refiller;
33pub mod uploader;
34
35pub use hummock_event_handler::HummockEventHandler;
36use risingwave_hummock_sdk::vector_index::VectorIndexAdd;
37use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
38
39use super::store::version::HummockReadVersion;
40use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
41use crate::hummock::event_handler::uploader::SyncedData;
42
43#[derive(Debug)]
44pub struct BufferWriteRequest {
45    pub batch: SharedBufferBatch,
46    pub epoch: HummockEpoch,
47    pub grant_sender: oneshot::Sender<()>,
48}
49
50#[derive(Debug)]
51pub enum HummockVersionUpdate {
52    VersionDeltas(Vec<HummockVersionDelta>),
53    PinnedVersion(Box<HummockVersion>),
54}
55
56pub enum HummockEvent {
57    /// Notify that we may flush the shared buffer.
58    BufferMayFlush,
59
60    /// An epoch is going to be synced. Once the event is processed, there will be no more flush
61    /// task on this epoch. Previous concurrent flush task join handle will be returned by the join
62    /// handle sender.
63    SyncEpoch {
64        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
65        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
66    },
67
68    /// Clear shared buffer and reset all states
69    Clear(oneshot::Sender<()>, Option<HashSet<TableId>>),
70
71    Shutdown,
72
73    ImmToUploader {
74        instance_id: SharedBufferBatchId,
75        imms: Vec<ImmutableMemtable>,
76    },
77
78    StartEpoch {
79        epoch: HummockEpoch,
80        table_ids: HashSet<TableId>,
81    },
82
83    InitEpoch {
84        instance_id: LocalInstanceId,
85        init_epoch: HummockEpoch,
86    },
87
88    LocalSealEpoch {
89        instance_id: LocalInstanceId,
90        next_epoch: HummockEpoch,
91        opts: SealCurrentEpochOptions,
92    },
93
94    #[cfg(any(test, feature = "test"))]
95    /// Flush all previous event. When all previous events has been consumed, the event handler
96    /// will notify
97    FlushEvent(oneshot::Sender<()>),
98
99    RegisterReadVersion {
100        table_id: TableId,
101        new_read_version_sender: oneshot::Sender<(HummockReadVersionRef, LocalInstanceGuard)>,
102        is_replicated: bool,
103        vnodes: Arc<Bitmap>,
104    },
105
106    DestroyReadVersion {
107        instance_id: LocalInstanceId,
108    },
109
110    RegisterVectorWriter {
111        table_id: TableId,
112        init_epoch: HummockEpoch,
113    },
114
115    VectorWriterSealEpoch {
116        table_id: TableId,
117        next_epoch: HummockEpoch,
118        add: Option<VectorIndexAdd>,
119    },
120
121    DropVectorWriter {
122        table_id: TableId,
123    },
124
125    GetMinUncommittedObjectId {
126        result_tx: oneshot::Sender<Option<HummockRawObjectId>>,
127    },
128}
129
130impl HummockEvent {
131    fn to_debug_string(&self) -> String {
132        match self {
133            HummockEvent::BufferMayFlush => "BufferMayFlush".to_owned(),
134
135            HummockEvent::SyncEpoch {
136                sync_result_sender: _,
137                sync_table_epochs,
138            } => format!("AwaitSyncEpoch epoch {:?}", sync_table_epochs),
139
140            HummockEvent::Clear(_, table_ids) => {
141                format!("Clear {:?}", table_ids)
142            }
143
144            HummockEvent::Shutdown => "Shutdown".to_owned(),
145
146            HummockEvent::StartEpoch { epoch, table_ids } => {
147                format!("StartEpoch {} {:?}", epoch, table_ids)
148            }
149
150            HummockEvent::InitEpoch {
151                instance_id,
152                init_epoch,
153            } => {
154                format!("InitEpoch {} {}", instance_id, init_epoch)
155            }
156
157            HummockEvent::ImmToUploader { instance_id, imms } => {
158                format!(
159                    "ImmToUploader {} {:?}",
160                    instance_id,
161                    imms.iter().map(|imm| imm.batch_id()).collect_vec()
162                )
163            }
164
165            HummockEvent::LocalSealEpoch {
166                instance_id,
167                next_epoch,
168                opts,
169            } => {
170                format!(
171                    "LocalSealEpoch next_epoch: {}, instance_id: {}, opts: {:?}",
172                    next_epoch, instance_id, opts
173                )
174            }
175
176            HummockEvent::RegisterReadVersion {
177                table_id,
178                new_read_version_sender: _,
179                is_replicated,
180                vnodes: _,
181            } => format!(
182                "RegisterReadVersion table_id {:?}, is_replicated: {:?}",
183                table_id, is_replicated
184            ),
185
186            HummockEvent::DestroyReadVersion { instance_id } => {
187                format!("DestroyReadVersion instance_id {:?}", instance_id)
188            }
189
190            #[cfg(any(test, feature = "test"))]
191            HummockEvent::FlushEvent(_) => "FlushEvent".to_owned(),
192            HummockEvent::GetMinUncommittedObjectId { .. } => {
193                "GetMinUncommittedObjectId".to_owned()
194            }
195            HummockEvent::RegisterVectorWriter { .. } => "RegisterVectorWriter".to_owned(),
196            HummockEvent::VectorWriterSealEpoch { .. } => "VectorWriterSealEpoch".to_owned(),
197            HummockEvent::DropVectorWriter { .. } => "DropVectorWriter".to_owned(),
198        }
199    }
200}
201
202impl std::fmt::Debug for HummockEvent {
203    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204        f.debug_struct("HummockEvent")
205            .field("debug_string", &self.to_debug_string())
206            .finish()
207    }
208}
209
210pub type LocalInstanceId = u64;
211pub const TEST_LOCAL_INSTANCE_ID: LocalInstanceId = 233;
212pub type HummockReadVersionRef = Arc<RwLock<HummockReadVersion>>;
213pub type ReadVersionMappingType = HashMap<TableId, HashMap<LocalInstanceId, HummockReadVersionRef>>;
214pub type ReadOnlyReadVersionMapping = ReadOnlyRwLockRef<ReadVersionMappingType>;
215
216pub struct ReadOnlyRwLockRef<T>(Arc<RwLock<T>>);
217
218impl<T> Clone for ReadOnlyRwLockRef<T> {
219    fn clone(&self) -> Self {
220        Self(self.0.clone())
221    }
222}
223
224impl<T> ReadOnlyRwLockRef<T> {
225    pub fn new(inner: Arc<RwLock<T>>) -> Self {
226        Self(inner)
227    }
228
229    pub fn read(&self) -> RwLockReadGuard<'_, T> {
230        self.0.read()
231    }
232}
233
234pub struct LocalInstanceGuard {
235    pub table_id: TableId,
236    pub instance_id: LocalInstanceId,
237    // Only send destroy event when event_sender when is_some
238    event_sender: Option<HummockEventSender>,
239}
240
241impl Drop for LocalInstanceGuard {
242    fn drop(&mut self) {
243        if let Some(sender) = self.event_sender.take() {
244            // If sending fails, it means that event_handler and event_channel have been destroyed, no
245            // need to handle failure
246            sender
247                .send(HummockEvent::DestroyReadVersion {
248                    instance_id: self.instance_id,
249                })
250                .unwrap_or_else(|err| {
251                    tracing::debug!(
252                        error = %err.as_report(),
253                        table_id = %self.table_id,
254                        instance_id = self.instance_id,
255                        "LocalInstanceGuard Drop SendError",
256                    )
257                })
258        }
259    }
260}