use std::sync::Arc;
use std::time::Duration;

use otlp_embedded::TraceServiceServer;
use regex::Regex;
use risingwave_common::monitor::{RouterExt, TcpConfig};
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::telemetry::manager::TelemetryManager;
use risingwave_common::telemetry::{report_scarf_enabled, report_to_scarf, telemetry_env_enabled};
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_common_service::{MetricsManager, TracingExtractLayer};
use risingwave_meta::MetaStoreBackend;
use risingwave_meta::barrier::GlobalBarrierManager;
use risingwave_meta::controller::catalog::CatalogController;
use risingwave_meta::controller::cluster::ClusterController;
use risingwave_meta::hummock::IcebergCompactorManager;
use risingwave_meta::manager::iceberg_compaction::IcebergCompactionManager;
use risingwave_meta::manager::{META_NODE_ID, MetadataManager};
use risingwave_meta::rpc::ElectionClientRef;
use risingwave_meta::rpc::election::dummy::DummyElectionClient;
use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer;
use risingwave_meta::stream::ScaleController;
use risingwave_meta_service::AddressInfo;
use risingwave_meta_service::backup_service::BackupServiceImpl;
use risingwave_meta_service::cloud_service::CloudServiceImpl;
use risingwave_meta_service::cluster_limit_service::ClusterLimitServiceImpl;
use risingwave_meta_service::cluster_service::ClusterServiceImpl;
use risingwave_meta_service::ddl_service::DdlServiceImpl;
use risingwave_meta_service::event_log_service::EventLogServiceImpl;
use risingwave_meta_service::health_service::HealthServiceImpl;
use risingwave_meta_service::heartbeat_service::HeartbeatServiceImpl;
use risingwave_meta_service::hosted_iceberg_catalog_service::HostedIcebergCatalogServiceImpl;
use risingwave_meta_service::hummock_service::HummockServiceImpl;
use risingwave_meta_service::meta_member_service::MetaMemberServiceImpl;
use risingwave_meta_service::monitor_service::MonitorServiceImpl;
use risingwave_meta_service::notification_service::NotificationServiceImpl;
use risingwave_meta_service::scale_service::ScaleServiceImpl;
use risingwave_meta_service::serving_service::ServingServiceImpl;
use risingwave_meta_service::session_config::SessionParamsServiceImpl;
use risingwave_meta_service::sink_coordination_service::SinkCoordinationServiceImpl;
use risingwave_meta_service::stream_service::StreamServiceImpl;
use risingwave_meta_service::system_params_service::SystemParamsServiceImpl;
use risingwave_meta_service::telemetry_service::TelemetryInfoServiceImpl;
use risingwave_meta_service::user_service::UserServiceImpl;
use risingwave_pb::backup_service::backup_service_server::BackupServiceServer;
use risingwave_pb::cloud_service::cloud_service_server::CloudServiceServer;
use risingwave_pb::connector_service::sink_coordination_service_server::SinkCoordinationServiceServer;
use risingwave_pb::ddl_service::ddl_service_server::DdlServiceServer;
use risingwave_pb::health::health_server::HealthServer;
use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerServiceServer;
use risingwave_pb::meta::SystemParams;
use risingwave_pb::meta::cluster_limit_service_server::ClusterLimitServiceServer;
use risingwave_pb::meta::cluster_service_server::ClusterServiceServer;
use risingwave_pb::meta::event_log_service_server::EventLogServiceServer;
use risingwave_pb::meta::heartbeat_service_server::HeartbeatServiceServer;
use risingwave_pb::meta::hosted_iceberg_catalog_service_server::HostedIcebergCatalogServiceServer;
use risingwave_pb::meta::meta_member_service_server::MetaMemberServiceServer;
use risingwave_pb::meta::notification_service_server::NotificationServiceServer;
use risingwave_pb::meta::scale_service_server::ScaleServiceServer;
use risingwave_pb::meta::serving_service_server::ServingServiceServer;
use risingwave_pb::meta::session_param_service_server::SessionParamServiceServer;
use risingwave_pb::meta::stream_manager_service_server::StreamManagerServiceServer;
use risingwave_pb::meta::system_params_service_server::SystemParamsServiceServer;
use risingwave_pb::meta::telemetry_info_service_server::TelemetryInfoServiceServer;
use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
use risingwave_pb::user::user_service_server::UserServiceServer;
use risingwave_rpc_client::ComputeClientPool;
use sea_orm::{ConnectionTrait, DbBackend};
use thiserror_ext::AsReport;
use tokio::sync::watch;

use crate::backup_restore::BackupManager;
use crate::barrier::BarrierScheduler;
use crate::controller::SqlMetaStore;
use crate::controller::system_param::SystemParamsController;
use crate::hummock::HummockManager;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{IdleManager, MetaOpts, MetaSrvEnv};
use crate::rpc::election::sql::{MySqlDriver, PostgresDriver, SqlBackendElectionClient};
use crate::rpc::metrics::{
    GLOBAL_META_METRICS, start_fragment_info_monitor, start_worker_info_monitor,
};
use crate::serving::ServingVnodeMapping;
use crate::stream::{GlobalStreamManager, SourceManager};
use crate::telemetry::{MetaReportCreator, MetaTelemetryInfoFetcher};
use crate::{MetaError, MetaResult, hummock, serving};

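/// Process-wide flag recording whether the meta service (leader or follower) has started serving.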
pub mod started {
    use std::sync::atomic::AtomicBool;
    use std::sync::atomic::Ordering::Relaxed;

    static STARTED: AtomicBool = AtomicBool::new(false);

    pub(crate) fn set() {
        STARTED.store(true, Relaxed);
    }

    pub fn get() -> bool {
        STARTED.load(Relaxed)
    }
}

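/// Connects to the meta store, sets up an election client for the chosen backend, and then
/// serves via [`rpc_serve_with_store`].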
pub async fn rpc_serve(
    address_info: AddressInfo,
    meta_store_backend: MetaStoreBackend,
    max_cluster_heartbeat_interval: Duration,
    lease_interval_secs: u64,
    opts: MetaOpts,
    init_system_params: SystemParams,
    init_session_config: SessionConfig,
    shutdown: CancellationToken,
) -> MetaResult<()> {
    let meta_store_impl = SqlMetaStore::connect(meta_store_backend.clone()).await?;

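    // Pick an election client matching the meta store backend: the in-memory and SQLite
    // backends use a dummy (single-node) client, while Postgres and MySQL use SQL-backed
    // elections.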
    let election_client: ElectionClientRef = match meta_store_backend {
        MetaStoreBackend::Mem => {
            Arc::new(DummyElectionClient::new(
                address_info.advertise_addr.clone(),
            ))
        }
        MetaStoreBackend::Sql { .. } => {
            let id = address_info.advertise_addr.clone();
            let conn = meta_store_impl.conn.clone();
            let election_client: ElectionClientRef = match conn.get_database_backend() {
                DbBackend::Sqlite => Arc::new(DummyElectionClient::new(id)),
                DbBackend::Postgres => {
                    Arc::new(SqlBackendElectionClient::new(id, PostgresDriver::new(conn)))
                }
                DbBackend::MySql => {
                    Arc::new(SqlBackendElectionClient::new(id, MySqlDriver::new(conn)))
                }
            };
            election_client.init().await?;

            election_client
        }
    };

    rpc_serve_with_store(
        meta_store_impl,
        election_client,
        address_info,
        max_cluster_heartbeat_interval,
        lease_interval_secs,
        opts,
        init_system_params,
        init_session_config,
        shutdown,
    )
    .await
}

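/// Runs the leader election in the background, serves as a follower until this node is elected,
/// and then starts the leader services.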
pub async fn rpc_serve_with_store(
    meta_store_impl: SqlMetaStore,
    election_client: ElectionClientRef,
    address_info: AddressInfo,
    max_cluster_heartbeat_interval: Duration,
    lease_interval_secs: u64,
    opts: MetaOpts,
    init_system_params: SystemParams,
    init_session_config: SessionConfig,
    shutdown: CancellationToken,
) -> MetaResult<()> {
    let (election_shutdown_tx, election_shutdown_rx) = watch::channel(());

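    // Drive the election client in a background task; errors are logged and retried, and once
    // the election loop exits the whole server is cancelled.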
    let election_handle = tokio::spawn({
        let shutdown = shutdown.clone();
        let election_client = election_client.clone();

        async move {
            while let Err(e) = election_client
                .run_once(lease_interval_secs as i64, election_shutdown_rx.clone())
                .await
            {
                tracing::error!(error = %e.as_report(), "election error happened");
            }
            shutdown.cancel();
        }
    });

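    // Not (yet) the leader: serve the follower services and wait until this node either wins
    // the election or is asked to shut down.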
    if !election_client.is_leader() {
        let follower_shutdown = shutdown.child_token();

        let follower_handle = tokio::spawn(start_service_as_election_follower(
            follower_shutdown.clone(),
            address_info.clone(),
            election_client.clone(),
        ));

        let mut is_leader_watcher = election_client.subscribe();

        while !*is_leader_watcher.borrow_and_update() {
            tokio::select! {
                _ = shutdown.cancelled() => return Ok(()),

                res = is_leader_watcher.changed() => {
                    if res.is_err() {
                        tracing::error!("leader watcher recv failed");
                    }
                }
            }
        }

        tracing::info!("elected as leader, shutting down follower services");
        follower_shutdown.cancel();
        let _ = follower_handle.await;
    }

    let result = start_service_as_election_leader(
        meta_store_impl,
        address_info,
        max_cluster_heartbeat_interval,
        opts,
        init_system_params,
        init_session_config,
        election_client,
        shutdown,
    )
    .await;

    election_shutdown_tx.send(()).ok();
    let _ = election_handle.await;

    result
}

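/// Serves the follower-side services (meta member and health) until shutdown is requested.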
pub async fn start_service_as_election_follower(
    shutdown: CancellationToken,
    address_info: AddressInfo,
    election_client: ElectionClientRef,
) {
    tracing::info!("starting follower services");

    let meta_member_srv = MetaMemberServiceImpl::new(election_client);

    let health_srv = HealthServiceImpl::new();

    let server = tonic::transport::Server::builder()
        .layer(MetricsMiddlewareLayer::new(Arc::new(
            GLOBAL_META_METRICS.clone(),
        )))
        .layer(TracingExtractLayer::new())
        .add_service(MetaMemberServiceServer::new(meta_member_srv))
        .add_service(HealthServer::new(health_srv))
        .monitored_serve_with_shutdown(
            address_info.listen_addr,
            "grpc-meta-follower-service",
            TcpConfig {
                tcp_nodelay: true,
                keepalive_duration: None,
            },
            shutdown.clone().cancelled_owned(),
        );
    let server_handle = tokio::spawn(server);
    started::set();

    shutdown.cancelled().await;
    let _ = server_handle.await;
}

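/// Bootstraps all leader-side managers and background tasks, then serves the full set of meta
/// gRPC services until shutdown is requested.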
pub async fn start_service_as_election_leader(
    meta_store_impl: SqlMetaStore,
    address_info: AddressInfo,
    max_cluster_heartbeat_interval: Duration,
    opts: MetaOpts,
    init_system_params: SystemParams,
    init_session_config: SessionConfig,
    election_client: ElectionClientRef,
    shutdown: CancellationToken,
) -> MetaResult<()> {
    tracing::info!("starting leader services");

    let env = MetaSrvEnv::new(
        opts.clone(),
        init_system_params,
        init_session_config,
        meta_store_impl,
    )
    .await?;
    tracing::info!("MetaSrvEnv started");
    let _ = env.may_start_watch_license_key_file()?;
    let system_params_reader = env.system_params_reader().await;

    let data_directory = system_params_reader.data_directory();
    if !is_correct_data_directory(data_directory) {
        return Err(MetaError::system_params(format!(
            "The data directory {:?} is misconfigured.
Please use a combination of uppercase and lowercase letters and numbers, i.e. [a-z, A-Z, 0-9].
The string cannot start or end with '/', and consecutive '/' are not allowed.
The data directory cannot be empty and its length should not exceed 800 characters.",
            data_directory
        )));
    }

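    // Bootstrap the cluster and catalog controllers and wrap them in the metadata manager.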
    let cluster_controller = Arc::new(
        ClusterController::new(env.clone(), max_cluster_heartbeat_interval)
            .await
            .unwrap(),
    );
    let catalog_controller = Arc::new(CatalogController::new(env.clone()).await?);
    let metadata_manager = MetadataManager::new(cluster_controller, catalog_controller);

    let serving_vnode_mapping = Arc::new(ServingVnodeMapping::default());
    let max_serving_parallelism = env
        .session_params_manager_impl_ref()
        .get_params()
        .await
        .batch_parallelism()
        .map(|p| p.get());
    serving::on_meta_start(
        env.notification_manager_ref(),
        &metadata_manager,
        serving_vnode_mapping.clone(),
        max_serving_parallelism,
    )
    .await;

    let compactor_manager = Arc::new(
        hummock::CompactorManager::with_meta(env.clone())
            .await
            .unwrap(),
    );
    tracing::info!("CompactorManager started");

    let heartbeat_srv = HeartbeatServiceImpl::new(metadata_manager.clone());
    tracing::info!("HeartbeatServiceImpl started");

    let (compactor_streams_change_tx, compactor_streams_change_rx) =
        tokio::sync::mpsc::unbounded_channel();

    let meta_metrics = Arc::new(GLOBAL_META_METRICS.clone());

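    // Start the Hummock manager; compactor stream changes are sent through the channel created
    // above and consumed by the compaction event loop registered later.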
    let hummock_manager = hummock::HummockManager::new(
        env.clone(),
        metadata_manager.clone(),
        meta_metrics.clone(),
        compactor_manager.clone(),
        compactor_streams_change_tx,
    )
    .await
    .unwrap();
    tracing::info!("HummockManager started");
    let object_store_media_type = hummock_manager.object_store_media_type();

    let meta_member_srv = MetaMemberServiceImpl::new(election_client.clone());

    let prometheus_client = opts.prometheus_endpoint.as_ref().map(|x| {
        use std::str::FromStr;
        prometheus_http_query::Client::from_str(x).unwrap()
    });
    let prometheus_selector = opts.prometheus_selector.unwrap_or_default();
    let diagnose_command = Arc::new(risingwave_meta::manager::diagnose::DiagnoseCommand::new(
        metadata_manager.clone(),
        env.await_tree_reg().clone(),
        hummock_manager.clone(),
        env.event_log_manager_ref(),
        prometheus_client.clone(),
        prometheus_selector.clone(),
        opts.redact_sql_option_keywords.clone(),
    ));

    let trace_state = otlp_embedded::State::new(otlp_embedded::Config {
        max_length: opts.cached_traces_num,
        max_memory_usage: opts.cached_traces_memory_limit_bytes,
    });
    let trace_srv = otlp_embedded::TraceServiceImpl::new(trace_state.clone());

    #[cfg(not(madsim))]
    let _dashboard_task = if let Some(ref dashboard_addr) = address_info.dashboard_addr {
        let dashboard_service = crate::dashboard::DashboardService {
            await_tree_reg: env.await_tree_reg().clone(),
            dashboard_addr: *dashboard_addr,
            prometheus_client,
            prometheus_selector,
            metadata_manager: metadata_manager.clone(),
            hummock_manager: hummock_manager.clone(),
            compute_clients: ComputeClientPool::new(1, env.opts.compute_client_config.clone()),
            diagnose_command,
            trace_state,
        };
        let task = tokio::spawn(dashboard_service.serve());
        Some(task)
    } else {
        None
    };

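    // The scheduler half enqueues barriers, while `scheduled_barriers` is consumed by the global
    // barrier manager started below.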
    let (barrier_scheduler, scheduled_barriers) =
        BarrierScheduler::new_pair(hummock_manager.clone(), meta_metrics.clone());
    tracing::info!("BarrierScheduler started");

    let backup_manager = BackupManager::new(
        env.clone(),
        hummock_manager.clone(),
        meta_metrics.clone(),
        system_params_reader.backup_storage_url(),
        system_params_reader.backup_storage_directory(),
    )
    .await?;
    tracing::info!("BackupManager started");

    LocalSecretManager::init(
        opts.temp_secret_file_dir,
        env.cluster_id().to_string(),
        META_NODE_ID,
    );
    tracing::info!("LocalSecretManager started");

    let notification_srv = NotificationServiceImpl::new(
        env.clone(),
        metadata_manager.clone(),
        hummock_manager.clone(),
        backup_manager.clone(),
        serving_vnode_mapping.clone(),
    )
    .await?;
    tracing::info!("NotificationServiceImpl started");

    let source_manager = Arc::new(
        SourceManager::new(
            barrier_scheduler.clone(),
            metadata_manager.clone(),
            meta_metrics.clone(),
            env.clone(),
        )
        .await?,
    );
    tracing::info!("SourceManager started");

    let (iceberg_compaction_stat_tx, iceberg_compaction_stat_rx) =
        tokio::sync::mpsc::unbounded_channel();
    let (sink_manager, shutdown_handle) = SinkCoordinatorManager::start_worker(
        env.meta_store_ref().conn.clone(),
        hummock_manager.clone(),
        metadata_manager.clone(),
        iceberg_compaction_stat_tx,
    );
    tracing::info!("SinkCoordinatorManager started");
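
    // Background sub-tasks spawned by the leader; more are pushed below as the remaining
    // managers and monitors start.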
    let mut sub_tasks = vec![shutdown_handle];

    let iceberg_compactor_manager = Arc::new(IcebergCompactorManager::new());

    let (iceberg_compaction_mgr, iceberg_compactor_event_rx) = IcebergCompactionManager::build(
        env.clone(),
        metadata_manager.clone(),
        iceberg_compactor_manager.clone(),
        meta_metrics.clone(),
    );

    sub_tasks.push(IcebergCompactionManager::compaction_stat_loop(
        iceberg_compaction_mgr.clone(),
        iceberg_compaction_stat_rx,
    ));

    sub_tasks.push(IcebergCompactionManager::gc_loop(
        iceberg_compaction_mgr.clone(),
        env.opts.iceberg_gc_interval_sec,
    ));

    let scale_controller = Arc::new(ScaleController::new(
        &metadata_manager,
        source_manager.clone(),
        env.clone(),
    ));

    let (barrier_manager, join_handle, shutdown_rx) = GlobalBarrierManager::start(
        scheduled_barriers,
        env.clone(),
        metadata_manager.clone(),
        hummock_manager.clone(),
        source_manager.clone(),
        sink_manager.clone(),
        scale_controller.clone(),
        barrier_scheduler.clone(),
    )
    .await;
    tracing::info!("GlobalBarrierManager started");
    sub_tasks.push((join_handle, shutdown_rx));

    {
        let source_manager = source_manager.clone();
        tokio::spawn(async move {
            source_manager.run().await.unwrap();
        });
    }

    let stream_manager = Arc::new(
        GlobalStreamManager::new(
            env.clone(),
            metadata_manager.clone(),
            barrier_scheduler.clone(),
            source_manager.clone(),
            scale_controller.clone(),
        )
        .unwrap(),
    );

    hummock_manager
        .may_fill_backward_state_table_info()
        .await
        .unwrap();

    let ddl_srv = DdlServiceImpl::new(
        env.clone(),
        metadata_manager.clone(),
        stream_manager.clone(),
        source_manager.clone(),
        barrier_manager.clone(),
        sink_manager.clone(),
        meta_metrics.clone(),
        iceberg_compaction_mgr.clone(),
    )
    .await;

    sub_tasks.push(ddl_srv.start_migrate_table_fragments());

    let user_srv = UserServiceImpl::new(metadata_manager.clone());

    let scale_srv = ScaleServiceImpl::new(
        metadata_manager.clone(),
        stream_manager.clone(),
        barrier_manager.clone(),
        env.clone(),
    );

    let cluster_srv = ClusterServiceImpl::new(metadata_manager.clone(), barrier_manager.clone());
    let stream_srv = StreamServiceImpl::new(
        env.clone(),
        barrier_scheduler.clone(),
        barrier_manager.clone(),
        stream_manager.clone(),
        metadata_manager.clone(),
    );
    let sink_coordination_srv = SinkCoordinationServiceImpl::new(sink_manager);
    let hummock_srv = HummockServiceImpl::new(
        hummock_manager.clone(),
        metadata_manager.clone(),
        backup_manager.clone(),
        iceberg_compaction_mgr.clone(),
    );

    let health_srv = HealthServiceImpl::new();
    let backup_srv = BackupServiceImpl::new(backup_manager.clone());
    let telemetry_srv = TelemetryInfoServiceImpl::new(env.meta_store());
    let system_params_srv = SystemParamsServiceImpl::new(
        env.system_params_manager_impl_ref(),
        env.opts.license_key_path.is_some(),
    );
    let session_params_srv = SessionParamsServiceImpl::new(env.session_params_manager_impl_ref());
    let serving_srv =
        ServingServiceImpl::new(serving_vnode_mapping.clone(), metadata_manager.clone());
    let cloud_srv = CloudServiceImpl::new();
    let event_log_srv = EventLogServiceImpl::new(env.event_log_manager_ref());
    let cluster_limit_srv = ClusterLimitServiceImpl::new(env.clone(), metadata_manager.clone());
    let hosted_iceberg_catalog_srv = HostedIcebergCatalogServiceImpl::new(env.clone());
    let monitor_srv = MonitorServiceImpl {
        metadata_manager: metadata_manager.clone(),
        await_tree_reg: env.await_tree_reg().clone(),
    };

    if let Some(prometheus_addr) = address_info.prometheus_addr {
        MetricsManager::boot_metrics_service(prometheus_addr.to_string())
    }

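    // Register the long-running workers and periodic monitors (Hummock workers, worker/fragment
    // info monitors, system-params notifier, compaction event loops, serving mapping worker).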
    sub_tasks.extend(hummock::start_hummock_workers(
        hummock_manager.clone(),
        backup_manager.clone(),
        &env.opts,
    ));
    sub_tasks.push(start_worker_info_monitor(
        metadata_manager.clone(),
        election_client.clone(),
        Duration::from_secs(env.opts.node_num_monitor_interval_sec),
        meta_metrics.clone(),
    ));
    sub_tasks.push(start_fragment_info_monitor(
        metadata_manager.clone(),
        hummock_manager.clone(),
        meta_metrics.clone(),
    ));
    sub_tasks.push(SystemParamsController::start_params_notifier(
        env.system_params_manager_impl_ref(),
    ));
    sub_tasks.push(HummockManager::hummock_timer_task(
        hummock_manager.clone(),
        Some(backup_manager),
    ));
    sub_tasks.extend(HummockManager::compaction_event_loop(
        hummock_manager.clone(),
        compactor_streams_change_rx,
    ));

    sub_tasks.extend(IcebergCompactionManager::iceberg_compaction_event_loop(
        iceberg_compaction_mgr.clone(),
        iceberg_compactor_event_rx,
    ));

    sub_tasks.push(serving::start_serving_vnode_mapping_worker(
        env.notification_manager_ref(),
        metadata_manager.clone(),
        serving_vnode_mapping,
        env.session_params_manager_impl_ref(),
    ));

    {
        sub_tasks.push(ClusterController::start_heartbeat_checker(
            metadata_manager.cluster_controller.clone(),
            Duration::from_secs(1),
        ));

        if !env.opts.disable_automatic_parallelism_control {
            sub_tasks.push(stream_manager.start_auto_parallelism_monitor());
        }
    }

    let _idle_checker_handle = IdleManager::start_idle_checker(
        env.idle_manager_ref(),
        Duration::from_secs(30),
        shutdown.clone(),
    );

    let (abort_sender, abort_recv) = tokio::sync::oneshot::channel();
    let notification_mgr = env.notification_manager_ref();
    let stream_abort_handler = tokio::spawn(async move {
        let _ = abort_recv.await;
        notification_mgr.abort_all();
        compactor_manager.abort_all_compactors();
    });
    sub_tasks.push((stream_abort_handler, abort_sender));

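    // Telemetry is reported only when it is enabled both in the meta config and via the
    // environment; Scarf reporting is additionally skipped under madsim.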
    let telemetry_manager = TelemetryManager::new(
        Arc::new(MetaTelemetryInfoFetcher::new(env.cluster_id().clone())),
        Arc::new(MetaReportCreator::new(
            metadata_manager.clone(),
            object_store_media_type,
        )),
    );

    if env.opts.telemetry_enabled && telemetry_env_enabled() {
        sub_tasks.push(telemetry_manager.start().await);
    } else {
        tracing::info!("Telemetry didn't start due to meta backend or config");
    }
    if !cfg!(madsim) && report_scarf_enabled() {
        tokio::spawn(report_to_scarf());
    } else {
        tracing::info!("Scarf reporting is disabled");
    };

    if let Some(pair) = env.event_log_manager_ref().take_join_handle() {
        sub_tasks.push(pair);
    }

    tracing::info!("Assigned cluster id {:?}", *env.cluster_id());
    tracing::info!("Starting meta services");

    let event = risingwave_pb::meta::event_log::EventMetaNodeStart {
        advertise_addr: address_info.advertise_addr,
        listen_addr: address_info.listen_addr.to_string(),
        opts: serde_json::to_string(&env.opts).unwrap(),
    };
    env.event_log_manager_ref().add_event_logs(vec![
        risingwave_pb::meta::event_log::Event::MetaNodeStart(event),
    ]);

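    // Assemble the gRPC server with all leader services; several services lift the default
    // decoding message size limit.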
    let server_builder = tonic::transport::Server::builder()
        .layer(MetricsMiddlewareLayer::new(meta_metrics))
        .layer(TracingExtractLayer::new())
        .add_service(HeartbeatServiceServer::new(heartbeat_srv))
        .add_service(ClusterServiceServer::new(cluster_srv))
        .add_service(StreamManagerServiceServer::new(stream_srv))
        .add_service(
            HummockManagerServiceServer::new(hummock_srv).max_decoding_message_size(usize::MAX),
        )
        .add_service(NotificationServiceServer::new(notification_srv))
        .add_service(MetaMemberServiceServer::new(meta_member_srv))
        .add_service(DdlServiceServer::new(ddl_srv).max_decoding_message_size(usize::MAX))
        .add_service(UserServiceServer::new(user_srv))
        .add_service(CloudServiceServer::new(cloud_srv))
        .add_service(ScaleServiceServer::new(scale_srv).max_decoding_message_size(usize::MAX))
        .add_service(HealthServer::new(health_srv))
        .add_service(BackupServiceServer::new(backup_srv))
        .add_service(SystemParamsServiceServer::new(system_params_srv))
        .add_service(SessionParamServiceServer::new(session_params_srv))
        .add_service(TelemetryInfoServiceServer::new(telemetry_srv))
        .add_service(ServingServiceServer::new(serving_srv))
        .add_service(SinkCoordinationServiceServer::new(sink_coordination_srv))
        .add_service(
            EventLogServiceServer::new(event_log_srv).max_decoding_message_size(usize::MAX),
        )
        .add_service(ClusterLimitServiceServer::new(cluster_limit_srv))
        .add_service(HostedIcebergCatalogServiceServer::new(
            hosted_iceberg_catalog_srv,
        ))
        .add_service(MonitorServiceServer::new(monitor_srv));

    #[cfg(not(madsim))]
    let server_builder = server_builder.add_service(TraceServiceServer::new(trace_srv));

    let server = server_builder.monitored_serve_with_shutdown(
        address_info.listen_addr,
        "grpc-meta-leader-service",
        TcpConfig {
            tcp_nodelay: true,
            keepalive_duration: None,
        },
        shutdown.clone().cancelled_owned(),
    );
    started::set();
    let _server_handle = tokio::spawn(server);

    shutdown.cancelled().await;
    Ok(())
}

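/// Validates the configured data directory name.
///
/// Only `[0-9a-zA-Z_/-]` characters are allowed; the value must not be empty, must not start or
/// end with `/`, must not contain `//`, and must not exceed 800 characters. For example,
/// `"hummock_001"` and `"a/b/c"` are accepted, while `"/a"`, `"a/"`, and `"a//b"` are rejected.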
fn is_correct_data_directory(data_directory: &str) -> bool {
    let data_directory_regex = Regex::new(r"^[0-9a-zA-Z_/-]{1,}$").unwrap();
    if data_directory.is_empty()
        || !data_directory_regex.is_match(data_directory)
        || data_directory.ends_with('/')
        || data_directory.starts_with('/')
        || data_directory.contains("//")
        || data_directory.len() > 800
    {
        return false;
    }
    true
}