risingwave_compute/
server.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::net::SocketAddr;
16use std::sync::Arc;
17use std::time::Duration;
18
19use risingwave_batch::monitor::{
20    GLOBAL_BATCH_EXECUTOR_METRICS, GLOBAL_BATCH_MANAGER_METRICS, GLOBAL_BATCH_SPILL_METRICS,
21};
22use risingwave_batch::rpc::service::task_service::BatchServiceImpl;
23use risingwave_batch::spill::spill_op::SpillOp;
24use risingwave_batch::task::{BatchEnvironment, BatchManager};
25use risingwave_common::config::{
26    AsyncStackTraceOption, MAX_CONNECTION_WINDOW_SIZE, MetricLevel, STREAM_WINDOW_SIZE,
27    StorageMemoryConfig, load_config,
28};
29use risingwave_common::license::LicenseManager;
30use risingwave_common::lru::init_global_sequencer_args;
31use risingwave_common::monitor::{RouterExt, TcpConfig};
32use risingwave_common::secret::LocalSecretManager;
33use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
34use risingwave_common::system_param::reader::SystemParamsRead;
35use risingwave_common::telemetry::manager::TelemetryManager;
36use risingwave_common::telemetry::telemetry_env_enabled;
37use risingwave_common::util::addr::HostAddr;
38use risingwave_common::util::pretty_bytes::convert;
39use risingwave_common::util::tokio_util::sync::CancellationToken;
40use risingwave_common::{GIT_SHA, RW_VERSION};
41use risingwave_common_heap_profiling::HeapProfiler;
42use risingwave_common_service::{MetricsManager, ObserverManager, TracingExtractLayer};
43use risingwave_connector::source::iceberg::GLOBAL_ICEBERG_SCAN_METRICS;
44use risingwave_connector::source::monitor::GLOBAL_SOURCE_METRICS;
45use risingwave_dml::dml_manager::DmlManager;
46use risingwave_pb::common::WorkerType;
47use risingwave_pb::common::worker_node::Property;
48use risingwave_pb::compute::config_service_server::ConfigServiceServer;
49use risingwave_pb::health::health_server::HealthServer;
50use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
51use risingwave_pb::stream_service::stream_service_server::StreamServiceServer;
52use risingwave_pb::task_service::exchange_service_server::ExchangeServiceServer;
53use risingwave_pb::task_service::task_service_server::TaskServiceServer;
54use risingwave_rpc_client::{ComputeClientPool, MetaClient};
55use risingwave_storage::StateStoreImpl;
56use risingwave_storage::hummock::MemoryLimiter;
57use risingwave_storage::hummock::compactor::{
58    CompactionExecutor, CompactorContext, new_compaction_await_tree_reg_ref, start_compactor,
59};
60use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
61use risingwave_storage::hummock::utils::HummockMemoryCollector;
62use risingwave_storage::monitor::{
63    GLOBAL_COMPACTOR_METRICS, GLOBAL_HUMMOCK_METRICS, GLOBAL_OBJECT_STORE_METRICS,
64    global_hummock_state_store_metrics, global_storage_metrics, monitor_cache,
65};
66use risingwave_storage::opts::StorageOpts;
67use risingwave_stream::executor::monitor::global_streaming_metrics;
68use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
69use tokio::sync::oneshot::Sender;
70use tokio::task::JoinHandle;
71use tower::Layer;
72
73use crate::ComputeNodeOpts;
74use crate::memory::config::{
75    MIN_COMPUTE_MEMORY_MB, batch_mem_limit, reserve_memory_bytes, storage_memory_config,
76};
77use crate::memory::manager::{MemoryManager, MemoryManagerConfig};
78use crate::observer::observer_manager::ComputeObserverNode;
79use crate::rpc::service::config_service::ConfigServiceImpl;
80use crate::rpc::service::exchange_metrics::GLOBAL_EXCHANGE_SERVICE_METRICS;
81use crate::rpc::service::exchange_service::ExchangeServiceImpl;
82use crate::rpc::service::health_service::HealthServiceImpl;
83use crate::rpc::service::monitor_service::{AwaitTreeMiddlewareLayer, MonitorServiceImpl};
84use crate::rpc::service::stream_service::StreamServiceImpl;
85use crate::telemetry::ComputeTelemetryCreator;
/// Bootstraps the compute-node.
///
/// Startup sequence, in order:
/// 1. Load configuration and register this worker with the meta service.
/// 2. Carve the memory budget into storage / compute / reserved portions.
/// 3. Construct the state store (optionally starting an embedded compactor).
/// 4. Build the batch and streaming environments and their managers.
/// 5. Serve the gRPC services, then `activate` at the meta service.
///
/// Returns when the `shutdown` token is triggered.
pub async fn compute_node_serve(
    listen_addr: SocketAddr,
    advertise_addr: HostAddr,
    opts: Arc<ComputeNodeOpts>,
    shutdown: CancellationToken,
) {
    // Load the configuration.
    let config = Arc::new(load_config(&opts.config_path, &*opts));
    info!("Starting compute node",);
    info!("> config: {:?}", &*config);
    info!(
        "> debug assertions: {}",
        if cfg!(debug_assertions) { "on" } else { "off" }
    );
    info!("> version: {} ({})", RW_VERSION, GIT_SHA);

    // Initialize all the configs
    let stream_config = Arc::new(config.streaming.clone());
    let batch_config = Arc::new(config.batch.clone());

    // Initialize operator lru cache global sequencer args.
    init_global_sequencer_args(
        config
            .streaming
            .developer
            .memory_controller_sequence_tls_step,
        config
            .streaming
            .developer
            .memory_controller_sequence_tls_lag,
    );

    // Register to the cluster. We're not ready to serve until activate is called.
    let (meta_client, system_params) = MetaClient::register_new(
        opts.meta_address.clone(),
        WorkerType::ComputeNode,
        &advertise_addr,
        Property {
            parallelism: opts.parallelism as u32,
            is_streaming: opts.role.for_streaming(),
            is_serving: opts.role.for_serving(),
            is_unschedulable: false,
            internal_rpc_host_addr: "".to_owned(),
            resource_group: Some(opts.resource_group.clone()),
        },
        &config.meta,
    )
    .await;
    // TODO(shutdown): remove this as there's no need to gracefully shutdown the sub-tasks.
    let mut sub_tasks: Vec<(JoinHandle<()>, Sender<()>)> = vec![];
    // Keep the registration alive with periodic heartbeats to the meta service.
    sub_tasks.push(MetaClient::start_heartbeat_loop(
        meta_client.clone(),
        Duration::from_millis(config.server.heartbeat_interval_ms as u64),
    ));

    let state_store_url = system_params.state_store();

    // Whether a compactor runs inside this process — see `embedded_compactor_enabled`
    // below (in-memory/local-disk hummock, or remote compaction explicitly disabled).
    let embedded_compactor_enabled =
        embedded_compactor_enabled(state_store_url, config.storage.disable_remote_compactor);

    // Carve the memory budget: reserve system memory first, then split the remainder
    // between the storage components and compute.
    let (reserved_memory_bytes, non_reserved_memory_bytes) = reserve_memory_bytes(&opts);
    let storage_memory_config = storage_memory_config(
        non_reserved_memory_bytes,
        embedded_compactor_enabled,
        &config.storage,
        !opts.role.for_streaming(),
    );

    let storage_memory_bytes = total_storage_memory_limit_bytes(&storage_memory_config);
    // Falls back to a minimal compute budget (with a warning) if the split doesn't fit.
    let compute_memory_bytes = validate_compute_node_memory_config(
        opts.total_memory_bytes,
        reserved_memory_bytes,
        storage_memory_bytes,
    );
    print_memory_config(
        opts.total_memory_bytes,
        compute_memory_bytes,
        storage_memory_bytes,
        &storage_memory_config,
        embedded_compactor_enabled,
        reserved_memory_bytes,
    );

    let storage_opts = Arc::new(StorageOpts::from((
        &*config,
        &system_params,
        &storage_memory_config,
    )));

    let worker_id = meta_client.worker_id();
    info!("Assigned worker node id {}", worker_id);

    // Initialize the metrics subsystem.
    let source_metrics = Arc::new(GLOBAL_SOURCE_METRICS.clone());
    let hummock_metrics = Arc::new(GLOBAL_HUMMOCK_METRICS.clone());
    let streaming_metrics = Arc::new(global_streaming_metrics(config.server.metrics_level));
    let batch_executor_metrics = Arc::new(GLOBAL_BATCH_EXECUTOR_METRICS.clone());
    let batch_manager_metrics = Arc::new(GLOBAL_BATCH_MANAGER_METRICS.clone());
    let exchange_srv_metrics = Arc::new(GLOBAL_EXCHANGE_SERVICE_METRICS.clone());
    let batch_spill_metrics = Arc::new(GLOBAL_BATCH_SPILL_METRICS.clone());
    let iceberg_scan_metrics = Arc::new(GLOBAL_ICEBERG_SCAN_METRICS.clone());

    // Initialize state store.
    let state_store_metrics = Arc::new(global_hummock_state_store_metrics(
        config.server.metrics_level,
    ));
    let object_store_metrics = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());
    let storage_metrics = Arc::new(global_storage_metrics(config.server.metrics_level));
    let compactor_metrics = Arc::new(GLOBAL_COMPACTOR_METRICS.clone());
    let hummock_meta_client = Arc::new(MonitoredHummockMetaClient::new(
        meta_client.clone(),
        hummock_metrics.clone(),
    ));

    // Await-tree (async stack trace) registry config; `None` disables it entirely.
    let await_tree_config = match &config.streaming.async_stack_trace {
        AsyncStackTraceOption::Off => None,
        c => await_tree::ConfigBuilder::default()
            .verbose(c.is_verbose().unwrap())
            .build()
            .ok(),
    };

    // Apply the license key from system params before the storage stack is built.
    LicenseManager::get().refresh(system_params.license_key());
    // NOTE(review): the construction future is `Box::pin`ed — presumably to keep a large
    // future off the caller's stack; confirm before changing.
    let state_store = Box::pin(StateStoreImpl::new(
        state_store_url,
        storage_opts.clone(),
        hummock_meta_client.clone(),
        state_store_metrics.clone(),
        object_store_metrics,
        storage_metrics.clone(),
        compactor_metrics.clone(),
        await_tree_config.clone(),
        system_params.use_new_object_prefix_strategy(),
    ))
    .await
    .unwrap();

    LocalSecretManager::init(
        opts.temp_secret_file_dir.clone(),
        meta_client.cluster_id().to_owned(),
        worker_id,
    );

    // Initialize observer manager.
    let batch_client_pool = Arc::new(ComputeClientPool::new(
        config.batch_exchange_connection_pool_size(),
        config.batch.developer.compute_client_config.clone(),
    ));
    let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params.clone()));
    let compute_observer_node =
        ComputeObserverNode::new(system_params_manager.clone(), batch_client_pool.clone());
    let observer_manager =
        ObserverManager::new_with_meta_client(meta_client.clone(), compute_observer_node).await;
    observer_manager.start().await;

    // Hummock-specific wiring: embedded compactor, cache monitoring, and backup reader.
    if let Some(storage) = state_store.as_hummock() {
        if embedded_compactor_enabled {
            tracing::info!("start embedded compactor");
            // Note: only half of the configured compactor memory is given to the limiter.
            let memory_limiter = Arc::new(MemoryLimiter::new(
                storage_opts.compactor_memory_limit_mb as u64 * 1024 * 1024 / 2,
            ));

            // NOTE(review): a single worker thread is used here — presumably to bound the
            // cost of embedded compaction on the compute node; confirm before changing.
            let compaction_executor = Arc::new(CompactionExecutor::new(Some(1)));
            let compactor_context = CompactorContext {
                storage_opts,
                sstable_store: storage.sstable_store(),
                compactor_metrics: compactor_metrics.clone(),
                is_share_buffer_compact: false,
                compaction_executor,
                memory_limiter,

                task_progress_manager: Default::default(),
                await_tree_reg: await_tree_config
                    .clone()
                    .map(new_compaction_await_tree_reg_ref),
            };

            let (handle, shutdown_sender) = start_compactor(
                compactor_context,
                hummock_meta_client.clone(),
                storage.object_id_manager().clone(),
                storage.compaction_catalog_manager_ref().clone(),
            );
            sub_tasks.push((handle, shutdown_sender));
        }
        let flush_limiter = storage.get_memory_limiter();
        let memory_collector = Arc::new(HummockMemoryCollector::new(
            storage.sstable_store(),
            flush_limiter,
            storage_memory_config,
        ));
        // Expose storage cache / buffer usage to the metrics subsystem.
        monitor_cache(memory_collector);
        let backup_reader = storage.backup_reader();
        let system_params_mgr = system_params_manager.clone();
        // Propagate system-param changes to the backup reader in the background.
        tokio::spawn(async move {
            backup_reader
                .watch_config_change(system_params_mgr.watch_params())
                .await;
        });
    }

    // Initialize the managers.
    let batch_mgr = Arc::new(BatchManager::new(
        config.batch.clone(),
        batch_manager_metrics,
        batch_mem_limit(compute_memory_bytes, opts.role.for_serving()),
    ));

    // Memory-manager target: explicit CLI override, otherwise compute + storage budget.
    let target_memory = if let Some(v) = opts.memory_manager_target_bytes {
        v
    } else {
        compute_memory_bytes + storage_memory_bytes
    };

    let memory_mgr = MemoryManager::new(MemoryManagerConfig {
        target_memory,
        threshold_aggressive: config
            .streaming
            .developer
            .memory_controller_threshold_aggressive,
        threshold_graceful: config
            .streaming
            .developer
            .memory_controller_threshold_graceful,
        threshold_stable: config
            .streaming
            .developer
            .memory_controller_threshold_stable,
        eviction_factor_stable: config
            .streaming
            .developer
            .memory_controller_eviction_factor_stable,
        eviction_factor_graceful: config
            .streaming
            .developer
            .memory_controller_eviction_factor_graceful,
        eviction_factor_aggressive: config
            .streaming
            .developer
            .memory_controller_eviction_factor_aggressive,
        metrics: streaming_metrics.clone(),
    });

    // Run a background memory manager
    tokio::spawn(
        memory_mgr.clone().run(Duration::from_millis(
            config
                .streaming
                .developer
                .memory_controller_update_interval_ms as _,
        )),
    );

    let heap_profiler = HeapProfiler::new(
        opts.total_memory_bytes,
        config.server.heap_profiling.clone(),
    );
    // Run a background heap profiler
    heap_profiler.start();

    let dml_mgr = Arc::new(DmlManager::new(
        worker_id,
        config.streaming.developer.dml_channel_initial_permits,
    ));

    // Initialize batch environment.
    let batch_env = BatchEnvironment::new(
        batch_mgr.clone(),
        advertise_addr.clone(),
        batch_config,
        worker_id,
        state_store.clone(),
        batch_executor_metrics.clone(),
        batch_client_pool,
        dml_mgr.clone(),
        source_metrics.clone(),
        batch_spill_metrics.clone(),
        iceberg_scan_metrics.clone(),
        config.server.metrics_level,
    );

    // Initialize the streaming environment.
    // Note: streaming uses its own client pool, sized independently from the batch one.
    let stream_client_pool = Arc::new(ComputeClientPool::new(
        config.streaming_exchange_connection_pool_size(),
        config.streaming.developer.compute_client_config.clone(),
    ));
    let stream_env = StreamEnvironment::new(
        advertise_addr.clone(),
        stream_config,
        worker_id,
        state_store.clone(),
        dml_mgr,
        system_params_manager.clone(),
        source_metrics,
        meta_client.clone(),
        stream_client_pool,
    );

    let stream_mgr = LocalStreamManager::new(
        stream_env.clone(),
        streaming_metrics.clone(),
        await_tree_config.clone(),
        memory_mgr.get_watermark_sequence(),
    );

    // Boot the runtime gRPC services.
    let batch_srv = BatchServiceImpl::new(batch_mgr.clone(), batch_env);
    let exchange_srv =
        ExchangeServiceImpl::new(batch_mgr.clone(), stream_mgr.clone(), exchange_srv_metrics);
    let stream_srv = StreamServiceImpl::new(stream_mgr.clone(), stream_env.clone());
    // Cache handles exist only on hummock; monitor/config services accept `None` otherwise.
    let (meta_cache, block_cache) = if let Some(hummock) = state_store.as_hummock() {
        (
            Some(hummock.sstable_store().meta_cache().clone()),
            Some(hummock.sstable_store().block_cache().clone()),
        )
    } else {
        (None, None)
    };
    let monitor_srv = MonitorServiceImpl::new(
        stream_mgr.clone(),
        config.server.clone(),
        meta_cache.clone(),
        block_cache.clone(),
    );
    let config_srv = ConfigServiceImpl::new(batch_mgr, stream_mgr.clone(), meta_cache, block_cache);
    let health_srv = HealthServiceImpl::new();

    let telemetry_manager = TelemetryManager::new(
        Arc::new(meta_client.clone()),
        Arc::new(ComputeTelemetryCreator::new()),
    );

    // if the toml config file or env variable disables telemetry, do not watch system params change
    // because if any of configs disable telemetry, we should never start it
    if config.server.telemetry_enabled && telemetry_env_enabled() {
        sub_tasks.push(telemetry_manager.start().await);
    } else {
        tracing::info!("Telemetry didn't start due to config");
    }

    // Clean up the spill directory.
    #[cfg(not(madsim))]
    if config.batch.enable_spill {
        SpillOp::clean_spill_directory().await.unwrap();
    }

    let server = tonic::transport::Server::builder()
        .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
        .initial_stream_window_size(STREAM_WINDOW_SIZE)
        .http2_max_pending_accept_reset_streams(Some(config.server.grpc_max_reset_stream as usize))
        .layer(TracingExtractLayer::new())
        // XXX: unlimit the max message size to allow arbitrary large SQL input.
        .add_service(TaskServiceServer::new(batch_srv).max_decoding_message_size(usize::MAX))
        .add_service(ExchangeServiceServer::new(exchange_srv).max_decoding_message_size(usize::MAX))
        .add_service({
            let await_tree_reg = stream_srv.mgr.await_tree_reg().cloned();
            let srv = StreamServiceServer::new(stream_srv).max_decoding_message_size(usize::MAX);
            // The await-tree middleware layer is skipped under the madsim simulator.
            #[cfg(madsim)]
            {
                srv
            }
            #[cfg(not(madsim))]
            {
                AwaitTreeMiddlewareLayer::new_optional(await_tree_reg).layer(srv)
            }
        })
        .add_service(MonitorServiceServer::new(monitor_srv))
        .add_service(ConfigServiceServer::new(config_srv))
        .add_service(HealthServer::new(health_srv))
        .monitored_serve_with_shutdown(
            listen_addr,
            "grpc-compute-node-service",
            TcpConfig {
                tcp_nodelay: true,
                keepalive_duration: None,
            },
            shutdown.clone().cancelled_owned(),
        );
    let _server_handle = tokio::spawn(server);

    // Boot metrics service.
    if config.server.metrics_level > MetricLevel::Disabled {
        MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
    }

    // All set, let the meta service know we're ready.
    meta_client.activate(&advertise_addr).await.unwrap();
    // Wait for the shutdown signal.
    shutdown.cancelled().await;

    // Unregister from the meta service, then...
    // - batch queries will not be scheduled to this compute node,
    // - streaming actors will not be scheduled to this compute node after next recovery.
    meta_client.try_unregister().await;
    // Shutdown the streaming manager.
    let _ = stream_mgr.shutdown().await;

    // NOTE(shutdown): We can't simply join the tonic server here because it only returns when all
    // existing connections are closed, while we have long-running streaming calls that never
    // close. From the other side, there's also no need to gracefully shutdown them if we have
    // unregistered from the meta service.
}
491
492/// Check whether the compute node has enough memory to perform computing tasks. Apart from storage,
493/// it is recommended to reserve at least `MIN_COMPUTE_MEMORY_MB` for computing and
494/// `SYSTEM_RESERVED_MEMORY_PROPORTION` of total memory for other system usage. If the requirement
495/// is not met, we will print out a warning and enforce the memory used for computing tasks as
496/// `MIN_COMPUTE_MEMORY_MB`.
497fn validate_compute_node_memory_config(
498    cn_total_memory_bytes: usize,
499    reserved_memory_bytes: usize,
500    storage_memory_bytes: usize,
501) -> usize {
502    if storage_memory_bytes > cn_total_memory_bytes {
503        tracing::warn!(
504            "The storage memory exceeds the total compute node memory:\nTotal compute node memory: {}\nStorage memory: {}\nWe recommend that at least 4 GiB memory should be reserved for RisingWave. Please increase the total compute node memory or decrease the storage memory in configurations.",
505            convert(cn_total_memory_bytes as _),
506            convert(storage_memory_bytes as _)
507        );
508        MIN_COMPUTE_MEMORY_MB << 20
509    } else if storage_memory_bytes + (MIN_COMPUTE_MEMORY_MB << 20) + reserved_memory_bytes
510        >= cn_total_memory_bytes
511    {
512        tracing::warn!(
513            "No enough memory for computing and other system usage:\nTotal compute node memory: {}\nStorage memory: {}\nWe recommend that at least 4 GiB memory should be reserved for RisingWave. Please increase the total compute node memory or decrease the storage memory in configurations.",
514            convert(cn_total_memory_bytes as _),
515            convert(storage_memory_bytes as _)
516        );
517        MIN_COMPUTE_MEMORY_MB << 20
518    } else {
519        cn_total_memory_bytes - storage_memory_bytes - reserved_memory_bytes
520    }
521}
522
523/// The maximal memory that storage components may use based on the configurations in bytes. Note
524/// that this is the total storage memory for one compute node instead of the whole cluster.
525fn total_storage_memory_limit_bytes(storage_memory_config: &StorageMemoryConfig) -> usize {
526    let total_storage_memory_mb = storage_memory_config.block_cache_capacity_mb
527        + storage_memory_config.meta_cache_capacity_mb
528        + storage_memory_config.shared_buffer_capacity_mb
529        + storage_memory_config.compactor_memory_limit_mb;
530    total_storage_memory_mb << 20
531}
532
/// Checks whether an embedded compactor starts with a compute node.
fn embedded_compactor_enabled(state_store_url: &str, disable_remote_compactor: bool) -> bool {
    // Always start an embedded compactor when the state store is in-memory or on local
    // disk; otherwise honor the explicit config switch.
    let local_state_store = ["hummock+memory", "hummock+disk"]
        .iter()
        .any(|prefix| state_store_url.starts_with(prefix));
    local_state_store || disable_remote_compactor
}
540
541// Print out the memory outline of the compute node.
542fn print_memory_config(
543    cn_total_memory_bytes: usize,
544    compute_memory_bytes: usize,
545    storage_memory_bytes: usize,
546    storage_memory_config: &StorageMemoryConfig,
547    embedded_compactor_enabled: bool,
548    reserved_memory_bytes: usize,
549) {
550    let memory_config = format!(
551        "Memory outline:\n\
552        > total_memory: {}\n\
553        >     storage_memory: {}\n\
554        >         block_cache_capacity: {}\n\
555        >         meta_cache_capacity: {}\n\
556        >         shared_buffer_capacity: {}\n\
557        >         compactor_memory_limit: {}\n\
558        >     compute_memory: {}\n\
559        >     reserved_memory: {}",
560        convert(cn_total_memory_bytes as _),
561        convert(storage_memory_bytes as _),
562        convert((storage_memory_config.block_cache_capacity_mb << 20) as _),
563        convert((storage_memory_config.meta_cache_capacity_mb << 20) as _),
564        convert((storage_memory_config.shared_buffer_capacity_mb << 20) as _),
565        if embedded_compactor_enabled {
566            convert((storage_memory_config.compactor_memory_limit_mb << 20) as _)
567        } else {
568            "Not enabled".to_owned()
569        },
570        convert(compute_memory_bytes as _),
571        convert(reserved_memory_bytes as _),
572    );
573    info!("{}", memory_config);
574}