risingwave_compactor/
lib.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15mod compactor_observer;
16mod rpc;
17pub mod server;
18mod telemetry;
19
20use clap::{Parser, ValueEnum};
21use risingwave_common::config::{AsyncStackTraceOption, MetricLevel, OverrideConfig};
22use risingwave_common::util::meta_addr::MetaAddressStrategy;
23use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
24use risingwave_common::util::tokio_util::sync::CancellationToken;
25
26use crate::server::{compactor_serve, shared_compactor_serve};
27
28#[derive(Debug, Default, Clone, Copy, ValueEnum)]
29pub enum CompactorMode {
30    #[default]
31    #[clap(alias = "dedicated")]
32    Dedicated,
33
34    #[clap(alias = "shared")]
35    Shared,
36
37    #[clap(alias = "dedicated_iceberg")]
38    DedicatedIceberg,
39
40    #[clap(alias = "shared_iceberg")]
41    SharedIceberg,
42}
43
44/// Command-line arguments for compactor-node.
45#[derive(Parser, Clone, Debug, OverrideConfig)]
46#[command(
47    version,
48    about = "The stateless worker node that compacts data for the storage engine"
49)]
50pub struct CompactorOpts {
51    // TODO: rename to listen_addr and separate out the port.
52    /// The address that this service listens to.
53    /// Usually the localhost + desired port.
54    #[clap(long, env = "RW_LISTEN_ADDR", default_value = "127.0.0.1:6660")]
55    pub listen_addr: String,
56
57    /// The address for contacting this instance of the service.
58    /// This would be synonymous with the service's "public address"
59    /// or "identifying address".
60    /// Optional, we will use `listen_addr` if not specified.
61    #[clap(long, env = "RW_ADVERTISE_ADDR")]
62    pub advertise_addr: Option<String>,
63
64    // TODO(eric): remove me
65    // TODO: This is currently unused.
66    #[clap(long, env = "RW_PORT")]
67    pub port: Option<u16>,
68
69    /// We will start a http server at this address via `MetricsManager`.
70    /// Then the prometheus instance will poll the metrics from this address.
71    #[clap(
72        long,
73        env = "RW_PROMETHEUS_LISTENER_ADDR",
74        default_value = "127.0.0.1:1260"
75    )]
76    pub prometheus_listener_addr: String,
77
78    #[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")]
79    pub meta_address: MetaAddressStrategy,
80
81    #[clap(long, env = "RW_COMPACTION_WORKER_THREADS_NUMBER")]
82    pub compaction_worker_threads_number: Option<usize>,
83
84    /// The path of `risingwave.toml` configuration file.
85    ///
86    /// If empty, default configuration values will be used.
87    #[clap(long, env = "RW_CONFIG_PATH", default_value = "")]
88    pub config_path: String,
89
90    /// Used for control the metrics level, similar to log level.
91    #[clap(long, hide = true, env = "RW_METRICS_LEVEL")]
92    #[override_opts(path = server.metrics_level)]
93    pub metrics_level: Option<MetricLevel>,
94
95    /// Enable async stack tracing through `await-tree` for risectl.
96    #[clap(long, hide = true, env = "RW_ASYNC_STACK_TRACE", value_enum)]
97    #[override_opts(path = streaming.async_stack_trace)]
98    pub async_stack_trace: Option<AsyncStackTraceOption>,
99
100    /// Enable heap profile dump when memory usage is high.
101    #[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")]
102    #[override_opts(path = server.heap_profiling.dir)]
103    pub heap_profiling_dir: Option<String>,
104
105    #[clap(long, env = "RW_COMPACTOR_MODE", value_enum)]
106    pub compactor_mode: Option<CompactorMode>,
107
108    #[clap(long, hide = true, env = "RW_PROXY_RPC_ENDPOINT", default_value = "")]
109    pub proxy_rpc_endpoint: String,
110
111    /// Total available memory for the frontend node in bytes. Used by compactor.
112    #[clap(long, env = "RW_COMPACTOR_TOTAL_MEMORY_BYTES", default_value_t = default_compactor_total_memory_bytes())]
113    pub compactor_total_memory_bytes: usize,
114
115    #[clap(long, env = "RW_COMPACTOR_META_CACHE_MEMORY_BYTES", default_value_t = default_compactor_meta_cache_memory_bytes())]
116    pub compactor_meta_cache_memory_bytes: usize,
117
118    #[clap(long, env = "RW_COMPACTOR_RPC_MAX_DECODING_MESSAGE_SIZE_BYTES")]
119    pub rpc_max_decoding_message_size_bytes: Option<usize>,
120
121    #[clap(long, env = "RW_COMPACTOR_RPC_MAX_ENCODING_MESSAGE_SIZE_BYTES")]
122    pub rpc_max_encoding_message_size_bytes: Option<usize>,
123}
124
125impl risingwave_common::opts::Opts for CompactorOpts {
126    fn name() -> &'static str {
127        "compactor"
128    }
129
130    fn meta_addr(&self) -> MetaAddressStrategy {
131        self.meta_address.clone()
132    }
133}
134
135use std::future::Future;
136use std::pin::Pin;
137
138pub fn start(
139    opts: CompactorOpts,
140    shutdown: CancellationToken,
141) -> Pin<Box<dyn Future<Output = ()> + Send>> {
142    // WARNING: don't change the function signature. Making it `async fn` will cause
143    // slow compile in release mode.
144    match opts.compactor_mode {
145        Some(CompactorMode::Shared) => Box::pin(async move {
146            tracing::info!("Shared compactor pod options: {:?}", opts);
147            tracing::info!("Proxy rpc endpoint: {}", opts.proxy_rpc_endpoint.clone());
148
149            let listen_addr = opts.listen_addr.parse().unwrap();
150
151            shared_compactor_serve(listen_addr, opts, shutdown).await;
152        }),
153        None | Some(CompactorMode::Dedicated) => Box::pin(async move {
154            tracing::info!("Compactor node options: {:?}", opts);
155            tracing::info!("meta address: {}", opts.meta_address.clone());
156
157            let listen_addr = opts.listen_addr.parse().unwrap();
158
159            let advertise_addr = opts
160                .advertise_addr
161                .as_ref()
162                .unwrap_or_else(|| {
163                    tracing::warn!("advertise addr is not specified, defaulting to listen address");
164                    &opts.listen_addr
165                })
166                .parse()
167                .unwrap();
168            tracing::info!(" address is {}", advertise_addr);
169
170            compactor_serve(
171                listen_addr,
172                advertise_addr,
173                opts,
174                shutdown,
175                CompactorMode::Dedicated,
176            )
177            .await;
178        }),
179
180        Some(CompactorMode::DedicatedIceberg) => Box::pin(async move {
181            tracing::info!("Iceberg Compactor node options: {:?}", opts);
182            tracing::info!("meta address: {}", opts.meta_address.clone());
183
184            let listen_addr = opts.listen_addr.parse().unwrap();
185
186            let advertise_addr = opts
187                .advertise_addr
188                .as_ref()
189                .unwrap_or_else(|| {
190                    tracing::warn!("advertise addr is not specified, defaulting to listen address");
191                    &opts.listen_addr
192                })
193                .parse()
194                .unwrap();
195            tracing::info!(" address is {}", advertise_addr);
196
197            compactor_serve(
198                listen_addr,
199                advertise_addr,
200                opts,
201                shutdown,
202                CompactorMode::DedicatedIceberg,
203            )
204            .await;
205        }),
206        Some(CompactorMode::SharedIceberg) => {
207            unimplemented!("Shared iceberg compactor is not supported yet");
208        }
209    }
210}
211
212pub fn default_compactor_total_memory_bytes() -> usize {
213    system_memory_available_bytes()
214}
215
/// Default for `CompactorOpts::compactor_meta_cache_memory_bytes`: 128 MiB, in bytes.
pub fn default_compactor_meta_cache_memory_bytes() -> usize {
    const MIB: usize = 1 << 20;
    128 * MIB
}
219
/// Returns 4 MiB, expressed in bytes.
// NOTE(review): not referenced elsewhere in this file; presumably the fallback used when
// `rpc_max_decoding_message_size_bytes` is left unset — confirm against callers.
pub fn default_rpc_max_decoding_message_size_bytes() -> usize {
    const MIB: usize = 1 << 20;
    4 * MIB
}
223
/// Returns 4 MiB, expressed in bytes.
// NOTE(review): not referenced elsewhere in this file; presumably the fallback used when
// `rpc_max_encoding_message_size_bytes` is left unset — confirm against callers.
pub fn default_rpc_max_encoding_message_size_bytes() -> usize {
    const MIB: usize = 1 << 20;
    4 * MIB
}