risingwave_frontend/
lib.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(clippy::derive_partial_eq_without_eq)]
16#![feature(map_try_insert)]
17#![feature(negative_impls)]
18#![feature(coroutines)]
19#![feature(proc_macro_hygiene, stmt_expr_attributes)]
20#![feature(trait_alias)]
21#![feature(if_let_guard)]
22#![feature(assert_matches)]
23#![feature(box_patterns)]
24#![feature(macro_metavar_expr)]
25#![feature(min_specialization)]
26#![feature(extend_one)]
27#![feature(type_alias_impl_trait)]
28#![feature(impl_trait_in_assoc_type)]
29#![feature(error_generic_member_access)]
30#![feature(iterator_try_collect)]
31#![feature(used_with_arg)]
32#![feature(try_trait_v2)]
33#![feature(never_type)]
34#![recursion_limit = "256"]
35
// Test builds only: invoke the `enable!()` macros exported by `risingwave_expr_impl` and
// `risingwave_batch_executors`. Presumably these force those crates (and the expression /
// executor implementations they provide) to be linked into the test binary — confirm in the
// respective crates.
#[cfg(test)]
risingwave_expr_impl::enable!();
#[cfg(test)]
risingwave_batch_executors::enable!();
40
41#[macro_use]
42mod catalog;
43
44use std::collections::HashSet;
45use std::time::Duration;
46
47pub use catalog::TableCatalog;
48mod binder;
49pub use binder::{Binder, bind_data_type};
50pub mod expr;
51pub mod handler;
52pub use handler::PgResponseStream;
53mod observer;
54pub mod optimizer;
55pub use optimizer::{Explain, OptimizerContext, OptimizerContextRef, PlanRef};
56mod planner;
57use pgwire::net::TcpKeepalive;
58pub use planner::Planner;
59mod scheduler;
60pub mod session;
61mod stream_fragmenter;
62use risingwave_common::config::{MetricLevel, OverrideConfig};
63use risingwave_common::util::meta_addr::MetaAddressStrategy;
64use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
65use risingwave_common::util::tokio_util::sync::CancellationToken;
66pub use stream_fragmenter::build_graph;
67mod utils;
68pub use utils::data_type::DataTypeToAst;
69pub use utils::{WithOptions, WithOptionsSecResolved, explain_stream_graph};
70pub(crate) mod error;
71mod meta_client;
72pub mod metrics_reader;
73pub use metrics_reader::MetricsReaderImpl;
74
75#[cfg(feature = "datafusion")]
76pub mod datafusion;
77
78pub mod test_utils;
79mod user;
80pub mod webhook;
81
82pub mod health_service;
83mod monitor;
84
85pub mod rpc;
86mod telemetry;
87
88use std::ffi::OsString;
89use std::iter;
90use std::sync::Arc;
91
92use clap::Parser;
93use pgwire::pg_server::pg_serve;
94use session::SessionManagerImpl;
95
/// Command-line arguments for frontend-node.
//
// Implementation notes (plain `//` comments are not picked up by clap):
// - clap's derive uses the `///` doc comments on the fields below as their `--help` text, so
//   those comments are user-facing; edit them with that in mind.
// - Each field can also be supplied through the environment variable named in its `env = ...`
//   attribute.
// - Fields tagged `#[override_opts(path = ...)]` are handled by the `OverrideConfig` derive;
//   presumably a `Some` value overrides that entry of the loaded configuration — confirm in
//   `risingwave_common::config`.
#[derive(Parser, Clone, Debug, OverrideConfig)]
#[command(
    version,
    about = "The stateless proxy that parses SQL queries and performs planning and optimizations of query jobs"
)]
pub struct FrontendOpts {
    // TODO: separate out the port (the field is already named `listen_addr`).
    /// The address that this service listens to.
    /// Usually the localhost + desired port.
    #[clap(long, env = "RW_LISTEN_ADDR", default_value = "0.0.0.0:4566")]
    pub listen_addr: String,

    /// The amount of time with no network activity after which the server will send a
    /// TCP keepalive message to the client.
    // Unit: seconds — `start` converts it with `Duration::from_secs`.
    #[clap(long, env = "RW_TCP_KEEPALIVE_IDLE_SECS", default_value = "300")]
    pub tcp_keepalive_idle_secs: usize,

    /// The address for contacting this instance of the service.
    /// This would be synonymous with the service's "public address"
    /// or "identifying address".
    /// Optional, we will use `listen_addr` if not specified.
    #[clap(long, env = "RW_ADVERTISE_ADDR")]
    pub advertise_addr: Option<String>,

    /// The address via which we will attempt to connect to a leader meta node.
    #[clap(long, env = "RW_META_ADDR", default_value = "http://127.0.0.1:5690")]
    pub meta_addr: MetaAddressStrategy,

    /// We will start a http server at this address via `MetricsManager`.
    /// Then the prometheus instance will poll the metrics from this address.
    #[clap(
        long,
        env = "RW_PROMETHEUS_LISTENER_ADDR",
        default_value = "127.0.0.1:2222"
    )]
    pub prometheus_listener_addr: String,

    // Presumably the listen address of the frontend RPC service (inferred from the field name);
    // it has no `///` help text yet. The `health-check-listener-addr` alias and the
    // `RW_HEALTH_CHECK_LISTENER_ADDR` env var look like legacy names kept for backward
    // compatibility — confirm before renaming or dropping either.
    #[clap(
        long,
        alias = "health-check-listener-addr",
        env = "RW_HEALTH_CHECK_LISTENER_ADDR",
        default_value = "0.0.0.0:6786"
    )]
    pub frontend_rpc_listener_addr: String,

    /// The path of `risingwave.toml` configuration file.
    ///
    /// If empty, default configuration values will be used.
    ///
    /// Note that internal system parameters should be defined in the configuration file at
    /// [`risingwave_common::config`] instead of command line arguments.
    #[clap(long, env = "RW_CONFIG_PATH", default_value = "")]
    pub config_path: String,

    /// Used for control the metrics level, similar to log level.
    ///
    /// level = 0: disable metrics
    /// level > 0: enable metrics
    // Hidden from `--help`; mapped onto `server.metrics_level` via `#[override_opts]`.
    #[clap(long, hide = true, env = "RW_METRICS_LEVEL")]
    #[override_opts(path = server.metrics_level)]
    pub metrics_level: Option<MetricLevel>,

    /// Enable heap profile dump when memory usage is high.
    // Hidden from `--help`; mapped onto `server.heap_profiling.dir` via `#[override_opts]`.
    #[clap(long, hide = true, env = "RW_HEAP_PROFILING_DIR")]
    #[override_opts(path = server.heap_profiling.dir)]
    pub heap_profiling_dir: Option<String>,

    // Hidden and without help text; mapped onto `batch.enable_barrier_read` via
    // `#[override_opts]`.
    // NOTE(review): unlike every other field, the env var has no `RW_` prefix.
    #[clap(long, hide = true, env = "ENABLE_BARRIER_READ")]
    #[override_opts(path = batch.enable_barrier_read)]
    pub enable_barrier_read: Option<bool>,

    /// The path of the temp secret file directory.
    // The default is a relative path, so it resolves against the process's working directory.
    #[clap(
        long,
        hide = true,
        env = "RW_TEMP_SECRET_FILE_DIR",
        default_value = "./secrets"
    )]
    pub temp_secret_file_dir: String,

    /// Total available memory for the frontend node in bytes. Used for batch computing.
    // The default is not a constant: it is `default_frontend_total_memory_bytes()`, i.e. the
    // memory that `system_memory_available_bytes` reports for this machine.
    #[clap(long, env = "RW_FRONTEND_TOTAL_MEMORY_BYTES", default_value_t = default_frontend_total_memory_bytes())]
    pub frontend_total_memory_bytes: usize,

    /// The address that the webhook service listens to.
    /// Usually the localhost + desired port.
    // Must be parseable as an address: `start` calls `.parse()` on it and panics on failure.
    #[clap(long, env = "RW_WEBHOOK_LISTEN_ADDR", default_value = "0.0.0.0:4560")]
    pub webhook_listen_addr: String,

    /// Address of the serverless backfill controller.
    /// Needed if frontend receives a query like
    /// CREATE MATERIALIZED VIEW ... WITH ( `cloud.serverless_backfill_enabled=true` )
    /// Feature disabled by default.
    // The empty-string default means no controller address is configured.
    #[clap(long, env = "RW_SBC_ADDR", default_value = "")]
    pub serverless_backfill_controller_addr: String,

    /// Prometheus endpoint URL for querying metrics.
    /// Optional, used for querying Prometheus metrics from the frontend.
    #[clap(long, env = "RW_PROMETHEUS_ENDPOINT")]
    pub prometheus_endpoint: Option<String>,

    /// The additional selector used when querying Prometheus.
    ///
    /// The format is same as `PromQL`. Example: `instance="foo",namespace="bar"`
    #[clap(long, env = "RW_PROMETHEUS_SELECTOR")]
    pub prometheus_selector: Option<String>,
}
204
205impl risingwave_common::opts::Opts for FrontendOpts {
206    fn name() -> &'static str {
207        "frontend"
208    }
209
210    fn meta_addr(&self) -> MetaAddressStrategy {
211        self.meta_addr.clone()
212    }
213}
214
215impl Default for FrontendOpts {
216    fn default() -> Self {
217        FrontendOpts::parse_from(iter::empty::<OsString>())
218    }
219}
220
221use std::future::Future;
222use std::pin::Pin;
223
224use pgwire::memory_manager::MessageMemoryManager;
225use pgwire::pg_protocol::{ConnectionContext, TlsConfig};
226
227use crate::session::SESSION_MANAGER;
228
229/// Start frontend
230pub fn start(
231    opts: FrontendOpts,
232    shutdown: CancellationToken,
233) -> Pin<Box<dyn Future<Output = ()> + Send>> {
234    // WARNING: don't change the function signature. Making it `async fn` will cause
235    // slow compile in release mode.
236    Box::pin(async move {
237        let listen_addr = opts.listen_addr.clone();
238        let webhook_listen_addr = opts.webhook_listen_addr.parse().unwrap();
239        let tcp_keepalive =
240            TcpKeepalive::new().with_time(Duration::from_secs(opts.tcp_keepalive_idle_secs as _));
241
242        let session_mgr = Arc::new(SessionManagerImpl::new(opts).await.unwrap());
243        SESSION_MANAGER.get_or_init(|| session_mgr.clone());
244        let redact_sql_option_keywords = Arc::new(
245            session_mgr
246                .env()
247                .batch_config()
248                .redact_sql_option_keywords
249                .iter()
250                .map(|s| s.to_lowercase())
251                .collect::<HashSet<_>>(),
252        );
253        let frontend_config = &session_mgr.env().frontend_config();
254        let message_memory_manager = Arc::new(MessageMemoryManager::new(
255            frontend_config.max_total_query_size_bytes,
256            frontend_config.min_single_query_size_bytes,
257            frontend_config.max_single_query_size_bytes,
258        ));
259
260        let webhook_service = crate::webhook::WebhookService::new(webhook_listen_addr);
261        let _task = tokio::spawn(webhook_service.serve());
262        pg_serve(
263            &listen_addr,
264            tcp_keepalive,
265            session_mgr.clone(),
266            ConnectionContext {
267                tls_config: TlsConfig::new_default(),
268                redact_sql_option_keywords: Some(redact_sql_option_keywords),
269                message_memory_manager,
270            },
271            shutdown,
272        )
273        .await
274        .unwrap()
275    })
276}
277
278pub fn default_frontend_total_memory_bytes() -> usize {
279    system_memory_available_bytes()
280}