1use std::ops::Deref;
16use std::path::PathBuf;
17use std::sync::Arc;
18
19use anyhow::Context;
20use risingwave_common::config::{
21 CompactionConfig, DefaultParallelism, ObjectStoreConfig, RpcClientConfig,
22};
23use risingwave_common::session_config::SessionConfig;
24use risingwave_common::system_param::reader::SystemParamsReader;
25use risingwave_common::{bail, system_param};
26use risingwave_meta_model::prelude::Cluster;
27use risingwave_pb::meta::SystemParams;
28use risingwave_rpc_client::{
29 FrontendClientPool, FrontendClientPoolRef, StreamClientPool, StreamClientPoolRef,
30};
31use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
32use sea_orm::EntityTrait;
33
34use crate::MetaResult;
35use crate::barrier::SharedActorInfos;
36use crate::barrier::cdc_progress::{CdcTableBackfillTracker, CdcTableBackfillTrackerRef};
37use crate::controller::SqlMetaStore;
38use crate::controller::id::{
39 IdGeneratorManager as SqlIdGeneratorManager, IdGeneratorManagerRef as SqlIdGeneratorManagerRef,
40};
41use crate::controller::session_params::{SessionParamsController, SessionParamsControllerRef};
42use crate::controller::system_param::{SystemParamsController, SystemParamsControllerRef};
43use crate::hummock::sequence::SequenceGenerator;
44use crate::manager::event_log::{EventLogManagerRef, start_event_log_manager};
45use crate::manager::{IdleManager, IdleManagerRef, NotificationManager, NotificationManagerRef};
46use crate::model::ClusterId;
47
48#[derive(Clone)]
51pub struct MetaSrvEnv {
52 id_gen_manager_impl: SqlIdGeneratorManagerRef,
54
55 system_param_manager_impl: SystemParamsControllerRef,
57
58 session_param_manager_impl: SessionParamsControllerRef,
60
61 meta_store_impl: SqlMetaStore,
63
64 notification_manager: NotificationManagerRef,
66
67 shared_actor_info: SharedActorInfos,
68
69 stream_client_pool: StreamClientPoolRef,
71
72 frontend_client_pool: FrontendClientPoolRef,
74
75 idle_manager: IdleManagerRef,
77
78 event_log_manager: EventLogManagerRef,
79
80 cluster_id: ClusterId,
82
83 pub hummock_seq: Arc<SequenceGenerator>,
84
85 await_tree_reg: await_tree::Registry,
87
88 pub opts: Arc<MetaOpts>,
90
91 pub cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
92}
93
94#[derive(Clone, serde::Serialize)]
96pub struct MetaOpts {
97 pub enable_recovery: bool,
100 pub disable_automatic_parallelism_control: bool,
102 pub parallelism_control_batch_size: usize,
104 pub parallelism_control_trigger_period_sec: u64,
106 pub parallelism_control_trigger_first_delay_sec: u64,
108 pub in_flight_barrier_nums: usize,
110 pub max_idle_ms: u64,
113 pub compaction_deterministic_test: bool,
115 pub default_parallelism: DefaultParallelism,
117
118 pub vacuum_interval_sec: u64,
121 pub vacuum_spin_interval_ms: u64,
124 pub time_travel_vacuum_interval_sec: u64,
125 pub hummock_version_checkpoint_interval_sec: u64,
127 pub enable_hummock_data_archive: bool,
128 pub hummock_time_travel_snapshot_interval: u64,
129 pub hummock_time_travel_sst_info_fetch_batch_size: usize,
130 pub hummock_time_travel_sst_info_insert_batch_size: usize,
131 pub hummock_time_travel_epoch_version_insert_batch_size: usize,
132 pub hummock_gc_history_insert_batch_size: usize,
133 pub hummock_time_travel_filter_out_objects_batch_size: usize,
134 pub hummock_time_travel_filter_out_objects_v1: bool,
135 pub hummock_time_travel_filter_out_objects_list_version_batch_size: usize,
136 pub hummock_time_travel_filter_out_objects_list_delta_batch_size: usize,
137 pub min_delta_log_num_for_hummock_version_checkpoint: u64,
142 pub min_sst_retention_time_sec: u64,
145 pub full_gc_interval_sec: u64,
147 pub full_gc_object_limit: u64,
149 pub gc_history_retention_time_sec: u64,
151 pub max_inflight_time_travel_query: u64,
153 pub enable_committed_sst_sanity_check: bool,
155 pub periodic_compaction_interval_sec: u64,
157 pub node_num_monitor_interval_sec: u64,
159 pub protect_drop_table_with_incoming_sink: bool,
161 pub prometheus_endpoint: Option<String>,
167
168 pub prometheus_selector: Option<String>,
170
171 pub vpc_id: Option<String>,
173
174 pub security_group_id: Option<String>,
176
177 pub privatelink_endpoint_default_tags: Option<Vec<(String, String)>>,
181
182 pub periodic_space_reclaim_compaction_interval_sec: u64,
184
185 pub telemetry_enabled: bool,
187 pub periodic_ttl_reclaim_compaction_interval_sec: u64,
189
190 pub periodic_tombstone_reclaim_compaction_interval_sec: u64,
192
193 pub periodic_scheduling_compaction_group_split_interval_sec: u64,
195
196 pub do_not_config_object_storage_lifecycle: bool,
198
199 pub partition_vnode_count: u32,
200
201 pub table_high_write_throughput_threshold: u64,
203 pub table_low_write_throughput_threshold: u64,
205
206 pub compaction_task_max_heartbeat_interval_secs: u64,
207 pub compaction_task_max_progress_interval_secs: u64,
208 pub compaction_config: Option<CompactionConfig>,
209
210 pub hybrid_partition_node_count: u32,
218
219 pub event_log_enabled: bool,
220 pub event_log_channel_max_size: u32,
221 pub advertise_addr: String,
222 pub cached_traces_num: u32,
225 pub cached_traces_memory_limit_bytes: usize,
228
229 pub enable_trivial_move: bool,
231
232 pub enable_check_task_level_overlap: bool,
234 pub enable_dropped_column_reclaim: bool,
235
236 pub split_group_size_ratio: f64,
238
239 pub table_stat_high_write_throughput_ratio_for_split: f64,
241
242 pub table_stat_low_write_throughput_ratio_for_merge: f64,
244
245 pub table_stat_throuput_window_seconds_for_split: usize,
247
248 pub table_stat_throuput_window_seconds_for_merge: usize,
250
251 pub object_store_config: ObjectStoreConfig,
253
254 pub max_trivial_move_task_count_per_loop: usize,
256
257 pub max_get_task_probe_times: usize,
259
260 pub compact_task_table_size_partition_threshold_low: u64,
261 pub compact_task_table_size_partition_threshold_high: u64,
262
263 pub periodic_scheduling_compaction_group_merge_interval_sec: u64,
264
265 pub compaction_group_merge_dimension_threshold: f64,
266
267 pub secret_store_private_key: Option<Vec<u8>>,
269 pub temp_secret_file_dir: String,
271
272 pub actor_cnt_per_worker_parallelism_hard_limit: usize,
274 pub actor_cnt_per_worker_parallelism_soft_limit: usize,
275
276 pub license_key_path: Option<PathBuf>,
277
278 pub compute_client_config: RpcClientConfig,
279 pub stream_client_config: RpcClientConfig,
280 pub frontend_client_config: RpcClientConfig,
281 pub redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
282
283 pub cdc_table_split_init_sleep_interval_splits: u64,
284 pub cdc_table_split_init_sleep_duration_millis: u64,
285 pub cdc_table_split_init_insert_batch_size: u64,
286}
287
288impl MetaOpts {
289 pub fn test(enable_recovery: bool) -> Self {
291 Self {
292 enable_recovery,
293 disable_automatic_parallelism_control: false,
294 parallelism_control_batch_size: 1,
295 parallelism_control_trigger_period_sec: 10,
296 parallelism_control_trigger_first_delay_sec: 30,
297 in_flight_barrier_nums: 40,
298 max_idle_ms: 0,
299 compaction_deterministic_test: false,
300 default_parallelism: DefaultParallelism::Full,
301 vacuum_interval_sec: 30,
302 time_travel_vacuum_interval_sec: 30,
303 vacuum_spin_interval_ms: 0,
304 hummock_version_checkpoint_interval_sec: 30,
305 enable_hummock_data_archive: false,
306 hummock_time_travel_snapshot_interval: 0,
307 hummock_time_travel_sst_info_fetch_batch_size: 10_000,
308 hummock_time_travel_sst_info_insert_batch_size: 10,
309 hummock_time_travel_epoch_version_insert_batch_size: 1000,
310 hummock_gc_history_insert_batch_size: 1000,
311 hummock_time_travel_filter_out_objects_batch_size: 1000,
312 hummock_time_travel_filter_out_objects_v1: false,
313 hummock_time_travel_filter_out_objects_list_version_batch_size: 10,
314 hummock_time_travel_filter_out_objects_list_delta_batch_size: 1000,
315 min_delta_log_num_for_hummock_version_checkpoint: 1,
316 min_sst_retention_time_sec: 3600 * 24 * 7,
317 full_gc_interval_sec: 3600 * 24 * 7,
318 full_gc_object_limit: 100_000,
319 gc_history_retention_time_sec: 3600 * 24 * 7,
320 max_inflight_time_travel_query: 1000,
321 enable_committed_sst_sanity_check: false,
322 periodic_compaction_interval_sec: 60,
323 node_num_monitor_interval_sec: 10,
324 protect_drop_table_with_incoming_sink: false,
325 prometheus_endpoint: None,
326 prometheus_selector: None,
327 vpc_id: None,
328 security_group_id: None,
329 privatelink_endpoint_default_tags: None,
330 periodic_space_reclaim_compaction_interval_sec: 60,
331 telemetry_enabled: false,
332 periodic_ttl_reclaim_compaction_interval_sec: 60,
333 periodic_tombstone_reclaim_compaction_interval_sec: 60,
334 periodic_scheduling_compaction_group_split_interval_sec: 60,
335 compact_task_table_size_partition_threshold_low: 128 * 1024 * 1024,
336 compact_task_table_size_partition_threshold_high: 512 * 1024 * 1024,
337 table_high_write_throughput_threshold: 128 * 1024 * 1024,
338 table_low_write_throughput_threshold: 64 * 1024 * 1024,
339 do_not_config_object_storage_lifecycle: true,
340 partition_vnode_count: 32,
341 compaction_task_max_heartbeat_interval_secs: 0,
342 compaction_task_max_progress_interval_secs: 1,
343 compaction_config: None,
344 hybrid_partition_node_count: 4,
345 event_log_enabled: false,
346 event_log_channel_max_size: 1,
347 advertise_addr: "".to_owned(),
348 cached_traces_num: 1,
349 cached_traces_memory_limit_bytes: usize::MAX,
350 enable_trivial_move: true,
351 enable_check_task_level_overlap: true,
352 enable_dropped_column_reclaim: false,
353 object_store_config: ObjectStoreConfig::default(),
354 max_trivial_move_task_count_per_loop: 256,
355 max_get_task_probe_times: 5,
356 secret_store_private_key: Some(
357 hex::decode("0123456789abcdef0123456789abcdef").unwrap(),
358 ),
359 temp_secret_file_dir: "./secrets".to_owned(),
360 actor_cnt_per_worker_parallelism_hard_limit: usize::MAX,
361 actor_cnt_per_worker_parallelism_soft_limit: usize::MAX,
362 split_group_size_ratio: 0.9,
363 table_stat_high_write_throughput_ratio_for_split: 0.5,
364 table_stat_low_write_throughput_ratio_for_merge: 0.7,
365 table_stat_throuput_window_seconds_for_split: 60,
366 table_stat_throuput_window_seconds_for_merge: 240,
367 periodic_scheduling_compaction_group_merge_interval_sec: 60 * 10,
368 compaction_group_merge_dimension_threshold: 1.2,
369 license_key_path: None,
370 compute_client_config: RpcClientConfig::default(),
371 stream_client_config: RpcClientConfig::default(),
372 frontend_client_config: RpcClientConfig::default(),
373 redact_sql_option_keywords: Arc::new(Default::default()),
374 cdc_table_split_init_sleep_interval_splits: 1000,
375 cdc_table_split_init_sleep_duration_millis: 10,
376 cdc_table_split_init_insert_batch_size: 1000,
377 }
378 }
379}
380
381impl MetaSrvEnv {
382 pub async fn new(
383 opts: MetaOpts,
384 mut init_system_params: SystemParams,
385 init_session_config: SessionConfig,
386 meta_store_impl: SqlMetaStore,
387 ) -> MetaResult<Self> {
388 let idle_manager = Arc::new(IdleManager::new(opts.max_idle_ms));
389 let stream_client_pool =
390 Arc::new(StreamClientPool::new(1, opts.stream_client_config.clone())); let frontend_client_pool = Arc::new(FrontendClientPool::new(
392 1,
393 opts.frontend_client_config.clone(),
394 ));
395 let event_log_manager = Arc::new(start_event_log_manager(
396 opts.event_log_enabled,
397 opts.event_log_channel_max_size,
398 ));
399
400 if opts.license_key_path.is_some()
403 && init_system_params.license_key
404 != system_param::default::license_key_opt().map(Into::into)
405 {
406 bail!(
407 "argument `--license-key-path` (or env var `RW_LICENSE_KEY_PATH`) and \
408 system parameter `license_key` (or env var `RW_LICENSE_KEY`) may not \
409 be set at the same time"
410 );
411 }
412
413 let cluster_first_launch = meta_store_impl.up().await.context(
414 "Failed to initialize the meta store, \
415 this may happen if there's existing metadata incompatible with the current version of RisingWave, \
416 e.g., downgrading from a newer release or a nightly build to an older one. \
417 For a single-node deployment, you may want to reset all data by deleting the data directory, \
418 typically located at `~/.risingwave`.",
419 )?;
420
421 let notification_manager =
422 Arc::new(NotificationManager::new(meta_store_impl.clone()).await);
423 let cluster_id = Cluster::find()
424 .one(&meta_store_impl.conn)
425 .await?
426 .map(|c| c.cluster_id.to_string().into())
427 .unwrap();
428
429 init_system_params.use_new_object_prefix_strategy = Some(cluster_first_launch);
435
436 let system_param_controller = Arc::new(
437 SystemParamsController::new(
438 meta_store_impl.clone(),
439 notification_manager.clone(),
440 init_system_params,
441 )
442 .await?,
443 );
444 let session_param_controller = Arc::new(
445 SessionParamsController::new(
446 meta_store_impl.clone(),
447 notification_manager.clone(),
448 init_session_config,
449 )
450 .await?,
451 );
452 let cdc_table_backfill_tracker = CdcTableBackfillTracker::new(meta_store_impl.clone())
453 .await?
454 .into();
455 Ok(Self {
456 id_gen_manager_impl: Arc::new(SqlIdGeneratorManager::new(&meta_store_impl.conn).await?),
457 system_param_manager_impl: system_param_controller,
458 session_param_manager_impl: session_param_controller,
459 meta_store_impl: meta_store_impl.clone(),
460 shared_actor_info: SharedActorInfos::new(notification_manager.clone()),
461 notification_manager,
462 stream_client_pool,
463 frontend_client_pool,
464 idle_manager,
465 event_log_manager,
466 cluster_id,
467 hummock_seq: Arc::new(SequenceGenerator::new(meta_store_impl.conn.clone())),
468 opts: opts.into(),
469 await_tree_reg: await_tree::Registry::new(Default::default()),
471 cdc_table_backfill_tracker,
472 })
473 }
474
475 pub fn meta_store(&self) -> SqlMetaStore {
476 self.meta_store_impl.clone()
477 }
478
479 pub fn meta_store_ref(&self) -> &SqlMetaStore {
480 &self.meta_store_impl
481 }
482
483 pub fn id_gen_manager(&self) -> &SqlIdGeneratorManagerRef {
484 &self.id_gen_manager_impl
485 }
486
487 pub fn notification_manager_ref(&self) -> NotificationManagerRef {
488 self.notification_manager.clone()
489 }
490
491 pub fn notification_manager(&self) -> &NotificationManager {
492 self.notification_manager.deref()
493 }
494
495 pub fn idle_manager_ref(&self) -> IdleManagerRef {
496 self.idle_manager.clone()
497 }
498
499 pub fn idle_manager(&self) -> &IdleManager {
500 self.idle_manager.deref()
501 }
502
503 pub async fn system_params_reader(&self) -> SystemParamsReader {
504 self.system_param_manager_impl.get_params().await
505 }
506
507 pub fn system_params_manager_impl_ref(&self) -> SystemParamsControllerRef {
508 self.system_param_manager_impl.clone()
509 }
510
511 pub fn session_params_manager_impl_ref(&self) -> SessionParamsControllerRef {
512 self.session_param_manager_impl.clone()
513 }
514
515 pub fn stream_client_pool_ref(&self) -> StreamClientPoolRef {
516 self.stream_client_pool.clone()
517 }
518
519 pub fn stream_client_pool(&self) -> &StreamClientPool {
520 self.stream_client_pool.deref()
521 }
522
523 pub fn frontend_client_pool(&self) -> &FrontendClientPool {
524 self.frontend_client_pool.deref()
525 }
526
527 pub fn cluster_id(&self) -> &ClusterId {
528 &self.cluster_id
529 }
530
531 pub fn event_log_manager_ref(&self) -> EventLogManagerRef {
532 self.event_log_manager.clone()
533 }
534
535 pub fn await_tree_reg(&self) -> &await_tree::Registry {
536 &self.await_tree_reg
537 }
538
539 pub(crate) fn shared_actor_infos(&self) -> &SharedActorInfos {
540 &self.shared_actor_info
541 }
542
543 pub fn cdc_table_backfill_tracker(&self) -> CdcTableBackfillTrackerRef {
544 self.cdc_table_backfill_tracker.clone()
545 }
546}
547
548#[cfg(any(test, feature = "test"))]
549impl MetaSrvEnv {
550 pub async fn for_test() -> Self {
552 Self::for_test_opts(MetaOpts::test(false), |_| ()).await
553 }
554
555 pub async fn for_test_opts(
556 opts: MetaOpts,
557 on_test_system_params: impl FnOnce(&mut risingwave_pb::meta::PbSystemParams),
558 ) -> Self {
559 let mut system_params = risingwave_common::system_param::system_params_for_test();
560 on_test_system_params(&mut system_params);
561 Self::new(
562 opts,
563 system_params,
564 Default::default(),
565 SqlMetaStore::for_test().await,
566 )
567 .await
568 .unwrap()
569 }
570}