risingwave_compactor/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod compactor_observer;
16mod rpc;
17pub mod server;
18mod telemetry;
19
20use clap::Parser;
21use risingwave_common::config::{
22    AsyncStackTraceOption, CompactorMode, MetricLevel, OverrideConfig,
23};
24use risingwave_common::util::meta_addr::MetaAddressStrategy;
25use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
26use risingwave_common::util::tokio_util::sync::CancellationToken;
27
28use crate::server::{compactor_serve, shared_compactor_serve};
29
30/// Command-line arguments for compactor-node.
31#[derive(Parser, Clone, Debug, OverrideConfig)]
32#[command(
33    version,
34    about = "The stateless worker node that compacts data for the storage engine"
35)]
36pub struct CompactorOpts {
37    // TODO: rename to listen_addr and separate out the port.
38    /// The address that this service listens to.
39    /// Usually the localhost + desired port.
40    #[clap(long, env = "RW_LISTEN_ADDR", default_value = "127.0.0.1:6660")]
41    pub listen_addr: String,
42
43    /// The address for contacting this instance of the service.
44    /// This would be synonymous with the service's "public address"
45    /// or "identifying address".
46    /// Optional, we will use `listen_addr` if not specified.
47    #[clap(long, env = "RW_ADVERTISE_ADDR")]
48    pub advertise_addr: Option<String>,
49
50    // TODO(eric): remove me
51    // TODO: This is currently unused.
52    #[clap(long, env = "RW_PORT")]
53    pub port: Option<u16>,
54
55    /// We will start a http server at this address via `MetricsManager`.
56    /// Then the prometheus instance will poll the metrics from this address.
57    #[clap(
58        long,
59        env = "RW_PROMETHEUS_LISTENER_ADDR",
60        default_value = "127.0.0.1:1260"
61    )]
62    pub prometheus_listener_addr: String,
63
64    #[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")]
65    pub meta_address: MetaAddressStrategy,
66
67    #[clap(long, env = "RW_COMPACTION_WORKER_THREADS_NUMBER")]
68    pub compaction_worker_threads_number: Option<usize>,
69
70    /// The path of `risingwave.toml` configuration file.
71    ///
72    /// If empty, default configuration values will be used.
73    #[clap(long, env = "RW_CONFIG_PATH", default_value = "")]
74    pub config_path: String,
75
76    /// Used for control the metrics level, similar to log level.
77    #[clap(long, hide = true, env = "RW_METRICS_LEVEL")]
78    #[override_opts(path = server.metrics_level)]
79    pub metrics_level: Option<MetricLevel>,
80
81    /// Enable async stack tracing through `await-tree` for risectl.
82    #[clap(long, hide = true, env = "RW_ASYNC_STACK_TRACE", value_enum)]
83    #[override_opts(path = streaming.async_stack_trace)]
84    pub async_stack_trace: Option<AsyncStackTraceOption>,
85
86    /// Enable heap profile dump when memory usage is high.
87    #[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")]
88    #[override_opts(path = server.heap_profiling.dir)]
89    pub heap_profiling_dir: Option<String>,
90
91    #[clap(long, env = "RW_COMPACTOR_MODE", value_enum)]
92    pub compactor_mode: Option<CompactorMode>,
93
94    #[clap(long, hide = true, env = "RW_PROXY_RPC_ENDPOINT", default_value = "")]
95    pub proxy_rpc_endpoint: String,
96
97    /// Total available memory for the frontend node in bytes. Used by compactor.
98    #[clap(long, env = "RW_COMPACTOR_TOTAL_MEMORY_BYTES", default_value_t = default_compactor_total_memory_bytes())]
99    pub compactor_total_memory_bytes: usize,
100
101    #[clap(long, env = "RW_COMPACTOR_META_CACHE_MEMORY_BYTES", default_value_t = default_compactor_meta_cache_memory_bytes())]
102    pub compactor_meta_cache_memory_bytes: usize,
103}
104
// Hooks `CompactorOpts` into the common `Opts` abstraction shared by node option types.
impl risingwave_common::opts::Opts for CompactorOpts {
    // Static identifier reported for this kind of node.
    fn name() -> &'static str {
        "compactor"
    }

    // Returns an owned copy of the configured meta address strategy
    // (`--meta-address` / `RW_META_ADDR`).
    fn meta_addr(&self) -> MetaAddressStrategy {
        self.meta_address.clone()
    }
}
114
115use std::future::Future;
116use std::pin::Pin;
117
118pub fn start(
119    opts: CompactorOpts,
120    shutdown: CancellationToken,
121) -> Pin<Box<dyn Future<Output = ()> + Send>> {
122    // WARNING: don't change the function signature. Making it `async fn` will cause
123    // slow compile in release mode.
124    match opts.compactor_mode {
125        Some(CompactorMode::Shared) => Box::pin(async move {
126            tracing::info!("Shared compactor pod options: {:?}", opts);
127            tracing::info!("Proxy rpc endpoint: {}", opts.proxy_rpc_endpoint.clone());
128
129            let listen_addr = opts.listen_addr.parse().unwrap();
130
131            shared_compactor_serve(listen_addr, opts, shutdown).await;
132        }),
133        None | Some(CompactorMode::Dedicated) => Box::pin(async move {
134            tracing::info!("Compactor node options: {:?}", opts);
135            tracing::info!("meta address: {}", opts.meta_address.clone());
136
137            let listen_addr = opts.listen_addr.parse().unwrap();
138
139            let advertise_addr = opts
140                .advertise_addr
141                .as_ref()
142                .unwrap_or_else(|| {
143                    tracing::warn!("advertise addr is not specified, defaulting to listen address");
144                    &opts.listen_addr
145                })
146                .parse()
147                .unwrap();
148            tracing::info!(" address is {}", advertise_addr);
149
150            compactor_serve(listen_addr, advertise_addr, opts, shutdown).await;
151        }),
152    }
153}
154
/// Default value for `compactor_total_memory_bytes`: whatever
/// `system_memory_available_bytes()` reports at the time of the call.
pub fn default_compactor_total_memory_bytes() -> usize {
    system_memory_available_bytes()
}
158
/// Default value for `compactor_meta_cache_memory_bytes`: 128 MiB, in bytes.
pub fn default_compactor_meta_cache_memory_bytes() -> usize {
    /// Number of bytes in one mebibyte.
    const MIB: usize = 1 << 20;
    128 * MIB
}