risingwave_frontend/
lib.rs1#![allow(clippy::derive_partial_eq_without_eq)]
16#![feature(map_try_insert)]
17#![feature(coroutines)]
18#![feature(proc_macro_hygiene, stmt_expr_attributes)]
19#![feature(trait_alias)]
20#![feature(box_patterns)]
21#![feature(min_specialization)]
22#![feature(extend_one)]
23#![feature(impl_trait_in_assoc_type)]
24#![feature(error_generic_member_access)]
25#![feature(iterator_try_collect)]
26#![feature(used_with_arg)]
27#![feature(try_trait_v2)]
28#![feature(never_type)]
29#![recursion_limit = "256"]
30
31#[cfg(test)]
32risingwave_expr_impl::enable!();
33#[cfg(test)]
34risingwave_batch_executors::enable!();
35
36#[macro_use]
37mod catalog;
38
39use std::collections::HashSet;
40use std::time::Duration;
41
42pub use catalog::TableCatalog;
43mod binder;
44pub use binder::{Binder, bind_data_type};
45pub mod expr;
46pub mod handler;
47pub use handler::PgResponseStream;
48mod observer;
49pub mod optimizer;
50pub use optimizer::{Explain, OptimizerContext, OptimizerContextRef, PlanRef};
51mod planner;
52use pgwire::net::TcpKeepalive;
53pub use planner::Planner;
54mod scheduler;
55pub mod session;
56mod stream_fragmenter;
57use risingwave_common::config::{MetricLevel, OverrideConfig};
58use risingwave_common::util::meta_addr::MetaAddressStrategy;
59use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
60use risingwave_common::util::tokio_util::sync::CancellationToken;
61pub use stream_fragmenter::build_graph;
62mod utils;
63pub use utils::data_type::DataTypeToAst;
64pub use utils::{WithOptions, WithOptionsSecResolved, explain_stream_graph};
65pub(crate) mod error;
66mod meta_client;
67pub mod metrics_reader;
68pub use metrics_reader::MetricsReaderImpl;
69
70#[cfg(feature = "datafusion")]
71pub mod datafusion;
72
73pub mod test_utils;
74mod user;
75pub mod webhook;
76
77pub mod health_service;
78mod monitor;
79
80pub mod rpc;
81mod telemetry;
82
83use std::ffi::OsString;
84use std::iter;
85use std::sync::Arc;
86
87use clap::Parser;
88use pgwire::pg_server::pg_serve;
89use session::SessionManagerImpl;
90
91#[derive(Parser, Clone, Debug, OverrideConfig)]
93#[command(
94 version,
95 about = "The stateless proxy that parses SQL queries and performs planning and optimizations of query jobs"
96)]
97pub struct FrontendOpts {
98 #[clap(long, env = "RW_LISTEN_ADDR", default_value = "0.0.0.0:4566")]
102 pub listen_addr: String,
103
104 #[clap(long, env = "RW_TCP_KEEPALIVE_IDLE_SECS", default_value = "300")]
107 pub tcp_keepalive_idle_secs: usize,
108
109 #[clap(long, env = "RW_ADVERTISE_ADDR")]
114 pub advertise_addr: Option<String>,
115
116 #[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")]
118 pub meta_addr: MetaAddressStrategy,
119
120 #[clap(
123 long,
124 env = "RW_PROMETHEUS_LISTENER_ADDR",
125 default_value = "127.0.0.1:2222"
126 )]
127 pub prometheus_listener_addr: String,
128
129 #[clap(
130 long,
131 alias = "health-check-listener-addr",
132 env = "RW_HEALTH_CHECK_LISTENER_ADDR",
133 default_value = "0.0.0.0:6786"
134 )]
135 pub frontend_rpc_listener_addr: String,
136
137 #[clap(long, env = "RW_CONFIG_PATH", default_value = "")]
144 pub config_path: String,
145
146 #[clap(long, hide = true, env = "RW_METRICS_LEVEL")]
151 #[override_opts(path = server.metrics_level)]
152 pub metrics_level: Option<MetricLevel>,
153
154 #[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")]
156 #[override_opts(path = server.heap_profiling.dir)]
157 pub heap_profiling_dir: Option<String>,
158
159 #[clap(long, hide = true, env = "ENABLE_BARRIER_READ")]
160 #[override_opts(path = batch.enable_barrier_read)]
161 pub enable_barrier_read: Option<bool>,
162
163 #[clap(
165 long,
166 hide = true,
167 env = "RW_TEMP_SECRET_FILE_DIR",
168 default_value = "./secrets"
169 )]
170 pub temp_secret_file_dir: String,
171
172 #[clap(long, env = "RW_FRONTEND_TOTAL_MEMORY_BYTES", default_value_t = default_frontend_total_memory_bytes())]
174 pub frontend_total_memory_bytes: usize,
175
176 #[clap(long, env = "RW_WEBHOOK_LISTEN_ADDR", default_value = "0.0.0.0:4560")]
179 pub webhook_listen_addr: String,
180
181 #[clap(long, env = "RW_SBC_ADDR", default_value = "")]
186 pub serverless_backfill_controller_addr: String,
187
188 #[clap(long, env = "RW_PROMETHEUS_ENDPOINT")]
191 pub prometheus_endpoint: Option<String>,
192
193 #[clap(long, env = "RW_PROMETHEUS_SELECTOR")]
197 pub prometheus_selector: Option<String>,
198}
199
200impl risingwave_common::opts::Opts for FrontendOpts {
201 fn name() -> &'static str {
202 "frontend"
203 }
204
205 fn meta_addr(&self) -> MetaAddressStrategy {
206 self.meta_addr.clone()
207 }
208}
209
210impl Default for FrontendOpts {
211 fn default() -> Self {
212 FrontendOpts::parse_from(iter::empty::<OsString>())
213 }
214}
215
216use std::future::Future;
217use std::pin::Pin;
218
219use pgwire::memory_manager::MessageMemoryManager;
220use pgwire::pg_protocol::{ConnectionContext, TlsConfig};
221
222use crate::session::SESSION_MANAGER;
223
224pub fn start(
226 opts: FrontendOpts,
227 shutdown: CancellationToken,
228) -> Pin<Box<dyn Future<Output = ()> + Send>> {
229 Box::pin(async move {
232 let listen_addr = opts.listen_addr.clone();
233 let webhook_listen_addr = opts.webhook_listen_addr.parse().unwrap();
234 let tcp_keepalive =
235 TcpKeepalive::new().with_time(Duration::from_secs(opts.tcp_keepalive_idle_secs as _));
236 let tls_config = TlsConfig::new_default().unwrap();
237
238 let session_mgr = Arc::new(SessionManagerImpl::new(opts).await.unwrap());
239 SESSION_MANAGER.get_or_init(|| session_mgr.clone());
240 let redact_sql_option_keywords = Arc::new(
241 session_mgr
242 .env()
243 .batch_config()
244 .redact_sql_option_keywords
245 .iter()
246 .map(|s| s.to_lowercase())
247 .collect::<HashSet<_>>(),
248 );
249 let frontend_config = &session_mgr.env().frontend_config();
250 let message_memory_manager = Arc::new(MessageMemoryManager::new(
251 frontend_config.max_total_query_size_bytes,
252 frontend_config.min_single_query_size_bytes,
253 frontend_config.max_single_query_size_bytes,
254 ));
255
256 let webhook_service =
257 crate::webhook::WebhookService::new(webhook_listen_addr, tls_config.clone());
258 let _task = tokio::spawn(webhook_service.serve());
259 pg_serve(
260 &listen_addr,
261 tcp_keepalive,
262 session_mgr.clone(),
263 ConnectionContext {
264 tls_config,
265 redact_sql_option_keywords: Some(redact_sql_option_keywords),
266 message_memory_manager,
267 },
268 shutdown,
269 )
270 .await
271 .unwrap()
272 })
273}
274
275pub fn default_frontend_total_memory_bytes() -> usize {
276 system_memory_available_bytes()
277}