1use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::io::{Error, ErrorKind};
18use std::iter;
19use std::net::{IpAddr, Ipv4Addr, SocketAddr};
20use std::sync::atomic::{AtomicI32, Ordering};
21use std::sync::{Arc, Weak};
22use std::time::{Duration, Instant};
23
24use anyhow::anyhow;
25use bytes::Bytes;
26use either::Either;
27use itertools::Itertools;
28use parking_lot::{Mutex, RwLock, RwLockReadGuard};
29use pgwire::error::{PsqlError, PsqlResult};
30use pgwire::net::{Address, AddressRef};
31use pgwire::pg_field_descriptor::PgFieldDescriptor;
32use pgwire::pg_message::TransactionStatus;
33use pgwire::pg_response::{PgResponse, StatementType};
34use pgwire::pg_server::{
35 BoxedError, ExecContext, ExecContextGuard, Session, SessionId, SessionManager,
36 UserAuthenticator,
37};
38use pgwire::types::{Format, FormatIterator};
39use prometheus_http_query::Client as PrometheusClient;
40use rand::RngCore;
41use risingwave_batch::monitor::{BatchSpillMetrics, GLOBAL_BATCH_SPILL_METRICS};
42use risingwave_batch::spill::spill_op::SpillOp;
43use risingwave_batch::task::{ShutdownSender, ShutdownToken};
44use risingwave_batch::worker_manager::worker_node_manager::{
45 WorkerNodeManager, WorkerNodeManagerRef,
46};
47use risingwave_common::acl::AclMode;
48#[cfg(test)]
49use risingwave_common::catalog::{
50 DEFAULT_DATABASE_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_ID,
51};
52use risingwave_common::config::{
53 AuthMethod, BatchConfig, ConnectionType, FrontendConfig, MetaConfig, MetricLevel,
54 RpcClientConfig, StreamingConfig, UdfConfig, load_config,
55};
56use risingwave_common::id::WorkerId;
57use risingwave_common::memory::MemoryContext;
58use risingwave_common::secret::LocalSecretManager;
59use risingwave_common::session_config::{ConfigReporter, SessionConfig, VisibilityMode};
60use risingwave_common::system_param::local_manager::{
61 LocalSystemParamsManager, LocalSystemParamsManagerRef,
62};
63use risingwave_common::telemetry::manager::TelemetryManager;
64use risingwave_common::telemetry::telemetry_env_enabled;
65use risingwave_common::types::DataType;
66use risingwave_common::util::addr::HostAddr;
67use risingwave_common::util::cluster_limit;
68use risingwave_common::util::cluster_limit::ActorCountPerParallelism;
69use risingwave_common::util::iter_util::ZipEqFast;
70use risingwave_common::util::pretty_bytes::convert;
71use risingwave_common::util::runtime::BackgroundShutdownRuntime;
72use risingwave_common::{GIT_SHA, RW_VERSION};
73use risingwave_common_heap_profiling::HeapProfiler;
74use risingwave_common_service::{MetricsManager, ObserverManager};
75use risingwave_connector::source::monitor::{GLOBAL_SOURCE_METRICS, SourceMetrics};
76use risingwave_pb::common::WorkerType;
77use risingwave_pb::common::worker_node::Property as AddWorkerNodeProperty;
78use risingwave_pb::configured_monitor_service_server;
79use risingwave_pb::frontend_service::frontend_service_server::FrontendServiceServer;
80use risingwave_pb::health::health_server::HealthServer;
81use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
82use risingwave_pb::user::auth_info::EncryptionType;
83use risingwave_rpc_client::{
84 ComputeClientPool, ComputeClientPoolRef, FrontendClientPool, FrontendClientPoolRef, MetaClient,
85 MonitorClientPool, MonitorClientPoolRef,
86};
87use risingwave_sqlparser::ast::{ObjectName, Statement};
88use risingwave_sqlparser::parser::Parser;
89use thiserror::Error;
90use thiserror_ext::AsReport;
91use tokio::runtime::Builder;
92use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
93use tokio::sync::oneshot::Sender;
94use tokio::sync::watch;
95use tokio::task::JoinHandle;
96use tracing::{error, info};
97
98use self::cursor_manager::CursorManager;
99use crate::binder::{Binder, BoundStatement, ResolveQualifiedNameError};
100use crate::catalog::catalog_service::{CatalogReader, CatalogWriter, CatalogWriterImpl};
101use crate::catalog::connection_catalog::ConnectionCatalog;
102use crate::catalog::root_catalog::{Catalog, SchemaPath};
103use crate::catalog::secret_catalog::SecretCatalog;
104use crate::catalog::source_catalog::SourceCatalog;
105use crate::catalog::subscription_catalog::SubscriptionCatalog;
106use crate::catalog::{
107 CatalogError, CatalogErrorInner, DatabaseId, OwnedByUserCatalog, SchemaId, TableId,
108 check_schema_writable,
109};
110use crate::error::{
111 ErrorCode, Result, RwError, bail_catalog_error, bail_permission_denied, bail_protocol_error,
112};
113use crate::handler::describe::infer_describe;
114use crate::handler::extended_handle::{
115 Portal, PrepareStatement, handle_bind, handle_execute, handle_parse,
116};
117use crate::handler::privilege::ObjectCheckItem;
118use crate::handler::show::{infer_show_create_object, infer_show_object};
119use crate::handler::util::to_pg_field;
120use crate::handler::variable::infer_show_variable;
121use crate::handler::{RwPgResponse, handle};
122use crate::health_service::HealthServiceImpl;
123use crate::meta_client::{FrontendMetaClient, FrontendMetaClientImpl};
124use crate::monitor::{CursorMetrics, FrontendMetrics, GLOBAL_FRONTEND_METRICS};
125use crate::observer::FrontendObserverNode;
126use crate::rpc::{FrontendServiceImpl, MonitorServiceImpl};
127use crate::scheduler::streaming_manager::{StreamingJobTracker, StreamingJobTrackerRef};
128use crate::scheduler::{
129 DistributedQueryMetrics, GLOBAL_DISTRIBUTED_QUERY_METRICS, HummockSnapshotManager,
130 HummockSnapshotManagerRef, QueryManager,
131};
132use crate::telemetry::FrontendTelemetryCreator;
133use crate::user::UserId;
134use crate::user::user_authentication::md5_hash_with_salt;
135use crate::user::user_manager::UserInfoManager;
136use crate::user::user_service::{UserInfoReader, UserInfoWriter, UserInfoWriterImpl};
137use crate::{FrontendOpts, PgResponseStream, TableCatalog};
138
139pub(crate) mod current;
140pub(crate) mod cursor_manager;
141pub(crate) mod transaction;
142
/// Process-wide state shared by every session of one frontend node.
///
/// The struct is `Clone`; each `SessionImpl` stores its own copy in its `env` field.
#[derive(Clone)]
pub(crate) struct FrontendEnv {
    /// Client for the meta service. In `init` this is a `FrontendMetaClientImpl`
    /// wrapping the `MetaClient` obtained from registration.
    meta_client: Arc<dyn FrontendMetaClient>,
    /// Write side of the catalog; only reachable through `catalog_writer`,
    /// which requires a `transaction::WriteGuard`.
    catalog_writer: Arc<dyn CatalogWriter>,
    /// Read side of the catalog.
    catalog_reader: CatalogReader,
    /// Write side of user information; only reachable through `user_info_writer`,
    /// which requires a `transaction::WriteGuard`.
    user_info_writer: Arc<dyn UserInfoWriter>,
    /// Read side of user information.
    user_info_reader: UserInfoReader,
    /// Shared worker-node manager (also handed to the query manager and observer in `init`).
    worker_node_manager: WorkerNodeManagerRef,
    /// Batch query manager.
    query_manager: QueryManager,
    /// Hummock snapshot manager built on top of the meta client.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
    /// Local view of the system parameters, seeded in `init` from the reader
    /// returned by meta registration.
    system_params_manager: LocalSystemParamsManagerRef,
    /// Shared `SessionConfig`; `init` passes the same `Arc` to the observer node.
    /// Read via `session_params_snapshot`.
    session_params: Arc<RwLock<SessionConfig>>,

    /// Address of this frontend; `init` sets it to the advertise address.
    server_addr: HostAddr,
    /// Pool of compute-node RPC clients.
    client_pool: ComputeClientPoolRef,
    /// Pool of frontend RPC clients.
    frontend_client_pool: FrontendClientPoolRef,
    /// Pool of monitor-service RPC clients.
    monitor_client_pool: MonitorClientPoolRef,

    /// Sessions keyed by `(i32, i32)` id. In `init` the same map is shared with
    /// `CursorMetrics`, the frontend RPC service and the idle-transaction checker.
    sessions_map: SessionMapRef,

    /// Frontend metrics.
    pub frontend_metrics: Arc<FrontendMetrics>,

    /// Cursor metrics; cloned into each session's `CursorManager`.
    pub cursor_metrics: Arc<CursorMetrics>,

    /// Source metrics.
    source_metrics: Arc<SourceMetrics>,

    /// Batch spill metrics.
    spill_metrics: Arc<BatchSpillMetrics>,

    /// `batch` section of the loaded configuration.
    batch_config: BatchConfig,
    /// `frontend` section of the loaded configuration.
    frontend_config: FrontendConfig,
    /// `meta` section of the loaded configuration; stored but not read
    /// (hence the `dead_code` expectation).
    #[expect(dead_code)]
    meta_config: MetaConfig,
    /// `streaming` section of the loaded configuration.
    streaming_config: StreamingConfig,
    /// `udf` section of the loaded configuration.
    udf_config: UdfConfig,

    /// Tracker for streaming jobs that are being created.
    creating_streaming_job_tracker: StreamingJobTrackerRef,

    /// Dedicated multi-thread tokio runtime whose threads are named `rw-batch-local`.
    compute_runtime: Arc<BackgroundShutdownRuntime>,

    /// Memory context for batch execution. In `init` it is a root context limited to
    /// `FRONTEND_BATCH_MEMORY_PROPORTION` of the frontend's total memory.
    mem_context: MemoryContext,

    /// Memory context derived from `mem_context` by
    /// `crate::datafusion::create_df_spillable_budget_ctx`; only present with the
    /// `datafusion` feature.
    #[cfg(feature = "datafusion")]
    df_spillable_budget_ctx: MemoryContext,

    /// Prometheus client; `None` when no endpoint was configured or the client
    /// could not be constructed from it.
    prometheus_client: Option<PrometheusClient>,

    /// Value of `opts.prometheus_selector`, or the empty string when unset.
    prometheus_selector: String,
}
209
/// Shared, lock-protected map from a session id pair to its [`SessionImpl`].
pub type SessionMapRef = Arc<RwLock<HashMap<(i32, i32), Arc<SessionImpl>>>>;

/// Fraction of the frontend's total memory that `FrontendEnv::init` assigns as the
/// limit of the batch memory context.
const FRONTEND_BATCH_MEMORY_PROPORTION: f64 = 0.5;
215
216impl FrontendEnv {
217 pub fn mock() -> Self {
218 use crate::test_utils::{MockCatalogWriter, MockFrontendMetaClient, MockUserInfoWriter};
219
220 let catalog = Arc::new(RwLock::new(Catalog::default()));
221 let meta_client = Arc::new(MockFrontendMetaClient {});
222 let hummock_snapshot_manager = Arc::new(HummockSnapshotManager::new(meta_client.clone()));
223 let catalog_writer = Arc::new(MockCatalogWriter::new(
224 catalog.clone(),
225 hummock_snapshot_manager.clone(),
226 ));
227 let catalog_reader = CatalogReader::new(catalog);
228 let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
229 let user_info_writer = Arc::new(MockUserInfoWriter::new(user_info_manager.clone()));
230 let user_info_reader = UserInfoReader::new(user_info_manager);
231 let worker_node_manager = Arc::new(WorkerNodeManager::mock(vec![]));
232 let system_params_manager = Arc::new(LocalSystemParamsManager::for_test());
233 let compute_client_pool = Arc::new(ComputeClientPool::for_test());
234 let frontend_client_pool = Arc::new(FrontendClientPool::for_test());
235 let monitor_client_pool = Arc::new(MonitorClientPool::for_test());
236 let query_manager = QueryManager::new(
237 worker_node_manager.clone(),
238 compute_client_pool,
239 catalog_reader.clone(),
240 Arc::new(DistributedQueryMetrics::for_test()),
241 None,
242 None,
243 );
244 let server_addr = HostAddr::try_from("127.0.0.1:4565").unwrap();
245 let client_pool = Arc::new(ComputeClientPool::for_test());
246 let creating_streaming_tracker = StreamingJobTracker::new(meta_client.clone());
247 let runtime = {
248 let mut builder = Builder::new_multi_thread();
249 if let Some(frontend_compute_runtime_worker_threads) =
250 load_config("", FrontendOpts::default())
251 .batch
252 .frontend_compute_runtime_worker_threads
253 {
254 builder.worker_threads(frontend_compute_runtime_worker_threads);
255 }
256 builder
257 .thread_name("rw-batch-local")
258 .enable_all()
259 .build()
260 .unwrap()
261 };
262 let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(runtime));
263 let sessions_map = Arc::new(RwLock::new(HashMap::new()));
264 Self {
265 meta_client,
266 catalog_writer,
267 catalog_reader,
268 user_info_writer,
269 user_info_reader,
270 worker_node_manager,
271 query_manager,
272 hummock_snapshot_manager,
273 system_params_manager,
274 session_params: Default::default(),
275 server_addr,
276 client_pool,
277 frontend_client_pool,
278 monitor_client_pool,
279 sessions_map,
280 frontend_metrics: Arc::new(FrontendMetrics::for_test()),
281 cursor_metrics: Arc::new(CursorMetrics::for_test()),
282 batch_config: BatchConfig::default(),
283 frontend_config: FrontendConfig::default(),
284 meta_config: MetaConfig::default(),
285 streaming_config: StreamingConfig::default(),
286 udf_config: UdfConfig::default(),
287 source_metrics: Arc::new(SourceMetrics::default()),
288 spill_metrics: BatchSpillMetrics::for_test(),
289 creating_streaming_job_tracker: Arc::new(creating_streaming_tracker),
290 compute_runtime,
291 mem_context: MemoryContext::none(),
292 #[cfg(feature = "datafusion")]
293 df_spillable_budget_ctx: MemoryContext::none(),
294 prometheus_client: None,
295 prometheus_selector: String::new(),
296 }
297 }
298
/// Replaces the stored frontend configuration with `frontend_config` (test helper).
pub(crate) fn set_frontend_config_for_test(&mut self, frontend_config: FrontendConfig) {
    self.frontend_config = frontend_config;
}
302
/// Boots a frontend node and returns its environment.
///
/// Loads the configuration, registers with and activates on the meta service, starts
/// the heartbeat, observer, optional telemetry, RPC server and idle-transaction
/// checker, then assembles the [`FrontendEnv`].
///
/// Returns `(env, join_handles, shutdown_senders)`. The handles cover the heartbeat,
/// observer, optional telemetry and idle-transaction tasks; the senders cover the
/// heartbeat and optional telemetry. The RPC server task is spawned detached.
///
/// # Errors
///
/// Returns an error if activating on the meta service fails or, when spilling is
/// enabled (non-madsim builds), if cleaning the spill directory fails.
///
/// # Panics
///
/// Panics if the advertise/listen address or the RPC listener address cannot be
/// parsed, or if the compute runtime cannot be built. The spawned RPC server task
/// panics if serving fails.
pub async fn init(opts: FrontendOpts) -> Result<(Self, Vec<JoinHandle<()>>, Vec<Sender<()>>)> {
    let config = load_config(&opts.config_path, &opts);
    info!("Starting frontend node");
    info!("> config: {:?}", config);
    info!(
        "> debug assertions: {}",
        if cfg!(debug_assertions) { "on" } else { "off" }
    );
    info!("> version: {} ({})", RW_VERSION, GIT_SHA);

    // The advertised address falls back to the listen address when not configured.
    let frontend_address: HostAddr = opts
        .advertise_addr
        .as_ref()
        .unwrap_or_else(|| {
            tracing::warn!("advertise addr is not specified, defaulting to listen_addr");
            &opts.listen_addr
        })
        .parse()
        .unwrap();
    info!("advertise addr is {}", frontend_address);

    // Internal RPC address = advertised host + port of the RPC listener.
    let rpc_addr: HostAddr = opts.frontend_rpc_listener_addr.parse().unwrap();
    let internal_rpc_host_addr = HostAddr {
        host: frontend_address.host.clone(),
        port: rpc_addr.port,
    };
    // Register this node with the meta service as a frontend worker.
    let (meta_client, system_params_reader) = MetaClient::register_new(
        opts.meta_addr,
        WorkerType::Frontend,
        &frontend_address,
        AddWorkerNodeProperty {
            internal_rpc_host_addr: internal_rpc_host_addr.to_string(),
            ..Default::default()
        },
        Arc::new(config.meta.clone()),
    )
    .await;

    let worker_id = meta_client.worker_id();
    info!("Assigned worker node id {}", worker_id);

    // The heartbeat task seeds both returned vectors.
    let (heartbeat_join_handle, heartbeat_shutdown_sender) = MetaClient::start_heartbeat_loop(
        meta_client.clone(),
        Duration::from_millis(config.server.heartbeat_interval_ms as u64),
    );
    let mut join_handles = vec![heartbeat_join_handle];
    let mut shutdown_senders = vec![heartbeat_shutdown_sender];

    let frontend_meta_client = Arc::new(FrontendMetaClientImpl(meta_client.clone()));
    let hummock_snapshot_manager =
        Arc::new(HummockSnapshotManager::new(frontend_meta_client.clone()));

    // The sender half goes to the observer node; the receiver halves go to the
    // catalog and user-info writers.
    let (catalog_updated_tx, catalog_updated_rx) = watch::channel(0);
    let catalog = Arc::new(RwLock::new(Catalog::default()));
    let catalog_writer = Arc::new(CatalogWriterImpl::new(
        meta_client.clone(),
        catalog_updated_rx.clone(),
        hummock_snapshot_manager.clone(),
    ));
    let catalog_reader = CatalogReader::new(catalog.clone());

    let worker_node_manager = Arc::new(WorkerNodeManager::new());

    let compute_client_pool = Arc::new(ComputeClientPool::new(
        config.batch_exchange_connection_pool_size(),
        config.batch.developer.compute_client_config.clone(),
    ));
    let frontend_client_pool = Arc::new(FrontendClientPool::new(
        1,
        config.batch.developer.frontend_client_config.clone(),
    ));
    let monitor_client_pool = Arc::new(MonitorClientPool::new(1, RpcClientConfig::default()));
    let query_manager = QueryManager::new(
        worker_node_manager.clone(),
        compute_client_pool.clone(),
        catalog_reader.clone(),
        Arc::new(GLOBAL_DISTRIBUTED_QUERY_METRICS.clone()),
        config.batch.distributed_query_limit,
        config.batch.max_batch_queries_per_frontend_node,
    );

    let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
    let user_info_reader = UserInfoReader::new(user_info_manager.clone());
    let user_info_writer = Arc::new(UserInfoWriterImpl::new(
        meta_client.clone(),
        catalog_updated_rx,
    ));

    let system_params_manager =
        Arc::new(LocalSystemParamsManager::new(system_params_reader.clone()));

    LocalSecretManager::init(
        opts.temp_secret_file_dir,
        meta_client.cluster_id().to_owned(),
        worker_id,
    );

    let session_params = Arc::new(RwLock::new(SessionConfig::default()));
    let sessions_map: SessionMapRef = Arc::new(RwLock::new(HashMap::new()));
    let cursor_metrics = Arc::new(CursorMetrics::init(sessions_map.clone()));

    // The observer node receives the shared catalog, user info, snapshot manager,
    // system params and session params.
    let frontend_observer_node = FrontendObserverNode::new(
        worker_node_manager.clone(),
        catalog,
        catalog_updated_tx,
        user_info_manager,
        hummock_snapshot_manager.clone(),
        system_params_manager.clone(),
        session_params.clone(),
        compute_client_pool.clone(),
    );
    let observer_manager =
        ObserverManager::new_with_meta_client(meta_client.clone(), frontend_observer_node)
            .await;
    let observer_join_handle = observer_manager.start().await;
    join_handles.push(observer_join_handle);

    // Activation happens only after the observer has been started.
    meta_client.activate(&frontend_address).await?;

    let frontend_metrics = Arc::new(GLOBAL_FRONTEND_METRICS.clone());
    let source_metrics = Arc::new(GLOBAL_SOURCE_METRICS.clone());
    let spill_metrics = Arc::new(GLOBAL_BATCH_SPILL_METRICS.clone());

    if config.server.metrics_level > MetricLevel::Disabled {
        MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
    }

    let health_srv = HealthServiceImpl::new();
    let frontend_srv = FrontendServiceImpl::new(sessions_map.clone());
    let monitor_srv = MonitorServiceImpl::new(config.server.clone());
    let frontend_rpc_addr = opts.frontend_rpc_listener_addr.parse().unwrap();

    let telemetry_manager = TelemetryManager::new(
        Arc::new(meta_client.clone()),
        Arc::new(FrontendTelemetryCreator::new()),
    );

    // Telemetry runs only when both the config and the environment enable it.
    if config.server.telemetry_enabled && telemetry_env_enabled() {
        let (join_handle, shutdown_sender) = telemetry_manager.start().await;
        join_handles.push(join_handle);
        shutdown_senders.push(shutdown_sender);
    } else {
        tracing::info!("Telemetry didn't start due to config");
    }

    // Health, frontend and monitor services share one tonic server. The task is
    // detached: its handle is not added to `join_handles`.
    tokio::spawn(async move {
        tonic::transport::Server::builder()
            .add_service(HealthServer::new(health_srv))
            .add_service(FrontendServiceServer::new(frontend_srv))
            .add_service(configured_monitor_service_server(
                MonitorServiceServer::new(monitor_srv),
            ))
            .serve(frontend_rpc_addr)
            .await
            .unwrap();
    });
    info!(
        "Health Check RPC Listener is set up on {}",
        opts.frontend_rpc_listener_addr.clone()
    );

    let creating_streaming_job_tracker =
        Arc::new(StreamingJobTracker::new(frontend_meta_client.clone()));

    // Dedicated runtime; the worker thread count is only set when configured.
    let runtime = {
        let mut builder = Builder::new_multi_thread();
        if let Some(frontend_compute_runtime_worker_threads) =
            config.batch.frontend_compute_runtime_worker_threads
        {
            builder.worker_threads(frontend_compute_runtime_worker_threads);
        }
        builder
            .thread_name("rw-batch-local")
            .enable_all()
            .build()
            .unwrap()
    };
    let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(runtime));

    // Background task: every 10 seconds, run the idle-in-transaction timeout check
    // on every session. The check's result is discarded.
    let sessions = sessions_map.clone();
    let join_handle = tokio::spawn(async move {
        let mut check_idle_txn_interval =
            tokio::time::interval(core::time::Duration::from_secs(10));
        check_idle_txn_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        check_idle_txn_interval.reset();
        loop {
            check_idle_txn_interval.tick().await;
            sessions.read().values().for_each(|session| {
                let _ = session.check_idle_in_transaction_timeout();
            })
        }
    });
    join_handles.push(join_handle);

    // Clean the spill directory at startup when spilling is enabled.
    #[cfg(not(madsim))]
    if config.batch.enable_spill {
        SpillOp::clean_spill_directory()
            .await
            .map_err(|err| anyhow!(err))?;
    }

    let total_memory_bytes = opts.frontend_total_memory_bytes;
    let heap_profiler =
        HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
    heap_profiler.start();

    // The batch memory limit is a fixed proportion of the frontend's total memory.
    let batch_memory_limit = total_memory_bytes as f64 * FRONTEND_BATCH_MEMORY_PROPORTION;
    let mem_context = MemoryContext::root(
        frontend_metrics.batch_total_mem.clone(),
        batch_memory_limit as u64,
    );
    #[cfg(feature = "datafusion")]
    let df_spillable_budget_ctx =
        crate::datafusion::create_df_spillable_budget_ctx(&mem_context);

    // A Prometheus client that fails to initialize is logged and treated as absent.
    let prometheus_client = if let Some(ref endpoint) = opts.prometheus_endpoint {
        match PrometheusClient::try_from(endpoint.as_str()) {
            Ok(client) => {
                info!("Prometheus client initialized with endpoint: {}", endpoint);
                Some(client)
            }
            Err(e) => {
                error!(
                    error = %e.as_report(),
                    "failed to initialize Prometheus client",
                );
                None
            }
        }
    } else {
        None
    };

    let prometheus_selector = opts.prometheus_selector.unwrap_or_default();

    info!(
        "Frontend total_memory: {} batch_memory: {}",
        convert(total_memory_bytes as _),
        convert(batch_memory_limit as _),
    );

    Ok((
        Self {
            catalog_reader,
            catalog_writer,
            user_info_reader,
            user_info_writer,
            worker_node_manager,
            meta_client: frontend_meta_client,
            query_manager,
            hummock_snapshot_manager,
            system_params_manager,
            session_params,
            server_addr: frontend_address,
            client_pool: compute_client_pool,
            frontend_client_pool,
            monitor_client_pool,
            frontend_metrics,
            cursor_metrics,
            spill_metrics,
            sessions_map,
            batch_config: config.batch,
            frontend_config: config.frontend,
            meta_config: config.meta,
            streaming_config: config.streaming,
            udf_config: config.udf,
            source_metrics,
            creating_streaming_job_tracker,
            compute_runtime,
            mem_context,
            #[cfg(feature = "datafusion")]
            df_spillable_budget_ctx,
            prometheus_client,
            prometheus_selector,
        },
        join_handles,
        shutdown_senders,
    ))
}
592
/// Returns the catalog writer.
///
/// `_guard` is not used in the body; the caller must simply possess a
/// `transaction::WriteGuard` to call this.
/// NOTE(review): presumably the guard proves the current transaction permits
/// writes — confirm in the `transaction` module.
fn catalog_writer(&self, _guard: transaction::WriteGuard) -> &dyn CatalogWriter {
    &*self.catalog_writer
}
600
/// Returns the catalog reader.
pub fn catalog_reader(&self) -> &CatalogReader {
    &self.catalog_reader
}
605
/// Returns the user-info writer.
///
/// `_guard` is not used in the body; the caller must simply possess a
/// `transaction::WriteGuard` to call this.
/// NOTE(review): presumably the guard proves the current transaction permits
/// writes — confirm in the `transaction` module.
fn user_info_writer(&self, _guard: transaction::WriteGuard) -> &dyn UserInfoWriter {
    &*self.user_info_writer
}
613
/// Returns the user-info reader.
pub fn user_info_reader(&self) -> &UserInfoReader {
    &self.user_info_reader
}
618
619 pub fn worker_node_manager_ref(&self) -> WorkerNodeManagerRef {
620 self.worker_node_manager.clone()
621 }
622
/// Borrows the meta client as a trait object.
pub fn meta_client(&self) -> &dyn FrontendMetaClient {
    &*self.meta_client
}
626
627 pub fn meta_client_ref(&self) -> Arc<dyn FrontendMetaClient> {
628 self.meta_client.clone()
629 }
630
/// Returns the batch query manager.
pub fn query_manager(&self) -> &QueryManager {
    &self.query_manager
}
634
/// Borrows the shared handle to the Hummock snapshot manager.
pub fn hummock_snapshot_manager(&self) -> &HummockSnapshotManagerRef {
    &self.hummock_snapshot_manager
}
638
/// Borrows the shared handle to the local system-parameters manager.
pub fn system_params_manager(&self) -> &LocalSystemParamsManagerRef {
    &self.system_params_manager
}
642
643 pub fn session_params_snapshot(&self) -> SessionConfig {
644 self.session_params.read_recursive().clone()
645 }
646
/// Returns the Prometheus client, or `None` when the environment has none.
pub fn prometheus_client(&self) -> Option<&PrometheusClient> {
    self.prometheus_client.as_ref()
}
651
652 pub fn prometheus_selector(&self) -> &str {
654 &self.prometheus_selector
655 }
656
/// Returns this frontend's address as stored in the environment.
pub fn server_address(&self) -> &HostAddr {
    &self.server_addr
}
660
661 pub fn client_pool(&self) -> ComputeClientPoolRef {
662 self.client_pool.clone()
663 }
664
665 pub fn frontend_client_pool(&self) -> FrontendClientPoolRef {
666 self.frontend_client_pool.clone()
667 }
668
669 pub fn monitor_client_pool(&self) -> MonitorClientPoolRef {
670 self.monitor_client_pool.clone()
671 }
672
/// Returns the batch configuration.
pub fn batch_config(&self) -> &BatchConfig {
    &self.batch_config
}
676
/// Returns the frontend configuration.
pub fn frontend_config(&self) -> &FrontendConfig {
    &self.frontend_config
}
680
/// Returns the streaming configuration.
pub fn streaming_config(&self) -> &StreamingConfig {
    &self.streaming_config
}
684
/// Returns the UDF configuration.
pub fn udf_config(&self) -> &UdfConfig {
    &self.udf_config
}
688
689 pub fn source_metrics(&self) -> Arc<SourceMetrics> {
690 self.source_metrics.clone()
691 }
692
693 pub fn spill_metrics(&self) -> Arc<BatchSpillMetrics> {
694 self.spill_metrics.clone()
695 }
696
/// Borrows the shared handle to the tracker of streaming jobs being created.
pub fn creating_streaming_job_tracker(&self) -> &StreamingJobTrackerRef {
    &self.creating_streaming_job_tracker
}
700
/// Borrows the shared handle to the session map.
pub fn sessions_map(&self) -> &SessionMapRef {
    &self.sessions_map
}
704
705 pub fn compute_runtime(&self) -> Arc<BackgroundShutdownRuntime> {
706 self.compute_runtime.clone()
707 }
708
709 pub fn cancel_queries_in_session(&self, session_id: SessionId) -> bool {
712 cancel_queries_in_session(session_id, self.sessions_map.clone())
713 }
714
715 pub fn cancel_creating_jobs_in_session(&self, session_id: SessionId) -> bool {
718 cancel_creating_jobs_in_session(session_id, self.sessions_map.clone())
719 }
720
/// Returns a clone of the batch memory context.
pub fn mem_context(&self) -> MemoryContext {
    self.mem_context.clone()
}
724
/// Returns a clone of the memory context created by
/// `create_df_spillable_budget_ctx`; only compiled with the `datafusion` feature.
#[cfg(feature = "datafusion")]
pub fn df_spillable_budget_ctx(&self) -> MemoryContext {
    self.df_spillable_budget_ctx.clone()
}
729}
730
731#[derive(Clone)]
732pub struct AuthContext {
733 pub database: String,
734 pub user_name: String,
735 pub user_id: UserId,
736}
737
738impl AuthContext {
739 pub fn new(database: String, user_name: String, user_id: UserId) -> Self {
740 Self {
741 database,
742 user_name,
743 user_id,
744 }
745 }
746}
/// State of one client session on the frontend.
pub struct SessionImpl {
    /// The session's copy of the frontend environment.
    env: FrontendEnv,
    /// Current database and user; mutable (see `update_database`).
    auth_context: Arc<RwLock<AuthContext>>,
    /// Authenticator supplied when the session was created.
    user_authenticator: UserAuthenticator,
    /// Session-level configuration.
    config_map: Arc<RwLock<SessionConfig>>,

    /// Sending half of the session's unbounded notice channel.
    notice_tx: UnboundedSender<String>,
    /// Receiving half of the session's unbounded notice channel.
    notice_rx: Mutex<UnboundedReceiver<String>>,

    /// Session id, returned by `session_id`.
    id: (i32, i32),

    /// Address of the connected client.
    peer_addr: AddressRef,

    /// Transaction state of the session.
    txn: Arc<Mutex<transaction::State>>,

    /// Shutdown sender for the currently running query, if any.
    /// NOTE(review): presumably used to cancel that query — confirm at use sites.
    current_query_cancel_flag: Mutex<Option<ShutdownSender>>,

    /// Weak reference to the current execution context. `running_sql` and
    /// `elapse_since_running_sql` yield `None` once it can no longer be upgraded.
    exec_context: Mutex<Option<Weak<ExecContext>>>,

    /// Instant read by `elapse_since_last_idle_instant`; `None` when unset.
    last_idle_instant: Arc<Mutex<Option<Instant>>>,

    /// Cursor manager of the session.
    cursor_manager: Arc<CursorManager>,

    /// Temporary sources of the session.
    temporary_source_manager: Arc<Mutex<TemporarySourceManager>>,

    /// Staging table catalogs of the session.
    staging_catalog_manager: Arc<Mutex<StagingCatalogManager>>,
}
790
791#[derive(Default, Clone)]
800pub struct TemporarySourceManager {
801 sources: HashMap<String, SourceCatalog>,
802}
803
804impl TemporarySourceManager {
805 pub fn new() -> Self {
806 Self {
807 sources: HashMap::new(),
808 }
809 }
810
811 pub fn create_source(&mut self, name: String, source: SourceCatalog) {
812 self.sources.insert(name, source);
813 }
814
815 pub fn drop_source(&mut self, name: &str) {
816 self.sources.remove(name);
817 }
818
819 pub fn get_source(&self, name: &str) -> Option<&SourceCatalog> {
820 self.sources.get(name)
821 }
822
823 pub fn keys(&self) -> Vec<String> {
824 self.sources.keys().cloned().collect()
825 }
826}
827
828#[derive(Default, Clone)]
830pub struct StagingCatalogManager {
831 tables: HashMap<String, TableCatalog>,
833}
834
835impl StagingCatalogManager {
836 pub fn new() -> Self {
837 Self {
838 tables: HashMap::new(),
839 }
840 }
841
842 pub fn create_table(&mut self, name: String, table: TableCatalog) {
843 self.tables.insert(name, table);
844 }
845
846 pub fn drop_table(&mut self, name: &str) {
847 self.tables.remove(name);
848 }
849
850 pub fn get_table(&self, name: &str) -> Option<&TableCatalog> {
851 self.tables.get(name)
852 }
853}
854
855#[derive(Error, Debug)]
856pub enum CheckRelationError {
857 #[error("{0}")]
858 Resolve(#[from] ResolveQualifiedNameError),
859 #[error("{0}")]
860 Catalog(#[from] CatalogError),
861}
862
863impl From<CheckRelationError> for RwError {
864 fn from(e: CheckRelationError) -> Self {
865 match e {
866 CheckRelationError::Resolve(e) => e.into(),
867 CheckRelationError::Catalog(e) => e.into(),
868 }
869 }
870}
871
872impl SessionImpl {
873 pub(crate) fn new(
874 env: FrontendEnv,
875 auth_context: AuthContext,
876 user_authenticator: UserAuthenticator,
877 id: SessionId,
878 peer_addr: AddressRef,
879 session_config: SessionConfig,
880 ) -> Self {
881 let cursor_metrics = env.cursor_metrics.clone();
882 let (notice_tx, notice_rx) = mpsc::unbounded_channel();
883
884 Self {
885 env,
886 auth_context: Arc::new(RwLock::new(auth_context)),
887 user_authenticator,
888 config_map: Arc::new(RwLock::new(session_config)),
889 id,
890 peer_addr,
891 txn: Default::default(),
892 current_query_cancel_flag: Mutex::new(None),
893 notice_tx,
894 notice_rx: Mutex::new(notice_rx),
895 exec_context: Mutex::new(None),
896 last_idle_instant: Default::default(),
897 cursor_manager: Arc::new(CursorManager::new(cursor_metrics)),
898 temporary_source_manager: Default::default(),
899 staging_catalog_manager: Default::default(),
900 }
901 }
902
/// Builds a session over a mocked [`FrontendEnv`] for unit tests.
///
/// The session runs as the default super user on the default database, with
/// id `(0, 0)`, no authentication, and a fixed peer address of `127.0.0.1:8080`.
#[cfg(test)]
pub fn mock() -> Self {
    // Build the mock environment exactly once. Each `FrontendEnv::mock()` call
    // creates its own multi-thread tokio runtime, and the cursor manager should
    // report to the metrics of the environment this session owns, as in `new`.
    let env = FrontendEnv::mock();
    let cursor_metrics = env.cursor_metrics.clone();
    let (notice_tx, notice_rx) = mpsc::unbounded_channel();

    Self {
        env,
        auth_context: Arc::new(RwLock::new(AuthContext::new(
            DEFAULT_DATABASE_NAME.to_owned(),
            DEFAULT_SUPER_USER.to_owned(),
            DEFAULT_SUPER_USER_ID,
        ))),
        user_authenticator: UserAuthenticator::None,
        config_map: Default::default(),
        id: (0, 0),
        txn: Default::default(),
        current_query_cancel_flag: Mutex::new(None),
        notice_tx,
        notice_rx: Mutex::new(notice_rx),
        exec_context: Mutex::new(None),
        peer_addr: Address::Tcp(SocketAddr::new(
            IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
            8080,
        ))
        .into(),
        last_idle_instant: Default::default(),
        cursor_manager: Arc::new(CursorManager::new(cursor_metrics)),
        temporary_source_manager: Default::default(),
        staging_catalog_manager: Default::default(),
    }
}
935
/// Returns the frontend environment this session holds.
pub(crate) fn env(&self) -> &FrontendEnv {
    &self.env
}
939
940 pub fn auth_context(&self) -> Arc<AuthContext> {
941 let ctx = self.auth_context.read();
942 Arc::new(ctx.clone())
943 }
944
945 pub fn database(&self) -> String {
946 self.auth_context.read().database.clone()
947 }
948
949 pub fn database_id(&self) -> DatabaseId {
950 let db_name = self.database();
951 self.env
952 .catalog_reader()
953 .read_guard()
954 .get_database_by_name(&db_name)
955 .map(|db| db.id())
956 .expect("session database not found")
957 }
958
959 pub fn user_name(&self) -> String {
960 self.auth_context.read().user_name.clone()
961 }
962
963 pub fn user_id(&self) -> UserId {
964 self.auth_context.read().user_id
965 }
966
967 pub fn update_database(&self, database: String) {
968 self.auth_context.write().database = database;
969 }
970
971 pub fn shared_config(&self) -> Arc<RwLock<SessionConfig>> {
972 Arc::clone(&self.config_map)
973 }
974
/// Takes a read lock on the session configuration and returns the guard.
///
/// The lock is held for as long as the returned guard is alive.
pub fn config(&self) -> RwLockReadGuard<'_, SessionConfig> {
    self.config_map.read()
}
978
979 pub fn set_config(&self, key: &str, value: String) -> Result<String> {
980 self.config_map
981 .write()
982 .set(key, value, &mut ())
983 .map_err(Into::into)
984 }
985
986 pub fn reset_config(&self, key: &str) -> Result<String> {
987 self.config_map
988 .write()
989 .reset(key, &mut ())
990 .map_err(Into::into)
991 }
992
993 pub fn set_config_report(
994 &self,
995 key: &str,
996 value: Option<String>,
997 mut reporter: impl ConfigReporter,
998 ) -> Result<String> {
999 if let Some(value) = value {
1000 self.config_map
1001 .write()
1002 .set(key, value, &mut reporter)
1003 .map_err(Into::into)
1004 } else {
1005 self.config_map
1006 .write()
1007 .reset(key, &mut reporter)
1008 .map_err(Into::into)
1009 }
1010 }
1011
/// Returns the id this session was created with.
pub fn session_id(&self) -> SessionId {
    self.id
}
1015
1016 pub fn running_sql(&self) -> Option<Arc<str>> {
1017 self.exec_context
1018 .lock()
1019 .as_ref()
1020 .and_then(|weak| weak.upgrade())
1021 .map(|context| context.running_sql.clone())
1022 }
1023
1024 pub fn get_cursor_manager(&self) -> Arc<CursorManager> {
1025 self.cursor_manager.clone()
1026 }
1027
/// Returns the address of the connected client.
pub fn peer_addr(&self) -> &Address {
    &self.peer_addr
}
1031
1032 pub fn elapse_since_running_sql(&self) -> Option<u128> {
1033 self.exec_context
1034 .lock()
1035 .as_ref()
1036 .and_then(|weak| weak.upgrade())
1037 .map(|context| context.last_instant.elapsed().as_millis())
1038 }
1039
1040 pub fn elapse_since_last_idle_instant(&self) -> Option<u128> {
1041 self.last_idle_instant
1042 .lock()
1043 .as_ref()
1044 .map(|x| x.elapsed().as_millis())
1045 }
1046
1047 pub fn check_relation_name_duplicated(
1048 &self,
1049 name: ObjectName,
1050 stmt_type: StatementType,
1051 if_not_exists: bool,
1052 ) -> std::result::Result<Either<(), RwPgResponse>, CheckRelationError> {
1053 let db_name = &self.database();
1054 let catalog_reader = self.env().catalog_reader().read_guard();
1055 let (schema_name, relation_name) = {
1056 let (schema_name, relation_name) =
1057 Binder::resolve_schema_qualified_name(db_name, &name)?;
1058 let search_path = self.config().search_path();
1059 let user_name = &self.user_name();
1060 let schema_name = match schema_name {
1061 Some(schema_name) => schema_name,
1062 None => catalog_reader
1063 .first_valid_schema(db_name, &search_path, user_name)?
1064 .name(),
1065 };
1066 (schema_name, relation_name)
1067 };
1068 match catalog_reader.check_relation_name_duplicated(db_name, &schema_name, &relation_name) {
1069 Err(e) if if_not_exists => {
1070 if let CatalogErrorInner::Duplicated {
1071 name,
1072 under_creation,
1073 ..
1074 } = e.inner()
1075 {
1076 if !*under_creation {
1081 Ok(Either::Right(
1082 PgResponse::builder(stmt_type)
1083 .notice(format!("relation \"{}\" already exists, skipping", name))
1084 .into(),
1085 ))
1086 } else if stmt_type == StatementType::CREATE_SUBSCRIPTION {
1087 Ok(Either::Right(
1090 PgResponse::builder(stmt_type)
1091 .notice(format!(
1092 "relation \"{}\" already exists but still creating, skipping",
1093 name
1094 ))
1095 .into(),
1096 ))
1097 } else {
1098 Ok(Either::Left(()))
1099 }
1100 } else {
1101 Err(e.into())
1102 }
1103 }
1104 Err(e) => Err(e.into()),
1105 Ok(_) => Ok(Either::Left(())),
1106 }
1107 }
1108
1109 pub fn check_secret_name_duplicated(&self, name: ObjectName) -> Result<()> {
1110 let db_name = &self.database();
1111 let catalog_reader = self.env().catalog_reader().read_guard();
1112 let (schema_name, secret_name) = {
1113 let (schema_name, secret_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
1114 let search_path = self.config().search_path();
1115 let user_name = &self.user_name();
1116 let schema_name = match schema_name {
1117 Some(schema_name) => schema_name,
1118 None => catalog_reader
1119 .first_valid_schema(db_name, &search_path, user_name)?
1120 .name(),
1121 };
1122 (schema_name, secret_name)
1123 };
1124 catalog_reader
1125 .check_secret_name_duplicated(db_name, &schema_name, &secret_name)
1126 .map_err(RwError::from)
1127 }
1128
1129 pub fn check_connection_name_duplicated(&self, name: ObjectName) -> Result<()> {
1130 let db_name = &self.database();
1131 let catalog_reader = self.env().catalog_reader().read_guard();
1132 let (schema_name, connection_name) = {
1133 let (schema_name, connection_name) =
1134 Binder::resolve_schema_qualified_name(db_name, &name)?;
1135 let search_path = self.config().search_path();
1136 let user_name = &self.user_name();
1137 let schema_name = match schema_name {
1138 Some(schema_name) => schema_name,
1139 None => catalog_reader
1140 .first_valid_schema(db_name, &search_path, user_name)?
1141 .name(),
1142 };
1143 (schema_name, connection_name)
1144 };
1145 catalog_reader
1146 .check_connection_name_duplicated(db_name, &schema_name, &connection_name)
1147 .map_err(RwError::from)
1148 }
1149
1150 pub fn check_function_name_duplicated(
1151 &self,
1152 stmt_type: StatementType,
1153 name: ObjectName,
1154 arg_types: &[DataType],
1155 if_not_exists: bool,
1156 ) -> Result<Either<(), RwPgResponse>> {
1157 let db_name = &self.database();
1158 let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
1159 let (database_id, schema_id) = self.get_database_and_schema_id_for_create(schema_name)?;
1160
1161 let catalog_reader = self.env().catalog_reader().read_guard();
1162 if catalog_reader
1163 .get_schema_by_id(database_id, schema_id)?
1164 .get_function_by_name_args(&function_name, arg_types)
1165 .is_some()
1166 {
1167 let full_name = format!(
1168 "{function_name}({})",
1169 arg_types.iter().map(|t| t.to_string()).join(",")
1170 );
1171 if if_not_exists {
1172 Ok(Either::Right(
1173 PgResponse::builder(stmt_type)
1174 .notice(format!(
1175 "function \"{}\" already exists, skipping",
1176 full_name
1177 ))
1178 .into(),
1179 ))
1180 } else {
1181 Err(CatalogError::duplicated("function", full_name).into())
1182 }
1183 } else {
1184 Ok(Either::Left(()))
1185 }
1186 }
1187
1188 pub fn get_database_and_schema_id_for_create(
1190 &self,
1191 schema_name: Option<String>,
1192 ) -> Result<(DatabaseId, SchemaId)> {
1193 let db_name = &self.database();
1194
1195 let search_path = self.config().search_path();
1196 let user_name = &self.user_name();
1197
1198 let catalog_reader = self.env().catalog_reader().read_guard();
1199 let schema = match schema_name {
1200 Some(schema_name) => catalog_reader.get_schema_by_name(db_name, &schema_name)?,
1201 None => catalog_reader.first_valid_schema(db_name, &search_path, user_name)?,
1202 };
1203 let schema_name = schema.name();
1204
1205 check_schema_writable(&schema_name)?;
1206 self.check_privileges(&[ObjectCheckItem::new(
1207 schema.owner(),
1208 AclMode::Create,
1209 schema_name,
1210 schema.id(),
1211 )])?;
1212
1213 let db_id = catalog_reader.get_database_by_name(db_name)?.id();
1214 Ok((db_id, schema.id()))
1215 }
1216
1217 pub fn get_connection_by_name(
1218 &self,
1219 schema_name: Option<String>,
1220 connection_name: &str,
1221 ) -> Result<Arc<ConnectionCatalog>> {
1222 let db_name = &self.database();
1223 let search_path = self.config().search_path();
1224 let user_name = &self.user_name();
1225
1226 let catalog_reader = self.env().catalog_reader().read_guard();
1227 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1228 let (connection, _) =
1229 catalog_reader.get_connection_by_name(db_name, schema_path, connection_name)?;
1230
1231 self.check_privileges(&[ObjectCheckItem::new(
1232 connection.owner(),
1233 AclMode::Usage,
1234 connection.name.clone(),
1235 connection.id,
1236 )])?;
1237
1238 Ok(connection.clone())
1239 }
1240
1241 pub fn get_subscription_by_schema_id_name(
1242 &self,
1243 schema_id: SchemaId,
1244 subscription_name: &str,
1245 ) -> Result<Arc<SubscriptionCatalog>> {
1246 let db_name = &self.database();
1247
1248 let catalog_reader = self.env().catalog_reader().read_guard();
1249 let db_id = catalog_reader.get_database_by_name(db_name)?.id();
1250 let schema = catalog_reader.get_schema_by_id(db_id, schema_id)?;
1251 let subscription = schema
1252 .get_subscription_by_name(subscription_name)
1253 .ok_or_else(|| {
1254 RwError::from(ErrorCode::ItemNotFound(format!(
1255 "subscription {} not found",
1256 subscription_name
1257 )))
1258 })?;
1259 Ok(subscription.clone())
1260 }
1261
1262 pub fn get_subscription_by_name(
1263 &self,
1264 schema_name: Option<String>,
1265 subscription_name: &str,
1266 ) -> Result<Arc<SubscriptionCatalog>> {
1267 let db_name = &self.database();
1268 let search_path = self.config().search_path();
1269 let user_name = &self.user_name();
1270
1271 let catalog_reader = self.env().catalog_reader().read_guard();
1272 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1273 let (subscription, _) =
1274 catalog_reader.get_subscription_by_name(db_name, schema_path, subscription_name)?;
1275 Ok(subscription.clone())
1276 }
1277
1278 pub fn get_table_by_id(&self, table_id: TableId) -> Result<Arc<TableCatalog>> {
1279 let catalog_reader = self.env().catalog_reader().read_guard();
1280 Ok(catalog_reader.get_any_table_by_id(table_id)?.clone())
1281 }
1282
1283 pub fn get_table_by_name(
1284 &self,
1285 table_name: &str,
1286 db_id: DatabaseId,
1287 schema_id: SchemaId,
1288 ) -> Result<Arc<TableCatalog>> {
1289 let catalog_reader = self.env().catalog_reader().read_guard();
1290 let table = catalog_reader
1291 .get_schema_by_id(db_id, schema_id)?
1292 .get_created_table_by_name(table_name)
1293 .ok_or_else(|| {
1294 Error::new(
1295 ErrorKind::InvalidInput,
1296 format!("table \"{}\" does not exist", table_name),
1297 )
1298 })?;
1299
1300 self.check_privileges(&[ObjectCheckItem::new(
1301 table.owner(),
1302 AclMode::Select,
1303 table_name.to_owned(),
1304 table.id,
1305 )])?;
1306
1307 Ok(table.clone())
1308 }
1309
1310 pub fn get_secret_by_name(
1311 &self,
1312 schema_name: Option<String>,
1313 secret_name: &str,
1314 ) -> Result<Arc<SecretCatalog>> {
1315 let db_name = &self.database();
1316 let search_path = self.config().search_path();
1317 let user_name = &self.user_name();
1318
1319 let catalog_reader = self.env().catalog_reader().read_guard();
1320 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1321 let (secret, _) = catalog_reader.get_secret_by_name(db_name, schema_path, secret_name)?;
1322
1323 self.check_privileges(&[ObjectCheckItem::new(
1324 secret.owner(),
1325 AclMode::Usage,
1326 secret.name.clone(),
1327 secret.id,
1328 )])?;
1329
1330 Ok(secret.clone())
1331 }
1332
1333 pub async fn list_change_log_epochs(
1334 &self,
1335 table_id: TableId,
1336 min_epoch: u64,
1337 max_count: u32,
1338 ) -> Result<Vec<u64>> {
1339 let Some(max_epoch) = self
1340 .env
1341 .hummock_snapshot_manager()
1342 .acquire()
1343 .version()
1344 .state_table_info
1345 .info()
1346 .get(&table_id)
1347 .map(|s| s.committed_epoch)
1348 else {
1349 return Ok(vec![]);
1350 };
1351 let ret = self
1352 .env
1353 .meta_client_ref()
1354 .get_hummock_table_change_log(
1355 Some(min_epoch),
1356 Some(max_epoch),
1357 Some(iter::once(table_id).collect()),
1358 true,
1359 Some(max_count),
1360 )
1361 .await?;
1362 let Some(e) = ret.get(&table_id) else {
1363 return Ok(vec![]);
1364 };
1365 Ok(e.iter()
1366 .flat_map(|l| l.epochs())
1367 .filter(|e| *e >= min_epoch && *e <= max_epoch)
1368 .take(max_count as usize)
1369 .collect())
1370 }
1371
1372 pub fn clear_cancel_query_flag(&self) {
1373 let mut flag = self.current_query_cancel_flag.lock();
1374 *flag = None;
1375 }
1376
1377 pub fn reset_cancel_query_flag(&self) -> ShutdownToken {
1378 let mut flag = self.current_query_cancel_flag.lock();
1379 let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
1380 *flag = Some(shutdown_tx);
1381 shutdown_rx
1382 }
1383
1384 pub fn set_cancel_query_flag(&self, shutdown_tx: ShutdownSender) {
1385 let mut flag = self.current_query_cancel_flag.lock();
1386 *flag = Some(shutdown_tx);
1387 }
1388
/// Cancels whatever query this session is currently running.
///
/// Takes the stored cancel sender (if any) and signals it, then asks the
/// query manager to cancel the queries registered under this session's id.
/// The cancel-flag lock is held for the whole call.
pub fn cancel_current_query(&self) {
    let mut flag_guard = self.current_query_cancel_flag.lock();
    // `take` leaves `None` behind, so the same sender is never signalled twice.
    if let Some(sender) = flag_guard.take() {
        info!("Trying to cancel query in local mode.");
        sender.cancel();
        info!("Cancel query request sent.");
    }
    // This part runs regardless of whether a local sender was present.
    info!("Trying to cancel query in distributed mode.");
    self.env.query_manager().cancel_queries_in_session(self.id)
}
1400
/// Asks the creating-streaming-job tracker to abort the jobs registered
/// under this session's id.
pub fn cancel_current_creating_job(&self) {
    self.env.creating_streaming_job_tracker.abort_jobs(self.id);
}
1404
1405 pub async fn run_statement(
1408 self: Arc<Self>,
1409 sql: Arc<str>,
1410 formats: Vec<Format>,
1411 ) -> std::result::Result<PgResponse<PgResponseStream>, BoxedError> {
1412 let mut stmts = Parser::parse_sql(&sql)?;
1414 if stmts.is_empty() {
1415 return Ok(PgResponse::empty_result(
1416 pgwire::pg_response::StatementType::EMPTY,
1417 ));
1418 }
1419 if stmts.len() > 1 {
1420 return Ok(
1421 PgResponse::builder(pgwire::pg_response::StatementType::EMPTY)
1422 .notice("cannot insert multiple commands into statement")
1423 .into(),
1424 );
1425 }
1426 let stmt = stmts.swap_remove(0);
1427 let rsp = Box::pin(handle(self, stmt, sql.clone(), formats)).await?;
1428 Ok(rsp)
1429 }
1430
1431 pub fn notice_to_user(&self, str: impl Into<String>) {
1432 let notice = str.into();
1433 tracing::trace!(notice, "notice to user");
1434 self.notice_tx
1435 .send(notice)
1436 .expect("notice channel should not be closed");
1437 }
1438
1439 pub fn is_barrier_read(&self) -> bool {
1440 match self.config().visibility_mode() {
1441 VisibilityMode::Default => self.env.batch_config.enable_barrier_read,
1442 VisibilityMode::All => true,
1443 VisibilityMode::Checkpoint => false,
1444 }
1445 }
1446
1447 pub fn statement_timeout(&self) -> Duration {
1448 if self.config().statement_timeout().millis() == 0 {
1449 Duration::from_secs(self.env.batch_config.statement_timeout_in_sec as u64)
1450 } else {
1451 Duration::from_millis(self.config().statement_timeout().millis() as u64)
1452 }
1453 }
1454
1455 pub fn create_temporary_source(&self, source: SourceCatalog) {
1456 self.temporary_source_manager
1457 .lock()
1458 .create_source(source.name.clone(), source);
1459 }
1460
1461 pub fn get_temporary_source(&self, name: &str) -> Option<SourceCatalog> {
1462 self.temporary_source_manager
1463 .lock()
1464 .get_source(name)
1465 .cloned()
1466 }
1467
/// Asks the session's temporary source manager to drop the source
/// registered under `name`.
pub fn drop_temporary_source(&self, name: &str) {
    self.temporary_source_manager.lock().drop_source(name);
}
1471
/// Returns a clone of the session's temporary source manager, taken while
/// holding its lock.
pub fn temporary_source_manager(&self) -> TemporarySourceManager {
    self.temporary_source_manager.lock().clone()
}
1475
1476 pub fn create_staging_table(&self, table: TableCatalog) {
1477 self.staging_catalog_manager
1478 .lock()
1479 .create_table(table.name.clone(), table);
1480 }
1481
/// Asks the session's staging catalog manager to drop the table registered
/// under `name`.
pub fn drop_staging_table(&self, name: &str) {
    self.staging_catalog_manager.lock().drop_table(name);
}
1485
/// Returns a clone of the session's staging catalog manager, taken while
/// holding its lock.
pub fn staging_catalog_manager(&self) -> StagingCatalogManager {
    self.staging_catalog_manager.lock().clone()
}
1489
1490 pub async fn check_cluster_limits(&self) -> Result<()> {
1491 if self.config().bypass_cluster_limits() {
1492 return Ok(());
1493 }
1494
1495 let gen_message = |ActorCountPerParallelism {
1496 worker_id_to_actor_count,
1497 hard_limit,
1498 soft_limit,
1499 }: ActorCountPerParallelism,
1500 exceed_hard_limit: bool|
1501 -> String {
1502 let (limit_type, action) = if exceed_hard_limit {
1503 ("critical", "Scale the cluster immediately to proceed.")
1504 } else {
1505 (
1506 "recommended",
1507 "Consider scaling the cluster for optimal performance.",
1508 )
1509 };
1510 format!(
1511 r#"Actor count per parallelism exceeds the {limit_type} limit.
1512
1513Depending on your workload, this may overload the cluster and cause performance/stability issues. {action}
1514
1515HINT:
1516- For best practices on managing streaming jobs: https://docs.risingwave.com/operate/manage-a-large-number-of-streaming-jobs
1517- To bypass the check (if the cluster load is acceptable): `[ALTER SYSTEM] SET bypass_cluster_limits TO true`.
1518 See https://docs.risingwave.com/operate/view-configure-runtime-parameters#how-to-configure-runtime-parameters
1519- Contact us via slack or https://risingwave.com/contact-us/ for further enquiry.
1520
1521DETAILS:
1522- hard limit: {hard_limit}
1523- soft limit: {soft_limit}
1524- worker_id_to_actor_count: {worker_id_to_actor_count:?}"#,
1525 )
1526 };
1527
1528 let limits = self.env().meta_client().get_cluster_limits().await?;
1529 for limit in limits {
1530 match limit {
1531 cluster_limit::ClusterLimit::ActorCount(l) => {
1532 if l.exceed_hard_limit() {
1533 return Err(RwError::from(ErrorCode::ProtocolError(gen_message(
1534 l, true,
1535 ))));
1536 } else if l.exceed_soft_limit() {
1537 self.notice_to_user(gen_message(l, false));
1538 }
1539 }
1540 }
1541 }
1542 Ok(())
1543 }
1544}
1545
/// Process-wide slot for the shared [`SessionManagerImpl`]; starts out unset
/// and can be initialized at most once.
pub static SESSION_MANAGER: std::sync::OnceLock<Arc<SessionManagerImpl>> =
    std::sync::OnceLock::new();
1548
/// Session manager of the frontend: owns the frontend environment and hands
/// out sessions.
pub struct SessionManagerImpl {
    /// Frontend environment shared with every session created here.
    env: FrontendEnv,
    /// Join handles returned by `FrontendEnv::init`; stored but not read here.
    _join_handles: Vec<JoinHandle<()>>,
    /// Shutdown senders returned by `FrontendEnv::init`; stored but not read here.
    _shutdown_senders: Vec<Sender<()>>,
    /// Counter from which session ids are derived on connect.
    number: AtomicI32,
}
1555
1556impl SessionManager for SessionManagerImpl {
1557 type Error = RwError;
1558 type Session = SessionImpl;
1559
1560 fn create_dummy_session(&self, database_id: DatabaseId) -> Result<Arc<Self::Session>> {
1561 let dummy_addr = Address::Tcp(SocketAddr::new(
1562 IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1563 5691, ));
1565
1566 self.connect_inner(
1569 database_id,
1570 risingwave_common::catalog::DEFAULT_SUPER_USER,
1571 Arc::new(dummy_addr),
1572 )
1573 }
1574
1575 fn connect(
1576 &self,
1577 database: &str,
1578 user_name: &str,
1579 peer_addr: AddressRef,
1580 ) -> Result<Arc<Self::Session>> {
1581 let catalog_reader = self.env.catalog_reader();
1582 let reader = catalog_reader.read_guard();
1583 let database_id = reader.get_database_by_name(database)?.id();
1584
1585 self.connect_inner(database_id, user_name, peer_addr)
1586 }
1587
1588 fn cancel_queries_in_session(&self, session_id: SessionId) {
1590 self.env.cancel_queries_in_session(session_id);
1591 }
1592
1593 fn cancel_creating_jobs_in_session(&self, session_id: SessionId) {
1594 self.env.cancel_creating_jobs_in_session(session_id);
1595 }
1596
1597 fn end_session(&self, session: &Self::Session) {
1598 self.delete_session(&session.session_id());
1599 }
1600
1601 async fn shutdown(&self) {
1602 self.env.sessions_map().write().clear();
1604 self.env.meta_client().try_unregister().await;
1606 }
1607}
1608
impl SessionManagerImpl {
    /// Initializes the frontend environment from `opts` and wraps it in a
    /// session manager whose session counter starts at 0.
    pub async fn new(opts: FrontendOpts) -> Result<Self> {
        let (env, join_handles, shutdown_senders) = FrontendEnv::init(opts).await?;
        Ok(Self {
            env,
            _join_handles: join_handles,
            _shutdown_senders: shutdown_senders,
            number: AtomicI32::new(0),
        })
    }

    /// Returns the frontend environment owned by this manager.
    pub(crate) fn env(&self) -> &FrontendEnv {
        &self.env
    }

    /// Adds `session` to the session map and publishes the new map size to
    /// the `active_sessions` metric.
    fn insert_session(&self, session: Arc<SessionImpl>) {
        // The write guard lives only inside this block, so the metric is
        // updated after the lock has been released.
        let active_sessions = {
            let mut write_guard = self.env.sessions_map.write();
            write_guard.insert(session.id(), session);
            write_guard.len()
        };
        self.env
            .frontend_metrics
            .active_sessions
            .set(active_sessions as i64);
    }

    /// Removes the session with `session_id` from the session map and
    /// publishes the new map size to the `active_sessions` metric.
    fn delete_session(&self, session_id: &SessionId) {
        // Same scoping as in `insert_session`: lock released before the
        // metric update.
        let active_sessions = {
            let mut write_guard = self.env.sessions_map.write();
            write_guard.remove(session_id);
            write_guard.len()
        };
        self.env
            .frontend_metrics
            .active_sessions
            .set(active_sessions as i64);
    }

    /// Validates the login request and, on success, creates and registers a
    /// new session.
    ///
    /// Fails when:
    /// - the database id is unknown to the catalog;
    /// - the role does not exist (catalog error);
    /// - the user may not log in, or is neither a super user, nor the
    ///   database owner, nor holder of the `Connect` privilege;
    /// - no HBA entry matches the connection;
    /// - the matched HBA auth method is incompatible with the user's stored
    ///   auth info, or the stored encryption type is unsupported.
    fn connect_inner(
        &self,
        database_id: DatabaseId,
        user_name: &str,
        peer_addr: AddressRef,
    ) -> Result<Arc<SessionImpl>> {
        let catalog_reader = self.env.catalog_reader();
        let reader = catalog_reader.read_guard();
        let (database_name, database_owner) = {
            let db = reader.get_database_by_id(database_id)?;
            (db.name(), db.owner())
        };

        let user_reader = self.env.user_info_reader();
        // NOTE: this shadows the catalog guard above by name only; shadowing
        // does not drop it, so both guards live until the function returns.
        let reader = user_reader.read_guard();
        if let Some(user) = reader.get_user_by_name(user_name) {
            if !user.can_login {
                bail_permission_denied!("User {} is not allowed to login", user_name);
            }
            let has_privilege = user.has_privilege(database_id, AclMode::Connect);
            if !user.is_super && database_owner != user.id && !has_privilege {
                bail_permission_denied!("User does not have CONNECT privilege.");
            }

            // Classify the connection for HBA matching: TCP peers are `Host`
            // connections with a client IP, Unix sockets are `Local` with none.
            let (connection_type, client_addr) = match peer_addr.as_ref() {
                Address::Tcp(socket_addr) => (ConnectionType::Host, Some(&socket_addr.ip())),
                Address::Unix(_) => (ConnectionType::Local, None),
            };
            tracing::debug!(
                "receive connection: type={:?}, client_addr={:?}",
                connection_type,
                client_addr
            );

            let hba_entry_opt = self.env.frontend_config().hba_config.find_matching_entry(
                &connection_type,
                database_name,
                user_name,
                client_addr,
            );

            let Some(hba_entry_opt) = hba_entry_opt else {
                bail_permission_denied!(
                    "no pg_hba.conf entry for host \"{peer_addr}\", user \"{user_name}\", database \"{database_name}\""
                );
            };

            // Builds the authenticator dictated by the user's stored auth info.
            let authenticator_by_info = || -> Result<UserAuthenticator> {
                let authenticator = match &user.auth_info {
                    None => UserAuthenticator::None,
                    Some(auth_info) => match auth_info.encryption_type() {
                        EncryptionType::Plaintext => {
                            UserAuthenticator::ClearText(auth_info.encrypted_value.clone())
                        }
                        EncryptionType::Md5 => {
                            // A fresh random 4-byte salt is drawn for each call.
                            let mut salt = [0; 4];
                            let mut rng = rand::rng();
                            rng.fill_bytes(&mut salt);
                            UserAuthenticator::Md5WithSalt {
                                encrypted_password: md5_hash_with_salt(
                                    &auth_info.encrypted_value,
                                    &salt,
                                ),
                                salt,
                            }
                        }
                        EncryptionType::Oauth => UserAuthenticator::OAuth {
                            metadata: auth_info.metadata.clone(),
                            cluster_id: self.env.meta_client().cluster_id().to_owned(),
                        },
                        _ => {
                            bail_protocol_error!(
                                "Unsupported auth type: {}",
                                auth_info.encryption_type().as_str_name()
                            );
                        }
                    },
                };
                Ok(authenticator)
            };

            // Combine the HBA auth method with the stored auth info. `Md5` and
            // `OAuth` are accepted only when the stored encryption type
            // matches; unmatched combinations fall through to the denial arm.
            let user_authenticator = match (&hba_entry_opt.auth_method, &user.auth_info) {
                (AuthMethod::Trust, _) => UserAuthenticator::None,
                (AuthMethod::Password, _) => authenticator_by_info()?,
                (AuthMethod::Md5, Some(auth_info))
                    if auth_info.encryption_type() == EncryptionType::Md5 =>
                {
                    authenticator_by_info()?
                }
                (AuthMethod::OAuth, Some(auth_info))
                    if auth_info.encryption_type() == EncryptionType::Oauth =>
                {
                    authenticator_by_info()?
                }
                (AuthMethod::Ldap, _) => {
                    UserAuthenticator::Ldap(user_name.to_owned(), hba_entry_opt.clone())
                }
                _ => {
                    bail_permission_denied!(
                        "password authentication failed for user \"{user_name}\""
                    );
                }
            };

            // The counter value is used for both components of the session id.
            let secret_key = self.number.fetch_add(1, Ordering::Relaxed);
            let id = (secret_key, secret_key);
            let session_config = self.env.session_params_snapshot();

            let session_impl: Arc<SessionImpl> = SessionImpl::new(
                self.env.clone(),
                AuthContext::new(database_name.to_owned(), user_name.to_owned(), user.id),
                user_authenticator,
                id,
                peer_addr,
                session_config,
            )
            .into();
            self.insert_session(session_impl.clone());

            Ok(session_impl)
        } else {
            bail_catalog_error!("Role {} does not exist", user_name);
        }
    }
}
1780
1781impl Session for SessionImpl {
1782 type Error = RwError;
1783 type Portal = Portal;
1784 type PreparedStatement = PrepareStatement;
1785 type ValuesStream = PgResponseStream;
1786
1787 async fn run_one_query(
1790 self: Arc<Self>,
1791 stmt: Statement,
1792 format: Format,
1793 ) -> Result<PgResponse<PgResponseStream>> {
1794 let string = stmt.to_string();
1795 let sql_str = string.as_str();
1796 let sql: Arc<str> = Arc::from(sql_str);
1797 drop(string);
1799 let rsp = Box::pin(handle(self, stmt, sql, vec![format])).await?;
1800 Ok(rsp)
1801 }
1802
1803 fn user_authenticator(&self) -> &UserAuthenticator {
1804 &self.user_authenticator
1805 }
1806
1807 fn id(&self) -> SessionId {
1808 self.id
1809 }
1810
1811 async fn parse(
1812 self: Arc<Self>,
1813 statement: Option<Statement>,
1814 params_types: Vec<Option<DataType>>,
1815 ) -> Result<PrepareStatement> {
1816 Ok(if let Some(statement) = statement {
1817 handle_parse(self, statement, params_types).await?
1818 } else {
1819 PrepareStatement::Empty
1820 })
1821 }
1822
1823 fn bind(
1824 self: Arc<Self>,
1825 prepare_statement: PrepareStatement,
1826 params: Vec<Option<Bytes>>,
1827 param_formats: Vec<Format>,
1828 result_formats: Vec<Format>,
1829 ) -> Result<Portal> {
1830 handle_bind(prepare_statement, params, param_formats, result_formats)
1831 }
1832
1833 async fn execute(self: Arc<Self>, portal: Portal) -> Result<PgResponse<PgResponseStream>> {
1834 let rsp = Box::pin(handle_execute(self, portal)).await?;
1835 Ok(rsp)
1836 }
1837
1838 fn describe_statement(
1839 self: Arc<Self>,
1840 prepare_statement: PrepareStatement,
1841 ) -> Result<(Vec<DataType>, Vec<PgFieldDescriptor>)> {
1842 Ok(match prepare_statement {
1843 PrepareStatement::Empty => (vec![], vec![]),
1844 PrepareStatement::Prepared(prepare_statement) => (
1845 prepare_statement.bound_result.param_types,
1846 infer(
1847 Some(prepare_statement.bound_result.bound),
1848 prepare_statement.statement,
1849 )?,
1850 ),
1851 PrepareStatement::PureStatement(statement) => (vec![], infer(None, statement)?),
1852 })
1853 }
1854
1855 fn describe_portal(self: Arc<Self>, portal: Portal) -> Result<Vec<PgFieldDescriptor>> {
1856 match portal {
1857 Portal::Empty => Ok(vec![]),
1858 Portal::Portal(portal) => {
1859 let mut columns = infer(Some(portal.bound_result.bound), portal.statement)?;
1860 let formats = FormatIterator::new(&portal.result_formats, columns.len())
1861 .map_err(|e| RwError::from(ErrorCode::ProtocolError(e)))?;
1862 columns.iter_mut().zip_eq_fast(formats).for_each(|(c, f)| {
1863 if f == Format::Binary {
1864 c.set_to_binary()
1865 }
1866 });
1867 Ok(columns)
1868 }
1869 Portal::PureStatement(statement) => Ok(infer(None, statement)?),
1870 }
1871 }
1872
1873 fn get_config(&self, key: &str) -> Result<String> {
1874 self.config().get(key).map_err(Into::into)
1875 }
1876
1877 fn set_config(&self, key: &str, value: String) -> Result<String> {
1878 Self::set_config(self, key, value)
1879 }
1880
1881 async fn next_notice(self: &Arc<Self>) -> String {
1882 std::future::poll_fn(|cx| self.clone().notice_rx.lock().poll_recv(cx))
1883 .await
1884 .expect("notice channel should not be closed")
1885 }
1886
1887 fn transaction_status(&self) -> TransactionStatus {
1888 match &*self.txn.lock() {
1889 transaction::State::Initial | transaction::State::Implicit(_) => {
1890 TransactionStatus::Idle
1891 }
1892 transaction::State::Explicit(_) => TransactionStatus::InTransaction,
1893 }
1895 }
1896
1897 fn init_exec_context(&self, sql: Arc<str>) -> ExecContextGuard {
1899 let exec_context = Arc::new(ExecContext {
1900 running_sql: sql,
1901 last_instant: Instant::now(),
1902 last_idle_instant: self.last_idle_instant.clone(),
1903 });
1904 *self.exec_context.lock() = Some(Arc::downgrade(&exec_context));
1905 *self.last_idle_instant.lock() = None;
1907 ExecContextGuard::new(exec_context)
1908 }
1909
1910 fn check_idle_in_transaction_timeout(&self) -> PsqlResult<()> {
1913 if matches!(self.transaction_status(), TransactionStatus::InTransaction) {
1915 let idle_in_transaction_session_timeout =
1916 self.config().idle_in_transaction_session_timeout() as u128;
1917 if idle_in_transaction_session_timeout != 0 {
1919 let guard = self.exec_context.lock();
1921 if guard.as_ref().and_then(|weak| weak.upgrade()).is_none() {
1923 if let Some(elapse_since_last_idle_instant) =
1925 self.elapse_since_last_idle_instant()
1926 && elapse_since_last_idle_instant > idle_in_transaction_session_timeout
1927 {
1928 return Err(PsqlError::IdleInTxnTimeout);
1929 }
1930 }
1931 }
1932 }
1933 Ok(())
1934 }
1935
1936 fn user(&self) -> String {
1937 self.user_name()
1938 }
1939}
1940
1941fn infer(bound: Option<BoundStatement>, stmt: Statement) -> Result<Vec<PgFieldDescriptor>> {
1943 match stmt {
1944 Statement::Query(_)
1945 | Statement::Insert { .. }
1946 | Statement::Delete { .. }
1947 | Statement::Update { .. }
1948 | Statement::FetchCursor { .. } => Ok(bound
1949 .unwrap()
1950 .output_fields()
1951 .iter()
1952 .map(to_pg_field)
1953 .collect()),
1954 Statement::ShowObjects {
1955 object: show_object,
1956 ..
1957 } => Ok(infer_show_object(&show_object)),
1958 Statement::ShowCreateObject { .. } => Ok(infer_show_create_object()),
1959 Statement::ShowTransactionIsolationLevel => {
1960 let name = "transaction_isolation";
1961 Ok(infer_show_variable(name))
1962 }
1963 Statement::ShowVariable { variable } => {
1964 let name = &variable[0].real_value().to_lowercase();
1965 Ok(infer_show_variable(name))
1966 }
1967 Statement::Describe { name: _, kind } => Ok(infer_describe(&kind)),
1968 Statement::Explain { .. } => Ok(vec![PgFieldDescriptor::new(
1969 "QUERY PLAN".to_owned(),
1970 DataType::Varchar.to_oid(),
1971 DataType::Varchar.type_len(),
1972 )]),
1973 _ => Ok(vec![]),
1974 }
1975}
1976
/// Pair of a worker id and a process id; displayed and parsed as
/// `worker_id:process_id`.
pub struct WorkerProcessId {
    /// Worker part of the pair.
    pub worker_id: WorkerId,
    /// Process part of the pair.
    pub process_id: i32,
}
1981
impl WorkerProcessId {
    /// Builds a `WorkerProcessId` from its two components.
    pub fn new(worker_id: WorkerId, process_id: i32) -> Self {
        Self {
            worker_id,
            process_id,
        }
    }
}
1990
1991impl Display for WorkerProcessId {
1992 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1993 write!(f, "{}:{}", self.worker_id, self.process_id)
1994 }
1995}
1996
1997impl TryFrom<String> for WorkerProcessId {
1998 type Error = String;
1999
2000 fn try_from(worker_process_id: String) -> std::result::Result<Self, Self::Error> {
2001 const INVALID: &str = "invalid WorkerProcessId";
2002 let splits: Vec<&str> = worker_process_id.split(":").collect();
2003 if splits.len() != 2 {
2004 return Err(INVALID.to_owned());
2005 }
2006 let Ok(worker_id) = splits[0].parse::<u32>() else {
2007 return Err(INVALID.to_owned());
2008 };
2009 let Ok(process_id) = splits[1].parse::<i32>() else {
2010 return Err(INVALID.to_owned());
2011 };
2012 Ok(WorkerProcessId::new(worker_id.into(), process_id))
2013 }
2014}
2015
2016pub fn cancel_queries_in_session(session_id: SessionId, sessions_map: SessionMapRef) -> bool {
2017 let guard = sessions_map.read();
2018 if let Some(session) = guard.get(&session_id) {
2019 session.cancel_current_query();
2020 true
2021 } else {
2022 info!("Current session finished, ignoring cancel query request");
2023 false
2024 }
2025}
2026
2027pub fn cancel_creating_jobs_in_session(session_id: SessionId, sessions_map: SessionMapRef) -> bool {
2028 let guard = sessions_map.read();
2029 if let Some(session) = guard.get(&session_id) {
2030 session.cancel_current_creating_job();
2031 true
2032 } else {
2033 info!("Current session finished, ignoring cancel creating request");
2034 false
2035 }
2036}