1use std::sync::Arc;
16use std::time::Duration;
17
18use otlp_embedded::TraceServiceServer;
19use regex::Regex;
20use risingwave_common::config::SessionInitConfig;
21use risingwave_common::monitor::{RouterExt, TcpConfig};
22use risingwave_common::secret::LocalSecretManager;
23use risingwave_common::system_param::reader::SystemParamsRead;
24use risingwave_common::telemetry::manager::TelemetryManager;
25use risingwave_common::telemetry::{report_scarf_enabled, report_to_scarf, telemetry_env_enabled};
26use risingwave_common::util::tokio_util::sync::CancellationToken;
27use risingwave_common_service::{MetricsManager, TracingExtractLayer};
28use risingwave_meta::MetaStoreBackend;
29use risingwave_meta::barrier::GlobalBarrierManager;
30use risingwave_meta::controller::catalog::CatalogController;
31use risingwave_meta::controller::cluster::ClusterController;
32use risingwave_meta::hummock::IcebergCompactorManager;
33use risingwave_meta::manager::iceberg_compaction::IcebergCompactionManager;
34use risingwave_meta::manager::{META_NODE_ID, MetadataManager};
35use risingwave_meta::rpc::ElectionClientRef;
36use risingwave_meta::rpc::election::dummy::DummyElectionClient;
37use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer;
38use risingwave_meta::stream::{GlobalRefreshManager, ScaleController};
39use risingwave_meta_service::AddressInfo;
40use risingwave_meta_service::backup_service::BackupServiceImpl;
41use risingwave_meta_service::cloud_service::CloudServiceImpl;
42use risingwave_meta_service::cluster_limit_service::ClusterLimitServiceImpl;
43use risingwave_meta_service::cluster_service::ClusterServiceImpl;
44use risingwave_meta_service::ddl_service::DdlServiceImpl;
45use risingwave_meta_service::event_log_service::EventLogServiceImpl;
46use risingwave_meta_service::health_service::HealthServiceImpl;
47use risingwave_meta_service::heartbeat_service::HeartbeatServiceImpl;
48use risingwave_meta_service::hosted_iceberg_catalog_service::HostedIcebergCatalogServiceImpl;
49use risingwave_meta_service::hummock_service::HummockServiceImpl;
50use risingwave_meta_service::meta_member_service::MetaMemberServiceImpl;
51use risingwave_meta_service::monitor_service::MonitorServiceImpl;
52use risingwave_meta_service::notification_service::NotificationServiceImpl;
53use risingwave_meta_service::scale_service::ScaleServiceImpl;
54use risingwave_meta_service::serving_service::ServingServiceImpl;
55use risingwave_meta_service::session_config::SessionParamsServiceImpl;
56use risingwave_meta_service::sink_coordination_service::SinkCoordinationServiceImpl;
57use risingwave_meta_service::stream_service::StreamServiceImpl;
58use risingwave_meta_service::system_params_service::SystemParamsServiceImpl;
59use risingwave_meta_service::telemetry_service::TelemetryInfoServiceImpl;
60use risingwave_meta_service::user_service::UserServiceImpl;
61use risingwave_pb::backup_service::backup_service_server::BackupServiceServer;
62use risingwave_pb::cloud_service::cloud_service_server::CloudServiceServer;
63use risingwave_pb::configured_monitor_service_server;
64use risingwave_pb::connector_service::sink_coordination_service_server::SinkCoordinationServiceServer;
65use risingwave_pb::ddl_service::ddl_service_server::DdlServiceServer;
66use risingwave_pb::health::health_server::HealthServer;
67use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerServiceServer;
68use risingwave_pb::meta::SystemParams;
69use risingwave_pb::meta::cluster_limit_service_server::ClusterLimitServiceServer;
70use risingwave_pb::meta::cluster_service_server::ClusterServiceServer;
71use risingwave_pb::meta::event_log_service_server::EventLogServiceServer;
72use risingwave_pb::meta::heartbeat_service_server::HeartbeatServiceServer;
73use risingwave_pb::meta::hosted_iceberg_catalog_service_server::HostedIcebergCatalogServiceServer;
74use risingwave_pb::meta::meta_member_service_server::MetaMemberServiceServer;
75use risingwave_pb::meta::notification_service_server::NotificationServiceServer;
76use risingwave_pb::meta::scale_service_server::ScaleServiceServer;
77use risingwave_pb::meta::serving_service_server::ServingServiceServer;
78use risingwave_pb::meta::session_param_service_server::SessionParamServiceServer;
79use risingwave_pb::meta::stream_manager_service_server::StreamManagerServiceServer;
80use risingwave_pb::meta::system_params_service_server::SystemParamsServiceServer;
81use risingwave_pb::meta::telemetry_info_service_server::TelemetryInfoServiceServer;
82use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
83use risingwave_pb::user::user_service_server::UserServiceServer;
84use sea_orm::{ConnectionTrait, DbBackend};
85use thiserror_ext::AsReport;
86use tokio::sync::watch;
87
88use crate::backup_restore::BackupManager;
89use crate::barrier::BarrierScheduler;
90use crate::controller::SqlMetaStore;
91use crate::controller::system_param::SystemParamsController;
92use crate::hummock::HummockManager;
93use crate::manager::sink_coordination::SinkCoordinatorManager;
94use crate::manager::{IdleManager, MetaOpts, MetaSrvEnv};
95use crate::rpc::election::sql::{MySqlDriver, PostgresDriver, SqlBackendElectionClient};
96use crate::rpc::metrics::{GLOBAL_META_METRICS, start_info_monitor, start_worker_info_monitor};
97use crate::serving::ServingVnodeMapping;
98use crate::stream::{GlobalStreamManager, SourceManager};
99use crate::telemetry::{MetaReportCreator, MetaTelemetryInfoFetcher};
100use crate::{MetaError, MetaResult, hummock, serving};
101
/// Process-wide "meta service has started" flag.
///
/// [`set`] is called by this crate when the follower or leader gRPC server is launched;
/// [`get`] lets other code in the same process poll the flag without issuing an RPC.
pub mod started {
    use std::sync::atomic::{AtomicBool, Ordering};

    /// `false` until [`set`] is called; it is never reset afterwards.
    static STARTED: AtomicBool = AtomicBool::new(false);

    /// Marks the meta service as started.
    pub(crate) fn set() {
        // The flag does not guard any other memory, so `Relaxed` is sufficient.
        STARTED.store(true, Ordering::Relaxed);
    }

    /// Returns whether [`set`] has been called in this process.
    pub fn get() -> bool {
        STARTED.load(Ordering::Relaxed)
    }
}
120
121pub async fn rpc_serve(
125 address_info: AddressInfo,
126 meta_store_backend: MetaStoreBackend,
127 max_cluster_heartbeat_interval: Duration,
128 lease_interval_secs: u64,
129 server_config: risingwave_common::config::ServerConfig,
130 opts: MetaOpts,
131 init_system_params: SystemParams,
132 session_init: SessionInitConfig,
133 shutdown: CancellationToken,
134) -> MetaResult<()> {
135 let meta_store_impl = SqlMetaStore::connect(meta_store_backend.clone()).await?;
136
137 let election_client = match meta_store_backend {
138 MetaStoreBackend::Mem => {
139 Arc::new(DummyElectionClient::new(
141 address_info.advertise_addr.clone(),
142 ))
143 }
144 MetaStoreBackend::Sql { .. } => {
145 let id = address_info.advertise_addr.clone();
147 let conn = meta_store_impl.conn.clone();
148 let election_client: ElectionClientRef = match conn.get_database_backend() {
149 DbBackend::Sqlite => Arc::new(DummyElectionClient::new(id)),
150 DbBackend::Postgres => {
151 Arc::new(SqlBackendElectionClient::new(id, PostgresDriver::new(conn)))
152 }
153 DbBackend::MySql => {
154 Arc::new(SqlBackendElectionClient::new(id, MySqlDriver::new(conn)))
155 }
156 };
157 election_client.init().await?;
158
159 election_client
160 }
161 };
162
163 Box::pin(rpc_serve_with_store(
164 meta_store_impl,
165 election_client,
166 address_info,
167 max_cluster_heartbeat_interval,
168 lease_interval_secs,
169 server_config,
170 opts,
171 init_system_params,
172 session_init,
173 shutdown,
174 ))
175 .await
176}
177
178pub async fn rpc_serve_with_store(
183 meta_store_impl: SqlMetaStore,
184 election_client: ElectionClientRef,
185 address_info: AddressInfo,
186 max_cluster_heartbeat_interval: Duration,
187 lease_interval_secs: u64,
188 server_config: risingwave_common::config::ServerConfig,
189 opts: MetaOpts,
190 init_system_params: SystemParams,
191 session_init: SessionInitConfig,
192 shutdown: CancellationToken,
193) -> MetaResult<()> {
194 let (election_shutdown_tx, election_shutdown_rx) = watch::channel(());
196
197 let election_handle = tokio::spawn({
198 let shutdown = shutdown.clone();
199 let election_client = election_client.clone();
200
201 async move {
202 while let Err(e) = election_client
203 .run_once(lease_interval_secs as i64, election_shutdown_rx.clone())
204 .await
205 {
206 tracing::error!(error = %e.as_report(), "election error happened");
207 }
208 shutdown.cancel();
210 }
211 });
212
213 if !election_client.is_leader() {
218 let follower_shutdown = shutdown.child_token();
220
221 let follower_handle = tokio::spawn(start_service_as_election_follower(
222 follower_shutdown.clone(),
223 address_info.clone(),
224 election_client.clone(),
225 ));
226
227 let mut is_leader_watcher = election_client.subscribe();
229
230 while !*is_leader_watcher.borrow_and_update() {
231 tokio::select! {
232 _ = shutdown.cancelled() => return Ok(()),
234
235 res = is_leader_watcher.changed() => {
236 if res.is_err() {
237 tracing::error!("leader watcher recv failed");
238 }
239 }
240 }
241 }
242
243 tracing::info!("elected as leader, shutting down follower services");
244 follower_shutdown.cancel();
245 let _ = follower_handle.await;
246 }
247
248 let result = start_service_as_election_leader(
250 meta_store_impl,
251 address_info,
252 max_cluster_heartbeat_interval,
253 opts,
254 init_system_params,
255 session_init,
256 server_config,
257 election_client,
258 shutdown,
259 )
260 .await;
261
262 election_shutdown_tx.send(()).ok();
264 let _ = election_handle.await;
265
266 result
267}
268
269pub async fn start_service_as_election_follower(
273 shutdown: CancellationToken,
274 address_info: AddressInfo,
275 election_client: ElectionClientRef,
276) {
277 tracing::info!("starting follower services");
278
279 let meta_member_srv = MetaMemberServiceImpl::new(election_client);
280
281 let health_srv = HealthServiceImpl::new();
282
283 let server = tonic::transport::Server::builder()
284 .layer(MetricsMiddlewareLayer::new(Arc::new(
285 GLOBAL_META_METRICS.clone(),
286 )))
287 .layer(TracingExtractLayer::new())
288 .add_service(MetaMemberServiceServer::new(meta_member_srv))
289 .add_service(HealthServer::new(health_srv))
290 .monitored_serve_with_shutdown(
291 address_info.listen_addr,
292 "grpc-meta-follower-service",
293 TcpConfig {
294 tcp_nodelay: true,
295 keepalive_duration: None,
296 },
297 shutdown.clone().cancelled_owned(),
298 );
299 let server_handle = tokio::spawn(server);
300 started::set();
301
302 shutdown.cancelled().await;
304 let _ = server_handle.await;
307}
308
309pub async fn start_service_as_election_leader(
313 meta_store_impl: SqlMetaStore,
314 address_info: AddressInfo,
315 max_cluster_heartbeat_interval: Duration,
316 opts: MetaOpts,
317 init_system_params: SystemParams,
318 session_init: SessionInitConfig,
319 server_config: risingwave_common::config::ServerConfig,
320 election_client: ElectionClientRef,
321 shutdown: CancellationToken,
322) -> MetaResult<()> {
323 tracing::info!("starting leader services");
324
325 let env = MetaSrvEnv::new(
326 opts.clone(),
327 init_system_params,
328 session_init,
329 meta_store_impl,
330 )
331 .await?;
332 tracing::info!("MetaSrvEnv started");
333 let _ = env.may_start_watch_license_key_file()?;
334 let system_params_reader = env.system_params_reader().await;
335
336 let data_directory = system_params_reader.data_directory();
337 if !is_correct_data_directory(data_directory) {
338 return Err(MetaError::system_params(format!(
339 "The data directory {:?} is misconfigured.
340 Please use a combination of uppercase and lowercase letters and numbers, i.e. [a-z, A-Z, 0-9].
341 The string cannot start or end with '/', and consecutive '/' are not allowed.
342 The data directory cannot be empty and its length should not exceed 800 characters.",
343 data_directory
344 )));
345 }
346
347 let cluster_controller = Arc::new(
348 ClusterController::new(env.clone(), max_cluster_heartbeat_interval)
349 .await
350 .unwrap(),
351 );
352 let catalog_controller = Arc::new(CatalogController::new(env.clone()).await?);
353 let metadata_manager = MetadataManager::new(cluster_controller, catalog_controller);
354
355 let serving_vnode_mapping = Arc::new(ServingVnodeMapping::default());
356 let max_serving_parallelism = env
357 .session_params_manager_impl_ref()
358 .get_params()
359 .await
360 .batch_parallelism()
361 .map(|p| p.get());
362 serving::on_meta_start(
363 env.notification_manager_ref(),
364 &metadata_manager,
365 serving_vnode_mapping.clone(),
366 max_serving_parallelism,
367 )
368 .await;
369
370 let compactor_manager = Arc::new(
371 hummock::CompactorManager::with_meta(env.clone())
372 .await
373 .unwrap(),
374 );
375 tracing::info!("CompactorManager started");
376
377 let heartbeat_srv = HeartbeatServiceImpl::new(metadata_manager.clone());
378 tracing::info!("HeartbeatServiceImpl started");
379
380 let (compactor_streams_change_tx, compactor_streams_change_rx) =
381 tokio::sync::mpsc::unbounded_channel();
382
383 let meta_metrics = Arc::new(GLOBAL_META_METRICS.clone());
384
385 let hummock_manager = hummock::HummockManager::new(
386 env.clone(),
387 metadata_manager.clone(),
388 meta_metrics.clone(),
389 compactor_manager.clone(),
390 compactor_streams_change_tx,
391 )
392 .await
393 .unwrap();
394 tracing::info!("HummockManager started");
395 let object_store_media_type = hummock_manager.object_store_media_type();
396
397 let meta_member_srv = MetaMemberServiceImpl::new(election_client.clone());
398
399 let prometheus_client = opts.prometheus_endpoint.as_ref().map(|x| {
400 use std::str::FromStr;
401 prometheus_http_query::Client::from_str(x).unwrap()
402 });
403 let prometheus_selector = opts.prometheus_selector.unwrap_or_default();
404
405 let trace_state = otlp_embedded::State::new(otlp_embedded::Config {
406 max_length: opts.cached_traces_num,
407 max_memory_usage: opts.cached_traces_memory_limit_bytes,
408 });
409 let trace_srv = otlp_embedded::TraceServiceImpl::new(trace_state.clone());
410
411 let (barrier_scheduler, scheduled_barriers) =
412 BarrierScheduler::new_pair(hummock_manager.clone(), meta_metrics.clone());
413 tracing::info!("BarrierScheduler started");
414
415 let backup_manager = BackupManager::new(
417 env.clone(),
418 hummock_manager.clone(),
419 meta_metrics.clone(),
420 system_params_reader.backup_storage_url(),
421 system_params_reader.backup_storage_directory(),
422 )
423 .await?;
424 tracing::info!("BackupManager started");
425
426 LocalSecretManager::init(
427 opts.temp_secret_file_dir,
428 env.cluster_id().to_string(),
429 META_NODE_ID,
430 );
431 tracing::info!("LocalSecretManager started");
432
433 let notification_srv = NotificationServiceImpl::new(
434 env.clone(),
435 metadata_manager.clone(),
436 hummock_manager.clone(),
437 backup_manager.clone(),
438 serving_vnode_mapping.clone(),
439 )
440 .await?;
441 tracing::info!("NotificationServiceImpl started");
442
443 let source_manager = Arc::new(
444 SourceManager::new(
445 barrier_scheduler.clone(),
446 metadata_manager.clone(),
447 meta_metrics.clone(),
448 env.clone(),
449 )
450 .await?,
451 );
452 tracing::info!("SourceManager started");
453
454 let (iceberg_compaction_stat_tx, iceberg_compaction_stat_rx) =
455 tokio::sync::mpsc::unbounded_channel();
456 let (sink_manager, shutdown_handle) = SinkCoordinatorManager::start_worker(
457 env.meta_store_ref().conn.clone(),
458 hummock_manager.clone(),
459 metadata_manager.clone(),
460 iceberg_compaction_stat_tx,
461 env.await_tree_reg().clone(),
462 );
463 tracing::info!("SinkCoordinatorManager started");
464 let mut sub_tasks = vec![shutdown_handle];
466
467 let iceberg_compactor_manager = Arc::new(IcebergCompactorManager::new());
468
469 let (iceberg_compaction_mgr, iceberg_compactor_event_rx) = IcebergCompactionManager::build(
471 env.clone(),
472 metadata_manager.clone(),
473 iceberg_compactor_manager.clone(),
474 meta_metrics.clone(),
475 );
476
477 sub_tasks.push(IcebergCompactionManager::compaction_stat_loop(
478 iceberg_compaction_mgr.clone(),
479 iceberg_compaction_stat_rx,
480 ));
481
482 sub_tasks.push(IcebergCompactionManager::gc_loop(
483 iceberg_compaction_mgr.clone(),
484 env.opts.iceberg_gc_interval_sec,
485 ));
486
487 let refresh_scheduler_interval = Duration::from_secs(env.opts.refresh_scheduler_interval_sec);
488 let (refresh_manager, refresh_handle, refresh_shutdown) = GlobalRefreshManager::start(
489 metadata_manager.clone(),
490 barrier_scheduler.clone(),
491 &env,
492 refresh_scheduler_interval,
493 )
494 .await?;
495 sub_tasks.push((refresh_handle, refresh_shutdown));
496
497 let scale_controller = Arc::new(ScaleController::new(
498 &metadata_manager,
499 source_manager.clone(),
500 env.clone(),
501 ));
502
503 let (barrier_manager, join_handle, shutdown_rx) = GlobalBarrierManager::start(
504 scheduled_barriers,
505 env.clone(),
506 metadata_manager.clone(),
507 hummock_manager.clone(),
508 source_manager.clone(),
509 sink_manager.clone(),
510 scale_controller.clone(),
511 barrier_scheduler.clone(),
512 refresh_manager.clone(),
513 )
514 .await;
515 tracing::info!("GlobalBarrierManager started");
516 sub_tasks.push((join_handle, shutdown_rx));
517
518 {
519 let source_manager = source_manager.clone();
520 tokio::spawn(async move {
521 source_manager.run().await.unwrap();
522 });
523 }
524
525 let stream_manager = Arc::new(
526 GlobalStreamManager::new(
527 env.clone(),
528 metadata_manager.clone(),
529 barrier_scheduler.clone(),
530 hummock_manager.clone(),
531 source_manager.clone(),
532 refresh_manager.clone(),
533 scale_controller.clone(),
534 )
535 .unwrap(),
536 );
537
538 hummock_manager
539 .may_fill_backward_state_table_info()
540 .await
541 .unwrap();
542
543 let ddl_srv = DdlServiceImpl::new(
544 env.clone(),
545 metadata_manager.clone(),
546 stream_manager.clone(),
547 source_manager.clone(),
548 barrier_manager.clone(),
549 sink_manager.clone(),
550 meta_metrics.clone(),
551 iceberg_compaction_mgr.clone(),
552 barrier_scheduler.clone(),
553 )
554 .await;
555
556 if env.opts.enable_legacy_table_migration {
557 sub_tasks.push(ddl_srv.start_migrate_table_fragments());
558 }
559
560 let user_srv = UserServiceImpl::new(metadata_manager.clone());
561
562 let scale_srv = ScaleServiceImpl::new(
563 metadata_manager.clone(),
564 stream_manager.clone(),
565 barrier_manager.clone(),
566 env.clone(),
567 );
568
569 let cluster_srv = ClusterServiceImpl::new(metadata_manager.clone(), barrier_manager.clone());
570 let stream_srv = StreamServiceImpl::new(
571 env.clone(),
572 barrier_scheduler.clone(),
573 barrier_manager.clone(),
574 stream_manager.clone(),
575 metadata_manager.clone(),
576 refresh_manager.clone(),
577 iceberg_compaction_mgr.clone(),
578 );
579 let sink_coordination_srv = SinkCoordinationServiceImpl::new(sink_manager);
580 let hummock_srv = HummockServiceImpl::new(
581 hummock_manager.clone(),
582 metadata_manager.clone(),
583 backup_manager.clone(),
584 iceberg_compaction_mgr.clone(),
585 );
586
587 let health_srv = HealthServiceImpl::new();
588 let backup_srv = BackupServiceImpl::new(backup_manager.clone());
589 let telemetry_srv = TelemetryInfoServiceImpl::new(env.meta_store());
590 let system_params_srv = SystemParamsServiceImpl::new(
591 env.system_params_manager_impl_ref(),
592 env.opts.license_key_path.is_some(),
593 );
594 let session_params_srv = SessionParamsServiceImpl::new(env.session_params_manager_impl_ref());
595 let serving_srv =
596 ServingServiceImpl::new(serving_vnode_mapping.clone(), metadata_manager.clone());
597 let cloud_srv = CloudServiceImpl::new();
598 let event_log_srv = EventLogServiceImpl::new(env.event_log_manager_ref());
599 let cluster_limit_srv = ClusterLimitServiceImpl::new(env.clone(), metadata_manager.clone());
600 let hosted_iceberg_catalog_srv = HostedIcebergCatalogServiceImpl::new(env.clone());
601 let monitor_srv = MonitorServiceImpl::new(
602 metadata_manager.clone(),
603 env.await_tree_reg().clone(),
604 server_config.clone(),
605 );
606 let diagnose_command = Arc::new(risingwave_meta::manager::diagnose::DiagnoseCommand::new(
607 metadata_manager.clone(),
608 env.await_tree_reg().clone(),
609 hummock_manager.clone(),
610 iceberg_compaction_mgr.clone(),
611 env.event_log_manager_ref(),
612 prometheus_client.clone(),
613 prometheus_selector.clone(),
614 opts.redact_sql_option_keywords.clone(),
615 env.system_params_manager_impl_ref(),
616 ));
617
618 #[cfg(not(madsim))]
619 let _dashboard_task = if let Some(ref dashboard_addr) = address_info.dashboard_addr {
620 use risingwave_common::config::RpcClientConfig;
621 use risingwave_rpc_client::MonitorClientPool;
622
623 let dashboard_service = crate::dashboard::DashboardService {
624 await_tree_reg: env.await_tree_reg().clone(),
625 dashboard_addr: *dashboard_addr,
626 prometheus_client,
627 prometheus_selector,
628 metadata_manager: metadata_manager.clone(),
629 hummock_manager: hummock_manager.clone(),
630 monitor_clients: MonitorClientPool::new(1, RpcClientConfig::default()),
631 diagnose_command,
632 profile_service: risingwave_common_heap_profiling::ProfileServiceImpl::new(
633 server_config.clone(),
634 ),
635 trace_state,
636 };
637 let task = tokio::spawn(dashboard_service.serve());
638 Some(task)
639 } else {
640 None
641 };
642
643 if let Some(prometheus_addr) = address_info.prometheus_addr {
644 MetricsManager::boot_metrics_service(prometheus_addr.to_string())
645 }
646
647 sub_tasks.extend(hummock::start_hummock_workers(
649 hummock_manager.clone(),
650 backup_manager.clone(),
651 &env.opts,
652 {
653 let barrier_manager = barrier_manager.clone();
654 Box::new(move || {
655 let barrier_manager = barrier_manager.clone();
656 Box::pin(async move {
657 barrier_manager.may_snapshot_backfilling_job().await.unwrap_or_else(|e| {
658 tracing::warn!(err = %e.as_report(), "failed to check having snapshot backfilling jobs. pause vacuum time travel");
659 true
660 })
661 })
662 })
663 }
664 ));
665 sub_tasks.push(start_worker_info_monitor(
666 metadata_manager.clone(),
667 election_client.clone(),
668 Duration::from_secs(env.opts.node_num_monitor_interval_sec),
669 meta_metrics.clone(),
670 ));
671 sub_tasks.push(start_info_monitor(
672 metadata_manager.clone(),
673 hummock_manager.clone(),
674 barrier_manager.clone(),
675 env.system_params_manager_impl_ref(),
676 meta_metrics.clone(),
677 ));
678 sub_tasks.push(SystemParamsController::start_params_notifier(
679 env.system_params_manager_impl_ref(),
680 ));
681 sub_tasks.push(HummockManager::hummock_timer_task(
682 hummock_manager.clone(),
683 Some(backup_manager),
684 ));
685 sub_tasks.extend(HummockManager::compaction_event_loop(
686 hummock_manager.clone(),
687 compactor_streams_change_rx,
688 ));
689
690 sub_tasks.extend(IcebergCompactionManager::iceberg_compaction_event_loop(
691 iceberg_compaction_mgr.clone(),
692 iceberg_compactor_event_rx,
693 ));
694
695 sub_tasks.push(serving::start_serving_vnode_mapping_worker(
696 env.notification_manager_ref(),
697 metadata_manager.clone(),
698 serving_vnode_mapping,
699 env.session_params_manager_impl_ref(),
700 ));
701
702 {
703 sub_tasks.push(ClusterController::start_heartbeat_checker(
704 metadata_manager.cluster_controller.clone(),
705 Duration::from_secs(1),
706 ));
707
708 if !env.opts.disable_automatic_parallelism_control {
709 sub_tasks.push(stream_manager.start_auto_parallelism_monitor());
710 }
711 }
712
713 let _idle_checker_handle = IdleManager::start_idle_checker(
714 env.idle_manager_ref(),
715 Duration::from_secs(30),
716 shutdown.clone(),
717 );
718
719 let (abort_sender, abort_recv) = tokio::sync::oneshot::channel();
720 let notification_mgr = env.notification_manager_ref();
721 let stream_abort_handler = tokio::spawn(async move {
722 let _ = abort_recv.await;
723 notification_mgr.abort_all();
724 compactor_manager.abort_all_compactors();
725 });
726 sub_tasks.push((stream_abort_handler, abort_sender));
727
728 let telemetry_manager = TelemetryManager::new(
729 Arc::new(MetaTelemetryInfoFetcher::new(env.cluster_id().clone())),
730 Arc::new(MetaReportCreator::new(
731 metadata_manager.clone(),
732 object_store_media_type,
733 )),
734 );
735
736 if env.opts.telemetry_enabled && telemetry_env_enabled() {
738 sub_tasks.push(telemetry_manager.start().await);
739 } else {
740 tracing::info!("Telemetry didn't start due to meta backend or config");
741 }
742 if !cfg!(madsim) && report_scarf_enabled() {
743 tokio::spawn(report_to_scarf());
744 } else {
745 tracing::info!("Scarf reporting is disabled");
746 };
747
748 if let Some(pair) = env.event_log_manager_ref().take_join_handle() {
749 sub_tasks.push(pair);
750 }
751
752 tracing::info!("Assigned cluster id {:?}", *env.cluster_id());
753 tracing::info!("Starting meta services");
754
755 let event = risingwave_pb::meta::event_log::EventMetaNodeStart {
756 advertise_addr: address_info.advertise_addr,
757 listen_addr: address_info.listen_addr.to_string(),
758 opts: serde_json::to_string(&env.opts).unwrap(),
759 };
760 env.event_log_manager_ref().add_event_logs(vec![
761 risingwave_pb::meta::event_log::Event::MetaNodeStart(event),
762 ]);
763
764 let server_builder = tonic::transport::Server::builder()
765 .layer(MetricsMiddlewareLayer::new(meta_metrics))
766 .layer(TracingExtractLayer::new())
767 .add_service(HeartbeatServiceServer::new(heartbeat_srv))
768 .add_service(ClusterServiceServer::new(cluster_srv))
769 .add_service(StreamManagerServiceServer::new(stream_srv))
770 .add_service(
771 HummockManagerServiceServer::new(hummock_srv).max_decoding_message_size(usize::MAX),
772 )
773 .add_service(NotificationServiceServer::new(notification_srv))
774 .add_service(MetaMemberServiceServer::new(meta_member_srv))
775 .add_service(DdlServiceServer::new(ddl_srv).max_decoding_message_size(usize::MAX))
776 .add_service(UserServiceServer::new(user_srv))
777 .add_service(CloudServiceServer::new(cloud_srv))
778 .add_service(ScaleServiceServer::new(scale_srv).max_decoding_message_size(usize::MAX))
779 .add_service(HealthServer::new(health_srv))
780 .add_service(BackupServiceServer::new(backup_srv))
781 .add_service(SystemParamsServiceServer::new(system_params_srv))
782 .add_service(SessionParamServiceServer::new(session_params_srv))
783 .add_service(TelemetryInfoServiceServer::new(telemetry_srv))
784 .add_service(ServingServiceServer::new(serving_srv))
785 .add_service(
786 SinkCoordinationServiceServer::new(sink_coordination_srv)
787 .max_decoding_message_size(usize::MAX),
788 )
789 .add_service(
790 EventLogServiceServer::new(event_log_srv).max_decoding_message_size(usize::MAX),
791 )
792 .add_service(ClusterLimitServiceServer::new(cluster_limit_srv))
793 .add_service(HostedIcebergCatalogServiceServer::new(
794 hosted_iceberg_catalog_srv,
795 ))
796 .add_service(configured_monitor_service_server(
797 MonitorServiceServer::new(monitor_srv),
798 ));
799
800 #[cfg(not(madsim))] let server_builder = server_builder.add_service(TraceServiceServer::new(trace_srv));
802
803 let server = server_builder.monitored_serve_with_shutdown(
804 address_info.listen_addr,
805 "grpc-meta-leader-service",
806 TcpConfig {
807 tcp_nodelay: true,
808 keepalive_duration: None,
809 },
810 shutdown.clone().cancelled_owned(),
811 );
812 started::set();
813 let _server_handle = tokio::spawn(server);
814
815 shutdown.cancelled().await;
817 Ok(())
820}
821
/// Returns `true` if `data_directory` is an acceptable data directory name.
///
/// A valid value
/// - is between 1 and 800 bytes long,
/// - consists only of ASCII letters, ASCII digits, `_`, `-` and `/`,
/// - neither starts nor ends with `/`, and
/// - contains no consecutive `/`.
fn is_correct_data_directory(data_directory: &str) -> bool {
    /// Maximum accepted length in bytes.
    const MAX_LEN: usize = 800;

    let has_valid_length = (1..=MAX_LEN).contains(&data_directory.len());
    // Same character set as the pattern `^[0-9a-zA-Z_/-]{1,}$`, checked byte-wise so that no
    // regex has to be compiled (and unwrapped) on every call. Non-ASCII bytes never match.
    let has_valid_chars = data_directory
        .bytes()
        .all(|b| b.is_ascii_alphanumeric() || matches!(b, b'_' | b'-' | b'/'));
    let has_valid_separators = !data_directory.starts_with('/')
        && !data_directory.ends_with('/')
        && !data_directory.contains("//");

    has_valid_length && has_valid_chars && has_valid_separators
}