1use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::io::{Error, ErrorKind};
18use std::net::{IpAddr, Ipv4Addr, SocketAddr};
19use std::sync::atomic::{AtomicI32, Ordering};
20use std::sync::{Arc, Weak};
21use std::time::{Duration, Instant};
22
23use anyhow::anyhow;
24use bytes::Bytes;
25use either::Either;
26use itertools::Itertools;
27use parking_lot::{Mutex, RwLock, RwLockReadGuard};
28use pgwire::error::{PsqlError, PsqlResult};
29use pgwire::net::{Address, AddressRef};
30use pgwire::pg_field_descriptor::PgFieldDescriptor;
31use pgwire::pg_message::TransactionStatus;
32use pgwire::pg_response::{PgResponse, StatementType};
33use pgwire::pg_server::{
34 BoxedError, ExecContext, ExecContextGuard, Session, SessionId, SessionManager,
35 UserAuthenticator,
36};
37use pgwire::types::{Format, FormatIterator};
38use prometheus_http_query::Client as PrometheusClient;
39use rand::RngCore;
40use risingwave_batch::monitor::{BatchSpillMetrics, GLOBAL_BATCH_SPILL_METRICS};
41use risingwave_batch::spill::spill_op::SpillOp;
42use risingwave_batch::task::{ShutdownSender, ShutdownToken};
43use risingwave_batch::worker_manager::worker_node_manager::{
44 WorkerNodeManager, WorkerNodeManagerRef,
45};
46use risingwave_common::acl::AclMode;
47#[cfg(test)]
48use risingwave_common::catalog::{
49 DEFAULT_DATABASE_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_ID,
50};
51use risingwave_common::config::{
52 AuthMethod, BatchConfig, ConnectionType, FrontendConfig, MetaConfig, MetricLevel,
53 RpcClientConfig, StreamingConfig, UdfConfig, load_config,
54};
55use risingwave_common::id::WorkerId;
56use risingwave_common::memory::MemoryContext;
57use risingwave_common::secret::LocalSecretManager;
58use risingwave_common::session_config::{ConfigReporter, SessionConfig, VisibilityMode};
59use risingwave_common::system_param::local_manager::{
60 LocalSystemParamsManager, LocalSystemParamsManagerRef,
61};
62use risingwave_common::telemetry::manager::TelemetryManager;
63use risingwave_common::telemetry::telemetry_env_enabled;
64use risingwave_common::types::DataType;
65use risingwave_common::util::addr::HostAddr;
66use risingwave_common::util::cluster_limit;
67use risingwave_common::util::cluster_limit::ActorCountPerParallelism;
68use risingwave_common::util::iter_util::ZipEqFast;
69use risingwave_common::util::pretty_bytes::convert;
70use risingwave_common::util::runtime::BackgroundShutdownRuntime;
71use risingwave_common::{GIT_SHA, RW_VERSION};
72use risingwave_common_heap_profiling::HeapProfiler;
73use risingwave_common_service::{MetricsManager, ObserverManager};
74use risingwave_connector::source::monitor::{GLOBAL_SOURCE_METRICS, SourceMetrics};
75use risingwave_pb::common::WorkerType;
76use risingwave_pb::common::worker_node::Property as AddWorkerNodeProperty;
77use risingwave_pb::frontend_service::frontend_service_server::FrontendServiceServer;
78use risingwave_pb::health::health_server::HealthServer;
79use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
80use risingwave_pb::user::auth_info::EncryptionType;
81use risingwave_rpc_client::{
82 ComputeClientPool, ComputeClientPoolRef, FrontendClientPool, FrontendClientPoolRef, MetaClient,
83 MonitorClientPool, MonitorClientPoolRef,
84};
85use risingwave_sqlparser::ast::{ObjectName, Statement};
86use risingwave_sqlparser::parser::Parser;
87use thiserror::Error;
88use thiserror_ext::AsReport;
89use tokio::runtime::Builder;
90use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
91use tokio::sync::oneshot::Sender;
92use tokio::sync::watch;
93use tokio::task::JoinHandle;
94use tracing::{error, info};
95
96use self::cursor_manager::CursorManager;
97use crate::binder::{Binder, BoundStatement, ResolveQualifiedNameError};
98use crate::catalog::catalog_service::{CatalogReader, CatalogWriter, CatalogWriterImpl};
99use crate::catalog::connection_catalog::ConnectionCatalog;
100use crate::catalog::root_catalog::{Catalog, SchemaPath};
101use crate::catalog::secret_catalog::SecretCatalog;
102use crate::catalog::source_catalog::SourceCatalog;
103use crate::catalog::subscription_catalog::SubscriptionCatalog;
104use crate::catalog::{
105 CatalogError, CatalogErrorInner, DatabaseId, OwnedByUserCatalog, SchemaId, TableId,
106 check_schema_writable,
107};
108use crate::error::{
109 ErrorCode, Result, RwError, bail_catalog_error, bail_permission_denied, bail_protocol_error,
110};
111use crate::handler::describe::infer_describe;
112use crate::handler::extended_handle::{
113 Portal, PrepareStatement, handle_bind, handle_execute, handle_parse,
114};
115use crate::handler::privilege::ObjectCheckItem;
116use crate::handler::show::{infer_show_create_object, infer_show_object};
117use crate::handler::util::to_pg_field;
118use crate::handler::variable::infer_show_variable;
119use crate::handler::{RwPgResponse, handle};
120use crate::health_service::HealthServiceImpl;
121use crate::meta_client::{FrontendMetaClient, FrontendMetaClientImpl};
122use crate::monitor::{CursorMetrics, FrontendMetrics, GLOBAL_FRONTEND_METRICS};
123use crate::observer::FrontendObserverNode;
124use crate::rpc::{FrontendServiceImpl, MonitorServiceImpl};
125use crate::scheduler::streaming_manager::{StreamingJobTracker, StreamingJobTrackerRef};
126use crate::scheduler::{
127 DistributedQueryMetrics, GLOBAL_DISTRIBUTED_QUERY_METRICS, HummockSnapshotManager,
128 HummockSnapshotManagerRef, QueryManager,
129};
130use crate::telemetry::FrontendTelemetryCreator;
131use crate::user::UserId;
132use crate::user::user_authentication::md5_hash_with_salt;
133use crate::user::user_manager::UserInfoManager;
134use crate::user::user_service::{UserInfoReader, UserInfoWriter, UserInfoWriterImpl};
135use crate::{FrontendOpts, PgResponseStream, TableCatalog};
136
137pub(crate) mod current;
138pub(crate) mod cursor_manager;
139pub(crate) mod transaction;
140
/// Environment shared by the frontend node and handed to every session.
///
/// The struct is `Clone`; each `SessionImpl` stores its own copy in its `env`
/// field. A production instance is built by `FrontendEnv::init`, a fully mocked
/// one by `FrontendEnv::mock`.
#[derive(Clone)]
pub(crate) struct FrontendEnv {
    /// Client used to talk to the meta service (a mock in `mock()`).
    meta_client: Arc<dyn FrontendMetaClient>,
    /// Write side of the catalog; only reachable through `catalog_writer`, which
    /// demands a `transaction::WriteGuard`.
    catalog_writer: Arc<dyn CatalogWriter>,
    /// Read side of the catalog.
    catalog_reader: CatalogReader,
    /// Write side of user information; gated by a `transaction::WriteGuard` like
    /// the catalog writer.
    user_info_writer: Arc<dyn UserInfoWriter>,
    /// Read side of user information.
    user_info_reader: UserInfoReader,
    worker_node_manager: WorkerNodeManagerRef,
    query_manager: QueryManager,
    hummock_snapshot_manager: HummockSnapshotManagerRef,
    system_params_manager: LocalSystemParamsManagerRef,
    /// Session parameters shared with the observer node created in `init`;
    /// `session_params_snapshot` returns a clone of the current value.
    session_params: Arc<RwLock<SessionConfig>>,

    /// Address this frontend advertises (`advertise_addr`, falling back to
    /// `listen_addr` in `init`).
    server_addr: HostAddr,
    /// Pool of compute-node clients.
    client_pool: ComputeClientPoolRef,
    frontend_client_pool: FrontendClientPoolRef,
    monitor_client_pool: MonitorClientPoolRef,

    /// All sessions known to this frontend, keyed by session id (see
    /// `SessionMapRef`).
    sessions_map: SessionMapRef,

    pub frontend_metrics: Arc<FrontendMetrics>,

    pub cursor_metrics: Arc<CursorMetrics>,

    source_metrics: Arc<SourceMetrics>,

    spill_metrics: Arc<BatchSpillMetrics>,

    batch_config: BatchConfig,
    frontend_config: FrontendConfig,
    // Stored but never read in this file, hence the `expect(dead_code)`.
    #[expect(dead_code)]
    meta_config: MetaConfig,
    streaming_config: StreamingConfig,
    udf_config: UdfConfig,

    /// Tracker for streaming jobs being created.
    /// NOTE(review): presumably what `cancel_creating_jobs_in_session` relies on — confirm.
    creating_streaming_job_tracker: StreamingJobTrackerRef,

    /// Dedicated multi-thread tokio runtime whose threads are named
    /// `rw-batch-local`.
    compute_runtime: Arc<BackgroundShutdownRuntime>,

    /// Memory context for the frontend. In `init` it is a root context limited to
    /// `FRONTEND_BATCH_MEMORY_PROPORTION` of the total frontend memory; in `mock`
    /// it is `MemoryContext::none()`.
    mem_context: MemoryContext,

    /// Address of the serverless backfill controller, taken from the startup options.
    serverless_backfill_controller_addr: String,

    /// Prometheus client, present only when an endpoint was configured and the
    /// client could be constructed.
    prometheus_client: Option<PrometheusClient>,

    /// Prometheus selector from the startup options (empty when not provided).
    prometheus_selector: String,
}
203
/// Shared, lock-protected map from a session id (a pair of `i32`s) to its session.
pub type SessionMapRef = Arc<RwLock<HashMap<(i32, i32), Arc<SessionImpl>>>>;
206
/// Fraction of the frontend's total memory that `FrontendEnv::init` uses as the
/// limit of the root batch memory context.
const FRONTEND_BATCH_MEMORY_PROPORTION: f64 = 0.5;
209
210impl FrontendEnv {
211 pub fn mock() -> Self {
212 use crate::test_utils::{MockCatalogWriter, MockFrontendMetaClient, MockUserInfoWriter};
213
214 let catalog = Arc::new(RwLock::new(Catalog::default()));
215 let meta_client = Arc::new(MockFrontendMetaClient {});
216 let hummock_snapshot_manager = Arc::new(HummockSnapshotManager::new(meta_client.clone()));
217 let catalog_writer = Arc::new(MockCatalogWriter::new(
218 catalog.clone(),
219 hummock_snapshot_manager.clone(),
220 ));
221 let catalog_reader = CatalogReader::new(catalog);
222 let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
223 let user_info_writer = Arc::new(MockUserInfoWriter::new(user_info_manager.clone()));
224 let user_info_reader = UserInfoReader::new(user_info_manager);
225 let worker_node_manager = Arc::new(WorkerNodeManager::mock(vec![]));
226 let system_params_manager = Arc::new(LocalSystemParamsManager::for_test());
227 let compute_client_pool = Arc::new(ComputeClientPool::for_test());
228 let frontend_client_pool = Arc::new(FrontendClientPool::for_test());
229 let monitor_client_pool = Arc::new(MonitorClientPool::for_test());
230 let query_manager = QueryManager::new(
231 worker_node_manager.clone(),
232 compute_client_pool,
233 catalog_reader.clone(),
234 Arc::new(DistributedQueryMetrics::for_test()),
235 None,
236 None,
237 );
238 let server_addr = HostAddr::try_from("127.0.0.1:4565").unwrap();
239 let client_pool = Arc::new(ComputeClientPool::for_test());
240 let creating_streaming_tracker = StreamingJobTracker::new(meta_client.clone());
241 let runtime = {
242 let mut builder = Builder::new_multi_thread();
243 if let Some(frontend_compute_runtime_worker_threads) =
244 load_config("", FrontendOpts::default())
245 .batch
246 .frontend_compute_runtime_worker_threads
247 {
248 builder.worker_threads(frontend_compute_runtime_worker_threads);
249 }
250 builder
251 .thread_name("rw-batch-local")
252 .enable_all()
253 .build()
254 .unwrap()
255 };
256 let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(runtime));
257 let sessions_map = Arc::new(RwLock::new(HashMap::new()));
258 Self {
259 meta_client,
260 catalog_writer,
261 catalog_reader,
262 user_info_writer,
263 user_info_reader,
264 worker_node_manager,
265 query_manager,
266 hummock_snapshot_manager,
267 system_params_manager,
268 session_params: Default::default(),
269 server_addr,
270 client_pool,
271 frontend_client_pool,
272 monitor_client_pool,
273 sessions_map,
274 frontend_metrics: Arc::new(FrontendMetrics::for_test()),
275 cursor_metrics: Arc::new(CursorMetrics::for_test()),
276 batch_config: BatchConfig::default(),
277 frontend_config: FrontendConfig::default(),
278 meta_config: MetaConfig::default(),
279 streaming_config: StreamingConfig::default(),
280 udf_config: UdfConfig::default(),
281 source_metrics: Arc::new(SourceMetrics::default()),
282 spill_metrics: BatchSpillMetrics::for_test(),
283 creating_streaming_job_tracker: Arc::new(creating_streaming_tracker),
284 compute_runtime,
285 mem_context: MemoryContext::none(),
286 serverless_backfill_controller_addr: Default::default(),
287 prometheus_client: None,
288 prometheus_selector: String::new(),
289 }
290 }
291
    /// Boots a frontend node from `opts` and returns its environment.
    ///
    /// In order, this loads the configuration, registers the node with meta as a
    /// `WorkerType::Frontend` worker, starts the heartbeat loop, builds the
    /// catalog / user-info / query machinery, starts the observer, activates the
    /// worker, and then brings up metrics, telemetry, the gRPC services, the
    /// compute runtime and the idle-in-transaction monitor.
    ///
    /// # Returns
    ///
    /// The environment, the join handles of the background tasks tracked here
    /// (heartbeat, observer, optionally telemetry, idle-transaction monitor) and
    /// the shutdown senders for heartbeat and, when started, telemetry. The gRPC
    /// server task is spawned detached and is not part of the returned handles.
    ///
    /// # Errors
    ///
    /// Returns an error if activating the worker on meta fails or, when spilling
    /// is enabled (non-madsim builds only), if cleaning the spill directory fails.
    ///
    /// # Panics
    ///
    /// Panics if the advertise/listen address or the frontend RPC listener
    /// address cannot be parsed, or if the compute runtime cannot be built.
    pub async fn init(opts: FrontendOpts) -> Result<(Self, Vec<JoinHandle<()>>, Vec<Sender<()>>)> {
        let config = load_config(&opts.config_path, &opts);
        info!("Starting frontend node");
        info!("> config: {:?}", config);
        info!(
            "> debug assertions: {}",
            if cfg!(debug_assertions) { "on" } else { "off" }
        );
        info!("> version: {} ({})", RW_VERSION, GIT_SHA);

        // Prefer the explicitly advertised address; otherwise fall back to the listen address.
        let frontend_address: HostAddr = opts
            .advertise_addr
            .as_ref()
            .unwrap_or_else(|| {
                tracing::warn!("advertise addr is not specified, defaulting to listen_addr");
                &opts.listen_addr
            })
            .parse()
            .unwrap();
        info!("advertise addr is {}", frontend_address);

        // The internal RPC address combines the advertised host with the port of
        // the RPC listener.
        let rpc_addr: HostAddr = opts.frontend_rpc_listener_addr.parse().unwrap();
        let internal_rpc_host_addr = HostAddr {
            host: frontend_address.host.clone(),
            port: rpc_addr.port,
        };
        // Register this node with meta as a frontend worker.
        let (meta_client, system_params_reader) = MetaClient::register_new(
            opts.meta_addr,
            WorkerType::Frontend,
            &frontend_address,
            AddWorkerNodeProperty {
                internal_rpc_host_addr: internal_rpc_host_addr.to_string(),
                ..Default::default()
            },
            Arc::new(config.meta.clone()),
        )
        .await;

        let worker_id = meta_client.worker_id();
        info!("Assigned worker node id {}", worker_id);

        let (heartbeat_join_handle, heartbeat_shutdown_sender) = MetaClient::start_heartbeat_loop(
            meta_client.clone(),
            Duration::from_millis(config.server.heartbeat_interval_ms as u64),
        );
        let mut join_handles = vec![heartbeat_join_handle];
        let mut shutdown_senders = vec![heartbeat_shutdown_sender];

        let frontend_meta_client = Arc::new(FrontendMetaClientImpl(meta_client.clone()));
        let hummock_snapshot_manager =
            Arc::new(HummockSnapshotManager::new(frontend_meta_client.clone()));

        // The watch channel is written by the observer node (sender) and read by
        // the catalog and user-info writers (receivers).
        let (catalog_updated_tx, catalog_updated_rx) = watch::channel(0);
        let catalog = Arc::new(RwLock::new(Catalog::default()));
        let catalog_writer = Arc::new(CatalogWriterImpl::new(
            meta_client.clone(),
            catalog_updated_rx.clone(),
            hummock_snapshot_manager.clone(),
        ));
        let catalog_reader = CatalogReader::new(catalog.clone());

        let worker_node_manager = Arc::new(WorkerNodeManager::new());

        let compute_client_pool = Arc::new(ComputeClientPool::new(
            config.batch_exchange_connection_pool_size(),
            config.batch.developer.compute_client_config.clone(),
        ));
        let frontend_client_pool = Arc::new(FrontendClientPool::new(
            1,
            config.batch.developer.frontend_client_config.clone(),
        ));
        let monitor_client_pool = Arc::new(MonitorClientPool::new(1, RpcClientConfig::default()));
        let query_manager = QueryManager::new(
            worker_node_manager.clone(),
            compute_client_pool.clone(),
            catalog_reader.clone(),
            Arc::new(GLOBAL_DISTRIBUTED_QUERY_METRICS.clone()),
            config.batch.distributed_query_limit,
            config.batch.max_batch_queries_per_frontend_node,
        );

        let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
        let user_info_reader = UserInfoReader::new(user_info_manager.clone());
        let user_info_writer = Arc::new(UserInfoWriterImpl::new(
            meta_client.clone(),
            catalog_updated_rx,
        ));

        let system_params_manager =
            Arc::new(LocalSystemParamsManager::new(system_params_reader.clone()));

        LocalSecretManager::init(
            opts.temp_secret_file_dir,
            meta_client.cluster_id().to_owned(),
            worker_id,
        );

        let session_params = Arc::new(RwLock::new(SessionConfig::default()));
        // The session map is shared with the cursor metrics, the frontend RPC
        // service and the idle-transaction monitor below.
        let sessions_map: SessionMapRef = Arc::new(RwLock::new(HashMap::new()));
        let cursor_metrics = Arc::new(CursorMetrics::init(sessions_map.clone()));

        let frontend_observer_node = FrontendObserverNode::new(
            worker_node_manager.clone(),
            catalog,
            catalog_updated_tx,
            user_info_manager,
            hummock_snapshot_manager.clone(),
            system_params_manager.clone(),
            session_params.clone(),
            compute_client_pool.clone(),
        );
        let observer_manager =
            ObserverManager::new_with_meta_client(meta_client.clone(), frontend_observer_node)
                .await;
        let observer_join_handle = observer_manager.start().await;
        join_handles.push(observer_join_handle);

        // Activation happens only after the observer has been started.
        meta_client.activate(&frontend_address).await?;

        let frontend_metrics = Arc::new(GLOBAL_FRONTEND_METRICS.clone());
        let source_metrics = Arc::new(GLOBAL_SOURCE_METRICS.clone());
        let spill_metrics = Arc::new(GLOBAL_BATCH_SPILL_METRICS.clone());

        if config.server.metrics_level > MetricLevel::Disabled {
            MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
        }

        let health_srv = HealthServiceImpl::new();
        let frontend_srv = FrontendServiceImpl::new(sessions_map.clone());
        let monitor_srv = MonitorServiceImpl::new(config.server.clone());
        let frontend_rpc_addr = opts.frontend_rpc_listener_addr.parse().unwrap();

        let telemetry_manager = TelemetryManager::new(
            Arc::new(meta_client.clone()),
            Arc::new(FrontendTelemetryCreator::new()),
        );

        // Telemetry needs both the config flag and the environment switch.
        if config.server.telemetry_enabled && telemetry_env_enabled() {
            let (join_handle, shutdown_sender) = telemetry_manager.start().await;
            join_handles.push(join_handle);
            shutdown_senders.push(shutdown_sender);
        } else {
            tracing::info!("Telemetry didn't start due to config");
        }

        // Serve the health, frontend and monitor gRPC services on a detached
        // task; a serving failure panics inside that task.
        tokio::spawn(async move {
            tonic::transport::Server::builder()
                .add_service(HealthServer::new(health_srv))
                .add_service(FrontendServiceServer::new(frontend_srv))
                .add_service(MonitorServiceServer::new(monitor_srv))
                .serve(frontend_rpc_addr)
                .await
                .unwrap();
        });
        info!(
            "Health Check RPC Listener is set up on {}",
            opts.frontend_rpc_listener_addr.clone()
        );

        let creating_streaming_job_tracker =
            Arc::new(StreamingJobTracker::new(frontend_meta_client.clone()));

        // Dedicated runtime; the worker thread count is only overridden when configured.
        let runtime = {
            let mut builder = Builder::new_multi_thread();
            if let Some(frontend_compute_runtime_worker_threads) =
                config.batch.frontend_compute_runtime_worker_threads
            {
                builder.worker_threads(frontend_compute_runtime_worker_threads);
            }
            builder
                .thread_name("rw-batch-local")
                .enable_all()
                .build()
                .unwrap()
        };
        let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(runtime));

        let sessions = sessions_map.clone();
        // Background monitor: every 10 seconds, ask each session to check its
        // idle-in-transaction timeout. The per-session result is ignored.
        let join_handle = tokio::spawn(async move {
            let mut check_idle_txn_interval =
                tokio::time::interval(core::time::Duration::from_secs(10));
            check_idle_txn_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
            check_idle_txn_interval.reset();
            loop {
                check_idle_txn_interval.tick().await;
                sessions.read().values().for_each(|session| {
                    let _ = session.check_idle_in_transaction_timeout();
                })
            }
        });
        join_handles.push(join_handle);

        // Spill directory cleanup is compiled out under madsim.
        #[cfg(not(madsim))]
        if config.batch.enable_spill {
            SpillOp::clean_spill_directory()
                .await
                .map_err(|err| anyhow!(err))?;
        }

        let total_memory_bytes = opts.frontend_total_memory_bytes;
        let heap_profiler =
            HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
        heap_profiler.start();

        // Batch execution may use a fixed proportion of the total frontend memory.
        let batch_memory_limit = total_memory_bytes as f64 * FRONTEND_BATCH_MEMORY_PROPORTION;
        let mem_context = MemoryContext::root(
            frontend_metrics.batch_total_mem.clone(),
            batch_memory_limit as u64,
        );

        // A Prometheus client is optional: a missing endpoint or a construction
        // failure both result in `None` (the failure is logged, not returned).
        let prometheus_client = if let Some(ref endpoint) = opts.prometheus_endpoint {
            match PrometheusClient::try_from(endpoint.as_str()) {
                Ok(client) => {
                    info!("Prometheus client initialized with endpoint: {}", endpoint);
                    Some(client)
                }
                Err(e) => {
                    error!(
                        error = %e.as_report(),
                        "failed to initialize Prometheus client",
                    );
                    None
                }
            }
        } else {
            None
        };

        let prometheus_selector = opts.prometheus_selector.unwrap_or_default();

        info!(
            "Frontend total_memory: {} batch_memory: {}",
            convert(total_memory_bytes as _),
            convert(batch_memory_limit as _),
        );

        Ok((
            Self {
                catalog_reader,
                catalog_writer,
                user_info_reader,
                user_info_writer,
                worker_node_manager,
                meta_client: frontend_meta_client,
                query_manager,
                hummock_snapshot_manager,
                system_params_manager,
                session_params,
                server_addr: frontend_address,
                client_pool: compute_client_pool,
                frontend_client_pool,
                monitor_client_pool,
                frontend_metrics,
                cursor_metrics,
                spill_metrics,
                sessions_map,
                batch_config: config.batch,
                frontend_config: config.frontend,
                meta_config: config.meta,
                streaming_config: config.streaming,
                serverless_backfill_controller_addr: opts.serverless_backfill_controller_addr,
                udf_config: config.udf,
                source_metrics,
                creating_streaming_job_tracker,
                compute_runtime,
                mem_context,
                prometheus_client,
                prometheus_selector,
            },
            join_handles,
            shutdown_senders,
        ))
    }
575
576 fn catalog_writer(&self, _guard: transaction::WriteGuard) -> &dyn CatalogWriter {
581 &*self.catalog_writer
582 }
583
584 pub fn catalog_reader(&self) -> &CatalogReader {
586 &self.catalog_reader
587 }
588
589 fn user_info_writer(&self, _guard: transaction::WriteGuard) -> &dyn UserInfoWriter {
594 &*self.user_info_writer
595 }
596
597 pub fn user_info_reader(&self) -> &UserInfoReader {
599 &self.user_info_reader
600 }
601
602 pub fn worker_node_manager_ref(&self) -> WorkerNodeManagerRef {
603 self.worker_node_manager.clone()
604 }
605
606 pub fn meta_client(&self) -> &dyn FrontendMetaClient {
607 &*self.meta_client
608 }
609
610 pub fn meta_client_ref(&self) -> Arc<dyn FrontendMetaClient> {
611 self.meta_client.clone()
612 }
613
614 pub fn query_manager(&self) -> &QueryManager {
615 &self.query_manager
616 }
617
618 pub fn hummock_snapshot_manager(&self) -> &HummockSnapshotManagerRef {
619 &self.hummock_snapshot_manager
620 }
621
622 pub fn system_params_manager(&self) -> &LocalSystemParamsManagerRef {
623 &self.system_params_manager
624 }
625
626 pub fn session_params_snapshot(&self) -> SessionConfig {
627 self.session_params.read_recursive().clone()
628 }
629
630 pub fn sbc_address(&self) -> &String {
631 &self.serverless_backfill_controller_addr
632 }
633
634 pub fn prometheus_client(&self) -> Option<&PrometheusClient> {
636 self.prometheus_client.as_ref()
637 }
638
639 pub fn prometheus_selector(&self) -> &str {
641 &self.prometheus_selector
642 }
643
644 pub fn server_address(&self) -> &HostAddr {
645 &self.server_addr
646 }
647
648 pub fn client_pool(&self) -> ComputeClientPoolRef {
649 self.client_pool.clone()
650 }
651
652 pub fn frontend_client_pool(&self) -> FrontendClientPoolRef {
653 self.frontend_client_pool.clone()
654 }
655
656 pub fn monitor_client_pool(&self) -> MonitorClientPoolRef {
657 self.monitor_client_pool.clone()
658 }
659
660 pub fn batch_config(&self) -> &BatchConfig {
661 &self.batch_config
662 }
663
664 pub fn frontend_config(&self) -> &FrontendConfig {
665 &self.frontend_config
666 }
667
668 pub fn streaming_config(&self) -> &StreamingConfig {
669 &self.streaming_config
670 }
671
672 pub fn udf_config(&self) -> &UdfConfig {
673 &self.udf_config
674 }
675
676 pub fn source_metrics(&self) -> Arc<SourceMetrics> {
677 self.source_metrics.clone()
678 }
679
680 pub fn spill_metrics(&self) -> Arc<BatchSpillMetrics> {
681 self.spill_metrics.clone()
682 }
683
684 pub fn creating_streaming_job_tracker(&self) -> &StreamingJobTrackerRef {
685 &self.creating_streaming_job_tracker
686 }
687
688 pub fn sessions_map(&self) -> &SessionMapRef {
689 &self.sessions_map
690 }
691
692 pub fn compute_runtime(&self) -> Arc<BackgroundShutdownRuntime> {
693 self.compute_runtime.clone()
694 }
695
696 pub fn cancel_queries_in_session(&self, session_id: SessionId) -> bool {
699 cancel_queries_in_session(session_id, self.sessions_map.clone())
700 }
701
702 pub fn cancel_creating_jobs_in_session(&self, session_id: SessionId) -> bool {
705 cancel_creating_jobs_in_session(session_id, self.sessions_map.clone())
706 }
707
708 pub fn mem_context(&self) -> MemoryContext {
709 self.mem_context.clone()
710 }
711}
712
/// Identity attached to a session: the database it is using and the user it
/// runs as.
#[derive(Clone)]
pub struct AuthContext {
    /// Name of the session's current database.
    pub database: String,
    /// Name of the session's user.
    pub user_name: String,
    /// Id of the session's user.
    pub user_id: UserId,
}
719
720impl AuthContext {
721 pub fn new(database: String, user_name: String, user_id: UserId) -> Self {
722 Self {
723 database,
724 user_name,
725 user_id,
726 }
727 }
728}
/// State of a single frontend session.
pub struct SessionImpl {
    /// The session's copy of the frontend environment.
    env: FrontendEnv,
    /// Database and user of the session; the database can be changed via
    /// `update_database`.
    auth_context: Arc<RwLock<AuthContext>>,
    user_authenticator: UserAuthenticator,
    /// Session-level configuration, modified by `set_config` / `reset_config`.
    config_map: Arc<RwLock<SessionConfig>>,

    /// Sending half of the unbounded notice channel created in `new`.
    notice_tx: UnboundedSender<String>,
    /// Receiving half of the same notice channel.
    notice_rx: Mutex<UnboundedReceiver<String>>,

    /// Session id, as returned by `session_id`.
    id: (i32, i32),

    /// Address of the connected client.
    peer_addr: AddressRef,

    /// Transaction state of the session.
    txn: Arc<Mutex<transaction::State>>,

    /// NOTE(review): presumably the handle used to cancel the running query — confirm.
    current_query_cancel_flag: Mutex<Option<ShutdownSender>>,

    /// Weak handle to the execution context of the statement in flight;
    /// `running_sql` and `elapse_since_running_sql` read through it.
    exec_context: Mutex<Option<Weak<ExecContext>>>,

    /// Instant read by `elapse_since_last_idle_instant`; `None` yields `None` there.
    last_idle_instant: Arc<Mutex<Option<Instant>>>,

    cursor_manager: Arc<CursorManager>,

    temporary_source_manager: Arc<Mutex<TemporarySourceManager>>,

    staging_catalog_manager: Arc<Mutex<StagingCatalogManager>>,
}
772
/// In-memory registry of source catalogs addressed by name.
#[derive(Default, Clone)]
pub struct TemporarySourceManager {
    /// Source catalogs keyed by the name they were registered under.
    sources: HashMap<String, SourceCatalog>,
}
785
786impl TemporarySourceManager {
787 pub fn new() -> Self {
788 Self {
789 sources: HashMap::new(),
790 }
791 }
792
793 pub fn create_source(&mut self, name: String, source: SourceCatalog) {
794 self.sources.insert(name, source);
795 }
796
797 pub fn drop_source(&mut self, name: &str) {
798 self.sources.remove(name);
799 }
800
801 pub fn get_source(&self, name: &str) -> Option<&SourceCatalog> {
802 self.sources.get(name)
803 }
804
805 pub fn keys(&self) -> Vec<String> {
806 self.sources.keys().cloned().collect()
807 }
808}
809
/// In-memory registry of table catalogs addressed by name.
#[derive(Default, Clone)]
pub struct StagingCatalogManager {
    /// Table catalogs keyed by the name they were registered under.
    tables: HashMap<String, TableCatalog>,
}
816
817impl StagingCatalogManager {
818 pub fn new() -> Self {
819 Self {
820 tables: HashMap::new(),
821 }
822 }
823
824 pub fn create_table(&mut self, name: String, table: TableCatalog) {
825 self.tables.insert(name, table);
826 }
827
828 pub fn drop_table(&mut self, name: &str) {
829 self.tables.remove(name);
830 }
831
832 pub fn get_table(&self, name: &str) -> Option<&TableCatalog> {
833 self.tables.get(name)
834 }
835}
836
/// Error type of `SessionImpl::check_relation_name_duplicated`.
///
/// Both variants display exactly as the error they wrap and can be created
/// from it with `?` thanks to `#[from]`.
#[derive(Error, Debug)]
pub enum CheckRelationError {
    /// The object name could not be resolved into schema and relation parts.
    #[error("{0}")]
    Resolve(#[from] ResolveQualifiedNameError),
    /// The catalog reported an error.
    #[error("{0}")]
    Catalog(#[from] CatalogError),
}
844
845impl From<CheckRelationError> for RwError {
846 fn from(e: CheckRelationError) -> Self {
847 match e {
848 CheckRelationError::Resolve(e) => e.into(),
849 CheckRelationError::Catalog(e) => e.into(),
850 }
851 }
852}
853
854impl SessionImpl {
855 pub(crate) fn new(
856 env: FrontendEnv,
857 auth_context: AuthContext,
858 user_authenticator: UserAuthenticator,
859 id: SessionId,
860 peer_addr: AddressRef,
861 session_config: SessionConfig,
862 ) -> Self {
863 let cursor_metrics = env.cursor_metrics.clone();
864 let (notice_tx, notice_rx) = mpsc::unbounded_channel();
865
866 Self {
867 env,
868 auth_context: Arc::new(RwLock::new(auth_context)),
869 user_authenticator,
870 config_map: Arc::new(RwLock::new(session_config)),
871 id,
872 peer_addr,
873 txn: Default::default(),
874 current_query_cancel_flag: Mutex::new(None),
875 notice_tx,
876 notice_rx: Mutex::new(notice_rx),
877 exec_context: Mutex::new(None),
878 last_idle_instant: Default::default(),
879 cursor_manager: Arc::new(CursorManager::new(cursor_metrics)),
880 temporary_source_manager: Default::default(),
881 staging_catalog_manager: Default::default(),
882 }
883 }
884
885 #[cfg(test)]
886 pub fn mock() -> Self {
887 let env = FrontendEnv::mock();
888 let (notice_tx, notice_rx) = mpsc::unbounded_channel();
889
890 Self {
891 env: FrontendEnv::mock(),
892 auth_context: Arc::new(RwLock::new(AuthContext::new(
893 DEFAULT_DATABASE_NAME.to_owned(),
894 DEFAULT_SUPER_USER.to_owned(),
895 DEFAULT_SUPER_USER_ID,
896 ))),
897 user_authenticator: UserAuthenticator::None,
898 config_map: Default::default(),
899 id: (0, 0),
901 txn: Default::default(),
902 current_query_cancel_flag: Mutex::new(None),
903 notice_tx,
904 notice_rx: Mutex::new(notice_rx),
905 exec_context: Mutex::new(None),
906 peer_addr: Address::Tcp(SocketAddr::new(
907 IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
908 8080,
909 ))
910 .into(),
911 last_idle_instant: Default::default(),
912 cursor_manager: Arc::new(CursorManager::new(env.cursor_metrics)),
913 temporary_source_manager: Default::default(),
914 staging_catalog_manager: Default::default(),
915 }
916 }
917
918 pub(crate) fn env(&self) -> &FrontendEnv {
919 &self.env
920 }
921
922 pub fn auth_context(&self) -> Arc<AuthContext> {
923 let ctx = self.auth_context.read();
924 Arc::new(ctx.clone())
925 }
926
927 pub fn database(&self) -> String {
928 self.auth_context.read().database.clone()
929 }
930
931 pub fn database_id(&self) -> DatabaseId {
932 let db_name = self.database();
933 self.env
934 .catalog_reader()
935 .read_guard()
936 .get_database_by_name(&db_name)
937 .map(|db| db.id())
938 .expect("session database not found")
939 }
940
941 pub fn user_name(&self) -> String {
942 self.auth_context.read().user_name.clone()
943 }
944
945 pub fn user_id(&self) -> UserId {
946 self.auth_context.read().user_id
947 }
948
949 pub fn update_database(&self, database: String) {
950 self.auth_context.write().database = database;
951 }
952
953 pub fn shared_config(&self) -> Arc<RwLock<SessionConfig>> {
954 Arc::clone(&self.config_map)
955 }
956
957 pub fn config(&self) -> RwLockReadGuard<'_, SessionConfig> {
958 self.config_map.read()
959 }
960
961 pub fn set_config(&self, key: &str, value: String) -> Result<String> {
962 self.config_map
963 .write()
964 .set(key, value, &mut ())
965 .map_err(Into::into)
966 }
967
968 pub fn reset_config(&self, key: &str) -> Result<String> {
969 self.config_map
970 .write()
971 .reset(key, &mut ())
972 .map_err(Into::into)
973 }
974
975 pub fn set_config_report(
976 &self,
977 key: &str,
978 value: Option<String>,
979 mut reporter: impl ConfigReporter,
980 ) -> Result<String> {
981 if let Some(value) = value {
982 self.config_map
983 .write()
984 .set(key, value, &mut reporter)
985 .map_err(Into::into)
986 } else {
987 self.config_map
988 .write()
989 .reset(key, &mut reporter)
990 .map_err(Into::into)
991 }
992 }
993
994 pub fn session_id(&self) -> SessionId {
995 self.id
996 }
997
998 pub fn running_sql(&self) -> Option<Arc<str>> {
999 self.exec_context
1000 .lock()
1001 .as_ref()
1002 .and_then(|weak| weak.upgrade())
1003 .map(|context| context.running_sql.clone())
1004 }
1005
1006 pub fn get_cursor_manager(&self) -> Arc<CursorManager> {
1007 self.cursor_manager.clone()
1008 }
1009
1010 pub fn peer_addr(&self) -> &Address {
1011 &self.peer_addr
1012 }
1013
1014 pub fn elapse_since_running_sql(&self) -> Option<u128> {
1015 self.exec_context
1016 .lock()
1017 .as_ref()
1018 .and_then(|weak| weak.upgrade())
1019 .map(|context| context.last_instant.elapsed().as_millis())
1020 }
1021
1022 pub fn elapse_since_last_idle_instant(&self) -> Option<u128> {
1023 self.last_idle_instant
1024 .lock()
1025 .as_ref()
1026 .map(|x| x.elapsed().as_millis())
1027 }
1028
1029 pub fn check_relation_name_duplicated(
1030 &self,
1031 name: ObjectName,
1032 stmt_type: StatementType,
1033 if_not_exists: bool,
1034 ) -> std::result::Result<Either<(), RwPgResponse>, CheckRelationError> {
1035 let db_name = &self.database();
1036 let catalog_reader = self.env().catalog_reader().read_guard();
1037 let (schema_name, relation_name) = {
1038 let (schema_name, relation_name) =
1039 Binder::resolve_schema_qualified_name(db_name, &name)?;
1040 let search_path = self.config().search_path();
1041 let user_name = &self.user_name();
1042 let schema_name = match schema_name {
1043 Some(schema_name) => schema_name,
1044 None => catalog_reader
1045 .first_valid_schema(db_name, &search_path, user_name)?
1046 .name(),
1047 };
1048 (schema_name, relation_name)
1049 };
1050 match catalog_reader.check_relation_name_duplicated(db_name, &schema_name, &relation_name) {
1051 Err(e) if if_not_exists => {
1052 if let CatalogErrorInner::Duplicated {
1053 name,
1054 under_creation,
1055 ..
1056 } = e.inner()
1057 {
1058 if !*under_creation {
1063 Ok(Either::Right(
1064 PgResponse::builder(stmt_type)
1065 .notice(format!("relation \"{}\" already exists, skipping", name))
1066 .into(),
1067 ))
1068 } else if stmt_type == StatementType::CREATE_SUBSCRIPTION {
1069 Ok(Either::Right(
1072 PgResponse::builder(stmt_type)
1073 .notice(format!(
1074 "relation \"{}\" already exists but still creating, skipping",
1075 name
1076 ))
1077 .into(),
1078 ))
1079 } else {
1080 Ok(Either::Left(()))
1081 }
1082 } else {
1083 Err(e.into())
1084 }
1085 }
1086 Err(e) => Err(e.into()),
1087 Ok(_) => Ok(Either::Left(())),
1088 }
1089 }
1090
1091 pub fn check_secret_name_duplicated(&self, name: ObjectName) -> Result<()> {
1092 let db_name = &self.database();
1093 let catalog_reader = self.env().catalog_reader().read_guard();
1094 let (schema_name, secret_name) = {
1095 let (schema_name, secret_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
1096 let search_path = self.config().search_path();
1097 let user_name = &self.user_name();
1098 let schema_name = match schema_name {
1099 Some(schema_name) => schema_name,
1100 None => catalog_reader
1101 .first_valid_schema(db_name, &search_path, user_name)?
1102 .name(),
1103 };
1104 (schema_name, secret_name)
1105 };
1106 catalog_reader
1107 .check_secret_name_duplicated(db_name, &schema_name, &secret_name)
1108 .map_err(RwError::from)
1109 }
1110
1111 pub fn check_connection_name_duplicated(&self, name: ObjectName) -> Result<()> {
1112 let db_name = &self.database();
1113 let catalog_reader = self.env().catalog_reader().read_guard();
1114 let (schema_name, connection_name) = {
1115 let (schema_name, connection_name) =
1116 Binder::resolve_schema_qualified_name(db_name, &name)?;
1117 let search_path = self.config().search_path();
1118 let user_name = &self.user_name();
1119 let schema_name = match schema_name {
1120 Some(schema_name) => schema_name,
1121 None => catalog_reader
1122 .first_valid_schema(db_name, &search_path, user_name)?
1123 .name(),
1124 };
1125 (schema_name, connection_name)
1126 };
1127 catalog_reader
1128 .check_connection_name_duplicated(db_name, &schema_name, &connection_name)
1129 .map_err(RwError::from)
1130 }
1131
1132 pub fn check_function_name_duplicated(
1133 &self,
1134 stmt_type: StatementType,
1135 name: ObjectName,
1136 arg_types: &[DataType],
1137 if_not_exists: bool,
1138 ) -> Result<Either<(), RwPgResponse>> {
1139 let db_name = &self.database();
1140 let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
1141 let (database_id, schema_id) = self.get_database_and_schema_id_for_create(schema_name)?;
1142
1143 let catalog_reader = self.env().catalog_reader().read_guard();
1144 if catalog_reader
1145 .get_schema_by_id(database_id, schema_id)?
1146 .get_function_by_name_args(&function_name, arg_types)
1147 .is_some()
1148 {
1149 let full_name = format!(
1150 "{function_name}({})",
1151 arg_types.iter().map(|t| t.to_string()).join(",")
1152 );
1153 if if_not_exists {
1154 Ok(Either::Right(
1155 PgResponse::builder(stmt_type)
1156 .notice(format!(
1157 "function \"{}\" already exists, skipping",
1158 full_name
1159 ))
1160 .into(),
1161 ))
1162 } else {
1163 Err(CatalogError::duplicated("function", full_name).into())
1164 }
1165 } else {
1166 Ok(Either::Left(()))
1167 }
1168 }
1169
1170 pub fn get_database_and_schema_id_for_create(
1172 &self,
1173 schema_name: Option<String>,
1174 ) -> Result<(DatabaseId, SchemaId)> {
1175 let db_name = &self.database();
1176
1177 let search_path = self.config().search_path();
1178 let user_name = &self.user_name();
1179
1180 let catalog_reader = self.env().catalog_reader().read_guard();
1181 let schema = match schema_name {
1182 Some(schema_name) => catalog_reader.get_schema_by_name(db_name, &schema_name)?,
1183 None => catalog_reader.first_valid_schema(db_name, &search_path, user_name)?,
1184 };
1185 let schema_name = schema.name();
1186
1187 check_schema_writable(&schema_name)?;
1188 self.check_privileges(&[ObjectCheckItem::new(
1189 schema.owner(),
1190 AclMode::Create,
1191 schema_name,
1192 schema.id(),
1193 )])?;
1194
1195 let db_id = catalog_reader.get_database_by_name(db_name)?.id();
1196 Ok((db_id, schema.id()))
1197 }
1198
1199 pub fn get_connection_by_name(
1200 &self,
1201 schema_name: Option<String>,
1202 connection_name: &str,
1203 ) -> Result<Arc<ConnectionCatalog>> {
1204 let db_name = &self.database();
1205 let search_path = self.config().search_path();
1206 let user_name = &self.user_name();
1207
1208 let catalog_reader = self.env().catalog_reader().read_guard();
1209 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1210 let (connection, _) =
1211 catalog_reader.get_connection_by_name(db_name, schema_path, connection_name)?;
1212
1213 self.check_privileges(&[ObjectCheckItem::new(
1214 connection.owner(),
1215 AclMode::Usage,
1216 connection.name.clone(),
1217 connection.id,
1218 )])?;
1219
1220 Ok(connection.clone())
1221 }
1222
1223 pub fn get_subscription_by_schema_id_name(
1224 &self,
1225 schema_id: SchemaId,
1226 subscription_name: &str,
1227 ) -> Result<Arc<SubscriptionCatalog>> {
1228 let db_name = &self.database();
1229
1230 let catalog_reader = self.env().catalog_reader().read_guard();
1231 let db_id = catalog_reader.get_database_by_name(db_name)?.id();
1232 let schema = catalog_reader.get_schema_by_id(db_id, schema_id)?;
1233 let subscription = schema
1234 .get_subscription_by_name(subscription_name)
1235 .ok_or_else(|| {
1236 RwError::from(ErrorCode::ItemNotFound(format!(
1237 "subscription {} not found",
1238 subscription_name
1239 )))
1240 })?;
1241 Ok(subscription.clone())
1242 }
1243
1244 pub fn get_subscription_by_name(
1245 &self,
1246 schema_name: Option<String>,
1247 subscription_name: &str,
1248 ) -> Result<Arc<SubscriptionCatalog>> {
1249 let db_name = &self.database();
1250 let search_path = self.config().search_path();
1251 let user_name = &self.user_name();
1252
1253 let catalog_reader = self.env().catalog_reader().read_guard();
1254 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1255 let (subscription, _) =
1256 catalog_reader.get_subscription_by_name(db_name, schema_path, subscription_name)?;
1257 Ok(subscription.clone())
1258 }
1259
1260 pub fn get_table_by_id(&self, table_id: TableId) -> Result<Arc<TableCatalog>> {
1261 let catalog_reader = self.env().catalog_reader().read_guard();
1262 Ok(catalog_reader.get_any_table_by_id(table_id)?.clone())
1263 }
1264
1265 pub fn get_table_by_name(
1266 &self,
1267 table_name: &str,
1268 db_id: DatabaseId,
1269 schema_id: SchemaId,
1270 ) -> Result<Arc<TableCatalog>> {
1271 let catalog_reader = self.env().catalog_reader().read_guard();
1272 let table = catalog_reader
1273 .get_schema_by_id(db_id, schema_id)?
1274 .get_created_table_by_name(table_name)
1275 .ok_or_else(|| {
1276 Error::new(
1277 ErrorKind::InvalidInput,
1278 format!("table \"{}\" does not exist", table_name),
1279 )
1280 })?;
1281
1282 self.check_privileges(&[ObjectCheckItem::new(
1283 table.owner(),
1284 AclMode::Select,
1285 table_name.to_owned(),
1286 table.id,
1287 )])?;
1288
1289 Ok(table.clone())
1290 }
1291
1292 pub fn get_secret_by_name(
1293 &self,
1294 schema_name: Option<String>,
1295 secret_name: &str,
1296 ) -> Result<Arc<SecretCatalog>> {
1297 let db_name = &self.database();
1298 let search_path = self.config().search_path();
1299 let user_name = &self.user_name();
1300
1301 let catalog_reader = self.env().catalog_reader().read_guard();
1302 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1303 let (secret, _) = catalog_reader.get_secret_by_name(db_name, schema_path, secret_name)?;
1304
1305 self.check_privileges(&[ObjectCheckItem::new(
1306 secret.owner(),
1307 AclMode::Usage,
1308 secret.name.clone(),
1309 secret.id,
1310 )])?;
1311
1312 Ok(secret.clone())
1313 }
1314
1315 pub fn list_change_log_epochs(
1316 &self,
1317 table_id: TableId,
1318 min_epoch: u64,
1319 max_count: u32,
1320 ) -> Result<Vec<u64>> {
1321 Ok(self
1322 .env
1323 .hummock_snapshot_manager()
1324 .acquire()
1325 .list_change_log_epochs(table_id, min_epoch, max_count))
1326 }
1327
1328 pub fn clear_cancel_query_flag(&self) {
1329 let mut flag = self.current_query_cancel_flag.lock();
1330 *flag = None;
1331 }
1332
1333 pub fn reset_cancel_query_flag(&self) -> ShutdownToken {
1334 let mut flag = self.current_query_cancel_flag.lock();
1335 let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
1336 *flag = Some(shutdown_tx);
1337 shutdown_rx
1338 }
1339
1340 pub fn cancel_current_query(&self) {
1341 let mut flag_guard = self.current_query_cancel_flag.lock();
1342 if let Some(sender) = flag_guard.take() {
1343 info!("Trying to cancel query in local mode.");
1344 sender.cancel();
1346 info!("Cancel query request sent.");
1347 } else {
1348 info!("Trying to cancel query in distributed mode.");
1349 self.env.query_manager().cancel_queries_in_session(self.id)
1350 }
1351 }
1352
    /// Asks the environment's creating-streaming-job tracker to abort the jobs registered
    /// for this session id.
    pub fn cancel_current_creating_job(&self) {
        self.env.creating_streaming_job_tracker.abort_jobs(self.id);
    }
1356
1357 pub async fn run_statement(
1360 self: Arc<Self>,
1361 sql: Arc<str>,
1362 formats: Vec<Format>,
1363 ) -> std::result::Result<PgResponse<PgResponseStream>, BoxedError> {
1364 let mut stmts = Parser::parse_sql(&sql)?;
1366 if stmts.is_empty() {
1367 return Ok(PgResponse::empty_result(
1368 pgwire::pg_response::StatementType::EMPTY,
1369 ));
1370 }
1371 if stmts.len() > 1 {
1372 return Ok(
1373 PgResponse::builder(pgwire::pg_response::StatementType::EMPTY)
1374 .notice("cannot insert multiple commands into statement")
1375 .into(),
1376 );
1377 }
1378 let stmt = stmts.swap_remove(0);
1379 let rsp = handle(self, stmt, sql.clone(), formats).await?;
1380 Ok(rsp)
1381 }
1382
    /// Queues a notice message for the client of this session.
    ///
    /// The message is traced and then pushed onto the session's notice channel.
    ///
    /// # Panics
    /// Panics if the notice channel has been closed.
    pub fn notice_to_user(&self, str: impl Into<String>) {
        let notice = str.into();
        // The local's name doubles as the tracing field name.
        tracing::trace!(notice, "notice to user");
        self.notice_tx
            .send(notice)
            .expect("notice channel should not be closed");
    }
1390
1391 pub fn is_barrier_read(&self) -> bool {
1392 match self.config().visibility_mode() {
1393 VisibilityMode::Default => self.env.batch_config.enable_barrier_read,
1394 VisibilityMode::All => true,
1395 VisibilityMode::Checkpoint => false,
1396 }
1397 }
1398
1399 pub fn statement_timeout(&self) -> Duration {
1400 if self.config().statement_timeout() == 0 {
1401 Duration::from_secs(self.env.batch_config.statement_timeout_in_sec as u64)
1402 } else {
1403 Duration::from_secs(self.config().statement_timeout() as u64)
1404 }
1405 }
1406
1407 pub fn create_temporary_source(&self, source: SourceCatalog) {
1408 self.temporary_source_manager
1409 .lock()
1410 .create_source(source.name.clone(), source);
1411 }
1412
1413 pub fn get_temporary_source(&self, name: &str) -> Option<SourceCatalog> {
1414 self.temporary_source_manager
1415 .lock()
1416 .get_source(name)
1417 .cloned()
1418 }
1419
    /// Asks this session's temporary source manager to drop the source registered under
    /// `name`.
    pub fn drop_temporary_source(&self, name: &str) {
        self.temporary_source_manager.lock().drop_source(name);
    }
1423
    /// Returns a clone of this session's temporary source manager, taken while holding
    /// its lock; later changes to the session's manager are not reflected in the copy.
    pub fn temporary_source_manager(&self) -> TemporarySourceManager {
        self.temporary_source_manager.lock().clone()
    }
1427
1428 pub fn create_staging_table(&self, table: TableCatalog) {
1429 self.staging_catalog_manager
1430 .lock()
1431 .create_table(table.name.clone(), table);
1432 }
1433
    /// Asks this session's staging catalog manager to drop the table registered under
    /// `name`.
    pub fn drop_staging_table(&self, name: &str) {
        self.staging_catalog_manager.lock().drop_table(name);
    }
1437
    /// Returns a clone of this session's staging catalog manager, taken while holding its
    /// lock; later changes to the session's manager are not reflected in the copy.
    pub fn staging_catalog_manager(&self) -> StagingCatalogManager {
        self.staging_catalog_manager.lock().clone()
    }
1441
    /// Checks the cluster limits reported by the meta service before proceeding.
    ///
    /// Skipped entirely when the session config `bypass_cluster_limits` is set. For each
    /// actor-count limit returned by the meta client:
    /// - exceeding the hard limit aborts with a `ProtocolError` carrying a detailed message;
    /// - exceeding only the soft limit sends the same style of message to the user as a
    ///   notice and continues.
    ///
    /// # Errors
    /// Fails if fetching the limits fails or a hard limit is exceeded.
    pub async fn check_cluster_limits(&self) -> Result<()> {
        if self.config().bypass_cluster_limits() {
            return Ok(());
        }

        // Builds the user-facing message; `exceed_hard_limit` selects the wording
        // ("critical" vs. "recommended") and the suggested action.
        let gen_message = |ActorCountPerParallelism {
                               worker_id_to_actor_count,
                               hard_limit,
                               soft_limit,
                           }: ActorCountPerParallelism,
                           exceed_hard_limit: bool|
         -> String {
            let (limit_type, action) = if exceed_hard_limit {
                ("critical", "Scale the cluster immediately to proceed.")
            } else {
                (
                    "recommended",
                    "Consider scaling the cluster for optimal performance.",
                )
            };
            // The raw string is emitted verbatim, so its lines start at column 0.
            format!(
                r#"Actor count per parallelism exceeds the {limit_type} limit.

Depending on your workload, this may overload the cluster and cause performance/stability issues. {action}

HINT:
- For best practices on managing streaming jobs: https://docs.risingwave.com/operate/manage-a-large-number-of-streaming-jobs
- To bypass the check (if the cluster load is acceptable): `[ALTER SYSTEM] SET bypass_cluster_limits TO true`.
  See https://docs.risingwave.com/operate/view-configure-runtime-parameters#how-to-configure-runtime-parameters
- Contact us via slack or https://risingwave.com/contact-us/ for further enquiry.

DETAILS:
- hard limit: {hard_limit}
- soft limit: {soft_limit}
- worker_id_to_actor_count: {worker_id_to_actor_count:?}"#,
            )
        };

        let limits = self.env().meta_client().get_cluster_limits().await?;
        for limit in limits {
            match limit {
                cluster_limit::ClusterLimit::ActorCount(l) => {
                    if l.exceed_hard_limit() {
                        // Hard limit: refuse to continue.
                        return Err(RwError::from(ErrorCode::ProtocolError(gen_message(
                            l, true,
                        ))));
                    } else if l.exceed_soft_limit() {
                        // Soft limit: warn the user but keep going.
                        self.notice_to_user(gen_message(l, false));
                    }
                }
            }
        }
        Ok(())
    }
1496}
1497
/// Process-wide slot holding the shared [`SessionManagerImpl`].
///
/// It starts out empty; the code that initializes it is not part of this section.
pub static SESSION_MANAGER: std::sync::OnceLock<Arc<SessionManagerImpl>> =
    std::sync::OnceLock::new();
1500
/// Session manager of the frontend: creates, tracks and removes [`SessionImpl`]s.
pub struct SessionManagerImpl {
    /// Shared frontend environment (catalog/user readers, sessions map, metrics, ...).
    env: FrontendEnv,
    /// Join handles returned by `FrontendEnv::init`; stored but never read in this type
    /// (presumably to keep the background tasks owned for the manager's lifetime).
    _join_handles: Vec<JoinHandle<()>>,
    /// Shutdown senders returned by `FrontendEnv::init`; stored but never read in this type.
    _shutdown_senders: Vec<Sender<()>>,
    /// Monotonic counter used to derive the id of each newly connected session.
    number: AtomicI32,
}
1507
1508impl SessionManager for SessionManagerImpl {
1509 type Error = RwError;
1510 type Session = SessionImpl;
1511
1512 fn create_dummy_session(&self, database_id: DatabaseId) -> Result<Arc<Self::Session>> {
1513 let dummy_addr = Address::Tcp(SocketAddr::new(
1514 IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1515 5691, ));
1517
1518 self.connect_inner(
1521 database_id,
1522 risingwave_common::catalog::DEFAULT_SUPER_USER,
1523 Arc::new(dummy_addr),
1524 )
1525 }
1526
1527 fn connect(
1528 &self,
1529 database: &str,
1530 user_name: &str,
1531 peer_addr: AddressRef,
1532 ) -> Result<Arc<Self::Session>> {
1533 let catalog_reader = self.env.catalog_reader();
1534 let reader = catalog_reader.read_guard();
1535 let database_id = reader.get_database_by_name(database)?.id();
1536
1537 self.connect_inner(database_id, user_name, peer_addr)
1538 }
1539
1540 fn cancel_queries_in_session(&self, session_id: SessionId) {
1542 self.env.cancel_queries_in_session(session_id);
1543 }
1544
1545 fn cancel_creating_jobs_in_session(&self, session_id: SessionId) {
1546 self.env.cancel_creating_jobs_in_session(session_id);
1547 }
1548
1549 fn end_session(&self, session: &Self::Session) {
1550 self.delete_session(&session.session_id());
1551 }
1552
1553 async fn shutdown(&self) {
1554 self.env.sessions_map().write().clear();
1556 self.env.meta_client().try_unregister().await;
1558 }
1559}
1560
1561impl SessionManagerImpl {
1562 pub async fn new(opts: FrontendOpts) -> Result<Self> {
1563 let (env, join_handles, shutdown_senders) = FrontendEnv::init(opts).await?;
1565 Ok(Self {
1566 env,
1567 _join_handles: join_handles,
1568 _shutdown_senders: shutdown_senders,
1569 number: AtomicI32::new(0),
1570 })
1571 }
1572
    /// Borrows the frontend environment owned by this manager.
    pub(crate) fn env(&self) -> &FrontendEnv {
        &self.env
    }
1576
1577 fn insert_session(&self, session: Arc<SessionImpl>) {
1578 let active_sessions = {
1579 let mut write_guard = self.env.sessions_map.write();
1580 write_guard.insert(session.id(), session);
1581 write_guard.len()
1582 };
1583 self.env
1584 .frontend_metrics
1585 .active_sessions
1586 .set(active_sessions as i64);
1587 }
1588
1589 fn delete_session(&self, session_id: &SessionId) {
1590 let active_sessions = {
1591 let mut write_guard = self.env.sessions_map.write();
1592 write_guard.remove(session_id);
1593 write_guard.len()
1594 };
1595 self.env
1596 .frontend_metrics
1597 .active_sessions
1598 .set(active_sessions as i64);
1599 }
1600
    /// Validates a connection attempt and, if accepted, creates and registers the session.
    ///
    /// Steps, in order:
    /// 1. Resolve the database name and owner from `database_id`.
    /// 2. Look up `user_name`; reject unknown roles, users that cannot log in, and users
    ///    that are neither super user, database owner, nor holder of `Connect` privilege.
    /// 3. Find the matching HBA entry for the connection type / database / user / client
    ///    address; reject when none matches.
    /// 4. Pick the `UserAuthenticator` from the HBA auth method combined with the user's
    ///    stored auth info.
    /// 5. Allocate a session id from the counter, build the `SessionImpl`, and insert it
    ///    into the sessions map.
    ///
    /// # Errors
    /// Permission-denied errors for steps 2–4, a protocol error for an unsupported stored
    /// encryption type, and a catalog error when the role does not exist.
    fn connect_inner(
        &self,
        database_id: DatabaseId,
        user_name: &str,
        peer_addr: AddressRef,
    ) -> Result<Arc<SessionImpl>> {
        let catalog_reader = self.env.catalog_reader();
        let reader = catalog_reader.read_guard();
        let (database_name, database_owner) = {
            let db = reader.get_database_by_id(database_id)?;
            (db.name(), db.owner())
        };

        let user_reader = self.env.user_info_reader();
        // Shadows the catalog guard above; shadowing does not drop it, so the catalog read
        // guard stays alive until the end of this function.
        let reader = user_reader.read_guard();
        if let Some(user) = reader.get_user_by_name(user_name) {
            if !user.can_login {
                bail_permission_denied!("User {} is not allowed to login", user_name);
            }
            let has_privilege = user.has_privilege(database_id, AclMode::Connect);
            if !user.is_super && database_owner != user.id && !has_privilege {
                bail_permission_denied!("User does not have CONNECT privilege.");
            }

            // TCP peers are matched as `Host` connections with their IP address; Unix
            // socket peers are `Local` and carry no client address.
            let (connection_type, client_addr) = match peer_addr.as_ref() {
                Address::Tcp(socket_addr) => (ConnectionType::Host, Some(&socket_addr.ip())),
                Address::Unix(_) => (ConnectionType::Local, None),
            };
            tracing::debug!(
                "receive connection: type={:?}, client_addr={:?}",
                connection_type,
                client_addr
            );

            let hba_entry_opt = self.env.frontend_config().hba_config.find_matching_entry(
                &connection_type,
                database_name,
                user_name,
                client_addr,
            );

            // No matching HBA entry means the connection is rejected outright.
            let Some(hba_entry_opt) = hba_entry_opt else {
                bail_permission_denied!(
                    "no pg_hba.conf entry for host \"{peer_addr}\", user \"{user_name}\", database \"{database_name}\""
                );
            };

            // Derives the authenticator purely from what is stored for the user.
            let authenticator_by_info = || -> Result<UserAuthenticator> {
                let authenticator = match &user.auth_info {
                    None => UserAuthenticator::None,
                    Some(auth_info) => match auth_info.encryption_type() {
                        EncryptionType::Plaintext => {
                            UserAuthenticator::ClearText(auth_info.encrypted_value.clone())
                        }
                        EncryptionType::Md5 => {
                            // A fresh random 4-byte salt is generated per connection and
                            // the stored value is hashed with it.
                            let mut salt = [0; 4];
                            let mut rng = rand::rng();
                            rng.fill_bytes(&mut salt);
                            UserAuthenticator::Md5WithSalt {
                                encrypted_password: md5_hash_with_salt(
                                    &auth_info.encrypted_value,
                                    &salt,
                                ),
                                salt,
                            }
                        }
                        EncryptionType::Oauth => UserAuthenticator::OAuth {
                            metadata: auth_info.metadata.clone(),
                            cluster_id: self.env.meta_client().cluster_id().to_owned(),
                        },
                        _ => {
                            bail_protocol_error!(
                                "Unsupported auth type: {}",
                                auth_info.encryption_type().as_str_name()
                            );
                        }
                    },
                };
                Ok(authenticator)
            };

            // The HBA auth method decides which stored credentials are acceptable. `Md5`
            // and `OAuth` require stored auth info of the matching encryption type; any
            // other combination falls through to the rejection arm.
            let user_authenticator = match (&hba_entry_opt.auth_method, &user.auth_info) {
                (AuthMethod::Trust, _) => UserAuthenticator::None,
                (AuthMethod::Password, _) => authenticator_by_info()?,
                (AuthMethod::Md5, Some(auth_info))
                    if auth_info.encryption_type() == EncryptionType::Md5 =>
                {
                    authenticator_by_info()?
                }
                (AuthMethod::OAuth, Some(auth_info))
                    if auth_info.encryption_type() == EncryptionType::Oauth =>
                {
                    authenticator_by_info()?
                }
                (AuthMethod::Ldap, _) => {
                    UserAuthenticator::Ldap(user_name.to_owned(), hba_entry_opt.clone())
                }
                _ => {
                    bail_permission_denied!(
                        "password authentication failed for user \"{user_name}\""
                    );
                }
            };

            // NOTE(review): both components of the session id are the same sequential
            // counter value, so the "secret key" part is predictable — confirm this is
            // acceptable for whoever validates it.
            let secret_key = self.number.fetch_add(1, Ordering::Relaxed);
            let id = (secret_key, secret_key);
            let session_config = self.env.session_params_snapshot();

            let session_impl: Arc<SessionImpl> = SessionImpl::new(
                self.env.clone(),
                AuthContext::new(database_name.to_owned(), user_name.to_owned(), user.id),
                user_authenticator,
                id,
                peer_addr,
                session_config,
            )
            .into();
            self.insert_session(session_impl.clone());

            Ok(session_impl)
        } else {
            bail_catalog_error!("Role {} does not exist", user_name);
        }
    }
1732}
1733
1734impl Session for SessionImpl {
1735 type Error = RwError;
1736 type Portal = Portal;
1737 type PreparedStatement = PrepareStatement;
1738 type ValuesStream = PgResponseStream;
1739
1740 async fn run_one_query(
1743 self: Arc<Self>,
1744 stmt: Statement,
1745 format: Format,
1746 ) -> Result<PgResponse<PgResponseStream>> {
1747 let string = stmt.to_string();
1748 let sql_str = string.as_str();
1749 let sql: Arc<str> = Arc::from(sql_str);
1750 drop(string);
1752 let rsp = handle(self, stmt, sql, vec![format]).await?;
1753 Ok(rsp)
1754 }
1755
    /// Borrows the authenticator stored in this session.
    fn user_authenticator(&self) -> &UserAuthenticator {
        &self.user_authenticator
    }
1759
    /// Returns this session's id.
    fn id(&self) -> SessionId {
        self.id
    }
1763
1764 async fn parse(
1765 self: Arc<Self>,
1766 statement: Option<Statement>,
1767 params_types: Vec<Option<DataType>>,
1768 ) -> Result<PrepareStatement> {
1769 Ok(if let Some(statement) = statement {
1770 handle_parse(self, statement, params_types).await?
1771 } else {
1772 PrepareStatement::Empty
1773 })
1774 }
1775
    /// Binds parameter values to a prepared statement, producing a portal.
    ///
    /// All arguments are forwarded unchanged to `handle_bind`; the session itself is not
    /// consulted.
    fn bind(
        self: Arc<Self>,
        prepare_statement: PrepareStatement,
        params: Vec<Option<Bytes>>,
        param_formats: Vec<Format>,
        result_formats: Vec<Format>,
    ) -> Result<Portal> {
        handle_bind(prepare_statement, params, param_formats, result_formats)
    }
1785
1786 async fn execute(self: Arc<Self>, portal: Portal) -> Result<PgResponse<PgResponseStream>> {
1787 let rsp = handle_execute(self, portal).await?;
1788 Ok(rsp)
1789 }
1790
1791 fn describe_statement(
1792 self: Arc<Self>,
1793 prepare_statement: PrepareStatement,
1794 ) -> Result<(Vec<DataType>, Vec<PgFieldDescriptor>)> {
1795 Ok(match prepare_statement {
1796 PrepareStatement::Empty => (vec![], vec![]),
1797 PrepareStatement::Prepared(prepare_statement) => (
1798 prepare_statement.bound_result.param_types,
1799 infer(
1800 Some(prepare_statement.bound_result.bound),
1801 prepare_statement.statement,
1802 )?,
1803 ),
1804 PrepareStatement::PureStatement(statement) => (vec![], infer(None, statement)?),
1805 })
1806 }
1807
1808 fn describe_portal(self: Arc<Self>, portal: Portal) -> Result<Vec<PgFieldDescriptor>> {
1809 match portal {
1810 Portal::Empty => Ok(vec![]),
1811 Portal::Portal(portal) => {
1812 let mut columns = infer(Some(portal.bound_result.bound), portal.statement)?;
1813 let formats = FormatIterator::new(&portal.result_formats, columns.len())
1814 .map_err(|e| RwError::from(ErrorCode::ProtocolError(e)))?;
1815 columns.iter_mut().zip_eq_fast(formats).for_each(|(c, f)| {
1816 if f == Format::Binary {
1817 c.set_to_binary()
1818 }
1819 });
1820 Ok(columns)
1821 }
1822 Portal::PureStatement(statement) => Ok(infer(None, statement)?),
1823 }
1824 }
1825
    /// Returns the current value of the session config entry `key`, converting the config
    /// layer's error into this trait's error type on failure.
    fn get_config(&self, key: &str) -> Result<String> {
        self.config().get(key).map_err(Into::into)
    }
1829
    /// Sets the session config entry `key` to `value`.
    ///
    /// Delegates to `Self::set_config`. NOTE(review): this presumably resolves to an
    /// inherent `SessionImpl::set_config` defined outside this section rather than to this
    /// trait method (which would recurse) — confirm.
    fn set_config(&self, key: &str, value: String) -> Result<String> {
        Self::set_config(self, key, value)
    }
1833
1834 async fn next_notice(self: &Arc<Self>) -> String {
1835 std::future::poll_fn(|cx| self.clone().notice_rx.lock().poll_recv(cx))
1836 .await
1837 .expect("notice channel should not be closed")
1838 }
1839
1840 fn transaction_status(&self) -> TransactionStatus {
1841 match &*self.txn.lock() {
1842 transaction::State::Initial | transaction::State::Implicit(_) => {
1843 TransactionStatus::Idle
1844 }
1845 transaction::State::Explicit(_) => TransactionStatus::InTransaction,
1846 }
1848 }
1849
1850 fn init_exec_context(&self, sql: Arc<str>) -> ExecContextGuard {
1852 let exec_context = Arc::new(ExecContext {
1853 running_sql: sql,
1854 last_instant: Instant::now(),
1855 last_idle_instant: self.last_idle_instant.clone(),
1856 });
1857 *self.exec_context.lock() = Some(Arc::downgrade(&exec_context));
1858 *self.last_idle_instant.lock() = None;
1860 ExecContextGuard::new(exec_context)
1861 }
1862
1863 fn check_idle_in_transaction_timeout(&self) -> PsqlResult<()> {
1866 if matches!(self.transaction_status(), TransactionStatus::InTransaction) {
1868 let idle_in_transaction_session_timeout =
1869 self.config().idle_in_transaction_session_timeout() as u128;
1870 if idle_in_transaction_session_timeout != 0 {
1872 let guard = self.exec_context.lock();
1874 if guard.as_ref().and_then(|weak| weak.upgrade()).is_none() {
1876 if let Some(elapse_since_last_idle_instant) =
1878 self.elapse_since_last_idle_instant()
1879 && elapse_since_last_idle_instant > idle_in_transaction_session_timeout
1880 {
1881 return Err(PsqlError::IdleInTxnTimeout);
1882 }
1883 }
1884 }
1885 }
1886 Ok(())
1887 }
1888}
1889
1890fn infer(bound: Option<BoundStatement>, stmt: Statement) -> Result<Vec<PgFieldDescriptor>> {
1892 match stmt {
1893 Statement::Query(_)
1894 | Statement::Insert { .. }
1895 | Statement::Delete { .. }
1896 | Statement::Update { .. }
1897 | Statement::FetchCursor { .. } => Ok(bound
1898 .unwrap()
1899 .output_fields()
1900 .iter()
1901 .map(to_pg_field)
1902 .collect()),
1903 Statement::ShowObjects {
1904 object: show_object,
1905 ..
1906 } => Ok(infer_show_object(&show_object)),
1907 Statement::ShowCreateObject { .. } => Ok(infer_show_create_object()),
1908 Statement::ShowTransactionIsolationLevel => {
1909 let name = "transaction_isolation";
1910 Ok(infer_show_variable(name))
1911 }
1912 Statement::ShowVariable { variable } => {
1913 let name = &variable[0].real_value().to_lowercase();
1914 Ok(infer_show_variable(name))
1915 }
1916 Statement::Describe { name: _, kind } => Ok(infer_describe(&kind)),
1917 Statement::Explain { .. } => Ok(vec![PgFieldDescriptor::new(
1918 "QUERY PLAN".to_owned(),
1919 DataType::Varchar.to_oid(),
1920 DataType::Varchar.type_len(),
1921 )]),
1922 _ => Ok(vec![]),
1923 }
1924}
1925
/// Pair of a worker id and a process id, rendered and parsed as `worker_id:process_id`.
pub struct WorkerProcessId {
    /// Id of the worker node.
    pub worker_id: WorkerId,
    /// Process id paired with that worker (presumably the session's process id on that
    /// worker — confirm against callers).
    pub process_id: i32,
}
1930
impl WorkerProcessId {
    /// Creates a `WorkerProcessId` from its two components.
    pub fn new(worker_id: WorkerId, process_id: i32) -> Self {
        Self {
            worker_id,
            process_id,
        }
    }
}
1939
1940impl Display for WorkerProcessId {
1941 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1942 write!(f, "{}:{}", self.worker_id, self.process_id)
1943 }
1944}
1945
1946impl TryFrom<String> for WorkerProcessId {
1947 type Error = String;
1948
1949 fn try_from(worker_process_id: String) -> std::result::Result<Self, Self::Error> {
1950 const INVALID: &str = "invalid WorkerProcessId";
1951 let splits: Vec<&str> = worker_process_id.split(":").collect();
1952 if splits.len() != 2 {
1953 return Err(INVALID.to_owned());
1954 }
1955 let Ok(worker_id) = splits[0].parse::<u32>() else {
1956 return Err(INVALID.to_owned());
1957 };
1958 let Ok(process_id) = splits[1].parse::<i32>() else {
1959 return Err(INVALID.to_owned());
1960 };
1961 Ok(WorkerProcessId::new(worker_id.into(), process_id))
1962 }
1963}
1964
1965pub fn cancel_queries_in_session(session_id: SessionId, sessions_map: SessionMapRef) -> bool {
1966 let guard = sessions_map.read();
1967 if let Some(session) = guard.get(&session_id) {
1968 session.cancel_current_query();
1969 true
1970 } else {
1971 info!("Current session finished, ignoring cancel query request");
1972 false
1973 }
1974}
1975
1976pub fn cancel_creating_jobs_in_session(session_id: SessionId, sessions_map: SessionMapRef) -> bool {
1977 let guard = sessions_map.read();
1978 if let Some(session) = guard.get(&session_id) {
1979 session.cancel_current_creating_job();
1980 true
1981 } else {
1982 info!("Current session finished, ignoring cancel creating request");
1983 false
1984 }
1985}