risingwave_storage/hummock/event_handler/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use parking_lot::{RwLock, RwLockReadGuard};
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::TableId;
22use risingwave_hummock_sdk::{HummockEpoch, HummockRawObjectId};
23use thiserror_ext::AsReport;
24use tokio::sync::oneshot;
25
26use crate::hummock::HummockResult;
27use crate::hummock::shared_buffer::shared_buffer_batch::{SharedBufferBatch, SharedBufferBatchId};
28use crate::mem_table::ImmutableMemtable;
29use crate::store::SealCurrentEpochOptions;
30
31pub mod hummock_event_handler;
32pub mod refiller;
33pub mod uploader;
34
35pub(crate) use hummock_event_handler::HummockEventHandler;
36use risingwave_hummock_sdk::vector_index::VectorIndexAdd;
37use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
38
39use super::store::version::HummockReadVersion;
40use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
41use crate::hummock::event_handler::uploader::SyncedData;
42use crate::hummock::utils::MemoryTracker;
43
44#[derive(Debug)]
45pub struct BufferWriteRequest {
46    pub batch: SharedBufferBatch,
47    pub epoch: HummockEpoch,
48    pub grant_sender: oneshot::Sender<()>,
49}
50
51#[derive(Debug)]
52pub enum HummockVersionUpdate {
53    VersionDeltas(Vec<HummockVersionDelta>),
54    PinnedVersion(Box<HummockVersion>),
55}
56
57pub enum HummockEvent {
58    /// Notify that we may flush the shared buffer.
59    BufferMayFlush,
60
61    /// An epoch is going to be synced. Once the event is processed, there will be no more flush
62    /// task on this epoch. Previous concurrent flush task join handle will be returned by the join
63    /// handle sender.
64    SyncEpoch {
65        sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
66        sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
67    },
68
69    /// Clear shared buffer and reset all states
70    Clear(oneshot::Sender<()>, Option<HashSet<TableId>>),
71
72    Shutdown,
73
74    ImmToUploader {
75        instance_id: SharedBufferBatchId,
76        imms: Vec<(ImmutableMemtable, MemoryTracker)>,
77    },
78
79    StartEpoch {
80        epoch: HummockEpoch,
81        table_ids: HashSet<TableId>,
82    },
83
84    InitEpoch {
85        instance_id: LocalInstanceId,
86        init_epoch: HummockEpoch,
87    },
88
89    LocalSealEpoch {
90        instance_id: LocalInstanceId,
91        next_epoch: HummockEpoch,
92        opts: SealCurrentEpochOptions,
93    },
94
95    #[cfg(any(test, feature = "test"))]
96    /// Flush all previous event. When all previous events has been consumed, the event handler
97    /// will notify
98    FlushEvent(oneshot::Sender<()>),
99
100    RegisterReadVersion {
101        table_id: TableId,
102        new_read_version_sender: oneshot::Sender<(HummockReadVersionRef, LocalInstanceGuard)>,
103        is_replicated: bool,
104        vnodes: Arc<Bitmap>,
105    },
106
107    DestroyReadVersion {
108        instance_id: LocalInstanceId,
109    },
110
111    RegisterVectorWriter {
112        table_id: TableId,
113        init_epoch: HummockEpoch,
114    },
115
116    VectorWriterSealEpoch {
117        table_id: TableId,
118        next_epoch: HummockEpoch,
119        add: Option<VectorIndexAdd>,
120    },
121
122    DropVectorWriter {
123        table_id: TableId,
124    },
125
126    GetMinUncommittedObjectId {
127        result_tx: oneshot::Sender<Option<HummockRawObjectId>>,
128    },
129}
130
131impl HummockEvent {
132    fn to_debug_string(&self) -> String {
133        match self {
134            HummockEvent::BufferMayFlush => "BufferMayFlush".to_owned(),
135
136            HummockEvent::SyncEpoch {
137                sync_result_sender: _,
138                sync_table_epochs,
139            } => format!("AwaitSyncEpoch epoch {:?}", sync_table_epochs),
140
141            HummockEvent::Clear(_, table_ids) => {
142                format!("Clear {:?}", table_ids)
143            }
144
145            HummockEvent::Shutdown => "Shutdown".to_owned(),
146
147            HummockEvent::StartEpoch { epoch, table_ids } => {
148                format!("StartEpoch {} {:?}", epoch, table_ids)
149            }
150
151            HummockEvent::InitEpoch {
152                instance_id,
153                init_epoch,
154            } => {
155                format!("InitEpoch {} {}", instance_id, init_epoch)
156            }
157
158            HummockEvent::ImmToUploader { instance_id, imms } => {
159                format!(
160                    "ImmToUploader {} {:?}",
161                    instance_id,
162                    imms.iter().map(|(imm, _)| imm.batch_id()).collect_vec()
163                )
164            }
165
166            HummockEvent::LocalSealEpoch {
167                instance_id,
168                next_epoch,
169                opts,
170            } => {
171                format!(
172                    "LocalSealEpoch next_epoch: {}, instance_id: {}, opts: {:?}",
173                    next_epoch, instance_id, opts
174                )
175            }
176
177            HummockEvent::RegisterReadVersion {
178                table_id,
179                new_read_version_sender: _,
180                is_replicated,
181                vnodes: _,
182            } => format!(
183                "RegisterReadVersion table_id {:?}, is_replicated: {:?}",
184                table_id, is_replicated
185            ),
186
187            HummockEvent::DestroyReadVersion { instance_id } => {
188                format!("DestroyReadVersion instance_id {:?}", instance_id)
189            }
190
191            #[cfg(any(test, feature = "test"))]
192            HummockEvent::FlushEvent(_) => "FlushEvent".to_owned(),
193            HummockEvent::GetMinUncommittedObjectId { .. } => {
194                "GetMinUncommittedObjectId".to_owned()
195            }
196            HummockEvent::RegisterVectorWriter { .. } => "RegisterVectorWriter".to_owned(),
197            HummockEvent::VectorWriterSealEpoch { .. } => "VectorWriterSealEpoch".to_owned(),
198            HummockEvent::DropVectorWriter { .. } => "DropVectorWriter".to_owned(),
199        }
200    }
201
202    pub fn event_name(&self) -> &'static str {
203        match self {
204            HummockEvent::BufferMayFlush => "BufferMayFlush",
205            HummockEvent::SyncEpoch { .. } => "SyncEpoch",
206            HummockEvent::Clear(..) => "Clear",
207            HummockEvent::Shutdown => "Shutdown",
208            HummockEvent::ImmToUploader { .. } => "ImmToUploader",
209            HummockEvent::StartEpoch { .. } => "StartEpoch",
210            HummockEvent::InitEpoch { .. } => "InitEpoch",
211            HummockEvent::LocalSealEpoch { .. } => "LocalSealEpoch",
212            #[cfg(any(test, feature = "test"))]
213            HummockEvent::FlushEvent(_) => "FlushEvent",
214            HummockEvent::RegisterReadVersion { .. } => "RegisterReadVersion",
215            HummockEvent::DestroyReadVersion { .. } => "DestroyReadVersion",
216            HummockEvent::RegisterVectorWriter { .. } => "RegisterVectorWriter",
217            HummockEvent::VectorWriterSealEpoch { .. } => "VectorWriterSealEpoch",
218            HummockEvent::DropVectorWriter { .. } => "DropVectorWriter",
219            HummockEvent::GetMinUncommittedObjectId { .. } => "GetMinUncommittedObjectId",
220        }
221    }
222}
223
224impl std::fmt::Debug for HummockEvent {
225    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
226        f.debug_struct("HummockEvent")
227            .field("debug_string", &self.to_debug_string())
228            .finish()
229    }
230}
231
232pub type LocalInstanceId = u64;
233pub const TEST_LOCAL_INSTANCE_ID: LocalInstanceId = 233;
234pub type HummockReadVersionRef = Arc<RwLock<HummockReadVersion>>;
235pub type ReadVersionMappingType = HashMap<TableId, HashMap<LocalInstanceId, HummockReadVersionRef>>;
236pub type ReadOnlyReadVersionMapping = ReadOnlyRwLockRef<ReadVersionMappingType>;
237
238pub struct ReadOnlyRwLockRef<T>(Arc<RwLock<T>>);
239
240impl<T> Clone for ReadOnlyRwLockRef<T> {
241    fn clone(&self) -> Self {
242        Self(self.0.clone())
243    }
244}
245
246impl<T> ReadOnlyRwLockRef<T> {
247    pub fn new(inner: Arc<RwLock<T>>) -> Self {
248        Self(inner)
249    }
250
251    pub fn read(&self) -> RwLockReadGuard<'_, T> {
252        self.0.read()
253    }
254}
255
256pub struct LocalInstanceGuard {
257    pub table_id: TableId,
258    pub instance_id: LocalInstanceId,
259    // Only send destroy event when event_sender when is_some
260    event_sender: Option<HummockEventSender>,
261}
262
263impl Drop for LocalInstanceGuard {
264    fn drop(&mut self) {
265        if let Some(sender) = self.event_sender.take() {
266            // If sending fails, it means that event_handler and event_channel have been destroyed, no
267            // need to handle failure
268            sender
269                .send(HummockEvent::DestroyReadVersion {
270                    instance_id: self.instance_id,
271                })
272                .unwrap_or_else(|err| {
273                    tracing::debug!(
274                        error = %err.as_report(),
275                        table_id = %self.table_id,
276                        instance_id = self.instance_id,
277                        "LocalInstanceGuard Drop SendError",
278                    )
279                })
280        }
281    }
282}