risingwave_compactor/
lib.rs1mod compactor_observer;
16mod rpc;
17pub mod server;
18mod telemetry;
19
20use clap::{Parser, ValueEnum};
21use risingwave_common::config::{AsyncStackTraceOption, MetricLevel, OverrideConfig};
22use risingwave_common::util::meta_addr::MetaAddressStrategy;
23use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
24use risingwave_common::util::tokio_util::sync::CancellationToken;
25
26use crate::server::{compactor_serve, shared_compactor_serve};
27
// Operating modes a compactor process can be started in; `start` dispatches on this value.
//
// NOTE: plain `//` comments are used on purpose. clap's derive macros turn `///` doc
// comments into CLI help text, so `///` here would change what `--help` prints.
#[derive(Debug, Default, Clone, Copy, ValueEnum)]
pub enum CompactorMode {
    // The `Default` value of this enum (`#[default]`). `start` also runs this mode when
    // no mode was given at all.
    #[default]
    #[clap(alias = "dedicated")]
    Dedicated,

    // `start` serves this mode through `shared_compactor_serve`.
    #[clap(alias = "shared")]
    Shared,

    // `start` serves this mode through `compactor_serve`, the same entry point as
    // `Dedicated`, passing this variant along. Also accepted as `dedicated_iceberg`.
    #[clap(alias = "dedicated_iceberg")]
    DedicatedIceberg,

    // Not supported yet: `start` hits `unimplemented!` for this mode.
    // Also accepted as `shared_iceberg`.
    #[clap(alias = "shared_iceberg")]
    SharedIceberg,
}
43
// Command-line options of the compactor node. Every visible option can also be supplied
// through the environment variable named in its `env = "..."` attribute.
//
// NOTE: plain `//` comments are used on purpose. clap's derive macros turn `///` doc
// comments into CLI help text, so `///` here would change what `--help` prints.
// NOTE(review): fields tagged `#[override_opts(path = ...)]` presumably override the config
// entry at that path when set; confirm against the `OverrideConfig` derive.
#[derive(Parser, Clone, Debug, OverrideConfig)]
#[command(
    version,
    about = "The stateless worker node that compacts data for the storage engine"
)]
pub struct CompactorOpts {
    // Address to listen on, kept as a string. `start` parses it before handing it to the
    // serve function, so an unparsable value only fails at startup.
    #[clap(long, env = "RW_LISTEN_ADDR", default_value = "127.0.0.1:6660")]
    pub listen_addr: String,

    // Address advertised by a dedicated compactor. When absent, `start` logs a warning and
    // falls back to `listen_addr`.
    #[clap(long, env = "RW_ADVERTISE_ADDR")]
    pub advertise_addr: Option<String>,

    // NOTE(review): not read anywhere in the code visible here; confirm whether this
    // option is still needed.
    #[clap(long, env = "RW_PORT")]
    pub port: Option<u16>,

    // NOTE(review): presumably the address the Prometheus metrics endpoint binds to;
    // it is consumed outside the code visible here.
    #[clap(
        long,
        env = "RW_PROMETHEUS_LISTENER_ADDR",
        default_value = "127.0.0.1:1260"
    )]
    pub prometheus_listener_addr: String,

    // How to reach the meta service; returned by `Opts::meta_addr` for this type.
    #[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")]
    pub meta_address: MetaAddressStrategy,

    // NOTE(review): presumably the size of the compaction worker thread pool, with `None`
    // leaving the choice to the server; consumed outside the code visible here.
    #[clap(long, env = "RW_COMPACTION_WORKER_THREADS_NUMBER")]
    pub compaction_worker_threads_number: Option<usize>,

    // NOTE(review): presumably the path of the configuration file. Defaults to the
    // empty string; how an empty value is interpreted is decided elsewhere.
    #[clap(long, env = "RW_CONFIG_PATH", default_value = "")]
    pub config_path: String,

    // Hidden from `--help`. Tagged with config path `server.metrics_level`.
    #[clap(long, hide = true, env = "RW_METRICS_LEVEL")]
    #[override_opts(path = server.metrics_level)]
    pub metrics_level: Option<MetricLevel>,

    // Hidden from `--help`. Tagged with config path `streaming.async_stack_trace`.
    #[clap(long, hide = true, env = "RW_ASYNC_STACK_TRACE", value_enum)]
    #[override_opts(path = streaming.async_stack_trace)]
    pub async_stack_trace: Option<AsyncStackTraceOption>,

    // Hidden from `--help`. Tagged with config path `server.heap_profiling.dir`.
    #[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")]
    #[override_opts(path = server.heap_profiling.dir)]
    pub heap_profiling_dir: Option<String>,

    // Mode to run in. When absent, `start` runs the dedicated compactor.
    #[clap(long, env = "RW_COMPACTOR_MODE", value_enum)]
    pub compactor_mode: Option<CompactorMode>,

    // Hidden from `--help`; defaults to the empty string. `start` logs it when running in
    // shared mode.
    #[clap(long, hide = true, env = "RW_PROXY_RPC_ENDPOINT", default_value = "")]
    pub proxy_rpc_endpoint: String,

    // Defaults to the result of `default_compactor_total_memory_bytes()`.
    #[clap(long, env = "RW_COMPACTOR_TOTAL_MEMORY_BYTES", default_value_t = default_compactor_total_memory_bytes())]
    pub compactor_total_memory_bytes: usize,

    // Defaults to the result of `default_compactor_meta_cache_memory_bytes()`.
    #[clap(long, env = "RW_COMPACTOR_META_CACHE_MEMORY_BYTES", default_value_t = default_compactor_meta_cache_memory_bytes())]
    pub compactor_meta_cache_memory_bytes: usize,
}
118
119impl risingwave_common::opts::Opts for CompactorOpts {
120 fn name() -> &'static str {
121 "compactor"
122 }
123
124 fn meta_addr(&self) -> MetaAddressStrategy {
125 self.meta_address.clone()
126 }
127}
128
129use std::future::Future;
130use std::pin::Pin;
131
132pub fn start(
133 opts: CompactorOpts,
134 shutdown: CancellationToken,
135) -> Pin<Box<dyn Future<Output = ()> + Send>> {
136 match opts.compactor_mode {
139 Some(CompactorMode::Shared) => Box::pin(async move {
140 tracing::info!("Shared compactor pod options: {:?}", opts);
141 tracing::info!("Proxy rpc endpoint: {}", opts.proxy_rpc_endpoint.clone());
142
143 let listen_addr = opts.listen_addr.parse().unwrap();
144
145 shared_compactor_serve(listen_addr, opts, shutdown).await;
146 }),
147 None | Some(CompactorMode::Dedicated) => Box::pin(async move {
148 tracing::info!("Compactor node options: {:?}", opts);
149 tracing::info!("meta address: {}", opts.meta_address.clone());
150
151 let listen_addr = opts.listen_addr.parse().unwrap();
152
153 let advertise_addr = opts
154 .advertise_addr
155 .as_ref()
156 .unwrap_or_else(|| {
157 tracing::warn!("advertise addr is not specified, defaulting to listen address");
158 &opts.listen_addr
159 })
160 .parse()
161 .unwrap();
162 tracing::info!(" address is {}", advertise_addr);
163
164 compactor_serve(
165 listen_addr,
166 advertise_addr,
167 opts,
168 shutdown,
169 CompactorMode::Dedicated,
170 )
171 .await;
172 }),
173
174 Some(CompactorMode::DedicatedIceberg) => Box::pin(async move {
175 tracing::info!("Iceberg Compactor node options: {:?}", opts);
176 tracing::info!("meta address: {}", opts.meta_address.clone());
177
178 let listen_addr = opts.listen_addr.parse().unwrap();
179
180 let advertise_addr = opts
181 .advertise_addr
182 .as_ref()
183 .unwrap_or_else(|| {
184 tracing::warn!("advertise addr is not specified, defaulting to listen address");
185 &opts.listen_addr
186 })
187 .parse()
188 .unwrap();
189 tracing::info!(" address is {}", advertise_addr);
190
191 compactor_serve(
192 listen_addr,
193 advertise_addr,
194 opts,
195 shutdown,
196 CompactorMode::DedicatedIceberg,
197 )
198 .await;
199 }),
200 Some(CompactorMode::SharedIceberg) => {
201 unimplemented!("Shared iceberg compactor is not supported yet");
202 }
203 }
204}
205
206pub fn default_compactor_total_memory_bytes() -> usize {
207 system_memory_available_bytes()
208}
209
/// Default for `CompactorOpts::compactor_meta_cache_memory_bytes`: 128 MiB, in bytes.
pub fn default_compactor_meta_cache_memory_bytes() -> usize {
    const MIB: usize = 1 << 20;
    128 * MIB
}