1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
1415#![allow(clippy::derive_partial_eq_without_eq)]
16#![feature(map_try_insert)]
17#![feature(negative_impls)]
18#![feature(coroutines)]
19#![feature(proc_macro_hygiene, stmt_expr_attributes)]
20#![feature(trait_alias)]
21#![feature(if_let_guard)]
22#![feature(let_chains)]
23#![feature(assert_matches)]
24#![feature(box_patterns)]
25#![feature(macro_metavar_expr)]
26#![feature(min_specialization)]
27#![feature(extend_one)]
28#![feature(type_alias_impl_trait)]
29#![feature(impl_trait_in_assoc_type)]
30#![feature(result_flattening)]
31#![feature(error_generic_member_access)]
32#![feature(iterator_try_collect)]
33#![feature(used_with_arg)]
34#![feature(try_trait_v2)]
35#![feature(cell_update)]
36#![recursion_limit = "256"]
3738#[cfg(test)]
39risingwave_expr_impl::enable!();
40#[cfg(test)]
41risingwave_batch_executors::enable!();
4243#[macro_use]
44mod catalog;
4546use std::collections::HashSet;
47use std::time::Duration;
4849pub use catalog::TableCatalog;
50mod binder;
51pub use binder::{Binder, bind_data_type};
52pub mod expr;
53pub mod handler;
54pub use handler::PgResponseStream;
55mod observer;
56pub mod optimizer;
57pub use optimizer::{Explain, OptimizerContext, OptimizerContextRef, PlanRef};
58mod planner;
59use pgwire::net::TcpKeepalive;
60pub use planner::Planner;
61mod scheduler;
62pub mod session;
63mod stream_fragmenter;
64use risingwave_common::config::{MetricLevel, OverrideConfig};
65use risingwave_common::util::meta_addr::MetaAddressStrategy;
66use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
67use risingwave_common::util::tokio_util::sync::CancellationToken;
68pub use stream_fragmenter::build_graph;
69mod utils;
70pub use utils::{WithOptions, WithOptionsSecResolved, explain_stream_graph};
71pub(crate) mod error;
72mod meta_client;
73pub mod test_utils;
74mod user;
75pub mod webhook;
7677pub mod health_service;
78mod monitor;
7980pub mod rpc;
81mod telemetry;
8283use std::ffi::OsString;
84use std::iter;
85use std::sync::Arc;
8687use clap::Parser;
88use pgwire::pg_server::pg_serve;
89use session::SessionManagerImpl;
/// Command-line arguments for frontend-node.
// NOTE: clap turns the `///` comments on the fields below into `--help` text, so
// maintainer-facing notes in this struct are deliberately written as plain `//` comments.
// Each option declaring `env = "..."` can also be supplied through that environment variable.
// NOTE(review): fields tagged `#[override_opts(path = ...)]` presumably override the config
// value at that path when set — confirm against the `OverrideConfig` derive.
#[derive(Parser, Clone, Debug, OverrideConfig)]
#[command(
    version,
    about = "The stateless proxy that parses SQL queries and performs planning and optimizations of query jobs"
)]
pub struct FrontendOpts {
    // TODO: separate out the port (the field is already named `listen_addr`).
    /// The address that this service listens to.
    /// Usually the localhost + desired port.
    #[clap(long, env = "RW_LISTEN_ADDR", default_value = "0.0.0.0:4566")]
    pub listen_addr: String,

    // `start` converts this value into a `Duration` and hands it to `TcpKeepalive::with_time`.
    /// The amount of time with no network activity after which the server will send a
    /// TCP keepalive message to the client.
    #[clap(long, env = "RW_TCP_KEEPALIVE_IDLE_SECS", default_value = "300")]
    pub tcp_keepalive_idle_secs: usize,

    /// The address for contacting this instance of the service.
    /// This would be synonymous with the service's "public address"
    /// or "identifying address".
    /// Optional, we will use `listen_addr` if not specified.
    #[clap(long, env = "RW_ADVERTISE_ADDR")]
    pub advertise_addr: Option<String>,

    /// The address via which we will attempt to connect to a leader meta node.
    #[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")]
    pub meta_addr: MetaAddressStrategy,

    /// We will start a http server at this address via `MetricsManager`.
    /// Then the prometheus instance will poll the metrics from this address.
    #[clap(
        long,
        env = "RW_PROMETHEUS_LISTENER_ADDR",
        default_value = "127.0.0.1:2222"
    )]
    pub prometheus_listener_addr: String,

    // NOTE(review): presumably the address the frontend's RPC server binds to — confirm.
    // The CLI alias and the env var are named after "health check" rather than "frontend rpc";
    // they look like legacy names kept for compatibility — confirm before renaming.
    #[clap(
        long,
        alias = "health-check-listener-addr",
        env = "RW_HEALTH_CHECK_LISTENER_ADDR",
        default_value = "127.0.0.1:6786"
    )]
    pub frontend_rpc_listener_addr: String,

    /// The path of `risingwave.toml` configuration file.
    ///
    /// If empty, default configuration values will be used.
    ///
    /// Note that internal system parameters should be defined in the configuration file at
    /// [`risingwave_common::config`] instead of command line arguments.
    #[clap(long, env = "RW_CONFIG_PATH", default_value = "")]
    pub config_path: String,

    // Hidden from `--help`; tied to the `server.metrics_level` config path.
    /// Used for control the metrics level, similar to log level.
    ///
    /// level = 0: disable metrics
    /// level > 0: enable metrics
    #[clap(long, hide = true, env = "RW_METRICS_LEVEL")]
    #[override_opts(path = server.metrics_level)]
    pub metrics_level: Option<MetricLevel>,

    // Hidden from `--help`; tied to the `server.heap_profiling.dir` config path.
    /// Enable heap profile dump when memory usage is high.
    #[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")]
    #[override_opts(path = server.heap_profiling.dir)]
    pub heap_profiling_dir: Option<String>,

    // Hidden from `--help`; tied to the `batch.enable_barrier_read` config path.
    // NOTE(review): the env var is `ENABLE_BARRIER_READ`, without the `RW_` prefix every other
    // option in this struct uses — confirm this is intentional before relying on it.
    #[clap(long, hide = true, env = "ENABLE_BARRIER_READ")]
    #[override_opts(path = batch.enable_barrier_read)]
    pub enable_barrier_read: Option<bool>,

    /// The path of the temp secret file directory.
    #[clap(
        long,
        hide = true,
        env = "RW_TEMP_SECRET_FILE_DIR",
        default_value = "./secrets"
    )]
    pub temp_secret_file_dir: String,

    // The default is computed at parse time by `default_frontend_total_memory_bytes()`.
    /// Total available memory for the frontend node in bytes. Used for batch computing.
    #[clap(long, env = "RW_FRONTEND_TOTAL_MEMORY_BYTES", default_value_t = default_frontend_total_memory_bytes())]
    pub frontend_total_memory_bytes: usize,

    // Parsed in `start` before the webhook service is created; an unparsable value makes
    // `start` panic.
    /// The address that the webhook service listens to.
    /// Usually the localhost + desired port.
    #[clap(long, env = "RW_WEBHOOK_LISTEN_ADDR", default_value = "0.0.0.0:4560")]
    pub webhook_listen_addr: String,

    // Defaults to the empty string.
    /// Address of the serverless backfill controller.
    /// Needed if frontend receives a query like
    /// CREATE MATERIALIZED VIEW ... WITH ( `cloud.serverless_backfill_enabled=true` )
    /// Feature disabled by default.
    #[clap(long, env = "RW_SBC_ADDR", default_value = "")]
    pub serverless_backfill_controller_addr: String,
}
188189impl risingwave_common::opts::Opts for FrontendOpts {
190fn name() -> &'static str {
191"frontend"
192}
193194fn meta_addr(&self) -> MetaAddressStrategy {
195self.meta_addr.clone()
196 }
197}
198199impl Default for FrontendOpts {
200fn default() -> Self {
201 FrontendOpts::parse_from(iter::empty::<OsString>())
202 }
203}
204205use std::future::Future;
206use std::pin::Pin;
207208use pgwire::memory_manager::MessageMemoryManager;
209use pgwire::pg_protocol::{ConnectionContext, TlsConfig};
210211use crate::session::SESSION_MANAGER;
212213/// Start frontend
214pub fn start(
215 opts: FrontendOpts,
216 shutdown: CancellationToken,
217) -> Pin<Box<dyn Future<Output = ()> + Send>> {
218// WARNING: don't change the function signature. Making it `async fn` will cause
219 // slow compile in release mode.
220Box::pin(async move {
221let listen_addr = opts.listen_addr.clone();
222let webhook_listen_addr = opts.webhook_listen_addr.parse().unwrap();
223let tcp_keepalive =
224 TcpKeepalive::new().with_time(Duration::from_secs(opts.tcp_keepalive_idle_secs as _));
225226let session_mgr = Arc::new(SessionManagerImpl::new(opts).await.unwrap());
227 SESSION_MANAGER.get_or_init(|| session_mgr.clone());
228let redact_sql_option_keywords = Arc::new(
229 session_mgr
230 .env()
231 .batch_config()
232 .redact_sql_option_keywords
233 .iter()
234 .map(|s| s.to_lowercase())
235 .collect::<HashSet<_>>(),
236 );
237let frontend_config = &session_mgr.env().frontend_config();
238let message_memory_manager = Arc::new(MessageMemoryManager::new(
239 frontend_config.max_total_query_size_bytes,
240 frontend_config.min_single_query_size_bytes,
241 frontend_config.max_single_query_size_bytes,
242 ));
243244let webhook_service = crate::webhook::WebhookService::new(webhook_listen_addr);
245let _task = tokio::spawn(webhook_service.serve());
246 pg_serve(
247&listen_addr,
248 tcp_keepalive,
249 session_mgr.clone(),
250 ConnectionContext {
251 tls_config: TlsConfig::new_default(),
252 redact_sql_option_keywords: Some(redact_sql_option_keywords),
253 message_memory_manager,
254 },
255 shutdown,
256 )
257 .await
258.unwrap()
259 })
260}
/// Default value for `FrontendOpts::frontend_total_memory_bytes`.
///
/// Returns whatever `system_memory_available_bytes()` reports at the time of the call.
pub fn default_frontend_total_memory_bytes() -> usize {
    system_memory_available_bytes()
}