1use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::io::{Error, ErrorKind};
18use std::iter;
19use std::net::{IpAddr, Ipv4Addr, SocketAddr};
20use std::sync::atomic::{AtomicI32, Ordering};
21use std::sync::{Arc, Weak};
22use std::time::{Duration, Instant};
23
24use anyhow::anyhow;
25use bytes::Bytes;
26use either::Either;
27use itertools::Itertools;
28use parking_lot::{Mutex, RwLock, RwLockReadGuard};
29use pgwire::error::{PsqlError, PsqlResult};
30use pgwire::net::{Address, AddressRef};
31use pgwire::pg_field_descriptor::PgFieldDescriptor;
32use pgwire::pg_message::TransactionStatus;
33use pgwire::pg_response::{PgResponse, StatementType};
34use pgwire::pg_server::{
35 BoxedError, ExecContext, ExecContextGuard, Session, SessionId, SessionManager,
36 UserAuthenticator,
37};
38use pgwire::types::{Format, FormatIterator};
39use prometheus_http_query::Client as PrometheusClient;
40use rand::RngCore;
41use risingwave_batch::monitor::{BatchSpillMetrics, GLOBAL_BATCH_SPILL_METRICS};
42use risingwave_batch::spill::spill_op::SpillOp;
43use risingwave_batch::task::{ShutdownSender, ShutdownToken};
44use risingwave_batch::worker_manager::worker_node_manager::{
45 WorkerNodeManager, WorkerNodeManagerRef,
46};
47use risingwave_common::acl::AclMode;
48#[cfg(test)]
49use risingwave_common::catalog::{
50 DEFAULT_DATABASE_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_ID,
51};
52use risingwave_common::config::{
53 AuthMethod, BatchConfig, ConnectionType, FrontendConfig, MetaConfig, MetricLevel,
54 RpcClientConfig, StreamingConfig, UdfConfig, load_config,
55};
56use risingwave_common::id::WorkerId;
57use risingwave_common::memory::MemoryContext;
58use risingwave_common::secret::LocalSecretManager;
59use risingwave_common::session_config::{ConfigReporter, SessionConfig, VisibilityMode};
60use risingwave_common::system_param::local_manager::{
61 LocalSystemParamsManager, LocalSystemParamsManagerRef,
62};
63use risingwave_common::telemetry::manager::TelemetryManager;
64use risingwave_common::telemetry::telemetry_env_enabled;
65use risingwave_common::types::DataType;
66use risingwave_common::util::addr::HostAddr;
67use risingwave_common::util::cluster_limit;
68use risingwave_common::util::cluster_limit::ActorCountPerParallelism;
69use risingwave_common::util::iter_util::ZipEqFast;
70use risingwave_common::util::pretty_bytes::convert;
71use risingwave_common::util::runtime::BackgroundShutdownRuntime;
72use risingwave_common::{GIT_SHA, RW_VERSION};
73use risingwave_common_heap_profiling::HeapProfiler;
74use risingwave_common_service::{MetricsManager, ObserverManager};
75use risingwave_connector::source::monitor::{GLOBAL_SOURCE_METRICS, SourceMetrics};
76use risingwave_pb::common::WorkerType;
77use risingwave_pb::common::worker_node::Property as AddWorkerNodeProperty;
78use risingwave_pb::configured_monitor_service_server;
79use risingwave_pb::frontend_service::frontend_service_server::FrontendServiceServer;
80use risingwave_pb::health::health_server::HealthServer;
81use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
82use risingwave_pb::user::auth_info::EncryptionType;
83use risingwave_rpc_client::{
84 ComputeClientPool, ComputeClientPoolRef, FrontendClientPool, FrontendClientPoolRef, MetaClient,
85 MonitorClientPool, MonitorClientPoolRef,
86};
87use risingwave_sqlparser::ast::{ObjectName, Statement};
88use risingwave_sqlparser::parser::Parser;
89use thiserror::Error;
90use thiserror_ext::AsReport;
91use tokio::runtime::Builder;
92use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
93use tokio::sync::oneshot::Sender;
94use tokio::sync::watch;
95use tokio::task::JoinHandle;
96use tracing::{error, info};
97
98use self::cursor_manager::CursorManager;
99use crate::binder::{Binder, BoundStatement, ResolveQualifiedNameError};
100use crate::catalog::catalog_service::{CatalogReader, CatalogWriter, CatalogWriterImpl};
101use crate::catalog::connection_catalog::ConnectionCatalog;
102use crate::catalog::root_catalog::{Catalog, SchemaPath};
103use crate::catalog::secret_catalog::SecretCatalog;
104use crate::catalog::source_catalog::SourceCatalog;
105use crate::catalog::subscription_catalog::SubscriptionCatalog;
106use crate::catalog::{
107 CatalogError, CatalogErrorInner, DatabaseId, OwnedByUserCatalog, SchemaId, TableId,
108 check_schema_writable,
109};
110use crate::error::{
111 ErrorCode, Result, RwError, bail_catalog_error, bail_permission_denied, bail_protocol_error,
112};
113use crate::handler::describe::infer_describe;
114use crate::handler::extended_handle::{
115 Portal, PrepareStatement, handle_bind, handle_execute, handle_parse,
116};
117use crate::handler::privilege::ObjectCheckItem;
118use crate::handler::show::{infer_show_create_object, infer_show_object};
119use crate::handler::util::to_pg_field;
120use crate::handler::variable::infer_show_variable;
121use crate::handler::{RwPgResponse, handle};
122use crate::health_service::HealthServiceImpl;
123use crate::meta_client::{FrontendMetaClient, FrontendMetaClientImpl};
124use crate::monitor::{CursorMetrics, FrontendMetrics, GLOBAL_FRONTEND_METRICS};
125use crate::observer::FrontendObserverNode;
126use crate::rpc::{FrontendServiceImpl, MonitorServiceImpl};
127use crate::scheduler::streaming_manager::{StreamingJobTracker, StreamingJobTrackerRef};
128use crate::scheduler::{
129 DistributedQueryMetrics, GLOBAL_DISTRIBUTED_QUERY_METRICS, HummockSnapshotManager,
130 HummockSnapshotManagerRef, QueryManager,
131};
132use crate::telemetry::FrontendTelemetryCreator;
133use crate::user::UserId;
134use crate::user::user_authentication::md5_hash_with_salt;
135use crate::user::user_manager::UserInfoManager;
136use crate::user::user_service::{UserInfoReader, UserInfoWriter, UserInfoWriterImpl};
137use crate::{FrontendOpts, PgResponseStream, TableCatalog};
138
139pub(crate) mod current;
140pub(crate) mod cursor_manager;
141pub(crate) mod transaction;
142
143#[derive(Clone)]
145pub(crate) struct FrontendEnv {
146 meta_client: Arc<dyn FrontendMetaClient>,
149 catalog_writer: Arc<dyn CatalogWriter>,
150 catalog_reader: CatalogReader,
151 user_info_writer: Arc<dyn UserInfoWriter>,
152 user_info_reader: UserInfoReader,
153 worker_node_manager: WorkerNodeManagerRef,
154 query_manager: QueryManager,
155 hummock_snapshot_manager: HummockSnapshotManagerRef,
156 system_params_manager: LocalSystemParamsManagerRef,
157 session_params: Arc<RwLock<SessionConfig>>,
158
159 server_addr: HostAddr,
160 client_pool: ComputeClientPoolRef,
161 frontend_client_pool: FrontendClientPoolRef,
162 monitor_client_pool: MonitorClientPoolRef,
163
164 sessions_map: SessionMapRef,
168
169 pub frontend_metrics: Arc<FrontendMetrics>,
170
171 pub cursor_metrics: Arc<CursorMetrics>,
172
173 source_metrics: Arc<SourceMetrics>,
174
175 spill_metrics: Arc<BatchSpillMetrics>,
177
178 batch_config: BatchConfig,
179 frontend_config: FrontendConfig,
180 #[expect(dead_code)]
181 meta_config: MetaConfig,
182 streaming_config: StreamingConfig,
183 udf_config: UdfConfig,
184
185 creating_streaming_job_tracker: StreamingJobTrackerRef,
188
189 compute_runtime: Arc<BackgroundShutdownRuntime>,
192
193 mem_context: MemoryContext,
195
196 #[cfg(feature = "datafusion")]
197 df_spillable_budget_ctx: MemoryContext,
202
203 serverless_backfill_controller_addr: String,
205
206 prometheus_client: Option<PrometheusClient>,
208
209 prometheus_selector: String,
211}
212
213pub type SessionMapRef = Arc<RwLock<HashMap<(i32, i32), Arc<SessionImpl>>>>;
215
216const FRONTEND_BATCH_MEMORY_PROPORTION: f64 = 0.5;
218
219impl FrontendEnv {
220 pub fn mock() -> Self {
221 use crate::test_utils::{MockCatalogWriter, MockFrontendMetaClient, MockUserInfoWriter};
222
223 let catalog = Arc::new(RwLock::new(Catalog::default()));
224 let meta_client = Arc::new(MockFrontendMetaClient {});
225 let hummock_snapshot_manager = Arc::new(HummockSnapshotManager::new(meta_client.clone()));
226 let catalog_writer = Arc::new(MockCatalogWriter::new(
227 catalog.clone(),
228 hummock_snapshot_manager.clone(),
229 ));
230 let catalog_reader = CatalogReader::new(catalog);
231 let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
232 let user_info_writer = Arc::new(MockUserInfoWriter::new(user_info_manager.clone()));
233 let user_info_reader = UserInfoReader::new(user_info_manager);
234 let worker_node_manager = Arc::new(WorkerNodeManager::mock(vec![]));
235 let system_params_manager = Arc::new(LocalSystemParamsManager::for_test());
236 let compute_client_pool = Arc::new(ComputeClientPool::for_test());
237 let frontend_client_pool = Arc::new(FrontendClientPool::for_test());
238 let monitor_client_pool = Arc::new(MonitorClientPool::for_test());
239 let query_manager = QueryManager::new(
240 worker_node_manager.clone(),
241 compute_client_pool,
242 catalog_reader.clone(),
243 Arc::new(DistributedQueryMetrics::for_test()),
244 None,
245 None,
246 );
247 let server_addr = HostAddr::try_from("127.0.0.1:4565").unwrap();
248 let client_pool = Arc::new(ComputeClientPool::for_test());
249 let creating_streaming_tracker = StreamingJobTracker::new(meta_client.clone());
250 let runtime = {
251 let mut builder = Builder::new_multi_thread();
252 if let Some(frontend_compute_runtime_worker_threads) =
253 load_config("", FrontendOpts::default())
254 .batch
255 .frontend_compute_runtime_worker_threads
256 {
257 builder.worker_threads(frontend_compute_runtime_worker_threads);
258 }
259 builder
260 .thread_name("rw-batch-local")
261 .enable_all()
262 .build()
263 .unwrap()
264 };
265 let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(runtime));
266 let sessions_map = Arc::new(RwLock::new(HashMap::new()));
267 Self {
268 meta_client,
269 catalog_writer,
270 catalog_reader,
271 user_info_writer,
272 user_info_reader,
273 worker_node_manager,
274 query_manager,
275 hummock_snapshot_manager,
276 system_params_manager,
277 session_params: Default::default(),
278 server_addr,
279 client_pool,
280 frontend_client_pool,
281 monitor_client_pool,
282 sessions_map,
283 frontend_metrics: Arc::new(FrontendMetrics::for_test()),
284 cursor_metrics: Arc::new(CursorMetrics::for_test()),
285 batch_config: BatchConfig::default(),
286 frontend_config: FrontendConfig::default(),
287 meta_config: MetaConfig::default(),
288 streaming_config: StreamingConfig::default(),
289 udf_config: UdfConfig::default(),
290 source_metrics: Arc::new(SourceMetrics::default()),
291 spill_metrics: BatchSpillMetrics::for_test(),
292 creating_streaming_job_tracker: Arc::new(creating_streaming_tracker),
293 compute_runtime,
294 mem_context: MemoryContext::none(),
295 #[cfg(feature = "datafusion")]
296 df_spillable_budget_ctx: MemoryContext::none(),
297 serverless_backfill_controller_addr: Default::default(),
298 prometheus_client: None,
299 prometheus_selector: String::new(),
300 }
301 }
302
303 pub async fn init(opts: FrontendOpts) -> Result<(Self, Vec<JoinHandle<()>>, Vec<Sender<()>>)> {
304 let config = load_config(&opts.config_path, &opts);
305 info!("Starting frontend node");
306 info!("> config: {:?}", config);
307 info!(
308 "> debug assertions: {}",
309 if cfg!(debug_assertions) { "on" } else { "off" }
310 );
311 info!("> version: {} ({})", RW_VERSION, GIT_SHA);
312
313 let frontend_address: HostAddr = opts
314 .advertise_addr
315 .as_ref()
316 .unwrap_or_else(|| {
317 tracing::warn!("advertise addr is not specified, defaulting to listen_addr");
318 &opts.listen_addr
319 })
320 .parse()
321 .unwrap();
322 info!("advertise addr is {}", frontend_address);
323
324 let rpc_addr: HostAddr = opts.frontend_rpc_listener_addr.parse().unwrap();
325 let internal_rpc_host_addr = HostAddr {
326 host: frontend_address.host.clone(),
328 port: rpc_addr.port,
329 };
330 let (meta_client, system_params_reader) = MetaClient::register_new(
332 opts.meta_addr,
333 WorkerType::Frontend,
334 &frontend_address,
335 AddWorkerNodeProperty {
336 internal_rpc_host_addr: internal_rpc_host_addr.to_string(),
337 ..Default::default()
338 },
339 Arc::new(config.meta.clone()),
340 )
341 .await;
342
343 let worker_id = meta_client.worker_id();
344 info!("Assigned worker node id {}", worker_id);
345
346 let (heartbeat_join_handle, heartbeat_shutdown_sender) = MetaClient::start_heartbeat_loop(
347 meta_client.clone(),
348 Duration::from_millis(config.server.heartbeat_interval_ms as u64),
349 );
350 let mut join_handles = vec![heartbeat_join_handle];
351 let mut shutdown_senders = vec![heartbeat_shutdown_sender];
352
353 let frontend_meta_client = Arc::new(FrontendMetaClientImpl(meta_client.clone()));
354 let hummock_snapshot_manager =
355 Arc::new(HummockSnapshotManager::new(frontend_meta_client.clone()));
356
357 let (catalog_updated_tx, catalog_updated_rx) = watch::channel(0);
358 let catalog = Arc::new(RwLock::new(Catalog::default()));
359 let catalog_writer = Arc::new(CatalogWriterImpl::new(
360 meta_client.clone(),
361 catalog_updated_rx.clone(),
362 hummock_snapshot_manager.clone(),
363 ));
364 let catalog_reader = CatalogReader::new(catalog.clone());
365
366 let worker_node_manager = Arc::new(WorkerNodeManager::new());
367
368 let compute_client_pool = Arc::new(ComputeClientPool::new(
369 config.batch_exchange_connection_pool_size(),
370 config.batch.developer.compute_client_config.clone(),
371 ));
372 let frontend_client_pool = Arc::new(FrontendClientPool::new(
373 1,
374 config.batch.developer.frontend_client_config.clone(),
375 ));
376 let monitor_client_pool = Arc::new(MonitorClientPool::new(1, RpcClientConfig::default()));
377 let query_manager = QueryManager::new(
378 worker_node_manager.clone(),
379 compute_client_pool.clone(),
380 catalog_reader.clone(),
381 Arc::new(GLOBAL_DISTRIBUTED_QUERY_METRICS.clone()),
382 config.batch.distributed_query_limit,
383 config.batch.max_batch_queries_per_frontend_node,
384 );
385
386 let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
387 let user_info_reader = UserInfoReader::new(user_info_manager.clone());
388 let user_info_writer = Arc::new(UserInfoWriterImpl::new(
389 meta_client.clone(),
390 catalog_updated_rx,
391 ));
392
393 let system_params_manager =
394 Arc::new(LocalSystemParamsManager::new(system_params_reader.clone()));
395
396 LocalSecretManager::init(
397 opts.temp_secret_file_dir,
398 meta_client.cluster_id().to_owned(),
399 worker_id,
400 );
401
402 let session_params = Arc::new(RwLock::new(SessionConfig::default()));
404 let sessions_map: SessionMapRef = Arc::new(RwLock::new(HashMap::new()));
405 let cursor_metrics = Arc::new(CursorMetrics::init(sessions_map.clone()));
406
407 let frontend_observer_node = FrontendObserverNode::new(
408 worker_node_manager.clone(),
409 catalog,
410 catalog_updated_tx,
411 user_info_manager,
412 hummock_snapshot_manager.clone(),
413 system_params_manager.clone(),
414 session_params.clone(),
415 compute_client_pool.clone(),
416 );
417 let observer_manager =
418 ObserverManager::new_with_meta_client(meta_client.clone(), frontend_observer_node)
419 .await;
420 let observer_join_handle = observer_manager.start().await;
421 join_handles.push(observer_join_handle);
422
423 meta_client.activate(&frontend_address).await?;
424
425 let frontend_metrics = Arc::new(GLOBAL_FRONTEND_METRICS.clone());
426 let source_metrics = Arc::new(GLOBAL_SOURCE_METRICS.clone());
427 let spill_metrics = Arc::new(GLOBAL_BATCH_SPILL_METRICS.clone());
428
429 if config.server.metrics_level > MetricLevel::Disabled {
430 MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
431 }
432
433 let health_srv = HealthServiceImpl::new();
434 let frontend_srv = FrontendServiceImpl::new(sessions_map.clone());
435 let monitor_srv = MonitorServiceImpl::new(config.server.clone());
436 let frontend_rpc_addr = opts.frontend_rpc_listener_addr.parse().unwrap();
437
438 let telemetry_manager = TelemetryManager::new(
439 Arc::new(meta_client.clone()),
440 Arc::new(FrontendTelemetryCreator::new()),
441 );
442
443 if config.server.telemetry_enabled && telemetry_env_enabled() {
446 let (join_handle, shutdown_sender) = telemetry_manager.start().await;
447 join_handles.push(join_handle);
448 shutdown_senders.push(shutdown_sender);
449 } else {
450 tracing::info!("Telemetry didn't start due to config");
451 }
452
453 tokio::spawn(async move {
454 tonic::transport::Server::builder()
455 .add_service(HealthServer::new(health_srv))
456 .add_service(FrontendServiceServer::new(frontend_srv))
457 .add_service(configured_monitor_service_server(
458 MonitorServiceServer::new(monitor_srv),
459 ))
460 .serve(frontend_rpc_addr)
461 .await
462 .unwrap();
463 });
464 info!(
465 "Health Check RPC Listener is set up on {}",
466 opts.frontend_rpc_listener_addr.clone()
467 );
468
469 let creating_streaming_job_tracker =
470 Arc::new(StreamingJobTracker::new(frontend_meta_client.clone()));
471
472 let runtime = {
473 let mut builder = Builder::new_multi_thread();
474 if let Some(frontend_compute_runtime_worker_threads) =
475 config.batch.frontend_compute_runtime_worker_threads
476 {
477 builder.worker_threads(frontend_compute_runtime_worker_threads);
478 }
479 builder
480 .thread_name("rw-batch-local")
481 .enable_all()
482 .build()
483 .unwrap()
484 };
485 let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(runtime));
486
487 let sessions = sessions_map.clone();
488 let join_handle = tokio::spawn(async move {
490 let mut check_idle_txn_interval =
491 tokio::time::interval(core::time::Duration::from_secs(10));
492 check_idle_txn_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
493 check_idle_txn_interval.reset();
494 loop {
495 check_idle_txn_interval.tick().await;
496 sessions.read().values().for_each(|session| {
497 let _ = session.check_idle_in_transaction_timeout();
498 })
499 }
500 });
501 join_handles.push(join_handle);
502
503 #[cfg(not(madsim))]
505 if config.batch.enable_spill {
506 SpillOp::clean_spill_directory()
507 .await
508 .map_err(|err| anyhow!(err))?;
509 }
510
511 let total_memory_bytes = opts.frontend_total_memory_bytes;
512 let heap_profiler =
513 HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
514 heap_profiler.start();
516
517 let batch_memory_limit = total_memory_bytes as f64 * FRONTEND_BATCH_MEMORY_PROPORTION;
518 let mem_context = MemoryContext::root(
519 frontend_metrics.batch_total_mem.clone(),
520 batch_memory_limit as u64,
521 );
522 #[cfg(feature = "datafusion")]
523 let df_spillable_budget_ctx =
524 crate::datafusion::create_df_spillable_budget_ctx(&mem_context);
525
526 let prometheus_client = if let Some(ref endpoint) = opts.prometheus_endpoint {
528 match PrometheusClient::try_from(endpoint.as_str()) {
529 Ok(client) => {
530 info!("Prometheus client initialized with endpoint: {}", endpoint);
531 Some(client)
532 }
533 Err(e) => {
534 error!(
535 error = %e.as_report(),
536 "failed to initialize Prometheus client",
537 );
538 None
539 }
540 }
541 } else {
542 None
543 };
544
545 let prometheus_selector = opts.prometheus_selector.unwrap_or_default();
547
548 info!(
549 "Frontend total_memory: {} batch_memory: {}",
550 convert(total_memory_bytes as _),
551 convert(batch_memory_limit as _),
552 );
553
554 Ok((
555 Self {
556 catalog_reader,
557 catalog_writer,
558 user_info_reader,
559 user_info_writer,
560 worker_node_manager,
561 meta_client: frontend_meta_client,
562 query_manager,
563 hummock_snapshot_manager,
564 system_params_manager,
565 session_params,
566 server_addr: frontend_address,
567 client_pool: compute_client_pool,
568 frontend_client_pool,
569 monitor_client_pool,
570 frontend_metrics,
571 cursor_metrics,
572 spill_metrics,
573 sessions_map,
574 batch_config: config.batch,
575 frontend_config: config.frontend,
576 meta_config: config.meta,
577 streaming_config: config.streaming,
578 serverless_backfill_controller_addr: opts.serverless_backfill_controller_addr,
579 udf_config: config.udf,
580 source_metrics,
581 creating_streaming_job_tracker,
582 compute_runtime,
583 mem_context,
584 #[cfg(feature = "datafusion")]
585 df_spillable_budget_ctx,
586 prometheus_client,
587 prometheus_selector,
588 },
589 join_handles,
590 shutdown_senders,
591 ))
592 }
593
594 fn catalog_writer(&self, _guard: transaction::WriteGuard) -> &dyn CatalogWriter {
599 &*self.catalog_writer
600 }
601
602 pub fn catalog_reader(&self) -> &CatalogReader {
604 &self.catalog_reader
605 }
606
607 fn user_info_writer(&self, _guard: transaction::WriteGuard) -> &dyn UserInfoWriter {
612 &*self.user_info_writer
613 }
614
615 pub fn user_info_reader(&self) -> &UserInfoReader {
617 &self.user_info_reader
618 }
619
620 pub fn worker_node_manager_ref(&self) -> WorkerNodeManagerRef {
621 self.worker_node_manager.clone()
622 }
623
624 pub fn meta_client(&self) -> &dyn FrontendMetaClient {
625 &*self.meta_client
626 }
627
628 pub fn meta_client_ref(&self) -> Arc<dyn FrontendMetaClient> {
629 self.meta_client.clone()
630 }
631
632 pub fn query_manager(&self) -> &QueryManager {
633 &self.query_manager
634 }
635
636 pub fn hummock_snapshot_manager(&self) -> &HummockSnapshotManagerRef {
637 &self.hummock_snapshot_manager
638 }
639
640 pub fn system_params_manager(&self) -> &LocalSystemParamsManagerRef {
641 &self.system_params_manager
642 }
643
644 pub fn session_params_snapshot(&self) -> SessionConfig {
645 self.session_params.read_recursive().clone()
646 }
647
648 pub fn sbc_address(&self) -> &String {
649 &self.serverless_backfill_controller_addr
650 }
651
652 pub fn prometheus_client(&self) -> Option<&PrometheusClient> {
654 self.prometheus_client.as_ref()
655 }
656
657 pub fn prometheus_selector(&self) -> &str {
659 &self.prometheus_selector
660 }
661
662 pub fn server_address(&self) -> &HostAddr {
663 &self.server_addr
664 }
665
666 pub fn client_pool(&self) -> ComputeClientPoolRef {
667 self.client_pool.clone()
668 }
669
670 pub fn frontend_client_pool(&self) -> FrontendClientPoolRef {
671 self.frontend_client_pool.clone()
672 }
673
674 pub fn monitor_client_pool(&self) -> MonitorClientPoolRef {
675 self.monitor_client_pool.clone()
676 }
677
678 pub fn batch_config(&self) -> &BatchConfig {
679 &self.batch_config
680 }
681
682 pub fn frontend_config(&self) -> &FrontendConfig {
683 &self.frontend_config
684 }
685
686 pub fn streaming_config(&self) -> &StreamingConfig {
687 &self.streaming_config
688 }
689
690 pub fn udf_config(&self) -> &UdfConfig {
691 &self.udf_config
692 }
693
694 pub fn source_metrics(&self) -> Arc<SourceMetrics> {
695 self.source_metrics.clone()
696 }
697
698 pub fn spill_metrics(&self) -> Arc<BatchSpillMetrics> {
699 self.spill_metrics.clone()
700 }
701
702 pub fn creating_streaming_job_tracker(&self) -> &StreamingJobTrackerRef {
703 &self.creating_streaming_job_tracker
704 }
705
706 pub fn sessions_map(&self) -> &SessionMapRef {
707 &self.sessions_map
708 }
709
710 pub fn compute_runtime(&self) -> Arc<BackgroundShutdownRuntime> {
711 self.compute_runtime.clone()
712 }
713
714 pub fn cancel_queries_in_session(&self, session_id: SessionId) -> bool {
717 cancel_queries_in_session(session_id, self.sessions_map.clone())
718 }
719
720 pub fn cancel_creating_jobs_in_session(&self, session_id: SessionId) -> bool {
723 cancel_creating_jobs_in_session(session_id, self.sessions_map.clone())
724 }
725
726 pub fn mem_context(&self) -> MemoryContext {
727 self.mem_context.clone()
728 }
729
730 #[cfg(feature = "datafusion")]
731 pub fn df_spillable_budget_ctx(&self) -> MemoryContext {
732 self.df_spillable_budget_ctx.clone()
733 }
734}
735
736#[derive(Clone)]
737pub struct AuthContext {
738 pub database: String,
739 pub user_name: String,
740 pub user_id: UserId,
741}
742
743impl AuthContext {
744 pub fn new(database: String, user_name: String, user_id: UserId) -> Self {
745 Self {
746 database,
747 user_name,
748 user_id,
749 }
750 }
751}
752pub struct SessionImpl {
753 env: FrontendEnv,
754 auth_context: Arc<RwLock<AuthContext>>,
755 user_authenticator: UserAuthenticator,
757 config_map: Arc<RwLock<SessionConfig>>,
759
760 notice_tx: UnboundedSender<String>,
762 notice_rx: Mutex<UnboundedReceiver<String>>,
764
765 id: (i32, i32),
767
768 peer_addr: AddressRef,
770
771 txn: Arc<Mutex<transaction::State>>,
775
776 current_query_cancel_flag: Mutex<Option<ShutdownSender>>,
780
781 exec_context: Mutex<Option<Weak<ExecContext>>>,
783
784 last_idle_instant: Arc<Mutex<Option<Instant>>>,
786
787 cursor_manager: Arc<CursorManager>,
788
789 temporary_source_manager: Arc<Mutex<TemporarySourceManager>>,
791
792 staging_catalog_manager: Arc<Mutex<StagingCatalogManager>>,
794}
795
796#[derive(Default, Clone)]
805pub struct TemporarySourceManager {
806 sources: HashMap<String, SourceCatalog>,
807}
808
809impl TemporarySourceManager {
810 pub fn new() -> Self {
811 Self {
812 sources: HashMap::new(),
813 }
814 }
815
816 pub fn create_source(&mut self, name: String, source: SourceCatalog) {
817 self.sources.insert(name, source);
818 }
819
820 pub fn drop_source(&mut self, name: &str) {
821 self.sources.remove(name);
822 }
823
824 pub fn get_source(&self, name: &str) -> Option<&SourceCatalog> {
825 self.sources.get(name)
826 }
827
828 pub fn keys(&self) -> Vec<String> {
829 self.sources.keys().cloned().collect()
830 }
831}
832
833#[derive(Default, Clone)]
835pub struct StagingCatalogManager {
836 tables: HashMap<String, TableCatalog>,
838}
839
840impl StagingCatalogManager {
841 pub fn new() -> Self {
842 Self {
843 tables: HashMap::new(),
844 }
845 }
846
847 pub fn create_table(&mut self, name: String, table: TableCatalog) {
848 self.tables.insert(name, table);
849 }
850
851 pub fn drop_table(&mut self, name: &str) {
852 self.tables.remove(name);
853 }
854
855 pub fn get_table(&self, name: &str) -> Option<&TableCatalog> {
856 self.tables.get(name)
857 }
858}
859
860#[derive(Error, Debug)]
861pub enum CheckRelationError {
862 #[error("{0}")]
863 Resolve(#[from] ResolveQualifiedNameError),
864 #[error("{0}")]
865 Catalog(#[from] CatalogError),
866}
867
868impl From<CheckRelationError> for RwError {
869 fn from(e: CheckRelationError) -> Self {
870 match e {
871 CheckRelationError::Resolve(e) => e.into(),
872 CheckRelationError::Catalog(e) => e.into(),
873 }
874 }
875}
876
877impl SessionImpl {
878 pub(crate) fn new(
879 env: FrontendEnv,
880 auth_context: AuthContext,
881 user_authenticator: UserAuthenticator,
882 id: SessionId,
883 peer_addr: AddressRef,
884 session_config: SessionConfig,
885 ) -> Self {
886 let cursor_metrics = env.cursor_metrics.clone();
887 let (notice_tx, notice_rx) = mpsc::unbounded_channel();
888
889 Self {
890 env,
891 auth_context: Arc::new(RwLock::new(auth_context)),
892 user_authenticator,
893 config_map: Arc::new(RwLock::new(session_config)),
894 id,
895 peer_addr,
896 txn: Default::default(),
897 current_query_cancel_flag: Mutex::new(None),
898 notice_tx,
899 notice_rx: Mutex::new(notice_rx),
900 exec_context: Mutex::new(None),
901 last_idle_instant: Default::default(),
902 cursor_manager: Arc::new(CursorManager::new(cursor_metrics)),
903 temporary_source_manager: Default::default(),
904 staging_catalog_manager: Default::default(),
905 }
906 }
907
908 #[cfg(test)]
909 pub fn mock() -> Self {
910 let env = FrontendEnv::mock();
911 let (notice_tx, notice_rx) = mpsc::unbounded_channel();
912
913 Self {
914 env: FrontendEnv::mock(),
915 auth_context: Arc::new(RwLock::new(AuthContext::new(
916 DEFAULT_DATABASE_NAME.to_owned(),
917 DEFAULT_SUPER_USER.to_owned(),
918 DEFAULT_SUPER_USER_ID,
919 ))),
920 user_authenticator: UserAuthenticator::None,
921 config_map: Default::default(),
922 id: (0, 0),
924 txn: Default::default(),
925 current_query_cancel_flag: Mutex::new(None),
926 notice_tx,
927 notice_rx: Mutex::new(notice_rx),
928 exec_context: Mutex::new(None),
929 peer_addr: Address::Tcp(SocketAddr::new(
930 IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
931 8080,
932 ))
933 .into(),
934 last_idle_instant: Default::default(),
935 cursor_manager: Arc::new(CursorManager::new(env.cursor_metrics)),
936 temporary_source_manager: Default::default(),
937 staging_catalog_manager: Default::default(),
938 }
939 }
940
941 pub(crate) fn env(&self) -> &FrontendEnv {
942 &self.env
943 }
944
945 pub fn auth_context(&self) -> Arc<AuthContext> {
946 let ctx = self.auth_context.read();
947 Arc::new(ctx.clone())
948 }
949
950 pub fn database(&self) -> String {
951 self.auth_context.read().database.clone()
952 }
953
954 pub fn database_id(&self) -> DatabaseId {
955 let db_name = self.database();
956 self.env
957 .catalog_reader()
958 .read_guard()
959 .get_database_by_name(&db_name)
960 .map(|db| db.id())
961 .expect("session database not found")
962 }
963
964 pub fn user_name(&self) -> String {
965 self.auth_context.read().user_name.clone()
966 }
967
968 pub fn user_id(&self) -> UserId {
969 self.auth_context.read().user_id
970 }
971
972 pub fn update_database(&self, database: String) {
973 self.auth_context.write().database = database;
974 }
975
976 pub fn shared_config(&self) -> Arc<RwLock<SessionConfig>> {
977 Arc::clone(&self.config_map)
978 }
979
980 pub fn config(&self) -> RwLockReadGuard<'_, SessionConfig> {
981 self.config_map.read()
982 }
983
984 pub fn set_config(&self, key: &str, value: String) -> Result<String> {
985 self.config_map
986 .write()
987 .set(key, value, &mut ())
988 .map_err(Into::into)
989 }
990
991 pub fn reset_config(&self, key: &str) -> Result<String> {
992 self.config_map
993 .write()
994 .reset(key, &mut ())
995 .map_err(Into::into)
996 }
997
998 pub fn set_config_report(
999 &self,
1000 key: &str,
1001 value: Option<String>,
1002 mut reporter: impl ConfigReporter,
1003 ) -> Result<String> {
1004 if let Some(value) = value {
1005 self.config_map
1006 .write()
1007 .set(key, value, &mut reporter)
1008 .map_err(Into::into)
1009 } else {
1010 self.config_map
1011 .write()
1012 .reset(key, &mut reporter)
1013 .map_err(Into::into)
1014 }
1015 }
1016
1017 pub fn session_id(&self) -> SessionId {
1018 self.id
1019 }
1020
1021 pub fn running_sql(&self) -> Option<Arc<str>> {
1022 self.exec_context
1023 .lock()
1024 .as_ref()
1025 .and_then(|weak| weak.upgrade())
1026 .map(|context| context.running_sql.clone())
1027 }
1028
1029 pub fn get_cursor_manager(&self) -> Arc<CursorManager> {
1030 self.cursor_manager.clone()
1031 }
1032
1033 pub fn peer_addr(&self) -> &Address {
1034 &self.peer_addr
1035 }
1036
1037 pub fn elapse_since_running_sql(&self) -> Option<u128> {
1038 self.exec_context
1039 .lock()
1040 .as_ref()
1041 .and_then(|weak| weak.upgrade())
1042 .map(|context| context.last_instant.elapsed().as_millis())
1043 }
1044
1045 pub fn elapse_since_last_idle_instant(&self) -> Option<u128> {
1046 self.last_idle_instant
1047 .lock()
1048 .as_ref()
1049 .map(|x| x.elapsed().as_millis())
1050 }
1051
1052 pub fn check_relation_name_duplicated(
1053 &self,
1054 name: ObjectName,
1055 stmt_type: StatementType,
1056 if_not_exists: bool,
1057 ) -> std::result::Result<Either<(), RwPgResponse>, CheckRelationError> {
1058 let db_name = &self.database();
1059 let catalog_reader = self.env().catalog_reader().read_guard();
1060 let (schema_name, relation_name) = {
1061 let (schema_name, relation_name) =
1062 Binder::resolve_schema_qualified_name(db_name, &name)?;
1063 let search_path = self.config().search_path();
1064 let user_name = &self.user_name();
1065 let schema_name = match schema_name {
1066 Some(schema_name) => schema_name,
1067 None => catalog_reader
1068 .first_valid_schema(db_name, &search_path, user_name)?
1069 .name(),
1070 };
1071 (schema_name, relation_name)
1072 };
1073 match catalog_reader.check_relation_name_duplicated(db_name, &schema_name, &relation_name) {
1074 Err(e) if if_not_exists => {
1075 if let CatalogErrorInner::Duplicated {
1076 name,
1077 under_creation,
1078 ..
1079 } = e.inner()
1080 {
1081 if !*under_creation {
1086 Ok(Either::Right(
1087 PgResponse::builder(stmt_type)
1088 .notice(format!("relation \"{}\" already exists, skipping", name))
1089 .into(),
1090 ))
1091 } else if stmt_type == StatementType::CREATE_SUBSCRIPTION {
1092 Ok(Either::Right(
1095 PgResponse::builder(stmt_type)
1096 .notice(format!(
1097 "relation \"{}\" already exists but still creating, skipping",
1098 name
1099 ))
1100 .into(),
1101 ))
1102 } else {
1103 Ok(Either::Left(()))
1104 }
1105 } else {
1106 Err(e.into())
1107 }
1108 }
1109 Err(e) => Err(e.into()),
1110 Ok(_) => Ok(Either::Left(())),
1111 }
1112 }
1113
1114 pub fn check_secret_name_duplicated(&self, name: ObjectName) -> Result<()> {
1115 let db_name = &self.database();
1116 let catalog_reader = self.env().catalog_reader().read_guard();
1117 let (schema_name, secret_name) = {
1118 let (schema_name, secret_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
1119 let search_path = self.config().search_path();
1120 let user_name = &self.user_name();
1121 let schema_name = match schema_name {
1122 Some(schema_name) => schema_name,
1123 None => catalog_reader
1124 .first_valid_schema(db_name, &search_path, user_name)?
1125 .name(),
1126 };
1127 (schema_name, secret_name)
1128 };
1129 catalog_reader
1130 .check_secret_name_duplicated(db_name, &schema_name, &secret_name)
1131 .map_err(RwError::from)
1132 }
1133
1134 pub fn check_connection_name_duplicated(&self, name: ObjectName) -> Result<()> {
1135 let db_name = &self.database();
1136 let catalog_reader = self.env().catalog_reader().read_guard();
1137 let (schema_name, connection_name) = {
1138 let (schema_name, connection_name) =
1139 Binder::resolve_schema_qualified_name(db_name, &name)?;
1140 let search_path = self.config().search_path();
1141 let user_name = &self.user_name();
1142 let schema_name = match schema_name {
1143 Some(schema_name) => schema_name,
1144 None => catalog_reader
1145 .first_valid_schema(db_name, &search_path, user_name)?
1146 .name(),
1147 };
1148 (schema_name, connection_name)
1149 };
1150 catalog_reader
1151 .check_connection_name_duplicated(db_name, &schema_name, &connection_name)
1152 .map_err(RwError::from)
1153 }
1154
1155 pub fn check_function_name_duplicated(
1156 &self,
1157 stmt_type: StatementType,
1158 name: ObjectName,
1159 arg_types: &[DataType],
1160 if_not_exists: bool,
1161 ) -> Result<Either<(), RwPgResponse>> {
1162 let db_name = &self.database();
1163 let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
1164 let (database_id, schema_id) = self.get_database_and_schema_id_for_create(schema_name)?;
1165
1166 let catalog_reader = self.env().catalog_reader().read_guard();
1167 if catalog_reader
1168 .get_schema_by_id(database_id, schema_id)?
1169 .get_function_by_name_args(&function_name, arg_types)
1170 .is_some()
1171 {
1172 let full_name = format!(
1173 "{function_name}({})",
1174 arg_types.iter().map(|t| t.to_string()).join(",")
1175 );
1176 if if_not_exists {
1177 Ok(Either::Right(
1178 PgResponse::builder(stmt_type)
1179 .notice(format!(
1180 "function \"{}\" already exists, skipping",
1181 full_name
1182 ))
1183 .into(),
1184 ))
1185 } else {
1186 Err(CatalogError::duplicated("function", full_name).into())
1187 }
1188 } else {
1189 Ok(Either::Left(()))
1190 }
1191 }
1192
1193 pub fn get_database_and_schema_id_for_create(
1195 &self,
1196 schema_name: Option<String>,
1197 ) -> Result<(DatabaseId, SchemaId)> {
1198 let db_name = &self.database();
1199
1200 let search_path = self.config().search_path();
1201 let user_name = &self.user_name();
1202
1203 let catalog_reader = self.env().catalog_reader().read_guard();
1204 let schema = match schema_name {
1205 Some(schema_name) => catalog_reader.get_schema_by_name(db_name, &schema_name)?,
1206 None => catalog_reader.first_valid_schema(db_name, &search_path, user_name)?,
1207 };
1208 let schema_name = schema.name();
1209
1210 check_schema_writable(&schema_name)?;
1211 self.check_privileges(&[ObjectCheckItem::new(
1212 schema.owner(),
1213 AclMode::Create,
1214 schema_name,
1215 schema.id(),
1216 )])?;
1217
1218 let db_id = catalog_reader.get_database_by_name(db_name)?.id();
1219 Ok((db_id, schema.id()))
1220 }
1221
1222 pub fn get_connection_by_name(
1223 &self,
1224 schema_name: Option<String>,
1225 connection_name: &str,
1226 ) -> Result<Arc<ConnectionCatalog>> {
1227 let db_name = &self.database();
1228 let search_path = self.config().search_path();
1229 let user_name = &self.user_name();
1230
1231 let catalog_reader = self.env().catalog_reader().read_guard();
1232 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1233 let (connection, _) =
1234 catalog_reader.get_connection_by_name(db_name, schema_path, connection_name)?;
1235
1236 self.check_privileges(&[ObjectCheckItem::new(
1237 connection.owner(),
1238 AclMode::Usage,
1239 connection.name.clone(),
1240 connection.id,
1241 )])?;
1242
1243 Ok(connection.clone())
1244 }
1245
1246 pub fn get_subscription_by_schema_id_name(
1247 &self,
1248 schema_id: SchemaId,
1249 subscription_name: &str,
1250 ) -> Result<Arc<SubscriptionCatalog>> {
1251 let db_name = &self.database();
1252
1253 let catalog_reader = self.env().catalog_reader().read_guard();
1254 let db_id = catalog_reader.get_database_by_name(db_name)?.id();
1255 let schema = catalog_reader.get_schema_by_id(db_id, schema_id)?;
1256 let subscription = schema
1257 .get_subscription_by_name(subscription_name)
1258 .ok_or_else(|| {
1259 RwError::from(ErrorCode::ItemNotFound(format!(
1260 "subscription {} not found",
1261 subscription_name
1262 )))
1263 })?;
1264 Ok(subscription.clone())
1265 }
1266
1267 pub fn get_subscription_by_name(
1268 &self,
1269 schema_name: Option<String>,
1270 subscription_name: &str,
1271 ) -> Result<Arc<SubscriptionCatalog>> {
1272 let db_name = &self.database();
1273 let search_path = self.config().search_path();
1274 let user_name = &self.user_name();
1275
1276 let catalog_reader = self.env().catalog_reader().read_guard();
1277 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1278 let (subscription, _) =
1279 catalog_reader.get_subscription_by_name(db_name, schema_path, subscription_name)?;
1280 Ok(subscription.clone())
1281 }
1282
1283 pub fn get_table_by_id(&self, table_id: TableId) -> Result<Arc<TableCatalog>> {
1284 let catalog_reader = self.env().catalog_reader().read_guard();
1285 Ok(catalog_reader.get_any_table_by_id(table_id)?.clone())
1286 }
1287
1288 pub fn get_table_by_name(
1289 &self,
1290 table_name: &str,
1291 db_id: DatabaseId,
1292 schema_id: SchemaId,
1293 ) -> Result<Arc<TableCatalog>> {
1294 let catalog_reader = self.env().catalog_reader().read_guard();
1295 let table = catalog_reader
1296 .get_schema_by_id(db_id, schema_id)?
1297 .get_created_table_by_name(table_name)
1298 .ok_or_else(|| {
1299 Error::new(
1300 ErrorKind::InvalidInput,
1301 format!("table \"{}\" does not exist", table_name),
1302 )
1303 })?;
1304
1305 self.check_privileges(&[ObjectCheckItem::new(
1306 table.owner(),
1307 AclMode::Select,
1308 table_name.to_owned(),
1309 table.id,
1310 )])?;
1311
1312 Ok(table.clone())
1313 }
1314
1315 pub fn get_secret_by_name(
1316 &self,
1317 schema_name: Option<String>,
1318 secret_name: &str,
1319 ) -> Result<Arc<SecretCatalog>> {
1320 let db_name = &self.database();
1321 let search_path = self.config().search_path();
1322 let user_name = &self.user_name();
1323
1324 let catalog_reader = self.env().catalog_reader().read_guard();
1325 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1326 let (secret, _) = catalog_reader.get_secret_by_name(db_name, schema_path, secret_name)?;
1327
1328 self.check_privileges(&[ObjectCheckItem::new(
1329 secret.owner(),
1330 AclMode::Usage,
1331 secret.name.clone(),
1332 secret.id,
1333 )])?;
1334
1335 Ok(secret.clone())
1336 }
1337
1338 pub async fn list_change_log_epochs(
1339 &self,
1340 table_id: TableId,
1341 min_epoch: u64,
1342 max_count: u32,
1343 ) -> Result<Vec<u64>> {
1344 let Some(max_epoch) = self
1345 .env
1346 .hummock_snapshot_manager()
1347 .acquire()
1348 .version()
1349 .state_table_info
1350 .info()
1351 .get(&table_id)
1352 .map(|s| s.committed_epoch)
1353 else {
1354 return Ok(vec![]);
1355 };
1356 let ret = self
1357 .env
1358 .meta_client_ref()
1359 .get_hummock_table_change_log(
1360 Some(min_epoch),
1361 Some(max_epoch),
1362 Some(iter::once(table_id).collect()),
1363 true,
1364 Some(max_count),
1365 )
1366 .await?;
1367 let Some(e) = ret.get(&table_id) else {
1368 return Ok(vec![]);
1369 };
1370 Ok(e.iter()
1371 .flat_map(|l| l.epochs())
1372 .filter(|e| *e >= min_epoch && *e <= max_epoch)
1373 .take(max_count as usize)
1374 .collect())
1375 }
1376
1377 pub fn clear_cancel_query_flag(&self) {
1378 let mut flag = self.current_query_cancel_flag.lock();
1379 *flag = None;
1380 }
1381
1382 pub fn reset_cancel_query_flag(&self) -> ShutdownToken {
1383 let mut flag = self.current_query_cancel_flag.lock();
1384 let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
1385 *flag = Some(shutdown_tx);
1386 shutdown_rx
1387 }
1388
1389 pub fn cancel_current_query(&self) {
1390 let mut flag_guard = self.current_query_cancel_flag.lock();
1391 if let Some(sender) = flag_guard.take() {
1392 info!("Trying to cancel query in local mode.");
1393 sender.cancel();
1395 info!("Cancel query request sent.");
1396 } else {
1397 info!("Trying to cancel query in distributed mode.");
1398 self.env.query_manager().cancel_queries_in_session(self.id)
1399 }
1400 }
1401
1402 pub fn cancel_current_creating_job(&self) {
1403 self.env.creating_streaming_job_tracker.abort_jobs(self.id);
1404 }
1405
1406 pub async fn run_statement(
1409 self: Arc<Self>,
1410 sql: Arc<str>,
1411 formats: Vec<Format>,
1412 ) -> std::result::Result<PgResponse<PgResponseStream>, BoxedError> {
1413 let mut stmts = Parser::parse_sql(&sql)?;
1415 if stmts.is_empty() {
1416 return Ok(PgResponse::empty_result(
1417 pgwire::pg_response::StatementType::EMPTY,
1418 ));
1419 }
1420 if stmts.len() > 1 {
1421 return Ok(
1422 PgResponse::builder(pgwire::pg_response::StatementType::EMPTY)
1423 .notice("cannot insert multiple commands into statement")
1424 .into(),
1425 );
1426 }
1427 let stmt = stmts.swap_remove(0);
1428 let rsp = Box::pin(handle(self, stmt, sql.clone(), formats)).await?;
1429 Ok(rsp)
1430 }
1431
1432 pub fn notice_to_user(&self, str: impl Into<String>) {
1433 let notice = str.into();
1434 tracing::trace!(notice, "notice to user");
1435 self.notice_tx
1436 .send(notice)
1437 .expect("notice channel should not be closed");
1438 }
1439
1440 pub fn is_barrier_read(&self) -> bool {
1441 match self.config().visibility_mode() {
1442 VisibilityMode::Default => self.env.batch_config.enable_barrier_read,
1443 VisibilityMode::All => true,
1444 VisibilityMode::Checkpoint => false,
1445 }
1446 }
1447
1448 pub fn statement_timeout(&self) -> Duration {
1449 if self.config().statement_timeout().millis() == 0 {
1450 Duration::from_secs(self.env.batch_config.statement_timeout_in_sec as u64)
1451 } else {
1452 Duration::from_millis(self.config().statement_timeout().millis() as u64)
1453 }
1454 }
1455
1456 pub fn create_temporary_source(&self, source: SourceCatalog) {
1457 self.temporary_source_manager
1458 .lock()
1459 .create_source(source.name.clone(), source);
1460 }
1461
1462 pub fn get_temporary_source(&self, name: &str) -> Option<SourceCatalog> {
1463 self.temporary_source_manager
1464 .lock()
1465 .get_source(name)
1466 .cloned()
1467 }
1468
1469 pub fn drop_temporary_source(&self, name: &str) {
1470 self.temporary_source_manager.lock().drop_source(name);
1471 }
1472
1473 pub fn temporary_source_manager(&self) -> TemporarySourceManager {
1474 self.temporary_source_manager.lock().clone()
1475 }
1476
1477 pub fn create_staging_table(&self, table: TableCatalog) {
1478 self.staging_catalog_manager
1479 .lock()
1480 .create_table(table.name.clone(), table);
1481 }
1482
1483 pub fn drop_staging_table(&self, name: &str) {
1484 self.staging_catalog_manager.lock().drop_table(name);
1485 }
1486
1487 pub fn staging_catalog_manager(&self) -> StagingCatalogManager {
1488 self.staging_catalog_manager.lock().clone()
1489 }
1490
1491 pub async fn check_cluster_limits(&self) -> Result<()> {
1492 if self.config().bypass_cluster_limits() {
1493 return Ok(());
1494 }
1495
1496 let gen_message = |ActorCountPerParallelism {
1497 worker_id_to_actor_count,
1498 hard_limit,
1499 soft_limit,
1500 }: ActorCountPerParallelism,
1501 exceed_hard_limit: bool|
1502 -> String {
1503 let (limit_type, action) = if exceed_hard_limit {
1504 ("critical", "Scale the cluster immediately to proceed.")
1505 } else {
1506 (
1507 "recommended",
1508 "Consider scaling the cluster for optimal performance.",
1509 )
1510 };
1511 format!(
1512 r#"Actor count per parallelism exceeds the {limit_type} limit.
1513
1514Depending on your workload, this may overload the cluster and cause performance/stability issues. {action}
1515
1516HINT:
1517- For best practices on managing streaming jobs: https://docs.risingwave.com/operate/manage-a-large-number-of-streaming-jobs
1518- To bypass the check (if the cluster load is acceptable): `[ALTER SYSTEM] SET bypass_cluster_limits TO true`.
1519 See https://docs.risingwave.com/operate/view-configure-runtime-parameters#how-to-configure-runtime-parameters
1520- Contact us via slack or https://risingwave.com/contact-us/ for further enquiry.
1521
1522DETAILS:
1523- hard limit: {hard_limit}
1524- soft limit: {soft_limit}
1525- worker_id_to_actor_count: {worker_id_to_actor_count:?}"#,
1526 )
1527 };
1528
1529 let limits = self.env().meta_client().get_cluster_limits().await?;
1530 for limit in limits {
1531 match limit {
1532 cluster_limit::ClusterLimit::ActorCount(l) => {
1533 if l.exceed_hard_limit() {
1534 return Err(RwError::from(ErrorCode::ProtocolError(gen_message(
1535 l, true,
1536 ))));
1537 } else if l.exceed_soft_limit() {
1538 self.notice_to_user(gen_message(l, false));
1539 }
1540 }
1541 }
1542 }
1543 Ok(())
1544 }
1545}
1546
1547pub static SESSION_MANAGER: std::sync::OnceLock<Arc<SessionManagerImpl>> =
1548 std::sync::OnceLock::new();
1549
1550pub struct SessionManagerImpl {
1551 env: FrontendEnv,
1552 _join_handles: Vec<JoinHandle<()>>,
1553 _shutdown_senders: Vec<Sender<()>>,
1554 number: AtomicI32,
1555}
1556
1557impl SessionManager for SessionManagerImpl {
1558 type Error = RwError;
1559 type Session = SessionImpl;
1560
1561 fn create_dummy_session(&self, database_id: DatabaseId) -> Result<Arc<Self::Session>> {
1562 let dummy_addr = Address::Tcp(SocketAddr::new(
1563 IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1564 5691, ));
1566
1567 self.connect_inner(
1570 database_id,
1571 risingwave_common::catalog::DEFAULT_SUPER_USER,
1572 Arc::new(dummy_addr),
1573 )
1574 }
1575
1576 fn connect(
1577 &self,
1578 database: &str,
1579 user_name: &str,
1580 peer_addr: AddressRef,
1581 ) -> Result<Arc<Self::Session>> {
1582 let catalog_reader = self.env.catalog_reader();
1583 let reader = catalog_reader.read_guard();
1584 let database_id = reader.get_database_by_name(database)?.id();
1585
1586 self.connect_inner(database_id, user_name, peer_addr)
1587 }
1588
1589 fn cancel_queries_in_session(&self, session_id: SessionId) {
1591 self.env.cancel_queries_in_session(session_id);
1592 }
1593
1594 fn cancel_creating_jobs_in_session(&self, session_id: SessionId) {
1595 self.env.cancel_creating_jobs_in_session(session_id);
1596 }
1597
1598 fn end_session(&self, session: &Self::Session) {
1599 self.delete_session(&session.session_id());
1600 }
1601
1602 async fn shutdown(&self) {
1603 self.env.sessions_map().write().clear();
1605 self.env.meta_client().try_unregister().await;
1607 }
1608}
1609
1610impl SessionManagerImpl {
1611 pub async fn new(opts: FrontendOpts) -> Result<Self> {
1612 let (env, join_handles, shutdown_senders) = FrontendEnv::init(opts).await?;
1614 Ok(Self {
1615 env,
1616 _join_handles: join_handles,
1617 _shutdown_senders: shutdown_senders,
1618 number: AtomicI32::new(0),
1619 })
1620 }
1621
1622 pub(crate) fn env(&self) -> &FrontendEnv {
1623 &self.env
1624 }
1625
1626 fn insert_session(&self, session: Arc<SessionImpl>) {
1627 let active_sessions = {
1628 let mut write_guard = self.env.sessions_map.write();
1629 write_guard.insert(session.id(), session);
1630 write_guard.len()
1631 };
1632 self.env
1633 .frontend_metrics
1634 .active_sessions
1635 .set(active_sessions as i64);
1636 }
1637
1638 fn delete_session(&self, session_id: &SessionId) {
1639 let active_sessions = {
1640 let mut write_guard = self.env.sessions_map.write();
1641 write_guard.remove(session_id);
1642 write_guard.len()
1643 };
1644 self.env
1645 .frontend_metrics
1646 .active_sessions
1647 .set(active_sessions as i64);
1648 }
1649
1650 fn connect_inner(
1651 &self,
1652 database_id: DatabaseId,
1653 user_name: &str,
1654 peer_addr: AddressRef,
1655 ) -> Result<Arc<SessionImpl>> {
1656 let catalog_reader = self.env.catalog_reader();
1657 let reader = catalog_reader.read_guard();
1658 let (database_name, database_owner) = {
1659 let db = reader.get_database_by_id(database_id)?;
1660 (db.name(), db.owner())
1661 };
1662
1663 let user_reader = self.env.user_info_reader();
1664 let reader = user_reader.read_guard();
1665 if let Some(user) = reader.get_user_by_name(user_name) {
1666 if !user.can_login {
1667 bail_permission_denied!("User {} is not allowed to login", user_name);
1668 }
1669 let has_privilege = user.has_privilege(database_id, AclMode::Connect);
1670 if !user.is_super && database_owner != user.id && !has_privilege {
1671 bail_permission_denied!("User does not have CONNECT privilege.");
1672 }
1673
1674 let (connection_type, client_addr) = match peer_addr.as_ref() {
1676 Address::Tcp(socket_addr) => (ConnectionType::Host, Some(&socket_addr.ip())),
1677 Address::Unix(_) => (ConnectionType::Local, None),
1678 };
1679 tracing::debug!(
1680 "receive connection: type={:?}, client_addr={:?}",
1681 connection_type,
1682 client_addr
1683 );
1684
1685 let hba_entry_opt = self.env.frontend_config().hba_config.find_matching_entry(
1686 &connection_type,
1687 database_name,
1688 user_name,
1689 client_addr,
1690 );
1691
1692 let Some(hba_entry_opt) = hba_entry_opt else {
1693 bail_permission_denied!(
1694 "no pg_hba.conf entry for host \"{peer_addr}\", user \"{user_name}\", database \"{database_name}\""
1695 );
1696 };
1697
1698 let authenticator_by_info = || -> Result<UserAuthenticator> {
1700 let authenticator = match &user.auth_info {
1701 None => UserAuthenticator::None,
1702 Some(auth_info) => match auth_info.encryption_type() {
1703 EncryptionType::Plaintext => {
1704 UserAuthenticator::ClearText(auth_info.encrypted_value.clone())
1705 }
1706 EncryptionType::Md5 => {
1707 let mut salt = [0; 4];
1708 let mut rng = rand::rng();
1709 rng.fill_bytes(&mut salt);
1710 UserAuthenticator::Md5WithSalt {
1711 encrypted_password: md5_hash_with_salt(
1712 &auth_info.encrypted_value,
1713 &salt,
1714 ),
1715 salt,
1716 }
1717 }
1718 EncryptionType::Oauth => UserAuthenticator::OAuth {
1719 metadata: auth_info.metadata.clone(),
1720 cluster_id: self.env.meta_client().cluster_id().to_owned(),
1721 },
1722 _ => {
1723 bail_protocol_error!(
1724 "Unsupported auth type: {}",
1725 auth_info.encryption_type().as_str_name()
1726 );
1727 }
1728 },
1729 };
1730 Ok(authenticator)
1731 };
1732
1733 let user_authenticator = match (&hba_entry_opt.auth_method, &user.auth_info) {
1734 (AuthMethod::Trust, _) => UserAuthenticator::None,
1735 (AuthMethod::Password, _) => authenticator_by_info()?,
1737 (AuthMethod::Md5, Some(auth_info))
1738 if auth_info.encryption_type() == EncryptionType::Md5 =>
1739 {
1740 authenticator_by_info()?
1741 }
1742 (AuthMethod::OAuth, Some(auth_info))
1743 if auth_info.encryption_type() == EncryptionType::Oauth =>
1744 {
1745 authenticator_by_info()?
1746 }
1747 (AuthMethod::Ldap, _) => {
1748 UserAuthenticator::Ldap(user_name.to_owned(), hba_entry_opt.clone())
1749 }
1750 _ => {
1751 bail_permission_denied!(
1752 "password authentication failed for user \"{user_name}\""
1753 );
1754 }
1755 };
1756
1757 let secret_key = self.number.fetch_add(1, Ordering::Relaxed);
1759 let id = (secret_key, secret_key);
1761 let session_config = self.env.session_params_snapshot();
1763
1764 let session_impl: Arc<SessionImpl> = SessionImpl::new(
1765 self.env.clone(),
1766 AuthContext::new(database_name.to_owned(), user_name.to_owned(), user.id),
1767 user_authenticator,
1768 id,
1769 peer_addr,
1770 session_config,
1771 )
1772 .into();
1773 self.insert_session(session_impl.clone());
1774
1775 Ok(session_impl)
1776 } else {
1777 bail_catalog_error!("Role {} does not exist", user_name);
1778 }
1779 }
1780}
1781
1782impl Session for SessionImpl {
1783 type Error = RwError;
1784 type Portal = Portal;
1785 type PreparedStatement = PrepareStatement;
1786 type ValuesStream = PgResponseStream;
1787
1788 async fn run_one_query(
1791 self: Arc<Self>,
1792 stmt: Statement,
1793 format: Format,
1794 ) -> Result<PgResponse<PgResponseStream>> {
1795 let string = stmt.to_string();
1796 let sql_str = string.as_str();
1797 let sql: Arc<str> = Arc::from(sql_str);
1798 drop(string);
1800 let rsp = Box::pin(handle(self, stmt, sql, vec![format])).await?;
1801 Ok(rsp)
1802 }
1803
1804 fn user_authenticator(&self) -> &UserAuthenticator {
1805 &self.user_authenticator
1806 }
1807
1808 fn id(&self) -> SessionId {
1809 self.id
1810 }
1811
1812 async fn parse(
1813 self: Arc<Self>,
1814 statement: Option<Statement>,
1815 params_types: Vec<Option<DataType>>,
1816 ) -> Result<PrepareStatement> {
1817 Ok(if let Some(statement) = statement {
1818 handle_parse(self, statement, params_types).await?
1819 } else {
1820 PrepareStatement::Empty
1821 })
1822 }
1823
1824 fn bind(
1825 self: Arc<Self>,
1826 prepare_statement: PrepareStatement,
1827 params: Vec<Option<Bytes>>,
1828 param_formats: Vec<Format>,
1829 result_formats: Vec<Format>,
1830 ) -> Result<Portal> {
1831 handle_bind(prepare_statement, params, param_formats, result_formats)
1832 }
1833
1834 async fn execute(self: Arc<Self>, portal: Portal) -> Result<PgResponse<PgResponseStream>> {
1835 let rsp = Box::pin(handle_execute(self, portal)).await?;
1836 Ok(rsp)
1837 }
1838
1839 fn describe_statement(
1840 self: Arc<Self>,
1841 prepare_statement: PrepareStatement,
1842 ) -> Result<(Vec<DataType>, Vec<PgFieldDescriptor>)> {
1843 Ok(match prepare_statement {
1844 PrepareStatement::Empty => (vec![], vec![]),
1845 PrepareStatement::Prepared(prepare_statement) => (
1846 prepare_statement.bound_result.param_types,
1847 infer(
1848 Some(prepare_statement.bound_result.bound),
1849 prepare_statement.statement,
1850 )?,
1851 ),
1852 PrepareStatement::PureStatement(statement) => (vec![], infer(None, statement)?),
1853 })
1854 }
1855
1856 fn describe_portal(self: Arc<Self>, portal: Portal) -> Result<Vec<PgFieldDescriptor>> {
1857 match portal {
1858 Portal::Empty => Ok(vec![]),
1859 Portal::Portal(portal) => {
1860 let mut columns = infer(Some(portal.bound_result.bound), portal.statement)?;
1861 let formats = FormatIterator::new(&portal.result_formats, columns.len())
1862 .map_err(|e| RwError::from(ErrorCode::ProtocolError(e)))?;
1863 columns.iter_mut().zip_eq_fast(formats).for_each(|(c, f)| {
1864 if f == Format::Binary {
1865 c.set_to_binary()
1866 }
1867 });
1868 Ok(columns)
1869 }
1870 Portal::PureStatement(statement) => Ok(infer(None, statement)?),
1871 }
1872 }
1873
1874 fn get_config(&self, key: &str) -> Result<String> {
1875 self.config().get(key).map_err(Into::into)
1876 }
1877
1878 fn set_config(&self, key: &str, value: String) -> Result<String> {
1879 Self::set_config(self, key, value)
1880 }
1881
1882 async fn next_notice(self: &Arc<Self>) -> String {
1883 std::future::poll_fn(|cx| self.clone().notice_rx.lock().poll_recv(cx))
1884 .await
1885 .expect("notice channel should not be closed")
1886 }
1887
1888 fn transaction_status(&self) -> TransactionStatus {
1889 match &*self.txn.lock() {
1890 transaction::State::Initial | transaction::State::Implicit(_) => {
1891 TransactionStatus::Idle
1892 }
1893 transaction::State::Explicit(_) => TransactionStatus::InTransaction,
1894 }
1896 }
1897
1898 fn init_exec_context(&self, sql: Arc<str>) -> ExecContextGuard {
1900 let exec_context = Arc::new(ExecContext {
1901 running_sql: sql,
1902 last_instant: Instant::now(),
1903 last_idle_instant: self.last_idle_instant.clone(),
1904 });
1905 *self.exec_context.lock() = Some(Arc::downgrade(&exec_context));
1906 *self.last_idle_instant.lock() = None;
1908 ExecContextGuard::new(exec_context)
1909 }
1910
1911 fn check_idle_in_transaction_timeout(&self) -> PsqlResult<()> {
1914 if matches!(self.transaction_status(), TransactionStatus::InTransaction) {
1916 let idle_in_transaction_session_timeout =
1917 self.config().idle_in_transaction_session_timeout() as u128;
1918 if idle_in_transaction_session_timeout != 0 {
1920 let guard = self.exec_context.lock();
1922 if guard.as_ref().and_then(|weak| weak.upgrade()).is_none() {
1924 if let Some(elapse_since_last_idle_instant) =
1926 self.elapse_since_last_idle_instant()
1927 && elapse_since_last_idle_instant > idle_in_transaction_session_timeout
1928 {
1929 return Err(PsqlError::IdleInTxnTimeout);
1930 }
1931 }
1932 }
1933 }
1934 Ok(())
1935 }
1936
1937 fn user(&self) -> String {
1938 self.user_name()
1939 }
1940}
1941
1942fn infer(bound: Option<BoundStatement>, stmt: Statement) -> Result<Vec<PgFieldDescriptor>> {
1944 match stmt {
1945 Statement::Query(_)
1946 | Statement::Insert { .. }
1947 | Statement::Delete { .. }
1948 | Statement::Update { .. }
1949 | Statement::FetchCursor { .. } => Ok(bound
1950 .unwrap()
1951 .output_fields()
1952 .iter()
1953 .map(to_pg_field)
1954 .collect()),
1955 Statement::ShowObjects {
1956 object: show_object,
1957 ..
1958 } => Ok(infer_show_object(&show_object)),
1959 Statement::ShowCreateObject { .. } => Ok(infer_show_create_object()),
1960 Statement::ShowTransactionIsolationLevel => {
1961 let name = "transaction_isolation";
1962 Ok(infer_show_variable(name))
1963 }
1964 Statement::ShowVariable { variable } => {
1965 let name = &variable[0].real_value().to_lowercase();
1966 Ok(infer_show_variable(name))
1967 }
1968 Statement::Describe { name: _, kind } => Ok(infer_describe(&kind)),
1969 Statement::Explain { .. } => Ok(vec![PgFieldDescriptor::new(
1970 "QUERY PLAN".to_owned(),
1971 DataType::Varchar.to_oid(),
1972 DataType::Varchar.type_len(),
1973 )]),
1974 _ => Ok(vec![]),
1975 }
1976}
1977
1978pub struct WorkerProcessId {
1979 pub worker_id: WorkerId,
1980 pub process_id: i32,
1981}
1982
1983impl WorkerProcessId {
1984 pub fn new(worker_id: WorkerId, process_id: i32) -> Self {
1985 Self {
1986 worker_id,
1987 process_id,
1988 }
1989 }
1990}
1991
1992impl Display for WorkerProcessId {
1993 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1994 write!(f, "{}:{}", self.worker_id, self.process_id)
1995 }
1996}
1997
1998impl TryFrom<String> for WorkerProcessId {
1999 type Error = String;
2000
2001 fn try_from(worker_process_id: String) -> std::result::Result<Self, Self::Error> {
2002 const INVALID: &str = "invalid WorkerProcessId";
2003 let splits: Vec<&str> = worker_process_id.split(":").collect();
2004 if splits.len() != 2 {
2005 return Err(INVALID.to_owned());
2006 }
2007 let Ok(worker_id) = splits[0].parse::<u32>() else {
2008 return Err(INVALID.to_owned());
2009 };
2010 let Ok(process_id) = splits[1].parse::<i32>() else {
2011 return Err(INVALID.to_owned());
2012 };
2013 Ok(WorkerProcessId::new(worker_id.into(), process_id))
2014 }
2015}
2016
2017pub fn cancel_queries_in_session(session_id: SessionId, sessions_map: SessionMapRef) -> bool {
2018 let guard = sessions_map.read();
2019 if let Some(session) = guard.get(&session_id) {
2020 session.cancel_current_query();
2021 true
2022 } else {
2023 info!("Current session finished, ignoring cancel query request");
2024 false
2025 }
2026}
2027
2028pub fn cancel_creating_jobs_in_session(session_id: SessionId, sessions_map: SessionMapRef) -> bool {
2029 let guard = sessions_map.read();
2030 if let Some(session) = guard.get(&session_id) {
2031 session.cancel_current_creating_job();
2032 true
2033 } else {
2034 info!("Current session finished, ignoring cancel creating request");
2035 false
2036 }
2037}