1use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::io::{Error, ErrorKind};
18use std::iter;
19use std::net::{IpAddr, Ipv4Addr, SocketAddr};
20use std::sync::atomic::{AtomicI32, Ordering};
21use std::sync::{Arc, Weak};
22use std::time::{Duration, Instant};
23
24use anyhow::anyhow;
25use bytes::Bytes;
26use either::Either;
27use itertools::Itertools;
28use parking_lot::{Mutex, RwLock, RwLockReadGuard};
29use pgwire::error::{PsqlError, PsqlResult};
30use pgwire::net::{Address, AddressRef};
31use pgwire::pg_field_descriptor::PgFieldDescriptor;
32use pgwire::pg_message::TransactionStatus;
33use pgwire::pg_response::{PgResponse, StatementType};
34use pgwire::pg_server::{
35 BoxedError, ExecContext, ExecContextGuard, Session, SessionId, SessionManager,
36 UserAuthenticator,
37};
38use pgwire::types::{Format, FormatIterator};
39use prometheus_http_query::Client as PrometheusClient;
40use rand::RngCore;
41use risingwave_batch::monitor::{BatchSpillMetrics, GLOBAL_BATCH_SPILL_METRICS};
42use risingwave_batch::spill::spill_op::SpillOp;
43use risingwave_batch::task::{ShutdownSender, ShutdownToken};
44use risingwave_batch::worker_manager::worker_node_manager::{
45 WorkerNodeManager, WorkerNodeManagerRef,
46};
47use risingwave_common::acl::AclMode;
48#[cfg(test)]
49use risingwave_common::catalog::{
50 DEFAULT_DATABASE_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_ID,
51};
52use risingwave_common::config::{
53 AuthMethod, BatchConfig, ConnectionType, FrontendConfig, MetaConfig, MetricLevel,
54 RpcClientConfig, StreamingConfig, UdfConfig, load_config,
55};
56use risingwave_common::id::WorkerId;
57use risingwave_common::memory::MemoryContext;
58use risingwave_common::secret::LocalSecretManager;
59use risingwave_common::session_config::{ConfigReporter, SessionConfig, VisibilityMode};
60use risingwave_common::system_param::local_manager::{
61 LocalSystemParamsManager, LocalSystemParamsManagerRef,
62};
63use risingwave_common::telemetry::manager::TelemetryManager;
64use risingwave_common::telemetry::telemetry_env_enabled;
65use risingwave_common::types::DataType;
66use risingwave_common::util::addr::HostAddr;
67use risingwave_common::util::cluster_limit;
68use risingwave_common::util::cluster_limit::ActorCountPerParallelism;
69use risingwave_common::util::iter_util::ZipEqFast;
70use risingwave_common::util::pretty_bytes::convert;
71use risingwave_common::util::runtime::BackgroundShutdownRuntime;
72use risingwave_common::{GIT_SHA, RW_VERSION};
73use risingwave_common_heap_profiling::HeapProfiler;
74use risingwave_common_service::{MetricsManager, ObserverManager};
75use risingwave_connector::source::monitor::{GLOBAL_SOURCE_METRICS, SourceMetrics};
76use risingwave_pb::common::WorkerType;
77use risingwave_pb::common::worker_node::Property as AddWorkerNodeProperty;
78use risingwave_pb::configured_monitor_service_server;
79use risingwave_pb::frontend_service::frontend_service_server::FrontendServiceServer;
80use risingwave_pb::health::health_server::HealthServer;
81use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
82use risingwave_pb::user::auth_info::EncryptionType;
83use risingwave_rpc_client::{
84 ComputeClientPool, ComputeClientPoolRef, FrontendClientPool, FrontendClientPoolRef, MetaClient,
85 MonitorClientPool, MonitorClientPoolRef,
86};
87use risingwave_sqlparser::ast::{ObjectName, Statement};
88use risingwave_sqlparser::parser::Parser;
89use thiserror::Error;
90use thiserror_ext::AsReport;
91use tokio::runtime::Builder;
92use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
93use tokio::sync::oneshot::Sender;
94use tokio::sync::watch;
95use tokio::task::JoinHandle;
96use tracing::{error, info};
97
98use self::cursor_manager::CursorManager;
99use crate::binder::{Binder, BoundStatement, ResolveQualifiedNameError};
100use crate::catalog::catalog_service::{CatalogReader, CatalogWriter, CatalogWriterImpl};
101use crate::catalog::connection_catalog::ConnectionCatalog;
102use crate::catalog::root_catalog::{Catalog, SchemaPath};
103use crate::catalog::secret_catalog::SecretCatalog;
104use crate::catalog::source_catalog::SourceCatalog;
105use crate::catalog::subscription_catalog::SubscriptionCatalog;
106use crate::catalog::{
107 CatalogError, CatalogErrorInner, DatabaseId, OwnedByUserCatalog, SchemaId, TableId,
108 check_schema_writable,
109};
110use crate::error::{
111 ErrorCode, Result, RwError, bail_catalog_error, bail_permission_denied, bail_protocol_error,
112};
113use crate::handler::describe::infer_describe;
114use crate::handler::extended_handle::{
115 Portal, PrepareStatement, handle_bind, handle_execute, handle_parse,
116};
117use crate::handler::privilege::ObjectCheckItem;
118use crate::handler::show::{infer_show_create_object, infer_show_object};
119use crate::handler::util::to_pg_field;
120use crate::handler::variable::infer_show_variable;
121use crate::handler::{RwPgResponse, handle};
122use crate::health_service::HealthServiceImpl;
123use crate::meta_client::{FrontendMetaClient, FrontendMetaClientImpl};
124use crate::monitor::{CursorMetrics, FrontendMetrics, GLOBAL_FRONTEND_METRICS};
125use crate::observer::FrontendObserverNode;
126use crate::rpc::{FrontendServiceImpl, MonitorServiceImpl};
127use crate::scheduler::streaming_manager::{StreamingJobTracker, StreamingJobTrackerRef};
128use crate::scheduler::{
129 DistributedQueryMetrics, GLOBAL_DISTRIBUTED_QUERY_METRICS, HummockSnapshotManager,
130 HummockSnapshotManagerRef, QueryManager,
131};
132use crate::telemetry::FrontendTelemetryCreator;
133use crate::user::UserId;
134use crate::user::user_authentication::md5_hash_with_salt;
135use crate::user::user_manager::UserInfoManager;
136use crate::user::user_service::{UserInfoReader, UserInfoWriter, UserInfoWriterImpl};
137use crate::{FrontendOpts, PgResponseStream, TableCatalog};
138
139pub(crate) mod current;
140pub(crate) mod cursor_manager;
141pub(crate) mod transaction;
142
/// Process-wide environment shared by the frontend node and all of its sessions.
///
/// The struct is `Clone`; each `SessionImpl` stores its own copy in its `env` field.
#[derive(Clone)]
pub(crate) struct FrontendEnv {
    /// Client used to talk to the meta service (a mock in `FrontendEnv::mock`).
    meta_client: Arc<dyn FrontendMetaClient>,
    /// Write side of the catalog; handed out only through `catalog_writer`, which
    /// requires a `transaction::WriteGuard`.
    catalog_writer: Arc<dyn CatalogWriter>,
    /// Read side of the catalog.
    catalog_reader: CatalogReader,
    /// Write side of user information; handed out only through `user_info_writer`,
    /// which requires a `transaction::WriteGuard`.
    user_info_writer: Arc<dyn UserInfoWriter>,
    /// Read side of user information.
    user_info_reader: UserInfoReader,
    /// Worker node manager; also passed to the query manager and the observer node.
    worker_node_manager: WorkerNodeManagerRef,
    /// Manager used to run batch queries.
    query_manager: QueryManager,
    /// Hummock snapshot manager built on top of the meta client.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
    /// Local view of the system parameters obtained when registering with meta.
    system_params_manager: LocalSystemParamsManagerRef,
    /// Shared session configuration; `init` also hands it to the observer node and
    /// `session_params_snapshot` returns a clone of the current value.
    session_params: Arc<RwLock<SessionConfig>>,

    /// Address this frontend is known by (the advertise address in `init`).
    server_addr: HostAddr,
    /// Pool of compute-node clients.
    client_pool: ComputeClientPoolRef,
    /// Pool of clients to other frontend nodes.
    frontend_client_pool: FrontendClientPoolRef,
    /// Pool of monitor-service clients.
    monitor_client_pool: MonitorClientPoolRef,

    /// All sessions registered on this frontend; shared with the frontend RPC
    /// service, the cursor metrics and the idle-transaction checker in `init`.
    sessions_map: SessionMapRef,

    /// Frontend-level metrics.
    pub frontend_metrics: Arc<FrontendMetrics>,

    /// Cursor metrics; each session's `CursorManager` is created from a clone.
    pub cursor_metrics: Arc<CursorMetrics>,

    /// Source metrics.
    source_metrics: Arc<SourceMetrics>,

    /// Batch spill metrics.
    spill_metrics: Arc<BatchSpillMetrics>,

    /// `batch` section of the loaded configuration.
    batch_config: BatchConfig,
    /// `frontend` section of the loaded configuration.
    frontend_config: FrontendConfig,
    /// `meta` section of the loaded configuration; stored but currently not read,
    /// hence the `dead_code` expectation.
    #[expect(dead_code)]
    meta_config: MetaConfig,
    /// `streaming` section of the loaded configuration.
    streaming_config: StreamingConfig,
    /// `udf` section of the loaded configuration.
    udf_config: UdfConfig,

    /// Tracker for streaming jobs that are being created.
    creating_streaming_job_tracker: StreamingJobTrackerRef,

    /// Dedicated multi-thread Tokio runtime whose threads are named `rw-batch-local`.
    compute_runtime: Arc<BackgroundShutdownRuntime>,

    /// Memory context for batch execution; in `init` it is a root context limited to
    /// `FRONTEND_BATCH_MEMORY_PROPORTION` of the total memory.
    mem_context: MemoryContext,

    /// Memory context created by `crate::datafusion::create_df_spillable_budget_ctx`
    /// from `mem_context` (only with the `datafusion` feature).
    #[cfg(feature = "datafusion")]
    df_spillable_budget_ctx: MemoryContext,

    /// Address of the serverless backfill controller, taken from `FrontendOpts`.
    serverless_backfill_controller_addr: String,

    /// Prometheus client; `None` when no endpoint is configured or when the client
    /// could not be created from the endpoint.
    prometheus_client: Option<PrometheusClient>,

    /// Prometheus selector taken from `FrontendOpts`; empty when not provided.
    prometheus_selector: String,
}
212
/// Shared, lock-protected map of the sessions on this frontend.
///
/// Keys are `(i32, i32)` pairs, the same shape as `SessionImpl::id`
/// (presumably the session id; confirm at the insertion site).
pub type SessionMapRef = Arc<RwLock<HashMap<(i32, i32), Arc<SessionImpl>>>>;
215
/// Fraction of the frontend's total memory used as the batch memory limit:
/// `FrontendEnv::init` multiplies `frontend_total_memory_bytes` by this value to
/// size the root batch `MemoryContext`.
const FRONTEND_BATCH_MEMORY_PROPORTION: f64 = 0.5;
218
219impl FrontendEnv {
220 pub fn mock() -> Self {
221 use crate::test_utils::{MockCatalogWriter, MockFrontendMetaClient, MockUserInfoWriter};
222
223 let catalog = Arc::new(RwLock::new(Catalog::default()));
224 let meta_client = Arc::new(MockFrontendMetaClient {});
225 let hummock_snapshot_manager = Arc::new(HummockSnapshotManager::new(meta_client.clone()));
226 let catalog_writer = Arc::new(MockCatalogWriter::new(
227 catalog.clone(),
228 hummock_snapshot_manager.clone(),
229 ));
230 let catalog_reader = CatalogReader::new(catalog);
231 let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
232 let user_info_writer = Arc::new(MockUserInfoWriter::new(user_info_manager.clone()));
233 let user_info_reader = UserInfoReader::new(user_info_manager);
234 let worker_node_manager = Arc::new(WorkerNodeManager::mock(vec![]));
235 let system_params_manager = Arc::new(LocalSystemParamsManager::for_test());
236 let compute_client_pool = Arc::new(ComputeClientPool::for_test());
237 let frontend_client_pool = Arc::new(FrontendClientPool::for_test());
238 let monitor_client_pool = Arc::new(MonitorClientPool::for_test());
239 let query_manager = QueryManager::new(
240 worker_node_manager.clone(),
241 compute_client_pool,
242 catalog_reader.clone(),
243 Arc::new(DistributedQueryMetrics::for_test()),
244 None,
245 None,
246 );
247 let server_addr = HostAddr::try_from("127.0.0.1:4565").unwrap();
248 let client_pool = Arc::new(ComputeClientPool::for_test());
249 let creating_streaming_tracker = StreamingJobTracker::new(meta_client.clone());
250 let runtime = {
251 let mut builder = Builder::new_multi_thread();
252 if let Some(frontend_compute_runtime_worker_threads) =
253 load_config("", FrontendOpts::default())
254 .batch
255 .frontend_compute_runtime_worker_threads
256 {
257 builder.worker_threads(frontend_compute_runtime_worker_threads);
258 }
259 builder
260 .thread_name("rw-batch-local")
261 .enable_all()
262 .build()
263 .unwrap()
264 };
265 let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(runtime));
266 let sessions_map = Arc::new(RwLock::new(HashMap::new()));
267 Self {
268 meta_client,
269 catalog_writer,
270 catalog_reader,
271 user_info_writer,
272 user_info_reader,
273 worker_node_manager,
274 query_manager,
275 hummock_snapshot_manager,
276 system_params_manager,
277 session_params: Default::default(),
278 server_addr,
279 client_pool,
280 frontend_client_pool,
281 monitor_client_pool,
282 sessions_map,
283 frontend_metrics: Arc::new(FrontendMetrics::for_test()),
284 cursor_metrics: Arc::new(CursorMetrics::for_test()),
285 batch_config: BatchConfig::default(),
286 frontend_config: FrontendConfig::default(),
287 meta_config: MetaConfig::default(),
288 streaming_config: StreamingConfig::default(),
289 udf_config: UdfConfig::default(),
290 source_metrics: Arc::new(SourceMetrics::default()),
291 spill_metrics: BatchSpillMetrics::for_test(),
292 creating_streaming_job_tracker: Arc::new(creating_streaming_tracker),
293 compute_runtime,
294 mem_context: MemoryContext::none(),
295 #[cfg(feature = "datafusion")]
296 df_spillable_budget_ctx: MemoryContext::none(),
297 serverless_backfill_controller_addr: Default::default(),
298 prometheus_client: None,
299 prometheus_selector: String::new(),
300 }
301 }
302
/// Replaces the stored frontend configuration with `frontend_config`.
///
/// Intended for tests, as the name indicates.
pub(crate) fn set_frontend_config_for_test(&mut self, frontend_config: FrontendConfig) {
    self.frontend_config = frontend_config;
}
306
/// Starts a frontend node: loads the configuration, registers with the meta
/// service, spawns the background services and assembles the `FrontendEnv`.
///
/// Returns the environment together with
/// - the join handles of the background tasks started here (heartbeat loop,
///   observer, telemetry when enabled, idle-in-transaction checker), and
/// - the shutdown senders for the heartbeat loop and, when enabled, telemetry.
///
/// # Errors
///
/// Returns an error if activating the worker on the meta service fails or, when
/// batch spill is enabled, if cleaning the spill directory fails.
///
/// # Panics
///
/// Panics if the advertise/listen address or the RPC listener address cannot be
/// parsed, or if the compute runtime cannot be built. The spawned RPC server task
/// panics if serving fails.
pub async fn init(opts: FrontendOpts) -> Result<(Self, Vec<JoinHandle<()>>, Vec<Sender<()>>)> {
    let config = load_config(&opts.config_path, &opts);
    info!("Starting frontend node");
    info!("> config: {:?}", config);
    info!(
        "> debug assertions: {}",
        if cfg!(debug_assertions) { "on" } else { "off" }
    );
    info!("> version: {} ({})", RW_VERSION, GIT_SHA);

    // Fall back to the listen address when no advertise address is given.
    let frontend_address: HostAddr = opts
        .advertise_addr
        .as_ref()
        .unwrap_or_else(|| {
            tracing::warn!("advertise addr is not specified, defaulting to listen_addr");
            &opts.listen_addr
        })
        .parse()
        .unwrap();
    info!("advertise addr is {}", frontend_address);

    // The internal RPC address combines the advertised host with the port of the
    // RPC listener.
    let rpc_addr: HostAddr = opts.frontend_rpc_listener_addr.parse().unwrap();
    let internal_rpc_host_addr = HostAddr {
        host: frontend_address.host.clone(),
        port: rpc_addr.port,
    };
    // Register this node with the meta service as a frontend worker.
    let (meta_client, system_params_reader) = MetaClient::register_new(
        opts.meta_addr,
        WorkerType::Frontend,
        &frontend_address,
        AddWorkerNodeProperty {
            internal_rpc_host_addr: internal_rpc_host_addr.to_string(),
            ..Default::default()
        },
        Arc::new(config.meta.clone()),
    )
    .await;

    let worker_id = meta_client.worker_id();
    info!("Assigned worker node id {}", worker_id);

    // The heartbeat loop provides the first join handle and shutdown sender.
    let (heartbeat_join_handle, heartbeat_shutdown_sender) = MetaClient::start_heartbeat_loop(
        meta_client.clone(),
        Duration::from_millis(config.server.heartbeat_interval_ms as u64),
    );
    let mut join_handles = vec![heartbeat_join_handle];
    let mut shutdown_senders = vec![heartbeat_shutdown_sender];

    let frontend_meta_client = Arc::new(FrontendMetaClientImpl(meta_client.clone()));
    let hummock_snapshot_manager =
        Arc::new(HummockSnapshotManager::new(frontend_meta_client.clone()));

    // The sender half of this watch channel goes to the observer node; the catalog
    // and user-info writers each hold a receiver.
    let (catalog_updated_tx, catalog_updated_rx) = watch::channel(0);
    let catalog = Arc::new(RwLock::new(Catalog::default()));
    let catalog_writer = Arc::new(CatalogWriterImpl::new(
        meta_client.clone(),
        catalog_updated_rx.clone(),
        hummock_snapshot_manager.clone(),
    ));
    let catalog_reader = CatalogReader::new(catalog.clone());

    let worker_node_manager = Arc::new(WorkerNodeManager::new());

    let compute_client_pool = Arc::new(ComputeClientPool::new(
        config.batch_exchange_connection_pool_size(),
        config.batch.developer.compute_client_config.clone(),
    ));
    let frontend_client_pool = Arc::new(FrontendClientPool::new(
        1,
        config.batch.developer.frontend_client_config.clone(),
    ));
    let monitor_client_pool = Arc::new(MonitorClientPool::new(1, RpcClientConfig::default()));
    let query_manager = QueryManager::new(
        worker_node_manager.clone(),
        compute_client_pool.clone(),
        catalog_reader.clone(),
        Arc::new(GLOBAL_DISTRIBUTED_QUERY_METRICS.clone()),
        config.batch.distributed_query_limit,
        config.batch.max_batch_queries_per_frontend_node,
    );

    let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
    let user_info_reader = UserInfoReader::new(user_info_manager.clone());
    let user_info_writer = Arc::new(UserInfoWriterImpl::new(
        meta_client.clone(),
        catalog_updated_rx,
    ));

    let system_params_manager =
        Arc::new(LocalSystemParamsManager::new(system_params_reader.clone()));

    LocalSecretManager::init(
        opts.temp_secret_file_dir,
        meta_client.cluster_id().to_owned(),
        worker_id,
    );

    let session_params = Arc::new(RwLock::new(SessionConfig::default()));
    // The session map is shared with the cursor metrics, the frontend RPC service
    // and the idle-in-transaction checker below.
    let sessions_map: SessionMapRef = Arc::new(RwLock::new(HashMap::new()));
    let cursor_metrics = Arc::new(CursorMetrics::init(sessions_map.clone()));

    // The observer node is given the local state objects created above and is
    // started before the worker is activated.
    let frontend_observer_node = FrontendObserverNode::new(
        worker_node_manager.clone(),
        catalog,
        catalog_updated_tx,
        user_info_manager,
        hummock_snapshot_manager.clone(),
        system_params_manager.clone(),
        session_params.clone(),
        compute_client_pool.clone(),
    );
    let observer_manager =
        ObserverManager::new_with_meta_client(meta_client.clone(), frontend_observer_node)
            .await;
    let observer_join_handle = observer_manager.start().await;
    join_handles.push(observer_join_handle);

    meta_client.activate(&frontend_address).await?;

    let frontend_metrics = Arc::new(GLOBAL_FRONTEND_METRICS.clone());
    let source_metrics = Arc::new(GLOBAL_SOURCE_METRICS.clone());
    let spill_metrics = Arc::new(GLOBAL_BATCH_SPILL_METRICS.clone());

    // The metrics service is only booted when metrics are not disabled.
    if config.server.metrics_level > MetricLevel::Disabled {
        MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
    }

    let health_srv = HealthServiceImpl::new();
    let frontend_srv = FrontendServiceImpl::new(sessions_map.clone());
    let monitor_srv = MonitorServiceImpl::new(config.server.clone());
    let frontend_rpc_addr = opts.frontend_rpc_listener_addr.parse().unwrap();

    let telemetry_manager = TelemetryManager::new(
        Arc::new(meta_client.clone()),
        Arc::new(FrontendTelemetryCreator::new()),
    );

    // Telemetry starts only when both the config and the environment enable it.
    if config.server.telemetry_enabled && telemetry_env_enabled() {
        let (join_handle, shutdown_sender) = telemetry_manager.start().await;
        join_handles.push(join_handle);
        shutdown_senders.push(shutdown_sender);
    } else {
        tracing::info!("Telemetry didn't start due to config");
    }

    // Serve the health, frontend and monitor gRPC services on the RPC listener
    // address. The task is detached: its handle is not added to `join_handles`.
    tokio::spawn(async move {
        tonic::transport::Server::builder()
            .add_service(HealthServer::new(health_srv))
            .add_service(FrontendServiceServer::new(frontend_srv))
            .add_service(configured_monitor_service_server(
                MonitorServiceServer::new(monitor_srv),
            ))
            .serve(frontend_rpc_addr)
            .await
            .unwrap();
    });
    info!(
        "Health Check RPC Listener is set up on {}",
        opts.frontend_rpc_listener_addr.clone()
    );

    let creating_streaming_job_tracker =
        Arc::new(StreamingJobTracker::new(frontend_meta_client.clone()));

    // Dedicated multi-thread runtime; the worker-thread count is only overridden
    // when configured.
    let runtime = {
        let mut builder = Builder::new_multi_thread();
        if let Some(frontend_compute_runtime_worker_threads) =
            config.batch.frontend_compute_runtime_worker_threads
        {
            builder.worker_threads(frontend_compute_runtime_worker_threads);
        }
        builder
            .thread_name("rw-batch-local")
            .enable_all()
            .build()
            .unwrap()
    };
    let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(runtime));

    // Background task: every 10 seconds, run the idle-in-transaction timeout check
    // on each registered session. The check's result is deliberately ignored.
    let sessions = sessions_map.clone();
    let join_handle = tokio::spawn(async move {
        let mut check_idle_txn_interval =
            tokio::time::interval(core::time::Duration::from_secs(10));
        check_idle_txn_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        // Reset so the first tick completes one full period from now.
        check_idle_txn_interval.reset();
        loop {
            check_idle_txn_interval.tick().await;
            sessions.read().values().for_each(|session| {
                let _ = session.check_idle_in_transaction_timeout();
            })
        }
    });
    join_handles.push(join_handle);

    // Clean the spill directory at startup when spilling is enabled (compiled out
    // under `madsim`).
    #[cfg(not(madsim))]
    if config.batch.enable_spill {
        SpillOp::clean_spill_directory()
            .await
            .map_err(|err| anyhow!(err))?;
    }

    let total_memory_bytes = opts.frontend_total_memory_bytes;
    let heap_profiler =
        HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
    heap_profiler.start();

    // Batch execution may use a fixed proportion of the total memory.
    let batch_memory_limit = total_memory_bytes as f64 * FRONTEND_BATCH_MEMORY_PROPORTION;
    let mem_context = MemoryContext::root(
        frontend_metrics.batch_total_mem.clone(),
        batch_memory_limit as u64,
    );
    #[cfg(feature = "datafusion")]
    let df_spillable_budget_ctx =
        crate::datafusion::create_df_spillable_budget_ctx(&mem_context);

    // A failure to create the Prometheus client is logged and otherwise ignored.
    let prometheus_client = if let Some(ref endpoint) = opts.prometheus_endpoint {
        match PrometheusClient::try_from(endpoint.as_str()) {
            Ok(client) => {
                info!("Prometheus client initialized with endpoint: {}", endpoint);
                Some(client)
            }
            Err(e) => {
                error!(
                    error = %e.as_report(),
                    "failed to initialize Prometheus client",
                );
                None
            }
        }
    } else {
        None
    };

    let prometheus_selector = opts.prometheus_selector.unwrap_or_default();

    info!(
        "Frontend total_memory: {} batch_memory: {}",
        convert(total_memory_bytes as _),
        convert(batch_memory_limit as _),
    );

    Ok((
        Self {
            catalog_reader,
            catalog_writer,
            user_info_reader,
            user_info_writer,
            worker_node_manager,
            meta_client: frontend_meta_client,
            query_manager,
            hummock_snapshot_manager,
            system_params_manager,
            session_params,
            server_addr: frontend_address,
            client_pool: compute_client_pool,
            frontend_client_pool,
            monitor_client_pool,
            frontend_metrics,
            cursor_metrics,
            spill_metrics,
            sessions_map,
            batch_config: config.batch,
            frontend_config: config.frontend,
            meta_config: config.meta,
            streaming_config: config.streaming,
            serverless_backfill_controller_addr: opts.serverless_backfill_controller_addr,
            udf_config: config.udf,
            source_metrics,
            creating_streaming_job_tracker,
            compute_runtime,
            mem_context,
            #[cfg(feature = "datafusion")]
            df_spillable_budget_ctx,
            prometheus_client,
            prometheus_selector,
        },
        join_handles,
        shutdown_senders,
    ))
}
597
598 fn catalog_writer(&self, _guard: transaction::WriteGuard) -> &dyn CatalogWriter {
603 &*self.catalog_writer
604 }
605
/// Returns the reader used to look up catalog objects.
pub fn catalog_reader(&self) -> &CatalogReader {
    &self.catalog_reader
}
610
611 fn user_info_writer(&self, _guard: transaction::WriteGuard) -> &dyn UserInfoWriter {
616 &*self.user_info_writer
617 }
618
/// Returns the reader used to look up user information.
pub fn user_info_reader(&self) -> &UserInfoReader {
    &self.user_info_reader
}
623
624 pub fn worker_node_manager_ref(&self) -> WorkerNodeManagerRef {
625 self.worker_node_manager.clone()
626 }
627
628 pub fn meta_client(&self) -> &dyn FrontendMetaClient {
629 &*self.meta_client
630 }
631
632 pub fn meta_client_ref(&self) -> Arc<dyn FrontendMetaClient> {
633 self.meta_client.clone()
634 }
635
/// Borrows the query manager.
pub fn query_manager(&self) -> &QueryManager {
    &self.query_manager
}
639
/// Borrows the shared handle to the Hummock snapshot manager.
pub fn hummock_snapshot_manager(&self) -> &HummockSnapshotManagerRef {
    &self.hummock_snapshot_manager
}
643
/// Borrows the shared handle to the local system-parameters manager.
pub fn system_params_manager(&self) -> &LocalSystemParamsManagerRef {
    &self.system_params_manager
}
647
648 pub fn session_params_snapshot(&self) -> SessionConfig {
649 self.session_params.read_recursive().clone()
650 }
651
/// Returns the configured address of the serverless backfill controller.
pub fn sbc_address(&self) -> &String {
    &self.serverless_backfill_controller_addr
}
655
/// Returns the Prometheus client, or `None` when none is available.
pub fn prometheus_client(&self) -> Option<&PrometheusClient> {
    self.prometheus_client.as_ref()
}
660
661 pub fn prometheus_selector(&self) -> &str {
663 &self.prometheus_selector
664 }
665
/// Returns the address of this frontend node.
pub fn server_address(&self) -> &HostAddr {
    &self.server_addr
}
669
670 pub fn client_pool(&self) -> ComputeClientPoolRef {
671 self.client_pool.clone()
672 }
673
674 pub fn frontend_client_pool(&self) -> FrontendClientPoolRef {
675 self.frontend_client_pool.clone()
676 }
677
678 pub fn monitor_client_pool(&self) -> MonitorClientPoolRef {
679 self.monitor_client_pool.clone()
680 }
681
/// Borrows the batch configuration.
pub fn batch_config(&self) -> &BatchConfig {
    &self.batch_config
}
685
/// Borrows the frontend configuration.
pub fn frontend_config(&self) -> &FrontendConfig {
    &self.frontend_config
}
689
/// Borrows the streaming configuration.
pub fn streaming_config(&self) -> &StreamingConfig {
    &self.streaming_config
}
693
/// Borrows the UDF configuration.
pub fn udf_config(&self) -> &UdfConfig {
    &self.udf_config
}
697
698 pub fn source_metrics(&self) -> Arc<SourceMetrics> {
699 self.source_metrics.clone()
700 }
701
702 pub fn spill_metrics(&self) -> Arc<BatchSpillMetrics> {
703 self.spill_metrics.clone()
704 }
705
/// Borrows the shared handle to the tracker of streaming jobs being created.
pub fn creating_streaming_job_tracker(&self) -> &StreamingJobTrackerRef {
    &self.creating_streaming_job_tracker
}
709
/// Borrows the shared map of sessions on this frontend.
pub fn sessions_map(&self) -> &SessionMapRef {
    &self.sessions_map
}
713
714 pub fn compute_runtime(&self) -> Arc<BackgroundShutdownRuntime> {
715 self.compute_runtime.clone()
716 }
717
718 pub fn cancel_queries_in_session(&self, session_id: SessionId) -> bool {
721 cancel_queries_in_session(session_id, self.sessions_map.clone())
722 }
723
724 pub fn cancel_creating_jobs_in_session(&self, session_id: SessionId) -> bool {
727 cancel_creating_jobs_in_session(session_id, self.sessions_map.clone())
728 }
729
/// Returns a clone of the batch memory context.
pub fn mem_context(&self) -> MemoryContext {
    self.mem_context.clone()
}
733
/// Returns a clone of the DataFusion spillable-budget memory context.
///
/// Only available with the `datafusion` feature.
#[cfg(feature = "datafusion")]
pub fn df_spillable_budget_ctx(&self) -> MemoryContext {
    self.df_spillable_budget_ctx.clone()
}
738}
739
/// Identity information attached to a session: the database it is connected to
/// and the user it runs as.
#[derive(Clone)]
pub struct AuthContext {
    /// Name of the database the session is using.
    pub database: String,
    /// Name of the session's user.
    pub user_name: String,
    /// Id of the session's user.
    pub user_id: UserId,
}
746
747impl AuthContext {
748 pub fn new(database: String, user_name: String, user_id: UserId) -> Self {
749 Self {
750 database,
751 user_name,
752 user_id,
753 }
754 }
755}
/// State of a single client session on the frontend.
pub struct SessionImpl {
    /// The frontend environment this session runs in.
    env: FrontendEnv,
    /// Database and user of the session; the database can be changed through
    /// `update_database`.
    auth_context: Arc<RwLock<AuthContext>>,
    /// How the connecting user is authenticated.
    user_authenticator: UserAuthenticator,
    /// Session-level configuration.
    config_map: Arc<RwLock<SessionConfig>>,

    /// Sending half of the unbounded notice channel created with the session.
    notice_tx: UnboundedSender<String>,
    /// Receiving half of the notice channel.
    notice_rx: Mutex<UnboundedReceiver<String>>,

    /// Identifier returned by `session_id`.
    id: (i32, i32),

    /// Address of the connected client.
    peer_addr: AddressRef,

    /// Transaction state of the session.
    txn: Arc<Mutex<transaction::State>>,

    /// Shutdown sender for the current query, if any (presumably used to cancel a
    /// running query; confirm at the usage site).
    current_query_cancel_flag: Mutex<Option<ShutdownSender>>,

    /// Weak reference to the execution context of the running statement; read by
    /// `running_sql` and `elapse_since_running_sql`.
    exec_context: Mutex<Option<Weak<ExecContext>>>,

    /// Instant read by `elapse_since_last_idle_instant`; `None` when not set.
    last_idle_instant: Arc<Mutex<Option<Instant>>>,

    /// Cursor manager of this session.
    cursor_manager: Arc<CursorManager>,

    /// Sources tracked only by this session.
    temporary_source_manager: Arc<Mutex<TemporarySourceManager>>,

    /// Tables tracked only by this session.
    staging_catalog_manager: Arc<Mutex<StagingCatalogManager>>,
}
799
/// In-memory collection of source catalogs, looked up by name.
#[derive(Default, Clone)]
pub struct TemporarySourceManager {
    /// Source catalogs keyed by source name.
    sources: HashMap<String, SourceCatalog>,
}
812
813impl TemporarySourceManager {
814 pub fn new() -> Self {
815 Self {
816 sources: HashMap::new(),
817 }
818 }
819
820 pub fn create_source(&mut self, name: String, source: SourceCatalog) {
821 self.sources.insert(name, source);
822 }
823
824 pub fn drop_source(&mut self, name: &str) {
825 self.sources.remove(name);
826 }
827
828 pub fn get_source(&self, name: &str) -> Option<&SourceCatalog> {
829 self.sources.get(name)
830 }
831
832 pub fn keys(&self) -> Vec<String> {
833 self.sources.keys().cloned().collect()
834 }
835}
836
/// In-memory collection of table catalogs, looked up by name.
#[derive(Default, Clone)]
pub struct StagingCatalogManager {
    /// Table catalogs keyed by table name.
    tables: HashMap<String, TableCatalog>,
}
843
844impl StagingCatalogManager {
845 pub fn new() -> Self {
846 Self {
847 tables: HashMap::new(),
848 }
849 }
850
851 pub fn create_table(&mut self, name: String, table: TableCatalog) {
852 self.tables.insert(name, table);
853 }
854
855 pub fn drop_table(&mut self, name: &str) {
856 self.tables.remove(name);
857 }
858
859 pub fn get_table(&self, name: &str) -> Option<&TableCatalog> {
860 self.tables.get(name)
861 }
862}
863
/// Error returned by `SessionImpl::check_relation_name_duplicated`.
///
/// Both variants display as the wrapped error and convert into `RwError`.
#[derive(Error, Debug)]
pub enum CheckRelationError {
    /// The qualified object name could not be resolved.
    #[error("{0}")]
    Resolve(#[from] ResolveQualifiedNameError),
    /// The catalog reported an error (for example a duplicated name).
    #[error("{0}")]
    Catalog(#[from] CatalogError),
}
871
872impl From<CheckRelationError> for RwError {
873 fn from(e: CheckRelationError) -> Self {
874 match e {
875 CheckRelationError::Resolve(e) => e.into(),
876 CheckRelationError::Catalog(e) => e.into(),
877 }
878 }
879}
880
881impl SessionImpl {
882 pub(crate) fn new(
883 env: FrontendEnv,
884 auth_context: AuthContext,
885 user_authenticator: UserAuthenticator,
886 id: SessionId,
887 peer_addr: AddressRef,
888 session_config: SessionConfig,
889 ) -> Self {
890 let cursor_metrics = env.cursor_metrics.clone();
891 let (notice_tx, notice_rx) = mpsc::unbounded_channel();
892
893 Self {
894 env,
895 auth_context: Arc::new(RwLock::new(auth_context)),
896 user_authenticator,
897 config_map: Arc::new(RwLock::new(session_config)),
898 id,
899 peer_addr,
900 txn: Default::default(),
901 current_query_cancel_flag: Mutex::new(None),
902 notice_tx,
903 notice_rx: Mutex::new(notice_rx),
904 exec_context: Mutex::new(None),
905 last_idle_instant: Default::default(),
906 cursor_manager: Arc::new(CursorManager::new(cursor_metrics)),
907 temporary_source_manager: Default::default(),
908 staging_catalog_manager: Default::default(),
909 }
910 }
911
/// Creates a session for tests, backed by `FrontendEnv::mock()`.
///
/// The session runs as the default super user on the default database, uses no
/// authentication, has id `(0, 0)` and a loopback peer address.
///
/// Only one mock environment is built: the cursor manager takes its metrics
/// from the same environment the session stores, mirroring `SessionImpl::new`.
#[cfg(test)]
pub fn mock() -> Self {
    let env = FrontendEnv::mock();
    // Clone the metrics handle before `env` is moved into the session.
    let cursor_metrics = env.cursor_metrics.clone();
    let (notice_tx, notice_rx) = mpsc::unbounded_channel();

    Self {
        env,
        auth_context: Arc::new(RwLock::new(AuthContext::new(
            DEFAULT_DATABASE_NAME.to_owned(),
            DEFAULT_SUPER_USER.to_owned(),
            DEFAULT_SUPER_USER_ID,
        ))),
        user_authenticator: UserAuthenticator::None,
        config_map: Default::default(),
        id: (0, 0),
        txn: Default::default(),
        current_query_cancel_flag: Mutex::new(None),
        notice_tx,
        notice_rx: Mutex::new(notice_rx),
        exec_context: Mutex::new(None),
        peer_addr: Address::Tcp(SocketAddr::new(
            IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
            8080,
        ))
        .into(),
        last_idle_instant: Default::default(),
        cursor_manager: Arc::new(CursorManager::new(cursor_metrics)),
        temporary_source_manager: Default::default(),
        staging_catalog_manager: Default::default(),
    }
}
944
/// Borrows the frontend environment of this session.
pub(crate) fn env(&self) -> &FrontendEnv {
    &self.env
}
948
949 pub fn auth_context(&self) -> Arc<AuthContext> {
950 let ctx = self.auth_context.read();
951 Arc::new(ctx.clone())
952 }
953
954 pub fn database(&self) -> String {
955 self.auth_context.read().database.clone()
956 }
957
958 pub fn database_id(&self) -> DatabaseId {
959 let db_name = self.database();
960 self.env
961 .catalog_reader()
962 .read_guard()
963 .get_database_by_name(&db_name)
964 .map(|db| db.id())
965 .expect("session database not found")
966 }
967
968 pub fn user_name(&self) -> String {
969 self.auth_context.read().user_name.clone()
970 }
971
972 pub fn user_id(&self) -> UserId {
973 self.auth_context.read().user_id
974 }
975
976 pub fn update_database(&self, database: String) {
977 self.auth_context.write().database = database;
978 }
979
980 pub fn shared_config(&self) -> Arc<RwLock<SessionConfig>> {
981 Arc::clone(&self.config_map)
982 }
983
/// Acquires a read lock on the session configuration and returns the guard.
///
/// The lock is held for as long as the returned guard is alive.
pub fn config(&self) -> RwLockReadGuard<'_, SessionConfig> {
    self.config_map.read()
}
987
988 pub fn set_config(&self, key: &str, value: String) -> Result<String> {
989 self.config_map
990 .write()
991 .set(key, value, &mut ())
992 .map_err(Into::into)
993 }
994
995 pub fn reset_config(&self, key: &str) -> Result<String> {
996 self.config_map
997 .write()
998 .reset(key, &mut ())
999 .map_err(Into::into)
1000 }
1001
1002 pub fn set_config_report(
1003 &self,
1004 key: &str,
1005 value: Option<String>,
1006 mut reporter: impl ConfigReporter,
1007 ) -> Result<String> {
1008 if let Some(value) = value {
1009 self.config_map
1010 .write()
1011 .set(key, value, &mut reporter)
1012 .map_err(Into::into)
1013 } else {
1014 self.config_map
1015 .write()
1016 .reset(key, &mut reporter)
1017 .map_err(Into::into)
1018 }
1019 }
1020
/// Returns the identifier of this session.
pub fn session_id(&self) -> SessionId {
    self.id
}
1024
1025 pub fn running_sql(&self) -> Option<Arc<str>> {
1026 self.exec_context
1027 .lock()
1028 .as_ref()
1029 .and_then(|weak| weak.upgrade())
1030 .map(|context| context.running_sql.clone())
1031 }
1032
1033 pub fn get_cursor_manager(&self) -> Arc<CursorManager> {
1034 self.cursor_manager.clone()
1035 }
1036
/// Returns the address of the client connected to this session.
pub fn peer_addr(&self) -> &Address {
    &self.peer_addr
}
1040
1041 pub fn elapse_since_running_sql(&self) -> Option<u128> {
1042 self.exec_context
1043 .lock()
1044 .as_ref()
1045 .and_then(|weak| weak.upgrade())
1046 .map(|context| context.last_instant.elapsed().as_millis())
1047 }
1048
1049 pub fn elapse_since_last_idle_instant(&self) -> Option<u128> {
1050 self.last_idle_instant
1051 .lock()
1052 .as_ref()
1053 .map(|x| x.elapsed().as_millis())
1054 }
1055
1056 pub fn check_relation_name_duplicated(
1057 &self,
1058 name: ObjectName,
1059 stmt_type: StatementType,
1060 if_not_exists: bool,
1061 ) -> std::result::Result<Either<(), RwPgResponse>, CheckRelationError> {
1062 let db_name = &self.database();
1063 let catalog_reader = self.env().catalog_reader().read_guard();
1064 let (schema_name, relation_name) = {
1065 let (schema_name, relation_name) =
1066 Binder::resolve_schema_qualified_name(db_name, &name)?;
1067 let search_path = self.config().search_path();
1068 let user_name = &self.user_name();
1069 let schema_name = match schema_name {
1070 Some(schema_name) => schema_name,
1071 None => catalog_reader
1072 .first_valid_schema(db_name, &search_path, user_name)?
1073 .name(),
1074 };
1075 (schema_name, relation_name)
1076 };
1077 match catalog_reader.check_relation_name_duplicated(db_name, &schema_name, &relation_name) {
1078 Err(e) if if_not_exists => {
1079 if let CatalogErrorInner::Duplicated {
1080 name,
1081 under_creation,
1082 ..
1083 } = e.inner()
1084 {
1085 if !*under_creation {
1090 Ok(Either::Right(
1091 PgResponse::builder(stmt_type)
1092 .notice(format!("relation \"{}\" already exists, skipping", name))
1093 .into(),
1094 ))
1095 } else if stmt_type == StatementType::CREATE_SUBSCRIPTION {
1096 Ok(Either::Right(
1099 PgResponse::builder(stmt_type)
1100 .notice(format!(
1101 "relation \"{}\" already exists but still creating, skipping",
1102 name
1103 ))
1104 .into(),
1105 ))
1106 } else {
1107 Ok(Either::Left(()))
1108 }
1109 } else {
1110 Err(e.into())
1111 }
1112 }
1113 Err(e) => Err(e.into()),
1114 Ok(_) => Ok(Either::Left(())),
1115 }
1116 }
1117
1118 pub fn check_secret_name_duplicated(&self, name: ObjectName) -> Result<()> {
1119 let db_name = &self.database();
1120 let catalog_reader = self.env().catalog_reader().read_guard();
1121 let (schema_name, secret_name) = {
1122 let (schema_name, secret_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
1123 let search_path = self.config().search_path();
1124 let user_name = &self.user_name();
1125 let schema_name = match schema_name {
1126 Some(schema_name) => schema_name,
1127 None => catalog_reader
1128 .first_valid_schema(db_name, &search_path, user_name)?
1129 .name(),
1130 };
1131 (schema_name, secret_name)
1132 };
1133 catalog_reader
1134 .check_secret_name_duplicated(db_name, &schema_name, &secret_name)
1135 .map_err(RwError::from)
1136 }
1137
1138 pub fn check_connection_name_duplicated(&self, name: ObjectName) -> Result<()> {
1139 let db_name = &self.database();
1140 let catalog_reader = self.env().catalog_reader().read_guard();
1141 let (schema_name, connection_name) = {
1142 let (schema_name, connection_name) =
1143 Binder::resolve_schema_qualified_name(db_name, &name)?;
1144 let search_path = self.config().search_path();
1145 let user_name = &self.user_name();
1146 let schema_name = match schema_name {
1147 Some(schema_name) => schema_name,
1148 None => catalog_reader
1149 .first_valid_schema(db_name, &search_path, user_name)?
1150 .name(),
1151 };
1152 (schema_name, connection_name)
1153 };
1154 catalog_reader
1155 .check_connection_name_duplicated(db_name, &schema_name, &connection_name)
1156 .map_err(RwError::from)
1157 }
1158
1159 pub fn check_function_name_duplicated(
1160 &self,
1161 stmt_type: StatementType,
1162 name: ObjectName,
1163 arg_types: &[DataType],
1164 if_not_exists: bool,
1165 ) -> Result<Either<(), RwPgResponse>> {
1166 let db_name = &self.database();
1167 let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
1168 let (database_id, schema_id) = self.get_database_and_schema_id_for_create(schema_name)?;
1169
1170 let catalog_reader = self.env().catalog_reader().read_guard();
1171 if catalog_reader
1172 .get_schema_by_id(database_id, schema_id)?
1173 .get_function_by_name_args(&function_name, arg_types)
1174 .is_some()
1175 {
1176 let full_name = format!(
1177 "{function_name}({})",
1178 arg_types.iter().map(|t| t.to_string()).join(",")
1179 );
1180 if if_not_exists {
1181 Ok(Either::Right(
1182 PgResponse::builder(stmt_type)
1183 .notice(format!(
1184 "function \"{}\" already exists, skipping",
1185 full_name
1186 ))
1187 .into(),
1188 ))
1189 } else {
1190 Err(CatalogError::duplicated("function", full_name).into())
1191 }
1192 } else {
1193 Ok(Either::Left(()))
1194 }
1195 }
1196
1197 pub fn get_database_and_schema_id_for_create(
1199 &self,
1200 schema_name: Option<String>,
1201 ) -> Result<(DatabaseId, SchemaId)> {
1202 let db_name = &self.database();
1203
1204 let search_path = self.config().search_path();
1205 let user_name = &self.user_name();
1206
1207 let catalog_reader = self.env().catalog_reader().read_guard();
1208 let schema = match schema_name {
1209 Some(schema_name) => catalog_reader.get_schema_by_name(db_name, &schema_name)?,
1210 None => catalog_reader.first_valid_schema(db_name, &search_path, user_name)?,
1211 };
1212 let schema_name = schema.name();
1213
1214 check_schema_writable(&schema_name)?;
1215 self.check_privileges(&[ObjectCheckItem::new(
1216 schema.owner(),
1217 AclMode::Create,
1218 schema_name,
1219 schema.id(),
1220 )])?;
1221
1222 let db_id = catalog_reader.get_database_by_name(db_name)?.id();
1223 Ok((db_id, schema.id()))
1224 }
1225
1226 pub fn get_connection_by_name(
1227 &self,
1228 schema_name: Option<String>,
1229 connection_name: &str,
1230 ) -> Result<Arc<ConnectionCatalog>> {
1231 let db_name = &self.database();
1232 let search_path = self.config().search_path();
1233 let user_name = &self.user_name();
1234
1235 let catalog_reader = self.env().catalog_reader().read_guard();
1236 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1237 let (connection, _) =
1238 catalog_reader.get_connection_by_name(db_name, schema_path, connection_name)?;
1239
1240 self.check_privileges(&[ObjectCheckItem::new(
1241 connection.owner(),
1242 AclMode::Usage,
1243 connection.name.clone(),
1244 connection.id,
1245 )])?;
1246
1247 Ok(connection.clone())
1248 }
1249
1250 pub fn get_subscription_by_schema_id_name(
1251 &self,
1252 schema_id: SchemaId,
1253 subscription_name: &str,
1254 ) -> Result<Arc<SubscriptionCatalog>> {
1255 let db_name = &self.database();
1256
1257 let catalog_reader = self.env().catalog_reader().read_guard();
1258 let db_id = catalog_reader.get_database_by_name(db_name)?.id();
1259 let schema = catalog_reader.get_schema_by_id(db_id, schema_id)?;
1260 let subscription = schema
1261 .get_subscription_by_name(subscription_name)
1262 .ok_or_else(|| {
1263 RwError::from(ErrorCode::ItemNotFound(format!(
1264 "subscription {} not found",
1265 subscription_name
1266 )))
1267 })?;
1268 Ok(subscription.clone())
1269 }
1270
1271 pub fn get_subscription_by_name(
1272 &self,
1273 schema_name: Option<String>,
1274 subscription_name: &str,
1275 ) -> Result<Arc<SubscriptionCatalog>> {
1276 let db_name = &self.database();
1277 let search_path = self.config().search_path();
1278 let user_name = &self.user_name();
1279
1280 let catalog_reader = self.env().catalog_reader().read_guard();
1281 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1282 let (subscription, _) =
1283 catalog_reader.get_subscription_by_name(db_name, schema_path, subscription_name)?;
1284 Ok(subscription.clone())
1285 }
1286
1287 pub fn get_table_by_id(&self, table_id: TableId) -> Result<Arc<TableCatalog>> {
1288 let catalog_reader = self.env().catalog_reader().read_guard();
1289 Ok(catalog_reader.get_any_table_by_id(table_id)?.clone())
1290 }
1291
1292 pub fn get_table_by_name(
1293 &self,
1294 table_name: &str,
1295 db_id: DatabaseId,
1296 schema_id: SchemaId,
1297 ) -> Result<Arc<TableCatalog>> {
1298 let catalog_reader = self.env().catalog_reader().read_guard();
1299 let table = catalog_reader
1300 .get_schema_by_id(db_id, schema_id)?
1301 .get_created_table_by_name(table_name)
1302 .ok_or_else(|| {
1303 Error::new(
1304 ErrorKind::InvalidInput,
1305 format!("table \"{}\" does not exist", table_name),
1306 )
1307 })?;
1308
1309 self.check_privileges(&[ObjectCheckItem::new(
1310 table.owner(),
1311 AclMode::Select,
1312 table_name.to_owned(),
1313 table.id,
1314 )])?;
1315
1316 Ok(table.clone())
1317 }
1318
1319 pub fn get_secret_by_name(
1320 &self,
1321 schema_name: Option<String>,
1322 secret_name: &str,
1323 ) -> Result<Arc<SecretCatalog>> {
1324 let db_name = &self.database();
1325 let search_path = self.config().search_path();
1326 let user_name = &self.user_name();
1327
1328 let catalog_reader = self.env().catalog_reader().read_guard();
1329 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1330 let (secret, _) = catalog_reader.get_secret_by_name(db_name, schema_path, secret_name)?;
1331
1332 self.check_privileges(&[ObjectCheckItem::new(
1333 secret.owner(),
1334 AclMode::Usage,
1335 secret.name.clone(),
1336 secret.id,
1337 )])?;
1338
1339 Ok(secret.clone())
1340 }
1341
1342 pub async fn list_change_log_epochs(
1343 &self,
1344 table_id: TableId,
1345 min_epoch: u64,
1346 max_count: u32,
1347 ) -> Result<Vec<u64>> {
1348 let Some(max_epoch) = self
1349 .env
1350 .hummock_snapshot_manager()
1351 .acquire()
1352 .version()
1353 .state_table_info
1354 .info()
1355 .get(&table_id)
1356 .map(|s| s.committed_epoch)
1357 else {
1358 return Ok(vec![]);
1359 };
1360 let ret = self
1361 .env
1362 .meta_client_ref()
1363 .get_hummock_table_change_log(
1364 Some(min_epoch),
1365 Some(max_epoch),
1366 Some(iter::once(table_id).collect()),
1367 true,
1368 Some(max_count),
1369 )
1370 .await?;
1371 let Some(e) = ret.get(&table_id) else {
1372 return Ok(vec![]);
1373 };
1374 Ok(e.iter()
1375 .flat_map(|l| l.epochs())
1376 .filter(|e| *e >= min_epoch && *e <= max_epoch)
1377 .take(max_count as usize)
1378 .collect())
1379 }
1380
1381 pub fn clear_cancel_query_flag(&self) {
1382 let mut flag = self.current_query_cancel_flag.lock();
1383 *flag = None;
1384 }
1385
1386 pub fn reset_cancel_query_flag(&self) -> ShutdownToken {
1387 let mut flag = self.current_query_cancel_flag.lock();
1388 let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
1389 *flag = Some(shutdown_tx);
1390 shutdown_rx
1391 }
1392
1393 pub fn cancel_current_query(&self) {
1394 let mut flag_guard = self.current_query_cancel_flag.lock();
1395 if let Some(sender) = flag_guard.take() {
1396 info!("Trying to cancel query in local mode.");
1397 sender.cancel();
1399 info!("Cancel query request sent.");
1400 } else {
1401 info!("Trying to cancel query in distributed mode.");
1402 self.env.query_manager().cancel_queries_in_session(self.id)
1403 }
1404 }
1405
1406 pub fn cancel_current_creating_job(&self) {
1407 self.env.creating_streaming_job_tracker.abort_jobs(self.id);
1408 }
1409
1410 pub async fn run_statement(
1413 self: Arc<Self>,
1414 sql: Arc<str>,
1415 formats: Vec<Format>,
1416 ) -> std::result::Result<PgResponse<PgResponseStream>, BoxedError> {
1417 let mut stmts = Parser::parse_sql(&sql)?;
1419 if stmts.is_empty() {
1420 return Ok(PgResponse::empty_result(
1421 pgwire::pg_response::StatementType::EMPTY,
1422 ));
1423 }
1424 if stmts.len() > 1 {
1425 return Ok(
1426 PgResponse::builder(pgwire::pg_response::StatementType::EMPTY)
1427 .notice("cannot insert multiple commands into statement")
1428 .into(),
1429 );
1430 }
1431 let stmt = stmts.swap_remove(0);
1432 let rsp = Box::pin(handle(self, stmt, sql.clone(), formats)).await?;
1433 Ok(rsp)
1434 }
1435
1436 pub fn notice_to_user(&self, str: impl Into<String>) {
1437 let notice = str.into();
1438 tracing::trace!(notice, "notice to user");
1439 self.notice_tx
1440 .send(notice)
1441 .expect("notice channel should not be closed");
1442 }
1443
1444 pub fn is_barrier_read(&self) -> bool {
1445 match self.config().visibility_mode() {
1446 VisibilityMode::Default => self.env.batch_config.enable_barrier_read,
1447 VisibilityMode::All => true,
1448 VisibilityMode::Checkpoint => false,
1449 }
1450 }
1451
1452 pub fn statement_timeout(&self) -> Duration {
1453 if self.config().statement_timeout().millis() == 0 {
1454 Duration::from_secs(self.env.batch_config.statement_timeout_in_sec as u64)
1455 } else {
1456 Duration::from_millis(self.config().statement_timeout().millis() as u64)
1457 }
1458 }
1459
1460 pub fn create_temporary_source(&self, source: SourceCatalog) {
1461 self.temporary_source_manager
1462 .lock()
1463 .create_source(source.name.clone(), source);
1464 }
1465
1466 pub fn get_temporary_source(&self, name: &str) -> Option<SourceCatalog> {
1467 self.temporary_source_manager
1468 .lock()
1469 .get_source(name)
1470 .cloned()
1471 }
1472
1473 pub fn drop_temporary_source(&self, name: &str) {
1474 self.temporary_source_manager.lock().drop_source(name);
1475 }
1476
1477 pub fn temporary_source_manager(&self) -> TemporarySourceManager {
1478 self.temporary_source_manager.lock().clone()
1479 }
1480
1481 pub fn create_staging_table(&self, table: TableCatalog) {
1482 self.staging_catalog_manager
1483 .lock()
1484 .create_table(table.name.clone(), table);
1485 }
1486
1487 pub fn drop_staging_table(&self, name: &str) {
1488 self.staging_catalog_manager.lock().drop_table(name);
1489 }
1490
1491 pub fn staging_catalog_manager(&self) -> StagingCatalogManager {
1492 self.staging_catalog_manager.lock().clone()
1493 }
1494
1495 pub async fn check_cluster_limits(&self) -> Result<()> {
1496 if self.config().bypass_cluster_limits() {
1497 return Ok(());
1498 }
1499
1500 let gen_message = |ActorCountPerParallelism {
1501 worker_id_to_actor_count,
1502 hard_limit,
1503 soft_limit,
1504 }: ActorCountPerParallelism,
1505 exceed_hard_limit: bool|
1506 -> String {
1507 let (limit_type, action) = if exceed_hard_limit {
1508 ("critical", "Scale the cluster immediately to proceed.")
1509 } else {
1510 (
1511 "recommended",
1512 "Consider scaling the cluster for optimal performance.",
1513 )
1514 };
1515 format!(
1516 r#"Actor count per parallelism exceeds the {limit_type} limit.
1517
1518Depending on your workload, this may overload the cluster and cause performance/stability issues. {action}
1519
1520HINT:
1521- For best practices on managing streaming jobs: https://docs.risingwave.com/operate/manage-a-large-number-of-streaming-jobs
1522- To bypass the check (if the cluster load is acceptable): `[ALTER SYSTEM] SET bypass_cluster_limits TO true`.
1523 See https://docs.risingwave.com/operate/view-configure-runtime-parameters#how-to-configure-runtime-parameters
1524- Contact us via slack or https://risingwave.com/contact-us/ for further enquiry.
1525
1526DETAILS:
1527- hard limit: {hard_limit}
1528- soft limit: {soft_limit}
1529- worker_id_to_actor_count: {worker_id_to_actor_count:?}"#,
1530 )
1531 };
1532
1533 let limits = self.env().meta_client().get_cluster_limits().await?;
1534 for limit in limits {
1535 match limit {
1536 cluster_limit::ClusterLimit::ActorCount(l) => {
1537 if l.exceed_hard_limit() {
1538 return Err(RwError::from(ErrorCode::ProtocolError(gen_message(
1539 l, true,
1540 ))));
1541 } else if l.exceed_soft_limit() {
1542 self.notice_to_user(gen_message(l, false));
1543 }
1544 }
1545 }
1546 }
1547 Ok(())
1548 }
1549}
1550
1551pub static SESSION_MANAGER: std::sync::OnceLock<Arc<SessionManagerImpl>> =
1552 std::sync::OnceLock::new();
1553
1554pub struct SessionManagerImpl {
1555 env: FrontendEnv,
1556 _join_handles: Vec<JoinHandle<()>>,
1557 _shutdown_senders: Vec<Sender<()>>,
1558 number: AtomicI32,
1559}
1560
1561impl SessionManager for SessionManagerImpl {
1562 type Error = RwError;
1563 type Session = SessionImpl;
1564
1565 fn create_dummy_session(&self, database_id: DatabaseId) -> Result<Arc<Self::Session>> {
1566 let dummy_addr = Address::Tcp(SocketAddr::new(
1567 IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1568 5691, ));
1570
1571 self.connect_inner(
1574 database_id,
1575 risingwave_common::catalog::DEFAULT_SUPER_USER,
1576 Arc::new(dummy_addr),
1577 )
1578 }
1579
1580 fn connect(
1581 &self,
1582 database: &str,
1583 user_name: &str,
1584 peer_addr: AddressRef,
1585 ) -> Result<Arc<Self::Session>> {
1586 let catalog_reader = self.env.catalog_reader();
1587 let reader = catalog_reader.read_guard();
1588 let database_id = reader.get_database_by_name(database)?.id();
1589
1590 self.connect_inner(database_id, user_name, peer_addr)
1591 }
1592
1593 fn cancel_queries_in_session(&self, session_id: SessionId) {
1595 self.env.cancel_queries_in_session(session_id);
1596 }
1597
1598 fn cancel_creating_jobs_in_session(&self, session_id: SessionId) {
1599 self.env.cancel_creating_jobs_in_session(session_id);
1600 }
1601
1602 fn end_session(&self, session: &Self::Session) {
1603 self.delete_session(&session.session_id());
1604 }
1605
1606 async fn shutdown(&self) {
1607 self.env.sessions_map().write().clear();
1609 self.env.meta_client().try_unregister().await;
1611 }
1612}
1613
1614impl SessionManagerImpl {
1615 pub async fn new(opts: FrontendOpts) -> Result<Self> {
1616 let (env, join_handles, shutdown_senders) = FrontendEnv::init(opts).await?;
1618 Ok(Self {
1619 env,
1620 _join_handles: join_handles,
1621 _shutdown_senders: shutdown_senders,
1622 number: AtomicI32::new(0),
1623 })
1624 }
1625
1626 pub(crate) fn env(&self) -> &FrontendEnv {
1627 &self.env
1628 }
1629
1630 fn insert_session(&self, session: Arc<SessionImpl>) {
1631 let active_sessions = {
1632 let mut write_guard = self.env.sessions_map.write();
1633 write_guard.insert(session.id(), session);
1634 write_guard.len()
1635 };
1636 self.env
1637 .frontend_metrics
1638 .active_sessions
1639 .set(active_sessions as i64);
1640 }
1641
1642 fn delete_session(&self, session_id: &SessionId) {
1643 let active_sessions = {
1644 let mut write_guard = self.env.sessions_map.write();
1645 write_guard.remove(session_id);
1646 write_guard.len()
1647 };
1648 self.env
1649 .frontend_metrics
1650 .active_sessions
1651 .set(active_sessions as i64);
1652 }
1653
1654 fn connect_inner(
1655 &self,
1656 database_id: DatabaseId,
1657 user_name: &str,
1658 peer_addr: AddressRef,
1659 ) -> Result<Arc<SessionImpl>> {
1660 let catalog_reader = self.env.catalog_reader();
1661 let reader = catalog_reader.read_guard();
1662 let (database_name, database_owner) = {
1663 let db = reader.get_database_by_id(database_id)?;
1664 (db.name(), db.owner())
1665 };
1666
1667 let user_reader = self.env.user_info_reader();
1668 let reader = user_reader.read_guard();
1669 if let Some(user) = reader.get_user_by_name(user_name) {
1670 if !user.can_login {
1671 bail_permission_denied!("User {} is not allowed to login", user_name);
1672 }
1673 let has_privilege = user.has_privilege(database_id, AclMode::Connect);
1674 if !user.is_super && database_owner != user.id && !has_privilege {
1675 bail_permission_denied!("User does not have CONNECT privilege.");
1676 }
1677
1678 let (connection_type, client_addr) = match peer_addr.as_ref() {
1680 Address::Tcp(socket_addr) => (ConnectionType::Host, Some(&socket_addr.ip())),
1681 Address::Unix(_) => (ConnectionType::Local, None),
1682 };
1683 tracing::debug!(
1684 "receive connection: type={:?}, client_addr={:?}",
1685 connection_type,
1686 client_addr
1687 );
1688
1689 let hba_entry_opt = self.env.frontend_config().hba_config.find_matching_entry(
1690 &connection_type,
1691 database_name,
1692 user_name,
1693 client_addr,
1694 );
1695
1696 let Some(hba_entry_opt) = hba_entry_opt else {
1697 bail_permission_denied!(
1698 "no pg_hba.conf entry for host \"{peer_addr}\", user \"{user_name}\", database \"{database_name}\""
1699 );
1700 };
1701
1702 let authenticator_by_info = || -> Result<UserAuthenticator> {
1704 let authenticator = match &user.auth_info {
1705 None => UserAuthenticator::None,
1706 Some(auth_info) => match auth_info.encryption_type() {
1707 EncryptionType::Plaintext => {
1708 UserAuthenticator::ClearText(auth_info.encrypted_value.clone())
1709 }
1710 EncryptionType::Md5 => {
1711 let mut salt = [0; 4];
1712 let mut rng = rand::rng();
1713 rng.fill_bytes(&mut salt);
1714 UserAuthenticator::Md5WithSalt {
1715 encrypted_password: md5_hash_with_salt(
1716 &auth_info.encrypted_value,
1717 &salt,
1718 ),
1719 salt,
1720 }
1721 }
1722 EncryptionType::Oauth => UserAuthenticator::OAuth {
1723 metadata: auth_info.metadata.clone(),
1724 cluster_id: self.env.meta_client().cluster_id().to_owned(),
1725 },
1726 _ => {
1727 bail_protocol_error!(
1728 "Unsupported auth type: {}",
1729 auth_info.encryption_type().as_str_name()
1730 );
1731 }
1732 },
1733 };
1734 Ok(authenticator)
1735 };
1736
1737 let user_authenticator = match (&hba_entry_opt.auth_method, &user.auth_info) {
1738 (AuthMethod::Trust, _) => UserAuthenticator::None,
1739 (AuthMethod::Password, _) => authenticator_by_info()?,
1741 (AuthMethod::Md5, Some(auth_info))
1742 if auth_info.encryption_type() == EncryptionType::Md5 =>
1743 {
1744 authenticator_by_info()?
1745 }
1746 (AuthMethod::OAuth, Some(auth_info))
1747 if auth_info.encryption_type() == EncryptionType::Oauth =>
1748 {
1749 authenticator_by_info()?
1750 }
1751 (AuthMethod::Ldap, _) => {
1752 UserAuthenticator::Ldap(user_name.to_owned(), hba_entry_opt.clone())
1753 }
1754 _ => {
1755 bail_permission_denied!(
1756 "password authentication failed for user \"{user_name}\""
1757 );
1758 }
1759 };
1760
1761 let secret_key = self.number.fetch_add(1, Ordering::Relaxed);
1763 let id = (secret_key, secret_key);
1765 let session_config = self.env.session_params_snapshot();
1767
1768 let session_impl: Arc<SessionImpl> = SessionImpl::new(
1769 self.env.clone(),
1770 AuthContext::new(database_name.to_owned(), user_name.to_owned(), user.id),
1771 user_authenticator,
1772 id,
1773 peer_addr,
1774 session_config,
1775 )
1776 .into();
1777 self.insert_session(session_impl.clone());
1778
1779 Ok(session_impl)
1780 } else {
1781 bail_catalog_error!("Role {} does not exist", user_name);
1782 }
1783 }
1784}
1785
1786impl Session for SessionImpl {
1787 type Error = RwError;
1788 type Portal = Portal;
1789 type PreparedStatement = PrepareStatement;
1790 type ValuesStream = PgResponseStream;
1791
1792 async fn run_one_query(
1795 self: Arc<Self>,
1796 stmt: Statement,
1797 format: Format,
1798 ) -> Result<PgResponse<PgResponseStream>> {
1799 let string = stmt.to_string();
1800 let sql_str = string.as_str();
1801 let sql: Arc<str> = Arc::from(sql_str);
1802 drop(string);
1804 let rsp = Box::pin(handle(self, stmt, sql, vec![format])).await?;
1805 Ok(rsp)
1806 }
1807
1808 fn user_authenticator(&self) -> &UserAuthenticator {
1809 &self.user_authenticator
1810 }
1811
1812 fn id(&self) -> SessionId {
1813 self.id
1814 }
1815
1816 async fn parse(
1817 self: Arc<Self>,
1818 statement: Option<Statement>,
1819 params_types: Vec<Option<DataType>>,
1820 ) -> Result<PrepareStatement> {
1821 Ok(if let Some(statement) = statement {
1822 handle_parse(self, statement, params_types).await?
1823 } else {
1824 PrepareStatement::Empty
1825 })
1826 }
1827
1828 fn bind(
1829 self: Arc<Self>,
1830 prepare_statement: PrepareStatement,
1831 params: Vec<Option<Bytes>>,
1832 param_formats: Vec<Format>,
1833 result_formats: Vec<Format>,
1834 ) -> Result<Portal> {
1835 handle_bind(prepare_statement, params, param_formats, result_formats)
1836 }
1837
1838 async fn execute(self: Arc<Self>, portal: Portal) -> Result<PgResponse<PgResponseStream>> {
1839 let rsp = Box::pin(handle_execute(self, portal)).await?;
1840 Ok(rsp)
1841 }
1842
1843 fn describe_statement(
1844 self: Arc<Self>,
1845 prepare_statement: PrepareStatement,
1846 ) -> Result<(Vec<DataType>, Vec<PgFieldDescriptor>)> {
1847 Ok(match prepare_statement {
1848 PrepareStatement::Empty => (vec![], vec![]),
1849 PrepareStatement::Prepared(prepare_statement) => (
1850 prepare_statement.bound_result.param_types,
1851 infer(
1852 Some(prepare_statement.bound_result.bound),
1853 prepare_statement.statement,
1854 )?,
1855 ),
1856 PrepareStatement::PureStatement(statement) => (vec![], infer(None, statement)?),
1857 })
1858 }
1859
1860 fn describe_portal(self: Arc<Self>, portal: Portal) -> Result<Vec<PgFieldDescriptor>> {
1861 match portal {
1862 Portal::Empty => Ok(vec![]),
1863 Portal::Portal(portal) => {
1864 let mut columns = infer(Some(portal.bound_result.bound), portal.statement)?;
1865 let formats = FormatIterator::new(&portal.result_formats, columns.len())
1866 .map_err(|e| RwError::from(ErrorCode::ProtocolError(e)))?;
1867 columns.iter_mut().zip_eq_fast(formats).for_each(|(c, f)| {
1868 if f == Format::Binary {
1869 c.set_to_binary()
1870 }
1871 });
1872 Ok(columns)
1873 }
1874 Portal::PureStatement(statement) => Ok(infer(None, statement)?),
1875 }
1876 }
1877
1878 fn get_config(&self, key: &str) -> Result<String> {
1879 self.config().get(key).map_err(Into::into)
1880 }
1881
1882 fn set_config(&self, key: &str, value: String) -> Result<String> {
1883 Self::set_config(self, key, value)
1884 }
1885
1886 async fn next_notice(self: &Arc<Self>) -> String {
1887 std::future::poll_fn(|cx| self.clone().notice_rx.lock().poll_recv(cx))
1888 .await
1889 .expect("notice channel should not be closed")
1890 }
1891
1892 fn transaction_status(&self) -> TransactionStatus {
1893 match &*self.txn.lock() {
1894 transaction::State::Initial | transaction::State::Implicit(_) => {
1895 TransactionStatus::Idle
1896 }
1897 transaction::State::Explicit(_) => TransactionStatus::InTransaction,
1898 }
1900 }
1901
1902 fn init_exec_context(&self, sql: Arc<str>) -> ExecContextGuard {
1904 let exec_context = Arc::new(ExecContext {
1905 running_sql: sql,
1906 last_instant: Instant::now(),
1907 last_idle_instant: self.last_idle_instant.clone(),
1908 });
1909 *self.exec_context.lock() = Some(Arc::downgrade(&exec_context));
1910 *self.last_idle_instant.lock() = None;
1912 ExecContextGuard::new(exec_context)
1913 }
1914
1915 fn check_idle_in_transaction_timeout(&self) -> PsqlResult<()> {
1918 if matches!(self.transaction_status(), TransactionStatus::InTransaction) {
1920 let idle_in_transaction_session_timeout =
1921 self.config().idle_in_transaction_session_timeout() as u128;
1922 if idle_in_transaction_session_timeout != 0 {
1924 let guard = self.exec_context.lock();
1926 if guard.as_ref().and_then(|weak| weak.upgrade()).is_none() {
1928 if let Some(elapse_since_last_idle_instant) =
1930 self.elapse_since_last_idle_instant()
1931 && elapse_since_last_idle_instant > idle_in_transaction_session_timeout
1932 {
1933 return Err(PsqlError::IdleInTxnTimeout);
1934 }
1935 }
1936 }
1937 }
1938 Ok(())
1939 }
1940
1941 fn user(&self) -> String {
1942 self.user_name()
1943 }
1944}
1945
1946fn infer(bound: Option<BoundStatement>, stmt: Statement) -> Result<Vec<PgFieldDescriptor>> {
1948 match stmt {
1949 Statement::Query(_)
1950 | Statement::Insert { .. }
1951 | Statement::Delete { .. }
1952 | Statement::Update { .. }
1953 | Statement::FetchCursor { .. } => Ok(bound
1954 .unwrap()
1955 .output_fields()
1956 .iter()
1957 .map(to_pg_field)
1958 .collect()),
1959 Statement::ShowObjects {
1960 object: show_object,
1961 ..
1962 } => Ok(infer_show_object(&show_object)),
1963 Statement::ShowCreateObject { .. } => Ok(infer_show_create_object()),
1964 Statement::ShowTransactionIsolationLevel => {
1965 let name = "transaction_isolation";
1966 Ok(infer_show_variable(name))
1967 }
1968 Statement::ShowVariable { variable } => {
1969 let name = &variable[0].real_value().to_lowercase();
1970 Ok(infer_show_variable(name))
1971 }
1972 Statement::Describe { name: _, kind } => Ok(infer_describe(&kind)),
1973 Statement::Explain { .. } => Ok(vec![PgFieldDescriptor::new(
1974 "QUERY PLAN".to_owned(),
1975 DataType::Varchar.to_oid(),
1976 DataType::Varchar.type_len(),
1977 )]),
1978 _ => Ok(vec![]),
1979 }
1980}
1981
1982pub struct WorkerProcessId {
1983 pub worker_id: WorkerId,
1984 pub process_id: i32,
1985}
1986
1987impl WorkerProcessId {
1988 pub fn new(worker_id: WorkerId, process_id: i32) -> Self {
1989 Self {
1990 worker_id,
1991 process_id,
1992 }
1993 }
1994}
1995
1996impl Display for WorkerProcessId {
1997 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1998 write!(f, "{}:{}", self.worker_id, self.process_id)
1999 }
2000}
2001
2002impl TryFrom<String> for WorkerProcessId {
2003 type Error = String;
2004
2005 fn try_from(worker_process_id: String) -> std::result::Result<Self, Self::Error> {
2006 const INVALID: &str = "invalid WorkerProcessId";
2007 let splits: Vec<&str> = worker_process_id.split(":").collect();
2008 if splits.len() != 2 {
2009 return Err(INVALID.to_owned());
2010 }
2011 let Ok(worker_id) = splits[0].parse::<u32>() else {
2012 return Err(INVALID.to_owned());
2013 };
2014 let Ok(process_id) = splits[1].parse::<i32>() else {
2015 return Err(INVALID.to_owned());
2016 };
2017 Ok(WorkerProcessId::new(worker_id.into(), process_id))
2018 }
2019}
2020
2021pub fn cancel_queries_in_session(session_id: SessionId, sessions_map: SessionMapRef) -> bool {
2022 let guard = sessions_map.read();
2023 if let Some(session) = guard.get(&session_id) {
2024 session.cancel_current_query();
2025 true
2026 } else {
2027 info!("Current session finished, ignoring cancel query request");
2028 false
2029 }
2030}
2031
2032pub fn cancel_creating_jobs_in_session(session_id: SessionId, sessions_map: SessionMapRef) -> bool {
2033 let guard = sessions_map.read();
2034 if let Some(session) = guard.get(&session_id) {
2035 session.cancel_current_creating_job();
2036 true
2037 } else {
2038 info!("Current session finished, ignoring cancel creating request");
2039 false
2040 }
2041}