1use std::ops::Deref;
16use std::path::PathBuf;
17use std::sync::Arc;
18
19use anyhow::Context;
20use risingwave_common::config::{
21 CompactionConfig, DefaultParallelism, ObjectStoreConfig, RpcClientConfig,
22};
23use risingwave_common::session_config::SessionConfig;
24use risingwave_common::system_param::reader::SystemParamsReader;
25use risingwave_common::{bail, system_param};
26use risingwave_meta_model::prelude::Cluster;
27use risingwave_pb::meta::SystemParams;
28use risingwave_rpc_client::{
29 FrontendClientPool, FrontendClientPoolRef, StreamClientPool, StreamClientPoolRef,
30};
31use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
32use sea_orm::EntityTrait;
33
34use crate::MetaResult;
35use crate::barrier::SharedActorInfos;
36use crate::controller::SqlMetaStore;
37use crate::controller::id::{
38 IdGeneratorManager as SqlIdGeneratorManager, IdGeneratorManagerRef as SqlIdGeneratorManagerRef,
39};
40use crate::controller::session_params::{SessionParamsController, SessionParamsControllerRef};
41use crate::controller::system_param::{SystemParamsController, SystemParamsControllerRef};
42use crate::hummock::sequence::SequenceGenerator;
43use crate::manager::event_log::{EventLogManagerRef, start_event_log_manager};
44use crate::manager::{IdleManager, IdleManagerRef, NotificationManager, NotificationManagerRef};
45use crate::model::ClusterId;
46
/// Bundle of the handles that [`MetaSrvEnv::new`] wires together: the SQL meta
/// store, the controllers/managers built on top of it, the RPC client pools and
/// the startup options.
///
/// The struct derives `Clone`. Several fields are `Arc`s created in `new`.
/// NOTE(review): cloning is presumably cheap for every field — confirm for
/// `SqlMetaStore`, `SharedActorInfos`, `ClusterId` and `await_tree::Registry`.
#[derive(Clone)]
pub struct MetaSrvEnv {
    /// ID generator manager; `new` creates it from the meta store connection.
    id_gen_manager_impl: SqlIdGeneratorManagerRef,

    /// System parameter controller; `new` seeds it with the initial `SystemParams`.
    system_param_manager_impl: SystemParamsControllerRef,

    /// Session parameter controller; `new` seeds it with the initial `SessionConfig`.
    session_param_manager_impl: SessionParamsControllerRef,

    /// Handle to the SQL-backed meta store passed into `new`.
    meta_store_impl: SqlMetaStore,

    /// Notification manager; `new` creates it from a clone of the meta store.
    notification_manager: NotificationManagerRef,

    /// Shared actor infos; `new` creates them from the notification manager.
    shared_actor_info: SharedActorInfos,

    /// Stream client pool; `new` builds it from `opts.stream_client_config`.
    stream_client_pool: StreamClientPoolRef,

    /// Frontend client pool; `new` builds it from `opts.frontend_client_config`.
    frontend_client_pool: FrontendClientPoolRef,

    /// Idle manager; `new` builds it from `opts.max_idle_ms`.
    idle_manager: IdleManagerRef,

    /// Event log manager; `new` starts it with `opts.event_log_enabled` and
    /// `opts.event_log_channel_max_size`.
    event_log_manager: EventLogManagerRef,

    /// Cluster id read from the `Cluster` entity in the meta store during `new`.
    cluster_id: ClusterId,

    /// Sequence generator created from a clone of the meta store connection.
    pub hummock_seq: Arc<SequenceGenerator>,

    /// Await-tree registry, created in `new` with its default configuration.
    await_tree_reg: await_tree::Registry,

    /// The options this environment was created with.
    pub opts: Arc<MetaOpts>,
}
90
/// Options for the meta service, held by `MetaSrvEnv` as `Arc<MetaOpts>`.
///
/// The struct derives `serde::Serialize`; no field visible here carries a
/// `#[serde(skip)]`, so every field is part of the serialized form.
/// `MetaOpts::test` builds a fixed instance for tests.
///
/// Only a handful of fields are consumed in this file (by `MetaSrvEnv::new`);
/// those are documented individually. The remaining fields are merely grouped
/// by name — their exact semantics live with their consumers and are not
/// restated here. Suffixes such as `_sec`, `_secs`, `_ms`, `_millis` and
/// `_bytes` presumably denote units — confirm against the code reading each field.
#[derive(Clone, serde::Serialize)]
pub struct MetaOpts {
    // --- Recovery, parallelism control and barriers ---
    pub enable_recovery: bool,
    pub disable_automatic_parallelism_control: bool,
    pub parallelism_control_batch_size: usize,
    pub parallelism_control_trigger_period_sec: u64,
    pub parallelism_control_trigger_first_delay_sec: u64,
    pub in_flight_barrier_nums: usize,
    /// Passed to `IdleManager::new` by `MetaSrvEnv::new`.
    pub max_idle_ms: u64,
    pub compaction_deterministic_test: bool,
    pub default_parallelism: DefaultParallelism,

    // --- Vacuum, version checkpointing, time travel and GC ---
    pub vacuum_interval_sec: u64,
    pub vacuum_spin_interval_ms: u64,
    pub time_travel_vacuum_interval_sec: u64,
    pub hummock_version_checkpoint_interval_sec: u64,
    pub enable_hummock_data_archive: bool,
    pub hummock_time_travel_snapshot_interval: u64,
    pub hummock_time_travel_sst_info_fetch_batch_size: usize,
    pub hummock_time_travel_sst_info_insert_batch_size: usize,
    pub hummock_time_travel_epoch_version_insert_batch_size: usize,
    pub hummock_gc_history_insert_batch_size: usize,
    pub hummock_time_travel_filter_out_objects_batch_size: usize,
    pub hummock_time_travel_filter_out_objects_v1: bool,
    pub hummock_time_travel_filter_out_objects_list_version_batch_size: usize,
    pub hummock_time_travel_filter_out_objects_list_delta_batch_size: usize,
    pub min_delta_log_num_for_hummock_version_checkpoint: u64,
    pub min_sst_retention_time_sec: u64,
    pub full_gc_interval_sec: u64,
    pub full_gc_object_limit: u64,
    pub gc_history_retention_time_sec: u64,
    pub max_inflight_time_travel_query: u64,
    pub enable_committed_sst_sanity_check: bool,
    pub periodic_compaction_interval_sec: u64,
    pub node_num_monitor_interval_sec: u64,
    pub protect_drop_table_with_incoming_sink: bool,

    // --- Monitoring and cloud-environment settings (all optional) ---
    pub prometheus_endpoint: Option<String>,

    pub prometheus_selector: Option<String>,

    pub vpc_id: Option<String>,

    pub security_group_id: Option<String>,

    pub privatelink_endpoint_default_tags: Option<Vec<(String, String)>>,

    // --- Periodic compaction scheduling and telemetry ---
    pub periodic_space_reclaim_compaction_interval_sec: u64,

    pub telemetry_enabled: bool,
    pub periodic_ttl_reclaim_compaction_interval_sec: u64,

    pub periodic_tombstone_reclaim_compaction_interval_sec: u64,

    pub periodic_scheduling_compaction_group_split_interval_sec: u64,

    pub do_not_config_object_storage_lifecycle: bool,

    pub partition_vnode_count: u32,

    pub table_high_write_throughput_threshold: u64,
    pub table_low_write_throughput_threshold: u64,

    pub compaction_task_max_heartbeat_interval_secs: u64,
    pub compaction_task_max_progress_interval_secs: u64,
    pub compaction_config: Option<CompactionConfig>,

    pub hybrid_partition_node_count: u32,

    // --- Event log and tracing ---
    /// Passed to `start_event_log_manager` by `MetaSrvEnv::new`.
    pub event_log_enabled: bool,
    /// Passed to `start_event_log_manager` by `MetaSrvEnv::new`.
    pub event_log_channel_max_size: u32,
    pub advertise_addr: String,
    pub cached_traces_num: u32,
    pub cached_traces_memory_limit_bytes: usize,

    // --- Compaction behaviour and compaction-group split/merge tuning ---
    pub enable_trivial_move: bool,

    pub enable_check_task_level_overlap: bool,
    pub enable_dropped_column_reclaim: bool,

    pub split_group_size_ratio: f64,

    pub table_stat_high_write_throughput_ratio_for_split: f64,

    pub table_stat_low_write_throughput_ratio_for_merge: f64,

    // NOTE: the `throuput` spelling is part of the public field name (and of
    // the serialized key); renaming it would break callers.
    pub table_stat_throuput_window_seconds_for_split: usize,

    pub table_stat_throuput_window_seconds_for_merge: usize,

    pub object_store_config: ObjectStoreConfig,

    pub max_trivial_move_task_count_per_loop: usize,

    pub max_get_task_probe_times: usize,

    pub compact_task_table_size_partition_threshold_low: u64,
    pub compact_task_table_size_partition_threshold_high: u64,

    pub periodic_scheduling_compaction_group_merge_interval_sec: u64,

    pub compaction_group_merge_dimension_threshold: f64,

    // --- Secrets ---
    // NOTE(review): this key material is included in the derived `Serialize`
    // output like any other field — confirm that is intended wherever
    // `MetaOpts` is serialized.
    pub secret_store_private_key: Option<Vec<u8>>,
    pub temp_secret_file_dir: String,

    // --- Actor count limits ---
    pub actor_cnt_per_worker_parallelism_hard_limit: usize,
    pub actor_cnt_per_worker_parallelism_soft_limit: usize,

    /// `MetaSrvEnv::new` refuses to start when this is `Some` while the
    /// `license_key` system parameter differs from its default.
    pub license_key_path: Option<PathBuf>,

    // --- RPC client configuration ---
    pub compute_client_config: RpcClientConfig,
    /// Used by `MetaSrvEnv::new` to build the stream client pool.
    pub stream_client_config: RpcClientConfig,
    /// Used by `MetaSrvEnv::new` to build the frontend client pool.
    pub frontend_client_config: RpcClientConfig,
    pub redact_sql_option_keywords: RedactSqlOptionKeywordsRef,

    // --- CDC table split initialisation ---
    pub cdc_table_split_init_sleep_interval_splits: u64,
    pub cdc_table_split_init_sleep_duration_millis: u64,
    pub cdc_table_split_init_insert_batch_size: u64,
}
284
285impl MetaOpts {
286 pub fn test(enable_recovery: bool) -> Self {
288 Self {
289 enable_recovery,
290 disable_automatic_parallelism_control: false,
291 parallelism_control_batch_size: 1,
292 parallelism_control_trigger_period_sec: 10,
293 parallelism_control_trigger_first_delay_sec: 30,
294 in_flight_barrier_nums: 40,
295 max_idle_ms: 0,
296 compaction_deterministic_test: false,
297 default_parallelism: DefaultParallelism::Full,
298 vacuum_interval_sec: 30,
299 time_travel_vacuum_interval_sec: 30,
300 vacuum_spin_interval_ms: 0,
301 hummock_version_checkpoint_interval_sec: 30,
302 enable_hummock_data_archive: false,
303 hummock_time_travel_snapshot_interval: 0,
304 hummock_time_travel_sst_info_fetch_batch_size: 10_000,
305 hummock_time_travel_sst_info_insert_batch_size: 10,
306 hummock_time_travel_epoch_version_insert_batch_size: 1000,
307 hummock_gc_history_insert_batch_size: 1000,
308 hummock_time_travel_filter_out_objects_batch_size: 1000,
309 hummock_time_travel_filter_out_objects_v1: false,
310 hummock_time_travel_filter_out_objects_list_version_batch_size: 10,
311 hummock_time_travel_filter_out_objects_list_delta_batch_size: 1000,
312 min_delta_log_num_for_hummock_version_checkpoint: 1,
313 min_sst_retention_time_sec: 3600 * 24 * 7,
314 full_gc_interval_sec: 3600 * 24 * 7,
315 full_gc_object_limit: 100_000,
316 gc_history_retention_time_sec: 3600 * 24 * 7,
317 max_inflight_time_travel_query: 1000,
318 enable_committed_sst_sanity_check: false,
319 periodic_compaction_interval_sec: 60,
320 node_num_monitor_interval_sec: 10,
321 protect_drop_table_with_incoming_sink: false,
322 prometheus_endpoint: None,
323 prometheus_selector: None,
324 vpc_id: None,
325 security_group_id: None,
326 privatelink_endpoint_default_tags: None,
327 periodic_space_reclaim_compaction_interval_sec: 60,
328 telemetry_enabled: false,
329 periodic_ttl_reclaim_compaction_interval_sec: 60,
330 periodic_tombstone_reclaim_compaction_interval_sec: 60,
331 periodic_scheduling_compaction_group_split_interval_sec: 60,
332 compact_task_table_size_partition_threshold_low: 128 * 1024 * 1024,
333 compact_task_table_size_partition_threshold_high: 512 * 1024 * 1024,
334 table_high_write_throughput_threshold: 128 * 1024 * 1024,
335 table_low_write_throughput_threshold: 64 * 1024 * 1024,
336 do_not_config_object_storage_lifecycle: true,
337 partition_vnode_count: 32,
338 compaction_task_max_heartbeat_interval_secs: 0,
339 compaction_task_max_progress_interval_secs: 1,
340 compaction_config: None,
341 hybrid_partition_node_count: 4,
342 event_log_enabled: false,
343 event_log_channel_max_size: 1,
344 advertise_addr: "".to_owned(),
345 cached_traces_num: 1,
346 cached_traces_memory_limit_bytes: usize::MAX,
347 enable_trivial_move: true,
348 enable_check_task_level_overlap: true,
349 enable_dropped_column_reclaim: false,
350 object_store_config: ObjectStoreConfig::default(),
351 max_trivial_move_task_count_per_loop: 256,
352 max_get_task_probe_times: 5,
353 secret_store_private_key: Some(
354 hex::decode("0123456789abcdef0123456789abcdef").unwrap(),
355 ),
356 temp_secret_file_dir: "./secrets".to_owned(),
357 actor_cnt_per_worker_parallelism_hard_limit: usize::MAX,
358 actor_cnt_per_worker_parallelism_soft_limit: usize::MAX,
359 split_group_size_ratio: 0.9,
360 table_stat_high_write_throughput_ratio_for_split: 0.5,
361 table_stat_low_write_throughput_ratio_for_merge: 0.7,
362 table_stat_throuput_window_seconds_for_split: 60,
363 table_stat_throuput_window_seconds_for_merge: 240,
364 periodic_scheduling_compaction_group_merge_interval_sec: 60 * 10,
365 compaction_group_merge_dimension_threshold: 1.2,
366 license_key_path: None,
367 compute_client_config: RpcClientConfig::default(),
368 stream_client_config: RpcClientConfig::default(),
369 frontend_client_config: RpcClientConfig::default(),
370 redact_sql_option_keywords: Arc::new(Default::default()),
371 cdc_table_split_init_sleep_interval_splits: 1000,
372 cdc_table_split_init_sleep_duration_millis: 10,
373 cdc_table_split_init_insert_batch_size: 1000,
374 }
375 }
376}
377
378impl MetaSrvEnv {
379 pub async fn new(
380 opts: MetaOpts,
381 mut init_system_params: SystemParams,
382 init_session_config: SessionConfig,
383 meta_store_impl: SqlMetaStore,
384 ) -> MetaResult<Self> {
385 let idle_manager = Arc::new(IdleManager::new(opts.max_idle_ms));
386 let stream_client_pool =
387 Arc::new(StreamClientPool::new(1, opts.stream_client_config.clone())); let frontend_client_pool = Arc::new(FrontendClientPool::new(
389 1,
390 opts.frontend_client_config.clone(),
391 ));
392 let event_log_manager = Arc::new(start_event_log_manager(
393 opts.event_log_enabled,
394 opts.event_log_channel_max_size,
395 ));
396
397 if opts.license_key_path.is_some()
400 && init_system_params.license_key
401 != system_param::default::license_key_opt().map(Into::into)
402 {
403 bail!(
404 "argument `--license-key-path` (or env var `RW_LICENSE_KEY_PATH`) and \
405 system parameter `license_key` (or env var `RW_LICENSE_KEY`) may not \
406 be set at the same time"
407 );
408 }
409
410 let cluster_first_launch = meta_store_impl.up().await.context(
411 "Failed to initialize the meta store, \
412 this may happen if there's existing metadata incompatible with the current version of RisingWave, \
413 e.g., downgrading from a newer release or a nightly build to an older one. \
414 For a single-node deployment, you may want to reset all data by deleting the data directory, \
415 typically located at `~/.risingwave`.",
416 )?;
417
418 let notification_manager =
419 Arc::new(NotificationManager::new(meta_store_impl.clone()).await);
420 let cluster_id = Cluster::find()
421 .one(&meta_store_impl.conn)
422 .await?
423 .map(|c| c.cluster_id.to_string().into())
424 .unwrap();
425
426 init_system_params.use_new_object_prefix_strategy = Some(cluster_first_launch);
432
433 let system_param_controller = Arc::new(
434 SystemParamsController::new(
435 meta_store_impl.clone(),
436 notification_manager.clone(),
437 init_system_params,
438 )
439 .await?,
440 );
441 let session_param_controller = Arc::new(
442 SessionParamsController::new(
443 meta_store_impl.clone(),
444 notification_manager.clone(),
445 init_session_config,
446 )
447 .await?,
448 );
449 Ok(Self {
450 id_gen_manager_impl: Arc::new(SqlIdGeneratorManager::new(&meta_store_impl.conn).await?),
451 system_param_manager_impl: system_param_controller,
452 session_param_manager_impl: session_param_controller,
453 meta_store_impl: meta_store_impl.clone(),
454 shared_actor_info: SharedActorInfos::new(notification_manager.clone()),
455 notification_manager,
456 stream_client_pool,
457 frontend_client_pool,
458 idle_manager,
459 event_log_manager,
460 cluster_id,
461 hummock_seq: Arc::new(SequenceGenerator::new(meta_store_impl.conn.clone())),
462 opts: opts.into(),
463 await_tree_reg: await_tree::Registry::new(Default::default()),
465 })
466 }
467
468 pub fn meta_store(&self) -> SqlMetaStore {
469 self.meta_store_impl.clone()
470 }
471
472 pub fn meta_store_ref(&self) -> &SqlMetaStore {
473 &self.meta_store_impl
474 }
475
476 pub fn id_gen_manager(&self) -> &SqlIdGeneratorManagerRef {
477 &self.id_gen_manager_impl
478 }
479
480 pub fn notification_manager_ref(&self) -> NotificationManagerRef {
481 self.notification_manager.clone()
482 }
483
484 pub fn notification_manager(&self) -> &NotificationManager {
485 self.notification_manager.deref()
486 }
487
488 pub fn idle_manager_ref(&self) -> IdleManagerRef {
489 self.idle_manager.clone()
490 }
491
492 pub fn idle_manager(&self) -> &IdleManager {
493 self.idle_manager.deref()
494 }
495
496 pub async fn system_params_reader(&self) -> SystemParamsReader {
497 self.system_param_manager_impl.get_params().await
498 }
499
500 pub fn system_params_manager_impl_ref(&self) -> SystemParamsControllerRef {
501 self.system_param_manager_impl.clone()
502 }
503
504 pub fn session_params_manager_impl_ref(&self) -> SessionParamsControllerRef {
505 self.session_param_manager_impl.clone()
506 }
507
508 pub fn stream_client_pool_ref(&self) -> StreamClientPoolRef {
509 self.stream_client_pool.clone()
510 }
511
512 pub fn stream_client_pool(&self) -> &StreamClientPool {
513 self.stream_client_pool.deref()
514 }
515
516 pub fn frontend_client_pool(&self) -> &FrontendClientPool {
517 self.frontend_client_pool.deref()
518 }
519
520 pub fn cluster_id(&self) -> &ClusterId {
521 &self.cluster_id
522 }
523
524 pub fn event_log_manager_ref(&self) -> EventLogManagerRef {
525 self.event_log_manager.clone()
526 }
527
528 pub fn await_tree_reg(&self) -> &await_tree::Registry {
529 &self.await_tree_reg
530 }
531
532 pub(crate) fn shared_actor_infos(&self) -> &SharedActorInfos {
533 &self.shared_actor_info
534 }
535}
536
#[cfg(any(test, feature = "test"))]
impl MetaSrvEnv {
    /// Builds an environment for tests from `MetaOpts::test(false)`, leaving
    /// the test system parameters untouched.
    pub async fn for_test() -> Self {
        let default_test_opts = MetaOpts::test(false);
        Self::for_test_opts(default_test_opts, |_params| {}).await
    }

    /// Builds an environment for tests from the given `opts`.
    ///
    /// `on_test_system_params` is invoked once on the test system parameters
    /// so the caller can adjust them before the environment is created.
    ///
    /// # Panics
    ///
    /// Panics if [`MetaSrvEnv::new`] returns an error.
    pub async fn for_test_opts(
        opts: MetaOpts,
        on_test_system_params: impl FnOnce(&mut risingwave_pb::meta::PbSystemParams),
    ) -> Self {
        let mut params = risingwave_common::system_param::system_params_for_test();
        on_test_system_params(&mut params);

        let session_config = Default::default();
        let test_store = SqlMetaStore::for_test().await;

        let env = Self::new(opts, params, session_config, test_store).await;
        env.unwrap()
    }
}