risingwave_compactor/
lib.rs1mod compactor_observer;
16mod rpc;
17pub mod server;
18mod telemetry;
19
20use clap::Parser;
21use risingwave_common::config::{
22 AsyncStackTraceOption, CompactorMode, MetricLevel, OverrideConfig,
23};
24use risingwave_common::util::meta_addr::MetaAddressStrategy;
25use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
26use risingwave_common::util::tokio_util::sync::CancellationToken;
27
28use crate::server::{compactor_serve, shared_compactor_serve};
29
30#[derive(Parser, Clone, Debug, OverrideConfig)]
32#[command(
33 version,
34 about = "The stateless worker node that compacts data for the storage engine"
35)]
36pub struct CompactorOpts {
37 #[clap(long, env = "RW_LISTEN_ADDR", default_value = "127.0.0.1:6660")]
41 pub listen_addr: String,
42
43 #[clap(long, env = "RW_ADVERTISE_ADDR")]
48 pub advertise_addr: Option<String>,
49
50 #[clap(long, env = "RW_PORT")]
53 pub port: Option<u16>,
54
55 #[clap(
58 long,
59 env = "RW_PROMETHEUS_LISTENER_ADDR",
60 default_value = "127.0.0.1:1260"
61 )]
62 pub prometheus_listener_addr: String,
63
64 #[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")]
65 pub meta_address: MetaAddressStrategy,
66
67 #[clap(long, env = "RW_COMPACTION_WORKER_THREADS_NUMBER")]
68 pub compaction_worker_threads_number: Option<usize>,
69
70 #[clap(long, env = "RW_CONFIG_PATH", default_value = "")]
74 pub config_path: String,
75
76 #[clap(long, hide = true, env = "RW_METRICS_LEVEL")]
78 #[override_opts(path = server.metrics_level)]
79 pub metrics_level: Option<MetricLevel>,
80
81 #[clap(long, hide = true, env = "RW_ASYNC_STACK_TRACE", value_enum)]
83 #[override_opts(path = streaming.async_stack_trace)]
84 pub async_stack_trace: Option<AsyncStackTraceOption>,
85
86 #[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")]
88 #[override_opts(path = server.heap_profiling.dir)]
89 pub heap_profiling_dir: Option<String>,
90
91 #[clap(long, env = "RW_COMPACTOR_MODE", value_enum)]
92 pub compactor_mode: Option<CompactorMode>,
93
94 #[clap(long, hide = true, env = "RW_PROXY_RPC_ENDPOINT", default_value = "")]
95 pub proxy_rpc_endpoint: String,
96
97 #[clap(long, env = "RW_COMPACTOR_TOTAL_MEMORY_BYTES", default_value_t = default_compactor_total_memory_bytes())]
99 pub compactor_total_memory_bytes: usize,
100
101 #[clap(long, env = "RW_COMPACTOR_META_CACHE_MEMORY_BYTES", default_value_t = default_compactor_meta_cache_memory_bytes())]
102 pub compactor_meta_cache_memory_bytes: usize,
103}
104
105impl risingwave_common::opts::Opts for CompactorOpts {
106 fn name() -> &'static str {
107 "compactor"
108 }
109
110 fn meta_addr(&self) -> MetaAddressStrategy {
111 self.meta_address.clone()
112 }
113}
114
115use std::future::Future;
116use std::pin::Pin;
117
118pub fn start(
119 opts: CompactorOpts,
120 shutdown: CancellationToken,
121) -> Pin<Box<dyn Future<Output = ()> + Send>> {
122 match opts.compactor_mode {
125 Some(CompactorMode::Shared) => Box::pin(async move {
126 tracing::info!("Shared compactor pod options: {:?}", opts);
127 tracing::info!("Proxy rpc endpoint: {}", opts.proxy_rpc_endpoint.clone());
128
129 let listen_addr = opts.listen_addr.parse().unwrap();
130
131 shared_compactor_serve(listen_addr, opts, shutdown).await;
132 }),
133 None | Some(CompactorMode::Dedicated) => Box::pin(async move {
134 tracing::info!("Compactor node options: {:?}", opts);
135 tracing::info!("meta address: {}", opts.meta_address.clone());
136
137 let listen_addr = opts.listen_addr.parse().unwrap();
138
139 let advertise_addr = opts
140 .advertise_addr
141 .as_ref()
142 .unwrap_or_else(|| {
143 tracing::warn!("advertise addr is not specified, defaulting to listen address");
144 &opts.listen_addr
145 })
146 .parse()
147 .unwrap();
148 tracing::info!(" address is {}", advertise_addr);
149
150 compactor_serve(listen_addr, advertise_addr, opts, shutdown).await;
151 }),
152 }
153}
154
155pub fn default_compactor_total_memory_bytes() -> usize {
156 system_memory_available_bytes()
157}
158
/// Default for `CompactorOpts::compactor_meta_cache_memory_bytes`: 128 MiB.
pub fn default_compactor_meta_cache_memory_bytes() -> usize {
    /// One mebibyte, in bytes.
    const MIB: usize = 1 << 20;
    128 * MIB
}