1use std::sync::Arc;
16use std::time::Duration;
17
18use otlp_embedded::TraceServiceServer;
19use regex::Regex;
20use risingwave_common::monitor::{RouterExt, TcpConfig};
21use risingwave_common::secret::LocalSecretManager;
22use risingwave_common::session_config::SessionConfig;
23use risingwave_common::system_param::reader::SystemParamsRead;
24use risingwave_common::telemetry::manager::TelemetryManager;
25use risingwave_common::telemetry::{report_scarf_enabled, report_to_scarf, telemetry_env_enabled};
26use risingwave_common::util::tokio_util::sync::CancellationToken;
27use risingwave_common_service::{MetricsManager, TracingExtractLayer};
28use risingwave_meta::MetaStoreBackend;
29use risingwave_meta::barrier::GlobalBarrierManager;
30use risingwave_meta::controller::catalog::CatalogController;
31use risingwave_meta::controller::cluster::ClusterController;
32use risingwave_meta::manager::{META_NODE_ID, MetadataManager};
33use risingwave_meta::rpc::ElectionClientRef;
34use risingwave_meta::rpc::election::dummy::DummyElectionClient;
35use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer;
36use risingwave_meta::stream::ScaleController;
37use risingwave_meta_service::AddressInfo;
38use risingwave_meta_service::backup_service::BackupServiceImpl;
39use risingwave_meta_service::cloud_service::CloudServiceImpl;
40use risingwave_meta_service::cluster_limit_service::ClusterLimitServiceImpl;
41use risingwave_meta_service::cluster_service::ClusterServiceImpl;
42use risingwave_meta_service::ddl_service::DdlServiceImpl;
43use risingwave_meta_service::event_log_service::EventLogServiceImpl;
44use risingwave_meta_service::health_service::HealthServiceImpl;
45use risingwave_meta_service::heartbeat_service::HeartbeatServiceImpl;
46use risingwave_meta_service::hummock_service::HummockServiceImpl;
47use risingwave_meta_service::meta_member_service::MetaMemberServiceImpl;
48use risingwave_meta_service::notification_service::NotificationServiceImpl;
49use risingwave_meta_service::scale_service::ScaleServiceImpl;
50use risingwave_meta_service::serving_service::ServingServiceImpl;
51use risingwave_meta_service::session_config::SessionParamsServiceImpl;
52use risingwave_meta_service::sink_coordination_service::SinkCoordinationServiceImpl;
53use risingwave_meta_service::stream_service::StreamServiceImpl;
54use risingwave_meta_service::system_params_service::SystemParamsServiceImpl;
55use risingwave_meta_service::telemetry_service::TelemetryInfoServiceImpl;
56use risingwave_meta_service::user_service::UserServiceImpl;
57use risingwave_pb::backup_service::backup_service_server::BackupServiceServer;
58use risingwave_pb::cloud_service::cloud_service_server::CloudServiceServer;
59use risingwave_pb::connector_service::sink_coordination_service_server::SinkCoordinationServiceServer;
60use risingwave_pb::ddl_service::ddl_service_server::DdlServiceServer;
61use risingwave_pb::health::health_server::HealthServer;
62use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerServiceServer;
63use risingwave_pb::meta::SystemParams;
64use risingwave_pb::meta::cluster_limit_service_server::ClusterLimitServiceServer;
65use risingwave_pb::meta::cluster_service_server::ClusterServiceServer;
66use risingwave_pb::meta::event_log_service_server::EventLogServiceServer;
67use risingwave_pb::meta::heartbeat_service_server::HeartbeatServiceServer;
68use risingwave_pb::meta::meta_member_service_server::MetaMemberServiceServer;
69use risingwave_pb::meta::notification_service_server::NotificationServiceServer;
70use risingwave_pb::meta::scale_service_server::ScaleServiceServer;
71use risingwave_pb::meta::serving_service_server::ServingServiceServer;
72use risingwave_pb::meta::session_param_service_server::SessionParamServiceServer;
73use risingwave_pb::meta::stream_manager_service_server::StreamManagerServiceServer;
74use risingwave_pb::meta::system_params_service_server::SystemParamsServiceServer;
75use risingwave_pb::meta::telemetry_info_service_server::TelemetryInfoServiceServer;
76use risingwave_pb::user::user_service_server::UserServiceServer;
77use risingwave_rpc_client::ComputeClientPool;
78use sea_orm::{ConnectionTrait, DbBackend};
79use thiserror_ext::AsReport;
80use tokio::sync::watch;
81
82use crate::backup_restore::BackupManager;
83use crate::barrier::BarrierScheduler;
84use crate::controller::SqlMetaStore;
85use crate::controller::system_param::SystemParamsController;
86use crate::hummock::HummockManager;
87use crate::manager::sink_coordination::SinkCoordinatorManager;
88use crate::manager::{IdleManager, MetaOpts, MetaSrvEnv};
89use crate::rpc::election::sql::{MySqlDriver, PostgresDriver, SqlBackendElectionClient};
90use crate::rpc::metrics::{
91 GLOBAL_META_METRICS, start_fragment_info_monitor, start_worker_info_monitor,
92};
93use crate::serving::ServingVnodeMapping;
94use crate::stream::{GlobalStreamManager, SourceManager};
95use crate::telemetry::{MetaReportCreator, MetaTelemetryInfoFetcher};
96use crate::{MetaError, MetaResult, hummock, serving};
97
/// Process-wide flag recording whether the gRPC service (follower or leader)
/// has been spawned at least once.
pub mod started {
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Flipped to `true` exactly once, right after the server task is spawned.
    static STARTED: AtomicBool = AtomicBool::new(false);

    /// Mark the RPC service as started. Relaxed ordering suffices: this is a
    /// standalone flag with no associated data to synchronize.
    pub(crate) fn set() {
        STARTED.store(true, Ordering::Relaxed);
    }

    /// Whether the RPC service has been started.
    pub fn get() -> bool {
        STARTED.load(Ordering::Relaxed)
    }
}
116
117pub async fn rpc_serve(
121 address_info: AddressInfo,
122 meta_store_backend: MetaStoreBackend,
123 max_cluster_heartbeat_interval: Duration,
124 lease_interval_secs: u64,
125 opts: MetaOpts,
126 init_system_params: SystemParams,
127 init_session_config: SessionConfig,
128 shutdown: CancellationToken,
129) -> MetaResult<()> {
130 let meta_store_impl = SqlMetaStore::connect(meta_store_backend.clone()).await?;
131
132 let election_client = match meta_store_backend {
133 MetaStoreBackend::Mem => {
134 Arc::new(DummyElectionClient::new(
136 address_info.advertise_addr.clone(),
137 ))
138 }
139 MetaStoreBackend::Sql { .. } => {
140 let id = address_info.advertise_addr.clone();
142 let conn = meta_store_impl.conn.clone();
143 let election_client: ElectionClientRef = match conn.get_database_backend() {
144 DbBackend::Sqlite => Arc::new(DummyElectionClient::new(id)),
145 DbBackend::Postgres => {
146 Arc::new(SqlBackendElectionClient::new(id, PostgresDriver::new(conn)))
147 }
148 DbBackend::MySql => {
149 Arc::new(SqlBackendElectionClient::new(id, MySqlDriver::new(conn)))
150 }
151 };
152 election_client.init().await?;
153
154 election_client
155 }
156 };
157
158 rpc_serve_with_store(
159 meta_store_impl,
160 election_client,
161 address_info,
162 max_cluster_heartbeat_interval,
163 lease_interval_secs,
164 opts,
165 init_system_params,
166 init_session_config,
167 shutdown,
168 )
169 .await
170}
171
/// Drives the election lifecycle: spawns the election loop, serves the minimal
/// follower services while this node is not the leader, and runs the full
/// leader services once elected. Returns when the leader services exit.
pub async fn rpc_serve_with_store(
    meta_store_impl: SqlMetaStore,
    election_client: ElectionClientRef,
    address_info: AddressInfo,
    max_cluster_heartbeat_interval: Duration,
    lease_interval_secs: u64,
    opts: MetaOpts,
    init_system_params: SystemParams,
    init_session_config: SessionConfig,
    shutdown: CancellationToken,
) -> MetaResult<()> {
    // Signals the election loop to stop once the leader services have exited.
    let (election_shutdown_tx, election_shutdown_rx) = watch::channel(());

    let election_handle = tokio::spawn({
        let shutdown = shutdown.clone();
        let election_client = election_client.clone();

        async move {
            // Retry the election on error; when `run_once` returns `Ok` the
            // election has been shut down, so cancel the node-wide token below.
            while let Err(e) = election_client
                .run_once(lease_interval_secs as i64, election_shutdown_rx.clone())
                .await
            {
                tracing::error!(error = %e.as_report(), "election error happened");
            }
            shutdown.cancel();
        }
    });

    // Not (yet) the leader: expose follower services (membership + health) and
    // block until this node wins the election or shutdown is requested.
    if !election_client.is_leader() {
        let follower_shutdown = shutdown.child_token();

        let follower_handle = tokio::spawn(start_service_as_election_follower(
            follower_shutdown.clone(),
            address_info.clone(),
            election_client.clone(),
        ));

        let mut is_leader_watcher = election_client.subscribe();

        while !*is_leader_watcher.borrow_and_update() {
            tokio::select! {
                // External shutdown before becoming leader: exit cleanly.
                _ = shutdown.cancelled() => return Ok(()),

                res = is_leader_watcher.changed() => {
                    if res.is_err() {
                        tracing::error!("leader watcher recv failed");
                    }
                }
            }
        }

        // Tear down the follower services before binding the leader services
        // to the same listen address.
        tracing::info!("elected as leader, shutting down follower services");
        follower_shutdown.cancel();
        let _ = follower_handle.await;
    }

    let result = start_service_as_election_leader(
        meta_store_impl,
        address_info,
        max_cluster_heartbeat_interval,
        opts,
        init_system_params,
        init_session_config,
        election_client,
        shutdown,
    )
    .await;

    // Leader services exited: stop the election loop and wait for it to finish
    // before propagating the result.
    election_shutdown_tx.send(()).ok();
    let _ = election_handle.await;

    result
}
260
261pub async fn start_service_as_election_follower(
265 shutdown: CancellationToken,
266 address_info: AddressInfo,
267 election_client: ElectionClientRef,
268) {
269 tracing::info!("starting follower services");
270
271 let meta_member_srv = MetaMemberServiceImpl::new(election_client);
272
273 let health_srv = HealthServiceImpl::new();
274
275 let server = tonic::transport::Server::builder()
276 .layer(MetricsMiddlewareLayer::new(Arc::new(
277 GLOBAL_META_METRICS.clone(),
278 )))
279 .layer(TracingExtractLayer::new())
280 .add_service(MetaMemberServiceServer::new(meta_member_srv))
281 .add_service(HealthServer::new(health_srv))
282 .monitored_serve_with_shutdown(
283 address_info.listen_addr,
284 "grpc-meta-follower-service",
285 TcpConfig {
286 tcp_nodelay: true,
287 keepalive_duration: None,
288 },
289 shutdown.clone().cancelled_owned(),
290 );
291 let server_handle = tokio::spawn(server);
292 started::set();
293
294 shutdown.cancelled().await;
296 let _ = server_handle.await;
299}
300
/// Runs everything a leader meta node owns: the metadata / hummock / barrier /
/// stream managers, all background workers, and the full gRPC service surface.
/// Blocks until `shutdown` is cancelled, then returns `Ok(())`.
pub async fn start_service_as_election_leader(
    meta_store_impl: SqlMetaStore,
    address_info: AddressInfo,
    max_cluster_heartbeat_interval: Duration,
    opts: MetaOpts,
    init_system_params: SystemParams,
    init_session_config: SessionConfig,
    election_client: ElectionClientRef,
    shutdown: CancellationToken,
) -> MetaResult<()> {
    tracing::info!("starting leader services");

    // Shared environment handle passed (cloned) into nearly every manager below.
    let env = MetaSrvEnv::new(
        opts.clone(),
        init_system_params,
        init_session_config,
        meta_store_impl,
    )
    .await?;
    tracing::info!("MetaSrvEnv started");
    // NOTE(review): result intentionally discarded — presumably a watcher
    // handle; errors still propagate via `?`.
    let _ = env.may_start_watch_license_key_file()?;
    let system_params_reader = env.system_params_reader().await;

    // Validate the configured object-store data directory before any hummock
    // component starts writing under it.
    let data_directory = system_params_reader.data_directory();
    if !is_correct_data_directory(data_directory) {
        return Err(MetaError::system_params(format!(
            "The data directory {:?} is misconfigured.
            Please use a combination of uppercase and lowercase letters and numbers, i.e. [a-z, A-Z, 0-9].
            The string cannot start or end with '/', and consecutive '/' are not allowed.
            The data directory cannot be empty and its length should not exceed 800 characters.",
            data_directory
        )));
    }

    // ---- core metadata managers ----
    let cluster_controller = Arc::new(
        ClusterController::new(env.clone(), max_cluster_heartbeat_interval)
            .await
            .unwrap(),
    );
    let catalog_controller = Arc::new(CatalogController::new(env.clone()).await?);
    let metadata_manager = MetadataManager::new(cluster_controller, catalog_controller);

    let serving_vnode_mapping = Arc::new(ServingVnodeMapping::default());
    serving::on_meta_start(
        env.notification_manager_ref(),
        &metadata_manager,
        serving_vnode_mapping.clone(),
    )
    .await;

    let compactor_manager = Arc::new(
        hummock::CompactorManager::with_meta(env.clone())
            .await
            .unwrap(),
    );
    tracing::info!("CompactorManager started");

    let heartbeat_srv = HeartbeatServiceImpl::new(metadata_manager.clone());
    tracing::info!("HeartbeatServiceImpl started");

    // Channel feeding compactor event streams into the compaction event loop
    // started near the end of this function.
    let (compactor_streams_change_tx, compactor_streams_change_rx) =
        tokio::sync::mpsc::unbounded_channel();

    let meta_metrics = Arc::new(GLOBAL_META_METRICS.clone());

    let hummock_manager = hummock::HummockManager::new(
        env.clone(),
        metadata_manager.clone(),
        meta_metrics.clone(),
        compactor_manager.clone(),
        compactor_streams_change_tx,
    )
    .await
    .unwrap();
    tracing::info!("HummockManager started");
    let object_store_media_type = hummock_manager.object_store_media_type();

    let meta_member_srv = MetaMemberServiceImpl::new(election_client.clone());

    // ---- observability: prometheus client, diagnose command, trace state ----
    let prometheus_client = opts.prometheus_endpoint.as_ref().map(|x| {
        use std::str::FromStr;
        prometheus_http_query::Client::from_str(x).unwrap()
    });
    let prometheus_selector = opts.prometheus_selector.unwrap_or_default();
    let diagnose_command = Arc::new(risingwave_meta::manager::diagnose::DiagnoseCommand::new(
        metadata_manager.clone(),
        hummock_manager.clone(),
        env.event_log_manager_ref(),
        prometheus_client.clone(),
        prometheus_selector.clone(),
    ));

    let trace_state = otlp_embedded::State::new(otlp_embedded::Config {
        max_length: opts.cached_traces_num,
        max_memory_usage: opts.cached_traces_memory_limit_bytes,
    });
    let trace_srv = otlp_embedded::TraceServiceImpl::new(trace_state.clone());

    // Dashboard is not built under madsim (simulation) builds.
    #[cfg(not(madsim))]
    let _dashboard_task = if let Some(ref dashboard_addr) = address_info.dashboard_addr {
        let dashboard_service = crate::dashboard::DashboardService {
            dashboard_addr: *dashboard_addr,
            prometheus_client,
            prometheus_selector,
            metadata_manager: metadata_manager.clone(),
            compute_clients: ComputeClientPool::new(1, env.opts.compute_client_config.clone()), diagnose_command,
            trace_state,
        };
        let task = tokio::spawn(dashboard_service.serve());
        Some(task)
    } else {
        None
    };

    // ---- streaming / barrier / backup managers ----
    let (barrier_scheduler, scheduled_barriers) =
        BarrierScheduler::new_pair(hummock_manager.clone(), meta_metrics.clone());
    tracing::info!("BarrierScheduler started");

    let backup_manager = BackupManager::new(
        env.clone(),
        hummock_manager.clone(),
        meta_metrics.clone(),
        system_params_reader.backup_storage_url(),
        system_params_reader.backup_storage_directory(),
    )
    .await?;
    tracing::info!("BackupManager started");

    LocalSecretManager::init(
        opts.temp_secret_file_dir,
        env.cluster_id().to_string(),
        META_NODE_ID,
    );
    tracing::info!("LocalSecretManager started");

    let notification_srv = NotificationServiceImpl::new(
        env.clone(),
        metadata_manager.clone(),
        hummock_manager.clone(),
        backup_manager.clone(),
        serving_vnode_mapping.clone(),
    )
    .await?;
    tracing::info!("NotificationServiceImpl started");

    let source_manager = Arc::new(
        SourceManager::new(
            barrier_scheduler.clone(),
            metadata_manager.clone(),
            meta_metrics.clone(),
        )
        .await
        .unwrap(),
    );
    tracing::info!("SourceManager started");

    let (sink_manager, shutdown_handle) = SinkCoordinatorManager::start_worker(
        env.meta_store_ref().conn.clone(),
        hummock_manager.clone(),
        metadata_manager.clone(),
    );
    tracing::info!("SinkCoordinatorManager started");
    // Collects (join handle, shutdown sender) pairs for all background workers.
    let mut sub_tasks = vec![shutdown_handle];

    let scale_controller = Arc::new(ScaleController::new(
        &metadata_manager,
        source_manager.clone(),
        env.clone(),
    ));

    let (barrier_manager, join_handle, shutdown_rx) = GlobalBarrierManager::start(
        scheduled_barriers,
        env.clone(),
        metadata_manager.clone(),
        hummock_manager.clone(),
        source_manager.clone(),
        sink_manager.clone(),
        scale_controller.clone(),
    )
    .await;
    tracing::info!("GlobalBarrierManager started");
    sub_tasks.push((join_handle, shutdown_rx));

    // Detached task driving the source manager's main loop.
    {
        let source_manager = source_manager.clone();
        tokio::spawn(async move {
            source_manager.run().await.unwrap();
        });
    }

    let stream_manager = Arc::new(
        GlobalStreamManager::new(
            env.clone(),
            metadata_manager.clone(),
            barrier_scheduler.clone(),
            source_manager.clone(),
            scale_controller.clone(),
        )
        .unwrap(),
    );

    hummock_manager
        .may_fill_backward_state_table_info()
        .await
        .unwrap();

    // ---- gRPC service implementations ----
    let ddl_srv = DdlServiceImpl::new(
        env.clone(),
        metadata_manager.clone(),
        stream_manager.clone(),
        source_manager.clone(),
        barrier_manager.clone(),
        sink_manager.clone(),
        meta_metrics.clone(),
    )
    .await;

    let user_srv = UserServiceImpl::new(metadata_manager.clone());

    let scale_srv = ScaleServiceImpl::new(
        metadata_manager.clone(),
        source_manager,
        stream_manager.clone(),
        barrier_manager.clone(),
        scale_controller.clone(),
    );

    let cluster_srv = ClusterServiceImpl::new(metadata_manager.clone(), barrier_manager.clone());
    let stream_srv = StreamServiceImpl::new(
        env.clone(),
        barrier_scheduler.clone(),
        barrier_manager.clone(),
        stream_manager.clone(),
        metadata_manager.clone(),
    );
    let sink_coordination_srv = SinkCoordinationServiceImpl::new(sink_manager);
    let hummock_srv = HummockServiceImpl::new(
        hummock_manager.clone(),
        metadata_manager.clone(),
        backup_manager.clone(),
    );

    let health_srv = HealthServiceImpl::new();
    let backup_srv = BackupServiceImpl::new(backup_manager.clone());
    let telemetry_srv = TelemetryInfoServiceImpl::new(env.meta_store());
    let system_params_srv = SystemParamsServiceImpl::new(
        env.system_params_manager_impl_ref(),
        env.opts.license_key_path.is_some(),
    );
    let session_params_srv = SessionParamsServiceImpl::new(env.session_params_manager_impl_ref());
    let serving_srv =
        ServingServiceImpl::new(serving_vnode_mapping.clone(), metadata_manager.clone());
    let cloud_srv = CloudServiceImpl::new();
    let event_log_srv = EventLogServiceImpl::new(env.event_log_manager_ref());
    let cluster_limit_srv = ClusterLimitServiceImpl::new(env.clone(), metadata_manager.clone());

    if let Some(prometheus_addr) = address_info.prometheus_addr {
        MetricsManager::boot_metrics_service(prometheus_addr.to_string())
    }

    // ---- background workers, all tracked in `sub_tasks` ----
    sub_tasks.extend(hummock::start_hummock_workers(
        hummock_manager.clone(),
        backup_manager.clone(),
        &env.opts,
    ));
    sub_tasks.push(start_worker_info_monitor(
        metadata_manager.clone(),
        election_client.clone(),
        Duration::from_secs(env.opts.node_num_monitor_interval_sec),
        meta_metrics.clone(),
    ));
    sub_tasks.push(start_fragment_info_monitor(
        metadata_manager.clone(),
        hummock_manager.clone(),
        meta_metrics.clone(),
    ));
    sub_tasks.push(SystemParamsController::start_params_notifier(
        env.system_params_manager_impl_ref(),
    ));
    sub_tasks.push(HummockManager::hummock_timer_task(
        hummock_manager.clone(),
        Some(backup_manager),
    ));
    sub_tasks.extend(HummockManager::compaction_event_loop(
        hummock_manager,
        compactor_streams_change_rx,
    ));
    sub_tasks.push(
        serving::start_serving_vnode_mapping_worker(
            env.notification_manager_ref(),
            metadata_manager.clone(),
            serving_vnode_mapping,
        )
        .await,
    );

    {
        sub_tasks.push(ClusterController::start_heartbeat_checker(
            metadata_manager.cluster_controller.clone(),
            Duration::from_secs(1),
        ));

        if !env.opts.disable_automatic_parallelism_control {
            sub_tasks.push(stream_manager.start_auto_parallelism_monitor());
        }
    }

    let _idle_checker_handle = IdleManager::start_idle_checker(
        env.idle_manager_ref(),
        Duration::from_secs(30),
        shutdown.clone(),
    );

    // Abort hook: when the sender drops (during sub-task teardown), abort all
    // notification subscribers and compactor streams.
    let (abort_sender, abort_recv) = tokio::sync::oneshot::channel();
    let notification_mgr = env.notification_manager_ref();
    let stream_abort_handler = tokio::spawn(async move {
        let _ = abort_recv.await;
        notification_mgr.abort_all().await;
        compactor_manager.abort_all_compactors();
    });
    sub_tasks.push((stream_abort_handler, abort_sender));

    // ---- telemetry / scarf reporting (both opt-in) ----
    let telemetry_manager = TelemetryManager::new(
        Arc::new(MetaTelemetryInfoFetcher::new(env.cluster_id().clone())),
        Arc::new(MetaReportCreator::new(
            metadata_manager.clone(),
            object_store_media_type,
        )),
    );

    if env.opts.telemetry_enabled && telemetry_env_enabled() {
        sub_tasks.push(telemetry_manager.start().await);
    } else {
        tracing::info!("Telemetry didn't start due to meta backend or config");
    }
    if !cfg!(madsim) && report_scarf_enabled() {
        tokio::spawn(report_to_scarf());
    } else {
        tracing::info!("Scarf reporting is disabled");
    };

    if let Some(pair) = env.event_log_manager_ref().take_join_handle() {
        sub_tasks.push(pair);
    }

    tracing::info!("Assigned cluster id {:?}", *env.cluster_id());
    tracing::info!("Starting meta services");

    // Record the node start in the event log before serving.
    let event = risingwave_pb::meta::event_log::EventMetaNodeStart {
        advertise_addr: address_info.advertise_addr,
        listen_addr: address_info.listen_addr.to_string(),
        opts: serde_json::to_string(&env.opts).unwrap(),
    };
    env.event_log_manager_ref().add_event_logs(vec![
        risingwave_pb::meta::event_log::Event::MetaNodeStart(event),
    ]);

    // ---- assemble and spawn the leader gRPC server ----
    let server_builder = tonic::transport::Server::builder()
        .layer(MetricsMiddlewareLayer::new(meta_metrics))
        .layer(TracingExtractLayer::new())
        .add_service(HeartbeatServiceServer::new(heartbeat_srv))
        .add_service(ClusterServiceServer::new(cluster_srv))
        .add_service(StreamManagerServiceServer::new(stream_srv))
        .add_service(
            HummockManagerServiceServer::new(hummock_srv).max_decoding_message_size(usize::MAX),
        )
        .add_service(NotificationServiceServer::new(notification_srv))
        .add_service(MetaMemberServiceServer::new(meta_member_srv))
        .add_service(DdlServiceServer::new(ddl_srv).max_decoding_message_size(usize::MAX))
        .add_service(UserServiceServer::new(user_srv))
        .add_service(CloudServiceServer::new(cloud_srv))
        .add_service(ScaleServiceServer::new(scale_srv).max_decoding_message_size(usize::MAX))
        .add_service(HealthServer::new(health_srv))
        .add_service(BackupServiceServer::new(backup_srv))
        .add_service(SystemParamsServiceServer::new(system_params_srv))
        .add_service(SessionParamServiceServer::new(session_params_srv))
        .add_service(TelemetryInfoServiceServer::new(telemetry_srv))
        .add_service(ServingServiceServer::new(serving_srv))
        .add_service(SinkCoordinationServiceServer::new(sink_coordination_srv))
        .add_service(EventLogServiceServer::new(event_log_srv))
        .add_service(ClusterLimitServiceServer::new(cluster_limit_srv));
    #[cfg(not(madsim))] let server_builder = server_builder.add_service(TraceServiceServer::new(trace_srv));

    let server = server_builder.monitored_serve_with_shutdown(
        address_info.listen_addr,
        "grpc-meta-leader-service",
        TcpConfig {
            tcp_nodelay: true,
            keepalive_duration: None,
        },
        shutdown.clone().cancelled_owned(),
    );
    started::set();
    let _server_handle = tokio::spawn(server);

    // Block until shutdown is requested, then return; the spawned server is
    // stopped via the `cancelled_owned()` future passed above.
    shutdown.cancelled().await;
    Ok(())
}
711
/// Validates the `data_directory` system parameter.
///
/// A valid data directory:
/// - is non-empty and at most 800 bytes long;
/// - contains only ASCII alphanumerics, `_`, `-`, and `/`;
/// - does not start or end with `/` and contains no consecutive `//`.
fn is_correct_data_directory(data_directory: &str) -> bool {
    // Equivalent to matching `^[0-9a-zA-Z_/-]+$`, but without compiling a
    // regex at runtime; the `+` also covers the non-empty requirement.
    let charset_ok = !data_directory.is_empty()
        && data_directory
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '/' | '-'));
    charset_ok
        && data_directory.len() <= 800
        && !data_directory.starts_with('/')
        && !data_directory.ends_with('/')
        && !data_directory.contains("//")
}