risingwave_frontend/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(clippy::derive_partial_eq_without_eq)]
16#![feature(map_try_insert)]
17#![feature(negative_impls)]
18#![feature(coroutines)]
19#![feature(proc_macro_hygiene, stmt_expr_attributes)]
20#![feature(trait_alias)]
21#![feature(if_let_guard)]
22#![feature(let_chains)]
23#![feature(assert_matches)]
24#![feature(box_patterns)]
25#![feature(macro_metavar_expr)]
26#![feature(min_specialization)]
27#![feature(extend_one)]
28#![feature(type_alias_impl_trait)]
29#![feature(impl_trait_in_assoc_type)]
30#![feature(error_generic_member_access)]
31#![feature(iterator_try_collect)]
32#![feature(used_with_arg)]
33#![feature(try_trait_v2)]
34#![feature(never_type)]
35#![recursion_limit = "256"]
36
37#[cfg(test)]
38risingwave_expr_impl::enable!();
39#[cfg(test)]
40risingwave_batch_executors::enable!();
41
42#[macro_use]
43mod catalog;
44
45use std::collections::HashSet;
46use std::time::Duration;
47
48pub use catalog::TableCatalog;
49mod binder;
50pub use binder::{Binder, bind_data_type};
51pub mod expr;
52pub mod handler;
53pub use handler::PgResponseStream;
54mod observer;
55pub mod optimizer;
56pub use optimizer::{Explain, OptimizerContext, OptimizerContextRef, PlanRef};
57mod planner;
58use pgwire::net::TcpKeepalive;
59pub use planner::Planner;
60mod scheduler;
61pub mod session;
62mod stream_fragmenter;
63use risingwave_common::config::{MetricLevel, OverrideConfig};
64use risingwave_common::util::meta_addr::MetaAddressStrategy;
65use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
66use risingwave_common::util::tokio_util::sync::CancellationToken;
67pub use stream_fragmenter::build_graph;
68mod utils;
69pub use utils::{WithOptions, WithOptionsSecResolved, explain_stream_graph};
70pub(crate) mod error;
71mod meta_client;
72pub mod test_utils;
73mod user;
74pub mod webhook;
75
76pub mod health_service;
77mod monitor;
78
79pub mod rpc;
80mod telemetry;
81
82use std::ffi::OsString;
83use std::iter;
84use std::sync::Arc;
85
86use clap::Parser;
87use pgwire::pg_server::pg_serve;
88use session::SessionManagerImpl;
89
/// Command-line arguments for frontend-node.
// NOTE: this struct derives clap's `Parser`; clap renders the `///` comments on the fields
// below as `--help` text. Maintainer-only remarks are therefore written as plain `//`
// comments so that the CLI output stays exactly as it is.
#[derive(Parser, Clone, Debug, OverrideConfig)]
#[command(
    version,
    about = "The stateless proxy that parses SQL queries and performs planning and optimizations of query jobs"
)]
pub struct FrontendOpts {
    // TODO: rename to listen_addr and separate out the port.
    // This string is handed to `pg_serve` by `start` as the pgwire listen address.
    /// The address that this service listens to.
    /// Usually the localhost + desired port.
    #[clap(long, env = "RW_LISTEN_ADDR", default_value = "0.0.0.0:4566")]
    pub listen_addr: String,

    // `start` turns this into a `Duration` in seconds and applies it with
    // `TcpKeepalive::with_time`.
    /// The amount of time with no network activity after which the server will send a
    /// TCP keepalive message to the client.
    #[clap(long, env = "RW_TCP_KEEPALIVE_IDLE_SECS", default_value = "300")]
    pub tcp_keepalive_idle_secs: usize,

    /// The address for contacting this instance of the service.
    /// This would be synonymous with the service's "public address"
    /// or "identifying address".
    /// Optional, we will use `listen_addr` if not specified.
    #[clap(long, env = "RW_ADVERTISE_ADDR")]
    pub advertise_addr: Option<String>,

    /// The address via which we will attempt to connect to a leader meta node.
    #[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")]
    pub meta_addr: MetaAddressStrategy,

    /// We will start a http server at this address via `MetricsManager`.
    /// Then the prometheus instance will poll the metrics from this address.
    #[clap(
        long,
        env = "RW_PROMETHEUS_LISTENER_ADDR",
        default_value = "127.0.0.1:2222"
    )]
    pub prometheus_listener_addr: String,

    // No `///` help text is attached to this flag. Besides the name derived from the field,
    // it is also accepted as `--health-check-listener-addr` (alias), and its environment
    // variable still carries the health-check name.
    // NOTE(review): presumably the address the frontend's RPC / health-check server binds
    // to; the consumer of this field is not in this file, so confirm before documenting it
    // for users.
    #[clap(
        long,
        alias = "health-check-listener-addr",
        env = "RW_HEALTH_CHECK_LISTENER_ADDR",
        default_value = "0.0.0.0:6786"
    )]
    pub frontend_rpc_listener_addr: String,

    /// The path of `risingwave.toml` configuration file.
    ///
    /// If empty, default configuration values will be used.
    ///
    /// Note that internal system parameters should be defined in the configuration file at
    /// [`risingwave_common::config`] instead of command line arguments.
    #[clap(long, env = "RW_CONFIG_PATH", default_value = "")]
    pub config_path: String,

    // Hidden from `--help` (`hide = true`). Tied to the `server.metrics_level` config path
    // through `override_opts`; presumably a `Some` value here replaces the value from the
    // configuration file (behaviour lives in the `OverrideConfig` derive, not shown here).
    /// Used for control the metrics level, similar to log level.
    ///
    /// level = 0: disable metrics
    /// level > 0: enable metrics
    #[clap(long, hide = true, env = "RW_METRICS_LEVEL")]
    #[override_opts(path = server.metrics_level)]
    pub metrics_level: Option<MetricLevel>,

    // Hidden from `--help`; tied to the `server.heap_profiling.dir` config path.
    /// Enable heap profile dump when memory usage is high.
    #[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")]
    #[override_opts(path = server.heap_profiling.dir)]
    pub heap_profiling_dir: Option<String>,

    // Hidden from `--help` and without any `///` text; tied to the
    // `batch.enable_barrier_read` config path.
    // NOTE(review): the environment variable is `ENABLE_BARRIER_READ`, without the `RW_`
    // prefix used by every other option in this struct. Renaming it would break existing
    // deployments, so it is only flagged here.
    #[clap(long, hide = true, env = "ENABLE_BARRIER_READ")]
    #[override_opts(path = batch.enable_barrier_read)]
    pub enable_barrier_read: Option<bool>,

    /// The path of the temp secret file directory.
    #[clap(
        long,
        hide = true,
        env = "RW_TEMP_SECRET_FILE_DIR",
        default_value = "./secrets"
    )]
    pub temp_secret_file_dir: String,

    // The default is not a literal: it comes from `default_frontend_total_memory_bytes()`,
    // which returns `system_memory_available_bytes()`.
    /// Total available memory for the frontend node in bytes. Used for batch computing.
    #[clap(long, env = "RW_FRONTEND_TOTAL_MEMORY_BYTES", default_value_t = default_frontend_total_memory_bytes())]
    pub frontend_total_memory_bytes: usize,

    // `start` parses this string before anything else is set up and panics if it does not
    // parse; the parsed value is passed to `WebhookService::new`.
    /// The address that the webhook service listens to.
    /// Usually the localhost + desired port.
    #[clap(long, env = "RW_WEBHOOK_LISTEN_ADDR", default_value = "0.0.0.0:4560")]
    pub webhook_listen_addr: String,

    /// Address of the serverless backfill controller.
    /// Needed if frontend receives a query like
    /// CREATE MATERIALIZED VIEW ... WITH ( `cloud.serverless_backfill_enabled=true` )
    /// Feature disabled by default.
    #[clap(long, env = "RW_SBC_ADDR", default_value = "")]
    pub serverless_backfill_controller_addr: String,
}
187
188impl risingwave_common::opts::Opts for FrontendOpts {
189    fn name() -> &'static str {
190        "frontend"
191    }
192
193    fn meta_addr(&self) -> MetaAddressStrategy {
194        self.meta_addr.clone()
195    }
196}
197
198impl Default for FrontendOpts {
199    fn default() -> Self {
200        FrontendOpts::parse_from(iter::empty::<OsString>())
201    }
202}
203
204use std::future::Future;
205use std::pin::Pin;
206
207use pgwire::memory_manager::MessageMemoryManager;
208use pgwire::pg_protocol::{ConnectionContext, TlsConfig};
209
210use crate::session::SESSION_MANAGER;
211
212/// Start frontend
213pub fn start(
214    opts: FrontendOpts,
215    shutdown: CancellationToken,
216) -> Pin<Box<dyn Future<Output = ()> + Send>> {
217    // WARNING: don't change the function signature. Making it `async fn` will cause
218    // slow compile in release mode.
219    Box::pin(async move {
220        let listen_addr = opts.listen_addr.clone();
221        let webhook_listen_addr = opts.webhook_listen_addr.parse().unwrap();
222        let tcp_keepalive =
223            TcpKeepalive::new().with_time(Duration::from_secs(opts.tcp_keepalive_idle_secs as _));
224
225        let session_mgr = Arc::new(SessionManagerImpl::new(opts).await.unwrap());
226        SESSION_MANAGER.get_or_init(|| session_mgr.clone());
227        let redact_sql_option_keywords = Arc::new(
228            session_mgr
229                .env()
230                .batch_config()
231                .redact_sql_option_keywords
232                .iter()
233                .map(|s| s.to_lowercase())
234                .collect::<HashSet<_>>(),
235        );
236        let frontend_config = &session_mgr.env().frontend_config();
237        let message_memory_manager = Arc::new(MessageMemoryManager::new(
238            frontend_config.max_total_query_size_bytes,
239            frontend_config.min_single_query_size_bytes,
240            frontend_config.max_single_query_size_bytes,
241        ));
242
243        let webhook_service = crate::webhook::WebhookService::new(webhook_listen_addr);
244        let _task = tokio::spawn(webhook_service.serve());
245        pg_serve(
246            &listen_addr,
247            tcp_keepalive,
248            session_mgr.clone(),
249            ConnectionContext {
250                tls_config: TlsConfig::new_default(),
251                redact_sql_option_keywords: Some(redact_sql_option_keywords),
252                message_memory_manager,
253            },
254            shutdown,
255        )
256        .await
257        .unwrap()
258    })
259}
260
/// Default for [`FrontendOpts::frontend_total_memory_bytes`], used when neither the
/// command-line flag nor `RW_FRONTEND_TOTAL_MEMORY_BYTES` supplies a value.
///
/// Returns whatever `system_memory_available_bytes()` reports, in bytes.
pub fn default_frontend_total_memory_bytes() -> usize {
    system_memory_available_bytes()
}