1use std::ops::Deref;
16use std::path::PathBuf;
17use std::sync::Arc;
18use std::sync::atomic::AtomicU32;
19
20use anyhow::Context;
21use risingwave_common::config::{
22 CompactionConfig, DefaultParallelism, ObjectStoreConfig, RpcClientConfig,
23};
24use risingwave_common::session_config::SessionConfig;
25use risingwave_common::system_param::reader::SystemParamsReader;
26use risingwave_common::{bail, system_param};
27use risingwave_meta_model::prelude::Cluster;
28use risingwave_pb::meta::SystemParams;
29use risingwave_rpc_client::{
30 FrontendClientPool, FrontendClientPoolRef, StreamClientPool, StreamClientPoolRef,
31};
32use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
33use sea_orm::EntityTrait;
34
35use crate::MetaResult;
36use crate::barrier::SharedActorInfos;
37use crate::barrier::cdc_progress::{CdcTableBackfillTracker, CdcTableBackfillTrackerRef};
38use crate::controller::SqlMetaStore;
39use crate::controller::id::{
40 IdGeneratorManager as SqlIdGeneratorManager, IdGeneratorManagerRef as SqlIdGeneratorManagerRef,
41};
42use crate::controller::session_params::{SessionParamsController, SessionParamsControllerRef};
43use crate::controller::system_param::{SystemParamsController, SystemParamsControllerRef};
44use crate::hummock::sequence::SequenceGenerator;
45use crate::manager::event_log::{EventLogManagerRef, start_event_log_manager};
46use crate::manager::{IdleManager, IdleManagerRef, NotificationManager, NotificationManagerRef};
47use crate::model::ClusterId;
48
/// Shared environment of the meta service.
///
/// Bundles the managers, client pools, and options that `MetaSrvEnv::new` constructs.
/// The struct derives `Clone`, so callers can hold their own copy of the environment.
#[derive(Clone)]
pub struct MetaSrvEnv {
    /// ID generator manager, built from the meta store connection.
    id_gen_manager_impl: SqlIdGeneratorManagerRef,

    /// Controller for system parameters.
    system_param_manager_impl: SystemParamsControllerRef,

    /// Controller for session parameters.
    session_param_manager_impl: SessionParamsControllerRef,

    /// Handle to the SQL meta store.
    meta_store_impl: SqlMetaStore,

    /// Notification manager, built on top of the meta store.
    notification_manager: NotificationManagerRef,

    /// Shared actor infos, constructed with a handle to the notification manager.
    pub shared_actor_info: SharedActorInfos,

    /// Pool of stream RPC clients.
    stream_client_pool: StreamClientPoolRef,

    /// Pool of frontend RPC clients.
    frontend_client_pool: FrontendClientPoolRef,

    /// Idle manager, configured with `MetaOpts::max_idle_ms`.
    idle_manager: IdleManagerRef,

    /// Event log manager, started according to the `event_log_*` options.
    event_log_manager: EventLogManagerRef,

    /// Identifier of the cluster, read from the `Cluster` entity in the meta store.
    cluster_id: ClusterId,

    /// Sequence generator backed by the meta store connection.
    pub hummock_seq: Arc<SequenceGenerator>,

    /// The await-tree registry owned by this environment.
    await_tree_reg: await_tree::Registry,

    /// Options this environment was created with.
    pub opts: Arc<MetaOpts>,

    /// Tracker for CDC table backfill, built from the meta store.
    pub cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,

    /// Shared atomic counter exposed through `actor_id_generator()`; starts at 0.
    actor_id_generator: Arc<AtomicU32>,
}
96
/// Options of the meta service.
///
/// A plain bag of configuration values shared through `MetaSrvEnv::opts`. The struct derives
/// `serde::Serialize`, so the declaration order of the fields is part of its serialized form;
/// do not reorder fields casually.
///
/// Units are encoded in the field-name suffixes (`_sec`/`_secs`/`_seconds`, `_ms`/`_millis`,
/// `_bytes`).
#[derive(Clone, serde::Serialize)]
pub struct MetaOpts {
    // --- Recovery, parallelism control, and barriers ---
    /// Recovery switch; `MetaOpts::test` takes this value as its only argument.
    pub enable_recovery: bool,
    pub disable_automatic_parallelism_control: bool,
    pub parallelism_control_batch_size: usize,
    pub parallelism_control_trigger_period_sec: u64,
    pub parallelism_control_trigger_first_delay_sec: u64,
    pub in_flight_barrier_nums: usize,
    /// Passed to `IdleManager::new` when the environment is built.
    pub max_idle_ms: u64,
    pub compaction_deterministic_test: bool,
    pub default_parallelism: DefaultParallelism,

    // --- Vacuum, GC, version checkpoint, and time travel ---
    pub vacuum_interval_sec: u64,
    pub vacuum_spin_interval_ms: u64,
    pub iceberg_gc_interval_sec: u64,
    pub time_travel_vacuum_interval_sec: u64,
    pub hummock_version_checkpoint_interval_sec: u64,
    pub enable_hummock_data_archive: bool,
    // NOTE(review): unlike its neighbours this interval has no unit suffix — confirm the unit.
    pub hummock_time_travel_snapshot_interval: u64,
    pub hummock_time_travel_sst_info_fetch_batch_size: usize,
    pub hummock_time_travel_sst_info_insert_batch_size: usize,
    pub hummock_time_travel_epoch_version_insert_batch_size: usize,
    pub hummock_gc_history_insert_batch_size: usize,
    pub hummock_time_travel_filter_out_objects_batch_size: usize,
    pub hummock_time_travel_filter_out_objects_v1: bool,
    pub hummock_time_travel_filter_out_objects_list_version_batch_size: usize,
    pub hummock_time_travel_filter_out_objects_list_delta_batch_size: usize,
    pub min_delta_log_num_for_hummock_version_checkpoint: u64,
    pub min_sst_retention_time_sec: u64,
    pub full_gc_interval_sec: u64,
    pub full_gc_object_limit: u64,
    pub gc_history_retention_time_sec: u64,
    pub max_inflight_time_travel_query: u64,
    pub enable_committed_sst_sanity_check: bool,
    pub periodic_compaction_interval_sec: u64,
    pub node_num_monitor_interval_sec: u64,
    pub protect_drop_table_with_incoming_sink: bool,

    // --- Monitoring and cloud networking ---
    pub prometheus_endpoint: Option<String>,

    pub prometheus_selector: Option<String>,

    pub vpc_id: Option<String>,

    pub security_group_id: Option<String>,

    /// Optional list of `(String, String)` pairs; presumably tag key/value — TODO confirm.
    pub privatelink_endpoint_default_tags: Option<Vec<(String, String)>>,

    // --- Periodic compaction scheduling ---
    pub periodic_space_reclaim_compaction_interval_sec: u64,

    pub telemetry_enabled: bool,
    pub periodic_ttl_reclaim_compaction_interval_sec: u64,

    pub periodic_tombstone_reclaim_compaction_interval_sec: u64,

    pub periodic_scheduling_compaction_group_split_interval_sec: u64,

    pub do_not_config_object_storage_lifecycle: bool,

    pub partition_vnode_count: u32,

    pub table_high_write_throughput_threshold: u64,
    pub table_low_write_throughput_threshold: u64,

    pub compaction_task_max_heartbeat_interval_secs: u64,
    pub compaction_task_max_progress_interval_secs: u64,
    pub compaction_config: Option<CompactionConfig>,

    pub hybrid_partition_node_count: u32,

    // --- Event log and tracing ---
    /// Passed to `start_event_log_manager` together with `event_log_channel_max_size`.
    pub event_log_enabled: bool,
    pub event_log_channel_max_size: u32,
    pub advertise_addr: String,
    pub cached_traces_num: u32,
    pub cached_traces_memory_limit_bytes: usize,

    // --- Compaction behaviour and compaction-group split/merge ---
    pub enable_trivial_move: bool,

    pub enable_check_task_level_overlap: bool,
    pub enable_dropped_column_reclaim: bool,

    pub split_group_size_ratio: f64,

    pub table_stat_high_write_throughput_ratio_for_split: f64,

    pub table_stat_low_write_throughput_ratio_for_merge: f64,

    // "throuput" (sic) in the next two names is part of the public field name; renaming it
    // would break every user of the field.
    pub table_stat_throuput_window_seconds_for_split: usize,

    pub table_stat_throuput_window_seconds_for_merge: usize,

    pub object_store_config: ObjectStoreConfig,

    pub max_trivial_move_task_count_per_loop: usize,

    pub max_get_task_probe_times: usize,

    pub compact_task_table_size_partition_threshold_low: u64,
    pub compact_task_table_size_partition_threshold_high: u64,

    pub periodic_scheduling_compaction_group_merge_interval_sec: u64,

    pub compaction_group_merge_dimension_threshold: f64,

    // --- Secrets and licensing ---
    // NOTE(review): this struct derives `serde::Serialize` and this field carries key bytes
    // with no `#[serde(skip)]` visible here — confirm it is never emitted wherever `MetaOpts`
    // is serialized.
    pub secret_store_private_key: Option<Vec<u8>>,
    pub temp_secret_file_dir: String,

    pub actor_cnt_per_worker_parallelism_hard_limit: usize,
    pub actor_cnt_per_worker_parallelism_soft_limit: usize,

    /// When set, `MetaSrvEnv::new` rejects a non-default `license_key` system parameter.
    pub license_key_path: Option<PathBuf>,

    // --- RPC clients ---
    pub compute_client_config: RpcClientConfig,
    /// Used to build the stream client pool of the environment.
    pub stream_client_config: RpcClientConfig,
    /// Used to build the frontend client pool of the environment.
    pub frontend_client_config: RpcClientConfig,
    pub redact_sql_option_keywords: RedactSqlOptionKeywordsRef,

    // --- CDC table split initialization ---
    pub cdc_table_split_init_sleep_interval_splits: u64,
    pub cdc_table_split_init_sleep_duration_millis: u64,
    pub cdc_table_split_init_insert_batch_size: u64,

    pub enable_legacy_table_migration: bool,
}
294
295impl MetaOpts {
296 pub fn test(enable_recovery: bool) -> Self {
298 Self {
299 enable_recovery,
300 disable_automatic_parallelism_control: false,
301 parallelism_control_batch_size: 1,
302 parallelism_control_trigger_period_sec: 10,
303 parallelism_control_trigger_first_delay_sec: 30,
304 in_flight_barrier_nums: 40,
305 max_idle_ms: 0,
306 compaction_deterministic_test: false,
307 default_parallelism: DefaultParallelism::Full,
308 vacuum_interval_sec: 30,
309 time_travel_vacuum_interval_sec: 30,
310 vacuum_spin_interval_ms: 0,
311 iceberg_gc_interval_sec: 3600,
312 hummock_version_checkpoint_interval_sec: 30,
313 enable_hummock_data_archive: false,
314 hummock_time_travel_snapshot_interval: 0,
315 hummock_time_travel_sst_info_fetch_batch_size: 10_000,
316 hummock_time_travel_sst_info_insert_batch_size: 10,
317 hummock_time_travel_epoch_version_insert_batch_size: 1000,
318 hummock_gc_history_insert_batch_size: 1000,
319 hummock_time_travel_filter_out_objects_batch_size: 1000,
320 hummock_time_travel_filter_out_objects_v1: false,
321 hummock_time_travel_filter_out_objects_list_version_batch_size: 10,
322 hummock_time_travel_filter_out_objects_list_delta_batch_size: 1000,
323 min_delta_log_num_for_hummock_version_checkpoint: 1,
324 min_sst_retention_time_sec: 3600 * 24 * 7,
325 full_gc_interval_sec: 3600 * 24 * 7,
326 full_gc_object_limit: 100_000,
327 gc_history_retention_time_sec: 3600 * 24 * 7,
328 max_inflight_time_travel_query: 1000,
329 enable_committed_sst_sanity_check: false,
330 periodic_compaction_interval_sec: 60,
331 node_num_monitor_interval_sec: 10,
332 protect_drop_table_with_incoming_sink: false,
333 prometheus_endpoint: None,
334 prometheus_selector: None,
335 vpc_id: None,
336 security_group_id: None,
337 privatelink_endpoint_default_tags: None,
338 periodic_space_reclaim_compaction_interval_sec: 60,
339 telemetry_enabled: false,
340 periodic_ttl_reclaim_compaction_interval_sec: 60,
341 periodic_tombstone_reclaim_compaction_interval_sec: 60,
342 periodic_scheduling_compaction_group_split_interval_sec: 60,
343 compact_task_table_size_partition_threshold_low: 128 * 1024 * 1024,
344 compact_task_table_size_partition_threshold_high: 512 * 1024 * 1024,
345 table_high_write_throughput_threshold: 128 * 1024 * 1024,
346 table_low_write_throughput_threshold: 64 * 1024 * 1024,
347 do_not_config_object_storage_lifecycle: true,
348 partition_vnode_count: 32,
349 compaction_task_max_heartbeat_interval_secs: 0,
350 compaction_task_max_progress_interval_secs: 1,
351 compaction_config: None,
352 hybrid_partition_node_count: 4,
353 event_log_enabled: false,
354 event_log_channel_max_size: 1,
355 advertise_addr: "".to_owned(),
356 cached_traces_num: 1,
357 cached_traces_memory_limit_bytes: usize::MAX,
358 enable_trivial_move: true,
359 enable_check_task_level_overlap: true,
360 enable_dropped_column_reclaim: false,
361 object_store_config: ObjectStoreConfig::default(),
362 max_trivial_move_task_count_per_loop: 256,
363 max_get_task_probe_times: 5,
364 secret_store_private_key: Some(
365 hex::decode("0123456789abcdef0123456789abcdef").unwrap(),
366 ),
367 temp_secret_file_dir: "./secrets".to_owned(),
368 actor_cnt_per_worker_parallelism_hard_limit: usize::MAX,
369 actor_cnt_per_worker_parallelism_soft_limit: usize::MAX,
370 split_group_size_ratio: 0.9,
371 table_stat_high_write_throughput_ratio_for_split: 0.5,
372 table_stat_low_write_throughput_ratio_for_merge: 0.7,
373 table_stat_throuput_window_seconds_for_split: 60,
374 table_stat_throuput_window_seconds_for_merge: 240,
375 periodic_scheduling_compaction_group_merge_interval_sec: 60 * 10,
376 compaction_group_merge_dimension_threshold: 1.2,
377 license_key_path: None,
378 compute_client_config: RpcClientConfig::default(),
379 stream_client_config: RpcClientConfig::default(),
380 frontend_client_config: RpcClientConfig::default(),
381 redact_sql_option_keywords: Arc::new(Default::default()),
382 cdc_table_split_init_sleep_interval_splits: 1000,
383 cdc_table_split_init_sleep_duration_millis: 10,
384 cdc_table_split_init_insert_batch_size: 1000,
385 enable_legacy_table_migration: true,
386 }
387 }
388}
389
390impl MetaSrvEnv {
391 pub async fn new(
392 opts: MetaOpts,
393 mut init_system_params: SystemParams,
394 init_session_config: SessionConfig,
395 meta_store_impl: SqlMetaStore,
396 ) -> MetaResult<Self> {
397 let idle_manager = Arc::new(IdleManager::new(opts.max_idle_ms));
398 let stream_client_pool =
399 Arc::new(StreamClientPool::new(1, opts.stream_client_config.clone())); let frontend_client_pool = Arc::new(FrontendClientPool::new(
401 1,
402 opts.frontend_client_config.clone(),
403 ));
404 let event_log_manager = Arc::new(start_event_log_manager(
405 opts.event_log_enabled,
406 opts.event_log_channel_max_size,
407 ));
408
409 if opts.license_key_path.is_some()
412 && init_system_params.license_key
413 != system_param::default::license_key_opt().map(Into::into)
414 {
415 bail!(
416 "argument `--license-key-path` (or env var `RW_LICENSE_KEY_PATH`) and \
417 system parameter `license_key` (or env var `RW_LICENSE_KEY`) may not \
418 be set at the same time"
419 );
420 }
421
422 let cluster_first_launch = meta_store_impl.up().await.context(
423 "Failed to initialize the meta store, \
424 this may happen if there's existing metadata incompatible with the current version of RisingWave, \
425 e.g., downgrading from a newer release or a nightly build to an older one. \
426 For a single-node deployment, you may want to reset all data by deleting the data directory, \
427 typically located at `~/.risingwave`.",
428 )?;
429
430 let notification_manager =
431 Arc::new(NotificationManager::new(meta_store_impl.clone()).await);
432 let cluster_id = Cluster::find()
433 .one(&meta_store_impl.conn)
434 .await?
435 .map(|c| c.cluster_id.to_string().into())
436 .unwrap();
437
438 init_system_params.use_new_object_prefix_strategy = Some(cluster_first_launch);
444
445 let system_param_controller = Arc::new(
446 SystemParamsController::new(
447 meta_store_impl.clone(),
448 notification_manager.clone(),
449 init_system_params,
450 )
451 .await?,
452 );
453 let session_param_controller = Arc::new(
454 SessionParamsController::new(
455 meta_store_impl.clone(),
456 notification_manager.clone(),
457 init_session_config,
458 )
459 .await?,
460 );
461 let cdc_table_backfill_tracker = CdcTableBackfillTracker::new(meta_store_impl.clone())
462 .await?
463 .into();
464 Ok(Self {
465 id_gen_manager_impl: Arc::new(SqlIdGeneratorManager::new(&meta_store_impl.conn).await?),
466 system_param_manager_impl: system_param_controller,
467 session_param_manager_impl: session_param_controller,
468 meta_store_impl: meta_store_impl.clone(),
469 shared_actor_info: SharedActorInfos::new(notification_manager.clone()),
470 notification_manager,
471 stream_client_pool,
472 frontend_client_pool,
473 idle_manager,
474 event_log_manager,
475 cluster_id,
476 hummock_seq: Arc::new(SequenceGenerator::new(meta_store_impl.conn.clone())),
477 opts: opts.into(),
478 await_tree_reg: await_tree::Registry::new(Default::default()),
480 cdc_table_backfill_tracker,
481 actor_id_generator: Arc::new(AtomicU32::new(0)),
482 })
483 }
484
485 pub fn meta_store(&self) -> SqlMetaStore {
486 self.meta_store_impl.clone()
487 }
488
489 pub fn meta_store_ref(&self) -> &SqlMetaStore {
490 &self.meta_store_impl
491 }
492
493 pub fn id_gen_manager(&self) -> &SqlIdGeneratorManagerRef {
494 &self.id_gen_manager_impl
495 }
496
497 pub fn notification_manager_ref(&self) -> NotificationManagerRef {
498 self.notification_manager.clone()
499 }
500
501 pub fn notification_manager(&self) -> &NotificationManager {
502 self.notification_manager.deref()
503 }
504
505 pub fn idle_manager_ref(&self) -> IdleManagerRef {
506 self.idle_manager.clone()
507 }
508
509 pub fn idle_manager(&self) -> &IdleManager {
510 self.idle_manager.deref()
511 }
512
513 pub fn actor_id_generator(&self) -> &AtomicU32 {
514 self.actor_id_generator.deref()
515 }
516
517 pub async fn system_params_reader(&self) -> SystemParamsReader {
518 self.system_param_manager_impl.get_params().await
519 }
520
521 pub fn system_params_manager_impl_ref(&self) -> SystemParamsControllerRef {
522 self.system_param_manager_impl.clone()
523 }
524
525 pub fn session_params_manager_impl_ref(&self) -> SessionParamsControllerRef {
526 self.session_param_manager_impl.clone()
527 }
528
529 pub fn stream_client_pool_ref(&self) -> StreamClientPoolRef {
530 self.stream_client_pool.clone()
531 }
532
533 pub fn stream_client_pool(&self) -> &StreamClientPool {
534 self.stream_client_pool.deref()
535 }
536
537 pub fn frontend_client_pool(&self) -> &FrontendClientPool {
538 self.frontend_client_pool.deref()
539 }
540
541 pub fn cluster_id(&self) -> &ClusterId {
542 &self.cluster_id
543 }
544
545 pub fn event_log_manager_ref(&self) -> EventLogManagerRef {
546 self.event_log_manager.clone()
547 }
548
549 pub fn await_tree_reg(&self) -> &await_tree::Registry {
550 &self.await_tree_reg
551 }
552
553 pub fn shared_actor_infos(&self) -> &SharedActorInfos {
554 &self.shared_actor_info
555 }
556
557 pub fn cdc_table_backfill_tracker(&self) -> CdcTableBackfillTrackerRef {
558 self.cdc_table_backfill_tracker.clone()
559 }
560}
561
#[cfg(any(test, feature = "test"))]
impl MetaSrvEnv {
    /// Creates a test environment with `MetaOpts::test(false)` and untouched test system
    /// parameters.
    pub async fn for_test() -> Self {
        let opts = MetaOpts::test(false);
        Self::for_test_opts(opts, |_| {}).await
    }

    /// Creates a test environment from `opts`.
    ///
    /// `on_test_system_params` may adjust the test system parameters before the environment is
    /// built. The session config is the default one and the meta store comes from
    /// `SqlMetaStore::for_test()`.
    ///
    /// # Panics
    ///
    /// Panics if `MetaSrvEnv::new` returns an error.
    pub async fn for_test_opts(
        opts: MetaOpts,
        on_test_system_params: impl FnOnce(&mut risingwave_pb::meta::PbSystemParams),
    ) -> Self {
        let mut params = risingwave_common::system_param::system_params_for_test();
        on_test_system_params(&mut params);

        let session_config = SessionConfig::default();
        let store = SqlMetaStore::for_test().await;

        let built = Self::new(opts, params, session_config, store).await;
        built.unwrap()
    }
}