1use std::sync::Arc;
16use std::time::Duration;
17
18use otlp_embedded::TraceServiceServer;
19use regex::Regex;
20use risingwave_common::monitor::{RouterExt, TcpConfig};
21use risingwave_common::secret::LocalSecretManager;
22use risingwave_common::session_config::SessionConfig;
23use risingwave_common::system_param::reader::SystemParamsRead;
24use risingwave_common::telemetry::manager::TelemetryManager;
25use risingwave_common::telemetry::{report_scarf_enabled, report_to_scarf, telemetry_env_enabled};
26use risingwave_common::util::tokio_util::sync::CancellationToken;
27use risingwave_common_service::{MetricsManager, TracingExtractLayer};
28use risingwave_meta::MetaStoreBackend;
29use risingwave_meta::barrier::GlobalBarrierManager;
30use risingwave_meta::controller::catalog::CatalogController;
31use risingwave_meta::controller::cluster::ClusterController;
32use risingwave_meta::hummock::IcebergCompactorManager;
33use risingwave_meta::manager::iceberg_compaction::IcebergCompactionManager;
34use risingwave_meta::manager::{META_NODE_ID, MetadataManager};
35use risingwave_meta::rpc::ElectionClientRef;
36use risingwave_meta::rpc::election::dummy::DummyElectionClient;
37use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer;
38use risingwave_meta::stream::{GlobalRefreshManager, ScaleController};
39use risingwave_meta_service::AddressInfo;
40use risingwave_meta_service::backup_service::BackupServiceImpl;
41use risingwave_meta_service::cloud_service::CloudServiceImpl;
42use risingwave_meta_service::cluster_limit_service::ClusterLimitServiceImpl;
43use risingwave_meta_service::cluster_service::ClusterServiceImpl;
44use risingwave_meta_service::ddl_service::DdlServiceImpl;
45use risingwave_meta_service::event_log_service::EventLogServiceImpl;
46use risingwave_meta_service::health_service::HealthServiceImpl;
47use risingwave_meta_service::heartbeat_service::HeartbeatServiceImpl;
48use risingwave_meta_service::hosted_iceberg_catalog_service::HostedIcebergCatalogServiceImpl;
49use risingwave_meta_service::hummock_service::HummockServiceImpl;
50use risingwave_meta_service::meta_member_service::MetaMemberServiceImpl;
51use risingwave_meta_service::monitor_service::MonitorServiceImpl;
52use risingwave_meta_service::notification_service::NotificationServiceImpl;
53use risingwave_meta_service::scale_service::ScaleServiceImpl;
54use risingwave_meta_service::serving_service::ServingServiceImpl;
55use risingwave_meta_service::session_config::SessionParamsServiceImpl;
56use risingwave_meta_service::sink_coordination_service::SinkCoordinationServiceImpl;
57use risingwave_meta_service::stream_service::StreamServiceImpl;
58use risingwave_meta_service::system_params_service::SystemParamsServiceImpl;
59use risingwave_meta_service::telemetry_service::TelemetryInfoServiceImpl;
60use risingwave_meta_service::user_service::UserServiceImpl;
61use risingwave_pb::backup_service::backup_service_server::BackupServiceServer;
62use risingwave_pb::cloud_service::cloud_service_server::CloudServiceServer;
63use risingwave_pb::connector_service::sink_coordination_service_server::SinkCoordinationServiceServer;
64use risingwave_pb::ddl_service::ddl_service_server::DdlServiceServer;
65use risingwave_pb::health::health_server::HealthServer;
66use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerServiceServer;
67use risingwave_pb::meta::SystemParams;
68use risingwave_pb::meta::cluster_limit_service_server::ClusterLimitServiceServer;
69use risingwave_pb::meta::cluster_service_server::ClusterServiceServer;
70use risingwave_pb::meta::event_log_service_server::EventLogServiceServer;
71use risingwave_pb::meta::heartbeat_service_server::HeartbeatServiceServer;
72use risingwave_pb::meta::hosted_iceberg_catalog_service_server::HostedIcebergCatalogServiceServer;
73use risingwave_pb::meta::meta_member_service_server::MetaMemberServiceServer;
74use risingwave_pb::meta::notification_service_server::NotificationServiceServer;
75use risingwave_pb::meta::scale_service_server::ScaleServiceServer;
76use risingwave_pb::meta::serving_service_server::ServingServiceServer;
77use risingwave_pb::meta::session_param_service_server::SessionParamServiceServer;
78use risingwave_pb::meta::stream_manager_service_server::StreamManagerServiceServer;
79use risingwave_pb::meta::system_params_service_server::SystemParamsServiceServer;
80use risingwave_pb::meta::telemetry_info_service_server::TelemetryInfoServiceServer;
81use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
82use risingwave_pb::user::user_service_server::UserServiceServer;
83use sea_orm::{ConnectionTrait, DbBackend};
84use thiserror_ext::AsReport;
85use tokio::sync::watch;
86
87use crate::backup_restore::BackupManager;
88use crate::barrier::BarrierScheduler;
89use crate::controller::SqlMetaStore;
90use crate::controller::system_param::SystemParamsController;
91use crate::hummock::HummockManager;
92use crate::manager::sink_coordination::SinkCoordinatorManager;
93use crate::manager::{IdleManager, MetaOpts, MetaSrvEnv};
94use crate::rpc::election::sql::{MySqlDriver, PostgresDriver, SqlBackendElectionClient};
95use crate::rpc::metrics::{GLOBAL_META_METRICS, start_info_monitor, start_worker_info_monitor};
96use crate::serving::ServingVnodeMapping;
97use crate::stream::{GlobalStreamManager, SourceManager};
98use crate::telemetry::{MetaReportCreator, MetaTelemetryInfoFetcher};
99use crate::{MetaError, MetaResult, hummock, serving};
100
/// Process-wide flag recording whether the meta RPC service has been brought up.
///
/// The flag is set exactly once (by the follower or leader startup path) and is
/// only ever read afterwards, so `Relaxed` ordering is sufficient.
pub mod started {
    use std::sync::atomic::{AtomicBool, Ordering};

    /// `true` once the gRPC server has been spawned.
    static STARTED: AtomicBool = AtomicBool::new(false);

    /// Mark the service as started. Crate-internal: only the startup paths flip this.
    pub(crate) fn set() {
        STARTED.store(true, Ordering::Relaxed);
    }

    /// Check whether the service has been started.
    pub fn get() -> bool {
        STARTED.load(Ordering::Relaxed)
    }
}
119
120pub async fn rpc_serve(
124 address_info: AddressInfo,
125 meta_store_backend: MetaStoreBackend,
126 max_cluster_heartbeat_interval: Duration,
127 lease_interval_secs: u64,
128 server_config: risingwave_common::config::ServerConfig,
129 opts: MetaOpts,
130 init_system_params: SystemParams,
131 init_session_config: SessionConfig,
132 shutdown: CancellationToken,
133) -> MetaResult<()> {
134 let meta_store_impl = SqlMetaStore::connect(meta_store_backend.clone()).await?;
135
136 let election_client = match meta_store_backend {
137 MetaStoreBackend::Mem => {
138 Arc::new(DummyElectionClient::new(
140 address_info.advertise_addr.clone(),
141 ))
142 }
143 MetaStoreBackend::Sql { .. } => {
144 let id = address_info.advertise_addr.clone();
146 let conn = meta_store_impl.conn.clone();
147 let election_client: ElectionClientRef = match conn.get_database_backend() {
148 DbBackend::Sqlite => Arc::new(DummyElectionClient::new(id)),
149 DbBackend::Postgres => {
150 Arc::new(SqlBackendElectionClient::new(id, PostgresDriver::new(conn)))
151 }
152 DbBackend::MySql => {
153 Arc::new(SqlBackendElectionClient::new(id, MySqlDriver::new(conn)))
154 }
155 };
156 election_client.init().await?;
157
158 election_client
159 }
160 };
161
162 Box::pin(rpc_serve_with_store(
163 meta_store_impl,
164 election_client,
165 address_info,
166 max_cluster_heartbeat_interval,
167 lease_interval_secs,
168 server_config,
169 opts,
170 init_system_params,
171 init_session_config,
172 shutdown,
173 ))
174 .await
175}
176
/// Drives the election loop and runs this node as follower and/or leader.
///
/// Spawns a background task that repeatedly runs the election client; if this
/// node is not currently the leader, follower services are served until
/// leadership is won (or `shutdown` is cancelled). Once leader, all leader
/// services are started. On return, the election task is signalled to stop
/// and awaited before propagating the leader service's result.
pub async fn rpc_serve_with_store(
    meta_store_impl: SqlMetaStore,
    election_client: ElectionClientRef,
    address_info: AddressInfo,
    max_cluster_heartbeat_interval: Duration,
    lease_interval_secs: u64,
    server_config: risingwave_common::config::ServerConfig,
    opts: MetaOpts,
    init_system_params: SystemParams,
    init_session_config: SessionConfig,
    shutdown: CancellationToken,
) -> MetaResult<()> {
    // Watch channel used solely to tell the election task to stop at the end.
    let (election_shutdown_tx, election_shutdown_rx) = watch::channel(());

    let election_handle = tokio::spawn({
        let shutdown = shutdown.clone();
        let election_client = election_client.clone();

        async move {
            // Retry the election loop on error; it only exits cleanly when the
            // shutdown watch fires, after which the whole node is cancelled.
            while let Err(e) = election_client
                .run_once(lease_interval_secs as i64, election_shutdown_rx.clone())
                .await
            {
                tracing::error!(error = %e.as_report(), "election error happened");
            }
            shutdown.cancel();
        }
    });

    if !election_client.is_leader() {
        // Serve follower services under a child token so we can stop them
        // independently once leadership is acquired.
        let follower_shutdown = shutdown.child_token();

        let follower_handle = tokio::spawn(start_service_as_election_follower(
            follower_shutdown.clone(),
            address_info.clone(),
            election_client.clone(),
        ));

        let mut is_leader_watcher = election_client.subscribe();

        // Wait until the watcher reports leadership, or bail out on shutdown.
        while !*is_leader_watcher.borrow_and_update() {
            tokio::select! {
                _ = shutdown.cancelled() => return Ok(()),

                res = is_leader_watcher.changed() => {
                    if res.is_err() {
                        tracing::error!("leader watcher recv failed");
                    }
                }
            }
        }

        tracing::info!("elected as leader, shutting down follower services");
        follower_shutdown.cancel();
        let _ = follower_handle.await;
    }

    let result = start_service_as_election_leader(
        meta_store_impl,
        address_info,
        max_cluster_heartbeat_interval,
        opts,
        init_system_params,
        init_session_config,
        server_config,
        election_client,
        shutdown,
    )
    .await;

    // Stop the background election task before returning; ignore send errors
    // (the task may already have exited).
    election_shutdown_tx.send(()).ok();
    let _ = election_handle.await;

    result
}
267
268pub async fn start_service_as_election_follower(
272 shutdown: CancellationToken,
273 address_info: AddressInfo,
274 election_client: ElectionClientRef,
275) {
276 tracing::info!("starting follower services");
277
278 let meta_member_srv = MetaMemberServiceImpl::new(election_client);
279
280 let health_srv = HealthServiceImpl::new();
281
282 let server = tonic::transport::Server::builder()
283 .layer(MetricsMiddlewareLayer::new(Arc::new(
284 GLOBAL_META_METRICS.clone(),
285 )))
286 .layer(TracingExtractLayer::new())
287 .add_service(MetaMemberServiceServer::new(meta_member_srv))
288 .add_service(HealthServer::new(health_srv))
289 .monitored_serve_with_shutdown(
290 address_info.listen_addr,
291 "grpc-meta-follower-service",
292 TcpConfig {
293 tcp_nodelay: true,
294 keepalive_duration: None,
295 },
296 shutdown.clone().cancelled_owned(),
297 );
298 let server_handle = tokio::spawn(server);
299 started::set();
300
301 shutdown.cancelled().await;
303 let _ = server_handle.await;
306}
307
/// Starts all meta services on this node after it has won the leader election.
///
/// Construction order matters here: managers are built bottom-up (env →
/// cluster/catalog → hummock → barrier → stream) because later components hold
/// references to earlier ones, and several background `sub_tasks` are spawned
/// along the way. The function blocks until `shutdown` is cancelled.
pub async fn start_service_as_election_leader(
    meta_store_impl: SqlMetaStore,
    address_info: AddressInfo,
    max_cluster_heartbeat_interval: Duration,
    opts: MetaOpts,
    init_system_params: SystemParams,
    init_session_config: SessionConfig,
    server_config: risingwave_common::config::ServerConfig,
    election_client: ElectionClientRef,
    shutdown: CancellationToken,
) -> MetaResult<()> {
    tracing::info!("starting leader services");

    // Shared environment: system/session params, notification manager, opts, store.
    let env = MetaSrvEnv::new(
        opts.clone(),
        init_system_params,
        init_session_config,
        meta_store_impl,
    )
    .await?;
    tracing::info!("MetaSrvEnv started");
    let _ = env.may_start_watch_license_key_file()?;
    let system_params_reader = env.system_params_reader().await;

    // Reject a malformed data directory early, before any storage component starts.
    let data_directory = system_params_reader.data_directory();
    if !is_correct_data_directory(data_directory) {
        return Err(MetaError::system_params(format!(
            "The data directory {:?} is misconfigured.
            Please use a combination of uppercase and lowercase letters and numbers, i.e. [a-z, A-Z, 0-9].
            The string cannot start or end with '/', and consecutive '/' are not allowed.
            The data directory cannot be empty and its length should not exceed 800 characters.",
            data_directory
        )));
    }

    // Cluster + catalog controllers back the unified metadata manager.
    let cluster_controller = Arc::new(
        ClusterController::new(env.clone(), max_cluster_heartbeat_interval)
            .await
            .unwrap(),
    );
    let catalog_controller = Arc::new(CatalogController::new(env.clone()).await?);
    let metadata_manager = MetadataManager::new(cluster_controller, catalog_controller);

    // Serving vnode mapping for batch queries; parallelism capped by session param.
    let serving_vnode_mapping = Arc::new(ServingVnodeMapping::default());
    let max_serving_parallelism = env
        .session_params_manager_impl_ref()
        .get_params()
        .await
        .batch_parallelism()
        .map(|p| p.get());
    serving::on_meta_start(
        env.notification_manager_ref(),
        &metadata_manager,
        serving_vnode_mapping.clone(),
        max_serving_parallelism,
    )
    .await;

    let compactor_manager = Arc::new(
        hummock::CompactorManager::with_meta(env.clone())
            .await
            .unwrap(),
    );
    tracing::info!("CompactorManager started");

    let heartbeat_srv = HeartbeatServiceImpl::new(metadata_manager.clone());
    tracing::info!("HeartbeatServiceImpl started");

    // Channel feeding compactor stream changes into HummockManager's event loop.
    let (compactor_streams_change_tx, compactor_streams_change_rx) =
        tokio::sync::mpsc::unbounded_channel();

    let meta_metrics = Arc::new(GLOBAL_META_METRICS.clone());

    let hummock_manager = hummock::HummockManager::new(
        env.clone(),
        metadata_manager.clone(),
        meta_metrics.clone(),
        compactor_manager.clone(),
        compactor_streams_change_tx,
    )
    .await
    .unwrap();
    tracing::info!("HummockManager started");
    let object_store_media_type = hummock_manager.object_store_media_type();

    let meta_member_srv = MetaMemberServiceImpl::new(election_client.clone());

    // Optional Prometheus client for the dashboard / diagnose command.
    let prometheus_client = opts.prometheus_endpoint.as_ref().map(|x| {
        use std::str::FromStr;
        prometheus_http_query::Client::from_str(x).unwrap()
    });
    let prometheus_selector = opts.prometheus_selector.unwrap_or_default();

    // Embedded OTLP trace collector state, shared with the dashboard.
    let trace_state = otlp_embedded::State::new(otlp_embedded::Config {
        max_length: opts.cached_traces_num,
        max_memory_usage: opts.cached_traces_memory_limit_bytes,
    });
    let trace_srv = otlp_embedded::TraceServiceImpl::new(trace_state.clone());

    let (barrier_scheduler, scheduled_barriers) =
        BarrierScheduler::new_pair(hummock_manager.clone(), meta_metrics.clone());
    tracing::info!("BarrierScheduler started");

    let backup_manager = BackupManager::new(
        env.clone(),
        hummock_manager.clone(),
        meta_metrics.clone(),
        system_params_reader.backup_storage_url(),
        system_params_reader.backup_storage_directory(),
    )
    .await?;
    tracing::info!("BackupManager started");

    LocalSecretManager::init(
        opts.temp_secret_file_dir,
        env.cluster_id().to_string(),
        META_NODE_ID,
    );
    tracing::info!("LocalSecretManager started");

    let notification_srv = NotificationServiceImpl::new(
        env.clone(),
        metadata_manager.clone(),
        hummock_manager.clone(),
        backup_manager.clone(),
        serving_vnode_mapping.clone(),
    )
    .await?;
    tracing::info!("NotificationServiceImpl started");

    let source_manager = Arc::new(
        SourceManager::new(
            barrier_scheduler.clone(),
            metadata_manager.clone(),
            meta_metrics.clone(),
            env.clone(),
        )
        .await?,
    );
    tracing::info!("SourceManager started");

    // Sink coordinator worker; its shutdown handle is the first entry of
    // `sub_tasks`, the (join handle, shutdown sender) pairs collected below.
    let (iceberg_compaction_stat_tx, iceberg_compaction_stat_rx) =
        tokio::sync::mpsc::unbounded_channel();
    let (sink_manager, shutdown_handle) = SinkCoordinatorManager::start_worker(
        env.meta_store_ref().conn.clone(),
        hummock_manager.clone(),
        metadata_manager.clone(),
        iceberg_compaction_stat_tx,
        env.await_tree_reg().clone(),
    );
    tracing::info!("SinkCoordinatorManager started");
    let mut sub_tasks = vec![shutdown_handle];

    let iceberg_compactor_manager = Arc::new(IcebergCompactorManager::new());

    let (iceberg_compaction_mgr, iceberg_compactor_event_rx) = IcebergCompactionManager::build(
        env.clone(),
        metadata_manager.clone(),
        iceberg_compactor_manager.clone(),
        meta_metrics.clone(),
    );

    sub_tasks.push(IcebergCompactionManager::compaction_stat_loop(
        iceberg_compaction_mgr.clone(),
        iceberg_compaction_stat_rx,
    ));

    sub_tasks.push(IcebergCompactionManager::gc_loop(
        iceberg_compaction_mgr.clone(),
        env.opts.iceberg_gc_interval_sec,
    ));

    let refresh_scheduler_interval = Duration::from_secs(env.opts.refresh_scheduler_interval_sec);
    let (refresh_manager, refresh_handle, refresh_shutdown) = GlobalRefreshManager::start(
        metadata_manager.clone(),
        barrier_scheduler.clone(),
        &env,
        refresh_scheduler_interval,
    )
    .await?;
    sub_tasks.push((refresh_handle, refresh_shutdown));

    let scale_controller = Arc::new(ScaleController::new(
        &metadata_manager,
        source_manager.clone(),
        env.clone(),
    ));

    let (barrier_manager, join_handle, shutdown_rx) = GlobalBarrierManager::start(
        scheduled_barriers,
        env.clone(),
        metadata_manager.clone(),
        hummock_manager.clone(),
        source_manager.clone(),
        sink_manager.clone(),
        scale_controller.clone(),
        barrier_scheduler.clone(),
        refresh_manager.clone(),
    )
    .await;
    tracing::info!("GlobalBarrierManager started");
    sub_tasks.push((join_handle, shutdown_rx));

    // Detached task: the source manager loop runs for the life of the process.
    {
        let source_manager = source_manager.clone();
        tokio::spawn(async move {
            source_manager.run().await.unwrap();
        });
    }

    let stream_manager = Arc::new(
        GlobalStreamManager::new(
            env.clone(),
            metadata_manager.clone(),
            barrier_scheduler.clone(),
            source_manager.clone(),
            scale_controller.clone(),
        )
        .unwrap(),
    );

    hummock_manager
        .may_fill_backward_state_table_info()
        .await
        .unwrap();

    // --- gRPC service implementations ---
    let ddl_srv = DdlServiceImpl::new(
        env.clone(),
        metadata_manager.clone(),
        stream_manager.clone(),
        source_manager.clone(),
        barrier_manager.clone(),
        sink_manager.clone(),
        meta_metrics.clone(),
        iceberg_compaction_mgr.clone(),
        barrier_scheduler.clone(),
    )
    .await;

    if env.opts.enable_legacy_table_migration {
        sub_tasks.push(ddl_srv.start_migrate_table_fragments());
    }

    let user_srv = UserServiceImpl::new(metadata_manager.clone());

    let scale_srv = ScaleServiceImpl::new(
        metadata_manager.clone(),
        stream_manager.clone(),
        barrier_manager.clone(),
        env.clone(),
    );

    let cluster_srv = ClusterServiceImpl::new(metadata_manager.clone(), barrier_manager.clone());
    let stream_srv = StreamServiceImpl::new(
        env.clone(),
        barrier_scheduler.clone(),
        barrier_manager.clone(),
        stream_manager.clone(),
        metadata_manager.clone(),
        refresh_manager.clone(),
        iceberg_compaction_mgr.clone(),
    );
    let sink_coordination_srv = SinkCoordinationServiceImpl::new(sink_manager);
    let hummock_srv = HummockServiceImpl::new(
        hummock_manager.clone(),
        metadata_manager.clone(),
        backup_manager.clone(),
        iceberg_compaction_mgr.clone(),
    );

    let health_srv = HealthServiceImpl::new();
    let backup_srv = BackupServiceImpl::new(backup_manager.clone());
    let telemetry_srv = TelemetryInfoServiceImpl::new(env.meta_store());
    let system_params_srv = SystemParamsServiceImpl::new(
        env.system_params_manager_impl_ref(),
        env.opts.license_key_path.is_some(),
    );
    let session_params_srv = SessionParamsServiceImpl::new(env.session_params_manager_impl_ref());
    let serving_srv =
        ServingServiceImpl::new(serving_vnode_mapping.clone(), metadata_manager.clone());
    let cloud_srv = CloudServiceImpl::new();
    let event_log_srv = EventLogServiceImpl::new(env.event_log_manager_ref());
    let cluster_limit_srv = ClusterLimitServiceImpl::new(env.clone(), metadata_manager.clone());
    let hosted_iceberg_catalog_srv = HostedIcebergCatalogServiceImpl::new(env.clone());
    let monitor_srv = MonitorServiceImpl::new(
        metadata_manager.clone(),
        env.await_tree_reg().clone(),
        server_config.clone(),
    );
    let diagnose_command = Arc::new(risingwave_meta::manager::diagnose::DiagnoseCommand::new(
        metadata_manager.clone(),
        env.await_tree_reg().clone(),
        hummock_manager.clone(),
        iceberg_compaction_mgr.clone(),
        env.event_log_manager_ref(),
        prometheus_client.clone(),
        prometheus_selector.clone(),
        opts.redact_sql_option_keywords.clone(),
        env.system_params_manager_impl_ref(),
    ));

    // Dashboard runs as a separate HTTP task; not available under madsim tests.
    #[cfg(not(madsim))]
    let _dashboard_task = if let Some(ref dashboard_addr) = address_info.dashboard_addr {
        use risingwave_common::config::RpcClientConfig;
        use risingwave_rpc_client::MonitorClientPool;

        let dashboard_service = crate::dashboard::DashboardService {
            await_tree_reg: env.await_tree_reg().clone(),
            dashboard_addr: *dashboard_addr,
            prometheus_client,
            prometheus_selector,
            metadata_manager: metadata_manager.clone(),
            hummock_manager: hummock_manager.clone(),
            monitor_clients: MonitorClientPool::new(1, RpcClientConfig::default()),
            diagnose_command,
            profile_service: risingwave_common_heap_profiling::ProfileServiceImpl::new(
                server_config.clone(),
            ),
            trace_state,
        };
        let task = tokio::spawn(dashboard_service.serve());
        Some(task)
    } else {
        None
    };

    if let Some(prometheus_addr) = address_info.prometheus_addr {
        MetricsManager::boot_metrics_service(prometheus_addr.to_string())
    }

    // --- background workers (joined/cancelled via `sub_tasks` on shutdown) ---
    sub_tasks.extend(hummock::start_hummock_workers(
        hummock_manager.clone(),
        backup_manager.clone(),
        &env.opts,
        {
            // Callback used by the vacuum/time-travel worker to decide whether
            // vacuuming should pause; errs on the safe side (`true`) on failure.
            let barrier_manager = barrier_manager.clone();
            Box::new(move || {
                let barrier_manager = barrier_manager.clone();
                Box::pin(async move {
                    barrier_manager.may_snapshot_backfilling_job().await.unwrap_or_else(|e| {
                        tracing::warn!(err = %e.as_report(), "failed to check having snapshot backfilling jobs. pause vacuum time travel");
                        true
                    })
                })
            })
        }
    ));
    sub_tasks.push(start_worker_info_monitor(
        metadata_manager.clone(),
        election_client.clone(),
        Duration::from_secs(env.opts.node_num_monitor_interval_sec),
        meta_metrics.clone(),
    ));
    sub_tasks.push(start_info_monitor(
        metadata_manager.clone(),
        hummock_manager.clone(),
        barrier_manager.clone(),
        env.system_params_manager_impl_ref(),
        meta_metrics.clone(),
    ));
    sub_tasks.push(SystemParamsController::start_params_notifier(
        env.system_params_manager_impl_ref(),
    ));
    sub_tasks.push(HummockManager::hummock_timer_task(
        hummock_manager.clone(),
        Some(backup_manager),
    ));
    sub_tasks.extend(HummockManager::compaction_event_loop(
        hummock_manager.clone(),
        compactor_streams_change_rx,
    ));

    sub_tasks.extend(IcebergCompactionManager::iceberg_compaction_event_loop(
        iceberg_compaction_mgr.clone(),
        iceberg_compactor_event_rx,
    ));

    sub_tasks.push(serving::start_serving_vnode_mapping_worker(
        env.notification_manager_ref(),
        metadata_manager.clone(),
        serving_vnode_mapping,
        env.session_params_manager_impl_ref(),
    ));

    {
        sub_tasks.push(ClusterController::start_heartbeat_checker(
            metadata_manager.cluster_controller.clone(),
            Duration::from_secs(1),
        ));

        if !env.opts.disable_automatic_parallelism_control {
            sub_tasks.push(stream_manager.start_auto_parallelism_monitor());
        }
    }

    let _idle_checker_handle = IdleManager::start_idle_checker(
        env.idle_manager_ref(),
        Duration::from_secs(30),
        shutdown.clone(),
    );

    // On shutdown this task aborts notification subscribers and compactors;
    // the oneshot sender lives in `sub_tasks`, so dropping/sending it fires it.
    let (abort_sender, abort_recv) = tokio::sync::oneshot::channel();
    let notification_mgr = env.notification_manager_ref();
    let stream_abort_handler = tokio::spawn(async move {
        let _ = abort_recv.await;
        notification_mgr.abort_all();
        compactor_manager.abort_all_compactors();
    });
    sub_tasks.push((stream_abort_handler, abort_sender));

    let telemetry_manager = TelemetryManager::new(
        Arc::new(MetaTelemetryInfoFetcher::new(env.cluster_id().clone())),
        Arc::new(MetaReportCreator::new(
            metadata_manager.clone(),
            object_store_media_type,
        )),
    );

    // Telemetry only runs when enabled both in opts and by the environment.
    if env.opts.telemetry_enabled && telemetry_env_enabled() {
        sub_tasks.push(telemetry_manager.start().await);
    } else {
        tracing::info!("Telemetry didn't start due to meta backend or config");
    }
    if !cfg!(madsim) && report_scarf_enabled() {
        tokio::spawn(report_to_scarf());
    } else {
        tracing::info!("Scarf reporting is disabled");
    };

    if let Some(pair) = env.event_log_manager_ref().take_join_handle() {
        sub_tasks.push(pair);
    }

    tracing::info!("Assigned cluster id {:?}", *env.cluster_id());
    tracing::info!("Starting meta services");

    // Record the node start in the event log before serving.
    let event = risingwave_pb::meta::event_log::EventMetaNodeStart {
        advertise_addr: address_info.advertise_addr,
        listen_addr: address_info.listen_addr.to_string(),
        opts: serde_json::to_string(&env.opts).unwrap(),
    };
    env.event_log_manager_ref().add_event_logs(vec![
        risingwave_pb::meta::event_log::Event::MetaNodeStart(event),
    ]);

    // Assemble the leader gRPC server with all service implementations.
    let server_builder = tonic::transport::Server::builder()
        .layer(MetricsMiddlewareLayer::new(meta_metrics))
        .layer(TracingExtractLayer::new())
        .add_service(HeartbeatServiceServer::new(heartbeat_srv))
        .add_service(ClusterServiceServer::new(cluster_srv))
        .add_service(StreamManagerServiceServer::new(stream_srv))
        .add_service(
            HummockManagerServiceServer::new(hummock_srv).max_decoding_message_size(usize::MAX),
        )
        .add_service(NotificationServiceServer::new(notification_srv))
        .add_service(MetaMemberServiceServer::new(meta_member_srv))
        .add_service(DdlServiceServer::new(ddl_srv).max_decoding_message_size(usize::MAX))
        .add_service(UserServiceServer::new(user_srv))
        .add_service(CloudServiceServer::new(cloud_srv))
        .add_service(ScaleServiceServer::new(scale_srv).max_decoding_message_size(usize::MAX))
        .add_service(HealthServer::new(health_srv))
        .add_service(BackupServiceServer::new(backup_srv))
        .add_service(SystemParamsServiceServer::new(system_params_srv))
        .add_service(SessionParamServiceServer::new(session_params_srv))
        .add_service(TelemetryInfoServiceServer::new(telemetry_srv))
        .add_service(ServingServiceServer::new(serving_srv))
        .add_service(
            SinkCoordinationServiceServer::new(sink_coordination_srv)
                .max_decoding_message_size(usize::MAX),
        )
        .add_service(
            EventLogServiceServer::new(event_log_srv).max_decoding_message_size(usize::MAX),
        )
        .add_service(ClusterLimitServiceServer::new(cluster_limit_srv))
        .add_service(HostedIcebergCatalogServiceServer::new(
            hosted_iceberg_catalog_srv,
        ))
        .add_service(MonitorServiceServer::new(monitor_srv));

    #[cfg(not(madsim))]
    let server_builder = server_builder.add_service(TraceServiceServer::new(trace_srv));

    let server = server_builder.monitored_serve_with_shutdown(
        address_info.listen_addr,
        "grpc-meta-leader-service",
        TcpConfig {
            tcp_nodelay: true,
            keepalive_duration: None,
        },
        shutdown.clone().cancelled_owned(),
    );
    started::set();
    let _server_handle = tokio::spawn(server);

    // Block until shutdown; background tasks are torn down via their handles.
    shutdown.cancelled().await;
    Ok(())
}
816
817fn is_correct_data_directory(data_directory: &str) -> bool {
818 let data_directory_regex = Regex::new(r"^[0-9a-zA-Z_/-]{1,}$").unwrap();
819 if data_directory.is_empty()
820 || !data_directory_regex.is_match(data_directory)
821 || data_directory.ends_with('/')
822 || data_directory.starts_with('/')
823 || data_directory.contains("//")
824 || data_directory.len() > 800
825 {
826 return false;
827 }
828 true
829}