Skip to main content

risingwave_meta_node/
server.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use otlp_embedded::TraceServiceServer;
19use regex::Regex;
20use risingwave_common::config::SessionInitConfig;
21use risingwave_common::monitor::{RouterExt, TcpConfig};
22use risingwave_common::secret::LocalSecretManager;
23use risingwave_common::system_param::reader::SystemParamsRead;
24use risingwave_common::telemetry::manager::TelemetryManager;
25use risingwave_common::telemetry::{report_scarf_enabled, report_to_scarf, telemetry_env_enabled};
26use risingwave_common::util::tokio_util::sync::CancellationToken;
27use risingwave_common_service::{MetricsManager, TracingExtractLayer};
28use risingwave_meta::MetaStoreBackend;
29use risingwave_meta::barrier::GlobalBarrierManager;
30use risingwave_meta::controller::catalog::CatalogController;
31use risingwave_meta::controller::cluster::ClusterController;
32use risingwave_meta::hummock::IcebergCompactorManager;
33use risingwave_meta::manager::iceberg_compaction::IcebergCompactionManager;
34use risingwave_meta::manager::{META_NODE_ID, MetadataManager};
35use risingwave_meta::rpc::ElectionClientRef;
36use risingwave_meta::rpc::election::dummy::DummyElectionClient;
37use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer;
38use risingwave_meta::stream::{GlobalRefreshManager, ScaleController};
39use risingwave_meta_service::AddressInfo;
40use risingwave_meta_service::backup_service::BackupServiceImpl;
41use risingwave_meta_service::cloud_service::CloudServiceImpl;
42use risingwave_meta_service::cluster_limit_service::ClusterLimitServiceImpl;
43use risingwave_meta_service::cluster_service::ClusterServiceImpl;
44use risingwave_meta_service::ddl_service::DdlServiceImpl;
45use risingwave_meta_service::event_log_service::EventLogServiceImpl;
46use risingwave_meta_service::health_service::HealthServiceImpl;
47use risingwave_meta_service::heartbeat_service::HeartbeatServiceImpl;
48use risingwave_meta_service::hosted_iceberg_catalog_service::HostedIcebergCatalogServiceImpl;
49use risingwave_meta_service::hummock_service::HummockServiceImpl;
50use risingwave_meta_service::meta_member_service::MetaMemberServiceImpl;
51use risingwave_meta_service::monitor_service::MonitorServiceImpl;
52use risingwave_meta_service::notification_service::NotificationServiceImpl;
53use risingwave_meta_service::scale_service::ScaleServiceImpl;
54use risingwave_meta_service::serving_service::ServingServiceImpl;
55use risingwave_meta_service::session_config::SessionParamsServiceImpl;
56use risingwave_meta_service::sink_coordination_service::SinkCoordinationServiceImpl;
57use risingwave_meta_service::stream_service::StreamServiceImpl;
58use risingwave_meta_service::system_params_service::SystemParamsServiceImpl;
59use risingwave_meta_service::telemetry_service::TelemetryInfoServiceImpl;
60use risingwave_meta_service::user_service::UserServiceImpl;
61use risingwave_pb::backup_service::backup_service_server::BackupServiceServer;
62use risingwave_pb::cloud_service::cloud_service_server::CloudServiceServer;
63use risingwave_pb::configured_monitor_service_server;
64use risingwave_pb::connector_service::sink_coordination_service_server::SinkCoordinationServiceServer;
65use risingwave_pb::ddl_service::ddl_service_server::DdlServiceServer;
66use risingwave_pb::health::health_server::HealthServer;
67use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerServiceServer;
68use risingwave_pb::meta::SystemParams;
69use risingwave_pb::meta::cluster_limit_service_server::ClusterLimitServiceServer;
70use risingwave_pb::meta::cluster_service_server::ClusterServiceServer;
71use risingwave_pb::meta::event_log_service_server::EventLogServiceServer;
72use risingwave_pb::meta::heartbeat_service_server::HeartbeatServiceServer;
73use risingwave_pb::meta::hosted_iceberg_catalog_service_server::HostedIcebergCatalogServiceServer;
74use risingwave_pb::meta::meta_member_service_server::MetaMemberServiceServer;
75use risingwave_pb::meta::notification_service_server::NotificationServiceServer;
76use risingwave_pb::meta::scale_service_server::ScaleServiceServer;
77use risingwave_pb::meta::serving_service_server::ServingServiceServer;
78use risingwave_pb::meta::session_param_service_server::SessionParamServiceServer;
79use risingwave_pb::meta::stream_manager_service_server::StreamManagerServiceServer;
80use risingwave_pb::meta::system_params_service_server::SystemParamsServiceServer;
81use risingwave_pb::meta::telemetry_info_service_server::TelemetryInfoServiceServer;
82use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
83use risingwave_pb::user::user_service_server::UserServiceServer;
84use sea_orm::{ConnectionTrait, DbBackend};
85use thiserror_ext::AsReport;
86use tokio::sync::watch;
87
88use crate::backup_restore::BackupManager;
89use crate::barrier::BarrierScheduler;
90use crate::controller::SqlMetaStore;
91use crate::controller::system_param::SystemParamsController;
92use crate::hummock::HummockManager;
93use crate::manager::sink_coordination::SinkCoordinatorManager;
94use crate::manager::{IdleManager, MetaOpts, MetaSrvEnv};
95use crate::rpc::election::sql::{MySqlDriver, PostgresDriver, SqlBackendElectionClient};
96use crate::rpc::metrics::{GLOBAL_META_METRICS, start_info_monitor, start_worker_info_monitor};
97use crate::serving::ServingVnodeMapping;
98use crate::stream::{GlobalStreamManager, SourceManager};
99use crate::telemetry::{MetaReportCreator, MetaTelemetryInfoFetcher};
100use crate::{MetaError, MetaResult, hummock, serving};
101
/// Process-wide "meta service is up" flag, used in standalone mode to check the status of the
/// meta service. This can be easier and more accurate than probing the TCP connection.
pub mod started {
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Becomes `true` the first time [`set`] is called; nothing ever clears it.
    static META_SERVICE_STARTED: AtomicBool = AtomicBool::new(false);

    /// Records that the meta service has started. Calling it more than once is harmless.
    pub(crate) fn set() {
        META_SERVICE_STARTED.store(true, Ordering::Relaxed);
    }

    /// Returns whether [`set`] has been called in this process.
    pub fn get() -> bool {
        META_SERVICE_STARTED.load(Ordering::Relaxed)
    }
}
120
121/// A wrapper around [`rpc_serve_with_store`] that dispatches different store implementations.
122///
123/// For the timing of returning, see [`rpc_serve_with_store`].
124pub async fn rpc_serve(
125    address_info: AddressInfo,
126    meta_store_backend: MetaStoreBackend,
127    max_cluster_heartbeat_interval: Duration,
128    lease_interval_secs: u64,
129    server_config: risingwave_common::config::ServerConfig,
130    opts: MetaOpts,
131    init_system_params: SystemParams,
132    session_init: SessionInitConfig,
133    shutdown: CancellationToken,
134) -> MetaResult<()> {
135    let meta_store_impl = SqlMetaStore::connect(meta_store_backend.clone()).await?;
136
137    let election_client = match meta_store_backend {
138        MetaStoreBackend::Mem => {
139            // Use a dummy election client.
140            Arc::new(DummyElectionClient::new(
141                address_info.advertise_addr.clone(),
142            ))
143        }
144        MetaStoreBackend::Sql { .. } => {
145            // Init election client.
146            let id = address_info.advertise_addr.clone();
147            let conn = meta_store_impl.conn.clone();
148            let election_client: ElectionClientRef = match conn.get_database_backend() {
149                DbBackend::Sqlite => Arc::new(DummyElectionClient::new(id)),
150                DbBackend::Postgres => {
151                    Arc::new(SqlBackendElectionClient::new(id, PostgresDriver::new(conn)))
152                }
153                DbBackend::MySql => {
154                    Arc::new(SqlBackendElectionClient::new(id, MySqlDriver::new(conn)))
155                }
156            };
157            election_client.init().await?;
158
159            election_client
160        }
161    };
162
163    Box::pin(rpc_serve_with_store(
164        meta_store_impl,
165        election_client,
166        address_info,
167        max_cluster_heartbeat_interval,
168        lease_interval_secs,
169        server_config,
170        opts,
171        init_system_params,
172        session_init,
173        shutdown,
174    ))
175    .await
176}
177
/// Bootstraps the follower or leader service based on the election status.
///
/// Returns when the `shutdown` token is triggered, or when leader status is lost, or if the leader
/// service fails to start.
///
/// Flow, as implemented below:
/// 1. Spawn a background election task. It calls `run_once` repeatedly while it returns `Err`;
///    once `run_once` returns `Ok`, the task cancels `shutdown`.
/// 2. If this node is not currently the leader, run the follower services until the
///    `is_leader` watch flips to `true`. If `shutdown` fires first, return `Ok(())` immediately.
/// 3. Run the leader services until they return. Then signal the election task to stop, wait for
///    it, and return the leader's result.
pub async fn rpc_serve_with_store(
    meta_store_impl: SqlMetaStore,
    election_client: ElectionClientRef,
    address_info: AddressInfo,
    max_cluster_heartbeat_interval: Duration,
    lease_interval_secs: u64,
    server_config: risingwave_common::config::ServerConfig,
    opts: MetaOpts,
    init_system_params: SystemParams,
    session_init: SessionInitConfig,
    shutdown: CancellationToken,
) -> MetaResult<()> {
    // TODO(shutdown): directly use cancellation token
    // The receiver is handed to every `run_once` call. The sender is signalled after the leader
    // service stops (see the end of this function).
    let (election_shutdown_tx, election_shutdown_rx) = watch::channel(());

    let election_handle = tokio::spawn({
        let shutdown = shutdown.clone();
        let election_client = election_client.clone();

        async move {
            // Keep campaigning while `run_once` fails; each error is only logged.
            // NOTE(review): there is no backoff between retries here. Presumably `run_once`
            // paces itself (e.g. via the lease interval); confirm it cannot fail in a tight loop.
            while let Err(e) = election_client
                .run_once(lease_interval_secs as i64, election_shutdown_rx.clone())
                .await
            {
                tracing::error!(error = %e.as_report(), "election error happened");
            }
            // Leader lost, shutdown the service.
            // (Presumably this is also reached when `run_once` returns `Ok` after
            // `election_shutdown_tx` is signalled below; confirm against `run_once`.)
            shutdown.cancel();
        }
    });

    // Spawn and run the follower service if not the leader.
    // Watch the leader status and switch to the leader service when elected.
    // TODO: the branch seems to be always hit since the default value of `is_leader` is false until
    // the election is done (unless using `DummyElectionClient`).
    if !election_client.is_leader() {
        // The follower service can be shutdown separately if we're going to be the leader.
        // Being a child token, it is also cancelled whenever `shutdown` itself is cancelled.
        let follower_shutdown = shutdown.child_token();

        let follower_handle = tokio::spawn(start_service_as_election_follower(
            follower_shutdown.clone(),
            address_info.clone(),
            election_client.clone(),
        ));

        // Watch and wait until we become the leader.
        let mut is_leader_watcher = election_client.subscribe();

        while !*is_leader_watcher.borrow_and_update() {
            tokio::select! {
                // External shutdown signal. Directly return without switching to leader.
                // Neither `follower_handle` nor `election_handle` is awaited on this path. The
                // follower stops via its child token, and `election_shutdown_tx` is dropped on
                // return.
                _ = shutdown.cancelled() => return Ok(()),

                res = is_leader_watcher.changed() => {
                    // NOTE(review): if the watch sender has been dropped, `changed()` returns
                    // `Err` immediately on every call, and this loop would spin while logging.
                    // Presumably the sender lives as long as `election_client`; confirm.
                    if res.is_err() {
                        tracing::error!("leader watcher recv failed");
                    }
                }
            }
        }

        tracing::info!("elected as leader, shutting down follower services");
        // Stop the follower server and wait for it to finish before the leader server starts.
        // Both servers bind `address_info.listen_addr`.
        follower_shutdown.cancel();
        let _ = follower_handle.await;
    }

    // Run the leader service.
    let result = start_service_as_election_leader(
        meta_store_impl,
        address_info,
        max_cluster_heartbeat_interval,
        opts,
        init_system_params,
        session_init,
        server_config,
        election_client,
        shutdown,
    )
    .await;

    // Leader service has stopped, shutdown the election service to gracefully resign.
    // A send failure (no receiver left) is ignored.
    election_shutdown_tx.send(()).ok();
    let _ = election_handle.await;

    result
}
268
269/// Starts all services needed for the meta follower node.
270///
271/// Returns when the `shutdown` token is triggered.
272pub async fn start_service_as_election_follower(
273    shutdown: CancellationToken,
274    address_info: AddressInfo,
275    election_client: ElectionClientRef,
276) {
277    tracing::info!("starting follower services");
278
279    let meta_member_srv = MetaMemberServiceImpl::new(election_client);
280
281    let health_srv = HealthServiceImpl::new();
282
283    let server = tonic::transport::Server::builder()
284        .layer(MetricsMiddlewareLayer::new(Arc::new(
285            GLOBAL_META_METRICS.clone(),
286        )))
287        .layer(TracingExtractLayer::new())
288        .add_service(MetaMemberServiceServer::new(meta_member_srv))
289        .add_service(HealthServer::new(health_srv))
290        .monitored_serve_with_shutdown(
291            address_info.listen_addr,
292            "grpc-meta-follower-service",
293            TcpConfig {
294                tcp_nodelay: true,
295                keepalive_duration: None,
296            },
297            shutdown.clone().cancelled_owned(),
298        );
299    let server_handle = tokio::spawn(server);
300    started::set();
301
302    // Wait for the shutdown signal.
303    shutdown.cancelled().await;
304    // Wait for the server to shutdown. This is necessary because we may be transitioning from follower
305    // to leader, and conflicts on the services must be avoided.
306    let _ = server_handle.await;
307}
308
/// Starts all services needed for the meta leader node.
///
/// Returns when the `shutdown` token is triggered, or if the service initialization fails.
///
/// Rough order of work, as implemented below:
/// 1. Build `MetaSrvEnv` and validate the `data_directory` system parameter.
/// 2. Construct the metadata, hummock, barrier, source, sink, refresh, stream and iceberg
///    compaction managers, collecting their background tasks in `sub_tasks`.
/// 3. Construct the gRPC service implementations and, outside madsim, the dashboard.
/// 4. Record a `MetaNodeStart` event, spawn the gRPC server on `address_info.listen_addr`, mark
///    the service as started, and wait for `shutdown`.
///
/// NOTE(review): several initialization steps use `.unwrap()`, so their failures panic instead
/// of being returned as `Err`. These are: the cluster controller, compactor manager, hummock
/// manager, stream manager, and `may_fill_backward_state_table_info`. Consider propagating them
/// with `?`.
pub async fn start_service_as_election_leader(
    meta_store_impl: SqlMetaStore,
    address_info: AddressInfo,
    max_cluster_heartbeat_interval: Duration,
    opts: MetaOpts,
    init_system_params: SystemParams,
    session_init: SessionInitConfig,
    server_config: risingwave_common::config::ServerConfig,
    election_client: ElectionClientRef,
    shutdown: CancellationToken,
) -> MetaResult<()> {
    tracing::info!("starting leader services");

    let env = MetaSrvEnv::new(
        opts.clone(),
        init_system_params,
        session_init,
        meta_store_impl,
    )
    .await?;
    tracing::info!("MetaSrvEnv started");
    // The success value is discarded; only an error is propagated.
    let _ = env.may_start_watch_license_key_file()?;
    let system_params_reader = env.system_params_reader().await;

    // Reject a misconfigured `data_directory` before any manager is built.
    // NOTE(review): the message below lists only [a-z, A-Z, 0-9], but `is_correct_data_directory`
    // also accepts `_`, `-` and `/`. The continuation lines of this multi-line literal also carry
    // the source indentation into the message text.
    let data_directory = system_params_reader.data_directory();
    if !is_correct_data_directory(data_directory) {
        return Err(MetaError::system_params(format!(
            "The data directory {:?} is misconfigured.
            Please use a combination of uppercase and lowercase letters and numbers, i.e. [a-z, A-Z, 0-9].
            The string cannot start or end with '/', and consecutive '/' are not allowed.
            The data directory cannot be empty and its length should not exceed 800 characters.",
            data_directory
        )));
    }

    let cluster_controller = Arc::new(
        ClusterController::new(env.clone(), max_cluster_heartbeat_interval)
            .await
            .unwrap(),
    );
    let catalog_controller = Arc::new(CatalogController::new(env.clone()).await?);
    let metadata_manager = MetadataManager::new(cluster_controller, catalog_controller);

    // Initialize the serving vnode mapping. The session's `batch_parallelism` (if set) is passed
    // as `max_serving_parallelism`.
    let serving_vnode_mapping = Arc::new(ServingVnodeMapping::default());
    let max_serving_parallelism = env
        .session_params_manager_impl_ref()
        .get_params()
        .await
        .batch_parallelism()
        .map(|p| p.get());
    serving::on_meta_start(
        env.notification_manager_ref(),
        &metadata_manager,
        serving_vnode_mapping.clone(),
        max_serving_parallelism,
    )
    .await;

    let compactor_manager = Arc::new(
        hummock::CompactorManager::with_meta(env.clone())
            .await
            .unwrap(),
    );
    tracing::info!("CompactorManager started");

    let heartbeat_srv = HeartbeatServiceImpl::new(metadata_manager.clone());
    tracing::info!("HeartbeatServiceImpl started");

    // The sender goes to `HummockManager::new`. The receiver is consumed later by
    // `HummockManager::compaction_event_loop`.
    let (compactor_streams_change_tx, compactor_streams_change_rx) =
        tokio::sync::mpsc::unbounded_channel();

    let meta_metrics = Arc::new(GLOBAL_META_METRICS.clone());

    let hummock_manager = hummock::HummockManager::new(
        env.clone(),
        metadata_manager.clone(),
        meta_metrics.clone(),
        compactor_manager.clone(),
        compactor_streams_change_tx,
    )
    .await
    .unwrap();
    tracing::info!("HummockManager started");
    let object_store_media_type = hummock_manager.object_store_media_type();

    let meta_member_srv = MetaMemberServiceImpl::new(election_client.clone());

    // Optional Prometheus client, handed to the diagnose command and the dashboard below.
    // NOTE(review): an unparsable `prometheus_endpoint` panics here via `unwrap()`.
    let prometheus_client = opts.prometheus_endpoint.as_ref().map(|x| {
        use std::str::FromStr;
        prometheus_http_query::Client::from_str(x).unwrap()
    });
    let prometheus_selector = opts.prometheus_selector.unwrap_or_default();

    // Trace state shared by the OTLP trace service and the dashboard.
    let trace_state = otlp_embedded::State::new(otlp_embedded::Config {
        max_length: opts.cached_traces_num,
        max_memory_usage: opts.cached_traces_memory_limit_bytes,
    });
    let trace_srv = otlp_embedded::TraceServiceImpl::new(trace_state.clone());

    let (barrier_scheduler, scheduled_barriers) =
        BarrierScheduler::new_pair(hummock_manager.clone(), meta_metrics.clone());
    tracing::info!("BarrierScheduler started");

    // Initialize services.
    let backup_manager = BackupManager::new(
        env.clone(),
        hummock_manager.clone(),
        meta_metrics.clone(),
        system_params_reader.backup_storage_url(),
        system_params_reader.backup_storage_directory(),
    )
    .await?;
    tracing::info!("BackupManager started");

    LocalSecretManager::init(
        opts.temp_secret_file_dir,
        env.cluster_id().to_string(),
        META_NODE_ID,
    );
    tracing::info!("LocalSecretManager started");

    let notification_srv = NotificationServiceImpl::new(
        env.clone(),
        metadata_manager.clone(),
        hummock_manager.clone(),
        backup_manager.clone(),
        serving_vnode_mapping.clone(),
    )
    .await?;
    tracing::info!("NotificationServiceImpl started");

    let source_manager = Arc::new(
        SourceManager::new(
            barrier_scheduler.clone(),
            metadata_manager.clone(),
            meta_metrics.clone(),
            env.clone(),
        )
        .await?,
    );
    tracing::info!("SourceManager started");

    // Iceberg compaction statistics: the sender goes to the sink coordinator worker, and the
    // receiver goes to `IcebergCompactionManager::compaction_stat_loop`.
    let (iceberg_compaction_stat_tx, iceberg_compaction_stat_rx) =
        tokio::sync::mpsc::unbounded_channel();
    let (sink_manager, shutdown_handle) = SinkCoordinatorManager::start_worker(
        env.meta_store_ref().conn.clone(),
        hummock_manager.clone(),
        metadata_manager.clone(),
        iceberg_compaction_stat_tx,
        env.await_tree_reg().clone(),
    );
    tracing::info!("SinkCoordinatorManager started");
    // Background tasks started in this function are collected here as (handle, shutdown) pairs.
    // TODO(shutdown): remove this as there's no need to gracefully shutdown some of these sub-tasks.
    let mut sub_tasks = vec![shutdown_handle];

    let iceberg_compactor_manager = Arc::new(IcebergCompactorManager::new());

    // TODO: introduce compactor event stream handler to handle iceberg compaction events.
    let (iceberg_compaction_mgr, iceberg_compactor_event_rx) = IcebergCompactionManager::build(
        env.clone(),
        metadata_manager.clone(),
        iceberg_compactor_manager.clone(),
        meta_metrics.clone(),
    );

    sub_tasks.push(IcebergCompactionManager::compaction_stat_loop(
        iceberg_compaction_mgr.clone(),
        iceberg_compaction_stat_rx,
    ));

    sub_tasks.push(IcebergCompactionManager::gc_loop(
        iceberg_compaction_mgr.clone(),
        env.opts.iceberg_gc_interval_sec,
    ));

    let refresh_scheduler_interval = Duration::from_secs(env.opts.refresh_scheduler_interval_sec);
    let (refresh_manager, refresh_handle, refresh_shutdown) = GlobalRefreshManager::start(
        metadata_manager.clone(),
        barrier_scheduler.clone(),
        &env,
        refresh_scheduler_interval,
    )
    .await?;
    sub_tasks.push((refresh_handle, refresh_shutdown));

    let scale_controller = Arc::new(ScaleController::new(
        &metadata_manager,
        source_manager.clone(),
        env.clone(),
    ));

    // Consumes `scheduled_barriers`, the second value returned by `BarrierScheduler::new_pair`.
    let (barrier_manager, join_handle, shutdown_rx) = GlobalBarrierManager::start(
        scheduled_barriers,
        env.clone(),
        metadata_manager.clone(),
        hummock_manager.clone(),
        source_manager.clone(),
        sink_manager.clone(),
        scale_controller.clone(),
        barrier_scheduler.clone(),
        refresh_manager.clone(),
    )
    .await;
    tracing::info!("GlobalBarrierManager started");
    sub_tasks.push((join_handle, shutdown_rx));

    // Detached task, not tracked in `sub_tasks`. An error from `run()` panics this task.
    {
        let source_manager = source_manager.clone();
        tokio::spawn(async move {
            source_manager.run().await.unwrap();
        });
    }

    let stream_manager = Arc::new(
        GlobalStreamManager::new(
            env.clone(),
            metadata_manager.clone(),
            barrier_scheduler.clone(),
            hummock_manager.clone(),
            source_manager.clone(),
            refresh_manager.clone(),
            scale_controller.clone(),
        )
        .unwrap(),
    );

    hummock_manager
        .may_fill_backward_state_table_info()
        .await
        .unwrap();

    let ddl_srv = DdlServiceImpl::new(
        env.clone(),
        metadata_manager.clone(),
        stream_manager.clone(),
        source_manager.clone(),
        barrier_manager.clone(),
        sink_manager.clone(),
        meta_metrics.clone(),
        iceberg_compaction_mgr.clone(),
        barrier_scheduler.clone(),
    )
    .await;

    if env.opts.enable_legacy_table_migration {
        sub_tasks.push(ddl_srv.start_migrate_table_fragments());
    }

    let user_srv = UserServiceImpl::new(metadata_manager.clone());

    let scale_srv = ScaleServiceImpl::new(
        metadata_manager.clone(),
        stream_manager.clone(),
        barrier_manager.clone(),
        env.clone(),
    );

    let cluster_srv = ClusterServiceImpl::new(metadata_manager.clone(), barrier_manager.clone());
    let stream_srv = StreamServiceImpl::new(
        env.clone(),
        barrier_scheduler.clone(),
        barrier_manager.clone(),
        stream_manager.clone(),
        metadata_manager.clone(),
        refresh_manager.clone(),
        iceberg_compaction_mgr.clone(),
    );
    let sink_coordination_srv = SinkCoordinationServiceImpl::new(sink_manager);
    let hummock_srv = HummockServiceImpl::new(
        hummock_manager.clone(),
        metadata_manager.clone(),
        backup_manager.clone(),
        iceberg_compaction_mgr.clone(),
    );

    let health_srv = HealthServiceImpl::new();
    let backup_srv = BackupServiceImpl::new(backup_manager.clone());
    let telemetry_srv = TelemetryInfoServiceImpl::new(env.meta_store());
    let system_params_srv = SystemParamsServiceImpl::new(
        env.system_params_manager_impl_ref(),
        env.opts.license_key_path.is_some(),
    );
    let session_params_srv = SessionParamsServiceImpl::new(env.session_params_manager_impl_ref());
    let serving_srv =
        ServingServiceImpl::new(serving_vnode_mapping.clone(), metadata_manager.clone());
    let cloud_srv = CloudServiceImpl::new();
    let event_log_srv = EventLogServiceImpl::new(env.event_log_manager_ref());
    let cluster_limit_srv = ClusterLimitServiceImpl::new(env.clone(), metadata_manager.clone());
    let hosted_iceberg_catalog_srv = HostedIcebergCatalogServiceImpl::new(env.clone());
    let monitor_srv = MonitorServiceImpl::new(
        metadata_manager.clone(),
        env.await_tree_reg().clone(),
        server_config.clone(),
    );
    let diagnose_command = Arc::new(risingwave_meta::manager::diagnose::DiagnoseCommand::new(
        metadata_manager.clone(),
        env.await_tree_reg().clone(),
        hummock_manager.clone(),
        iceberg_compaction_mgr.clone(),
        env.event_log_manager_ref(),
        prometheus_client.clone(),
        prometheus_selector.clone(),
        opts.redact_sql_option_keywords.clone(),
        env.system_params_manager_impl_ref(),
    ));

    // Dashboard service, compiled out under madsim and only started when `dashboard_addr` is set.
    // Its task handle is held but never awaited.
    #[cfg(not(madsim))]
    let _dashboard_task = if let Some(ref dashboard_addr) = address_info.dashboard_addr {
        use risingwave_common::config::RpcClientConfig;
        use risingwave_rpc_client::MonitorClientPool;

        let dashboard_service = crate::dashboard::DashboardService {
            await_tree_reg: env.await_tree_reg().clone(),
            dashboard_addr: *dashboard_addr,
            prometheus_client,
            prometheus_selector,
            metadata_manager: metadata_manager.clone(),
            hummock_manager: hummock_manager.clone(),
            monitor_clients: MonitorClientPool::new(1, RpcClientConfig::default()),
            diagnose_command,
            profile_service: risingwave_common_heap_profiling::ProfileServiceImpl::new(
                server_config.clone(),
            ),
            trace_state,
        };
        let task = tokio::spawn(dashboard_service.serve());
        Some(task)
    } else {
        None
    };

    if let Some(prometheus_addr) = address_info.prometheus_addr {
        MetricsManager::boot_metrics_service(prometheus_addr.to_string())
    }

    // sub_tasks executed concurrently. Can be shutdown via shutdown_all
    // NOTE(review): nothing in this function drains `sub_tasks`; the vector is dropped when the
    // function returns.
    sub_tasks.extend(hummock::start_hummock_workers(
        hummock_manager.clone(),
        backup_manager.clone(),
        &env.opts,
        {
            // Callback that reports whether snapshot-backfilling jobs exist. If the check itself
            // fails, it logs a warning and returns `true`.
            let barrier_manager = barrier_manager.clone();
            Box::new(move || {
                let barrier_manager = barrier_manager.clone();
                Box::pin(async move {
                    barrier_manager.may_snapshot_backfilling_job().await.unwrap_or_else(|e| {
                        tracing::warn!(err = %e.as_report(), "failed to check having snapshot backfilling jobs. pause vacuum time travel");
                        true
                    })
                })
            })
        }
    ));
    sub_tasks.push(start_worker_info_monitor(
        metadata_manager.clone(),
        election_client.clone(),
        Duration::from_secs(env.opts.node_num_monitor_interval_sec),
        meta_metrics.clone(),
    ));
    sub_tasks.push(start_info_monitor(
        metadata_manager.clone(),
        hummock_manager.clone(),
        barrier_manager.clone(),
        env.system_params_manager_impl_ref(),
        meta_metrics.clone(),
    ));
    sub_tasks.push(SystemParamsController::start_params_notifier(
        env.system_params_manager_impl_ref(),
    ));
    sub_tasks.push(HummockManager::hummock_timer_task(
        hummock_manager.clone(),
        Some(backup_manager),
    ));
    sub_tasks.extend(HummockManager::compaction_event_loop(
        hummock_manager.clone(),
        compactor_streams_change_rx,
    ));

    sub_tasks.extend(IcebergCompactionManager::iceberg_compaction_event_loop(
        iceberg_compaction_mgr.clone(),
        iceberg_compactor_event_rx,
    ));

    sub_tasks.push(serving::start_serving_vnode_mapping_worker(
        env.notification_manager_ref(),
        metadata_manager.clone(),
        serving_vnode_mapping,
        env.session_params_manager_impl_ref(),
    ));

    {
        sub_tasks.push(ClusterController::start_heartbeat_checker(
            metadata_manager.cluster_controller.clone(),
            Duration::from_secs(1),
        ));

        if !env.opts.disable_automatic_parallelism_control {
            sub_tasks.push(stream_manager.start_auto_parallelism_monitor());
        }
    }

    let _idle_checker_handle = IdleManager::start_idle_checker(
        env.idle_manager_ref(),
        Duration::from_secs(30),
        shutdown.clone(),
    );

    // Once `abort_recv` resolves, this task calls `notification_mgr.abort_all()` and
    // `compactor_manager.abort_all_compactors()`. `abort_recv` resolves either when a value is
    // sent or when `abort_sender` is dropped; the result is ignored.
    let (abort_sender, abort_recv) = tokio::sync::oneshot::channel();
    let notification_mgr = env.notification_manager_ref();
    let stream_abort_handler = tokio::spawn(async move {
        let _ = abort_recv.await;
        notification_mgr.abort_all();
        compactor_manager.abort_all_compactors();
    });
    sub_tasks.push((stream_abort_handler, abort_sender));

    let telemetry_manager = TelemetryManager::new(
        Arc::new(MetaTelemetryInfoFetcher::new(env.cluster_id().clone())),
        Arc::new(MetaReportCreator::new(
            metadata_manager.clone(),
            object_store_media_type,
        )),
    );

    // May start telemetry reporting
    if env.opts.telemetry_enabled && telemetry_env_enabled() {
        sub_tasks.push(telemetry_manager.start().await);
    } else {
        tracing::info!("Telemetry didn't start due to meta backend or config");
    }
    if !cfg!(madsim) && report_scarf_enabled() {
        tokio::spawn(report_to_scarf());
    } else {
        tracing::info!("Scarf reporting is disabled");
    };

    if let Some(pair) = env.event_log_manager_ref().take_join_handle() {
        sub_tasks.push(pair);
    }

    tracing::info!("Assigned cluster id {:?}", *env.cluster_id());
    tracing::info!("Starting meta services");

    // Record this leader start (addresses and serialized options) in the event log.
    let event = risingwave_pb::meta::event_log::EventMetaNodeStart {
        advertise_addr: address_info.advertise_addr,
        listen_addr: address_info.listen_addr.to_string(),
        opts: serde_json::to_string(&env.opts).unwrap(),
    };
    env.event_log_manager_ref().add_event_logs(vec![
        risingwave_pb::meta::event_log::Event::MetaNodeStart(event),
    ]);

    // Register every leader gRPC service. Several of them lift the decoding size limit with
    // `max_decoding_message_size(usize::MAX)`.
    let server_builder = tonic::transport::Server::builder()
        .layer(MetricsMiddlewareLayer::new(meta_metrics))
        .layer(TracingExtractLayer::new())
        .add_service(HeartbeatServiceServer::new(heartbeat_srv))
        .add_service(ClusterServiceServer::new(cluster_srv))
        .add_service(StreamManagerServiceServer::new(stream_srv))
        .add_service(
            HummockManagerServiceServer::new(hummock_srv).max_decoding_message_size(usize::MAX),
        )
        .add_service(NotificationServiceServer::new(notification_srv))
        .add_service(MetaMemberServiceServer::new(meta_member_srv))
        .add_service(DdlServiceServer::new(ddl_srv).max_decoding_message_size(usize::MAX))
        .add_service(UserServiceServer::new(user_srv))
        .add_service(CloudServiceServer::new(cloud_srv))
        .add_service(ScaleServiceServer::new(scale_srv).max_decoding_message_size(usize::MAX))
        .add_service(HealthServer::new(health_srv))
        .add_service(BackupServiceServer::new(backup_srv))
        .add_service(SystemParamsServiceServer::new(system_params_srv))
        .add_service(SessionParamServiceServer::new(session_params_srv))
        .add_service(TelemetryInfoServiceServer::new(telemetry_srv))
        .add_service(ServingServiceServer::new(serving_srv))
        .add_service(
            SinkCoordinationServiceServer::new(sink_coordination_srv)
                .max_decoding_message_size(usize::MAX),
        )
        .add_service(
            EventLogServiceServer::new(event_log_srv).max_decoding_message_size(usize::MAX),
        )
        .add_service(ClusterLimitServiceServer::new(cluster_limit_srv))
        .add_service(HostedIcebergCatalogServiceServer::new(
            hosted_iceberg_catalog_srv,
        ))
        .add_service(configured_monitor_service_server(
            MonitorServiceServer::new(monitor_srv),
        ));

    #[cfg(not(madsim))] // `otlp-embedded` does not use madsim-patched tonic
    let server_builder = server_builder.add_service(TraceServiceServer::new(trace_srv));

    let server = server_builder.monitored_serve_with_shutdown(
        address_info.listen_addr,
        "grpc-meta-leader-service",
        TcpConfig {
            tcp_nodelay: true,
            keepalive_duration: None,
        },
        shutdown.clone().cancelled_owned(),
    );
    started::set();
    // The server receives `shutdown` as its shutdown signal. Unlike the follower, this handle is
    // not awaited before returning.
    let _server_handle = tokio::spawn(server);

    // Wait for the shutdown signal.
    shutdown.cancelled().await;
    // TODO(shutdown): may warn user if there's any other node still running in the cluster.
    // TODO(shutdown): do we have any other shutdown tasks?
    Ok(())
}
821
822fn is_correct_data_directory(data_directory: &str) -> bool {
823    let data_directory_regex = Regex::new(r"^[0-9a-zA-Z_/-]{1,}$").unwrap();
824    if data_directory.is_empty()
825        || !data_directory_regex.is_match(data_directory)
826        || data_directory.ends_with('/')
827        || data_directory.starts_with('/')
828        || data_directory.contains("//")
829        || data_directory.len() > 800
830    {
831        return false;
832    }
833    true
834}