1use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::io::{Error, ErrorKind};
18use std::net::{IpAddr, Ipv4Addr, SocketAddr};
19use std::sync::atomic::{AtomicI32, Ordering};
20use std::sync::{Arc, Weak};
21use std::time::{Duration, Instant};
22
23use anyhow::anyhow;
24use bytes::Bytes;
25use either::Either;
26use itertools::Itertools;
27use parking_lot::{Mutex, RwLock, RwLockReadGuard};
28use pgwire::error::{PsqlError, PsqlResult};
29use pgwire::net::{Address, AddressRef};
30use pgwire::pg_field_descriptor::PgFieldDescriptor;
31use pgwire::pg_message::TransactionStatus;
32use pgwire::pg_response::{PgResponse, StatementType};
33use pgwire::pg_server::{
34 BoxedError, ExecContext, ExecContextGuard, Session, SessionId, SessionManager,
35 UserAuthenticator,
36};
37use pgwire::types::{Format, FormatIterator};
38use prometheus_http_query::Client as PrometheusClient;
39use rand::RngCore;
40use risingwave_batch::monitor::{BatchSpillMetrics, GLOBAL_BATCH_SPILL_METRICS};
41use risingwave_batch::spill::spill_op::SpillOp;
42use risingwave_batch::task::{ShutdownSender, ShutdownToken};
43use risingwave_batch::worker_manager::worker_node_manager::{
44 WorkerNodeManager, WorkerNodeManagerRef,
45};
46use risingwave_common::acl::AclMode;
47#[cfg(test)]
48use risingwave_common::catalog::{
49 DEFAULT_DATABASE_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_ID,
50};
51use risingwave_common::config::{
52 AuthMethod, BatchConfig, ConnectionType, FrontendConfig, MetaConfig, MetricLevel,
53 RpcClientConfig, StreamingConfig, UdfConfig, load_config,
54};
55use risingwave_common::id::WorkerId;
56use risingwave_common::memory::MemoryContext;
57use risingwave_common::secret::LocalSecretManager;
58use risingwave_common::session_config::{ConfigReporter, SessionConfig, VisibilityMode};
59use risingwave_common::system_param::local_manager::{
60 LocalSystemParamsManager, LocalSystemParamsManagerRef,
61};
62use risingwave_common::telemetry::manager::TelemetryManager;
63use risingwave_common::telemetry::telemetry_env_enabled;
64use risingwave_common::types::DataType;
65use risingwave_common::util::addr::HostAddr;
66use risingwave_common::util::cluster_limit;
67use risingwave_common::util::cluster_limit::ActorCountPerParallelism;
68use risingwave_common::util::iter_util::ZipEqFast;
69use risingwave_common::util::pretty_bytes::convert;
70use risingwave_common::util::runtime::BackgroundShutdownRuntime;
71use risingwave_common::{GIT_SHA, RW_VERSION};
72use risingwave_common_heap_profiling::HeapProfiler;
73use risingwave_common_service::{MetricsManager, ObserverManager};
74use risingwave_connector::source::monitor::{GLOBAL_SOURCE_METRICS, SourceMetrics};
75use risingwave_pb::common::WorkerType;
76use risingwave_pb::common::worker_node::Property as AddWorkerNodeProperty;
77use risingwave_pb::frontend_service::frontend_service_server::FrontendServiceServer;
78use risingwave_pb::health::health_server::HealthServer;
79use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
80use risingwave_pb::user::auth_info::EncryptionType;
81use risingwave_rpc_client::{
82 ComputeClientPool, ComputeClientPoolRef, FrontendClientPool, FrontendClientPoolRef, MetaClient,
83 MonitorClientPool, MonitorClientPoolRef,
84};
85use risingwave_sqlparser::ast::{ObjectName, Statement};
86use risingwave_sqlparser::parser::Parser;
87use thiserror::Error;
88use thiserror_ext::AsReport;
89use tokio::runtime::Builder;
90use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
91use tokio::sync::oneshot::Sender;
92use tokio::sync::watch;
93use tokio::task::JoinHandle;
94use tracing::{error, info};
95
96use self::cursor_manager::CursorManager;
97use crate::binder::{Binder, BoundStatement, ResolveQualifiedNameError};
98use crate::catalog::catalog_service::{CatalogReader, CatalogWriter, CatalogWriterImpl};
99use crate::catalog::connection_catalog::ConnectionCatalog;
100use crate::catalog::root_catalog::{Catalog, SchemaPath};
101use crate::catalog::secret_catalog::SecretCatalog;
102use crate::catalog::source_catalog::SourceCatalog;
103use crate::catalog::subscription_catalog::SubscriptionCatalog;
104use crate::catalog::{
105 CatalogError, CatalogErrorInner, DatabaseId, OwnedByUserCatalog, SchemaId, TableId,
106 check_schema_writable,
107};
108use crate::error::{
109 ErrorCode, Result, RwError, bail_catalog_error, bail_permission_denied, bail_protocol_error,
110};
111use crate::handler::describe::infer_describe;
112use crate::handler::extended_handle::{
113 Portal, PrepareStatement, handle_bind, handle_execute, handle_parse,
114};
115use crate::handler::privilege::ObjectCheckItem;
116use crate::handler::show::{infer_show_create_object, infer_show_object};
117use crate::handler::util::to_pg_field;
118use crate::handler::variable::infer_show_variable;
119use crate::handler::{RwPgResponse, handle};
120use crate::health_service::HealthServiceImpl;
121use crate::meta_client::{FrontendMetaClient, FrontendMetaClientImpl};
122use crate::monitor::{CursorMetrics, FrontendMetrics, GLOBAL_FRONTEND_METRICS};
123use crate::observer::FrontendObserverNode;
124use crate::rpc::{FrontendServiceImpl, MonitorServiceImpl};
125use crate::scheduler::streaming_manager::{StreamingJobTracker, StreamingJobTrackerRef};
126use crate::scheduler::{
127 DistributedQueryMetrics, GLOBAL_DISTRIBUTED_QUERY_METRICS, HummockSnapshotManager,
128 HummockSnapshotManagerRef, QueryManager,
129};
130use crate::telemetry::FrontendTelemetryCreator;
131use crate::user::UserId;
132use crate::user::user_authentication::md5_hash_with_salt;
133use crate::user::user_manager::UserInfoManager;
134use crate::user::user_service::{UserInfoReader, UserInfoWriter, UserInfoWriterImpl};
135use crate::{FrontendOpts, PgResponseStream, TableCatalog};
136
137pub(crate) mod current;
138pub(crate) mod cursor_manager;
139pub(crate) mod transaction;
140
141#[derive(Clone)]
143pub(crate) struct FrontendEnv {
144 meta_client: Arc<dyn FrontendMetaClient>,
147 catalog_writer: Arc<dyn CatalogWriter>,
148 catalog_reader: CatalogReader,
149 user_info_writer: Arc<dyn UserInfoWriter>,
150 user_info_reader: UserInfoReader,
151 worker_node_manager: WorkerNodeManagerRef,
152 query_manager: QueryManager,
153 hummock_snapshot_manager: HummockSnapshotManagerRef,
154 system_params_manager: LocalSystemParamsManagerRef,
155 session_params: Arc<RwLock<SessionConfig>>,
156
157 server_addr: HostAddr,
158 client_pool: ComputeClientPoolRef,
159 frontend_client_pool: FrontendClientPoolRef,
160 monitor_client_pool: MonitorClientPoolRef,
161
162 sessions_map: SessionMapRef,
166
167 pub frontend_metrics: Arc<FrontendMetrics>,
168
169 pub cursor_metrics: Arc<CursorMetrics>,
170
171 source_metrics: Arc<SourceMetrics>,
172
173 spill_metrics: Arc<BatchSpillMetrics>,
175
176 batch_config: BatchConfig,
177 frontend_config: FrontendConfig,
178 #[expect(dead_code)]
179 meta_config: MetaConfig,
180 streaming_config: StreamingConfig,
181 udf_config: UdfConfig,
182
183 creating_streaming_job_tracker: StreamingJobTrackerRef,
186
187 compute_runtime: Arc<BackgroundShutdownRuntime>,
190
191 mem_context: MemoryContext,
193
194 #[cfg(feature = "datafusion")]
195 df_spillable_budget_ctx: MemoryContext,
200
201 serverless_backfill_controller_addr: String,
203
204 prometheus_client: Option<PrometheusClient>,
206
207 prometheus_selector: String,
209}
210
211pub type SessionMapRef = Arc<RwLock<HashMap<(i32, i32), Arc<SessionImpl>>>>;
213
/// Fraction of the frontend's total memory that `FrontendEnv::init` uses as the limit of the
/// batch memory context.
const FRONTEND_BATCH_MEMORY_PROPORTION: f64 = 0.5;
216
217impl FrontendEnv {
218 pub fn mock() -> Self {
219 use crate::test_utils::{MockCatalogWriter, MockFrontendMetaClient, MockUserInfoWriter};
220
221 let catalog = Arc::new(RwLock::new(Catalog::default()));
222 let meta_client = Arc::new(MockFrontendMetaClient {});
223 let hummock_snapshot_manager = Arc::new(HummockSnapshotManager::new(meta_client.clone()));
224 let catalog_writer = Arc::new(MockCatalogWriter::new(
225 catalog.clone(),
226 hummock_snapshot_manager.clone(),
227 ));
228 let catalog_reader = CatalogReader::new(catalog);
229 let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
230 let user_info_writer = Arc::new(MockUserInfoWriter::new(user_info_manager.clone()));
231 let user_info_reader = UserInfoReader::new(user_info_manager);
232 let worker_node_manager = Arc::new(WorkerNodeManager::mock(vec![]));
233 let system_params_manager = Arc::new(LocalSystemParamsManager::for_test());
234 let compute_client_pool = Arc::new(ComputeClientPool::for_test());
235 let frontend_client_pool = Arc::new(FrontendClientPool::for_test());
236 let monitor_client_pool = Arc::new(MonitorClientPool::for_test());
237 let query_manager = QueryManager::new(
238 worker_node_manager.clone(),
239 compute_client_pool,
240 catalog_reader.clone(),
241 Arc::new(DistributedQueryMetrics::for_test()),
242 None,
243 None,
244 );
245 let server_addr = HostAddr::try_from("127.0.0.1:4565").unwrap();
246 let client_pool = Arc::new(ComputeClientPool::for_test());
247 let creating_streaming_tracker = StreamingJobTracker::new(meta_client.clone());
248 let runtime = {
249 let mut builder = Builder::new_multi_thread();
250 if let Some(frontend_compute_runtime_worker_threads) =
251 load_config("", FrontendOpts::default())
252 .batch
253 .frontend_compute_runtime_worker_threads
254 {
255 builder.worker_threads(frontend_compute_runtime_worker_threads);
256 }
257 builder
258 .thread_name("rw-batch-local")
259 .enable_all()
260 .build()
261 .unwrap()
262 };
263 let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(runtime));
264 let sessions_map = Arc::new(RwLock::new(HashMap::new()));
265 Self {
266 meta_client,
267 catalog_writer,
268 catalog_reader,
269 user_info_writer,
270 user_info_reader,
271 worker_node_manager,
272 query_manager,
273 hummock_snapshot_manager,
274 system_params_manager,
275 session_params: Default::default(),
276 server_addr,
277 client_pool,
278 frontend_client_pool,
279 monitor_client_pool,
280 sessions_map,
281 frontend_metrics: Arc::new(FrontendMetrics::for_test()),
282 cursor_metrics: Arc::new(CursorMetrics::for_test()),
283 batch_config: BatchConfig::default(),
284 frontend_config: FrontendConfig::default(),
285 meta_config: MetaConfig::default(),
286 streaming_config: StreamingConfig::default(),
287 udf_config: UdfConfig::default(),
288 source_metrics: Arc::new(SourceMetrics::default()),
289 spill_metrics: BatchSpillMetrics::for_test(),
290 creating_streaming_job_tracker: Arc::new(creating_streaming_tracker),
291 compute_runtime,
292 mem_context: MemoryContext::none(),
293 #[cfg(feature = "datafusion")]
294 df_spillable_budget_ctx: MemoryContext::none(),
295 serverless_backfill_controller_addr: Default::default(),
296 prometheus_client: None,
297 prometheus_selector: String::new(),
298 }
299 }
300
301 pub async fn init(opts: FrontendOpts) -> Result<(Self, Vec<JoinHandle<()>>, Vec<Sender<()>>)> {
302 let config = load_config(&opts.config_path, &opts);
303 info!("Starting frontend node");
304 info!("> config: {:?}", config);
305 info!(
306 "> debug assertions: {}",
307 if cfg!(debug_assertions) { "on" } else { "off" }
308 );
309 info!("> version: {} ({})", RW_VERSION, GIT_SHA);
310
311 let frontend_address: HostAddr = opts
312 .advertise_addr
313 .as_ref()
314 .unwrap_or_else(|| {
315 tracing::warn!("advertise addr is not specified, defaulting to listen_addr");
316 &opts.listen_addr
317 })
318 .parse()
319 .unwrap();
320 info!("advertise addr is {}", frontend_address);
321
322 let rpc_addr: HostAddr = opts.frontend_rpc_listener_addr.parse().unwrap();
323 let internal_rpc_host_addr = HostAddr {
324 host: frontend_address.host.clone(),
326 port: rpc_addr.port,
327 };
328 let (meta_client, system_params_reader) = MetaClient::register_new(
330 opts.meta_addr,
331 WorkerType::Frontend,
332 &frontend_address,
333 AddWorkerNodeProperty {
334 internal_rpc_host_addr: internal_rpc_host_addr.to_string(),
335 ..Default::default()
336 },
337 Arc::new(config.meta.clone()),
338 )
339 .await;
340
341 let worker_id = meta_client.worker_id();
342 info!("Assigned worker node id {}", worker_id);
343
344 let (heartbeat_join_handle, heartbeat_shutdown_sender) = MetaClient::start_heartbeat_loop(
345 meta_client.clone(),
346 Duration::from_millis(config.server.heartbeat_interval_ms as u64),
347 );
348 let mut join_handles = vec![heartbeat_join_handle];
349 let mut shutdown_senders = vec![heartbeat_shutdown_sender];
350
351 let frontend_meta_client = Arc::new(FrontendMetaClientImpl(meta_client.clone()));
352 let hummock_snapshot_manager =
353 Arc::new(HummockSnapshotManager::new(frontend_meta_client.clone()));
354
355 let (catalog_updated_tx, catalog_updated_rx) = watch::channel(0);
356 let catalog = Arc::new(RwLock::new(Catalog::default()));
357 let catalog_writer = Arc::new(CatalogWriterImpl::new(
358 meta_client.clone(),
359 catalog_updated_rx.clone(),
360 hummock_snapshot_manager.clone(),
361 ));
362 let catalog_reader = CatalogReader::new(catalog.clone());
363
364 let worker_node_manager = Arc::new(WorkerNodeManager::new());
365
366 let compute_client_pool = Arc::new(ComputeClientPool::new(
367 config.batch_exchange_connection_pool_size(),
368 config.batch.developer.compute_client_config.clone(),
369 ));
370 let frontend_client_pool = Arc::new(FrontendClientPool::new(
371 1,
372 config.batch.developer.frontend_client_config.clone(),
373 ));
374 let monitor_client_pool = Arc::new(MonitorClientPool::new(1, RpcClientConfig::default()));
375 let query_manager = QueryManager::new(
376 worker_node_manager.clone(),
377 compute_client_pool.clone(),
378 catalog_reader.clone(),
379 Arc::new(GLOBAL_DISTRIBUTED_QUERY_METRICS.clone()),
380 config.batch.distributed_query_limit,
381 config.batch.max_batch_queries_per_frontend_node,
382 );
383
384 let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
385 let user_info_reader = UserInfoReader::new(user_info_manager.clone());
386 let user_info_writer = Arc::new(UserInfoWriterImpl::new(
387 meta_client.clone(),
388 catalog_updated_rx,
389 ));
390
391 let system_params_manager =
392 Arc::new(LocalSystemParamsManager::new(system_params_reader.clone()));
393
394 LocalSecretManager::init(
395 opts.temp_secret_file_dir,
396 meta_client.cluster_id().to_owned(),
397 worker_id,
398 );
399
400 let session_params = Arc::new(RwLock::new(SessionConfig::default()));
402 let sessions_map: SessionMapRef = Arc::new(RwLock::new(HashMap::new()));
403 let cursor_metrics = Arc::new(CursorMetrics::init(sessions_map.clone()));
404
405 let frontend_observer_node = FrontendObserverNode::new(
406 worker_node_manager.clone(),
407 catalog,
408 catalog_updated_tx,
409 user_info_manager,
410 hummock_snapshot_manager.clone(),
411 system_params_manager.clone(),
412 session_params.clone(),
413 compute_client_pool.clone(),
414 );
415 let observer_manager =
416 ObserverManager::new_with_meta_client(meta_client.clone(), frontend_observer_node)
417 .await;
418 let observer_join_handle = observer_manager.start().await;
419 join_handles.push(observer_join_handle);
420
421 meta_client.activate(&frontend_address).await?;
422
423 let frontend_metrics = Arc::new(GLOBAL_FRONTEND_METRICS.clone());
424 let source_metrics = Arc::new(GLOBAL_SOURCE_METRICS.clone());
425 let spill_metrics = Arc::new(GLOBAL_BATCH_SPILL_METRICS.clone());
426
427 if config.server.metrics_level > MetricLevel::Disabled {
428 MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
429 }
430
431 let health_srv = HealthServiceImpl::new();
432 let frontend_srv = FrontendServiceImpl::new(sessions_map.clone());
433 let monitor_srv = MonitorServiceImpl::new(config.server.clone());
434 let frontend_rpc_addr = opts.frontend_rpc_listener_addr.parse().unwrap();
435
436 let telemetry_manager = TelemetryManager::new(
437 Arc::new(meta_client.clone()),
438 Arc::new(FrontendTelemetryCreator::new()),
439 );
440
441 if config.server.telemetry_enabled && telemetry_env_enabled() {
444 let (join_handle, shutdown_sender) = telemetry_manager.start().await;
445 join_handles.push(join_handle);
446 shutdown_senders.push(shutdown_sender);
447 } else {
448 tracing::info!("Telemetry didn't start due to config");
449 }
450
451 tokio::spawn(async move {
452 tonic::transport::Server::builder()
453 .add_service(HealthServer::new(health_srv))
454 .add_service(FrontendServiceServer::new(frontend_srv))
455 .add_service(MonitorServiceServer::new(monitor_srv))
456 .serve(frontend_rpc_addr)
457 .await
458 .unwrap();
459 });
460 info!(
461 "Health Check RPC Listener is set up on {}",
462 opts.frontend_rpc_listener_addr.clone()
463 );
464
465 let creating_streaming_job_tracker =
466 Arc::new(StreamingJobTracker::new(frontend_meta_client.clone()));
467
468 let runtime = {
469 let mut builder = Builder::new_multi_thread();
470 if let Some(frontend_compute_runtime_worker_threads) =
471 config.batch.frontend_compute_runtime_worker_threads
472 {
473 builder.worker_threads(frontend_compute_runtime_worker_threads);
474 }
475 builder
476 .thread_name("rw-batch-local")
477 .enable_all()
478 .build()
479 .unwrap()
480 };
481 let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(runtime));
482
483 let sessions = sessions_map.clone();
484 let join_handle = tokio::spawn(async move {
486 let mut check_idle_txn_interval =
487 tokio::time::interval(core::time::Duration::from_secs(10));
488 check_idle_txn_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
489 check_idle_txn_interval.reset();
490 loop {
491 check_idle_txn_interval.tick().await;
492 sessions.read().values().for_each(|session| {
493 let _ = session.check_idle_in_transaction_timeout();
494 })
495 }
496 });
497 join_handles.push(join_handle);
498
499 #[cfg(not(madsim))]
501 if config.batch.enable_spill {
502 SpillOp::clean_spill_directory()
503 .await
504 .map_err(|err| anyhow!(err))?;
505 }
506
507 let total_memory_bytes = opts.frontend_total_memory_bytes;
508 let heap_profiler =
509 HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
510 heap_profiler.start();
512
513 let batch_memory_limit = total_memory_bytes as f64 * FRONTEND_BATCH_MEMORY_PROPORTION;
514 let mem_context = MemoryContext::root(
515 frontend_metrics.batch_total_mem.clone(),
516 batch_memory_limit as u64,
517 );
518 #[cfg(feature = "datafusion")]
519 let df_spillable_budget_ctx =
520 crate::datafusion::create_df_spillable_budget_ctx(&mem_context);
521
522 let prometheus_client = if let Some(ref endpoint) = opts.prometheus_endpoint {
524 match PrometheusClient::try_from(endpoint.as_str()) {
525 Ok(client) => {
526 info!("Prometheus client initialized with endpoint: {}", endpoint);
527 Some(client)
528 }
529 Err(e) => {
530 error!(
531 error = %e.as_report(),
532 "failed to initialize Prometheus client",
533 );
534 None
535 }
536 }
537 } else {
538 None
539 };
540
541 let prometheus_selector = opts.prometheus_selector.unwrap_or_default();
543
544 info!(
545 "Frontend total_memory: {} batch_memory: {}",
546 convert(total_memory_bytes as _),
547 convert(batch_memory_limit as _),
548 );
549
550 Ok((
551 Self {
552 catalog_reader,
553 catalog_writer,
554 user_info_reader,
555 user_info_writer,
556 worker_node_manager,
557 meta_client: frontend_meta_client,
558 query_manager,
559 hummock_snapshot_manager,
560 system_params_manager,
561 session_params,
562 server_addr: frontend_address,
563 client_pool: compute_client_pool,
564 frontend_client_pool,
565 monitor_client_pool,
566 frontend_metrics,
567 cursor_metrics,
568 spill_metrics,
569 sessions_map,
570 batch_config: config.batch,
571 frontend_config: config.frontend,
572 meta_config: config.meta,
573 streaming_config: config.streaming,
574 serverless_backfill_controller_addr: opts.serverless_backfill_controller_addr,
575 udf_config: config.udf,
576 source_metrics,
577 creating_streaming_job_tracker,
578 compute_runtime,
579 mem_context,
580 #[cfg(feature = "datafusion")]
581 df_spillable_budget_ctx,
582 prometheus_client,
583 prometheus_selector,
584 },
585 join_handles,
586 shutdown_senders,
587 ))
588 }
589
590 fn catalog_writer(&self, _guard: transaction::WriteGuard) -> &dyn CatalogWriter {
595 &*self.catalog_writer
596 }
597
598 pub fn catalog_reader(&self) -> &CatalogReader {
600 &self.catalog_reader
601 }
602
603 fn user_info_writer(&self, _guard: transaction::WriteGuard) -> &dyn UserInfoWriter {
608 &*self.user_info_writer
609 }
610
611 pub fn user_info_reader(&self) -> &UserInfoReader {
613 &self.user_info_reader
614 }
615
616 pub fn worker_node_manager_ref(&self) -> WorkerNodeManagerRef {
617 self.worker_node_manager.clone()
618 }
619
620 pub fn meta_client(&self) -> &dyn FrontendMetaClient {
621 &*self.meta_client
622 }
623
624 pub fn meta_client_ref(&self) -> Arc<dyn FrontendMetaClient> {
625 self.meta_client.clone()
626 }
627
628 pub fn query_manager(&self) -> &QueryManager {
629 &self.query_manager
630 }
631
632 pub fn hummock_snapshot_manager(&self) -> &HummockSnapshotManagerRef {
633 &self.hummock_snapshot_manager
634 }
635
636 pub fn system_params_manager(&self) -> &LocalSystemParamsManagerRef {
637 &self.system_params_manager
638 }
639
640 pub fn session_params_snapshot(&self) -> SessionConfig {
641 self.session_params.read_recursive().clone()
642 }
643
644 pub fn sbc_address(&self) -> &String {
645 &self.serverless_backfill_controller_addr
646 }
647
648 pub fn prometheus_client(&self) -> Option<&PrometheusClient> {
650 self.prometheus_client.as_ref()
651 }
652
653 pub fn prometheus_selector(&self) -> &str {
655 &self.prometheus_selector
656 }
657
658 pub fn server_address(&self) -> &HostAddr {
659 &self.server_addr
660 }
661
662 pub fn client_pool(&self) -> ComputeClientPoolRef {
663 self.client_pool.clone()
664 }
665
666 pub fn frontend_client_pool(&self) -> FrontendClientPoolRef {
667 self.frontend_client_pool.clone()
668 }
669
670 pub fn monitor_client_pool(&self) -> MonitorClientPoolRef {
671 self.monitor_client_pool.clone()
672 }
673
674 pub fn batch_config(&self) -> &BatchConfig {
675 &self.batch_config
676 }
677
678 pub fn frontend_config(&self) -> &FrontendConfig {
679 &self.frontend_config
680 }
681
682 pub fn streaming_config(&self) -> &StreamingConfig {
683 &self.streaming_config
684 }
685
686 pub fn udf_config(&self) -> &UdfConfig {
687 &self.udf_config
688 }
689
690 pub fn source_metrics(&self) -> Arc<SourceMetrics> {
691 self.source_metrics.clone()
692 }
693
694 pub fn spill_metrics(&self) -> Arc<BatchSpillMetrics> {
695 self.spill_metrics.clone()
696 }
697
698 pub fn creating_streaming_job_tracker(&self) -> &StreamingJobTrackerRef {
699 &self.creating_streaming_job_tracker
700 }
701
702 pub fn sessions_map(&self) -> &SessionMapRef {
703 &self.sessions_map
704 }
705
706 pub fn compute_runtime(&self) -> Arc<BackgroundShutdownRuntime> {
707 self.compute_runtime.clone()
708 }
709
710 pub fn cancel_queries_in_session(&self, session_id: SessionId) -> bool {
713 cancel_queries_in_session(session_id, self.sessions_map.clone())
714 }
715
716 pub fn cancel_creating_jobs_in_session(&self, session_id: SessionId) -> bool {
719 cancel_creating_jobs_in_session(session_id, self.sessions_map.clone())
720 }
721
722 pub fn mem_context(&self) -> MemoryContext {
723 self.mem_context.clone()
724 }
725
726 #[cfg(feature = "datafusion")]
727 pub fn df_spillable_budget_ctx(&self) -> MemoryContext {
728 self.df_spillable_budget_ctx.clone()
729 }
730}
731
732#[derive(Clone)]
733pub struct AuthContext {
734 pub database: String,
735 pub user_name: String,
736 pub user_id: UserId,
737}
738
739impl AuthContext {
740 pub fn new(database: String, user_name: String, user_id: UserId) -> Self {
741 Self {
742 database,
743 user_name,
744 user_id,
745 }
746 }
747}
748pub struct SessionImpl {
749 env: FrontendEnv,
750 auth_context: Arc<RwLock<AuthContext>>,
751 user_authenticator: UserAuthenticator,
753 config_map: Arc<RwLock<SessionConfig>>,
755
756 notice_tx: UnboundedSender<String>,
758 notice_rx: Mutex<UnboundedReceiver<String>>,
760
761 id: (i32, i32),
763
764 peer_addr: AddressRef,
766
767 txn: Arc<Mutex<transaction::State>>,
771
772 current_query_cancel_flag: Mutex<Option<ShutdownSender>>,
776
777 exec_context: Mutex<Option<Weak<ExecContext>>>,
779
780 last_idle_instant: Arc<Mutex<Option<Instant>>>,
782
783 cursor_manager: Arc<CursorManager>,
784
785 temporary_source_manager: Arc<Mutex<TemporarySourceManager>>,
787
788 staging_catalog_manager: Arc<Mutex<StagingCatalogManager>>,
790}
791
792#[derive(Default, Clone)]
801pub struct TemporarySourceManager {
802 sources: HashMap<String, SourceCatalog>,
803}
804
805impl TemporarySourceManager {
806 pub fn new() -> Self {
807 Self {
808 sources: HashMap::new(),
809 }
810 }
811
812 pub fn create_source(&mut self, name: String, source: SourceCatalog) {
813 self.sources.insert(name, source);
814 }
815
816 pub fn drop_source(&mut self, name: &str) {
817 self.sources.remove(name);
818 }
819
820 pub fn get_source(&self, name: &str) -> Option<&SourceCatalog> {
821 self.sources.get(name)
822 }
823
824 pub fn keys(&self) -> Vec<String> {
825 self.sources.keys().cloned().collect()
826 }
827}
828
829#[derive(Default, Clone)]
831pub struct StagingCatalogManager {
832 tables: HashMap<String, TableCatalog>,
834}
835
836impl StagingCatalogManager {
837 pub fn new() -> Self {
838 Self {
839 tables: HashMap::new(),
840 }
841 }
842
843 pub fn create_table(&mut self, name: String, table: TableCatalog) {
844 self.tables.insert(name, table);
845 }
846
847 pub fn drop_table(&mut self, name: &str) {
848 self.tables.remove(name);
849 }
850
851 pub fn get_table(&self, name: &str) -> Option<&TableCatalog> {
852 self.tables.get(name)
853 }
854}
855
856#[derive(Error, Debug)]
857pub enum CheckRelationError {
858 #[error("{0}")]
859 Resolve(#[from] ResolveQualifiedNameError),
860 #[error("{0}")]
861 Catalog(#[from] CatalogError),
862}
863
864impl From<CheckRelationError> for RwError {
865 fn from(e: CheckRelationError) -> Self {
866 match e {
867 CheckRelationError::Resolve(e) => e.into(),
868 CheckRelationError::Catalog(e) => e.into(),
869 }
870 }
871}
872
873impl SessionImpl {
874 pub(crate) fn new(
875 env: FrontendEnv,
876 auth_context: AuthContext,
877 user_authenticator: UserAuthenticator,
878 id: SessionId,
879 peer_addr: AddressRef,
880 session_config: SessionConfig,
881 ) -> Self {
882 let cursor_metrics = env.cursor_metrics.clone();
883 let (notice_tx, notice_rx) = mpsc::unbounded_channel();
884
885 Self {
886 env,
887 auth_context: Arc::new(RwLock::new(auth_context)),
888 user_authenticator,
889 config_map: Arc::new(RwLock::new(session_config)),
890 id,
891 peer_addr,
892 txn: Default::default(),
893 current_query_cancel_flag: Mutex::new(None),
894 notice_tx,
895 notice_rx: Mutex::new(notice_rx),
896 exec_context: Mutex::new(None),
897 last_idle_instant: Default::default(),
898 cursor_manager: Arc::new(CursorManager::new(cursor_metrics)),
899 temporary_source_manager: Default::default(),
900 staging_catalog_manager: Default::default(),
901 }
902 }
903
904 #[cfg(test)]
905 pub fn mock() -> Self {
906 let env = FrontendEnv::mock();
907 let (notice_tx, notice_rx) = mpsc::unbounded_channel();
908
909 Self {
910 env: FrontendEnv::mock(),
911 auth_context: Arc::new(RwLock::new(AuthContext::new(
912 DEFAULT_DATABASE_NAME.to_owned(),
913 DEFAULT_SUPER_USER.to_owned(),
914 DEFAULT_SUPER_USER_ID,
915 ))),
916 user_authenticator: UserAuthenticator::None,
917 config_map: Default::default(),
918 id: (0, 0),
920 txn: Default::default(),
921 current_query_cancel_flag: Mutex::new(None),
922 notice_tx,
923 notice_rx: Mutex::new(notice_rx),
924 exec_context: Mutex::new(None),
925 peer_addr: Address::Tcp(SocketAddr::new(
926 IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
927 8080,
928 ))
929 .into(),
930 last_idle_instant: Default::default(),
931 cursor_manager: Arc::new(CursorManager::new(env.cursor_metrics)),
932 temporary_source_manager: Default::default(),
933 staging_catalog_manager: Default::default(),
934 }
935 }
936
937 pub(crate) fn env(&self) -> &FrontendEnv {
938 &self.env
939 }
940
941 pub fn auth_context(&self) -> Arc<AuthContext> {
942 let ctx = self.auth_context.read();
943 Arc::new(ctx.clone())
944 }
945
946 pub fn database(&self) -> String {
947 self.auth_context.read().database.clone()
948 }
949
950 pub fn database_id(&self) -> DatabaseId {
951 let db_name = self.database();
952 self.env
953 .catalog_reader()
954 .read_guard()
955 .get_database_by_name(&db_name)
956 .map(|db| db.id())
957 .expect("session database not found")
958 }
959
960 pub fn user_name(&self) -> String {
961 self.auth_context.read().user_name.clone()
962 }
963
964 pub fn user_id(&self) -> UserId {
965 self.auth_context.read().user_id
966 }
967
968 pub fn update_database(&self, database: String) {
969 self.auth_context.write().database = database;
970 }
971
972 pub fn shared_config(&self) -> Arc<RwLock<SessionConfig>> {
973 Arc::clone(&self.config_map)
974 }
975
976 pub fn config(&self) -> RwLockReadGuard<'_, SessionConfig> {
977 self.config_map.read()
978 }
979
980 pub fn set_config(&self, key: &str, value: String) -> Result<String> {
981 self.config_map
982 .write()
983 .set(key, value, &mut ())
984 .map_err(Into::into)
985 }
986
987 pub fn reset_config(&self, key: &str) -> Result<String> {
988 self.config_map
989 .write()
990 .reset(key, &mut ())
991 .map_err(Into::into)
992 }
993
994 pub fn set_config_report(
995 &self,
996 key: &str,
997 value: Option<String>,
998 mut reporter: impl ConfigReporter,
999 ) -> Result<String> {
1000 if let Some(value) = value {
1001 self.config_map
1002 .write()
1003 .set(key, value, &mut reporter)
1004 .map_err(Into::into)
1005 } else {
1006 self.config_map
1007 .write()
1008 .reset(key, &mut reporter)
1009 .map_err(Into::into)
1010 }
1011 }
1012
1013 pub fn session_id(&self) -> SessionId {
1014 self.id
1015 }
1016
1017 pub fn running_sql(&self) -> Option<Arc<str>> {
1018 self.exec_context
1019 .lock()
1020 .as_ref()
1021 .and_then(|weak| weak.upgrade())
1022 .map(|context| context.running_sql.clone())
1023 }
1024
1025 pub fn get_cursor_manager(&self) -> Arc<CursorManager> {
1026 self.cursor_manager.clone()
1027 }
1028
1029 pub fn peer_addr(&self) -> &Address {
1030 &self.peer_addr
1031 }
1032
1033 pub fn elapse_since_running_sql(&self) -> Option<u128> {
1034 self.exec_context
1035 .lock()
1036 .as_ref()
1037 .and_then(|weak| weak.upgrade())
1038 .map(|context| context.last_instant.elapsed().as_millis())
1039 }
1040
1041 pub fn elapse_since_last_idle_instant(&self) -> Option<u128> {
1042 self.last_idle_instant
1043 .lock()
1044 .as_ref()
1045 .map(|x| x.elapsed().as_millis())
1046 }
1047
1048 pub fn check_relation_name_duplicated(
1049 &self,
1050 name: ObjectName,
1051 stmt_type: StatementType,
1052 if_not_exists: bool,
1053 ) -> std::result::Result<Either<(), RwPgResponse>, CheckRelationError> {
1054 let db_name = &self.database();
1055 let catalog_reader = self.env().catalog_reader().read_guard();
1056 let (schema_name, relation_name) = {
1057 let (schema_name, relation_name) =
1058 Binder::resolve_schema_qualified_name(db_name, &name)?;
1059 let search_path = self.config().search_path();
1060 let user_name = &self.user_name();
1061 let schema_name = match schema_name {
1062 Some(schema_name) => schema_name,
1063 None => catalog_reader
1064 .first_valid_schema(db_name, &search_path, user_name)?
1065 .name(),
1066 };
1067 (schema_name, relation_name)
1068 };
1069 match catalog_reader.check_relation_name_duplicated(db_name, &schema_name, &relation_name) {
1070 Err(e) if if_not_exists => {
1071 if let CatalogErrorInner::Duplicated {
1072 name,
1073 under_creation,
1074 ..
1075 } = e.inner()
1076 {
1077 if !*under_creation {
1082 Ok(Either::Right(
1083 PgResponse::builder(stmt_type)
1084 .notice(format!("relation \"{}\" already exists, skipping", name))
1085 .into(),
1086 ))
1087 } else if stmt_type == StatementType::CREATE_SUBSCRIPTION {
1088 Ok(Either::Right(
1091 PgResponse::builder(stmt_type)
1092 .notice(format!(
1093 "relation \"{}\" already exists but still creating, skipping",
1094 name
1095 ))
1096 .into(),
1097 ))
1098 } else {
1099 Ok(Either::Left(()))
1100 }
1101 } else {
1102 Err(e.into())
1103 }
1104 }
1105 Err(e) => Err(e.into()),
1106 Ok(_) => Ok(Either::Left(())),
1107 }
1108 }
1109
1110 pub fn check_secret_name_duplicated(&self, name: ObjectName) -> Result<()> {
1111 let db_name = &self.database();
1112 let catalog_reader = self.env().catalog_reader().read_guard();
1113 let (schema_name, secret_name) = {
1114 let (schema_name, secret_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
1115 let search_path = self.config().search_path();
1116 let user_name = &self.user_name();
1117 let schema_name = match schema_name {
1118 Some(schema_name) => schema_name,
1119 None => catalog_reader
1120 .first_valid_schema(db_name, &search_path, user_name)?
1121 .name(),
1122 };
1123 (schema_name, secret_name)
1124 };
1125 catalog_reader
1126 .check_secret_name_duplicated(db_name, &schema_name, &secret_name)
1127 .map_err(RwError::from)
1128 }
1129
1130 pub fn check_connection_name_duplicated(&self, name: ObjectName) -> Result<()> {
1131 let db_name = &self.database();
1132 let catalog_reader = self.env().catalog_reader().read_guard();
1133 let (schema_name, connection_name) = {
1134 let (schema_name, connection_name) =
1135 Binder::resolve_schema_qualified_name(db_name, &name)?;
1136 let search_path = self.config().search_path();
1137 let user_name = &self.user_name();
1138 let schema_name = match schema_name {
1139 Some(schema_name) => schema_name,
1140 None => catalog_reader
1141 .first_valid_schema(db_name, &search_path, user_name)?
1142 .name(),
1143 };
1144 (schema_name, connection_name)
1145 };
1146 catalog_reader
1147 .check_connection_name_duplicated(db_name, &schema_name, &connection_name)
1148 .map_err(RwError::from)
1149 }
1150
1151 pub fn check_function_name_duplicated(
1152 &self,
1153 stmt_type: StatementType,
1154 name: ObjectName,
1155 arg_types: &[DataType],
1156 if_not_exists: bool,
1157 ) -> Result<Either<(), RwPgResponse>> {
1158 let db_name = &self.database();
1159 let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
1160 let (database_id, schema_id) = self.get_database_and_schema_id_for_create(schema_name)?;
1161
1162 let catalog_reader = self.env().catalog_reader().read_guard();
1163 if catalog_reader
1164 .get_schema_by_id(database_id, schema_id)?
1165 .get_function_by_name_args(&function_name, arg_types)
1166 .is_some()
1167 {
1168 let full_name = format!(
1169 "{function_name}({})",
1170 arg_types.iter().map(|t| t.to_string()).join(",")
1171 );
1172 if if_not_exists {
1173 Ok(Either::Right(
1174 PgResponse::builder(stmt_type)
1175 .notice(format!(
1176 "function \"{}\" already exists, skipping",
1177 full_name
1178 ))
1179 .into(),
1180 ))
1181 } else {
1182 Err(CatalogError::duplicated("function", full_name).into())
1183 }
1184 } else {
1185 Ok(Either::Left(()))
1186 }
1187 }
1188
1189 pub fn get_database_and_schema_id_for_create(
1191 &self,
1192 schema_name: Option<String>,
1193 ) -> Result<(DatabaseId, SchemaId)> {
1194 let db_name = &self.database();
1195
1196 let search_path = self.config().search_path();
1197 let user_name = &self.user_name();
1198
1199 let catalog_reader = self.env().catalog_reader().read_guard();
1200 let schema = match schema_name {
1201 Some(schema_name) => catalog_reader.get_schema_by_name(db_name, &schema_name)?,
1202 None => catalog_reader.first_valid_schema(db_name, &search_path, user_name)?,
1203 };
1204 let schema_name = schema.name();
1205
1206 check_schema_writable(&schema_name)?;
1207 self.check_privileges(&[ObjectCheckItem::new(
1208 schema.owner(),
1209 AclMode::Create,
1210 schema_name,
1211 schema.id(),
1212 )])?;
1213
1214 let db_id = catalog_reader.get_database_by_name(db_name)?.id();
1215 Ok((db_id, schema.id()))
1216 }
1217
1218 pub fn get_connection_by_name(
1219 &self,
1220 schema_name: Option<String>,
1221 connection_name: &str,
1222 ) -> Result<Arc<ConnectionCatalog>> {
1223 let db_name = &self.database();
1224 let search_path = self.config().search_path();
1225 let user_name = &self.user_name();
1226
1227 let catalog_reader = self.env().catalog_reader().read_guard();
1228 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1229 let (connection, _) =
1230 catalog_reader.get_connection_by_name(db_name, schema_path, connection_name)?;
1231
1232 self.check_privileges(&[ObjectCheckItem::new(
1233 connection.owner(),
1234 AclMode::Usage,
1235 connection.name.clone(),
1236 connection.id,
1237 )])?;
1238
1239 Ok(connection.clone())
1240 }
1241
1242 pub fn get_subscription_by_schema_id_name(
1243 &self,
1244 schema_id: SchemaId,
1245 subscription_name: &str,
1246 ) -> Result<Arc<SubscriptionCatalog>> {
1247 let db_name = &self.database();
1248
1249 let catalog_reader = self.env().catalog_reader().read_guard();
1250 let db_id = catalog_reader.get_database_by_name(db_name)?.id();
1251 let schema = catalog_reader.get_schema_by_id(db_id, schema_id)?;
1252 let subscription = schema
1253 .get_subscription_by_name(subscription_name)
1254 .ok_or_else(|| {
1255 RwError::from(ErrorCode::ItemNotFound(format!(
1256 "subscription {} not found",
1257 subscription_name
1258 )))
1259 })?;
1260 Ok(subscription.clone())
1261 }
1262
1263 pub fn get_subscription_by_name(
1264 &self,
1265 schema_name: Option<String>,
1266 subscription_name: &str,
1267 ) -> Result<Arc<SubscriptionCatalog>> {
1268 let db_name = &self.database();
1269 let search_path = self.config().search_path();
1270 let user_name = &self.user_name();
1271
1272 let catalog_reader = self.env().catalog_reader().read_guard();
1273 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1274 let (subscription, _) =
1275 catalog_reader.get_subscription_by_name(db_name, schema_path, subscription_name)?;
1276 Ok(subscription.clone())
1277 }
1278
1279 pub fn get_table_by_id(&self, table_id: TableId) -> Result<Arc<TableCatalog>> {
1280 let catalog_reader = self.env().catalog_reader().read_guard();
1281 Ok(catalog_reader.get_any_table_by_id(table_id)?.clone())
1282 }
1283
1284 pub fn get_table_by_name(
1285 &self,
1286 table_name: &str,
1287 db_id: DatabaseId,
1288 schema_id: SchemaId,
1289 ) -> Result<Arc<TableCatalog>> {
1290 let catalog_reader = self.env().catalog_reader().read_guard();
1291 let table = catalog_reader
1292 .get_schema_by_id(db_id, schema_id)?
1293 .get_created_table_by_name(table_name)
1294 .ok_or_else(|| {
1295 Error::new(
1296 ErrorKind::InvalidInput,
1297 format!("table \"{}\" does not exist", table_name),
1298 )
1299 })?;
1300
1301 self.check_privileges(&[ObjectCheckItem::new(
1302 table.owner(),
1303 AclMode::Select,
1304 table_name.to_owned(),
1305 table.id,
1306 )])?;
1307
1308 Ok(table.clone())
1309 }
1310
1311 pub fn get_secret_by_name(
1312 &self,
1313 schema_name: Option<String>,
1314 secret_name: &str,
1315 ) -> Result<Arc<SecretCatalog>> {
1316 let db_name = &self.database();
1317 let search_path = self.config().search_path();
1318 let user_name = &self.user_name();
1319
1320 let catalog_reader = self.env().catalog_reader().read_guard();
1321 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1322 let (secret, _) = catalog_reader.get_secret_by_name(db_name, schema_path, secret_name)?;
1323
1324 self.check_privileges(&[ObjectCheckItem::new(
1325 secret.owner(),
1326 AclMode::Usage,
1327 secret.name.clone(),
1328 secret.id,
1329 )])?;
1330
1331 Ok(secret.clone())
1332 }
1333
1334 pub fn list_change_log_epochs(
1335 &self,
1336 table_id: TableId,
1337 min_epoch: u64,
1338 max_count: u32,
1339 ) -> Result<Vec<u64>> {
1340 Ok(self
1341 .env
1342 .hummock_snapshot_manager()
1343 .acquire()
1344 .list_change_log_epochs(table_id, min_epoch, max_count))
1345 }
1346
1347 pub fn clear_cancel_query_flag(&self) {
1348 let mut flag = self.current_query_cancel_flag.lock();
1349 *flag = None;
1350 }
1351
1352 pub fn reset_cancel_query_flag(&self) -> ShutdownToken {
1353 let mut flag = self.current_query_cancel_flag.lock();
1354 let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
1355 *flag = Some(shutdown_tx);
1356 shutdown_rx
1357 }
1358
1359 pub fn cancel_current_query(&self) {
1360 let mut flag_guard = self.current_query_cancel_flag.lock();
1361 if let Some(sender) = flag_guard.take() {
1362 info!("Trying to cancel query in local mode.");
1363 sender.cancel();
1365 info!("Cancel query request sent.");
1366 } else {
1367 info!("Trying to cancel query in distributed mode.");
1368 self.env.query_manager().cancel_queries_in_session(self.id)
1369 }
1370 }
1371
1372 pub fn cancel_current_creating_job(&self) {
1373 self.env.creating_streaming_job_tracker.abort_jobs(self.id);
1374 }
1375
1376 pub async fn run_statement(
1379 self: Arc<Self>,
1380 sql: Arc<str>,
1381 formats: Vec<Format>,
1382 ) -> std::result::Result<PgResponse<PgResponseStream>, BoxedError> {
1383 let mut stmts = Parser::parse_sql(&sql)?;
1385 if stmts.is_empty() {
1386 return Ok(PgResponse::empty_result(
1387 pgwire::pg_response::StatementType::EMPTY,
1388 ));
1389 }
1390 if stmts.len() > 1 {
1391 return Ok(
1392 PgResponse::builder(pgwire::pg_response::StatementType::EMPTY)
1393 .notice("cannot insert multiple commands into statement")
1394 .into(),
1395 );
1396 }
1397 let stmt = stmts.swap_remove(0);
1398 let rsp = handle(self, stmt, sql.clone(), formats).await?;
1399 Ok(rsp)
1400 }
1401
1402 pub fn notice_to_user(&self, str: impl Into<String>) {
1403 let notice = str.into();
1404 tracing::trace!(notice, "notice to user");
1405 self.notice_tx
1406 .send(notice)
1407 .expect("notice channel should not be closed");
1408 }
1409
1410 pub fn is_barrier_read(&self) -> bool {
1411 match self.config().visibility_mode() {
1412 VisibilityMode::Default => self.env.batch_config.enable_barrier_read,
1413 VisibilityMode::All => true,
1414 VisibilityMode::Checkpoint => false,
1415 }
1416 }
1417
1418 pub fn statement_timeout(&self) -> Duration {
1419 if self.config().statement_timeout().millis() == 0 {
1420 Duration::from_secs(self.env.batch_config.statement_timeout_in_sec as u64)
1421 } else {
1422 Duration::from_millis(self.config().statement_timeout().millis() as u64)
1423 }
1424 }
1425
1426 pub fn create_temporary_source(&self, source: SourceCatalog) {
1427 self.temporary_source_manager
1428 .lock()
1429 .create_source(source.name.clone(), source);
1430 }
1431
1432 pub fn get_temporary_source(&self, name: &str) -> Option<SourceCatalog> {
1433 self.temporary_source_manager
1434 .lock()
1435 .get_source(name)
1436 .cloned()
1437 }
1438
1439 pub fn drop_temporary_source(&self, name: &str) {
1440 self.temporary_source_manager.lock().drop_source(name);
1441 }
1442
1443 pub fn temporary_source_manager(&self) -> TemporarySourceManager {
1444 self.temporary_source_manager.lock().clone()
1445 }
1446
1447 pub fn create_staging_table(&self, table: TableCatalog) {
1448 self.staging_catalog_manager
1449 .lock()
1450 .create_table(table.name.clone(), table);
1451 }
1452
1453 pub fn drop_staging_table(&self, name: &str) {
1454 self.staging_catalog_manager.lock().drop_table(name);
1455 }
1456
1457 pub fn staging_catalog_manager(&self) -> StagingCatalogManager {
1458 self.staging_catalog_manager.lock().clone()
1459 }
1460
    /// Checks the cluster limits reported by the meta service.
    ///
    /// Does nothing when the session config has `bypass_cluster_limits` set.
    /// Otherwise, for each actor-count limit:
    /// - exceeding the hard limit returns a `ProtocolError` with a detailed message;
    /// - exceeding only the soft limit sends the same kind of message to the
    ///   user as a notice and continues.
    ///
    /// # Errors
    /// Returns an error if fetching the limits fails or a hard limit is exceeded.
    pub async fn check_cluster_limits(&self) -> Result<()> {
        if self.config().bypass_cluster_limits() {
            return Ok(());
        }

        // Renders the user-facing message; `exceed_hard_limit` selects between the
        // "critical" and the "recommended" wording.
        let gen_message = |ActorCountPerParallelism {
                               worker_id_to_actor_count,
                               hard_limit,
                               soft_limit,
                           }: ActorCountPerParallelism,
                           exceed_hard_limit: bool|
         -> String {
            let (limit_type, action) = if exceed_hard_limit {
                ("critical", "Scale the cluster immediately to proceed.")
            } else {
                (
                    "recommended",
                    "Consider scaling the cluster for optimal performance.",
                )
            };
            format!(
                r#"Actor count per parallelism exceeds the {limit_type} limit.

Depending on your workload, this may overload the cluster and cause performance/stability issues. {action}

HINT:
- For best practices on managing streaming jobs: https://docs.risingwave.com/operate/manage-a-large-number-of-streaming-jobs
- To bypass the check (if the cluster load is acceptable): `[ALTER SYSTEM] SET bypass_cluster_limits TO true`.
 See https://docs.risingwave.com/operate/view-configure-runtime-parameters#how-to-configure-runtime-parameters
- Contact us via slack or https://risingwave.com/contact-us/ for further enquiry.

DETAILS:
- hard limit: {hard_limit}
- soft limit: {soft_limit}
- worker_id_to_actor_count: {worker_id_to_actor_count:?}"#,
            )
        };

        let limits = self.env().meta_client().get_cluster_limits().await?;
        for limit in limits {
            match limit {
                cluster_limit::ClusterLimit::ActorCount(l) => {
                    // The hard limit is checked first: it aborts, while the soft
                    // limit only warns.
                    if l.exceed_hard_limit() {
                        return Err(RwError::from(ErrorCode::ProtocolError(gen_message(
                            l, true,
                        ))));
                    } else if l.exceed_soft_limit() {
                        self.notice_to_user(gen_message(l, false));
                    }
                }
            }
        }
        Ok(())
    }
1515}
1516
/// Process-wide slot holding the shared [`SessionManagerImpl`].
///
/// Being a `OnceLock`, it starts empty and can be initialized at most once.
pub static SESSION_MANAGER: std::sync::OnceLock<Arc<SessionManagerImpl>> =
    std::sync::OnceLock::new();
1519
/// Session manager of the frontend: creates sessions and tracks them in the
/// environment's session map.
pub struct SessionManagerImpl {
    /// Frontend environment shared with every session created by this manager.
    env: FrontendEnv,
    /// Join handles returned by `FrontendEnv::init`; stored but never read here.
    _join_handles: Vec<JoinHandle<()>>,
    /// Shutdown senders returned by `FrontendEnv::init`; stored but never read here.
    _shutdown_senders: Vec<Sender<()>>,
    /// Monotonic counter from which `connect_inner` derives session ids.
    number: AtomicI32,
}
1526
1527impl SessionManager for SessionManagerImpl {
1528 type Error = RwError;
1529 type Session = SessionImpl;
1530
1531 fn create_dummy_session(&self, database_id: DatabaseId) -> Result<Arc<Self::Session>> {
1532 let dummy_addr = Address::Tcp(SocketAddr::new(
1533 IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1534 5691, ));
1536
1537 self.connect_inner(
1540 database_id,
1541 risingwave_common::catalog::DEFAULT_SUPER_USER,
1542 Arc::new(dummy_addr),
1543 )
1544 }
1545
1546 fn connect(
1547 &self,
1548 database: &str,
1549 user_name: &str,
1550 peer_addr: AddressRef,
1551 ) -> Result<Arc<Self::Session>> {
1552 let catalog_reader = self.env.catalog_reader();
1553 let reader = catalog_reader.read_guard();
1554 let database_id = reader.get_database_by_name(database)?.id();
1555
1556 self.connect_inner(database_id, user_name, peer_addr)
1557 }
1558
1559 fn cancel_queries_in_session(&self, session_id: SessionId) {
1561 self.env.cancel_queries_in_session(session_id);
1562 }
1563
1564 fn cancel_creating_jobs_in_session(&self, session_id: SessionId) {
1565 self.env.cancel_creating_jobs_in_session(session_id);
1566 }
1567
1568 fn end_session(&self, session: &Self::Session) {
1569 self.delete_session(&session.session_id());
1570 }
1571
1572 async fn shutdown(&self) {
1573 self.env.sessions_map().write().clear();
1575 self.env.meta_client().try_unregister().await;
1577 }
1578}
1579
impl SessionManagerImpl {
    /// Initializes the frontend environment from `opts` and wraps it in a
    /// session manager whose session counter starts at zero.
    pub async fn new(opts: FrontendOpts) -> Result<Self> {
        let (env, join_handles, shutdown_senders) = FrontendEnv::init(opts).await?;
        Ok(Self {
            env,
            _join_handles: join_handles,
            _shutdown_senders: shutdown_senders,
            number: AtomicI32::new(0),
        })
    }

    /// Borrows the frontend environment.
    pub(crate) fn env(&self) -> &FrontendEnv {
        &self.env
    }

    /// Adds `session` to the session map and sets the `active_sessions` metric
    /// to the resulting map size.
    fn insert_session(&self, session: Arc<SessionImpl>) {
        // The write guard is confined to this block so the metric update below
        // happens without holding the lock.
        let active_sessions = {
            let mut write_guard = self.env.sessions_map.write();
            write_guard.insert(session.id(), session);
            write_guard.len()
        };
        self.env
            .frontend_metrics
            .active_sessions
            .set(active_sessions as i64);
    }

    /// Removes the session with `session_id` from the session map and sets the
    /// `active_sessions` metric to the resulting map size.
    fn delete_session(&self, session_id: &SessionId) {
        let active_sessions = {
            let mut write_guard = self.env.sessions_map.write();
            write_guard.remove(session_id);
            write_guard.len()
        };
        self.env
            .frontend_metrics
            .active_sessions
            .set(active_sessions as i64);
    }

    /// Creates and registers a session for `user_name` on `database_id`.
    ///
    /// Steps, in order:
    /// 1. look up the database name and owner;
    /// 2. look up the user and check `can_login` and the CONNECT permission
    ///    (super users and the database owner are always allowed);
    /// 3. find the HBA entry matching connection type, database, user and
    ///    client address;
    /// 4. derive the `UserAuthenticator` from the HBA auth method and the
    ///    user's stored auth info;
    /// 5. allocate a session id, build the session and insert it into the
    ///    session map.
    ///
    /// # Errors
    /// Permission-denied errors for steps 2–4, a protocol error for an
    /// unsupported stored encryption type, and a catalog error when the role
    /// does not exist.
    fn connect_inner(
        &self,
        database_id: DatabaseId,
        user_name: &str,
        peer_addr: AddressRef,
    ) -> Result<Arc<SessionImpl>> {
        let catalog_reader = self.env.catalog_reader();
        let reader = catalog_reader.read_guard();
        let (database_name, database_owner) = {
            let db = reader.get_database_by_id(database_id)?;
            (db.name(), db.owner())
        };

        // From here on `reader` refers to the user-info guard; the catalog guard
        // above is shadowed, not dropped.
        let user_reader = self.env.user_info_reader();
        let reader = user_reader.read_guard();
        if let Some(user) = reader.get_user_by_name(user_name) {
            if !user.can_login {
                bail_permission_denied!("User {} is not allowed to login", user_name);
            }
            let has_privilege = user.has_privilege(database_id, AclMode::Connect);
            if !user.is_super && database_owner != user.id && !has_privilege {
                bail_permission_denied!("User does not have CONNECT privilege.");
            }

            // TCP peers are matched as `Host` entries with their IP address; Unix
            // socket peers are matched as `Local` entries without an address.
            let (connection_type, client_addr) = match peer_addr.as_ref() {
                Address::Tcp(socket_addr) => (ConnectionType::Host, Some(&socket_addr.ip())),
                Address::Unix(_) => (ConnectionType::Local, None),
            };
            tracing::debug!(
                "receive connection: type={:?}, client_addr={:?}",
                connection_type,
                client_addr
            );

            let hba_entry_opt = self.env.frontend_config().hba_config.find_matching_entry(
                &connection_type,
                database_name,
                user_name,
                client_addr,
            );

            let Some(hba_entry_opt) = hba_entry_opt else {
                bail_permission_denied!(
                    "no pg_hba.conf entry for host \"{peer_addr}\", user \"{user_name}\", database \"{database_name}\""
                );
            };

            // Builds the authenticator purely from the credential stored for the
            // user, independent of the HBA auth method.
            let authenticator_by_info = || -> Result<UserAuthenticator> {
                let authenticator = match &user.auth_info {
                    None => UserAuthenticator::None,
                    Some(auth_info) => match auth_info.encryption_type() {
                        EncryptionType::Plaintext => {
                            UserAuthenticator::ClearText(auth_info.encrypted_value.clone())
                        }
                        EncryptionType::Md5 => {
                            // A fresh random 4-byte salt is generated per call and
                            // combined with the stored value.
                            let mut salt = [0; 4];
                            let mut rng = rand::rng();
                            rng.fill_bytes(&mut salt);
                            UserAuthenticator::Md5WithSalt {
                                encrypted_password: md5_hash_with_salt(
                                    &auth_info.encrypted_value,
                                    &salt,
                                ),
                                salt,
                            }
                        }
                        EncryptionType::Oauth => UserAuthenticator::OAuth {
                            metadata: auth_info.metadata.clone(),
                            cluster_id: self.env.meta_client().cluster_id().to_owned(),
                        },
                        _ => {
                            bail_protocol_error!(
                                "Unsupported auth type: {}",
                                auth_info.encryption_type().as_str_name()
                            );
                        }
                    },
                };
                Ok(authenticator)
            };

            // `Trust` skips authentication; `Password` accepts whatever credential
            // kind is stored; `Md5`/`OAuth` require a stored credential of the same
            // kind; `Ldap` delegates using the HBA entry. Every other combination
            // is rejected.
            let user_authenticator = match (&hba_entry_opt.auth_method, &user.auth_info) {
                (AuthMethod::Trust, _) => UserAuthenticator::None,
                (AuthMethod::Password, _) => authenticator_by_info()?,
                (AuthMethod::Md5, Some(auth_info))
                    if auth_info.encryption_type() == EncryptionType::Md5 =>
                {
                    authenticator_by_info()?
                }
                (AuthMethod::OAuth, Some(auth_info))
                    if auth_info.encryption_type() == EncryptionType::Oauth =>
                {
                    authenticator_by_info()?
                }
                (AuthMethod::Ldap, _) => {
                    UserAuthenticator::Ldap(user_name.to_owned(), hba_entry_opt.clone())
                }
                _ => {
                    bail_permission_denied!(
                        "password authentication failed for user \"{user_name}\""
                    );
                }
            };

            // Both components of the session id are the same counter value.
            let secret_key = self.number.fetch_add(1, Ordering::Relaxed);
            let id = (secret_key, secret_key);
            let session_config = self.env.session_params_snapshot();

            let session_impl: Arc<SessionImpl> = SessionImpl::new(
                self.env.clone(),
                AuthContext::new(database_name.to_owned(), user_name.to_owned(), user.id),
                user_authenticator,
                id,
                peer_addr,
                session_config,
            )
            .into();
            self.insert_session(session_impl.clone());

            Ok(session_impl)
        } else {
            bail_catalog_error!("Role {} does not exist", user_name);
        }
    }
}
1751
1752impl Session for SessionImpl {
1753 type Error = RwError;
1754 type Portal = Portal;
1755 type PreparedStatement = PrepareStatement;
1756 type ValuesStream = PgResponseStream;
1757
1758 async fn run_one_query(
1761 self: Arc<Self>,
1762 stmt: Statement,
1763 format: Format,
1764 ) -> Result<PgResponse<PgResponseStream>> {
1765 let string = stmt.to_string();
1766 let sql_str = string.as_str();
1767 let sql: Arc<str> = Arc::from(sql_str);
1768 drop(string);
1770 let rsp = handle(self, stmt, sql, vec![format]).await?;
1771 Ok(rsp)
1772 }
1773
1774 fn user_authenticator(&self) -> &UserAuthenticator {
1775 &self.user_authenticator
1776 }
1777
1778 fn id(&self) -> SessionId {
1779 self.id
1780 }
1781
1782 async fn parse(
1783 self: Arc<Self>,
1784 statement: Option<Statement>,
1785 params_types: Vec<Option<DataType>>,
1786 ) -> Result<PrepareStatement> {
1787 Ok(if let Some(statement) = statement {
1788 handle_parse(self, statement, params_types).await?
1789 } else {
1790 PrepareStatement::Empty
1791 })
1792 }
1793
1794 fn bind(
1795 self: Arc<Self>,
1796 prepare_statement: PrepareStatement,
1797 params: Vec<Option<Bytes>>,
1798 param_formats: Vec<Format>,
1799 result_formats: Vec<Format>,
1800 ) -> Result<Portal> {
1801 handle_bind(prepare_statement, params, param_formats, result_formats)
1802 }
1803
1804 async fn execute(self: Arc<Self>, portal: Portal) -> Result<PgResponse<PgResponseStream>> {
1805 let rsp = handle_execute(self, portal).await?;
1806 Ok(rsp)
1807 }
1808
1809 fn describe_statement(
1810 self: Arc<Self>,
1811 prepare_statement: PrepareStatement,
1812 ) -> Result<(Vec<DataType>, Vec<PgFieldDescriptor>)> {
1813 Ok(match prepare_statement {
1814 PrepareStatement::Empty => (vec![], vec![]),
1815 PrepareStatement::Prepared(prepare_statement) => (
1816 prepare_statement.bound_result.param_types,
1817 infer(
1818 Some(prepare_statement.bound_result.bound),
1819 prepare_statement.statement,
1820 )?,
1821 ),
1822 PrepareStatement::PureStatement(statement) => (vec![], infer(None, statement)?),
1823 })
1824 }
1825
1826 fn describe_portal(self: Arc<Self>, portal: Portal) -> Result<Vec<PgFieldDescriptor>> {
1827 match portal {
1828 Portal::Empty => Ok(vec![]),
1829 Portal::Portal(portal) => {
1830 let mut columns = infer(Some(portal.bound_result.bound), portal.statement)?;
1831 let formats = FormatIterator::new(&portal.result_formats, columns.len())
1832 .map_err(|e| RwError::from(ErrorCode::ProtocolError(e)))?;
1833 columns.iter_mut().zip_eq_fast(formats).for_each(|(c, f)| {
1834 if f == Format::Binary {
1835 c.set_to_binary()
1836 }
1837 });
1838 Ok(columns)
1839 }
1840 Portal::PureStatement(statement) => Ok(infer(None, statement)?),
1841 }
1842 }
1843
1844 fn get_config(&self, key: &str) -> Result<String> {
1845 self.config().get(key).map_err(Into::into)
1846 }
1847
1848 fn set_config(&self, key: &str, value: String) -> Result<String> {
1849 Self::set_config(self, key, value)
1850 }
1851
1852 async fn next_notice(self: &Arc<Self>) -> String {
1853 std::future::poll_fn(|cx| self.clone().notice_rx.lock().poll_recv(cx))
1854 .await
1855 .expect("notice channel should not be closed")
1856 }
1857
1858 fn transaction_status(&self) -> TransactionStatus {
1859 match &*self.txn.lock() {
1860 transaction::State::Initial | transaction::State::Implicit(_) => {
1861 TransactionStatus::Idle
1862 }
1863 transaction::State::Explicit(_) => TransactionStatus::InTransaction,
1864 }
1866 }
1867
1868 fn init_exec_context(&self, sql: Arc<str>) -> ExecContextGuard {
1870 let exec_context = Arc::new(ExecContext {
1871 running_sql: sql,
1872 last_instant: Instant::now(),
1873 last_idle_instant: self.last_idle_instant.clone(),
1874 });
1875 *self.exec_context.lock() = Some(Arc::downgrade(&exec_context));
1876 *self.last_idle_instant.lock() = None;
1878 ExecContextGuard::new(exec_context)
1879 }
1880
1881 fn check_idle_in_transaction_timeout(&self) -> PsqlResult<()> {
1884 if matches!(self.transaction_status(), TransactionStatus::InTransaction) {
1886 let idle_in_transaction_session_timeout =
1887 self.config().idle_in_transaction_session_timeout() as u128;
1888 if idle_in_transaction_session_timeout != 0 {
1890 let guard = self.exec_context.lock();
1892 if guard.as_ref().and_then(|weak| weak.upgrade()).is_none() {
1894 if let Some(elapse_since_last_idle_instant) =
1896 self.elapse_since_last_idle_instant()
1897 && elapse_since_last_idle_instant > idle_in_transaction_session_timeout
1898 {
1899 return Err(PsqlError::IdleInTxnTimeout);
1900 }
1901 }
1902 }
1903 }
1904 Ok(())
1905 }
1906}
1907
1908fn infer(bound: Option<BoundStatement>, stmt: Statement) -> Result<Vec<PgFieldDescriptor>> {
1910 match stmt {
1911 Statement::Query(_)
1912 | Statement::Insert { .. }
1913 | Statement::Delete { .. }
1914 | Statement::Update { .. }
1915 | Statement::FetchCursor { .. } => Ok(bound
1916 .unwrap()
1917 .output_fields()
1918 .iter()
1919 .map(to_pg_field)
1920 .collect()),
1921 Statement::ShowObjects {
1922 object: show_object,
1923 ..
1924 } => Ok(infer_show_object(&show_object)),
1925 Statement::ShowCreateObject { .. } => Ok(infer_show_create_object()),
1926 Statement::ShowTransactionIsolationLevel => {
1927 let name = "transaction_isolation";
1928 Ok(infer_show_variable(name))
1929 }
1930 Statement::ShowVariable { variable } => {
1931 let name = &variable[0].real_value().to_lowercase();
1932 Ok(infer_show_variable(name))
1933 }
1934 Statement::Describe { name: _, kind } => Ok(infer_describe(&kind)),
1935 Statement::Explain { .. } => Ok(vec![PgFieldDescriptor::new(
1936 "QUERY PLAN".to_owned(),
1937 DataType::Varchar.to_oid(),
1938 DataType::Varchar.type_len(),
1939 )]),
1940 _ => Ok(vec![]),
1941 }
1942}
1943
/// A worker id paired with a process id.
///
/// Displayed and parsed as `"<worker_id>:<process_id>"`.
pub struct WorkerProcessId {
    /// Id of the worker node.
    pub worker_id: WorkerId,
    /// Process id on that worker.
    pub process_id: i32,
}
1948
1949impl WorkerProcessId {
1950 pub fn new(worker_id: WorkerId, process_id: i32) -> Self {
1951 Self {
1952 worker_id,
1953 process_id,
1954 }
1955 }
1956}
1957
1958impl Display for WorkerProcessId {
1959 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1960 write!(f, "{}:{}", self.worker_id, self.process_id)
1961 }
1962}
1963
1964impl TryFrom<String> for WorkerProcessId {
1965 type Error = String;
1966
1967 fn try_from(worker_process_id: String) -> std::result::Result<Self, Self::Error> {
1968 const INVALID: &str = "invalid WorkerProcessId";
1969 let splits: Vec<&str> = worker_process_id.split(":").collect();
1970 if splits.len() != 2 {
1971 return Err(INVALID.to_owned());
1972 }
1973 let Ok(worker_id) = splits[0].parse::<u32>() else {
1974 return Err(INVALID.to_owned());
1975 };
1976 let Ok(process_id) = splits[1].parse::<i32>() else {
1977 return Err(INVALID.to_owned());
1978 };
1979 Ok(WorkerProcessId::new(worker_id.into(), process_id))
1980 }
1981}
1982
1983pub fn cancel_queries_in_session(session_id: SessionId, sessions_map: SessionMapRef) -> bool {
1984 let guard = sessions_map.read();
1985 if let Some(session) = guard.get(&session_id) {
1986 session.cancel_current_query();
1987 true
1988 } else {
1989 info!("Current session finished, ignoring cancel query request");
1990 false
1991 }
1992}
1993
1994pub fn cancel_creating_jobs_in_session(session_id: SessionId, sessions_map: SessionMapRef) -> bool {
1995 let guard = sessions_map.read();
1996 if let Some(session) = guard.get(&session_id) {
1997 session.cancel_current_creating_job();
1998 true
1999 } else {
2000 info!("Current session finished, ignoring cancel creating request");
2001 false
2002 }
2003}