risingwave_frontend/
lib.rs1#![allow(clippy::derive_partial_eq_without_eq)]
16#![feature(map_try_insert)]
17#![feature(negative_impls)]
18#![feature(coroutines)]
19#![feature(proc_macro_hygiene, stmt_expr_attributes)]
20#![feature(trait_alias)]
21#![feature(if_let_guard)]
22#![feature(let_chains)]
23#![feature(assert_matches)]
24#![feature(box_patterns)]
25#![feature(macro_metavar_expr)]
26#![feature(min_specialization)]
27#![feature(extend_one)]
28#![feature(type_alias_impl_trait)]
29#![feature(impl_trait_in_assoc_type)]
30#![feature(error_generic_member_access)]
31#![feature(iterator_try_collect)]
32#![feature(used_with_arg)]
33#![feature(try_trait_v2)]
34#![feature(never_type)]
35#![recursion_limit = "256"]
36
37#[cfg(test)]
38risingwave_expr_impl::enable!();
39#[cfg(test)]
40risingwave_batch_executors::enable!();
41
42#[macro_use]
43mod catalog;
44
45use std::collections::HashSet;
46use std::time::Duration;
47
48pub use catalog::TableCatalog;
49mod binder;
50pub use binder::{Binder, bind_data_type};
51pub mod expr;
52pub mod handler;
53pub use handler::PgResponseStream;
54mod observer;
55pub mod optimizer;
56pub use optimizer::{Explain, OptimizerContext, OptimizerContextRef, PlanRef};
57mod planner;
58use pgwire::net::TcpKeepalive;
59pub use planner::Planner;
60mod scheduler;
61pub mod session;
62mod stream_fragmenter;
63use risingwave_common::config::{MetricLevel, OverrideConfig};
64use risingwave_common::util::meta_addr::MetaAddressStrategy;
65use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
66use risingwave_common::util::tokio_util::sync::CancellationToken;
67pub use stream_fragmenter::build_graph;
68mod utils;
69pub use utils::{WithOptions, WithOptionsSecResolved, explain_stream_graph};
70pub(crate) mod error;
71mod meta_client;
72pub mod metrics_reader;
73pub use metrics_reader::MetricsReaderImpl;
74pub mod test_utils;
75mod user;
76pub mod webhook;
77
78pub mod health_service;
79mod monitor;
80
81pub mod rpc;
82mod telemetry;
83
84use std::ffi::OsString;
85use std::iter;
86use std::sync::Arc;
87
88use clap::Parser;
89use pgwire::pg_server::pg_serve;
90use session::SessionManagerImpl;
91
92#[derive(Parser, Clone, Debug, OverrideConfig)]
94#[command(
95 version,
96 about = "The stateless proxy that parses SQL queries and performs planning and optimizations of query jobs"
97)]
98pub struct FrontendOpts {
99 #[clap(long, env = "RW_LISTEN_ADDR", default_value = "0.0.0.0:4566")]
103 pub listen_addr: String,
104
105 #[clap(long, env = "RW_TCP_KEEPALIVE_IDLE_SECS", default_value = "300")]
108 pub tcp_keepalive_idle_secs: usize,
109
110 #[clap(long, env = "RW_ADVERTISE_ADDR")]
115 pub advertise_addr: Option<String>,
116
117 #[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")]
119 pub meta_addr: MetaAddressStrategy,
120
121 #[clap(
124 long,
125 env = "RW_PROMETHEUS_LISTENER_ADDR",
126 default_value = "127.0.0.1:2222"
127 )]
128 pub prometheus_listener_addr: String,
129
130 #[clap(
131 long,
132 alias = "health-check-listener-addr",
133 env = "RW_HEALTH_CHECK_LISTENER_ADDR",
134 default_value = "0.0.0.0:6786"
135 )]
136 pub frontend_rpc_listener_addr: String,
137
138 #[clap(long, env = "RW_CONFIG_PATH", default_value = "")]
145 pub config_path: String,
146
147 #[clap(long, hide = true, env = "RW_METRICS_LEVEL")]
152 #[override_opts(path = server.metrics_level)]
153 pub metrics_level: Option<MetricLevel>,
154
155 #[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")]
157 #[override_opts(path = server.heap_profiling.dir)]
158 pub heap_profiling_dir: Option<String>,
159
160 #[clap(long, hide = true, env = "ENABLE_BARRIER_READ")]
161 #[override_opts(path = batch.enable_barrier_read)]
162 pub enable_barrier_read: Option<bool>,
163
164 #[clap(
166 long,
167 hide = true,
168 env = "RW_TEMP_SECRET_FILE_DIR",
169 default_value = "./secrets"
170 )]
171 pub temp_secret_file_dir: String,
172
173 #[clap(long, env = "RW_FRONTEND_TOTAL_MEMORY_BYTES", default_value_t = default_frontend_total_memory_bytes())]
175 pub frontend_total_memory_bytes: usize,
176
177 #[clap(long, env = "RW_WEBHOOK_LISTEN_ADDR", default_value = "0.0.0.0:4560")]
180 pub webhook_listen_addr: String,
181
182 #[clap(long, env = "RW_SBC_ADDR", default_value = "")]
187 pub serverless_backfill_controller_addr: String,
188
189 #[clap(long, env = "RW_PROMETHEUS_ENDPOINT")]
192 pub prometheus_endpoint: Option<String>,
193
194 #[clap(long, env = "RW_PROMETHEUS_SELECTOR")]
198 pub prometheus_selector: Option<String>,
199}
200
201impl risingwave_common::opts::Opts for FrontendOpts {
202 fn name() -> &'static str {
203 "frontend"
204 }
205
206 fn meta_addr(&self) -> MetaAddressStrategy {
207 self.meta_addr.clone()
208 }
209}
210
211impl Default for FrontendOpts {
212 fn default() -> Self {
213 FrontendOpts::parse_from(iter::empty::<OsString>())
214 }
215}
216
217use std::future::Future;
218use std::pin::Pin;
219
220use pgwire::memory_manager::MessageMemoryManager;
221use pgwire::pg_protocol::{ConnectionContext, TlsConfig};
222
223use crate::session::SESSION_MANAGER;
224
225pub fn start(
227 opts: FrontendOpts,
228 shutdown: CancellationToken,
229) -> Pin<Box<dyn Future<Output = ()> + Send>> {
230 Box::pin(async move {
233 let listen_addr = opts.listen_addr.clone();
234 let webhook_listen_addr = opts.webhook_listen_addr.parse().unwrap();
235 let tcp_keepalive =
236 TcpKeepalive::new().with_time(Duration::from_secs(opts.tcp_keepalive_idle_secs as _));
237
238 let session_mgr = Arc::new(SessionManagerImpl::new(opts).await.unwrap());
239 SESSION_MANAGER.get_or_init(|| session_mgr.clone());
240 let redact_sql_option_keywords = Arc::new(
241 session_mgr
242 .env()
243 .batch_config()
244 .redact_sql_option_keywords
245 .iter()
246 .map(|s| s.to_lowercase())
247 .collect::<HashSet<_>>(),
248 );
249 let frontend_config = &session_mgr.env().frontend_config();
250 let message_memory_manager = Arc::new(MessageMemoryManager::new(
251 frontend_config.max_total_query_size_bytes,
252 frontend_config.min_single_query_size_bytes,
253 frontend_config.max_single_query_size_bytes,
254 ));
255
256 let webhook_service = crate::webhook::WebhookService::new(webhook_listen_addr);
257 let _task = tokio::spawn(webhook_service.serve());
258 pg_serve(
259 &listen_addr,
260 tcp_keepalive,
261 session_mgr.clone(),
262 ConnectionContext {
263 tls_config: TlsConfig::new_default(),
264 redact_sql_option_keywords: Some(redact_sql_option_keywords),
265 message_memory_manager,
266 },
267 shutdown,
268 )
269 .await
270 .unwrap()
271 })
272}
273
274pub fn default_frontend_total_memory_bytes() -> usize {
275 system_memory_available_bytes()
276}