risingwave_meta/manager/
env.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Deref;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicU32;

use anyhow::Context;
use risingwave_common::config::{
    CompactionConfig, DefaultParallelism, ObjectStoreConfig, RpcClientConfig,
};
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::{bail, system_param};
use risingwave_meta_model::prelude::Cluster;
use risingwave_pb::meta::SystemParams;
use risingwave_rpc_client::{
    FrontendClientPool, FrontendClientPoolRef, StreamClientPool, StreamClientPoolRef,
};
use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
use sea_orm::EntityTrait;

use crate::MetaResult;
use crate::barrier::SharedActorInfos;
use crate::barrier::cdc_progress::{CdcTableBackfillTracker, CdcTableBackfillTrackerRef};
use crate::controller::SqlMetaStore;
use crate::controller::id::{
    IdGeneratorManager as SqlIdGeneratorManager, IdGeneratorManagerRef as SqlIdGeneratorManagerRef,
};
use crate::controller::session_params::{SessionParamsController, SessionParamsControllerRef};
use crate::controller::system_param::{SystemParamsController, SystemParamsControllerRef};
use crate::hummock::sequence::SequenceGenerator;
use crate::manager::event_log::{EventLogManagerRef, start_event_log_manager};
use crate::manager::{IdleManager, IdleManagerRef, NotificationManager, NotificationManagerRef};
use crate::model::ClusterId;

/// [`MetaSrvEnv`] is the global environment in Meta service. The instance will be shared by all
/// kinds of managers inside Meta.
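///
/// # Examples
///
/// A minimal sketch (assuming the `test` feature, which provides `for_test` below) of how the
/// environment is typically created and shared: all long-lived state is behind `Arc`s, so
/// `Clone` is cheap.
///
/// ```ignore
/// let env = MetaSrvEnv::for_test().await;
/// // Hand a clone to each manager; they all observe the same shared state.
/// let env_for_barrier_manager = env.clone();
/// ```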
#[derive(Clone)]
pub struct MetaSrvEnv {
    /// id generator manager.
    id_gen_manager_impl: SqlIdGeneratorManagerRef,

    /// system param manager.
    system_param_manager_impl: SystemParamsControllerRef,

    /// session param manager.
    session_param_manager_impl: SessionParamsControllerRef,

    /// meta store.
    meta_store_impl: SqlMetaStore,

    /// notification manager.
    notification_manager: NotificationManagerRef,

    pub shared_actor_info: SharedActorInfos,

    /// stream client pool memoization.
    stream_client_pool: StreamClientPoolRef,

    /// rpc client pool for frontend nodes.
    frontend_client_pool: FrontendClientPoolRef,

    /// idle status manager.
    idle_manager: IdleManagerRef,

    event_log_manager: EventLogManagerRef,

    /// Unique identifier of the cluster.
    cluster_id: ClusterId,

    pub hummock_seq: Arc<SequenceGenerator>,

    /// The await-tree registry of the current meta node.
    await_tree_reg: await_tree::Registry,

    /// options read by all services
    pub opts: Arc<MetaOpts>,

    pub cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,

    actor_id_generator: Arc<AtomicU32>,
}

/// Options shared by all meta service instances
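///
/// # Examples
///
/// A hedged sketch of customizing options in tests: start from the `test` defaults defined
/// below and override individual fields with struct-update syntax (the overridden values here
/// are arbitrary illustrations).
///
/// ```ignore
/// let opts = MetaOpts {
///     max_idle_ms: 1_000,
///     in_flight_barrier_nums: 16,
///     ..MetaOpts::test(true)
/// };
/// ```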
#[derive(Clone, serde::Serialize)]
pub struct MetaOpts {
    /// Whether to enable the recovery of the cluster. If disabled, the meta service will exit in
    /// abnormal cases.
    pub enable_recovery: bool,
    /// Whether to disable the auto-scaling feature.
    pub disable_automatic_parallelism_control: bool,
    /// The number of streaming jobs per scaling operation.
    pub parallelism_control_batch_size: usize,
    /// The period of the parallelism control trigger.
    pub parallelism_control_trigger_period_sec: u64,
    /// The first delay of the parallelism control trigger.
    pub parallelism_control_trigger_first_delay_sec: u64,
    /// The maximum number of barriers in-flight in the compute nodes.
    pub in_flight_barrier_nums: usize,
    /// After the specified number of milliseconds of idle time (no mview or flush), the process
    /// will exit. 0 means infinite: the process will never exit due to idleness.
    pub max_idle_ms: u64,
    /// Whether to run in compaction deterministic test mode.
    pub compaction_deterministic_test: bool,
    /// Default parallelism for all streaming jobs.
    pub default_parallelism: DefaultParallelism,

    /// Interval of invoking a vacuum job, to remove stale metadata from the meta store and
    /// objects from the object store.
    pub vacuum_interval_sec: u64,
    /// The spin interval inside a vacuum job. It prevents the vacuum job from monopolizing
    /// resources of the meta node.
    pub vacuum_spin_interval_ms: u64,
    /// Interval of invoking iceberg garbage collection, to expire old snapshots.
    pub iceberg_gc_interval_sec: u64,
    pub time_travel_vacuum_interval_sec: u64,
    /// Interval of hummock version checkpoint.
    pub hummock_version_checkpoint_interval_sec: u64,
    pub enable_hummock_data_archive: bool,
    pub hummock_time_travel_snapshot_interval: u64,
    pub hummock_time_travel_sst_info_fetch_batch_size: usize,
    pub hummock_time_travel_sst_info_insert_batch_size: usize,
    pub hummock_time_travel_epoch_version_insert_batch_size: usize,
    pub hummock_gc_history_insert_batch_size: usize,
    pub hummock_time_travel_filter_out_objects_batch_size: usize,
    pub hummock_time_travel_filter_out_objects_v1: bool,
    pub hummock_time_travel_filter_out_objects_list_version_batch_size: usize,
    pub hummock_time_travel_filter_out_objects_list_delta_batch_size: usize,
    /// The minimum number of delta logs a new checkpoint should compact, otherwise the checkpoint
    /// attempt is rejected. A greater value reduces object store IO, but results in more loss of
    /// the in-memory `HummockVersionCheckpoint::stale_objects` state when the meta node is
    /// restarted.
    pub min_delta_log_num_for_hummock_version_checkpoint: u64,
    /// Objects within `min_sst_retention_time_sec` won't be deleted by hummock full GC, even if
    /// they are dangling.
    pub min_sst_retention_time_sec: u64,
    /// Interval of automatic hummock full GC.
    pub full_gc_interval_sec: u64,
    /// Max number of objects a full GC job can fetch.
    pub full_gc_object_limit: u64,
    /// Duration in seconds to retain garbage collection history data.
    pub gc_history_retention_time_sec: u64,
    /// Max number of in-flight time travel queries.
    pub max_inflight_time_travel_query: u64,
    /// Enable sanity check when SSTs are committed.
    pub enable_committed_sst_sanity_check: bool,
    /// Schedule compaction for all compaction groups with this interval.
    pub periodic_compaction_interval_sec: u64,
    /// Interval of reporting the number of nodes in the cluster.
    pub node_num_monitor_interval_sec: u64,
    /// Whether to protect dropping a table that has incoming sinks.
    pub protect_drop_table_with_incoming_sink: bool,
    /// The Prometheus endpoint for the Meta Dashboard Service.
    /// The Dashboard service uses this in the following ways:
    /// 1. Query Prometheus for relevant metrics to find stream graph bottlenecks, and display them.
    /// 2. Provide cluster diagnostics, at `/api/monitor/diagnose`, to troubleshoot the cluster.
    ///    These are just examples of how the Meta Dashboard Service queries Prometheus.
    pub prometheus_endpoint: Option<String>,

    /// The additional selector used when querying Prometheus.
    pub prometheus_selector: Option<String>,

    /// The VPC id of the cluster.
    pub vpc_id: Option<String>,

    /// A usable security group id to assign to a VPC endpoint.
    pub security_group_id: Option<String>,

    /// Default tags for the endpoint created when creating a privatelink connection.
    /// Will be appended to the tags specified in the `tags` field of the `WITH` clause of
    /// `CREATE CONNECTION`.
    pub privatelink_endpoint_default_tags: Option<Vec<(String, String)>>,

    /// Schedule `space_reclaim_compaction` for all compaction groups with this interval.
    pub periodic_space_reclaim_compaction_interval_sec: u64,

    /// Whether telemetry is enabled in the config file.
    pub telemetry_enabled: bool,
    /// Schedule `ttl_reclaim_compaction` for all compaction groups with this interval.
    pub periodic_ttl_reclaim_compaction_interval_sec: u64,

    /// Schedule `tombstone_reclaim_compaction` for all compaction groups with this interval.
    pub periodic_tombstone_reclaim_compaction_interval_sec: u64,

    /// Schedule compaction group splitting for all compaction groups with this interval.
    pub periodic_scheduling_compaction_group_split_interval_sec: u64,

    /// Whether to skip configuring the object storage bucket lifecycle that purges stale data.
    pub do_not_config_object_storage_lifecycle: bool,

    pub partition_vnode_count: u32,

    /// Threshold of high write throughput for a state table, in bytes/sec.
    pub table_high_write_throughput_threshold: u64,
    /// Threshold of low write throughput for a state table, in bytes/sec.
    pub table_low_write_throughput_threshold: u64,

    pub compaction_task_max_heartbeat_interval_secs: u64,
    pub compaction_task_max_progress_interval_secs: u64,
    pub compaction_config: Option<CompactionConfig>,

    /// Hybrid compaction group config.
    ///
    /// `hybrid_partition_vnode_count` determines the granularity of vnodes in the hybrid
    /// compaction group for SST alignment. When `hybrid_partition_vnode_count` > 0, in the hybrid
    /// compaction group:
    /// - tables with high write throughput are split at vnode granularity;
    /// - tables with a large size are split at table granularity.
    ///
    /// When `hybrid_partition_vnode_count` = 0, no special alignment is applied to the hybrid
    /// compaction group.
    pub hybrid_partition_node_count: u32,

    pub event_log_enabled: bool,
    pub event_log_channel_max_size: u32,
    pub advertise_addr: String,
    /// The number of traces to be cached in-memory by the tracing collector
    /// embedded in the meta node.
    pub cached_traces_num: u32,
    /// The maximum memory usage in bytes for the tracing collector embedded
    /// in the meta node.
    pub cached_traces_memory_limit_bytes: usize,

    /// Whether the L0 picker selects trivial move tasks.
    pub enable_trivial_move: bool,

    /// Whether the L0 multi-level picker checks overlap accuracy between sub-levels.
    pub enable_check_task_level_overlap: bool,
    pub enable_dropped_column_reclaim: bool,

    /// The size ratio used to decide whether to split a compaction group when its size exceeds
    /// the threshold.
    pub split_group_size_ratio: f64,

    /// Split the compaction group when its high-write-throughput statistics exceed this ratio.
    pub table_stat_high_write_throughput_ratio_for_split: f64,

    /// Merge the compaction group when its low-write-throughput statistics exceed this ratio.
    pub table_stat_low_write_throughput_ratio_for_merge: f64,

    /// The window, in seconds, of table throughput statistics history used for splitting
    /// compaction groups.
    pub table_stat_throuput_window_seconds_for_split: usize,

    /// The window, in seconds, of table throughput statistics history used for merging
    /// compaction groups.
    pub table_stat_throuput_window_seconds_for_merge: usize,

    /// The configuration of the object store.
    pub object_store_config: ObjectStoreConfig,

    /// The maximum number of trivial move tasks to be picked in a single loop.
    pub max_trivial_move_task_count_per_loop: usize,

    /// The maximum number of times to probe for `PullTaskEvent`.
    pub max_get_task_probe_times: usize,

    pub compact_task_table_size_partition_threshold_low: u64,
    pub compact_task_table_size_partition_threshold_high: u64,

    pub periodic_scheduling_compaction_group_merge_interval_sec: u64,

    pub compaction_group_merge_dimension_threshold: f64,

    // The private key for the secret store, used when the secret is stored in the meta.
    pub secret_store_private_key: Option<Vec<u8>>,
    /// The path of the temp secret file directory.
    pub temp_secret_file_dir: String,

    // Cluster limits
    pub actor_cnt_per_worker_parallelism_hard_limit: usize,
    pub actor_cnt_per_worker_parallelism_soft_limit: usize,

    pub license_key_path: Option<PathBuf>,

    pub compute_client_config: RpcClientConfig,
    pub stream_client_config: RpcClientConfig,
    pub frontend_client_config: RpcClientConfig,
    pub redact_sql_option_keywords: RedactSqlOptionKeywordsRef,

    pub cdc_table_split_init_sleep_interval_splits: u64,
    pub cdc_table_split_init_sleep_duration_millis: u64,
    pub cdc_table_split_init_insert_batch_size: u64,
}

impl MetaOpts {
    /// Default opts for testing. Some tests need `enable_recovery=true`.
    pub fn test(enable_recovery: bool) -> Self {
        Self {
            enable_recovery,
            disable_automatic_parallelism_control: false,
            parallelism_control_batch_size: 1,
            parallelism_control_trigger_period_sec: 10,
            parallelism_control_trigger_first_delay_sec: 30,
            in_flight_barrier_nums: 40,
            max_idle_ms: 0,
            compaction_deterministic_test: false,
            default_parallelism: DefaultParallelism::Full,
            vacuum_interval_sec: 30,
            time_travel_vacuum_interval_sec: 30,
            vacuum_spin_interval_ms: 0,
            iceberg_gc_interval_sec: 3600,
            hummock_version_checkpoint_interval_sec: 30,
            enable_hummock_data_archive: false,
            hummock_time_travel_snapshot_interval: 0,
            hummock_time_travel_sst_info_fetch_batch_size: 10_000,
            hummock_time_travel_sst_info_insert_batch_size: 10,
            hummock_time_travel_epoch_version_insert_batch_size: 1000,
            hummock_gc_history_insert_batch_size: 1000,
            hummock_time_travel_filter_out_objects_batch_size: 1000,
            hummock_time_travel_filter_out_objects_v1: false,
            hummock_time_travel_filter_out_objects_list_version_batch_size: 10,
            hummock_time_travel_filter_out_objects_list_delta_batch_size: 1000,
            min_delta_log_num_for_hummock_version_checkpoint: 1,
            min_sst_retention_time_sec: 3600 * 24 * 7,
            full_gc_interval_sec: 3600 * 24 * 7,
            full_gc_object_limit: 100_000,
            gc_history_retention_time_sec: 3600 * 24 * 7,
            max_inflight_time_travel_query: 1000,
            enable_committed_sst_sanity_check: false,
            periodic_compaction_interval_sec: 60,
            node_num_monitor_interval_sec: 10,
            protect_drop_table_with_incoming_sink: false,
            prometheus_endpoint: None,
            prometheus_selector: None,
            vpc_id: None,
            security_group_id: None,
            privatelink_endpoint_default_tags: None,
            periodic_space_reclaim_compaction_interval_sec: 60,
            telemetry_enabled: false,
            periodic_ttl_reclaim_compaction_interval_sec: 60,
            periodic_tombstone_reclaim_compaction_interval_sec: 60,
            periodic_scheduling_compaction_group_split_interval_sec: 60,
            compact_task_table_size_partition_threshold_low: 128 * 1024 * 1024,
            compact_task_table_size_partition_threshold_high: 512 * 1024 * 1024,
            table_high_write_throughput_threshold: 128 * 1024 * 1024,
            table_low_write_throughput_threshold: 64 * 1024 * 1024,
            do_not_config_object_storage_lifecycle: true,
            partition_vnode_count: 32,
            compaction_task_max_heartbeat_interval_secs: 0,
            compaction_task_max_progress_interval_secs: 1,
            compaction_config: None,
            hybrid_partition_node_count: 4,
            event_log_enabled: false,
            event_log_channel_max_size: 1,
            advertise_addr: "".to_owned(),
            cached_traces_num: 1,
            cached_traces_memory_limit_bytes: usize::MAX,
            enable_trivial_move: true,
            enable_check_task_level_overlap: true,
            enable_dropped_column_reclaim: false,
            object_store_config: ObjectStoreConfig::default(),
            max_trivial_move_task_count_per_loop: 256,
            max_get_task_probe_times: 5,
            secret_store_private_key: Some(
                hex::decode("0123456789abcdef0123456789abcdef").unwrap(),
            ),
            temp_secret_file_dir: "./secrets".to_owned(),
            actor_cnt_per_worker_parallelism_hard_limit: usize::MAX,
            actor_cnt_per_worker_parallelism_soft_limit: usize::MAX,
            split_group_size_ratio: 0.9,
            table_stat_high_write_throughput_ratio_for_split: 0.5,
            table_stat_low_write_throughput_ratio_for_merge: 0.7,
            table_stat_throuput_window_seconds_for_split: 60,
            table_stat_throuput_window_seconds_for_merge: 240,
            periodic_scheduling_compaction_group_merge_interval_sec: 60 * 10,
            compaction_group_merge_dimension_threshold: 1.2,
            license_key_path: None,
            compute_client_config: RpcClientConfig::default(),
            stream_client_config: RpcClientConfig::default(),
            frontend_client_config: RpcClientConfig::default(),
            redact_sql_option_keywords: Arc::new(Default::default()),
            cdc_table_split_init_sleep_interval_splits: 1000,
            cdc_table_split_init_sleep_duration_millis: 10,
            cdc_table_split_init_insert_batch_size: 1000,
        }
    }
}

impl MetaSrvEnv {
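    /// Creates the global environment: brings up the meta store schema and initializes all
    /// shared managers.
    ///
    /// A minimal invocation sketch; the argument values are illustrative assumptions (test
    /// defaults), not a prescribed production setup:
    ///
    /// ```ignore
    /// let env = MetaSrvEnv::new(
    ///     MetaOpts::test(false),
    ///     SystemParams::default(),
    ///     SessionConfig::default(),
    ///     SqlMetaStore::for_test().await,
    /// )
    /// .await?;
    /// ```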
    pub async fn new(
        opts: MetaOpts,
        mut init_system_params: SystemParams,
        init_session_config: SessionConfig,
        meta_store_impl: SqlMetaStore,
    ) -> MetaResult<Self> {
        let idle_manager = Arc::new(IdleManager::new(opts.max_idle_ms));
        let stream_client_pool =
            Arc::new(StreamClientPool::new(1, opts.stream_client_config.clone())); // typically one client is enough
        let frontend_client_pool = Arc::new(FrontendClientPool::new(
            1,
            opts.frontend_client_config.clone(),
        ));
        let event_log_manager = Arc::new(start_event_log_manager(
            opts.event_log_enabled,
            opts.event_log_channel_max_size,
        ));

        // When the license key path is specified, the license key from system parameters can be
        // easily overwritten. So we simply reject this case.
        if opts.license_key_path.is_some()
            && init_system_params.license_key
                != system_param::default::license_key_opt().map(Into::into)
        {
            bail!(
                "argument `--license-key-path` (or env var `RW_LICENSE_KEY_PATH`) and \
                 system parameter `license_key` (or env var `RW_LICENSE_KEY`) may not \
                 be set at the same time"
            );
        }

        let cluster_first_launch = meta_store_impl.up().await.context(
            "Failed to initialize the meta store. \
            This may happen if there's existing metadata incompatible with the current version of RisingWave, \
            e.g., downgrading from a newer release or a nightly build to an older one. \
            For a single-node deployment, you may want to reset all data by deleting the data directory, \
            typically located at `~/.risingwave`.",
        )?;

        let notification_manager =
            Arc::new(NotificationManager::new(meta_store_impl.clone()).await);
        let cluster_id = Cluster::find()
            .one(&meta_store_impl.conn)
            .await?
            .map(|c| c.cluster_id.to_string().into())
            .unwrap();

        // For new clusters:
        // - the name of the object store needs to be prefixed according to the object id.
        //
        // For old clusters:
        // - the prefix is not added, for the sake of compatibility.
        init_system_params.use_new_object_prefix_strategy = Some(cluster_first_launch);

        let system_param_controller = Arc::new(
            SystemParamsController::new(
                meta_store_impl.clone(),
                notification_manager.clone(),
                init_system_params,
            )
            .await?,
        );
        let session_param_controller = Arc::new(
            SessionParamsController::new(
                meta_store_impl.clone(),
                notification_manager.clone(),
                init_session_config,
            )
            .await?,
        );
        let cdc_table_backfill_tracker = CdcTableBackfillTracker::new(meta_store_impl.clone())
            .await?
            .into();
        Ok(Self {
            id_gen_manager_impl: Arc::new(SqlIdGeneratorManager::new(&meta_store_impl.conn).await?),
            system_param_manager_impl: system_param_controller,
            session_param_manager_impl: session_param_controller,
            meta_store_impl: meta_store_impl.clone(),
            shared_actor_info: SharedActorInfos::new(notification_manager.clone()),
            notification_manager,
            stream_client_pool,
            frontend_client_pool,
            idle_manager,
            event_log_manager,
            cluster_id,
            hummock_seq: Arc::new(SequenceGenerator::new(meta_store_impl.conn.clone())),
            opts: opts.into(),
            // Await trees on the meta node are lightweight, thus always enabled.
            await_tree_reg: await_tree::Registry::new(Default::default()),
            cdc_table_backfill_tracker,
            actor_id_generator: Arc::new(AtomicU32::new(0)),
        })
    }

    pub fn meta_store(&self) -> SqlMetaStore {
        self.meta_store_impl.clone()
    }

    pub fn meta_store_ref(&self) -> &SqlMetaStore {
        &self.meta_store_impl
    }

    pub fn id_gen_manager(&self) -> &SqlIdGeneratorManagerRef {
        &self.id_gen_manager_impl
    }

    pub fn notification_manager_ref(&self) -> NotificationManagerRef {
        self.notification_manager.clone()
    }

    pub fn notification_manager(&self) -> &NotificationManager {
        self.notification_manager.deref()
    }

    pub fn idle_manager_ref(&self) -> IdleManagerRef {
        self.idle_manager.clone()
    }

    pub fn idle_manager(&self) -> &IdleManager {
        self.idle_manager.deref()
    }

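    /// Returns the in-memory actor id counter.
    ///
    /// A sketch of the presumed usage, allocating a fresh id via an atomic increment (the
    /// memory ordering shown is an illustrative assumption, not taken from this file):
    ///
    /// ```ignore
    /// use std::sync::atomic::Ordering;
    ///
    /// let next_actor_id = env.actor_id_generator().fetch_add(1, Ordering::Relaxed);
    /// ```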
    pub fn actor_id_generator(&self) -> &AtomicU32 {
        self.actor_id_generator.deref()
    }

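    /// Returns a reader over the current system parameters.
    ///
    /// A usage sketch; the getter shown is an assumed accessor on `SystemParamsReader`:
    ///
    /// ```ignore
    /// let params = env.system_params_reader().await;
    /// let interval_ms = params.barrier_interval_ms();
    /// ```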
    pub async fn system_params_reader(&self) -> SystemParamsReader {
        self.system_param_manager_impl.get_params().await
    }

    pub fn system_params_manager_impl_ref(&self) -> SystemParamsControllerRef {
        self.system_param_manager_impl.clone()
    }

    pub fn session_params_manager_impl_ref(&self) -> SessionParamsControllerRef {
        self.session_param_manager_impl.clone()
    }

    pub fn stream_client_pool_ref(&self) -> StreamClientPoolRef {
        self.stream_client_pool.clone()
    }

    pub fn stream_client_pool(&self) -> &StreamClientPool {
        self.stream_client_pool.deref()
    }

    pub fn frontend_client_pool(&self) -> &FrontendClientPool {
        self.frontend_client_pool.deref()
    }

    pub fn cluster_id(&self) -> &ClusterId {
        &self.cluster_id
    }

    pub fn event_log_manager_ref(&self) -> EventLogManagerRef {
        self.event_log_manager.clone()
    }

    pub fn await_tree_reg(&self) -> &await_tree::Registry {
        &self.await_tree_reg
    }

    pub fn shared_actor_infos(&self) -> &SharedActorInfos {
        &self.shared_actor_info
    }

    pub fn cdc_table_backfill_tracker(&self) -> CdcTableBackfillTrackerRef {
        self.cdc_table_backfill_tracker.clone()
    }
}

#[cfg(any(test, feature = "test"))]
impl MetaSrvEnv {
    // Instance for test.
    pub async fn for_test() -> Self {
        Self::for_test_opts(MetaOpts::test(false), |_| ()).await
    }

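    /// Instance for test, with customized `MetaOpts` and a hook to tweak system parameters.
    ///
    /// A usage sketch (the overridden parameter is an arbitrary illustration):
    ///
    /// ```ignore
    /// let env = MetaSrvEnv::for_test_opts(MetaOpts::test(true), |params| {
    ///     params.checkpoint_frequency = Some(1);
    /// })
    /// .await;
    /// ```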
    pub async fn for_test_opts(
        opts: MetaOpts,
        on_test_system_params: impl FnOnce(&mut risingwave_pb::meta::PbSystemParams),
    ) -> Self {
        let mut system_params = risingwave_common::system_param::system_params_for_test();
        on_test_system_params(&mut system_params);
        Self::new(
            opts,
            system_params,
            Default::default(),
            SqlMetaStore::for_test().await,
        )
        .await
        .unwrap()
    }
}