risingwave_meta_node/
server.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use otlp_embedded::TraceServiceServer;
19use regex::Regex;
20use risingwave_common::monitor::{RouterExt, TcpConfig};
21use risingwave_common::secret::LocalSecretManager;
22use risingwave_common::session_config::SessionConfig;
23use risingwave_common::system_param::reader::SystemParamsRead;
24use risingwave_common::telemetry::manager::TelemetryManager;
25use risingwave_common::telemetry::{report_scarf_enabled, report_to_scarf, telemetry_env_enabled};
26use risingwave_common::util::tokio_util::sync::CancellationToken;
27use risingwave_common_service::{MetricsManager, TracingExtractLayer};
28use risingwave_meta::MetaStoreBackend;
29use risingwave_meta::barrier::GlobalBarrierManager;
30use risingwave_meta::controller::catalog::CatalogController;
31use risingwave_meta::controller::cluster::ClusterController;
32use risingwave_meta::manager::{META_NODE_ID, MetadataManager};
33use risingwave_meta::rpc::ElectionClientRef;
34use risingwave_meta::rpc::election::dummy::DummyElectionClient;
35use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer;
36use risingwave_meta::stream::ScaleController;
37use risingwave_meta_service::AddressInfo;
38use risingwave_meta_service::backup_service::BackupServiceImpl;
39use risingwave_meta_service::cloud_service::CloudServiceImpl;
40use risingwave_meta_service::cluster_limit_service::ClusterLimitServiceImpl;
41use risingwave_meta_service::cluster_service::ClusterServiceImpl;
42use risingwave_meta_service::ddl_service::DdlServiceImpl;
43use risingwave_meta_service::event_log_service::EventLogServiceImpl;
44use risingwave_meta_service::health_service::HealthServiceImpl;
45use risingwave_meta_service::heartbeat_service::HeartbeatServiceImpl;
46use risingwave_meta_service::hummock_service::HummockServiceImpl;
47use risingwave_meta_service::meta_member_service::MetaMemberServiceImpl;
48use risingwave_meta_service::notification_service::NotificationServiceImpl;
49use risingwave_meta_service::scale_service::ScaleServiceImpl;
50use risingwave_meta_service::serving_service::ServingServiceImpl;
51use risingwave_meta_service::session_config::SessionParamsServiceImpl;
52use risingwave_meta_service::sink_coordination_service::SinkCoordinationServiceImpl;
53use risingwave_meta_service::stream_service::StreamServiceImpl;
54use risingwave_meta_service::system_params_service::SystemParamsServiceImpl;
55use risingwave_meta_service::telemetry_service::TelemetryInfoServiceImpl;
56use risingwave_meta_service::user_service::UserServiceImpl;
57use risingwave_pb::backup_service::backup_service_server::BackupServiceServer;
58use risingwave_pb::cloud_service::cloud_service_server::CloudServiceServer;
59use risingwave_pb::connector_service::sink_coordination_service_server::SinkCoordinationServiceServer;
60use risingwave_pb::ddl_service::ddl_service_server::DdlServiceServer;
61use risingwave_pb::health::health_server::HealthServer;
62use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerServiceServer;
63use risingwave_pb::meta::SystemParams;
64use risingwave_pb::meta::cluster_limit_service_server::ClusterLimitServiceServer;
65use risingwave_pb::meta::cluster_service_server::ClusterServiceServer;
66use risingwave_pb::meta::event_log_service_server::EventLogServiceServer;
67use risingwave_pb::meta::heartbeat_service_server::HeartbeatServiceServer;
68use risingwave_pb::meta::meta_member_service_server::MetaMemberServiceServer;
69use risingwave_pb::meta::notification_service_server::NotificationServiceServer;
70use risingwave_pb::meta::scale_service_server::ScaleServiceServer;
71use risingwave_pb::meta::serving_service_server::ServingServiceServer;
72use risingwave_pb::meta::session_param_service_server::SessionParamServiceServer;
73use risingwave_pb::meta::stream_manager_service_server::StreamManagerServiceServer;
74use risingwave_pb::meta::system_params_service_server::SystemParamsServiceServer;
75use risingwave_pb::meta::telemetry_info_service_server::TelemetryInfoServiceServer;
76use risingwave_pb::user::user_service_server::UserServiceServer;
77use risingwave_rpc_client::ComputeClientPool;
78use sea_orm::{ConnectionTrait, DbBackend};
79use thiserror_ext::AsReport;
80use tokio::sync::watch;
81
82use crate::backup_restore::BackupManager;
83use crate::barrier::BarrierScheduler;
84use crate::controller::SqlMetaStore;
85use crate::controller::system_param::SystemParamsController;
86use crate::hummock::HummockManager;
87use crate::manager::sink_coordination::SinkCoordinatorManager;
88use crate::manager::{IdleManager, MetaOpts, MetaSrvEnv};
89use crate::rpc::election::sql::{MySqlDriver, PostgresDriver, SqlBackendElectionClient};
90use crate::rpc::metrics::{
91    GLOBAL_META_METRICS, start_fragment_info_monitor, start_worker_info_monitor,
92};
93use crate::serving::ServingVnodeMapping;
94use crate::stream::{GlobalStreamManager, SourceManager};
95use crate::telemetry::{MetaReportCreator, MetaTelemetryInfoFetcher};
96use crate::{MetaError, MetaResult, hummock, serving};
97
/// Used for standalone mode checking the status of the meta service.
/// This can be easier and more accurate than checking the TCP connection.
pub mod started {
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Process-wide flag flipped once the meta gRPC server has been spawned.
    static STARTED: AtomicBool = AtomicBool::new(false);

    /// Mark the meta service as started.
    pub(crate) fn set() {
        STARTED.store(true, Ordering::Relaxed);
    }

    /// Check if the meta service has started.
    pub fn get() -> bool {
        STARTED.load(Ordering::Relaxed)
    }
}
116
117/// A wrapper around [`rpc_serve_with_store`] that dispatches different store implementations.
118///
119/// For the timing of returning, see [`rpc_serve_with_store`].
120pub async fn rpc_serve(
121    address_info: AddressInfo,
122    meta_store_backend: MetaStoreBackend,
123    max_cluster_heartbeat_interval: Duration,
124    lease_interval_secs: u64,
125    opts: MetaOpts,
126    init_system_params: SystemParams,
127    init_session_config: SessionConfig,
128    shutdown: CancellationToken,
129) -> MetaResult<()> {
130    let meta_store_impl = SqlMetaStore::connect(meta_store_backend.clone()).await?;
131
132    let election_client = match meta_store_backend {
133        MetaStoreBackend::Mem => {
134            // Use a dummy election client.
135            Arc::new(DummyElectionClient::new(
136                address_info.advertise_addr.clone(),
137            ))
138        }
139        MetaStoreBackend::Sql { .. } => {
140            // Init election client.
141            let id = address_info.advertise_addr.clone();
142            let conn = meta_store_impl.conn.clone();
143            let election_client: ElectionClientRef = match conn.get_database_backend() {
144                DbBackend::Sqlite => Arc::new(DummyElectionClient::new(id)),
145                DbBackend::Postgres => {
146                    Arc::new(SqlBackendElectionClient::new(id, PostgresDriver::new(conn)))
147                }
148                DbBackend::MySql => {
149                    Arc::new(SqlBackendElectionClient::new(id, MySqlDriver::new(conn)))
150                }
151            };
152            election_client.init().await?;
153
154            election_client
155        }
156    };
157
158    rpc_serve_with_store(
159        meta_store_impl,
160        election_client,
161        address_info,
162        max_cluster_heartbeat_interval,
163        lease_interval_secs,
164        opts,
165        init_system_params,
166        init_session_config,
167        shutdown,
168    )
169    .await
170}
171
/// Bootstraps the follower or leader service based on the election status.
///
/// Returns when the `shutdown` token is triggered, or when leader status is lost, or if the leader
/// service fails to start.
pub async fn rpc_serve_with_store(
    meta_store_impl: SqlMetaStore,
    election_client: ElectionClientRef,
    address_info: AddressInfo,
    max_cluster_heartbeat_interval: Duration,
    lease_interval_secs: u64,
    opts: MetaOpts,
    init_system_params: SystemParams,
    init_session_config: SessionConfig,
    shutdown: CancellationToken,
) -> MetaResult<()> {
    // TODO(shutdown): directly use cancellation token
    // Channel used to ask the election task to stop (and thus resign) once the
    // leader service exits.
    let (election_shutdown_tx, election_shutdown_rx) = watch::channel(());

    // Background task that keeps participating in the election, retrying on
    // errors. When `run_once` returns `Ok` the loop exits, which per the
    // comment below means leadership was lost and the whole node shuts down.
    let election_handle = tokio::spawn({
        let shutdown = shutdown.clone();
        let election_client = election_client.clone();

        async move {
            while let Err(e) = election_client
                .run_once(lease_interval_secs as i64, election_shutdown_rx.clone())
                .await
            {
                tracing::error!(error = %e.as_report(), "election error happened");
            }
            // Leader lost, shutdown the service.
            shutdown.cancel();
        }
    });

    // Spawn and run the follower service if not the leader.
    // Watch the leader status and switch to the leader service when elected.
    // TODO: the branch seems to be always hit since the default value of `is_leader` is false until
    // the election is done (unless using `DummyElectionClient`).
    if !election_client.is_leader() {
        // The follower service can be shutdown separately if we're going to be the leader.
        let follower_shutdown = shutdown.child_token();

        let follower_handle = tokio::spawn(start_service_as_election_follower(
            follower_shutdown.clone(),
            address_info.clone(),
            election_client.clone(),
        ));

        // Watch and wait until we become the leader.
        let mut is_leader_watcher = election_client.subscribe();

        while !*is_leader_watcher.borrow_and_update() {
            tokio::select! {
                // External shutdown signal. Directly return without switching to leader.
                _ = shutdown.cancelled() => return Ok(()),

                res = is_leader_watcher.changed() => {
                    if res.is_err() {
                        tracing::error!("leader watcher recv failed");
                    }
                }
            }
        }

        // Stop the follower service and wait for it to fully terminate before
        // the leader service binds the same listen address / services.
        tracing::info!("elected as leader, shutting down follower services");
        follower_shutdown.cancel();
        let _ = follower_handle.await;
    }

    // Run the leader service.
    let result = start_service_as_election_leader(
        meta_store_impl,
        address_info,
        max_cluster_heartbeat_interval,
        opts,
        init_system_params,
        init_session_config,
        election_client,
        shutdown,
    )
    .await;

    // Leader service has stopped, shutdown the election service to gracefully resign.
    election_shutdown_tx.send(()).ok();
    let _ = election_handle.await;

    result
}
260
261/// Starts all services needed for the meta follower node.
262///
263/// Returns when the `shutdown` token is triggered.
264pub async fn start_service_as_election_follower(
265    shutdown: CancellationToken,
266    address_info: AddressInfo,
267    election_client: ElectionClientRef,
268) {
269    tracing::info!("starting follower services");
270
271    let meta_member_srv = MetaMemberServiceImpl::new(election_client);
272
273    let health_srv = HealthServiceImpl::new();
274
275    let server = tonic::transport::Server::builder()
276        .layer(MetricsMiddlewareLayer::new(Arc::new(
277            GLOBAL_META_METRICS.clone(),
278        )))
279        .layer(TracingExtractLayer::new())
280        .add_service(MetaMemberServiceServer::new(meta_member_srv))
281        .add_service(HealthServer::new(health_srv))
282        .monitored_serve_with_shutdown(
283            address_info.listen_addr,
284            "grpc-meta-follower-service",
285            TcpConfig {
286                tcp_nodelay: true,
287                keepalive_duration: None,
288            },
289            shutdown.clone().cancelled_owned(),
290        );
291    let server_handle = tokio::spawn(server);
292    started::set();
293
294    // Wait for the shutdown signal.
295    shutdown.cancelled().await;
296    // Wait for the server to shutdown. This is necessary because we may be transitioning from follower
297    // to leader, and conflicts on the services must be avoided.
298    let _ = server_handle.await;
299}
300
/// Starts all services needed for the meta leader node.
///
/// Returns when the `shutdown` token is triggered, or if the service initialization fails.
pub async fn start_service_as_election_leader(
    meta_store_impl: SqlMetaStore,
    address_info: AddressInfo,
    max_cluster_heartbeat_interval: Duration,
    opts: MetaOpts,
    init_system_params: SystemParams,
    init_session_config: SessionConfig,
    election_client: ElectionClientRef,
    shutdown: CancellationToken,
) -> MetaResult<()> {
    tracing::info!("starting leader services");

    // Shared environment (meta store handle, options, system/session params)
    // consumed by every manager and service created below.
    let env = MetaSrvEnv::new(
        opts.clone(),
        init_system_params,
        init_session_config,
        meta_store_impl,
    )
    .await?;
    tracing::info!("MetaSrvEnv started");
    let _ = env.may_start_watch_license_key_file()?;
    let system_params_reader = env.system_params_reader().await;

    // Validate the data directory early, before any manager touches storage.
    let data_directory = system_params_reader.data_directory();
    if !is_correct_data_directory(data_directory) {
        return Err(MetaError::system_params(format!(
            "The data directory {:?} is misconfigured.
            Please use a combination of uppercase and lowercase letters and numbers, i.e. [a-z, A-Z, 0-9].
            The string cannot start or end with '/', and consecutive '/' are not allowed.
            The data directory cannot be empty and its length should not exceed 800 characters.",
            data_directory
        )));
    }

    // Cluster (worker) and catalog controllers, wrapped into one facade.
    let cluster_controller = Arc::new(
        ClusterController::new(env.clone(), max_cluster_heartbeat_interval)
            .await
            .unwrap(),
    );
    let catalog_controller = Arc::new(CatalogController::new(env.clone()).await?);
    let metadata_manager = MetadataManager::new(cluster_controller, catalog_controller);

    let serving_vnode_mapping = Arc::new(ServingVnodeMapping::default());
    serving::on_meta_start(
        env.notification_manager_ref(),
        &metadata_manager,
        serving_vnode_mapping.clone(),
    )
    .await;

    let compactor_manager = Arc::new(
        hummock::CompactorManager::with_meta(env.clone())
            .await
            .unwrap(),
    );
    tracing::info!("CompactorManager started");

    let heartbeat_srv = HeartbeatServiceImpl::new(metadata_manager.clone());
    tracing::info!("HeartbeatServiceImpl started");

    // Channel for compactor event streams; the receiver is consumed by the
    // compaction event loop registered into `sub_tasks` further below.
    let (compactor_streams_change_tx, compactor_streams_change_rx) =
        tokio::sync::mpsc::unbounded_channel();

    let meta_metrics = Arc::new(GLOBAL_META_METRICS.clone());

    let hummock_manager = hummock::HummockManager::new(
        env.clone(),
        metadata_manager.clone(),
        meta_metrics.clone(),
        compactor_manager.clone(),
        compactor_streams_change_tx,
    )
    .await
    .unwrap();
    tracing::info!("HummockManager started");
    let object_store_media_type = hummock_manager.object_store_media_type();

    let meta_member_srv = MetaMemberServiceImpl::new(election_client.clone());

    // Optional Prometheus client, shared by the dashboard and diagnose command.
    let prometheus_client = opts.prometheus_endpoint.as_ref().map(|x| {
        use std::str::FromStr;
        prometheus_http_query::Client::from_str(x).unwrap()
    });
    let prometheus_selector = opts.prometheus_selector.unwrap_or_default();
    let diagnose_command = Arc::new(risingwave_meta::manager::diagnose::DiagnoseCommand::new(
        metadata_manager.clone(),
        hummock_manager.clone(),
        env.event_log_manager_ref(),
        prometheus_client.clone(),
        prometheus_selector.clone(),
    ));

    // In-memory state for traces collected through the embedded OTLP endpoint.
    let trace_state = otlp_embedded::State::new(otlp_embedded::Config {
        max_length: opts.cached_traces_num,
        max_memory_usage: opts.cached_traces_memory_limit_bytes,
    });
    let trace_srv = otlp_embedded::TraceServiceImpl::new(trace_state.clone());

    #[cfg(not(madsim))]
    let _dashboard_task = if let Some(ref dashboard_addr) = address_info.dashboard_addr {
        let dashboard_service = crate::dashboard::DashboardService {
            dashboard_addr: *dashboard_addr,
            prometheus_client,
            prometheus_selector,
            metadata_manager: metadata_manager.clone(),
            compute_clients: ComputeClientPool::new(1, env.opts.compute_client_config.clone()), /* typically no need for plural clients */
            diagnose_command,
            trace_state,
        };
        let task = tokio::spawn(dashboard_service.serve());
        Some(task)
    } else {
        None
    };

    let (barrier_scheduler, scheduled_barriers) =
        BarrierScheduler::new_pair(hummock_manager.clone(), meta_metrics.clone());
    tracing::info!("BarrierScheduler started");

    // Initialize services.
    let backup_manager = BackupManager::new(
        env.clone(),
        hummock_manager.clone(),
        meta_metrics.clone(),
        system_params_reader.backup_storage_url(),
        system_params_reader.backup_storage_directory(),
    )
    .await?;
    tracing::info!("BackupManager started");

    LocalSecretManager::init(
        opts.temp_secret_file_dir,
        env.cluster_id().to_string(),
        META_NODE_ID,
    );
    tracing::info!("LocalSecretManager started");

    let notification_srv = NotificationServiceImpl::new(
        env.clone(),
        metadata_manager.clone(),
        hummock_manager.clone(),
        backup_manager.clone(),
        serving_vnode_mapping.clone(),
    )
    .await?;
    tracing::info!("NotificationServiceImpl started");

    let source_manager = Arc::new(
        SourceManager::new(
            barrier_scheduler.clone(),
            metadata_manager.clone(),
            meta_metrics.clone(),
        )
        .await
        .unwrap(),
    );
    tracing::info!("SourceManager started");

    let (sink_manager, shutdown_handle) = SinkCoordinatorManager::start_worker(
        env.meta_store_ref().conn.clone(),
        hummock_manager.clone(),
        metadata_manager.clone(),
    );
    tracing::info!("SinkCoordinatorManager started");
    // TODO(shutdown): remove this as there's no need to gracefully shutdown some of these sub-tasks.
    let mut sub_tasks = vec![shutdown_handle];

    let scale_controller = Arc::new(ScaleController::new(
        &metadata_manager,
        source_manager.clone(),
        env.clone(),
    ));

    let (barrier_manager, join_handle, shutdown_rx) = GlobalBarrierManager::start(
        scheduled_barriers,
        env.clone(),
        metadata_manager.clone(),
        hummock_manager.clone(),
        source_manager.clone(),
        sink_manager.clone(),
        scale_controller.clone(),
    )
    .await;
    tracing::info!("GlobalBarrierManager started");
    sub_tasks.push((join_handle, shutdown_rx));

    // The source manager's background loop runs detached (not in `sub_tasks`).
    {
        let source_manager = source_manager.clone();
        tokio::spawn(async move {
            source_manager.run().await.unwrap();
        });
    }

    let stream_manager = Arc::new(
        GlobalStreamManager::new(
            env.clone(),
            metadata_manager.clone(),
            barrier_scheduler.clone(),
            source_manager.clone(),
            scale_controller.clone(),
        )
        .unwrap(),
    );

    hummock_manager
        .may_fill_backward_state_table_info()
        .await
        .unwrap();

    // gRPC service implementations exposed by the leader, registered into the
    // server builder at the end of this function.
    let ddl_srv = DdlServiceImpl::new(
        env.clone(),
        metadata_manager.clone(),
        stream_manager.clone(),
        source_manager.clone(),
        barrier_manager.clone(),
        sink_manager.clone(),
        meta_metrics.clone(),
    )
    .await;

    let user_srv = UserServiceImpl::new(metadata_manager.clone());

    let scale_srv = ScaleServiceImpl::new(
        metadata_manager.clone(),
        source_manager,
        stream_manager.clone(),
        barrier_manager.clone(),
        scale_controller.clone(),
    );

    let cluster_srv = ClusterServiceImpl::new(metadata_manager.clone(), barrier_manager.clone());
    let stream_srv = StreamServiceImpl::new(
        env.clone(),
        barrier_scheduler.clone(),
        barrier_manager.clone(),
        stream_manager.clone(),
        metadata_manager.clone(),
    );
    let sink_coordination_srv = SinkCoordinationServiceImpl::new(sink_manager);
    let hummock_srv = HummockServiceImpl::new(
        hummock_manager.clone(),
        metadata_manager.clone(),
        backup_manager.clone(),
    );

    let health_srv = HealthServiceImpl::new();
    let backup_srv = BackupServiceImpl::new(backup_manager.clone());
    let telemetry_srv = TelemetryInfoServiceImpl::new(env.meta_store());
    let system_params_srv = SystemParamsServiceImpl::new(
        env.system_params_manager_impl_ref(),
        env.opts.license_key_path.is_some(),
    );
    let session_params_srv = SessionParamsServiceImpl::new(env.session_params_manager_impl_ref());
    let serving_srv =
        ServingServiceImpl::new(serving_vnode_mapping.clone(), metadata_manager.clone());
    let cloud_srv = CloudServiceImpl::new();
    let event_log_srv = EventLogServiceImpl::new(env.event_log_manager_ref());
    let cluster_limit_srv = ClusterLimitServiceImpl::new(env.clone(), metadata_manager.clone());

    if let Some(prometheus_addr) = address_info.prometheus_addr {
        MetricsManager::boot_metrics_service(prometheus_addr.to_string())
    }

    // sub_tasks executed concurrently. Can be shutdown via shutdown_all
    sub_tasks.extend(hummock::start_hummock_workers(
        hummock_manager.clone(),
        backup_manager.clone(),
        &env.opts,
    ));
    sub_tasks.push(start_worker_info_monitor(
        metadata_manager.clone(),
        election_client.clone(),
        Duration::from_secs(env.opts.node_num_monitor_interval_sec),
        meta_metrics.clone(),
    ));
    sub_tasks.push(start_fragment_info_monitor(
        metadata_manager.clone(),
        hummock_manager.clone(),
        meta_metrics.clone(),
    ));
    sub_tasks.push(SystemParamsController::start_params_notifier(
        env.system_params_manager_impl_ref(),
    ));
    sub_tasks.push(HummockManager::hummock_timer_task(
        hummock_manager.clone(),
        Some(backup_manager),
    ));
    sub_tasks.extend(HummockManager::compaction_event_loop(
        hummock_manager,
        compactor_streams_change_rx,
    ));
    sub_tasks.push(
        serving::start_serving_vnode_mapping_worker(
            env.notification_manager_ref(),
            metadata_manager.clone(),
            serving_vnode_mapping,
        )
        .await,
    );

    {
        sub_tasks.push(ClusterController::start_heartbeat_checker(
            metadata_manager.cluster_controller.clone(),
            Duration::from_secs(1),
        ));

        if !env.opts.disable_automatic_parallelism_control {
            sub_tasks.push(stream_manager.start_auto_parallelism_monitor());
        }
    }

    let _idle_checker_handle = IdleManager::start_idle_checker(
        env.idle_manager_ref(),
        Duration::from_secs(30),
        shutdown.clone(),
    );

    // When this sub-task is shut down (via `abort_sender`), abort all
    // notification subscribers and compactor event streams.
    let (abort_sender, abort_recv) = tokio::sync::oneshot::channel();
    let notification_mgr = env.notification_manager_ref();
    let stream_abort_handler = tokio::spawn(async move {
        let _ = abort_recv.await;
        notification_mgr.abort_all().await;
        compactor_manager.abort_all_compactors();
    });
    sub_tasks.push((stream_abort_handler, abort_sender));

    let telemetry_manager = TelemetryManager::new(
        Arc::new(MetaTelemetryInfoFetcher::new(env.cluster_id().clone())),
        Arc::new(MetaReportCreator::new(
            metadata_manager.clone(),
            object_store_media_type,
        )),
    );

    // May start telemetry reporting
    if env.opts.telemetry_enabled && telemetry_env_enabled() {
        sub_tasks.push(telemetry_manager.start().await);
    } else {
        tracing::info!("Telemetry didn't start due to meta backend or config");
    }
    if !cfg!(madsim) && report_scarf_enabled() {
        tokio::spawn(report_to_scarf());
    } else {
        tracing::info!("Scarf reporting is disabled");
    };

    if let Some(pair) = env.event_log_manager_ref().take_join_handle() {
        sub_tasks.push(pair);
    }

    tracing::info!("Assigned cluster id {:?}", *env.cluster_id());
    tracing::info!("Starting meta services");

    // Record this node's startup in the event log for observability.
    let event = risingwave_pb::meta::event_log::EventMetaNodeStart {
        advertise_addr: address_info.advertise_addr,
        listen_addr: address_info.listen_addr.to_string(),
        opts: serde_json::to_string(&env.opts).unwrap(),
    };
    env.event_log_manager_ref().add_event_logs(vec![
        risingwave_pb::meta::event_log::Event::MetaNodeStart(event),
    ]);

    let server_builder = tonic::transport::Server::builder()
        .layer(MetricsMiddlewareLayer::new(meta_metrics))
        .layer(TracingExtractLayer::new())
        .add_service(HeartbeatServiceServer::new(heartbeat_srv))
        .add_service(ClusterServiceServer::new(cluster_srv))
        .add_service(StreamManagerServiceServer::new(stream_srv))
        .add_service(
            HummockManagerServiceServer::new(hummock_srv).max_decoding_message_size(usize::MAX),
        )
        .add_service(NotificationServiceServer::new(notification_srv))
        .add_service(MetaMemberServiceServer::new(meta_member_srv))
        .add_service(DdlServiceServer::new(ddl_srv).max_decoding_message_size(usize::MAX))
        .add_service(UserServiceServer::new(user_srv))
        .add_service(CloudServiceServer::new(cloud_srv))
        .add_service(ScaleServiceServer::new(scale_srv).max_decoding_message_size(usize::MAX))
        .add_service(HealthServer::new(health_srv))
        .add_service(BackupServiceServer::new(backup_srv))
        .add_service(SystemParamsServiceServer::new(system_params_srv))
        .add_service(SessionParamServiceServer::new(session_params_srv))
        .add_service(TelemetryInfoServiceServer::new(telemetry_srv))
        .add_service(ServingServiceServer::new(serving_srv))
        .add_service(SinkCoordinationServiceServer::new(sink_coordination_srv))
        .add_service(EventLogServiceServer::new(event_log_srv))
        .add_service(ClusterLimitServiceServer::new(cluster_limit_srv));
    #[cfg(not(madsim))] // `otlp-embedded` does not use madsim-patched tonic
    let server_builder = server_builder.add_service(TraceServiceServer::new(trace_srv));

    let server = server_builder.monitored_serve_with_shutdown(
        address_info.listen_addr,
        "grpc-meta-leader-service",
        TcpConfig {
            tcp_nodelay: true,
            keepalive_duration: None,
        },
        shutdown.clone().cancelled_owned(),
    );
    started::set();
    let _server_handle = tokio::spawn(server);

    // Wait for the shutdown signal.
    shutdown.cancelled().await;
    // TODO(shutdown): may warn user if there's any other node still running in the cluster.
    // TODO(shutdown): do we have any other shutdown tasks?
    Ok(())
}
711
712fn is_correct_data_directory(data_directory: &str) -> bool {
713    let data_directory_regex = Regex::new(r"^[0-9a-zA-Z_/-]{1,}$").unwrap();
714    if data_directory.is_empty()
715        || !data_directory_regex.is_match(data_directory)
716        || data_directory.ends_with('/')
717        || data_directory.starts_with('/')
718        || data_directory.contains("//")
719        || data_directory.len() > 800
720    {
721        return false;
722    }
723    true
724}