risingwave_meta_node/server.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::time::Duration;

use otlp_embedded::TraceServiceServer;
use regex::Regex;
use risingwave_common::monitor::{RouterExt, TcpConfig};
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::telemetry::manager::TelemetryManager;
use risingwave_common::telemetry::{report_scarf_enabled, report_to_scarf, telemetry_env_enabled};
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_common_service::{MetricsManager, TracingExtractLayer};
use risingwave_meta::MetaStoreBackend;
use risingwave_meta::barrier::GlobalBarrierManager;
use risingwave_meta::controller::catalog::CatalogController;
use risingwave_meta::controller::cluster::ClusterController;
use risingwave_meta::hummock::IcebergCompactorManager;
use risingwave_meta::manager::iceberg_compaction::IcebergCompactionManager;
use risingwave_meta::manager::{META_NODE_ID, MetadataManager};
use risingwave_meta::rpc::ElectionClientRef;
use risingwave_meta::rpc::election::dummy::DummyElectionClient;
use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer;
use risingwave_meta::stream::{GlobalRefreshManager, ScaleController};
use risingwave_meta_service::AddressInfo;
use risingwave_meta_service::backup_service::BackupServiceImpl;
use risingwave_meta_service::cloud_service::CloudServiceImpl;
use risingwave_meta_service::cluster_limit_service::ClusterLimitServiceImpl;
use risingwave_meta_service::cluster_service::ClusterServiceImpl;
use risingwave_meta_service::ddl_service::DdlServiceImpl;
use risingwave_meta_service::event_log_service::EventLogServiceImpl;
use risingwave_meta_service::health_service::HealthServiceImpl;
use risingwave_meta_service::heartbeat_service::HeartbeatServiceImpl;
use risingwave_meta_service::hosted_iceberg_catalog_service::HostedIcebergCatalogServiceImpl;
use risingwave_meta_service::hummock_service::HummockServiceImpl;
use risingwave_meta_service::meta_member_service::MetaMemberServiceImpl;
use risingwave_meta_service::monitor_service::MonitorServiceImpl;
use risingwave_meta_service::notification_service::NotificationServiceImpl;
use risingwave_meta_service::scale_service::ScaleServiceImpl;
use risingwave_meta_service::serving_service::ServingServiceImpl;
use risingwave_meta_service::session_config::SessionParamsServiceImpl;
use risingwave_meta_service::sink_coordination_service::SinkCoordinationServiceImpl;
use risingwave_meta_service::stream_service::StreamServiceImpl;
use risingwave_meta_service::system_params_service::SystemParamsServiceImpl;
use risingwave_meta_service::telemetry_service::TelemetryInfoServiceImpl;
use risingwave_meta_service::user_service::UserServiceImpl;
use risingwave_pb::backup_service::backup_service_server::BackupServiceServer;
use risingwave_pb::cloud_service::cloud_service_server::CloudServiceServer;
use risingwave_pb::connector_service::sink_coordination_service_server::SinkCoordinationServiceServer;
use risingwave_pb::ddl_service::ddl_service_server::DdlServiceServer;
use risingwave_pb::health::health_server::HealthServer;
use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerServiceServer;
use risingwave_pb::meta::SystemParams;
use risingwave_pb::meta::cluster_limit_service_server::ClusterLimitServiceServer;
use risingwave_pb::meta::cluster_service_server::ClusterServiceServer;
use risingwave_pb::meta::event_log_service_server::EventLogServiceServer;
use risingwave_pb::meta::heartbeat_service_server::HeartbeatServiceServer;
use risingwave_pb::meta::hosted_iceberg_catalog_service_server::HostedIcebergCatalogServiceServer;
use risingwave_pb::meta::meta_member_service_server::MetaMemberServiceServer;
use risingwave_pb::meta::notification_service_server::NotificationServiceServer;
use risingwave_pb::meta::scale_service_server::ScaleServiceServer;
use risingwave_pb::meta::serving_service_server::ServingServiceServer;
use risingwave_pb::meta::session_param_service_server::SessionParamServiceServer;
use risingwave_pb::meta::stream_manager_service_server::StreamManagerServiceServer;
use risingwave_pb::meta::system_params_service_server::SystemParamsServiceServer;
use risingwave_pb::meta::telemetry_info_service_server::TelemetryInfoServiceServer;
use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
use risingwave_pb::user::user_service_server::UserServiceServer;
use risingwave_rpc_client::ComputeClientPool;
use sea_orm::{ConnectionTrait, DbBackend};
use thiserror_ext::AsReport;
use tokio::sync::watch;

use crate::backup_restore::BackupManager;
use crate::barrier::BarrierScheduler;
use crate::controller::SqlMetaStore;
use crate::controller::system_param::SystemParamsController;
use crate::hummock::HummockManager;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{IdleManager, MetaOpts, MetaSrvEnv};
use crate::rpc::election::sql::{MySqlDriver, PostgresDriver, SqlBackendElectionClient};
use crate::rpc::metrics::{GLOBAL_META_METRICS, start_info_monitor, start_worker_info_monitor};
use crate::serving::ServingVnodeMapping;
use crate::stream::{GlobalStreamManager, SourceManager};
use crate::telemetry::{MetaReportCreator, MetaTelemetryInfoFetcher};
use crate::{MetaError, MetaResult, hummock, serving};

/// Used in standalone mode to check the status of the meta service.
/// This can be easier and more accurate than checking the TCP connection.
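///
/// A minimal polling sketch for a standalone launcher (illustrative only; the
/// surrounding loop and sleep interval are assumptions, not part of this module):
///
/// ```ignore
/// // Block until the meta service reports itself as started.
/// while !started::get() {
///     tokio::time::sleep(std::time::Duration::from_millis(100)).await;
/// }
/// ```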
pub mod started {
    use std::sync::atomic::AtomicBool;
    use std::sync::atomic::Ordering::Relaxed;

    static STARTED: AtomicBool = AtomicBool::new(false);

    /// Mark the meta service as started.
    pub(crate) fn set() {
        STARTED.store(true, Relaxed);
    }

    /// Check if the meta service has started.
    pub fn get() -> bool {
        STARTED.load(Relaxed)
    }
}

/// A wrapper around [`rpc_serve_with_store`] that dispatches on the configured store backend.
///
/// See [`rpc_serve_with_store`] for when this function returns.
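///
/// The in-memory backend and SQLite both fall back to a [`DummyElectionClient`] (no real
/// election), while Postgres and MySQL use a [`SqlBackendElectionClient`] with the matching
/// driver.
///
/// A hedged sketch of how a caller might drive this entry point and its shutdown (the
/// argument values and the signal handling are placeholders, not prescribed by this crate):
///
/// ```ignore
/// let shutdown = CancellationToken::new();
/// let serve = tokio::spawn(rpc_serve(
///     address_info,
///     MetaStoreBackend::Mem,
///     Duration::from_secs(60), // max cluster heartbeat interval (placeholder)
///     10,                      // lease interval in seconds (placeholder)
///     opts,
///     init_system_params,
///     init_session_config,
///     shutdown.clone(),
/// ));
/// // Later, e.g. on SIGTERM:
/// shutdown.cancel();
/// serve.await??;
/// ```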
pub async fn rpc_serve(
    address_info: AddressInfo,
    meta_store_backend: MetaStoreBackend,
    max_cluster_heartbeat_interval: Duration,
    lease_interval_secs: u64,
    opts: MetaOpts,
    init_system_params: SystemParams,
    init_session_config: SessionConfig,
    shutdown: CancellationToken,
) -> MetaResult<()> {
    let meta_store_impl = SqlMetaStore::connect(meta_store_backend.clone()).await?;

    let election_client = match meta_store_backend {
        MetaStoreBackend::Mem => {
            // Use a dummy election client.
            Arc::new(DummyElectionClient::new(
                address_info.advertise_addr.clone(),
            ))
        }
        MetaStoreBackend::Sql { .. } => {
            // Init election client.
            let id = address_info.advertise_addr.clone();
            let conn = meta_store_impl.conn.clone();
            let election_client: ElectionClientRef = match conn.get_database_backend() {
                DbBackend::Sqlite => Arc::new(DummyElectionClient::new(id)),
                DbBackend::Postgres => {
                    Arc::new(SqlBackendElectionClient::new(id, PostgresDriver::new(conn)))
                }
                DbBackend::MySql => {
                    Arc::new(SqlBackendElectionClient::new(id, MySqlDriver::new(conn)))
                }
            };
            election_client.init().await?;

            election_client
        }
    };

    rpc_serve_with_store(
        meta_store_impl,
        election_client,
        address_info,
        max_cluster_heartbeat_interval,
        lease_interval_secs,
        opts,
        init_system_params,
        init_session_config,
        shutdown,
    )
    .await
}

/// Bootstraps the follower or leader service based on the election status.
///
/// Returns when the `shutdown` token is triggered, or when leader status is lost, or if the leader
/// service fails to start.
pub async fn rpc_serve_with_store(
    meta_store_impl: SqlMetaStore,
    election_client: ElectionClientRef,
    address_info: AddressInfo,
    max_cluster_heartbeat_interval: Duration,
    lease_interval_secs: u64,
    opts: MetaOpts,
    init_system_params: SystemParams,
    init_session_config: SessionConfig,
    shutdown: CancellationToken,
) -> MetaResult<()> {
    // TODO(shutdown): directly use cancellation token
    let (election_shutdown_tx, election_shutdown_rx) = watch::channel(());

    let election_handle = tokio::spawn({
        let shutdown = shutdown.clone();
        let election_client = election_client.clone();

        async move {
            while let Err(e) = election_client
                .run_once(lease_interval_secs as i64, election_shutdown_rx.clone())
                .await
            {
                tracing::error!(error = %e.as_report(), "election error happened");
            }
            // Leadership lost; shut down the service.
            shutdown.cancel();
        }
    });

    // Spawn and run the follower service if we are not the leader.
    // Watch the leader status and switch to the leader service once elected.
    // TODO: this branch seems to always be hit, since `is_leader` defaults to false until
    // the election completes (unless `DummyElectionClient` is used).
    if !election_client.is_leader() {
        // The follower service can be shut down separately once we become the leader.
        let follower_shutdown = shutdown.child_token();

        let follower_handle = tokio::spawn(start_service_as_election_follower(
            follower_shutdown.clone(),
            address_info.clone(),
            election_client.clone(),
        ));

        // Watch and wait until we become the leader.
        let mut is_leader_watcher = election_client.subscribe();

        while !*is_leader_watcher.borrow_and_update() {
            tokio::select! {
                // External shutdown signal. Directly return without switching to leader.
                _ = shutdown.cancelled() => return Ok(()),

                res = is_leader_watcher.changed() => {
                    if res.is_err() {
                        tracing::error!("leader watcher recv failed");
                    }
                }
            }
        }

        tracing::info!("elected as leader, shutting down follower services");
        follower_shutdown.cancel();
        let _ = follower_handle.await;
    }

    // Run the leader service.
    let result = start_service_as_election_leader(
        meta_store_impl,
        address_info,
        max_cluster_heartbeat_interval,
        opts,
        init_system_params,
        init_session_config,
        election_client,
        shutdown,
    )
    .await;

    // The leader service has stopped; shut down the election service to gracefully resign.
    election_shutdown_tx.send(()).ok();
    let _ = election_handle.await;

    result
}

/// Starts all services needed for the meta follower node.
///
/// Returns when the `shutdown` token is triggered.
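///
/// The follower only serves the membership and health gRPC endpoints
/// ([`MetaMemberServiceServer`] and [`HealthServer`]); all other services are started by the
/// leader in [`start_service_as_election_leader`].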
pub async fn start_service_as_election_follower(
    shutdown: CancellationToken,
    address_info: AddressInfo,
    election_client: ElectionClientRef,
) {
    tracing::info!("starting follower services");

    let meta_member_srv = MetaMemberServiceImpl::new(election_client);

    let health_srv = HealthServiceImpl::new();

    let server = tonic::transport::Server::builder()
        .layer(MetricsMiddlewareLayer::new(Arc::new(
            GLOBAL_META_METRICS.clone(),
        )))
        .layer(TracingExtractLayer::new())
        .add_service(MetaMemberServiceServer::new(meta_member_srv))
        .add_service(HealthServer::new(health_srv))
        .monitored_serve_with_shutdown(
            address_info.listen_addr,
            "grpc-meta-follower-service",
            TcpConfig {
                tcp_nodelay: true,
                keepalive_duration: None,
            },
            shutdown.clone().cancelled_owned(),
        );
    let server_handle = tokio::spawn(server);
    started::set();

    // Wait for the shutdown signal.
    shutdown.cancelled().await;
    // Wait for the server to shut down. This is necessary because we may be transitioning from
    // follower to leader, and conflicts between the two sets of services must be avoided.
    let _ = server_handle.await;
}

/// Starts all services needed for the meta leader node.
///
/// Returns when the `shutdown` token is triggered, or if the service initialization fails.
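///
/// Roughly in order, this brings up the metadata manager (cluster and catalog controllers),
/// the hummock and compactor managers, the barrier scheduler and [`GlobalBarrierManager`],
/// the stream/source/sink managers, a set of background sub-tasks, and finally the leader
/// gRPC server itself.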
pub async fn start_service_as_election_leader(
    meta_store_impl: SqlMetaStore,
    address_info: AddressInfo,
    max_cluster_heartbeat_interval: Duration,
    opts: MetaOpts,
    init_system_params: SystemParams,
    init_session_config: SessionConfig,
    election_client: ElectionClientRef,
    shutdown: CancellationToken,
) -> MetaResult<()> {
    tracing::info!("starting leader services");

    let env = MetaSrvEnv::new(
        opts.clone(),
        init_system_params,
        init_session_config,
        meta_store_impl,
    )
    .await?;
    tracing::info!("MetaSrvEnv started");
    let _ = env.may_start_watch_license_key_file()?;
    let system_params_reader = env.system_params_reader().await;

    let data_directory = system_params_reader.data_directory();
    if !is_correct_data_directory(data_directory) {
        return Err(MetaError::system_params(format!(
            "The data directory {:?} is misconfigured.
            Please use a combination of uppercase and lowercase letters and numbers, i.e. [a-z, A-Z, 0-9].
            The string cannot start or end with '/', and consecutive '/' are not allowed.
            The data directory cannot be empty and its length should not exceed 800 characters.",
            data_directory
        )));
    }

    let cluster_controller = Arc::new(
        ClusterController::new(env.clone(), max_cluster_heartbeat_interval)
            .await
            .unwrap(),
    );
    let catalog_controller = Arc::new(CatalogController::new(env.clone()).await?);
    let metadata_manager = MetadataManager::new(cluster_controller, catalog_controller);

    let serving_vnode_mapping = Arc::new(ServingVnodeMapping::default());
    let max_serving_parallelism = env
        .session_params_manager_impl_ref()
        .get_params()
        .await
        .batch_parallelism()
        .map(|p| p.get());
    serving::on_meta_start(
        env.notification_manager_ref(),
        &metadata_manager,
        serving_vnode_mapping.clone(),
        max_serving_parallelism,
    )
    .await;

    let compactor_manager = Arc::new(
        hummock::CompactorManager::with_meta(env.clone())
            .await
            .unwrap(),
    );
    tracing::info!("CompactorManager started");

    let heartbeat_srv = HeartbeatServiceImpl::new(metadata_manager.clone());
    tracing::info!("HeartbeatServiceImpl started");

    let (compactor_streams_change_tx, compactor_streams_change_rx) =
        tokio::sync::mpsc::unbounded_channel();

    let meta_metrics = Arc::new(GLOBAL_META_METRICS.clone());

    let hummock_manager = hummock::HummockManager::new(
        env.clone(),
        metadata_manager.clone(),
        meta_metrics.clone(),
        compactor_manager.clone(),
        compactor_streams_change_tx,
    )
    .await
    .unwrap();
    tracing::info!("HummockManager started");
    let object_store_media_type = hummock_manager.object_store_media_type();

    let meta_member_srv = MetaMemberServiceImpl::new(election_client.clone());

    let prometheus_client = opts.prometheus_endpoint.as_ref().map(|x| {
        use std::str::FromStr;
        prometheus_http_query::Client::from_str(x).unwrap()
    });
    let prometheus_selector = opts.prometheus_selector.unwrap_or_default();
    let diagnose_command = Arc::new(risingwave_meta::manager::diagnose::DiagnoseCommand::new(
        metadata_manager.clone(),
        env.await_tree_reg().clone(),
        hummock_manager.clone(),
        env.event_log_manager_ref(),
        prometheus_client.clone(),
        prometheus_selector.clone(),
        opts.redact_sql_option_keywords.clone(),
    ));

    let trace_state = otlp_embedded::State::new(otlp_embedded::Config {
        max_length: opts.cached_traces_num,
        max_memory_usage: opts.cached_traces_memory_limit_bytes,
    });
    let trace_srv = otlp_embedded::TraceServiceImpl::new(trace_state.clone());

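    // The embedded dashboard is only spawned outside madsim simulation and only when a
    // dashboard address is configured; otherwise the task handle stays `None`.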
    #[cfg(not(madsim))]
    let _dashboard_task = if let Some(ref dashboard_addr) = address_info.dashboard_addr {
        let dashboard_service = crate::dashboard::DashboardService {
            await_tree_reg: env.await_tree_reg().clone(),
            dashboard_addr: *dashboard_addr,
            prometheus_client,
            prometheus_selector,
            metadata_manager: metadata_manager.clone(),
            hummock_manager: hummock_manager.clone(),
            compute_clients: ComputeClientPool::new(1, env.opts.compute_client_config.clone()), /* typically no need for plural clients */
            diagnose_command,
            trace_state,
        };
        let task = tokio::spawn(dashboard_service.serve());
        Some(task)
    } else {
        None
    };

    let (barrier_scheduler, scheduled_barriers) =
        BarrierScheduler::new_pair(hummock_manager.clone(), meta_metrics.clone());
    tracing::info!("BarrierScheduler started");

    // Initialize services.
    let backup_manager = BackupManager::new(
        env.clone(),
        hummock_manager.clone(),
        meta_metrics.clone(),
        system_params_reader.backup_storage_url(),
        system_params_reader.backup_storage_directory(),
    )
    .await?;
    tracing::info!("BackupManager started");

    LocalSecretManager::init(
        opts.temp_secret_file_dir,
        env.cluster_id().to_string(),
        META_NODE_ID,
    );
    tracing::info!("LocalSecretManager started");

    let notification_srv = NotificationServiceImpl::new(
        env.clone(),
        metadata_manager.clone(),
        hummock_manager.clone(),
        backup_manager.clone(),
        serving_vnode_mapping.clone(),
    )
    .await?;
    tracing::info!("NotificationServiceImpl started");

    let source_manager = Arc::new(
        SourceManager::new(
            barrier_scheduler.clone(),
            metadata_manager.clone(),
            meta_metrics.clone(),
            env.clone(),
        )
        .await?,
    );
    tracing::info!("SourceManager started");

    let (iceberg_compaction_stat_tx, iceberg_compaction_stat_rx) =
        tokio::sync::mpsc::unbounded_channel();
    let (sink_manager, shutdown_handle) = SinkCoordinatorManager::start_worker(
        env.meta_store_ref().conn.clone(),
        hummock_manager.clone(),
        metadata_manager.clone(),
        iceberg_compaction_stat_tx,
    );
    tracing::info!("SinkCoordinatorManager started");
    // TODO(shutdown): remove this, as there's no need to gracefully shut down some of these sub-tasks.
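    // Each entry in `sub_tasks` pairs a background task's join handle with a handle used to
    // trigger its shutdown.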
    let mut sub_tasks = vec![shutdown_handle];

    let iceberg_compactor_manager = Arc::new(IcebergCompactorManager::new());

    // TODO: introduce compactor event stream handler to handle iceberg compaction events.
    let (iceberg_compaction_mgr, iceberg_compactor_event_rx) = IcebergCompactionManager::build(
        env.clone(),
        metadata_manager.clone(),
        iceberg_compactor_manager.clone(),
        meta_metrics.clone(),
    );

    sub_tasks.push(IcebergCompactionManager::compaction_stat_loop(
        iceberg_compaction_mgr.clone(),
        iceberg_compaction_stat_rx,
    ));

    sub_tasks.push(IcebergCompactionManager::gc_loop(
        iceberg_compaction_mgr.clone(),
        env.opts.iceberg_gc_interval_sec,
    ));

    let refresh_scheduler_interval = Duration::from_secs(env.opts.refresh_scheduler_interval_sec);
    let (refresh_manager, refresh_handle, refresh_shutdown) = GlobalRefreshManager::start(
        metadata_manager.clone(),
        barrier_scheduler.clone(),
        &env,
        refresh_scheduler_interval,
    )
    .await?;
    sub_tasks.push((refresh_handle, refresh_shutdown));

    let scale_controller = Arc::new(ScaleController::new(
        &metadata_manager,
        source_manager.clone(),
        env.clone(),
    ));

    let (barrier_manager, join_handle, shutdown_rx) = GlobalBarrierManager::start(
        scheduled_barriers,
        env.clone(),
        metadata_manager.clone(),
        hummock_manager.clone(),
        source_manager.clone(),
        sink_manager.clone(),
        scale_controller.clone(),
        barrier_scheduler.clone(),
        refresh_manager.clone(),
    )
    .await;
    tracing::info!("GlobalBarrierManager started");
    sub_tasks.push((join_handle, shutdown_rx));

    {
        let source_manager = source_manager.clone();
        tokio::spawn(async move {
            source_manager.run().await.unwrap();
        });
    }

    let stream_manager = Arc::new(
        GlobalStreamManager::new(
            env.clone(),
            metadata_manager.clone(),
            barrier_scheduler.clone(),
            source_manager.clone(),
            scale_controller.clone(),
        )
        .unwrap(),
    );

    hummock_manager
        .may_fill_backward_state_table_info()
        .await
        .unwrap();

    let ddl_srv = DdlServiceImpl::new(
        env.clone(),
        metadata_manager.clone(),
        stream_manager.clone(),
        source_manager.clone(),
        barrier_manager.clone(),
        sink_manager.clone(),
        meta_metrics.clone(),
        iceberg_compaction_mgr.clone(),
        barrier_scheduler.clone(),
    )
    .await;

    if env.opts.enable_legacy_table_migration {
        sub_tasks.push(ddl_srv.start_migrate_table_fragments());
    }

    let user_srv = UserServiceImpl::new(metadata_manager.clone());

    let scale_srv = ScaleServiceImpl::new(
        metadata_manager.clone(),
        stream_manager.clone(),
        barrier_manager.clone(),
        env.clone(),
    );

    let cluster_srv = ClusterServiceImpl::new(metadata_manager.clone(), barrier_manager.clone());
    let stream_srv = StreamServiceImpl::new(
        env.clone(),
        barrier_scheduler.clone(),
        barrier_manager.clone(),
        stream_manager.clone(),
        metadata_manager.clone(),
        refresh_manager.clone(),
    );
    let sink_coordination_srv = SinkCoordinationServiceImpl::new(sink_manager);
    let hummock_srv = HummockServiceImpl::new(
        hummock_manager.clone(),
        metadata_manager.clone(),
        backup_manager.clone(),
        iceberg_compaction_mgr.clone(),
    );

    let health_srv = HealthServiceImpl::new();
    let backup_srv = BackupServiceImpl::new(backup_manager.clone());
    let telemetry_srv = TelemetryInfoServiceImpl::new(env.meta_store());
    let system_params_srv = SystemParamsServiceImpl::new(
        env.system_params_manager_impl_ref(),
        env.opts.license_key_path.is_some(),
    );
    let session_params_srv = SessionParamsServiceImpl::new(env.session_params_manager_impl_ref());
    let serving_srv =
        ServingServiceImpl::new(serving_vnode_mapping.clone(), metadata_manager.clone());
    let cloud_srv = CloudServiceImpl::new();
    let event_log_srv = EventLogServiceImpl::new(env.event_log_manager_ref());
    let cluster_limit_srv = ClusterLimitServiceImpl::new(env.clone(), metadata_manager.clone());
    let hosted_iceberg_catalog_srv = HostedIcebergCatalogServiceImpl::new(env.clone());
    let monitor_srv = MonitorServiceImpl {
        metadata_manager: metadata_manager.clone(),
        await_tree_reg: env.await_tree_reg().clone(),
    };

    if let Some(prometheus_addr) = address_info.prometheus_addr {
        MetricsManager::boot_metrics_service(prometheus_addr.to_string())
    }

    // The sub-tasks are executed concurrently and can be shut down via `shutdown_all`.
    sub_tasks.extend(hummock::start_hummock_workers(
        hummock_manager.clone(),
        backup_manager.clone(),
        &env.opts,
    ));
    sub_tasks.push(start_worker_info_monitor(
        metadata_manager.clone(),
        election_client.clone(),
        Duration::from_secs(env.opts.node_num_monitor_interval_sec),
        meta_metrics.clone(),
    ));
    sub_tasks.push(start_info_monitor(
        metadata_manager.clone(),
        hummock_manager.clone(),
        env.system_params_manager_impl_ref(),
        meta_metrics.clone(),
    ));
    sub_tasks.push(SystemParamsController::start_params_notifier(
        env.system_params_manager_impl_ref(),
    ));
    sub_tasks.push(HummockManager::hummock_timer_task(
        hummock_manager.clone(),
        Some(backup_manager),
    ));
    sub_tasks.extend(HummockManager::compaction_event_loop(
        hummock_manager.clone(),
        compactor_streams_change_rx,
    ));

    sub_tasks.extend(IcebergCompactionManager::iceberg_compaction_event_loop(
        iceberg_compaction_mgr.clone(),
        iceberg_compactor_event_rx,
    ));

    sub_tasks.push(serving::start_serving_vnode_mapping_worker(
        env.notification_manager_ref(),
        metadata_manager.clone(),
        serving_vnode_mapping,
        env.session_params_manager_impl_ref(),
    ));

    {
        sub_tasks.push(ClusterController::start_heartbeat_checker(
            metadata_manager.cluster_controller.clone(),
            Duration::from_secs(1),
        ));

        if !env.opts.disable_automatic_parallelism_control {
            sub_tasks.push(stream_manager.start_auto_parallelism_monitor());
        }
    }

    let _idle_checker_handle = IdleManager::start_idle_checker(
        env.idle_manager_ref(),
        Duration::from_secs(30),
        shutdown.clone(),
    );

    let (abort_sender, abort_recv) = tokio::sync::oneshot::channel();
    let notification_mgr = env.notification_manager_ref();
    let stream_abort_handler = tokio::spawn(async move {
        let _ = abort_recv.await;
        notification_mgr.abort_all();
        compactor_manager.abort_all_compactors();
    });
    sub_tasks.push((stream_abort_handler, abort_sender));

    let telemetry_manager = TelemetryManager::new(
        Arc::new(MetaTelemetryInfoFetcher::new(env.cluster_id().clone())),
        Arc::new(MetaReportCreator::new(
            metadata_manager.clone(),
            object_store_media_type,
        )),
    );

    // May start telemetry reporting
    if env.opts.telemetry_enabled && telemetry_env_enabled() {
        sub_tasks.push(telemetry_manager.start().await);
    } else {
        tracing::info!("Telemetry didn't start due to meta backend or config");
    }
    if !cfg!(madsim) && report_scarf_enabled() {
        tokio::spawn(report_to_scarf());
    } else {
        tracing::info!("Scarf reporting is disabled");
    };

    if let Some(pair) = env.event_log_manager_ref().take_join_handle() {
        sub_tasks.push(pair);
    }

    tracing::info!("Assigned cluster id {:?}", *env.cluster_id());
    tracing::info!("Starting meta services");

    let event = risingwave_pb::meta::event_log::EventMetaNodeStart {
        advertise_addr: address_info.advertise_addr,
        listen_addr: address_info.listen_addr.to_string(),
        opts: serde_json::to_string(&env.opts).unwrap(),
    };
    env.event_log_manager_ref().add_event_logs(vec![
        risingwave_pb::meta::event_log::Event::MetaNodeStart(event),
    ]);

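    // Register all leader gRPC services on a single tonic server. A few services that may carry
    // large payloads lift the decoding limit via `max_decoding_message_size(usize::MAX)`.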
    let server_builder = tonic::transport::Server::builder()
        .layer(MetricsMiddlewareLayer::new(meta_metrics))
        .layer(TracingExtractLayer::new())
        .add_service(HeartbeatServiceServer::new(heartbeat_srv))
        .add_service(ClusterServiceServer::new(cluster_srv))
        .add_service(StreamManagerServiceServer::new(stream_srv))
        .add_service(
            HummockManagerServiceServer::new(hummock_srv).max_decoding_message_size(usize::MAX),
        )
        .add_service(NotificationServiceServer::new(notification_srv))
        .add_service(MetaMemberServiceServer::new(meta_member_srv))
        .add_service(DdlServiceServer::new(ddl_srv).max_decoding_message_size(usize::MAX))
        .add_service(UserServiceServer::new(user_srv))
        .add_service(CloudServiceServer::new(cloud_srv))
        .add_service(ScaleServiceServer::new(scale_srv).max_decoding_message_size(usize::MAX))
        .add_service(HealthServer::new(health_srv))
        .add_service(BackupServiceServer::new(backup_srv))
        .add_service(SystemParamsServiceServer::new(system_params_srv))
        .add_service(SessionParamServiceServer::new(session_params_srv))
        .add_service(TelemetryInfoServiceServer::new(telemetry_srv))
        .add_service(ServingServiceServer::new(serving_srv))
        .add_service(
            SinkCoordinationServiceServer::new(sink_coordination_srv)
                .max_decoding_message_size(usize::MAX),
        )
        .add_service(
            EventLogServiceServer::new(event_log_srv).max_decoding_message_size(usize::MAX),
        )
        .add_service(ClusterLimitServiceServer::new(cluster_limit_srv))
        .add_service(HostedIcebergCatalogServiceServer::new(
            hosted_iceberg_catalog_srv,
        ))
        .add_service(MonitorServiceServer::new(monitor_srv));

    #[cfg(not(madsim))] // `otlp-embedded` does not use madsim-patched tonic
    let server_builder = server_builder.add_service(TraceServiceServer::new(trace_srv));

    let server = server_builder.monitored_serve_with_shutdown(
        address_info.listen_addr,
        "grpc-meta-leader-service",
        TcpConfig {
            tcp_nodelay: true,
            keepalive_duration: None,
        },
        shutdown.clone().cancelled_owned(),
    );
    started::set();
    let _server_handle = tokio::spawn(server);

    // Wait for the shutdown signal.
    shutdown.cancelled().await;
    // TODO(shutdown): may warn user if there's any other node still running in the cluster.
    // TODO(shutdown): do we have any other shutdown tasks?
    Ok(())
}

fn is_correct_data_directory(data_directory: &str) -> bool {
    let data_directory_regex = Regex::new(r"^[0-9a-zA-Z_/-]{1,}$").unwrap();
    if data_directory.is_empty()
        || !data_directory_regex.is_match(data_directory)
        || data_directory.ends_with('/')
        || data_directory.starts_with('/')
        || data_directory.contains("//")
        || data_directory.len() > 800
    {
        return false;
    }
    true
}
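
// A few illustrative sanity checks for `is_correct_data_directory` (sketch only; the cases
// below are examples derived from the validation rules above).
#[cfg(test)]
mod data_directory_tests {
    use super::is_correct_data_directory;

    #[test]
    fn accepts_well_formed_directories_and_rejects_malformed_ones() {
        // Alphanumerics, '_', '-' and inner '/' separators are allowed.
        assert!(is_correct_data_directory("hummock_001"));
        assert!(is_correct_data_directory("nested/dir-01"));
        // Empty strings, leading/trailing '/', consecutive '/', and other characters are rejected.
        assert!(!is_correct_data_directory(""));
        assert!(!is_correct_data_directory("/leading-slash"));
        assert!(!is_correct_data_directory("trailing-slash/"));
        assert!(!is_correct_data_directory("double//slash"));
        assert!(!is_correct_data_directory("space not allowed"));
    }
}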