risingwave_meta_node/server.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::time::Duration;

use otlp_embedded::TraceServiceServer;
use regex::Regex;
use risingwave_common::monitor::{RouterExt, TcpConfig};
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::telemetry::manager::TelemetryManager;
use risingwave_common::telemetry::{report_scarf_enabled, report_to_scarf, telemetry_env_enabled};
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_common_service::{MetricsManager, TracingExtractLayer};
use risingwave_meta::MetaStoreBackend;
use risingwave_meta::barrier::GlobalBarrierManager;
use risingwave_meta::controller::catalog::CatalogController;
use risingwave_meta::controller::cluster::ClusterController;
use risingwave_meta::hummock::IcebergCompactorManager;
use risingwave_meta::manager::iceberg_compaction::IcebergCompactionManager;
use risingwave_meta::manager::{META_NODE_ID, MetadataManager};
use risingwave_meta::rpc::ElectionClientRef;
use risingwave_meta::rpc::election::dummy::DummyElectionClient;
use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer;
use risingwave_meta::stream::ScaleController;
use risingwave_meta_service::AddressInfo;
use risingwave_meta_service::backup_service::BackupServiceImpl;
use risingwave_meta_service::cloud_service::CloudServiceImpl;
use risingwave_meta_service::cluster_limit_service::ClusterLimitServiceImpl;
use risingwave_meta_service::cluster_service::ClusterServiceImpl;
use risingwave_meta_service::ddl_service::DdlServiceImpl;
use risingwave_meta_service::event_log_service::EventLogServiceImpl;
use risingwave_meta_service::health_service::HealthServiceImpl;
use risingwave_meta_service::heartbeat_service::HeartbeatServiceImpl;
use risingwave_meta_service::hosted_iceberg_catalog_service::HostedIcebergCatalogServiceImpl;
use risingwave_meta_service::hummock_service::HummockServiceImpl;
use risingwave_meta_service::meta_member_service::MetaMemberServiceImpl;
use risingwave_meta_service::monitor_service::MonitorServiceImpl;
use risingwave_meta_service::notification_service::NotificationServiceImpl;
use risingwave_meta_service::scale_service::ScaleServiceImpl;
use risingwave_meta_service::serving_service::ServingServiceImpl;
use risingwave_meta_service::session_config::SessionParamsServiceImpl;
use risingwave_meta_service::sink_coordination_service::SinkCoordinationServiceImpl;
use risingwave_meta_service::stream_service::StreamServiceImpl;
use risingwave_meta_service::system_params_service::SystemParamsServiceImpl;
use risingwave_meta_service::telemetry_service::TelemetryInfoServiceImpl;
use risingwave_meta_service::user_service::UserServiceImpl;
use risingwave_pb::backup_service::backup_service_server::BackupServiceServer;
use risingwave_pb::cloud_service::cloud_service_server::CloudServiceServer;
use risingwave_pb::connector_service::sink_coordination_service_server::SinkCoordinationServiceServer;
use risingwave_pb::ddl_service::ddl_service_server::DdlServiceServer;
use risingwave_pb::health::health_server::HealthServer;
use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerServiceServer;
use risingwave_pb::meta::SystemParams;
use risingwave_pb::meta::cluster_limit_service_server::ClusterLimitServiceServer;
use risingwave_pb::meta::cluster_service_server::ClusterServiceServer;
use risingwave_pb::meta::event_log_service_server::EventLogServiceServer;
use risingwave_pb::meta::heartbeat_service_server::HeartbeatServiceServer;
use risingwave_pb::meta::hosted_iceberg_catalog_service_server::HostedIcebergCatalogServiceServer;
use risingwave_pb::meta::meta_member_service_server::MetaMemberServiceServer;
use risingwave_pb::meta::notification_service_server::NotificationServiceServer;
use risingwave_pb::meta::scale_service_server::ScaleServiceServer;
use risingwave_pb::meta::serving_service_server::ServingServiceServer;
use risingwave_pb::meta::session_param_service_server::SessionParamServiceServer;
use risingwave_pb::meta::stream_manager_service_server::StreamManagerServiceServer;
use risingwave_pb::meta::system_params_service_server::SystemParamsServiceServer;
use risingwave_pb::meta::telemetry_info_service_server::TelemetryInfoServiceServer;
use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
use risingwave_pb::user::user_service_server::UserServiceServer;
use risingwave_rpc_client::ComputeClientPool;
use sea_orm::{ConnectionTrait, DbBackend};
use thiserror_ext::AsReport;
use tokio::sync::watch;

use crate::backup_restore::BackupManager;
use crate::barrier::BarrierScheduler;
use crate::controller::SqlMetaStore;
use crate::controller::system_param::SystemParamsController;
use crate::hummock::HummockManager;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{IdleManager, MetaOpts, MetaSrvEnv};
use crate::rpc::election::sql::{MySqlDriver, PostgresDriver, SqlBackendElectionClient};
use crate::rpc::metrics::{
    GLOBAL_META_METRICS, start_fragment_info_monitor, start_worker_info_monitor,
};
use crate::serving::ServingVnodeMapping;
use crate::stream::{GlobalStreamManager, SourceManager};
use crate::telemetry::{MetaReportCreator, MetaTelemetryInfoFetcher};
use crate::{MetaError, MetaResult, hummock, serving};

/// Used in standalone mode to check the status of the meta service.
/// This can be easier and more accurate than checking the TCP connection.
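///
/// A minimal usage sketch, assuming a hypothetical standalone launcher that polls readiness
/// before bringing up the other nodes (illustrative only, not compiled as a doctest):
///
/// ```ignore
/// // Spawn the meta service elsewhere, then wait until it reports readiness.
/// while !started::get() {
///     tokio::time::sleep(std::time::Duration::from_millis(100)).await;
/// }
/// ```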
pub mod started {
    use std::sync::atomic::AtomicBool;
    use std::sync::atomic::Ordering::Relaxed;

    static STARTED: AtomicBool = AtomicBool::new(false);

    /// Mark the meta service as started.
    pub(crate) fn set() {
        STARTED.store(true, Relaxed);
    }

    /// Check if the meta service has started.
    pub fn get() -> bool {
        STARTED.load(Relaxed)
    }
}

/// A wrapper around [`rpc_serve_with_store`] that dispatches different store implementations.
///
/// For the timing of returning, see [`rpc_serve_with_store`].
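///
/// A minimal calling sketch from a hypothetical embedding binary; the construction of
/// `address_info`, `opts`, and the initial parameters is elided (illustrative only, not
/// compiled as a doctest):
///
/// ```ignore
/// let shutdown = CancellationToken::new();
/// rpc_serve(
///     address_info,
///     MetaStoreBackend::Mem,
///     Duration::from_secs(60), // max_cluster_heartbeat_interval
///     10,                      // lease_interval_secs
///     opts,
///     init_system_params,
///     init_session_config,
///     shutdown.clone(),
/// )
/// .await?;
/// ```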
pub async fn rpc_serve(
    address_info: AddressInfo,
    meta_store_backend: MetaStoreBackend,
    max_cluster_heartbeat_interval: Duration,
    lease_interval_secs: u64,
    opts: MetaOpts,
    init_system_params: SystemParams,
    init_session_config: SessionConfig,
    shutdown: CancellationToken,
) -> MetaResult<()> {
    let meta_store_impl = SqlMetaStore::connect(meta_store_backend.clone()).await?;

    let election_client = match meta_store_backend {
        MetaStoreBackend::Mem => {
            // Use a dummy election client.
            Arc::new(DummyElectionClient::new(
                address_info.advertise_addr.clone(),
            ))
        }
        MetaStoreBackend::Sql { .. } => {
            // Init election client.
            let id = address_info.advertise_addr.clone();
            let conn = meta_store_impl.conn.clone();
            let election_client: ElectionClientRef = match conn.get_database_backend() {
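                // SQLite is intended for single-node deployments, so no real leader election
                // is needed there either; a dummy client (which always reports itself as the
                // leader) is used instead of a SQL-backed election.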
                DbBackend::Sqlite => Arc::new(DummyElectionClient::new(id)),
                DbBackend::Postgres => {
                    Arc::new(SqlBackendElectionClient::new(id, PostgresDriver::new(conn)))
                }
                DbBackend::MySql => {
                    Arc::new(SqlBackendElectionClient::new(id, MySqlDriver::new(conn)))
                }
            };
            election_client.init().await?;

            election_client
        }
    };

    rpc_serve_with_store(
        meta_store_impl,
        election_client,
        address_info,
        max_cluster_heartbeat_interval,
        lease_interval_secs,
        opts,
        init_system_params,
        init_session_config,
        shutdown,
    )
    .await
}

/// Bootstraps the follower or leader service based on the election status.
///
/// Returns when the `shutdown` token is triggered, or when leader status is lost, or if the leader
/// service fails to start.
pub async fn rpc_serve_with_store(
    meta_store_impl: SqlMetaStore,
    election_client: ElectionClientRef,
    address_info: AddressInfo,
    max_cluster_heartbeat_interval: Duration,
    lease_interval_secs: u64,
    opts: MetaOpts,
    init_system_params: SystemParams,
    init_session_config: SessionConfig,
    shutdown: CancellationToken,
) -> MetaResult<()> {
    // TODO(shutdown): directly use cancellation token
    let (election_shutdown_tx, election_shutdown_rx) = watch::channel(());

    let election_handle = tokio::spawn({
        let shutdown = shutdown.clone();
        let election_client = election_client.clone();

        async move {
            while let Err(e) = election_client
                .run_once(lease_interval_secs as i64, election_shutdown_rx.clone())
                .await
            {
                tracing::error!(error = %e.as_report(), "election error happened");
            }
            // Leadership lost, shut down the service.
            shutdown.cancel();
        }
    });

    // Spawn and run the follower service if not the leader.
    // Watch the leader status and switch to the leader service when elected.
    // TODO: this branch seems to always be hit since the default value of `is_leader` is false until
    // the election is done (unless using `DummyElectionClient`).
    if !election_client.is_leader() {
        // The follower service can be shut down separately if we're going to be the leader.
        let follower_shutdown = shutdown.child_token();

        let follower_handle = tokio::spawn(start_service_as_election_follower(
            follower_shutdown.clone(),
            address_info.clone(),
            election_client.clone(),
        ));

        // Watch and wait until we become the leader.
        let mut is_leader_watcher = election_client.subscribe();

        while !*is_leader_watcher.borrow_and_update() {
            tokio::select! {
                // External shutdown signal. Directly return without switching to leader.
                _ = shutdown.cancelled() => return Ok(()),

                res = is_leader_watcher.changed() => {
                    if res.is_err() {
                        tracing::error!("leader watcher recv failed");
                    }
                }
            }
        }

        tracing::info!("elected as leader, shutting down follower services");
        follower_shutdown.cancel();
        let _ = follower_handle.await;
    }

    // Run the leader service.
    let result = start_service_as_election_leader(
        meta_store_impl,
        address_info,
        max_cluster_heartbeat_interval,
        opts,
        init_system_params,
        init_session_config,
        election_client,
        shutdown,
    )
    .await;

    // The leader service has stopped, so shut down the election service to gracefully resign.
    election_shutdown_tx.send(()).ok();
    let _ = election_handle.await;

    result
}

/// Starts all services needed for the meta follower node.
///
/// Returns when the `shutdown` token is triggered.
pub async fn start_service_as_election_follower(
    shutdown: CancellationToken,
    address_info: AddressInfo,
    election_client: ElectionClientRef,
) {
    tracing::info!("starting follower services");

    let meta_member_srv = MetaMemberServiceImpl::new(election_client);

    let health_srv = HealthServiceImpl::new();

    let server = tonic::transport::Server::builder()
        .layer(MetricsMiddlewareLayer::new(Arc::new(
            GLOBAL_META_METRICS.clone(),
        )))
        .layer(TracingExtractLayer::new())
        .add_service(MetaMemberServiceServer::new(meta_member_srv))
        .add_service(HealthServer::new(health_srv))
        .monitored_serve_with_shutdown(
            address_info.listen_addr,
            "grpc-meta-follower-service",
            TcpConfig {
                tcp_nodelay: true,
                keepalive_duration: None,
            },
            shutdown.clone().cancelled_owned(),
        );
    let server_handle = tokio::spawn(server);
    started::set();

    // Wait for the shutdown signal.
    shutdown.cancelled().await;
    // Wait for the server to shut down. This is necessary because we may be transitioning from follower
    // to leader, and conflicts on the services must be avoided.
    let _ = server_handle.await;
}

/// Starts all services needed for the meta leader node.
///
/// Returns when the `shutdown` token is triggered, or if the service initialization fails.
pub async fn start_service_as_election_leader(
    meta_store_impl: SqlMetaStore,
    address_info: AddressInfo,
    max_cluster_heartbeat_interval: Duration,
    opts: MetaOpts,
    init_system_params: SystemParams,
    init_session_config: SessionConfig,
    election_client: ElectionClientRef,
    shutdown: CancellationToken,
) -> MetaResult<()> {
    tracing::info!("starting leader services");

    let env = MetaSrvEnv::new(
        opts.clone(),
        init_system_params,
        init_session_config,
        meta_store_impl,
    )
    .await?;
    tracing::info!("MetaSrvEnv started");
    let _ = env.may_start_watch_license_key_file()?;
    let system_params_reader = env.system_params_reader().await;

    let data_directory = system_params_reader.data_directory();
    if !is_correct_data_directory(data_directory) {
        return Err(MetaError::system_params(format!(
            "The data directory {:?} is misconfigured.
            Please use a combination of uppercase and lowercase letters and numbers, i.e. [a-z, A-Z, 0-9].
            The string cannot start or end with '/', and consecutive '/' are not allowed.
            The data directory cannot be empty and its length should not exceed 800 characters.",
            data_directory
        )));
    }

    let cluster_controller = Arc::new(
        ClusterController::new(env.clone(), max_cluster_heartbeat_interval)
            .await
            .unwrap(),
    );
    let catalog_controller = Arc::new(CatalogController::new(env.clone()).await?);
    let metadata_manager = MetadataManager::new(cluster_controller, catalog_controller);

    let serving_vnode_mapping = Arc::new(ServingVnodeMapping::default());
    let max_serving_parallelism = env
        .session_params_manager_impl_ref()
        .get_params()
        .await
        .batch_parallelism()
        .map(|p| p.get());
    serving::on_meta_start(
        env.notification_manager_ref(),
        &metadata_manager,
        serving_vnode_mapping.clone(),
        max_serving_parallelism,
    )
    .await;

    let compactor_manager = Arc::new(
        hummock::CompactorManager::with_meta(env.clone())
            .await
            .unwrap(),
    );
    tracing::info!("CompactorManager started");

    let heartbeat_srv = HeartbeatServiceImpl::new(metadata_manager.clone());
    tracing::info!("HeartbeatServiceImpl started");

    let (compactor_streams_change_tx, compactor_streams_change_rx) =
        tokio::sync::mpsc::unbounded_channel();

    let meta_metrics = Arc::new(GLOBAL_META_METRICS.clone());

    let hummock_manager = hummock::HummockManager::new(
        env.clone(),
        metadata_manager.clone(),
        meta_metrics.clone(),
        compactor_manager.clone(),
        compactor_streams_change_tx,
    )
    .await
    .unwrap();
    tracing::info!("HummockManager started");
    let object_store_media_type = hummock_manager.object_store_media_type();

    let meta_member_srv = MetaMemberServiceImpl::new(election_client.clone());

    let prometheus_client = opts.prometheus_endpoint.as_ref().map(|x| {
        use std::str::FromStr;
        prometheus_http_query::Client::from_str(x).unwrap()
    });
    let prometheus_selector = opts.prometheus_selector.unwrap_or_default();
    let diagnose_command = Arc::new(risingwave_meta::manager::diagnose::DiagnoseCommand::new(
        metadata_manager.clone(),
        env.await_tree_reg().clone(),
        hummock_manager.clone(),
        env.event_log_manager_ref(),
        prometheus_client.clone(),
        prometheus_selector.clone(),
        opts.redact_sql_option_keywords.clone(),
    ));

    let trace_state = otlp_embedded::State::new(otlp_embedded::Config {
        max_length: opts.cached_traces_num,
        max_memory_usage: opts.cached_traces_memory_limit_bytes,
    });
    let trace_srv = otlp_embedded::TraceServiceImpl::new(trace_state.clone());

    #[cfg(not(madsim))]
    let _dashboard_task = if let Some(ref dashboard_addr) = address_info.dashboard_addr {
        let dashboard_service = crate::dashboard::DashboardService {
            await_tree_reg: env.await_tree_reg().clone(),
            dashboard_addr: *dashboard_addr,
            prometheus_client,
            prometheus_selector,
            metadata_manager: metadata_manager.clone(),
            hummock_manager: hummock_manager.clone(),
            compute_clients: ComputeClientPool::new(1, env.opts.compute_client_config.clone()), /* typically no need for plural clients */
            diagnose_command,
            trace_state,
        };
        let task = tokio::spawn(dashboard_service.serve());
        Some(task)
    } else {
        None
    };

    let (barrier_scheduler, scheduled_barriers) =
        BarrierScheduler::new_pair(hummock_manager.clone(), meta_metrics.clone());
    tracing::info!("BarrierScheduler started");

    // Initialize services.
    let backup_manager = BackupManager::new(
        env.clone(),
        hummock_manager.clone(),
        meta_metrics.clone(),
        system_params_reader.backup_storage_url(),
        system_params_reader.backup_storage_directory(),
    )
    .await?;
    tracing::info!("BackupManager started");

    LocalSecretManager::init(
        opts.temp_secret_file_dir,
        env.cluster_id().to_string(),
        META_NODE_ID,
    );
    tracing::info!("LocalSecretManager started");

    let notification_srv = NotificationServiceImpl::new(
        env.clone(),
        metadata_manager.clone(),
        hummock_manager.clone(),
        backup_manager.clone(),
        serving_vnode_mapping.clone(),
    )
    .await?;
    tracing::info!("NotificationServiceImpl started");

    let source_manager = Arc::new(
        SourceManager::new(
            barrier_scheduler.clone(),
            metadata_manager.clone(),
            meta_metrics.clone(),
        )
        .await
        .unwrap(),
    );
    tracing::info!("SourceManager started");

    let (iceberg_compaction_stat_tx, iceberg_compaction_stat_rx) =
        tokio::sync::mpsc::unbounded_channel();
    let (sink_manager, shutdown_handle) = SinkCoordinatorManager::start_worker(
        env.meta_store_ref().conn.clone(),
        hummock_manager.clone(),
        metadata_manager.clone(),
        iceberg_compaction_stat_tx,
    );
    tracing::info!("SinkCoordinatorManager started");
    // TODO(shutdown): remove this, as there's no need to gracefully shut down some of these sub-tasks.
    let mut sub_tasks = vec![shutdown_handle];

    let iceberg_compactor_manager = Arc::new(IcebergCompactorManager::new());

    // TODO: introduce compactor event stream handler to handle iceberg compaction events.
    let (iceberg_compaction_mgr, iceberg_compactor_event_rx) = IcebergCompactionManager::build(
        env.clone(),
        metadata_manager.clone(),
        iceberg_compactor_manager.clone(),
        meta_metrics.clone(),
    );

    sub_tasks.push(IcebergCompactionManager::compaction_stat_loop(
        iceberg_compaction_mgr.clone(),
        iceberg_compaction_stat_rx,
    ));

    sub_tasks.push(IcebergCompactionManager::gc_loop(
        iceberg_compaction_mgr.clone(),
    ));

    let scale_controller = Arc::new(ScaleController::new(
        &metadata_manager,
        source_manager.clone(),
        env.clone(),
    ));

    let (barrier_manager, join_handle, shutdown_rx) = GlobalBarrierManager::start(
        scheduled_barriers,
        env.clone(),
        metadata_manager.clone(),
        hummock_manager.clone(),
        source_manager.clone(),
        sink_manager.clone(),
        scale_controller.clone(),
        barrier_scheduler.clone(),
    )
    .await;
    tracing::info!("GlobalBarrierManager started");
    sub_tasks.push((join_handle, shutdown_rx));

    {
        let source_manager = source_manager.clone();
        tokio::spawn(async move {
            source_manager.run().await.unwrap();
        });
    }

    let stream_manager = Arc::new(
        GlobalStreamManager::new(
            env.clone(),
            metadata_manager.clone(),
            barrier_scheduler.clone(),
            source_manager.clone(),
            scale_controller.clone(),
        )
        .unwrap(),
    );

    hummock_manager
        .may_fill_backward_state_table_info()
        .await
        .unwrap();

    let ddl_srv = DdlServiceImpl::new(
        env.clone(),
        metadata_manager.clone(),
        stream_manager.clone(),
        source_manager.clone(),
        barrier_manager.clone(),
        sink_manager.clone(),
        meta_metrics.clone(),
        iceberg_compaction_mgr.clone(),
    )
    .await;

    let user_srv = UserServiceImpl::new(metadata_manager.clone());

    let scale_srv = ScaleServiceImpl::new(
        metadata_manager.clone(),
        source_manager,
        stream_manager.clone(),
        barrier_manager.clone(),
        scale_controller.clone(),
    );

    let cluster_srv = ClusterServiceImpl::new(metadata_manager.clone(), barrier_manager.clone());
    let stream_srv = StreamServiceImpl::new(
        env.clone(),
        barrier_scheduler.clone(),
        barrier_manager.clone(),
        stream_manager.clone(),
        metadata_manager.clone(),
    );
    let sink_coordination_srv = SinkCoordinationServiceImpl::new(sink_manager);
    let hummock_srv = HummockServiceImpl::new(
        hummock_manager.clone(),
        metadata_manager.clone(),
        backup_manager.clone(),
        iceberg_compaction_mgr.clone(),
    );

    let health_srv = HealthServiceImpl::new();
    let backup_srv = BackupServiceImpl::new(backup_manager.clone());
    let telemetry_srv = TelemetryInfoServiceImpl::new(env.meta_store());
    let system_params_srv = SystemParamsServiceImpl::new(
        env.system_params_manager_impl_ref(),
        env.opts.license_key_path.is_some(),
    );
    let session_params_srv = SessionParamsServiceImpl::new(env.session_params_manager_impl_ref());
    let serving_srv =
        ServingServiceImpl::new(serving_vnode_mapping.clone(), metadata_manager.clone());
    let cloud_srv = CloudServiceImpl::new();
    let event_log_srv = EventLogServiceImpl::new(env.event_log_manager_ref());
    let cluster_limit_srv = ClusterLimitServiceImpl::new(env.clone(), metadata_manager.clone());
    let hosted_iceberg_catalog_srv = HostedIcebergCatalogServiceImpl::new(env.clone());
    let monitor_srv = MonitorServiceImpl {
        metadata_manager: metadata_manager.clone(),
        await_tree_reg: env.await_tree_reg().clone(),
    };

    if let Some(prometheus_addr) = address_info.prometheus_addr {
        MetricsManager::boot_metrics_service(prometheus_addr.to_string())
    }

    // The sub-tasks are executed concurrently and can be shut down via `shutdown_all`.
    sub_tasks.extend(hummock::start_hummock_workers(
        hummock_manager.clone(),
        backup_manager.clone(),
        &env.opts,
    ));
    sub_tasks.push(start_worker_info_monitor(
        metadata_manager.clone(),
        election_client.clone(),
        Duration::from_secs(env.opts.node_num_monitor_interval_sec),
        meta_metrics.clone(),
    ));
    sub_tasks.push(start_fragment_info_monitor(
        metadata_manager.clone(),
        hummock_manager.clone(),
        meta_metrics.clone(),
    ));
    sub_tasks.push(SystemParamsController::start_params_notifier(
        env.system_params_manager_impl_ref(),
    ));
    sub_tasks.push(HummockManager::hummock_timer_task(
        hummock_manager.clone(),
        Some(backup_manager),
    ));
    sub_tasks.extend(HummockManager::compaction_event_loop(
        hummock_manager.clone(),
        compactor_streams_change_rx,
    ));

    sub_tasks.extend(IcebergCompactionManager::iceberg_compaction_event_loop(
        iceberg_compaction_mgr.clone(),
        iceberg_compactor_event_rx,
    ));

    sub_tasks.push(serving::start_serving_vnode_mapping_worker(
        env.notification_manager_ref(),
        metadata_manager.clone(),
        serving_vnode_mapping,
        env.session_params_manager_impl_ref(),
    ));

    {
        sub_tasks.push(ClusterController::start_heartbeat_checker(
            metadata_manager.cluster_controller.clone(),
            Duration::from_secs(1),
        ));

        if !env.opts.disable_automatic_parallelism_control {
            sub_tasks.push(stream_manager.start_auto_parallelism_monitor());
        }
    }

    let _idle_checker_handle = IdleManager::start_idle_checker(
        env.idle_manager_ref(),
        Duration::from_secs(30),
        shutdown.clone(),
    );

    let (abort_sender, abort_recv) = tokio::sync::oneshot::channel();
    let notification_mgr = env.notification_manager_ref();
    let stream_abort_handler = tokio::spawn(async move {
        let _ = abort_recv.await;
        notification_mgr.abort_all();
        compactor_manager.abort_all_compactors();
    });
    sub_tasks.push((stream_abort_handler, abort_sender));

    let telemetry_manager = TelemetryManager::new(
        Arc::new(MetaTelemetryInfoFetcher::new(env.cluster_id().clone())),
        Arc::new(MetaReportCreator::new(
            metadata_manager.clone(),
            object_store_media_type,
        )),
    );

    // May start telemetry reporting
    if env.opts.telemetry_enabled && telemetry_env_enabled() {
        sub_tasks.push(telemetry_manager.start().await);
    } else {
        tracing::info!("Telemetry didn't start due to meta backend or config");
    }
    if !cfg!(madsim) && report_scarf_enabled() {
        tokio::spawn(report_to_scarf());
    } else {
        tracing::info!("Scarf reporting is disabled");
    };

    if let Some(pair) = env.event_log_manager_ref().take_join_handle() {
        sub_tasks.push(pair);
    }

    tracing::info!("Assigned cluster id {:?}", *env.cluster_id());
    tracing::info!("Starting meta services");

    let event = risingwave_pb::meta::event_log::EventMetaNodeStart {
        advertise_addr: address_info.advertise_addr,
        listen_addr: address_info.listen_addr.to_string(),
        opts: serde_json::to_string(&env.opts).unwrap(),
    };
    env.event_log_manager_ref().add_event_logs(vec![
        risingwave_pb::meta::event_log::Event::MetaNodeStart(event),
    ]);

    let server_builder = tonic::transport::Server::builder()
        .layer(MetricsMiddlewareLayer::new(meta_metrics))
        .layer(TracingExtractLayer::new())
        .add_service(HeartbeatServiceServer::new(heartbeat_srv))
        .add_service(ClusterServiceServer::new(cluster_srv))
        .add_service(StreamManagerServiceServer::new(stream_srv))
        .add_service(
            HummockManagerServiceServer::new(hummock_srv).max_decoding_message_size(usize::MAX),
        )
        .add_service(NotificationServiceServer::new(notification_srv))
        .add_service(MetaMemberServiceServer::new(meta_member_srv))
        .add_service(DdlServiceServer::new(ddl_srv).max_decoding_message_size(usize::MAX))
        .add_service(UserServiceServer::new(user_srv))
        .add_service(CloudServiceServer::new(cloud_srv))
        .add_service(ScaleServiceServer::new(scale_srv).max_decoding_message_size(usize::MAX))
        .add_service(HealthServer::new(health_srv))
        .add_service(BackupServiceServer::new(backup_srv))
        .add_service(SystemParamsServiceServer::new(system_params_srv))
        .add_service(SessionParamServiceServer::new(session_params_srv))
        .add_service(TelemetryInfoServiceServer::new(telemetry_srv))
        .add_service(ServingServiceServer::new(serving_srv))
        .add_service(SinkCoordinationServiceServer::new(sink_coordination_srv))
        .add_service(EventLogServiceServer::new(event_log_srv))
        .add_service(ClusterLimitServiceServer::new(cluster_limit_srv))
        .add_service(HostedIcebergCatalogServiceServer::new(
            hosted_iceberg_catalog_srv,
        ))
        .add_service(MonitorServiceServer::new(monitor_srv));

    #[cfg(not(madsim))] // `otlp-embedded` does not use madsim-patched tonic
    let server_builder = server_builder.add_service(TraceServiceServer::new(trace_srv));

    let server = server_builder.monitored_serve_with_shutdown(
        address_info.listen_addr,
        "grpc-meta-leader-service",
        TcpConfig {
            tcp_nodelay: true,
            keepalive_duration: None,
        },
        shutdown.clone().cancelled_owned(),
    );
    started::set();
    let _server_handle = tokio::spawn(server);

    // Wait for the shutdown signal.
    shutdown.cancelled().await;
    // TODO(shutdown): may warn user if there's any other node still running in the cluster.
    // TODO(shutdown): do we have any other shutdown tasks?
    Ok(())
}

/// Validates the `data_directory` system parameter: only `[0-9a-zA-Z_/-]` characters are
/// allowed, the string must not start or end with `/`, must not contain `//`, must not be
/// empty, and must not exceed 800 characters.
fn is_correct_data_directory(data_directory: &str) -> bool {
    let data_directory_regex = Regex::new(r"^[0-9a-zA-Z_/-]{1,}$").unwrap();
    if data_directory.is_empty()
        || !data_directory_regex.is_match(data_directory)
        || data_directory.ends_with('/')
        || data_directory.starts_with('/')
        || data_directory.contains("//")
        || data_directory.len() > 800
    {
        return false;
    }
    true
}
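
// A minimal illustrative test sketch for the validation above; the module and test names are
// placeholders rather than part of the established test suite.
#[cfg(test)]
mod data_directory_tests {
    use super::*;

    #[test]
    fn test_is_correct_data_directory() {
        // Alphanumerics, '_', '-' and inner '/' are accepted.
        assert!(is_correct_data_directory("hummock_001"));
        assert!(is_correct_data_directory("prod/hummock-001"));
        // Leading, trailing, or consecutive '/' are rejected, as are empty or oversized values.
        assert!(!is_correct_data_directory("/hummock_001"));
        assert!(!is_correct_data_directory("hummock_001/"));
        assert!(!is_correct_data_directory("hummock//001"));
        assert!(!is_correct_data_directory(""));
        assert!(!is_correct_data_directory(&"a".repeat(801)));
    }
}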