use std::sync::Arc;
use std::time::Duration;

use otlp_embedded::TraceServiceServer;
use regex::Regex;
use risingwave_common::monitor::{RouterExt, TcpConfig};
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::telemetry::manager::TelemetryManager;
use risingwave_common::telemetry::{report_scarf_enabled, report_to_scarf, telemetry_env_enabled};
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_common_service::{MetricsManager, TracingExtractLayer};
use risingwave_meta::MetaStoreBackend;
use risingwave_meta::barrier::GlobalBarrierManager;
use risingwave_meta::controller::catalog::CatalogController;
use risingwave_meta::controller::cluster::ClusterController;
use risingwave_meta::hummock::IcebergCompactorManager;
use risingwave_meta::manager::iceberg_compaction::IcebergCompactionManager;
use risingwave_meta::manager::{META_NODE_ID, MetadataManager};
use risingwave_meta::rpc::ElectionClientRef;
use risingwave_meta::rpc::election::dummy::DummyElectionClient;
use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer;
use risingwave_meta::stream::{GlobalRefreshManager, ScaleController};
use risingwave_meta_service::AddressInfo;
use risingwave_meta_service::backup_service::BackupServiceImpl;
use risingwave_meta_service::cloud_service::CloudServiceImpl;
use risingwave_meta_service::cluster_limit_service::ClusterLimitServiceImpl;
use risingwave_meta_service::cluster_service::ClusterServiceImpl;
use risingwave_meta_service::ddl_service::DdlServiceImpl;
use risingwave_meta_service::event_log_service::EventLogServiceImpl;
use risingwave_meta_service::health_service::HealthServiceImpl;
use risingwave_meta_service::heartbeat_service::HeartbeatServiceImpl;
use risingwave_meta_service::hosted_iceberg_catalog_service::HostedIcebergCatalogServiceImpl;
use risingwave_meta_service::hummock_service::HummockServiceImpl;
use risingwave_meta_service::meta_member_service::MetaMemberServiceImpl;
use risingwave_meta_service::monitor_service::MonitorServiceImpl;
use risingwave_meta_service::notification_service::NotificationServiceImpl;
use risingwave_meta_service::scale_service::ScaleServiceImpl;
use risingwave_meta_service::serving_service::ServingServiceImpl;
use risingwave_meta_service::session_config::SessionParamsServiceImpl;
use risingwave_meta_service::sink_coordination_service::SinkCoordinationServiceImpl;
use risingwave_meta_service::stream_service::StreamServiceImpl;
use risingwave_meta_service::system_params_service::SystemParamsServiceImpl;
use risingwave_meta_service::telemetry_service::TelemetryInfoServiceImpl;
use risingwave_meta_service::user_service::UserServiceImpl;
use risingwave_pb::backup_service::backup_service_server::BackupServiceServer;
use risingwave_pb::cloud_service::cloud_service_server::CloudServiceServer;
use risingwave_pb::connector_service::sink_coordination_service_server::SinkCoordinationServiceServer;
use risingwave_pb::ddl_service::ddl_service_server::DdlServiceServer;
use risingwave_pb::health::health_server::HealthServer;
use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerServiceServer;
use risingwave_pb::meta::SystemParams;
use risingwave_pb::meta::cluster_limit_service_server::ClusterLimitServiceServer;
use risingwave_pb::meta::cluster_service_server::ClusterServiceServer;
use risingwave_pb::meta::event_log_service_server::EventLogServiceServer;
use risingwave_pb::meta::heartbeat_service_server::HeartbeatServiceServer;
use risingwave_pb::meta::hosted_iceberg_catalog_service_server::HostedIcebergCatalogServiceServer;
use risingwave_pb::meta::meta_member_service_server::MetaMemberServiceServer;
use risingwave_pb::meta::notification_service_server::NotificationServiceServer;
use risingwave_pb::meta::scale_service_server::ScaleServiceServer;
use risingwave_pb::meta::serving_service_server::ServingServiceServer;
use risingwave_pb::meta::session_param_service_server::SessionParamServiceServer;
use risingwave_pb::meta::stream_manager_service_server::StreamManagerServiceServer;
use risingwave_pb::meta::system_params_service_server::SystemParamsServiceServer;
use risingwave_pb::meta::telemetry_info_service_server::TelemetryInfoServiceServer;
use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
use risingwave_pb::user::user_service_server::UserServiceServer;
use risingwave_rpc_client::ComputeClientPool;
use sea_orm::{ConnectionTrait, DbBackend};
use thiserror_ext::AsReport;
use tokio::sync::watch;

use crate::backup_restore::BackupManager;
use crate::barrier::BarrierScheduler;
use crate::controller::SqlMetaStore;
use crate::controller::system_param::SystemParamsController;
use crate::hummock::HummockManager;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{IdleManager, MetaOpts, MetaSrvEnv};
use crate::rpc::election::sql::{MySqlDriver, PostgresDriver, SqlBackendElectionClient};
use crate::rpc::metrics::{GLOBAL_META_METRICS, start_info_monitor, start_worker_info_monitor};
use crate::serving::ServingVnodeMapping;
use crate::stream::{GlobalStreamManager, SourceManager};
use crate::telemetry::{MetaReportCreator, MetaTelemetryInfoFetcher};
use crate::{MetaError, MetaResult, hummock, serving};

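/// Process-wide flag recording whether the meta gRPC services (leader or follower)
/// have been brought up, so that readiness can be queried from elsewhere in the node.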
pub mod started {
    use std::sync::atomic::AtomicBool;
    use std::sync::atomic::Ordering::Relaxed;

    static STARTED: AtomicBool = AtomicBool::new(false);

    pub(crate) fn set() {
        STARTED.store(true, Relaxed);
    }

    pub fn get() -> bool {
        STARTED.load(Relaxed)
    }
}

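/// Start the meta node RPC service.
///
/// Connects to the SQL meta store, builds an election client that matches the backend
/// (a dummy client for the in-memory and SQLite backends, a SQL-based client for
/// Postgres and MySQL), and then delegates to [`rpc_serve_with_store`].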
pub async fn rpc_serve(
    address_info: AddressInfo,
    meta_store_backend: MetaStoreBackend,
    max_cluster_heartbeat_interval: Duration,
    lease_interval_secs: u64,
    opts: MetaOpts,
    init_system_params: SystemParams,
    init_session_config: SessionConfig,
    shutdown: CancellationToken,
) -> MetaResult<()> {
    let meta_store_impl = SqlMetaStore::connect(meta_store_backend.clone()).await?;

    let election_client = match meta_store_backend {
        MetaStoreBackend::Mem => {
            Arc::new(DummyElectionClient::new(
                address_info.advertise_addr.clone(),
            ))
        }
        MetaStoreBackend::Sql { .. } => {
            let id = address_info.advertise_addr.clone();
            let conn = meta_store_impl.conn.clone();
            let election_client: ElectionClientRef = match conn.get_database_backend() {
                DbBackend::Sqlite => Arc::new(DummyElectionClient::new(id)),
                DbBackend::Postgres => {
                    Arc::new(SqlBackendElectionClient::new(id, PostgresDriver::new(conn)))
                }
                DbBackend::MySql => {
                    Arc::new(SqlBackendElectionClient::new(id, MySqlDriver::new(conn)))
                }
            };
            election_client.init().await?;

            election_client
        }
    };

    rpc_serve_with_store(
        meta_store_impl,
        election_client,
        address_info,
        max_cluster_heartbeat_interval,
        lease_interval_secs,
        opts,
        init_system_params,
        init_session_config,
        shutdown,
    )
    .await
}

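/// Run the meta node services on top of an already-connected meta store and election client.
///
/// The election loop runs in a background task. Until this node wins the election it serves
/// only the follower services; once it becomes the leader it switches to the full leader
/// services.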
pub async fn rpc_serve_with_store(
    meta_store_impl: SqlMetaStore,
    election_client: ElectionClientRef,
    address_info: AddressInfo,
    max_cluster_heartbeat_interval: Duration,
    lease_interval_secs: u64,
    opts: MetaOpts,
    init_system_params: SystemParams,
    init_session_config: SessionConfig,
    shutdown: CancellationToken,
) -> MetaResult<()> {
    let (election_shutdown_tx, election_shutdown_rx) = watch::channel(());

    let election_handle = tokio::spawn({
        let shutdown = shutdown.clone();
        let election_client = election_client.clone();

        async move {
            while let Err(e) = election_client
                .run_once(lease_interval_secs as i64, election_shutdown_rx.clone())
                .await
            {
                tracing::error!(error = %e.as_report(), "election error happened");
            }
            shutdown.cancel();
        }
    });

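    // Not yet the leader: serve the follower services and wait until this node either
    // wins the election or is asked to shut down.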
    if !election_client.is_leader() {
        let follower_shutdown = shutdown.child_token();

        let follower_handle = tokio::spawn(start_service_as_election_follower(
            follower_shutdown.clone(),
            address_info.clone(),
            election_client.clone(),
        ));

        let mut is_leader_watcher = election_client.subscribe();

        while !*is_leader_watcher.borrow_and_update() {
            tokio::select! {
                _ = shutdown.cancelled() => return Ok(()),

                res = is_leader_watcher.changed() => {
                    if res.is_err() {
                        tracing::error!("leader watcher recv failed");
                    }
                }
            }
        }

        tracing::info!("elected as leader, shutting down follower services");
        follower_shutdown.cancel();
        let _ = follower_handle.await;
    }

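    // This node is now the leader: run the leader services until shutdown, then stop the
    // election loop and wait for it to exit.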
    let result = start_service_as_election_leader(
        meta_store_impl,
        address_info,
        max_cluster_heartbeat_interval,
        opts,
        init_system_params,
        init_session_config,
        election_client,
        shutdown,
    )
    .await;

    election_shutdown_tx.send(()).ok();
    let _ = election_handle.await;

    result
}

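/// Serve the follower services: membership lookup (`MetaMemberService`) and health checks.
///
/// A follower only exposes enough for clients to discover the current leader and check
/// liveness; all other services are served by the leader.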
pub async fn start_service_as_election_follower(
    shutdown: CancellationToken,
    address_info: AddressInfo,
    election_client: ElectionClientRef,
) {
    tracing::info!("starting follower services");

    let meta_member_srv = MetaMemberServiceImpl::new(election_client);

    let health_srv = HealthServiceImpl::new();

    let server = tonic::transport::Server::builder()
        .layer(MetricsMiddlewareLayer::new(Arc::new(
            GLOBAL_META_METRICS.clone(),
        )))
        .layer(TracingExtractLayer::new())
        .add_service(MetaMemberServiceServer::new(meta_member_srv))
        .add_service(HealthServer::new(health_srv))
        .monitored_serve_with_shutdown(
            address_info.listen_addr,
            "grpc-meta-follower-service",
            TcpConfig {
                tcp_nodelay: true,
                keepalive_duration: None,
            },
            shutdown.clone().cancelled_owned(),
        );
    let server_handle = tokio::spawn(server);
    started::set();

    shutdown.cancelled().await;
    let _ = server_handle.await;
}

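/// Serve the full set of leader services.
///
/// This bootstraps the meta environment and its managers (metadata, hummock, barrier, source,
/// sink coordination, iceberg compaction, backup, telemetry, ...), spawns their background
/// workers, and finally serves all leader gRPC services until shutdown is requested.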
pub async fn start_service_as_election_leader(
    meta_store_impl: SqlMetaStore,
    address_info: AddressInfo,
    max_cluster_heartbeat_interval: Duration,
    opts: MetaOpts,
    init_system_params: SystemParams,
    init_session_config: SessionConfig,
    election_client: ElectionClientRef,
    shutdown: CancellationToken,
) -> MetaResult<()> {
    tracing::info!("starting leader services");

    let env = MetaSrvEnv::new(
        opts.clone(),
        init_system_params,
        init_session_config,
        meta_store_impl,
    )
    .await?;
    tracing::info!("MetaSrvEnv started");
    let _ = env.may_start_watch_license_key_file()?;
    let system_params_reader = env.system_params_reader().await;

    let data_directory = system_params_reader.data_directory();
    if !is_correct_data_directory(data_directory) {
        return Err(MetaError::system_params(format!(
            "The data directory {:?} is misconfigured.
            Please use a combination of uppercase and lowercase letters and numbers, i.e. [a-z, A-Z, 0-9].
            The string cannot start or end with '/', and consecutive '/' are not allowed.
            The data directory cannot be empty and its length should not exceed 800 characters.",
            data_directory
        )));
    }

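    // Bootstrap the metadata manager from the cluster and catalog controllers backed by the
    // SQL meta store.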
    let cluster_controller = Arc::new(
        ClusterController::new(env.clone(), max_cluster_heartbeat_interval)
            .await
            .unwrap(),
    );
    let catalog_controller = Arc::new(CatalogController::new(env.clone()).await?);
    let metadata_manager = MetadataManager::new(cluster_controller, catalog_controller);

    let serving_vnode_mapping = Arc::new(ServingVnodeMapping::default());
    let max_serving_parallelism = env
        .session_params_manager_impl_ref()
        .get_params()
        .await
        .batch_parallelism()
        .map(|p| p.get());
    serving::on_meta_start(
        env.notification_manager_ref(),
        &metadata_manager,
        serving_vnode_mapping.clone(),
        max_serving_parallelism,
    )
    .await;

    let compactor_manager = Arc::new(
        hummock::CompactorManager::with_meta(env.clone())
            .await
            .unwrap(),
    );
    tracing::info!("CompactorManager started");

    let heartbeat_srv = HeartbeatServiceImpl::new(metadata_manager.clone());
    tracing::info!("HeartbeatServiceImpl started");

    let (compactor_streams_change_tx, compactor_streams_change_rx) =
        tokio::sync::mpsc::unbounded_channel();

    let meta_metrics = Arc::new(GLOBAL_META_METRICS.clone());

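    // HummockManager maintains the storage (Hummock) metadata: versions, SSTable info and
    // compaction state. The compactor stream change channel created above feeds the
    // compaction event loop registered further below.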
    let hummock_manager = hummock::HummockManager::new(
        env.clone(),
        metadata_manager.clone(),
        meta_metrics.clone(),
        compactor_manager.clone(),
        compactor_streams_change_tx,
    )
    .await
    .unwrap();
    tracing::info!("HummockManager started");
    let object_store_media_type = hummock_manager.object_store_media_type();

    let meta_member_srv = MetaMemberServiceImpl::new(election_client.clone());

    let prometheus_client = opts.prometheus_endpoint.as_ref().map(|x| {
        use std::str::FromStr;
        prometheus_http_query::Client::from_str(x).unwrap()
    });
    let prometheus_selector = opts.prometheus_selector.unwrap_or_default();
    let diagnose_command = Arc::new(risingwave_meta::manager::diagnose::DiagnoseCommand::new(
        metadata_manager.clone(),
        env.await_tree_reg().clone(),
        hummock_manager.clone(),
        env.event_log_manager_ref(),
        prometheus_client.clone(),
        prometheus_selector.clone(),
        opts.redact_sql_option_keywords.clone(),
    ));

    let trace_state = otlp_embedded::State::new(otlp_embedded::Config {
        max_length: opts.cached_traces_num,
        max_memory_usage: opts.cached_traces_memory_limit_bytes,
    });
    let trace_srv = otlp_embedded::TraceServiceImpl::new(trace_state.clone());

    #[cfg(not(madsim))]
    let _dashboard_task = if let Some(ref dashboard_addr) = address_info.dashboard_addr {
        let dashboard_service = crate::dashboard::DashboardService {
            await_tree_reg: env.await_tree_reg().clone(),
            dashboard_addr: *dashboard_addr,
            prometheus_client,
            prometheus_selector,
            metadata_manager: metadata_manager.clone(),
            hummock_manager: hummock_manager.clone(),
            compute_clients: ComputeClientPool::new(1, env.opts.compute_client_config.clone()),
            diagnose_command,
            trace_state,
        };
        let task = tokio::spawn(dashboard_service.serve());
        Some(task)
    } else {
        None
    };

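    // The barrier scheduler produces the scheduled barriers that the GlobalBarrierManager
    // (started further below) consumes to drive checkpointing.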
    let (barrier_scheduler, scheduled_barriers) =
        BarrierScheduler::new_pair(hummock_manager.clone(), meta_metrics.clone());
    tracing::info!("BarrierScheduler started");

    let backup_manager = BackupManager::new(
        env.clone(),
        hummock_manager.clone(),
        meta_metrics.clone(),
        system_params_reader.backup_storage_url(),
        system_params_reader.backup_storage_directory(),
    )
    .await?;
    tracing::info!("BackupManager started");

    LocalSecretManager::init(
        opts.temp_secret_file_dir,
        env.cluster_id().to_string(),
        META_NODE_ID,
    );
    tracing::info!("LocalSecretManager started");

    let notification_srv = NotificationServiceImpl::new(
        env.clone(),
        metadata_manager.clone(),
        hummock_manager.clone(),
        backup_manager.clone(),
        serving_vnode_mapping.clone(),
    )
    .await?;
    tracing::info!("NotificationServiceImpl started");

    let source_manager = Arc::new(
        SourceManager::new(
            barrier_scheduler.clone(),
            metadata_manager.clone(),
            meta_metrics.clone(),
            env.clone(),
        )
        .await?,
    );
    tracing::info!("SourceManager started");

    let (iceberg_compaction_stat_tx, iceberg_compaction_stat_rx) =
        tokio::sync::mpsc::unbounded_channel();
    let (sink_manager, shutdown_handle) = SinkCoordinatorManager::start_worker(
        env.meta_store_ref().conn.clone(),
        hummock_manager.clone(),
        metadata_manager.clone(),
        iceberg_compaction_stat_tx,
    );
    tracing::info!("SinkCoordinatorManager started");
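    // Background sub-tasks are collected here as pairs of a join handle and a shutdown
    // signal, so they can be stopped and awaited together when the meta node shuts down.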
    let mut sub_tasks = vec![shutdown_handle];

    let iceberg_compactor_manager = Arc::new(IcebergCompactorManager::new());

    let (iceberg_compaction_mgr, iceberg_compactor_event_rx) = IcebergCompactionManager::build(
        env.clone(),
        metadata_manager.clone(),
        iceberg_compactor_manager.clone(),
        meta_metrics.clone(),
    );

    sub_tasks.push(IcebergCompactionManager::compaction_stat_loop(
        iceberg_compaction_mgr.clone(),
        iceberg_compaction_stat_rx,
    ));

    sub_tasks.push(IcebergCompactionManager::gc_loop(
        iceberg_compaction_mgr.clone(),
        env.opts.iceberg_gc_interval_sec,
    ));

    let refresh_scheduler_interval = Duration::from_secs(env.opts.refresh_scheduler_interval_sec);
    let (refresh_manager, refresh_handle, refresh_shutdown) = GlobalRefreshManager::start(
        metadata_manager.clone(),
        barrier_scheduler.clone(),
        &env,
        refresh_scheduler_interval,
    )
    .await?;
    sub_tasks.push((refresh_handle, refresh_shutdown));

    let scale_controller = Arc::new(ScaleController::new(
        &metadata_manager,
        source_manager.clone(),
        env.clone(),
    ));

    let (barrier_manager, join_handle, shutdown_rx) = GlobalBarrierManager::start(
        scheduled_barriers,
        env.clone(),
        metadata_manager.clone(),
        hummock_manager.clone(),
        source_manager.clone(),
        sink_manager.clone(),
        scale_controller.clone(),
        barrier_scheduler.clone(),
        refresh_manager.clone(),
    )
    .await;
    tracing::info!("GlobalBarrierManager started");
    sub_tasks.push((join_handle, shutdown_rx));

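    // Run the source manager loop in a detached background task.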
    {
        let source_manager = source_manager.clone();
        tokio::spawn(async move {
            source_manager.run().await.unwrap();
        });
    }

    let stream_manager = Arc::new(
        GlobalStreamManager::new(
            env.clone(),
            metadata_manager.clone(),
            barrier_scheduler.clone(),
            source_manager.clone(),
            scale_controller.clone(),
        )
        .unwrap(),
    );

    hummock_manager
        .may_fill_backward_state_table_info()
        .await
        .unwrap();

    let ddl_srv = DdlServiceImpl::new(
        env.clone(),
        metadata_manager.clone(),
        stream_manager.clone(),
        source_manager.clone(),
        barrier_manager.clone(),
        sink_manager.clone(),
        meta_metrics.clone(),
        iceberg_compaction_mgr.clone(),
        barrier_scheduler.clone(),
    )
    .await;

    if env.opts.enable_legacy_table_migration {
        sub_tasks.push(ddl_srv.start_migrate_table_fragments());
    }

    let user_srv = UserServiceImpl::new(metadata_manager.clone());

    let scale_srv = ScaleServiceImpl::new(
        metadata_manager.clone(),
        stream_manager.clone(),
        barrier_manager.clone(),
        env.clone(),
    );

    let cluster_srv = ClusterServiceImpl::new(metadata_manager.clone(), barrier_manager.clone());
    let stream_srv = StreamServiceImpl::new(
        env.clone(),
        barrier_scheduler.clone(),
        barrier_manager.clone(),
        stream_manager.clone(),
        metadata_manager.clone(),
        refresh_manager.clone(),
    );
    let sink_coordination_srv = SinkCoordinationServiceImpl::new(sink_manager);
    let hummock_srv = HummockServiceImpl::new(
        hummock_manager.clone(),
        metadata_manager.clone(),
        backup_manager.clone(),
        iceberg_compaction_mgr.clone(),
    );

    let health_srv = HealthServiceImpl::new();
    let backup_srv = BackupServiceImpl::new(backup_manager.clone());
    let telemetry_srv = TelemetryInfoServiceImpl::new(env.meta_store());
    let system_params_srv = SystemParamsServiceImpl::new(
        env.system_params_manager_impl_ref(),
        env.opts.license_key_path.is_some(),
    );
    let session_params_srv = SessionParamsServiceImpl::new(env.session_params_manager_impl_ref());
    let serving_srv =
        ServingServiceImpl::new(serving_vnode_mapping.clone(), metadata_manager.clone());
    let cloud_srv = CloudServiceImpl::new();
    let event_log_srv = EventLogServiceImpl::new(env.event_log_manager_ref());
    let cluster_limit_srv = ClusterLimitServiceImpl::new(env.clone(), metadata_manager.clone());
    let hosted_iceberg_catalog_srv = HostedIcebergCatalogServiceImpl::new(env.clone());
    let monitor_srv = MonitorServiceImpl {
        metadata_manager: metadata_manager.clone(),
        await_tree_reg: env.await_tree_reg().clone(),
    };

    if let Some(prometheus_addr) = address_info.prometheus_addr {
        MetricsManager::boot_metrics_service(prometheus_addr.to_string())
    }

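    // Register the remaining background workers: hummock maintenance, cluster/info monitors,
    // system-param notifier, compaction event loops, serving vnode mapping and heartbeat checks.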
    sub_tasks.extend(hummock::start_hummock_workers(
        hummock_manager.clone(),
        backup_manager.clone(),
        &env.opts,
    ));
    sub_tasks.push(start_worker_info_monitor(
        metadata_manager.clone(),
        election_client.clone(),
        Duration::from_secs(env.opts.node_num_monitor_interval_sec),
        meta_metrics.clone(),
    ));
    sub_tasks.push(start_info_monitor(
        metadata_manager.clone(),
        hummock_manager.clone(),
        env.system_params_manager_impl_ref(),
        meta_metrics.clone(),
    ));
    sub_tasks.push(SystemParamsController::start_params_notifier(
        env.system_params_manager_impl_ref(),
    ));
    sub_tasks.push(HummockManager::hummock_timer_task(
        hummock_manager.clone(),
        Some(backup_manager),
    ));
    sub_tasks.extend(HummockManager::compaction_event_loop(
        hummock_manager.clone(),
        compactor_streams_change_rx,
    ));

    sub_tasks.extend(IcebergCompactionManager::iceberg_compaction_event_loop(
        iceberg_compaction_mgr.clone(),
        iceberg_compactor_event_rx,
    ));

    sub_tasks.push(serving::start_serving_vnode_mapping_worker(
        env.notification_manager_ref(),
        metadata_manager.clone(),
        serving_vnode_mapping,
        env.session_params_manager_impl_ref(),
    ));

    {
        sub_tasks.push(ClusterController::start_heartbeat_checker(
            metadata_manager.cluster_controller.clone(),
            Duration::from_secs(1),
        ));

        if !env.opts.disable_automatic_parallelism_control {
            sub_tasks.push(stream_manager.start_auto_parallelism_monitor());
        }
    }

    let _idle_checker_handle = IdleManager::start_idle_checker(
        env.idle_manager_ref(),
        Duration::from_secs(30),
        shutdown.clone(),
    );

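    // When the abort signal fires, notify all notification subscribers and abort all
    // compactor connections so that clients observe the shutdown promptly.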
    let (abort_sender, abort_recv) = tokio::sync::oneshot::channel();
    let notification_mgr = env.notification_manager_ref();
    let stream_abort_handler = tokio::spawn(async move {
        let _ = abort_recv.await;
        notification_mgr.abort_all();
        compactor_manager.abort_all_compactors();
    });
    sub_tasks.push((stream_abort_handler, abort_sender));

    let telemetry_manager = TelemetryManager::new(
        Arc::new(MetaTelemetryInfoFetcher::new(env.cluster_id().clone())),
        Arc::new(MetaReportCreator::new(
            metadata_manager.clone(),
            object_store_media_type,
        )),
    );

    if env.opts.telemetry_enabled && telemetry_env_enabled() {
        sub_tasks.push(telemetry_manager.start().await);
    } else {
        tracing::info!("Telemetry didn't start due to meta backend or config");
    }
    if !cfg!(madsim) && report_scarf_enabled() {
        tokio::spawn(report_to_scarf());
    } else {
        tracing::info!("Scarf reporting is disabled");
    };

    if let Some(pair) = env.event_log_manager_ref().take_join_handle() {
        sub_tasks.push(pair);
    }

    tracing::info!("Assigned cluster id {:?}", *env.cluster_id());
    tracing::info!("Starting meta services");

    let event = risingwave_pb::meta::event_log::EventMetaNodeStart {
        advertise_addr: address_info.advertise_addr,
        listen_addr: address_info.listen_addr.to_string(),
        opts: serde_json::to_string(&env.opts).unwrap(),
    };
    env.event_log_manager_ref().add_event_logs(vec![
        risingwave_pb::meta::event_log::Event::MetaNodeStart(event),
    ]);

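    // Assemble the gRPC server with all leader services and serve it until shutdown.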
    let server_builder = tonic::transport::Server::builder()
        .layer(MetricsMiddlewareLayer::new(meta_metrics))
        .layer(TracingExtractLayer::new())
        .add_service(HeartbeatServiceServer::new(heartbeat_srv))
        .add_service(ClusterServiceServer::new(cluster_srv))
        .add_service(StreamManagerServiceServer::new(stream_srv))
        .add_service(
            HummockManagerServiceServer::new(hummock_srv).max_decoding_message_size(usize::MAX),
        )
        .add_service(NotificationServiceServer::new(notification_srv))
        .add_service(MetaMemberServiceServer::new(meta_member_srv))
        .add_service(DdlServiceServer::new(ddl_srv).max_decoding_message_size(usize::MAX))
        .add_service(UserServiceServer::new(user_srv))
        .add_service(CloudServiceServer::new(cloud_srv))
        .add_service(ScaleServiceServer::new(scale_srv).max_decoding_message_size(usize::MAX))
        .add_service(HealthServer::new(health_srv))
        .add_service(BackupServiceServer::new(backup_srv))
        .add_service(SystemParamsServiceServer::new(system_params_srv))
        .add_service(SessionParamServiceServer::new(session_params_srv))
        .add_service(TelemetryInfoServiceServer::new(telemetry_srv))
        .add_service(ServingServiceServer::new(serving_srv))
        .add_service(
            SinkCoordinationServiceServer::new(sink_coordination_srv)
                .max_decoding_message_size(usize::MAX),
        )
        .add_service(
            EventLogServiceServer::new(event_log_srv).max_decoding_message_size(usize::MAX),
        )
        .add_service(ClusterLimitServiceServer::new(cluster_limit_srv))
        .add_service(HostedIcebergCatalogServiceServer::new(
            hosted_iceberg_catalog_srv,
        ))
        .add_service(MonitorServiceServer::new(monitor_srv));

    #[cfg(not(madsim))]
    let server_builder = server_builder.add_service(TraceServiceServer::new(trace_srv));

    let server = server_builder.monitored_serve_with_shutdown(
        address_info.listen_addr,
        "grpc-meta-leader-service",
        TcpConfig {
            tcp_nodelay: true,
            keepalive_duration: None,
        },
        shutdown.clone().cancelled_owned(),
    );
    started::set();
    let _server_handle = tokio::spawn(server);

    shutdown.cancelled().await;
    Ok(())
}

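/// Validate the configured data directory: only `[0-9a-zA-Z_/-]` characters, no leading or
/// trailing `/`, no consecutive `//`, non-empty, and at most 800 characters.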
fn is_correct_data_directory(data_directory: &str) -> bool {
    let data_directory_regex = Regex::new(r"^[0-9a-zA-Z_/-]{1,}$").unwrap();
    if data_directory.is_empty()
        || !data_directory_regex.is_match(data_directory)
        || data_directory.ends_with('/')
        || data_directory.starts_with('/')
        || data_directory.contains("//")
        || data_directory.len() > 800
    {
        return false;
    }
    true
}