risingwave_meta/hummock/manager/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, VecDeque};
use std::ops::{Deref, DerefMut};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::monitor::MonitoredRwLock;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    version_archive_dir, version_checkpoint_path, CompactionGroupId, HummockCompactionTaskId,
    HummockContextId, HummockVersionId,
};
use risingwave_meta_model::{
    compaction_status, compaction_task, hummock_pinned_version, hummock_version_delta,
    hummock_version_stats,
};
use risingwave_pb::hummock::{
    HummockVersionStats, PbCompactTaskAssignment, PbCompactionGroupInfo,
    SubscribeCompactionEventRequest,
};
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::{Mutex, Semaphore};
use tonic::Streaming;

use crate::hummock::compaction::CompactStatus;
use crate::hummock::error::Result;
use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
use crate::hummock::manager::context::ContextInfo;
use crate::hummock::manager::gc::{FullGcState, GcManager};
use crate::hummock::CompactorManagerRef;
use crate::manager::{MetaSrvEnv, MetadataManager};
use crate::model::{ClusterId, MetadataModelError};
use crate::rpc::metrics::MetaMetrics;

mod context;
mod gc;
mod tests;
mod versioning;
pub use context::HummockVersionSafePoint;
use versioning::*;
pub(crate) mod checkpoint;
mod commit_epoch;
mod compaction;
pub mod sequence;
pub mod time_travel;
mod timer_task;
mod transaction;
mod utils;
mod worker;

pub use commit_epoch::{CommitEpochInfo, NewTableFragmentInfo};
use compaction::*;
pub use compaction::{check_cg_write_limit, WriteLimitType};
pub(crate) use utils::*;

// Update to states are performed as follow:
// - Initialize ValTransaction for the meta state to update
// - Make changes on the ValTransaction.
// - Call `commit_multi_var` to commit the changes via meta store transaction. If transaction
//   succeeds, the in-mem state will be updated by the way.
/// Central coordinator for Hummock (the LSM-based state store) metadata on the
/// meta node: versioning, compaction scheduling state, GC and checkpointing.
pub struct HummockManager {
    /// Meta service environment (options, system params, meta store handle).
    pub env: MetaSrvEnv,

    /// Access to cluster/catalog metadata controllers.
    metadata_manager: MetadataManager,
    /// Lock order: `compaction`, `versioning`, `compaction_group_manager`, `context_info`
    /// - Lock `compaction` first, then `versioning`, then `compaction_group_manager` and finally `context_info`.
    /// - This order should be strictly followed to prevent deadlock.
    compaction: MonitoredRwLock<Compaction>,
    /// Current hummock version, pending deltas and checkpoint state.
    versioning: MonitoredRwLock<Versioning>,
    /// `CompactionGroupManager` manages compaction configs for compaction groups.
    compaction_group_manager: MonitoredRwLock<CompactionGroupManager>,
    /// Per-context bookkeeping, e.g. pinned versions (see `load_meta_store_state_impl`).
    context_info: MonitoredRwLock<ContextInfo>,

    /// Meta-node metrics; also supplies the lock-time histograms used by the
    /// `MonitoredRwLock`s above.
    pub metrics: Arc<MetaMetrics>,

    /// Registry of connected compactor nodes.
    pub compactor_manager: CompactorManagerRef,
    /// Sends events to the manager's background worker loop (started in `new_impl`).
    event_sender: HummockManagerEventSender,
    /// Object store used for version checkpoints, cluster-id file and GC.
    object_store: ObjectStoreRef,
    /// Object-store path of the version checkpoint (derived from the data dir).
    version_checkpoint_path: String,
    /// Object-store directory for archived versions (derived from the data dir).
    version_archive_dir: String,
    /// When set, background version checkpointing is paused.
    pause_version_checkpoint: AtomicBool,
    // Recent throughput samples keyed by table id; presumably bytes per window —
    // maintained outside this chunk, TODO confirm units at the write site.
    history_table_throughput: parking_lot::RwLock<HashMap<u32, VecDeque<u64>>>,

    // for compactor
    // `compactor_streams_change_tx` is used to pass the mapping from `context_id` to event_stream
    // and is maintained in memory. All event_streams are consumed through a separate event loop
    compactor_streams_change_tx: UnboundedSender<(u32, Streaming<SubscribeCompactionEventRequest>)>,

    // `compaction_state` will record the types of compact tasks that can be triggered in `hummock`
    // and suggest types with a certain priority.
    pub compaction_state: CompactionState,
    /// Shared state for full GC runs.
    full_gc_state: Arc<FullGcState>,
    // Persisted monotonic "now" value; restored from the meta store on startup
    // (see `load_meta_store_state`). Semantics of the unit live in `load_now`.
    now: Mutex<u64>,
    /// Bounds the number of concurrently running time-travel queries
    /// (capacity comes from `opts.max_inflight_time_travel_query`).
    inflight_time_travel_query: Semaphore,
    /// Driver for garbage collection of stale objects.
    gc_manager: GcManager,
}

/// Shared, reference-counted handle to the [`HummockManager`].
pub type HummockManagerRef = Arc<HummockManager>;

use risingwave_object_store::object::{build_remote_object_store, ObjectError, ObjectStoreRef};
use risingwave_pb::catalog::Table;

/// Starts a histogram timer on `hummock_manager_real_process_time` labeled with
/// `$func_name`, for measuring the real processing time of a manager method.
/// The returned timer guard records the elapsed time when dropped, so bind it
/// to a local (e.g. `let _timer = start_measure_real_process_timer!(self, "f");`).
macro_rules! start_measure_real_process_timer {
    ($hummock_mgr:expr, $func_name:literal) => {
        $hummock_mgr
            .metrics
            .hummock_manager_real_process_time
            .with_label_values(&[$func_name])
            .start_timer()
    };
}
pub(crate) use start_measure_real_process_timer;

use crate::controller::SqlMetaStore;
use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
use crate::hummock::manager::worker::HummockManagerEventSender;

impl HummockManager {
    /// Creates a fully initialized `HummockManager`.
    ///
    /// Builds the `CompactionGroupManager` from the environment's persisted
    /// configs, then delegates the rest of the construction to [`Self::new_impl`].
    ///
    /// # Errors
    /// Propagates failures from loading compaction-group configs or from
    /// `new_impl` (object-store setup, cluster-id check, state loading).
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        compactor_streams_change_tx: UnboundedSender<(
            u32,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Result<HummockManagerRef> {
        let compaction_group_manager = CompactionGroupManager::new(&env).await?;
        Self::new_impl(
            env,
            metadata_manager,
            metrics,
            compactor_manager,
            compaction_group_manager,
            compactor_streams_change_tx,
        )
        .await
    }

    /// Test-only constructor that uses an explicit `CompactionConfig` instead of
    /// the persisted one, and builds the `MetadataManager` from the given
    /// controllers.
    ///
    /// # Panics
    /// Unwraps all construction errors — acceptable only in tests.
    #[cfg(any(test, feature = "test"))]
    pub(super) async fn with_config(
        env: MetaSrvEnv,
        cluster_controller: crate::controller::cluster::ClusterControllerRef,
        catalog_controller: crate::controller::catalog::CatalogControllerRef,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        config: risingwave_pb::hummock::CompactionConfig,
        compactor_streams_change_tx: UnboundedSender<(
            u32,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> HummockManagerRef {
        let compaction_group_manager = CompactionGroupManager::new_with_config(&env, config)
            .await
            .unwrap();
        let metadata_manager = MetadataManager::new(cluster_controller, catalog_controller);
        Self::new_impl(
            env,
            metadata_manager,
            metrics,
            compactor_manager,
            compaction_group_manager,
            compactor_streams_change_tx,
        )
        .await
        .unwrap()
    }

    /// Shared construction path for [`Self::new`] and [`Self::with_config`]:
    /// builds the object store, verifies exclusive ownership of the data
    /// directory, assembles the `HummockManager`, then initializes time-travel
    /// state, starts the background worker, loads persisted state from the meta
    /// store and releases stale contexts.
    async fn new_impl(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        compaction_group_manager: CompactionGroupManager,
        compactor_streams_change_tx: UnboundedSender<(
            u32,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Result<HummockManagerRef> {
        let sys_params = env.system_params_reader().await;
        let state_store_url = sys_params.state_store();
        let state_store_dir: &str = sys_params.data_directory();
        let use_new_object_prefix_strategy: bool = sys_params.use_new_object_prefix_strategy();
        let deterministic_mode = env.opts.compaction_deterministic_test;
        let mut object_store_config = env.opts.object_store_config.clone();
        // For fs and hdfs object store, operations are not always atomic.
        // We should manually enable atomicity guarantee by setting the atomic_write_dir config when building services.
        object_store_config.set_atomic_write_dir();
        let object_store = Arc::new(
            build_remote_object_store(
                // A URL without the "hummock+" prefix falls back to in-memory storage.
                state_store_url.strip_prefix("hummock+").unwrap_or("memory"),
                metrics.object_store_metric.clone(),
                "Version Checkpoint",
                Arc::new(object_store_config),
            )
            .await,
        );
        // Make sure data dir is not used by another cluster.
        // Skip this check in e2e compaction test, which needs to start a secondary cluster with
        // same bucket
        if !deterministic_mode {
            write_exclusive_cluster_id(
                state_store_dir,
                env.cluster_id().clone(),
                object_store.clone(),
            )
            .await?;

            // config bucket lifecycle for new cluster.
            if let risingwave_object_store::object::ObjectStoreImpl::S3(s3) = object_store.as_ref()
                && !env.opts.do_not_config_object_storage_lifecycle
            {
                // Refuse to start if the bucket has object expiration enabled:
                // expired objects would silently delete RisingWave data.
                let is_bucket_expiration_configured =
                    s3.inner().configure_bucket_lifecycle(state_store_dir).await;
                if is_bucket_expiration_configured {
                    return Err(ObjectError::internal("Cluster cannot start with object expiration configured for bucket because RisingWave data will be lost when object expiration kicks in.
                    Please disable object expiration and restart the cluster.")
                    .into());
                }
            }
        }
        let version_checkpoint_path = version_checkpoint_path(state_store_dir);
        let version_archive_dir = version_archive_dir(state_store_dir);
        // Channel consumed by the background worker started below.
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        let inflight_time_travel_query = env.opts.max_inflight_time_travel_query;
        let gc_manager = GcManager::new(
            object_store.clone(),
            state_store_dir,
            use_new_object_prefix_strategy,
        );
        let instance = HummockManager {
            env,
            versioning: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::versioning",
            ),
            compaction: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::compaction",
            ),
            compaction_group_manager: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                compaction_group_manager,
                "hummock_manager::compaction_group_manager",
            ),
            context_info: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::context_info",
            ),
            metrics,
            metadata_manager,
            // compaction_request_channel: parking_lot::RwLock::new(None),
            compactor_manager,
            event_sender: tx,
            object_store,
            version_checkpoint_path,
            version_archive_dir,
            pause_version_checkpoint: AtomicBool::new(false),
            history_table_throughput: parking_lot::RwLock::new(HashMap::default()),
            compactor_streams_change_tx,
            compaction_state: CompactionState::new(),
            full_gc_state: FullGcState::new().into(),
            now: Mutex::new(0),
            inflight_time_travel_query: Semaphore::new(inflight_time_travel_query as usize),
            gc_manager,
        };
        let instance = Arc::new(instance);
        instance.init_time_travel_state().await?;
        instance.start_worker(rx).await;
        instance.load_meta_store_state().await?;
        instance.release_invalid_contexts().await?;
        // Release snapshots pinned by meta on restarting.
        instance.release_meta_context().await?;
        Ok(instance)
    }

    /// Returns the SQL meta store handle from the environment.
    fn meta_store_ref(&self) -> &SqlMetaStore {
        self.env.meta_store_ref()
    }

    /// Load state from meta store.
    ///
    /// Restores the persisted `now` value, then acquires the write locks in the
    /// documented order (`compaction` → `versioning` → `context_info`) and
    /// delegates to [`Self::load_meta_store_state_impl`].
    async fn load_meta_store_state(&self) -> Result<()> {
        let now = self.load_now().await?;
        *self.now.lock().await = now.unwrap_or(0);

        // Lock acquisition order must follow the order documented on the struct.
        let mut compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let mut context_info_guard = self.context_info.write().await;
        self.load_meta_store_state_impl(
            &mut compaction_guard,
            &mut versioning_guard,
            &mut context_info_guard,
        )
        .await
    }

    /// Load state from meta store.
    ///
    /// Populates, in order: compaction statuses, compact-task assignments,
    /// version deltas, the version checkpoint (creating an initial one if none
    /// exists), the current version reconstructed by replaying deltas on top of
    /// the checkpoint, version stats, and pinned versions. Finally initializes
    /// compaction-group configs derived from the loaded version.
    async fn load_meta_store_state_impl(
        &self,
        compaction_guard: &mut Compaction,
        versioning_guard: &mut Versioning,
        context_info: &mut ContextInfo,
    ) -> Result<()> {
        use sea_orm::EntityTrait;
        let meta_store = self.meta_store_ref();
        let compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus> =
            compaction_status::Entity::find()
                .all(&meta_store.conn)
                .await
                .map_err(MetadataModelError::from)?
                .into_iter()
                .map(|m| (m.compaction_group_id as CompactionGroupId, m.into()))
                .collect();
        // Keep the in-memory default when nothing was persisted.
        if !compaction_statuses.is_empty() {
            compaction_guard.compaction_statuses = compaction_statuses;
        }

        compaction_guard.compact_task_assignment = compaction_task::Entity::find()
            .all(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .into_iter()
            .map(|m| {
                (
                    m.id as HummockCompactionTaskId,
                    PbCompactTaskAssignment::from(m),
                )
            })
            .collect();

        let hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta> =
            hummock_version_delta::Entity::find()
                .all(&meta_store.conn)
                .await
                .map_err(MetadataModelError::from)?
                .into_iter()
                .map(|m| {
                    (
                        HummockVersionId::new(m.id as _),
                        HummockVersionDelta::from_persisted_protobuf(&m.into()),
                    )
                })
                .collect();

        let checkpoint = self.try_read_checkpoint().await?;
        let mut redo_state = if let Some(c) = checkpoint {
            versioning_guard.checkpoint = c;
            versioning_guard.checkpoint.version.clone()
        } else {
            // No checkpoint found: bootstrap an initial version from the default
            // compaction config and persist it as the first checkpoint.
            let default_compaction_config = self
                .compaction_group_manager
                .read()
                .await
                .default_compaction_config();
            let checkpoint_version = HummockVersion::create_init_version(default_compaction_config);
            tracing::info!("init hummock version checkpoint");
            versioning_guard.checkpoint = HummockVersionCheckpoint {
                version: checkpoint_version.clone(),
                stale_objects: Default::default(),
            };
            self.write_checkpoint(&versioning_guard.checkpoint).await?;
            checkpoint_version
        };
        // Replay deltas in ascending id order (BTreeMap iteration); the prev_id
        // check skips deltas already covered by the checkpoint and keeps the
        // chain contiguous.
        for version_delta in hummock_version_deltas.values() {
            if version_delta.prev_id == redo_state.id {
                redo_state.apply_version_delta(version_delta);
            }
        }
        versioning_guard.version_stats = hummock_version_stats::Entity::find()
            .one(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .map(HummockVersionStats::from)
            .unwrap_or_else(|| HummockVersionStats {
                // version_stats.hummock_version_id is always 0 in meta store.
                hummock_version_id: 0,
                ..Default::default()
            });

        versioning_guard.current_version = redo_state;
        versioning_guard.hummock_version_deltas = hummock_version_deltas;

        context_info.pinned_versions = hummock_pinned_version::Entity::find()
            .all(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .into_iter()
            .map(|m| (m.context_id as HummockContextId, m.into()))
            .collect();

        self.initial_compaction_group_config_after_load(
            versioning_guard,
            self.compaction_group_manager.write().await.deref_mut(),
        )
        .await?;

        Ok(())
    }

    /// Deprecated entry point kept for interface compatibility.
    ///
    /// # Panics
    /// Always panics: the kv meta store backing this API has been removed.
    pub fn init_metadata_for_version_replay(
        &self,
        _table_catalogs: Vec<Table>,
        _compaction_groups: Vec<PbCompactionGroupInfo>,
    ) -> Result<()> {
        unimplemented!("kv meta store is deprecated");
    }

    /// Replay a version delta to current hummock version.
    /// Returns the `version_id`, `max_committed_epoch` of the new version and the modified
    /// compaction groups
    pub async fn replay_version_delta(
        &self,
        mut version_delta: HummockVersionDelta,
    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
        let mut versioning_guard = self.versioning.write().await;
        // ensure the version id is ascending after replay
        version_delta.id = versioning_guard.current_version.next_version_id();
        version_delta.prev_id = versioning_guard.current_version.id;
        versioning_guard
            .current_version
            .apply_version_delta(&version_delta);

        let version_new = versioning_guard.current_version.clone();
        let compaction_group_ids = version_delta.group_deltas.keys().cloned().collect_vec();
        Ok((version_new, compaction_group_ids))
    }

    /// Disables committing of epochs and returns a snapshot of the current
    /// version at the moment of disabling.
    pub async fn disable_commit_epoch(&self) -> HummockVersion {
        let mut versioning_guard = self.versioning.write().await;
        versioning_guard.disable_commit_epochs = true;
        versioning_guard.current_version.clone()
    }

    /// Returns the metadata manager used by this hummock manager.
    pub fn metadata_manager(&self) -> &MetadataManager {
        &self.metadata_manager
    }

    /// Returns the media type string of the underlying object store.
    pub fn object_store_media_type(&self) -> &'static str {
        self.object_store.media_type()
    }
}

/// Claims exclusive ownership of `state_store_dir` for `cluster_id`.
///
/// Reads the cluster-id marker object under the data directory:
/// - if it exists and matches `cluster_id`, the directory is ours — `Ok(())`;
/// - if it exists with a different id, another cluster owns the directory — error;
/// - if it does not exist yet, write our id and claim the directory.
/// Any other read error is propagated.
async fn write_exclusive_cluster_id(
    state_store_dir: &str,
    cluster_id: ClusterId,
    object_store: ObjectStoreRef,
) -> Result<()> {
    const CLUSTER_ID_DIR: &str = "cluster_id";
    const CLUSTER_ID_NAME: &str = "0";
    let cluster_id_dir = format!("{}/{}/", state_store_dir, CLUSTER_ID_DIR);
    let cluster_id_full_path = format!("{}{}", cluster_id_dir, CLUSTER_ID_NAME);
    match object_store.read(&cluster_id_full_path, ..).await {
        Ok(raw) => {
            let stored_cluster_id = String::from_utf8(raw.to_vec()).unwrap();
            if cluster_id.deref() == stored_cluster_id {
                // Marker matches our id: the directory already belongs to this cluster.
                Ok(())
            } else {
                Err(ObjectError::internal(format!(
                    "Data directory is already used by another cluster with id {:?}, path {}.",
                    stored_cluster_id, cluster_id_full_path,
                ))
                .into())
            }
        }
        // Marker absent: first claim — write our cluster id.
        Err(e) if e.is_object_not_found_error() => {
            object_store
                .upload(&cluster_id_full_path, Bytes::from(String::from(cluster_id)))
                .await?;
            Ok(())
        }
        Err(e) => Err(e.into()),
    }
}