risingwave_frontend/
lib.rs1#![allow(clippy::derive_partial_eq_without_eq)]
16#![feature(map_try_insert)]
17#![feature(negative_impls)]
18#![feature(coroutines)]
19#![feature(proc_macro_hygiene, stmt_expr_attributes)]
20#![feature(trait_alias)]
21#![feature(if_let_guard)]
22#![feature(assert_matches)]
23#![feature(box_patterns)]
24#![feature(macro_metavar_expr)]
25#![feature(min_specialization)]
26#![feature(extend_one)]
27#![feature(type_alias_impl_trait)]
28#![feature(impl_trait_in_assoc_type)]
29#![feature(error_generic_member_access)]
30#![feature(iterator_try_collect)]
31#![feature(used_with_arg)]
32#![feature(try_trait_v2)]
33#![feature(never_type)]
34#![recursion_limit = "256"]
35
36#[cfg(test)]
37risingwave_expr_impl::enable!();
38#[cfg(test)]
39risingwave_batch_executors::enable!();
40
41#[macro_use]
42mod catalog;
43
44use std::collections::HashSet;
45use std::time::Duration;
46
47pub use catalog::TableCatalog;
48mod binder;
49pub use binder::{Binder, bind_data_type};
50pub mod expr;
51pub mod handler;
52pub use handler::PgResponseStream;
53mod observer;
54pub mod optimizer;
55pub use optimizer::{Explain, OptimizerContext, OptimizerContextRef, PlanRef};
56mod planner;
57use pgwire::net::TcpKeepalive;
58pub use planner::Planner;
59mod scheduler;
60pub mod session;
61mod stream_fragmenter;
62use risingwave_common::config::{MetricLevel, OverrideConfig};
63use risingwave_common::util::meta_addr::MetaAddressStrategy;
64use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
65use risingwave_common::util::tokio_util::sync::CancellationToken;
66pub use stream_fragmenter::build_graph;
67mod utils;
68pub use utils::{WithOptions, WithOptionsSecResolved, explain_stream_graph};
69pub(crate) mod error;
70mod meta_client;
71pub mod metrics_reader;
72pub use metrics_reader::MetricsReaderImpl;
73
74#[cfg(feature = "datafusion")]
75pub mod datafusion;
76
77pub mod test_utils;
78mod user;
79pub mod webhook;
80
81pub mod health_service;
82mod monitor;
83
84pub mod rpc;
85mod telemetry;
86
87use std::ffi::OsString;
88use std::iter;
89use std::sync::Arc;
90
91use clap::Parser;
92use pgwire::pg_server::pg_serve;
93use session::SessionManagerImpl;
94
95#[derive(Parser, Clone, Debug, OverrideConfig)]
97#[command(
98 version,
99 about = "The stateless proxy that parses SQL queries and performs planning and optimizations of query jobs"
100)]
101pub struct FrontendOpts {
102 #[clap(long, env = "RW_LISTEN_ADDR", default_value = "0.0.0.0:4566")]
106 pub listen_addr: String,
107
108 #[clap(long, env = "RW_TCP_KEEPALIVE_IDLE_SECS", default_value = "300")]
111 pub tcp_keepalive_idle_secs: usize,
112
113 #[clap(long, env = "RW_ADVERTISE_ADDR")]
118 pub advertise_addr: Option<String>,
119
120 #[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")]
122 pub meta_addr: MetaAddressStrategy,
123
124 #[clap(
127 long,
128 env = "RW_PROMETHEUS_LISTENER_ADDR",
129 default_value = "127.0.0.1:2222"
130 )]
131 pub prometheus_listener_addr: String,
132
133 #[clap(
134 long,
135 alias = "health-check-listener-addr",
136 env = "RW_HEALTH_CHECK_LISTENER_ADDR",
137 default_value = "0.0.0.0:6786"
138 )]
139 pub frontend_rpc_listener_addr: String,
140
141 #[clap(long, env = "RW_CONFIG_PATH", default_value = "")]
148 pub config_path: String,
149
150 #[clap(long, hide = true, env = "RW_METRICS_LEVEL")]
155 #[override_opts(path = server.metrics_level)]
156 pub metrics_level: Option<MetricLevel>,
157
158 #[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")]
160 #[override_opts(path = server.heap_profiling.dir)]
161 pub heap_profiling_dir: Option<String>,
162
163 #[clap(long, hide = true, env = "ENABLE_BARRIER_READ")]
164 #[override_opts(path = batch.enable_barrier_read)]
165 pub enable_barrier_read: Option<bool>,
166
167 #[clap(
169 long,
170 hide = true,
171 env = "RW_TEMP_SECRET_FILE_DIR",
172 default_value = "./secrets"
173 )]
174 pub temp_secret_file_dir: String,
175
176 #[clap(long, env = "RW_FRONTEND_TOTAL_MEMORY_BYTES", default_value_t = default_frontend_total_memory_bytes())]
178 pub frontend_total_memory_bytes: usize,
179
180 #[clap(long, env = "RW_WEBHOOK_LISTEN_ADDR", default_value = "0.0.0.0:4560")]
183 pub webhook_listen_addr: String,
184
185 #[clap(long, env = "RW_SBC_ADDR", default_value = "")]
190 pub serverless_backfill_controller_addr: String,
191
192 #[clap(long, env = "RW_PROMETHEUS_ENDPOINT")]
195 pub prometheus_endpoint: Option<String>,
196
197 #[clap(long, env = "RW_PROMETHEUS_SELECTOR")]
201 pub prometheus_selector: Option<String>,
202}
203
204impl risingwave_common::opts::Opts for FrontendOpts {
205 fn name() -> &'static str {
206 "frontend"
207 }
208
209 fn meta_addr(&self) -> MetaAddressStrategy {
210 self.meta_addr.clone()
211 }
212}
213
214impl Default for FrontendOpts {
215 fn default() -> Self {
216 FrontendOpts::parse_from(iter::empty::<OsString>())
217 }
218}
219
220use std::future::Future;
221use std::pin::Pin;
222
223use pgwire::memory_manager::MessageMemoryManager;
224use pgwire::pg_protocol::{ConnectionContext, TlsConfig};
225
226use crate::session::SESSION_MANAGER;
227
228pub fn start(
230 opts: FrontendOpts,
231 shutdown: CancellationToken,
232) -> Pin<Box<dyn Future<Output = ()> + Send>> {
233 Box::pin(async move {
236 let listen_addr = opts.listen_addr.clone();
237 let webhook_listen_addr = opts.webhook_listen_addr.parse().unwrap();
238 let tcp_keepalive =
239 TcpKeepalive::new().with_time(Duration::from_secs(opts.tcp_keepalive_idle_secs as _));
240
241 let session_mgr = Arc::new(SessionManagerImpl::new(opts).await.unwrap());
242 SESSION_MANAGER.get_or_init(|| session_mgr.clone());
243 let redact_sql_option_keywords = Arc::new(
244 session_mgr
245 .env()
246 .batch_config()
247 .redact_sql_option_keywords
248 .iter()
249 .map(|s| s.to_lowercase())
250 .collect::<HashSet<_>>(),
251 );
252 let frontend_config = &session_mgr.env().frontend_config();
253 let message_memory_manager = Arc::new(MessageMemoryManager::new(
254 frontend_config.max_total_query_size_bytes,
255 frontend_config.min_single_query_size_bytes,
256 frontend_config.max_single_query_size_bytes,
257 ));
258
259 let webhook_service = crate::webhook::WebhookService::new(webhook_listen_addr);
260 let _task = tokio::spawn(webhook_service.serve());
261 pg_serve(
262 &listen_addr,
263 tcp_keepalive,
264 session_mgr.clone(),
265 ConnectionContext {
266 tls_config: TlsConfig::new_default(),
267 redact_sql_option_keywords: Some(redact_sql_option_keywords),
268 message_memory_manager,
269 },
270 shutdown,
271 )
272 .await
273 .unwrap()
274 })
275}
276
277pub fn default_frontend_total_memory_bytes() -> usize {
278 system_memory_available_bytes()
279}