risingwave_compactor/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod compactor_observer;
16mod rpc;
17pub mod server;
18mod telemetry;
19
20use clap::{Parser, ValueEnum};
21use risingwave_common::config::{AsyncStackTraceOption, MetricLevel, OverrideConfig};
22use risingwave_common::util::meta_addr::MetaAddressStrategy;
23use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
24use risingwave_common::util::tokio_util::sync::CancellationToken;
25
26use crate::server::{compactor_serve, shared_compactor_serve};
27
28#[derive(Debug, Default, Clone, Copy, ValueEnum)]
29pub enum CompactorMode {
30    #[default]
31    #[clap(alias = "dedicated")]
32    Dedicated,
33
34    #[clap(alias = "shared")]
35    Shared,
36
37    #[clap(alias = "dedicated_iceberg")]
38    DedicatedIceberg,
39
40    #[clap(alias = "shared_iceberg")]
41    SharedIceberg,
42}
43
44/// Command-line arguments for compactor-node.
45#[derive(Parser, Clone, Debug, OverrideConfig)]
46#[command(
47    version,
48    about = "The stateless worker node that compacts data for the storage engine"
49)]
50pub struct CompactorOpts {
51    // TODO: rename to listen_addr and separate out the port.
52    /// The address that this service listens to.
53    /// Usually the localhost + desired port.
54    #[clap(long, env = "RW_LISTEN_ADDR", default_value = "127.0.0.1:6660")]
55    pub listen_addr: String,
56
57    /// The address for contacting this instance of the service.
58    /// This would be synonymous with the service's "public address"
59    /// or "identifying address".
60    /// Optional, we will use `listen_addr` if not specified.
61    #[clap(long, env = "RW_ADVERTISE_ADDR")]
62    pub advertise_addr: Option<String>,
63
64    // TODO(eric): remove me
65    // TODO: This is currently unused.
66    #[clap(long, env = "RW_PORT")]
67    pub port: Option<u16>,
68
69    /// We will start a http server at this address via `MetricsManager`.
70    /// Then the prometheus instance will poll the metrics from this address.
71    #[clap(
72        long,
73        env = "RW_PROMETHEUS_LISTENER_ADDR",
74        default_value = "127.0.0.1:1260"
75    )]
76    pub prometheus_listener_addr: String,
77
78    #[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")]
79    pub meta_address: MetaAddressStrategy,
80
81    #[clap(long, env = "RW_COMPACTION_WORKER_THREADS_NUMBER")]
82    pub compaction_worker_threads_number: Option<usize>,
83
84    /// The path of `risingwave.toml` configuration file.
85    ///
86    /// If empty, default configuration values will be used.
87    #[clap(long, env = "RW_CONFIG_PATH", default_value = "")]
88    pub config_path: String,
89
90    /// Used for control the metrics level, similar to log level.
91    #[clap(long, hide = true, env = "RW_METRICS_LEVEL")]
92    #[override_opts(path = server.metrics_level)]
93    pub metrics_level: Option<MetricLevel>,
94
95    /// Enable async stack tracing through `await-tree` for risectl.
96    #[clap(long, hide = true, env = "RW_ASYNC_STACK_TRACE", value_enum)]
97    #[override_opts(path = streaming.async_stack_trace)]
98    pub async_stack_trace: Option<AsyncStackTraceOption>,
99
100    /// Enable heap profile dump when memory usage is high.
101    #[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")]
102    #[override_opts(path = server.heap_profiling.dir)]
103    pub heap_profiling_dir: Option<String>,
104
105    #[clap(long, env = "RW_COMPACTOR_MODE", value_enum)]
106    pub compactor_mode: Option<CompactorMode>,
107
108    #[clap(long, hide = true, env = "RW_PROXY_RPC_ENDPOINT", default_value = "")]
109    pub proxy_rpc_endpoint: String,
110
111    /// Total available memory for the frontend node in bytes. Used by compactor.
112    #[clap(long, env = "RW_COMPACTOR_TOTAL_MEMORY_BYTES", default_value_t = default_compactor_total_memory_bytes())]
113    pub compactor_total_memory_bytes: usize,
114
115    #[clap(long, env = "RW_COMPACTOR_META_CACHE_MEMORY_BYTES", default_value_t = default_compactor_meta_cache_memory_bytes())]
116    pub compactor_meta_cache_memory_bytes: usize,
117}
118
119impl risingwave_common::opts::Opts for CompactorOpts {
120    fn name() -> &'static str {
121        "compactor"
122    }
123
124    fn meta_addr(&self) -> MetaAddressStrategy {
125        self.meta_address.clone()
126    }
127}
128
129use std::future::Future;
130use std::pin::Pin;
131
132pub fn start(
133    opts: CompactorOpts,
134    shutdown: CancellationToken,
135) -> Pin<Box<dyn Future<Output = ()> + Send>> {
136    // WARNING: don't change the function signature. Making it `async fn` will cause
137    // slow compile in release mode.
138    match opts.compactor_mode {
139        Some(CompactorMode::Shared) => Box::pin(async move {
140            tracing::info!("Shared compactor pod options: {:?}", opts);
141            tracing::info!("Proxy rpc endpoint: {}", opts.proxy_rpc_endpoint.clone());
142
143            let listen_addr = opts.listen_addr.parse().unwrap();
144
145            shared_compactor_serve(listen_addr, opts, shutdown).await;
146        }),
147        None | Some(CompactorMode::Dedicated) => Box::pin(async move {
148            tracing::info!("Compactor node options: {:?}", opts);
149            tracing::info!("meta address: {}", opts.meta_address.clone());
150
151            let listen_addr = opts.listen_addr.parse().unwrap();
152
153            let advertise_addr = opts
154                .advertise_addr
155                .as_ref()
156                .unwrap_or_else(|| {
157                    tracing::warn!("advertise addr is not specified, defaulting to listen address");
158                    &opts.listen_addr
159                })
160                .parse()
161                .unwrap();
162            tracing::info!(" address is {}", advertise_addr);
163
164            compactor_serve(
165                listen_addr,
166                advertise_addr,
167                opts,
168                shutdown,
169                CompactorMode::Dedicated,
170            )
171            .await;
172        }),
173
174        Some(CompactorMode::DedicatedIceberg) => Box::pin(async move {
175            tracing::info!("Iceberg Compactor node options: {:?}", opts);
176            tracing::info!("meta address: {}", opts.meta_address.clone());
177
178            let listen_addr = opts.listen_addr.parse().unwrap();
179
180            let advertise_addr = opts
181                .advertise_addr
182                .as_ref()
183                .unwrap_or_else(|| {
184                    tracing::warn!("advertise addr is not specified, defaulting to listen address");
185                    &opts.listen_addr
186                })
187                .parse()
188                .unwrap();
189            tracing::info!(" address is {}", advertise_addr);
190
191            compactor_serve(
192                listen_addr,
193                advertise_addr,
194                opts,
195                shutdown,
196                CompactorMode::DedicatedIceberg,
197            )
198            .await;
199        }),
200        Some(CompactorMode::SharedIceberg) => {
201            unimplemented!("Shared iceberg compactor is not supported yet");
202        }
203    }
204}
205
/// Default for `--compactor-total-memory-bytes` / `RW_COMPACTOR_TOTAL_MEMORY_BYTES`.
///
/// Returns whatever `system_memory_available_bytes()` reports at the time of the call.
pub fn default_compactor_total_memory_bytes() -> usize {
    system_memory_available_bytes()
}
209
/// Default for `--compactor-meta-cache-memory-bytes` /
/// `RW_COMPACTOR_META_CACHE_MEMORY_BYTES`: a fixed 128 MiB.
pub fn default_compactor_meta_cache_memory_bytes() -> usize {
    const MIB: usize = 1 << 20;
    128 * MIB
}