1mod compactor_observer;
16mod rpc;
17pub mod server;
18mod telemetry;
19
20use clap::{Parser, ValueEnum};
21use risingwave_common::config::{AsyncStackTraceOption, MetricLevel, OverrideConfig};
22use risingwave_common::util::meta_addr::MetaAddressStrategy;
23use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
24use risingwave_common::util::tokio_util::sync::CancellationToken;
25
26use crate::server::{compactor_serve, shared_compactor_serve};
27
28#[derive(Debug, Default, Clone, Copy, ValueEnum)]
29pub enum CompactorMode {
30 #[default]
31 #[clap(alias = "dedicated")]
32 Dedicated,
33
34 #[clap(alias = "shared")]
35 Shared,
36
37 #[clap(alias = "dedicated_iceberg")]
38 DedicatedIceberg,
39
40 #[clap(alias = "shared_iceberg")]
41 SharedIceberg,
42}
43
44#[derive(Parser, Clone, Debug, OverrideConfig)]
46#[command(
47 version,
48 about = "The stateless worker node that compacts data for the storage engine"
49)]
50pub struct CompactorOpts {
51 #[clap(long, env = "RW_LISTEN_ADDR", default_value = "127.0.0.1:6660")]
55 pub listen_addr: String,
56
57 #[clap(long, env = "RW_ADVERTISE_ADDR")]
62 pub advertise_addr: Option<String>,
63
64 #[clap(long, env = "RW_PORT")]
67 pub port: Option<u16>,
68
69 #[clap(
72 long,
73 env = "RW_PROMETHEUS_LISTENER_ADDR",
74 default_value = "127.0.0.1:1260"
75 )]
76 pub prometheus_listener_addr: String,
77
78 #[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")]
79 pub meta_address: MetaAddressStrategy,
80
81 #[clap(long, env = "RW_COMPACTION_WORKER_THREADS_NUMBER")]
82 pub compaction_worker_threads_number: Option<usize>,
83
84 #[clap(long, env = "RW_CONFIG_PATH", default_value = "")]
88 pub config_path: String,
89
90 #[clap(long, hide = true, env = "RW_METRICS_LEVEL")]
92 #[override_opts(path = server.metrics_level)]
93 pub metrics_level: Option<MetricLevel>,
94
95 #[clap(long, hide = true, env = "RW_ASYNC_STACK_TRACE", value_enum)]
97 #[override_opts(path = streaming.async_stack_trace)]
98 pub async_stack_trace: Option<AsyncStackTraceOption>,
99
100 #[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")]
102 #[override_opts(path = server.heap_profiling.dir)]
103 pub heap_profiling_dir: Option<String>,
104
105 #[clap(long, env = "RW_COMPACTOR_MODE", value_enum)]
106 pub compactor_mode: Option<CompactorMode>,
107
108 #[clap(long, hide = true, env = "RW_PROXY_RPC_ENDPOINT", default_value = "")]
109 pub proxy_rpc_endpoint: String,
110
111 #[clap(long, env = "RW_COMPACTOR_TOTAL_MEMORY_BYTES", default_value_t = default_compactor_total_memory_bytes())]
113 pub compactor_total_memory_bytes: usize,
114
115 #[clap(long, env = "RW_COMPACTOR_META_CACHE_MEMORY_BYTES", default_value_t = default_compactor_meta_cache_memory_bytes())]
116 pub compactor_meta_cache_memory_bytes: usize,
117
118 #[clap(long, env = "RW_COMPACTOR_RPC_MAX_DECODING_MESSAGE_SIZE_BYTES")]
119 pub rpc_max_decoding_message_size_bytes: Option<usize>,
120
121 #[clap(long, env = "RW_COMPACTOR_RPC_MAX_ENCODING_MESSAGE_SIZE_BYTES")]
122 pub rpc_max_encoding_message_size_bytes: Option<usize>,
123}
124
125impl risingwave_common::opts::Opts for CompactorOpts {
126 fn name() -> &'static str {
127 "compactor"
128 }
129
130 fn meta_addr(&self) -> MetaAddressStrategy {
131 self.meta_address.clone()
132 }
133}
134
135use std::future::Future;
136use std::pin::Pin;
137
138pub fn start(
139 opts: CompactorOpts,
140 shutdown: CancellationToken,
141) -> Pin<Box<dyn Future<Output = ()> + Send>> {
142 match opts.compactor_mode {
145 Some(CompactorMode::Shared) => Box::pin(async move {
146 tracing::info!("Shared compactor pod options: {:?}", opts);
147 tracing::info!("Proxy rpc endpoint: {}", opts.proxy_rpc_endpoint.clone());
148
149 let listen_addr = opts.listen_addr.parse().unwrap();
150
151 shared_compactor_serve(listen_addr, opts, shutdown).await;
152 }),
153 None | Some(CompactorMode::Dedicated) => Box::pin(async move {
154 tracing::info!("Compactor node options: {:?}", opts);
155 tracing::info!("meta address: {}", opts.meta_address.clone());
156
157 let listen_addr = opts.listen_addr.parse().unwrap();
158
159 let advertise_addr = opts
160 .advertise_addr
161 .as_ref()
162 .unwrap_or_else(|| {
163 tracing::warn!("advertise addr is not specified, defaulting to listen address");
164 &opts.listen_addr
165 })
166 .parse()
167 .unwrap();
168 tracing::info!(" address is {}", advertise_addr);
169
170 compactor_serve(
171 listen_addr,
172 advertise_addr,
173 opts,
174 shutdown,
175 CompactorMode::Dedicated,
176 )
177 .await;
178 }),
179
180 Some(CompactorMode::DedicatedIceberg) => Box::pin(async move {
181 tracing::info!("Iceberg Compactor node options: {:?}", opts);
182 tracing::info!("meta address: {}", opts.meta_address.clone());
183
184 let listen_addr = opts.listen_addr.parse().unwrap();
185
186 let advertise_addr = opts
187 .advertise_addr
188 .as_ref()
189 .unwrap_or_else(|| {
190 tracing::warn!("advertise addr is not specified, defaulting to listen address");
191 &opts.listen_addr
192 })
193 .parse()
194 .unwrap();
195 tracing::info!(" address is {}", advertise_addr);
196
197 compactor_serve(
198 listen_addr,
199 advertise_addr,
200 opts,
201 shutdown,
202 CompactorMode::DedicatedIceberg,
203 )
204 .await;
205 }),
206 Some(CompactorMode::SharedIceberg) => {
207 unimplemented!("Shared iceberg compactor is not supported yet");
208 }
209 }
210}
211
212pub fn default_compactor_total_memory_bytes() -> usize {
213 system_memory_available_bytes()
214}
215
216pub fn default_compactor_meta_cache_memory_bytes() -> usize {
217 128 * 1024 * 1024 }
219
220pub fn default_rpc_max_decoding_message_size_bytes() -> usize {
221 4 * 1024 * 1024 }
223
224pub fn default_rpc_max_encoding_message_size_bytes() -> usize {
225 4 * 1024 * 1024 }