use std::ops::Deref;
use std::path::PathBuf;
use std::sync::Arc;
use risingwave_common::config::{CompactionConfig, DefaultParallelism, ObjectStoreConfig};
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::{bail, system_param};
use risingwave_meta_model::prelude::Cluster;
use risingwave_meta_model_migration::{MigrationStatus, Migrator, MigratorTrait};
use risingwave_pb::meta::SystemParams;
use risingwave_rpc_client::{
FrontendClientPool, FrontendClientPoolRef, StreamClientPool, StreamClientPoolRef,
};
use sea_orm::EntityTrait;
use crate::controller::id::{
IdGeneratorManager as SqlIdGeneratorManager, IdGeneratorManagerRef as SqlIdGeneratorManagerRef,
};
use crate::controller::session_params::{SessionParamsController, SessionParamsControllerRef};
use crate::controller::system_param::{SystemParamsController, SystemParamsControllerRef};
use crate::controller::SqlMetaStore;
use crate::hummock::sequence::SequenceGenerator;
use crate::manager::event_log::{start_event_log_manager, EventLogManagerRef};
use crate::manager::{IdleManager, IdleManagerRef, NotificationManager, NotificationManagerRef};
use crate::model::ClusterId;
use crate::MetaResult;
/// Shared environment handed around the meta service.
///
/// It bundles the meta store handle together with the managers, controllers and
/// client pools that `MetaSrvEnv::new` constructs. The type is `Clone`; cloning
/// copies every field.
#[derive(Clone)]
pub struct MetaSrvEnv {
/// Id generator manager, created from the meta store connection in `new`.
id_gen_manager_impl: SqlIdGeneratorManagerRef,
/// System parameter controller, created from the initial `SystemParams` passed to `new`.
system_param_manager_impl: SystemParamsControllerRef,
/// Session parameter controller, created from the initial `SessionConfig` passed to `new`.
session_param_manager_impl: SessionParamsControllerRef,
/// Handle to the SQL meta store this environment was built on.
meta_store_impl: SqlMetaStore,
/// Notification manager, created from a clone of the meta store in `new`.
notification_manager: NotificationManagerRef,
/// Pool of stream clients.
stream_client_pool: StreamClientPoolRef,
/// Pool of frontend clients.
frontend_client_pool: FrontendClientPoolRef,
/// Idle manager, constructed from `opts.max_idle_ms`.
idle_manager: IdleManagerRef,
/// Event log manager, started from `opts.event_log_enabled` and
/// `opts.event_log_channel_max_size`.
event_log_manager: EventLogManagerRef,
/// Cluster id, read from the `Cluster` entity in the meta store during `new`.
cluster_id: ClusterId,
/// Sequence generator created from a clone of the meta store connection.
pub hummock_seq: Arc<SequenceGenerator>,
/// The options this environment was built with.
pub opts: Arc<MetaOpts>,
}
/// Options of the meta service, stored in `MetaSrvEnv::opts`.
///
/// The struct derives `serde::Serialize`, so every field below is part of its
/// serialized form.
///
/// NOTE(review): fields suffixed `_sec` / `_secs` / `_ms` presumably hold durations in
/// seconds / milliseconds — confirm at the use sites, which are outside this file section.
#[derive(Clone, serde::Serialize)]
pub struct MetaOpts {
pub enable_recovery: bool,
pub disable_automatic_parallelism_control: bool,
pub parallelism_control_batch_size: usize,
pub parallelism_control_trigger_period_sec: u64,
pub parallelism_control_trigger_first_delay_sec: u64,
pub in_flight_barrier_nums: usize,
/// Passed to `IdleManager::new` when `MetaSrvEnv::new` builds the idle manager.
pub max_idle_ms: u64,
pub compaction_deterministic_test: bool,
pub default_parallelism: DefaultParallelism,
pub vacuum_interval_sec: u64,
pub vacuum_spin_interval_ms: u64,
pub hummock_version_checkpoint_interval_sec: u64,
pub enable_hummock_data_archive: bool,
pub hummock_time_travel_snapshot_interval: u64,
pub hummock_time_travel_sst_info_fetch_batch_size: usize,
pub min_delta_log_num_for_hummock_version_checkpoint: u64,
pub min_sst_retention_time_sec: u64,
pub full_gc_interval_sec: u64,
pub full_gc_object_limit: u64,
pub gc_history_retention_time_sec: u64,
pub max_inflight_time_travel_query: u64,
pub enable_committed_sst_sanity_check: bool,
pub periodic_compaction_interval_sec: u64,
pub node_num_monitor_interval_sec: u64,
pub prometheus_endpoint: Option<String>,
pub prometheus_selector: Option<String>,
pub vpc_id: Option<String>,
pub security_group_id: Option<String>,
pub privatelink_endpoint_default_tags: Option<Vec<(String, String)>>,
pub periodic_space_reclaim_compaction_interval_sec: u64,
pub telemetry_enabled: bool,
pub periodic_ttl_reclaim_compaction_interval_sec: u64,
pub periodic_tombstone_reclaim_compaction_interval_sec: u64,
pub periodic_scheduling_compaction_group_split_interval_sec: u64,
pub do_not_config_object_storage_lifecycle: bool,
pub partition_vnode_count: u32,
pub table_high_write_throughput_threshold: u64,
pub table_low_write_throughput_threshold: u64,
pub compaction_task_max_heartbeat_interval_secs: u64,
pub compaction_task_max_progress_interval_secs: u64,
pub compaction_config: Option<CompactionConfig>,
pub hybrid_partition_node_count: u32,
/// First argument to `start_event_log_manager` in `MetaSrvEnv::new`.
pub event_log_enabled: bool,
/// Second argument to `start_event_log_manager` in `MetaSrvEnv::new`.
pub event_log_channel_max_size: u32,
pub advertise_addr: String,
pub cached_traces_num: u32,
pub cached_traces_memory_limit_bytes: usize,
pub enable_trivial_move: bool,
pub enable_check_task_level_overlap: bool,
pub enable_dropped_column_reclaim: bool,
pub split_group_size_ratio: f64,
pub table_stat_high_write_throughput_ratio_for_split: f64,
pub table_stat_low_write_throughput_ratio_for_merge: f64,
// NOTE: "throuput" is misspelled in the two field names below. They are public (and
// serialized under these names), so they are kept as-is for compatibility.
pub table_stat_throuput_window_seconds_for_split: usize,
pub table_stat_throuput_window_seconds_for_merge: usize,
pub object_store_config: ObjectStoreConfig,
pub max_trivial_move_task_count_per_loop: usize,
pub max_get_task_probe_times: usize,
pub compact_task_table_size_partition_threshold_low: u64,
pub compact_task_table_size_partition_threshold_high: u64,
pub periodic_scheduling_compaction_group_merge_interval_sec: u64,
// NOTE(review): this struct derives `Serialize` and no `skip`/custom serializer is
// attached here, so the key bytes end up in any serialized `MetaOpts` — confirm that
// serialized options are never logged or exposed.
pub secret_store_private_key: Option<Vec<u8>>,
pub temp_secret_file_dir: String,
pub actor_cnt_per_worker_parallelism_hard_limit: usize,
pub actor_cnt_per_worker_parallelism_soft_limit: usize,
/// When this is `Some`, `MetaSrvEnv::new` fails if the initial `license_key` system
/// parameter differs from its default, i.e. the two may not be set at the same time.
pub license_key_path: Option<PathBuf>,
}
impl MetaOpts {
    /// Builds a fixed set of options for tests.
    ///
    /// Only `enable_recovery` is chosen by the caller; every other field is a
    /// hard-coded value. Fields are listed in struct declaration order.
    pub fn test(enable_recovery: bool) -> Self {
        // Named units for the literals that repeat below.
        const MIB: u64 = 1024 * 1024;
        const WEEK_SEC: u64 = 3600 * 24 * 7;

        // Fixed, well-formed hex string, so decoding cannot fail.
        let test_private_key = hex::decode("0123456789abcdef0123456789abcdef").unwrap();

        Self {
            enable_recovery,
            disable_automatic_parallelism_control: false,
            parallelism_control_batch_size: 1,
            parallelism_control_trigger_period_sec: 10,
            parallelism_control_trigger_first_delay_sec: 30,
            in_flight_barrier_nums: 40,
            max_idle_ms: 0,
            compaction_deterministic_test: false,
            default_parallelism: DefaultParallelism::Full,
            vacuum_interval_sec: 30,
            vacuum_spin_interval_ms: 0,
            hummock_version_checkpoint_interval_sec: 30,
            enable_hummock_data_archive: false,
            hummock_time_travel_snapshot_interval: 0,
            hummock_time_travel_sst_info_fetch_batch_size: 10_000,
            min_delta_log_num_for_hummock_version_checkpoint: 1,
            min_sst_retention_time_sec: WEEK_SEC,
            full_gc_interval_sec: WEEK_SEC,
            full_gc_object_limit: 100_000,
            gc_history_retention_time_sec: WEEK_SEC,
            max_inflight_time_travel_query: 1000,
            enable_committed_sst_sanity_check: false,
            periodic_compaction_interval_sec: 60,
            node_num_monitor_interval_sec: 10,
            prometheus_endpoint: None,
            prometheus_selector: None,
            vpc_id: None,
            security_group_id: None,
            privatelink_endpoint_default_tags: None,
            periodic_space_reclaim_compaction_interval_sec: 60,
            telemetry_enabled: false,
            periodic_ttl_reclaim_compaction_interval_sec: 60,
            periodic_tombstone_reclaim_compaction_interval_sec: 60,
            periodic_scheduling_compaction_group_split_interval_sec: 60,
            do_not_config_object_storage_lifecycle: true,
            partition_vnode_count: 32,
            table_high_write_throughput_threshold: 128 * MIB,
            table_low_write_throughput_threshold: 64 * MIB,
            compaction_task_max_heartbeat_interval_secs: 0,
            compaction_task_max_progress_interval_secs: 1,
            compaction_config: None,
            hybrid_partition_node_count: 4,
            event_log_enabled: false,
            event_log_channel_max_size: 1,
            advertise_addr: String::new(),
            cached_traces_num: 1,
            cached_traces_memory_limit_bytes: usize::MAX,
            enable_trivial_move: true,
            enable_check_task_level_overlap: true,
            enable_dropped_column_reclaim: false,
            split_group_size_ratio: 0.9,
            table_stat_high_write_throughput_ratio_for_split: 0.5,
            table_stat_low_write_throughput_ratio_for_merge: 0.7,
            table_stat_throuput_window_seconds_for_split: 60,
            table_stat_throuput_window_seconds_for_merge: 240,
            object_store_config: ObjectStoreConfig::default(),
            max_trivial_move_task_count_per_loop: 256,
            max_get_task_probe_times: 5,
            compact_task_table_size_partition_threshold_low: 128 * MIB,
            compact_task_table_size_partition_threshold_high: 512 * MIB,
            periodic_scheduling_compaction_group_merge_interval_sec: 10 * 60,
            secret_store_private_key: Some(test_private_key),
            temp_secret_file_dir: String::from("./secrets"),
            actor_cnt_per_worker_parallelism_hard_limit: usize::MAX,
            actor_cnt_per_worker_parallelism_soft_limit: usize::MAX,
            license_key_path: None,
        }
    }
}
/// Reports whether the SQL-backed cluster is being launched for the first time.
///
/// Returns `Ok(false)` as soon as the migration named `m20230908_072257_init` is
/// found with status `Applied` among the applied migrations, and `Ok(true)`
/// otherwise.
///
/// # Errors
///
/// Propagates any error from fetching the applied migrations.
pub async fn is_first_launch_for_sql_backend_cluster(
    sql_meta_store: &SqlMetaStore,
) -> MetaResult<bool> {
    let applied = Migrator::get_applied_migrations(&sql_meta_store.conn).await?;
    // `any` stops at the first match, like the early return it replaces.
    let init_already_applied = applied.into_iter().any(|m| {
        m.name() == "m20230908_072257_init" && m.status() == MigrationStatus::Applied
    });
    Ok(!init_already_applied)
}
impl MetaSrvEnv {
    /// Builds the environment on top of `meta_store_impl`.
    ///
    /// Steps, in order: create the idle manager, client pools and event log manager
    /// from `opts`; reject conflicting license settings; record whether this is the
    /// cluster's first launch; run the meta store migrations; create the notification
    /// manager; read the cluster id; create the system and session parameter
    /// controllers; create the id generator manager and the sequence generator.
    ///
    /// `init_system_params.use_new_object_prefix_strategy` is overwritten with
    /// `Some(first_launch)` before the system parameter controller is created.
    ///
    /// # Errors
    ///
    /// Fails if `opts.license_key_path` is set while the `license_key` system
    /// parameter differs from its default, or if any fallible step above fails.
    ///
    /// # Panics
    ///
    /// Panics if the migrations cannot be applied, or if the meta store holds no
    /// `Cluster` row afterwards.
    pub async fn new(
        opts: MetaOpts,
        mut init_system_params: SystemParams,
        init_session_config: SessionConfig,
        meta_store_impl: SqlMetaStore,
    ) -> MetaResult<Self> {
        let idle_manager = Arc::new(IdleManager::new(opts.max_idle_ms));
        let stream_client_pool = Arc::new(StreamClientPool::new(1));
        let frontend_client_pool = Arc::new(FrontendClientPool::new(1));
        let event_log_manager = Arc::new(start_event_log_manager(
            opts.event_log_enabled,
            opts.event_log_channel_max_size,
        ));

        // A license key may come from a file path or from the system parameter,
        // but not from both.
        if opts.license_key_path.is_some()
            && init_system_params.license_key
                != system_param::default::license_key_opt().map(Into::into)
        {
            bail!(
                "argument `--license-key-path` (or env var `RW_LICENSE_KEY_PATH`) and \
                 system parameter `license_key` (or env var `RW_LICENSE_KEY`) may not \
                 be set at the same time"
            );
        }

        // Must be checked before the migrations run, since they apply the init migration.
        let cluster_first_launch =
            is_first_launch_for_sql_backend_cluster(&meta_store_impl).await?;

        Migrator::up(&meta_store_impl.conn, None)
            .await
            .expect("Failed to upgrade models in meta store");

        let notification_manager =
            Arc::new(NotificationManager::new(meta_store_impl.clone()).await);

        let cluster_row = Cluster::find().one(&meta_store_impl.conn).await?;
        let cluster_id = cluster_row
            .map(|row| row.cluster_id.to_string().into())
            .unwrap();

        init_system_params.use_new_object_prefix_strategy = Some(cluster_first_launch);

        let system_param_manager_impl = Arc::new(
            SystemParamsController::new(
                meta_store_impl.clone(),
                Arc::clone(&notification_manager),
                init_system_params,
            )
            .await?,
        );

        let session_param_manager_impl = Arc::new(
            SessionParamsController::new(
                meta_store_impl.clone(),
                Arc::clone(&notification_manager),
                init_session_config,
            )
            .await?,
        );

        let id_gen_manager_impl =
            Arc::new(SqlIdGeneratorManager::new(&meta_store_impl.conn).await?);
        let hummock_seq = Arc::new(SequenceGenerator::new(meta_store_impl.conn.clone()));

        Ok(Self {
            id_gen_manager_impl,
            system_param_manager_impl,
            session_param_manager_impl,
            meta_store_impl: meta_store_impl.clone(),
            notification_manager,
            stream_client_pool,
            frontend_client_pool,
            idle_manager,
            event_log_manager,
            cluster_id,
            hummock_seq,
            opts: Arc::new(opts),
        })
    }

    /// Returns a clone of the meta store handle.
    pub fn meta_store(&self) -> SqlMetaStore {
        self.meta_store_impl.clone()
    }

    /// Borrows the meta store handle.
    pub fn meta_store_ref(&self) -> &SqlMetaStore {
        &self.meta_store_impl
    }

    /// Borrows the id generator manager handle.
    pub fn id_gen_manager(&self) -> &SqlIdGeneratorManagerRef {
        &self.id_gen_manager_impl
    }

    /// Returns a new shared handle to the notification manager.
    pub fn notification_manager_ref(&self) -> NotificationManagerRef {
        Arc::clone(&self.notification_manager)
    }

    /// Borrows the notification manager.
    pub fn notification_manager(&self) -> &NotificationManager {
        self.notification_manager.deref()
    }

    /// Returns a new shared handle to the idle manager.
    pub fn idle_manager_ref(&self) -> IdleManagerRef {
        Arc::clone(&self.idle_manager)
    }

    /// Borrows the idle manager.
    pub fn idle_manager(&self) -> &IdleManager {
        self.idle_manager.deref()
    }

    /// Fetches a reader for the system parameters from the system parameter controller.
    pub async fn system_params_reader(&self) -> SystemParamsReader {
        let controller = &self.system_param_manager_impl;
        controller.get_params().await
    }

    /// Returns a new shared handle to the system parameter controller.
    pub fn system_params_manager_impl_ref(&self) -> SystemParamsControllerRef {
        Arc::clone(&self.system_param_manager_impl)
    }

    /// Returns a new shared handle to the session parameter controller.
    pub fn session_params_manager_impl_ref(&self) -> SessionParamsControllerRef {
        Arc::clone(&self.session_param_manager_impl)
    }

    /// Returns a new shared handle to the stream client pool.
    pub fn stream_client_pool_ref(&self) -> StreamClientPoolRef {
        Arc::clone(&self.stream_client_pool)
    }

    /// Borrows the stream client pool.
    pub fn stream_client_pool(&self) -> &StreamClientPool {
        self.stream_client_pool.deref()
    }

    /// Borrows the frontend client pool.
    pub fn frontend_client_pool(&self) -> &FrontendClientPool {
        self.frontend_client_pool.deref()
    }

    /// Borrows the cluster id.
    pub fn cluster_id(&self) -> &ClusterId {
        &self.cluster_id
    }

    /// Returns a new shared handle to the event log manager.
    pub fn event_log_manager_ref(&self) -> EventLogManagerRef {
        Arc::clone(&self.event_log_manager)
    }
}
#[cfg(any(test, feature = "test"))]
impl MetaSrvEnv {
    /// Builds a test environment using `MetaOpts::test(false)`.
    pub async fn for_test() -> Self {
        let opts = MetaOpts::test(false);
        Self::for_test_opts(opts).await
    }

    /// Builds a test environment from the given options, the test system
    /// parameters, a default session config and a test meta store.
    ///
    /// # Panics
    ///
    /// Panics if `MetaSrvEnv::new` returns an error.
    pub async fn for_test_opts(opts: MetaOpts) -> Self {
        let system_params = risingwave_common::system_param::system_params_for_test();
        let session_config = Default::default();
        let meta_store = SqlMetaStore::for_test().await;

        let env = Self::new(opts, system_params, session_config, meta_store).await;
        env.unwrap()
    }
}