risingwave_compute/server.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use risingwave_batch::monitor::{
    GLOBAL_BATCH_EXECUTOR_METRICS, GLOBAL_BATCH_MANAGER_METRICS, GLOBAL_BATCH_SPILL_METRICS,
};
use risingwave_batch::rpc::service::task_service::BatchServiceImpl;
use risingwave_batch::spill::spill_op::SpillOp;
use risingwave_batch::task::{BatchEnvironment, BatchManager};
use risingwave_common::config::{
    AsyncStackTraceOption, MAX_CONNECTION_WINDOW_SIZE, MetricLevel, STREAM_WINDOW_SIZE,
    StorageMemoryConfig, load_config,
};
use risingwave_common::license::LicenseManager;
use risingwave_common::lru::init_global_sequencer_args;
use risingwave_common::monitor::{RouterExt, TcpConfig};
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::telemetry::manager::TelemetryManager;
use risingwave_common::telemetry::telemetry_env_enabled;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::pretty_bytes::convert;
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_common::{DATA_DIRECTORY, GIT_SHA, RW_VERSION, STATE_STORE_URL};
use risingwave_common_heap_profiling::HeapProfiler;
use risingwave_common_service::{MetricsManager, ObserverManager, TracingExtractLayer};
use risingwave_connector::source::iceberg::GLOBAL_ICEBERG_SCAN_METRICS;
use risingwave_connector::source::monitor::GLOBAL_SOURCE_METRICS;
use risingwave_dml::dml_manager::DmlManager;
use risingwave_pb::common::WorkerType;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::compute::config_service_server::ConfigServiceServer;
use risingwave_pb::health::health_server::HealthServer;
use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
use risingwave_pb::stream_service::stream_service_server::StreamServiceServer;
use risingwave_pb::task_service::batch_exchange_service_server::BatchExchangeServiceServer;
use risingwave_pb::task_service::stream_exchange_service_server::StreamExchangeServiceServer;
use risingwave_pb::task_service::task_service_server::TaskServiceServer;
use risingwave_rpc_client::{ComputeClientPool, MetaClient};
use risingwave_storage::StateStoreImpl;
use risingwave_storage::hummock::MemoryLimiter;
use risingwave_storage::hummock::compactor::{
    CompactionExecutor, CompactorContext, new_compaction_await_tree_reg_ref, start_compactor,
};
use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
use risingwave_storage::hummock::utils::HummockMemoryCollector;
use risingwave_storage::monitor::{
    GLOBAL_COMPACTOR_METRICS, GLOBAL_HUMMOCK_METRICS, GLOBAL_OBJECT_STORE_METRICS,
    global_hummock_state_store_metrics, global_storage_metrics, monitor_cache,
};
use risingwave_storage::opts::StorageOpts;
use risingwave_stream::executor::monitor::global_streaming_metrics;
use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tower::Layer;

use crate::ComputeNodeOpts;
use crate::memory::config::{
    MIN_COMPUTE_MEMORY_MB, batch_mem_limit, reserve_memory_bytes, storage_memory_config,
};
use crate::memory::manager::{MemoryManager, MemoryManagerConfig};
use crate::observer::observer_manager::ComputeObserverNode;
use crate::rpc::service::batch_exchange_service::BatchExchangeServiceImpl;
use crate::rpc::service::config_service::ConfigServiceImpl;
use crate::rpc::service::health_service::HealthServiceImpl;
use crate::rpc::service::monitor_service::{AwaitTreeMiddlewareLayer, MonitorServiceImpl};
use crate::rpc::service::stream_exchange_service::{
    GLOBAL_STREAM_EXCHANGE_SERVICE_METRICS, StreamExchangeServiceImpl,
};
use crate::rpc::service::stream_service::StreamServiceImpl;
use crate::telemetry::ComputeTelemetryCreator;

/// Bootstraps the compute-node.
///
/// Returns when the `shutdown` token is triggered.
pub async fn compute_node_serve(
    listen_addr: SocketAddr,
    advertise_addr: HostAddr,
    opts: Arc<ComputeNodeOpts>,
    shutdown: CancellationToken,
) {
    // Load the configuration.
    let config = Arc::new(load_config(&opts.config_path, &*opts));
    info!("Starting compute node");
    info!("> config: {:?}", &*config);
    info!(
        "> debug assertions: {}",
        if cfg!(debug_assertions) { "on" } else { "off" }
    );
    info!("> version: {} ({})", RW_VERSION, GIT_SHA);

    // Initialize all the configs.
    let stream_config = Arc::new(config.streaming.clone());
    let batch_config = Arc::new(config.batch.clone());

    // Initialize the global sequencer args for the operator LRU cache.
    init_global_sequencer_args(
        config
            .streaming
            .developer
            .memory_controller_sequence_tls_step,
        config
            .streaming
            .developer
            .memory_controller_sequence_tls_lag,
    );

    // Register to the cluster. We're not ready to serve until `activate` is called.
    let (meta_client, system_params) = MetaClient::register_new(
        opts.meta_address.clone(),
        WorkerType::ComputeNode,
        &advertise_addr,
        Property {
            parallelism: opts.parallelism as u32,
            is_streaming: opts.role.for_streaming(),
            is_serving: opts.role.for_serving(),
            is_unschedulable: false,
            internal_rpc_host_addr: "".to_owned(),
            resource_group: Some(opts.resource_group.clone()),
            is_iceberg_compactor: false,
        },
        Arc::new(config.meta.clone()),
    )
    .await;

    // TODO(shutdown): remove this as there's no need to gracefully shutdown the sub-tasks.
    let mut sub_tasks: Vec<(JoinHandle<()>, Sender<()>)> = vec![];
    sub_tasks.push(MetaClient::start_heartbeat_loop(
        meta_client.clone(),
        Duration::from_millis(config.server.heartbeat_interval_ms as u64),
    ));

    let state_store_url = system_params.state_store();
    let data_directory = system_params.data_directory();

    let embedded_compactor_enabled =
        embedded_compactor_enabled(state_store_url, config.storage.disable_remote_compactor);

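    // Carve the node's total memory into three budgets: a system-reserved share, a
    // storage share for the state store, and the remainder for compute. The split is
    // validated and printed below.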
    let (reserved_memory_bytes, non_reserved_memory_bytes) = reserve_memory_bytes(&opts);
    let storage_memory_config = storage_memory_config(
        non_reserved_memory_bytes,
        embedded_compactor_enabled,
        &config.storage,
        !opts.role.for_streaming(),
    );

    let storage_memory_bytes = total_storage_memory_limit_bytes(&storage_memory_config);
    let compute_memory_bytes = validate_compute_node_memory_config(
        opts.total_memory_bytes,
        reserved_memory_bytes,
        storage_memory_bytes,
    );
    print_memory_config(
        opts.total_memory_bytes,
        compute_memory_bytes,
        storage_memory_bytes,
        &storage_memory_config,
        embedded_compactor_enabled,
        reserved_memory_bytes,
    );

    let storage_opts = Arc::new(StorageOpts::from((
        &*config,
        &system_params,
        &storage_memory_config,
    )));

    let worker_id = meta_client.worker_id();
    info!("Assigned worker node id {}", worker_id);

    // Initialize the metrics subsystem.
    let source_metrics = Arc::new(GLOBAL_SOURCE_METRICS.clone());
    let hummock_metrics = Arc::new(GLOBAL_HUMMOCK_METRICS.clone());
    let streaming_metrics = Arc::new(global_streaming_metrics(config.server.metrics_level));
    let batch_executor_metrics = Arc::new(GLOBAL_BATCH_EXECUTOR_METRICS.clone());
    let batch_manager_metrics = Arc::new(GLOBAL_BATCH_MANAGER_METRICS.clone());
    let stream_exchange_srv_metrics = Arc::new(GLOBAL_STREAM_EXCHANGE_SERVICE_METRICS.clone());
    let batch_spill_metrics = Arc::new(GLOBAL_BATCH_SPILL_METRICS.clone());
    let iceberg_scan_metrics = Arc::new(GLOBAL_ICEBERG_SCAN_METRICS.clone());

    // Initialize state store.
    let state_store_metrics = Arc::new(global_hummock_state_store_metrics(
        config.server.metrics_level,
    ));
    let object_store_metrics = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());
    let storage_metrics = Arc::new(global_storage_metrics(config.server.metrics_level));
    let compactor_metrics = Arc::new(GLOBAL_COMPACTOR_METRICS.clone());
    let hummock_meta_client = Arc::new(MonitoredHummockMetaClient::new(
        meta_client.clone(),
        hummock_metrics.clone(),
    ));

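    // Build the await-tree config used to collect async stack traces of running
    // tasks; `AsyncStackTraceOption::Off` disables collection entirely.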
    let await_tree_config = match &config.streaming.async_stack_trace {
        AsyncStackTraceOption::Off => None,
        c => await_tree::ConfigBuilder::default()
            .verbose(c.is_verbose().unwrap())
            .build()
            .ok(),
    };

    // Store the state_store_url in a static OnceLock for later use in the JNI crate.
    // Check the return value: if the variable is already set, assert that the value is the same.
    if let Err(existing_url) = STATE_STORE_URL.set(state_store_url.to_owned()) {
        assert_eq!(
            existing_url, state_store_url,
            "STATE_STORE_URL already set with different value"
        );
    }

    // Store the data_directory in a static OnceLock for later use in the JNI crate.
    // To be extra safe, check the return value: if the variable is already set, assert that the
    // value is the same.
    if let Err(existing_dir) = DATA_DIRECTORY.set(data_directory.to_owned()) {
        assert_eq!(
            existing_dir, data_directory,
            "DATA_DIRECTORY already set with different value"
        );
    }
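
    // Refresh the license with the key delivered in the initial system params.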
    LicenseManager::get().refresh(system_params.license_key());

    // Pin the large initialization future on the heap rather than the stack.
    let state_store = Box::pin(StateStoreImpl::new(
        state_store_url,
        storage_opts.clone(),
        hummock_meta_client.clone(),
        state_store_metrics.clone(),
        object_store_metrics,
        storage_metrics.clone(),
        compactor_metrics.clone(),
        await_tree_config.clone(),
        system_params.use_new_object_prefix_strategy(),
    ))
    .await
    .unwrap();

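    // Initialize the process-global secret manager; secrets are materialized under
    // `temp_secret_file_dir`.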
    LocalSecretManager::init(
        opts.temp_secret_file_dir.clone(),
        meta_client.cluster_id().to_owned(),
        worker_id,
    );

    // Initialize observer manager.
    let batch_client_pool = Arc::new(ComputeClientPool::new(
        config.batch_exchange_connection_pool_size(),
        config.batch.developer.compute_client_config.clone(),
    ));
    let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params.clone()));
    let compute_observer_node =
        ComputeObserverNode::new(system_params_manager.clone(), batch_client_pool.clone());
    let observer_manager =
        ObserverManager::new_with_meta_client(meta_client.clone(), compute_observer_node).await;
    observer_manager.start().await;

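    // For a Hummock state store, optionally start an embedded compactor and register
    // a memory collector so that cache and flush-buffer usage shows up in the metrics.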
    if let Some(storage) = state_store.as_hummock() {
        if embedded_compactor_enabled {
            tracing::info!("start embedded compactor");
            // Hand only half of the configured compactor memory to the limiter.
            let memory_limiter = Arc::new(MemoryLimiter::new(
                storage_opts.compactor_memory_limit_mb as u64 * 1024 * 1024 / 2,
            ));

            let compaction_executor = Arc::new(CompactionExecutor::new(Some(1)));
            let compactor_context = CompactorContext {
                storage_opts,
                sstable_store: storage.sstable_store(),
                compactor_metrics: compactor_metrics.clone(),
                is_share_buffer_compact: false,
                compaction_executor,
                memory_limiter,
                task_progress_manager: Default::default(),
                await_tree_reg: await_tree_config
                    .clone()
                    .map(new_compaction_await_tree_reg_ref),
            };

            let (handle, shutdown_sender) = start_compactor(
                compactor_context,
                hummock_meta_client.clone(),
                storage.object_id_manager().clone(),
                storage.compaction_catalog_manager_ref(),
            );
            sub_tasks.push((handle, shutdown_sender));
        }
        let flush_limiter = storage.get_memory_limiter();
        let memory_collector = Arc::new(HummockMemoryCollector::new(
            storage.sstable_store(),
            flush_limiter,
            storage_memory_config,
        ));
        monitor_cache(memory_collector);
        let backup_reader = storage.backup_reader();
        let system_params_mgr = system_params_manager.clone();
        tokio::spawn(async move {
            backup_reader
                .watch_config_change(system_params_mgr.watch_params())
                .await;
        });
    }


    // Initialize the managers.
    let batch_mgr = Arc::new(BatchManager::new(
        config.batch.clone(),
        batch_manager_metrics,
        batch_mem_limit(compute_memory_bytes, opts.role.for_serving()),
    ));

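    // Pick the memory manager's target: an explicit CLI override, or the combined
    // compute + storage budget computed above.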
    let target_memory = if let Some(v) = opts.memory_manager_target_bytes {
        v
    } else {
        compute_memory_bytes + storage_memory_bytes
    };

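    // The memory manager evicts operator LRU-cache entries increasingly aggressively
    // as usage crosses the stable, graceful, and aggressive thresholds, each paired
    // with its own eviction factor.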
    let memory_mgr = MemoryManager::new(MemoryManagerConfig {
        target_memory,
        threshold_aggressive: config
            .streaming
            .developer
            .memory_controller_threshold_aggressive,
        threshold_graceful: config
            .streaming
            .developer
            .memory_controller_threshold_graceful,
        threshold_stable: config
            .streaming
            .developer
            .memory_controller_threshold_stable,
        eviction_factor_stable: config
            .streaming
            .developer
            .memory_controller_eviction_factor_stable,
        eviction_factor_graceful: config
            .streaming
            .developer
            .memory_controller_eviction_factor_graceful,
        eviction_factor_aggressive: config
            .streaming
            .developer
            .memory_controller_eviction_factor_aggressive,
        metrics: streaming_metrics.clone(),
    });

    // Run a background memory manager.
    tokio::spawn(
        memory_mgr.clone().run(Duration::from_millis(
            config
                .streaming
                .developer
                .memory_controller_update_interval_ms as _,
        )),
    );

    let heap_profiler = HeapProfiler::new(
        opts.total_memory_bytes,
        config.server.heap_profiling.clone(),
    );
    // Run a background heap profiler.
    heap_profiler.start();

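    // The DML manager brokers table writes between batch tasks and streaming jobs;
    // `dml_channel_initial_permits` bounds the initial capacity of each DML channel.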
    let dml_mgr = Arc::new(DmlManager::new(
        worker_id,
        config.streaming.developer.dml_channel_initial_permits,
    ));

    // Initialize the batch environment.
    let batch_env = BatchEnvironment::new(
        batch_mgr.clone(),
        advertise_addr.clone(),
        batch_config,
        worker_id,
        state_store.clone(),
        batch_executor_metrics.clone(),
        batch_client_pool,
        dml_mgr.clone(),
        source_metrics.clone(),
        batch_spill_metrics.clone(),
        iceberg_scan_metrics.clone(),
        config.server.metrics_level,
    );

    // Initialize the streaming environment.
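    // Note that streaming exchange uses its own compute-client pool, sized
    // independently of the batch pool created earlier.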
    let stream_client_pool = Arc::new(ComputeClientPool::new(
        config.streaming_exchange_connection_pool_size(),
        config.streaming.developer.compute_client_config.clone(),
    ));
    let stream_env = StreamEnvironment::new(
        advertise_addr.clone(),
        stream_config,
        worker_id,
        state_store.clone(),
        dml_mgr,
        system_params_manager.clone(),
        source_metrics,
        meta_client.clone(),
        stream_client_pool,
    );

    let stream_mgr = LocalStreamManager::new(
        stream_env.clone(),
        streaming_metrics.clone(),
        await_tree_config.clone(),
        memory_mgr.get_watermark_sequence(),
    );

    // Boot the runtime gRPC services.
    let batch_srv = BatchServiceImpl::new(batch_mgr.clone(), batch_env);
    let batch_exchange_srv = BatchExchangeServiceImpl::new(batch_mgr.clone());
    let stream_exchange_srv =
        StreamExchangeServiceImpl::new(stream_mgr.clone(), stream_exchange_srv_metrics);
    let stream_srv = StreamServiceImpl::new(stream_mgr.clone(), stream_env.clone());
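    // The Hummock meta/block cache handles are shared with the monitor and config
    // services below for cache introspection.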
    let (meta_cache, block_cache) = if let Some(hummock) = state_store.as_hummock() {
        (
            Some(hummock.sstable_store().meta_cache().clone()),
            Some(hummock.sstable_store().block_cache().clone()),
        )
    } else {
        (None, None)
    };
    let monitor_srv = MonitorServiceImpl::new(
        stream_mgr.clone(),
        config.server.clone(),
        meta_cache.clone(),
        block_cache.clone(),
    );
    let config_srv = ConfigServiceImpl::new(batch_mgr, stream_mgr.clone(), meta_cache, block_cache);
    let health_srv = HealthServiceImpl::new();

    let telemetry_manager = TelemetryManager::new(
        Arc::new(meta_client.clone()),
        Arc::new(ComputeTelemetryCreator::new()),
    );

    // If the TOML config file or an environment variable disables telemetry, do not
    // watch for system-param changes: when any config disables telemetry, it must
    // never be started.
    if config.server.telemetry_enabled && telemetry_env_enabled() {
        sub_tasks.push(telemetry_manager.start().await);
    } else {
        tracing::info!("Telemetry didn't start due to config");
    }

    // Clean up the spill directory.
    #[cfg(not(madsim))]
    if config.batch.enable_spill {
        SpillOp::clean_spill_directory().await.unwrap();
    }

    let server = tonic::transport::Server::builder()
        .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
        .initial_stream_window_size(STREAM_WINDOW_SIZE)
        .http2_max_pending_accept_reset_streams(Some(config.server.grpc_max_reset_stream as usize))
        .layer(TracingExtractLayer::new())
        // XXX: unlimit the max message size to allow arbitrarily large SQL input.
        .add_service(TaskServiceServer::new(batch_srv).max_decoding_message_size(usize::MAX))
        .add_service(
            BatchExchangeServiceServer::new(batch_exchange_srv)
                .max_decoding_message_size(usize::MAX),
        )
        .add_service(
            StreamExchangeServiceServer::new(stream_exchange_srv)
                .max_decoding_message_size(usize::MAX),
        )
        .add_service({
            let await_tree_reg = stream_srv.mgr.await_tree_reg().cloned();
            let srv = StreamServiceServer::new(stream_srv).max_decoding_message_size(usize::MAX);
            #[cfg(madsim)]
            {
                srv
            }
            #[cfg(not(madsim))]
            {
                AwaitTreeMiddlewareLayer::new_optional(await_tree_reg).layer(srv)
            }
        })
        .add_service(MonitorServiceServer::new(monitor_srv))
        .add_service(ConfigServiceServer::new(config_srv))
        .add_service(HealthServer::new(health_srv))
        .monitored_serve_with_shutdown(
            listen_addr,
            "grpc-compute-node-service",
            TcpConfig {
                tcp_nodelay: true,
                keepalive_duration: None,
            },
            shutdown.clone().cancelled_owned(),
        );
    let _server_handle = tokio::spawn(server);
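    // Note that the server task is detached: shutdown is driven by the cancellation
    // token passed to `monitored_serve_with_shutdown`, not by joining this handle.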

    // Boot the metrics service.
    if config.server.metrics_level > MetricLevel::Disabled {
        MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
    }

    // All set; let the meta service know we're ready.
    meta_client.activate(&advertise_addr).await.unwrap();
    // Wait for the shutdown signal.
    shutdown.cancelled().await;

    // Unregister from the meta service. After this,
    // - batch queries will no longer be scheduled to this compute node;
    // - streaming actors will no longer be scheduled to this compute node after the next recovery.
    meta_client.try_unregister().await;
    // Shut down the streaming manager.
    let _ = stream_mgr.shutdown().await;

    // NOTE(shutdown): We can't simply join the tonic server here because it only returns when all
    // existing connections are closed, while we have long-running streaming calls that never
    // close. On the other hand, there's also no need to gracefully shut them down once we have
    // unregistered from the meta service.
}

/// Checks whether the compute node has enough memory to perform computing tasks. Apart from
/// storage, it is recommended to reserve at least `MIN_COMPUTE_MEMORY_MB` for computing and
/// `SYSTEM_RESERVED_MEMORY_PROPORTION` of the total memory for other system usage. If the
/// requirement is not met, we will print out a warning and enforce the memory used for computing
/// tasks as `MIN_COMPUTE_MEMORY_MB`.
fn validate_compute_node_memory_config(
    cn_total_memory_bytes: usize,
    reserved_memory_bytes: usize,
    storage_memory_bytes: usize,
) -> usize {
    if storage_memory_bytes > cn_total_memory_bytes {
        tracing::warn!(
            "The storage memory exceeds the total compute node memory:\nTotal compute node memory: {}\nStorage memory: {}\nWe recommend that at least 4 GiB memory should be reserved for RisingWave. Please increase the total compute node memory or decrease the storage memory in configurations.",
            convert(cn_total_memory_bytes as _),
            convert(storage_memory_bytes as _)
        );
        // Fall back to the minimum compute memory (MiB -> bytes).
        MIN_COMPUTE_MEMORY_MB << 20
    } else if storage_memory_bytes + (MIN_COMPUTE_MEMORY_MB << 20) + reserved_memory_bytes
        >= cn_total_memory_bytes
    {
        tracing::warn!(
            "Not enough memory for computing and other system usage:\nTotal compute node memory: {}\nStorage memory: {}\nWe recommend that at least 4 GiB memory should be reserved for RisingWave. Please increase the total compute node memory or decrease the storage memory in configurations.",
            convert(cn_total_memory_bytes as _),
            convert(storage_memory_bytes as _)
        );
        MIN_COMPUTE_MEMORY_MB << 20
    } else {
        cn_total_memory_bytes - storage_memory_bytes - reserved_memory_bytes
    }
}

/// The maximal memory that storage components may use based on the configurations in bytes. Note
/// that this is the total storage memory for one compute node instead of the whole cluster.
fn total_storage_memory_limit_bytes(storage_memory_config: &StorageMemoryConfig) -> usize {
    let total_storage_memory_mb = storage_memory_config.block_cache_capacity_mb
        + storage_memory_config.meta_cache_capacity_mb
        + storage_memory_config.shared_buffer_capacity_mb
        + storage_memory_config.compactor_memory_limit_mb;
    // MiB -> bytes.
    total_storage_memory_mb << 20
}

/// Checks whether an embedded compactor starts with a compute node.
fn embedded_compactor_enabled(state_store_url: &str, disable_remote_compactor: bool) -> bool {
    // Always start an embedded compactor if the state store is in-memory.
    state_store_url.starts_with("hummock+memory")
        || state_store_url.starts_with("hummock+disk")
        || disable_remote_compactor
}

/// Prints out the memory outline of the compute node.
fn print_memory_config(
    cn_total_memory_bytes: usize,
    compute_memory_bytes: usize,
    storage_memory_bytes: usize,
    storage_memory_config: &StorageMemoryConfig,
    embedded_compactor_enabled: bool,
    reserved_memory_bytes: usize,
) {
    let memory_config = format!(
        "Memory outline:\n\
        > total_memory: {}\n\
        >     storage_memory: {}\n\
        >         block_cache_capacity: {}\n\
        >         meta_cache_capacity: {}\n\
        >         shared_buffer_capacity: {}\n\
        >         compactor_memory_limit: {}\n\
        >     compute_memory: {}\n\
        >     reserved_memory: {}",
        convert(cn_total_memory_bytes as _),
        convert(storage_memory_bytes as _),
        convert((storage_memory_config.block_cache_capacity_mb << 20) as _),
        convert((storage_memory_config.meta_cache_capacity_mb << 20) as _),
        convert((storage_memory_config.shared_buffer_capacity_mb << 20) as _),
        if embedded_compactor_enabled {
            convert((storage_memory_config.compactor_memory_limit_mb << 20) as _)
        } else {
            "Not enabled".to_owned()
        },
        convert(compute_memory_bytes as _),
        convert(reserved_memory_bytes as _),
    );
    info!("{}", memory_config);
}